美文网首页
libprocess原理&Actor模型

libprocess原理&Actor模型

作者: celusing | 来源:发表于2020-11-30 21:04 被阅读0次

一.Actor模型

1.并发通信方式

一般来说,并发线程中通信方式有两种:

  • 共享数据:常见的基于锁的多线程编程;
  • 消息传递:常见的Actor模式

2.Actor介绍

Actor是通用的并发编程模型,并非某个语言或框架特有。典型的是erlang,从语言层面上支持Actor模型。

  • 面向对象=数据+行为
  • Actor模型=数据+行为+消息。


    WX20201124-225434@2x.png

    特性:

  • 无锁:每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则。从而巧妙避免了多线程写争夺。
  • 异步:Actor之间通信方式是异步的,这是Actor实现异步的基础。每个Actor都有一个MailBox用来接受消息,Actor按照接受消息的顺序,逐条处理。
    这样的设计主要优势就是解耦了Actor,数万个Actor并发的运行,每个actor都以自己的步调运行,且发送消息,接收消息都不会被阻塞。
  • 隔离:Actor模型内部的状态由它自己维护,即它内部数据只能由它自己修改(通过消息传递来进行状态修改)。不同Actor处于物理隔离状态。
  • 位置透明:由于Actor模型基于消息传递方式,每个Actor实例的位置透明,无论Actor地址是在本地还是在远程机器上对于代码来说都是一样的。
  • 容错:传统编程是防御式编程,在可能出错的地方加异常处理。Actor模型遵循:任其崩溃哲学理念,让Actor的管理者去处理这些崩溃问题。每个Actor的崩溃或者异常都反馈到管理者处,由管理者决定处理方式。

3.Actor做什么

当一个actor接收到消息后,它能做如下三件事中的一件:

  • Create more actors; 创建其他actors
  • Send messages to other actors; 向其他actors发送消息
  • Designates what to do with the next message. 指定下一条消息到来的行为
    「指定下一条消息来到做什么」意味着可以定义下条消息来到时的状态。更清楚地说,就是actors如何修改状态。
    例子:想有一个actor像计算器,它的初始状态是数字0。当这个actor接收到add(1)消息时,它并不改变它原本的状态,而是指定当它接收到下一个消息时,状态会变为1。

二.Libprocess

每个Actor(Process)占用一个线程。不同Actor,可能处于同一个进程内,也可以处于不同进程。调用方式:

  • 线程内函数调用:A-->B 或者 A.dispacher(self())向自己写event方式。
  • 相同进程,不同Actor函数调用:A.disptcher(B.id, message),A向B发送一个event事件.
  • 不同进程,不同Actor函数调用:A.Send(B.id, message),A向B发送一个event事件。

1.Libprocess中概念介绍

Libprocess是基于Actor模型实现的库,提供基于actor style的并行编程框架。通过使用libprocess,异步并行变成变得相对简单。

  • process:与OS的进程process概念不同,libprocess中的process表示一个actor。具体来说:是一个class。

libprocess的基础是:event visitor和event list,其继承关系可表述为:


深度截图_选择区域_20201130162329.png

process的基类是event_visitor,其内部有个属性:event_list,process的工作实际是visit这个list中的events并执行。
但是process本身不会主动去visitor各个events,实际上各个Actor仍然运行在实体Thread上。在libprocess库中,全局静态变量类process_manager(声明在:process.h中,在process.cpp中定义)将负责启动线程并运行Actor.

运行流程总结如下:

  • 创建process对象;
    调用processBase()构造函数-->process::initialize()函数,访问全局变量process_manager。process_manager负责记录管理Actor信息。
//如果process::initialize()从未被调用过,则创建至少8个线程或者等于CPU数目的线程
long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLY))
for (int i = 0; i < cpus, i++) {
  pthread_t thread; //for now , not saving handles on our threads
  if (phtread_create(&thread, null, schedule, null) != 0) {
  LOG(FATAL) << "Failed to initialize, pthread_create";
}
}

声明process之后,实体线程运行的schedule函数取出运行队列中process并运行。

//调度器不断查看process_manager队列中是否有process,如果有,恢复并启动process.
void* schedule(void* arg)
{
  do{
      ProcessBase* process = process_manager->dequeue();
      if (process == NULL) {

      }
      process_manager->resume(process);
  }
}

