А вы знали, что брокеры сообщений решают проблему перегруженных многопоточных шаблонов?

Viking01

Client
Регистрация
19.08.2017
Сообщения
238
Благодарностей
176
Баллы
43
Зеннопостер + брокер сообщений, или как упростить многопоточные шаблоны раза в два.

134097

Статья ориентирована на продвинутых пользователей, которые представляют, что такое докер и микро-сервисы.
Показывает, как упростить многопоточные шаблоны в несколько раз и сделать логику простой и линейной, как шпала.


... Бывало ли у вас так, что задумываете написать шаблон...
... и в голове появляется некое сопротивление, что вот опять надо будет тупые циклы прописывать...
... на проверку прокси, на то, на се, на такие-то ситуации, на разные ошибки...
... ой как в лом делать это в сотый раз?..


Работать с зенкой начал лет 8 назад. Софт предпочитаю писать сам, под конкретный проект и стек, но у меня скопилась масса наработок по ней, например, по использованию микросервисной архитектуры, и наверно, пришла пора делиться)

== Что раздражает больше всего?

Берем многопоточный шаблон. Например, спарсить сайт. Или нагулять профили. Или поведенческие покачать.
В нем надо реализовать блоки логики.
Например, прописать логику выбора прокси - выбирать прокси только определенной страны (наш сайт в зависимости от страны может менять язык страницы и валюту).
Прописать логику выбора url для парсинга - чтобы не пересеклись разные экземпляры, в многопотоке.
Прописать логику сохранения результатов - предположим, результаты надо предварительно обрабатывать, не совать же чистый html в большем объеме в базу, нам же данные нужны.
Какая-то дополнительная логика - вдруг надо еще между разными экземплярами синхронизировать, например, отпечатки устройств, или какие-то специфические настройки.

Ну, шаблон уже выглядит не маленьким, да?


== И что? Все муторно-напряжно, но решаемо же.

Есть несколько типовых вариантов решения такой задачи:

1. Синхронизация через базу - ставим тот же sqlLite или mysql, заводим таблицы, и предполагаем, что шаблоны будут стучаться в базу, отмечать, что взяли задание, отрабатывать и сохранять результаты обратно в базу.

Решение рабочее, но со своими минусами:
- параллельный доступ в базу - надо предусмотреть ситуацию в коде, когда два шаблона одновременно постучались в базу - чтобы они не взяли одну и ту же строку задания.
- аналогично и с сохранением.
- в зенке писать кучу кода - или в кубиках, или отдельным проектом с использованием ef - проще, но все равно куча кода.
- предусмотреть ошибки - например, для sqllite чтобы не словить database locked - эти ошибки один фиг надо дополнительным кодом обрабатывать, даже если wal включили.
- если шаблон рухнул в процессе исполнения - также новые блоки кода сохранить его результаты или отметить, что эти данные можно снова взять в работу.
Итого - дополнительный код, код, код...

2. Через родные таблицы и списки, которые привязываются к файликам.

Ну прокси можно брать из файла - взять строчку с удалением, добавить в конец очереди
Аналогично с заданиями - брать строчку таблицы, добавлять в конец или в отдельную таблицу, использовать маркеры статуса задания или маркеры какой экземпляр взял.
Но тоже не все так просто -
- опять, предусматривать ошибки, что шаблон может рухнуть на любой стадии, что тогда делать с данными?
- а если вы парсите сайт на 300 - 500 тысяч страниц, какого размера будут таблицы и списки?
- а если пока вы проверяете список на уникальность, кто-то возьмет да и добавит туда запись?
- а как убедиться, что шаблоны взяли уникальные данные, не глюкнули и не хапнули одно и то же? Например, если для парсинга вы используете авторизацию на сайте, и два шаблона зайдут с одним и тем же акком с разных ip, то легко и бан всего акка можно словить.
- продвинутые пользователи могут передавать еще словари через контекст или использовать глобальные переменные, но мы все равно упираемся в дополнительный код.
Итого - опять, всю логику в код, код, код...

