定制 lelikptz/async-consumer 二次开发

按需修改功能、优化性能、对接业务系统,提供一站式技术支持

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

lelikptz/async-consumer

最新稳定版本:v0.1.1

Composer 安装命令:

composer require lelikptz/async-consumer

包简介

Async consumer based on Fibers

README 文档

README

Асинхронный consumer реализованный с помощью Fiber. Для работы необходимо имплементировать TaskInterface. Реализация должна возвращать статус неблокирующей операции, которую можно распараллелить.

В Task.php пример имплементации TaskInterface где неблокирующей операцией является http запрос через guzzle.

Пример использования Http\Task:

Имплементируем фабрику для создания реквеста:

final class Factory implements RequestFactoryInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function create(): RequestInterface
    {
        $this->logger->info('Some logic for creating request');

        return new Request('GET', 'https://www.google.com');
    }
}

Имплементируем handler для обработки респонса и ошибки:

final class Handler implements ResponseHandlerInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function onSuccess(ResponseInterface $response): void
    {
        $this->logger->info(
            sprintf(
                "Response body: %s; response code: %s",
                $response->getBody()->getContents(),
                $response->getStatusCode()
            )
        );
        $this->logger->info('Some logic with response');
        $this->logger->info('Finish');
    }

    public function onException(RequestException $exception): void
    {
        $this->logger->error($exception->getMessage());
    }
}

Провайдер задач собирает необходимую таску и возвращает её в консьюмер по мере готовности:

final class Provider implements ProviderInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function get(): array
    {
        return [
            new Task(new Factory($this->logger), new Handler($this->logger)),
        ];
    }
}

Собираем консьюмер и запускаем как демон например через супервизор.

$pollTimeoutInMicroseconds - дэлэй между опросами провайдера

$logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG));
(new AsyncConsumer(new Provider($logger), new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();

Пример использования rabbitmq как провайдера задач:

Для использования AMPQProvider.php имплементируем TransformerInterface для создания TaskInterface из сообщения AMQPMessage:

final class Transformer implements TransformerInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function transform(AMQPMessage $message): TaskInterface
    {
        return new Task(new Factory($this->logger), new Handler($this->logger));
    }
}

Собираем и запускаем:

$maxBatchSize - максимальный размер батча, который будем собирать из rabbitmq и по факту количество распараллеленных задач

$maxBatchCollectTimeInSeconds - время, которое ждём пока батч собирается из rabbitmq, если оно вышло запускам обработку с тем, что есть

$pollTimeoutInMicroseconds - дэлэй между опросами провайдера

$connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$provider = new AMPQProvider($connection, 'provider', new Transformer($logger));
$logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG));
$batch = new BatchProvider($provider, 10, 5, $pollTimeoutInMicroseconds);

(new AsyncConsumer($batch, new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();

统计信息

  • 总下载量: 4
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 1
  • 点击次数: 0
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 1
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2023-10-16