定制 thesis/pgmq 二次开发

按需修改功能、优化性能、对接业务系统,提供一站式技术支持

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

thesis/pgmq

最新稳定版本:0.1.4

Composer 安装命令:

composer require thesis/pgmq

包简介

A non-blocking php client for Postgres Message Queue (PGMQ).

README 文档

README

Non-blocking php client for pgmq. See the extension installation guide.

Installation

composer require thesis/pgmq

Why is almost all the API functional?

Since you most likely expect exactly-once semantics from a database-based queue, all requests — sending or processing business logic with message acknowledgments — must be transactional. And the transaction object is short-lived: it cannot be used after rollback() or commit(), so it cannot be made a dependency. That's why all the API is built on functions that take Amp\Postgres\PostgresLink as their first parameter, which can be either a transaction object or just a connection. And only the consumer accepts Amp\Postgres\PostgresConnection, because it itself opens transactions for reading and acknowledging messages transactionally.

Contents

Create queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');

Create unlogged queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createUnloggedQueue($pg, 'events');

Create partitioned queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createPartitionedQueue(
    pg: $pg,
    queue: 'events',
    partitionInterval: 10000,
    retentionInterval: 100000,
);

List queues

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueues($pg) as $queue) {
    $md = $queue->metadata();
    var_dump($md);
}

List queue metrics

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\metrics($pg) as $metrics) {
    var_dump($metrics);
}

List queue metadata

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueueMetadata($pg) as $md) {
    var_dump($md);
}

Drop queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->drop();

Purge queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
var_dump($queue->purge());

Send message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'));

Send message with relative delay

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    TimeSpan::fromSeconds(5),
);

Send message with absolute delay

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new \DateTimeImmutable('+5 seconds'),
);

Send batch

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch([
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
]);

Send batch with relative delay

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    TimeSpan::fromSeconds(5),
);

Send batch with absolute delay

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    new \DateTimeImmutable('+5 seconds'),
);

Read message

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read(TimeSpan::fromSeconds(20));

Read batch

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->readBatch(10, TimeSpan::fromSeconds(20));

Pop message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->pop();

Read batch with poll

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readPoll(
    batch: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);

Set visibility timeout

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    // handle the message

    $queue->setVisibilityTimeout($message->id, TimeSpan::fromSeconds(10));
}

Archive message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->archive($message->id);
}

Archive batch

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    $queue->archiveBatch(array_map(
        static fn(Pgmq\Message $message): int => $messages->id),
        $messages,
    );
}

Delete message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->delete($message->id);
}

Delete batch

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    $queue->deleteBatch(array_map(
        static fn(Pgmq\Message $message): int => $messages->id),
        $messages,
    );
}

Enable notify insert

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$channel = $queue->enableNotifyInsert(); // postgres channel to listen is returned

Disable notify insert

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->disableNotifyInsert();

Consume messages

This functionality is not a standard feature of the pgmq extension, but is provided by the library as an add-on for reliable and correct processing of message batches from the queue, with the ability to ack, nack (with delay) and archive (term) messages from the queue.

  1. First of all, create the extension if it doesn't exist yet:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
  1. Then create a queue:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
  1. Next, create the consumer object:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);
  1. Now we can proceed to configure the queue consumer handler:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        var_dump($messages);
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Through Pgmq\ConsumeConfig you can configure:

  • the batch size of received messages;
  • the message visibility timeout;
  • enable monitoring for queue inserts via the LISTEN/NOTIFY mechanism;
  • and set the polling interval.

At least one of these settings — listenForInserts or pollTimeout — must be specified.

Through the Pgmq\ConsumeController, you can:

  • ack messages, causing them to be deleted from the queue;
  • nack messages with a delay, setting a visibility timeout for them;
  • terminate processing (when a message can no longer be retried), resulting in them being archived;
  • stop the consumer.

Since receiving messages and acking/nacking them occur within the same transaction, for your own database queries you must use the ConsumeController::$tx object to ensure exactly-once semantics for message processing.

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Using ConsumeContext, you can gracefully stop the consumer, waiting for the current batch to finish processing.

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

trapSignal([\SIGINT, \SIGTERM])

$context->stop();
$context->awaitCompletion();

Or stop all current consumers using $consumer->stop():

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(...);

trapSignal([\SIGINT, \SIGTERM])

$consumer->stop();
$context->awaitCompletion();

License

The MIT License (MIT). Please see License File for more information.

统计信息

  • 总下载量: 8
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 3
  • 点击次数: 0
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 3
  • Watchers: 0
  • Forks: 1
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2025-11-17