美文网首页gprc
Fabric源码基础-grpc的使用02

Fabric源码基础-grpc的使用02

作者: 史圣杰 | 来源:发表于2018-12-18 14:27 被阅读0次

    Fabric的节点通过grpc向内部或外部提供接口,在学习源码之前,需要对grpc的基本使用有所了解,并了解如何在grpc中配置tls。因此,本节整理了go语言中grpc四种类型的服务模式

    示例来源:http://doc.oschina.net/grpc?t=60133
    具体代码可见:https://github.com/ssj234/fabric/tree/master/grpc-go

    2.定义服务

    2.1 定义服务

    RouteGuide服务有四个方法,代表了四种模式,

    service RouteGuide {
        // 根据点获取Feature
        rpc GetFeature(Point) returns (Feature){}
        // 矩形内包含多个点,获取矩形内点对于的Feature,返回是一个流类型
        rpc ListFeatures(Rectangle) returns (stream Feature){}
        // 输入是一个流,服务器可以不断的获取客户端发生的点,最后返回RouteSummary
        rpc RecordRoute(stream Point) returns (RouteSummary){}
       // 输入和输出都是流式的,可以不断的获取输入并返回
        rpc RouteChat(stream RouteNote) returns (stream RouteNote){}
    }
    

    2.2生成代码

    使用protoc3可以生成go语言代码

    protoc --go_out=plugins=grpc:. grpc_guide.proto
    

    2.3 非stream服务

    服务器端程序在使用grpc时,首先需要定义一个结构体,并为其实现服务

    type routeGuideServer struct {
        savedFeatures []*pb.Feature // 初始化后就是只读的了
        mu         sync.Mutex // 多线程,保护routeNotes
        routeNotes map[string][]*pb.RouteNote // 保存了名称与RouteNote
    }
    

    服务端程序
    对于非stream服务,我们需要实现rpc GetFeature(Point) returns (Feature){},逻辑是通过客户端发生的Point,在savedFeatures中查找这个点所在的feature,如果没有有将传入的Point包装为Feature并返回。

    // 通过Point查找Feature,Feature有个变量叫做location
    func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
        for _,feature := range s.savedFeatures{
            if proto.Equal(feature.Location,point){
                return feature,nil
            }
        }
        // 没有的话就创建一个,设置location为point
        return &pb.Feature{Location:point},nil
    }
    

    客户端程序

    客户端程序与上一章节中一样,建立连接后获取client并调用即可

    var opts []grpc.DialOption
        opts = append(opts, grpc.WithInsecure())
        conn, err := grpc.Dial("127.0.0.1:9999", opts...)
        if err != nil {
            log.Fatalf("fail to dial: %v", err)
        }
        defer conn.Close()
        client := pb.NewRouteGuideClient(conn) // 使用pb生成client
    
        // 调用GetFeature
        feature, err := client.GetFeature(context.Background(), &pb.Point{Latitude:409146138, Longitude:-746188906})
        if err != nil {
            log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
        }
        log.Println(feature)
    

    2.3 服务端stream服务

    服务端程序
    服务端steam服务是服务器收到客户端的请求后不断的返回数据,rpc ListFeatures(Rectangle) returns (stream Feature){} 这个服务用来返回矩形内的Feature

    func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
         // 遍历savedFeatures,查看是否在rect里面
        for _, feature := range s.savedFeatures {
            if inRange(feature.Location, rect) {
                if err := stream.Send(feature);err != nil{
                    return err
                }
            }
        }
        return nil
    }
    

    客户端程序

    point1 := &pb.Point{Longitude:-743977337,Latitude:407033786}
        point2 := &pb.Point{Longitude:-740477477,Latitude:414653148}
        rectangle := &pb.Rectangle{Lo:point1,Hi:point2}
        stream, err := client.ListFeatures(context.Background(),rectangle)
        for {
            feature, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
            }
            log.Println(feature)
        }
    

    2.4 客户端stream服务

    rpc RecordRoute(stream Point) returns (RouteSummary){},这个服务不断的接收客户端请求,最后收到EOF后返回RouteSummary

    服务端程序

    func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
        var pointCount, featureCount, distance int32
        var lastPoint *pb.Point // 最后一个点
        startTime := time.Now() // 开始时间
        for{  // 不断的收到客户端轻轻,直到EOF
            point,err := stream.Recv()
            if err == io.EOF{
                endTime := time.Now()
                return stream.SendAndClose(&pb.RouteSummary{
                    PointCount:pointCount,
                    FeatureCount:featureCount,
                    Distance:distance,
                    ElapsedTime:int32(endTime.Sub(startTime).Seconds()),
                })
            }
            if err != nil{
                return  err
            }
            pointCount++
            for _,feature := range s.savedFeatures{
                if proto.Equal(feature.Location,point){
                    featureCount++
                }
            }
    
            if lastPoint != nil{
                distance += calcDistance(lastPoint,point)
            }
            lastPoint = point
        }
    
    }
    

    客户端程序

    // 构造一些point
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
        var points []*pb.Point
        for i := 0; i < pointCount; i++ {
            points = append(points, randomPoint(r))
        }
        // 调用服务
        stream, err := client.RecordRoute(context.Background())
        for _, point := range points {
            if err := stream.Send(point); err != nil {
                log.Fatalf("%v.Send(%v) = %v", stream, point, err)
            }
        }
        // 结束调用
        reply, err := stream.CloseAndRecv()
        log.Printf("Route summary: %v", reply)
    

    2.5 双向stream服务

    服务端程序

    func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    
        for{
            in,err := stream.Recv() // RouteNote error
            if err == io.EOF{
                return nil
            }
            if err != nil{
                return  err
            }
    
            key := serialize(in.Location)
            s.mu.Lock()
            s.routeNotes[key] = append(s.routeNotes[key],in)
            rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
            copy(rn,s.routeNotes[key])
            s.mu.Unlock()
    
            for _, note := range rn { // 不断的发出
                if err := stream.Send(note); err != nil {
                    return err
                }
            }
    
        }
    }
    

    客户端程序

    stream, err := client.RouteChat(context.Background())
        // 由于在发送RouteNode的同时,还要接收RouteNode,因此使用go
        waitc := make(chan struct{})
        go func() {
            for {
                in, err := stream.Recv()
                if err == io.EOF {
                    close(waitc) // 关闭chan
                    return
                }
                log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
            }
        }()
    
        for _, note := range getNodes() {
            if err := stream.Send(note); err != nil {
                log.Fatalf("Failed to send a note: %v", err.Error())
            }
        }
        stream.CloseSend()
        <-waitc // 等待chan关闭
    

    相关文章

      网友评论

        本文标题:Fabric源码基础-grpc的使用02

        本文链接:https://www.haomeiwen.com/subject/mjbkkqtx.html