cesurapp/swoole-bundle 问题修复 & 功能扩展

解决BUG、新增功能、兼容多环境部署,快速响应你的开发需求

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

cesurapp/swoole-bundle

最新稳定版本:1.2.8

Composer 安装命令:

composer require cesurapp/swoole-bundle

包简介

Symfony Swoole Bundle

README 文档

README

App Tester Software License

Built-in Swoole http server, background jobs (Task), scheduled task (Cron) worker are available. Failed jobs are saved in the database to be retried. Each server has built-in background task worker. Scheduled tasks run simultaneously on all servers. It is not possible for tasks to run at the same time as locking is used.

Install

Required Symfony 8

composer req cesurapp/swoole-bundle

Edit: public/index.php

...
require_once dirname(__DIR__).'/vendor/cesurapp/swoole-bundle/src/Runtime/entrypoint.php';
require_once dirname(__DIR__).'/vendor/autoload_runtime.php';
...

Configuration:

# config/packages/swoole.yaml
swoole:
  entrypoint: public/index.php
  watch_dir: /config,/src,/templates
  watch_extension: '*.php,*.yaml,*.yml,*.twig'
  replace_http_client: true # Replace Symfony HTTP Client to Swoole Client 
  cron_worker: true # Enable Cron Worker Service
  task_worker: true # Enable Task Worker Service
  task_sync_mode: false # Enable SYNC Mode -> Default false
  failed_task_retry: '@EveryMinute10'
  failed_task_attempt: 2 # Failed Task Retry Count

Server Environment: .env

# Worker Configuration
#SERVER_WORKER_CRON=true # Run Cron Worker -> Default = 1
#SERVER_WORKER_TASK=true # Run Task Worker -> Default = 1
#SERVER_WORKER_PROCESS=true # Run Process Worker -> Default = 1

# HTTP Server Configuration
SERVER_HTTP_HOST=127.0.0.1 # Default = 0.0.0.0
SERVER_HTTP_PORT=9090 # Default = 80
#SERVER_HTTP_MODE=2 # SWOOLE_PROCESS -> Default = 2
#SERVER_HTTP_SOCK_TYPE=1 # SWOOLE_SOCK_TCP -> Default = 1
#SERVER_HTTP_SOCKET=false # Websocket Socket -> Default = false

# HTTP Server Settings
#SERVER_HTTP_SETTINGS_WORKER_NUM=2 # Default = CPU Count
#SERVER_HTTP_SETTINGS_TASK_WORKER_NUM=1 # Default = CPU Count / 2
#SERVER_HTTP_SETTINGS_ENABLE_STATIC_HANDLER=false # Default = false
#SERVER_HTTP_SETTINGS_LOG_LEVEL=4 # Details Openswoole\Constant LOG_LEVEL -> Default = 4 (SWOOLE_LOG_WARNING)
#SERVER_HTTP_SETTINGS_MAX_WAIT_TIME=60 # Default = 60
#SERVER_HTTP_SETTINGS_TASK_ENABLE_COROUTINE=true # Default = true
#SERVER_HTTP_SETTINGS_TASK_MAX_REQUEST=0 # Default = 0
#SERVER_HTTP_SETTINGS_PACKAGE_MAX_LENGTH=15728640 # 15MB -> Default = 15728640
#SERVER_HTTP_SETTINGS_HTTP_COMPRESSION=true # Default = true
#SERVER_HTTP_SETTINGS_MAX_REQUEST=10000 # Default = 10000
#SERVER_HTTP_SETTINGS_HEARTBEAT_CHECK_INTERVAL=60 # Default = 60
#SERVER_HTTP_SETTINGS_HEARTBEAT_IDLE_TIME=180 # Default = 180

# TCP Server Configuration
#SERVER_TCP_PORT=9502 # Default = 9502

Server Commands

# Cron Commands
bin/console cron:list         # List cron jobs
bin/console cron:run AcmeCron # Run cron process one time, without locking.

