А вы знали, что брокеры сообщений решают проблему перегруженных многопоточных шаблонов?

Viking01

Client
Регистрация
19.08.2017
Сообщения
240
Благодарностей
180
Баллы
43
Зеннопостер + брокер сообщений, или как упростить многопоточные шаблоны раза в два.

134097

Статья ориентирована на продвинутых пользователей, которые представляют, что такое докер и микро-сервисы.
Показывает, как упростить многопоточные шаблоны в несколько раз и сделать логику простой и линейной, как шпала.


... Бывало ли у вас так, что задумываете написать шаблон...
... и в голове появляется некое сопротивление, что вот опять надо будет тупые циклы прописывать...
... на проверку прокси, на то, на се, на такие-то ситуации, на разные ошибки...
... ой как в лом делать это в сотый раз?..


Работать с зенкой начал лет 8 назад. Софт предпочитаю писать сам, под конкретный проект и стек, но у меня скопилась масса наработок по ней, например, по использованию микросервисной архитектуры, и наверно, пришла пора делиться)

== Что раздражает больше всего?

Берем многопоточный шаблон. Например, спарсить сайт. Или нагулять профили. Или поведенческие покачать.
В нем надо реализовать блоки логики.
Например, прописать логику выбора прокси - выбирать прокси только определенной страны (наш сайт в зависимости от страны может менять язык страницы и валюту).
Прописать логику выбора url для парсинга - чтобы не пересеклись разные экземпляры, в многопотоке.
Прописать логику сохранения результатов - предположим, результаты надо предварительно обрабатывать, не совать же чистый html в большем объеме в базу, нам же данные нужны.
Какая-то дополнительная логика - вдруг надо еще между разными экземплярами синхронизировать, например, отпечатки устройств, или какие-то специфические настройки.

Ну, шаблон уже выглядит не маленьким, да?


== И что? Все муторно-напряжно, но решаемо же.

Есть несколько типовых вариантов решения такой задачи:

1. Синхронизация через базу - ставим тот же sqlLite или mysql, заводим таблицы, и предполагаем, что шаблоны будут стучаться в базу, отмечать, что взяли задание, отрабатывать и сохранять результаты обратно в базу.

Решение рабочее, но со своими минусами:
- параллельный доступ в базу - надо предусмотреть ситуацию в коде, когда два шаблона одновременно постучались в базу - чтобы они не взяли одну и ту же строку задания.
- аналогично и с сохранением.
- в зенке писать кучу кода - или в кубиках, или отдельным проектом с использованием ef - проще, но все равно куча кода.
- предусмотреть ошибки - например, для sqllite чтобы не словить database locked - эти ошибки один фиг надо дополнительным кодом обрабатывать, даже если wal включили.
- если шаблон рухнул в процессе исполнения - также новые блоки кода сохранить его результаты или отметить, что эти данные можно снова взять в работу.
Итого - дополнительный код, код, код...

2. Через родные таблицы и списки, которые привязываются к файликам.

Ну прокси можно брать из файла - взять строчку с удалением, добавить в конец очереди
Аналогично с заданиями - брать строчку таблицы, добавлять в конец или в отдельную таблицу, использовать маркеры статуса задания или маркеры какой экземпляр взял.
Но тоже не все так просто -
- опять, предусматривать ошибки, что шаблон может рухнуть на любой стадии, что тогда делать с данными?
- а если вы парсите сайт на 300 - 500 тысяч страниц, какого размера будут таблицы и списки?
- а если пока вы проверяете список на уникальность, кто-то возьмет да и добавит туда запись?
- а как убедиться, что шаблоны взяли уникальные данные, не глюкнули и не хапнули одно и то же? Например, если для парсинга вы используете авторизацию на сайте, и два шаблона зайдут с одним и тем же акком с разных ip, то легко и бан всего акка можно словить.
- продвинутые пользователи могут передавать еще словари через контекст или использовать глобальные переменные, но мы все равно упираемся в дополнительный код.
Итого - опять, всю логику в код, код, код...

3. Отдельным приложением с rest api.

Есть и такой вариант - написать отдельное микро-приложение, которое будет для всех шаблонов единой точкой по выдаче данных.
То есть, они отправляют get запрос - получают нужные данные.
Тут есть свое преимущество - одна точка по выдаче данных, вся логика выбора проксей, url, авторизационных данных в одном месте - конфликтов не будет.
Шаблон упал - вовремя не ответил, что работу закончил, - не проблема, выдадим данные другому.
Но опять - это надо немного покодить, подебажить и опубликовать такое свое приложение.
Ну и править и перепубликовать его каждый раз, когда логику захотите изменить.
(Тут тоже есть свои читы, как-нибудь, может и расскажу, как быстро писать такие приложения за 15 минут, но не в этой статье)

