asifshoumik/kafka-laravel

Latest stable version: v1.0.1

Composer install command:

composer require asifshoumik/kafka-laravel

Package description

A robust Kafka queue driver for Laravel microservices with support for delayed jobs, retry mechanisms, and comprehensive error handling.

README

A robust and feature-rich Kafka queue driver for Laravel microservices with comprehensive error handling, delayed jobs, retry mechanisms, and dead letter queue support.

Features

  • Full Laravel Queue Integration - Works with Laravel's built-in queue system
  • Error Handling & Retries - Comprehensive error handling with configurable retry mechanisms
  • Dead Letter Queue - Failed jobs are automatically moved to a dead letter queue
  • Delayed Jobs - Support for delayed job execution
  • Multiple Security Protocols - PLAINTEXT, SSL/TLS, SASL authentication
  • Configurable Timeouts - Fine-tuned timeout configurations for different scenarios
  • Batch Processing - Optimized batch processing for better performance
  • Monitoring & Logging - Comprehensive logging and monitoring capabilities
  • Console Commands - Artisan commands for queue management
  • Topic Mapping - Route different job types to different Kafka topics

Requirements

  • PHP 8.1 or higher
  • Laravel 10.0 or higher
  • rdkafka PHP extension
  • Kafka 2.1.0 or higher

Installation

📋 Important: The rdkafka PHP extension is required. See INSTALLATION.md for detailed installation instructions.

1. Install rdkafka Extension First

Important: The rdkafka PHP extension must be installed before installing this package.

Ubuntu/Debian:

sudo apt-get install librdkafka-dev
sudo pecl install rdkafka
# Add extension=rdkafka.so to your php.ini file

macOS (with Homebrew):

brew install librdkafka
pecl install rdkafka
# Add extension=rdkafka.so to your php.ini file

Windows:

  1. Download the appropriate DLL from the PECL rdkafka page
  2. Place the DLL in your PHP extensions directory
  3. Add extension=rdkafka to your php.ini file
  4. Restart your web server

Verify Installation:

php -m | grep rdkafka

2. Install the Package

composer require asifshoumik/kafka-laravel

3. Publish Configuration (Optional)

The package works out of the box, but you can publish the configuration file for customization:

php artisan vendor:publish --tag=kafka-queue-config

This creates config/kafka-queue.php with all available options.

4. Configure Environment Variables

Add the following to your .env file:

# Queue Configuration
QUEUE_CONNECTION=kafka

# Kafka Configuration  
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_GROUP_ID=laravel-consumer-group
KAFKA_DEFAULT_TOPIC=laravel-jobs
KAFKA_DEAD_LETTER_QUEUE=laravel-failed-jobs

# Security (if needed)
KAFKA_SECURITY_PROTOCOL=PLAINTEXT
KAFKA_SASL_MECHANISMS=PLAIN
KAFKA_SASL_USERNAME=
KAFKA_SASL_PASSWORD=

# Performance Tuning
KAFKA_BATCH_SIZE=16384
KAFKA_LINGER_MS=5
KAFKA_COMPRESSION_TYPE=none
KAFKA_MAX_ATTEMPTS=3

That's it! The package automatically registers the Kafka queue connection with all configurations, including:

  • Main connection settings (brokers, security, SSL, etc.)
  • Topic mapping for routing different job types to different topics
  • Consumer configuration for worker management
  • Monitoring and logging settings

No manual configuration of config/queue.php is required.

Quick Start

1. Dispatch Jobs

Any existing Laravel job will work with Kafka:

use App\Jobs\ProcessOrder;

// Dispatch to default topic
dispatch(new ProcessOrder($order));

// Dispatch to specific topic  
ProcessOrder::dispatch($order)->onQueue('orders');

// Dispatch with delay
ProcessOrder::dispatch($order)->delay(now()->addMinutes(10));

2. Start Consumer

php artisan kafka:work

3. Monitor Jobs

# Work specific queue/topic
php artisan kafka:work orders

# Work with options
php artisan kafka:work --timeout=60 --tries=3

Usage

Basic Job Dispatching

use App\Jobs\ProcessPodcast;

// Dispatch to default topic
ProcessPodcast::dispatch($podcast);

// Dispatch to specific topic
ProcessPodcast::dispatch($podcast)->onQueue('emails');

// Dispatch with delay
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));

Creating Jobs

<?php

namespace App\Jobs;

use Asifshoumik\KafkaLaravel\Jobs\KafkaJob;

class ProcessPodcast extends KafkaJob
{
    public $podcast;

    public function __construct($podcast)
    {
        $this->podcast = $podcast;
    }

    public function handle(): void
    {
        // Process the podcast
        $this->processPayload([
            'podcast_id' => $this->podcast->id,
            'action' => 'process'
        ]);
    }

    protected function processPayload(array $payload): void
    {
        // Your custom processing logic here
        logger()->info('Processing podcast', $payload);
    }
}

Consuming Messages

Using Artisan Command

# Consume from specific topic
php artisan kafka:consume laravel-jobs

# With options
php artisan kafka:consume laravel-jobs --timeout=60 --memory=256 --sleep=3

# Stop when queue is empty
php artisan kafka:consume laravel-jobs --stopWhenEmpty

# Force run in maintenance mode
php artisan kafka:consume laravel-jobs --force

Using Queue Worker

# Standard Laravel queue worker
php artisan queue:work kafka --queue=laravel-jobs

# With specific options
php artisan queue:work kafka --queue=laravel-jobs --timeout=60 --memory=256

Topic Mapping

Topic mapping is automatically available through the published configuration. You can customize it in config/kafka-queue.php:

'topic_mapping' => [
    'default' => 'laravel-jobs',
    'emails' => 'laravel-email-jobs',
    'notifications' => 'laravel-notification-jobs',
    'processing' => 'laravel-processing-jobs',
],

Then use specific queues:

// Will go to 'laravel-email-jobs' topic
SendEmailJob::dispatch($user)->onQueue('emails');

// Will go to 'laravel-notification-jobs' topic
SendNotificationJob::dispatch($notification)->onQueue('notifications');
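
The mapping above amounts to a lookup with a fallback. Here is a minimal sketch in plain PHP, not the package's actual code: `resolveTopic` is a hypothetical helper, and it assumes that a queue name missing from the map is used as the topic name unchanged and that a missing queue name falls back to the `default` entry.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: resolve a Laravel queue name to a Kafka topic.
 * Assumption: a name missing from the map is used as the topic name itself.
 */
function resolveTopic(?string $queue, array $topicMapping): string
{
    // No queue given: look up the 'default' entry of the map.
    $queue ??= 'default';

    return $topicMapping[$queue] ?? $queue;
}

$topicMapping = [
    'default' => 'laravel-jobs',
    'emails' => 'laravel-email-jobs',
    'notifications' => 'laravel-notification-jobs',
    'processing' => 'laravel-processing-jobs',
];

echo resolveTopic('emails', $topicMapping), PHP_EOL; // laravel-email-jobs
echo resolveTopic(null, $topicMapping), PHP_EOL;     // laravel-jobs
echo resolveTopic('orders', $topicMapping), PHP_EOL; // orders
```

Keeping the logical queue names in application code and the physical topic names in configuration means topics can be renamed per environment without touching any job class.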

Error Handling and Retries

Jobs are automatically retried based on configuration. Failed jobs exceeding max attempts are moved to the dead letter queue.

use Asifshoumik\KafkaLaravel\Jobs\KafkaJob;
use Illuminate\Support\Facades\Log;

class RiskyJob extends KafkaJob
{
    public $tries = 5; // Override default max attempts
    public $timeout = 120; // Job timeout in seconds

    public function handle(): void
    {
        // Risky operation that might fail; shouldFail() stands in for your own check
        if ($this->shouldFail()) {
            throw new \Exception('Job failed');
        }
    }

    // Laravel may pass any Throwable here (including \Error), not only \Exception
    public function failed(?\Throwable $exception): void
    {
        // Handle job failure
        Log::error('RiskyJob failed', [
            'exception' => $exception?->getMessage(),
            'payload' => $this->getPayload()
        ]);
    }
}
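
The retry rule described in this section can be sketched as a single decision made after each failed attempt. This is an illustration only: `nextTopicAfterFailure` is a hypothetical helper, and it assumes a retry is re-published to the job's original topic, which the README does not confirm.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: pick the topic a job goes to after a failed attempt.
 * $attempts counts the attempts made so far, including the one that just failed.
 */
function nextTopicAfterFailure(int $attempts, int $maxAttempts, string $topic, string $deadLetterTopic): string
{
    // Attempts remain: send the job back to its original topic for a retry.
    if ($attempts < $maxAttempts) {
        return $topic;
    }

    // Attempts exhausted: route the job to the dead letter queue.
    return $deadLetterTopic;
}

echo nextTopicAfterFailure(1, 3, 'laravel-jobs', 'laravel-failed-jobs'), PHP_EOL; // laravel-jobs
echo nextTopicAfterFailure(3, 3, 'laravel-jobs', 'laravel-failed-jobs'), PHP_EOL; // laravel-failed-jobs
```

With `KAFKA_MAX_ATTEMPTS=3`, a job that fails three times ends up on the topic named by `KAFKA_DEAD_LETTER_QUEUE`; a job-level `$tries` value would replace the `3` in the calls above.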

Advanced Configuration

SSL/TLS Configuration

📋 Detailed Guide: See SSL-CONFIGURATION.md for comprehensive SSL/TLS setup instructions.

For SSL-only authentication (server verification only):

Option 1 - Using certificate files:

KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_LOCATION=/path/to/ca-certificate.crt

Option 2 - Using certificate strings (Kubernetes secrets):

KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_PEM="-----BEGIN CERTIFICATE-----
MIIErDCCApSgAwIBAgIRdmZqCilhcM...
-----END CERTIFICATE-----"

# Optional: SSL verification settings
KAFKA_SSL_VERIFY_HOSTNAME=true
KAFKA_SSL_CHECK_HOSTNAME=true

For mutual SSL authentication (client and server verification):

Option 1 - Using certificate files:

KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_LOCATION=/path/to/ca-certificate.crt
KAFKA_SSL_CERTIFICATE_LOCATION=/path/to/client-certificate.crt
KAFKA_SSL_KEY_LOCATION=/path/to/client-private-key.key

Option 2 - Using certificate strings (Kubernetes secrets):

KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_PEM="-----BEGIN CERTIFICATE-----..."
KAFKA_SSL_CERTIFICATE_PEM="-----BEGIN CERTIFICATE-----..."
KAFKA_SSL_KEY_PEM="-----BEGIN PRIVATE KEY-----..."

# Optional: SSL verification settings
KAFKA_SSL_VERIFY_HOSTNAME=true
KAFKA_SSL_CHECK_HOSTNAME=true

Example with your CA certificate:

# Basic SSL with CA verification
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_SSL_CA_LOCATION=C:/path/to/ca-certificate.crt

# If you also have client certificates (for mutual TLS)
KAFKA_SSL_CERTIFICATE_LOCATION=C:/path/to/client.crt
KAFKA_SSL_KEY_LOCATION=C:/path/to/client.key

Note:

  • File paths: Use forward slashes / or double backslashes \\ in Windows paths
  • Certificate strings: Include the full PEM content with headers and footers
  • Kubernetes: PEM strings work perfectly with Kubernetes secrets
  • Precedence: PEM strings take priority over file paths if both are provided
  • SSL Verification: ssl_verify_hostname and ssl_check_hostname provide additional security
  • The CA certificate verifies the Kafka broker's identity
  • Client certificates are only needed for mutual TLS authentication

SSL Verification Options:

  • KAFKA_SSL_VERIFY_HOSTNAME=true (default) - Enables SSL certificate verification
  • KAFKA_SSL_CHECK_HOSTNAME=true (default) - Enables hostname verification
  • Set to false only for testing or when using self-signed certificates
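
The precedence rule (PEM strings win over file paths) can be sketched as a function that turns the settings into librdkafka properties. The property names (`ssl.ca.pem`, `ssl.ca.location`, and so on) are real librdkafka settings; `buildSslProperties` and the input array keys are hypothetical, and how the package performs this step internally is an assumption.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: turn SSL settings into librdkafka properties.
 * For each credential, a PEM string wins over a file path when both are set.
 */
function buildSslProperties(array $ssl): array
{
    $properties = ['security.protocol' => 'ssl'];

    // setting name => [PEM property, file-path property]
    $credentials = [
        'ca' => ['ssl.ca.pem', 'ssl.ca.location'],
        'certificate' => ['ssl.certificate.pem', 'ssl.certificate.location'],
        'key' => ['ssl.key.pem', 'ssl.key.location'],
    ];

    foreach ($credentials as $name => [$pemProperty, $pathProperty]) {
        if (!empty($ssl[$name . '_pem'])) {
            $properties[$pemProperty] = $ssl[$name . '_pem'];
        } elseif (!empty($ssl[$name . '_location'])) {
            $properties[$pathProperty] = $ssl[$name . '_location'];
        }
    }

    return $properties;
}

// Both a path and a PEM string are given for the CA: only the PEM is used.
$properties = buildSslProperties([
    'ca_location' => '/path/to/ca-certificate.crt',
    'ca_pem' => "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----",
]);

echo implode(', ', array_keys($properties)), PHP_EOL; // security.protocol, ssl.ca.pem
```

With the rdkafka extension loaded, each resulting pair could be applied through `RdKafka\Conf::set()`.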

SASL Authentication

KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISMS=SCRAM-SHA-256
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-password

Performance Optimization

# Increase batch size for better throughput
KAFKA_BATCH_SIZE=65536

# Enable compression
KAFKA_COMPRESSION_TYPE=snappy

# Tune timeouts
KAFKA_MESSAGE_TIMEOUT_MS=300000
KAFKA_REQUEST_TIMEOUT_MS=30000

Monitoring and Logging

The package provides comprehensive logging. Monitor your application logs for:

  • Job processing information
  • Connection status
  • Error details
  • Performance metrics

Example log entry:

{
    "level": "info",
    "message": "Kafka job processed",
    "context": {
        "topic": "laravel-jobs",
        "job_id": "uuid-here",
        "job_name": "App\\Jobs\\ProcessPodcast",
        "attempts": 1,
        "processing_time": 1.5
    }
}

Troubleshooting

rdkafka Extension Not Found

If you get an error like this when running composer require asifshoumik/kafka-laravel:

Package asifshoumik/kafka-laravel has requirements incompatible with your PHP version, PHP extensions and Composer version:
- asifshoumik/kafka-laravel v1.0.0 requires ext-rdkafka * but it is not present.

This means the rdkafka PHP extension is not installed. Follow these steps:

  1. Install the extension first (see INSTALLATION.md for detailed instructions)
  2. Verify installation: php -m | grep rdkafka
  3. Check php.ini: Ensure extension=rdkafka is added to your php.ini
  4. Restart web server after installing the extension

Installation with Missing Extensions

If you need to install the package without rdkafka (for development purposes only):

composer require asifshoumik/kafka-laravel --ignore-platform-req=ext-rdkafka

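Warning: This flag only lets Composer resolve dependencies. The package cannot produce or consume messages until the rdkafka extension is installed, so never deploy to production without it.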

Configuration Not Being Used

If your configuration changes in config/kafka-queue.php are not taking effect:

  1. Clear config cache: php artisan config:clear
  2. Restart queue workers: Stop and restart all queue workers
  3. Verify environment: php artisan config:show queue.connections.kafka

The package configuration works in this order:

  1. Environment variables (.env file) take highest priority
  2. Published config/kafka-queue.php provides defaults and advanced options
  3. Package defaults are used as fallbacks
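
The lookup order above can be sketched as a chain of fallbacks. This is a plain-PHP illustration, not the package's code: `resolveSetting` is a hypothetical helper, the three arrays stand in for the `.env` file, the published config, and the package defaults, and the config key names (`group_id`, `compression_type`, `max_attempts`) are assumed for the example.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper mirroring the lookup order described above:
 * environment value first, then published config, then package default.
 */
function resolveSetting(string $envKey, string $configKey, array $env, array $published, array $defaults): mixed
{
    return $env[$envKey] ?? $published[$configKey] ?? $defaults[$configKey] ?? null;
}

$env = ['KAFKA_GROUP_ID' => 'orders-service'];
$published = ['group_id' => 'laravel-consumer-group', 'compression_type' => 'snappy'];
$defaults = ['group_id' => 'laravel-consumer-group', 'compression_type' => 'none', 'max_attempts' => 3];

echo resolveSetting('KAFKA_GROUP_ID', 'group_id', $env, $published, $defaults), PHP_EOL;                 // orders-service
echo resolveSetting('KAFKA_COMPRESSION_TYPE', 'compression_type', $env, $published, $defaults), PHP_EOL; // snappy
echo resolveSetting('KAFKA_MAX_ATTEMPTS', 'max_attempts', $env, $published, $defaults), PHP_EOL;         // 3
```

Because cached configuration freezes these values, a change in `.env` only becomes visible after `php artisan config:clear` and a worker restart.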

Note: The package automatically registers the Kafka queue connection, so no manual configuration of config/queue.php is required.

Connection Issues

  • Verify Kafka is running: telnet localhost 9092
  • Check firewall settings for Kafka ports
  • Verify broker addresses in configuration
  • Check authentication credentials if using SASL
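
Where `telnet` is not available, the same reachability check can be done from PHP. This is a minimal sketch using the built-in `fsockopen`; `canReachBroker` is a hypothetical helper, and `localhost:9092` is only the default address used elsewhere in this README.

```php
<?php

declare(strict_types=1);

/**
 * Check whether a TCP connection to host:port can be opened within $timeout seconds.
 * This only proves the port is reachable, not that a healthy Kafka broker answers on it.
 */
function canReachBroker(string $host, int $port, float $timeout = 3.0): bool
{
    // Suppress the PHP warning on failure; the return value carries the result.
    $socket = @fsockopen($host, $port, $errorCode, $errorMessage, $timeout);

    if ($socket === false) {
        return false;
    }

    fclose($socket);

    return true;
}

echo canReachBroker('localhost', 9092) ? 'reachable' : 'unreachable', PHP_EOL;
```

A successful TCP connection rules out firewall and address problems; authentication and TLS errors still need to be diagnosed from the application logs.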

SSL/TLS Certificate Issues

Certificate file not found:

Failed to set SSL CA location: No such file or directory

  • Verify the certificate file path exists
  • Use absolute paths for certificate files
  • Check file permissions (readable by web server)

Certificate verification failed:

SSL handshake failed

  • Ensure the CA certificate matches your Kafka broker's certificate
  • Check if the certificate has expired
  • Verify the certificate chain is complete

Windows path issues:

  • Use forward slashes: C:/path/to/cert.crt
  • Or escape backslashes: C:\\path\\to\\cert.crt
  • Avoid spaces in certificate file paths
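
If certificate paths arrive from user input or another tool, the forward-slash form can be produced in code. This is a small sketch; `normalizeCertificatePath` is a hypothetical helper, not something the package provides.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: convert Windows backslashes to forward slashes,
 * which Windows accepts in file paths and which need no escaping.
 */
function normalizeCertificatePath(string $path): string
{
    return str_replace('\\', '/', $path);
}

// Single-quoted PHP strings keep these backslashes literal.
echo normalizeCertificatePath('C:\path\to\cert.crt'), PHP_EOL; // C:/path/to/cert.crt
echo normalizeCertificatePath('/etc/kafka/ca.crt'), PHP_EOL;   // /etc/kafka/ca.crt
```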

Testing certificate connection:

# Test SSL connection to Kafka broker
openssl s_client -connect your-kafka-broker:9093 -CAfile /path/to/your/ca.crt

Performance Issues

  • Increase batch_size for higher throughput
  • Enable compression: KAFKA_COMPRESSION_TYPE=snappy
  • Tune consumer fetch settings
  • Monitor partition lag

Testing

composer test

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for new functionality
  5. Submit a pull request

Security

If you discover any security-related issues, please email the maintainer instead of using the issue tracker.

License

The MIT License (MIT). Please see License File for more information.

Changelog

Please see CHANGELOG for more information on what has changed recently.

Statistics

  • Total downloads: 61
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 0
  • Clicks: 2
  • Dependents: 0
  • Suggesters: 0

GitHub Information

  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • Language: PHP

Other Information

  • License: MIT
  • Last updated: 2025-07-21