Symfony messenger
Компонент Messenger позволяет управлять асинхронным кодом в Symfony.
В нем заложен ряд архитектурных решений, которые делают этот компонент очень гибким в настройке.
С некоторыми из них мы скоро познакомимся.
Пример, конфигурации с транспортом amqp может выглядеть так:
Отладка
С помощью команды можно посмотреть как сконфигурирован транспорт. Посмотреть всю цепочку обработки сообщения перед отправкой и обработкой.
Messenger использует паттерн - "Цепочка обязанностей" для обработки сообщений.
Здесь мы видим, что наши кастомные middleware`ы были добавлены в середину цепочки между служебными и core обработчиками.
Служебные отвечают за перенаправление сообщения обработанного с ошибкой в другую fallback очередь, за механизм повтора обработки, за выполнение определенных сообщений после того как обработается текущее и т.д.
Core обработчики отвечают за непосредственно отправку сообщения и конечную обработку
Использование
Чтобы начать использование messenger нужно создать DTO:
id;
}
public function getContext(): array
{
return $this->context;
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
И затем handler:
commentRepository->find($message->getId());
if (!$comment) {
return;
}
if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
$comment->setState('spam');
} else {
$comment->setState('published');
}
$this->entityManager->flush();
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Допустимо создавать множество handler`ов на одну DTO. В этом случае вызовутся все обработчики.
Отправка сообщения
В простом виде для того чтобы отправить сообщение нужно "заинжектить" транспорт в конструкторе и затем отправить через него DTO.
commentRepository->find(1);
$this->bus->dispatch(new CommentMessage($comment->getId(), []));
return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
В этом случае сообщение пройдет всю цепочку обработки через все middleware`ы.
В messenger.middleware.send_message обработчике сообщение будет сериализовано и отправлено в очередь.
В более сложных вариантах использования может потребоваться сделать какую-то обработку сообщения.
Например:
- Добавить доп данные в сообщение
- Прервать выполнение сообщения
- Поменять трансфер или очередь
- Создать ttl очередь и перенаправить в нее сообщение
- и т.д.
В этом случае нужно обернуть DTO в спец объект Envelope, который помимо самого сообщения может содержать вспомогательную информацию - объекты имплементирующие интерфейс Symfony\Component\Messenger\Stamp\StampInterface. Например:
key;
}
public function setKey(?string $key): self
{
$this->key = $key;
return $this;
}
public function getTtl(): ?int
{
return $this->ttl;
}
public function setTtl(?int $ttl): self
{
$this->ttl = $ttl;
return $this;
}
public function getUseLock(): ?bool
{
return $this->useLock;
}
public function setUseLock(?bool $useLock): self
{
$this->useLock = $useLock;
return $this;
}
public function getParallel(): ?bool
{
return $this->parallel;
}
public function setParallel(?bool $parallel): self
{
$this->parallel = $parallel;
return $this;
}
public function getAfterParallelMessage(): ?object
{
return $this->afterParallelMessage;
}
public function setAfterParallelMessage(?object $afterParallelMessage): self
{
$this->afterParallelMessage = $afterParallelMessage;
return $this;
}
public function getTimestamp(): ?int
{
return $this->timestamp;
}
public function setTimestamp(?int $timestamp): self
{
$this->timestamp = $timestamp;
return $this;
}
public function getNeedResolveOrderConflict(): ?bool
{
return $this->needResolveOrderConflict;
}
public function setNeedResolveOrderConflict(?bool $needResolveOrderConflict): self
{
$this->needResolveOrderConflict = $needResolveOrderConflict;
return $this;
}
public function getMessages(): array
{
return $this->messages;
}
public function addMessage(object $message): self
{
$this->messages[] = $message;
return $this;
}
public function popMessage(): ?object
{
return array_shift($this->messages);
}
public function getLockKey(): string
{
$this->assertHasKey();
return $this->key . ':lock';
}
public function getTimestampKey(): string
{
$this->assertHasKey();
return $this->key . ':timestamp';
}
public function getOrderConflictKey(): string
{
$this->assertHasKey();
return $this->key . ':order_conflict';
}
public function getParallelKey(): string
{
$this->assertHasKey();
return $this->key . ':parallel';
}
public function assertHasKey(): void
{
if (!$this->key) {
throw new \\InvalidArgumentException('Key not found');
}
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Отправить штамп можно так:
key('user_1')
->lock();
$envelope = Envelope::wrap($data);
$envelope = $envelope->with($context);
$this->bus->dispatch($envelope);
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Эта информация будет доступна в middleware и ее можно использовать, например так:
last(ContextStamp::class);
if (!$stamp || !$stamp->getUseLock()) {
return $stack->next()->handle($envelope, $stack);
}
$receivedStamp = $envelope->last(ReceivedStamp::class);
// Если воркер обработал сообщение, то снимаем lock. Даже если код упал.
if ($receivedStamp) {
try {
return $stack->next()->handle($envelope, $stack);
} finally {
$this->client->unlock($stamp->getLockKey());
}
}
// Если отправка залочена, то прекращаем обработку
if (!$this->client->lock($stamp->getLockKey())) {
return $envelope;
}
return $stack->next()->handle($envelope, $stack);
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
В начале middeware есть проверка на наличие нашего штампа. Если его нет, то обработчик пропускает и запускается следующий элемент цепочки.
Далее мы проверяем наличие ReceivedStamp.
Если его нет, значит мы находимся на этапе отправки.
Мы пробуем поставить блокировку в редисе и если нам удалось, то идем дальше, иначе останавливаем обработку сообщения.
Если же ReceivedStamp есть, то значит мы в консамере. Здесь нам нужно прокинуть сообщение дальше по цепочке и после обработки снимаем блокировку. Даже в случае ошибки. Чтобы не заблочить выполнение других сообщений.
Получаем сообщение
Для того чтобы получить сообщение из очереди и обработать его, нужно выполнить команду
Команда будет работать в режиме демона. То есть работать в бесконечном цикле, засыпая на каждой итерации.
На каждой итерации messenger будет проходиться по всем сконфигурированным транспортам, которые должны обработаться командой.
В каждом транспорте компонент в цикле проходит все сконфигурированные очереди. То есть читает сообщения из первой очереди. Как только все сообщения в первой очереди обработаны, переходим к следующей очереди и т. д. В бесконечном цикле читаем сообщения из очередей по кругу.
Для локальной разработки чтобы при переходе с ветки на ветку код обновлялся в демоне было принято решение добавить опцию --time-limit=10 в команду консамера, чтобы каждые 10 секунд демон перезагружался
Наша реализация
Для более удобного использования было принято решения сделать сервис для работы с messenger.
grabContextStamp();
$data = $context->popMessage();
if (!$data) {
throw new \\InvalidArgumentException('Message not found!!!');
}
$envelope = Envelope::wrap($data);
$envelope = $envelope->with($context);
$this->bus->dispatch($envelope);
$this->context = null;
}
public function key(string $key): self
{
$context = $this->grabContextStamp();
$context->setKey($key);
return $this;
}
public function ttl(int $ttl): self
{
$context = $this->grabContextStamp();
$context->setTtl($ttl);
return $this;
}
public function lock(): self
{
$context = $this->grabContextStamp();
$context->setUseLock(true);
return $this;
}
public function parallel(): self
{
$context = $this->grabContextStamp();
$context->setParallel(true);
return $this;
}
public function ifNewer(int $timestamp): self
{
$context = $this->grabContextStamp();
$context->setTimestamp($timestamp);
return $this;
}
public function resolveOrderConflict(): self
{
$context = $this->grabContextStamp();
$context->setNeedResolveOrderConflict(true);
return $this;
}
public function addMessage(object $message): self
{
$context = $this->grabContextStamp();
$context->addMessage($message);
return $this;
}
public function setAfterParallelMessage(object $message): self
{
$context = $this->grabContextStamp();
$context->setAfterParallelMessage($message);
return $this;
}
private function grabContextStamp(): ContextStamp
{
return $this->context = ($this->context ?? new ContextStamp());
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Он реализован по паттерну builder, где все состояние хранится в объекте контексте, который всегда очищается после вызова метода send
В сервисе заложен набор фич с реализацией определенных кейсов работы с очередями, которые неоднократно применялись на проектах.
Кейсы:
Задача должна выполнить 1 раз. Следующая может начаться только если 1 закончилась.
В данном случае ставится лок в редис. Если лок есть, то новое сообщение игнорируется
superBus
->key('user_1') // Ключ запроса
->lock() // Флаг блокировки. Если стоит, то применится механизм. Ключ будет построен на основе key
->addMessage(new Test('asdf'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Нужно игнорировать сообщения дата отправки которых меньше, чем те которые уже получили
В данном случае в редис пишется метка времени последнего отправленного сообщения
Следующее сообщение будет сравниваться с установленным значением. Если значение меньше, сообщение игнорится.
Если больше, то отправляем и ставим новое значение в редис.
superBus
->key('user_1')
->ifNewer(121)
->addMessage(new Test('asdf'))
->send(); // Отправится, так как первое
$this->superBus
->key('user_1')
->ifNewer(120)
->addMessage(new Test('asdf'))
->send(); // Будет проигнорировано
$this->superBus
->key('user_1')
->ifNewer(122)
->addMessage(new Test('asdf'))
->send(); // Отправится, так как оно более свежее
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Отправка сообщения, которое должно обработаться через 23 секунды
В данном случае добавляется stamp с delay меткой. Messenger сам поймет что сообщение ttl. Создаст новую ttl ветку и отправит туда сообщение.
superBus
->ttl(23)
->addMessage(new Test('asdf'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Гарантирование порядка выполнения сообщений
1. 1 очередь и много воркеров. 2 сообщение может обработаться быстрее более быстрым воркером чем первое
2. Сообщения в разных очередях и разные воркеры. Когда есть 2 зависимые задачи. Например, в одну очередь отправляем сообщение о добавлении продукта, а в другую о добавлении настроек продуктов.
В данном случае в редисе ставится лок на отправку сообщений.
Если ключ уже залочен, то бедет создана лок очередь и сообщения будут отпарвлены туда.
Как только базовое сообщение обработается, воркер заберет остальные сообщения из лок очереди и выполнит их в этом же потоке.
Если базовое сообщение не успеет обработаться в течение часа, сообщения будет перенаправлено в sync очередь, где обработаются синхронно 1 воркером
superBus
->key('product_1')
->resolveOrderConflict()
->addMessage(new Product('111'))
->send();
$this->superBus
->key('product_1')
->resolveOrderConflict()
->addMessage(new ProductSettings('111'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Цепочка задач. Когда нужно гарантировать порядок выполнения задач и все задачи заранее известны.
В данном случае в rabbit улетят все сообщения разом и обработаются в такой же последовательности
superBus
->addMessage(new Product('111'))
->addMessage(new Product('222'))
->addMessage(new Product('333'))
->addMessage(new Product('444'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Запустить задачи параллельно
В данном случае в rabbit улетят все сообщения отдельно. Последовательность обработки не гарантирована
superBus
->parallel()
->addMessage(new Product('111'))
->addMessage(new Product('222'))
->addMessage(new Product('333'))
->addMessage(new Product('444'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
Выполнить задачу после выполнения ряда параллельных задач
В данном случае в rabbit улетят все параллельные сообщения отдельно. Последовательность обработки не гарантирована.
После того как все сообщения выполнятся будет снята блокировка в редисе и выполнится последнее сообщение
superBus
->key('user_2')
->parallel()
->addMessage(new Test('111'))
->addMessage(new Test('222'))
->addMessage(new Test('333'))
->addMessage(new Test('444'))
->setAfterParallelMessage(new Test('555'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>