美文网首页
libprocess原理&Actor模型

libprocess原理&Actor模型

作者: celusing | 来源:发表于2020-11-30 21:04 被阅读0次

    一.Actor模型

    1.并发通信方式

    一般来说,并发线程中通信方式有两种:

    • 共享数据:常见的基于锁的多线程编程;
    • 消息传递:常见的Actor模式

    2.Actor介绍

    Actor是通用的并发编程模型,并非某个语言或框架特有。典型的是erlang,从语言层面上支持Actor模型。

    • 面向对象=数据+行为
    • Actor模型=数据+行为+消息。


      WX20201124-225434@2x.png

      特性:

    • 无锁:每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则。从而巧妙避免了多线程写争夺。
    • 异步:Actor之间通信方式是异步的,这是Actor实现异步的基础。每个Actor都有一个MailBox用来接受消息,Actor按照接受消息的顺序,逐条处理。
      这样的设计主要优势就是解耦了Actor,数万个Actor并发的运行,每个actor都以自己的步调运行,且发送消息,接收消息都不会被阻塞。
    • 隔离:Actor模型内部的状态由它自己维护,即它内部数据只能由它自己修改(通过消息传递来进行状态修改)。不同Actor处于物理隔离状态。
    • 位置透明:由于Actor模型基于消息传递方式,每个Actor实例的位置透明,无论Actor地址是在本地还是在远程机器上对于代码来说都是一样的。
    • 容错:传统编程是防御式编程,在可能出错的地方加异常处理。Actor模型遵循:任其崩溃哲学理念,让Actor的管理者去处理这些崩溃问题。每个Actor的崩溃或者异常都反馈到管理者处,由管理者决定处理方式。

    3.Actor做什么

    当一个actor接收到消息后,它能做如下三件事中的一件:

    • Create more actors; 创建其他actors
    • Send messages to other actors; 向其他actors发送消息
    • Designates what to do with the next message. 指定下一条消息到来的行为
      「指定下一条消息来到做什么」意味着可以定义下条消息来到时的状态。更清楚地说,就是actors如何修改状态。
      例子:想有一个actor像计算器,它的初始状态是数字0。当这个actor接收到add(1)消息时,它并不改变它原本的状态,而是指定当它接收到下一个消息时,状态会变为1。

    二.Libprocess

    每个Actor(Process)占用一个线程。不同Actor,可能处于同一个进程内,也可以处于不同进程。调用方式:

    • 线程内函数调用:A-->B 或者 A.dispacher(self())向自己写event方式。
    • 相同进程,不同Actor函数调用:A.disptcher(B.id, message),A向B发送一个event事件.
    • 不同进程,不同Actor函数调用:A.Send(B.id, message),A向B发送一个event事件。

    1.Libprocess中概念介绍

    Libprocess是基于Actor模型实现的库,提供基于actor style的并行编程框架。通过使用libprocess,异步并行变成变得相对简单。

    • process:与OS的进程process概念不同,libprocess中的process表示一个actor。具体来说:是一个class。

    libprocess的基础是:event visitor和event list,其继承关系可表述为:


    深度截图_选择区域_20201130162329.png

    process的基类是event_visitor,其内部有个属性:event_list,process的工作实际是visit这个list中的events并执行。
    但是process本身不会主动去visitor各个events,实际上各个Actor仍然运行在实体Thread上。在libprocess库中,全局静态变量类process_manager(声明在:process.h中,在process.cpp中定义)将负责启动线程并运行Actor.

    运行流程总结如下:

    • 创建process对象;
      调用processBase()构造函数-->process::initialize()函数,访问全局变量process_manager。process_manager负责记录管理Actor信息。
    //如果process::initialize()从未被调用过,则创建至少8个线程或者等于CPU数目的线程
    long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLY))
    for (int i = 0; i < cpus, i++) {
      pthread_t thread; //for now , not saving handles on our threads
      if (phtread_create(&thread, null, schedule, null) != 0) {
      LOG(FATAL) << "Failed to initialize, pthread_create";
    }
    }
    

    声明process之后,实体线程运行的schedule函数取出运行队列中process并运行。

    //调度器不断查看process_manager队列中是否有process,如果有,恢复并启动process.
    void* schedule(void* arg)
    {
      do{
          ProcessBase* process = process_manager->dequeue();
          if (process == NULL) {
    
          }
          process_manager->resume(process);
      }
    }
    

    所以process声明之后,还需要写入相应线程的运行队列中,才能够开始运行。
    将process写入运行队列需要通过spawn(process)函数。

    • spawn(process):
      1)首先将调用process本身的initialize()函数,进行初始化。
      2)然后调用process_manager的enqueue()方法,将process写入某一个thread的运行队列中。
    UPID ProcessManager::spawn(ProcessBase* process, bool manager)
    {
      CHECK(process != NULL);
      synchronized (process) {
        //processes已经记录过该process
        if (processes.count(process->pid.id) > 0) {
          return UPID();
        } else {
          //如果是新的Actor,记录下来
          processes[process->pid.id] = process;
        }
      }
    
      if (manager) {
        dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
      }
    
      UPID pid = process->self();
       //Actor入队
       enqueue(process);
    
      return pid;
    }
    

    至此,Actor创建成功,并放入process_manager的队列中,process_manager中有一个线程,不断激活各个Actor。Actor启动成功。

    1.libprocess中多个process的单机并行通信:dispatcehr/delay

    同一个进程内的不同Actor之间的并行通信,不需要获取返回值。

    1. dispatcher()
    process::PID<SimpleProcess> pid = process::spwn(simpleProcess);
    process::dispatcher(pid, &SimplerProcesss::doSomething, "test");
    

    dispatcher()方法将一个event插入目标process的event队列中。event由目标process的成员函数和相应的变量组成,dispatcher成功后,对应process的event队列将有一个执行相关函数的event.

    1. delay()
    delay(Seconds(5), self(), &Self::batch)
    

    delay()是延迟的dispatcher(),dispatch和delay方法均可以通过self()函数对本线程写入新event。

    2.libporcess中的异步并发:future/promise/defer

    同一个进程内的不同Actor之间的并行通信,需要获取返回值。

    1. future
      用于异步回调的对象,一般由promise产生,可以在不同process中拷贝/引用,future中定义了各个回调函数接口:
    const Future<T>& onDiscard(DiscardCallback&& callback) const;
    const Future<T>& onRready(ReadyCallback&& callback) const;
    const Future<T>& onFailed(FailedCallback&& callback) const;
    const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
    const Future<T>& onAny(AnyCallback&& callback) const;
    

    当一个future对象状态发生变化时,相应的回调函数会被调用,从而达到异步并发的效果。

    1. promise
      future的入口,一般只在一个process中set(赋值)。promise不应该再多个process之间拷贝/引用。promise中只包含了future成员变量。
      一般通过promise.future的方式产生一个future。futrue可以在不同process中引用/拷贝,但只能在一个进程中通过promise设置。因为promise一般只存在一个process中。
    //简单实例
    class Simple : public process::Process<SimpleProcess>
    {
    public:
      Future<Nothing> doSomething(const string msg) {
        cout << "Wrapping message" << msg << endl;
        return Nothing();
      }
    
      Future<int> calc(int lhs, int rhs) {
        return Promise<int>(lhs+rhs).future();
      }
    private:
      Promise<bool> shouldQuit;  
    }
    
    int runProcess()
    {
      SimpleProcess simpleProcess;
      process::PID<SimpleProcess> pid = process::spawn(simpleProcess);
    
      process::dispatcher(pid, &SimpleProcess::doSomething, "test");
      
      Future<int> sum = process::dispatch(pid, &SimpleProcess::calc, 99, 101);
      sum.then([](int n) {
        cout << "99 + 101 = " << n << endl;
        return Nothing();
      })
    
      sum.await();
      process::terminate(SimpleProcess);
      process::wait(simpleProcess);
    }
    
    1. defer
      如果使用如下语句:
    dispatch(master_, &Master::indicateInverseOffer, request).onAny(dispatch(self(), &Self::updateRequestQueue, lambda::_1))
    

    这一命令向master process dispach一个event,调用master类中的indicateInverseOffer函数。该函数返回一个future。我们希望future状态变化时,往本线程dispatcher一个event。
    但是,onAny中的dispatch函数是由master process赋值的future调用,从而是由master process所属线程发起的dispatcher调用。不合目的。

    dispatch(master_, &Master::indicateInverseOffer, request).onAny(defer(self(), &Self::updateRequestQueue, lambad::_1))
    

    简单来说,defer()返回一个called object, 这个called object是在相应线程被初始化的。
    那么当indicateInverseOffer()所返回的future状态发生变化时,onAny调用的是defer()函数的返回值,这个返回值是再代码所属的process产生的,这保证了程序的意图。

    3.Libprocess分布式系统编程:基于protobuf消息的方法

    不同进程间的不同Actor之间的并行通信。
    libprocess通过提供send()和install()函数进行分布式通信。虽然libprocess分布式通信不依赖于protobuf, 但是为了简化消息结构序列化问题,libprocess提供protobufProcess类,这一定制的process支持基于protobuf的消息通信。

    1. install()
      Install方法将handler注册到process类所属的handler map中,一旦相应message到来。即启用handler map对应的handler(函数),以下定义作为install方法的实现路径之一:
    template <typename M>
    void install(void (T::*method)(const process::UPID))
    {
      google::protobuf::Message* m = new M();
      T* t = static_cast<T*>(this);
      protobufHandlers[m->GetTypeName()] = lambda::bind(handler0, t, method,   
      lambda::_1, lambda::_2);
      delete m;
    }
    
    install 接受protobuf message中的field作为独立的输入,如:
    install<SlaveRegisteredMessage>
    (&SimpleMasterProcess::SlaveRegistered,&SlaveRegisteredMessage::slave_id)
    

    当SlaveRegisteredMessage信息到来之后,process将把slaveRegistered()函数作为一个event写入队列,并将message中的slave_id field作为输入量。
    特别注意的是,protobufProcess中,protobuf消息中的repeat fileds将会被自动转换成vector.

    template<typename T>
    std::vector<T> convert(const google::protobuf::RepeatedPtrFiled<T>& items)
    {
      vector<T> result;
      for(int i = 0; i < items.size(); i++) {
        result.push_back(items.Get(i));
      }
      
      return result;
    }
    
    1. UPID与通信
      非分布式通信,即同一进程内,不同Actor区分通过PID。
      在基于libprocess的分布式系统中,每个process均有一个唯一的UPID,process之间分布式通信通过UPID识别地址,UPID的主要信息如下:
    std::string id;
    //一般是ip:port
    network::Addresss address;
    

    每个UPID实现了libprocess的地址空间中唯一地址,其中id默认是从1开始,spawn一个新的process,则增加到(2),如此类推。

    1. send()
      使用send()换气远程process的回调函数:通过UPID,向远程process发送一个protobuf消息。远程process接收到消息后,将取出消息对应的handler,将它放入任务队列。
      同时注意,send()函数总会发送起始process的UPID,所以protobufProcess的install函数所有的handler的第一个输入永远是:
    //源UPID
    const process::UPID&
    

    这样,可以知道每个消息来源的UPID.

    相关文章

      网友评论

          本文标题:libprocess原理&Actor模型

          本文链接:https://www.haomeiwen.com/subject/dpewiktx.html