美文网首页
Workerman创建WebSocket客户端和服务端推送数据

Workerman创建WebSocket客户端和服务端推送数据

作者: 许文同学 | 来源:发表于2018-03-22 11:24 被阅读0次

    本文场景:
    服务器A:服务端,目的是要接收外部数据处理后放入消息队列。
    服务器B:客户端,向服务器A提供数据。

    前面有文章测试了基于HTTP的方案,但都是基于一台服务器自己测试。实际在外部服务器测试时单机请求性能很差,远远达不到处理能力,网路性能制约严重。

    原基于HTTP的方案,每次推送都要重新建立连接,消耗过大。所以想到了Socket长连接方案。
    扩展:HTTP使用TCP 三次握手建立连接,客户端和服务器需要交换3个包。HTTPS除了 TCP 的三个包,还要加上 SSL握手需要的9个包,一共是12个包。

    服务端和客户端都是基于Workerman建立的

    // Workerman-WebSocket服务端
    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin');
    $channel = $connection->channel();
    $q_name = 'amqplib_test';
    $channel->queue_declare($q_name, false, false, false, false);
    
    use Workerman\Worker;
    require_once __DIR__ . '/Workerman/Autoloader.php';
    
    $ws = new Worker('websocket://0.0.0.0:8282');
    $ws->onConnect = function($connection) use($channel,$q_name)
    {
        $connection->onWebSocketConnect = function($connection , $http_header)
        {
            // 可以在这里判断连接来源
        };
        $connection->onMessage = function($connection, $data) use($channel,$q_name)
        {
            $msg = new AMQPMessage($data);
            $channel->basic_publish($msg, '', $q_name);
            var_dump($data);
            // $connection->send('receive success');
        };
    };
    Worker::runAll();
    
    // Workerman-WebSocket客户端
    <?php
    use Workerman\Worker;
    use Workerman\Connection\AsyncTcpConnection;
    require_once __DIR__ . '/Workerman/Autoloader.php';
    $worker = new Worker();
    $status = true;
    // 进程启动时
    $worker->onWorkerStart = function()
    {
        // 以websocket协议连接远程websocket服务器 ws://IP:端口
        $ws_connection = new AsyncTcpConnection("ws://*.*.*.*:8282");
        // 连接成功
        $ws_connection->onConnect = function($connection)
        {
            // 调用发送处理
            send_msg($connection);
        };
        // 远程websocket服务器发来消息时
        $ws_connection->onMessage = function($connection, $data){
            echo "recv: $data\n";
        };
        // 连接上发生错误时,一般是连接远程websocket服务器失败错误
        $ws_connection->onError = function($connection, $code, $msg)
        {
            echo "error: $msg\n";
        };
        // 当连接远程websocket服务器的连接断开时
        $ws_connection->onClose = function($connection){
            echo "connection closed\n";
        };
        // 设置好以上各种回调后,执行连接操作
        $ws_connection->connect();
    };
    // 定义发送相关操作
    function send_msg($connection)
    {
        // 创建【可发送】状态变量
        $connection->Available = true;
        $connection->msg = 0;
        $do_send = function () use($connection)
        {
            // 判断当前是否可发送
            while($connection->Available)
            {
                echo $connection->msg.'\n';
                $connection->send($connection->msg);
                $connection->msg += 1;
            }
        };
        // 发生连接发送缓冲区满事件时设置不可发送消息
        $connection->onBufferFull = function($connection)
        {
            // 标记发送缓冲区满,暂停do_send发送
            $connection->Available = false;
        };
        // 当发送缓冲区数据发送完毕时触发
        $connection->onBufferDrain = function($connection)use($do_send)
        {
            $connection->Available = true;
            $do_send();
        };
        // 执行发送
        $do_send();
    }
    // 执行
    Worker::runAll();
    

    之前HTTP方案的测试受制于发送瓶颈太严重,单台请求能力才几百每秒,多服务器请求服务端速率正常倍增。

    Socket方案外部对服务器的测试截图,平均处理6K/s

    websocket本地对服务端测试2.png

    下面是外部直接作为MQ生产者的测试,平均处理1.7K/s

    外部裸连MQ.png

    相关文章

      网友评论

          本文标题:Workerman创建WebSocket客户端和服务端推送数据

          本文链接:https://www.haomeiwen.com/subject/psaoqftx.html