Его Величество CDP. Слушаем websocket с помощью связки ZennoBrowser + C#

Porosenok

Client
Регистрация
26.09.2010
Сообщения
1 284
Благодарностей
108
Баллы
63
137872


Дисклеймер: я не являюсь сеньором-экспертом в C#, поэтому конструктивная критика приветствуется.​

Всем привет, ну вот и пришел мой черед радовать пользователей форума своим материалом (надеюсь интересным). В своей статье хочу затронуть такую тему как CDP и управление ZennoBrowser через него.
Ни для кого не секрет что многие сайты сегодня используют технологию Websocket для передачи данных клиентам. И иногда возникает необходимость послушать что же они там передают. Постоянно парсить новые данные среди элементов страницы очень неудобно. И тут то на помощь нам приходит великий и ужасный CDP. В статье мы сначала подключимся через CDP к ZennoBrowser, а затем в качестве примера зайдем на сайт биржи Gate и в режиме реального времени будем слушать данные которые биржа отдают юзеру через Websocket.

1) Что такое CDP?
Для начала немножко теории. Если коротко то CDP это такое API для управления браузерами на основе Chrome. Только работает это API не через обычные запросы, а все по той же технологии WebSocket, то есть мы открываем подключение, и в рамках одного подключения шлем свои запросы и получаем ответы. К нашей с вами удачи, ZennoBrowser как раз хромоподобный браузер, поэтому погнали дальше.

2) Готовимся подключаться к CDP
Как видно из названия статьи для взаимодействия с CDP будем использовать язык программирования C#. Для этого с официального сайта скачиваем Visual Studio Community.
Создаем проект - Консольное приложение:
137874


Называем его любым названием и вот мы в самом редакторе кода. Теперь необходимо установить несколько пакетов: во-первых библиотека для работы с Websocket (я буду использовать WatsonWebsocket) и во-вторых удобную библиотеку для работы с json - Newtonsoft.Json.
Для этого нажимаем "Проект" и в выпадающем меню выбираем "Управление пакетами NuGet", ищем и устанавливаем пакеты "Newtonsoft.Json" и "WatsonWebsocket". По итогу все должно выглядеть примерно так:
137875


Для дальнейших действий нам понадобится API-ключ ZennoBrowser. Его как вы знаете можно получить в ЛК.
Итак ключ получен, переходим к самому интересному - к коду. Нам необходимо получить ссылку для коннекта к самому инстансу ZennoBrowser через CDP. Для этого берем айди профиля и через API ZennoBrowser открываем его. Сразу обращаю внимание что для нашего примера нам нужен профиль где нет никаких открытых вкладок с сайтами, просто чистый профиль который при открытии показывает чистую страницу.
C#:
string apiKey = "ваш_апи_ключ";
string profileId = "айди_профиля";
var client = new HttpClient();

client.DefaultRequestHeaders.Add("Api-Token", apiKey);
var url = $"http://localhost:8160/v1/browser_instances/create?profileId={profileId}&workspaceId=-1&desktopName=&threadToken=";
var response = await client.PostAsync(url, null);

string result = await response.Content.ReadAsStringAsync();

var node = JObject.Parse(result);
string connectionString = node?["connectionString"]?.ToString();
В результате мы должны получить ссылку вида "ws://127.0.0.1:59141/devtools/browser/4cb32200-f6e5-4743-85b1-bd6838cb3cb1" в переменной connectionString, через эту ссылку мы и будем непосредственно подключаться к CDP ZennoBrowser.

3) Подключаемся и взаимодействуем с CDP
А теперь самое сложное. Нам необходим класс для взаимодействия с CDP через вебсокет. Для того чтобы его написать необходимо понимать как устроена работа CDP.
Сначала мы открываем вебсокет-соединение, затем отправляем свои запросы и слушаем ответы (и то и то в формате json). У каждого отправленного запроса должно быть int поле id, для того чтобы среди пришедших сообщений найти ответ на наш запрос. Так же могут приходить события - сообщения которые приходят сами по себе, без нашего предварительного запроса. У таких сообщений есть поле method.
Ниже сам класс, с небольшими комментариями.
C#:
public class CDPClient : IDisposable
{
    //Сам вебсокет-клиент из библиотеки WatsonWebsocket
    private readonly WatsonWsClient _client;
    //Начальное значение для id наших отправленных сообщений
    private int _messageId = 0;
    //Словарик где мы храним id отправленного сообщения и объект TaskCompletionSource откуда мы сможем получить ответ на наше сообщений
    private readonly ConcurrentDictionary<int, TaskCompletionSource<JObject>> _pendingRequests = new ConcurrentDictionary<int, TaskCompletionSource<JObject>>();
    //Словарик с обработчиками событий
    private readonly Dictionary<string, List<Action<JObject>>> _eventHandlers = new Dictionary<string, List<Action<JObject>>>();
    private readonly object _eventHandlersLock = new object();

