Многопоточная работа с базой данных

  • Автор темы Автор темы lokus
  • Дата начала Дата начала

lokus

Client
Регистрация
10.08.2017
Сообщения
31
Реакции
1
Баллы
8
Есть большая таблица в БД MySQL с исходными данными и есть шаблон который берет одну запись из этой таблицы, обрабатывает ее и сохраняет обработанные данные в ту же таблицу. Вопрос в том как сделать так чтобы в многопотоке шаблоны брали каждый свою запись и не мешали друг другу?

Пробовал вот такие варианты:
1. Через транзакции с "select for update"

C#:
Развернуть Свернуть Копировать
var db = project.Context["db"];

List<string> row = db.getRow("start transaction;SELECT id, data FROM `table` WHERE `result` is NULL  limit 1 for update;");

if ( row.Count > 0 ){
    string free_id = row[0];
    project.Variables["id"].Value = row[0];
    project.Variables["data"].Value = row[1];
    
    project.SendInfoToLog("Есть задание id: "+free_id, true);
    
    db.query("update table set result = 'zenno' where id="+free_id+" ; commit;");
}
else {
    db.query("commit;");
    project.Variables["id"].Value = "-1";
}

Работает только если потоков немного, при увеличении количества потоков таблица постоянно заблокирована

2. Обычный select и список с последними id взятыми в работу
109162

получаем необработанную строку из БД
проверяем есть ли в списке такая строка
если есть снова берем необработанную строку из БД
если нет то добавляем строку в начало списка, удаляем из списка последнюю, помечаем в БД что строка взята в работу, обрабатываем

В общем работает, но за время пока один поток проверяет список еще несколько потоков успевают взять в обработку ту же строку и несколько потоков обрабатывают одно и то же

3. "Lock" файл

Перед выполнением запроса шаблон проверяет наличие определенного файла, если его нет то создает такой файл и выполняет запрос, после выполнения удаляет. Если файл есть то ждет и проверяет снова

Все работает но очень медленно и увеличение количества потоков ничего не дает так как одновременно все равно работает только один


Какие есть еще способы? ну или как докрутить один из этих?
 
В таблице добавить поле с названием например "status"
0 - стандартное значение, готово к работе
1 - строка "занята" в работе на данный момент
 
  • Спасибо
Реакции: Mikhail B.
Там сейчас по сути так и есть, поле "result"
NULL - стандартное значение, готово к работе
"zenno" - строка "занята" в работе на данный момент
"результат обработки" - строка обработана

только это не помогает никак в многопоточной работе...
 
Со статусом все нормально должно работать, значит реализация отстой. Если уж прям поизвращаться, то делается отдельный метод на изменение статуса записи в БД (с локом) при этом с метода возвращаете айди этой записи. В итоге что получается, поток находит запись с статусом free, тут же в локе меняет ее статус на busy ( это происходит очень быстро, поэтому на скорость особо повлиять лок не должен) и с метода возвращает вам айди записи которую он только-что сделал busy, и вы по ид с этой записью уже спокойно работаете и меняете там данные которые надо.
 
спец столбец в таблице, по умолчанию null. В зенно генерируешь случайную строку, например guid. Делаешь апдейт 1й строки у которой этот столбец null, меняя его на сгенерированный id. Всё. Эту строку никто не возьмёт кроме потока, который сгенерировал этот guid
 
  • Спасибо
Реакции: lokus
Спасибо, кажется мысль понял. Схватить одну и ту же запись все равно успеют несколько потоков но обрабатывать будет только один, чей guid записался последним. Попробую
 
Спасибо, кажется мысль понял. Схватить одну и ту же запись все равно успеют несколько потоков но обрабатывать будет только один, чей guid записался последним. Попробую
Не будет брать несколько записей, если лочить таблицу , выбранным строкам менять статус и потом UNLOCK TABLES. В данный момент работает шаблон, в 50 потоков, такой проблемы нету. Бывают случаи когда, вырубается сервак или интернет. В таком случае некоторые ячейки не поменяют статус обратно, но для этого в БД есть процедура, которая возвращает аккаунты в работу, если они со статусом busy и не обновляли его в течении 5 минут(максимальное время выполнения).
 