3. Отдельным приложением с rest api.

Есть и такой вариант - написать отдельное микро-приложение, которое будет для всех шаблонов единой точкой по выдаче данных.
То есть, они отправляют get запрос - получают нужные данные.
Тут есть свое преимущество - одна точка по выдаче данных, вся логика выбора проксей, url, авторизационных данных в одном месте - конфликтов не будет.
Шаблон упал - вовремя не ответил, что работу закончил, - не проблема, выдадим данные другому.
Но опять - это надо немного покодить, подебажить и опубликовать такое свое приложение.
Ну и править и перепубликовать его каждый раз, когда логику захотите изменить.
(Тут тоже есть свои читы, как-нибудь, может и расскажу, как быстро писать такие приложения за 15 минут, но не в этой статье)

4. есть и еще варианты, но везде будет логика и дополнительный код, код, код...

Все это понятно. Все это можно реализовать, долго, упорно и методично прописывая и отлавливая ошибки. Но эта рутина... и время на нее тратить так не хочется...


== Всего этого геморроя можно избежать?

Так к чему все эти рассуждения?
Есть вариант "весь-этот-код-код-код" уместить в один кубик. В один get-запрос.

Представьте, что у нас есть сторонний сервис, куда мы одним запросом можем добавить данные, а другим запросом когда понадобится - данные забрать?
И не париться об уникальности, производительности, ошибок параллельных взятий данных?
Тогда шаблон мы делаем с простой, как шпала, логикой - на входе сделал get-запрос, данные линейно отработал, на выходе сделал post-запрос, вернул результаты. А в случае ошибки - добавляем кубик Bad-End и после него сделать запрос - вернуть свои данные в очередь.
И еще подсластим - если этот сервис надо будет развернуть только один раз, а использовать его смогут разные шаблоны из разных проектов? В смысле, не надо будет его настраивать под каждый проект?

Это могло бы выглядеть так...
- нужен прокси? - не парься, сделай гет запрос и получи что надо
- нужен url для парсинга? - сделай запрос, и не парься об остальной логике.
- получил результаты? - отправь запрос с ними, и не парься насчет сохранения.
- получил ошибку? - не парься, отправь запрос, твою работу продолжит другой экземпляр.

Другими словами, логика работы может выглядеть так:
- шаблон далет get-запрос и получает json с данными для работы - прокси, url, иные уникальные данные.
- отрабатывает свою задачу
- когда сделал - отправляет post запрос с тем же json с результатами. Вывалился с ошибкой - просто делает другой запрос, и работа продолжается.
Не нравится json? можно слать и получать простой текст, просто url передать к примеру.

И тогда:
- дебажить и ловить ошибки - в разы проще,
- логику править в разы проще,
- шаблоны писать легче, быстрее и в разы проще,
- вероятно, жрут системных ресурсов они меньше, и работают лучше,
- ну и другие выгоды, представьте сами)


== В чем идея?

Ну а теперь поговорим серьезно.
Вся эта прелюдия может способствовать одной цели - чтобы вы посмотрели в сторону одного из ключевых элементов микросервисных комбайнов - брокеров сообщений.

В двух словах, в брокере есть очереди сообщений. Он позволяет создавать очереди чего угодно - сообщений из текста (например, url, прокси, html спаршенной страницы), json (вот готовая задача, десериализуй в класс с нужными полями), картинок и так далее.
Один шаблон может добавлять сообщения, другие - получать и обрабатывать их. Даже запущенные в зенке на разных машинах.
Большая гибкость в настройках очереди - могут быть постоянные (не теряться с перезагрузкой), или временные пока жив хоть один получатель сообщений, сообщения можно выдавать только в одни руки или каждому подписчику, и так далее. Плюс там еще внутри много дополнительных функций, вроде маршрутизации и эксклюзивности сообщений, но это не тема данной статьи, легко посмотреть на ютубе.
134132

