美文网首页
以太坊源码阅读-网络处理-RPC

以太坊源码阅读-网络处理-RPC

作者: 区块链布道者Enoch | 来源:发表于2018-08-27 22:57 被阅读0次

    server.go 实现了RPC服务端的核心逻辑,包括注册、读取请求、处理请求、发送回应等逻辑。
    // Server represents a RPC server
    type Server struct {
    services serviceRegistry

    run      int32
    codecsMu sync.Mutex
    codecs   mapset.Set
    

    }

    // callback is a method callback which was registered in the server
    type callback struct {
    rcvr reflect.Value // receiver of method
    method reflect.Method // callback
    argTypes []reflect.Type // input argument types
    hasCtx bool // method's first argument is a context (not included in argTypes)
    errPos int // err return idx, of -1 when method cannot return error
    isSubscribe bool // indication if the callback is a subscription
    }

    // service represents a registered object
    type service struct {
    name string // name for service
    typ reflect.Type // receiver type
    callbacks callbacks // registered handlers
    subscriptions subscriptions // available subscriptions/notifications
    }

    Server创建的时候通过调用RegisterName注册自己的实例。
    // NewServer will create a new server instance with no registered handlers.
    func NewServer() *Server {
    server := &Server{
    services: make(serviceRegistry),
    codecs: mapset.NewSet(),
    run: 1,
    }

    // register a default service which will provide meta information about the RPC service such as the services and
    // methods it offers.
    rpcService := &RPCService{server}
    server.RegisterName(MetadataApi, rpcService)
    
    return server
    

    }

    服务注册RegisterName,该方法会通过传入的参数创建一个service对象,如果没有找到合适的方法则返回错误,如果没有错误,则将创建的service实例加入serviceRegistry。
    // RegisterName will create a service for the given rcvr type under the given name. When no methods on the given rcvr
    // match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is
    // created and added to the service collection this server instance serves.
    func (s *Server) RegisterName(name string, rcvr interface{}) error {
    if s.services == nil {
    s.services = make(serviceRegistry)
    }

    svc := new(service)
    svc.typ = reflect.TypeOf(rcvr)
    rcvrVal := reflect.ValueOf(rcvr)
    
    if name == "" {
        return fmt.Errorf("no service name for type %s", svc.typ.String())
    }
    if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
        return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
    }
    
    methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
    
    if len(methods) == 0 && len(subscriptions) == 0 {
        return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
    }
    
    // already a previous service register under given name, merge methods/subscriptions
    if regsvc, present := s.services[name]; present {
        for _, m := range methods {
            regsvc.callbacks[formatName(m.method.Name)] = m
        }
        for _, s := range subscriptions {
            regsvc.subscriptions[formatName(s.method.Name)] = s
        }
        return nil
    }
    
    svc.name = name
    svc.callbacks, svc.subscriptions = methods, subscriptions
    
    s.services[svc.name] = svc
    return nil
    

    }

    suitableCallbacks()遍历这个类型的所有方法并返回。
    // suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
    // for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
    // documentation for a summary of these criteria.
    func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
    callbacks := make(callbacks)
    subscriptions := make(subscriptions)

    METHODS:
    for m := 0; m < typ.NumMethod(); m++ {
    method := typ.Method(m)
    mtype := method.Type
    mname := formatName(method.Name)
    if method.PkgPath != "" { // method must be exported
    continue
    }

        var h callback
        h.isSubscribe = isPubSub(mtype)
        h.rcvr = rcvr
        h.method = method
        h.errPos = -1
    
        firstArg := 1
        numIn := mtype.NumIn()
        if numIn >= 2 && mtype.In(1) == contextType {
            h.hasCtx = true
            firstArg = 2
        }
    
        if h.isSubscribe {
            h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
            for i := firstArg; i < numIn; i++ {
                argType := mtype.In(i)
                if isExportedOrBuiltinType(argType) {
                    h.argTypes[i-firstArg] = argType
                } else {
                    continue METHODS
                }
            }
    
            subscriptions[mname] = &h
            continue METHODS
        }
    
        // determine method arguments, ignore first arg since it's the receiver type
        // Arguments must be exported or builtin types
        h.argTypes = make([]reflect.Type, numIn-firstArg)
        for i := firstArg; i < numIn; i++ {
            argType := mtype.In(i)
            if !isExportedOrBuiltinType(argType) {
                continue METHODS
            }
            h.argTypes[i-firstArg] = argType
        }
    
        // check that all returned values are exported or builtin types
        for i := 0; i < mtype.NumOut(); i++ {
            if !isExportedOrBuiltinType(mtype.Out(i)) {
                continue METHODS
            }
        }
    
        // when a method returns an error it must be the last returned value
        h.errPos = -1
        for i := 0; i < mtype.NumOut(); i++ {
            if isErrorType(mtype.Out(i)) {
                h.errPos = i
                break
            }
        }
    
        if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
            continue METHODS
        }
    
        switch mtype.NumOut() {
        case 0, 1, 2:
            if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
                continue METHODS
            }
            callbacks[mname] = &h
        }
    }
    
    return callbacks, subscriptions
    

    }

    server启动和服务,在ipc.go中,可以看到每Accept(),则启动一个goroutine调用srv.ServeCodec来进行服务。
    func (srv *Server) ServeListener(l net.Listener) error {
    for {
    conn, err := l.Accept()
    if err != nil {
    return err
    }
    log.Trace(fmt.Sprint("accepted conn", conn.RemoteAddr()))
    go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
    }
    }

    func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
    defer codec.Close()
    s.serveRequest(codec, false, options)
    }

    serveRequest从codec读取请求,调用对应方法并返回至codec,sync.WaitGroup实现了一个信号量的功能,context实现上下文管理。
    func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {
    var pend sync.WaitGroup

    defer func() {
        if err := recover(); err != nil {
            const size = 64 << 10
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]
            log.Error(string(buf))
        }
        s.codecsMu.Lock()
        s.codecs.Remove(codec)
        s.codecsMu.Unlock()
    }()
    
    //  ctx, cancel := context.WithCancel(context.Background())
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
        //如果codec支持,可以通过一个叫notifier的对象执行回调函数发送消息给客户端。
    // if the codec supports notification include a notifier that callbacks can use
    // to send notification to clients. It is tied to the codec/connection. If the
    // connection is closed the notifier will stop and cancels all active subscriptions.
    if options&OptionSubscriptions == OptionSubscriptions {
        ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
    }
    s.codecsMu.Lock()
    if atomic.LoadInt32(&s.run) != 1 { // server stopped
        s.codecsMu.Unlock()
        return &shutdownError{}
    }
    s.codecs.Add(codec)
    s.codecsMu.Unlock()
    
    // test if the server is ordered to stop
    for atomic.LoadInt32(&s.run) == 1 {
        reqs, batch, err := s.readRequest(codec)
        if err != nil {
            // If a parsing error occurred, send an error
            if err.Error() != "EOF" {
                log.Debug(fmt.Sprintf("read error %v\n", err))
                codec.Write(codec.CreateErrorResponse(nil, err))
            }
            // Error or end of stream, wait for requests and tear down
            pend.Wait()
            return nil
        }
    
        // check if server is ordered to shutdown and return an error
        // telling the client that his request failed.
        if atomic.LoadInt32(&s.run) != 1 {
            err = &shutdownError{}
            if batch {
                resps := make([]interface{}, len(reqs))
                for i, r := range reqs {
                    resps[i] = codec.CreateErrorResponse(&r.id, err)
                }
                codec.Write(resps)
            } else {
                codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
            }
            return nil
        }
        // If a single shot request is executing, run and return immediately
        if singleShot {
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
            return nil
        }
        // For multi-shot connections, start a goroutine to serve and loop back
        pend.Add(1)
    
        go func(reqs []*serverRequest, batch bool) {
            defer pend.Done()
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
        }(reqs, batch)
    }
    return nil
    

    }

    readRequest方法, 从codec读取请求,查找对应的方法组装成rpcRequest。
    type rpcRequest struct {
    service string
    method string
    id interface{}
    isPubSub bool
    params interface{}
    err Error // invalid batch element
    }
    然后返回serverRequest
    // serverRequest is an incoming request
    type serverRequest struct {
    id interface{}
    svcname string
    callb *callback
    args []reflect.Value
    isUnsubscribe bool
    err Error
    }

    readRequest方法,从codec读取请求,对请求进行处理生成serverRequest对象返回。
    func (s Server) readRequest(codec ServerCodec) ([]serverRequest, bool, Error) {
    reqs, batch, err := codec.ReadRequestHeaders()
    if err != nil {
    return nil, batch, err
    }

    requests := make([]*serverRequest, len(reqs))
        //根据reqs构建requests
    // verify requests
    for i, r := range reqs {
        var ok bool
        var svc *service
    
        if r.err != nil {
            requests[i] = &serverRequest{id: r.id, err: r.err}
            continue
        }
                //如果请求时发送/订阅的请求,而且方法名称有_unsuvscribe后缀。
        if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
            requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
            argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
            if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                requests[i].args = args
            } else {
                requests[i].err = &invalidParamsError{err.Error()}
            }
            continue
        }
                //如果没有注册这个方法
        if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
            requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
            continue
        }
                //如果是发布和订阅模式,调用订阅方法。
        if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
            if callb, ok := svc.subscriptions[r.method]; ok {
                requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
                if r.params != nil && len(callb.argTypes) > 0 {
                    argTypes := []reflect.Type{reflect.TypeOf("")}
                    argTypes = append(argTypes, callb.argTypes...)
                    if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                        requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
                    } else {
                        requests[i].err = &invalidParamsError{err.Error()}
                    }
                }
            } else {
                requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
            }
            continue
        }
    
        if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
            requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
            if r.params != nil && len(callb.argTypes) > 0 {
                if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
                    requests[i].args = args
                } else {
                    requests[i].err = &invalidParamsError{err.Error()}
                }
            }
            continue
        }
    
        requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
    }
    
    return requests, batch, nil
    

    }

    exec&execBatch方法,调用s.handle方法对request进行处理。
    // exec executes the given request and writes the result back using the codec.
    func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
    var response interface{}
    var callback func()
    if req.err != nil {
    response = codec.CreateErrorResponse(&req.id, req.err)
    } else {
    response, callback = s.handle(ctx, codec, req)
    }

    if err := codec.Write(response); err != nil {
        log.Error(fmt.Sprintf("%v\n", err))
        codec.Close()
    }
    
    // when request was a subscribe request this allows these subscriptions to be actived
    if callback != nil {
        callback()
    }
    

    }

    // execBatch executes the given requests and writes the result back using the codec.
    // It will only write the response back when the last request is processed.
    func (s Server) execBatch(ctx context.Context, codec ServerCodec, requests []serverRequest) {
    responses := make([]interface{}, len(requests))
    var callbacks []func()
    for i, req := range requests {
    if req.err != nil {
    responses[i] = codec.CreateErrorResponse(&req.id, req.err)
    } else {
    var callback func()
    if responses[i], callback = s.handle(ctx, codec, req); callback != nil {
    callbacks = append(callbacks, callback)
    }
    }
    }

    if err := codec.Write(responses); err != nil {
        log.Error(fmt.Sprintf("%v\n", err))
        codec.Close()
    }
    
    // when request holds one of more subscribe requests this allows these subscriptions to be activated
    for _, c := range callbacks {
        c()
    }
    

    }

    handle方法,执行一个request,然后返回response。
    // handle executes a request and returns the response from the callback.
    func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
    if req.err != nil {
    return codec.CreateErrorResponse(&req.id, req.err), nil
    }
    //取消订阅消息,通过NotifierFromContext获取存入ctx的notifier。
    if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
    if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
    notifier, supported := NotifierFromContext(ctx)
    if !supported { // interface doesn't support subscriptions (e.g. http)
    return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
    }

            subid := ID(req.args[0].String())
            if err := notifier.unsubscribe(subid); err != nil {
                return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
            }
    
            return codec.CreateResponse(req.id, true), nil
        }
        return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
    }
        //如果是订阅消息,创建订阅并激活
    if req.callb.isSubscribe {
        subid, err := s.createSubscription(ctx, codec, req)
        if err != nil {
            return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
        }
    
        // active the subscription after the sub id was successfully sent to the client
        activateSub := func() {
            notifier, _ := NotifierFromContext(ctx)
            notifier.activate(subid, req.svcname)
        }
    
        return codec.CreateResponse(req.id, subid), activateSub
    }
    
    // regular RPC call, prepare arguments
    if len(req.args) != len(req.callb.argTypes) {
        rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
            req.svcname, serviceMethodSeparator, req.callb.method.Name,
            len(req.callb.argTypes), len(req.args))}
        return codec.CreateErrorResponse(&req.id, rpcErr), nil
    }
    
    arguments := []reflect.Value{req.callb.rcvr}
    if req.callb.hasCtx {
        arguments = append(arguments, reflect.ValueOf(ctx))
    }
    if len(req.args) > 0 {
        arguments = append(arguments, req.args...)
    }
    
    // execute RPC method and return result
    reply := req.callb.method.Func.Call(arguments)
    if len(reply) == 0 {
        return codec.CreateResponse(req.id, nil), nil
    }
    if req.callb.errPos >= 0 { // test if method returned an error
        if !reply[req.callb.errPos].IsNil() {
            e := reply[req.callb.errPos].Interface().(error)
            res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
            return res, nil
        }
    }
    return codec.CreateResponse(req.id, reply[0].Interface()), nil
    

    }

    subscription.go发布订阅模式
    在服务一个客户端连接时候,调用newNotifier方法创建了一个notifier对象存储到ctx中。可以观察到Notifier对象保存了codec的实例,也就是说Notifier对象保存了网络连接,用来在需要的时候发送数据。
    // newNotifier creates a new notifier that can be used to send subscription
    // notifications to the client.
    func newNotifier(codec ServerCodec) Notifier {
    return &Notifier{
    codec: codec,
    active: make(map[ID]
    Subscription),
    inactive: make(map[ID]*Subscription),
    }
    }

    createSubscription方法会调用指定的注册上来的方法,并得到回应。

    // createSubscription will call the subscription callback and returns the subscription id or error.
    func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
    // subscription have as first argument the context following optional arguments
    args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
    args = append(args, req.args...)
    reply := req.callb.method.Func.Call(args)

    if !reply[1].IsNil() { // subscription creation failed
        return "", reply[1].Interface().(error)
    }
    
    return reply[0].Interface().(*Subscription).ID, nil
    

    }

    在来看看我们的activate方法,这个方法激活了subscription。 subscription在subscription ID被发送给客户端之后被激活,避免客户端还没有收到subscription ID的时候就收到了subscription信息。

    // activate enables a subscription. Until a subscription is enabled all
    // notifications are dropped. This method is called by the RPC server after
    // the subscription ID was sent to client. This prevents notifications being
    // send to the client before the subscription ID is send to the client.
    func (n *Notifier) activate(id ID, namespace string) {
    n.subMu.Lock()
    defer n.subMu.Unlock()
    if sub, found := n.inactive[id]; found {
    sub.namespace = namespace
    n.active[id] = sub
    delete(n.inactive, id)
    }
    }

    取消订阅的函数
    // unsubscribe a subscription.
    // If the subscription could not be found ErrSubscriptionNotFound is returned.
    func (n *Notifier) unsubscribe(id ID) error {
    n.subMu.Lock()
    defer n.subMu.Unlock()
    if s, found := n.active[id]; found {
    close(s.err)
    delete(n.active, id)
    return nil
    }
    return ErrSubscriptionNotFound
    }

    最后是一个发送订阅的函数,调用这个函数把数据发送到客户端, 这个也比较简单。
    // Notify sends a notification to the client with the given data as payload.
    // If an error occurs the RPC connection is closed and the error is returned.
    func (n *Notifier) Notify(id ID, data interface{}) error {
    n.subMu.RLock()
    defer n.subMu.RUnlock()
    sub, active := n.active[id]
    if active {
    notification := n.codec.CreateNotification(string(id), sub.namespace, data)
    if err := n.codec.Write(notification); err != nil {
    n.codec.Close()
    return err
    }
    }
    return nil
    }

    client.go
    先来看看客户端的数据结构
    // Client represents a connection to an RPC server.
    type Client struct {
    idCounter uint32
    connectFunc func(ctx context.Context) (net.Conn, error)
    isHTTP bool
    // writeConn is only safe to access outside dispatch, with the
    // write lock held. The write lock is taken by sending on
    // requestOp and released by sending on sendDone.
    writeConn net.Conn
    // for dispatch
    close chan struct{}
    didQuit chan struct{} // closed when client quits
    reconnected chan net.Conn // where write/reconnect sends the new connection
    readErr chan error // errors from read
    readResp chan []jsonrpcMessage // valid messages from read
    requestOp chan requestOp // for registering response IDs
    sendDone chan error // signals write completion, releases write lock
    respWait map[string]
    requestOp // active requests
    subs map[string]
    ClientSubscription // active subscriptions
    }

    newClient, 新建一个客户端。 通过调用connectFunc方法来获取一个网络连接,如果网络连接是httpConn对象的化,那么isHTTP设置为true。然后是对象的初始化, 如果是HTTP连接的化,直接返回,否者就启动一个goroutine调用dispatch方法。 dispatch方法是整个client的指挥中心,通过上面提到的channel来和其他的goroutine来进行通信,获取信息,根据信息做出各种决策。后续会详细介绍dispatch。 因为HTTP的调用方式非常简单, 这里先对HTTP的方式做一个简单的阐述。
    func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (Client, error) {
    //调用connectFunc方法来获取一个网络连接
    conn, err := connectFunc(initctx)
    if err != nil {
    return nil, err
    }
    _, isHTTP := conn.(
    httpConn)
    c := &Client{
    writeConn: conn,
    isHTTP: isHTTP,
    connectFunc: connectFunc,
    close: make(chan struct{}),
    didQuit: make(chan struct{}),
    reconnected: make(chan net.Conn),
    readErr: make(chan error),
    readResp: make(chan []jsonrpcMessage),
    requestOp: make(chan requestOp),
    sendDone: make(chan error, 1),
    respWait: make(map[string]
    requestOp),
    subs: make(map[string]
    ClientSubscription),
    }
    if !isHTTP {
    go c.dispatch(conn)
    }
    return c, nil
    }

    请求通过调用client的call方法来进行RPC调用
    // Call performs a JSON-RPC call with the given arguments and unmarshals into
    // result if no error occurred.
    // The result must be a pointer so that package json can unmarshal into it. You
    // can also pass nil, in which case the result is ignored.
    func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
    ctx := context.Background()
    return c.CallContext(ctx, result, method, args...)
    }

    func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
    msg, err := c.newMessage(method, args...)
    if err != nil {
    return err
    }
    op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
    if c.isHTTP {
    err = c.sendHTTP(ctx, op, msg)
    } else {
    err = c.send(ctx, op, msg)
    }
    if err != nil {
    return err
    }
    // dispatch has accepted the request and will close the channel when it quits.
    switch resp, err := op.wait(ctx); {
    case err != nil:
    return err
    case resp.Error != nil:
    return resp.Error
    case len(resp.Result) == 0:
    return ErrNoResult
    default:
    return json.Unmarshal(resp.Result, &result)
    }
    }

    sendHTTP,这个方法直接调用doRequest方法进行请求拿到回应。然后写入到resp队列就返回了。

    func (c *Client) sendHTTP(ctx context.Context, op requestOp, msg interface{}) error {
    hc := c.writeConn.(
    httpConn)
    respBody, err := hc.doRequest(ctx, msg)
    if err != nil {
    return err
    }
    defer respBody.Close()
    var respmsg jsonrpcMessage
    if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
    return err
    }
    op.resp <- &respmsg
    return nil
    }

    在看看上面的另一个方法 op.wait()方法,这个方法会查看两个队列的信息。如果是http那么从resp队列获取到回应就会直接返回。 这样整个HTTP的请求过程就完成了。 中间没有涉及到多线程问题,都在一个线程内部完成了。
    func (op requestOp) wait(ctx context.Context) (jsonrpcMessage, error) {
    select {
    case <-ctx.Done():
    return nil, ctx.Err()
    case resp := <-op.resp:
    return resp, op.err
    }
    }

    // send registers op with the dispatch loop, then sends msg on the connection.
    // if sending fails, op is deregistered.
    func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
    select {
    case c.requestOp <- op:
    log.Trace("", "msg", log.Lazy{Fn: func() string {
    return fmt.Sprint("sending ", msg)
    }})
    err := c.write(ctx, msg)
    c.sendDone <- err
    return err
    case <-ctx.Done():
    // This can happen if the client is overloaded or unable to keep up with
    // subscription notifications.
    return ctx.Err()
    case <-c.didQuit:
    //已经退出,可能被调用了Close
    return ErrClientQuit
    }
    }

    dispatch方法
    // dispatch is the main loop of the client.
    // It sends read messages to waiting calls to Call and BatchCall
    // and subscription notifications to registered subscriptions.
    func (c *Client) dispatch(conn net.Conn) {
    // Spawn the initial read loop.
    go c.read(conn)
    var (
    lastOp *requestOp // tracks last send operation
    requestOpLock = c.requestOp // nil while the send lock is held
    reading = true // if true, a read loop is running
    )
    defer close(c.didQuit)
    defer func() {
    c.closeRequestOps(ErrClientQuit)
    conn.Close()
    if reading {
    // Empty read channels until read is dead.
    for {
    select {
    case <-c.readResp:
    case <-c.readErr:
    return
    }
    }
    }
    }()

    for {
        select {
        case <-c.close:
            return
    
        // Read path.
        case batch := <-c.readResp:
            for _, msg := range batch {
                switch {
                case msg.isNotification():
                    log.Trace("", "msg", log.Lazy{Fn: func() string {
                        return fmt.Sprint("<-readResp: notification ", msg)
                    }})
                    c.handleNotification(msg)
                case msg.isResponse():
                    log.Trace("", "msg", log.Lazy{Fn: func() string {
                        return fmt.Sprint("<-readResp: response ", msg)
                    }})
                    c.handleResponse(msg)
                default:
                    log.Debug("", "msg", log.Lazy{Fn: func() string {
                        return fmt.Sprint("<-readResp: dropping weird message", msg)
                    }})
                    // TODO: maybe close
                }
            }
    
        case err := <-c.readErr:
            log.Debug("<-readErr", "err", err)
            c.closeRequestOps(err)
            conn.Close()
            reading = false
    
        case newconn := <-c.reconnected:
            log.Debug("<-reconnected", "reading", reading, "remote", conn.RemoteAddr())
            if reading {
                // Wait for the previous read loop to exit. This is a rare case.
                conn.Close()
                <-c.readErr
            }
            go c.read(newconn)
            reading = true
            conn = newconn
    
        // Send path.
        case op := <-requestOpLock:
            // Stop listening for further send ops until the current one is done.
            requestOpLock = nil
            lastOp = op
            for _, id := range op.ids {
                c.respWait[string(id)] = op
            }
    
        case err := <-c.sendDone:
            if err != nil {
                // Remove response handlers for the last send. We remove those here
                // because the error is already handled in Call or BatchCall. When the
                // read loop goes down, it will signal all other current operations.
                for _, id := range lastOp.ids {
                    delete(c.respWait, string(id))
                }
            }
            // Listen for send ops again.
            requestOpLock = c.requestOp
            lastOp = nil
        }
    }
    

    }

    客户端-订阅模式的特殊处理
    以太坊的RPC框架支持发布和订阅的模式。
    //Subscribe会使用传入的参数调用"<namespace>_subscribe"方法来订阅指定的消息。
    //服务器的通知会写入channel参数指定的队列。 channel参数必须和返回的类型相同。
    //ctx参数可以用来取消RPC的请求,但是如果订阅已经完成就不会有效果了。
    //处理速度太慢的订阅者的消息会被删除,每个客户端有8000个消息的缓存。
    func (c Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (ClientSubscription, error) {
    // Check type of channel first.
    chanVal := reflect.ValueOf(channel)
    if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
    panic("first argument to Subscribe must be a writable channel")
    }
    if chanVal.IsNil() {
    panic("channel given to Subscribe must not be nil")
    }
    if c.isHTTP {
    return nil, ErrNotificationsUnsupported
    }

    msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
    if err != nil {
        return nil, err
    }
    //requestOp的参数和Call调用的不一样。 多了一个参数sub.
    op := &requestOp{
        ids:  []json.RawMessage{msg.ID},
        resp: make(chan *jsonrpcMessage),
        sub:  newClientSubscription(c, namespace, chanVal),
    }
    
    // Send the subscription request.
    // The arrival and validity of the response is signaled on sub.quit.
    if err := c.send(ctx, op, msg); err != nil {
        return nil, err
    }
    if _, err := op.wait(ctx); err != nil {
        return nil, err
    }
    return op.sub, nil
    

    }
    newClientSubscription方法,这个方法创建了一个新的对象ClientSubscription,这个对象把传入的channel参数保存起来。 然后自己又创建了三个chan对象。后续会对详细介绍这三个chan对象

    func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
    sub := &ClientSubscription{
    client: c,
    namespace: namespace,
    etype: channel.Type().Elem(),
    channel: channel,
    quit: make(chan struct{}),
    err: make(chan error, 1),
    in: make(chan json.RawMessage),
    }
    return sub
    }
    从上面的代码可以看出。订阅过程根Call过程差不多,构建一个订阅请求。调用send发送到网络上,然后等待返回。 我们通过dispatch对返回结果的处理来看看订阅和Call的不同。

    func (c *Client) handleResponse(msg *jsonrpcMessage) {
    op := c.respWait[string(msg.ID)]
    if op == nil {
    log.Debug(fmt.Sprintf("unsolicited response %v", msg))
    return
    }
    delete(c.respWait, string(msg.ID))
    // For normal responses, just forward the reply to Call/BatchCall.
    如果op.sub是nil,普通的RPC请求,这个字段的值是空白的,只有订阅请求才有值。
    if op.sub == nil {
    op.resp <- msg
    return
    }
    // For subscription responses, start the subscription if the server
    // indicates success. EthSubscribe gets unblocked in either case through
    // the op.resp channel.
    defer close(op.resp)
    if msg.Error != nil {
    op.err = msg.Error
    return
    }
    if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
    //启动一个新的goroutine 并把op.sub.subid记录起来。
    go op.sub.start()
    c.subs[op.sub.subid] = op.sub
    }
    }

    op.sub.start方法,专门用来处理订阅消息。主要的功能是从in队列里面获取订阅消息,然后把订阅消息放到buffer里面。 如果数据能够发送。就从buffer里面发送一些数据给用户传入的那个channel。 如果buffer超过指定的大小,就丢弃。
    func (sub *ClientSubscription) start() {
    sub.quitWithError(sub.forward())
    }

    func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
    cases := []reflect.SelectCase{
    {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
    {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
    {Dir: reflect.SelectSend, Chan: sub.channel},
    }
    buffer := list.New()
    defer buffer.Init()
    for {
    var chosen int
    var recv reflect.Value
    if buffer.Len() == 0 {
    // Idle, omit send case.
    chosen, recv, _ = reflect.Select(cases[:2])
    } else {
    // Non-empty buffer, send the first queued item.
    cases[2].Send = reflect.ValueOf(buffer.Front().Value)
    chosen, recv, _ = reflect.Select(cases)
    }

        switch chosen {
        case 0: // <-sub.quit
            return nil, false
        case 1: // <-sub.in
            val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
            if err != nil {
                return err, true
            }
            if buffer.Len() == maxClientSubscriptionBuffer {
                return ErrSubscriptionQueueOverflow, true
            }
            buffer.PushBack(val)
        case 2: // sub.channel<-
            cases[2].Send = reflect.Value{} // Don't hold onto the value.
            buffer.Remove(buffer.Front())
        }
    }
    

    }
    当接收到一条Notification消息的时候会调用handleNotification方法。会把消息传送给in队列。
    [图片上传失败...(image-b9d024-1535549234468)]
    func (c *Client) handleNotification(msg *jsonrpcMessage) {
    if !strings.HasSuffix(msg.Method, notificationMethodSuffix) {
    log.Debug(fmt.Sprint("dropping non-subscription message: ", msg))
    return
    }
    var subResult struct {
    ID string json:"subscription"
    Result json.RawMessage json:"result"
    }
    if err := json.Unmarshal(msg.Params, &subResult); err != nil {
    log.Debug(fmt.Sprint("dropping invalid subscription message: ", msg))
    return
    }
    if c.subs[subResult.ID] != nil {
    c.subs[subResult.ID].deliver(subResult.Result)
    }
    }
    func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
    select {
    case sub.in <- result:
    return true
    case <-sub.quit:
    return false
    }
    }

    相关文章

      网友评论

          本文标题:以太坊源码阅读-网络处理-RPC

          本文链接:https://www.haomeiwen.com/subject/kzgtwftx.html