import type {NextPage} from 'next' import Head from 'next/head' import Image from 'next/image' import styles from '../styles/Home.module.css' import '@fontsource/roboto/300.css'; import '@fontsource/roboto/400.css'; import '@fontsource/roboto/500.css'; import '@fontsource/roboto/700.css'; import {CopyBlock, github} from "react-code-blocks"; import {Table, TableBody, TableRow, TableCell, TableHead} from '@mui/material'; const Home: NextPage = () => { return (
Create Next App

Symfony messenger

Компонент Messenger позволяет управлять асинхронным кодом в Symfony.

В нем заложен ряд архитектурных решений, которые делают этот компонент очень гибким в настройке.

С некоторыми из них мы скоро познакомимся.

Пример, конфигурации с транспортом amqp может выглядеть так:

Отладка

С помощью команды можно посмотреть как сконфигурирован транспорт. Посмотреть всю цепочку обработки сообщения перед отправкой и обработкой.

Messenger использует паттерн - "Цепочка обязанностей" для обработки сообщений.

Здесь мы видим, что наши кастомные middleware`ы были добавлены в середину цепочки между служебными и core обработчиками.

Служебные отвечают за перенаправление сообщения обработанного с ошибкой в другую fallback очередь, за механизм повтора обработки, за выполнение определенных сообщений после того как обработается текущее и т.д.

Core обработчики отвечают за непосредственно отправку сообщения и конечную обработку

Использование

Чтобы начать использование messenger нужно создать DTO:

id; } public function getContext(): array { return $this->context; } } `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

И затем handler:

commentRepository->find($message->getId()); if (!$comment) { return; } if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) { $comment->setState('spam'); } else { $comment->setState('published'); } $this->entityManager->flush(); } } `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Допустимо создавать множество handler`ов на одну DTO. В этом случае вызовутся все обработчики.

Отправка сообщения

В простом виде для того чтобы отправить сообщение нужно "заинжектить" транспорт в конструкторе и затем отправить через него DTO.

commentRepository->find(1); $this->bus->dispatch(new CommentMessage($comment->getId(), [])); return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]); } } `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

В этом случае сообщение пройдет всю цепочку обработки через все middleware`ы.

В messenger.middleware.send_message обработчике сообщение будет сериализовано и отправлено в очередь.

В более сложных вариантах использования может потребоваться сделать какую-то обработку сообщения.

Например:

  • Добавить доп данные в сообщение
  • Прервать выполнение сообщения
  • Поменять трансфер или очередь
  • Создать ttl очередь и перенаправить в нее сообщение
  • и т.д.

В этом случае нужно обернуть DTO в спец объект Envelope, который помимо самого сообщения может содержать вспомогательную информацию - объекты имплементирующие интерфейс Symfony\Component\Messenger\Stamp\StampInterface. Например:

key; } public function setKey(?string $key): self { $this->key = $key; return $this; } public function getTtl(): ?int { return $this->ttl; } public function setTtl(?int $ttl): self { $this->ttl = $ttl; return $this; } public function getUseLock(): ?bool { return $this->useLock; } public function setUseLock(?bool $useLock): self { $this->useLock = $useLock; return $this; } public function getParallel(): ?bool { return $this->parallel; } public function setParallel(?bool $parallel): self { $this->parallel = $parallel; return $this; } public function getAfterParallelMessage(): ?object { return $this->afterParallelMessage; } public function setAfterParallelMessage(?object $afterParallelMessage): self { $this->afterParallelMessage = $afterParallelMessage; return $this; } public function getTimestamp(): ?int { return $this->timestamp; } public function setTimestamp(?int $timestamp): self { $this->timestamp = $timestamp; return $this; } public function getNeedResolveOrderConflict(): ?bool { return $this->needResolveOrderConflict; } public function setNeedResolveOrderConflict(?bool $needResolveOrderConflict): self { $this->needResolveOrderConflict = $needResolveOrderConflict; return $this; } public function getMessages(): array { return $this->messages; } public function addMessage(object $message): self { $this->messages[] = $message; return $this; } public function popMessage(): ?object { return array_shift($this->messages); } public function getLockKey(): string { $this->assertHasKey(); return $this->key . ':lock'; } public function getTimestampKey(): string { $this->assertHasKey(); return $this->key . ':timestamp'; } public function getOrderConflictKey(): string { $this->assertHasKey(); return $this->key . ':order_conflict'; } public function getParallelKey(): string { $this->assertHasKey(); return $this->key . ':parallel'; } public function assertHasKey(): void { if (!$this->key) { throw new \\InvalidArgumentException('Key not found'); } } } `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Отправить штамп можно так:

key('user_1') ->lock(); $envelope = Envelope::wrap($data); $envelope = $envelope->with($context); $this->bus->dispatch($envelope); `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Эта информация будет доступна в middleware и ее можно использовать, например так:

last(ContextStamp::class); if (!$stamp || !$stamp->getUseLock()) { return $stack->next()->handle($envelope, $stack); } $receivedStamp = $envelope->last(ReceivedStamp::class); // Если воркер обработал сообщение, то снимаем lock. Даже если код упал. if ($receivedStamp) { try { return $stack->next()->handle($envelope, $stack); } finally { $this->client->unlock($stamp->getLockKey()); } } // Если отправка залочена, то прекращаем обработку if (!$this->client->lock($stamp->getLockKey())) { return $envelope; } return $stack->next()->handle($envelope, $stack); } } `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

В начале middeware есть проверка на наличие нашего штампа. Если его нет, то обработчик пропускает и запускается следующий элемент цепочки.

Далее мы проверяем наличие ReceivedStamp. Если его нет, значит мы находимся на этапе отправки. Мы пробуем поставить блокировку в редисе и если нам удалось, то идем дальше, иначе останавливаем обработку сообщения. Если же ReceivedStamp есть, то значит мы в консамере. Здесь нам нужно прокинуть сообщение дальше по цепочке и после обработки снимаем блокировку. Даже в случае ошибки. Чтобы не заблочить выполнение других сообщений.

Получаем сообщение

Для того чтобы получить сообщение из очереди и обработать его, нужно выполнить команду

Команда будет работать в режиме демона. То есть работать в бесконечном цикле, засыпая на каждой итерации.

На каждой итерации messenger будет проходиться по всем сконфигурированным транспортам, которые должны обработаться командой.

В каждом транспорте компонент в цикле проходит все сконфигурированные очереди. То есть читает сообщения из первой очереди. Как только все сообщения в первой очереди обработаны, переходим к следующей очереди и т. д. В бесконечном цикле читаем сообщения из очередей по кругу.

Для локальной разработки чтобы при переходе с ветки на ветку код обновлялся в демоне было принято решение добавить опцию --time-limit=10 в команду консамера, чтобы каждые 10 секунд демон перезагружался

Наша реализация

Для более удобного использования было принято решения сделать сервис для работы с messenger.

grabContextStamp(); $data = $context->popMessage(); if (!$data) { throw new \\InvalidArgumentException('Message not found!!!'); } $envelope = Envelope::wrap($data); $envelope = $envelope->with($context); $this->bus->dispatch($envelope); $this->context = null; } public function key(string $key): self { $context = $this->grabContextStamp(); $context->setKey($key); return $this; } public function ttl(int $ttl): self { $context = $this->grabContextStamp(); $context->setTtl($ttl); return $this; } public function lock(): self { $context = $this->grabContextStamp(); $context->setUseLock(true); return $this; } public function parallel(): self { $context = $this->grabContextStamp(); $context->setParallel(true); return $this; } public function ifNewer(int $timestamp): self { $context = $this->grabContextStamp(); $context->setTimestamp($timestamp); return $this; } public function resolveOrderConflict(): self { $context = $this->grabContextStamp(); $context->setNeedResolveOrderConflict(true); return $this; } public function addMessage(object $message): self { $context = $this->grabContextStamp(); $context->addMessage($message); return $this; } public function setAfterParallelMessage(object $message): self { $context = $this->grabContextStamp(); $context->setAfterParallelMessage($message); return $this; } private function grabContextStamp(): ContextStamp { return $this->context = ($this->context ?? new ContextStamp()); } } `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Он реализован по паттерну builder, где все состояние хранится в объекте контексте, который всегда очищается после вызова метода send

В сервисе заложен набор фич с реализацией определенных кейсов работы с очередями, которые неоднократно применялись на проектах.

Кейсы:

Задача должна выполнить 1 раз. Следующая может начаться только если 1 закончилась.

В данном случае ставится лок в редис. Если лок есть, то новое сообщение игнорируется

superBus ->key('user_1') // Ключ запроса ->lock() // Флаг блокировки. Если стоит, то применится механизм. Ключ будет построен на основе key ->addMessage(new Test('asdf')) ->send(); `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Нужно игнорировать сообщения дата отправки которых меньше, чем те которые уже получили

