mle86/wq-amqp
Latest stable version: v1.4.1
Composer install command:
composer require mle86/wq-amqp
Package summary
An AMQP module for mle86/wq using the php-amqplib connector
README
This package contains the PHP class mle86\WQ\WorkServerAdapter\AMQPWorkServer.
It supplements the mle86/wq package by implementing its WorkServerAdapter interface.
It connects to an AMQP server such as RabbitMQ using the php-amqplib/php-amqplib package.
It creates durable queues in the default exchange (“”) for immediate-delivery jobs
and the durable “_phpwq._delayed” queue in the custom “_phpwq._delay_exchange” exchange for delayed jobs.
Jobs stored with storeJob() are always durable as well.
Empty queues or exchanges won't be deleted automatically.
Version and Compatibility
This is version 1.4.0 of mle86/wq-amqp.
It was developed for version 1.0.0 of mle86/wq
and should be compatible with all of its future 1.x versions as well.
Installation and Dependencies
$ composer require mle86/wq-amqp
It requires PHP 8.0+.
It depends on mle86/wq and php-amqplib/php-amqplib, which in turn requires the mbstring, sockets, and bcmath PHP extensions.
Class reference
class mle86\WQ\WorkServerAdapter\AMQPWorkServer implements WorkServerAdapter
public function __construct (AMQPStreamConnection $connection)
Constructor. Takes an already-configured AMQPStreamConnection instance to work with. Does not attempt to establish a connection itself; use the connect() factory method for that instead.
public static function connect ($host = 'localhost', $port = 5672, $user = 'guest', $password = 'guest', $vhost = '/', $insist = false, $login_method = 'AMQPLAIN', $login_response = null, $locale = 'en_US', $connection_timeout = 3.0, $read_write_timeout = 3.0, $context = null, $keepalive = false, $heartbeat = 0)
Factory method. See AMQPStreamConnection::__construct for the parameter descriptions.
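The two ways of obtaining an AMQPWorkServer described above can be sketched as follows. This is a minimal illustration, not the package's canonical setup: the host, port, and credentials are placeholder values taken from the connect() defaults, and the autoloader path assumes a Composer-managed project.

```php
<?php
// Assumption: dependencies were installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use mle86\WQ\WorkServerAdapter\AMQPWorkServer;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Option 1: let the factory method open the connection.
// The arguments are placeholders matching the documented defaults.
$workServer = AMQPWorkServer::connect('localhost', 5672, 'guest', 'guest', '/');

// Option 2: build the connection yourself and pass it to the constructor.
// The constructor does not connect on its own; it only wraps the instance.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$otherWorkServer = new AMQPWorkServer($connection);
```

The second form is useful when the connection is configured elsewhere in the application and should be reused.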
Interface methods
which are documented in the WorkServerAdapter interface:
public function storeJob (string $workQueue, Job $job, int $delay = 0)
public function getNextQueueEntry ($workQueue, int $timeout = DEFAULT_TIMEOUT): ?QueueEntry
public function buryEntry (QueueEntry $entry)
public function requeueEntry (QueueEntry $entry, int $delay, string $workQueue = null)
public function deleteEntry (QueueEntry $entry)
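As a rough sketch of how storeJob() might be called, the helper below enqueues a job into the “mail” queue. The function name enqueueMailJob is hypothetical and not part of this package; the job instance is expected to come from the application. The exact meaning and unit of $delay are defined by the WorkServerAdapter interface documentation, not by this sketch.

```php
<?php
// Assumption: dependencies were installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use mle86\WQ\Job\Job;
use mle86\WQ\WorkServerAdapter\AMQPWorkServer;

/**
 * Hypothetical helper (not part of mle86/wq-amqp):
 * stores a job in the "mail" work queue, optionally with a delay.
 */
function enqueueMailJob(AMQPWorkServer $workServer, Job $job, int $delay = 0): void
{
    // With the default $delay of 0 this is an immediate-delivery job;
    // a non-zero delay makes it a delayed job.
    $workServer->storeJob('mail', $job, $delay);
}
```

Going by the description earlier in this README, an immediate-delivery job should end up in a durable queue in the default exchange, while a delayed job should pass through the “_phpwq._delayed” queue first.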
Usage example
<?php
use mle86\WQ\WorkServerAdapter\AMQPWorkServer;
use mle86\WQ\WorkProcessor;
use mle86\WQ\Job\Job;

$processor = new WorkProcessor( AMQPWorkServer::connect("localhost") );

while (true) {
    $processor->processNextJob("mail", function(Job $job) {
        $job->...;
    });
}
This executes all jobs available in the local AMQP server's “mail” queue, forever.
It will however abort if one of the jobs throws an exception –
you might want to add a logging try-catch block around the processNextJob() call
as shown in WQ's “Quick Start” example.
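A logging try-catch block of the kind mentioned above might look like the following. This is only a sketch and may differ from WQ's own “Quick Start” example: the log message format and the choice to keep looping after a failure are assumptions, and the callback body is left as a placeholder.

```php
<?php
// Assumption: dependencies were installed with Composer in this directory.
require __DIR__ . '/vendor/autoload.php';

use mle86\WQ\WorkServerAdapter\AMQPWorkServer;
use mle86\WQ\WorkProcessor;
use mle86\WQ\Job\Job;

$processor = new WorkProcessor(AMQPWorkServer::connect('localhost'));

while (true) {
    try {
        $processor->processNextJob('mail', function (Job $job) {
            // Application-specific job handling goes here.
        });
    } catch (\Throwable $e) {
        // Log the failure and continue with the next job instead of aborting.
        error_log(sprintf('Job failed: %s: %s', get_class($e), $e->getMessage()));
    }
}
```

Whether the loop should continue after an exception depends on the application; some failures, such as a lost server connection, may be better handled by stopping the worker.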
Statistics
- Total downloads: 1.27k
Other information
- License: MIT
- Last updated: 2020-04-22