asifshoumik/kafka-laravel
Latest stable version: v1.0.1
Composer install command:
composer require asifshoumik/kafka-laravel
Package summary
A robust Kafka queue driver for Laravel microservices with support for delayed jobs, retry mechanisms, and comprehensive error handling.
README
A robust and feature-rich Kafka queue driver for Laravel microservices with comprehensive error handling, delayed jobs, retry mechanisms, and dead letter queue support.
Features
- ✅ Full Laravel Queue Integration - Works with Laravel's built-in queue system
- ✅ Error Handling & Retries - Comprehensive error handling with configurable retry mechanisms
- ✅ Dead Letter Queue - Failed jobs are automatically moved to a dead letter queue
- ✅ Delayed Jobs - Support for delayed job execution
- ✅ Multiple Security Protocols - PLAINTEXT, SSL/TLS, SASL authentication
- ✅ Configurable Timeouts - Fine-tuned timeout configurations for different scenarios
- ✅ Batch Processing - Optimized batch processing for better performance
- ✅ Monitoring & Logging - Comprehensive logging and monitoring capabilities
- ✅ Console Commands - Artisan commands for queue management
- ✅ Topic Mapping - Route different job types to different Kafka topics
Requirements
- PHP 8.1 or higher
- Laravel 10.0 or higher
- rdkafka PHP extension
- Kafka 2.1.0 or higher
Installation
📋 Important: The rdkafka PHP extension is required. See INSTALLATION.md for detailed installation instructions.
1. Install rdkafka Extension First
Important: The rdkafka PHP extension must be installed before installing this package.
Ubuntu/Debian:
sudo apt-get install librdkafka-dev
sudo pecl install rdkafka
# Add extension=rdkafka.so to your php.ini file
macOS (with Homebrew):
brew install librdkafka
pecl install rdkafka
# Add extension=rdkafka.so to your php.ini file
Windows:
- Download the appropriate DLL from the PECL rdkafka page
- Place the DLL in your PHP extensions directory
- Add extension=rdkafka to your php.ini file
- Restart your web server
Verify Installation:
php -m | grep rdkafka
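The same check can be made from PHP itself, which helps when the CLI and the web server load different php.ini files. A minimal sketch using PHP's built-in extension_loaded() and phpversion(); the helper name rdkafkaStatus is hypothetical and not part of this package.

```php
<?php
// Hypothetical helper (not part of the package): reports whether the
// rdkafka extension is loaded in the PHP binary running this script.
function rdkafkaStatus(): string
{
    return extension_loaded('rdkafka')
        ? 'rdkafka loaded, version ' . phpversion('rdkafka')
        : 'rdkafka is NOT loaded; check the extension= line in php.ini';
}

echo rdkafkaStatus(), PHP_EOL;
```

Run it once with the CLI binary and once through the web server to confirm both see the extension.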
2. Install the Package
composer require asifshoumik/kafka-laravel
3. Publish Configuration (Optional)
The package works out of the box, but you can publish the configuration file for customization:
php artisan vendor:publish --tag=kafka-queue-config
This creates config/kafka-queue.php with all available options.
4. Configure Environment Variables
Add the following to your .env file:
# Queue Configuration
QUEUE_CONNECTION=kafka

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_GROUP_ID=laravel-consumer-group
KAFKA_DEFAULT_TOPIC=laravel-jobs
KAFKA_DEAD_LETTER_QUEUE=laravel-failed-jobs

# Security (if needed)
KAFKA_SECURITY_PROTOCOL=PLAINTEXT
KAFKA_SASL_MECHANISMS=PLAIN
KAFKA_SASL_USERNAME=
KAFKA_SASL_PASSWORD=

# Performance Tuning
KAFKA_BATCH_SIZE=16384
KAFKA_LINGER_MS=5
KAFKA_COMPRESSION_TYPE=none
KAFKA_MAX_ATTEMPTS=3
That's it! The package automatically registers the Kafka queue connection with all configurations, including:
- Main connection settings (brokers, security, SSL, etc.)
- Topic mapping for routing different job types to different topics
- Consumer configuration for worker management
- Monitoring and logging settings
No manual configuration of config/queue.php is required.
Quick Start
1. Dispatch Jobs
Any existing Laravel job will work with Kafka:
use App\Jobs\ProcessOrder;

// Dispatch to default topic
dispatch(new ProcessOrder($order));

// Dispatch to specific topic
ProcessOrder::dispatch($order)->onQueue('orders');

// Dispatch with delay
ProcessOrder::dispatch($order)->delay(now()->addMinutes(10));
2. Start Consumer
php artisan kafka:work
3. Monitor Jobs
# Work specific queue/topic
php artisan kafka:work orders

# Work with options
php artisan kafka:work --timeout=60 --tries=3
Usage
Basic Job Dispatching
use App\Jobs\ProcessPodcast;

// Dispatch to default topic
ProcessPodcast::dispatch($podcast);

// Dispatch to specific topic
ProcessPodcast::dispatch($podcast)->onQueue('emails');

// Dispatch with delay
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));
Creating Jobs
<?php

namespace App\Jobs;

use Asifshoumik\KafkaLaravel\Jobs\KafkaJob;

class ProcessPodcast extends KafkaJob
{
    public $podcast;

    public function __construct($podcast)
    {
        $this->podcast = $podcast;
    }

    public function handle(): void
    {
        // Process the podcast
        $this->processPayload([
            'podcast_id' => $this->podcast->id,
            'action' => 'process'
        ]);
    }

    protected function processPayload(array $payload): void
    {
        // Your custom processing logic here
        logger()->info('Processing podcast', $payload);
    }
}
Consuming Messages
Using Artisan Command
# Consume from specific topic
php artisan kafka:consume laravel-jobs

# With options
php artisan kafka:consume laravel-jobs --timeout=60 --memory=256 --sleep=3

# Stop when queue is empty
php artisan kafka:consume laravel-jobs --stopWhenEmpty

# Force run in maintenance mode
php artisan kafka:consume laravel-jobs --force
Using Queue Worker
# Standard Laravel queue worker
php artisan queue:work kafka --queue=laravel-jobs

# With specific options
php artisan queue:work kafka --queue=laravel-jobs --timeout=60 --memory=256
Topic Mapping
Topic mapping is automatically available through the published configuration. You can customize it in config/kafka-queue.php:
'topic_mapping' => [
    'default' => 'laravel-jobs',
    'emails' => 'laravel-email-jobs',
    'notifications' => 'laravel-notification-jobs',
    'processing' => 'laravel-processing-jobs',
],
Then use specific queues:
// Will go to 'laravel-email-jobs' topic
SendEmailJob::dispatch($user)->onQueue('emails');

// Will go to 'laravel-notification-jobs' topic
SendNotificationJob::dispatch($notification)->onQueue('notifications');
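Conceptually, the mapping is a lookup from queue name to topic name. The sketch below is illustrative only: resolveTopic is a hypothetical function, not the package's API, and it assumes that a queue name without a mapping entry is used verbatim as the topic name (as the onQueue('orders') example in Quick Start suggests).

```php
<?php
// Hypothetical resolver, for illustration only; the package's internal
// lookup may differ.
function resolveTopic(?string $queue, array $topicMapping): string
{
    // No queue given: fall back to the 'default' entry.
    $queue = $queue ?? 'default';

    // ASSUMPTION: a queue name with no mapping entry is used as the topic name.
    return $topicMapping[$queue] ?? $queue;
}

$topicMapping = [
    'default' => 'laravel-jobs',
    'emails' => 'laravel-email-jobs',
    'notifications' => 'laravel-notification-jobs',
    'processing' => 'laravel-processing-jobs',
];

echo resolveTopic('emails', $topicMapping), PHP_EOL; // laravel-email-jobs
echo resolveTopic(null, $topicMapping), PHP_EOL;     // laravel-jobs
echo resolveTopic('orders', $topicMapping), PHP_EOL; // orders
```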
Error Handling and Retries
Jobs are automatically retried based on configuration. Failed jobs exceeding max attempts are moved to the dead letter queue.
use Illuminate\Support\Facades\Log;

class RiskyJob extends KafkaJob
{
    public $tries = 5;     // Override default max attempts
    public $timeout = 120; // Job timeout in seconds

    public function handle(): void
    {
        // Risky operation that might fail
        if ($this->shouldFail()) {
            throw new \Exception('Job failed');
        }
    }

    // Laravel passes a Throwable here, so accept the wider type
    public function failed(\Throwable $exception): void
    {
        // Handle job failure
        Log::error('RiskyJob failed', [
            'exception' => $exception->getMessage(),
            'payload' => $this->getPayload()
        ]);
    }
}
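The retry-then-dead-letter flow described above can be pictured with a small in-memory sketch. This is not the package's implementation: runWithRetries is a hypothetical function, and the $deadLetters array merely stands in for the dead letter topic.

```php
<?php
// Hypothetical, in-memory illustration of "retry, then dead-letter".
// $deadLetters stands in for the dead letter topic.
function runWithRetries(callable $job, int $maxAttempts, array &$deadLetters): bool
{
    $lastError = null;

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $job($attempt);
            return true; // success, no further attempts
        } catch (\Throwable $e) {
            $lastError = $e->getMessage();
        }
    }

    // All attempts failed: record the failure for later inspection.
    $deadLetters[] = ['attempts' => $maxAttempts, 'error' => $lastError];
    return false;
}

$dead = [];

// Fails twice, then succeeds on the third of five allowed attempts.
$ok = runWithRetries(function (int $attempt): void {
    if ($attempt < 3) {
        throw new \RuntimeException("attempt $attempt failed");
    }
}, 5, $dead);

// Never succeeds, so it ends up in the dead letter list.
$failed = runWithRetries(function (): void {
    throw new \RuntimeException('always fails');
}, 3, $dead);

var_dump($ok, $failed, count($dead)); // bool(true) bool(false) int(1)
```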
Advanced Configuration
SSL/TLS Configuration
📋 Detailed Guide: See SSL-CONFIGURATION.md for comprehensive SSL/TLS setup instructions.
For SSL-only authentication (server verification only):
Option 1 - Using certificate files:
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_LOCATION=/path/to/ca-certificate.crt
Option 2 - Using certificate strings (Kubernetes secrets):
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_PEM="-----BEGIN CERTIFICATE-----
MIIErDCCApSgAwIBAgIRdmZqCilhcM...
-----END CERTIFICATE-----"

# Optional: SSL verification settings
KAFKA_SSL_VERIFY_HOSTNAME=true
KAFKA_SSL_CHECK_HOSTNAME=true
For mutual SSL authentication (client and server verification):
Option 1 - Using certificate files:
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_LOCATION=/path/to/ca-certificate.crt
KAFKA_SSL_CERTIFICATE_LOCATION=/path/to/client-certificate.crt
KAFKA_SSL_KEY_LOCATION=/path/to/client-private-key.key
Option 2 - Using certificate strings (Kubernetes secrets):
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_PEM="-----BEGIN CERTIFICATE-----..."
KAFKA_SSL_CERTIFICATE_PEM="-----BEGIN CERTIFICATE-----..."
KAFKA_SSL_KEY_PEM="-----BEGIN PRIVATE KEY-----..."

# Optional: SSL verification settings
KAFKA_SSL_VERIFY_HOSTNAME=true
KAFKA_SSL_CHECK_HOSTNAME=true
Example with your CA certificate:
# Basic SSL with CA verification (Windows paths written with forward slashes)
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_LOCATION=C:/path/to/ca-certificate.crt

# If you also have client certificates (for mutual TLS)
KAFKA_SSL_CERTIFICATE_LOCATION=C:/path/to/client.crt
KAFKA_SSL_KEY_LOCATION=C:/path/to/client.key
Note:
- File paths: Use forward slashes / or double backslashes \\ in Windows paths
- Certificate strings: Include the full PEM content with headers and footers
- Kubernetes: PEM strings work perfectly with Kubernetes secrets
- Precedence: PEM strings take priority over file paths if both are provided
- SSL Verification: ssl_verify_hostname and ssl_check_hostname provide additional security
- The CA certificate verifies the Kafka broker's identity
- Client certificates are only needed for mutual TLS authentication
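The precedence rule (PEM strings over file paths) can be sketched as a mapping from the KAFKA_SSL_* variables to librdkafka property names. The property names (ssl.ca.pem, ssl.ca.location, and so on) are real librdkafka settings, but buildSslProperties is a hypothetical function and the package's own mapping may differ.

```php
<?php
// Hypothetical mapping from the KAFKA_SSL_* variables to librdkafka
// property names; the package's real implementation may differ.
function buildSslProperties(array $env): array
{
    $props = ['security.protocol' => $env['KAFKA_SECURITY_PROTOCOL'] ?? 'PLAINTEXT'];

    // Each entry: [PEM variable, file variable, PEM property, file property]
    $sources = [
        ['KAFKA_SSL_CA_PEM', 'KAFKA_SSL_CA_LOCATION', 'ssl.ca.pem', 'ssl.ca.location'],
        ['KAFKA_SSL_CERTIFICATE_PEM', 'KAFKA_SSL_CERTIFICATE_LOCATION', 'ssl.certificate.pem', 'ssl.certificate.location'],
        ['KAFKA_SSL_KEY_PEM', 'KAFKA_SSL_KEY_LOCATION', 'ssl.key.pem', 'ssl.key.location'],
    ];

    foreach ($sources as [$pemVar, $fileVar, $pemProp, $fileProp]) {
        if (!empty($env[$pemVar])) {
            $props[$pemProp] = $env[$pemVar];   // PEM string takes priority
        } elseif (!empty($env[$fileVar])) {
            $props[$fileProp] = $env[$fileVar]; // otherwise use the file path
        }
    }

    return $props;
}

// Both a PEM string and a file path are set for the CA: only the PEM is used.
$props = buildSslProperties([
    'KAFKA_SECURITY_PROTOCOL' => 'SSL',
    'KAFKA_SSL_CA_PEM' => '-----BEGIN CERTIFICATE-----...',
    'KAFKA_SSL_CA_LOCATION' => '/path/to/ca-certificate.crt',
]);

print_r(array_keys($props)); // security.protocol, ssl.ca.pem
```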
SSL Verification Options:
- KAFKA_SSL_VERIFY_HOSTNAME=true (default) - Enables SSL certificate verification
- KAFKA_SSL_CHECK_HOSTNAME=true (default) - Enables hostname verification
- Set to false only for testing or when using self-signed certificates
SASL Authentication
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISMS=SCRAM-SHA-256
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-password
Performance Optimization
# Increase batch size for better throughput
KAFKA_BATCH_SIZE=65536

# Enable compression
KAFKA_COMPRESSION_TYPE=snappy

# Tune timeouts
KAFKA_MESSAGE_TIMEOUT_MS=300000
KAFKA_REQUEST_TIMEOUT_MS=30000
Monitoring and Logging
The package provides comprehensive logging. Monitor your application logs for:
- Job processing information
- Connection status
- Error details
- Performance metrics
Example log entry:
{
"level": "info",
"message": "Kafka job processed",
"context": {
"topic": "laravel-jobs",
"job_id": "uuid-here",
"job_name": "App\\Jobs\\ProcessPodcast",
"attempts": 1,
"processing_time": 1.5
}
}
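Entries of this shape are easy to scan programmatically. The sketch below assumes your logging channel writes one JSON object per line and that processing_time is measured in seconds; both are assumptions about your setup, and slowJobs is a hypothetical helper, not part of the package.

```php
<?php
// ASSUMPTION: each log line is one JSON object shaped like the example entry,
// and processing_time is expressed in seconds.
function slowJobs(array $logLines, float $thresholdSeconds): array
{
    $slow = [];

    foreach ($logLines as $line) {
        $entry = json_decode($line, true);

        // Skip lines that are not JSON or lack the expected field.
        if (!is_array($entry) || !isset($entry['context']['processing_time'])) {
            continue;
        }

        if ($entry['context']['processing_time'] > $thresholdSeconds) {
            $slow[] = $entry['context']['job_name'] ?? 'unknown';
        }
    }

    return $slow;
}

// Sample lines built in memory; real input would come from your log file.
$lines = [
    json_encode(['level' => 'info', 'message' => 'Kafka job processed',
        'context' => ['topic' => 'laravel-jobs', 'job_name' => 'ProcessPodcast', 'attempts' => 1, 'processing_time' => 1.5]]),
    json_encode(['level' => 'info', 'message' => 'Kafka job processed',
        'context' => ['topic' => 'laravel-jobs', 'job_name' => 'SendEmailJob', 'attempts' => 1, 'processing_time' => 0.2]]),
    'not a JSON line',
];

print_r(slowJobs($lines, 1.0)); // only ProcessPodcast exceeds the threshold
```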
Troubleshooting
rdkafka Extension Not Found
If you get an error like this when running composer require asifshoumik/kafka-laravel:
Package asifshoumik/kafka-laravel has requirements incompatible with your PHP version, PHP extensions and Composer version:
- asifshoumik/kafka-laravel v1.0.0 requires ext-rdkafka * but it is not present.
This means the rdkafka PHP extension is not installed. Follow these steps:
- Install the extension first (see INSTALLATION.md for detailed instructions)
- Verify installation: php -m | grep rdkafka
- Check php.ini: Ensure extension=rdkafka is added to your php.ini
- Restart web server after installing the extension
Installation with Missing Extensions
If you need to install the package without rdkafka (for development purposes only):
composer require asifshoumik/kafka-laravel --ignore-platform-req=ext-rdkafka
Warning: The package will not work without the rdkafka extension in production.
Configuration Not Being Used
If your configuration changes in config/kafka-queue.php are not taking effect:
- Clear config cache: php artisan config:clear
- Restart queue workers: Stop and restart all queue workers
- Verify environment: php artisan config:show queue.connections.kafka
The package configuration works in this order:
- Environment variables (.env file) take highest priority
- Published config/kafka-queue.php provides defaults and advanced options
- Package defaults are used as fallbacks
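That lookup order can be pictured as a first-match search across three sources. The sketch below is illustrative only: resolveSetting is a hypothetical function, and for simplicity it assumes all three sources use the same key names, which is not how .env variables and config keys are actually named.

```php
<?php
// Hypothetical illustration of the lookup order; not the package's code.
// ASSUMPTION: all three sources are keyed by the same setting name.
function resolveSetting(string $key, array $env, array $published, array $packageDefaults): mixed
{
    // Sources are checked from highest to lowest priority.
    foreach ([$env, $published, $packageDefaults] as $source) {
        if (array_key_exists($key, $source) && $source[$key] !== null && $source[$key] !== '') {
            return $source[$key];
        }
    }

    return null; // not set anywhere
}

$defaults = ['max_attempts' => 3, 'compression_type' => 'none'];
$published = ['max_attempts' => 5];
$env = ['compression_type' => 'snappy'];

var_dump(resolveSetting('max_attempts', $env, $published, $defaults));     // int(5)
var_dump(resolveSetting('compression_type', $env, $published, $defaults)); // string(6) "snappy"
```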
Note: The package automatically registers the Kafka queue connection, so no manual configuration of config/queue.php is required.
Connection Issues
- Verify Kafka is running: telnet localhost 9092
- Check firewall settings for Kafka ports
- Verify broker addresses in configuration
- Check authentication credentials if using SASL
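Where telnet is not available, a rough reachability check can be written with PHP's built-in fsockopen(). Both functions below are hypothetical helpers, not part of the package, and defaulting to port 9092 when none is given is an assumption. A successful TCP connection shows only that the port is open, not that authentication or the Kafka handshake will succeed.

```php
<?php
// Hypothetical helpers for a quick reachability check; not part of the package.
function parseBootstrapServers(string $servers): array
{
    $brokers = [];

    foreach (array_filter(array_map('trim', explode(',', $servers))) as $server) {
        // Split on the last colon so the port is separated from the host.
        $pos = strrpos($server, ':');
        $host = $pos === false ? $server : substr($server, 0, $pos);
        // ASSUMPTION: fall back to 9092 when no port is given.
        $port = $pos === false ? 9092 : (int) substr($server, $pos + 1);
        $brokers[] = ['host' => $host, 'port' => $port];
    }

    return $brokers;
}

function isReachable(string $host, int $port, float $timeoutSeconds = 2.0): bool
{
    // Only proves that the TCP port accepts connections.
    $socket = @fsockopen($host, $port, $errorCode, $errorMessage, $timeoutSeconds);
    if ($socket === false) {
        return false;
    }

    fclose($socket);
    return true;
}

foreach (parseBootstrapServers('localhost:9092') as $broker) {
    printf(
        "%s:%d %s\n",
        $broker['host'],
        $broker['port'],
        isReachable($broker['host'], $broker['port']) ? 'reachable' : 'unreachable'
    );
}
```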
SSL/TLS Certificate Issues
Certificate file not found:
Failed to set SSL CA location: No such file or directory
- Verify the certificate file path exists
- Use absolute paths for certificate files
- Check file permissions (readable by web server)
Certificate verification failed:
SSL handshake failed
- Ensure the CA certificate matches your Kafka broker's certificate
- Check if the certificate has expired
- Verify the certificate chain is complete
Windows path issues:
- Use forward slashes: C:/path/to/cert.crt
- Or escape backslashes: C:\\path\\to\\cert.crt
- Avoid spaces in certificate file paths
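If certificate paths are assembled in code, converting backslashes to forward slashes avoids the escaping problem entirely. A minimal sketch; normalizeCertPath is a hypothetical helper, not part of the package.

```php
<?php
// Hypothetical helper: rewrites Windows backslashes as forward slashes.
function normalizeCertPath(string $path): string
{
    return str_replace('\\', '/', $path);
}

// Single-quoted strings keep the backslashes literal.
echo normalizeCertPath('C:\path\to\cert.crt'), PHP_EOL; // C:/path/to/cert.crt
```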
Testing certificate connection:
# Test SSL connection to Kafka broker
openssl s_client -connect your-kafka-broker:9093 -CAfile /path/to/your/ca.crt
Performance Issues
- Increase batch_size for higher throughput
- Enable compression: KAFKA_COMPRESSION_TYPE=snappy
- Tune consumer fetch settings
- Monitor partition lag
Testing
composer test
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests for new functionality
- Submit a pull request
Security
If you discover any security-related issues, please email the maintainer instead of using the issue tracker.
License
The MIT License (MIT). Please see License File for more information.
Changelog
Please see CHANGELOG for more information on what has changed recently.
Other information
- License: MIT
- Last updated: 2025-07-21