В качестве брокеров обычно используют или Kafka, или RabbitMq, или делают его из Redis. У каждого свои плюсы и минусы. Самый простой и легкий в настроке и понимании - Rabbit, Kafka сложнее, Redis же просто специфичен с точки зрения прикручивания к зенке.

За основу возьмем Rabbit. Из коробки он с зенкой работать не будет, потому что должно быть постоянное подключение к серверу. А в зенке - кубики! Или отдельный проект Visual Studio, но все равно - проект или кубик завершился, коннект порвался, Rabbit на него среагировал как потеря клиента. Костыльный вариант для продвинутых пользователей - сохранять и передавать подключение через контекст, но это сложно дебажить, и не будет оно работать в Project Maker.

Поэтому... я просто написал свой микросервис, который позволяет зенке работать с RabbitMq через один кубик - get/post запрос. Вот так просто)
Ну и до кучи упаковал все в докер и добавил подробное описание, как это использовать.
Да, в одной из своих статей я показывал уже, как выполнять код на питоне в зенке, ну а чО, теперь и микросервисы завезем)


Как это работает?

1. Поднять RabbitMq или в докере, или получить бесплатную учетку на https://www.cloudamqp.com - минимальный тариф там бесплатен, получаете свой экземпляр RabbitMq.
2. Поднять мой микросервис RabbitMQ-HttpApi - https://github.com/weselow/RabbitMQ-HttpApi

Как запустить по шагам:

Вариант 1
- в докере одной командой.
Скачиваем из репозитория файл docker-compose.yml, кладем в папку.
docker-compose.yml:
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: viking01
      RABBITMQ_DEFAULT_PASS: viking01
      RABBITMQ_DEFAULT_VHOST: /
    healthcheck:
      test: ["CMD", "rabbitmqctl", "status"]
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - data:/var/lib/rabbitmq

  rabbitmq-httpapi:
    #build:
    #  context: ./RabbitMQ-HttpApi
    image: aweselow/rabbitmqhttpapi:latest
    container_name: rabbitmq-httpapi
    ports:
      - "80:5000"
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Port: 5672
      RabbitMQ__Username: viking01
      RabbitMQ__Password: viking01
      RabbitMQ__VirtualHost: /
      Api__AuthToken: YOUR_SUPER_SECRET_TOKEN
      Api__Port: 5000

volumes:
  data:
    driver: local
    driver_opts:
      type: 'none'
      o: 'bind'
      device: './data'
Запускаем командой:
Код:
docker compose up
Если ничего не меняли, то:
- сам микросервис RabbitMQ-HttpApi будет доступен по адресу: http://localhost:80 принять/получить сообщение
- *добавил Swagger - посмотреть потестировать запросы: http://localhost:80/swagger - токен только не забудьте добавить.
- админка RabbitMQ будет по адресу: http://localhost:15672, логин viking01, пароль viking01, на вкладке Queues and Streams будут ваши очереди сообщений. Можно также провалиться в очередь и посмотреть, какие там есть сообщения.

Вариант 2. Отдельно запустить приложение - релиз выложен на репозитории github в релизах: Release Первая версия · weselow/RabbitMQ-HttpApi
- скачать и распаковатьrabbitmq-httpapi-1.0-win64.zip
- отредактировать appsettings.json - комментарии что менять в тексте самого файла есть, или для любителей разбираться глубоко - подробное описание составил на гитхабе.
- запустить приложение и отдельно запустить RabbitMq - или где-то разворачиваете свой инстанс, или получаете бесплатный экземпляр на cloudamqp.com


== Как использовать?

1. Для авторизации используется токен в заголовках. Надо в запрос добавлять Authorization: Bearer YOUR_SUPER_SECRET_TOKEN

2. Чтобы добавить сообщение в очередь, отправьте запрос на /add/{queue},
где queue - имя очереди. Если ее нет, то она будет создана микросервисом автоматически.
например, http://127.0.0.1/add/us-proxies
http://127.0.0.1/add/url-todo
http://127.0.0.1/add/parsed-pages
134079
134080