У меня таблица очень большая, одна запись берется несколько секунд скорее всего в этом проблема, на небольшой мой первый метод с транзакциями и select for update отлично работает, он лочит не всю таблицу а только одну запись, а на такой таблице постоянно дедлок ловлю и вообще все стоит, ну и потоков мне нужно несколько сотен
 
Спасибо, кажется мысль понял. Схватить одну и ту же запись все равно успеют несколько потоков но обрабатывать будет только один, чей guid записался последним. Попробую
не будут, если хватать по guid
 
тогда получается мысль я не понял... чтобы схватить по guid этот самый guid нужно записать, а чтобы его записать нужно сначала взять необработанную запись, тут то проблема и вылезет... разве что отдельный шаблон который будет генерировать guidы и записывать их по очереди в необработанные записи, а шаблоны "обработчики" в многопотоке будут брать записи каждый по своему guidу, только тогда не совсем понятно каким образом шаблоны "обработчики" узнают где чей?
 
BEGIN
declare hash2 bigint default UUID_SHORT();
START TRANSACTION;
update meta.proxy set work_status = 1, hash = hash2 where id IN (select id from meta.proxy where work_status = 0 and time < ADDTIME(now(), -400) for update skip locked) limit limit;
COMMIT;
select id, type, ip, port, login, password from meta.proxy where hash = hash2;
END

Вот примерно так.
 
  • Спасибо
Реакции: lokus
Огромное спасибо за "skip locked" всего то это нужно было добавить в мой первый способ и все заработало, даже не стал заморачиваться с hash и UUID

В общем, если кому-то понадобится то же самое, вот рабочий способ на стандартном кубике "БД" для mysql:
SQL:
Развернуть Свернуть Копировать
start transaction;
set @id=-1;
set @data='';
SELECT id, data FROM `table` WHERE `result` is NULL limit 1 into @id, @data  for update skip locked;
update `table` set result = 'processing' where id=@id;
select @id, @data;
commit;

MySql должен работать на движке InnoDB, уровень изоляции транзакций нужно поменять на "REPEATABLE READ" или "SERIALIZABLE", на столбцах который используете в ваших запросах обязательно должны быть индексы
 
Последнее редактирование:
  • Спасибо
Реакции: rol
тогда получается мысль я не понял... чтобы схватить по guid этот самый guid нужно записать, а чтобы его записать нужно сначала взять необработанную запись, тут то проблема и вылезет... разве что отдельный шаблон который будет генерировать guidы и записывать их по очереди в необработанные записи, а шаблоны "обработчики" в многопотоке будут брать записи каждый по своему guidу, только тогда не совсем понятно каким образом шаблоны "обработчики" узнают где чей?
апдейт разных потоков не схватит одну и ту же запись
 
апдейт разных потоков не схватит одну и ту же запись
У меня схожая история, но пока в процессе реализации. Есть 100 строк в таблице "логин - пароль", нужно чтоб каждый поток брал строку и записывал статус "в работе" и так до конца, но после обработки всех строк в планах чтобы все статусы обнулялись и по расписанию в ZP всё запускалось с начала. В этом случае не нужно никаких локов выставлять? (На счет Логинов и паролей - создавать отдельные профиль папки и профили не вариант, нужно именно заново логиниться)
 
У меня схожая история, но пока в процессе реализации. Есть 100 строк в таблице "логин - пароль", нужно чтоб каждый поток брал строку и записывал статус "в работе" и так до конца, но после обработки всех строк в планах чтобы все статусы обнулялись и по расписанию в ZP всё запускалось с начала. В этом случае не нужно никаких локов выставлять? (На счет Логинов и паролей - создавать отдельные профиль папки и профили не вариант, нужно именно заново логиниться)
ну тебя проблема другая. Как-то дать понять всем потокам, что пока работа всё. И как-то понять, что вот вот последняя запись в обработке и надо базу всю обновить
 
  • Спасибо
Реакции: Vadim3851

Кто просматривает тему: (Всего: 0, Пользователи: 0, Гости: 0)