❄️Получение данных в многопотоке из файлов или БД без блокировки.

bvbfor

Client
Joined
Apr 10, 2016
Messages
425
Reaction score
235
Points
43
Получение данных в многопотоке из файлов или БД без блокировки.

Иногда спрашивают, и я не видел этого решения на форуме или может не искал как следует.

Если нужно построчно брать какие либо данные из файла или БД, например логин и пароль от каких либо аккаунтов , то при многопотоке приходится делать блокировку файла, что существенно замедляет работу, особенно если записей много и потоков сотни и более.

Можно использовать вариант загрузки записей в память и раздавать их при обращении по какому либо порту. Это называется TCP сервер, если не ошибаюсь.
В этом случае нет обращения потоков к диску и не нужно блокировка. Соответственно увеличивается срок службы жесткого диска и значительно увеличивается скорость получения данных.

Допустим у нас есть файл с аккаунтами mail:pass c количеством строк около миллиона.
128597


Посмотрим как с этим работает шаблон со стандартным способом получения данных из файла и шаблон с получением данных из памяти по порту.


Как видно из видео разница по скорости колоссальная.
Ну и о реализации:
Полный код сервера на c#:
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Linq;

class MyServer
{
    private static string[] records;
    private static int currentIndex = 0;
    private const string END_MESSAGE = "ВСЕ_ЗАПИСИ_ВЫДАНЫ";
    private static System.Timers.Timer saveTimer;
    private const string SAVE_FILE_NAME = "last.txt";
    private const string IMAP_FILE_NAME = "accounts.txt";


    static async Task Main()
    {
        // Загрузка записей из файла
        LoadRecords(IMAP_FILE_NAME);

        // Настройка таймера для сохранения
        SetupSaveTimer();

        // Настройка сервера
        int port = 8888;
        TcpListener server = new TcpListener(IPAddress.Any, port);
        server.Start();
        Console.WriteLine($"Сервер запущен на порту {port}");

        while (true)
        {
            TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
            _ = HandleClientAsync(client); // асинхронная обработка запроса
        }
    }

    static void LoadRecords(string filename)
    {
        records = File.ReadAllLines(filename); 
        Console.WriteLine($"Загружено {records.Length} записей");
    }


    static void SetupSaveTimer()
    {
        saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
        saveTimer.Elapsed += SaveRemainingRecords;
        saveTimer.AutoReset = true;
        saveTimer.Enabled = true;
    }

    static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
    {
        int remainingCount = records.Length - currentIndex;
        if (remainingCount > 0)
        {
            string[] remainingRecords = new string[remainingCount];
            Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
            File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
            Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
        }
        else
        {
            File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
            Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
        }
    }

    static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
    {
        using (NetworkStream stream = client.GetStream())
        {
            string record = await GetNextRecordAsync();
            byte[] data = Encoding.UTF8.GetBytes(record);
            await stream.WriteAsync(data, 0, data.Length);
        }
        client.Close();
    }

    static async Task<string> GetNextRecordAsync()
    {
        int index = Interlocked.Increment(ref currentIndex) - 1;
        if (index < records.Length)
        {
            return records[index];
        }
        else
        {
            // Если записи закончились, формируем файл заново     
            LoadRecords(IMAP_FILE_NAME);
            currentIndex = 0;
            return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
        }
    }
}

Сервер считывает данные из файла и больше к нему не обращается, пока не раздаст все записи.
Когда раздаст все записи, снова считывает возможно уже обновленные записи из файла.
И продолжает раздавать
Main:
static async Task Main()
{
    // Загрузка записей из файла
    LoadRecords(IMAP_FILE_NAME);

    // Настройка таймера для сохранения
    SetupSaveTimer();

    // Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");

    while (true)
    {
        TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
        _ = HandleClientAsync(client); // асинхронная обработка запроса
    }
}
загружаем записи из файла в массив records
LoadRecords:
static void LoadRecords(string filename)
{
    records = File.ReadAllLines(filename); 
    Console.WriteLine($"Загружено {records.Length} записей");
}
настраиваем таймер для сохранения невыданных записей. В случае если записей миллионы, при аварийном выключении компьютера можно начать с файла last.txt.
SetupSaveTimer:
static void SetupSaveTimer()
{
    saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
    saveTimer.Elapsed += SaveRemainingRecords;
    saveTimer.AutoReset = true;
    saveTimer.Enabled = true;
}

