美文网首页
jstorm 源码分析

jstorm 源码分析

作者: 走在成长的道路上 | 来源:发表于2020-12-11 09:19 被阅读0次

    NimbusSupervisor 关系

    Nimbus 与 Supervisor 关系

    NimbusServerThrift 接口主要用与客户端访问 Nimbus 服务,比如提交 Topology 任务等,NimbusSupervisor 之间通过 ZooKeeper 进行交互,其中 NimbusSupervisor 都内置一个 Httpserver 用于日志下载等。

    Zookeeper 中的目录结构如下:

    + ${zk_root_dir}
    | ---- + topology: 记录集群中所有正在运行的 topology 数据
    |      | ---- + ${topology_id}: 指定 topology 的相关信息(名称、开始运行时间、运行状态等)
    | ---- + supervisors: 记录集群中所有 supervisor 节点的心跳信息
    |      | ---- + ${supervivor_id}: 指定 supervisor 的心跳信息(心跳时间、主机名称、所有 worker 的端口号、运行时间等)
    | ---- + assignments: 记录提交给集群的 topology 任务分配信息
    |      | ---- + ${topology_id}: 指定 topology 的任务分配信息(对应 nimbus 上的代码目录、所有 task 的启动时间、每个 task 与节点和端口的映射关系等)
    | ---- + assignments_bak: 记录提交给集群的 topology 任务分配信息的备份
    | ---- + tasks: 记录集群中所有 topology 的 task 信息
    |      | ---- + ${topology_id}: 指定 topology 的所有 task 信息
    |      |      | ---- + ${task_id}: 指定 task 所属的组件 ID 和类型(spout/bolt)
    | ---- + taskbeats: 记录集群中所有 task 的心跳信息
    |      | ---- + ${topology_id}: 记录指定 topology 下所有 task 的心跳信息、topologyId,以及 topologyMasterId 等
    |      |      | ---- + ${task_id}: 指定 task 的心跳信息(最近一次心跳时间、运行时长、统计信息等)
    | ---- + taskerrors: 记录集群中所有 topology 的 task 运行错误信息
    |      | ---- + ${topology_id}: 指定 topology 下所有 task 的运行错误信息
    |      |      | ---- + ${task_id}: 指定 task 的运行错误信息
    | ---- + metrics: 记录集群中所有 topology 的 metricsId
    | ---- + blobstore: 记录集群对应的 blobstore 信息,用于协调数据一致性
    | ---- + gray_upgrade: 记录灰度发布中的 topologyId
    

    Worker 内部结构

    worker 内部结构

    在多个 worker 执行某个 Topology 过程中,worker 会通过占用的 slot.port 来封装 TaskTrasfer 类负责 SpoutBolt 之间跨 worker 执行时的数据传输。每个 SpoutBolt 都是一个独立的 Thread, 然后这些都会被 TaskShutdownDameon 管理起来。

    jstorm 中,单个 task 会按照 Topology 结构组成了 TaskShutdownDameon 列表,最终放置到 WorkerData 中进行存储管理。

    Worker 启动时,会主动监听分配的 slot.port 端口,实现为 IConnection,用于获取来自其他 Worker 的消息内容, 并绑定到 VirtualPortCtrlDispatch 与当前 worker 中的 DisruptorQueue 相关联起来(主要是传递控制相关的消息), 参考 Worker#startDispatchThread() 函数实现:

    // 构建控制使用的内部 MQ 实例
    DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI,
            queueSize, waitStrategy, false, 0, 0);
    
    //metric for recvControlQueue
    QueueGauge revCtrlGauge = new QueueGauge(recvControlQueue, MetricDef.RECV_CTRL_QUEUE);
    JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.RECV_CTRL_QUEUE, MetricType.GAUGE), new AsmGauge(
            revCtrlGauge));
    
    // 2. 为当前 worker 基于 Netty 创建并返回一个 Socket 连接用于接收消息
    IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues(),
            recvControlQueue, false, workerData.getTaskIds());
    workerData.setRecvConnection(recvConnection);
    
    // create recvice control messages's thread
    // 3. 启动一个线程循环消费 worker 接收到的消息,并应用 DisruptorRunnable.onEvent 方法,
    //    最终调用的是 VirtualPortCtrlDispatch.handleEvent 方法,将消息投递给指定 task 的消息队列
    RunnableCallback recvControlDispather = new VirtualPortCtrlDispatch(
            workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD);
    

    数据消息通过 TaskTransfer#serialize() 函数将数据转化为 TaskMessage 传输到对应的 IConnection 下游。具体如下:

    protected void serialize(KryoTupleSerializer serializer, Object event) {
        long start = serializeTimer.getTime();
        try {
            ITupleExt tuple = (ITupleExt) event;
            int targetTaskId = tuple.getTargetTaskId();
            // 获取事件任务 ID 对应的 IConnection
            IConnection conn = getConnection(targetTaskId);
            if (conn != null) {
                // 将数据序列化为字节数组
                byte[] tupleMessage = serializer.serialize((TupleExt) tuple);
                //LOG.info("Task-{} sent msg to task-{}, data={}", task.getTaskId(), taskid,
                // JStormUtils.toPrintableString(tupleMessage));
                // 将数据序列化后的数据封装为 TaskMessage 类
                TaskMessage taskMessage = new TaskMessage(taskId, targetTaskId, tupleMessage);
                // 将 TaskMessage 类实例发送出去
                conn.send(taskMessage);
            } else {
                LOG.error("Can not find connection for task-{}", targetTaskId);
            }
        } finally {
            if (MetricUtils.metricAccurateCal) {
                serializeTimer.updateTime(start);
            }
        }
    }
    

    StormServerHandler#messageReceived 函数中接受到数据,然后直接将数据发送到 NettyServer#enqueue 函数中,追加到 deserializeQueues 列表中对应的队列。

    SpoutBolt 内部

    Spout 与 Bolt 内部

    jstorm 内部使用 AsyncLoopThread 来反复执行 SpoutExecutors 封装的 Spout 接口,然后通过 SpoutCollector 收集数据产生的数据,然后在 TaskReceiver 来接受其他组件反馈的数据比如ack, fail 等数据。

    JStorm 系统中,每个 Topology 结构都会分装到多个 TaskShutdownDameon 线程,都是一个独立的 task 实例进行执行,每个都有自己的 taskId ,因此都是通过 taskId 来管理各种的 DisruptorQueue

    相关文章

      网友评论

          本文标题:jstorm 源码分析

          本文链接:https://www.haomeiwen.com/subject/ndqrgktx.html