美文网首页
gRPC实现Flutter客户端go语言服务端异步通信

gRPC实现Flutter客户端go语言服务端异步通信

作者: 渣渣曦 | 来源:发表于2020-05-02 16:25 被阅读0次

    本文中实现以下场景:

    1. 移动端应用户可以发送信息到服务端并异步接收服务端回复。使用Dart语言和Flutter框架。
    2. 服务端使用go语言开发。通过gRPC实现Send方法接收客户端信息。服务端回复信息通过gRPC stream实现Subscribe方法。
    3. 使用protobuf定义的API实现客户/服务端通信。

    客户端实现应用如下图:


    image.png

    前提

    • 因为将使用Go modules,所以安装Go语言1.11以上版本。
    • 尽管在安装Flutter已经包含了Dart SDK,但还需要安装Dart 2的服务端SDK来生成protofbuf API的Dart文件。
    • 安装Flutter 1.0以上。
    • 安装protobuf编译器。
    • 安装Go的Proto编译器生成插件:
    go get -u github.com/golang/protobuf/protoc-gen-go
    
    • 安装Dart的protoc插件:
    pub global activate protoc_plugin
    

    项目结构

    代码结构如下:

    flutter-grpc-tutorial (root folder)
           |
           |----- api (proto API definition)
           |----- flutter_client (chat client App)
           |----- go-server (echo chat server)
           |----- third_party (miscellaneous files are needed to compile Go and Dart files from proto)
    

    三方文件

    image.png
    从该地址下载empty.proto,timestamp.protowrappers.proto三个文件放到flutter-grpc-tutorial/thrid_party/google/protobuf文件夹下。

    注:protoc-gen.cmd文件可选——不必关注。这只是一个windows下的proto编译脚本。

    定义客户/服务端通信API

    api文件夹中存放定义ChatService的文件。

    image.png

    Go服务端

    首先使用proto API定义文件生成Go语言的protobuf/grpc代码,在生成之前创建输出目录。

    cd flutter-grpc-tutorial
    mkdir -p go-server/pkg/api/v1
    

    然后生成Go代码:

    protoc chat.proto --proto_path=api/proto/v1 --proto_path=. --go_out=plugins=grpc:go-server/pkg/api/v1
    

    生成结果如下:

    image.png
    接着创建go-server/pkg/service/v1/chat.go文件——ChatService的实现。服务端接收信息并存储到channel中。Subscribe方法从channel中获取消息并发送返馈到客户端。实现代码非常简单:
    package v1
    
    import (
        "context"
        "fmt"
        "log"
    
        "github.com/golang/protobuf/ptypes/empty"
        "github.com/golang/protobuf/ptypes/wrappers"
    
        "github.com/amsokol/flutter-grpc-tutorial/go-server/pkg/api/v1"
    )
    
    // chatServiceServer is implementation of v1.ChatServiceServer proto interface
    type chatServiceServer struct {
        msg chan string
    }
    
    // NewChatServiceServer creates Chat service object
    func NewChatServiceServer() v1.ChatServiceServer {
        return &chatServiceServer{msg: make(chan string, 1000)}
    }
    
    // Send sends message to the server
    func (s *chatServiceServer) Send(ctx context.Context, message *wrappers.StringValue) (*empty.Empty, error) {
        if message != nil {
            log.Printf("Send requested: message=%v", *message)
            s.msg <- message.Value
        } else {
            log.Print("Send requested: message=<empty>")
        }
    
        return &empty.Empty{}, nil
    }
    
    // Subscribe is streaming method to get echo messages from the server
    func (s *chatServiceServer) Subscribe(e *empty.Empty, stream v1.ChatService_SubscribeServer) error {
        log.Print("Subscribe requested")
        for {
            m := <-s.msg
            n := v1.Message{Text: fmt.Sprintf("I have received from you: %s. Thanks!", m)}
            if err := stream.Send(&n); err != nil {
                // put message back to the channel
                s.msg <- m
                log.Printf("Stream connection failed: %v", err)
                return nil
            }
            log.Printf("Message sent: %+v", n)
        }
    }
    

    接着编写注册ChatService代码运行在gRPC服务中(go-server/pkg/protocol/grpc/server.go):

    package grpc
    
    import (
        "context"
        "log"
        "net"
    
        "google.golang.org/grpc"
    
        "github.com/amsokol/flutter-grpc-tutorial/go-server/pkg/api/v1"
    )
    
    // RunServer registers gRPC service and run server
    func RunServer(ctx context.Context, srv v1.ChatServiceServer, port string) error {
        listen, err := net.Listen("tcp", ":"+port)
        if err != nil {
            return err
        }
    
        // register service
        server := grpc.NewServer()
        v1.RegisterChatServiceServer(server, srv)
    
        // start gRPC server
        log.Println("starting gRPC server...")
        return server.Serve(listen)
    }
    

    最后开发主函数做为Go应用启动点(go-server/cmd/server/main.go):

    package main
    
    import (
        "context"
        "fmt"
        "os"
    
        "github.com/amsokol/flutter-grpc-tutorial/go-server/pkg/protocol/grpc"
        "github.com/amsokol/flutter-grpc-tutorial/go-server/pkg/service/v1"
    )
    
    func main() {
        if err := grpc.RunServer(context.Background(), v1.NewChatServiceServer(), "3000"); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    }
    

    Go服务端代码整体结构如下:

    image.png
    获取go-server完整代码

    创建Flutter客户端应用

    使用官方教程做为本文Flutter客户端程序设计。
    https://codelabs.developers.google.com/codelabs/flutter/#4
    强烈建议认真读这篇教程。可以帮助理解开发Flutter炫酷的UI。

    注1:避免"IOS和Android定制化程序"以保证代码简洁。
    注2:分割main.dart文件到多个文件。

    从该地址获取Flutter客户端代码。

    image.png
    开始前打开pubspec.yaml文件。查看项目依赖包。dependencies代码段如下:
    image.png
    Dart语言包grpcprotobuf提供gRPC引擎。uuid用于chat消息的唯一ID。
    首先通过proto API定义生成Dart的protobuf/grpc代码。在生成前创建文件夹:
    cd flutter-grpc-tutorial
    mkdir -p flutter_client/lib/api/v1/google/protobuf
    

    接着通过chat.proto生成protobuf的Dart代码支持文件:

    protoc empty.proto timestamp.proto wrappers.proto --proto_path=third_party/google/protobuf --plugin=protoc-gen-dart=%USERPROFILE%/AppData/Roaming/Pub/Cache/bin/protoc-gen-dart.bat --dart_out=grpc:flutter_client/lib/api/v1/google/protobuf
    protoc chat.proto --proto_path=api/proto/v1 --proto_path=third_party  --plugin=protoc-gen-dart=%USERPROFILE%/AppData/Roaming/Pub/Cache/bin/protoc-gen-dart.bat --dart_out=grpc:flutter_client/lib/api/v1
    

    注:参数 plugin=protoc-gen-dart=%USERPROFILE%/AppData/Roaming/Pub/Cache/bin/protoc-gen-dart.bat仅在Windows下使用,Linux和MacOS下忽略。

    生成结果如下:


    image.png

    查看Dart文件
    main.dart

    import 'package:flutter/material.dart';
    
    import 'app.dart';
    
    /// main is entry point of Flutter application
    void main() {
      runApp(FriendlychatApp());
    }
    

    app.dart

    import 'package:flutter/material.dart';
    
    import 'chat_screen.dart';
    
    /// FriendlychatApp is Flutter application
    class FriendlychatApp extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return MaterialApp(
          title: "Friendlychat",
          home: ChatScreen(),
        );
      }
    }
    

    chat_message.dart

    import 'package:flutter/material.dart';
    import 'package:uuid/uuid.dart';
    
    /// Message is class defining message data (id and text)
    class Message {
      /// _uuid is unique ID generator
      static var _uuid = Uuid();
    
      /// id is unique ID of message
      String id;
    
      /// text is content of message
      String text;
    
      /// Class constructor
      Message(this.text, [this.id]) {
        if (id == null) {
          id = _uuid.v4();
        }
      }
    }
    
    /// ChatMessage is base abstract class for outgoing and incoming message widgets
    abstract class ChatMessage extends Widget {
      /// Message content
      Message get message;
    
      /// Controller of animation for message widget
      AnimationController get animationController;
    }
    

    该文件的Message类包含消息的唯一ID和内容。ChatMessage是通信widget消息输入输出的基类。
    chat_message_incoming.dart

    import 'package:flutter/material.dart';
    
    import 'chat_message.dart';
    
    /// Incoming message author name
    const String _server = "Server";
    
    /// ChatMessageIncoming is widget to display incoming from server message
    class ChatMessageIncoming extends StatelessWidget implements ChatMessage {
      /// Incoming message content
      final Message message;
    
      /// Controller of animation for message widget
      final AnimationController animationController;
    
      /// Constructor
      ChatMessageIncoming({this.message, this.animationController})
          : super(key: new ObjectKey(message.id));
    
      @override
      Widget build(BuildContext context) {
        return SizeTransition(
          sizeFactor:
              CurvedAnimation(parent: animationController, curve: Curves.easeOut),
          axisAlignment: 0.0,
          child: Container(
            margin: EdgeInsets.symmetric(vertical: 10.0),
            child: Row(
              crossAxisAlignment: CrossAxisAlignment.start,
              children: <Widget>[
                Expanded(
                  child: Column(
                    crossAxisAlignment: CrossAxisAlignment.end,
                    children: <Widget>[
                      Text(_server, style: Theme.of(context).textTheme.subhead),
                      Container(
                        margin: EdgeInsets.only(top: 5.0),
                        child: Text(message.text),
                      ),
                    ],
                  ),
                ),
                Container(
                  margin: EdgeInsets.only(left: 16.0),
                  child: CircleAvatar(
                      backgroundColor: Colors.pink.shade600,
                      child: Text(_server[0])),
                ),
              ],
            ),
          ),
        );
      }
    }
    

    ChatMessageIncoming是无状态组件显示接收信息到ListView区域。Stateless意味着ChatMessageIncoming对象一量创建便不会改变。
    chat_message_outgoing.dart

    import 'package:flutter/material.dart';
    
    import 'chat_message.dart';
    
    /// Outgoing message author name
    const String _name = "Me";
    
    /// Outgoing message statuses
    /// UNKNOWN - message just created and is not sent yet
    /// SENT - message is sent to the server successfully
    enum MessageOutgoingStatus { UNKNOWN, SENT }
    
    /// MessageOutgoing is class defining message data (id and text) and status
    class MessageOutgoing extends Message {
      /// Outgoing message status
      MessageOutgoingStatus status;
    
      /// Constructor
      MessageOutgoing(
          {String text, String id, this.status = MessageOutgoingStatus.UNKNOWN})
          : super(text, id);
    }
    
    /// ChatMessageOutgoingController is 'Controller' class that allows change message properties
    class ChatMessageOutgoingController {
      /// Outgoing message content
      MessageOutgoing message;
    
      /// Controller raises this event when status has been changed
      void Function(
              MessageOutgoingStatus oldStatus, MessageOutgoingStatus newStatus)
          onStatusChanged;
    
      /// Constructor
      ChatMessageOutgoingController({this.message});
    
      /// setStatus is method to update status of the outgoing message
      /// It raises onStatusChanged event
      void setStatus(MessageOutgoingStatus newStatus) {
        var oldStatus = message.status;
        if (oldStatus != newStatus) {
          message.status = newStatus;
          if (onStatusChanged != null) {
            onStatusChanged(oldStatus, newStatus);
          }
        }
      }
    }
    
    /// ChatMessageOutgoing is widget to display outgoing to server message
    class ChatMessageOutgoing extends StatefulWidget implements ChatMessage {
      /// Outgoing message content
      final MessageOutgoing message;
    
      /// Message state controller
      final ChatMessageOutgoingController controller;
    
      /// Controller of animation for message widget
      final AnimationController animationController;
    
      /// Constructor
      ChatMessageOutgoing({this.message, this.animationController})
          : controller = ChatMessageOutgoingController(message: message),
            super(key: new ObjectKey(message.id));
    
      @override
      State createState() => ChatMessageOutgoingState(
          animationController: animationController, controller: controller);
    }
    
    /// State for ChatMessageOutgoing widget
    class ChatMessageOutgoingState extends State<ChatMessageOutgoing> {
      /// Message state controller
      final ChatMessageOutgoingController controller;
    
      /// Controller of animation for message widget
      final AnimationController animationController;
    
      /// Constructor
      ChatMessageOutgoingState({this.controller, this.animationController}) {
        // Subscribe to event "message status has been changed"
        controller.onStatusChanged = onStatusChanged;
      }
    
      /// Subscription to event "message status has been changed"
      void onStatusChanged(
          MessageOutgoingStatus oldStatus, MessageOutgoingStatus newStatus) {
        setState(() {});
      }
    
      @override
      Widget build(BuildContext context) {
        return SizeTransition(
          sizeFactor:
              CurvedAnimation(parent: animationController, curve: Curves.easeOut),
          axisAlignment: 0.0,
          child: Container(
            margin: EdgeInsets.symmetric(vertical: 10.0),
            child: Row(
              crossAxisAlignment: CrossAxisAlignment.start,
              children: <Widget>[
                Container(
                  margin: EdgeInsets.only(right: 16.0),
                  child: CircleAvatar(child: Text(_name[0])),
                ),
                Expanded(
                  child: Column(
                    crossAxisAlignment: CrossAxisAlignment.start,
                    children: <Widget>[
                      Text(_name, style: Theme.of(context).textTheme.subhead),
                      Container(
                        margin: EdgeInsets.only(top: 5.0),
                        child: Text(controller.message.text),
                      ),
                    ],
                  ),
                ),
                Container(
                  child: Icon(
                      controller.message.status == MessageOutgoingStatus.SENT
                          ? Icons.done
                          : Icons.access_time),
                ),
              ],
            ),
          ),
        );
      }
    }
    

    ChatMessageOutgoing是有状态(stateful)组件用于输出消息到ListView区域。stateful意味着消息状态能从未知变更为已发送。ChatMessageOutgoingState状态类用于显示发送消息状态。绘制UNKNOWN图标🕗和 done✔图标以标注发送状态。
    ChatMessageOutgoingController允许通过setStatus方法变更消息状态。
    chat_service.dart

    import 'package:grpc/grpc.dart';
    
    import 'api/v1/chat.pbgrpc.dart' as grpc;
    import 'api/v1/google/protobuf/empty.pb.dart';
    import 'api/v1/google/protobuf/wrappers.pb.dart';
    import 'chat_message.dart';
    import 'chat_message_outgoing.dart';
    
    /// CHANGE TO IP ADDRESS OF YOUR SERVER IF IT IS NECESSARY
    const serverIP = "127.0.0.1";
    const serverPort = 3000;
    
    /// ChatService client implementation
    class ChatService {
      /// Flag is indicating that client is shutting down
      bool _isShutdown = false;
    
      /// gRPC client channel to send messages to the server
      ClientChannel _clientSend;
    
      /// gRPC client channel to receive messages from the server
      ClientChannel _clientReceive;
    
      /// Event is raised when message has been sent to the server successfully
      final void Function(MessageOutgoing message) onSentSuccess;
    
      /// Event is raised when message sending is failed
      final void Function(MessageOutgoing message, String error) onSentError;
    
      /// Event is raised when message has been received from the server
      final void Function(Message message) onReceivedSuccess;
    
      /// Event is raised when message receiving is failed
      final void Function(String error) onReceivedError;
    
      /// Constructor
      ChatService(
          {this.onSentSuccess,
          this.onSentError,
          this.onReceivedSuccess,
          this.onReceivedError});
    
      // Shutdown client
      Future<void> shutdown() async {
        _isShutdown = true;
        _shutdownSend();
        _shutdownReceive();
      }
    
      // Shutdown client (send channel)
      void _shutdownSend() {
        if (_clientSend != null) {
          _clientSend.shutdown();
          _clientSend = null;
        }
      }
    
      // Shutdown client (receive channel)
      void _shutdownReceive() {
        if (_clientReceive != null) {
          _clientReceive.shutdown();
          _clientReceive = null;
        }
      }
    
      /// Send message to the server
      void send(MessageOutgoing message) {
        if (_clientSend == null) {
          // create new client
          _clientSend = ClientChannel(
            serverIP, // Your IP here or localhost
            port: serverPort,
            options: ChannelOptions(
              //TODO: Change to secure with server certificates
              credentials: ChannelCredentials.insecure(),
              idleTimeout: Duration(seconds: 10),
            ),
          );
        }
    
        var request = StringValue.create();
        request.value = message.text;
    
        grpc.ChatServiceClient(_clientSend).send(request).then((_) {
          // call for success handler
          if (onSentSuccess != null) {
            var sentMessage = MessageOutgoing(
                text: message.text,
                id: message.id,
                status: MessageOutgoingStatus.SENT);
            onSentSuccess(sentMessage);
          }
        }).catchError((e) {
          if (!_isShutdown) {
            // invalidate current client
            _shutdownSend();
    
            // call for error handler
            if (onSentError != null) {
              onSentError(message, e.toString());
            }
    
            // try to send again
            Future.delayed(Duration(seconds: 30), () {
              send(message);
            });
          }
        });
      }
    
      /// Start listening messages from the server
      void startListening() {
        if (_clientReceive == null) {
          // create new client
          _clientReceive = ClientChannel(
            serverIP, // Your IP here or localhost
            port: serverPort,
            options: ChannelOptions(
              //TODO: Change to secure with server certificates
              credentials: ChannelCredentials.insecure(),
              idleTimeout: Duration(seconds: 10),
            ),
          );
        }
    
        var stream =
            grpc.ChatServiceClient(_clientReceive).subscribe(Empty.create());
    
        stream.forEach((msg) {
          if (onReceivedSuccess != null) {
            var message = Message(msg.text);
            onReceivedSuccess(message);
          }
        }).then((_) {
          // raise exception to start listening again
          throw Exception("stream from the server has been closed");
        }).catchError((e) {
          if (!_isShutdown) {
            // invalidate current client
            _shutdownReceive();
    
            // call for error handler
            if (onReceivedError != null) {
              onReceivedError(e.toString());
            }
    
            // start listening again
            Future.delayed(Duration(seconds: 30), () {
              startListening();
            });
          }
        });
      }
    }
    

    解释一下该文件的代码片段。
    send方法异步发送消息到服务端。首先建立客户端连接channel到服务端:

    image.png
    然后通过proto编译文件中的ChatServiceClient发送消息到服务端。成功返回后触发onSentSuccess事件更新UNKNOWNSENT状态:
    image.png
    错误返回触发onSentError事件,验证链接连通后再次发送消息。当后台应用关闭后停止发送尝试:
    image.png
    接下来看startListening方法。该方法创建客户端连接channel到服务端:
    image.png
    接着使用proto生成代码中的ChatServiceClient开启gRPC stream流用于侦听接收消息。当收到消息后触发onReceivedSuccess事件:
    image.png
    当发生错语或关闭流时触发onReceivedError事件,验证链接连通后尝试再次打开侦听。当服务端应用关闭后停止侦听消息接收:
    image.png
    chat_screen.dart
    import 'dart:async';
    
    import 'package:flutter/material.dart';
    
    import 'bandwidth_buffer.dart';
    import 'chat_message.dart';
    import 'chat_message_incoming.dart';
    import 'chat_message_outgoing.dart';
    import 'chat_service.dart';
    
    /// Host screen widget - main window
    class ChatScreen extends StatefulWidget {
      ChatScreen() : super(key: new ObjectKey("Main window"));
    
      @override
      State createState() => ChatScreenState();
    }
    
    /// State for ChatScreen widget
    class ChatScreenState extends State<ChatScreen> with TickerProviderStateMixin {
      /// Chat client service
      ChatService _service;
    
      /// Look at the https://github.com/flutter/flutter/issues/26375
      BandwidthBuffer _bandwidthBuffer;
    
      /// Stream controller to add messages to the ListView
      final StreamController _streamController = StreamController<List<Message>>();
    
      /// Chat messages list to display into ListView
      final List<ChatMessage> _messages = <ChatMessage>[];
    
      /// Look at the https://codelabs.developers.google.com/codelabs/flutter/#4
      final TextEditingController _textController = TextEditingController();
      bool _isComposing = false;
    
      @override
      void initState() {
        super.initState();
    
        // initialize bandwidth buffer for chat messages display
        _bandwidthBuffer = BandwidthBuffer<Message>(
          duration: Duration(milliseconds: 500),
          onReceive: onReceivedFromBuffer,
        );
        _bandwidthBuffer.start();
    
        // initialize Chat client service
        _service = ChatService(
            onSentSuccess: onSentSuccess,
            onSentError: onSentError,
            onReceivedSuccess: onReceivedSuccess,
            onReceivedError: onReceivedError);
        _service.startListening();
      }
    
      @override
      void dispose() {
        // close Chat client service
        _service.shutdown();
    
        // close bandwidth buffer
        _bandwidthBuffer.stop();
    
        // free UI resources
        for (ChatMessage message in _messages)
          message.animationController.dispose();
        super.dispose();
      }
    
      @override
      Widget build(BuildContext context) {
        return Scaffold(
          appBar: AppBar(title: Text("Friendlychat")),
          body: Column(
            children: <Widget>[
              Flexible(
                child: StreamBuilder<List<Message>>(
                  stream: _streamController.stream,
                  builder: (BuildContext context, AsyncSnapshot snapshot) {
                    if (snapshot.hasError) {
                      return Text("Error: ${snapshot.error}");
                    }
                    switch (snapshot.connectionState) {
                      case ConnectionState.none:
                      case ConnectionState.waiting:
                        break;
                      case ConnectionState.active:
                      case ConnectionState.done:
                        _addMessages(snapshot.data);
                    }
                    return ListView.builder(
                        padding: EdgeInsets.all(8.0),
                        reverse: true,
                        itemBuilder: (_, int index) => _messages[index],
                        itemCount: _messages.length);
                  },
                ),
              ),
              Divider(height: 1.0),
              Container(
                decoration: BoxDecoration(color: Theme.of(context).cardColor),
                child: _buildTextComposer(),
              ),
            ],
          ),
        );
      }
    
      /// Look at the https://codelabs.developers.google.com/codelabs/flutter/#4
      Widget _buildTextComposer() {
        return IconTheme(
          data: IconThemeData(color: Theme.of(context).accentColor),
          child: Container(
            margin: const EdgeInsets.symmetric(horizontal: 8.0),
            child: Row(
              children: <Widget>[
                Flexible(
                  child: TextField(
                    maxLines: null,
                    textInputAction: TextInputAction.send,
                    controller: _textController,
                    onChanged: (String text) {
                      setState(() {
                        _isComposing = text.length > 0;
                      });
                    },
                    onSubmitted: _isComposing ? _handleSubmitted : null,
                    decoration:
                        InputDecoration.collapsed(hintText: "Send a message"),
                  ),
                ),
                Container(
                  margin: EdgeInsets.symmetric(horizontal: 4.0),
                  child: IconButton(
                      icon: Icon(Icons.send),
                      onPressed: _isComposing
                          ? () => _handleSubmitted(_textController.text)
                          : null),
                ),
              ],
            ),
          ),
        );
      }
    
      /// 'new outgoing message created' event
      void _handleSubmitted(String text) {
        _textController.clear();
        _isComposing = false;
    
        // create new message from input text
        var message = MessageOutgoing(text: text);
    
        // send message to the display stream through the bandwidth buffer
        _bandwidthBuffer.send(message);
    
        // async send message to the server
        _service.send(message);
      }
    
      /// 'outgoing message sent to the server' event
      void onSentSuccess(MessageOutgoing message) {
        debugPrint("message \"${message.text}\" sent to the server");
        // send updated message to the display stream through the bandwidth buffer
        _bandwidthBuffer.send(message);
      }
    
      /// 'failed to send message' event
      void onSentError(Message message, String error) {
        debugPrint(
            "FAILED to send message \"${message.text}\" to the server: $error");
      }
    
      /// 'new incoming message received from the server' event
      void onReceivedSuccess(Message message) {
        debugPrint("received message from the server: ${message.text}");
        // send updated message to the display stream through the bandwidth buffer
        _bandwidthBuffer.send(message);
      }
    
      /// 'failed to receive messages' event
      void onReceivedError(String error) {
        debugPrint("FAILED to receive messages from the server: $error");
      }
    
      /// this event means 'the message (or messages) can be displayed'
      /// Look at the https://github.com/flutter/flutter/issues/26375
      void onReceivedFromBuffer(List<Message> messages) {
        // send message(s) to the ListView stream
        _streamController.add(messages);
      }
    
      /// this methods is called to display new (outgoing or incoming) message or
      /// update status of existing outgoing message
      void _addMessages(List<Message> messages) {
        messages.forEach((message) {
          // check if message with the same ID is already existed
          var i = _messages.indexWhere((msg) => msg.message.id == message.id);
          if (i != -1) {
            // found
            var chatMessage = _messages[i];
            if (chatMessage is ChatMessageOutgoing) {
              assert(message is MessageOutgoing,
                  "message must be MessageOutcome type");
              // update status for outgoing message (from UNKNOWN to SENT)
              chatMessage.controller.setStatus((message as MessageOutgoing).status);
            }
          } else {
            // new message
            ChatMessage chatMessage;
            var animationController = AnimationController(
              duration: Duration(milliseconds: 700),
              vsync: this,
            );
            switch (message.runtimeType) {
              case MessageOutgoing:
                // add new outgoing message
                chatMessage = ChatMessageOutgoing(
                  message: message,
                  animationController: animationController,
                );
                break;
              default:
                // add new incoming message
                chatMessage = ChatMessageIncoming(
                  message: message,
                  animationController: animationController,
                );
                break;
            }
            _messages.insert(0, chatMessage);
    
            // look at the https://codelabs.developers.google.com/codelabs/flutter/#6
            chatMessage.animationController.forward();
          }
        });
      }
    }
    

    这是显示消息的主文件。需深入了解一些代码片段。initState方法初始化gRPC通信客户端服务和建立带宽缓存(bandwidth buffer)(关于带宽缓存稍后介绍)。订阅(subscribes) chat服务事件(onSentSuccess, onSentError, onReceivedSuccess, onReceivedError)和侦听来自服务端的消息:

    image.png
    当输入文字信息点击“send”按钮时,触发_handleSubmitted事件。该事件通过输入文本创建输出文件,通过bandwidth buffer显示并发送信息到服务端。发送信息为异步操作因此在该阶段返回结果未知:
    image.png
    下面代码展示如何处理服务层事件:
    image.png
    接着是build方法描述在终端屏上如何显示信息:
    image.png
    使用StreamBuilder类显示送发到stream的数据信息。
    _addMessages方法更新输出或增加输出或收到信息列表状态:
    image.png
    接着ListView builder从消息列表创建ListView 组件:
    image.png
    创建一个简单的BandwidthBuffer类。用来加速消息和发送到StreamBuilder在构造函数中。
    这段代码创建500毫秒的参数:
    image.png
    下面代码是如何发送到buffer:
    image.png
    通过StreamController每隔500毫秒触发发送信息到StreamBuilder:
    image.png
    bandwidth_buffer.dart代码如下:
    import 'dart:async';
    
    class BandwidthBuffer<T> {
      final Duration duration;
      final void Function(List<T>) onReceive;
    
      List<T> _list = <T>[];
      Timer _timer;
    
      BandwidthBuffer({this.duration, this.onReceive});
    
      void start() {
        _timer = Timer.periodic(duration, _onTimeToSend);
      }
    
      void stop() {
        if (_timer != null) {
          _timer.cancel();
          _timer = null;
        }
      }
    
      void send(T t) {
        _list.add(t);
      }
    
      void _onTimeToSend(Timer t) {
        if (_list.length > 0 && onReceive != null) {
          var list = _list;
          _list = <T>[];
          onReceive(list);
        }
      }
    }
    

    Flutter客户端全部代码在该地址下载

    运行服务端及客户端程序

    首先编译和运行服务端代码:

    cd flutter-grpc-tutorial/go-server/cmd/server
    go build .
    server
    

    服务端打印日志如下:

    2019/01/13 19:42:14 starting gRPC server...
    

    接下来运行客户端应用。使用Visual Studio Code运行应用的debug模式。运行结果如下:

    image.png
    在客户端发送两个消息到服务端:
    image.png
    Flutter的程序带了一点容错机制。能够在离线模式下工作。为了验证该功能,需要关闭服务端再次发送消息:
    image.png
    消息被标记为已发送。再次开启服务后Dart会花一些时间进行服务器重新连接并接收到返回信息:
    image.png
    完整代码在该地址下载
    全篇完。

    原文地址(需翻墙)

    相关文章

      网友评论

          本文标题:gRPC实现Flutter客户端go语言服务端异步通信

          本文链接:https://www.haomeiwen.com/subject/nspkghtx.html