Fabric的节点通过grpc向内部或外部提供接口,在学习源码之前,需要对grpc的基本使用有所了解,并了解如何在grpc中配置tls。因此,本节整理了go语言中grpc四种类型的服务模式
示例来源:http://doc.oschina.net/grpc?t=60133
具体代码可见:https://github.com/ssj234/fabric/tree/master/grpc-go
2.定义服务
2.1 定义服务
RouteGuide服务有四个方法,代表了四种模式,
service RouteGuide {
// 根据点获取Feature
rpc GetFeature(Point) returns (Feature){}
// 矩形内包含多个点,获取矩形内点对于的Feature,返回是一个流类型
rpc ListFeatures(Rectangle) returns (stream Feature){}
// 输入是一个流,服务器可以不断的获取客户端发生的点,最后返回RouteSummary
rpc RecordRoute(stream Point) returns (RouteSummary){}
// 输入和输出都是流式的,可以不断的获取输入并返回
rpc RouteChat(stream RouteNote) returns (stream RouteNote){}
}
2.2生成代码
使用protoc3可以生成go语言代码
protoc --go_out=plugins=grpc:. grpc_guide.proto
2.3 非stream服务
服务器端程序在使用grpc时,首先需要定义一个结构体,并为其实现服务
type routeGuideServer struct {
savedFeatures []*pb.Feature // 初始化后就是只读的了
mu sync.Mutex // 多线程,保护routeNotes
routeNotes map[string][]*pb.RouteNote // 保存了名称与RouteNote
}
服务端程序
对于非stream服务,我们需要实现rpc GetFeature(Point) returns (Feature){}
,逻辑是通过客户端发生的Point,在savedFeatures中查找这个点所在的feature,如果没有有将传入的Point包装为Feature并返回。
// 通过Point查找Feature,Feature有个变量叫做location
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _,feature := range s.savedFeatures{
if proto.Equal(feature.Location,point){
return feature,nil
}
}
// 没有的话就创建一个,设置location为point
return &pb.Feature{Location:point},nil
}
客户端程序
客户端程序与上一章节中一样,建立连接后获取client并调用即可
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
conn, err := grpc.Dial("127.0.0.1:9999", opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewRouteGuideClient(conn) // 使用pb生成client
// 调用GetFeature
feature, err := client.GetFeature(context.Background(), &pb.Point{Latitude:409146138, Longitude:-746188906})
if err != nil {
log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
}
log.Println(feature)
2.3 服务端stream服务
服务端程序
服务端steam服务是服务器收到客户端的请求后不断的返回数据,rpc ListFeatures(Rectangle) returns (stream Feature){}
这个服务用来返回矩形内的Feature
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
// 遍历savedFeatures,查看是否在rect里面
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature);err != nil{
return err
}
}
}
return nil
}
客户端程序
point1 := &pb.Point{Longitude:-743977337,Latitude:407033786}
point2 := &pb.Point{Longitude:-740477477,Latitude:414653148}
rectangle := &pb.Rectangle{Lo:point1,Hi:point2}
stream, err := client.ListFeatures(context.Background(),rectangle)
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
2.4 客户端stream服务
rpc RecordRoute(stream Point) returns (RouteSummary){}
,这个服务不断的接收客户端请求,最后收到EOF后返回RouteSummary
服务端程序
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point // 最后一个点
startTime := time.Now() // 开始时间
for{ // 不断的收到客户端轻轻,直到EOF
point,err := stream.Recv()
if err == io.EOF{
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount:pointCount,
FeatureCount:featureCount,
Distance:distance,
ElapsedTime:int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil{
return err
}
pointCount++
for _,feature := range s.savedFeatures{
if proto.Equal(feature.Location,point){
featureCount++
}
}
if lastPoint != nil{
distance += calcDistance(lastPoint,point)
}
lastPoint = point
}
}
客户端程序
// 构造一些point
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
// 调用服务
stream, err := client.RecordRoute(context.Background())
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
// 结束调用
reply, err := stream.CloseAndRecv()
log.Printf("Route summary: %v", reply)
2.5 双向stream服务
服务端程序
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for{
in,err := stream.Recv() // RouteNote error
if err == io.EOF{
return nil
}
if err != nil{
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key],in)
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn,s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn { // 不断的发出
if err := stream.Send(note); err != nil {
return err
}
}
}
}
客户端程序
stream, err := client.RouteChat(context.Background())
// 由于在发送RouteNode的同时,还要接收RouteNode,因此使用go
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
close(waitc) // 关闭chan
return
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range getNodes() {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err.Error())
}
}
stream.CloseSend()
<-waitc // 等待chan关闭
网友评论