4. есть и еще варианты, но везде будет логика и дополнительный код, код, код...

Все это понятно. Все это можно реализовать, долго, упорно и методично прописывая и отлавливая ошибки. Но эта рутина... и время на нее тратить так не хочется...


== Всего этого геморроя можно избежать?

Так к чему все эти рассуждения?
Есть вариант "весь-этот-код-код-код" уместить в один кубик. В один get-запрос.

Представьте, что у нас есть сторонний сервис, куда мы одним запросом можем добавить данные, а другим запросом когда понадобится - данные забрать?
И не париться об уникальности, производительности, ошибок параллельных взятий данных?
Тогда шаблон мы делаем с простой, как шпала, логикой - на входе сделал get-запрос, данные линейно отработал, на выходе сделал post-запрос, вернул результаты. А в случае ошибки - добавляем кубик Bad-End и после него сделать запрос - вернуть свои данные в очередь.
И еще подсластим - если этот сервис надо будет развернуть только один раз, а использовать его смогут разные шаблоны из разных проектов? В смысле, не надо будет его настраивать под каждый проект?

Это могло бы выглядеть так...
- нужен прокси? - не парься, сделай гет запрос и получи что надо
- нужен url для парсинга? - сделай запрос, и не парься об остальной логике.
- получил результаты? - отправь запрос с ними, и не парься насчет сохранения.
- получил ошибку? - не парься, отправь запрос, твою работу продолжит другой экземпляр.

Другими словами, логика работы может выглядеть так:
- шаблон далет get-запрос и получает json с данными для работы - прокси, url, иные уникальные данные.
- отрабатывает свою задачу
- когда сделал - отправляет post запрос с тем же json с результатами. Вывалился с ошибкой - просто делает другой запрос, и работа продолжается.
Не нравится json? можно слать и получать простой текст, просто url передать к примеру.

И тогда:
- дебажить и ловить ошибки - в разы проще,
- логику править в разы проще,
- шаблоны писать легче, быстрее и в разы проще,
- вероятно, жрут системных ресурсов они меньше, и работают лучше,
- ну и другие выгоды, представьте сами)


== В чем идея?

Ну а теперь поговорим серьезно.
Вся эта прелюдия может способствовать одной цели - чтобы вы посмотрели в сторону одного из ключевых элементов микросервисных комбайнов - брокеров сообщений.

В двух словах, в брокере есть очереди сообщений. Он позволяет создавать очереди чего угодно - сообщений из текста (например, url, прокси, html спаршенной страницы), json (вот готовая задача, десериализуй в класс с нужными полями), картинок и так далее.
Один шаблон может добавлять сообщения, другие - получать и обрабатывать их. Даже запущенные в зенке на разных машинах.
Большая гибкость в настройках очереди - могут быть постоянные (не теряться с перезагрузкой), или временные пока жив хоть один получатель сообщений, сообщения можно выдавать только в одни руки или каждому подписчику, и так далее. Плюс там еще внутри много дополнительных функций, вроде маршрутизации и эксклюзивности сообщений, но это не тема данной статьи, легко посмотреть на ютубе.
134132

В качестве брокеров обычно используют или Kafka, или RabbitMq, или делают его из Redis. У каждого свои плюсы и минусы. Самый простой и легкий в настроке и понимании - Rabbit, Kafka сложнее, Redis же просто специфичен с точки зрения прикручивания к зенке.

За основу возьмем Rabbit. Из коробки он с зенкой работать не будет, потому что должно быть постоянное подключение к серверу. А в зенке - кубики! Или отдельный проект Visual Studio, но все равно - проект или кубик завершился, коннект порвался, Rabbit на него среагировал как потеря клиента. Костыльный вариант для продвинутых пользователей - сохранять и передавать подключение через контекст, но это сложно дебажить, и не будет оно работать в Project Maker.

Поэтому... я просто написал свой микросервис, который позволяет зенке работать с RabbitMq через один кубик - get/post запрос. Вот так просто)
Ну и до кучи упаковал все в докер и добавил подробное описание, как это использовать.
Да, в одной из своих статей я показывал уже, как выполнять код на питоне в зенке, ну а чО, теперь и микросервисы завезем)


