美文网首页
从零实现简单的go微服务治理框架

从零实现简单的go微服务治理框架

作者: 炎炎daddy | 来源:发表于2019-12-06 21:16 被阅读0次

成熟的微服务框架甚多,让人眼花缭乱,光是学会使用就得费老大劲儿,更不用说去理解其中的精髓了;辗转良久,最终决定自己搞一套,主要也是为了体验下设计框架时的种种思考,以便加深对微服务相关概念的理解;特在此记录其实现以供查阅。

rpc框架的设计

  1. 服务端
type Args struct {
    A int
    B int
}

type Reply struct {
    C int
}

type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A * args.B
    return nil
}
func main() {
    s := server.NewServer()
    s.RegisterName("Arith", new(Arith))  //注册服务实例
    s.Serve("tcp", ":8972")  //开启监听
}
  • 服务实例注册
    直接使用go rpc标准库的register方法来注册,将其中的suitableMethods方法改动一下,以支持context参数,关键代码如下:
        // Method needs four ins: receiver, context.Context, *args, *reply.
        if mtype.NumIn() != 4 {
            if reportErr {
                log.Errorf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
            }
            continue
        }

        ctxType := mtype.In(1)
        if !ctxType.Implements(contextType) {
            if reportErr {
                log.Errorf("rpc.Register: context type of method %q is not implemented: %q\n", mname, ctxType)
            }
        }

        // First arg need not be a pointer.
        argType := mtype.In(2)
        if !isExportedOrBuiltinType(argType) {
            if reportErr {
                log.Errorf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
            }
            continue
        }
        // Second arg must be a pointer.
        replyType := mtype.In(3)
        if replyType.Kind() != reflect.Ptr {
            if reportErr {
                log.Errorf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
            }
            continue
        }
        // Reply type must be exported.
        if !isExportedOrBuiltinType(replyType) {
            if reportErr {
                log.Errorf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
            }
            continue
        }
  • 监听接收rpc连接
    一个连接一个goroutine
func (server *Server) Serve(network, address string) {
    ln, err := net.Listen(network, address)   
    if err != nil {
        log.Fatal("listen error:", err)
    }
     ........
    server.serve(ln)
}

func (server *Server) serve(l net.Listener) error {
    var tempDelay time.Duration // how long to sleep on accept failure
    for {
        rw, e := l.Accept()  //接收连接
        if e != nil {
            select {
            case <-server.getDoneChan():
                return ErrServerClosed
            default:
            }
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                time.Sleep(tempDelay)
                continue
            }
            return e
        }
        tempDelay = 0
        if tc, ok := rw.(*net.TCPConn); ok {
            tc.SetKeepAlive(true)
            tc.SetKeepAlivePeriod(3 * time.Minute)
        }

        ec := newEasyConn(server, rw)   //创建连接对象
        go ec.serveConn()  //一个连接一个goroutine
    }
}
  • rpc连接的处理
    从连接中读取数据,并设置读超时时间,如果读超时或者出现其他错误,则关闭连接,否则走处理数据流程。

func (ec *easyConn) serveConn() {
    defer func() {
        log.Infof("serveConn exit")
        if err := recover(); err != nil {
            const size = 64 << 10
            buf := make([]byte, size)
            ss := runtime.Stack(buf, false)
            if ss > size {
                ss = size
            }
            buf = buf[:ss]
        }
        ec.rwc.Close()

    }()

    r := bufio.NewReaderSize(ec.rwc, 1024)

    for {
        ec.rwc.SetReadDeadline(time.Now().Add(time.Duration(ec.maxIdleTime) * time.Second))
        req, err := ec.readRequest(r)
        if err != nil {
            if err != io.EOF {
                log.Errorf("readRequest error %v", err)
            }
            return
        }

        if req.IsHeartbeat() {
            //log.Debugf("server receives heartbeat at time %d", time.Now().Unix())
            req.SetMessageType(protocol.Response)
            ec.writeResponse(req)
            protocol.FreeMsg(req)
            continue
        }
        ctx := context.WithValue(context.Background(), ConnDataKey{}, ec)
        ec.server.jobChan <- &workerJob{
            ctx:  ctx,
            conn: ec,
            req:  req,
        }
    }
}

2.客户端

type RPCClient struct {
    network     string
    address     string
    servicePath string

    reconnectTryNums int
    //  codec ClientCodec
    conn net.Conn
    //reqMutex sync.Mutex // protects following
    //request  Request
    heartBeatTryNums  int
    heartBeatTimeout  int
    heartBeatInterval int64

    mutex    sync.Mutex // protects following
    seq      uint64
    pending  map[uint64]*Call
    lastSend int64
    status   ConnStatus
    suicide  bool

    DialTimeout time.Duration
    doneChan chan struct{}
}
  • 客户端的创建
    每个rpc客户端都有两个goroutine,一个用于接收服务端发来的数据响应;一个用于维持客户端和服务端的心跳。
func NewRPCClient(network, address, servicePath string, dialTimeout time.Duration) (*RPCClient, error) {
    log.Infof("create rpc client for netword %s address %s service %s", network, address, servicePath)
    var conn net.Conn
    var err error
    if network == "tcp" {
        conn, err = Dial(network, address, dialTimeout)
        if err != nil {
            return nil, err
        }
    } else if network == "http" {
        conn, err = DialHTTP(network, address, dialTimeout)
        if err != nil {
            return nil, err
        }
    } else {
        return nil, fmt.Errorf("unsupport network %s", network)
    }

    client := &RPCClient{
        network:           network,
        address:           address,
        servicePath:       strings.ToLower(servicePath),
        conn:              conn,
        pending:           make(map[uint64]*Call),
        heartBeatInterval: defHeatBeatInterval,
        doneChan:          make(chan struct{}),
        DialTimeout:       dialTimeout,
    }

    go client.input()
    go client.keepalive()
    return client, nil
}