3. Чтобы получить сообщение, отправьте запрос на /get/{queue}
Например, http://127.0.0.1/get/us-proxies
http://127.0.0.1/get/url-todo
http://127.0.0.1/add/parsed-pages

4. Сколько можно слать и получать сообщений в секунду? Миллион) Кролик рассчитан)
Проверить можно на тестовом шаблоне:
134081



Под капотом микросервиса - и отслеживание подключений, и восстановление коннекта, создание очередей при get add запросах, обработка ошибок и так далее.

Пробуйте и наслаждайтесь)



P.S. Рекомендую вообще поизучать возможности RabbitMq, там много что может жизнь упростить.

P.P.S. Репозиторий на гитхабе открыты, если захотите заняться дальше разработкой этой фишки, то велкам, комиттесь)
По такой аналогии можно и интеграцию с Kafka сделать, и с Redis)

P.P.S. В репо на гитхабе в RabbitMQ-HttpApi/Linux добавил инструкцию - заметки, как запускать как сервис в linux, если использовать его на отдельной виртуалке с Github runner'ом.
 

Вложения

Последнее редактирование:

seodamage

Client
Регистрация
08.09.2014
Сообщения
244
Благодарностей
76
Баллы
28
привет, интересная идея

если я правильно понял то вместо 80 порта можно назначить любой? UPD: да можно на любой порт

с перечисленными инструментами пока не довелось работать, не мог бы ты описать как будет выглядеть например работа по получению прокси? где они будут храниться ?

читал внимательно, но не понял как это будет выглядеть
 
Последнее редактирование:

seodamage

Client
Регистрация
08.09.2014
Сообщения
244
Благодарностей
76
Баллы
28
завелось данные пишутся)

логика такая что наотправлять туда инфы и она будет храниться в оперативке, до тех пор пока другим запросом не заберёшь эту инфу?
или он пишет эту инфу на диск, а не в оперативку?



134360
 
Последнее редактирование:

Viking01

Client
Регистрация
19.08.2017
Сообщения
238
Благодарностей
176
Баллы
43
привет, очереди создаются постоянные, то есть, при перезагрузке сервиса или кролика данные не потеряются.
Очереди эксклюзивные - то есть, сообщение выдается только в одни руки.
Они на вкладке Queues - там можно и состав очереди посмотреть кстати.
Наверно надо записать видосик как оно работает?
 
  • Спасибо
Реакции: seodamage

seodamage

Client
Регистрация
08.09.2014
Сообщения
244
Благодарностей
76
Баллы
28
Наверно надо записать видосик как оно работает?
привет, было бы здорово. я посмотрел чутка на ютубе нашёл для старта такую инфу


из видосов стало ясно что там есть разные штуки, по типу отправлять кому то одному всю инфу, либо например будет 3 человека и у каждого будет что то типо своей очереди, по крайней мере я так это понял, опыта мало с этими решениями, могу ошибаться.

в твоём решении можно так делать или там только одна очередь для всех? всмысле что клиентов может быть сколько угодно и каждый кто запрашивает из этой очереди как бы берёт эту запись с удалением, если я правильно понял то можно ещё сделать чтобы у каждого клиента была своя очередь при этом id у этой очереди будет одинаков для всех и данные не будут удаляться если какой то 1 клиент запросил их, а будут удаляться для конкретного клиента.


ещё не понятно как сделать например такую штуку: есть 10 прокси, клиенты обращаются на эндпоинт очереди, и после каждого обращения получают по 1 строке, после того как они получают строку она удаляется из очереди. а как потом обратно вернуть эти 10 проксей, чтобы например шабы зенки работали с пулом из 10 проксей, и получалось что прокси как бы перебираются по очереди, до тех пор пока список не закончится, а потом список бы снова содержал эти 10 проксей. на сколько я понял так можно сделать если после получения прокси - сразу опять её добавлять в эту же очередь, а можно как то по другому сделать?

также пока не понятно как делать очереди durable и transient

и пока не получилось поработать с json, менял вот тут
134363


но почему то всё равно текст возвращается, а не json



