美文网首页
以太坊源码深入分析(4)-- 以太坊RPC通信实例和原理代码分析

以太坊源码深入分析(4)-- 以太坊RPC通信实例和原理代码分析

作者: 老鱼游啊游 | 来源:发表于2018-04-20 16:07 被阅读0次

    上一节我们试着写了一个RPC的请求实例,通过分析源码知道了RPC服务的创建流程,以及Http RPC server创建过程,Http RPC Client的请求流程。
    这一节,先分析一下Http RPC server如何处理client的请求。然后再分析一下IPC RPC的处理流程。
    一,Http RPC server处理Client的请求。
    回到上一节startHTTP() 里面HTTPServer初始化的方法

    // NewHTTPServer builds an *http.Server whose handler is srv wrapped first in
    // the CORS handler (configured with cors) and then in the virtual-host
    // handler (configured with vhosts), so the vhost handler is the outermost one.
    func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {
        corsWrapped := newCorsHandler(srv, cors)
        return &http.Server{
            Handler: newVHostHandler(vhosts, corsWrapped),
        }
    }
    
    // ServeHTTP serves a single JSON-RPC request (or batch) received over HTTP.
    // The request body is the codec's input and w receives the encoded response.
    func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        // An empty GET without a query string is answered with an empty response;
        // this permits dumb remote health-checks (AWS).
        isEmptyGet := r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == ""
        if isEmptyGet {
            return
        }
    
        code, err := validateRequest(r)
        if err != nil {
            http.Error(w, err.Error(), code)
            return
        }
    
        // All checks passed: build a codec that reads straight from the request
        // body and writes to w, then have the server process exactly one request.
        body := &httpReadWriteNopCloser{r.Body, w}
        codec := NewJSONCodec(body)
        defer codec.Close()
    
        w.Header().Set("content-type", contentType)
        srv.ServeSingleRequest(codec, OptionMethodInvocation)
    }
    

    Server实现了http.Handler接口的 ServeHTTP(w http.ResponseWriter, r *http.Request)方法,因此可以直接作为http.Server的Handler使用。
    先过滤掉非法的请求,对接收到的请求body体,进行JSONCodec封装。
    然后交由 srv.ServeSingleRequest(codec, OptionMethodInvocation)处理。
    接着调用 s.serveRequest(codec, true, options)
    singleShot参数控制请求是同步处理还是异步处理。如果singleShot为true,那么请求的处理是同步的,需要等待处理结果之后才能退出。 singleShot为false,把处理请求的方法交由goroutine异步处理。
    Http RPC的处理是使用同步方式。

    // serveRequest reads requests from codec and executes them until reading
    // fails or the server's run flag is no longer 1.
    //
    // With singleShot set, exactly one request (or batch) is executed
    // synchronously and the function returns. Otherwise every request is executed
    // in its own goroutine while the loop keeps reading; outstanding goroutines
    // are awaited when reading ends.
    //
    // The codec is registered in s.codecs for the duration of the call. A
    // *shutdownError is returned when the server is not running on entry; every
    // other path returns nil.
    func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
        var pend sync.WaitGroup
    
        defer func() {
            // Recover from panics raised while serving and log the stack trace
            // (capped at 64 KiB) of the current goroutine.
            if err := recover(); err != nil {
                const size = 64 << 10
                buf := make([]byte, size)
                buf = buf[:runtime.Stack(buf, false)]
                log.Error(string(buf))
            }
            // Always unregister the codec, whether we return normally or panicked.
            s.codecsMu.Lock()
            s.codecs.Remove(codec)
            s.codecsMu.Unlock()
        }()
    
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
    
        // If the codec supports notifications, include a notifier that callbacks can
        // use to send notifications to clients. It is tied to the codec/connection. If
        // the connection is closed the notifier will stop and cancel all active
        // subscriptions.
        if options&OptionSubscriptions == OptionSubscriptions {
            ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
        }
        // Register the codec under the lock, refusing new codecs once stopped.
        s.codecsMu.Lock()
        if atomic.LoadInt32(&s.run) != 1 { // server stopped
            s.codecsMu.Unlock()
            return &shutdownError{}
        }
        s.codecs.Add(codec)
        s.codecsMu.Unlock()
    
        // test if the server is ordered to stop
        for atomic.LoadInt32(&s.run) == 1 {
            reqs, batch, err := s.readRequest(codec)
            if err != nil {
                // If a parsing error occurred, send an error. An error whose
                // message is exactly "EOF" is treated as end of stream and is
                // not reported back.
                if err.Error() != "EOF" {
                    log.Debug(fmt.Sprintf("read error %v\n", err))
                    codec.Write(codec.CreateErrorResponse(nil, err))
                }
                // Error or end of stream, wait for requests and tear down
                pend.Wait()
                return nil
            }
    
            // check if server is ordered to shutdown and return an error
            // telling the client that his request failed.
            if atomic.LoadInt32(&s.run) != 1 {
                err = &shutdownError{}
                if batch {
                    // A batch gets one error response per request in it.
                    resps := make([]interface{}, len(reqs))
                    for i, r := range reqs {
                        resps[i] = codec.CreateErrorResponse(&r.id, err)
                    }
                    codec.Write(resps)
                } else {
                    codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
                }
                return nil
            }
            // If a single shot request is executing, run and return immediately
            if singleShot {
                if batch {
                    s.execBatch(ctx, codec, reqs)
                } else {
                    s.exec(ctx, codec, reqs[0])
                }
                return nil
            }
            // For multi-shot connections, start a goroutine to serve and loop back
            pend.Add(1)
    
            // reqs and batch are passed as arguments so the goroutine works on
            // this iteration's values.
            go func(reqs []*serverRequest, batch bool) {
                defer pend.Done()
                if batch {
                    s.execBatch(ctx, codec, reqs)
                } else {
                    s.exec(ctx, codec, reqs[0])
                }
            }(reqs, batch)
        }
        return nil
    }
    

    当server启动 s.run的值就为1,直到server stop。
    将codec add进s.codecs,codecs是一个set。
    处理完请求数据,返回时需要从s.codecs remove 这个codec
    对s.codecs的add 和 remove需要添加互斥锁,保证s.codecs的线程安全。

    s.readRequest(codec) 处理请求的codec数据。

    // readRequest reads the next request (or batch) from codec and resolves each
    // entry into a serverRequest by looking up the service and callback and
    // parsing its arguments.
    //
    // Problems with an individual request (parse error, unknown service or
    // method, invalid parameters) are recorded in that serverRequest's err field.
    // Only a failure of codec.ReadRequestHeaders is returned as the error result.
    // The bool result reports whether the incoming message was a batch.
    func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
        reqs, batch, err := codec.ReadRequestHeaders()
        if err != nil {
            return nil, batch, err
        }
    
        requests := make([]*serverRequest, len(reqs))
    
        // verify requests
        for i, r := range reqs {
            var ok bool
            var svc *service
    
            // The codec already flagged this request as bad; pass the error on.
            if r.err != nil {
                requests[i] = &serverRequest{id: r.id, err: r.err}
                continue
            }
    
            // Unsubscribe requests are handled before the service lookup.
            if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
                requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
                argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
                if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                    requests[i].args = args
                } else {
                    requests[i].err = &invalidParamsError{err.Error()}
                }
                continue
            }
    
            if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
                requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
                continue
            }
    
            if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
                if callb, ok := svc.subscriptions[r.method]; ok {
                    requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
                    if r.params != nil && len(callb.argTypes) > 0 {
                        // Prepend a string type for the leading subscription name
                        // so the remaining params line up with callb.argTypes.
                        argTypes := []reflect.Type{reflect.TypeOf("")}
                        argTypes = append(argTypes, callb.argTypes...)
                        if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                            requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
                        } else {
                            requests[i].err = &invalidParamsError{err.Error()}
                        }
                    }
                } else {
                    requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
                }
                continue
            }
    
            if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
                requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
                // Arguments are only parsed when the request carries params and
                // the callback declares argument types.
                if r.params != nil && len(callb.argTypes) > 0 {
                    if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
                        requests[i].args = args
                    } else {
                        requests[i].err = &invalidParamsError{err.Error()}
                    }
                }
                continue
            }
    
            // The service exists but has no callback with this method name.
            requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
        }
    
        return requests, batch, nil
    }
    

    codec.ReadRequestHeaders()解析了请求数据

    // ReadRequestHeaders decodes the next raw JSON message from the codec's
    // decoder while holding the decode mutex, then parses it as a batch or as a
    // single request. A decode failure is reported as an *invalidRequestError.
    func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
        c.decMu.Lock()
        defer c.decMu.Unlock()
    
        var raw json.RawMessage
        err := c.d.Decode(&raw)
        switch {
        case err != nil:
            return nil, false, &invalidRequestError{err.Error()}
        case isBatch(raw):
            return parseBatchRequest(raw)
        default:
            return parseRequest(raw)
        }
    }
    

    如果请求的数据是一组req数组用parseBatchRequest(incomingMsg)解析,否则用 parseRequest(incomingMsg)。两者处理大同小异。

    // parseRequest decodes a single (non-batch) JSON-RPC message into a
    // one-element rpcRequest slice. The bool result is always false because the
    // message is not a batch.
    //
    // Three request shapes are recognised, in this order: subscribe requests
    // (method ends in subscribeMethodSuffix), unsubscribe requests (method ends
    // in unsubscribeMethodSuffix) and regular calls of the form
    // service<serviceMethodSeparator>method.
    func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
        var in jsonRequest
        if err := json.Unmarshal(incomingMsg, &in); err != nil {
            return nil, false, &invalidMessageError{err.Error()}
        }
        if err := checkReqId(in.Id); err != nil {
            return nil, false, &invalidMessageError{err.Error()}
        }
    
        switch {
        case strings.HasSuffix(in.Method, subscribeMethodSuffix):
            // Subscribe requests are special: the first param of the payload must
            // be the subscription name, so an empty payload is invalid.
            if len(in.Payload) == 0 {
                return nil, false, &invalidRequestError{"Unable to parse subscription request"}
            }
            var subscribeMethod [1]string
            if err := json.Unmarshal(in.Payload, &subscribeMethod); err != nil {
                log.Debug(fmt.Sprintf("Unable to parse subscription method: %v\n", err))
                return nil, false, &invalidRequestError{"Unable to parse subscription request"}
            }
            sub := rpcRequest{
                id:       &in.Id,
                isPubSub: true,
                service:  strings.TrimSuffix(in.Method, subscribeMethodSuffix),
                method:   subscribeMethod[0],
                params:   in.Payload,
            }
            return []rpcRequest{sub}, false, nil
    
        case strings.HasSuffix(in.Method, unsubscribeMethodSuffix):
            unsub := rpcRequest{id: &in.Id, isPubSub: true, method: in.Method, params: in.Payload}
            return []rpcRequest{unsub}, false, nil
        }
    
        // Regular RPC call: the method must split into exactly service and method.
        parts := strings.Split(in.Method, serviceMethodSeparator)
        if len(parts) != 2 {
            return nil, false, &methodNotFoundError{in.Method, ""}
        }
    
        call := rpcRequest{service: parts[0], method: parts[1], id: &in.Id}
        // params stays unset when the request carries no payload.
        if len(in.Payload) != 0 {
            call.params = in.Payload
        }
        return []rpcRequest{call}, false, nil
    }
    

    解析出service名字,方法名,id,请求参数组装成rpcRequest对象,并返回。
    readRequest(codec ServerCodec)方法对rpcRequest再处理加工一下,然后返回。

    回到serveRequest方法,继续分析s.exec(ctx, codec, reqs[0])的实现

    // exec runs a single request and writes its response to codec. A request
    // that already carries an error is answered with an error response without
    // being handled. If writing fails, the error is logged and the codec closed.
    func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
        var (
            reply    interface{}
            activate func()
        )
        if req.err == nil {
            reply, activate = s.handle(ctx, codec, req)
        } else {
            reply = codec.CreateErrorResponse(&req.id, req.err)
        }
    
        if werr := codec.Write(reply); werr != nil {
            log.Error(fmt.Sprintf("%v\n", werr))
            codec.Close()
        }
    
        // For subscribe requests handle returns a callback; running it after the
        // response was written allows the subscription to be activated.
        if activate != nil {
            activate()
        }
    }
    

    交由s.handle(ctx, codec, req)处理

    // handle executes a request and returns the response to send to the client,
    // plus an optional callback.
    //
    // The callback is non-nil only for a successful subscribe request; it
    // activates the new subscription and is meant to be run after the response
    // has been sent. Failures are returned as error responses built by codec.
    func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
        if req.err != nil {
            return codec.CreateErrorResponse(&req.id, req.err), nil
        }
    
        if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
            if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
                notifier, supported := NotifierFromContext(ctx)
                if !supported { // interface doesn't support subscriptions (e.g. http)
                    return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
                }
    
                subid := ID(req.args[0].String())
                if err := notifier.unsubscribe(subid); err != nil {
                    return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
                }
    
                return codec.CreateResponse(req.id, true), nil
            }
            return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
        }
    
        if req.callb.isSubscribe {
            subid, err := s.createSubscription(ctx, codec, req)
            if err != nil {
                return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
            }
    
            // activate the subscription after the sub id was successfully sent to the client
            activateSub := func() {
                notifier, _ := NotifierFromContext(ctx)
                notifier.activate(subid, req.svcname)
            }
    
            return codec.CreateResponse(req.id, subid), activateSub
        }
    
        // regular RPC call, prepare arguments
        if len(req.args) != len(req.callb.argTypes) {
            rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
                req.svcname, serviceMethodSeparator, req.callb.method.Name,
                len(req.callb.argTypes), len(req.args))}
            return codec.CreateErrorResponse(&req.id, rpcErr), nil
        }
    
        // Argument list for the reflective call: receiver first, then the
        // context if the callback takes one, then the parsed request arguments.
        arguments := []reflect.Value{req.callb.rcvr}
        if req.callb.hasCtx {
            arguments = append(arguments, reflect.ValueOf(ctx))
        }
        if len(req.args) > 0 {
            arguments = append(arguments, req.args...)
        }
    
        // execute RPC method and return result
        reply := req.callb.method.Func.Call(arguments)
        if len(reply) == 0 {
            return codec.CreateResponse(req.id, nil), nil
        }
    
        // errPos is the index of the error value among the results; a negative
        // value skips the error check.
        if req.callb.errPos >= 0 { // test if method returned an error
            if !reply[req.callb.errPos].IsNil() {
                e := reply[req.callb.errPos].Interface().(error)
                res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
                return res, nil
            }
        }
        // The first return value is sent back as the result.
        return codec.CreateResponse(req.id, reply[0].Interface()), nil
    }
    

    跳过对订阅和取消订阅的请求处理。
    reply := req.callb.method.Func.Call(arguments) 执行了RPC方法并返回结果reply。
    codec.CreateResponse(req.id, reply[0].Interface())是rpc/json.go对返回结果的封装。
    回到exec(ctx context.Context, codec ServerCodec, req *serverRequest)方法。codec.Write(response)对返回结果json序列化。
    如果请求方法是订阅,则在响应写出之后再执行回调callback(),用来激活这个订阅。

    // Write encodes res with the codec's encoder, sending the response to the
    // client. The encode mutex is held for the duration of the call, and the
    // encoder's error is returned unchanged.
    func (c *jsonCodec) Write(res interface{}) error {
        c.encMu.Lock()
        defer c.encMu.Unlock()
    
        return c.e.Encode(res)
    }
    

    c.e.Encode(res)会调用enc.w.Write(b),这个w就是func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)方法传入的http.ResponseWriter。借用这个writer来实现server和client的通信。

    二,其他RPC 拨号的实现方法
    RPC Client拨号的过程实质是建立client和server的读写通道。
    1,上一节分析的DialHTTPWithClient()方法,RPC的Http服务,创建了一个httpConn通道。
    2,RPC的WebSocket服务,拨号的实现方法:

    // wsDialContext opens the underlying network connection for a websocket
    // client and performs the websocket client handshake on it.
    //
    // The "ws" scheme dials plain TCP; "wss" dials TCP with TLS using
    // config.TlsConfig. Any other scheme yields websocket.ErrBadScheme. If the
    // handshake fails, the freshly dialed connection is closed before returning.
    func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
        scheme := config.Location.Scheme
        if scheme != "ws" && scheme != "wss" {
            return nil, websocket.ErrBadScheme
        }
    
        var (
            conn net.Conn
            err  error
        )
        if scheme == "wss" {
            conn, err = tls.DialWithDialer(contextDialer(ctx), "tcp", wsDialAddress(config.Location), config.TlsConfig)
        } else {
            conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
        }
        if err != nil {
            return nil, err
        }
    
        client, err := websocket.NewClient(config, conn)
        if err != nil {
            conn.Close()
            return nil, err
        }
        return client, nil
    }
    

    dialContext创建了一个ws的net.conn,tls.DialWithDialer创建了一个wss的net.conn
    3,RPC的InProc服务,拨号的实现方法

    // DialInProc attaches an in-process client to handler. Each connection is
    // one end of a net.Pipe; the other end is served by handler in a new
    // goroutine with both method invocation and subscriptions enabled.
    func DialInProc(handler *Server) *Client {
        connect := func(context.Context) (net.Conn, error) {
            serverEnd, clientEnd := net.Pipe()
            go handler.ServeCodec(NewJSONCodec(serverEnd), OptionMethodInvocation|OptionSubscriptions)
            return clientEnd, nil
        }
        // The error from newClient is discarded; connect itself never fails.
        client, _ := newClient(context.Background(), connect)
        return client
    }
    

    创建了一个net.Pipe通道
    4,RPC的IPC服务,拨号的实现方法

    // DialIPC creates a client whose connections are opened with
    // newIPCConnection on the given endpoint.
    func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
        connect := func(dialCtx context.Context) (net.Conn, error) {
            return newIPCConnection(dialCtx, endpoint)
        }
        return newClient(ctx, connect)
    }
    // Unix implementation.
    // newIPCConnection dials endpoint using the "unix" network via dialContext
    // and returns the resulting connection.
    func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
        return dialContext(ctx, "unix", endpoint)
    }
    //windows
    // newIPCConnection will connect to a named pipe with the given //endpoint as name.
    func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
        timeout := defaultPipeDialTimeout
        if deadline, ok := ctx.Deadline(); ok {
            timeout = deadline.Sub(time.Now())
            if timeout < 0 {
                timeout = 0
            }
        }
        return npipe.DialTimeout(endpoint, timeout)
    }
    
    

    如果是unix系统,和websocket一样调用dialContext创建net.conn通道,只是network参数为"unix"(Unix域套接字);
    如果是windows系统,则用第三方库npipe的DialTimeout方法,基于命名管道创建了一个net.conn通道。

    三,其他RPC Client如何发送请求
    Rpc/client.go 的CallContext()方法,如果不是http请求,选择走c.send(ctx, op, msg)方法。之所以会这样是因为,http是一个短连接,每次请求是同步的,直接返回请求结果。其他的比如IPC、InProc、 websocket都是长连接,每次请求都是异步的,需要在网络线程外监听请求返回的结果。

    // send registers op with the dispatch loop, then sends msg on the connection.
    // if sending fails, op is deregistered.
    func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
        select {
        case c.requestOp <- op:
            log.Trace("", "msg", log.Lazy{Fn: func() string {
                return fmt.Sprint("sending ", msg)
            }})
            err := c.write(ctx, msg)
            c.sendDone <- err
            return err
        case <-ctx.Done():
            // This can happen if the client is overloaded or unable to keep up with
            // subscription notifications.
            return ctx.Err()
        case <-c.didQuit:
            return ErrClientQuit
        }
    

    这时候请求被select阻塞住,直到op成功发送到c.requestOp(被dispatch循环接收),或者receive到 ctx.Done(),或receive到 c.didQuit。op发送成功后,调用write方法把请求的内容写到conn通道去,然后把写入结果err发送给sendDone chan,client的dispatch方法会收到这个结果。

    // dispatch is the client's main loop for a connection. It starts a read loop
    // on conn and then multiplexes, in a single select:
    //   - close requests (c.close),
    //   - message batches and errors from the read loop (c.readResp, c.readErr),
    //   - replacement connections (c.reconnected),
    //   - send registrations and completions (c.requestOp, c.sendDone).
    //
    // Only one send is in flight at a time: after accepting an op, the loop
    // stops receiving from c.requestOp until the matching c.sendDone arrives.
    // On exit, pending request ops are closed with ErrClientQuit, conn is
    // closed, the read channels are drained until the read loop reports an
    // error, and finally c.didQuit is closed.
    func (c *Client) dispatch(conn net.Conn) {
        // Spawn the initial read loop.
        go c.read(conn)
    
        var (
            lastOp        *requestOp    // tracks last send operation
            requestOpLock = c.requestOp // nil while the send lock is held
            reading       = true        // if true, a read loop is running
        )
        // Deferred calls run last-in first-out: the cleanup below runs first,
        // then didQuit is closed.
        defer close(c.didQuit)
        defer func() {
            c.closeRequestOps(ErrClientQuit)
            conn.Close()
            if reading {
                // Empty read channels until read is dead.
                for {
                    select {
                    case <-c.readResp:
                    case <-c.readErr:
                        return
                    }
                }
            }
        }()
    
        for {
            select {
            case <-c.close:
                return
    
            // Read path.
            case batch := <-c.readResp:
                for _, msg := range batch {
                    switch {
                    case msg.isNotification():
                        log.Trace("", "msg", log.Lazy{Fn: func() string {
                            return fmt.Sprint("<-readResp: notification ", msg)
                        }})
                        c.handleNotification(msg)
                    case msg.isResponse():
                        log.Trace("", "msg", log.Lazy{Fn: func() string {
                            return fmt.Sprint("<-readResp: response ", msg)
                        }})
                        c.handleResponse(msg)
                    default:
                        log.Debug("", "msg", log.Lazy{Fn: func() string {
                            return fmt.Sprint("<-readResp: dropping weird message", msg)
                        }})
                        // TODO: maybe close
                    }
                }
    
            // The read loop terminated: fail all pending ops with its error and
            // close the connection. The loop keeps running so that a later
            // reconnect can be picked up.
            case err := <-c.readErr:
                log.Debug(fmt.Sprintf("<-readErr: %v", err))
                c.closeRequestOps(err)
                conn.Close()
                reading = false
    
            case newconn := <-c.reconnected:
                log.Debug(fmt.Sprintf("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr()))
                if reading {
                    // Wait for the previous read loop to exit. This is a rare case.
                    conn.Close()
                    <-c.readErr
                }
                go c.read(newconn)
                reading = true
                conn = newconn
    
            // Send path.
            case op := <-requestOpLock:
                // Stop listening for further send ops until the current one is done.
                // (Receiving from a nil channel blocks forever, which disables this case.)
                requestOpLock = nil
                lastOp = op
                // Register the op under each of its request ids.
                for _, id := range op.ids {
                    c.respWait[string(id)] = op
                }
    
            case err := <-c.sendDone:
                if err != nil {
                    // Remove response handlers for the last send. We remove those here
                    // because the error is already handled in Call or BatchCall. When the
                    // read loop goes down, it will signal all other current operations.
                    for _, id := range lastOp.ids {
                        delete(c.respWait, string(id))
                    }
                }
                // Listen for send ops again.
                requestOpLock = c.requestOp
                lastOp = nil
            }
        }
    }
    

    这个dispatch()方法也是配套给非http请求用的。通过goroutine c.read(conn)。来读server通过conn返回的数据。

    // read decodes JSON messages from conn in a loop. Every decoded message,
    // single or batch, is delivered as a slice on c.readResp. The first decode
    // or unmarshal error is sent on c.readErr and returned, ending the loop.
    func (c *Client) read(conn net.Conn) error {
        var raw json.RawMessage
        decoder := json.NewDecoder(conn)
    
        for {
            // Reuse the raw buffer across messages.
            raw = raw[:0]
    
            var msgs []*jsonrpcMessage
            err := decoder.Decode(&raw)
            if err == nil {
                if isBatch(raw) {
                    err = json.Unmarshal(raw, &msgs)
                } else {
                    // A single message is wrapped into a one-element slice.
                    msgs = make([]*jsonrpcMessage, 1)
                    err = json.Unmarshal(raw, &msgs[0])
                }
            }
            if err != nil {
                c.readErr <- err
                return err
            }
            c.readResp <- msgs
        }
    }
    

    然后把server返回数据send 到c.readResp chan。
    dispatch的 select case batch := <-c.readResp: receive到c.readResp。如果收到的消息是通知,走通知的响应c.handleNotification(msg);如果是请求的响应,走c.handleResponse(msg)

    // handleResponse routes a response message to the operation waiting for it,
    // looked up in c.respWait by message id. Responses nobody waits for are
    // logged and dropped.
    func (c *Client) handleResponse(msg *jsonrpcMessage) {
        key := string(msg.ID)
        op := c.respWait[key]
        if op == nil {
            log.Debug(fmt.Sprintf("unsolicited response %v", msg))
            return
        }
        delete(c.respWait, key)
    
        // Normal responses are simply forwarded to the waiting Call/BatchCall.
        if op.sub == nil {
            op.resp <- msg
            return
        }
    
        // Subscription responses: the waiter is unblocked by closing op.resp in
        // every case; the subscription is started only if the server indicated
        // success and the subscription id could be decoded.
        defer close(op.resp)
        if msg.Error != nil {
            op.err = msg.Error
            return
        }
        op.err = json.Unmarshal(msg.Result, &op.sub.subid)
        if op.err != nil {
            return
        }
        go op.sub.start()
        c.subs[op.sub.subid] = op.sub
    }
    

    这时候把返回数据send给op.resp <- msg。 后续处理和http RPC的处理一致,走到CallContext方法的 resp, err := op.wait(ctx)。

    四,总结

    go-ethereum有四种RPC。HTTP RPC、Inproc RPC、IPC RPC、WS RPC。它们主要的实现逻辑都在rpc/server.go和rpc/client.go。各自根据自己的实现方式派生自己的client实例,建立各自的net.conn通道。由于HTTP RPC是基于短连接请求,实现方式和其他的不太一样。

    相关文章

      网友评论

          本文标题:以太坊源码深入分析(4)-- 以太坊RPC通信实例和原理代码分析

          本文链接:https://www.haomeiwen.com/subject/jjqzkftx.html