Nimbus 与 Supervisor 关系
Nimbus 与 Supervisor 关系
注 NimbusServer 中 Thrift 接口主要用与客户端访问 Nimbus 服务,比如提交 Topology 任务等,Nimbus 与 Supervisor 之间通过 ZooKeeper 进行交互,其中 Nimbus 与 Supervisor 都内置一个 Httpserver 用于日志下载等。
注 在 Zookeeper 中的目录结构如下:
+ ${zk_root_dir}
| ---- + topology: 记录集群中所有正在运行的 topology 数据
| | ---- + ${topology_id}: 指定 topology 的相关信息(名称、开始运行时间、运行状态等)
| ---- + supervisors: 记录集群中所有 supervisor 节点的心跳信息
| | ---- + ${supervivor_id}: 指定 supervisor 的心跳信息(心跳时间、主机名称、所有 worker 的端口号、运行时间等)
| ---- + assignments: 记录提交给集群的 topology 任务分配信息
| | ---- + ${topology_id}: 指定 topology 的任务分配信息(对应 nimbus 上的代码目录、所有 task 的启动时间、每个 task 与节点和端口的映射关系等)
| ---- + assignments_bak: 记录提交给集群的 topology 任务分配信息的备份
| ---- + tasks: 记录集群中所有 topology 的 task 信息
| | ---- + ${topology_id}: 指定 topology 的所有 task 信息
| | | ---- + ${task_id}: 指定 task 所属的组件 ID 和类型(spout/bolt)
| ---- + taskbeats: 记录集群中所有 task 的心跳信息
| | ---- + ${topology_id}: 记录指定 topology 下所有 task 的心跳信息、topologyId,以及 topologyMasterId 等
| | | ---- + ${task_id}: 指定 task 的心跳信息(最近一次心跳时间、运行时长、统计信息等)
| ---- + taskerrors: 记录集群中所有 topology 的 task 运行错误信息
| | ---- + ${topology_id}: 指定 topology 下所有 task 的运行错误信息
| | | ---- + ${task_id}: 指定 task 的运行错误信息
| ---- + metrics: 记录集群中所有 topology 的 metricsId
| ---- + blobstore: 记录集群对应的 blobstore 信息,用于协调数据一致性
| ---- + gray_upgrade: 记录灰度发布中的 topologyId
Worker 内部结构
worker 内部结构
注 在多个 worker 执行某个 Topology 过程中,worker 会通过占用的 slot.port 来封装 TaskTrasfer 类负责 Spout 和 Bolt 之间跨 worker 执行时的数据传输。每个 Spout 或 Bolt 都是一个独立的 Thread, 然后这些都会被 TaskShutdownDameon 管理起来。
注 在 jstorm 中,单个 task 会按照 Topology 结构组成了 TaskShutdownDameon 列表,最终放置到 WorkerData 中进行存储管理。
注 Worker 启动时,会主动监听分配的 slot.port 端口,实现为 IConnection,用于获取来自其他 Worker 的消息内容, 并绑定到 VirtualPortCtrlDispatch 与当前 worker 中的 DisruptorQueue 相关联起来(主要是传递控制相关的消息), 参考 Worker#startDispatchThread() 函数实现:
// 构建控制使用的内部 MQ 实例
DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI,
queueSize, waitStrategy, false, 0, 0);
//metric for recvControlQueue
QueueGauge revCtrlGauge = new QueueGauge(recvControlQueue, MetricDef.RECV_CTRL_QUEUE);
JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.RECV_CTRL_QUEUE, MetricType.GAUGE), new AsmGauge(
revCtrlGauge));
// 2. 为当前 worker 基于 Netty 创建并返回一个 Socket 连接用于接收消息
IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues(),
recvControlQueue, false, workerData.getTaskIds());
workerData.setRecvConnection(recvConnection);
// create recvice control messages's thread
// 3. 启动一个线程循环消费 worker 接收到的消息,并应用 DisruptorRunnable.onEvent 方法,
// 最终调用的是 VirtualPortCtrlDispatch.handleEvent 方法,将消息投递给指定 task 的消息队列
RunnableCallback recvControlDispather = new VirtualPortCtrlDispatch(
workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD);
注 数据消息通过 TaskTransfer#serialize() 函数将数据转化为 TaskMessage 传输到对应的 IConnection 下游。具体如下:
protected void serialize(KryoTupleSerializer serializer, Object event) {
long start = serializeTimer.getTime();
try {
ITupleExt tuple = (ITupleExt) event;
int targetTaskId = tuple.getTargetTaskId();
// 获取事件任务 ID 对应的 IConnection
IConnection conn = getConnection(targetTaskId);
if (conn != null) {
// 将数据序列化为字节数组
byte[] tupleMessage = serializer.serialize((TupleExt) tuple);
//LOG.info("Task-{} sent msg to task-{}, data={}", task.getTaskId(), taskid,
// JStormUtils.toPrintableString(tupleMessage));
// 将数据序列化后的数据封装为 TaskMessage 类
TaskMessage taskMessage = new TaskMessage(taskId, targetTaskId, tupleMessage);
// 将 TaskMessage 类实例发送出去
conn.send(taskMessage);
} else {
LOG.error("Can not find connection for task-{}", targetTaskId);
}
} finally {
if (MetricUtils.metricAccurateCal) {
serializeTimer.updateTime(start);
}
}
}
注 在 StormServerHandler#messageReceived 函数中接受到数据,然后直接将数据发送到 NettyServer#enqueue 函数中,追加到 deserializeQueues 列表中对应的队列。
Spout 与 Bolt 内部
Spout 与 Bolt 内部
注 在 jstorm 内部使用 AsyncLoopThread 来反复执行 SpoutExecutors 封装的 Spout 接口,然后通过 SpoutCollector 收集数据产生的数据,然后在 TaskReceiver 来接受其他组件反馈的数据比如ack, fail 等数据。
注 在 JStorm 系统中,每个 Topology 结构都会分装到多个 TaskShutdownDameon 线程,都是一个独立的 task 实例进行执行,每个都有自己的 taskId ,因此都是通过 taskId 来管理各种的 DisruptorQueue。







网友评论