В данном случае в редис пишется метка времени последнего отправленного сообщения Следующее сообщение будет сравниваться с установленным значением. Если значение меньше, сообщение игнорится. Если больше, то отправляем и ставим новое значение в редис.

superBus ->key('user_1') ->ifNewer(121) ->addMessage(new Test('asdf')) ->send(); // Отправится, так как первое $this->superBus ->key('user_1') ->ifNewer(120) ->addMessage(new Test('asdf')) ->send(); // Будет проигнорировано $this->superBus ->key('user_1') ->ifNewer(122) ->addMessage(new Test('asdf')) ->send(); // Отправится, так как оно более свежее `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Отправка сообщения, которое должно обработаться через 23 секунды

В данном случае добавляется stamp с delay меткой. Messenger сам поймет что сообщение ttl. Создаст новую ttl ветку и отправит туда сообщение.

superBus ->ttl(23) ->addMessage(new Test('asdf')) ->send(); `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Гарантирование порядка выполнения сообщений

1. 1 очередь и много воркеров. 2 сообщение может обработаться быстрее более быстрым воркером чем первое

2. Сообщения в разных очередях и разные воркеры. Когда есть 2 зависимые задачи. Например, в одну очередь отправляем сообщение о добавлении продукта, а в другую о добавлении настроек продуктов.

В данном случае в редисе ставится лок на отправку сообщений. Если ключ уже залочен, то бедет создана лок очередь и сообщения будут отпарвлены туда. Как только базовое сообщение обработается, воркер заберет остальные сообщения из лок очереди и выполнит их в этом же потоке. Если базовое сообщение не успеет обработаться в течение часа, сообщения будет перенаправлено в sync очередь, где обработаются синхронно 1 воркером

superBus ->key('product_1') ->resolveOrderConflict() ->addMessage(new Product('111')) ->send(); $this->superBus ->key('product_1') ->resolveOrderConflict() ->addMessage(new ProductSettings('111')) ->send(); `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Цепочка задач. Когда нужно гарантировать порядок выполнения задач и все задачи заранее известны.

В данном случае в rabbit улетят все сообщения разом и обработаются в такой же последовательности

superBus ->addMessage(new Product('111')) ->addMessage(new Product('222')) ->addMessage(new Product('333')) ->addMessage(new Product('444')) ->send(); `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Запустить задачи параллельно

В данном случае в rabbit улетят все сообщения отдельно. Последовательность обработки не гарантирована

superBus ->parallel() ->addMessage(new Product('111')) ->addMessage(new Product('222')) ->addMessage(new Product('333')) ->addMessage(new Product('444')) ->send(); `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />

Выполнить задачу после выполнения ряда параллельных задач

В данном случае в rabbit улетят все параллельные сообщения отдельно. Последовательность обработки не гарантирована. После того как все сообщения выполнятся будет снята блокировка в редисе и выполнится последнее сообщение

superBus ->key('user_2') ->parallel() ->addMessage(new Test('111')) ->addMessage(new Test('222')) ->addMessage(new Test('333')) ->addMessage(new Test('444')) ->setAfterParallelMessage(new Test('555')) ->send(); `} language={`php`} showLineNumbers={false} startingLineNumber={1} theme={github} />
) } export default Home