美文网首页
flink rpc akka 源代码刨析

flink rpc akka 源代码刨析

作者: 邵红晓 | 来源:发表于2020-04-28 14:43 被阅读0次

注意:Flink内部节点之间的通信是用Akka,比如JobManager和TaskManager之间的通信。而operator之间的数据传输是利用NettyFlink uses Akka for RPC between components (JobManager/TaskManager/ResourceManager). Flink does not use Akka for data transport.

  • 发送端
    RpcGateway
    RpcEndpoint(绑定一个rpcserver)
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");

        this.rpcServer = rpcService.startServer(this);
The main thread executor to be used to execute future callbacks in the main thread
 of the executing rpc server
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

注意:The main thread executor to be used to execute future callbacks in the main thread of the executing rpc server
对于同一个flink的 rpcEndpoint(actor) 调用都是在同一个主线里串行执行,因此不会有并发问题

RpcServer
AkkaRpcService implements RpcService
AkkaRpcService #startServer 启动了akka actor

if (rpcEndpoint instanceof FencedRpcEndpoint) {
            akkaRpcActorProps = Props.create(
                FencedAkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        } else {
            akkaRpcActorProps = Props.create(
                AkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        }
        ActorRef actorRef;
        synchronized (lock) {
            checkState(!stopped, "RpcService is stopped");
注意:创建akka actor,利用父actor上下文创建子actor
            actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
            actors.put(actorRef, rpcEndpoint);
        }

创建FencedAkkaRpcActor(需要验证rpc tocken,The rpc is then only executed if the attached fencing token equals the endpoint's own token)或AkkaRpcActor(Akka rpc actor which receivesAkka rpc actor which receives,不需要验证)

AkkaInvocationHandler implements InvocationHandler反射实现类
AkkaInvocationHandler # invoke(),真正执行rpc远程调用逻辑,有
1.Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()))异步调用方式有返回值
2.rpcEndpoint.tell(message, ActorRef.noSender())无返回值调用

  • 接收端
    AkkaRpcActor extends AbstractActor AbstractActor 是akka中actor的抽象实现类
    实现了createReceive方法
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
            .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) 握手消息
            .match(ControlMessages.class, this::handleControlMessage) 控制起停消息
            .matchAny(this::handleMessage)
            .build();
    }
private void handleMessage(final Object message) {
        if (state.isRunning()) {
            mainThreadValidator.enterMainThread();
            try {
                handleRpcMessage(message);
            } finally {
                mainThreadValidator.exitMainThread();
            }
        } else {
            log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
                rpcEndpoint.getClass().getName(),
                message.getClass().getName());

            sendErrorIfSender(new AkkaRpcException(
                String.format("Discard message, because the rpc endpoint %s has not been started yet.", rpcEndpoint.getAddress())));
        }
    }

1.handleMessage真正消息处理端
handleRpcInvocation((RpcInvocation) message);->
result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
同样执行的是动态代理调用方式
2.mainThreadValidator.enterMainThread(),这里会获取RpcEndpoint,的主线程并且进入,使用主线程进行消息处理

总结:

akka如何解决多线程问题并发,数据一致性问题?

  • 数据共享一致性,锁的问题
    我们一开始说过并发导致最大的问题就是对共享数据的操作,我们在面对并发问题时多采用的是用锁去保证共享数据的一致性,但这同样也会带来其他相关问题,比如要去考虑锁的粒度(对方法,程序块等),锁的形式(读锁,写锁等)等问题
  • Actor就不导致这些问题,首先Actor的消息特性(不可变)就决定了在与Actor通信上不会有共享数据的困扰,另外在Actor内部是串行处理消息的,就可以保证Actor内部状态数据的一致性,实现了jvm多线程数据一致性
  • 一个Actor它可能会有很多线程同时向它发送消息,之前我们也说到Actor本身是串行处理的消息的,那它是如何保障这种机制的呢?
    Mailbox.scala 内部维护了一个messageQueue这样的消息队列,消息队列保证了消息执行的异步性,
    processMailbox 方法采用递归的方式逐条取消息并处理。
    actor接受消息支持并发,处理消息是单个线程执行的,所以保证了actor内部状态的一致性
