No description
  • Go 99%
  • Dockerfile 1%
Find a file
Липатов Артём Сергеевич d2944534bd Dev
2026-03-16 16:09:44 +00:00
cmd/worker Dev 2026-03-16 16:09:44 +00:00
contracts Dev 2026-03-16 13:11:41 +00:00
ent Dev 2026-03-16 13:11:41 +00:00
internal Dev 2026-03-16 13:11:41 +00:00
migrations Dev 2026-03-16 16:09:44 +00:00
ml-mock Worker 2026-03-15 11:17:06 +00:00
schema Dev 2026-03-16 13:11:41 +00:00
test/pub add minio 2026-03-15 15:01:31 +00:00
testdata Dev 2026-03-15 18:23:28 +00:00
.env.example add minio 2026-03-15 15:01:31 +00:00
.gitignore Dev 2026-03-16 13:11:41 +00:00
.gitlab-ci.yml Ci Fix 2026-03-15 14:41:12 +00:00
.golangci.yml Worker 2026-03-15 11:17:06 +00:00
docker-compose.yml Dev 2026-03-16 13:11:41 +00:00
Dockerfile Worker 2026-03-15 09:11:48 +00:00
Dockerfile.ml-mock init 2026-03-14 20:21:29 +03:00
e2e_test.go Worker 2026-03-15 11:17:06 +00:00
go.mod Dev 2026-03-16 13:11:41 +00:00
go.sum Dev 2026-03-16 13:11:41 +00:00
policy.yml [skip ci] Initial commit 2026-03-14 16:28:17 +00:00
README.md Dev 2026-03-16 13:11:41 +00:00

Worker

Получает данные о постах, отправляет их ml, принимает обработанные данные и записывает в базу.

Как работает?

Worker принимает сообщение из nats от сервиса crawler, сообщение содержит тип события и данные.

Типы событий:

  1. Post
  2. Error

Обработка сообщений, организация отправки и основной функционал сервиса сосредоточен в procerssor'e.

В зависимости от типа события он его обрабатывает.

Если post - пытается распарсить json с данными (а именно с mention_id), берет сам пост из minio, получает все необходимые данные и отдает их модели. Для взаимодействия с ml существует ml-client. После обработки моделью, worker записывает полученные данные в бд.

Если error - пытается распарсить json, и после получения данных об ошибке записывает его в бд.

Ретрай

Парсинг происходит в 2 этапа:

  1. спарсить тип события
  2. спарсить данные

Если спрасить не получилось - выходим из функции. Из битого json-a ничего не получим. А если распрасить получилось, но ошибка в сервисе все-таки возникла, мы пытаемся выполнить действие еще раз. То есть если worker'y придет сообщение от crawler'a, он его обработает, но не сможет записать в бд - он попытается сделать это снова. За это отвечает consumer.

Consumer

consumer — компонент, который читает сообщения из nats и передаёт их processor-у. Он решает, что делать с результатом обработки: подтвердить сообщение или вернуть в очередь.

Consumer сам тянет сообщения из nats, а не получает их push-ом. Вызывает Fetch(batch_size, timeout). Если очередь пуста — ждёт и повторяет. Если worker занят — не тянет новые. Нет риска переполнения.

При запуске создаётся несколько горутин (по умолчанию 4). Каждая независимо тянет сообщения из одной и той же подписки. nats гарантирует, что одно сообщение попадёт только в одну горутину.

После того как processor обработал сообщение, consumer смотрит на результат:

  • Processor вернул nil → ACK. Сообщение удаляется из NATS. Обработка прошла успешно, либо сообщение было битым и ретраить его бессмысленно.

  • Processor вернул error → NAK. Сообщение возвращается в NATS и будет доставлено повторно через некоторое время.

Graceful shutdown

При получении сигнала завершения (SIGINT/SIGTERM) consumer перестаёт тянуть новые сообщения. Необработанные сообщения остаются в очереди и будут обработаны при следующем запуске