airygen/laravel-amqp-producer

Latest stable version: 1.0.2

Composer install command:

composer require airygen/laravel-amqp-producer

Package description

RabbitMQ publisher for Laravel with confirms, retries, DLX strategy.

README

Laravel-oriented RabbitMQ publisher with:

  • Publisher confirms
  • Mandatory publish (detect unroutable messages)
  • Exponential retry for transient AMQP errors
  • Structured automatic headers: request_id, source, env, ISO8601 datetime
  • Multi-connection support (choose connection per payload)

This library focuses on publishing only. For queue workers / consumers you can use: vyuldashev/laravel-queue-rabbitmq.

Installation

composer require airygen/laravel-amqp-producer

Publish Configuration

php artisan vendor:publish --provider="Airygen\\RabbitMQ\\RabbitMQServiceProvider" --tag=config

Generated file: config/amqp.php

return [
    'retry' => [
        'base_delay' => 0.2,
        'max_delay' => 1.5,
        'jitter' => false, // set true to randomize backoff (helps avoid thundering herd)
    ],
    'connections' => [
        'default' => [
            'host' => env('AMQP_HOST', '127.0.0.1'),
            'port' => (int) env('AMQP_PORT', 5672),
            'user' => env('AMQP_USER', 'guest'),
            'password' => env('AMQP_PASSWORD', 'guest'),
            'vhost' => env('AMQP_VHOST', '/'),
            'options' => [
                'lazy' => true,
                'keepalive' => true,
                'heartbeat' => 60,
                // Channel reuse removed: each publish opens/closes channel and connection
            ],
        ],
        // Add more named connections if needed
        // 'analytics' => [ ... ],
    ],
];

Defining a Payload

Extend ProducerPayload and (optionally) override connection / exchange / routing key.

use Airygen\RabbitMQ\ProducerPayload;

final class MemberCreatedPayload extends ProducerPayload
{
    protected string $connectionName = 'default';              // optional (defaults to 'default')
    protected ?string $exchangeName = 'ex.members';            // required if you publish to a non-empty exchange
    protected ?string $routingKey = 'member.created';          // required for direct/topic exchanges
}

Basic Publish

use Airygen\RabbitMQ\Publisher;

$publisher = app(Publisher::class);
$publisher->publish(new MemberCreatedPayload(['id' => 123]));

Custom Headers

$publisher->publish(
    new MemberCreatedPayload(['id' => 123]),
    header: ['foo' => 'bar']
);

Batch Publish

$payloads = [
    new MemberCreatedPayload(['id' => 1]),
    new MemberCreatedPayload(['id' => 2]),
];

$publisher->batchPublish($payloads);

Retry Strategy

The publisher retries transient AMQP errors (IO and protocol channel exceptions) with exponential backoff:

  • Initial delay base_delay (~200ms), doubled each attempt, capped at max_delay (~1500ms)
  • Default attempts: 3
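The doubling-and-capping rule above can be sketched as a small function. This is a minimal illustration, not the package's own code: `backoffDelays` is a hypothetical helper, and the arguments are assumed to be in seconds, matching the `base_delay` and `max_delay` values in config/amqp.php.

```php
<?php

/**
 * Hypothetical helper (not part of the package): returns the delay, in
 * seconds, to wait before each retry. The delay starts at $baseDelay,
 * doubles on every retry, and never exceeds $maxDelay.
 */
function backoffDelays(float $baseDelay, float $maxDelay, int $retries): array
{
    $delays = [];
    for ($i = 0; $i < $retries; $i++) {
        // 2 ** $i doubles the base delay on each successive retry.
        $delays[] = min($baseDelay * (2 ** $i), $maxDelay);
    }

    return $delays;
}

// With the default config values the schedule is 0.2, 0.4, 0.8, 1.5, 1.5.
$delays = backoffDelays(0.2, 1.5, 5);
```

With the default three attempts, only the first two delays would ever be used, since there is no wait after the final attempt.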

Custom rule:

$publisher->publish(
    new MemberCreatedPayload(['id' => 1]),
    retryTimes: 5,
    // Fully qualified names so the closure also works inside a namespaced file
    when: function (\Throwable $e): bool {
        return $e instanceof \PhpAmqpLib\Exception\AMQPIOException
            || str_contains($e->getMessage(), 'timeout');
    }
);
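To show how an attempt limit and a `when` predicate can interact, here is a generic retry loop. It is a sketch under stated assumptions, not the package's implementation: `retryWhen` is a hypothetical function, and it treats `$retryTimes` as the total number of attempts, which is inferred from the "Default attempts: 3" note and the `retryTimes: 1` example used to disable retries.

```php
<?php

/**
 * Hypothetical retry loop (not the package's implementation).
 * $operation is attempted up to $retryTimes times in total; a failure is
 * retried only while attempts remain and $when($e) returns true.
 */
function retryWhen(
    callable $operation,
    int $retryTimes,
    callable $when,
    float $baseDelay = 0.2,
    float $maxDelay = 1.5
) {
    $failures = 0;
    while (true) {
        try {
            return $operation();
        } catch (\Throwable $e) {
            $failures++;
            // Give up when attempts are exhausted or the predicate declines.
            if ($failures >= $retryTimes || !$when($e)) {
                throw $e;
            }
            // Exponential backoff: base, 2x base, 4x base, ... capped at max.
            $delay = min($baseDelay * (2 ** ($failures - 1)), $maxDelay);
            usleep((int) round($delay * 1_000_000));
        }
    }
}

// Usage: the operation fails twice with a "timeout" error, then succeeds.
$calls = 0;
$result = retryWhen(
    function () use (&$calls) {
        $calls++;
        if ($calls < 3) {
            throw new \RuntimeException('connection timeout');
        }
        return 'published';
    },
    3,
    fn (\Throwable $e): bool => str_contains($e->getMessage(), 'timeout'),
    0.001, // tiny delays keep the demo fast
    0.002
);
```

Under these assumptions, passing `1` as the attempt limit or a predicate that always returns false makes the first failure propagate immediately.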

Disable retry:

$publisher->publish(
    new MemberCreatedPayload(['id' => 1]),
    retryTimes: 1,
    when: fn() => false
);

Enable jitter:

config(['amqp.retry.jitter' => true]);
$publisher->publish($payload);

Jitter multiplies each delay by a random factor ~0.85 - 1.15.
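The jitter factor of roughly 0.85 to 1.15 can be illustrated with a short helper. This is only a sketch: `applyJitter` and its `$spread` parameter are hypothetical, and the random source and distribution the package actually uses are assumptions here.

```php
<?php

/**
 * Hypothetical helper: scales a delay by a random factor in
 * [1 - $spread, 1 + $spread], i.e. [0.85, 1.15] for the default spread.
 */
function applyJitter(float $delay, float $spread = 0.15): float
{
    // Uniform value in [0, 1].
    $unit = mt_rand() / mt_getrandmax();
    $factor = (1.0 - $spread) + (2.0 * $spread * $unit);

    return $delay * $factor;
}

// A 0.8 s delay ends up somewhere between 0.68 s and 0.92 s.
$jittered = applyJitter(0.8);
```

Randomizing each delay spreads out retries from many processes that failed at the same moment, which is the thundering-herd effect mentioned in the config comment.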

Multi-Connection Example

final class AnalyticsEventPayload extends ProducerPayload
{
    protected string $connectionName = 'analytics';
    protected ?string $exchangeName = 'ex.analytics';
    protected ?string $routingKey = 'event.ingest';
}

$publisher->publish(new AnalyticsEventPayload(['type' => 'login']));

Automatic Headers Added

MessageFactory injects:

  • request_id (existing X-Request-Id header or a new UUID)
  • source (Laravel app name)
  • env (current environment)
  • datetime (ISO8601)

You can still provide additional custom headers. Header precedence: custom headers passed to publish() / batchPublish() override the automatically injected keys when the same key exists.
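The precedence rule can be sketched with a plain array merge. This is an illustration only, not the package's MessageFactory: `buildHeaders` and `uuidV4` are hypothetical helpers, and the app name and environment are passed in as arguments instead of being read from Laravel.

```php
<?php

/** Hypothetical UUID v4 generator, used only for this sketch. */
function uuidV4(): string
{
    $bytes = random_bytes(16);
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // set version 4
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // set RFC 4122 variant

    // Eight 4-hex-digit chunks arranged as 8-4-4-4-12.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

/**
 * Hypothetical helper: builds the default headers, then lets custom
 * headers override any default that shares the same key.
 */
function buildHeaders(array $custom, ?string $incomingRequestId, string $appName, string $env): array
{
    $defaults = [
        'request_id' => $incomingRequestId ?? uuidV4(),
        'source'     => $appName,
        'env'        => $env,
        'datetime'   => date(DATE_ATOM), // ISO 8601 style timestamp
    ];

    // For string keys, later arrays win in array_merge.
    return array_merge($defaults, $custom);
}

$headers = buildHeaders(
    ['foo' => 'bar', 'source' => 'billing-cli'],
    null,
    'my-app',
    'production'
);
// $headers['source'] is 'billing-cli'; request_id, env and datetime keep their defaults.
```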

Metrics & Stats

The package ships with an in-memory static counter registry Stats intended for lightweight instrumentation or exporting into your own monitoring system.

Global counters:

  • publish_attempts
  • publish_retries
  • publish_failures
  • connection_resets

Per‑connection counters (nested under per_connection[connection_name]):

  • publish_attempts
  • publish_retries
  • publish_failures
  • connection_resets

Example snapshot:

use Airygen\RabbitMQ\Support\Stats;
$snapshot = Stats::snapshot();
// [ 'publish_attempts' => 10, 'per_connection' => ['default' => ['publish_attempts' => 7]] ]

You can reset counters (e.g. at the start of a test or scheduled export) with:

Stats::reset();

For production telemetry, consider periodically reading the snapshot and pushing to Prometheus / OpenTelemetry.

Note: These counters are process‑local (not shared across workers). If you run Octane/Swoole multi-worker, aggregate externally.
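Because the counters are process-local, an external aggregator has to sum the snapshots collected from each worker. The following sketch shows one way to do that. `mergeSnapshots` is a hypothetical helper, and the snapshot shape is assumed to match the example shown earlier (global counters plus a nested per_connection map).

```php
<?php

/**
 * Hypothetical aggregator: sums counters from several snapshot arrays,
 * recursing into nested arrays such as per_connection.
 */
function mergeSnapshots(array ...$snapshots): array
{
    $total = [];
    foreach ($snapshots as $snapshot) {
        foreach ($snapshot as $key => $value) {
            if (is_array($value)) {
                // Nested map: merge it with whatever has been accumulated so far.
                $total[$key] = mergeSnapshots($total[$key] ?? [], $value);
            } else {
                $total[$key] = ($total[$key] ?? 0) + $value;
            }
        }
    }

    return $total;
}

$workerA = ['publish_attempts' => 10, 'per_connection' => ['default' => ['publish_attempts' => 7]]];
$workerB = [
    'publish_attempts' => 5,
    'per_connection'   => [
        'default'   => ['publish_attempts' => 2],
        'analytics' => ['publish_attempts' => 3],
    ],
];

$combined = mergeSnapshots($workerA, $workerB);
// $combined['publish_attempts'] is 15; per_connection.default.publish_attempts is 9.
```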

Behavior: Always Open/Close

For operational safety across PHP-FPM/CLI and worker runtimes (Octane/Swoole/RoadRunner), this package always opens a fresh channel for each publish and closes both channel and connection afterwards.

Health Check Command

Run a simple connectivity probe:

php artisan rabbitmq:ping            # test all configured connections
php artisan rabbitmq:ping secondary  # test a specific connection

Exit code is non‑zero on failure (suitable for container readiness / liveness probes).

Octane / Swoole / RoadRunner

Long-lived worker environments reuse PHP processes, so you must ensure stale connections/channels don't leak across deploys or forks.

Built-in safeguards:

  • On worker start/stop (Octane events) the connection manager reset() is invoked (if Octane is installed).
  • Connections are opened and closed per publish; you can call ConnectionManager::reset() manually if desired.

Recommended practices:

  1. Avoid holding a Publisher instance in static singletons you construct before workers fork.
  2. Call app(ProducerInterface::class) per request/job (container will reuse safe singleton manager underneath).
  3. If you rotate workers periodically, no extra action is needed—the hook already clears state.
  4. For Swoole without Octane events, call ConnectionManager::reset() manually from your own lifecycle hooks.

Optional manual reset example:

// e.g. in a scheduled task or health hook
app(\Airygen\RabbitMQ\Support\ConnectionManager::class)->reset();

TLS / SSL (Optional)

If you need TLS encryption, enable and configure the SSL related options inside config/amqp.php:

    'connections' => [
        'default' => [
            // ... host, port, user, password, vhost
            'options' => [
                'ssl' => true,
                'cafile' => base_path('certs/ca.pem'),
                'local_cert' => base_path('certs/client.pem'),
                'local_pk' => base_path('certs/client.key'),
                'verify_peer' => true,
                // 'passphrase' => env('AMQP_CERT_PASSPHRASE'),
            ],
        ],
    ],

The factory will build a stream context when ssl is truthy and any of the certificate fields are present. If verify_peer is enabled, ensure cafile is supplied.
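The decision rule described above can be sketched with PHP's built-in stream context functions. This is not the package's factory: `buildSslContext` is a hypothetical function, and treating `cafile`, `local_cert`, and `local_pk` as "the certificate fields" is an assumption.

```php
<?php

/**
 * Hypothetical sketch: returns an SSL stream context when 'ssl' is truthy
 * and at least one certificate field is present, otherwise null.
 */
function buildSslContext(array $options)
{
    // Assumed set of certificate fields that trigger context creation.
    $certKeys = ['cafile', 'local_cert', 'local_pk'];
    $ssl = array_intersect_key($options, array_flip($certKeys));

    if (empty($options['ssl']) || $ssl === []) {
        return null;
    }

    $verifyPeer = $options['verify_peer'] ?? null;
    if ($verifyPeer && empty($ssl['cafile'])) {
        throw new \InvalidArgumentException('verify_peer requires cafile to be set');
    }
    if ($verifyPeer !== null) {
        $ssl['verify_peer'] = (bool) $verifyPeer;
    }
    if (isset($options['passphrase'])) {
        $ssl['passphrase'] = $options['passphrase'];
    }

    // Keys under 'ssl' are standard PHP SSL context options.
    return stream_context_create(['ssl' => $ssl]);
}

$context = buildSslContext([
    'ssl'         => true,
    'cafile'      => '/path/to/ca.pem', // placeholder path
    'verify_peer' => true,
]);
$sslOptions = stream_context_get_options($context)['ssl'];
```

Failing early when `verify_peer` is set without a `cafile` surfaces the misconfiguration at setup time instead of during the TLS handshake.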

Roadmap

  • Pluggable metrics exporter interface
  • Circuit breaker / total backoff budget
  • Async publisher confirm pipeline
  • Mandatory publish return callbacks (unroutable detection)
  • Dead letter / delayed publish helpers

Development

Dockerized Workflow

All commands are wrapped to run inside the php service defined in docker-compose.yml.

Startup & install:

docker compose up -d rabbitmq
docker compose build php
docker compose run --rm php composer install

Using Makefile targets:

make unit          # run unit tests
make integration   # run integration tests (requires rabbitmq service up)
make test          # unit + integration
make coverage      # generates coverage/html & coverage/clover.xml
make lint          # code style check
make analyse       # phpstan static analysis
make fix           # auto-fix style

Manual (host) without Docker wrapper:

php -d xdebug.mode=off vendor/bin/phpunit --testsuite Unit
INTEGRATION_TESTS=1 php -d xdebug.mode=off vendor/bin/phpunit --testsuite Integration

Management UI: http://localhost:15672 (guest / guest)

If you prefer to run host-native (without Docker), use the host:* scripts or call PHPUnit directly.

Prometheus Metrics (Skeleton)

The package keeps lightweight in-memory counters (not persisted). A minimal Prometheus text exporter is provided.

Artisan command:

php artisan rabbitmq:metrics            # full HELP/TYPE + samples
php artisan rabbitmq:metrics --raw      # only metric lines

Example output:

# HELP rabbitmq_publish_attempts_total Total publish attempts (before confirm).
# TYPE rabbitmq_publish_attempts_total counter
rabbitmq_publish_attempts_total 42
... (other metrics)

Per-connection metrics are emitted with a connection label, e.g.:

rabbitmq_connection_publish_attempts_total{connection="primary"} 10
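For readers writing their own exporter, the mapping from a counter snapshot to metric lines can be sketched as follows. `renderPrometheus` is a hypothetical function, the snapshot shape is assumed to match the Stats example earlier, and the metric names are inferred from the sample output above. HELP and TYPE lines are omitted, similar to the --raw output.

```php
<?php

/**
 * Hypothetical renderer: turns a counter snapshot into Prometheus
 * text-format sample lines.
 */
function renderPrometheus(array $snapshot): string
{
    $lines = [];

    // Global counters, e.g. rabbitmq_publish_attempts_total 42
    foreach ($snapshot as $name => $value) {
        if ($name === 'per_connection') {
            continue;
        }
        $lines[] = sprintf('rabbitmq_%s_total %d', $name, $value);
    }

    // Per-connection counters carry a connection label.
    foreach ($snapshot['per_connection'] ?? [] as $connection => $counters) {
        // Escape backslash, double quote and newline inside the label value.
        $label = strtr((string) $connection, ["\\" => "\\\\", "\"" => "\\\"", "\n" => "\\n"]);
        foreach ($counters as $name => $value) {
            $lines[] = sprintf(
                'rabbitmq_connection_%s_total{connection="%s"} %d',
                $name,
                $label,
                $value
            );
        }
    }

    return implode("\n", $lines) . "\n";
}

$text = renderPrometheus([
    'publish_attempts' => 10,
    'per_connection'   => ['default' => ['publish_attempts' => 7]],
]);
```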

You can bind your own implementation of Airygen\RabbitMQ\Contracts\MetricsExporterInterface if you need richer aggregation or to integrate with an existing metrics system.

License

MIT

Statistics

  • Total downloads: 5
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 0
  • Clicks: 1
  • Dependents: 0
  • Suggesters: 0

GitHub information

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2025-09-24