❄️Получение данных в многопотоке из файлов или БД без блокировки.

  • Автор темы Автор темы bvbfor
  • Дата начала Дата начала

bvbfor

Client
Регистрация
10.04.2016
Сообщения
493
Реакции
396
Баллы
63
Получение данных в многопотоке из файлов или БД без блокировки.

Иногда спрашивают, и я не видел этого решения на форуме или может не искал как следует.

Если нужно построчно брать какие либо данные из файла или БД, например логин и пароль от каких либо аккаунтов , то при многопотоке приходится делать блокировку файла, что существенно замедляет работу, особенно если записей много и потоков сотни и более.

Можно использовать вариант загрузки записей в память и раздавать их при обращении по какому либо порту. Это называется TCP сервер, если не ошибаюсь.
В этом случае нет обращения потоков к диску и не нужно блокировка. Соответственно увеличивается срок службы жесткого диска и значительно увеличивается скорость получения данных.

Допустим у нас есть файл с аккаунтами mail:pass c количеством строк около миллиона.
128597


Посмотрим как с этим работает шаблон со стандартным способом получения данных из файла и шаблон с получением данных из памяти по порту.


Как видно из видео разница по скорости колоссальная.
Ну и о реализации:
Полный код сервера на c#:
Развернуть Свернуть Копировать
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Linq;

class MyServer
{
    private static string[] records;
    private static int currentIndex = 0;
    private const string END_MESSAGE = "ВСЕ_ЗАПИСИ_ВЫДАНЫ";
    private static System.Timers.Timer saveTimer;
    private const string SAVE_FILE_NAME = "last.txt";
    private const string IMAP_FILE_NAME = "accounts.txt";


    static async Task Main()
    {
        // Загрузка записей из файла
        LoadRecords(IMAP_FILE_NAME);

        // Настройка таймера для сохранения
        SetupSaveTimer();

        // Настройка сервера
        int port = 8888;
        TcpListener server = new TcpListener(IPAddress.Any, port);
        server.Start();
        Console.WriteLine($"Сервер запущен на порту {port}");

        while (true)
        {
            TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
            _ = HandleClientAsync(client); // асинхронная обработка запроса
        }
    }

    static void LoadRecords(string filename)
    {
        records = File.ReadAllLines(filename); 
        Console.WriteLine($"Загружено {records.Length} записей");
    }


    static void SetupSaveTimer()
    {
        saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
        saveTimer.Elapsed += SaveRemainingRecords;
        saveTimer.AutoReset = true;
        saveTimer.Enabled = true;
    }

    static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
    {
        int remainingCount = records.Length - currentIndex;
        if (remainingCount > 0)
        {
            string[] remainingRecords = new string[remainingCount];
            Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
            File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
            Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
        }
        else
        {
            File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
            Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
        }
    }

    static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
    {
        using (NetworkStream stream = client.GetStream())
        {
            string record = await GetNextRecordAsync();
            byte[] data = Encoding.UTF8.GetBytes(record);
            await stream.WriteAsync(data, 0, data.Length);
        }
        client.Close();
    }

    static async Task<string> GetNextRecordAsync()
    {
        int index = Interlocked.Increment(ref currentIndex) - 1;
        if (index < records.Length)
        {
            return records[index];
        }
        else
        {
            // Если записи закончились, формируем файл заново     
            LoadRecords(IMAP_FILE_NAME);
            currentIndex = 0;
            return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
        }
    }
}

Сервер считывает данные из файла и больше к нему не обращается, пока не раздаст все записи.
Когда раздаст все записи, снова считывает возможно уже обновленные записи из файла.
И продолжает раздавать
Main:
Развернуть Свернуть Копировать
static async Task Main()
{
    // Загрузка записей из файла
    LoadRecords(IMAP_FILE_NAME);

    // Настройка таймера для сохранения
    SetupSaveTimer();

    // Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");

    while (true)
    {
        TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
        _ = HandleClientAsync(client); // асинхронная обработка запроса
    }
}

загружаем записи из файла в массив records
LoadRecords:
Развернуть Свернуть Копировать
static void LoadRecords(string filename)
{
    records = File.ReadAllLines(filename); 
    Console.WriteLine($"Загружено {records.Length} записей");
}

