美文网首页Flutter学习
Flutter——手撸线程(Isolate)池和子线程工作模块的

Flutter——手撸线程(Isolate)池和子线程工作模块的

作者: 吉哈达 | 来源:发表于2020-09-03 16:55 被阅读0次

    前言

    一些时候我们执行异步操作,如一个api请求,可直接使用async/await 完成。但有时候我们可能需要短时间内执行大量异步操作,如打印,写入文件等等,那么再用async/await 直接插入到event queue中势必会造成卡顿,为此我们应该将它们放在子线程中。

    不过频繁创建/释放isolate也是对资源的浪费,所以我决定模仿java写一个线程池(基础版)。

    基础版 略显稚嫩,还请海涵
    

    结构图

    image

    主要分为三部分:

    main isolate : flutter  主线程
    proxy isolate :  子线程  负责接收主线程的指令,并下发给work isolate
    work isolate :  实际执行任务的子线程
    

    流程

    主线程提交任务,proxy线程接收并缓存下来,同时向线程池(3个线程)内部的空闲线程进行任务派发

    生命周期:线程创建后,便会一直存在,与APP一致
    

    之后通过反射在工作线程中调用预先定义好的方法。

    对于开发人员,无需关注线程内部原理,只需要定义一个方法,
    然后通过WorkerMainProxy类的invokeWorker就可以实现子线程执行。
    

    接下来我们分别实现它们,还是老规矩,代码较多时我会将说明写在注释里。

    main_isolate

    一些常量和数据结构

    因为isolate之间只能通过sendPort.send互相通信,所以我们先定义一些常量

    const int kSendPortKey = 6633;//第二个元素则为 sendPort
    
    const int kTaskKey = 8844; // 第二个元素为task
    
    const int kTaskParamsKey = 10055; // 第二个元素为 方法对应的 参数
    
    const int kTaskResult = 15500;//任务返回结果
    
    ///
    const String kMethodName = 'kMethodName';
    const String kNameArgs = 'kNameArgs';
    

    port发送的message可以是 null,num,double,bool,或者包含上述类型的List和Map,以及SendPort,所以我们需要设计一下数据结构。

    我这里统一使用数组 :

    [key,data] key 是上方的常量值
    当data为task时,类型会是Map,然后用到上面的字符串常量
    

    WorkerMainProxy

    这个类内部与proxy_isolate联系,并通过invokeWorker方法,将任务提交给proxy_isolate,我们看一下他的代码,说明我写在注释里

    class WorkerMainProxy{
      static WorkerMainProxy _instance;
      static WorkerMainProxy getInstance(){
        if(_instance == null){
          _instance = WorkerMainProxy._();
        }
        return _instance;
      }
    
      factory WorkerMainProxy()=>getInstance();
    
    
      WorkerMainProxy._();
        //主线程的收听端口
      final ReceivePort receivePort = ReceivePort();
      //proxy
      Isolate isolate;
      //向proxy发送信息的port
      SendPort childPort;
        //因为线程初始化为异步,确保下达的任务不丢失,创建这个缓存
      List<TaskWrapper> taskCache = [];
        //是否在初始化proxy线程
      bool initializing = false;
    
      ///nameArgs  key: params name
      ///value: params value .
      ///and type can only 'num,null,double,String'
      void invokeWorker({String methodName,Map<String,dynamic> nameArgs})async{
        taskCache.add(TaskWrapper(methodName, nameArgs));
        if(isolate == null && !initializing){
          initializing = true;
          //初始化子线程,第一个参数为顶级函数,由子线程执行,
          //第二个参数为当前线程的sendport,子线程可以用这个向当前线程发送消息
          isolate  = await Isolate.spawn(proxyHandler, receivePort.sendPort);
          //收听子线程发送的消息
          receivePort.listen((message) {
            if(message[0] == kSendPortKey){
              //子线程的sendport传递过来
              childPort = message[1];
              sendTask();
            }else if(message[0]==kTaskResult){
               //初版不支持返回结果
              ///要考虑多线程不同步的情况
              //可能造成难以预料的结果,所以暂不考虑加入返回结果的功能
            }
          });
        }else if(isolate != null && childPort != null){
          sendTask();
        }
    
      }
        //当childPort初始化成功后,开始发送任务
      void sendTask(){
        if(taskCache.length > 0){
          taskCache.forEach((element) {
            childPort.send([kTaskKey,element.methodName,element.nameArgs]);
          });
          taskCache.clear();
        }
      }
    
    }
    

    功能很简单,接下来我们看 proxy_isolate的实现

    proxy_isolate

    结构图

    image

    proxyHandler

    这个方法是属于proxy 线程(是isolate,叫线程叫习惯了,别被我搞乱了)的,

    切记dart线程是不能共享内存的,
    同时不要在子线程里使用dart:ui 或者flutter的东西,否则会报错
    

    代码如下:

    //任务缓存
    List<TaskWrapper> taskLog = [];
    //收/发端口
    final ReceivePort receiveMainPort = ReceivePort();
    final SendPort sendPortOfProxy = receiveMainPort.sendPort;
    //线程池
    final Map<int,WorkIsolateWrapper> workers = {};
    
    void proxyHandler(SendPort mainPort)async{
      //监听主线程的信息
      receiveMainPort.listen((message) {
        if(message[0] == kTaskKey){
        //对主线程的任务进行缓存
          String name = message[1];
          Map<String,dynamic> args = message[2];
          TaskWrapper wrapper = TaskWrapper(name,args);
          taskLog.add(wrapper);
        }
    
      });
     //将proxy的sendport传给主线程
      mainPort.send([kSendPortKey,sendPortOfProxy]);
    
    
      /// create 3 work isolate
        ///创建三个工作线程
      List.generate(3, (index) async{
       //同理
        final ReceivePort proxyPort = ReceivePort();
        final SendPort proxySendPort = proxyPort.sendPort;
        Isolate.spawn(_workerIsolate, proxySendPort,paused: true)
          .then((isolate) {
          //生成ID
          int id = Random().nextInt(1000);
          while(workers.containsKey(id)){
            id = Random().nextInt(1000);
          }
          var worker = WorkIsolateWrapper(id,proxyPort, proxySendPort, isolate);
          workers[id] = worker;
          //启动工作线程
          worker.init();
        });
    
    
      });
      //启动代理
      runProxy();
    
    }
    //创建了一个timer用于循环从任务缓存取出任务,并分发给空闲的工作线程
    void runProxy(){
      final timer = Timer.periodic(Duration(milliseconds: 1), (timer) {
        if(taskLog.length>0){
          workers.forEach((key, value) {
            if(taskLog.length > 0){
              if(value.isStandBy()){
              //向空闲的线程发送任务
                TaskWrapper task = taskLog.first;
                value.setStatus(false);// not free
                value.workSendPort.send([kTaskKey,{kMethodName:task.methodName,
                  kNameArgs:task.nameArgs}]);
                  //移除对应任务
                taskLog.removeWhere((element) => element == task);
              }
    
            }
    
          });
    
        }
      });
    }
    

    对工作线程进行了一下包装WorkIsolateWrapper,方便操作,我们来看一下他的内部结构

    WorkIsolateWrapper

    class WorkIsolateWrapper {
      final int id;
      final ReceivePort proxyPort;
    
      final SendPort proxySendPort;
      //work_isolate
      final Isolate _isolate;
    
      WorkIsolateWrapper(this.id,this.proxyPort, this.proxySendPort, this._isolate);
    
      ///是否空闲
      bool _isFree = true;
      bool isStandBy()=> _isFree&&initSuccess;
      setStatus(bool status){
        _isFree = status;
      }
    
      SendPort workSendPort;
      bool initSuccess = false;
      //这就是我们上面调用的init方法
      //这个方法开始初始化 work isoalte,并把其sendPort保存下来。
      init() {
        _isolate.resume(_isolate.pauseCapability);
        proxyPort.listen((message) {
        //监听工作线程
          if (message[0] == kSendPortKey) {
           //保存工作线程的 sendPort
            workSendPort = message[1];
            initSuccess = true;
          }else if(message[0] == kWorkDone){
            ///work done
            setStatus(true);
            print('isolate $id  完成了 :${message[1].toString()}');
          }
        });
      }
    }
    

    至此proxy_isolate就实现了,我们接下来看一下work_isolate的实现

    work_isolate

    实现

    const int kWorkDone = 98766; //处理完后的回复tag
    
    void _workerIsolate(SendPort proxyPort){
      //启动反射,这里我在后面介绍
      initializeReflectable();
      final ReceivePort receivePort = ReceivePort();
      final SendPort sendPort = receivePort.sendPort;
    
      receivePort.listen((message) {
        if(message[0] == kTaskKey){
          ///执行任务
          //对msg进行解构再组装
          Map method = message[1];
          String mn = method[kMethodName];
          Map<Symbol,dynamic> nameArguments = {};
          if(method[kNameArgs] is Map){
            ///为了避免顺序错误导致的参数异常,这里不使用positionalArguments
            (method[kNameArgs] as Map).forEach((key, value) {
              nameArguments[Symbol(key)] = value;
            });
            //功能模板类,里面定义了一些列方法,通过反射进行调用
            final WorkList workerList = WorkList();
            final InstanceMirror instanceMirror = myReflect.reflect(workerList);
            //执行静态方法用的,这里不需要先注释掉
            //final ClassMirror classMirror = myReflect.reflectType(WorkList);
            //调用方法
            instanceMirror.invoke(mn, [],nameArguments);
            ///work done
            ///结构暂定为 [order flag, result(Map)]
            ///个人认为这种多线程处理任务,最好不要有返回结果 ....待设计
            proxyPort.send([kWorkDone,{'method':mn,'args':nameArguments.toString()}]);
          }
        }
      });
        //将工作线程的sendPort发送给proxy
      proxyPort.send([kSendPortKey,sendPort]);
    
    }
    

    workList 反射模板

    介绍

    我们在worklist里定义好方法,子线程通过反射就可以调用这些方法,这样实际使用的时候,开发人员就不需要关心子线程的实现,只用在WorkList里面定义好自己的方法,然后通过WorkerMainProxy.invoerWorker()方法调用即可。

    实现

    首先我们需要两个插件:

    reflectable: ^2.2.5
    build_runner: ^1.7.0
    

    之后我们在main.dart文件中增加如下代码:

    ///具体方法模板类
    @myReflect
    class WorkList{
    
      test({String n,String m}){
        print('  test method   $n');
      }
    
    
    }
    
    ///插件反射对象
    const myReflect = MyReflectable();
    
    class MyReflectable extends Reflectable{
      const MyReflectable():super(invokingCapability);
    }
    
    

    同时在根目录中增加一个build.yaml文件用来生成代码

    targets:
      $default:
        builders:
          reflectable:
            generate_for:
              - lib/main.dart
    

    都弄好了后,我们在控制台执行

    flutter packages pub run build_runner build
    

    过一小会,你就会看到main.dart下方多了一个

    main.refelctable.dart 
    

    这样我们就完成了反射模板workList的配置,每次编辑workList后都需要运行一下

    flutter packages pub run build_runner build
    

    测试

    至此整个功能就开发完毕了,我们测试一下,点击按钮发起100次调用:

                  RaisedButton(onPressed: ()async{
                    List.generate(100, (index){
                      WorkerMainProxy.getInstance()
                      ///参数一:方法名字,参数二:方法对应的命名参数,
                      ///务必确保参数名与WorkList中的一致
                          .invokeWorker(methodName: 'test',nameArgs: {'n':'第$index次唤起','m':'第二个参数'});
    
                    });
    
                  },
                    child: Text('测试worker'),),
    
    image

    可以看到测试结果是符合我们预期的,初版功能就开发完毕了。

    结语

    以上只是初版功能,还有诸多需要完善的,后续我会不断增加,另外功能模块也可以进一步拓展,如 java中的cache线程池。

    也欢迎大家补充,如有不足还请指出,

    该功能已加入 Bedrock开发框架,希望大家多提意见 :)

    Bedrock开发框架

    我的其它文章

    Bedrock——基于MVVM+Provider的Flutter快速开发框架

    Flutter自定义View——仿高德三级联动Drawer

    Flutter 自定义View——仿同花顺自选股列表

    相关文章

      网友评论

        本文标题:Flutter——手撸线程(Isolate)池和子线程工作模块的

        本文链接:https://www.haomeiwen.com/subject/dwrosktx.html