定制 byrnedo/php-nats-streaming 二次开发

按需修改功能、优化性能、对接业务系统,提供一站式技术支持

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

byrnedo/php-nats-streaming

最新稳定版本:0.2.4

Composer 安装命令:

composer require byrnedo/php-nats-streaming

包简介

PHP Client for nats-streaming-server.

README 文档

README

Build

Master Develop
Build Status Build Status

Coverage

Master Develop
Coverage Status Coverage Status

Intro

A php client for Nats Streaming Server.

Uses phpnats under the hood and closesly resembles it's api.

Requirements

Installation

Get composer:

curl -O http://getcomposer.org/composer.phar && chmod +x composer.phar

Add php-nats-streaming as a dependency to your project

php composer.phar require 'byrnedo/php-nats-streaming:^0.2.4'

Usage

Publish

$options = new \NatsStreaming\ConnectionOptions();
$options->setClientID("test");
$options->setClusterID("test-cluster");
$c = new \NatsStreaming\Connection($options);

$c->connect();

// Publish
$r = $c->publish('special.subject', 'some serialized payload...');

// optionally wait for the ack
$gotAck = $r->wait();
if (!$gotAck) {
    ...
}

$c->close();

Note

If publishing many messages at a time, you might at first do this:

foreach ($req as $data){
    $r = $c->publish(...);
    $gotAck = $r->wait();
    if (!$gotAck) {
        ...
    }
}

It's actually much faster to do the following:

$rs = [];
foreach ($req as $data){
    $rs[] = $c->publish(...);
}

foreach ($rs as $r){
    $r->wait();
}

Subscribe

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setStartAt(\NatsStreamingProtos\StartPosition::First());

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$sub->wait(1);

// not explicitly needed
$sub->unsubscribe(); // or $sub->close();

$c->close();

If you want to subscribe to multiple channels you can use $c->wait():

...

$c->connect();

...

$sub = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);
$sub2 = $c->subscribe('special.subject', function ($message) {
    // implement
}, $subOptions);

$c->wait();

Queue Group Subscribe

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$sub = $c->queueSubscribe('specialer.subject', 'workgroup', function ($message) {
    // implement
}, $subOptions);


$sub->wait(1);

// not explicitly needed
$sub->close(); // or $sub->unsubscribe();

$c->close();

Manual Ack

$options = new \NatsStreaming\ConnectionOptions();
$c = new \NatsStreaming\Connection($options);

$c->connect();

$subOptions = new \NatsStreaming\SubscriptionOptions();
$subOptions->setManualAck(true);

$sub = $c->subscribe('special.subject', function ($message) {
    $message->ack();
}, $subOptions);

$sub->wait(1);

$c->close();

License

MIT, see LICENSE

统计信息

  • 总下载量: 301.51k
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 45
  • 点击次数: 1
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 45
  • Watchers: 4
  • Forks: 21
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2017-11-11