gensart-x/bullmq-php
最新稳定版本:1.0.0
Composer 安装命令:
composer require gensart-x/bullmq-php
包简介
BullMQ Queue client for PHP - Schedule jobs to be processed by BullMQ workers
README 文档
README
BullMQ PHP
A PHP client library for BullMQ, the modern queue system for Node.js.
This library allows you to add jobs to a BullMQ queue from your PHP application. The jobs can then be processed by workers written in Node.js, Python, or Elixir.
Requirements
- PHP 8.1 or higher
- Redis 5.0 or higher (6.2+ recommended)
- Composer
Installation
This package is distributed directly from the BullMQ monorepo. Add the repository to your composer.json and require the package:
{
"repositories": [
{
"type": "vcs",
"url": "https://github.com/taskforcesh/bullmq"
}
],
"require": {
"taskforcesh/bullmq-php": "dev-master"
},
"minimum-stability": "dev",
"prefer-stable": true
}
Then run:
composer install
Or add it to an existing project:
composer config repositories.bullmq vcs https://github.com/taskforcesh/bullmq composer require taskforcesh/bullmq-php:dev-master
Note: Stable releases are tagged with the format
vphp{version}(e.g.,vphp1.0.0). Check the releases page for available versions.
Quick Start
Creating a Queue
use BullMQ\Queue; // Create a queue with default connection (localhost:6379) $queue = new Queue('my-queue'); // Or with custom Redis connection $queue = new Queue('my-queue', [ 'connection' => [ 'host' => 'redis.example.com', 'port' => 6379, 'password' => 'your-password', ], ]);
Adding Jobs
use BullMQ\Queue; $queue = new Queue('email-queue'); // Add a simple job $job = $queue->add('send-email', [ 'to' => 'user@example.com', 'subject' => 'Welcome!', 'body' => 'Thanks for signing up.', ]); echo "Job added with ID: " . $job->id . "\n";
Job Options
// Delayed job (delay in milliseconds) $job = $queue->add('reminder', $data, [ 'delay' => 60000, // Process after 60 seconds ]); // Priority job (lower number = higher priority) $job = $queue->add('urgent', $data, [ 'priority' => 1, ]); // Custom job ID $job = $queue->add('process-order', $data, [ 'jobId' => 'order-' . $orderId, ]); // Job with retry settings $job = $queue->add('flaky-operation', $data, [ 'attempts' => 3, 'backoff' => [ 'type' => 'exponential', 'delay' => 1000, ], ]); // Job with removal policy $job = $queue->add('task', $data, [ 'removeOnComplete' => true, 'removeOnFail' => 100, // Keep last 100 failed jobs ]); // LIFO (Last In, First Out) - process newest jobs first $job = $queue->add('task', $data, [ 'lifo' => true, ]); // Custom timestamp (defaults to current time) $job = $queue->add('task', $data, [ 'timestamp' => (int)(microtime(true) * 1000), ]);
Adding Multiple Jobs
$jobs = $queue->addBulk([ ['name' => 'email', 'data' => ['to' => 'user1@example.com']], ['name' => 'email', 'data' => ['to' => 'user2@example.com']], ['name' => 'email', 'data' => ['to' => 'user3@example.com']], ]);
Getting Job Information
// Get a specific job $job = $queue->getJob('job-id'); if ($job) { echo "Job name: " . $job->name . "\n"; echo "Job data: " . json_encode($job->data) . "\n"; echo "Job state: " . $queue->getJobState($job->id) . "\n"; } // Get jobs by state $waitingJobs = $queue->getWaiting(0, 10); $activeJobs = $queue->getActive(0, 10); $delayedJobs = $queue->getDelayed(0, 10); $completedJobs = $queue->getCompleted(0, 10); $failedJobs = $queue->getFailed(0, 10); // Get job counts $counts = $queue->getJobCounts(); echo "Waiting: " . $counts['waiting'] . "\n"; echo "Active: " . $counts['active'] . "\n"; echo "Delayed: " . $counts['delayed'] . "\n"; echo "Completed: " . $counts['completed'] . "\n"; echo "Failed: " . $counts['failed'] . "\n"; // Get count for specific types $pending = $queue->getJobCountByTypes('waiting', 'delayed'); echo "Pending jobs: " . $pending . "\n"; // Get counts grouped by priority $priorityCounts = $queue->getCountsPerPriority([0, 1, 2, 3]); echo "Priority 0: " . $priorityCounts[0] . "\n"; echo "Priority 1: " . $priorityCounts[1] . "\n";
Queue Management
// Pause the queue $queue->pause(); echo "Queue paused: " . ($queue->isPaused() ? 'yes' : 'no') . "\n"; // Resume the queue $queue->resume(); // Remove a specific job $removed = $queue->remove('job-id'); // Clean old jobs (grace period in milliseconds) $cleaned = $queue->clean( grace: 3600000, // 1 hour limit: 100, type: 'completed' ); // Retry failed jobs with options $queue->retryJobs([ 'count' => 100, // Max jobs to retry per iteration 'state' => 'failed', // State to retry from: 'failed' or 'completed' 'timestamp' => time() * 1000, // Only retry jobs before this timestamp ]); // Promote delayed jobs (move to waiting) $queue->promoteJobs(['count' => 100]); // Drain the queue (remove all waiting jobs) $queue->drain(); // Obliterate the queue (remove everything) $queue->obliterate(['force' => true]);
Parent-Child Jobs (Flows)
// Add a child job with a parent $childJob = $queue->add('child-task', $childData, [ 'parent' => [ 'id' => 'parent-job-id', 'queue' => 'bull:parent-queue', ], ]);
Job States
Jobs can be in one of the following states:
waiting- Job is waiting to be processedactive- Job is currently being processeddelayed- Job is delayed and waiting for its delay to expirecompleted- Job has been successfully processedfailed- Job has failed after all retry attemptspaused- Job is in a paused queueprioritized- Job is in the prioritized setwaiting-children- Parent job waiting for child jobs to complete
Connection Options
// Using a connection array $queue = new Queue('my-queue', [ 'connection' => [ 'host' => 'localhost', 'port' => 6379, 'database' => 0, 'password' => null, 'username' => null, ], ]); // Using a Redis URI $queue = new Queue('my-queue', [ 'connection' => 'redis://user:password@localhost:6379/0', ]); // Sharing a connection use BullMQ\RedisConnection; $connection = new RedisConnection([ 'host' => 'localhost', 'port' => 6379, ]); $queue1 = new Queue('queue-1', ['connection' => $connection]); $queue2 = new Queue('queue-2', ['connection' => $connection]); // Custom prefix $queue = new Queue('my-queue', [ 'prefix' => 'myapp', ]);
Interoperability
Jobs added with this PHP library can be processed by BullMQ workers in:
- Node.js - Using the official BullMQ package
- Python - Using the BullMQ Python package
- Elixir - Using the BullMQ Elixir package
Example Node.js worker:
import { Worker } from 'bullmq'; const worker = new Worker('my-queue', async job => { console.log(`Processing job ${job.id} with data:`, job.data); // Process the job... return { success: true }; });
Error Handling
use BullMQ\Queue; try { $queue = new Queue('my-queue'); $job = $queue->add('task', $data); } catch (\RuntimeException $e) { echo "Error: " . $e->getMessage() . "\n"; }
Testing
composer test
License
MIT License - see LICENSE for details.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
API Reference
Queue Methods
| Method | Description |
|---|---|
add(name, data, opts) |
Add a single job to the queue |
addBulk(jobs) |
Add multiple jobs in a single operation |
getJob(id) |
Get a job by its ID |
getJobs(jobIds) |
Get multiple jobs by their IDs |
getJobsByType(types, start, end, asc) |
Get jobs by type(s) with pagination |
getJobState(id) |
Get the current state of a job |
getJobCounts() |
Get job counts for all states |
getJobCountByTypes(...types) |
Get total count for specific types |
getCountsPerPriority(priorities) |
Get counts grouped by priority |
getWaiting(start, end) |
Get waiting jobs |
getActive(start, end) |
Get active jobs |
getDelayed(start, end) |
Get delayed jobs |
getPrioritized(start, end) |
Get prioritized jobs |
getCompleted(start, end) |
Get completed jobs |
getFailed(start, end) |
Get failed jobs |
pause() |
Pause the queue |
resume() |
Resume the queue |
isPaused() |
Check if queue is paused |
remove(jobId) |
Remove a specific job |
clean(grace, limit, type) |
Clean old jobs |
drain(delayed) |
Remove all waiting/delayed jobs |
obliterate(opts) |
Remove all queue data |
retryJobs(opts) |
Retry failed/completed jobs |
promoteJobs(opts) |
Promote delayed jobs to waiting |
close() |
Close the connection |
Job Options
| Option | Type | Description |
|---|---|---|
jobId |
string | Custom job ID |
delay |
int | Delay in milliseconds before processing |
priority |
int | Priority (lower = higher priority) |
attempts |
int | Number of retry attempts |
backoff |
array/int | Backoff strategy for retries |
lifo |
bool | Process newest jobs first |
removeOnComplete |
bool/int/array | Remove job on completion |
removeOnFail |
bool/int/array | Remove job on failure |
timestamp |
int | Job creation timestamp (ms) |
parent |
array | Parent job reference for flows |
Note on Job Schedulers: Repeatable/scheduled jobs (cron patterns) should be created from the Node.js side using
JobScheduler. The PHP client is designed for adding individual jobs, not managing schedulers.
Links
统计信息
- 总下载量: 4
- 月度下载量: 0
- 日度下载量: 0
- 收藏数: 0
- 点击次数: 0
- 依赖项目数: 0
- 推荐数: 0
其他信息
- 授权协议: MIT
- 更新时间: 2025-12-18