递归调用处理mailbox消息
/**
   * Process the messages in the mailbox
   */
  @tailrec private final def processMailbox(
      left: Int = java.lang.Math.max(dispatcher.throughput, 1),
      deadlineNs: Long =
        if (dispatcher.isThroughputDeadlineTimeDefined)
          System.nanoTime + dispatcher.throughputDeadlineTime.toNanos
        else 0L): Unit =
    if (shouldProcessMessage) {
      val next = dequeue()
      if (next ne null) {
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor.invoke(next)
        if (Thread.interrupted())
          throw new InterruptedException("Interrupted while processing actor messages")
        processAllSystemMessages()
        if ((left > 1) && (!dispatcher.isThroughputDeadlineTimeDefined || (System.nanoTime - deadlineNs) < 0))
          processMailbox(left - 1, deadlineNs)
      }
    }

是否有线程正在调度执行该MailBox的任务,若没有则去更改状态为以调度,直到被其他线程抢占或者更改成功
@tailrec
  final def setAsScheduled(): Boolean = {  
    val s = currentStatus
    /*
     * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
     * Scheduled bit already set.
     */
    if ((s & shouldScheduleMask) != Open) false
    else updateStatus(s, s | Scheduled) || setAsScheduled()
  }


示例代码

package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import akka.actor.Terminated;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author shao.hongxiao
 */
public class RpcTest {
    private static final Time TIMEOUT = Time.seconds(10L);
    private static ActorSystem actorSystem = null;
    private static RpcService rpcService = null;

    // 定义通信协议
    public interface HelloGateway extends RpcGateway {
        String hello();
    }

    public interface HiGateway extends RpcGateway {
        String hi();
    }

    // 具体实现
    public static class HelloRpcEndpoint extends RpcEndpoint implements HelloGateway {
        protected HelloRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override
        public String hello() {
            return "hello";
        }
    }

    public static class HiRpcEndpoint extends RpcEndpoint implements HiGateway {
        protected HiRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override
        public String hi() {
            return "hi";
        }
    }

    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        // 创建 RpcService, 基于 AKKA 的实现
        rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterClass
    public static void teardown() throws Exception {

        final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
        final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());

        FutureUtils
            .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
            .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void test() throws Exception {
        HelloRpcEndpoint helloEndpoint = new HelloRpcEndpoint(rpcService);
        HiRpcEndpoint hiEndpoint = new HiRpcEndpoint(rpcService);

        helloEndpoint.start();
        //获取 endpoint 的 self gateway
        HelloGateway helloGateway = helloEndpoint.getSelfGateway(HelloGateway.class);
        String hello = helloGateway.hello();
        System.out.println(hello);

        hiEndpoint.start();
        // 通过 endpoint 的地址获得代理
        HiGateway hiGateway = rpcService.connect(hiEndpoint.getAddress(),HiGateway.class).get();
        String hi = hiGateway.hi();
        System.out.println(hi);
    }
}

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging
import akka.pattern.Patterns

import scala.util.{Failure, Success}

trait Action{
  val message: String
  val time: Int
}
case class TurnOnLight(time: Int) extends Action {   // 开灯消息
  val message = "Turn on the living room light"
}
case class BoilWater(time: Int) extends Action {   // 烧水消息
  val message = "Burn a pot of water"
}
class RobotActor extends Actor {
  val log = Logging(context.system, this)
  def receive: Receive = { //机器人接受指令
    case t: TurnOnLight => log.info(s"${t.message} after ${t.time} hour")
    case b: BoilWater => log.info(s"${b.message} after ${b.time} hour")
    case s:String  => log.info(s"I can not handle this message: ${s}")
  }
}

