thiagobrauer/laravel-kafka
Composer install command:
composer require thiagobrauer/laravel-kafka
README
This package is based on anam-hossain's example
Installation
- Install the librdkafka library
- Install the php-rdkafka PECL extension
- Install this package using composer:
composer require thiagobrauer/laravel-kafka
- Publish the package's configuration file:
php artisan vendor:publish --provider="ThiagoBrauer\LaravelKafka\ServiceProvider"
- Add these properties to your .env file, changing the values as needed:
KAFKA_PRODUCER_SERVERS=kafka:9092
KAFKA_PRODUCER_DEBUG=true
KAFKA_CONSUMER_SERVERS=kafka:9092
KAFKA_CONSUMER_TOPICS=inventories
KAFKA_CONSUMER_GROUP_ID=group1
You can set multiple producer servers, consumer servers and consumer topics by separating the values with a comma (,).
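As a rough illustration of the comma-separated format, the sketch below splits such a value into a list of entries. The function name `splitKafkaList` and the host names are hypothetical, and the package's own parsing may differ.

```php
<?php

/**
 * Hypothetical helper (not part of the package): splits a comma-separated
 * setting such as KAFKA_PRODUCER_SERVERS into a list of trimmed entries.
 */
function splitKafkaList(string $value): array
{
    // Split on commas and trim surrounding whitespace from each entry.
    $parts = array_map('trim', explode(',', $value));

    // Drop empty entries and reindex the result from zero.
    return array_values(array_filter($parts, fn (string $part): bool => $part !== ''));
}

// Example with made-up host names:
$servers = splitKafkaList('kafka1:9092, kafka2:9092');
// $servers is ['kafka1:9092', 'kafka2:9092']
```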
Usage
Producer
To produce a message, use the KafkaProducer class:
use ThiagoBrauer\LaravelKafka\KafkaProducer;
...
$producer = new KafkaProducer();
$producer->setTopic('topic1')->send('message');
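The producer example passes a plain string to send(), so structured data would need to be serialized first. A minimal sketch, assuming JSON as the message format; the `encodeEvent` helper and the event fields are hypothetical, and the commented-out producer call only mirrors the usage shown in this section.

```php
<?php

/**
 * Hypothetical helper: serializes an event array to a JSON string
 * that can be passed to a string-based send() call.
 */
function encodeEvent(array $event): string
{
    // JSON_THROW_ON_ERROR raises a JsonException instead of returning false.
    return json_encode($event, JSON_THROW_ON_ERROR);
}

$payload = encodeEvent(['sku' => 'ABC-1', 'quantity' => 3]);

// With the package installed, the payload could then be sent like any other string:
// $producer = new \ThiagoBrauer\LaravelKafka\KafkaProducer();
// $producer->setTopic('topic1')->send($payload);
```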
Consumer
First, you need to create a class to handle the messages received. The class must extend ThiagoBrauer\LaravelKafka\Handlers\MessageHandler and implement the method handle, like the example below:
<?php

namespace App\Kafka\Handlers;

use ThiagoBrauer\LaravelKafka\Handlers\MessageHandler;
use RdKafka\Message;

class KafkaMessageHandler extends MessageHandler
{
    public function handle(Message $message)
    {
        // Dump the received message for inspection.
        var_dump($message);
    }
}
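A real handler will usually do more than dump the message. The sketch below shows one possible way to decode a JSON message body before acting on it. `decodePayload` is a hypothetical helper, and it assumes that producers send JSON. Inside handle, it could be called with $message->payload, the property that php-rdkafka's RdKafka\Message uses for the message body.

```php
<?php

/**
 * Hypothetical helper: decodes a JSON message body into an array,
 * returning null for empty or invalid payloads.
 */
function decodePayload(?string $payload): ?array
{
    if ($payload === null || $payload === '') {
        return null;
    }

    $decoded = json_decode($payload, true);

    // json_decode returns null on invalid JSON; scalar values are rejected as well.
    return is_array($decoded) ? $decoded : null;
}

// Inside a handler this might be used as: $data = decodePayload($message->payload);
$data = decodePayload('{"sku":"ABC-1","quantity":3}');
// $data is ['sku' => 'ABC-1', 'quantity' => 3]
```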
Then, add your class to the message_handlers section of your config/laravel_kafka.php file, organized by topic:
...
'message_handlers' => [
    'topic1' => [
        App\Kafka\Handlers\KafkaMessageHandler::class
    ],
    'topic2' => [
        App\Kafka\Handlers\KafkaMessageHandler::class
    ]
]
...
Then run php artisan config:cache so that the cached configuration picks up the change.
After that, you're ready to start the consumer:
php artisan kafka:consume
You can define the producer and consumer configuration using variables in your .env file. The consumer settings can also be passed as command options:
KAFKA_PRODUCER_SERVERS=kafka:9092
KAFKA_PRODUCER_DEBUG=true
KAFKA_PRODUCER_COMPRESSION=snappy
KAFKA_CONSUMER_SERVERS=kafka:9092
KAFKA_CONSUMER_TOPICS=inventories
KAFKA_CONSUMER_GROUP_ID=group1
KAFKA_CONSUMER_COMMIT_ASYNC=true
KAFKA_CONSUMER_TIMEOUT_MS=120000
KAFKA_CONSUMER_AUTO_OFFSET_RESET=earliest
KAFKA_CONSUMER_AUTO_COMMIT=true
php artisan kafka:consume --servers=kafka:9092
php artisan kafka:consume --topics=inventories
php artisan kafka:consume --group_id=group1
php artisan kafka:consume --timeout_ms=120000
php artisan kafka:consume --auto_offset_reset=earliest
php artisan kafka:consume --commit_async
php artisan kafka:consume --auto_commit
php artisan kafka:consume --once # consume only one message
Statistics
- Total downloads: 9.23k
- Monthly downloads: 0
- Daily downloads: 0
- Favorites: 1
- Clicks: 0
- Dependent projects: 0
- Recommendations: 0
Other information
- License: Unknown
- Last updated: 2021-05-27