Как это работает?

1. Поднять RabbitMq или в докере, или получить бесплатную учетку на https://www.cloudamqp.com - минимальный тариф там бесплатен, получаете свой экземпляр RabbitMq.
2. Поднять мой микросервис RabbitMQ-HttpApi - https://github.com/weselow/RabbitMQ-HttpApi

Как запустить по шагам:

Вариант 1
- в докере одной командой.
Скачиваем из репозитория файл docker-compose.yml, кладем в папку.
docker-compose.yml:
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: viking01
      RABBITMQ_DEFAULT_PASS: viking01
      RABBITMQ_DEFAULT_VHOST: /
    healthcheck:
      test: ["CMD", "rabbitmqctl", "status"]
      interval: 10s
      timeout: 5s
      retries: 5
    volumes:
      - data:/var/lib/rabbitmq

  rabbitmq-httpapi:
    #build:
    #  context: ./RabbitMQ-HttpApi
    image: aweselow/rabbitmqhttpapi:latest
    container_name: rabbitmq-httpapi
    ports:
      - "80:5000"
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Port: 5672
      RabbitMQ__Username: viking01
      RabbitMQ__Password: viking01
      RabbitMQ__VirtualHost: /
      Api__AuthToken: YOUR_SUPER_SECRET_TOKEN
      Api__Port: 5000

volumes:
  data:
    driver: local
    driver_opts:
      type: 'none'
      o: 'bind'
      device: './data'
Запускаем командой:
Код:
docker compose up
Если ничего не меняли, то:
- сам микросервис RabbitMQ-HttpApi будет доступен по адресу: http://localhost:80 принять/получить сообщение
- *добавил Swagger - посмотреть потестировать запросы: http://localhost:80/swagger - токен только не забудьте добавить.
- админка RabbitMQ будет по адресу: http://localhost:15672, логин viking01, пароль viking01, на вкладке Queues and Streams будут ваши очереди сообщений. Можно также провалиться в очередь и посмотреть, какие там есть сообщения.

Вариант 2. Отдельно запустить приложение - релиз выложен на репозитории github в релизах: Release Первая версия · weselow/RabbitMQ-HttpApi
- скачать и распаковатьrabbitmq-httpapi-1.0-win64.zip
- отредактировать appsettings.json - комментарии что менять в тексте самого файла есть, или для любителей разбираться глубоко - подробное описание составил на гитхабе.
- запустить приложение и отдельно запустить RabbitMq - или где-то разворачиваете свой инстанс, или получаете бесплатный экземпляр на cloudamqp.com


== Как использовать?

1. Для авторизации используется токен в заголовках. Надо в запрос добавлять Authorization: Bearer YOUR_SUPER_SECRET_TOKEN

2. Чтобы добавить сообщение в очередь, отправьте запрос на /add/{queue},
где queue - имя очереди. Если ее нет, то она будет создана микросервисом автоматически.
например, http://127.0.0.1/add/us-proxies
http://127.0.0.1/add/url-todo
http://127.0.0.1/add/parsed-pages
134079
134080


3. Чтобы получить сообщение, отправьте запрос на /get/{queue}
Например, http://127.0.0.1/get/us-proxies
http://127.0.0.1/get/url-todo
http://127.0.0.1/add/parsed-pages

4. Сколько можно слать и получать сообщений в секунду? Миллион) Кролик рассчитан)
Проверить можно на тестовом шаблоне:
134081



Под капотом микросервиса - и отслеживание подключений, и восстановление коннекта, создание очередей при get add запросах, обработка ошибок и так далее.

Пробуйте и наслаждайтесь)



P.S. Рекомендую вообще поизучать возможности RabbitMq, там много что может жизнь упростить.

P.P.S. Репозиторий на гитхабе открыты, если захотите заняться дальше разработкой этой фишки, то велкам, комиттесь)
По такой аналогии можно и интеграцию с Kafka сделать, и с Redis)

P.P.S. В репо на гитхабе в RabbitMQ-HttpApi/Linux добавил инструкцию - заметки, как запускать как сервис в linux, если использовать его на отдельной виртуалке с Github runner'ом.
 

Вложения

Последнее редактирование:

seodamage

Client
Регистрация
08.09.2014
Сообщения
250
Благодарностей
76
Баллы
28
привет, интересная идея