static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
{
    int remainingCount = records.Length - currentIndex;
    if (remainingCount > 0)
    {
        string[] remainingRecords = new string[remainingCount];
        Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
        File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
        Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
    }
    else
    {
        File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
        Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
    }
}
Запускаем сервер на порту 8888 (можно взять любой свободный порт)
Запускаем сервер на порту 8888:
// Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");
Ждем запросы и выдаем строки массива
ждем запросы и выдаем строки массива:
static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
{
    using (NetworkStream stream = client.GetStream())
    {
        string record = await GetNextRecordAsync();
        byte[] data = Encoding.UTF8.GetBytes(record);
        await stream.WriteAsync(data, 0, data.Length);
    }
    client.Close();
}

static async Task<string> GetNextRecordAsync()
{
    int index = Interlocked.Increment(ref currentIndex) - 1;
    if (index < records.Length)
    {
        return records[index];
    }
    else
    {
        // Если записи закончились, формируем файл заново     
        LoadRecords(IMAP_FILE_NAME);
        currentIndex = 0;
        return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
    }
}
Код запроса и получения записей, шаблон work_from_memory.zp
Код запроса и получения записей, шаблон work_from_memory.zp:
string s = "";
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        s = Encoding.UTF8.GetString(data, 0, bytesRead);
    }
Стандартное подключение к TCP серверу по порту 8888 и потоковое получение записи.

Нужная запись в переменной s. Дальше можно делать с ней что угодно.

Если записи в базе данных или ещё где то, можно периодически считывать любым запросом нужные данные и сохранять в файл. Сделать такой шаблон и запускать по расписанию.

К серверу можно обращаться не только из потоков 1 шаблона, но и из любого количества шаблонов.

Также можно запустить сервер и открыть порт наружу и к нему можно обращаться с любых компьютеров из любых программ.
Например у вас несколько зеннопостеров работают на нескольких компьютерах(VDS), они все могут получать данные из одного файла, который может периодически обновляться из нужных источников.

Вроде всё, буду рад, если кто не знал такой способ и воспользуется.

Всем удачи! С наступающим Новым годом! Не забываем про лайки!
 

Attachments

Last edited:

bvbfor

Client
Joined
Apr 10, 2016
Messages
425
Reaction score
235
Points
43
Как пример - практическое применение для шаблона из статьи о генерации прокси из ВПН

В коде сервера указываем файл Good_Proxy.txt
Запускаем сервер.

В шаблоне Test_Job.zp вместо этого кубика
128904


используем кубик c# с кодом
Берем прокси с сервера:
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        project.Variables["VPN_Proxy"].Value = Encoding.UTF8.GetString(data, 0, bytesRead);
    }
project.SendInfoToLog("Взяли прокси - " + project.Variables["VPN_Proxy"].Value);
Прокси будут выдаваться по кругу.

И по ветке выхода по недоступности прокси можно удалять прокси из файла, и такие прокси больше не будут выдаваться.
Удаление невалидного прокси из файла:
string filePath = "Путь к файлу Good_Proxy.txt";
lock(CommonCode.SyncObject)
{
   var lines = File.ReadAllLines(filePath).Where(line =>
      !line.Contains(project.Variables["VPN_Proxy"].Value)).ToList();
   File.WriteAllLines(filePath , lines);
}
project.SendInfoToLog("Осталось прокси в файле - " + lines.Count.ToString());
 
Last edited:

bvbfor

Client
Joined
Apr 10, 2016
Messages
425
Reaction score
235
Points
43
И если на компьютере, где запущен TCP сервер, открыть порт, можно получать VPN прокси с любого компьютера и поднимать на нём локальный прокси.
 
  • Thank you
Reactions: toad@ and Greenya

capvad91

Client
Joined
Mar 27, 2019
Messages
51
Reaction score
56
Points
18
И если на компьютере, где запущен TCP сервер, открыть порт, можно получать VPN прокси с любого компьютера и поднимать на нём локальный прокси.
Класс! Обязательно попробую.
Для меня так эти 2 статьи однозначно лучшие!
 