настраиваем таймер для сохранения невыданных записей. В случае если записей миллионы, при аварийном выключении компьютера можно начать с файла last.txt.
SetupSaveTimer:
Развернуть Свернуть Копировать
static void SetupSaveTimer()
{
    saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
    saveTimer.Elapsed += SaveRemainingRecords;
    saveTimer.AutoReset = true;
    saveTimer.Enabled = true;
}

static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
{
    int remainingCount = records.Length - currentIndex;
    if (remainingCount > 0)
    {
        string[] remainingRecords = new string[remainingCount];
        Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
        File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
        Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
    }
    else
    {
        File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
        Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
    }
}

Запускаем сервер на порту 8888 (можно взять любой свободный порт)
Запускаем сервер на порту 8888:
Развернуть Свернуть Копировать
// Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");

Ждем запросы и выдаем строки массива
ждем запросы и выдаем строки массива:
Развернуть Свернуть Копировать
static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
{
    using (NetworkStream stream = client.GetStream())
    {
        string record = await GetNextRecordAsync();
        byte[] data = Encoding.UTF8.GetBytes(record);
        await stream.WriteAsync(data, 0, data.Length);
    }
    client.Close();
}

static async Task<string> GetNextRecordAsync()
{
    int index = Interlocked.Increment(ref currentIndex) - 1;
    if (index < records.Length)
    {
        return records[index];
    }
    else
    {
        // Если записи закончились, формируем файл заново     
        LoadRecords(IMAP_FILE_NAME);
        currentIndex = 0;
        return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
    }
}
Код запроса и получения записей, шаблон work_from_memory.zp
Код запроса и получения записей, шаблон work_from_memory.zp:
Развернуть Свернуть Копировать
string s = "";
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        s = Encoding.UTF8.GetString(data, 0, bytesRead);
    }

Стандартное подключение к TCP серверу по порту 8888 и потоковое получение записи.

Нужная запись в переменной s. Дальше можно делать с ней что угодно.

Если записи в базе данных или ещё где то, можно периодически считывать любым запросом нужные данные и сохранять в файл. Сделать такой шаблон и запускать по расписанию.

К серверу можно обращаться не только из потоков 1 шаблона, но и из любого количества шаблонов.

Также можно запустить сервер и открыть порт наружу и к нему можно обращаться с любых компьютеров из любых программ.
Например у вас несколько зеннопостеров работают на нескольких компьютерах(VDS), они все могут получать данные из одного файла, который может периодически обновляться из нужных источников.

Вроде всё, буду рад, если кто не знал такой способ и воспользуется.

Всем удачи! С наступающим Новым годом! Не забываем про лайки!

 

Вложения

Последнее редактирование модератором:
Как пример - практическое применение для шаблона из статьи о генерации прокси из ВПН

В коде сервера указываем файл Good_Proxy.txt
Запускаем сервер.

В шаблоне Test_Job.zp вместо этого кубика
128904


используем кубик c# с кодом
Берем прокси с сервера:
Развернуть Свернуть Копировать
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        project.Variables["VPN_Proxy"].Value = Encoding.UTF8.GetString(data, 0, bytesRead);
    }
project.SendInfoToLog("Взяли прокси - " + project.Variables["VPN_Proxy"].Value);

Прокси будут выдаваться по кругу.

И по ветке выхода по недоступности прокси можно удалять прокси из файла, и такие прокси больше не будут выдаваться.
Удаление невалидного прокси из файла:
Развернуть Свернуть Копировать
string filePath = "Путь к файлу Good_Proxy.txt";
lock(CommonCode.SyncObject)
{
   var lines = File.ReadAllLines(filePath).Where(line =>
      !line.Contains(project.Variables["VPN_Proxy"].Value)).ToList();
   File.WriteAllLines(filePath , lines);
}
project.SendInfoToLog("Осталось прокси в файле - " + lines.Count.ToString());
 
Последнее редактирование:
И если на компьютере, где запущен TCP сервер, открыть порт, можно получать VPN прокси с любого компьютера и поднимать на нём локальный прокси.
 
  • Спасибо
Реакции: toad@ и Greenya
И если на компьютере, где запущен TCP сервер, открыть порт, можно получать VPN прокси с любого компьютера и поднимать на нём локальный прокси.
Класс! Обязательно попробую.
Для меня так эти 2 статьи однозначно лучшие!
 
Получение данных в многопотоке из файлов или БД без блокировки.

Иногда спрашивают, и я не видел этого решения на форуме или может не искал как следует.

