- 2.https://www.jianshu.com/p/449850aa8e82
- 3.https://www.cnblogs.com/qianggezhishen/archive/2004/01/13/7349323.html
一.Actor模型
1.并发通信方式
一般来说,并发线程中通信方式有两种:
- 共享数据:常见的基于锁的多线程编程;
- 消息传递:常见的
Actor
模式
2.Actor介绍
Actor是通用的并发编程模型,并非某个语言或框架特有。典型的是erlang,从语言层面上支持Actor模型。
- 面向对象=数据+行为
-
Actor模型=数据+行为+消息。
WX20201124-225434@2x.png
特性:
- 无锁:每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则。从而巧妙避免了多线程写争夺。
- 异步:Actor之间通信方式是异步的,这是Actor实现异步的基础。每个Actor都有一个MailBox用来接受消息,Actor按照接受消息的顺序,逐条处理。
这样的设计主要优势就是解耦了Actor,数万个Actor并发的运行,每个actor都以自己的步调运行,且发送消息,接收消息都不会被阻塞。 - 隔离:Actor模型内部的状态由它自己维护,即它内部数据只能由它自己修改(通过消息传递来进行状态修改)。不同Actor处于物理隔离状态。
- 位置透明:由于Actor模型基于消息传递方式,每个Actor实例的位置透明,无论Actor地址是在本地还是在远程机器上对于代码来说都是一样的。
- 容错:传统编程是防御式编程,在可能出错的地方加异常处理。Actor模型遵循:任其崩溃哲学理念,让Actor的管理者去处理这些崩溃问题。每个Actor的崩溃或者异常都反馈到管理者处,由管理者决定处理方式。
3.Actor做什么
当一个actor接收到消息后,它能做如下三件事中的一件:
- Create more actors; 创建其他actors
- Send messages to other actors; 向其他actors发送消息
- Designates what to do with the next message. 指定下一条消息到来的行为
「指定下一条消息来到做什么」意味着可以定义下条消息来到时的状态。更清楚地说,就是actors如何修改状态。
例子:想有一个actor像计算器,它的初始状态是数字0。当这个actor接收到add(1)消息时,它并不改变它原本的状态,而是指定当它接收到下一个消息时,状态会变为1。
二.Libprocess
每个Actor(Process)占用一个线程。不同Actor,可能处于同一个进程内,也可以处于不同进程。调用方式:
- 线程内函数调用:A-->B 或者 A.dispacher(self())向自己写event方式。
- 相同进程,不同Actor函数调用:A.disptcher(B.id, message),A向B发送一个event事件.
- 不同进程,不同Actor函数调用:A.Send(B.id, message),A向B发送一个event事件。
1.Libprocess中概念介绍
Libprocess是基于Actor模型实现的库,提供基于actor style的并行编程框架。通过使用libprocess,异步并行变成变得相对简单。
- process:与OS的进程process概念不同,libprocess中的process表示一个actor。具体来说:是一个class。
libprocess的基础是:event visitor和event list,其继承关系可表述为:
深度截图_选择区域_20201130162329.png
process的基类是event_visitor,其内部有个属性:event_list,process的工作实际是visit这个list中的events并执行。
但是process本身不会主动去visitor各个events,实际上各个Actor仍然运行在实体Thread上。在libprocess库中,全局静态变量类process_manager(声明在:process.h中,在process.cpp中定义)将负责启动线程并运行Actor.
运行流程总结如下:
- 创建process对象;
调用processBase()构造函数-->process::initialize()函数,访问全局变量process_manager。process_manager负责记录管理Actor信息。
//如果process::initialize()从未被调用过,则创建至少8个线程或者等于CPU数目的线程
long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLY))
for (int i = 0; i < cpus, i++) {
pthread_t thread; //for now , not saving handles on our threads
if (phtread_create(&thread, null, schedule, null) != 0) {
LOG(FATAL) << "Failed to initialize, pthread_create";
}
}
声明process之后,实体线程运行的schedule函数取出运行队列中process并运行。
//调度器不断查看process_manager队列中是否有process,如果有,恢复并启动process.
void* schedule(void* arg)
{
do{
ProcessBase* process = process_manager->dequeue();
if (process == NULL) {
}
process_manager->resume(process);
}
}
所以process声明之后,还需要写入相应线程的运行队列中,才能够开始运行。
将process写入运行队列需要通过spawn(process)函数。
- spawn(process):
1)首先将调用process本身的initialize()函数,进行初始化。
2)然后调用process_manager的enqueue()方法,将process写入某一个thread的运行队列中。
UPID ProcessManager::spawn(ProcessBase* process, bool manager)
{
CHECK(process != NULL);
synchronized (process) {
//processes已经记录过该process
if (processes.count(process->pid.id) > 0) {
return UPID();
} else {
//如果是新的Actor,记录下来
processes[process->pid.id] = process;
}
}
if (manager) {
dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
}
UPID pid = process->self();
//Actor入队
enqueue(process);
return pid;
}
至此,Actor创建成功,并放入process_manager的队列中,process_manager中有一个线程,不断激活各个Actor。Actor启动成功。
1.libprocess中多个process的单机并行通信:dispatcehr/delay
同一个进程内的不同Actor之间的并行通信,不需要获取返回值。
- dispatcher()
process::PID<SimpleProcess> pid = process::spwn(simpleProcess);
process::dispatcher(pid, &SimplerProcesss::doSomething, "test");
dispatcher()方法将一个event插入目标process的event队列中。event由目标process的成员函数和相应的变量组成,dispatcher成功后,对应process的event队列将有一个执行相关函数的event.
- delay()
delay(Seconds(5), self(), &Self::batch)
delay()是延迟的dispatcher(),dispatch和delay方法均可以通过self()函数对本线程写入新event。
2.libporcess中的异步并发:future/promise/defer
同一个进程内的不同Actor之间的并行通信,需要获取返回值。
- future
用于异步回调的对象,一般由promise产生,可以在不同process中拷贝/引用,future中定义了各个回调函数接口:
const Future<T>& onDiscard(DiscardCallback&& callback) const;
const Future<T>& onRready(ReadyCallback&& callback) const;
const Future<T>& onFailed(FailedCallback&& callback) const;
const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
const Future<T>& onAny(AnyCallback&& callback) const;
当一个future对象状态发生变化时,相应的回调函数会被调用,从而达到异步并发的效果。
- promise
future的入口,一般只在一个process中set(赋值)。promise不应该再多个process之间拷贝/引用。promise中只包含了future成员变量。
一般通过promise.future的方式产生一个future。futrue可以在不同process中引用/拷贝,但只能在一个进程中通过promise设置。因为promise一般只存在一个process中。
//简单实例
class Simple : public process::Process<SimpleProcess>
{
public:
Future<Nothing> doSomething(const string msg) {
cout << "Wrapping message" << msg << endl;
return Nothing();
}
Future<int> calc(int lhs, int rhs) {
return Promise<int>(lhs+rhs).future();
}
private:
Promise<bool> shouldQuit;
}
int runProcess()
{
SimpleProcess simpleProcess;
process::PID<SimpleProcess> pid = process::spawn(simpleProcess);
process::dispatcher(pid, &SimpleProcess::doSomething, "test");
Future<int> sum = process::dispatch(pid, &SimpleProcess::calc, 99, 101);
sum.then([](int n) {
cout << "99 + 101 = " << n << endl;
return Nothing();
})
sum.await();
process::terminate(SimpleProcess);
process::wait(simpleProcess);
}
- defer
如果使用如下语句:
dispatch(master_, &Master::indicateInverseOffer, request).onAny(dispatch(self(), &Self::updateRequestQueue, lambda::_1))
这一命令向master process dispach一个event,调用master类中的indicateInverseOffer函数。该函数返回一个future。我们希望future状态变化时,往本线程dispatcher一个event。
但是,onAny中的dispatch函数是由master process赋值的future调用,从而是由master process所属线程发起的dispatcher调用。不合目的。
dispatch(master_, &Master::indicateInverseOffer, request).onAny(defer(self(), &Self::updateRequestQueue, lambad::_1))
简单来说,defer()返回一个called object, 这个called object是在相应线程被初始化的。
那么当indicateInverseOffer()所返回的future状态发生变化时,onAny调用的是defer()函数的返回值,这个返回值是再代码所属的process产生的,这保证了程序的意图。
3.Libprocess分布式系统编程:基于protobuf消息的方法
不同进程间的不同Actor之间的并行通信。
libprocess通过提供send()和install()函数进行分布式通信。虽然libprocess分布式通信不依赖于protobuf, 但是为了简化消息结构序列化问题,libprocess提供protobufProcess类,这一定制的process支持基于protobuf的消息通信。
- install()
Install方法将handler注册到process类所属的handler map中,一旦相应message到来。即启用handler map对应的handler(函数),以下定义作为install方法的实现路径之一:
template <typename M>
void install(void (T::*method)(const process::UPID))
{
google::protobuf::Message* m = new M();
T* t = static_cast<T*>(this);
protobufHandlers[m->GetTypeName()] = lambda::bind(handler0, t, method,
lambda::_1, lambda::_2);
delete m;
}
install 接受protobuf message中的field作为独立的输入,如:
install<SlaveRegisteredMessage>
(&SimpleMasterProcess::SlaveRegistered,&SlaveRegisteredMessage::slave_id)
当SlaveRegisteredMessage信息到来之后,process将把slaveRegistered()函数作为一个event写入队列,并将message中的slave_id field作为输入量。
特别注意的是,protobufProcess中,protobuf消息中的repeat fileds将会被自动转换成vector.
template<typename T>
std::vector<T> convert(const google::protobuf::RepeatedPtrFiled<T>& items)
{
vector<T> result;
for(int i = 0; i < items.size(); i++) {
result.push_back(items.Get(i));
}
return result;
}
- UPID与通信
非分布式通信,即同一进程内,不同Actor区分通过PID。
在基于libprocess的分布式系统中,每个process均有一个唯一的UPID,process之间分布式通信通过UPID识别地址,UPID的主要信息如下:
std::string id;
//一般是ip:port
network::Addresss address;
每个UPID实现了libprocess的地址空间中唯一地址,其中id默认是从1开始,spawn一个新的process,则增加到(2),如此类推。
- send()
使用send()换气远程process的回调函数:通过UPID,向远程process发送一个protobuf消息。远程process接收到消息后,将取出消息对应的handler,将它放入任务队列。
同时注意,send()函数总会发送起始process的UPID,所以protobufProcess的install函数所有的handler的第一个输入永远是:
//源UPID
const process::UPID&
这样,可以知道每个消息来源的UPID.
网友评论