toad@

Client
Joined
Sep 28, 2023
Messages
6
Reaction score
0
Points
1
Получение данных в многопотоке из файлов или БД без блокировки.

Иногда спрашивают, и я не видел этого решения на форуме или может не искал как следует.

Если нужно построчно брать какие либо данные из файла или БД, например логин и пароль от каких либо аккаунтов , то при многопотоке приходится делать блокировку файла, что существенно замедляет работу, особенно если записей много и потоков сотни и более.

Можно использовать вариант загрузки записей в память и раздавать их при обращении по какому либо порту. Это называется TCP сервер, если не ошибаюсь.
В этом случае нет обращения потоков к диску и не нужно блокировка. Соответственно увеличивается срок службы жесткого диска и значительно увеличивается скорость получения данных.

Допустим у нас есть файл с аккаунтами mail:pass c количеством строк около миллиона.
View attachment 128597

Посмотрим как с этим работает шаблон со стандартным способом получения данных из файла и шаблон с получением данных из памяти по порту.


Как видно из видео разница по скорости колоссальная.
Ну и о реализации:
Полный код сервера на c#:
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Linq;

class MyServer
{
    private static string[] records;
    private static int currentIndex = 0;
    private const string END_MESSAGE = "ВСЕ_ЗАПИСИ_ВЫДАНЫ";
    private static System.Timers.Timer saveTimer;
    private const string SAVE_FILE_NAME = "last.txt";
    private const string IMAP_FILE_NAME = "accounts.txt";


    static async Task Main()
    {
        // Загрузка записей из файла
        LoadRecords(IMAP_FILE_NAME);

        // Настройка таймера для сохранения
        SetupSaveTimer();

        // Настройка сервера
        int port = 8888;
        TcpListener server = new TcpListener(IPAddress.Any, port);
        server.Start();
        Console.WriteLine($"Сервер запущен на порту {port}");

        while (true)
        {
            TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
            _ = HandleClientAsync(client); // асинхронная обработка запроса
        }
    }

    static void LoadRecords(string filename)
    {
        records = File.ReadAllLines(filename);
        Console.WriteLine($"Загружено {records.Length} записей");
    }


    static void SetupSaveTimer()
    {
        saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
        saveTimer.Elapsed += SaveRemainingRecords;
        saveTimer.AutoReset = true;
        saveTimer.Enabled = true;
    }

    static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
    {
        int remainingCount = records.Length - currentIndex;
        if (remainingCount > 0)
        {
            string[] remainingRecords = new string[remainingCount];
            Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
            File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
            Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
        }
        else
        {
            File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
            Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
        }
    }

    static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
    {
        using (NetworkStream stream = client.GetStream())
        {
            string record = await GetNextRecordAsync();
            byte[] data = Encoding.UTF8.GetBytes(record);
            await stream.WriteAsync(data, 0, data.Length);
        }
        client.Close();
    }

    static async Task<string> GetNextRecordAsync()
    {
        int index = Interlocked.Increment(ref currentIndex) - 1;
        if (index < records.Length)
        {
            return records[index];
        }
        else
        {
            // Если записи закончились, формируем файл заново 
            LoadRecords(IMAP_FILE_NAME);
            currentIndex = 0;
            return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
        }
    }
}

Сервер считывает данные из файла и больше к нему не обращается, пока не раздаст все записи.
Когда раздаст все записи, снова считывает возможно уже обновленные записи из файла.
И продолжает раздавать
Main:
static async Task Main()
{
    // Загрузка записей из файла
    LoadRecords(IMAP_FILE_NAME);

    // Настройка таймера для сохранения
    SetupSaveTimer();

    // Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");

    while (true)
    {
        TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
        _ = HandleClientAsync(client); // асинхронная обработка запроса
    }
}
загружаем записи из файла в массив records
LoadRecords:
static void LoadRecords(string filename)
{
    records = File.ReadAllLines(filename);
    Console.WriteLine($"Загружено {records.Length} записей");
}
настраиваем таймер для сохранения невыданных записей. В случае если записей миллионы, при аварийном выключении компьютера можно начать с файла last.txt.
SetupSaveTimer:
static void SetupSaveTimer()
{
    saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
    saveTimer.Elapsed += SaveRemainingRecords;
    saveTimer.AutoReset = true;
    saveTimer.Enabled = true;
}