Если нужно построчно брать какие либо данные из файла или БД, например логин и пароль от каких либо аккаунтов , то при многопотоке приходится делать блокировку файла, что существенно замедляет работу, особенно если записей много и потоков сотни и более.

Можно использовать вариант загрузки записей в память и раздавать их при обращении по какому либо порту. Это называется TCP сервер, если не ошибаюсь.
В этом случае нет обращения потоков к диску и не нужно блокировка. Соответственно увеличивается срок службы жесткого диска и значительно увеличивается скорость получения данных.

Допустим у нас есть файл с аккаунтами mail:pass c количеством строк около миллиона.
Посмотреть вложение 128597

Посмотрим как с этим работает шаблон со стандартным способом получения данных из файла и шаблон с получением данных из памяти по порту.


Как видно из видео разница по скорости колоссальная.
Ну и о реализации:
Полный код сервера на c#:
Развернуть Свернуть Копировать
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Linq;

class MyServer
{
    private static string[] records;
    private static int currentIndex = 0;
    private const string END_MESSAGE = "ВСЕ_ЗАПИСИ_ВЫДАНЫ";
    private static System.Timers.Timer saveTimer;
    private const string SAVE_FILE_NAME = "last.txt";
    private const string IMAP_FILE_NAME = "accounts.txt";


    static async Task Main()
    {
        // Загрузка записей из файла
        LoadRecords(IMAP_FILE_NAME);

        // Настройка таймера для сохранения
        SetupSaveTimer();

        // Настройка сервера
        int port = 8888;
        TcpListener server = new TcpListener(IPAddress.Any, port);
        server.Start();
        Console.WriteLine($"Сервер запущен на порту {port}");

        while (true)
        {
            TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
            _ = HandleClientAsync(client); // асинхронная обработка запроса
        }
    }

    static void LoadRecords(string filename)
    {
        records = File.ReadAllLines(filename);
        Console.WriteLine($"Загружено {records.Length} записей");
    }


    static void SetupSaveTimer()
    {
        saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
        saveTimer.Elapsed += SaveRemainingRecords;
        saveTimer.AutoReset = true;
        saveTimer.Enabled = true;
    }

    static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
    {
        int remainingCount = records.Length - currentIndex;
        if (remainingCount > 0)
        {
            string[] remainingRecords = new string[remainingCount];
            Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
            File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
            Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
        }
        else
        {
            File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
            Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
        }
    }

    static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
    {
        using (NetworkStream stream = client.GetStream())
        {
            string record = await GetNextRecordAsync();
            byte[] data = Encoding.UTF8.GetBytes(record);
            await stream.WriteAsync(data, 0, data.Length);
        }
        client.Close();
    }

    static async Task<string> GetNextRecordAsync()
    {
        int index = Interlocked.Increment(ref currentIndex) - 1;
        if (index < records.Length)
        {
            return records[index];
        }
        else
        {
            // Если записи закончились, формируем файл заново 
            LoadRecords(IMAP_FILE_NAME);
            currentIndex = 0;
            return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
        }
    }
}

Сервер считывает данные из файла и больше к нему не обращается, пока не раздаст все записи.
Когда раздаст все записи, снова считывает возможно уже обновленные записи из файла.
И продолжает раздавать
Main:
Развернуть Свернуть Копировать
static async Task Main()
{
    // Загрузка записей из файла
    LoadRecords(IMAP_FILE_NAME);

    // Настройка таймера для сохранения
    SetupSaveTimer();

    // Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");

    while (true)
    {
        TcpClient client = await server.AcceptTcpClientAsync(); // ждём запрос
        _ = HandleClientAsync(client); // асинхронная обработка запроса
    }
}

загружаем записи из файла в массив records
LoadRecords:
Развернуть Свернуть Копировать
static void LoadRecords(string filename)
{
    records = File.ReadAllLines(filename);
    Console.WriteLine($"Загружено {records.Length} записей");
}

настраиваем таймер для сохранения невыданных записей. В случае если записей миллионы, при аварийном выключении компьютера можно начать с файла last.txt.
SetupSaveTimer:
Развернуть Свернуть Копировать
static void SetupSaveTimer()
{
    saveTimer = new System.Timers.Timer(5 * 60 * 1000); // 5 минут
    saveTimer.Elapsed += SaveRemainingRecords;
    saveTimer.AutoReset = true;
    saveTimer.Enabled = true;
}

