成熟的微服务框架甚多,让人眼花缭乱,光是学会使用就得费老大劲儿,更不用说去理解其中的精髓了;辗转良久,最终决定自己搞一套,主要也是为了体验下设计框架时的种种思考,以便加深对微服务相关概念的理解;特在此记录其实现以供查阅。
rpc框架的设计
- 服务端
type Args struct {
A int
B int
}
type Reply struct {
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
return nil
}
func main() {
s := server.NewServer()
s.RegisterName("Arith", new(Arith)) //注册服务实例
s.Serve("tcp", ":8972") //开启监听
}
- 服务实例注册
直接使用go rpc标准库的register方法来注册,将其中的suitableMethods方法改动一下,以支持context参数,关键代码如下:
// Method needs four ins: receiver, context.Context, *args, *reply.
if mtype.NumIn() != 4 {
if reportErr {
log.Errorf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
}
continue
}
ctxType := mtype.In(1)
if !ctxType.Implements(contextType) {
if reportErr {
log.Errorf("rpc.Register: context type of method %q is not implemented: %q\n", mname, ctxType)
}
}
// First arg need not be a pointer.
argType := mtype.In(2)
if !isExportedOrBuiltinType(argType) {
if reportErr {
log.Errorf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
}
continue
}
// Second arg must be a pointer.
replyType := mtype.In(3)
if replyType.Kind() != reflect.Ptr {
if reportErr {
log.Errorf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
}
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
if reportErr {
log.Errorf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
}
continue
}
- 监听接收rpc连接
一个连接一个goroutine
func (server *Server) Serve(network, address string) {
ln, err := net.Listen(network, address)
if err != nil {
log.Fatal("listen error:", err)
}
........
server.serve(ln)
}
func (server *Server) serve(l net.Listener) error {
var tempDelay time.Duration // how long to sleep on accept failure
for {
rw, e := l.Accept() //接收连接
if e != nil {
select {
case <-server.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
if tc, ok := rw.(*net.TCPConn); ok {
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(3 * time.Minute)
}
ec := newEasyConn(server, rw) //创建连接对象
go ec.serveConn() //一个连接一个goroutine
}
}
- rpc连接的处理
从连接中读取数据,并设置读超时时间,如果读超时或者出现其他错误,则关闭连接,否则走处理数据流程。
func (ec *easyConn) serveConn() {
defer func() {
log.Infof("serveConn exit")
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
ss := runtime.Stack(buf, false)
if ss > size {
ss = size
}
buf = buf[:ss]
}
ec.rwc.Close()
}()
r := bufio.NewReaderSize(ec.rwc, 1024)
for {
ec.rwc.SetReadDeadline(time.Now().Add(time.Duration(ec.maxIdleTime) * time.Second))
req, err := ec.readRequest(r)
if err != nil {
if err != io.EOF {
log.Errorf("readRequest error %v", err)
}
return
}
if req.IsHeartbeat() {
//log.Debugf("server receives heartbeat at time %d", time.Now().Unix())
req.SetMessageType(protocol.Response)
ec.writeResponse(req)
protocol.FreeMsg(req)
continue
}
ctx := context.WithValue(context.Background(), ConnDataKey{}, ec)
ec.server.jobChan <- &workerJob{
ctx: ctx,
conn: ec,
req: req,
}
}
}
2.客户端
type RPCClient struct {
network string
address string
servicePath string
reconnectTryNums int
// codec ClientCodec
conn net.Conn
//reqMutex sync.Mutex // protects following
//request Request
heartBeatTryNums int
heartBeatTimeout int
heartBeatInterval int64
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call
lastSend int64
status ConnStatus
suicide bool
DialTimeout time.Duration
doneChan chan struct{}
}
- 客户端的创建
每个rpc客户端都有两个goroutine,一个用于接收服务端发来的数据响应;一个用于维持客户端和服务端的心跳。
func NewRPCClient(network, address, servicePath string, dialTimeout time.Duration) (*RPCClient, error) {
log.Infof("create rpc client for netword %s address %s service %s", network, address, servicePath)
var conn net.Conn
var err error
if network == "tcp" {
conn, err = Dial(network, address, dialTimeout)
if err != nil {
return nil, err
}
} else if network == "http" {
conn, err = DialHTTP(network, address, dialTimeout)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("unsupport network %s", network)
}
client := &RPCClient{
network: network,
address: address,
servicePath: strings.ToLower(servicePath),
conn: conn,
pending: make(map[uint64]*Call),
heartBeatInterval: defHeatBeatInterval,
doneChan: make(chan struct{}),
DialTimeout: dialTimeout,
}
go client.input()
go client.keepalive()
return client, nil
}
3.客户端请求和响应
- 发起请求
根据请求参数,创建call结构,调用client.send发送请求;
在send方法中,根据call数据,创建对应的request结构体,再将request序列化成字节流发送到服务端;
func (client *RPCClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call, options ...BeforeOrAfterCallOption) *Call {
call := new(Call)
call.ServicePath = client.servicePath
call.ServiceMethod = serviceMethod
call.ctx = ctx
call.Args = args
call.Reply = reply
call.serializeType = protocol.MsgPack
for _, opt := range options {
if opt.after {
call.AfterCalls = append(call.AfterCalls, opt.option)
} else {
opt.option(call)
}
}
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
func (client *RPCClient) send(call *Call) {
// Register this call.
client.mutex.Lock()
if client.status != ConnAvailable {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := atomic.AddUint64(&client.seq, 1)
client.pending[seq] = call
client.lastSend = time.Now().Unix()
rawConn := client.conn
client.mutex.Unlock()
req, err := client.createRequest(call, seq)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
return
}
_, err = rawConn.Write(req.Encode())
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
- 服务端响应
从服务端读取响应,判断是否有错误发生;如果发生错误且不是客户端主动关闭情况,则调用client.reconnect进行重连;如果没有错误发生,则根据传输协议格式解析字节流,并通知请求调用方。
func (client *RPCClient) input() {
var err error
resp := protocol.NewMessage()
for {
_, err = client.readResponse(resp)
if err != nil {
if client.suicide {
break
}
log.Errorf("readResponse error %+v client %+v", err, client)
err = client.reconnect()
if err != nil {
break
} else {
continue
}
}
seq := resp.Seq()
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if resp.IsHeartbeat() {
call.done()
continue
}
if call != nil {
err = checkReplyError(resp)
if err != nil {
call.Error = err
call.done()
continue
}
codec := share.Codecs[resp.SerializeType()]
if codec == nil {
err = fmt.Errorf("can not find codec for %d", resp.SerializeType())
call.Error = err
call.done()
continue
}
err = codec.Decode(resp.Payload, call.Reply)
if err != nil {
call.Error = err
call.done()
continue
}
if resp.Metadata != nil {
call.Metadata = resp.Metadata
}
for _, afterCall := range call.AfterCalls {
afterCall(call)
}
call.done()
}
}
client.mutex.Lock()
if client.status == ConnReconnectFail {
client.close(err, false)
}
client.mutex.Unlock()
log.Infof("client input goroutine exit")
}
(未完待续...)
源码地址:https://github.com/masonchen2014/easymicro
网友评论