    public CDPClient(string url)
    {
        //Подключаемся к CDP по урлу, настраиваем обработчик входящих сообщений OnMessageReceived и другие методы
        _client = new WatsonWsClient(new Uri(url));
        _client.MessageReceived += OnMessageReceived;
        _client.ServerConnected += (s, e) => Console.WriteLine("Подключились к CDP. " + url);
        _client.ServerDisconnected += OnServerDisconnected;
        _client.Start();
    }

    private void OnServerDisconnected(object sender, EventArgs args)
    {
        foreach (var tcs in _pendingRequests.Values)
        {
            tcs.TrySetException(new Exception("Соединение с CDP было разорвано."));
        }
        _pendingRequests.Clear();
    }

    //Метод-обработчик входящих сообщений. Каждое сообщение полученное по вебсокету от CDP мы получаем и обрабатываем с помощью этого метода
    private void OnMessageReceived(object sender, MessageReceivedEventArgs args)
    {
        try
        {
            var message = Encoding.UTF8.GetString(args.Data);
            var json = JObject.Parse(message);

            //Если полученный ответ это сообщение на ранее отправленный запрос (есть поле id), то удаляем запись с этим id из словарика _pendingRequests и возвращаем полученный ответ при помощи TaskCompletionSource
            if (json["id"] != null)
            {
                int id = json["id"].Value<int>();
                if (_pendingRequests.TryRemove(id, out var tcs))
                {
                    tcs.SetResult(json);
                }
            }
            //А если входящее сообщение это событие, то проверяем нет ли у нас в словарике _eventHandlers обработчика для этого события, и если есть выполняем его
            else if (json["method"] != null)
            {
                string methodName = json["method"].Value<string>();
                JObject parameters = json["params"] as JObject;

                List<Action<JObject>> handlers = null;

                lock (_eventHandlersLock)
                {
                    if (_eventHandlers.TryGetValue(methodName, out var handlerList))
                    {
                        handlers = new List<Action<JObject>>(handlerList);
                    }
                }

                if (handlers != null)
                {
                    foreach (var handler in handlers)
                    {
                        try
                        {
                            handler(parameters);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"Ошибка в обработчике события '{methodName}': {ex.Message}");
                        }
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Ошибка в OnMessageReceived: {ex.Message}");
        
        }
    }

    //метод для отправки сообщения на CDP и ожидания ответа
    public async Task<JObject> SendCommandAsync(string method, object parameters = null, int timeoutMs = 7000, CancellationToken cancellationToken = default, string sessionId = null)
    {
        int id = Interlocked.Increment(ref _messageId);
        var tcs = new TaskCompletionSource<JObject>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pendingRequests.TryAdd(id, tcs);

        var message = new JObject
        {
            ["id"] = id,
            ["method"] = method,
            ["params"] = parameters != null ? JObject.FromObject(parameters) : new JObject()
        };
        if (!string.IsNullOrEmpty(sessionId))
        {
            message["sessionId"] = sessionId;
        }
        var jsonString = message.ToString();
        await _client.SendAsync(jsonString);

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            if (timeoutMs > 0)
            {
                cts.CancelAfter(timeoutMs);
            }

            using (cts.Token.Register(() =>
            {
                _pendingRequests.TryRemove(id, out _);

                var errorMessage = $"Таймаут для команды {jsonString}";
                tcs.TrySetException(new TaskCanceledException(errorMessage));
            }))
            {
                return await tcs.Task;
            }
        }
    }

    // Методы для подписки на события
    public void SubscribeEvent(string methodName, Action<JObject> handler)
    {
        lock (_eventHandlersLock)
        {
            if (!_eventHandlers.ContainsKey(methodName))
            {
                _eventHandlers[methodName] = new List<Action<JObject>>();
            }
            _eventHandlers[methodName].Add(handler);
        }
    }

    //Метод для отписки от события
    public void UnsubscribeEvent(string methodName, Action<JObject> handler)
    {
        lock (_eventHandlersLock)
        {
            if (_eventHandlers.TryGetValue(methodName, out var handlers))
            {
                handlers.Remove(handler);
                if (handlers.Count == 0)
                {
                    _eventHandlers.Remove(methodName);
                }
            }
        }
    }

    public void Dispose()
    {
        _client.Dispose();
    }
}
Теперь приступаем непосредственно к работе с CDP. Ранее мы запустили инстанс ZennoBrowser, теперь нам нужно подключиться к нему через connectionString, а затем получить sessionId от нужной вкладки и дальше уже работать с ней.
C#:
//Подключаемся к CDP через наш класс
using (var cdpClient = new CDPClient(connectionString))
{
    //получаем список таргетов - всех вкладок (и не только)
    var targets = await cdpClient.SendCommandAsync("Target.getTargets");

    //ищем первую открытую вкладку
    string targetId = (string)targets["result"]["targetInfos"]
        .Children()
        .FirstOrDefault(t => (string)t["url"] == "chrome://newtab/")?
        ["targetId"];

    //Подключаемся к нужной вкладке, получаем ее sessionId
    var command = await cdpClient.SendCommandAsync("Target.attachToTarget", new
    {
        targetId = targetId,
        flatten = true
    });
    string firstPageSessionId = command["result"]["sessionId"].Value<string>();
    if (String.IsNullOrEmpty(firstPageSessionId))
    {
        throw new Exception("Не удалось получить firstPageSessionId.");
    }

    //Для того чтобы работать работать с вебсокетом нужно включить сеть
    await cdpClient.SendCommandAsync("Network.enable", null, 5000, sessionId: firstPageSessionId);

    //Переходим на целевой сайт
    await cdpClient.SendCommandAsync("Page.navigate", new { url = "https://www.gate.com/ru/trade/BTC_USDT" }, 10000, sessionId: firstPageSessionId);

    //Метод мониторинга вебсокета на сайте
    await MonitorWS(cdpClient);
}
Почти готово, теперь распишем метод MonitorWS, в котором и будет происходить вся магия. Если вкратце, то все полученные браузером сообщения с вебсокета приходят в CDP как события типа "Network.webSocketFrameReceived", мы подписываемся на такие события, если событие случается то выполняется заранее созданный метод-обработчик EventHandlerAsync (внутри MonitorWS). В этом примере просто делаем вывод входящих сообщений в консоль. Сам метод MonitorWS будет работать бесконечно, пока не поймает первое исключение (например придет ответ который не получится распарсить). В случае ошибки отписываемся от события.
C#:
static async Task MonitorWS(CDPClient cdpClient)
{
    var eventHandlerTcs = new TaskCompletionSource<bool>();

    void EventHandlerAsync(JObject parameters)
    {
        var task = Task.Run(async () =>
        {
            string response = null;

            try
            {
                string requestId = parameters["requestId"].Value<string>();
                response = parameters["response"]?["payloadData"]?.Value<string>();
            
                //Вот тут логика обработки входящего сообщения. В данном случае мы просто выводим его в консоль.
                Console.WriteLine(response);
            }
            catch (Exception ex)
            {
                eventHandlerTcs.TrySetException(ex);
            }
        });
    }

    cdpClient.SubscribeEvent("Network.webSocketFrameReceived", EventHandlerAsync);

    try
    {
        await eventHandlerTcs.Task;
    }
    finally
    {
        cdpClient.UnsubscribeEvent("Network.webSocketFrameReceived", EventHandlerAsync);
    }
}
Запускаем наш код и видим в консоли бесконечный поток входящих сообщений:
137893


Поздравляю! У нас получилось!


Благодарю всех за внимание! Надеюсь данная статья оказалась полезной. Если есть вопросы, то пишите - отвечу. Ниже прикрепляю полный проект.
 

Вложения

Последнее редактирование:
Регистрация
19.10.2018
Сообщения
205
Благодарностей
60
Баллы
28
Что можно делать потом с этими данными? Есть какие-то примеры монетизации :-) Просто ценыкрипты можно и по апи получать отовсюду
 

Porosenok

Client
Регистрация
26.09.2010
Сообщения
1 284
Благодарностей
108
Баллы
63
В данном случае это просто пример реализации взаимодействия с CDP и вебсокетом. А для чего использовать и как монетизировать у каждого свои цели могут быть, например вот на форуме человека искал для своей какой-то темы как реализовать прослушивание с сайта.
 
  • Спасибо
Реакции: 6585_Лягуша

Кто просматривает тему: (Всего: 1, Пользователи: 0, Гости: 1)