# Server Commands
bin/console server:start  # Start http,cron,queue server
bin/console server:stop   # Stop http,cron,queue server
bin/console server:status # Status http,cron,queue server
bin/console server:watch  # Start http,cron,queue server for development mode (file watcher enabled)

# Task|Job Commands
bin/console task:list           # List registered tasks
bin/console task:failed:clear   # Clear all failed task
bin/console task:failed:retry   # Forced send all failed tasks to swoole task worker
bin/console task:failed:view    # Lists failed tasks

Create Cron Job

You can use cron expression for scheduled tasks, or you can use predefined expressions.

/**
 * Predefined Scheduling
 *
 * '@yearly'    => '0 0 1 1 *',
 * '@annually'  => '0 0 1 1 *',
 * '@monthly'   => '0 0 1 * *',
 * '@weekly'    => '0 0 * * 0',
 * '@daily'     => '0 0 * * *',
 * '@hourly'    => '0 * * * *',
 * '@EveryMinute'    => 'w* * * * *',
 * "@EveryMinute5'  => '*\/5 * * * *',
 * '@EveryMinute10'  => '*\/10 * * * *',
 * '@EveryMinute15'  => '*\/15 * * * *',
 * '@EveryMinute30'  => '*\/30 * * * *',```
 */
class ExampleJob extends \Cesurapp\SwooleBundle\Cron\AbstractCronJob {
    /**
     * @see AbstractCronJob
     */
    public string $TIME = '@EveryMinute10';

    /**
     * Cron is Enable|Disable.
     */
    public bool $ENABLE = true;
    
    /**
     * Cron Context 
     */
    public function __invoke(): void {
    
    }
}

Create Task (Background Job or Queue)

Data passed to jobs must be of type string, int, bool, array, objects cannot be serialized.

Create:

class ExampleTask implements \Cesurapp\SwooleBundle\Task\TaskInterface {
    public function __invoke(object|string $data = null): void {
        var_dump(
            $data['name'],
            $data['invoke']
        );
    }
}

Handle Task:

public function hello(\Cesurapp\SwooleBundle\Task\TaskHandler $taskHandler) {
    $taskHandler->dispatch(ExampleTask::class, [
        'name' => 'Test',
        'invoke' => 'Data'
    ]);
}

Create Process Worker

Process Worker allows you to create continuously running tasks in a separate process when the server starts. It's ideal for Redis LISTEN, Postgres LISTEN, or similar continuous listening commands.

Features:

  • Each process runs as a separate Swoole Process
  • Automatic restart support when the process completes
  • Configurable restart delay
  • Enable/Disable support

Configuration:

# config/packages/swoole.yaml
swoole:
    process_worker: true  # Default: true

Or via environment variable:

SERVER_WORKER_PROCESS=1  # Enable
SERVER_WORKER_PROCESS=0  # Disable

Create Process Job:

Use ProcessInterface or extend AbstractProcessJob:

<?php

namespace App\Process;

use Cesurapp\SwooleBundle\Process\AbstractProcessJob;

class RedisListenerProcess extends AbstractProcessJob
{
    // Is process active?
    public bool $ENABLE = true;
    
    // Restart when process completes
    public bool $RESTART = true;
    
    // Wait time before restart (seconds)
    public int $RESTART_DELAY = 5;

    public function __construct(
        private readonly RedisClient $redis,
        private readonly LoggerInterface $logger
    ) {
    }

    public function __invoke(): void
    {
        $this->logger->info('Redis listener started');
        
        // Redis SUBSCRIBE command
        $this->redis->subscribe(['channel1', 'channel2'], function ($redis, $channel, $message) {
            $this->logger->info("Received message from {$channel}: {$message}");
            // Process here
        });
    }
}

Postgres LISTEN Example:

<?php

namespace App\Process;

use Cesurapp\SwooleBundle\Process\AbstractProcessJob;
use Doctrine\DBAL\Connection;