static void SaveRemainingRecords(object sender, ElapsedEventArgs e)
{
    int remainingCount = records.Length - currentIndex;
    if (remainingCount > 0)
    {
        string[] remainingRecords = new string[remainingCount];
        Array.Copy(records, currentIndex, remainingRecords, 0, remainingCount);
        File.WriteAllLines(SAVE_FILE_NAME, remainingRecords);
        Console.WriteLine($"Сохранено {remainingCount} записей в файл {SAVE_FILE_NAME}");
    }
    else
    {
        File.WriteAllText(SAVE_FILE_NAME, END_MESSAGE);
        Console.WriteLine($"Все записи выданы. Сохранено сообщение о конце в файл {SAVE_FILE_NAME}");
    }
}

Запускаем сервер на порту 8888 (можно взять любой свободный порт)
Запускаем сервер на порту 8888:
Развернуть Свернуть Копировать
// Настройка сервера
    int port = 8888;
    TcpListener server = new TcpListener(IPAddress.Any, port);
    server.Start();
    Console.WriteLine($"Сервер запущен на порту {port}");

Ждем запросы и выдаем строки массива
ждем запросы и выдаем строки массива:
Развернуть Свернуть Копировать
static async Task HandleClientAsync(TcpClient client) // обработка запроса клиента в асинхронном режиме
{
    using (NetworkStream stream = client.GetStream())
    {
        string record = await GetNextRecordAsync();
        byte[] data = Encoding.UTF8.GetBytes(record);
        await stream.WriteAsync(data, 0, data.Length);
    }
    client.Close();
}

static async Task<string> GetNextRecordAsync()
{
    int index = Interlocked.Increment(ref currentIndex) - 1;
    if (index < records.Length)
    {
        return records[index];
    }
    else
    {
        // Если записи закончились, формируем файл заново 
        LoadRecords(IMAP_FILE_NAME);
        currentIndex = 0;
        return await GetNextRecordAsync(); // вызов для получения первой записи из нового набора
    }
}
Код запроса и получения записей, шаблон work_from_memory.zp
Код запроса и получения записей, шаблон work_from_memory.zp:
Развернуть Свернуть Копировать
string s = "";
using (TcpClient client = new TcpClient("localhost", 8888))
    using (NetworkStream stream = client.GetStream())
    {
        byte[] data = new byte[1024];
        int bytesRead = stream.Read(data, 0, data.Length);
        s = Encoding.UTF8.GetString(data, 0, bytesRead);
    }

Стандартное подключение к TCP серверу по порту 8888 и потоковое получение записи.

Нужная запись в переменной s. Дальше можно делать с ней что угодно.

Если записи в базе данных или ещё где то, можно периодически считывать любым запросом нужные данные и сохранять в файл. Сделать такой шаблон и запускать по расписанию.

К серверу можно обращаться не только из потоков 1 шаблона, но и из любого количества шаблонов.

Также можно запустить сервер и открыть порт наружу и к нему можно обращаться с любых компьютеров из любых программ.
Например у вас несколько зеннопостеров работают на нескольких компьютерах(VDS), они все могут получать данные из одного файла, который может периодически обновляться из нужных источников.

Вроде всё, буду рад, если кто не знал такой способ и воспользуется.

Всем удачи! С наступающим Новым годом! Не забываем про лайки!
Можно понизить до 7.7.13.0 ?
 

Вложения

  • Спасибо
Реакции: toad@
Не получается запустить сервер, все как на видео делаю, вот такое сообщение выдает:
Выполнение действия CSharp OwnCode. Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение 127.0.0.1:8888
 
Не получается запустить сервер, все как на видео делаю, вот такое сообщение выдает:
Выполнение действия CSharp OwnCode. Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение 127.0.0.1:8888
запусти MyServer.exe
 
Запускаю с правами админа или без, на секунду появляется командная строка и пропадает, запускаю скрипт, и без результата
Наверное не хватает Net8 - установи
 
  • Спасибо
Реакции: toad@
  • Спасибо
Реакции: toad@
Скажите пожалуйста, где редактировать код сервера для того что-бы указать свои файлы?
И еще вопрос.
Есть у меня несколько шаблонов и у каждого свои файлы с разными данными которые нужно брать в многопотоке. Как можно это сделать? несколько файлов указать в настройке сервера? или под каждый проект запускать свой сервер со своим портом?
 

Кто просматривает тему: (Всего: 0, Пользователи: 0, Гости: 0)