oeltimacreation/php-simplequeue
最新稳定版本:1.0.0
Composer 安装命令:
composer require oeltimacreation/php-simplequeue
包简介
A lightweight, framework-agnostic background job queue system for PHP with Redis and database drivers
README 文档
README
A lightweight, framework-agnostic background job queue system for PHP. Supports Redis and database-backed queues with automatic retries, progress tracking, and graceful shutdown.
Features
- Framework Agnostic: Works with any PHP framework or standalone applications
- Multiple Queue Drivers: Redis (recommended) and Database polling
- Automatic Retries: Configurable retry with exponential backoff
- Progress Tracking: Report job progress with percentage and messages
- Graceful Shutdown: Handles SIGTERM/SIGINT for clean worker termination
- Singleton Worker: File locking prevents multiple workers from running
- Stale Job Recovery: Automatically recovers jobs stuck in running state
- PSR Compliant: Uses PSR-3 Logger and PSR-11 Container interfaces
Requirements
- PHP 8.1 or higher
- Redis (optional, for Redis driver)
- PDO (optional, for database driver)
Installation
composer require oeltimacreation/php-simplequeue
For Redis support:
composer require predis/predis
Quick Start
1. Create a Job Handler
<?php use Oeltima\SimpleQueue\Contract\JobHandlerInterface; class SendEmailJob implements JobHandlerInterface { public function handle(int $jobId, array $payload, ?callable $progressCallback = null): mixed { $to = $payload['to']; $subject = $payload['subject']; $body = $payload['body']; // Report progress if ($progressCallback) { $progressCallback(50, 'Sending email...'); } // Send email logic here mail($to, $subject, $body); if ($progressCallback) { $progressCallback(100, 'Email sent'); } return ['sent_at' => date('Y-m-d H:i:s')]; } }
2. Set Up the Queue
<?php use Oeltima\SimpleQueue\JobDispatcher; use Oeltima\SimpleQueue\JobRegistry; use Oeltima\SimpleQueue\QueueManager; use Oeltima\SimpleQueue\Storage\PdoJobStorage; use Predis\Client as RedisClient; // Create storage $pdo = new PDO('mysql:host=localhost;dbname=myapp', 'user', 'password'); $storage = new PdoJobStorage($pdo); // Create queue manager with Redis (recommended) $redis = new RedisClient(['host' => '127.0.0.1']); $queueManager = QueueManager::redis($redis); // Or use database polling as fallback // $queueManager = QueueManager::database($storage); // Or auto-select (tries Redis first, falls back to database) // $queueManager = QueueManager::create('auto', $redis, $storage); // Create job registry and register handlers $registry = new JobRegistry(); $registry->register('email.send', SendEmailJob::class); // Create dispatcher $dispatcher = new JobDispatcher($storage, $queueManager);
3. Dispatch Jobs
// Dispatch a single job $jobId = $dispatcher->dispatch('email.send', [ 'to' => 'user@example.com', 'subject' => 'Welcome!', 'body' => 'Thanks for signing up.', ]); // Dispatch with custom options $jobId = $dispatcher->dispatch( type: 'email.send', payload: ['to' => 'user@example.com', 'subject' => 'Hello'], queue: 'emails', // Custom queue name maxAttempts: 5, // Retry up to 5 times requestId: 'req-123' // Correlation ID for tracing ); // Dispatch batch $jobIds = $dispatcher->dispatchBatch('email.send', [ ['to' => 'user1@example.com', 'subject' => 'Hello'], ['to' => 'user2@example.com', 'subject' => 'Hello'], ]); // Check job status $job = $dispatcher->getStatus($jobId); echo $job->status; // pending, running, completed, failed echo $job->progress; // 0-100
4. Run the Worker
<?php // worker.php use Oeltima\SimpleQueue\Worker; use Monolog\Logger; use Monolog\Handler\StreamHandler; // Set up logging $logger = new Logger('worker'); $logger->pushHandler(new StreamHandler('php://stdout')); // Create worker $worker = new Worker( storage: $storage, queueManager: $queueManager, registry: $registry, logger: $logger, queue: 'default', options: [ 'poll_timeout' => 5, // Seconds to wait for jobs 'stuck_job_ttl' => 600, // Recover jobs running > 10 min 'retry_base_delay' => 2, // Base delay for exponential backoff 'retry_max_delay' => 300, // Maximum retry delay (5 min) 'lock_file' => '/tmp/myapp-worker.lock', ] ); // Run the worker (blocks until shutdown signal) $worker->run();
Run the worker:
php worker.php
For production, use a process manager like Supervisor:
[program:queue-worker] command=php /path/to/worker.php autostart=true autorestart=true user=www-data numprocs=1 redirect_stderr=true stdout_logfile=/var/log/queue-worker.log
Database Schema
Create the jobs table:
CREATE TABLE background_jobs ( id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, queue VARCHAR(255) NOT NULL DEFAULT 'default', type VARCHAR(255) NOT NULL, status ENUM('pending', 'running', 'completed', 'failed', 'cancelled') NOT NULL DEFAULT 'pending', payload JSON, attempts INT UNSIGNED NOT NULL DEFAULT 0, max_attempts INT UNSIGNED NOT NULL DEFAULT 3, progress INT UNSIGNED DEFAULT NULL, progress_message VARCHAR(255) DEFAULT NULL, result JSON DEFAULT NULL, available_at DATETIME DEFAULT NULL, started_at DATETIME DEFAULT NULL, completed_at DATETIME DEFAULT NULL, locked_by VARCHAR(255) DEFAULT NULL, locked_at DATETIME DEFAULT NULL, error_message TEXT DEFAULT NULL, error_trace TEXT DEFAULT NULL, request_id VARCHAR(255) DEFAULT NULL, created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL, INDEX idx_queue_status (queue, status), INDEX idx_status_available (status, available_at), INDEX idx_locked_at (locked_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Configuration
Queue Drivers
Redis Driver (recommended for production):
use Oeltima\SimpleQueue\QueueManager; use Predis\Client; $redis = new Client(['host' => '127.0.0.1', 'port' => 6379]); $queueManager = QueueManager::redis($redis, 'myapp'); // prefix for Redis keys
Database Driver (fallback option):
$queueManager = QueueManager::database($storage);
Auto Selection:
$queueManager = QueueManager::create( driverName: 'auto', // 'redis', 'db', or 'auto' redis: $redis, // Optional Redis client storage: $storage, // Optional storage for DB fallback redisPrefix: 'myapp' );
Worker Options
| Option | Default | Description |
|---|---|---|
poll_timeout |
5 | Seconds to wait for new jobs |
stuck_job_ttl |
600 | Seconds before recovering stuck jobs |
retry_base_delay |
2 | Base delay for exponential backoff |
retry_max_delay |
300 | Maximum retry delay in seconds |
lock_file |
/tmp/simplequeue-worker.lock |
Lock file path (null to disable) |
PSR-11 Container Integration
use Oeltima\SimpleQueue\JobRegistry; // Pass your PSR-11 container $registry = new JobRegistry($container); $registry->register('email.send', SendEmailJob::class); // Handler will be resolved from container if registered // Otherwise, instantiated directly
Advanced Usage
Custom Job Storage
Implement JobStorageInterface for custom storage:
use Oeltima\SimpleQueue\Contract\JobStorageInterface; use Oeltima\SimpleQueue\Contract\JobData; class MongoJobStorage implements JobStorageInterface { public function createJob(string $type, array $payload, ...): int { ... } public function find(int $id): ?JobData { ... } // Implement all interface methods }
Custom Queue Driver
Implement QueueDriverInterface for custom drivers:
use Oeltima\SimpleQueue\Contract\QueueDriverInterface; class RabbitMQDriver implements QueueDriverInterface { public function isAvailable(): bool { ... } public function enqueue(string $queue, int $jobId): void { ... } public function dequeue(string $queue, int $timeoutSeconds): ?int { ... } public function ack(string $queue, int $jobId): void { ... } public function nack(string $queue, int $jobId): void { ... } }
Handling Job Failures
Jobs automatically retry with exponential backoff:
- Attempt 1 fails → retry after 2 seconds
- Attempt 2 fails → retry after 4 seconds
- Attempt 3 fails → marked as failed
Access error information:
$job = $dispatcher->getStatus($jobId); if ($job->status === 'failed') { echo $job->errorMessage; echo $job->errorTrace; }
Testing
Use in-memory implementations for testing:
use Oeltima\SimpleQueue\Driver\InMemoryQueueDriver; use Oeltima\SimpleQueue\Storage\InMemoryJobStorage; $storage = new InMemoryJobStorage(); $driver = new InMemoryQueueDriver(); $queueManager = new QueueManager($driver); // Dispatch and process synchronously $dispatcher = new JobDispatcher($storage, $queueManager); $jobId = $dispatcher->dispatch('test.job', ['data' => 'value']); $worker = new Worker($storage, $queueManager, $registry); $worker->processOne(); // Process single job $job = $storage->find($jobId); $this->assertEquals('completed', $job->status);
API Reference
JobDispatcher
| Method | Description |
|---|---|
dispatch(string $type, array $payload, ...) |
Queue a single job |
dispatchBatch(string $type, array $payloads, ...) |
Queue multiple jobs |
getStatus(int $jobId) |
Get job details |
Worker
| Method | Description |
|---|---|
run() |
Start the worker loop |
processOne() |
Process a single job |
stop() |
Signal the worker to stop |
getWorkerId() |
Get the worker identifier |
JobData
| Property | Type | Description |
|---|---|---|
id |
int | Job ID |
type |
string | Job type identifier |
status |
string | pending, running, completed, failed, cancelled |
payload |
array | Job data |
progress |
?int | Progress percentage (0-100) |
progressMessage |
?string | Progress status message |
result |
mixed | Job result (when completed) |
errorMessage |
?string | Error message (when failed) |
attempts |
int | Number of attempts made |
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for details.
Security
If you discover a security vulnerability, please send an email to gema@oeltimacreation.com instead of using the issue tracker.
License
The MIT License (MIT). Please see LICENSE for more information.
统计信息
- 总下载量: 2
- 月度下载量: 0
- 日度下载量: 0
- 收藏数: 1
- 点击次数: 0
- 依赖项目数: 0
- 推荐数: 0
其他信息
- 授权协议: MIT
- 更新时间: 2026-01-07