cesurapp/swoole-bundle
最新稳定版本:1.2.8
Composer 安装命令:
composer require cesurapp/swoole-bundle
包简介
Symfony Swoole Bundle
README 文档
README
Built-in Swoole http server, background jobs (Task), scheduled task (Cron) worker are available. Failed jobs are saved in the database to be retried. Each server has built-in background task worker. Scheduled tasks run simultaneously on all servers. It is not possible for tasks to run at the same time as locking is used.
Install
Required Symfony 8
composer req cesurapp/swoole-bundle
Edit: public/index.php
... require_once dirname(__DIR__).'/vendor/cesurapp/swoole-bundle/src/Runtime/entrypoint.php'; require_once dirname(__DIR__).'/vendor/autoload_runtime.php'; ...
Configuration:
# config/packages/swoole.yaml swoole: entrypoint: public/index.php watch_dir: /config,/src,/templates watch_extension: '*.php,*.yaml,*.yml,*.twig' replace_http_client: true # Replace Symfony HTTP Client to Swoole Client cron_worker: true # Enable Cron Worker Service task_worker: true # Enable Task Worker Service task_sync_mode: false # Enable SYNC Mode -> Default false failed_task_retry: '@EveryMinute10' failed_task_attempt: 2 # Failed Task Retry Count
Server Environment: .env
# Worker Configuration #SERVER_WORKER_CRON=true # Run Cron Worker -> Default = 1 #SERVER_WORKER_TASK=true # Run Task Worker -> Default = 1 #SERVER_WORKER_PROCESS=true # Run Process Worker -> Default = 1 # HTTP Server Configuration SERVER_HTTP_HOST=127.0.0.1 # Default = 0.0.0.0 SERVER_HTTP_PORT=9090 # Default = 80 #SERVER_HTTP_MODE=2 # SWOOLE_PROCESS -> Default = 2 #SERVER_HTTP_SOCK_TYPE=1 # SWOOLE_SOCK_TCP -> Default = 1 #SERVER_HTTP_SOCKET=false # Websocket Socket -> Default = false # HTTP Server Settings #SERVER_HTTP_SETTINGS_WORKER_NUM=2 # Default = CPU Count #SERVER_HTTP_SETTINGS_TASK_WORKER_NUM=1 # Default = CPU Count / 2 #SERVER_HTTP_SETTINGS_ENABLE_STATIC_HANDLER=false # Default = false #SERVER_HTTP_SETTINGS_LOG_LEVEL=4 # Details Openswoole\Constant LOG_LEVEL -> Default = 4 (SWOOLE_LOG_WARNING) #SERVER_HTTP_SETTINGS_MAX_WAIT_TIME=60 # Default = 60 #SERVER_HTTP_SETTINGS_TASK_ENABLE_COROUTINE=true # Default = true #SERVER_HTTP_SETTINGS_TASK_MAX_REQUEST=0 # Default = 0 #SERVER_HTTP_SETTINGS_PACKAGE_MAX_LENGTH=15728640 # 15MB -> Default = 15728640 #SERVER_HTTP_SETTINGS_HTTP_COMPRESSION=true # Default = true #SERVER_HTTP_SETTINGS_MAX_REQUEST=10000 # Default = 10000 #SERVER_HTTP_SETTINGS_HEARTBEAT_CHECK_INTERVAL=60 # Default = 60 #SERVER_HTTP_SETTINGS_HEARTBEAT_IDLE_TIME=180 # Default = 180 # TCP Server Configuration #SERVER_TCP_PORT=9502 # Default = 9502
Server Commands
# Cron Commands bin/console cron:list # List cron jobs bin/console cron:run AcmeCron # Run cron process one time, without locking. # Server Commands bin/console server:start # Start http,cron,queue server bin/console server:stop # Stop http,cron,queue server bin/console server:status # Status http,cron,queue server bin/console server:watch # Start http,cron,queue server for development mode (file watcher enabled) # Task|Job Commands bin/console task:list # List registered tasks bin/console task:failed:clear # Clear all failed task bin/console task:failed:retry # Forced send all failed tasks to swoole task worker bin/console task:failed:view # Lists failed tasks
Create Cron Job
You can use cron expression for scheduled tasks, or you can use predefined expressions.
/** * Predefined Scheduling * * '@yearly' => '0 0 1 1 *', * '@annually' => '0 0 1 1 *', * '@monthly' => '0 0 1 * *', * '@weekly' => '0 0 * * 0', * '@daily' => '0 0 * * *', * '@hourly' => '0 * * * *', * '@EveryMinute' => 'w* * * * *', * "@EveryMinute5' => '*\/5 * * * *', * '@EveryMinute10' => '*\/10 * * * *', * '@EveryMinute15' => '*\/15 * * * *', * '@EveryMinute30' => '*\/30 * * * *',``` */ class ExampleJob extends \Cesurapp\SwooleBundle\Cron\AbstractCronJob { /** * @see AbstractCronJob */ public string $TIME = '@EveryMinute10'; /** * Cron is Enable|Disable. */ public bool $ENABLE = true; /** * Cron Context */ public function __invoke(): void { } }
Create Task (Background Job or Queue)
Data passed to jobs must be of type string, int, bool, array, objects cannot be serialized.
Create:
class ExampleTask implements \Cesurapp\SwooleBundle\Task\TaskInterface { public function __invoke(object|string $data = null): void { var_dump( $data['name'], $data['invoke'] ); } }
Handle Task:
public function hello(\Cesurapp\SwooleBundle\Task\TaskHandler $taskHandler) { $taskHandler->dispatch(ExampleTask::class, [ 'name' => 'Test', 'invoke' => 'Data' ]); }
Create Process Worker
Process Worker allows you to create continuously running tasks in a separate process when the server starts. It's ideal for Redis LISTEN, Postgres LISTEN, or similar continuous listening commands.
Features:
- Each process runs as a separate Swoole Process
- Automatic restart support when the process completes
- Configurable restart delay
- Enable/Disable support
Configuration:
# config/packages/swoole.yaml swoole: process_worker: true # Default: true
Or via environment variable:
SERVER_WORKER_PROCESS=1 # Enable SERVER_WORKER_PROCESS=0 # Disable
Create Process Job:
Use ProcessInterface or extend AbstractProcessJob:
<?php namespace App\Process; use Cesurapp\SwooleBundle\Process\AbstractProcessJob; class RedisListenerProcess extends AbstractProcessJob { // Is process active? public bool $ENABLE = true; // Restart when process completes public bool $RESTART = true; // Wait time before restart (seconds) public int $RESTART_DELAY = 5; public function __construct( private readonly RedisClient $redis, private readonly LoggerInterface $logger ) { } public function __invoke(): void { $this->logger->info('Redis listener started'); // Redis SUBSCRIBE command $this->redis->subscribe(['channel1', 'channel2'], function ($redis, $channel, $message) { $this->logger->info("Received message from {$channel}: {$message}"); // Process here }); } }
Postgres LISTEN Example:
<?php namespace App\Process; use Cesurapp\SwooleBundle\Process\AbstractProcessJob; use Doctrine\DBAL\Connection; class PostgresListenerProcess extends AbstractProcessJob { public bool $ENABLE = true; public bool $RESTART = true; public int $RESTART_DELAY = 3; public function __construct( private readonly Connection $connection, private readonly LoggerInterface $logger ) { } public function __invoke(): void { $this->logger->info('Postgres listener started'); // LISTEN command $this->connection->executeStatement('LISTEN my_channel'); while (true) { // Wait for notification $notification = pg_get_notify($this->connection->getNativeConnection()); if ($notification) { $this->logger->info('Received notification', [ 'channel' => $notification['message'], 'payload' => $notification['payload'] ]); // Process here } usleep(100000); // Wait 100ms } } }
One-Time Process (Without Restart):
<?php namespace App\Process; use Cesurapp\SwooleBundle\Process\AbstractProcessJob; class OneTimeProcess extends AbstractProcessJob { public bool $ENABLE = true; public bool $RESTART = false; // Restart disabled public function __invoke(): void { // One-time operation $this->doSomething(); // Process terminates when completed } }
Notes:
- Each process runs as a separate Swoole Process, isolated from each other
- Processes start automatically when the server starts
- When
RESTART=true, the process restarts afterRESTART_DELAYseconds upon completion - Processes must implement
ProcessInterface(or extendAbstractProcessJob) - Automatically registered in Symfony DI container with lazy loading support
WebSocket Server
The bundle provides built-in WebSocket server support powered by Swoole. You can enable WebSocket functionality alongside the HTTP server to handle real-time bidirectional communication.
Enable WebSocket:
Configuration:
# config/packages/swoole.yaml swoole: websocket_handler: App\WebSocket\MyWebSocketHandler
Or via environment variable:
SERVER_HTTP_SOCKET=true # Enable WebSocket support
Create WebSocket Handler:
Your WebSocket handler must implement the initServerEvents method to register Swoole WebSocket events:
<?php namespace App\WebSocket; use Swoole\WebSocket\Frame; use Swoole\WebSocket\Server; class MyWebSocketHandler { public function initServerEvents(Server $server): void { // WebSocket connection opened $server->on('open', function (Server $server, $request) { echo "Connection opened: {$request->fd}\n"; $server->push($request->fd, json_encode([ 'type' => 'connected', 'message' => 'Welcome to WebSocket server' ])); }); // WebSocket message received $server->on('message', function (Server $server, Frame $frame) { echo "Received message from {$frame->fd}: {$frame->data}\n"; // Echo back to sender $server->push($frame->fd, "Server received: {$frame->data}"); // Broadcast to all connections foreach ($server->connections as $fd) { if ($server->isEstablished($fd)) { $server->push($fd, "Broadcast: {$frame->data}"); } } }); // WebSocket connection closed $server->on('close', function (Server $server, int $fd) { echo "Connection closed: {$fd}\n"; }); } }
Advanced WebSocket Handler with Dependency Injection:
<?php namespace App\WebSocket; use Swoole\WebSocket\Frame; use Swoole\WebSocket\Server; use Psr\Log\LoggerInterface; use App\Service\ChatService; class ChatWebSocketHandler { public function __construct( private readonly LoggerInterface $logger, private readonly ChatService $chatService ) { } public function initServerEvents(Server $server): void { $server->on('open', function (Server $server, $request) { $this->logger->info('WebSocket connection opened', ['fd' => $request->fd]); // Authenticate user from request headers/cookies $token = $request->header['authorization'] ?? null; $user = $this->chatService->authenticateToken($token); if ($user) { $server->push($request->fd, json_encode([ 'type' => 'auth_success', 'user' => $user ])); } else { $server->disconnect($request->fd); } }); $server->on('message', function (Server $server, Frame $frame) { $this->logger->info('WebSocket message received', [ 'fd' => $frame->fd, 'data' => $frame->data ]); $data = json_decode($frame->data, true); match ($data['type'] ?? null) { 'chat_message' => $this->handleChatMessage($server, $frame->fd, $data), 'typing' => $this->handleTyping($server, $frame->fd, $data), 'ping' => $server->push($frame->fd, json_encode(['type' => 'pong'])), default => $this->logger->warning('Unknown message type', ['data' => $data]) }; }); $server->on('close', function (Server $server, int $fd) { $this->logger->info('WebSocket connection closed', ['fd' => $fd]); $this->chatService->handleDisconnect($fd); }); } private function handleChatMessage(Server $server, int $fd, array $data): void { $message = $this->chatService->saveMessage($fd, $data['message']); // Broadcast to room members foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) { if ($server->isEstablished($memberId)) { $server->push($memberId, json_encode([ 'type' => 'new_message', 'message' => $message ])); } } } private function handleTyping(Server $server, int $fd, array $data): void { // Notify room members about typing status foreach ($this->chatService->getRoomMembers($data['room_id']) as $memberId) { if ($memberId !== $fd && $server->isEstablished($memberId)) { $server->push($memberId, json_encode([ 'type' => 'user_typing', 'user_id' => $data['user_id'] ])); } } } }
Client-Side JavaScript Example:
const ws = new WebSocket('ws://127.0.0.1:9090'); ws.onopen = function(event) { console.log('Connected to WebSocket server'); ws.send(JSON.stringify({ type: 'chat_message', room_id: 1, message: 'Hello, World!' })); }; ws.onmessage = function(event) { const data = JSON.parse(event.data); console.log('Received:', data); switch(data.type) { case 'new_message': displayMessage(data.message); break; case 'user_typing': showTypingIndicator(data.user_id); break; } }; ws.onclose = function(event) { console.log('Disconnected from WebSocket server'); }; ws.onerror = function(error) { console.error('WebSocket error:', error); };
Available Swoole WebSocket Server Methods:
// Send message to specific connection $server->push(int $fd, string $data, int $opcode = WEBSOCKET_OPCODE_TEXT): bool // Check if connection is valid WebSocket connection $server->isEstablished(int $fd): bool // Disconnect connection $server->disconnect(int $fd, int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL, string $reason = ''): bool // Check if connection exists $server->exist(int $fd): bool // Get all connection IDs $server->connections: Iterator // Get connection info $server->getClientInfo(int $fd): array|false
Configuration Options:
# Enable WebSocket SERVER_HTTP_SOCKET=true # WebSocket heartbeat (keep-alive) SERVER_HTTP_SETTINGS_HEARTBEAT_CHECK_INTERVAL=60 # Check interval in seconds SERVER_HTTP_SETTINGS_HEARTBEAT_IDLE_TIME=180 # Idle timeout in seconds # Message size limit SERVER_HTTP_SETTINGS_PACKAGE_MAX_LENGTH=15728640 # 15MB default
Notes:
- WebSocket server runs on the same port as HTTP server
- HTTP requests and WebSocket connections are handled simultaneously
- WebSocket handler is initialized once when server starts
- Each WebSocket connection has a unique file descriptor (fd)
- Use
$server->isEstablished($fd)before pushing data to avoid errors - WebSocket handler has access to Symfony DI container services
- Heartbeat mechanism automatically closes idle connections
统计信息
- 总下载量: 320
- 月度下载量: 0
- 日度下载量: 0
- 收藏数: 3
- 点击次数: 1
- 依赖项目数: 0
- 推荐数: 0
其他信息
- 授权协议: MIT
- 更新时间: 2023-12-13