/**
  * https://scala.cool/tags/Akka/
  * https://cloud.tencent.com/developer/article/1460210
  *
  * 对并发模型进行了更高的抽象
  * 异步、非阻塞、高性能的事件驱动编程模型
  * 轻量级事件处理(1GB内存可容纳百万级别个Actor)
  *JVM中的Actor有以下几个特点:
  * 每个Actor都有对应一个邮箱
  * Actor是串行处理消息的
  * Actor中的消息是不可变的
  *
  * akka解决多线程问题
  *  1. 数据共享,锁的问题
  * 我们一开始说过并发导致最大的问题就是对共享数据的操作,我们在面对并发问题时多采用的是用锁去保证共享数据的一致性,
  * 但这同样也会带来其他相关问题,比如要去考虑锁的粒度(对方法,程序块等),锁的形式(读锁,写锁等)等问题,
  * 这些问题对并发程序来说是至关重要的,但一个初写并发程序的程序员来说,往往不能掌控的很好,这无疑给程序员在编程上提高了复杂性,
  * 而且还不容易掌控,但使用Actor就不导致这些问题,首先Actor的消息特性就觉得了在与Actor通信上不会有共享数据的困扰,
  * 另外在Actor内部是串行处理消息的,同样不会对Actor内的数据造成污染,用Actor编写并发程序无疑大大降低了编码的复杂度。
  */
object Demo {
  def main(args: Array[String]): Unit = {
    val actorSyatem = ActorSystem("flink")
    val robotActor = actorSyatem.actorOf(Props(new RobotActor()), "robot") //创建一个机器人
    // tel 方式
    robotActor ! TurnOnLight(1) //给机器人发送一个开灯命令
    robotActor ! BoilWater(2) //给机器人发送一个烧水命令
    robotActor ! "who are you" //给机器人发送一个任意命令
    import java.util.concurrent.TimeUnit

    import scala.concurrent.duration.Duration
    val t = Duration.create(1, TimeUnit.SECONDS)

    //使用ask发送消息,actor处理完,必须有返回(超时时间5秒),异步处理
    val res =  Patterns.ask(robotActor, "Hello,world", t)
    import scala.concurrent.ExecutionContext.Implicits.global
    res onComplete {
      case Success(result) => println(result)
      case Failure(e) => println("error: " + e.getMessage)
    }
    actorSyatem terminate ()
  }
}

参考
https://cloud.tencent.com/developer/article/1460210
https://likehui.top/2019/09/05/akka-%E6%A0%B8%E5%BF%83%E7%9F%A5%E8%AF%86%E6%A2%B3%E7%90%86/
https://scala.cool/tags/Akka/

相关文章

  • flink rpc akka 源代码刨析

    注意:Flink内部节点之间的通信是用Akka,比如JobManager和TaskManager之间的通信。而op...

  • Apache Flink源码解析 (七)Flink RPC的底层

    Prerequisites Flink的RPC服务是基于Akka Remote实现的。一个简单的Akka Remo...

  • Spark通信框架Spark Network Common

    Spark Network 模块分析 为什么用Netty通信框架代替Akka 一直以来,基于Akka实现的RPC通...

  • Flink Parallelism和Slot理解

    相关博客:Flink工作原理 1 问题出现 Caused by: akka.pattern.AskTimeoutE...

  • 【Flink】flink 内部 Akka and Actors

    使用Akka,所有远程过程调用现在都实现为异步消息。 这主要影响JobManager,TaskManager和Jo...

  • 自我刨析

    沉下心来思考,接受的教育、所处的环境、社会文化的熏陶,正面的东西确实不是很多,特别是出了社会以后更加如此,但是更令...

  • 自我刨析

    总感觉每天都很忙,总感觉每天的时间都不够用。总在反问时间都去哪了?除了睡觉的那几个小时之外,几乎每个小时都是...

  • 自我刨析

    “文化属性”,我喜欢这个词的感觉。 我曾经多次预想过,退休后隐居山林。不过,这个计划都被山野之地的各种不...

  • 自我刨析

    近期待业,所以在思考自己的职业规划,职业规划第一步就是自我认知,只有知道自己是个什么样人,喜欢和适合做什么,才能定...

  • 自我刨析

    既然养成自卑胆小懦弱的性格,那就要改。 首先放开自我,没什么大不了。 其次锻炼意志,改掉懒惰。 神经质问题,把精力...

网友评论

      本文标题:flink rpc akka 源代码刨析

      本文链接:https://www.haomeiwen.com/subject/tmuvwhtx.html