если я правильно понял то вместо 80 порта можно назначить любой? UPD: да можно на любой порт

с перечисленными инструментами пока не довелось работать, не мог бы ты описать как будет выглядеть например работа по получению прокси? где они будут храниться ?

читал внимательно, но не понял как это будет выглядеть
 
Последнее редактирование:

seodamage

Client
Регистрация
08.09.2014
Сообщения
250
Благодарностей
76
Баллы
28
завелось данные пишутся)

логика такая что наотправлять туда инфы и она будет храниться в оперативке, до тех пор пока другим запросом не заберёшь эту инфу?
или он пишет эту инфу на диск, а не в оперативку?



134360
 
Последнее редактирование:

Viking01

Client
Регистрация
19.08.2017
Сообщения
240
Благодарностей
180
Баллы
43
привет, очереди создаются постоянные, то есть, при перезагрузке сервиса или кролика данные не потеряются.
Очереди эксклюзивные - то есть, сообщение выдается только в одни руки.
Они на вкладке Queues - там можно и состав очереди посмотреть кстати.
Наверно надо записать видосик как оно работает?
 
  • Спасибо
Реакции: ParadoxRU и seodamage

seodamage

Client
Регистрация
08.09.2014
Сообщения
250
Благодарностей
76
Баллы
28
Наверно надо записать видосик как оно работает?
привет, было бы здорово. я посмотрел чутка на ютубе нашёл для старта такую инфу


из видосов стало ясно что там есть разные штуки, по типу отправлять кому то одному всю инфу, либо например будет 3 человека и у каждого будет что то типо своей очереди, по крайней мере я так это понял, опыта мало с этими решениями, могу ошибаться.

в твоём решении можно так делать или там только одна очередь для всех? всмысле что клиентов может быть сколько угодно и каждый кто запрашивает из этой очереди как бы берёт эту запись с удалением, если я правильно понял то можно ещё сделать чтобы у каждого клиента была своя очередь при этом id у этой очереди будет одинаков для всех и данные не будут удаляться если какой то 1 клиент запросил их, а будут удаляться для конкретного клиента.


ещё не понятно как сделать например такую штуку: есть 10 прокси, клиенты обращаются на эндпоинт очереди, и после каждого обращения получают по 1 строке, после того как они получают строку она удаляется из очереди. а как потом обратно вернуть эти 10 проксей, чтобы например шабы зенки работали с пулом из 10 проксей, и получалось что прокси как бы перебираются по очереди, до тех пор пока список не закончится, а потом список бы снова содержал эти 10 проксей. на сколько я понял так можно сделать если после получения прокси - сразу опять её добавлять в эту же очередь, а можно как то по другому сделать?

также пока не понятно как делать очереди durable и transient

и пока не получилось поработать с json, менял вот тут
134363


но почему то всё равно текст возвращается, а не json



в целом вопросов много, и сама статья интересная, буду голосовать за неё)
 
Последнее редактирование:

Viking01

Client
Регистрация
19.08.2017
Сообщения
240
Благодарностей
180
Баллы
43
в твоём решении можно так делать или там только одна очередь для всех? всмысле что клиентов может быть сколько угодно и каждый кто запрашивает из этой очереди как бы берёт эту запись с удалением
очередей может быть сколько угодно.
/add/{queue} или /get/{queue} - если очереди нет, то он создает ее.
когда клиент запрашивает сообщение, то он забирает сообщение с удалением.
и потом можешь спокойно ее возвращать обратно в эту же очередь.
То есть, если берем прокси, то он взял прокси с удалением, поработал с ним, потом вернул обратно в очередь, и тогда проксю потом подхватит следующий клиент.
Или он может возвращать в специальную очередь для проверки проксей - проверяльщик как проверит прокси на живость, снова
вернет ее в очередь для работы.

а как потом обратно вернуть эти 10 проксей, чтобы например шабы зенки работали с пулом из 10 проксей, и получалось что прокси как бы перебираются по очереди
а вот как раз эта штука для таких задач)
в начале шаблона - получаем прокси запросом на /get/{имя-очереди-с проксями}
в конце шаблона - добавляем отработанную проксю в эту очередь /add/{имя-очереди-с проксями}
и также кубик ошибки не забываем добавить - в случае ошибки также возвращает проксю запросом в очередь /add/{имя-очереди-с проксями}

но почему то всё равно текст возвращается, а не json
а какое добавляешь сообщение? тоже json?

