美文网首页PHP源码分析Yar Rpc框架源码解读
[原]PHP-yar拓展源码解读七-concurrent_cli

[原]PHP-yar拓展源码解读七-concurrent_cli

作者: bromine | 来源:发表于2018-08-26 12:41 被阅读0次

    异步任务的封装

    //yar_transport.h
    /*
     * One queued asynchronous RPC call. Yar_Concurrent_Client::call() allocates
     * it zero-filled and wraps it in an le_calldata resource.
     */
    typedef struct _yar_call_data {
        ulong sequence;        /* task id; call() sets it to the next free index of _callstack plus one */
        zend_string *uri;      /* reference to the $uri argument of call() */
        zend_string *method;   /* reference to the $method argument of call() */
        zval callback;         /* per-call success callback; left undefined when none was given */
        zval ecallback;        /* per-call error callback; left undefined when none was given */
        zval parameters;       /* RPC arguments array; left undefined when none was given */
        zval options;          /* per-call options array; left undefined when none was given */
    } yar_call_data_t;
    

    Yar用yar_call_data_t表示一个异步任务,sequence是从1开始的任务ID,除了sequence,其他基本上就是对应Yar_Concurrent_Client::call()上的同名参数。为方便作为一个PHP变量使用该结构体,该结构体平时被Yar包装成一个le_calldata型Resource。

    异步客户端

    // PHP-level view of the class, reconstructed from the C source
    // (the extension ships no PHP definition file). Method bodies are stubs.
    class Yar_Concurrent_Client {
        /** @var resource[]|null le_calldata resources, one per queued call */
        protected static $_callstack;
        /** @var callable|null default callback for successful calls */
        protected static $_callback;
        /** @var callable|null default callback for failed calls */
        protected static $_error_callback;
        /** @var bool true while loop() is dispatching the queued calls */
        protected static $_start;
    
        /** Queue one asynchronous call; returns its sequence id. */
        public static function call($uri, $method, $parameters = null, $callback = null, $error_callback = null, $options = array()): int {}
        /** Send every queued call and run the callbacks. */
        public static function loop($callback = null, $error_callback = null): bool {}
    }
    

    Yar_Concurrent_Client定义如上。
    先看Yar_Concurrent_Client::call(),它用于注册一个异步RPC调用:

    //yar_client.c
    /* {{{ proto Yar_Concurrent_Client::call($uri, $method, $parameters = NULL, $callback = NULL, $error_callback = NULL, $options = array())
     *
     * Validates the arguments, packs them into a yar_call_data_t, stores that as
     * an le_calldata resource in the static $_callstack array and returns the
     * task's sequence id. Nothing is sent from here.
     */
    PHP_METHOD(yar_concurrent_client, call) {
        zend_string *uri, *method;
        zend_string *callable_name = NULL;
        zval *parameters = NULL, *callback = NULL, *error_callback = NULL, *options = NULL;
        zval *started, *stack, resource;
        yar_call_data_t *task;
        long next_index;
        int has_callback, has_error_callback;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS(), "SS|a!z!za",
                    &uri, &method, &parameters, &callback, &error_callback, &options) == FAILURE) {
            return;
        }
    
        if (ZSTR_LEN(uri) == 0) {
            php_error_docref(NULL, E_WARNING, "first parameter is expected to be a valid rpc server uri");
            return;
        }
    
        /* only http:// and https:// targets are accepted by the concurrent client */
        if (!(strncasecmp(ZSTR_VAL(uri), "http://", sizeof("http://") - 1) == 0
                    || strncasecmp(ZSTR_VAL(uri), "https://", sizeof("https://") - 1) == 0)) {
            php_error_docref(NULL, E_WARNING, "only http protocol is supported in concurrent client for now");
            return;
        }
    
        if (ZSTR_LEN(method) == 0) {
            php_error_docref(NULL, E_WARNING, "second parameter is expected to be a valid rpc api name");
            return;
        }
    
        has_callback = (callback != NULL) && !Z_ISNULL_P(callback);
        has_error_callback = (error_callback != NULL) && !Z_ISNULL_P(error_callback);
    
        /* both callbacks, when given, must be callable */
        if (has_callback && !zend_is_callable(callback, 0, &callable_name)) {
            php_error_docref1(NULL, ZSTR_VAL(callable_name), E_ERROR, "fourth parameter is expected to be a valid callback");
            zend_string_release(callable_name);
            RETURN_FALSE;
        }
        if (callable_name) {
            zend_string_release(callable_name);
            callable_name = NULL;
        }
    
        if (has_error_callback && !zend_is_callable(error_callback, 0, &callable_name)) {
            php_error_docref1(NULL, ZSTR_VAL(callable_name), E_ERROR, "fifth parameter is expected to be a valid error callback");
            zend_string_release(callable_name);
            RETURN_FALSE;
        }
        if (callable_name) {
            zend_string_release(callable_name);
        }
    
        /* no task may be added while loop() is running */
        started = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
        if (Z_TYPE_P(started) == IS_TRUE) {
            php_error_docref(NULL, E_WARNING, "concurrent client has already started");
            RETURN_FALSE;
        }
    
        /* zero-filled, so every zval member starts out undefined */
        task = ecalloc(1, sizeof(yar_call_data_t));
        task->uri = zend_string_copy(uri);
        task->method = zend_string_copy(method);
    
        if (has_callback) {
            ZVAL_COPY(&task->callback, callback);
        }
        if (has_error_callback) {
            ZVAL_COPY(&task->ecallback, error_callback);
        }
        if (parameters && Z_TYPE_P(parameters) == IS_ARRAY) {
            ZVAL_COPY(&task->parameters, parameters);
        }
        if (options && Z_TYPE_P(options) == IS_ARRAY) {
            ZVAL_COPY(&task->options, options);
        }
    
        /* lazily turn Yar_Concurrent_Client::$_callstack into an array */
        stack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
        if (Z_ISNULL_P(stack)) {
            zval empty;
            array_init(&empty);
            zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), &empty);
            zval_ptr_dtor(&empty);
        }
    
        /* wrap the task in an le_calldata resource and append it to $_callstack */
        ZVAL_RES(&resource, zend_register_resource(task, le_calldata));
        next_index = zend_hash_next_free_element(Z_ARRVAL_P(stack));
        task->sequence = next_index + 1;
        zend_hash_next_index_insert(Z_ARRVAL_P(stack), &resource);
    
        RETURN_LONG(task->sequence);
    }
    /* }}} */
    

    整个call调用,除去防御代码,其实就做了一件事,构造一个 le_calldata型Resource,并添加到Yar_Concurrent_Client::_callstack数组中。
    接着是Yar_Concurrent_Client::loop()

    //yar_client.c
    /*
     * Yar_Concurrent_Client::loop($callback = NULL, $error_callback = NULL)
     *
     * Validates the optional default callbacks, stores them in the static
     * $_callback / $_error_callback properties and hands the queued calls to
     * php_yar_concurrent_client_handle(). Returns TRUE straight away when
     * nothing is queued, FALSE when a loop is already running.
     */
    PHP_METHOD(yar_concurrent_client, loop) {
        zval *callback = NULL, *error_callback = NULL;
        zval *started, *stack;
        zend_string *callable_name = NULL;
        int has_callback, has_error_callback;
        uint ok = 0;
    
        if (zend_parse_parameters(ZEND_NUM_ARGS(), "|zz", &callback, &error_callback) == FAILURE) {
            return;
        }
    
        started = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
        if (Z_TYPE_P(started) == IS_TRUE) {
            php_error_docref(NULL, E_WARNING, "concurrent client has already started");
            RETURN_FALSE;
        }
    
        has_callback = (callback != NULL) && !Z_ISNULL_P(callback);
        has_error_callback = (error_callback != NULL) && !Z_ISNULL_P(error_callback);
    
        if (has_callback && !zend_is_callable(callback, 0, &callable_name)) {
            php_error_docref1(NULL, ZSTR_VAL(callable_name), E_ERROR, "first argument is expected to be a valid callback");
            zend_string_release(callable_name);
            RETURN_FALSE;
        }
        if (callable_name) {
            zend_string_release(callable_name);
            callable_name = NULL;
        }
    
        if (has_error_callback && !zend_is_callable(error_callback, 0, &callable_name)) {
            php_error_docref1(NULL, ZSTR_VAL(callable_name), E_ERROR, "second argument is expected to be a valid error callback");
            zend_string_release(callable_name);
            RETURN_FALSE;
        }
        if (callable_name) {
            zend_string_release(callable_name);
        }
    
        /* nothing queued: report success without touching the callbacks */
        stack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
        if (Z_ISNULL_P(stack) || zend_hash_num_elements(Z_ARRVAL_P(stack)) == 0) {
            RETURN_TRUE;
        }
    
        /* publish the default callbacks through the static properties */
        if (has_callback) {
            zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), callback);
        }
        if (has_error_callback) {
            zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), error_callback);
        }
    
        /* flag $_start for the duration of the dispatch */
        ZVAL_BOOL(started, 1);
        ok = php_yar_concurrent_client_handle(stack);
        ZVAL_BOOL(started, 0);
    
        RETURN_BOOL(ok);
    }
    

    可见除了设置回调函数,唯一实际有效的操作是调用php_yar_concurrent_client_handle()

    //yar_client.c
    /*
     * Build one curl transport per queued call in `callstack`, register them all
     * with a curl multi scheduler and run it.
     *
     * callstack: array of le_calldata resources (Yar_Concurrent_Client::$_callstack).
     * Returns 1 when the scheduler's exec step succeeds, 0 on any failure.
     * The scheduler, and every transport already handed to it, is released on
     * all return paths.
     */
    int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
        char *msg;
        zval *calldata;
        zend_ulong sequence;
        yar_request_t *request;
        const yar_transport_t *factory;
        yar_transport_interface_t *transport;
        yar_transport_multi_interface_t *multi;
    
        /* the concurrent client always goes through the "curl" transport */
        factory = php_yar_transport_get(ZEND_STRL("curl"));
        multi = factory->multi->init();
    
        /* walk the yar_call_data_t entries stored in the call stack */
        ZEND_HASH_FOREACH_NUM_KEY_VAL(Z_ARRVAL_P(callstack), sequence, calldata) {
            yar_call_data_t *entry;
            long flags = 0;
    
            entry = (yar_call_data_t *)zend_fetch_resource(Z_RES_P(calldata), "Yar Call Data", le_calldata);
    
            if (!entry) {
                continue;
            }
    
            /* a call queued without parameters is sent with an empty array */
            if (Z_ISUNDEF(entry->parameters)) {
                array_init(&entry->parameters);
            }
    
            transport = factory->init();
    
            /* honour the per-call persistent option only when globally allowed */
            if (YAR_G(allow_persistent)) {
                if (!Z_ISUNDEF(entry->options)) {
                    zval *flag = php_yar_client_get_opt(&entry->options, YAR_OPT_PERSISTENT);
                    if (flag && (Z_TYPE_P(flag) == IS_TRUE || (Z_TYPE_P(flag) == IS_LONG && Z_LVAL_P(flag)))) {
                        flags |= YAR_PROTOCOL_PERSISTENT;
                    }
                }
            }
    
            if (!(request = php_yar_request_instance(entry->method,
                            &entry->parameters, Z_ISUNDEF(entry->options)? NULL: & entry->options))) {
                transport->close(transport);
                factory->destroy(transport);
                /* also releases the transports registered by earlier iterations */
                multi->close(multi);
                return 0;
            }
    
            /* NOTE(review): the options zval is passed in through the msg
             * out-parameter; presumably open() reads it and replaces msg with an
             * allocated error string on failure - confirm in the transport. */
            msg = (char*)&entry->options;
            if (!transport->open(transport, entry->uri, flags, &msg)) {
                php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
                php_yar_request_destroy(request);
                transport->close(transport);
                factory->destroy(transport);
                efree(msg);
                multi->close(multi);
                return 0;
            }
    
            DEBUG_C(ZEND_ULONG_FMT": call api '%s' at (%c)'%s' with '%d' parameters",
                    request->id, ZSTR_VAL(request->method), (flags & YAR_PROTOCOL_PERSISTENT)? 'p' : 'r', ZSTR_VAL(entry->uri),
                    zend_hash_num_elements(Z_ARRVAL(request->parameters)));
    
            /* stage the request on the transport */
            if (!transport->send(transport, request, &msg)) {
                php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
                php_yar_request_destroy(request);
                transport->close(transport);
                factory->destroy(transport);
                efree(msg);
                multi->close(multi);
                return 0;
            }
    
            /* remember which call this transport belongs to, for the callbacks */
            transport->calldata(transport, entry);
            /* from here on the scheduler owns the transport */
            multi->add(multi, transport);
            php_yar_request_destroy(request);
        } ZEND_HASH_FOREACH_END();
    
        /* send, receive and run the callbacks */
        if (!multi->exec(multi, php_yar_concurrent_client_callback)) {
            multi->close(multi);
            return 0;
        }
    
        multi->close(multi);
        return 1;
    } /* }}} */
    

    并发调度器

    上面的流程大体上和之前介绍的同步调用的流程类似,但是多了一个multi = factory->multi->init();和该multi变量的相关调用。
    multi相关结构如下:

    //yar_transport.h
    /*
     * Concurrent scheduler ("multi") object: one opaque data pointer plus the
     * function pointers that operate on it.
     */
    typedef struct _yar_transport_multi_interface {
        /* implementation-private state; the curl implementation stores a yar_curl_multi_data_t here */
        void *data;
        /* register one transport with the scheduler */
        int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp);
        /* run the registered transports, reporting results through callback */
        int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback);
        /* release the scheduler */
        void (*close)(struct _yar_transport_multi_interface *self);
    } yar_transport_multi_interface_t;
    /* Factory for concurrent schedulers: init() returns a new scheduler instance. */
    typedef struct _yar_transport_multi {
        struct _yar_transport_multi_interface * (*init)();
    } yar_transport_multi_t;
    

    我们称上面的(yar_transport_multi_interface_t)类型的变量为 并发调度器
    看过传输器章节的结构关系分析,这几个结构理解起来应该也毫无压力。

    由于C语言中实现“继承”所用的"父类"的结构和“子类实例”的结构是完全一致的,我们不能像PHP/JAVA一样随意往子类中添加成员变量(其实还是有办法的,参考PHP内核的zend_function相关结构的实现,思路不一样),所以上面的(void *)data变量并不是对应我们理解的对象中的某个变量,而是所有变量,子类对象通过存放不同类型的指针来“持有"不同数量和不同类型的成员变量。
    curl的并发调度器实例中,该data指针的类型为yar_curl_multi_data_t:

    //curl.c
    /* Private state of the curl concurrent scheduler (stored in the scheduler's data pointer). */
    typedef struct _yar_curl_multi_data_t {
        CURLM *cm;                       /* libcurl multi handle, created by curl_multi_init() */
        yar_transport_interface_t *chs;  /* head of the linked list of registered curl transports */
    } yar_curl_multi_data_t;
    

    另外yar_transport_interface_t也有一个data用于同样的用途,curl"子类"实例中其类型为_yar_curl_data_t:

    typedef struct _yar_curl_data_t {
        CURL        *cp;//libcurl-CURL普通句柄
        char        persistent;//是否长连接
        smart_str   buf;.//返回报文存放用的buf
        smart_str   postfield;//post报文
        php_url     *host;
        yar_call_data_t *calldata;//异步任务
        yar_curl_plink_t *plink;//长连接用的
        struct curl_slist *headers;//http header头
        yar_transport_interface_t *next;//传输器链表
    #if LIBCURL_VERSION_NUM < 0x071100
        zend_string *address;
    #endif
    } yar_curl_data_t;
    

    相关数据结构介绍完了,下面看curl传输器实现中multi的各个成员方法。

    Init()

    yar_transport_curl_multi->init()代表并发调度器的初始化,即构造器,除了内存分配和成员方法设置,核心就是调用libcurl的curl_multi_init生成一个libcurl-CURLM句柄

    /*
     * Constructor of the curl concurrent scheduler: allocates the interface
     * object and its zero-filled private state, creates the libcurl multi
     * handle and wires up the add/exec/close methods.
     */
    yar_transport_multi_interface_t * php_yar_curl_multi_init() /* {{{ */ {
        yar_transport_multi_interface_t *scheduler;
        yar_curl_multi_data_t *state;
    
        scheduler = emalloc(sizeof(yar_transport_multi_interface_t));
        state = ecalloc(1, sizeof(yar_curl_multi_data_t));
        state->cm = curl_multi_init();
    
        scheduler->close = php_yar_curl_multi_close;
        scheduler->exec  = php_yar_curl_multi_exec;
        scheduler->add   = php_yar_curl_multi_add_handle;
        scheduler->data  = state;
    
        return scheduler;
    } /* }}} */
    

    Add()

    yar_transport_multi_interface_t->add()用于向异步调度器注册一个异步调用。

    /*
     * Register one curl transport with the scheduler: prepare it, add its easy
     * handle to the libcurl multi handle and push the transport onto the front
     * of the scheduler's transport list. Always returns 1.
     */
    int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) /* {{{ */ {
        yar_curl_data_t *transport_state = (yar_curl_data_t *)handle->data;
        yar_curl_multi_data_t *scheduler_state = (yar_curl_multi_data_t *)self->data;
    
        /* apply the libcurl settings for this transport */
        php_yar_curl_prepare(handle);
        /* hand the easy handle to the multi handle */
        curl_multi_add_handle(scheduler_state->cm, transport_state->cp);
    
        /* newest transport becomes the list head; `next` is only written when a previous head exists */
        if (scheduler_state->chs) {
            transport_state->next = scheduler_state->chs;
        }
        scheduler_state->chs = handle;
    
        return 1;
    } /* }}} */
    

    Exec()

    yar_transport_multi_interface_t->exec()执行异步调度器,基于select事件模型,使用curl_multi_perform收发所有数据。
    其实Yar差不多从初版开始就有一个epoll事件模型实现的异步调度器,不过启用需要自行在config.h开启ENABLE_EPOLL宏。

    /*
     * Drive all registered transfers to completion with select() and
     * curl_multi_perform(), invoking `f` for every finished request.
     *
     * `f` is first called once with NULL call data after the initial perform,
     * so user code can do other work while the requests are in flight.
     *
     * Returns 1 on success and 0 on error (pending exception, select failure or
     * timeout). When `f` reports a bailout the scheduler is closed and
     * zend_bailout() is re-raised.
     */
    int php_yar_curl_multi_exec(yar_transport_multi_interface_t *self, yar_concurrent_client_callback *f) /* {{{ */ {
        int running_count, rest_count;
        yar_curl_multi_data_t *multi;
    
    
        multi = (yar_curl_multi_data_t *)self->data;
        while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
    
        /* "all requests sent" notification: callback with empty arguments */
        if (!f(NULL, YAR_ERR_OKEY, NULL)) {
            goto bailout;
        }
    
        /* the user callback may have thrown */
        if (EG(exception)) {
            goto onerror;
        }
    
        /* keep performing until no transfer is running any more */
        if (running_count) {
            rest_count = running_count;
            do {
                int max_fd, return_code;
                struct timeval tv;
                fd_set readfds;
                fd_set writefds;
                fd_set exceptfds;
    
                FD_ZERO(&readfds);
                FD_ZERO(&writefds);
                FD_ZERO(&exceptfds);
    
                curl_multi_fdset(multi->cm, &readfds, &writefds, &exceptfds, &max_fd);
                if (max_fd == -1) {
                    /* libcurl has no descriptor to wait on yet: sleep briefly,
                     * then call curl_multi_perform anyway */
                    tv.tv_sec = 0;
                    tv.tv_usec = 50000; /* sleep 50ms */
                    select(1, &readfds, &writefds, &exceptfds, &tv);
                    while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
                    /* fall through to the completion check below: a transfer
                     * may have finished during this perform */
                } else {
                    /* wait at most YAR_G(timeout) milliseconds
                     * (curl_multi_timeout() could supply a tighter bound) */
                    tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
                    tv.tv_usec = (ulong)((YAR_G(timeout) % 1000) * 1000);
    
                    return_code = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);
                    if (return_code > 0) {
                        while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
                    } else if (-1 == return_code) {
                        php_error_docref(NULL, E_WARNING, "select error '%s'", strerror(errno));
                        goto onerror;
                    } else {
                        /* timeout */
                        php_error_docref(NULL, E_WARNING, "select timeout %ldms reached", YAR_G(timeout));
                        goto onerror;
                    }
                }
    
                /* at least one more request finished: deliver its callback */
                if (rest_count > running_count) {
                    int ret = php_yar_curl_multi_parse_response(multi, f);
                    if (ret == -1) {
                        goto bailout;
                    } else if (ret == 0) {
                        goto onerror;
                    }
                    rest_count = running_count;
                }
            } while (running_count);
        } else {
            /* everything finished during the first perform */
            int ret = php_yar_curl_multi_parse_response(multi, f);
            if (ret == -1) {
                goto bailout;
            } else if (ret == 0) {
                goto onerror;
            }
        }
    
        return 1;
    onerror:
        return 0;
    bailout:
        self->close(self);
        zend_bailout();
        return 0;
    } /* }}} */
    

    Close()

    yar_transport_multi_interface_t->close()用于各种资源的清理。

    /*
     * Destructor of the curl concurrent scheduler: detaches and closes every
     * transport still on the list, cleans up the libcurl multi handle and
     * frees the scheduler together with its private state.
     */
    void php_yar_curl_multi_close(yar_transport_multi_interface_t *self) /* {{{ */ {
        yar_curl_multi_data_t *state = (yar_curl_multi_data_t *)self->data;
        yar_transport_interface_t *current = state->chs;
    
        while (current) {
            yar_curl_data_t *current_state = (yar_curl_data_t *)current->data;
            /* read the link before close() gets the transport */
            yar_transport_interface_t *following = current_state->next;
    
            curl_multi_remove_handle(state->cm, current_state->cp);
            current->close(current);
            current = following;
        }
    
        curl_multi_cleanup(state->cm);
        efree(state);
        efree(self);
    } /* }}} */
    

    数据提取和反序列化

    php_yar_curl_multi_parse_response()用于从收包完成后的CURLM,解析出response并执行回调。

    /*
     * Drain the finished-transfer messages of the multi handle. For each one,
     * unlink the matching transport from the scheduler's list, build a response
     * and pass it to `f`, then close the transport.
     *
     * Returns 1 when all messages were handled, 0 when a callback left an
     * exception pending, -1 when a callback reported a bailout.
     */
    static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) /* {{{ */ {
        int msg_in_sequence;
        CURLMsg *msg;
    
        /* iterate over the transfer results reported by libcurl */
        do {
            msg = curl_multi_info_read(multi->cm, &msg_in_sequence);
            if (msg && msg->msg == CURLMSG_DONE) {
                uint found = 0;
                yar_transport_interface_t *handle = multi->chs, *q = NULL;
    
                /* find the transport owning this easy handle and unlink it */
                while (handle) {
                    if (msg->easy_handle == ((yar_curl_data_t*)handle->data)->cp) {
                        if (q) {
                            ((yar_curl_data_t *)q->data)->next = ((yar_curl_data_t*)handle->data)->next;
                        } else {
                            multi->chs = ((yar_curl_data_t*)handle->data)->next;
                        }
                        found = 1;
                        break;
                    }
                    q = handle;
                    handle = ((yar_curl_data_t*)handle->data)->next;
                }
    
                if (found) {
                    long http_code = 200;
                    yar_response_t *response;
                    yar_curl_data_t *data = (yar_curl_data_t *)handle->data;
    
                    response = php_yar_response_instance();
    
                    if (msg->data.result == CURLE_OK) {
                        curl_multi_remove_handle(multi->cm, data->cp);
                        /* transfer succeeded but the server answered with a non-200 status */
                        if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK && http_code != 200) {
                            char buf[128];
                            uint len = snprintf(buf, sizeof(buf), "server responsed non-200 code '%ld'", http_code);
    
                            php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
                            /* report through the error callback */
                            if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
                                /* the callback bailed out */
                                handle->close(handle);
                                php_yar_response_destroy(response);
                                return -1;
                            }
                            if (EG(exception)) {
                                /* the callback threw */
                                handle->close(handle);
                                php_yar_response_destroy(response);
                                return 0;
                            }
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            continue;
                        } else {
                            /* HTTP 200: decode the body */
                            if (data->buf.s) {
                                char *errmsg = NULL;
                                zval *retval, rret;
                                yar_header_t *header = NULL;
                                char *payload;
                                size_t payload_len;
    
                                smart_str_0(&data->buf);
    
                                payload = ZSTR_VAL(data->buf.s);
                                payload_len = ZSTR_LEN(data->buf.s);
    
                                /* a body shorter than the protocol header cannot be parsed;
                                 * checking first also keeps payload_len from wrapping below */
                                if (payload_len < sizeof(yar_header_t) || !(header = php_yar_protocol_parse(payload))) {
                                    php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
                                } else {
                                    /* skip over the leading header */
                                    payload += sizeof(yar_header_t);
                                    payload_len -= sizeof(yar_header_t);
    
                                    /* unpack what follows the header */
                                    if (!(retval = php_yar_packager_unpack(payload, payload_len, &errmsg, &rret))) {
                                        php_yar_response_set_error(response, YAR_ERR_PACKAGER, errmsg, strlen(errmsg));
                                    } else {
                                        /* map the unpacked value onto the response structure */
                                        php_yar_response_map_retval(response, retval);
                                        DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'", response->id, 7, payload, header->body_len, payload + 8);
                                        zval_ptr_dtor(retval);
                                    }
                                    if (errmsg) {
                                        efree(errmsg);
                                    }
                                }
                            } else {
                                php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
                            }
                            /* success or error callback, chosen by the response status */
                            if (!f(data->calldata, response->status, response)) {
                                handle->close(handle);
                                php_yar_response_destroy(response);
                                return -1;
                            }
                            if (EG(exception)) {
                                handle->close(handle);
                                php_yar_response_destroy(response);
                                return 0;
                            }
                        }
                    } else {
                        /* the transfer itself failed: report libcurl's error text */
                        char *err = (char *)curl_easy_strerror(msg->data.result);
                        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, err, strlen(err));
                        if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return -1;
                        }
                        if (EG(exception)) {
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return 0;
                        }
                    }
                    handle->close(handle);
                    php_yar_response_destroy(response);
                } else {
                    php_error_docref(NULL, E_WARNING, "unexpected transport info missed");
                }
            }
        } while (msg_in_sequence);
    
        return 1;
    }
    /* }}} */
    

    回调用户注册方法

    最后一个核心方法php_yar_concurrent_client_callback()用于回调用户提供的回调函数

    /*
     * Invoke the user callback for one event of the concurrent client.
     *
     * calldata: the finished call, or NULL for the one-off "all requests sent"
     *           notification.
     * status:   YAR_ERR_OKEY selects the success callback, anything else the
     *           error callback. A per-call callback takes precedence over the
     *           one stored by loop().
     * response: response of the finished call (not read when calldata is NULL).
     *
     * Returns 0 only when the callback bailed out, 1 otherwise (including when
     * no callback is registered or the call could not be made).
     */
    int php_yar_concurrent_client_callback(yar_call_data_t *calldata, int status, yar_response_t *response) /* {{{ */ {
        zval code, retval, retval_ptr;
        zval callinfo, *callback, *func_params;
        zend_bool bailout = 0;
        uint params_count, i;
    
        /* keep the post-call check well defined if the call bails out before writing a result */
        ZVAL_UNDEF(&retval_ptr);
    
        if (calldata) {
            /* data callback */
            if (status == YAR_ERR_OKEY) {
                if (!Z_ISUNDEF(calldata->callback)) {
                    callback = &calldata->callback;
                } else {
                    callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
                }
                params_count = 2;
            } else {
                if (!Z_ISUNDEF(calldata->ecallback)) {
                    callback = &calldata->ecallback;
                } else {
                    callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), 0);
                }
                params_count = 3;
            }
    
            /* no callback available: raise the error, or print the remote return value */
            if (Z_ISNULL_P(callback)) {
                if (status != YAR_ERR_OKEY) {
                    if (!Z_ISUNDEF(response->err)) {
                        php_yar_client_handle_error(0, response);
                    } else {
                        php_error_docref(NULL, E_WARNING, "[%d]:unknown Error", status);
                    }
                } else if (!Z_ISUNDEF(response->retval)) {
                    zend_print_zval(&response->retval, 1);
                }
                return 1;
            }
    
            if (status == YAR_ERR_OKEY) {
                if (Z_ISUNDEF(response->retval)) {
                    php_yar_client_trigger_error(0, YAR_ERR_REQUEST, "%s", "server responsed empty response");
                    return 1;
                }
                ZVAL_COPY(&retval, &response->retval);
            } else {
                ZVAL_LONG(&code, status);
                ZVAL_COPY(&retval, &response->err);
            }
    
            /* last callback argument: sequence id, uri and remote method name */
            array_init(&callinfo);
            add_assoc_long_ex(&callinfo, "sequence", sizeof("sequence") - 1, calldata->sequence);
            add_assoc_str_ex(&callinfo, "uri", sizeof("uri") - 1, zend_string_copy(calldata->uri));
            add_assoc_str_ex(&callinfo, "method", sizeof("method") - 1, zend_string_copy(calldata->method));
        } else {
            /* "all requests sent" notification goes to the loop() callback only */
            callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
            if (Z_ISNULL_P(callback)) {
                return 1;
            }
            params_count = 2;
        }
    
        if (calldata && (status != YAR_ERR_OKEY)) {
            /* error callback: function($code, $error, $callinfo) */
            func_params = safe_emalloc(sizeof(zval), 3, 0);
            ZVAL_COPY_VALUE(&func_params[0], &code);
            ZVAL_COPY_VALUE(&func_params[1], &retval);
            ZVAL_COPY_VALUE(&func_params[2], &callinfo);
        } else if (calldata) {
            /* success callback: function($retval, $callinfo) */
            func_params = safe_emalloc(sizeof(zval), 2, 0);
            ZVAL_COPY_VALUE(&func_params[0], &retval);
            ZVAL_COPY_VALUE(&func_params[1], &callinfo);
        } else {
            /* notification callback: both arguments are NULL */
            func_params = safe_emalloc(sizeof(zval), 2, 0);
            ZVAL_NULL(&func_params[0]);
            ZVAL_NULL(&func_params[1]);
        }
    
        /* Never return from inside zend_try: that would leave EG(bailout)
         * pointing at this frame. All cleanup happens once, after the block. */
        zend_try {
            if (call_user_function_ex(EG(function_table), NULL, callback,
                        &retval_ptr, params_count, func_params, 0, NULL) != SUCCESS) {
                if (calldata) {
                    php_error_docref(NULL, E_WARNING, "call to callback failed for request: '%s'", ZSTR_VAL(calldata->method));
                } else {
                    php_error_docref(NULL, E_WARNING, "call to initial callback failed");
                }
            }
        } zend_catch {
            bailout = 1;
        } zend_end_try();
    
        if (!Z_ISUNDEF(retval_ptr)) {
            zval_ptr_dtor(&retval_ptr);
        }
    
        for (i = 0; i < params_count; i++) {
            zval_ptr_dtor(&func_params[i]);
        }
        efree(func_params);
        return bailout? 0 : 1;
    } /* }}} */
    
    

    相关文章

      网友评论

        本文标题:[原]PHP-yar拓展源码解读七-concurrent_cli

        本文链接:https://www.haomeiwen.com/subject/ommvvftx.html