mshauneu/php-rdkafka-bundle

Latest stable version: 1.0.5

Composer install command:

composer require mshauneu/php-rdkafka-bundle

Package description

Integrates php-rdkafka with Symfony2|3

README

About

This Symfony bundle provides connectivity to the Kafka publish-subscribe messaging system. It is built on the php-rdkafka extension, a PHP binding for librdkafka.

Installation

Add the dependency to your composer.json (the version constraint below assumes the 1.x release line):

{
    "require": {
        "mshauneu/php-rdkafka-bundle": "^1.0"
    }
}

Enable the bundle in your application kernel:

// app/AppKernel.php
public function registerBundles() {
    $bundles = array(
        // ...
        new Mshauneu\RdKafkaBundle\MshauneuRdKafkaBundle(),
    );

    return $bundles;
}

Configuration

A simple configuration could look like this:

mshauneu_rd_kafka:
  producers: 
    test_producer: 
      brokers: 127.0.0.1:9092
      topic: test_topic   
  consumers:
    test_consumer:
      brokers: 127.0.0.1:9092
      topic: test_topic   
      properties: 
        group_id: "test_group_id"
      topic_properties: 
        offset_store_method: broker           
        auto_offset_reset: smallest
        auto_commit_interval_ms: 100
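
The keys under properties and topic_properties resemble librdkafka property names with the dots replaced by underscores (group_id for group.id, for example). The sketch below illustrates that assumed mapping. toLibrdkafkaProperty is a hypothetical helper, not part of the bundle, and the bundle's actual translation may differ.

```php
<?php
// Hypothetical helper, not part of the bundle: converts a bundle-style
// config key into the assumed librdkafka property name.
function toLibrdkafkaProperty(string $key): string
{
    return str_replace('_', '.', $key);
}

// Topic properties copied from the configuration example.
$topicProperties = [
    'offset_store_method' => 'broker',
    'auto_offset_reset' => 'smallest',
    'auto_commit_interval_ms' => 100,
];

$mapped = [];
foreach ($topicProperties as $key => $value) {
    // librdkafka settings are passed as strings, so cast the value.
    $mapped[toLibrdkafkaProperty($key)] = (string) $value;
}

print_r($mapped);
```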

Configuration properties are documented in the librdkafka configuration reference (CONFIGURATION.md in the librdkafka repository).

Usage

Publishing messages to a Kafka topic

From a Symfony controller:

$payload = 'test_message';
$topicProducer = $container->get('mshauneu_rd_kafka')->getProducer("test_producer");
$topicProducer->produceStart();
$topicProducer->produce($payload);
$topicProducer->produceStop();
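
The example above sends a plain string. To publish structured data, one option is to serialize it to JSON first. The following is a minimal sketch under that assumption: encodePayload is a hypothetical helper, not part of the bundle, and JSON_THROW_ON_ERROR requires PHP 7.3 or later.

```php
<?php
// Hypothetical helper: serialize structured data into a JSON string
// suitable for use as a message payload.
function encodePayload(array $data): string
{
    // Throw on encoding failure instead of silently returning false.
    return json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES);
}

$payload = encodePayload(['event' => 'user_registered', 'user_id' => 42]);
echo $payload, PHP_EOL;
```

The resulting string can be passed to produce() in place of the literal payload.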

By CLI:

./app/console kafka:producer --producer test_producer test_message 

Consuming messages from a Kafka topic

Implement ConsumerInterface:

class MessageHandler implements ConsumerInterface {
	public function consume($topic, $partition, $offset, $key, $payload) {
		echo "Received payload: " . $payload . PHP_EOL;
	}
}

Register it as a service:

test_message_handler:
    class: MessageHandler
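
A handler usually does more than echo the payload. The sketch below decodes JSON payloads and skips anything it cannot decode. To keep it runnable on its own, it declares a local stand-in for ConsumerInterface with the same consume() signature as the handler above; in a real application the bundle's interface would be imported instead. JsonMessageHandler and its $received property are hypothetical names.

```php
<?php
// Stand-in for the bundle's ConsumerInterface so this sketch runs on its own.
interface ConsumerInterface
{
    public function consume($topic, $partition, $offset, $key, $payload);
}

// Hypothetical handler that decodes JSON payloads.
class JsonMessageHandler implements ConsumerInterface
{
    /** @var array[] Decoded messages, kept for inspection. */
    public $received = [];

    public function consume($topic, $partition, $offset, $key, $payload)
    {
        $data = json_decode($payload, true);
        if (!is_array($data)) {
            // Skip payloads that are not a JSON object or array.
            echo "Skipping undecodable payload at offset {$offset}" . PHP_EOL;
            return;
        }
        $this->received[] = ['offset' => $offset, 'data' => $data];
    }
}

// Simulate two deliveries without a broker.
$handler = new JsonMessageHandler();
$handler->consume('test_topic', 0, 7, null, '{"user_id":42}');
$handler->consume('test_topic', 0, 8, null, 'not json');
```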

From a Symfony controller:

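// Fetch the handler registered above as the test_message_handler service.
$consumerImpl = $container->get('test_message_handler');
$topicConsumer = $container->get('mshauneu_rd_kafka')->getConsumer("test_consumer");
$topicConsumer->consumeStart(TopicCommunicator::OFFSET_STORED);
$topicConsumer->consume($consumerImpl);
$topicConsumer->consumeStop();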

By CLI:

./app/console kafka:consumer --consumer test_consumer --handler test_message_handler 

License

This project is under the MIT License. See the LICENSE file for the full license text.

Statistics

  • Total downloads: 26.67k
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 14
  • Clicks: 0
  • Dependents: 0
  • Suggesters: 0

GitHub information

  • Stars: 13
  • Watchers: 2
  • Forks: 9
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2016-07-21