beta/kafka.client
最新稳定版本:1.0.3
Composer 安装命令:
composer require beta/kafka.client
包简介
Kafka client
README 文档
README
Установка
composer require beta/kafka.client
Consumer пример работы
use RdKafka\Conf; use RdKafka\Consumer; use RdKafka\TopicConf; use KafkaClient\Client; $consumerConfig = new Conf(); $consumerConfig->set('group.id', 'myConsumerGroup'); // устанавливаем идентификатор группы потребителей сообщений $consumerConfig->set('enable.partition.eof', 'true'); $consumer = new Consumer($consumerConfig); $consumer->addBrokers("172.0.0.1,172.0.0.2"); // указываем адреса для подключения к брокерам сообщений $topicConfig = new TopicConf(); $topicConfig->set('auto.commit.interval.ms', 100); $topicConfig->set('offset.store.method', 'broker'); // механизм для хранения курсора $topicConfig->set('auto.offset.reset', 'earliest'); // курсор по-умолчанию $client = Client::initAsConsumer($consumer, $topicConfig); $message = $client->getMessage( 'my_topic', [ 'partition' => 0, 'offset' => 2, 'timeout' => 2000 ] ); // запрашиваем 1 сообщение из брокера $message->getData(); // payload сообщения $message->getOriginal(); // оригинальное сообщение RdKafka /** * Перебираем новые сообщения из брокера **/ foreach ($client->getMessageIterator('my_topic') as $message) { echo $message->getData(); }
Producer пример работы
use RdKafka\Conf; use RdKafka\Producer; use RdKafka\TopicConf; use KafkaClient\Client; $producerConfig = new Conf(); $producerConfig->set('log_level', (string) LOG_DEBUG); // режим ведения логов $producerConfig->set('debug', 'all'); $producer = new Producer($producerConfig); $producer->addBrokers("172.0.0.1,172.0.0.2"); // указываем адреса для подключения к брокерам сообщений $client = Client::initAsProducer($producer); $client->sendMessage( 'Test message', 'my_topic', [ 'partition' => 0, 'key' => 'example', 'headers' => ['One' => 'Two'] ] );
统计信息
- 总下载量: 78
- 月度下载量: 0
- 日度下载量: 0
- 收藏数: 0
- 点击次数: 0
- 依赖项目数: 1
- 推荐数: 0
其他信息
- 授权协议: MIT
- 更新时间: 2024-01-31