Note: Flink uses Akka for RPC between its internal components, for example between the JobManager, the TaskManagers and the ResourceManager. Akka is not used for data transport: records exchanged between operators go over Netty.
- Sender side
RpcGateway
RpcEndpoint (bound to an RpcServer)
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");
    this.rpcServer = rpcService.startServer(this);
    // The main thread executor to be used to execute future callbacks in the main thread
    // of the executing rpc server
    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
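Note: the main thread executor is used to execute future callbacks in the main thread of the executing RPC server. All calls to the same Flink RpcEndpoint (that is, the same actor) are executed serially in that single main thread, so as long as the endpoint's state is only accessed from the main thread, it needs no locking and there are no concurrency problems inside the endpoint.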
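The main-thread idea can be illustrated without Flink or Akka. The sketch below is a minimal, hypothetical stand-in: `MiniEndpoint`, `incrementAsync` and `addWhenDone` are illustrative names, not Flink API, and the "main thread" is assumed to be one single-threaded executor. It shows the two patterns described above: work submitted from any thread runs serially on the main thread, and a future completed elsewhere hops back onto the main thread before touching state, so the plain `int` field needs no lock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an RpcEndpoint: its state is only touched on its "main thread".
class MiniEndpoint {
    // One thread means tasks run strictly one after another, like an actor handling one message at a time.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Deliberately neither volatile nor atomic: only the main thread reads or writes it.
    private int counter = 0;

    // Roughly what RpcEndpoint#runAsync offers: enqueue work for the main thread.
    void runAsync(Runnable task) {
        mainThread.execute(task);
    }

    // Safe to call from any thread, because the increment itself runs on the main thread.
    void incrementAsync() {
        runAsync(() -> counter++);
    }

    // A future completed on some other thread; its callback hops back onto the main thread
    // (similar in spirit to passing the endpoint's main thread executor to thenAcceptAsync).
    CompletableFuture<Void> addWhenDone(CompletableFuture<Integer> external) {
        return external.thenAcceptAsync(v -> counter += v, mainThread);
    }

    // Reads also go through the main thread, so they observe every earlier update.
    int readCounter() throws Exception {
        return CompletableFuture.supplyAsync(() -> counter, mainThread).get();
    }

    void shutdown() throws InterruptedException {
        mainThread.shutdown();
        mainThread.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In Flink itself the role of the single-threaded executor is played by the actor's serial message processing; the executor here is only an assumption that makes the sketch self-contained.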
RpcServer
AkkaRpcService implements RpcService
AkkaRpcService#startServer starts an Akka actor:
if (rpcEndpoint instanceof FencedRpcEndpoint) {
    akkaRpcActorProps = Props.create(
        FencedAkkaRpcActor.class,
        rpcEndpoint,
        terminationFuture,
        getVersion(),
        configuration.getMaximumFramesize());
} else {
    akkaRpcActorProps = Props.create(
        AkkaRpcActor.class,
        rpcEndpoint,
        terminationFuture,
        getVersion(),
        configuration.getMaximumFramesize());
}
ActorRef actorRef;
synchronized (lock) {
    checkState(!stopped, "RpcService is stopped");
    // Create the Akka actor; actorSystem.actorOf creates it as a child of the /user guardian actor
    actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
    actors.put(actorRef, rpcEndpoint);
}
This creates either a FencedAkkaRpcActor, which validates the RPC fencing token (an RPC is only executed if the attached fencing token equals the endpoint's own token), or a plain AkkaRpcActor, an Akka RPC actor that receives RPC messages without any token check.
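The fencing check can be sketched in a few lines. This is a hypothetical illustration, not Flink's FencedAkkaRpcActor: `FencedMessage`, `FencedHandler` and the choice of `UUID` tokens are assumptions made for the example. The point is only the rule quoted above: a message runs if and only if its token equals the receiver's current token, so after the token changes (for instance after a leadership change) messages carrying the old token are rejected.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical message that carries a fencing token next to its payload.
final class FencedMessage<T> {
    final UUID fencingToken;
    final T payload;

    FencedMessage(UUID fencingToken, T payload) {
        this.fencingToken = fencingToken;
        this.payload = payload;
    }
}

// Hypothetical receiver: executes the payload only if the token matches its own.
final class FencedHandler<T, R> {
    private volatile UUID ownToken;
    private final Function<T, R> handler;

    FencedHandler(UUID ownToken, Function<T, R> handler) {
        this.ownToken = ownToken;
        this.handler = handler;
    }

    // A new token invalidates every sender that still uses the old one.
    void setFencingToken(UUID newToken) {
        ownToken = newToken;
    }

    R handle(FencedMessage<T> message) {
        if (ownToken == null || !Objects.equals(ownToken, message.fencingToken)) {
            throw new IllegalStateException(
                    "Fencing token mismatch: expected " + ownToken + " but got " + message.fencingToken);
        }
        return handler.apply(message.payload);
    }
}
```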
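AkkaInvocationHandler implements InvocationHandler, the JDK dynamic-proxy handler interface; the gateway object handed to a caller is a proxy backed by it.
AkkaInvocationHandler#invoke() performs the actual remote RPC call, in one of two ways:
1. Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds())
asynchronous call with a return value (a future that completes with the reply)
2. rpcEndpoint.tell(message, ActorRef.noSender())
fire-and-forget call without a return value, used when the gateway method returns void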
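The tell/ask split can be reproduced with a plain `java.lang.reflect.Proxy`. In this hypothetical sketch, `GreeterGateway`, `FakeRemote` and `SketchInvocationHandler` are illustrative names, and an in-process object stands in for the remote actor. It shows the dispatch rule: a `void` gateway method is sent without waiting for a reply, while any other method gets a future for the answer.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical gateway: one fire-and-forget method and one request/response method.
interface GreeterGateway {
    void log(String line);                        // void return type   -> "tell"
    CompletableFuture<String> greet(String name); // future return type -> "ask"
}

// Hypothetical in-process stand-in for the remote endpoint.
final class FakeRemote {
    final List<String> logged = Collections.synchronizedList(new ArrayList<>());

    Object handle(String methodName, Object[] args) {
        if (methodName.equals("log")) {
            logged.add((String) args[0]);
            return null;
        }
        return "hello, " + args[0];
    }
}

// Sketch of the dispatch rule: void methods get no reply, everything else gets a future.
final class SketchInvocationHandler implements InvocationHandler {
    private final FakeRemote remote;

    SketchInvocationHandler(FakeRemote remote) {
        this.remote = remote;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args); // toString/equals/hashCode are handled locally
        }
        if (method.getReturnType() == Void.TYPE) {
            // "tell": delivered synchronously here for simplicity; a real tell
            // only enqueues the message in the target actor's mailbox.
            remote.handle(method.getName(), args);
            return null;
        }
        // "ask": deliver asynchronously and hand back a future for the reply.
        return CompletableFuture.supplyAsync(() -> remote.handle(method.getName(), args));
    }

    static GreeterGateway connect(FakeRemote remote) {
        return (GreeterGateway) Proxy.newProxyInstance(
                GreeterGateway.class.getClassLoader(),
                new Class<?>[] {GreeterGateway.class},
                new SketchInvocationHandler(remote));
    }
}
```

Flink additionally supports gateway methods that return a plain value instead of a future (like `String hello()` in the example code below); in that case the invocation handler waits for the ask result before returning.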
- Receiver side
AkkaRpcActor extends AbstractActor
AbstractActor is Akka's abstract base class for actors written in Java.
AkkaRpcActor implements its createReceive method:
@Override
public Receive createReceive() {
    return ReceiveBuilder.create()
        .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) // handshake message
        .match(ControlMessages.class, this::handleControlMessage)           // start/stop control messages
        .matchAny(this::handleMessage)                                      // everything else, including RPC invocations
        .build();
}
private void handleMessage(final Object message) {
    if (state.isRunning()) {
        mainThreadValidator.enterMainThread();
        try {
            handleRpcMessage(message);
        } finally {
            mainThreadValidator.exitMainThread();
        }
    } else {
        log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
            rpcEndpoint.getClass().getName(),
            message.getClass().getName());
        sendErrorIfSender(new AkkaRpcException(
            String.format("Discard message, because the rpc endpoint %s has not been started yet.", rpcEndpoint.getAddress())));
    }
}
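1. handleMessage
This is where messages are actually processed: handleRpcMessage passes RPC invocations on to
handleRpcInvocation((RpcInvocation) message);
->
result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
that is, the target method on the endpoint is called via reflection (java.lang.reflect.Method#invoke), the receiving-side counterpart of the dynamic proxy on the sender side.
2. mainThreadValidator.enterMainThread()
records the current actor thread as the RpcEndpoint's main thread while the message is handled (exitMainThread() clears it again in the finally block), so message processing runs in the endpoint's main thread and the endpoint's validateRunsInMainThread() checks pass.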
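The receiving side can be sketched the same way: look up the target method by name and parameter types, invoke it reflectively, and bracket the call with an enter/exit "main thread" marker. Everything here is hypothetical (`Invocation`, `CalcEndpoint`, `ReflectiveDispatcher`); the marker only mimics the idea behind enterMainThread()/exitMainThread() and validateRunsInMainThread(), not their exact implementation.

```java
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical RPC message: what a serialized invocation needs to carry.
final class Invocation {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    Invocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
}

// Hypothetical endpoint that insists on running only while a message is being handled.
class CalcEndpoint {
    final AtomicReference<Thread> currentMainThread = new AtomicReference<>();

    void validateRunsInMainThread() {
        if (currentMainThread.get() != Thread.currentThread()) {
            throw new IllegalStateException("Not running in the endpoint's main thread");
        }
    }

    public int add(int a, int b) {
        validateRunsInMainThread();
        return a + b;
    }
}

final class ReflectiveDispatcher {
    // Mark the main thread, find the method by name and parameter types,
    // invoke it reflectively, and always clear the marker afterwards.
    static Object handle(CalcEndpoint endpoint, Invocation invocation) throws Exception {
        if (!endpoint.currentMainThread.compareAndSet(null, Thread.currentThread())) {
            throw new IllegalStateException("Another thread is already handling a message");
        }
        try {
            Method method = endpoint.getClass().getMethod(invocation.methodName, invocation.parameterTypes);
            return method.invoke(endpoint, invocation.args);
        } finally {
            endpoint.currentMainThread.set(null);
        }
    }
}
```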
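Summary:
How does Akka deal with concurrency and data-consistency problems in multithreaded code?
- Shared data consistency and locking
As mentioned at the beginning, the biggest problem concurrency causes is operating on shared data. The usual answer is to use locks to keep shared data consistent, but locks bring problems of their own, such as choosing the lock granularity (whole methods, code blocks, and so on) and the kind of lock (read locks, write locks, and so on). Actors avoid these problems. First, messages are immutable, so communicating with an actor involves no shared mutable data. Second, an actor processes its messages serially, which keeps its internal state consistent without any JVM-level locking.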
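A minimal actor-style sketch of that argument, assuming nothing beyond the JDK: the only thread-safe structure is the mailbox queue, the messages are immutable objects, and the state is a plain, non-thread-safe `HashMap` that only the actor's own thread touches. `WordCountActor`, `AddWord` and `GetCount` are hypothetical names, and a dedicated thread per actor is a simplification (Akka multiplexes many actors over a thread pool).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Immutable message: final fields and no setters, so it is safe to share between threads.
final class AddWord {
    final String word;

    AddWord(String word) {
        this.word = word;
    }
}

// Query message; the reply future is its only mutable part and is thread-safe.
final class GetCount {
    final String word;
    final CompletableFuture<Integer> reply = new CompletableFuture<>();

    GetCount(String word) {
        this.word = word;
    }
}

// One mailbox, one consumer thread, plain HashMap state, no locks.
final class WordCountActor {
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private final Map<String, Integer> counts = new HashMap<>(); // touched only by the worker thread
    private final Thread worker = new Thread(this::run, "word-count-actor");

    WordCountActor() {
        worker.setDaemon(true);
        worker.start();
    }

    // Any thread may send; enqueuing is the only concurrent operation.
    void tell(Object message) {
        mailbox.add(message);
    }

    private void run() {
        try {
            while (true) {
                Object message = mailbox.take(); // strictly one message at a time
                if (message instanceof AddWord) {
                    counts.merge(((AddWord) message).word, 1, Integer::sum);
                } else if (message instanceof GetCount) {
                    GetCount query = (GetCount) message;
                    query.reply.complete(counts.getOrDefault(query.word, 0));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interruption stops the actor
        }
    }
}
```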
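- Many threads may send messages to the same actor at the same time. Since an actor processes its messages serially, how is that guaranteed?
Mailbox.scala maintains an internal messageQueue; the queue decouples sending from processing, which makes message handling asynchronous.
The processMailbox method takes messages off the queue one at a time and processes them, calling itself tail-recursively, up to the dispatcher's throughput limit per run.
Receiving messages is concurrent, but processing is done by only one thread at a time, which keeps the actor's internal state consistent.
Recursive processing of mailbox messages: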
/**
 * Process the messages in the mailbox
 */
@tailrec private final def processMailbox(
    left: Int = java.lang.Math.max(dispatcher.throughput, 1),
    deadlineNs: Long =
      if (dispatcher.isThroughputDeadlineTimeDefined)
        System.nanoTime + dispatcher.throughputDeadlineTime.toNanos
      else 0L): Unit =
  if (shouldProcessMessage) {
    val next = dequeue()
    if (next ne null) {
      if (Mailbox.debug) println(actor.self + " processing message " + next)
      actor.invoke(next)
      if (Thread.interrupted())
        throw new InterruptedException("Interrupted while processing actor messages")
      processAllSystemMessages()
      if ((left > 1) && (!dispatcher.isThroughputDeadlineTimeDefined || (System.nanoTime - deadlineNs) < 0))
        processMailbox(left - 1, deadlineNs)
    }
  }
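setAsScheduled checks whether some thread has already scheduled this mailbox for execution. If not, it tries to set the Scheduled bit with a compare-and-set, retrying until it either succeeds or finds that another thread has scheduled (or closed) the mailbox in the meantime.
@tailrec
final def setAsScheduled(): Boolean = {
  val s = currentStatus
  /*
   * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
   * Scheduled bit already set.
   */
  if ((s & shouldScheduleMask) != Open) false
  else updateStatus(s, s | Scheduled) || setAsScheduled()
}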
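The two pieces above (a concurrent queue drained in batches of at most `throughput` messages, and a compare-and-set "scheduled" flag) can be combined in a small Java sketch. `SketchMailbox` is hypothetical and simplified: it has no Suspended/Closed states and no system messages, unlike Akka's Mailbox. It shows how many actors can share one thread pool while each mailbox is still drained by at most one thread at a time.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical mailbox: each instance is drained by at most one pool thread at a time,
// guarded by a compare-and-set on the "scheduled" flag.
final class SketchMailbox<M> implements Runnable {
    private final Queue<M> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor dispatcher;
    private final Consumer<M> actor;
    private final int throughput; // max messages per run, like Akka's dispatcher throughput

    SketchMailbox(Executor dispatcher, Consumer<M> actor, int throughput) {
        this.dispatcher = dispatcher;
        this.actor = actor;
        this.throughput = Math.max(throughput, 1);
    }

    // May be called from any number of sender threads concurrently.
    void enqueue(M message) {
        queue.add(message);
        trySchedule();
    }

    // Only the thread that flips false -> true submits the mailbox; everyone else backs off.
    private void trySchedule() {
        if (scheduled.compareAndSet(false, true)) {
            dispatcher.execute(this);
        }
    }

    @Override
    public void run() {
        try {
            // Handle at most `throughput` messages, then hand the pool thread back.
            for (int left = throughput; left > 0; left--) {
                M next = queue.poll();
                if (next == null) {
                    break;
                }
                actor.accept(next);
            }
        } finally {
            scheduled.set(false);
            // A message may have arrived after the last poll: schedule another run if needed.
            if (!queue.isEmpty()) {
                trySchedule();
            }
        }
    }
}
```

Clearing the flag before re-checking the queue is what prevents a lost wakeup: a sender whose compare-and-set failed while the run was still active is covered by the final `isEmpty` check.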
Example code
package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
import akka.actor.Terminated;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
 * @author shao.hongxiao
 */
public class RpcTest {
    private static final Time TIMEOUT = Time.seconds(10L);
    private static ActorSystem actorSystem = null;
    private static RpcService rpcService = null;
    // Define the communication protocol (the gateway interfaces)
    public interface HelloGateway extends RpcGateway {
        String hello();
    }
    public interface HiGateway extends RpcGateway {
        String hi();
    }
    // Concrete implementations
    public static class HelloRpcEndpoint extends RpcEndpoint implements HelloGateway {
        protected HelloRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }
        @Override
        public String hello() {
            return "hello";
        }
    }
    public static class HiRpcEndpoint extends RpcEndpoint implements HiGateway {
        protected HiRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }
        @Override
        public String hi() {
            return "hi";
        }
    }
    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        // Create the RpcService, using the Akka-based implementation
        rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }
    @AfterClass
    public static void teardown() throws Exception {
        final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
        final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
        FutureUtils
            .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
            .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }
    @Test
    public void test() throws Exception {
        HelloRpcEndpoint helloEndpoint = new HelloRpcEndpoint(rpcService);
        HiRpcEndpoint hiEndpoint = new HiRpcEndpoint(rpcService);
        helloEndpoint.start();
        // Get the endpoint's self gateway
        HelloGateway helloGateway = helloEndpoint.getSelfGateway(HelloGateway.class);
        String hello = helloGateway.hello();
        System.out.println(hello);
        hiEndpoint.start();
        // Obtain a proxy through the endpoint's address
        HiGateway hiGateway = rpcService.connect(hiEndpoint.getAddress(), HiGateway.class).get();
        String hi = hiGateway.hi();
        System.out.println(hi);
    }
}
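import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging
import akka.pattern.Patterns
import scala.util.{Failure, Success}
trait Action {
  val message: String
  val time: Int
}
case class TurnOnLight(time: Int) extends Action { // "turn on the light" message
  val message = "Turn on the living room light"
}
case class BoilWater(time: Int) extends Action { // "boil water" message
  val message = "Boil a pot of water"
}
class RobotActor extends Actor {
  val log = Logging(context.system, this)
  def receive: Receive = { // the robot receives commands
    case t: TurnOnLight => log.info(s"${t.message} after ${t.time} hour")
    case b: BoilWater => log.info(s"${b.message} after ${b.time} hour")
    case s: String =>
      log.info(s"I can not handle this message: ${s}")
      // Reply to the sender so that an ask gets an answer instead of timing out
      // (for a plain tell from outside an actor, the reply goes to dead letters)
      sender() ! s"I can not handle this message: ${s}"
  }
}
/**
 * https://scala.cool/tags/Akka/
 * https://cloud.tencent.com/developer/article/1460210
 *
 * Akka raises the level of abstraction of the concurrency model:
 * an asynchronous, non-blocking, high-performance, event-driven programming model,
 * with lightweight event processing (on the order of a million actors per 1 GB of memory).
 * Actors on the JVM have the following properties:
 *   each actor has its own mailbox;
 *   an actor processes messages serially;
 *   messages sent to actors are immutable.
 *
 * How Akka addresses multithreading problems
 * 1. Shared data and locking
 * As mentioned at the beginning, the biggest problem concurrency causes is operating on shared data, and the usual answer is to use locks to keep shared data consistent.
 * But locks bring related problems, such as the lock granularity (methods, code blocks, and so on) and the kind of lock (read locks, write locks, and so on).
 * These matter a great deal for concurrent programs, yet programmers new to concurrency often cannot handle them well, which adds complexity to programming
 * and is hard to get right. Actors avoid these problems: the nature of actor messages means that communicating with an actor involves no shared data,
 * and since an actor processes messages serially, its internal data cannot be corrupted either. Writing concurrent programs with actors greatly reduces coding complexity.
 */
object Demo {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("flink")
    val robotActor = actorSystem.actorOf(Props(new RobotActor()), "robot") // create a robot
    // tell style: fire-and-forget
    robotActor ! TurnOnLight(1) // send the robot a "turn on the light" command
    robotActor ! BoilWater(2)   // send the robot a "boil water" command
    robotActor ! "who are you"  // send the robot an arbitrary command
    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.Duration
    val t = Duration.create(5, TimeUnit.SECONDS)
    // ask style: asynchronous; the actor must reply within the timeout (5 seconds),
    // otherwise the future fails with an AskTimeoutException
    val res = Patterns.ask(robotActor, "Hello,world", t)
    import scala.concurrent.ExecutionContext.Implicits.global
    res onComplete { r =>
      r match {
        case Success(result) => println(result)
        case Failure(e) => println("error: " + e.getMessage)
      }
      // Terminate only after the reply (or the timeout) has been handled
      actorSystem.terminate()
    }
  }
}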
References
https://cloud.tencent.com/developer/article/1460210
https://likehui.top/2019/09/05/akka-%E6%A0%B8%E5%BF%83%E7%9F%A5%E8%AF%86%E6%A2%B3%E7%90%86/
https://scala.cool/tags/Akka/