в целом вопросов много, и сама статья интересная
спасибо) давай потихоньку по шагам отвечу)
 
  • Спасибо
Реакции: seodamage

seodamage

Client
Регистрация
08.09.2014
Сообщения
250
Благодарностей
76
Баллы
28
а какое добавляешь сообщение? тоже json?
тупанул, думал что будет формат вывода json и пихал туда обычный текст, сейчас попробовал с json всё получилось


давай потихоньку по шагам отвечу)
подскажи плиз как делать очереди durable и transient? по дефолту получается очередь делается durable и храниться на винче, тестовый комп и винч старые, даже не ссдшник, интересно какая будет производительность на оперативке)


как удалить очереди чтобы они не копились, например для парсинга 2гиса я создам временную очередь с произвольным именем с преффиксом 2gis, а после парсинга она мне уже не понадобится, как её удалить(запросом) чтобы она исчезла из веб интерфейса?
 
Последнее редактирование:

seodamage

Client
Регистрация
08.09.2014
Сообщения
250
Благодарностей
76
Баллы
28
сегодня заметил что сервис падает. я сделал 2 проекта, чекнуть паб прокси, чекал на своём хосте(nginx в docker-compose ip:port без домена)
влепил в 50 потоков выполняться, но почему то экшн получения строки из rabbitmq вываливался в ошибку по таймауту. при этом сама веб панель rabbitmq работала, но показывала всё время статично какие то данные, будто они локнулись как то. рестарт контейнера помогал, но опять запустив в 50 потоков всё тушилось.

я не знаю может это с конкретно моим компом трабл какой то, я его вообще достал чисто для того чтобы туда n8n влепить, он работает всё ок. потом ужедобавил туда твой контейнер с микросервисом. на том компе стоит alpine linux, судя по htop ресурсы есть и в целом в момент этого фриза другие сервисы работают, например n8n.

подскажи плиз в чём может быть проблема?

134393




у меня есть и другие компы, помощнее, но этот в целом должен подходить для такой задачи, там 4 гига оперы и 2 ядра, в целом помощнее душной впски и без виртуализации
 

seodamage

Client
Регистрация
08.09.2014
Сообщения
250
Благодарностей
76
Баллы
28
на всякий случай шабы которые делал и запускал в 50 потоков
 

Вложения

Viking01

Client
Регистрация
19.08.2017
Сообщения
240
Благодарностей
180
Баллы
43
веб панель rabbitmq работала, но показывала всё время статично какие то данные, будто они локнулись как то
привет, судя по тексту ошибки, не смог объявить очередь.
в целом кролик не требовательный к ресурсам,
но если для начала исключить проблему с кроликом - может попробуешь задеплоить кролик на отдельной машине или внешний сервис использовать https://www.cloudamqp.com ?
я так понял ты запускаешь через docker compose?
 

seodamage

Client
Регистрация
08.09.2014
Сообщения
250
Благодарностей
76
Баллы
28
может попробуешь задеплоить кролик на отдельной машине
привет, он запущен на отдельной машине 2cpu / 4 gb ram / sata hdd с alpine linux в docker-compose

134403


тут твой микросервис + n8n + nginx proxy manager




скорее всего будут траблы поставить его на эту машину т.к. в alpine нету libc

в целом я предпочитаю ставить всё в docker чтобы не захламлять систему

а на сервисы не хочется свои данные пушить, они же ведь для этого и существуют ("Если вы не платите за продукт, то вы и есть продукт.")



мне кажется что возможная проблема тут sata hdd, типо не вывозит 50 потоков, хотя как и писал раньше, тот же n8n работает исправно в момент фризов сервиса rabbitmq
 
Последнее редактирование:

Viking01

Client
Регистрация
19.08.2017
Сообщения
240
Благодарностей
180
Баллы
43
оке, гляну чуть позже.
50 потоков не должно быть проблемой, он на миллионы сообщений рассчитан)
докер - да хорошая тема)
 
  • Спасибо
Реакции: seodamage

Zedx

Client
Регистрация
12.06.2018
Сообщения
1 428
Благодарностей
968
Баллы
113
Спасибо за статью, очень годная тема. Пару лет назад вскользь прочитал про подобные сервисы, но как-то не было времени разобраться подробнее. А оказывается это очень крутая штука.
 
Последнее редактирование:

Кто просматривает тему: (Всего: 3, Пользователи: 0, Гости: 3)