所以process声明之后,还需要写入相应线程的运行队列中,才能够开始运行。
将process写入运行队列需要通过spawn(process)函数。

  • spawn(process):
    1)首先将调用process本身的initialize()函数,进行初始化。
    2)然后调用process_manager的enqueue()方法,将process写入某一个thread的运行队列中。
UPID ProcessManager::spawn(ProcessBase* process, bool manager)
{
  CHECK(process != NULL);
  synchronized (process) {
    //processes已经记录过该process
    if (processes.count(process->pid.id) > 0) {
      return UPID();
    } else {
      //如果是新的Actor,记录下来
      processes[process->pid.id] = process;
    }
  }

  if (manager) {
    dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
  }

  UPID pid = process->self();
   //Actor入队
   enqueue(process);

  return pid;
}

至此,Actor创建成功,并放入process_manager的队列中,process_manager中有一个线程,不断激活各个Actor。Actor启动成功。

1.libprocess中多个process的单机并行通信:dispatcehr/delay

同一个进程内的不同Actor之间的并行通信,不需要获取返回值。

  1. dispatcher()
process::PID<SimpleProcess> pid = process::spwn(simpleProcess);
process::dispatcher(pid, &SimplerProcesss::doSomething, "test");

dispatcher()方法将一个event插入目标process的event队列中。event由目标process的成员函数和相应的变量组成,dispatcher成功后,对应process的event队列将有一个执行相关函数的event.

  1. delay()
delay(Seconds(5), self(), &Self::batch)

delay()是延迟的dispatcher(),dispatch和delay方法均可以通过self()函数对本线程写入新event。

2.libporcess中的异步并发:future/promise/defer

同一个进程内的不同Actor之间的并行通信,需要获取返回值。

  1. future
    用于异步回调的对象,一般由promise产生,可以在不同process中拷贝/引用,future中定义了各个回调函数接口:
const Future<T>& onDiscard(DiscardCallback&& callback) const;
const Future<T>& onRready(ReadyCallback&& callback) const;
const Future<T>& onFailed(FailedCallback&& callback) const;
const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
const Future<T>& onAny(AnyCallback&& callback) const;

当一个future对象状态发生变化时,相应的回调函数会被调用,从而达到异步并发的效果。

  1. promise
    future的入口,一般只在一个process中set(赋值)。promise不应该再多个process之间拷贝/引用。promise中只包含了future成员变量。
    一般通过promise.future的方式产生一个future。futrue可以在不同process中引用/拷贝,但只能在一个进程中通过promise设置。因为promise一般只存在一个process中。
//简单实例
class Simple : public process::Process<SimpleProcess>
{
public:
  Future<Nothing> doSomething(const string msg) {
    cout << "Wrapping message" << msg << endl;
    return Nothing();
  }

  Future<int> calc(int lhs, int rhs) {
    return Promise<int>(lhs+rhs).future();
  }
private:
  Promise<bool> shouldQuit;  
}

int runProcess()
{
  SimpleProcess simpleProcess;
  process::PID<SimpleProcess> pid = process::spawn(simpleProcess);

  process::dispatcher(pid, &SimpleProcess::doSomething, "test");
  
  Future<int> sum = process::dispatch(pid, &SimpleProcess::calc, 99, 101);
  sum.then([](int n) {
    cout << "99 + 101 = " << n << endl;
    return Nothing();
  })

  sum.await();
  process::terminate(SimpleProcess);
  process::wait(simpleProcess);
}
  1. defer
    如果使用如下语句:
dispatch(master_, &Master::indicateInverseOffer, request).onAny(dispatch(self(), &Self::updateRequestQueue, lambda::_1))

这一命令向master process dispach一个event,调用master类中的indicateInverseOffer函数。该函数返回一个future。我们希望future状态变化时,往本线程dispatcher一个event。
但是,onAny中的dispatch函数是由master process赋值的future调用,从而是由master process所属线程发起的dispatcher调用。不合目的。

dispatch(master_, &Master::indicateInverseOffer, request).onAny(defer(self(), &Self::updateRequestQueue, lambad::_1))

