写在开头
本文仅用于自己学习
1.引入
1.1微服务是什么
微服务是一种软件架构方式,将应用构建成一系列按照业务领域划分的模块、小的自治服务。比如商城服务划分为登录模块和购物车模块等,每个模块/服务都是自我包含,并且业务功能单一,即一个系统划分多个子系统,每个字系统可独立运行且完整,子系统间可以按照一定的协议互相通信
image.png2.2RPC是什么
RPC是远程过程调用协议Remote Procedure Call。RPC在微服务中起到非常大的作用,通过RPC我们可以像调用本地方法一样调用其他主机上的方法,用户将无感服务器与服务器间的通讯。这也意味着RPC需要解决以下问题:
- 目标服务器的函数ID。具体表现为需要获知调用哪个函数,服务器与服务器间如何约定和确定,即寻址
- 通讯协议
-
数据的传输问题,即数据序列化
image.png
这是一次RPC调用的大致流程,初步一看和http请求很像(实际上rpc中的服务注册、发现、负载均衡和限流上面都没有体现出来)
RPC协议本质上是一种实现上述三个要求的通信流程,具体实现技术没有约束,比如我们可以规定自己的RPC请求/响应包含消息体和消息头,使用gob/json/xml等来序列化消息体,使用socket/tcp/http2来进行网络通信
2.GO的内置RPC使用
一个简单的例子server.go
这里我们将Helloserve看作是一个打印hello的服务
这里需要注意服务端定义rpc结构体的时候,响应数据需要使用指针接受结果
func (s *HelloServe) Hello(request string, reply *string) error
否则会报错
rpc.Register: type hello has no exported methods of suitable type
demo
type HelloServe struct{}
func (s *HelloServe) Hello(request string, reply *string) error {
*reply = "hello " + request
return nil
}
func main() {
ln, err := net.Listen("tcp", ":8801")
if err != nil {
fmt.Printf("listen fail:%s\n", err)
return
}
//注册服务
//_ = rpc.Register(&HelloServe{})
_ = rpc.RegisterName("HelloServe", &HelloServe{})
for {
con, err := ln.Accept()
if err != nil {
fmt.Printf("accept fail:%s\n", err)
return
}
go rpc.ServeConn(con)
}
}
client.go
func RpcClient() {
client, _ := rpc.Dial("tcp", ":8801")
var res string
_ = client.Call("HelloServe.Hello", "myname", &res)
fmt.Println(res)
}
1.如何寻址
客户端client.go想要调用服务端的HelloServe结构体的hello方法
//client.go
client.Call("HelloServe.Hello", "myname", &res)
同时并向函数传入myname的参数,将结果保存在res中
//server.go
_ = rpc.RegisterName("HelloServe", &HelloServe{})
服务端通过内置的RegisterName的方式实现服务的注册
这就是两者互相约定好的,对函数的寻址格式
2.使用的通讯协议,tcp通讯(rpc并不是通讯协议,上面解释过了)
3.数据序列化,内置默认采用gob编码
客户端Call与GO函数的区别
还要注意client.Call是同步调用的,另一个client.GO是异步的,call内部实际就是调用GO的,其在调用完成之前会被阻塞在chan上,因此后续RPC发送必须等待
服务端函数返回了Error怎么办
server.go
func (s HelloServer) Hello(str string, res *string) error {
*res = "hello" + str
return errors.New("by")
}
client.go
err = con.Call(同上XXXXX, "zjb", &str)
if err != nil {
log.Fatal(err)
return
}
fmt.Println("xxxxx")
fmt.Println(err)
fmt.Println(str)
客户端响应结果
2023/11/28 09:19:40 by
exit status 1
即使在函数中修改了res的值客户端也得不到,这里直接进入了error!=nil的分支
请求与响应使用了结构体怎么办
- rpc 客户端请求的结构体和服务端的结构体名称不必完全一致
- rpc 客户端中的结构体,属性名和类型要和服务端一致,否则出现一个不致的就到直接报错,即接口(interface)类型一致
2.1源码查看
2.1.1 Register
rpc.Register是对内置变量DefaultServer的方法包装(这是不是让我们想起了http.HandlerFunc也是默认注册到一个DefaultMutex的包变量中)
func Register(rcvr any) error { return DefaultServer.Register(rcvr) }
DefaultServer是一个rpc.Server结构体实例,一个实例表示一个rpc服务器(对应上文的helloserve结构体)
// Server represents an RPC Server.
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
func (server *Server) Register(rcvr any) error {
return server.register(rcvr, "", false)
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
进入Server.register函数,dlv获取函数输入如下
image.png
reflect.Indirect(v).Type()和reflect.TypeOf(u)的区别
如文档所解释,indirect返回v所指向的值:
1.v是一个nil指针,则返回零值
2.v不是指针则返回v
3.v是指针则返回v所指向的值
所以上文sname 值为HelloServe,之后将这个值作为注册服务的名字存入services.name中(即HelloServe的服务被抽象成service结构体)
s.name = sname
s.method = suitableMethods(s.typ, logRegisterError)
最后就是挂载到Server的线程安全map中(Server.serviceMap类型为sync.Map)
RegisterName实际上就是对Register的包装
小结Register:
- 通过反射获取结构体变量结构体名
- 将结构体的方法存入service中
- 将结构体名作为键,变量作为值挂载到map中
2.1.2 消息的编码与解码
ServeConn实际上是对ServeCode方法的包装,其中我们主要关注消息的序列化
type gobServerCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
closed bool
}
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
这就引入了默认的数据序列化方式gob
服务端负责解码
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(any) error
WriteResponse(*Response, any) error
// Close can be called multiple times and must be idempotent.
Close() error
}
对应的客户端编码
type ClientCodec interface {
WriteRequest(*Request, any) error
ReadResponseHeader(*Response) error
ReadResponseBody(any) error
Close() error
}
服务端负责序列化的结构gobServerCodec的实现了ServerCodec接口,服务端需要编解码消息的地方,都会调用gobServerCodec的对应方法(客户端也是类似的实现,也是一样使用gob编解码)。
回到ServeCode
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
wg.Wait()
codec.Close()
}
使用wg确保请求的完整
根据编码方式解析出以下信息:
- service请求的服务
- mtype 请求服务的方法
- 请求信息和方法参数
- 方法的返回值
同时对于每个请求都会开启一个单独的协程来处理
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
3.替换RPC的序列化协议为JSON
同理就是jsonrpc的调用
4.替换RPC通讯协议为http
使用rpc.ServeRequest代理请求
func main() {
rpc.RegisterName("hello", &HelloServer{})
http.HandleFunc("/rpc", func(w http.ResponseWriter, r *http.Request) {
var conn io.ReadWriteCloser = struct {
io.Writer
io.ReadCloser
}{
Writer: w,
ReadCloser: r.Body,
}
rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
})
http.ListenAndServe(":8080", nil)
}
这里我使用postman进行json数据的发送
image.png
网友评论