Ratchet x Laravel x Redis-Async

Ratchet x Laravel

因為最近需要做 Realtime 的系統,一般來說都會使用 node.js 這類 non-blocking io 的語言來做,不過最近看到 PHP 也有 ReactPHP (Event-driven, non-blocking I/O with PHP.) 可以來處理 Realtime 的情況。
要用 PHP 透過 Websocket 建立 Realtime 的系統,網路有現成的 Ratchet 可以直接使用,但是 Ratchet 只有處理 Websocket 的部份,所以需要 Laravel 這樣完整的 Framework 來補足。
大概上來說,需要實作的功能有
1. 當使用者在 client 操作時,能透過 websocket 即時讓 Server 執行相對應的動作,所以要能讓 Ratchet 能使用 Laravel 中的功能,方便處理一些 DB 或 Service 的事情
2. 讓 Server 在執行 cronjob 時,能即時把訊息更新給 Client ,所以要讓 Laravel 能主動透過 Ratchet 傳訊息給 Client

Sample Code

先安裝 Laravel

這部份就不多說了。
composer create-project laravel/laravel --prefer-dist laratchet

在 Laravel 中裝上 Ratchet

cd larachet #切到剛剛建好的目錄
composer require cboden/ratchet

用 Laravel 的 Artisan 執行 Ratchet

用 artisan 執行 ratchet 是為了讓 ratchet 可以直接使用 laravel 的套件,就像平常在開發 laravel 的案件一樣。

建立 Ratchet WampService

Websocket 服務,當有人傳送訊息給 server 時,server 會廣播給所有目前有連線的 Client。
/laratchet/app/Websocket/Chat.php
<?php
namespace App\Websocket;
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use Illuminate\Support\Facades\Redis;
use Exception;
class Chat implements MessageComponentInterface {
    protected $clients;

    public function __construct() {
        $this->clients = new \SplObjectStorage;
    }

    public function onOpen(ConnectionInterface $conn) {
        try {
            // Store the new connection to send messages to later
            $this->clients->attach($conn);
            $conn->send('welcome');
            Log::v('S', $conn, 'welcome');

            $request = (array)$conn->WebSocket->request;
            $request = (array)array_get($request, "\0*\0headers");
            $request = (array)array_get($request, "\0*\0headers.user-agent");
            $request = array_get($request, "\0*\0values.0");
            $request = (empty($request)) ? 'unknown' : $request;
            Log::v('R', $conn, "new client({$conn->resourceId}) on {$conn->remoteAddress}({$request})");
        } catch (Exception $e) {
            Log::e($e);
        }
    }

    public function onMessage(ConnectionInterface $from, $msg) {
        try {
            Log::v('R', $from, "receiving message \"{$msg}\"");
            $numRecv = count($this->clients) - 1;
            foreach ($this->clients as $client) {
                if ($from !== $client) {
                    // The sender is not the receiver, send to each client connected
                    $client->send($msg);
                    Log::v('S', $client, "sending message \"{$msg}\"");
                }
            }
        } catch (Exception $e) {
            Log::e($e);
        }
    }

    public function onClose(ConnectionInterface $conn) {
        try {
            // The connection is closed, remove it, as we can no longer send it messages
            $this->clients->detach($conn);

            Log::v('R', $conn, 'close', "Client({$conn->resourceId}) has disconnected");
        } catch (Exception $e) {
            Log::e($e);
        }
    }

    public function onError(ConnectionInterface $conn, \Exception $e) {
        Log::e($e);
        $conn->close();
    }
}

建立執行 Ratchet 的 Command

可以用 php artisan websocket:start 來啓動剛剛寫好的 websocket 服務
/laratchet/app/console/command/WsChatCommand.php
<?php namespace App\Console\Commands;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;

use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;

class WsChatCommand extends Command {
/**
         * The console command name.
         *
         * @var string
         */
        protected $name = 'websocket:start';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Start Websocket Service.';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function fire()
    {
        $port = intval($this->option('port'));
           $loop   = \React\EventLoop\Factory::create();

        $webSock = new \React\Socket\Server($loop);
        $webSock->listen($port, '0.0.0.0');
        $webServer = new \Ratchet\Server\IoServer(
            new \Ratchet\Http\HttpServer(
                new \Ratchet\WebSocket\WsServer(
                       new \App\Websocket\Chat()
                )
            ),
            $webSock
        );

        \App\Websocket\Log::v(' ', $loop, "Starting Websocket Service on port " . $port);
        $loop->run();
    }

    /**
     * Get the console command arguments.
     *
     * @return array
     */
    protected function getArguments()
    {
        return [
        ];
    }

    /**
     * Get the console command options.
     *
     * @return array
     */
    protected function getOptions()
    {
        return [
            ['port', 'p', InputOption::VALUE_OPTIONAL, 'Port where to launch the server.', 9090],
        ];
    }
}

php artisan websocket:start

還有一個 Log.php 請直接到 Github 中下載
執行成功的話,應該會出現以下畫面
Starting Websocket Success