简单来说,defer()返回一个called object, 这个called object是在相应线程被初始化的。
那么当indicateInverseOffer()所返回的future状态发生变化时,onAny调用的是defer()函数的返回值,这个返回值是再代码所属的process产生的,这保证了程序的意图。

3.Libprocess分布式系统编程:基于protobuf消息的方法

不同进程间的不同Actor之间的并行通信。
libprocess通过提供send()和install()函数进行分布式通信。虽然libprocess分布式通信不依赖于protobuf, 但是为了简化消息结构序列化问题,libprocess提供protobufProcess类,这一定制的process支持基于protobuf的消息通信。

  1. install()
    Install方法将handler注册到process类所属的handler map中,一旦相应message到来。即启用handler map对应的handler(函数),以下定义作为install方法的实现路径之一:
template <typename M>
void install(void (T::*method)(const process::UPID))
{
  google::protobuf::Message* m = new M();
  T* t = static_cast<T*>(this);
  protobufHandlers[m->GetTypeName()] = lambda::bind(handler0, t, method,   
  lambda::_1, lambda::_2);
  delete m;
}

install 接受protobuf message中的field作为独立的输入,如:
install<SlaveRegisteredMessage>
(&SimpleMasterProcess::SlaveRegistered,&SlaveRegisteredMessage::slave_id)

当SlaveRegisteredMessage信息到来之后,process将把slaveRegistered()函数作为一个event写入队列,并将message中的slave_id field作为输入量。
特别注意的是,protobufProcess中,protobuf消息中的repeat fileds将会被自动转换成vector.

template<typename T>
std::vector<T> convert(const google::protobuf::RepeatedPtrFiled<T>& items)
{
  vector<T> result;
  for(int i = 0; i < items.size(); i++) {
    result.push_back(items.Get(i));
  }
  
  return result;
}
  1. UPID与通信
    非分布式通信,即同一进程内,不同Actor区分通过PID。
    在基于libprocess的分布式系统中,每个process均有一个唯一的UPID,process之间分布式通信通过UPID识别地址,UPID的主要信息如下:
std::string id;
//一般是ip:port
network::Addresss address;

每个UPID实现了libprocess的地址空间中唯一地址,其中id默认是从1开始,spawn一个新的process,则增加到(2),如此类推。

  1. send()
    使用send()换气远程process的回调函数:通过UPID,向远程process发送一个protobuf消息。远程process接收到消息后,将取出消息对应的handler,将它放入任务队列。
    同时注意,send()函数总会发送起始process的UPID,所以protobufProcess的install函数所有的handler的第一个输入永远是:
//源UPID
const process::UPID&

这样,可以知道每个消息来源的UPID.

相关文章

  • libprocess原理&Actor模型

    [https://blog.csdn.net/bjweimengshu/article/details/89062...

  • [xactor]学习笔记--序

    最近学习CSP模型和actor模型,actor模型没有玩过。所以开始学习。actor模型在rust语言下有 Xac...

  • Actor模型

    Actor模型本质上是一种计算模型,基本的计算单元称为Actor。在Actor模型中,所有的计算都是在Actor中...

  • Actor模型合CSP模型

    Actor模型 在Actor模型中,主角是Actor,类似一种Worker。Actor彼此之间直接发送消息,不需要...

  • actor、reactor与proactor模型

    actor、reactor与proactor模型:高性能服务器的几种模型概念。 actor模型: 实体之通过消息通...

  • actor reactor proactor模型

    actor、reactor与proactor模型:高性能服务器的几种模型概念。 actor模型: 实体之通过消息通...

  • Orleans(一) 整体介绍

    背景 Orleans是微软开源的Actor模型开发框架。 Actor模型此模型解决了并发编程时对资源竞争使用的问题...

  • actor模型

    概述 本文主要介绍actor模型的定义和使用场景,最后介绍一个使用akka的例子。 actor模型定义 在许多并发...

  • Actor模型

    有关并行(Parallelism),不得不提的肯定是Erlang,通用的面向并发的函数编程语言,这种编程语言的选择...

  • Actor模型

    传统的游戏服务器要么是单线程要么是多线程,过去几十年里CPU一直遵循摩尔定律发展,带来的结果是单核频率越来越高。而...

网友评论

      本文标题:libprocess原理&Actor模型

      本文链接:https://www.haomeiwen.com/subject/dpewiktx.html