3.客户端请求和响应

  • 发起请求
    根据请求参数,创建call结构,调用client.send发送请求;
    在send方法中,根据call数据,创建对应的request结构体,再将request序列化成字节流发送到服务端;
func (client *RPCClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call, options ...BeforeOrAfterCallOption) *Call {
    call := new(Call)
    call.ServicePath = client.servicePath
    call.ServiceMethod = serviceMethod
    call.ctx = ctx
    call.Args = args
    call.Reply = reply
    call.serializeType = protocol.MsgPack
    for _, opt := range options {
        if opt.after {
            call.AfterCalls = append(call.AfterCalls, opt.option)
        } else {
            opt.option(call)
        }
    }

    if done == nil {
        done = make(chan *Call, 10) // buffered.
    } else {
        // If caller passes done != nil, it must arrange that
        // done has enough buffer for the number of simultaneous
        // RPCs that will be using that channel. If the channel
        // is totally unbuffered, it's best not to run at all.
        if cap(done) == 0 {
            log.Panic("rpc: done channel is unbuffered")
        }
    }
    call.Done = done
    client.send(call)
    return call
}



func (client *RPCClient) send(call *Call) {

    // Register this call.
    client.mutex.Lock()
    if client.status != ConnAvailable {
        client.mutex.Unlock()
        call.Error = ErrShutdown
        call.done()
        return
    }

    seq := atomic.AddUint64(&client.seq, 1)
    client.pending[seq] = call
    client.lastSend = time.Now().Unix()
    rawConn := client.conn
    client.mutex.Unlock()

    req, err := client.createRequest(call, seq)
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
        return
    }

    _, err = rawConn.Write(req.Encode())
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}
  • 服务端响应
    从服务端读取响应,判断是否有错误发生;如果发生错误且不是客户端主动关闭情况,则调用client.reconnect进行重连;如果没有错误发生,则根据传输协议格式解析字节流,并通知请求调用方。

func (client *RPCClient) input() {
    var err error
    resp := protocol.NewMessage()

    for {
        _, err = client.readResponse(resp)
        if err != nil {
            if client.suicide {
                break
            }
            log.Errorf("readResponse error %+v client %+v", err, client)
            err = client.reconnect()
            if err != nil {
                break
            } else {
                continue
            }
        }

        seq := resp.Seq()
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        if resp.IsHeartbeat() {
            call.done()
            continue
        }

        if call != nil {
            err = checkReplyError(resp)
            if err != nil {
                call.Error = err
                call.done()
                continue
            }

            codec := share.Codecs[resp.SerializeType()]
            if codec == nil {
                err = fmt.Errorf("can not find codec for %d", resp.SerializeType())
                call.Error = err
                call.done()
                continue
            }

            err = codec.Decode(resp.Payload, call.Reply)
            if err != nil {
                call.Error = err
                call.done()
                continue
            }

            if resp.Metadata != nil {
                call.Metadata = resp.Metadata
            }
            for _, afterCall := range call.AfterCalls {
                afterCall(call)
            }
            call.done()
        }
    }

    client.mutex.Lock()
    if client.status == ConnReconnectFail {
        client.close(err, false)
    }
    client.mutex.Unlock()
    log.Infof("client input goroutine exit")
}

(未完待续...)
源码地址:https://github.com/masonchen2014/easymicro

相关文章

  • 从零实现简单的go微服务治理框架

    成熟的微服务框架甚多,让人眼花缭乱,光是学会使用就得费老大劲儿,更不用说去理解其中的精髓了;辗转良久,最终决定自己...

  • golang 进阶知识点

    Go 架构实践 - 微服务(微服务概览与治理) 微服务的原理、概念,以及微服务的实现细节 API Gateway、...

  • 开箱即用的微服务框架 Go-zero(进阶篇)

    之前我们简单介绍过 Go-zero 详见《Go-zero:开箱即用的微服务框架》。这次我们从动手实现一个 Blog...

  • spring-cloud_Eureka

    Eureka完成微服务架构中的服务治理功能。实现各个微服务实例的自动化注册与发现。 服务注册:在服务治理框架中,通...

  • Spring Cloud-1.服务治理

    服务治理 服务治理用来实现各个微服务示例的自动化注册与发现。 服务治理围绕“服务注册”和“服务发现”机制来完成对微...

  • Dubbo

    Dubbo(服务治理框架) RPC 各服务都要实现rpc协议,才能实现服务间的调用 rpc:远程过程调用协议,是一...

  • 服务治理:Spring Cloud Eureka

    服务治理 主要作用是实现各个微服务实例的自动化注册与发现。 服务注册 在服务治理框架中,通常会构建一个注册中心,每...

  • SpringCloud-Eureka

    Springcloud封装了Netfix公司开发的Eureka来实现服务治理。在传统的RPC框架中,管理每个服务和...

  • RPC协议及实现方式(分布式微服务治理的核心)

    分布式微服务治理的核心在于: 微服务和分布式 (微服务框架)微服务的最优技术实现目前是: SpringBoot (...

  • Go Micro Restful Service Demo

    本文介绍如何从零开始,使用 Go 语言的一些有代表性的框架,实现一个微服务架构的 Restful Web Serv...

网友评论

      本文标题:从零实现简单的go微服务治理框架

      本文链接:https://www.haomeiwen.com/subject/uisegctx.html