mix8872/rabbitmq-rpc 问题修复 & 功能扩展

解决BUG、新增功能、兼容多环境部署,快速响应你的开发需求

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

mix8872/rabbitmq-rpc

最新稳定版本:v1.1.2

Composer 安装命令:

composer require mix8872/rabbitmq-rpc

包简介

Json RPC implementation over rabbitmq

README 文档

README

Способ позволяет выполнять функции с аргументами на удаленном сервисе (или группе сервисов).

Зависимости:

  • php 8.*
  • php-amqplib
  • needle-project/laravel-rabbitmq,
    используется форк git@github.com:mix8872/laravel-rabbitmq.git в котором сняты ограничения на максимальное время работы и количество запросов

Принцип работы

Для выполнения процедур используется json формат сообщения rmq, который содержит следующие поля:

  • 'request_id' => 'string|required' | ID запроса - добавляется автоматически
  • 'reply_to' => 'string|required' | имя маршрута rmq для ответа - добавляется автоматически из config('app.name')
    на этот маршрут должна быть подписана очередь, которую слушает отправитель запроса, чтоб получить ответ или уведомление об ошибке
  • 'action' => 'string|required_without:error|regex:/[a-z]+.[a-z]+/ui' | формат поля action: 'псевдоним_класса.метод'
    псевдоним_класса - ключ массива в конфиге laravel_rabbitmq.rpc
  • 'attributes' => 'array' | массив атрибутов [ключ => значение]
  • 'error' => 'string|required_without:action' | ответ ошибкой на предыдущий запрос
  • 'reply_for' => 'string|required_with:error' | ИД предыдущего запроса, обязателен, если отвечаем ошибкой

Подписчик RMQ получает сообщение, по полю action ищет нужный метод и выполняет его, подставляя в него аргументы
из поля attributes.

Все сообщения в RabbitMQ шифруются через Crypt::encryptString, таким образом во всех связанных сервисах должен быть указан один и тот же APP_KEY.

Конфигурирование

В разделе queues задать все необходимые очереди, например:

...
'queues' => [
        '<имя_сервиса>' => [
            'connection' => 'grch',
            'name' => '<имя_сервиса>',
            'attributes' => [
                'durable' => true,
                'bind' => [
                    ['exchange' => 'GRCHExchanger', 'routing_key' => 'projectsData'], // подписываем очередь на обновления проектов
                    ['exchange' => 'GRCHExchanger', 'routing_key' => 'usersData'], // подписываем очередь на обновления пользователей
                    ['exchange' => 'GRCHExchanger', 'routing_key' => \config('app.name')], // собственная очередь сервиса
                ],
            ],
        ],
    ],
...

В разделе consumers указать:

...
'consumers' => [
        '<имя_сервиса>' => [
            'queue' => '<имя_сервиса>',
            'message_processor' => RMQRpcProcessor::class,
        ],
    ],
...

Добавить в конфиг laravel_rabbitmq блок rpc -> processors, в котором задать массив обработчиков в формате
<псевдоним> => <класс>, например:

...
'rpc' => [
        'processors' => [
            'users' => UsersProcessor::class,
            'projects' => ProjectsProcessor::class,
        ],
    ],
...

Использование

Создаем обработчики

Создаем классы обработчиков и заполняем их в конфиг.
Каждому обработчику должен быть назначен уникальный псевдоним, который будет использоваться для вызова процедур.

Класс обработчика должен содержать публичные методы.
Каждый публичный метод может быть вызван через rpc.
Допускается использование статических методов.
Если запрошенный метод является статическим - он будет выполнен напрямую, в качестве его аргументов будет распакован массив attributes из входящего запроса, последним аргументом будет передан весь массив данных входящего запроса в виде именованного параметра $arData, иначе будет создан экземпляр класса и выполнен запрошенный метод у объекта. В конструктор класса будет передан весь массив данных входящего запроса в виде именованного параметра $arData.

Таким образом статический метод обработчика или конструктор класса обработчика (при его наличии) должны содержать обязательный аргумент
array $arData.

Отправка сообщения

Выполнение процедуры

Например, отправка команды на создание проекта:

RMQRpcPublisher::make()
    ->action('projects.create')
    ->attributes([
        'project' => $project->toArray(),
    ])
    ->publish('projectsData');

где:

  • 'projects.create' - экшн, projects - псевдоним класса, create - метод класса
  • 'project' - аргумент для передачи в метод класса
  • 'projectsData' - routing key, для отправки в очереди, подписанные на этот маршрут

Ответ с ошибкой

RMQRpcPublisher::make()
    ->error(<текст_ошибки>)
    ->replyFor(<request_id_из_предыдущего_запроса>)
    ->publish(<reply_to_из_предыдущего_запроса>);

Запускаем слушатель rmq:

php artisan rabbitmq:consume <имя_сервиса> --time=-1 --messages=-1, где:

  • --time=-1 - без ограничения по времени
  • --messages=-1 - без ограничения по количеству сообщений

统计信息

  • 总下载量: 47
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 0
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 2
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2025-04-07