美文网首页
reactphp/socket大体流程梳理

reactphp/socket大体流程梳理

作者: 小山丘321 | 来源:发表于2020-03-28 22:01 被阅读0次
    1. 第一步,首先调用 React\EventLoop\Factory 生成 $loop,这个 $loop 后面用于驱动整个 socket 服务的运行。这里我们假设它生成的是 StreamSelectLoop,并以此为例进行说明
    // Create the event loop instance ($loop) through the factory.
    $loop = Factory::create(); 
    
    
    
    // 生成$loop逻辑,路径 vendor/react/event-loop/src/Factory.php
    /**
     * Instantiate an event loop, preferring extension-backed implementations.
     *
     * The candidates are probed in a fixed order and the first one whose
     * function or class is already available is returned. When none of them
     * is available, the StreamSelectLoop is used.
     *
     * @return ExtUvLoop|ExtLibevLoop|ExtEvLoop|ExtEventLoop|ExtLibeventLoop|StreamSelectLoop
     */
    public static function create()
    {
        // @codeCoverageIgnoreStart
        if (\function_exists('uv_loop_new')) {
            // only use ext-uv on PHP 7
            return new ExtUvLoop();
        }
    
        // class_exists() is called with autoloading disabled (second argument false).
        if (\class_exists('libev\EventLoop', false)) {
            return new ExtLibevLoop();
        }
    
        if (\class_exists('EvLoop', false)) {
            return new ExtEvLoop();
        }
    
        if (\class_exists('EventBase', false)) {
            return new ExtEventLoop();
        }
    
        if (\function_exists('event_base_new') && \PHP_MAJOR_VERSION === 5) {
            // only use ext-libevent on PHP 5 for now
            return new ExtLibeventLoop();
        }
    
        // No supported extension detected: use the stream_select() based loop.
        return new StreamSelectLoop();
        // @codeCoverageIgnoreEnd
    }
    
    
    // 创建StreamSelectLoop时会创建FutureTickQueue和Timers,
    // 它们用于决定后面监听时的等待(睡眠)时间,这个会在下面的流程做出说明
    
    2. 根据设置的监听资源(地址)创建对应的服务 Server,如:tcp://127.0.0.1:9090
    // Create the server. The first constructor argument (the URI) is taken from
    // the first CLI argument, or 0 when it is absent; the TLS 'local_cert' path
    // is taken from the second CLI argument, falling back to localhost.pem in
    // this script's directory.
    $server = new Server(isset($argv[1]) ? $argv[1] : 0, $loop, array(
        'tls' => array(
            'local_cert' => isset($argv[2]) ? $argv[2] : (__DIR__ . '/localhost.pem')
        )
    ));
    
    
    
    
    //创建 tcp-server,路径 vendor/react/socket/src/Server.php
    //加上对应 tcp-server 的监听数组,
    //他的数据保存在$tcpServer->listeners
    //$tcpServer->listeners['connection'][] = tcp-server-connection-callable
    //$tcpServer->listeners['error'][] = tcp-server-error-callable
    
    //tcp-server-connection-callable 对应匿名函数=》
    function (ConnectionInterface $conn) use ($that) {
        $that->emit('connection', array($conn));
    })
    //tcp-server-connection-callable会触发 server-connection-callable
    
    /**
     * Build the server facade around an underlying TcpServer.
     *
     * As excerpted here, the constructor strips any `tls://` prefix from the
     * URI, creates a TcpServer with the `tcp` context options, stores it in
     * $this->server and re-emits the inner server's `connection` and `error`
     * events on this object.
     *
     * @param mixed         $uri     Listen address; the demo passes a CLI argument or 0.
     * @param LoopInterface $loop    Event loop handed to the TcpServer.
     * @param array         $context Context options; the `tcp` entry is passed to the TcpServer.
     */
    public function __construct($uri, LoopInterface $loop, array $context = array())
    {
        // omitted
        ....
        ...
        ..
        // Create the tcp-server. According to the article, this creates a socket
        // server resource, switches it to non-blocking mode, and registers the
        // resource and an anonymous function in the loop's readStreams and
        // readListeners arrays:
        //   $loop->readStreams[resource id]   => server resource
        //   $loop->readListeners[resource id] => loop-read-listener-callable
        // NOTE(review): the addReadStream() call below appears to be pasted in
        // from TcpServer for illustration (it uses $this->master and
        // handleConnection()) — confirm against the library source.
            $that = $this;
            $this->loop->addReadStream($this->master, function ($master) use ($that) {
                // Accept the pending connection; warnings are suppressed with @.
                $newSocket = @\stream_socket_accept($master);
                if (false === $newSocket) {
                    $that->emit('error', array(new \RuntimeException('Error accepting new connection')));
    
                    return;
                }
                $that->handleConnection($newSocket);
            });
        // When the loop-read-listener-callable above fires, it goes on to trigger
        // the tcp-server-connection-callable (presumably via handleConnection(),
        // whose body is not shown here).
        $server = new TcpServer(str_replace('tls://', '', $uri), $loop, $context['tcp']);
    
        $this->server = $server;
    
        // $that is captured by the closures below so they can emit on this object.
        $that = $this;
        // Forward every 'connection' event of the inner server to this server's listeners.
        $server->on('connection', function (ConnectionInterface $conn) use ($that) {
            $that->emit('connection', array($conn));
        });
        // Forward every 'error' event of the inner server in the same way.
        $server->on('error', function (Exception $error) use ($that) {
            $that->emit('error', array($error));
        });
    }
    
    
    //当 tcp-server 的 tcp-server-connection-callable 被触发的时候,
    //会同步去触发 server 的 server-connection-callable,详情看 emit 函数:
    /**
     * Invoke every listener registered for the given event.
     *
     * Listeners stored in $this->listeners are called first. One-time
     * listeners stored in $this->onceListeners are removed from the registry
     * before they are called. Each listener receives $arguments spread as
     * positional arguments.
     *
     * @param string $event     Event name; must not be null.
     * @param array  $arguments Arguments passed on to each listener.
     *
     * @throws InvalidArgumentException When $event is null.
     */
    public function emit($event, array $arguments = [])
    {
        if (null === $event) {
            throw new InvalidArgumentException('event name must not be null');
        }
    
        if (isset($this->listeners[$event])) {
            foreach ($this->listeners[$event] as $callback) {
                $callback(...$arguments);
            }
        }
    
        if (!isset($this->onceListeners[$event])) {
            return;
        }
    
        // Take the one-time listeners out of the registry before running them.
        $oneShot = $this->onceListeners[$event];
        unset($this->onceListeners[$event]);
    
        foreach ($oneShot as $callback) {
            $callback(...$arguments);
        }
    }
    
    3. 创建 limiting-server
    // Wrap the existing $server in a LimitingServer; null is passed as the connection limit.
    $server = new LimitingServer($server, null);
    
    //创建 limiting-server,路径 vendor/react/socket/src/LimitingServer.php
    //这里会帮第二步创建的server加上对应 server 的监听数组,
    //他的数据保存在$server->listeners
    //$server->listeners['connection'][] = server-connection-callable
    //$server->listeners['error'][] = server-error-callable
    
    /**
     * Decorate a server and subscribe to its events.
     *
     * Stores the wrapped server and the connection limit, then registers this
     * object's handleConnection() and handleError() methods as listeners for
     * the wrapped server's 'connection' and 'error' events.
     *
     * @param ServerInterface $server          Server to wrap.
     * @param int|null        $connectionLimit Value stored as the limit; may be null.
     * @param bool            $pauseOnLimit    Only stored when $connectionLimit is not null.
     */
    public function __construct(ServerInterface $server, $connectionLimit, $pauseOnLimit = false)
    {
        $this->limit = $connectionLimit;
        $this->server = $server;
    
        // Without a limit the pauseOnLimit property keeps its declared default.
        if (null !== $connectionLimit) {
            $this->pauseOnLimit = $pauseOnLimit;
        }
    
        $onConnection = array($this, 'handleConnection');
        $onError = array($this, 'handleError');
    
        $server->on('connection', $onConnection);
        $server->on('error', $onError);
    }
    
    4. 为第三步创建的 limiting-server 加上对应监听信息
    //数据保存在$limiting-server->listeners
    //$limiting-server->listeners['connection'][] = limiting-server-connection-callable
    //$limiting-server->listeners['error'][] = limiting-server-error-callable
    
    // Register the chat behaviour: each message received from a client is
    // cleaned up, prefixed with the sender's host and written to every
    // connection the server currently reports.
    $server->on('connection', function (ConnectionInterface $client) use ($server) {
        // whenever a new message comes in
        $client->on('data', function ($data) use ($client, $server) {
            // remove any non-word characters (just for the demo)
            $stripped = preg_replace('/[^\w\d \.\,\-\!\?]/u', '', $data);
            $message = trim($stripped);
    
            // ignore empty messages
            if ('' === $message) {
                return;
            }
    
            // prefix with client IP (surrounding [] removed) and broadcast to all connected clients
            $host = parse_url($client->getRemoteAddress(), PHP_URL_HOST);
            $line = trim($host, '[]') . ': ' . $message . PHP_EOL;
    
            foreach ($server->getConnections() as $peer) {
                $peer->write($line);
            }
        });
    });
    
    // Errors emitted by the server are passed to printf().
    $server->on('error', 'printf');
    
    5. 运行 $loop 的 run 方法,下面我们来看一下它的主要运行逻辑
    /**
     * Run the loop until it is stopped or has nothing left to wait for.
     *
     * Each iteration ticks the future-tick queue and the timers, then works
     * out how long the stream wait may block: not at all (0), until the first
     * scheduled timer is due, or indefinitely (null). The loop exits when
     * $this->running becomes false, or when there are no queued ticks, no
     * timers, no streams and no signals.
     */
    public function run()
    {
        // Mark the loop as running.
        $this->running = true;
    
        while ($this->running) {
            // Run the callbacks currently queued in the FutureTickQueue
            // (the queue the article says is created together with the loop in step 1).
            $this->futureTickQueue->tick();
    
            // Run the timers that are currently due.
            $this->timers->tick();
    
            // The loop was stopped, or the future-tick queue still has pending
            // callbacks: do not block in the stream wait.
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
                $timeout = 0;
    
            // There is a pending timer, only block until it is due ...
            } elseif ($scheduledAt = $this->timers->getFirst()) {
                $timeout = $scheduledAt - $this->timers->getTime();
                if ($timeout < 0) {
                    $timeout = 0;
                } else {
                    // Convert float seconds to int microseconds.
                    // Ensure we do not exceed maximum integer size, which may
                    // cause the loop to tick once every ~35min on 32bit systems.
                    $timeout *= self::MICROSECONDS_PER_SECOND;
                    $timeout = $timeout > \PHP_INT_MAX ? \PHP_INT_MAX : (int)$timeout;
                }
    
            // The only possible event is stream or signal activity, so wait forever ...
            } elseif ($this->readStreams || $this->writeStreams || !$this->signals->isEmpty()) {
                $timeout = null;
    
            // There's nothing left to do ...
            } else {
                break;
            }
    
            $this->waitForStreamActivity($timeout);
        }
    }
    
    /**
     * Block until a stream becomes ready or the timeout elapses, then notify
     * the listeners registered for the ready streams.
     *
     * Read listeners are invoked before write listeners. A listener is looked
     * up by the integer value of the stream resource.
     *
     * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
     */
    private function waitForStreamActivity($timeout)
    {
        // Work on copies: streamSelect() receives both arrays by reference.
        $readable = $this->readStreams;
        $writable = $this->writeStreams;
    
        $ready = $this->streamSelect($readable, $writable, $timeout);
    
        if ($this->pcntlPoll) {
            \pcntl_signal_dispatch();
        }
    
        if ($ready === false) {
            // if a system call has been interrupted,
            // we cannot rely on its outcome
            return;
        }
    
        foreach ($readable as $resource) {
            $id = (int) $resource;
    
            if (!isset($this->readListeners[$id])) {
                continue;
            }
    
            \call_user_func($this->readListeners[$id], $resource);
        }
    
        foreach ($writable as $resource) {
            $id = (int) $resource;
    
            if (!isset($this->writeListeners[$id])) {
                continue;
            }
    
            \call_user_func($this->writeListeners[$id], $resource);
        }
    }
    
    /**
     * Emulate a stream_select() implementation that does not break when passed
     * empty stream arrays.
     *
     * When at least one stream is given, the call is delegated to
     * stream_select(). When both arrays are empty, the wait is emulated with
     * usleep()/sleep() and 0 is returned.
     *
     * @param array    $read    An array of read streams to select upon (passed by reference).
     * @param array    $write   An array of write streams to select upon (passed by reference).
     * @param int|null $timeout Activity timeout in microseconds, or null to wait forever.
     *
     * @return int|false The total number of streams that are ready for read/write.
     *     Can return false if stream_select() is interrupted by a signal.
     */
    private function streamSelect(array &$read, array &$write, $timeout)
    {
        if ($read || $write) {
            // We do not usually use or expose the `exceptfds` parameter passed to the underlying `select`.
            // However, Windows does not report failed connection attempts in `writefds` passed to `select` like most other platforms.
            // Instead, it uses `writefds` only for successful connection attempts and `exceptfds` for failed connection attempts.
            // We work around this by adding all sockets that look like a pending connection attempt to `exceptfds` automatically on Windows and merge it back later.
            // This ensures the public API matches other loop implementations across all platforms (see also test suite or rather test matrix).
            // Lacking better APIs, every write-only socket that has not yet read any data is assumed to be in a pending connection attempt state.
            // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
            $except = null;
            // A backslash directory separator is used here as the Windows check.
            if (\DIRECTORY_SEPARATOR === '\\') {
                $except = array();
                foreach ($write as $key => $socket) {
                    if (!isset($read[$key]) && @\ftell($socket) === 0) {
                        $except[$key] = $socket;
                    }
                }
            }
    
            // suppress warnings that occur, when stream_select is interrupted by a signal
            // The seconds argument is null for "wait forever"; otherwise it is 0 and
            // the whole timeout is passed as the microseconds argument.
            $ret = @\stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
    
            // Merge whatever is left in $except back into $write, as described above.
            if ($except) {
                $write = \array_merge($write, $except);
            }
            return $ret;
        }
    
        // No streams to watch: emulate the wait by sleeping instead.
        if ($timeout > 0) {
            \usleep($timeout);
        } elseif ($timeout === null) {
            // wait forever (we only reach this if we're only awaiting signals)
            // this may be interrupted and return earlier when a signal is received
            \sleep(PHP_INT_MAX);
        }
    
        return 0;
    }
    
    6. 总的来说就是:tcp-server触发server,server触发limiting-server

    相关文章

      网友评论

          本文标题:reactphp/socket大体流程梳理

          本文链接:https://www.haomeiwen.com/subject/bbkeuhtx.html