class PostgresListenerProcess extends AbstractProcessJob
{
    public bool $ENABLE = true;
    public bool $RESTART = true;
    public int $RESTART_DELAY = 3;

    public function __construct(
        private readonly Connection $connection,
        private readonly LoggerInterface $logger
    ) {
    }

    public function __invoke(): void
    {
        $this->logger->info('Postgres listener started');
        
        // LISTEN command
        $this->connection->executeStatement('LISTEN my_channel');
        
        while (true) {
            // Wait for notification
            $notification = pg_get_notify($this->connection->getNativeConnection());
            
            if ($notification) {
                $this->logger->info('Received notification', [
                    'channel' => $notification['message'],
                    'payload' => $notification['payload']
                ]);
                
                // Process here
            }
            
            usleep(100000); // Wait 100ms
        }
    }
}

One-Time Process (Without Restart):

<?php

namespace App\Process;

use Cesurapp\SwooleBundle\Process\AbstractProcessJob;

class OneTimeProcess extends AbstractProcessJob
{
    public bool $ENABLE = true;
    public bool $RESTART = false; // Restart disabled

    public function __invoke(): void
    {
        // One-time operation
        $this->doSomething();
        
        // Process terminates when completed
    }
}

Notes:

  • Each process runs as a separate Swoole Process, isolated from each other
  • Processes start automatically when the server starts
  • When RESTART=true, the process restarts after RESTART_DELAY seconds upon completion
  • Processes must implement ProcessInterface (or extend AbstractProcessJob)
  • Automatically registered in Symfony DI container with lazy loading support

WebSocket Server

The bundle provides built-in WebSocket server support powered by Swoole. You can enable WebSocket functionality alongside the HTTP server to handle real-time bidirectional communication.

Enable WebSocket:

Configuration:

# config/packages/swoole.yaml
swoole:
    websocket_handler: App\WebSocket\MyWebSocketHandler

Or via environment variable:

SERVER_HTTP_SOCKET=true  # Enable WebSocket support

Create WebSocket Handler:

Your WebSocket handler must implement the initServerEvents method to register Swoole WebSocket events:

<?php

namespace App\WebSocket;

use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;

class MyWebSocketHandler
{
    public function initServerEvents(Server $server): void
    {
        // WebSocket connection opened
        $server->on('open', function (Server $server, $request) {
            echo "Connection opened: {$request->fd}\n";
            $server->push($request->fd, json_encode([
                'type' => 'connected',
                'message' => 'Welcome to WebSocket server'
            ]));
        });

        // WebSocket message received
        $server->on('message', function (Server $server, Frame $frame) {
            echo "Received message from {$frame->fd}: {$frame->data}\n";
            
            // Echo back to sender
            $server->push($frame->fd, "Server received: {$frame->data}");
            
            // Broadcast to all connections
            foreach ($server->connections as $fd) {
                if ($server->isEstablished($fd)) {
                    $server->push($fd, "Broadcast: {$frame->data}");
                }
            }
        });

        // WebSocket connection closed
        $server->on('close', function (Server $server, int $fd) {
            echo "Connection closed: {$fd}\n";
        });
    }
}

Advanced WebSocket Handler with Dependency Injection:

<?php

namespace App\WebSocket;

use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Psr\Log\LoggerInterface;
use App\Service\ChatService;

class ChatWebSocketHandler
{
    public function __construct(
        private readonly LoggerInterface $logger,
        private readonly ChatService $chatService
    ) {
    }

