|
|
|
import type {NextPage} from 'next'
|
|
|
|
import Head from 'next/head'
|
|
|
|
import Image from 'next/image'
|
|
|
|
import styles from '../styles/Home.module.css'
|
|
|
|
import '@fontsource/roboto/300.css';
|
|
|
|
import '@fontsource/roboto/400.css';
|
|
|
|
import '@fontsource/roboto/500.css';
|
|
|
|
import '@fontsource/roboto/700.css';
|
|
|
|
import {CopyBlock, github} from "react-code-blocks";
|
|
|
|
import {Table, TableBody, TableRow, TableCell, TableHead} from '@mui/material';
|
|
|
|
|
|
|
|
const Home: NextPage = () => {
|
|
|
|
return (
|
|
|
|
<div className={styles.container}>
|
|
|
|
<Head>
|
|
|
|
<title>Create Next App</title>
|
|
|
|
<meta name="description" content="Generated by create next app"/>
|
|
|
|
<link rel="icon" href="/favicon.ico"/>
|
|
|
|
</Head>
|
|
|
|
|
|
|
|
<main className={styles.main}>
|
|
|
|
<h1>Symfony messenger</h1>
|
|
|
|
<section>
|
|
|
|
<p>Компонент Messenger позволяет управлять асинхронным кодом в Symfony.</p>
|
|
|
|
<p>В нем заложен ряд архитектурных решений, которые делают этот компонент очень гибким в настройке.</p>
|
|
|
|
<p>С некоторыми из них мы скоро познакомимся.</p>
|
|
|
|
</section>
|
|
|
|
|
|
|
|
<section>
|
|
|
|
<h2>Пример, конфигурации с транспортом amqp может выглядеть так:</h2>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`# messenger.yaml
|
|
|
|
parameters:
|
|
|
|
messenger: # Это кастомный блок. На его основе генерируется конфигурация supervisor.
|
|
|
|
workers:
|
|
|
|
- async:
|
|
|
|
numprocs: 5 # Количество запущенных процессов
|
|
|
|
# queues: [test, mail] # Позволяет указать очереди с которыми будет работать consumer
|
|
|
|
- sync:
|
|
|
|
|
|
|
|
framework:
|
|
|
|
messenger:
|
|
|
|
buses:
|
|
|
|
messenger.bus.default:
|
|
|
|
middleware:
|
|
|
|
- 'SVEAK\\MessengerBundle\\Middleware\\ChainMiddleware'
|
|
|
|
- 'SVEAK\\MessengerBundle\\Middleware\\ParallelMiddleware'
|
|
|
|
- 'SVEAK\\MessengerBundle\\Middleware\\AutoRoutingMiddleware'
|
|
|
|
- 'SVEAK\\MessengerBundle\\Middleware\\TtlMiddleware'
|
|
|
|
- 'SVEAK\\MessengerBundle\\Middleware\\LockMiddleware'
|
|
|
|
- 'SVEAK\\MessengerBundle\\Middleware\\SendTimestampMiddleware'
|
|
|
|
- 'SVEAK\\MessengerBundle\\Middleware\\LockRoutingMiddleware'
|
|
|
|
# Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
|
|
|
|
# failure_transport: failed
|
|
|
|
|
|
|
|
transports:
|
|
|
|
# https://symfony.com/doc/current/messenger.html#transport-configuration
|
|
|
|
# async: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
|
|
# failed: 'doctrine://default?queue_name=failed'
|
|
|
|
# sync: 'sync://'
|
|
|
|
async:
|
|
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
|
|
retry_strategy:
|
|
|
|
max_retries: 0
|
|
|
|
options:
|
|
|
|
exchange:
|
|
|
|
name: async
|
|
|
|
type: direct
|
|
|
|
default_publish_routing_key: lost
|
|
|
|
queues:
|
|
|
|
lost:
|
|
|
|
binding_keys: [ lost ]
|
|
|
|
arguments:
|
|
|
|
x-dead-letter-exchange: async.fallback
|
|
|
|
test:
|
|
|
|
binding_keys: [ test ]
|
|
|
|
arguments:
|
|
|
|
x-dead-letter-exchange: async.fallback
|
|
|
|
async_fallback:
|
|
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
|
|
options:
|
|
|
|
exchange:
|
|
|
|
name: async.fallback
|
|
|
|
type: direct
|
|
|
|
default_publish_routing_key: lost.fallback
|
|
|
|
queues:
|
|
|
|
lost.fallback:
|
|
|
|
binding_keys: [ lost ]
|
|
|
|
arguments:
|
|
|
|
x-message-ttl: 60000
|
|
|
|
x-dead-letter-exchange: async
|
|
|
|
test.fallback:
|
|
|
|
binding_keys: [ test ]
|
|
|
|
arguments:
|
|
|
|
x-message-ttl: 60000
|
|
|
|
x-dead-letter-exchange: async
|
|
|
|
lock:
|
|
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
|
|
options:
|
|
|
|
exchange:
|
|
|
|
name: lock
|
|
|
|
type: direct
|
|
|
|
sync:
|
|
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
|
|
options:
|
|
|
|
exchange:
|
|
|
|
name: sync
|
|
|
|
type: direct
|
|
|
|
default_publish_routing_key: sync
|
|
|
|
queues:
|
|
|
|
sync:
|
|
|
|
binding_keys: [ sync ]
|
|
|
|
arguments:
|
|
|
|
x-single-active-consumer: true
|
|
|
|
x-dead-letter-exchange: sync.fallback
|
|
|
|
sync_fallback:
|
|
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
|
|
options:
|
|
|
|
exchange:
|
|
|
|
name: sync.fallback
|
|
|
|
type: direct
|
|
|
|
default_publish_routing_key: sync
|
|
|
|
queues:
|
|
|
|
sync.fallback:
|
|
|
|
binding_keys: [ sync ]
|
|
|
|
arguments:
|
|
|
|
x-message-ttl: 60000
|
|
|
|
x-dead-letter-exchange: sync
|
|
|
|
routing:
|
|
|
|
'App\\Message\\Test': async
|
|
|
|
`}
|
|
|
|
language={`yaml`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
</section>
|
|
|
|
<section>
|
|
|
|
<h2>Отладка</h2>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`bin/console debug:container --show-arguments messenger.bus.default.inner`}
|
|
|
|
language={`bash`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>
|
|
|
|
С помощью команды можно посмотреть как сконфигурирован транспорт. Посмотреть всю цепочку обработки сообщения перед отправкой и обработкой.
|
|
|
|
</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
Information for Service "debug.traced.messenger.bus.default.inner"
|
|
|
|
==================================================================
|
|
|
|
|
|
|
|
---------------- ---------------------------------------------------------------------------
|
|
|
|
Option Value
|
|
|
|
---------------- ---------------------------------------------------------------------------
|
|
|
|
Service ID debug.traced.messenger.bus.default.inner
|
|
|
|
Class Symfony\\Component\\Messenger\\MessageBus
|
|
|
|
Tags -
|
|
|
|
Public no
|
|
|
|
Synthetic no
|
|
|
|
Lazy no
|
|
|
|
Shared yes
|
|
|
|
Abstract no
|
|
|
|
Autowired no
|
|
|
|
Autoconfigured no
|
|
|
|
Arguments Iterator (14 element(s))
|
|
|
|
- Service(messenger.bus.default.middleware.traceable)
|
|
|
|
- Service(messenger.bus.default.middleware.add_bus_name_stamp_middleware)
|
|
|
|
- Service(messenger.middleware.reject_redelivered_message_middleware)
|
|
|
|
- Service(messenger.middleware.dispatch_after_current_bus)
|
|
|
|
- Service(messenger.middleware.failed_message_processing_middleware)
|
|
|
|
- Service(SVEAK\\MessengerBundle\\Middleware\\ChainMiddleware)
|
|
|
|
- Service(SVEAK\\MessengerBundle\\Middleware\\ParallelMiddleware)
|
|
|
|
- Service(SVEAK\\MessengerBundle\\Middleware\\AutoRoutingMiddleware)
|
|
|
|
- Service(SVEAK\\MessengerBundle\\Middleware\\TtlMiddleware)
|
|
|
|
- Service(SVEAK\\MessengerBundle\\Middleware\\LockMiddleware)
|
|
|
|
- Service(SVEAK\\MessengerBundle\\Middleware\\SendTimestampMiddleware)
|
|
|
|
- Service(SVEAK\\MessengerBundle\\Middleware\\LockRoutingMiddleware)
|
|
|
|
- Service(messenger.middleware.send_message)
|
|
|
|
- Service(messenger.bus.default.middleware.handle_message)
|
|
|
|
---------------- ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
! [NOTE] The "debug.traced.messenger.bus.default.inner" service or alias has been removed or inlined when the container
|
|
|
|
! was compiled.
|
|
|
|
`}
|
|
|
|
language={`txt`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>Messenger использует паттерн - "Цепочка обязанностей" для обработки сообщений.</p>
|
|
|
|
<p>Здесь мы видим, что наши кастомные middleware`ы были добавлены в середину цепочки между служебными и core обработчиками.</p>
|
|
|
|
<p>Служебные отвечают за перенаправление сообщения обработанного с ошибкой в другую fallback очередь, за механизм повтора обработки, за выполнение определенных сообщений после того как обработается текущее и т.д.</p>
|
|
|
|
<p>Core обработчики отвечают за непосредственно отправку сообщения и конечную обработку</p>
|
|
|
|
</section>
|
|
|
|
<section>
|
|
|
|
<h2>Использование</h2>
|
|
|
|
<p>Чтобы начать использование messenger нужно создать DTO:</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`// src/Message/CommentMessage.php
|
|
|
|
namespace App\\Message;
|
|
|
|
|
|
|
|
class CommentMessage
|
|
|
|
{
|
|
|
|
public function __construct(
|
|
|
|
private int $id,
|
|
|
|
private array $context = [],
|
|
|
|
) {
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getId(): int
|
|
|
|
{
|
|
|
|
return $this->id;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getContext(): array
|
|
|
|
{
|
|
|
|
return $this->context;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>И затем handler:</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`// src/MessageHandler/CommentMessageHandler.php
|
|
|
|
namespace App\\MessageHandler;
|
|
|
|
|
|
|
|
use App\\Message\\CommentMessage;
|
|
|
|
use App\\Repository\\CommentRepository;
|
|
|
|
use App\\SpamChecker;
|
|
|
|
use Doctrine\\ORM\\EntityManagerInterface;
|
|
|
|
use Symfony\\Component\\Messenger\\Attribute\\AsMessageHandler;
|
|
|
|
|
|
|
|
#[AsMessageHandler]
|
|
|
|
class CommentMessageHandler
|
|
|
|
{
|
|
|
|
public function __construct(
|
|
|
|
private EntityManagerInterface $entityManager,
|
|
|
|
private SpamChecker $spamChecker,
|
|
|
|
private CommentRepository $commentRepository,
|
|
|
|
) {
|
|
|
|
}
|
|
|
|
|
|
|
|
public function __invoke(CommentMessage $message)
|
|
|
|
{
|
|
|
|
$comment = $this->commentRepository->find($message->getId());
|
|
|
|
if (!$comment) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
|
|
|
|
$comment->setState('spam');
|
|
|
|
} else {
|
|
|
|
$comment->setState('published');
|
|
|
|
}
|
|
|
|
|
|
|
|
$this->entityManager->flush();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>Допустимо создавать множество handler`ов на одну DTO. В этом случае вызовутся все обработчики.</p>
|
|
|
|
</section>
|
|
|
|
<section>
|
|
|
|
<h2>Отправка сообщения</h2>
|
|
|
|
<p>В простом виде для того чтобы отправить сообщение нужно "заинжектить" транспорт в конструкторе и затем отправить через него DTO.</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
class ConferenceController extends AbstractController
|
|
|
|
{
|
|
|
|
public function __construct(
|
|
|
|
private CommentRepository $commentRepository,
|
|
|
|
private MessageBusInterface $bus,
|
|
|
|
) {
|
|
|
|
}
|
|
|
|
|
|
|
|
public function exampleAction(
|
|
|
|
Request $request,
|
|
|
|
): Response {
|
|
|
|
$comment = $this->commentRepository->find(1);
|
|
|
|
$this->bus->dispatch(new CommentMessage($comment->getId(), []));
|
|
|
|
return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>В этом случае сообщение пройдет всю цепочку обработки через все middleware`ы.</p>
|
|
|
|
<p>В messenger.middleware.send_message обработчике сообщение будет сериализовано и отправлено в очередь.</p>
|
|
|
|
<p>В более сложных вариантах использования может потребоваться сделать какую-то обработку сообщения.</p>
|
|
|
|
<p>Например:</p>
|
|
|
|
<p>
|
|
|
|
<ul>
|
|
|
|
<li>Добавить доп данные в сообщение</li>
|
|
|
|
<li>Прервать выполнение сообщения</li>
|
|
|
|
<li>Поменять трансфер или очередь</li>
|
|
|
|
<li>Создать ttl очередь и перенаправить в нее сообщение</li>
|
|
|
|
<li>и т.д.</li>
|
|
|
|
</ul>
|
|
|
|
</p>
|
|
|
|
<p>
|
|
|
|
В этом случае нужно обернуть DTO в спец объект Envelope, который помимо самого сообщения может содержать вспомогательную информацию - объекты имплементирующие интерфейс Symfony\Component\Messenger\Stamp\StampInterface. Например:
|
|
|
|
</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`// Наш кастомный штамп
|
|
|
|
<?php
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace SVEAK\\MessengerBundle\\Stamp;
|
|
|
|
|
|
|
|
use Symfony\\Component\\Messenger\\Stamp\\StampInterface;
|
|
|
|
|
|
|
|
class ContextStamp implements StampInterface
|
|
|
|
{
|
|
|
|
private ?string $key = null;
|
|
|
|
private ?int $ttl = null;
|
|
|
|
private ?bool $useLock = null;
|
|
|
|
private ?bool $parallel = null;
|
|
|
|
private ?int $timestamp = null;
|
|
|
|
private ?bool $needResolveOrderConflict = null;
|
|
|
|
private array $messages = [];
|
|
|
|
private ?object $afterParallelMessage = null;
|
|
|
|
|
|
|
|
public function getKey(): ?string
|
|
|
|
{
|
|
|
|
return $this->key;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setKey(?string $key): self
|
|
|
|
{
|
|
|
|
$this->key = $key;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getTtl(): ?int
|
|
|
|
{
|
|
|
|
return $this->ttl;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setTtl(?int $ttl): self
|
|
|
|
{
|
|
|
|
$this->ttl = $ttl;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getUseLock(): ?bool
|
|
|
|
{
|
|
|
|
return $this->useLock;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setUseLock(?bool $useLock): self
|
|
|
|
{
|
|
|
|
$this->useLock = $useLock;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getParallel(): ?bool
|
|
|
|
{
|
|
|
|
return $this->parallel;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setParallel(?bool $parallel): self
|
|
|
|
{
|
|
|
|
$this->parallel = $parallel;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getAfterParallelMessage(): ?object
|
|
|
|
{
|
|
|
|
return $this->afterParallelMessage;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setAfterParallelMessage(?object $afterParallelMessage): self
|
|
|
|
{
|
|
|
|
$this->afterParallelMessage = $afterParallelMessage;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getTimestamp(): ?int
|
|
|
|
{
|
|
|
|
return $this->timestamp;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setTimestamp(?int $timestamp): self
|
|
|
|
{
|
|
|
|
$this->timestamp = $timestamp;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getNeedResolveOrderConflict(): ?bool
|
|
|
|
{
|
|
|
|
return $this->needResolveOrderConflict;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setNeedResolveOrderConflict(?bool $needResolveOrderConflict): self
|
|
|
|
{
|
|
|
|
$this->needResolveOrderConflict = $needResolveOrderConflict;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getMessages(): array
|
|
|
|
{
|
|
|
|
return $this->messages;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function addMessage(object $message): self
|
|
|
|
{
|
|
|
|
$this->messages[] = $message;
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function popMessage(): ?object
|
|
|
|
{
|
|
|
|
return array_shift($this->messages);
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getLockKey(): string
|
|
|
|
{
|
|
|
|
$this->assertHasKey();
|
|
|
|
return $this->key . ':lock';
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getTimestampKey(): string
|
|
|
|
{
|
|
|
|
$this->assertHasKey();
|
|
|
|
return $this->key . ':timestamp';
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getOrderConflictKey(): string
|
|
|
|
{
|
|
|
|
$this->assertHasKey();
|
|
|
|
return $this->key . ':order_conflict';
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getParallelKey(): string
|
|
|
|
{
|
|
|
|
$this->assertHasKey();
|
|
|
|
return $this->key . ':parallel';
|
|
|
|
}
|
|
|
|
|
|
|
|
public function assertHasKey(): void
|
|
|
|
{
|
|
|
|
if (!$this->key) {
|
|
|
|
throw new \\InvalidArgumentException('Key not found');
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>Отправить штамп можно так:</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
use Symfony\\Component\\Messenger\\Envelope;
|
|
|
|
|
|
|
|
$context = new ContextStamp();
|
|
|
|
$context
|
|
|
|
->key('user_1')
|
|
|
|
->lock();
|
|
|
|
$envelope = Envelope::wrap($data);
|
|
|
|
$envelope = $envelope->with($context);
|
|
|
|
|
|
|
|
$this->bus->dispatch($envelope);
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>Эта информация будет доступна в middleware и ее можно использовать, например так:</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`// bundles/MessengerBundle/src/Middleware/LockMiddleware.php
|
|
|
|
<?php
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace SVEAK\\MessengerBundle\\Middleware;
|
|
|
|
|
|
|
|
use SVEAK\\MessengerBundle\\Stamp\\ContextStamp;
|
|
|
|
use SVEAK\\RedisBundle\\Service\\RedisClient;
|
|
|
|
use Symfony\\Component\\Messenger\\Envelope;
|
|
|
|
use Symfony\\Component\\Messenger\\Middleware\\MiddlewareInterface;
|
|
|
|
use Symfony\\Component\\Messenger\\Middleware\\StackInterface;
|
|
|
|
use Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp;
|
|
|
|
|
|
|
|
class LockMiddleware implements MiddlewareInterface
|
|
|
|
{
|
|
|
|
public function __construct(
|
|
|
|
private RedisClient $client,
|
|
|
|
) {
|
|
|
|
}
|
|
|
|
|
|
|
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
|
|
|
{
|
|
|
|
/** @var ?ContextStamp $stamp */
|
|
|
|
$stamp = $envelope->last(ContextStamp::class);
|
|
|
|
if (!$stamp || !$stamp->getUseLock()) {
|
|
|
|
return $stack->next()->handle($envelope, $stack);
|
|
|
|
}
|
|
|
|
|
|
|
|
$receivedStamp = $envelope->last(ReceivedStamp::class);
|
|
|
|
// Если воркер обработал сообщение, то снимаем lock. Даже если код упал.
|
|
|
|
if ($receivedStamp) {
|
|
|
|
try {
|
|
|
|
return $stack->next()->handle($envelope, $stack);
|
|
|
|
} finally {
|
|
|
|
$this->client->unlock($stamp->getLockKey());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Если отправка залочена, то прекращаем обработку
|
|
|
|
if (!$this->client->lock($stamp->getLockKey())) {
|
|
|
|
return $envelope;
|
|
|
|
}
|
|
|
|
|
|
|
|
return $stack->next()->handle($envelope, $stack);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>В начале middeware есть проверка на наличие нашего штампа. Если его нет, то обработчик пропускает и запускается следующий элемент цепочки.</p>
|
|
|
|
<p>
|
|
|
|
Далее мы проверяем наличие ReceivedStamp.
|
|
|
|
Если его нет, значит мы находимся на этапе отправки.
|
|
|
|
Мы пробуем поставить блокировку в редисе и если нам удалось, то идем дальше, иначе останавливаем обработку сообщения.
|
|
|
|
Если же ReceivedStamp есть, то значит мы в консамере. Здесь нам нужно прокинуть сообщение дальше по цепочке и после обработки снимаем блокировку. Даже в случае ошибки. Чтобы не заблочить выполнение других сообщений.
|
|
|
|
</p>
|
|
|
|
</section>
|
|
|
|
<section>
|
|
|
|
<h2>Получаем сообщение</h2>
|
|
|
|
<p>Для того чтобы получить сообщение из очереди и обработать его, нужно выполнить команду</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`bin/console messenger:consume async`}
|
|
|
|
language={`bash`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>Команда будет работать в режиме демона. То есть работать в бесконечном цикле, засыпая на каждой итерации.</p>
|
|
|
|
<p>На каждой итерации messenger будет проходиться по всем сконфигурированным транспортам, которые должны обработаться командой.</p>
|
|
|
|
<p>В каждом транспорте компонент в цикле проходит все сконфигурированные очереди. То есть читает сообщения из первой очереди. Как только все сообщения в первой очереди обработаны, переходим к следующей очереди и т. д. В бесконечном цикле читаем сообщения из очередей по кругу.</p>
|
|
|
|
<p>Для локальной разработки чтобы при переходе с ветки на ветку код обновлялся в демоне было принято решение добавить опцию <b>--time-limit=10</b> в команду консамера, чтобы каждые 10 секунд демон перезагружался</p>
|
|
|
|
</section>
|
|
|
|
<section>
|
|
|
|
<h2>Наша реализация</h2>
|
|
|
|
<p>Для более удобного использования было принято решения сделать сервис для работы с messenger.</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
<?php
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace SVEAK\\MessengerBundle\\Service;
|
|
|
|
|
|
|
|
use SVEAK\\MessengerBundle\\Stamp\\ContextStamp;
|
|
|
|
use Symfony\\Component\\Messenger\\Envelope;
|
|
|
|
use Symfony\\Component\\Messenger\\MessageBusInterface;
|
|
|
|
|
|
|
|
class Bus
|
|
|
|
{
|
|
|
|
private ?ContextStamp $context = null;
|
|
|
|
|
|
|
|
public function __construct(
|
|
|
|
private MessageBusInterface $bus,
|
|
|
|
) {
|
|
|
|
}
|
|
|
|
|
|
|
|
public function send(): void
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$data = $context->popMessage();
|
|
|
|
if (!$data) {
|
|
|
|
throw new \\InvalidArgumentException('Message not found!!!');
|
|
|
|
}
|
|
|
|
$envelope = Envelope::wrap($data);
|
|
|
|
$envelope = $envelope->with($context);
|
|
|
|
|
|
|
|
$this->bus->dispatch($envelope);
|
|
|
|
$this->context = null;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function key(string $key): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->setKey($key);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function ttl(int $ttl): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->setTtl($ttl);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function lock(): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->setUseLock(true);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function parallel(): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->setParallel(true);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function ifNewer(int $timestamp): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->setTimestamp($timestamp);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function resolveOrderConflict(): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->setNeedResolveOrderConflict(true);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function addMessage(object $message): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->addMessage($message);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setAfterParallelMessage(object $message): self
|
|
|
|
{
|
|
|
|
$context = $this->grabContextStamp();
|
|
|
|
$context->setAfterParallelMessage($message);
|
|
|
|
return $this;
|
|
|
|
}
|
|
|
|
|
|
|
|
private function grabContextStamp(): ContextStamp
|
|
|
|
{
|
|
|
|
return $this->context = ($this->context ?? new ContextStamp());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<p>Он реализован по паттерну builder, где все состояние хранится в объекте контексте, который всегда очищается после вызова метода send</p>
|
|
|
|
<p>В сервисе заложен набор фич с реализацией определенных кейсов работы с очередями, которые неоднократно применялись на проектах.</p>
|
|
|
|
<p>Кейсы:</p>
|
|
|
|
<h3>Задача должна выполнить 1 раз. Следующая может начаться только если 1 закончилась.</h3>
|
|
|
|
<p>В данном случае ставится лок в редис. Если лок есть, то новое сообщение игнорируется</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
$this->superBus
|
|
|
|
->key('user_1') // Ключ запроса
|
|
|
|
->lock() // Флаг блокировки. Если стоит, то применится механизм. Ключ будет построен на основе key
|
|
|
|
->addMessage(new Test('asdf'))
|
|
|
|
->send();
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<h3>Нужно игнорировать сообщения дата отправки которых меньше, чем те которые уже получили</h3>
|
|
|
|
<p>В данном случае в редис пишется метка времени последнего отправленного сообщения
|
|
|
|
Следующее сообщение будет сравниваться с установленным значением. Если значение меньше, сообщение игнорится.
|
|
|
|
Если больше, то отправляем и ставим новое значение в редис.</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
$this->superBus
|
|
|
|
->key('user_1')
|
|
|
|
->ifNewer(121)
|
|
|
|
->addMessage(new Test('asdf'))
|
|
|
|
->send(); // Отправится, так как первое
|
|
|
|
$this->superBus
|
|
|
|
->key('user_1')
|
|
|
|
->ifNewer(120)
|
|
|
|
->addMessage(new Test('asdf'))
|
|
|
|
->send(); // Будет проигнорировано
|
|
|
|
$this->superBus
|
|
|
|
->key('user_1')
|
|
|
|
->ifNewer(122)
|
|
|
|
->addMessage(new Test('asdf'))
|
|
|
|
->send(); // Отправится, так как оно более свежее
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<h3>Отправка сообщения, которое должно обработаться через 23 секунды</h3>
|
|
|
|
<p>В данном случае добавляется stamp с delay меткой. Messenger сам поймет что сообщение ttl. Создаст новую ttl ветку и отправит туда сообщение.</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
$this->superBus
|
|
|
|
->ttl(23)
|
|
|
|
->addMessage(new Test('asdf'))
|
|
|
|
->send();
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<h3>Гарантирование порядка выполнения сообщений</h3>
|
|
|
|
<p>
|
|
|
|
1. 1 очередь и много воркеров. 2 сообщение может обработаться быстрее более быстрым воркером чем первое
|
|
|
|
</p>
|
|
|
|
<p>
|
|
|
|
2. Сообщения в разных очередях и разные воркеры. Когда есть 2 зависимые задачи. Например, в одну очередь отправляем сообщение о добавлении продукта, а в другую о добавлении настроек продуктов.
|
|
|
|
</p>
|
|
|
|
<p>
|
|
|
|
В данном случае в редисе ставится лок на отправку сообщений.
|
|
|
|
Если ключ уже залочен, то бедет создана лок очередь и сообщения будут отпарвлены туда.
|
|
|
|
Как только базовое сообщение обработается, воркер заберет остальные сообщения из лок очереди и выполнит их в этом же потоке.
|
|
|
|
Если базовое сообщение не успеет обработаться в течение часа, сообщения будет перенаправлено в sync очередь, где обработаются синхронно 1 воркером</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
$this->superBus
|
|
|
|
->key('product_1')
|
|
|
|
->resolveOrderConflict()
|
|
|
|
->addMessage(new Product('111'))
|
|
|
|
->send();
|
|
|
|
$this->superBus
|
|
|
|
->key('product_1')
|
|
|
|
->resolveOrderConflict()
|
|
|
|
->addMessage(new ProductSettings('111'))
|
|
|
|
->send();
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<h3>Цепочка задач. Когда нужно гарантировать порядок выполнения задач и все задачи заранее известны.</h3>
|
|
|
|
<p>В данном случае в rabbit улетят все сообщения разом и обработаются в такой же последовательности</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
$this->superBus
|
|
|
|
->addMessage(new Product('111'))
|
|
|
|
->addMessage(new Product('222'))
|
|
|
|
->addMessage(new Product('333'))
|
|
|
|
->addMessage(new Product('444'))
|
|
|
|
->send();
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<h3>Запустить задачи параллельно</h3>
|
|
|
|
<p>В данном случае в rabbit улетят все сообщения отдельно. Последовательность обработки не гарантирована</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
$this->superBus
|
|
|
|
->parallel()
|
|
|
|
->addMessage(new Product('111'))
|
|
|
|
->addMessage(new Product('222'))
|
|
|
|
->addMessage(new Product('333'))
|
|
|
|
->addMessage(new Product('444'))
|
|
|
|
->send();
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
<h3>Выполнить задачу после выполнения ряда параллельных задач</h3>
|
|
|
|
<p>В данном случае в rabbit улетят все параллельные сообщения отдельно. Последовательность обработки не гарантирована.
|
|
|
|
После того как все сообщения выполнятся будет снята блокировка в редисе и выполнится последнее сообщение</p>
|
|
|
|
<div className={styles.code}>
|
|
|
|
<CopyBlock
|
|
|
|
text={`
|
|
|
|
$this->superBus
|
|
|
|
->key('user_2')
|
|
|
|
->parallel()
|
|
|
|
->addMessage(new Test('111'))
|
|
|
|
->addMessage(new Test('222'))
|
|
|
|
->addMessage(new Test('333'))
|
|
|
|
->addMessage(new Test('444'))
|
|
|
|
->setAfterParallelMessage(new Test('555'))
|
|
|
|
->send();
|
|
|
|
`}
|
|
|
|
language={`php`}
|
|
|
|
showLineNumbers={false}
|
|
|
|
startingLineNumber={1}
|
|
|
|
theme={github}
|
|
|
|
/>
|
|
|
|
</div>
|
|
|
|
</section>
|
|
|
|
</main>
|
|
|
|
<footer className={styles.footer}>
|
|
|
|
Powered by <b>Rinsvent</b>
|
|
|
|
</footer>
|
|
|
|
</div>
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
export default Home
|