- Go 99%
- Dockerfile 1%
| cmd/worker | ||
| contracts | ||
| ent | ||
| internal | ||
| migrations | ||
| ml-mock | ||
| schema | ||
| test/pub | ||
| testdata | ||
| .env.example | ||
| .gitignore | ||
| .gitlab-ci.yml | ||
| .golangci.yml | ||
| docker-compose.yml | ||
| Dockerfile | ||
| Dockerfile.ml-mock | ||
| e2e_test.go | ||
| go.mod | ||
| go.sum | ||
| policy.yml | ||
| README.md | ||
Worker
Получает данные о постах, отправляет их ml, принимает обработанные данные и записывает в базу.
Как работает?
Worker принимает сообщение из nats от сервиса crawler, сообщение содержит тип события и данные.
Типы событий:
- Post
- Error
Обработка сообщений, организация отправки и основной функционал сервиса сосредоточен в procerssor'e.
В зависимости от типа события он его обрабатывает.
Если post -
пытается распарсить json с данными (а именно с mention_id), берет
сам пост из minio, получает все необходимые данные и отдает их модели. Для взаимодействия с ml существует ml-client. После обработки моделью, worker записывает полученные данные в бд.
Если error - пытается распарсить json, и после получения данных об ошибке записывает его в бд.
Ретрай
Парсинг происходит в 2 этапа:
- спарсить тип события
- спарсить данные
Если спрасить не получилось - выходим из функции. Из битого json-a ничего не получим. А если распрасить получилось, но ошибка в сервисе все-таки возникла, мы пытаемся выполнить действие еще раз. То есть если worker'y придет сообщение от crawler'a, он его обработает, но не сможет записать в бд - он попытается сделать это снова. За это отвечает consumer.
Consumer
consumer — компонент, который читает сообщения из nats и передаёт их processor-у. Он решает, что делать с результатом обработки: подтвердить сообщение или вернуть в очередь.
Consumer сам тянет сообщения из nats, а не получает их push-ом. Вызывает Fetch(batch_size, timeout). Если очередь пуста — ждёт и повторяет. Если worker занят — не тянет новые. Нет риска переполнения.
При запуске создаётся несколько горутин (по умолчанию 4). Каждая независимо тянет сообщения из одной и той же подписки. nats гарантирует, что одно сообщение попадёт только в одну горутину.
После того как processor обработал сообщение, consumer смотрит на результат:
-
Processor вернул nil → ACK. Сообщение удаляется из NATS. Обработка прошла успешно, либо сообщение было битым и ретраить его бессмысленно.
-
Processor вернул error → NAK. Сообщение возвращается в NATS и будет доставлено повторно через некоторое время.
Graceful shutdown
При получении сигнала завершения (SIGINT/SIGTERM) consumer перестаёт тянуть новые сообщения. Необработанные сообщения остаются в очереди и будут обработаны при следующем запуске