美文网首页phpSwoft框架研究
[原创]Swoft源码剖析-RPC功能实现

[原创]Swoft源码剖析-RPC功能实现

作者: bromine | 来源:发表于2018-04-02 21:03 被阅读634次

    Swoft提供了一个自建RPC(远程方法调用)实现,让你可以方便的调用其他Swoft上的服务。

    RPC服务端的初始化

    RPC有两种启动方式Http伴随启动RPC单独启动。值得一提的是目前swoole的tcp服务即RPC服务,暂没有其他的tcp服务功能,所以基本上tcp相关的配置指代的就是RPC。

    Http伴随启动

    swoft的RPC 服务在Http服务启动时候伴随启动

    //Swoft\Http\Server\Http\HttpServer.php
    
    /**
     * Http Server
     */
    class HttpServer extends AbstractServer
        /**
         * Start Server
         *
         * @throws \Swoft\Exception\RuntimeException
         */
        public function start()
        {
            //coco ...
    
            //根据.env配置文件Server区段的TCPABLE字段决定是否启动RPC服务
            if ((int)$this->serverSetting['tcpable'] === 1) {
                $this->registerRpcEvent();
            }
            //code ....
        }
    }
    
    Swoole监听

    初始化流程即根据相关注解注册一个swoole监听

    //Swoft\Http\Server\Http\HttpServer.php
        /**
         * Register rpc event, swoft/rpc-server required
         *
         * @throws \Swoft\Exception\RuntimeException
         */
        protected function registerRpcEvent()
        {
            //含有@SwooleListener且type为SwooleEvent::TYPE_PORT的Bean,即RpcEventListener
            $swooleListeners = SwooleListenerCollector::getCollector();
            if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) {
                throw new RuntimeException("Please use swoft/rpc-server, run 'composer require swoft/rpc-server'");
            }
    
            //添加swoole RPC相关的tcp监听端口,使用的是.env文件中的TCP区段配置
            $this->listen = $this->server->listen($this->tcpSetting['host'], $this->tcpSetting['port'], $this->tcpSetting['type']);
            $tcpSetting = $this->getListenTcpSetting();
            $this->listen->set($tcpSetting);
    
            //根据RpcEventListener的相关注解添加监听处理句柄
            $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0];
            $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents);
        }
    

    由于是初版,根据@SwooleListener获取RPC监听Bean的相关处理暂时还有点生硬。
    目前swoft中type为SwooleEvent::TYPE_PORT@SwooleListener只有RpcEventListener一个,如果添加了同类Bean容易出问题,稳定版出的时候应该会有相关优化。

    RPC单独启动

    入口从Swoft\Http\Server\Command\ServerCommand换成Swoft\Rpc\Server\Command\RpcCommand,流程和Http大同小异,区别仅仅在于使用前者使用Swoole\Http\Server建立Http服务器后额外监听一个Tcp端口支持Rpc,后者直接使用Swoole\Server监听Tcp来支持Rpc,此处不再赘述。

    RPC请求处理

    RPC服务器和HTTP服务器的区别仅仅在于与客户端交互报文格式和报文所在的网络层(Swoft的RPC基于TCP层次),运行原理基本相通,都是路由,中间件,RPC Service(对应Http的Controller),你完全可以以Http服务的思路去理解他。

    swoole的RPC-TCP监听设置好后,RPC服务端就可以开始接受请求了。RpcEventListener的负责的工作仅仅是把收到的数据转发给\Swoft\Rpc\Server\ServiceDispatcher分发。Dispatcher会将请求传递给各个Middleware中间件,最终最终传递给HandlerAdapterMiddleware处理。

    PackerMiddleware

    PackerMiddleware是RPC中比较重要的一个中间件,负责将TCP请求中数据流解包和数据流封包。

    <?php
    //Swoft\Rpc\Server\Middleware.PackerMiddleware
    namespace Swoft\Rpc\Server\Middleware;
    /**
     * service packer
     *
     * @Bean()
     * @uses      PackerMiddleware
     * @version   2017年11月26日
     * @author    stelin <phpcrazy@126.com>
     * @copyright Copyright 2010-2016 swoft software
     * @license   PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
     */
    class PackerMiddleware implements MiddlewareInterface
    {
        /**
         * packer middleware
         *
         * @param \Psr\Http\Message\ServerRequestInterface     $request
         * @param \Psr\Http\Server\RequestHandlerInterface $handler
         *
         * @return \Psr\Http\Message\ResponseInterface
         */
        public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
        {
            //获取servicePacker Bean(\Swoft\Rpc\Packer\ServicePacker)用于字符串解包封包
            $packer = service_packer();
            $data   = $request->getAttribute(self::ATTRIBUTE_DATA);
            $data   = $packer->unpack($data);
    
            // 触发一个RpcServerEvent::BEFORE_RECEIVE事件,默认只有一个用于添加请求上下文信息的BeforeReceiveListener
            // 利用中间件触发流程关键事件的做法耦合有点高,猜测以后会调整
            App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data);
            //替换解包后的解包到Request中,提供给后续中间件和Handler使用
            $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data);
    
            /* @var \Swoft\Rpc\Server\Rpc\Response $response */
            $response      = $handler->handle($request);
    
           //为Response封包返回给RPC客户端
            $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE);
            $serviceResult = $packer->pack($serviceResult);
            return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult);
        }
    }
    
    RouterMiddleware

    RouterMiddleware负责根据RPC请求的method,version,interface 获取处理的RPC服务类,充当了路由的作用

    <?php
    //Swoft\Rpc\Server\Middleware\RouterMiddleware.php
    
    /**
     * service router
     *
     * @Bean()
     * @uses      RouterMiddleware
     * @version   2017年11月26日
     * @author    stelin <phpcrazy@126.com>
     * @copyright Copyright 2010-2016 swoft software
     * @license   PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
     */
    class RouterMiddleware implements MiddlewareInterface
    {
        /**
         * get handler from router
         *
         * @param \Psr\Http\Message\ServerRequestInterface     $request
         * @param \Psr\Http\Server\RequestHandlerInterface $handler
         *
         * @return \Psr\Http\Message\ResponseInterface
         */
        public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
        {
            // service data
            $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
    
            $method    = $data['method']??"";
            $version   = $data['version']??"";
            $interface = $data['interface']??"";
    
            /* @var \Swoft\Rpc\Server\Router\HandlerMapping $serviceRouter */
            $serviceRouter  = App::getBean('serviceRouter');
            //路由匹配,即向Swoft\Rpc\Server\Router\HandlerMapping->$routes获取RPC服务信息
            $serviceHandler = $serviceRouter->getHandler($interface, $version, $method);
    
            // deliver service data
            $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler);
    
            return $handler->handle($request);
        }
    }
    
    

    swoft启动阶段会扫描并初始化注解信息(参考注解章节),注解初始化完毕后会触发一个AppEvent::APPLICATION_LOADER事件,此时会将来自@Service的所有RPC的路由信息注册到Swoft\Rpc\Server\Router\HandlerMapping->$routes中,用于serviceRouter Bean的路由匹配。

    HandlerAdapterMiddleware

    HandlerAdapterMiddleware最终转发请求给HandlerAdapter处理,HandlerAdapter会使用刚刚RouterMiddleware匹配到的服务类信息转发请求并封装Response最终返回给ServiceDispatcher,ServiceDispatcher会返回TCP流给客户端然后结束本次请求。

    <?php
    //Swoft\Rpc\Server\Router\HandlerAdapter.php
    /**
     * Service handler adapter
     * @Bean("serviceHandlerAdapter")
     */
    class HandlerAdapter implements HandlerAdapterInterface
    {
    
        /**
         * Execute service handler
         *
         * @param \Psr\Http\Message\ServerRequestInterface $request
         * @param array                                    $handler
         * @return Response
         */
        public function doHandler(ServerRequestInterface $request, array $handler): Response
        {
            // RPC方法的各个参数
            $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
            $params = $data['params'] ?? [];
            
            //路由解析出来的,处理该请求的服务Bean和方法
            list($serviceClass, $method) = $handler;
            $service = App::getBean($serviceClass);
    
            // execute handler with params
            $response = PhpHelper::call([$service, $method], $params);
            $response = ResponseHelper::formatData($response);
    
            // 构造Response返回客户端
            if (! $response instanceof Response) {
                $response = (new Response())->withAttribute(self::ATTRIBUTE, $response);
            }
    
            return $response;
        }
    }
    
    

    RPC客户端的实现

    在Bean的属性中声明@Reference,swoft即会根据@var声明的类型注入相应的RPC客户端实例。

        /**
         * @Reference(name="user")
         *
         * @var DemoInterface
         */
        private $demoService;
    

    依赖注入的实现会专门另外用一篇文章单独解释,这里先看看RPC客户端的相关代码。

    远程代理

    namespace Swoft\Rpc\Client\Service;
    
    /**
     * The proxy of service
     */
    class ServiceProxy
    {
        /**
         * @param string $className
         * @param string $interfaceClass
         */
        public static function   (string $className, string $interfaceClass)
        {
            $reflectionClass   = new \ReflectionClass($interfaceClass);
            $reflectionMethods = $reflectionClass->getMethods(\ReflectionMethod::IS_PUBLIC);
    
            $template = "class $className extends \\Swoft\\Rpc\\Client\\Service implements {$interfaceClass} {";
    
            //\Swoft\Rpc\Client\Service::class
            // the template of methods
            $template .= self::getMethodsTemplate($reflectionMethods);
            $template .= "}";
                
            eval($template);
        }
        //code ...
    }
    

    和AOP一样,原理一样是使用了动态代理,更具体的说法是动态远程代理
    RPC动态客户端类实现了客户端声明的Interface类型(如DemoInterface)并继承了\Swoft\Rpc\Client\Service类。
    动态类的实现很简单,对于接口显式声明的方法,实际上都是调用\Swoft\Rpc\Client\Service->call()方法。

    interface DemoInterface
    {
        /**
         * @param array $ids
         * @return array
         */
        public function getUsers(array $ids);
    }
    
    class 动态生成RPC客户端类 extends \Swoft\Rpc\Client\Service implements \App\Lib\DemoInterface { 
        public function getUsers ( array  $ids  ) {
            $params = func_get_args();
            return $this->call('getUsers', $params);
        }
        //code ...
    }
    

    对于自动生成的defer方法,则是通过魔术方法__call(),调用\Swoft\Rpc\Client\Service->deferCall()

        /**
         * @param string $name
         * @param array  $arguments
         *
         * @return ResultInterface
         * @throws RpcClientException
         */
        function __call(string $name, array $arguments)
        {
            $method = $name;
            $prefix = self::DEFER_PREFIX;//'defer'
            if (strpos($name, $prefix) !== 0) {
                throw new RpcClientException(sprintf('the method of %s is not exist! ', $name));
            }
    
            if ($name == $prefix) {
                $method = array_shift($arguments);
            } elseif (strpos($name, $prefix) === 0) {
                $method = lcfirst(ltrim($name, $prefix));
            }
    
            return $this->deferCall($method, $arguments);
        }
    

    我们这里只看具有代表性的call()方法,deferCall()大致相同。
    RPC客户端动态类的本质是将客户端的参数和接口信息根据swoft自己的格式传递给RPC服务端,然后将服务器返回的数据解包取出返回值返回给RPC的调用者,对外伪装成一个普通的对象,屏蔽远程调用操作。

    // Swoft\Rpc\Client\Service.php
        /**
         * Do call service
         *
         * @param string $func
         * @param array  $params
         *
         * @throws \Throwable
         * @return mixed
         */
        public function call(string $func, array $params)
        {
            $profileKey = $this->interface . '->' . $func;
            //根据@reference的fallback属性获取降级处理句柄,在RPC服务调用失败的时候可以会使用fallback句柄代替
            $fallback   = $this->getFallbackHandler($func);
            try {
                $connectPool    = $this->getPool();
                $circuitBreaker = $this->getBreaker();
    
                /* @var $client AbstractServiceConnection */
                $client = $connectPool->getConnection();
                //数据封包,和RPC服务端一致
                $packer   = service_packer();
                $type     = $this->getPackerName();
                $data     = $packer->formatData($this->interface, $this->version, $func, $params);
                $packData = $packer->pack($data, $type);
    
                //通过熔断器调用接口
                $result = $circuitBreaker->call([$client, 'send'], [$packData], $fallback);
                if ($result === null || $result === false) {
                    return null;
                }
    
                //和defercall不一致这里直接收包,解包
                App::profileStart($profileKey);
                $result = $client->recv();
                App::profileEnd($profileKey);
                $connectPool->release($client);
    
                App::debug(sprintf('%s call %s success, data=%', $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE)));
                $result = $packer->unpack($result);
                $data   = $packer->checkData($result);
            } catch (\Throwable $throwable) {
                if (empty($fallback)) {
                    throw $throwable;
                }
                //RPC调用失败则调用降级句柄,代替实际RPC服务直接返回
                $data = PhpHelper::call($fallback, $params);
            }
    
            return $data;
        }
    

    熔断器

    熔断器的swoft-RPC的另一重要概念,RPC的所有请求都通过熔断器发送。
    熔断器使用状态模式实现,熔断器有开启,半开,关闭 3种状态,不同状态下熔断器会持有不同的状态实例,状态根据RPC调用情况切换,熔断器根据持有状态实例的不同,行为也有所不同。

    熔断器关闭状态策略
    <?php
    //Swoft\Sg\Circuit\CloseState.php 
    /**
     * close状态的熔断器,对所有RPC调用都通过协程客户端发送到RPC服务器
     *  关闭状态及切换
     * 1. 重置failCounter=0 successCount=0
     * 2. 操作失败,failCounter计数
     * 3. 操作失败一定计数,切换为open开启状态
     */
    class CloseState extends CircuitBreakerState
    {
        /**
         * 熔断器调用
         *
         * @param mixed $callback 回调函数
         * @param array $params 参数
         * @param mixed $fallback 失败回调
         *
         * @return mixed 返回结果
         */
        public function doCall($callback, $params = [], $fallback = null)
        {
            list($class, $method) = $callback;
    
            try {
                if ($class == null) {
                    throw new \Exception($this->circuitBreaker->serviceName . "服务,连接建立失败(null)");
                }
    
                if ($class instanceof Client && $class->isConnected() == false) {
                    throw new \Exception($this->circuitBreaker->serviceName . "服务,当前连接已断开");
                }
                //调用swoole协程客户端的send()方法发送数据
                $data = $class->$method(...$params);
            } catch (\Exception $e) {
                //递增失败计数
                if ($this->circuitBreaker->isClose()) {
                    $this->circuitBreaker->incFailCount();
                }
    
                App::error($this->circuitBreaker->serviceName . "服务,当前[关闭状态],服务端调用失败,开始服务降级容错处理,error=" . $e->getMessage());
                //RPC调用失败则使用降级接口 
                $data = $this->circuitBreaker->fallback($fallback);
            }
            
            //失败次数过线则切换状态
            $failCount = $this->circuitBreaker->getFailCounter();
            $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount();
            if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) {
                App::trace($this->circuitBreaker->serviceName . "服务,当前[关闭状态],服务失败次数达到上限,开始切换为开启状态,failCount=" . $failCount);
                $this->circuitBreaker->switchToOpenState();
            }
    
            App::trace($this->circuitBreaker->serviceName . "服务,当前[关闭状态],failCount=" . $this->circuitBreaker->getFailCounter());
            return $data;
        }
    }
    
    熔断器开启状态策略
    <?php
    \\Swoft\Sg\Circuit\OpenState .php;
    /**
     * open状态的熔断器,对所有RPC调用都使用降级句柄代替
     * 开启状态及切换(open)
     * 1. 重置failCounter=0 successCounter=0
     * 2. 请求立即返回错误响应
     * 3. 定时器一定时间后切换为半开状态(open)
     */
    class OpenState extends CircuitBreakerState
    {
        /**
         * 熔断器调用
         *
         * @param mixed $callback 回调函数
         * @param array $params 参数
         * @param mixed $fallback 失败回调
         *
         * @return mixed 返回结果
         */
        public function doCall($callback, $params = [], $fallback = null)
        {
            $data = $this->circuitBreaker->fallback();
    
            App::trace($this->getServiceName() . "服务,当前[开启状态],执行服务fallback服务降级容错处理");
            $nowTime = time();
    
            if ($this->circuitBreaker->isOpen()
                && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime()
            ) {
                $delayTime = $this->circuitBreaker->getDelaySwitchTimer();
    
                // swoole定时器不是严格的,3s容错时间 ,定时切换状态的半开
                $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3;
                App::getTimer()->addAfterTimer('openState', $delayTime, [$this, 'delayCallback']);
                $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime);
    
                App::trace($this->getServiceName() . "服务,当前[开启状态],创建延迟触发器,一段时间后状态切换为半开状态");
            }
    
            return $data;
        }
    
    }
    
    
    熔断器半开状态策略

    半开熔断器是熔断器关闭状态和熔断器开启状态的过度状态,半开熔断器的所有RPC调用都是加锁的,连续成功或者连续失败到阈值后会切换到关闭状态或者开启状态,代码类似,此处不再累述,有兴趣的读者可以自行研究。

    Swoft源码剖析系列目录:https://www.jianshu.com/p/2f679e0b4d58

    相关文章

      网友评论

      • fly1234:您好,我又遇到一个关于开启日志,task报了
        Fatal error: go(): can't use async-io in task process. in /data/www/mnt/fayho_business_circle/www/vendor/swoft/framework/src/Log/FileHandler.php on line 79
        fly1234:@bromine 感谢,现在才看到您的回复,确实是framework更新了所致,swoft 官方群员建议我更新后已解决:sunglasses::relaxed:
        bromine:@fly1234 相反的,你这个报错是在Task里面使用协程导致的,Task是纯同步的。
        我所在的swoft版本,日志都是同步写的,更新到最新版本的swoft发现已经会自动根据当前环境使用同步写或者协程写日志,但在task中写日志仍然没有出现你的问题。
        建议你更新下框架代码,如果还有报错直接给官方提个issue,自动任务里不能开启日志这种一般是非预期行为。
        fly1234:@fly1234 ,我看了源码,底层好像是没有实现协程写日志,是否理解为,自动任务里不应该开启日志呢
      • fly1234:您好,我想问下rpc服务的异常是如何捕获的,今天试了下发现验证的异常也是直接返回500,客户端就直接调用了服务降级,有尝试过app/exception下去捕获,但没有效果,请赐教
        fly1234:@bromine ,后面我去看了源码,作者没有过多处理,我猜测也是基于服务治理,不过我对已知异常做了友好处理的扩展,重写了dispatch 中的catch,感谢您的解答
        bromine:@fly1234 这种情况归fallback管,不使用fallback就能拿到异常了

      本文标题:[原创]Swoft源码剖析-RPC功能实现

      本文链接:https://www.haomeiwen.com/subject/wnzuqftx.html