測試一下

用 Firefox 的 Console 來測試一下 Websocket 的功能有沒有正常
先開啓 2 個 console 來連線,然後由其中一方傳送訊息給對方
enter image description here
enter image description here
enter image description here

讓 Laravel 可以透過 Ratchet 傳訊息給 Client

這部份在 Laravel - Broadcasting Events 官方文件中就有提到了,只是官方文件中是使用 socket.io 來處理 websocket 的部份。
在 Ratchet 上也是使用 Publish / Subscribe on Redis 的方式來處理 Laravel 跟 Websocket 之間溝通的問題。

安裝 Redis

這也不多說了
brew install redis-server
redis-server #啓動redis

安裝 predis/predis-async

使用 non-blocking 版本的 redis client 套件,才能讓 Process 同時處理 websocket 跟 subscribe 這兩件事情
# 舊版的 predis-async 會要求安裝 ext-phpiredis,dev-master#b278b9e的版本不會
# composer require predis/predis-async
composer require predis/predis-async:dev-master#b278b9e

修改 Laravel Boardcasting 的設定

/laratchet/config/braodcasting.php
'default' => env('BROADCAST_DRIVER', 'redis'),

建立 Broadcasting Event (Publish)

/laratchet/app/events/WsPublish.php
<?php

namespace App\Events;

use App\Events\Event;
use Illuminate\Queue\SerializesModels;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;

class WsPublish extends Event implements ShouldBroadcast
{
    use SerializesModels;
    public $message;

    /**
     * Create a new event instance.
     *
     * @return void
     */
    public function __construct($channel, $data)
    {
        $this->message = [
                $channel, $data
            ];
    }

    /**
     * Get the channels the event should be broadcast on.
     *
     * @return array
     */
    public function broadcastOn()
    {
        return ['WampMessage'];
    }
}

推播訊息給 client (Subscribe)

/laratchet/app/Websocket/pusher.php
<?php

namespace App\Websocket;

use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;

class Pusher implements WampServerInterface {
    public function __construct($loop) {
        $redis_host = config('database.redis.default.host');
        $redis_port = config('database.redis.default.port');
        $client = new \Predis\Async\Client('tcp://'. $redis_host. ':'. $redis_port, $loop);
        $client->connect(function ($client) use ($loop) {
            // $logger = new \Predis\Async\Client('tcp://127.0.0.1:6379', $loop);
            $logger = null;
            $client->pubSubLoop('WampMessage', function ($event) use ($logger) {
                $payload = json_decode($event->payload, true);
                $message = json_encode($payload['data']['message']);
                Chat::getInstance()->broadcast($message);
            });
            Log::v(' ', $loop, "Connected to Redis.");
        });
    }
    # 其它 WampServerInterface 該實作的空 function 就不列出來了,請直接參考 Github
}

在 Ratchet 建立 broadcast 的 function

這個 function 是為了 Demo 專案加的,實際上收到 publish 出來的 message 後,應該會去做該功能真的該做的事情
/laratchet/app/Websocket/chat.php
    # 只列有新增的部份
class Chat implements MessageComponentInterface {
    private static $instance;
    public function __construct() {
        $this->clients = new \SplObjectStorage;
        self::$instance = $this;
    }

    public static function getInstance() {
        return self::$instance;
    }

    public function broadcast($msg) {
        foreach ($this->clients as $client) {
            $client->send($msg);
            Log::v('S', $client, "sending message \"{$msg}\"");
        }
    }
}

啓動 websocket 時加入 Subscribe 的監聽

/laratchet/app/Console/Commands/WsChatCommand.php
$loop   = \React\EventLoop\Factory::create();
$pusher = new \App\Websocket\Pusher($loop);

$webSock = new \React\Socket\Server($loop);
$webSock->listen($port, '0.0.0.0');
$webServer = new \Ratchet\Server\IoServer(
    new \Ratchet\Http\HttpServer(
        new \Ratchet\WebSocket\WsServer(
            new \App\Websocket\Chat($pusher)
        )
    ),
    $webSock
);

測試一下

一樣使用 Firefox 的 console 連上 websocket, 執行下面的指令之後應該就會看到, console 出現 websocket 傳回來的訊息了
php artisan tinker
>>> event(new App\Events\WsPublish('hihi from laravel'));

Sample Code

最後

第一次開發這種 Socket 的系統,感覺很多思考方式都跟以前寫網頁程式都不一樣。ReactPHP 也還不太熟析,不知道這樣的作法有沒有什麼問題,之後會再多作一些測試或壓力測試。
還要再學習了…
參考網頁
Laravel 5 Websockets and Queue Async with socket alert callback
Thruway - 另一套像 Ratchet 的套件
Pubnub - 另一套像 Pusher 的服務
BrainSocket
laravel-4-real-time-chat github project
LaravelRatchetChat github project
JWT
在 laravel 5 實作瀏覽器推播通知
Written with StackEdit.

0 意見:

張貼留言