Nimbus
与 Supervisor
关系

注
NimbusServer
中 Thrift
接口主要用与客户端访问 Nimbus
服务,比如提交 Topology
任务等,Nimbus
与 Supervisor
之间通过 ZooKeeper
进行交互,其中 Nimbus
与 Supervisor
都内置一个 Httpserver
用于日志下载等。
注
在 Zookeeper
中的目录结构如下:
+ ${zk_root_dir}
| ---- + topology: 记录集群中所有正在运行的 topology 数据
| | ---- + ${topology_id}: 指定 topology 的相关信息(名称、开始运行时间、运行状态等)
| ---- + supervisors: 记录集群中所有 supervisor 节点的心跳信息
| | ---- + ${supervivor_id}: 指定 supervisor 的心跳信息(心跳时间、主机名称、所有 worker 的端口号、运行时间等)
| ---- + assignments: 记录提交给集群的 topology 任务分配信息
| | ---- + ${topology_id}: 指定 topology 的任务分配信息(对应 nimbus 上的代码目录、所有 task 的启动时间、每个 task 与节点和端口的映射关系等)
| ---- + assignments_bak: 记录提交给集群的 topology 任务分配信息的备份
| ---- + tasks: 记录集群中所有 topology 的 task 信息
| | ---- + ${topology_id}: 指定 topology 的所有 task 信息
| | | ---- + ${task_id}: 指定 task 所属的组件 ID 和类型(spout/bolt)
| ---- + taskbeats: 记录集群中所有 task 的心跳信息
| | ---- + ${topology_id}: 记录指定 topology 下所有 task 的心跳信息、topologyId,以及 topologyMasterId 等
| | | ---- + ${task_id}: 指定 task 的心跳信息(最近一次心跳时间、运行时长、统计信息等)
| ---- + taskerrors: 记录集群中所有 topology 的 task 运行错误信息
| | ---- + ${topology_id}: 指定 topology 下所有 task 的运行错误信息
| | | ---- + ${task_id}: 指定 task 的运行错误信息
| ---- + metrics: 记录集群中所有 topology 的 metricsId
| ---- + blobstore: 记录集群对应的 blobstore 信息,用于协调数据一致性
| ---- + gray_upgrade: 记录灰度发布中的 topologyId
Worker
内部结构

注
在多个 worker
执行某个 Topology
过程中,worker
会通过占用的 slot.port
来封装 TaskTrasfer
类负责 Spout
和 Bolt
之间跨 worker
执行时的数据传输。每个 Spout
或 Bolt
都是一个独立的 Thread
, 然后这些都会被 TaskShutdownDameon
管理起来。
注
在 jstorm
中,单个 task
会按照 Topology
结构组成了 TaskShutdownDameon
列表,最终放置到 WorkerData
中进行存储管理。
注
Worker
启动时,会主动监听分配的 slot.port
端口,实现为 IConnection
,用于获取来自其他 Worker
的消息内容, 并绑定到 VirtualPortCtrlDispatch
与当前 worker
中的 DisruptorQueue
相关联起来(主要是传递控制相关的消息), 参考 Worker#startDispatchThread()
函数实现:
// 构建控制使用的内部 MQ 实例
DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI,
queueSize, waitStrategy, false, 0, 0);
//metric for recvControlQueue
QueueGauge revCtrlGauge = new QueueGauge(recvControlQueue, MetricDef.RECV_CTRL_QUEUE);
JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.RECV_CTRL_QUEUE, MetricType.GAUGE), new AsmGauge(
revCtrlGauge));
// 2. 为当前 worker 基于 Netty 创建并返回一个 Socket 连接用于接收消息
IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues(),
recvControlQueue, false, workerData.getTaskIds());
workerData.setRecvConnection(recvConnection);
// create recvice control messages's thread
// 3. 启动一个线程循环消费 worker 接收到的消息,并应用 DisruptorRunnable.onEvent 方法,
// 最终调用的是 VirtualPortCtrlDispatch.handleEvent 方法,将消息投递给指定 task 的消息队列
RunnableCallback recvControlDispather = new VirtualPortCtrlDispatch(
workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD);
注
数据消息通过 TaskTransfer#serialize()
函数将数据转化为 TaskMessage
传输到对应的 IConnection
下游。具体如下:
protected void serialize(KryoTupleSerializer serializer, Object event) {
long start = serializeTimer.getTime();
try {
ITupleExt tuple = (ITupleExt) event;
int targetTaskId = tuple.getTargetTaskId();
// 获取事件任务 ID 对应的 IConnection
IConnection conn = getConnection(targetTaskId);
if (conn != null) {
// 将数据序列化为字节数组
byte[] tupleMessage = serializer.serialize((TupleExt) tuple);
//LOG.info("Task-{} sent msg to task-{}, data={}", task.getTaskId(), taskid,
// JStormUtils.toPrintableString(tupleMessage));
// 将数据序列化后的数据封装为 TaskMessage 类
TaskMessage taskMessage = new TaskMessage(taskId, targetTaskId, tupleMessage);
// 将 TaskMessage 类实例发送出去
conn.send(taskMessage);
} else {
LOG.error("Can not find connection for task-{}", targetTaskId);
}
} finally {
if (MetricUtils.metricAccurateCal) {
serializeTimer.updateTime(start);
}
}
}
注
在 StormServerHandler#messageReceived
函数中接受到数据,然后直接将数据发送到 NettyServer#enqueue
函数中,追加到 deserializeQueues
列表中对应的队列。
Spout
与 Bolt
内部

注
在 jstorm
内部使用 AsyncLoopThread
来反复执行 SpoutExecutors
封装的 Spout
接口,然后通过 SpoutCollector
收集数据产生的数据,然后在 TaskReceiver
来接受其他组件反馈的数据比如ack
, fail
等数据。
注
在 JStorm
系统中,每个 Topology
结构都会分装到多个 TaskShutdownDameon
线程,都是一个独立的 task
实例进行执行,每个都有自己的 taskId
,因此都是通过 taskId
来管理各种的 DisruptorQueue
。
网友评论