在当今的数据处理和数据流水线系统中,将数据从源流传输到接收器是一项非常琐碎的任务。因此,有许多[流媒体]解决方案,例如:Kafka Stream,Spark Streaming,Apache Flink等。
Akka流在这场战斗中脱颖而出,并具有完全由应用程序驱动的优势。Akka流是在Akka著名的Actor模型(实际上是受Erlang的actor模型启发)的基础上构建的。因此,Akka流可以利用其经过战斗考验的弹性,弹性,事件驱动和响应能力。
Actor模型简介
Actor由状态(state)、行为(Behavior)和邮箱(mailBox)三部分组成
- 状态:Actor中的状态指的是Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题
- 行为:行为指定的是Actor中计算逻辑,通过Actor接收到消息来改变Actor的状态
- 邮箱:邮箱是Actor和Actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息
Actor 模型及其说明
image.png- Akka 处理并发的方法基于 Actor 模型。(示意图)
- 在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是 对象一样。
- Actor 模型是作为一个并发模型设计和架构的。Actor 与 Actor 之间只能通过消息通信,如图 的信封
- Actor 与 Actor 之间只能用消息进行通信,当一个 Actor 给另外一个 Actor 发消息,消息是有 顺序的 (消息队列),只需要将消息投寄的相应的邮箱即可。
- 怎么处理消息是由接收消息的 Actor 决定的,发送消息 Actor 可以等待回复,也可以异步处理 【ajax】
- ActorSystem 的职责是负责创建并管理其创建的 Actor, ActorSystem 是单例的 (可以 ActorSystem 是一个工厂,专门创建 Actor),一个 JVM 进程中有一个即可,而 Acotr 是可以有多个的。
- Actor 模型是对并发模型进行了更高的抽象。
- Actor 模型是异步、非阻塞、高性能的事件驱动编程模型。[案例:说明 什么是异步、非阻塞,最 经典的案例就是 ajax 异步请求处理]
- Actor 模型是轻量级事件处理 (1GB 内存可容纳百万级别个 Actor),因此处理大并发性能高.
Actor 模型工作机制说明
image.png说明了 Actor 模型的工作机制 (对应上图)
- ActorySystem 创建 Actor
- ActorRef: 可以理解成是 Actor 的代理或者引用。消息是通过 ActorRef 来发送,而不能通过 Actor 发 送消息,通过哪个 ActorRef 发消息,就表示把该消息发给哪个 Actor
- 消息发送到 Dispatcher Message (消息分发器),它得到消息后,会将消息进行分发到对应的 MailBox。(注: Dispatcher Message 可以理解成是一个线程池,MailBox 可以理解成是消息队列,可以缓 冲多个消息,遵守 FIFO)
- Actor 可以通过 receive 方法来获取消息,然后进行处理。 Actor 模型的消息机制 (对应上图)
- 每一个消息就是一个 Message 对象。Message 继承了 Runable, 因为 Message 就是线程类。 2) 从 Actor 模型工作机制看上去很麻烦,但是程序员编程时只需要编写 Actor 就可以了,其它的交 给 Actor 模型完成即可。
- A Actor 要给 B Actor 发送消息,那么 A Actor 要先拿到 (也称为持有) B Actor 的 代理对象 ActorRef 才能发送消息
spring boot集成akka
根据自身的经验和理解,提供Akka与Spring集成的方案。本文不说明Spring框架的具体使用,并从Spring已经配置完备的情况开始叙述。
Actor系统——ActorSystem
什么是ActorSystem?根据Akka官网的描述——ActorSystem是一个重量级的结构体,可以用于分配1到N个线程,所以每个应用都需要创建一个ActorSystem。通常而言,使用以下代码来创建ActorSystem。
ActorSystem system = ActorSystem.create("Hello");
不过对于接入Spring而言,由IOC(Inversion of Control,控制反转)方式会更接地气,你可以这样:
@Configuration
class ApplicationConfiguration {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private SpringExtension springExtension;
@Bean
public ActorSystem actorSystem() {
ActorSystem actorSystem = ActorSystem.create("actor-system", akkaConfiguration());
springExtension.initialize(applicationContext);
return actorSystem;
}
@Bean
public Config akkaConfiguration() {
return ConfigFactory.load();
}
}
然后在你需要的地方依赖注入即可。
Actor编程模型
我们可以通过以下代码(代码片段借用了Akka官网的例子)创建一个简单的Actor例子。
Greeter是代表问候者的Actor:
public class Greeter extends UntypedActor {
public static enum Msg {
GREET, DONE;
}
@Override
public void onReceive(Object msg) {
if (msg == Msg.GREET) {
System.out.println("Hello World!");
getSender().tell(Msg.DONE, getSelf());
} else
unhandled(msg);
}
}
一般情况下我们的Actor都需要继承自UntypedActor,并实现其onReceive方法。onReceive用于接收消息,你可以在其中实现对消息的匹配并做不同的处理。
HelloWorld是用于向Greeter发送问候消息的访客:
public class HelloWorld extends UntypedActor {
@Override
public void preStart() {
// create the greeter actor
final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter");
// tell it to perform the greeting
greeter.tell(Greeter.Msg.GREET, getSelf());
}
@Override
public void onReceive(Object msg) {
if (msg == Greeter.Msg.DONE) {
// when the greeter is done, stop this actor and with it the application
getContext().stop(getSelf());
} else
unhandled(msg);
}
}
有了Actor之后,我们可以这样使用它:
ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");
在HelloWorld的preStart实现中,获取了Greeter的ActorRef(Actor的引用)并向Greeter发送了问候的消息,Greeter收到问候消息后,会先打印Hello World!,然后向HelloWorld回复完成的消息,HelloWorld得知Greeter完成了向世界问好这个伟大的任务后,就结束了自己的生命。HelloWorld的例子用编程API的方式告诉了我们如何使用Actor及发送、接收消息。为了便于描述与Spring的集成,下面再介绍一个例子。
CountingActor(代码主体借用自Akka官网)是用于计数的Actor,见代码清单所示。
@Named("CountingActor")
@Scope("prototype")
public class CountingActor extends UntypedActor {
public static class Count {
}
public static class Get {
}
// the service that will be automatically injected
@Resource
private CountingService countingService;
private int count = 0;
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Count) {
count = countingService.increment(count);
} else if (message instanceof Get) {
getSender().tell(count, getSelf());
} else {
unhandled(message);
}
}
}
CountingActor用于接收Count消息进行计数,接收Get消息回复给发送者当前的计数值。CountingService是用于计数的接口,其定义如下:
public interface CountingService {
/**
* 计数
* @param count
* @return
*/
int increment(int count);
}
CountingService的具体实现是CountingServiceImpl,其实现如下:
@Service("countingService")
public class CountingServiceImpl implements CountingService {
private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class);
/*
* (non-Javadoc)
*
* @see com.elong.sentosa.metadata.service.CountingService#increment(int)
*/
@Override
public int increment(int count) {
logger.info("increase " + count + "by 1.");
return count + 1;
}
}
CountingActor通过注解方式注入了CountingService,CountingActor的计数实际是由CountingService完成。
细心的同学可能发现了CountingActor使用了注解Named,这里为什么没有使用@Service或者@Component等注解呢?由于Akka的Actor在初始化的时候必须使用System或者Context的工厂方法actorOf创建新的Actor实例,不能使用构造器来初始化,而使用Spring的Service或者Component注解,会导致使用构造器初始化Actor,所以会抛出以下异常:
akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.
如果我们不能使用@Service或者@Component,也不能使用XML配置的方式使用(与注解一个道理),那么我们如何使用CountingActor提供的服务呢?
IndirectActorProducer接口
IndirectActorProducer是Akka提供的Actor生成接口,从其名字我们知道Akka给我们指出了另一条道路——石头大了绕着走!通过实现IndirectActorProducer接口我们可以定制一些Actor的生成方式,与Spring集成可以这样实现它,见代码清单所示。
public class SpringActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorBeanName;
private final Object[] args;
public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
this.args = args;
}
public Actor produce() {
return (Actor) applicationContext.getBean(actorBeanName, args);
}
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}
SpringActorProducer的实现主要借鉴了Akka官方文档,我这里对其作了一些扩展以便于支持构造器带有多个参数的情况。从其实现看到实际是利用了ApplicationContext提供的getBean方式实例化Actor。
这里还有两个问题:一、ApplicationContext如何获取和设置?二、如何使用SpringActorProducer生成Spring需要的Actor实例?
对于第一个问题,我们可以通过封装SpringActorProducer并实现ApplicationContextAware接口的方式获取ApplicationContext;对于第二个问题,我们知道Akka中的所有Actor实例都是以Props作为配置参数开始的,这里以SpringActorProducer为代理生成我们需要的Actor的Props。
SpringExt实现了以上思路,见代码清单所示。
@Component("springExt")
public class SpringExt implements Extension, ApplicationContextAware {
private ApplicationContext applicationContext;
/**
* Create a Props for the specified actorBeanName using the
* SpringActorProducer class.
*
* @param actorBeanName
* The name of the actor bean to create Props for
* @return a Props that will create the named actor bean using Spring
*/
public Props props(String actorBeanName, Object ... args) {
return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
}
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
应用例子
经过了以上的铺垫,现在你可以使用创建好的CountingActor了,首先你需要在你的业务类中注入ActorSystem和SpringExt。
@Autowired
private ActorSystem actorSystem;
@Autowired
private SpringExt springExt;
然后我们使用CountingActor进行计数,代码如下:
ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter");
// Create the "actor-in-a-box"
final Inbox inbox = Inbox.create(system);
// tell it to count three times
inbox.send(counter, new Count());
inbox.send(counter, new Count());
inbox.send(counter, new Count());
// print the result
FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS);
Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration));
try {
System.out.println("Got back " + Await.result(result, duration));
} catch (Exception e) {
System.err.println("Failed getting result: " + e.getMessage());
throw e;
}
输出结果为:
Got back 3
此处为自己给自己发消息。
小结
本文只是最简单的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等机制都可以应用。
网友评论