static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
{
    int remainingCount = records.Length - currentIndex;
    if (remainingCount > 0)
    {
        string[] remainingRecords = new string[remainingCount];
        Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
        File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
        Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
    }
    else
    {
        File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
        Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
    }
}
Запускаем сервер на порту 8888 (можно взять любой свободный порт)
Запускаем сервер на порту 8888:
// Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");
Ждем запросы и выдаем строки массива
ждем запросы и выдаем строки массива:
static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
{
    using (NetworkStream stream = client.GetStream())
    {
        string record = await GetNextRecordAsync();
        byte[] data = Encoding.UTF8.GetBytes(record);
        await stream.WriteAsync(data, 0, data.Length);
    }
    client.Close();
}

static async Task<string> GetNextRecordAsync()
{
    int index = Interlocked.Increment(ref currentIndex) - 1;
    if (index < records.Length)
    {
        return records[index];
    }
    else
    {
        // Если записи закончились, формируем файл заново 
        LoadRecords(IMAP_FILE_NAME);
        currentIndex = 0;
        return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
    }
}
Код запроса и получения записей, шаблон work_from_memory.zp
Код запроса и получения записей, шаблон work_from_memory.zp:
string s = "";
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        s = Encoding.UTF8.GetString(data, 0, bytesRead);
    }
Стандартное подключение к TCP серверу по порту 8888 и потоковое получение записи.

Нужная запись в переменной s. Дальше можно делать с ней что угодно.

Если записи в базе данных или ещё где то, можно периодически считывать любым запросом нужные данные и сохранять в файл. Сделать такой шаблон и запускать по расписанию.

К серверу можно обращаться не только из потоков 1 шаблона, но и из любого количества шаблонов.

Также можно запустить сервер и открыть порт наружу и к нему можно обращаться с любых компьютеров из любых программ.
Например у вас несколько зеннопостеров работают на нескольких компьютерах(VDS), они все могут получать данные из одного файла, который может периодически обновляться из нужных источников.

Вроде всё, буду рад, если кто не знал такой способ и воспользуется.

Всем удачи! С наступающим Новым годом! Не забываем про лайки!
Можно понизить до 7.7.13.0 ?
 

bvbfor

Client
Joined
Apr 10, 2016
Messages
425
Reaction score
235
Points
43

Attachments

  • Thank you
Reactions: toad@

toad@

Client
Joined
Sep 28, 2023
Messages
6
Reaction score
0
Points
1
Не получается запустить сервер, все как на видео делаю, вот такое сообщение выдает:
Выполнение действия CSharp OwnCode. Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение 127.0.0.1:8888
 

bvbfor

Client
Joined
Apr 10, 2016
Messages
425
Reaction score
235
Points
43
Не получается запустить сервер, все как на видео делаю, вот такое сообщение выдает:
Выполнение действия CSharp OwnCode. Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение 127.0.0.1:8888
запусти MyServer.exe
 

toad@

Client
Joined
Sep 28, 2023
Messages
6
Reaction score
0
Points
1
запусти MyServer.exe
Запускаю с правами админа или без, на секунду появляется командная строка и пропадает, запускаю скрипт, и без результата
 

bvbfor

Client
Joined
Apr 10, 2016
Messages
425
Reaction score
235
Points
43
Запускаю с правами админа или без, на секунду появляется командная строка и пропадает, запускаю скрипт, и без результата
Наверное не хватает Net8 - установи
 
  • Thank you
Reactions: toad@

bvbfor

Client
Joined
Apr 10, 2016
Messages
425
Reaction score
235
Points
43
 
  • Thank you
Reactions: toad@

toad@

Client
Joined
Sep 28, 2023
Messages
6
Reaction score
0
Points
1
Наверное не хватает Net8 - установи
Спасибо, установил и все получилось!
 

Users Who Are Viewing This Thread (Total: 0, Members: 0, Guests: 0)