в целом вопросов много, и сама статья интересная, буду голосовать за неё)
 
Последнее редактирование:

Viking01

Client
Регистрация
19.08.2017
Сообщения
238
Благодарностей
176
Баллы
43
в твоём решении можно так делать или там только одна очередь для всех? всмысле что клиентов может быть сколько угодно и каждый кто запрашивает из этой очереди как бы берёт эту запись с удалением
очередей может быть сколько угодно.
/add/{queue} или /get/{queue} - если очереди нет, то он создает ее.
когда клиент запрашивает сообщение, то он забирает сообщение с удалением.
и потом можешь спокойно ее возвращать обратно в эту же очередь.
То есть, если берем прокси, то он взял прокси с удалением, поработал с ним, потом вернул обратно в очередь, и тогда проксю потом подхватит следующий клиент.
Или он может возвращать в специальную очередь для проверки проксей - проверяльщик как проверит прокси на живость, снова
вернет ее в очередь для работы.

а как потом обратно вернуть эти 10 проксей, чтобы например шабы зенки работали с пулом из 10 проксей, и получалось что прокси как бы перебираются по очереди
а вот как раз эта штука для таких задач)
в начале шаблона - получаем прокси запросом на /get/{имя-очереди-с проксями}
в конце шаблона - добавляем отработанную проксю в эту очередь /add/{имя-очереди-с проксями}
и также кубик ошибки не забываем добавить - в случае ошибки также возвращает проксю запросом в очередь /add/{имя-очереди-с проксями}

но почему то всё равно текст возвращается, а не json
а какое добавляешь сообщение? тоже json?

в целом вопросов много, и сама статья интересная
спасибо) давай потихоньку по шагам отвечу)
 
  • Спасибо
Реакции: seodamage

seodamage

Client
Регистрация
08.09.2014
Сообщения
244
Благодарностей
76
Баллы
28
а какое добавляешь сообщение? тоже json?
тупанул, думал что будет формат вывода json и пихал туда обычный текст, сейчас попробовал с json всё получилось


давай потихоньку по шагам отвечу)
подскажи плиз как делать очереди durable и transient? по дефолту получается очередь делается durable и храниться на винче, тестовый комп и винч старые, даже не ссдшник, интересно какая будет производительность на оперативке)


как удалить очереди чтобы они не копились, например для парсинга 2гиса я создам временную очередь с произвольным именем с преффиксом 2gis, а после парсинга она мне уже не понадобится, как её удалить(запросом) чтобы она исчезла из веб интерфейса?
 
Последнее редактирование:

seodamage

Client
Регистрация
08.09.2014
Сообщения
244
Благодарностей
76
Баллы
28
сегодня заметил что сервис падает. я сделал 2 проекта, чекнуть паб прокси, чекал на своём хосте(nginx в docker-compose ip:port без домена)
влепил в 50 потоков выполняться, но почему то экшн получения строки из rabbitmq вываливался в ошибку по таймауту. при этом сама веб панель rabbitmq работала, но показывала всё время статично какие то данные, будто они локнулись как то. рестарт контейнера помогал, но опять запустив в 50 потоков всё тушилось.

я не знаю может это с конкретно моим компом трабл какой то, я его вообще достал чисто для того чтобы туда n8n влепить, он работает всё ок. потом ужедобавил туда твой контейнер с микросервисом. на том компе стоит alpine linux, судя по htop ресурсы есть и в целом в момент этого фриза другие сервисы работают, например n8n.

подскажи плиз в чём может быть проблема?

134393




у меня есть и другие компы, помощнее, но этот в целом должен подходить для такой задачи, там 4 гига оперы и 2 ядра, в целом помощнее душной впски и без виртуализации
 

seodamage

Client
Регистрация
08.09.2014
Сообщения
244
Благодарностей
76
Баллы
28
на всякий случай шабы которые делал и запускал в 50 потоков
 

Вложения

Кто просматривает тему: (Всего: 0, Пользователи: 0, Гости: 0)