byjg/message-queue-client

Latest stable version: 6.0.0

Composer install command:

composer require byjg/message-queue-client

Package description

A minimal PHP foundation for building message queue drivers. Features low-code publishing and consumption, decoupled components (Messages, Queues, and Connectors), and easy connector implementation.

Features

  • Low code to publish and consume messages
  • Messages, Queues and Connector objects are decoupled
  • Easy to implement new connectors
┌─────────────────┐                  ┌────────────────────────┐
│                 │                  │  Envelope              │
│                 │                  │                        │
│                 │                  │                        │
│                 │                  │   ┌─────────────────┐  │
│                 │   publish()      │   │      Pipe       │  │
│                 ├─────────────────▶│   └─────────────────┘  │
│                 │                  │   ┌─────────────────┐  │
│                 │                  │   │     Message     │  │
│                 │                  │   └─────────────────┘  │
│                 │                  │                        │
│                 │                  └────────────────────────┘
│    Connector    │
│                 │
│                 │
│                 │       consume()     ┌─────────────────┐
│                 │◀────────────────────│      Pipe       │
│                 │                     └─────────────────┘
│                 │
│                 │
│                 │
└─────────────────┘

Code Structure

| Component | Description | Location |
|-----------|-------------|----------|
| Message | Represents a message payload with properties | \ByJG\MessageQueueClient\Message |
| Pipe | Abstraction representing a queue or topic destination with optional properties | \ByJG\MessageQueueClient\Connector\Pipe |
| Envelope | Combines a Message with its destination Pipe | \ByJG\MessageQueueClient\Envelope |
| ConnectorInterface | Interface for message queue implementations | \ByJG\MessageQueueClient\Connector\ConnectorInterface |
| ConnectorFactory | Factory for creating connector instances | \ByJG\MessageQueueClient\Connector\ConnectorFactory |
| ConsumerClientTrait | Helper for implementing consumer clients | \ByJG\MessageQueueClient\ConsumerClientTrait |
| ConsumerClientInterface | Interface for consumer client implementations | \ByJG\MessageQueueClient\ConsumerClientInterface |

Implemented Connectors

| Connector | URL / Documentation | Composer Package |
|-----------|---------------------|------------------|
| Mock | docs/mock-connector.md | - |
| RabbitMQ | https://github.com/byjg/rabbitmq-client | byjg/rabbitmq-client |
| Redis | https://github.com/byjg/redis-queue-client | byjg/redis-queue-client |

Usage

Publish

<?php
// Register the connector and associate with a scheme
use ByJG\MessageQueueClient\Connector\ConnectorFactory;
use ByJG\MessageQueueClient\Connector\Pipe;
use ByJG\MessageQueueClient\Envelope;
use ByJG\MessageQueueClient\Message;
use ByJG\MessageQueueClient\MockConnector;
use ByJG\Util\Uri;

ConnectorFactory::registerConnector(MockConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("mock://local"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Create a message
$message = new Message("Hello World");

// Publish the message into the queue
$connector->publish(new Envelope($pipe, $message));

Consume

<?php
// Register the connector and associate with a scheme
use ByJG\MessageQueueClient\Connector\ConnectorFactory;
use ByJG\MessageQueueClient\Connector\Pipe;
use ByJG\MessageQueueClient\Envelope;
use ByJG\MessageQueueClient\Message;
use ByJG\MessageQueueClient\MockConnector;
use ByJG\Util\Uri;

ConnectorFactory::registerConnector(MockConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("mock://local"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Connect to the queue and wait to consume the message
$connector->consume(
    $pipe,                                 // Pipe (queue) to consume from
    function (Envelope $envelope) {         // Callback function to process the message
        echo "Process the message";
        echo $envelope->getMessage()->getBody();
        return Message::ACK;
    },
    function (Envelope $envelope, $ex) {    // Callback function to process the failed message
        echo "Process the failed message";
        echo $ex->getMessage();
        return Message::REQUEUE;
    }
);

The consume method blocks until a message arrives, then calls the callback function to process it. If the queue is empty, it keeps waiting.

To exit the consume method, return Message::ACK | Message::EXIT from the callback function. The two flags are combined with a bitwise OR, so the message is acknowledged and the consume method then returns.

Possible return values from the callback function:

  • Message::ACK - Acknowledge the message and remove it from the queue
  • Message::NACK - Do not acknowledge the message, and remove it from the queue. If the queue has a dead letter queue, the message is sent there.
  • Message::REQUEUE - Requeue the message
  • Message::EXIT - Exit the consume method
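
To make the return values above concrete, here is a minimal, self-contained sketch of how a consume loop could act on them. It is not the library's implementation: the SketchFlags class, its numeric values and the drainQueue() function are hypothetical stand-ins, and the only thing taken from the text is that the flags can be combined with `|`. Unlike a real connector, the loop returns when the queue is empty instead of waiting.

```php
<?php
// Hypothetical stand-ins for Message::ACK, NACK, REQUEUE and EXIT.
// The numeric values are assumptions chosen so that flags can be OR-ed together.
final class SketchFlags
{
    public const ACK     = 1;
    public const NACK    = 2;
    public const REQUEUE = 4;
    public const EXIT    = 8;
}

/**
 * Hypothetical helper: hands each queued body to $onReceive and applies the
 * returned flags. Returns [remaining queue, dead letters, callback calls].
 */
function drainQueue(array $queue, callable $onReceive): array
{
    $deadLetters = [];
    $calls = 0;

    while ($queue !== []) {
        $body = array_shift($queue);       // take the oldest message
        $result = (int) $onReceive($body);
        $calls++;

        if ($result & SketchFlags::REQUEUE) {
            $queue[] = $body;              // put it back at the end of the queue
        } elseif ($result & SketchFlags::NACK) {
            $deadLetters[] = $body;        // removed and routed to the dead letter list
        }
        // ACK needs no action here: the message has already been removed.

        if ($result & SketchFlags::EXIT) {
            break;                         // stop consuming
        }
    }

    return [$queue, $deadLetters, $calls];
}

[$left, $dead, $calls] = drainQueue(
    ["a", "bad", "stop", "z"],
    function (string $body): int {
        if ($body === "bad") {
            return SketchFlags::NACK;
        }
        if ($body === "stop") {
            return SketchFlags::ACK | SketchFlags::EXIT;
        }
        return SketchFlags::ACK;
    }
);
```

In this run the callback is invoked three times: "a" is acknowledged, "bad" ends up in the dead letter list, and "stop" is acknowledged and ends the loop, leaving "z" unconsumed.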

Consumer Client

You can simplify the consume method by using the ConsumerClientTrait. See docs/consumer-client-trait.md for details.

Connectors

The connectors are the classes responsible for connecting to the message queue server and sending/receiving messages.

All connectors implement the following interface:

<?php
interface ConnectorInterface
{
    public static function schema(): array;

    public function setUp(Uri $uri): void;

    public function getDriver(): mixed;

    public function publish(Envelope $envelope): void;

    public function consume(Pipe $pipe, \Closure $onReceive, \Closure $onError, ?string $identification = null): void;
}

You do not need to call getDriver() yourself, because publish() and consume() call it automatically. Use getDriver() only if you need to access the connection directly.
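
As an illustration of the interface above, the following sketch shows what a very small connector might look like. It assumes the package is installed and Composer's autoloader is loaded. ArrayConnector and the `array` scheme are hypothetical names invented for this example, and treating the callback result as an integer bit mask is an assumption drawn from the `Message::ACK | Message::EXIT` usage. To stay within the calls shown in this README, it keeps all envelopes in a single in-memory list and ignores the pipe name, and it returns when the list is empty rather than waiting for new messages.

```php
<?php
// Sketch only: assumes byjg/message-queue-client is installed and autoloaded.
use ByJG\MessageQueueClient\Connector\ConnectorInterface;
use ByJG\MessageQueueClient\Connector\Pipe;
use ByJG\MessageQueueClient\Envelope;
use ByJG\MessageQueueClient\Message;
use ByJG\Util\Uri;

// Hypothetical connector that stores envelopes in a PHP array.
class ArrayConnector implements ConnectorInterface
{
    private ?Uri $uri = null;

    /** @var Envelope[] Envelopes waiting to be consumed (pipe names are ignored). */
    private array $pending = [];

    public static function schema(): array
    {
        return ["array"];           // hypothetical scheme, e.g. array://local
    }

    public function setUp(Uri $uri): void
    {
        $this->uri = $uri;          // a real connector would read host, credentials, etc.
    }

    public function getDriver(): mixed
    {
        return $this->pending;      // stands in for a real connection handle
    }

    public function publish(Envelope $envelope): void
    {
        $this->pending[] = $envelope;
    }

    public function consume(Pipe $pipe, \Closure $onReceive, \Closure $onError, ?string $identification = null): void
    {
        // Simplification: stop when nothing is left instead of blocking.
        while ($this->pending !== []) {
            $envelope = array_shift($this->pending);
            try {
                $result = $onReceive($envelope);
            } catch (\Throwable $ex) {
                $result = $onError($envelope, $ex);
            }
            // Assumption: the result is an integer bit mask of Message::* flags.
            if (((int) $result) & Message::EXIT) {
                return;
            }
        }
    }
}
```

Such a class would presumably be registered with ConnectorFactory::registerConnector(ArrayConnector::class) and created from a Uri such as array://local, in the same way as MockConnector in the usage examples. NACK and REQUEUE handling is left out to keep the sketch short.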

Documentation

Core Components

  • Pipe Class - Represents a message queue or topic
  • Message Class - Represents a message that can be published or consumed
  • Envelope Class - Encapsulates a message with its destination pipe

Dependencies

flowchart TD
    byjg/message-queue-client --> byjg/uri

Open source ByJG

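Statistics

  • Total downloads: 8.59k
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 2
  • Clicks: 2
  • Dependent projects: 3
  • Recommendations: 0

GitHub information

  • Stars: 2
  • Watchers: 1
  • Forks: 1
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2023-05-04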