You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
messenger/pages/index.tsx

858 lines
36 KiB

import type {NextPage} from 'next'
import Head from 'next/head'
import Image from 'next/image'
import styles from '../styles/Home.module.css'
import '@fontsource/roboto/300.css';
import '@fontsource/roboto/400.css';
import '@fontsource/roboto/500.css';
import '@fontsource/roboto/700.css';
import {CopyBlock, github} from "react-code-blocks";
import {Table, TableBody, TableRow, TableCell, TableHead} from '@mui/material';
const Home: NextPage = () => {
return (
<div className={styles.container}>
<Head>
<title>Create Next App</title>
<meta name="description" content="Generated by create next app"/>
<link rel="icon" href="/favicon.ico"/>
</Head>
<main className={styles.main}>
<h1>Symfony messenger</h1>
<section>
<p>Компонент Messenger позволяет управлять асинхронным кодом в Symfony.</p>
<p>В нем заложен ряд архитектурных решений, которые делают этот компонент очень гибким в настройке.</p>
<p>С некоторыми из них мы скоро познакомимся.</p>
</section>
<section>
<h2>Пример, конфигурации с транспортом amqp может выглядеть так:</h2>
<div className={styles.code}>
<CopyBlock
text={`# messenger.yaml
parameters:
messenger: # Это кастомный блок. На его основе генерируется конфигурация supervisor.
workers:
- async:
numprocs: 5 # Количество запущенных процессов
# queues: [test, mail] # Позволяет указать очереди с которыми будет работать consumer
- sync:
framework:
messenger:
buses:
messenger.bus.default:
middleware:
- 'SVEAK\\MessengerBundle\\Middleware\\ChainMiddleware'
- 'SVEAK\\MessengerBundle\\Middleware\\ParallelMiddleware'
- 'SVEAK\\MessengerBundle\\Middleware\\AutoRoutingMiddleware'
- 'SVEAK\\MessengerBundle\\Middleware\\TtlMiddleware'
- 'SVEAK\\MessengerBundle\\Middleware\\LockMiddleware'
- 'SVEAK\\MessengerBundle\\Middleware\\SendTimestampMiddleware'
- 'SVEAK\\MessengerBundle\\Middleware\\LockRoutingMiddleware'
# Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
# failure_transport: failed
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
# async: '%env(MESSENGER_TRANSPORT_DSN)%'
# failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 0
options:
exchange:
name: async
type: direct
default_publish_routing_key: lost
queues:
lost:
binding_keys: [ lost ]
arguments:
x-dead-letter-exchange: async.fallback
test:
binding_keys: [ test ]
arguments:
x-dead-letter-exchange: async.fallback
async_fallback:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: async.fallback
type: direct
default_publish_routing_key: lost.fallback
queues:
lost.fallback:
binding_keys: [ lost ]
arguments:
x-message-ttl: 60000
x-dead-letter-exchange: async
test.fallback:
binding_keys: [ test ]
arguments:
x-message-ttl: 60000
x-dead-letter-exchange: async
lock:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: lock
type: direct
sync:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: sync
type: direct
default_publish_routing_key: sync
queues:
sync:
binding_keys: [ sync ]
arguments:
x-single-active-consumer: true
x-dead-letter-exchange: sync.fallback
sync_fallback:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: sync.fallback
type: direct
default_publish_routing_key: sync
queues:
sync.fallback:
binding_keys: [ sync ]
arguments:
x-message-ttl: 60000
x-dead-letter-exchange: sync
routing:
'App\\Message\\Test': async
`}
language={`yaml`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
</section>
<section>
<h2>Отладка</h2>
<div className={styles.code}>
<CopyBlock
text={`bin/console debug:container --show-arguments messenger.bus.default.inner`}
language={`bash`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>
С помощью команды можно посмотреть как сконфигурирован транспорт. Посмотреть всю цепочку обработки сообщения перед отправкой и обработкой.
</p>
<div className={styles.code}>
<CopyBlock
text={`
Information for Service "debug.traced.messenger.bus.default.inner"
==================================================================
---------------- ---------------------------------------------------------------------------
Option Value
---------------- ---------------------------------------------------------------------------
Service ID debug.traced.messenger.bus.default.inner
Class Symfony\\Component\\Messenger\\MessageBus
Tags -
Public no
Synthetic no
Lazy no
Shared yes
Abstract no
Autowired no
Autoconfigured no
Arguments Iterator (14 element(s))
- Service(messenger.bus.default.middleware.traceable)
- Service(messenger.bus.default.middleware.add_bus_name_stamp_middleware)
- Service(messenger.middleware.reject_redelivered_message_middleware)
- Service(messenger.middleware.dispatch_after_current_bus)
- Service(messenger.middleware.failed_message_processing_middleware)
- Service(SVEAK\\MessengerBundle\\Middleware\\ChainMiddleware)
- Service(SVEAK\\MessengerBundle\\Middleware\\ParallelMiddleware)
- Service(SVEAK\\MessengerBundle\\Middleware\\AutoRoutingMiddleware)
- Service(SVEAK\\MessengerBundle\\Middleware\\TtlMiddleware)
- Service(SVEAK\\MessengerBundle\\Middleware\\LockMiddleware)
- Service(SVEAK\\MessengerBundle\\Middleware\\SendTimestampMiddleware)
- Service(SVEAK\\MessengerBundle\\Middleware\\LockRoutingMiddleware)
- Service(messenger.middleware.send_message)
- Service(messenger.bus.default.middleware.handle_message)
---------------- ---------------------------------------------------------------------------
! [NOTE] The "debug.traced.messenger.bus.default.inner" service or alias has been removed or inlined when the container
! was compiled.
`}
language={`txt`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>Messenger использует паттерн - &quot;Цепочка обязанностей&quot; для обработки сообщений.</p>
<p>Здесь мы видим, что наши кастомные middleware`ы были добавлены в середину цепочки между служебными и core обработчиками.</p>
<p>Служебные отвечают за перенаправление сообщения обработанного с ошибкой в другую fallback очередь, за механизм повтора обработки, за выполнение определенных сообщений после того как обработается текущее и т.д.</p>
<p>Core обработчики отвечают за непосредственно отправку сообщения и конечную обработку</p>
</section>
<section>
<h2>Использование</h2>
<p>Чтобы начать использование messenger нужно создать DTO:</p>
<div className={styles.code}>
<CopyBlock
text={`// src/Message/CommentMessage.php
namespace App\\Message;
class CommentMessage
{
public function __construct(
private int $id,
private array $context = [],
) {
}
public function getId(): int
{
return $this->id;
}
public function getContext(): array
{
return $this->context;
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>И затем handler:</p>
<div className={styles.code}>
<CopyBlock
text={`// src/MessageHandler/CommentMessageHandler.php
namespace App\\MessageHandler;
use App\\Message\\CommentMessage;
use App\\Repository\\CommentRepository;
use App\\SpamChecker;
use Doctrine\\ORM\\EntityManagerInterface;
use Symfony\\Component\\Messenger\\Attribute\\AsMessageHandler;
#[AsMessageHandler]
class CommentMessageHandler
{
public function __construct(
private EntityManagerInterface $entityManager,
private SpamChecker $spamChecker,
private CommentRepository $commentRepository,
) {
}
public function __invoke(CommentMessage $message)
{
$comment = $this->commentRepository->find($message->getId());
if (!$comment) {
return;
}
if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
$comment->setState('spam');
} else {
$comment->setState('published');
}
$this->entityManager->flush();
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>Допустимо создавать множество handler`ов на одну DTO. В этом случае вызовутся все обработчики.</p>
</section>
<section>
<h2>Отправка сообщения</h2>
<p>В простом виде для того чтобы отправить сообщение нужно &quot;заинжектить&quot; транспорт в конструкторе и затем отправить через него DTO.</p>
<div className={styles.code}>
<CopyBlock
text={`
class ConferenceController extends AbstractController
{
public function __construct(
private CommentRepository $commentRepository,
private MessageBusInterface $bus,
) {
}
public function exampleAction(
Request $request,
): Response {
$comment = $this->commentRepository->find(1);
$this->bus->dispatch(new CommentMessage($comment->getId(), []));
return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>В этом случае сообщение пройдет всю цепочку обработки через все middleware`ы.</p>
<p>В messenger.middleware.send_message обработчике сообщение будет сериализовано и отправлено в очередь.</p>
<p>В более сложных вариантах использования может потребоваться сделать какую-то обработку сообщения.</p>
<p>Например:</p>
<p>
<ul>
<li>Добавить доп данные в сообщение</li>
<li>Прервать выполнение сообщения</li>
<li>Поменять трансфер или очередь</li>
<li>Создать ttl очередь и перенаправить в нее сообщение</li>
<li>и т.д.</li>
</ul>
</p>
<p>
В этом случае нужно обернуть DTO в спец объект Envelope, который помимо самого сообщения может содержать вспомогательную информацию - объекты имплементирующие интерфейс Symfony\Component\Messenger\Stamp\StampInterface. Например:
</p>
<div className={styles.code}>
<CopyBlock
text={`// Наш кастомный штамп
<?php
declare(strict_types=1);
namespace SVEAK\\MessengerBundle\\Stamp;
use Symfony\\Component\\Messenger\\Stamp\\StampInterface;
class ContextStamp implements StampInterface
{
private ?string $key = null;
private ?int $ttl = null;
private ?bool $useLock = null;
private ?bool $parallel = null;
private ?int $timestamp = null;
private ?bool $needResolveOrderConflict = null;
private array $messages = [];
private ?object $afterParallelMessage = null;
public function getKey(): ?string
{
return $this->key;
}
public function setKey(?string $key): self
{
$this->key = $key;
return $this;
}
public function getTtl(): ?int
{
return $this->ttl;
}
public function setTtl(?int $ttl): self
{
$this->ttl = $ttl;
return $this;
}
public function getUseLock(): ?bool
{
return $this->useLock;
}
public function setUseLock(?bool $useLock): self
{
$this->useLock = $useLock;
return $this;
}
public function getParallel(): ?bool
{
return $this->parallel;
}
public function setParallel(?bool $parallel): self
{
$this->parallel = $parallel;
return $this;
}
public function getAfterParallelMessage(): ?object
{
return $this->afterParallelMessage;
}
public function setAfterParallelMessage(?object $afterParallelMessage): self
{
$this->afterParallelMessage = $afterParallelMessage;
return $this;
}
public function getTimestamp(): ?int
{
return $this->timestamp;
}
public function setTimestamp(?int $timestamp): self
{
$this->timestamp = $timestamp;
return $this;
}
public function getNeedResolveOrderConflict(): ?bool
{
return $this->needResolveOrderConflict;
}
public function setNeedResolveOrderConflict(?bool $needResolveOrderConflict): self
{
$this->needResolveOrderConflict = $needResolveOrderConflict;
return $this;
}
public function getMessages(): array
{
return $this->messages;
}
public function addMessage(object $message): self
{
$this->messages[] = $message;
return $this;
}
public function popMessage(): ?object
{
return array_shift($this->messages);
}
public function getLockKey(): string
{
$this->assertHasKey();
return $this->key . ':lock';
}
public function getTimestampKey(): string
{
$this->assertHasKey();
return $this->key . ':timestamp';
}
public function getOrderConflictKey(): string
{
$this->assertHasKey();
return $this->key . ':order_conflict';
}
public function getParallelKey(): string
{
$this->assertHasKey();
return $this->key . ':parallel';
}
public function assertHasKey(): void
{
if (!$this->key) {
throw new \\InvalidArgumentException('Key not found');
}
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>Отправить штамп можно так:</p>
<div className={styles.code}>
<CopyBlock
text={`
use Symfony\\Component\\Messenger\\Envelope;
$context = new ContextStamp();
$context
->key('user_1')
->lock();
$envelope = Envelope::wrap($data);
$envelope = $envelope->with($context);
$this->bus->dispatch($envelope);
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>Эта информация будет доступна в middleware и ее можно использовать, например так:</p>
<div className={styles.code}>
<CopyBlock
text={`// bundles/MessengerBundle/src/Middleware/LockMiddleware.php
<?php
declare(strict_types=1);
namespace SVEAK\\MessengerBundle\\Middleware;
use SVEAK\\MessengerBundle\\Stamp\\ContextStamp;
use SVEAK\\RedisBundle\\Service\\RedisClient;
use Symfony\\Component\\Messenger\\Envelope;
use Symfony\\Component\\Messenger\\Middleware\\MiddlewareInterface;
use Symfony\\Component\\Messenger\\Middleware\\StackInterface;
use Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp;
class LockMiddleware implements MiddlewareInterface
{
public function __construct(
private RedisClient $client,
) {
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
/** @var ?ContextStamp $stamp */
$stamp = $envelope->last(ContextStamp::class);
if (!$stamp || !$stamp->getUseLock()) {
return $stack->next()->handle($envelope, $stack);
}
$receivedStamp = $envelope->last(ReceivedStamp::class);
// Если воркер обработал сообщение, то снимаем lock. Даже если код упал.
if ($receivedStamp) {
try {
return $stack->next()->handle($envelope, $stack);
} finally {
$this->client->unlock($stamp->getLockKey());
}
}
// Если отправка залочена, то прекращаем обработку
if (!$this->client->lock($stamp->getLockKey())) {
return $envelope;
}
return $stack->next()->handle($envelope, $stack);
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>В начале middeware есть проверка на наличие нашего штампа. Если его нет, то обработчик пропускает и запускается следующий элемент цепочки.</p>
<p>
Далее мы проверяем наличие ReceivedStamp.
Если его нет, значит мы находимся на этапе отправки.
Мы пробуем поставить блокировку в редисе и если нам удалось, то идем дальше, иначе останавливаем обработку сообщения.
Если же ReceivedStamp есть, то значит мы в консамере. Здесь нам нужно прокинуть сообщение дальше по цепочке и после обработки снимаем блокировку. Даже в случае ошибки. Чтобы не заблочить выполнение других сообщений.
</p>
</section>
<section>
<h2>Получаем сообщение</h2>
<p>Для того чтобы получить сообщение из очереди и обработать его, нужно выполнить команду</p>
<div className={styles.code}>
<CopyBlock
text={`bin/console messenger:consume async`}
language={`bash`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>Команда будет работать в режиме демона. То есть работать в бесконечном цикле, засыпая на каждой итерации.</p>
<p>На каждой итерации messenger будет проходиться по всем сконфигурированным транспортам, которые должны обработаться командой.</p>
<p>В каждом транспорте компонент в цикле проходит все сконфигурированные очереди. То есть читает сообщения из первой очереди. Как только все сообщения в первой очереди обработаны, переходим к следующей очереди и т. д. В бесконечном цикле читаем сообщения из очередей по кругу.</p>
<p>Для локальной разработки чтобы при переходе с ветки на ветку код обновлялся в демоне было принято решение добавить опцию <b>--time-limit=10</b> в команду консамера, чтобы каждые 10 секунд демон перезагружался</p>
</section>
<section>
<h2>Наша реализация</h2>
<p>Для более удобного использования было принято решения сделать сервис для работы с messenger.</p>
<div className={styles.code}>
<CopyBlock
text={`
<?php
declare(strict_types=1);
namespace SVEAK\\MessengerBundle\\Service;
use SVEAK\\MessengerBundle\\Stamp\\ContextStamp;
use Symfony\\Component\\Messenger\\Envelope;
use Symfony\\Component\\Messenger\\MessageBusInterface;
class Bus
{
private ?ContextStamp $context = null;
public function __construct(
private MessageBusInterface $bus,
) {
}
public function send(): void
{
$context = $this->grabContextStamp();
$data = $context->popMessage();
if (!$data) {
throw new \\InvalidArgumentException('Message not found!!!');
}
$envelope = Envelope::wrap($data);
$envelope = $envelope->with($context);
$this->bus->dispatch($envelope);
$this->context = null;
}
public function key(string $key): self
{
$context = $this->grabContextStamp();
$context->setKey($key);
return $this;
}
public function ttl(int $ttl): self
{
$context = $this->grabContextStamp();
$context->setTtl($ttl);
return $this;
}
public function lock(): self
{
$context = $this->grabContextStamp();
$context->setUseLock(true);
return $this;
}
public function parallel(): self
{
$context = $this->grabContextStamp();
$context->setParallel(true);
return $this;
}
public function ifNewer(int $timestamp): self
{
$context = $this->grabContextStamp();
$context->setTimestamp($timestamp);
return $this;
}
public function resolveOrderConflict(): self
{
$context = $this->grabContextStamp();
$context->setNeedResolveOrderConflict(true);
return $this;
}
public function addMessage(object $message): self
{
$context = $this->grabContextStamp();
$context->addMessage($message);
return $this;
}
public function setAfterParallelMessage(object $message): self
{
$context = $this->grabContextStamp();
$context->setAfterParallelMessage($message);
return $this;
}
private function grabContextStamp(): ContextStamp
{
return $this->context = ($this->context ?? new ContextStamp());
}
}
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<p>Он реализован по паттерну builder, где все состояние хранится в объекте контексте, который всегда очищается после вызова метода send</p>
<p>В сервисе заложен набор фич с реализацией определенных кейсов работы с очередями, которые неоднократно применялись на проектах.</p>
<p>Кейсы:</p>
<h3>Задача должна выполнить 1 раз. Следующая может начаться только если 1 закончилась.</h3>
<p>В данном случае ставится лок в редис. Если лок есть, то новое сообщение игнорируется</p>
<div className={styles.code}>
<CopyBlock
text={`
$this->superBus
->key('user_1') // Ключ запроса
->lock() // Флаг блокировки. Если стоит, то применится механизм. Ключ будет построен на основе key
->addMessage(new Test('asdf'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<h3>Нужно игнорировать сообщения дата отправки которых меньше, чем те которые уже получили</h3>
<p>В данном случае в редис пишется метка времени последнего отправленного сообщения
Следующее сообщение будет сравниваться с установленным значением. Если значение меньше, сообщение игнорится.
Если больше, то отправляем и ставим новое значение в редис.</p>
<div className={styles.code}>
<CopyBlock
text={`
$this->superBus
->key('user_1')
->ifNewer(121)
->addMessage(new Test('asdf'))
->send(); // Отправится, так как первое
$this->superBus
->key('user_1')
->ifNewer(120)
->addMessage(new Test('asdf'))
->send(); // Будет проигнорировано
$this->superBus
->key('user_1')
->ifNewer(122)
->addMessage(new Test('asdf'))
->send(); // Отправится, так как оно более свежее
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<h3>Отправка сообщения, которое должно обработаться через 23 секунды</h3>
<p>В данном случае добавляется stamp с delay меткой. Messenger сам поймет что сообщение ttl. Создаст новую ttl ветку и отправит туда сообщение.</p>
<div className={styles.code}>
<CopyBlock
text={`
$this->superBus
->ttl(23)
->addMessage(new Test('asdf'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<h3>Гарантирование порядка выполнения сообщений</h3>
<p>
1. 1 очередь и много воркеров. 2 сообщение может обработаться быстрее более быстрым воркером чем первое
</p>
<p>
2. Сообщения в разных очередях и разные воркеры. Когда есть 2 зависимые задачи. Например, в одну очередь отправляем сообщение о добавлении продукта, а в другую о добавлении настроек продуктов.
</p>
<p>
В данном случае в редисе ставится лок на отправку сообщений.
Если ключ уже залочен, то бедет создана лок очередь и сообщения будут отпарвлены туда.
Как только базовое сообщение обработается, воркер заберет остальные сообщения из лок очереди и выполнит их в этом же потоке.
Если базовое сообщение не успеет обработаться в течение часа, сообщения будет перенаправлено в sync очередь, где обработаются синхронно 1 воркером</p>
<div className={styles.code}>
<CopyBlock
text={`
$this->superBus
->key('product_1')
->resolveOrderConflict()
->addMessage(new Product('111'))
->send();
$this->superBus
->key('product_1')
->resolveOrderConflict()
->addMessage(new ProductSettings('111'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<h3>Цепочка задач. Когда нужно гарантировать порядок выполнения задач и все задачи заранее известны.</h3>
<p>В данном случае в rabbit улетят все сообщения разом и обработаются в такой же последовательности</p>
<div className={styles.code}>
<CopyBlock
text={`
$this->superBus
->addMessage(new Product('111'))
->addMessage(new Product('222'))
->addMessage(new Product('333'))
->addMessage(new Product('444'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<h3>Запустить задачи параллельно</h3>
<p>В данном случае в rabbit улетят все сообщения отдельно. Последовательность обработки не гарантирована</p>
<div className={styles.code}>
<CopyBlock
text={`
$this->superBus
->parallel()
->addMessage(new Product('111'))
->addMessage(new Product('222'))
->addMessage(new Product('333'))
->addMessage(new Product('444'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
<h3>Выполнить задачу после выполнения ряда параллельных задач</h3>
<p>В данном случае в rabbit улетят все параллельные сообщения отдельно. Последовательность обработки не гарантирована.
После того как все сообщения выполнятся будет снята блокировка в редисе и выполнится последнее сообщение</p>
<div className={styles.code}>
<CopyBlock
text={`
$this->superBus
->key('user_2')
->parallel()
->addMessage(new Test('111'))
->addMessage(new Test('222'))
->addMessage(new Test('333'))
->addMessage(new Test('444'))
->setAfterParallelMessage(new Test('555'))
->send();
`}
language={`php`}
showLineNumbers={false}
startingLineNumber={1}
theme={github}
/>
</div>
</section>
</main>
<footer className={styles.footer}>
Powered by&nbsp;<b>Rinsvent</b>
</footer>
</div>
)
}
export default Home