    public function initServerEvents(Server $server): void
    {
        $server->on('open', function (Server $server, $request) {
            $this->logger->info('WebSocket connection opened', ['fd' => $request->fd]);
            
            // Authenticate user from request headers/cookies
            $token = $request->header['authorization'] ?? null;
            $user = $this->chatService->authenticateToken($token);
            
            if ($user) {
                $server->push($request->fd, json_encode([
                    'type' => 'auth_success',
                    'user' => $user
                ]));
            } else {
                $server->disconnect($request->fd);
            }
        });

        $server->on('message', function (Server $server, Frame $frame) {
            $this->logger->info('WebSocket message received', [
                'fd' => $frame->fd,
                'data' => $frame->data
            ]);
            
            $data = json_decode($frame->data, true);
            
            match ($data['type'] ?? null) {
                'chat_message' => $this->handleChatMessage($server, $frame->fd, $data),
                'typing' => $this->handleTyping($server, $frame->fd, $data),
                'ping' => $server->push($frame->fd, json_encode(['type' => 'pong'])),
                default => $this->logger->warning('Unknown message type', ['data' => $data])
            };
        });

        $server->on('close', function (Server $server, int $fd) {
            $this->logger->info('WebSocket connection closed', ['fd' => $fd]);
            $this->chatService->handleDisconnect($fd);
        });
    }

    private function handleChatMessage(Server $server, int $fd, array $data): void
    {
        $message = $this->chatService->saveMessage($fd, $data['message']);
        
        // Broadcast to room members
        foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) {
            if ($server->isEstablished($memberId)) {
                $server->push($memberId, json_encode([
                    'type' => 'new_message',
                    'message' => $message
                ]));
            }
        }
    }

    private function handleTyping(Server $server, int $fd, array $data): void
    {
        // Notify room members about typing status
        foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) {
            if ($memberId !== $fd && $server->isEstablished($memberId)) {
                $server->push($memberId, json_encode([
                    'type' => 'user_typing',
                    'user_id' => $data['user_id']
                ]));
            }
        }
    }
}

Client-Side JavaScript Example:

const ws = new WebSocket('ws://127.0.0.1:9090');

ws.onopen = function(event) {
    console.log('Connected to WebSocket server');
    ws.send(JSON.stringify({
        type: 'chat_message',
        room_id: 1,
        message: 'Hello, World!'
    }));
};

ws.onmessage = function(event) {
    const data = JSON.parse(event.data);
    console.log('Received:', data);
    
    switch(data.type) {
        case 'new_message':
            displayMessage(data.message);
            break;
        case 'user_typing':
            showTypingIndicator(data.user_id);
            break;
    }
};

ws.onclose = function(event) {
    console.log('Disconnected from WebSocket server');
};

ws.onerror = function(error) {
    console.error('WebSocket error:', error);
};

Available Swoole WebSocket Server Methods:

// Send message to specific connection
$server->push(int $fd, string $data, int $opcode = WEBSOCKET_OPCODE_TEXT): bool

// Check if connection is valid WebSocket connection
$server->isEstablished(int $fd): bool

// Disconnect connection
$server->disconnect(int $fd, int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL, string $reason = ''): bool

// Check if connection exists
$server->exist(int $fd): bool

// Get all connection IDs
$server->connections: Iterator

// Get connection info
$server->getClientInfo(int $fd): array|false

Configuration Options:

# Enable WebSocket
SERVER_HTTP_SOCKET=true

# WebSocket heartbeat (keep-alive)
SERVER_HTTP_SETTINGS_HEARTBEAT_CHECK_INTERVAL=60  # Check interval in seconds
SERVER_HTTP_SETTINGS_HEARTBEAT_IDLE_TIME=180      # Idle timeout in seconds

# Message size limit
SERVER_HTTP_SETTINGS_PACKAGE_MAX_LENGTH=15728640  # 15MB default

Notes:

  • WebSocket server runs on the same port as HTTP server
  • HTTP requests and WebSocket connections are handled simultaneously
  • WebSocket handler is initialized once when server starts
  • Each WebSocket connection has a unique file descriptor (fd)
  • Use $server->isEstablished($fd) before pushing data to avoid errors
  • WebSocket handler has access to Symfony DI container services
  • Heartbeat mechanism automatically closes idle connections

统计信息

  • 总下载量: 320
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 3
  • 点击次数: 1
  • 依赖项目数: 0
  • 推荐数: 0

GitHub 信息

  • Stars: 3
  • Watchers: 2
  • Forks: 2
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2023-12-13