- Регистрация
- 10.04.2016
- Сообщения
- 384
- Благодарностей
- 212
- Баллы
- 43
Получение данных в многопотоке из файлов или БД без блокировки.
Иногда спрашивают, и я не видел этого решения на форуме или может не искал как следует.
Если нужно построчно брать какие либо данные из файла или БД, например логин и пароль от каких либо аккаунтов , то при многопотоке приходится делать блокировку файла, что существенно замедляет работу, особенно если записей много и потоков сотни и более.
Можно использовать вариант загрузки записей в память и раздавать их при обращении по какому либо порту. Это называется TCP сервер, если не ошибаюсь.
В этом случае нет обращения потоков к диску и не нужно блокировка. Соответственно увеличивается срок службы жесткого диска и значительно увеличивается скорость получения данных.
Допустим у нас есть файл с аккаунтами mail:pass c количеством строк около миллиона.
Посмотрим как с этим работает шаблон со стандартным способом получения данных из файла и шаблон с получением данных из памяти по порту.
Как видно из видео разница по скорости колоссальная.
Ну и о реализации:
Полный код сервера на c#:
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Linq;
class MyServer
{
private static string[] records;
private static int currentIndex = 0;
private const string END_MESSAGE = "ВСЕ_ЗАПИСИ_ВЫДАНЫ";
private static System.Timers.Timer saveTimer;
private const string SAVE_FILE_NAME = "last.txt";
private const string IMAP_FILE_NAME = "accounts.txt";
static async Task Main()
{
// Загрузка записей из файла
LoadRecords(IMAP_FILE_NAME);
// Настройка таймера для сохранения
SetupSaveTimer();
// Настройка сервера
int port = 8888;
TcpListener server = new TcpListener(IPAddress.Any, port);
server.Start();
Console.WriteLine($"Сервер запущен на порту {port}");
while (true)
{
TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
_ = HandleClientAsync(client); // асинхронная обработка запроса
}
}
static void LoadRecords(string filename)
{
records = File.ReadAllLines(filename);
Console.WriteLine($"Загружено {records.Length} записей");
}
static void SetupSaveTimer()
{
saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
saveTimer.Elapsed += SaveRemainingRecords;
saveTimer.AutoReset = true;
saveTimer.Enabled = true;
}
static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
{
int remainingCount = records.Length - currentIndex;
if (remainingCount > 0)
{
string[] remainingRecords = new string[remainingCount];
Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
}
else
{
File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
}
}
static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
{
using (NetworkStream stream = client.GetStream())
{
string record = await GetNextRecordAsync();
byte[] data = Encoding.UTF8.GetBytes(record);
await stream.WriteAsync(data, 0, data.Length);
}
client.Close();
}
static async Task<string> GetNextRecordAsync()
{
int index = Interlocked.Increment(ref currentIndex) - 1;
if (index < records.Length)
{
return records[index];
}
else
{
// Если записи закончились, формируем файл заново
LoadRecords(IMAP_FILE_NAME);
currentIndex = 0;
return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
}
}
}
Сервер считывает данные из файла и больше к нему не обращается, пока не раздаст все записи.
Когда раздаст все записи, снова считывает возможно уже обновленные записи из файла.
И продолжает раздавать
Main:
static async Task Main()
{
// Загрузка записей из файла
LoadRecords(IMAP_FILE_NAME);
// Настройка таймера для сохранения
SetupSaveTimer();
// Настройка сервера
int port = 8888;
TcpListener server = new TcpListener(IPAddress.Any, port);
server.Start();
Console.WriteLine($"Сервер запущен на порту {port}");
while (true)
{
TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
_ = HandleClientAsync(client); // асинхронная обработка запроса
}
}
LoadRecords:
static void LoadRecords(string filename)
{
records = File.ReadAllLines(filename);
Console.WriteLine($"Загружено {records.Length} записей");
}
SetupSaveTimer:
static void SetupSaveTimer()
{
saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
saveTimer.Elapsed += SaveRemainingRecords;
saveTimer.AutoReset = true;
saveTimer.Enabled = true;
}
static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
{
int remainingCount = records.Length - currentIndex;
if (remainingCount > 0)
{
string[] remainingRecords = new string[remainingCount];
Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
}
else
{
File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
}
}
Запускаем сервер на порту 8888:
// Настройка сервера
int port = 8888;
TcpListener server = new TcpListener(IPAddress.Any, port);
server.Start();
Console.WriteLine($"Сервер запущен на порту {port}");
ждем запросы и выдаем строки массива:
static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
{
using (NetworkStream stream = client.GetStream())
{
string record = await GetNextRecordAsync();
byte[] data = Encoding.UTF8.GetBytes(record);
await stream.WriteAsync(data, 0, data.Length);
}
client.Close();
}
static async Task<string> GetNextRecordAsync()
{
int index = Interlocked.Increment(ref currentIndex) - 1;
if (index < records.Length)
{
return records[index];
}
else
{
// Если записи закончились, формируем файл заново
LoadRecords(IMAP_FILE_NAME);
currentIndex = 0;
return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
}
}
Код запроса и получения записей, шаблон work_from_memory.zp:
string s = "";
using (TcpClient client = new TcpClient("localhost", 8888))
using (NetworkStream stream = client.GetStream())
{
byte[] data = new byte[1024];
int bytesRead = stream.Read(data, 0, data.Length);
s = Encoding.UTF8.GetString(data, 0, bytesRead);
}
Нужная запись в переменной s. Дальше можно делать с ней что угодно.
Если записи в базе данных или ещё где то, можно периодически считывать любым запросом нужные данные и сохранять в файл. Сделать такой шаблон и запускать по расписанию.
К серверу можно обращаться не только из потоков 1 шаблона, но и из любого количества шаблонов.
Также можно запустить сервер и открыть порт наружу и к нему можно обращаться с любых компьютеров из любых программ.
Например у вас несколько зеннопостеров работают на нескольких компьютерах(VDS), они все могут получать данные из одного файла, который может периодически обновляться из нужных источников.
Вроде всё, буду рад, если кто не знал такой способ и воспользуется.
Всем удачи! С наступающим Новым годом! Не забываем про лайки!
Вложения
-
51,7 МБ Просмотры: 13
-
161,8 КБ Просмотры: 12
Последнее редактирование: