❄️Получение данных в многопотоке из файлов или БД без блокировки.

bvbfor

Client
Регистрация
10.04.2016
Сообщения
384
Благодарностей
212
Баллы
43
Получение данных в многопотоке из файлов или БД без блокировки.

Иногда спрашивают, и я не видел этого решения на форуме или может не искал как следует.

Если нужно построчно брать какие либо данные из файла или БД, например логин и пароль от каких либо аккаунтов , то при многопотоке приходится делать блокировку файла, что существенно замедляет работу, особенно если записей много и потоков сотни и более.

Можно использовать вариант загрузки записей в память и раздавать их при обращении по какому либо порту. Это называется TCP сервер, если не ошибаюсь.
В этом случае нет обращения потоков к диску и не нужно блокировка. Соответственно увеличивается срок службы жесткого диска и значительно увеличивается скорость получения данных.

Допустим у нас есть файл с аккаунтами mail:pass c количеством строк около миллиона.
128597


Посмотрим как с этим работает шаблон со стандартным способом получения данных из файла и шаблон с получением данных из памяти по порту.


Как видно из видео разница по скорости колоссальная.
Ну и о реализации:
Полный код сервера на c#:
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Linq;

class MyServer
{
    private static string[] records;
    private static int currentIndex = 0;
    private const string END_MESSAGE = "ВСЕ_ЗАПИСИ_ВЫДАНЫ";
    private static System.Timers.Timer saveTimer;
    private const string SAVE_FILE_NAME = "last.txt";
    private const string IMAP_FILE_NAME = "accounts.txt";


    static async Task Main()
    {
        // Загрузка записей из файла
        LoadRecords(IMAP_FILE_NAME);

        // Настройка таймера для сохранения
        SetupSaveTimer();

        // Настройка сервера
        int port = 8888;
        TcpListener server = new TcpListener(IPAddress.Any, port);
        server.Start();
        Console.WriteLine($"Сервер запущен на порту {port}");

        while (true)
        {
            TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
            _ = HandleClientAsync(client); // асинхронная обработка запроса
        }
    }

    static void LoadRecords(string filename)
    {
        records = File.ReadAllLines(filename); 
        Console.WriteLine($"Загружено {records.Length} записей");
    }


    static void SetupSaveTimer()
    {
        saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
        saveTimer.Elapsed += SaveRemainingRecords;
        saveTimer.AutoReset = true;
        saveTimer.Enabled = true;
    }

    static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
    {
        int remainingCount = records.Length - currentIndex;
        if (remainingCount > 0)
        {
            string[] remainingRecords = new string[remainingCount];
            Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
            File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
            Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
        }
        else
        {
            File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
            Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
        }
    }

    static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
    {
        using (NetworkStream stream = client.GetStream())
        {
            string record = await GetNextRecordAsync();
            byte[] data = Encoding.UTF8.GetBytes(record);
            await stream.WriteAsync(data, 0, data.Length);
        }
        client.Close();
    }

    static async Task<string> GetNextRecordAsync()
    {
        int index = Interlocked.Increment(ref currentIndex) - 1;
        if (index < records.Length)
        {
            return records[index];
        }
        else
        {
            // Если записи закончились, формируем файл заново     
            LoadRecords(IMAP_FILE_NAME);
            currentIndex = 0;
            return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
        }
    }
}

Сервер считывает данные из файла и больше к нему не обращается, пока не раздаст все записи.
Когда раздаст все записи, снова считывает возможно уже обновленные записи из файла.
И продолжает раздавать
Main:
static async Task Main()
{
    // Загрузка записей из файла
    LoadRecords(IMAP_FILE_NAME);

    // Настройка таймера для сохранения
    SetupSaveTimer();

    // Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");

    while (true)
    {
        TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
        _ = HandleClientAsync(client); // асинхронная обработка запроса
    }
}
загружаем записи из файла в массив records
LoadRecords:
static void LoadRecords(string filename)
{
    records = File.ReadAllLines(filename); 
    Console.WriteLine($"Загружено {records.Length} записей");
}
настраиваем таймер для сохранения невыданных записей. В случае если записей миллионы, при аварийном выключении компьютера можно начать с файла last.txt.
SetupSaveTimer:
static void SetupSaveTimer()
{
    saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
    saveTimer.Elapsed += SaveRemainingRecords;
    saveTimer.AutoReset = true;
    saveTimer.Enabled = true;
}

static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
{
    int remainingCount = records.Length - currentIndex;
    if (remainingCount > 0)
    {
        string[] remainingRecords = new string[remainingCount];
        Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
        File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
        Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
    }
    else
    {
        File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
        Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
    }
}
Запускаем сервер на порту 8888 (можно взять любой свободный порт)
Запускаем сервер на порту 8888:
// Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");
Ждем запросы и выдаем строки массива
ждем запросы и выдаем строки массива:
static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
{
    using (NetworkStream stream = client.GetStream())
    {
        string record = await GetNextRecordAsync();
        byte[] data = Encoding.UTF8.GetBytes(record);
        await stream.WriteAsync(data, 0, data.Length);
    }
    client.Close();
}

static async Task<string> GetNextRecordAsync()
{
    int index = Interlocked.Increment(ref currentIndex) - 1;
    if (index < records.Length)
    {
        return records[index];
    }
    else
    {
        // Если записи закончились, формируем файл заново     
        LoadRecords(IMAP_FILE_NAME);
        currentIndex = 0;
        return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
    }
}
Код запроса и получения записей, шаблон work_from_memory.zp
Код запроса и получения записей, шаблон work_from_memory.zp:
string s = "";
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        s = Encoding.UTF8.GetString(data, 0, bytesRead);
    }
Стандартное подключение к TCP серверу по порту 8888 и потоковое получение записи.

Нужная запись в переменной s. Дальше можно делать с ней что угодно.

Если записи в базе данных или ещё где то, можно периодически считывать любым запросом нужные данные и сохранять в файл. Сделать такой шаблон и запускать по расписанию.

К серверу можно обращаться не только из потоков 1 шаблона, но и из любого количества шаблонов.

Также можно запустить сервер и открыть порт наружу и к нему можно обращаться с любых компьютеров из любых программ.
Например у вас несколько зеннопостеров работают на нескольких компьютерах(VDS), они все могут получать данные из одного файла, который может периодически обновляться из нужных источников.

Вроде всё, буду рад, если кто не знал такой способ и воспользуется.

Всем удачи! С наступающим Новым годом! Не забываем про лайки!
 

Вложения

Последнее редактирование:

bvbfor

Client
Регистрация
10.04.2016
Сообщения
384
Благодарностей
212
Баллы
43
Как пример - практическое применение для шаблона из статьи о генерации прокси из ВПН

В коде сервера указываем файл Good_Proxy.txt
Запускаем сервер.

В шаблоне Test_Job.zp вместо этого кубика
128904


используем кубик c# с кодом
Берем прокси с сервера:
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        project.Variables["VPN_Proxy"].Value = Encoding.UTF8.GetString(data, 0, bytesRead);
    }
project.SendInfoToLog("Взяли прокси - " + project.Variables["VPN_Proxy"].Value);
Прокси будут выдаваться по кругу.

И по ветке выхода по недоступности прокси можно удалять прокси из файла, и такие прокси больше не будут выдаваться.
Удаление невалидного прокси из файла:
string filePath = "Путь к файлу Good_Proxy.txt";
lock(CommonCode.SyncObject)
{
   var lines = File.ReadAllLines(filePath).Where(line =>
      !line.Contains(project.Variables["VPN_Proxy"].Value)).ToList();
   File.WriteAllLines(filePath , lines);
}
project.SendInfoToLog("Осталось прокси в файле - " + lines.Count.ToString());
 
Последнее редактирование:

bvbfor

Client
Регистрация
10.04.2016
Сообщения
384
Благодарностей
212
Баллы
43
И если на компьютере, где запущен TCP сервер, открыть порт, можно получать VPN прокси с любого компьютера и поднимать на нём локальный прокси.
 
  • Спасибо
Реакции: Greenya

capvad91

Client
Регистрация
27.03.2019
Сообщения
44
Благодарностей
47
Баллы
18
И если на компьютере, где запущен TCP сервер, открыть порт, можно получать VPN прокси с любого компьютера и поднимать на нём локальный прокси.
Класс! Обязательно попробую.
Для меня так эти 2 статьи однозначно лучшие!
 

Кто просматривает тему: (Всего: 1, Пользователи: 0, Гости: 1)