In Druid, the Overlord node is responsible for distributing tasks; it manages task creation and removal by creating and deleting nodes in ZooKeeper.
Dynamic configuration: ConfigManager
Druid supports modifying configuration dynamically. The main idea is:
The ConfigManager module periodically reads the watched configuration from the database and updates the contents of an AtomicReference.
Consumers obtain the configuration through that AtomicReference.
An endpoint is exposed for modifying configuration: it writes the updated configuration to the database and adds the configuration key to ConfigManager's watch list.
For example, Druid can use a JavaScript-based strategy to run different tasks on different MiddleManager nodes; the concrete JS configuration can be inspected in the database.
//poll() is invoked on a fixed schedule
private void poll()
{// ConcurrentMap<String, ConfigHolder> watchedConfigs: maps a configuration key to its ConfigHolder container
for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
try {// read the configuration from the database and update the ConfigHolder's reference
if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
log.info("New value for key[%s] seen.", entry.getKey());
}
}
catch (Exception e) {
log.warn(e, "Exception when checking property[%s]", entry.getKey());
}
}
}
private static class ConfigHolder<T>
{
private final AtomicReference<byte[]> rawBytes;
private final ConfigSerde<T> serde;
private final AtomicReference<T> reference;
ConfigHolder(
byte[] rawBytes,
ConfigSerde<T> serde
)
{
this.rawBytes = new AtomicReference<byte[]>(rawBytes);
this.serde = serde;
this.reference = new AtomicReference<T>(serde.deserialize(rawBytes));// consumers of the configuration hold this AtomicReference
}
public AtomicReference<T> getReference()
{
return reference;
}
public boolean swapIfNew(byte[] newBytes)
{
if (!Arrays.equals(newBytes, rawBytes.get())) {
reference.set(serde.deserialize(newBytes));// update the value inside the reference so consumers can observe the configuration change
rawBytes.set(newBytes);
return true;
}
return false;
}
}
JacksonConfigManagerProvider adds the configuration key to ConfigManager so that the corresponding configuration changes are watched, and the watched AtomicReference is injected into the Guice framework so that consumers can obtain it easily.
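To make the consumer side concrete, here is a minimal, self-contained sketch of the pattern (not Druid's actual classes; the property-based dbLookup merely stands in for the database read done in poll()): a single poller thread refreshes an AtomicReference, and consumers call get() on every use, so configuration changes take effect without a restart.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class DynamicConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    // stand-in for the database lookup performed by ConfigManager.poll()
    Supplier<String> dbLookup = () -> System.getProperty("worker.strategy", "equalDistribution");
    AtomicReference<String> configRef = new AtomicReference<>(dbLookup.get());
    // poller: swaps the reference only when the stored value actually changed (like ConfigHolder.swapIfNew)
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    exec.scheduleWithFixedDelay(
        () -> {
          String latest = dbLookup.get();
          if (!latest.equals(configRef.get())) {
            configRef.set(latest);
          }
        },
        0, 1, TimeUnit.SECONDS
    );
    // consumer: reads the reference each time it needs the value, like workerConfigRef.get() in tryAssignTask
    System.out.println("current strategy = " + configRef.get());
    Thread.sleep(2000);
    System.out.println("current strategy = " + configRef.get());
    exec.shutdown();
  }
}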
http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker
Submitting a POST request to this endpoint modifies the worker task configuration (workerBehaviorConfig).
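As a reference point, the request body for this endpoint looks roughly like the following (a hedged example: the datasource name and host:port values are placeholders, and the exact strategy types and fields vary between Druid versions):
{
  "selectStrategy": {
    "type": "equalDistribution",
    "affinityConfig": {
      "affinity": {
        "someDataSource": ["middlemanager-host:8091"]
      },
      "strong": false
    }
  },
  "autoScaler": null
}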
@POST
@Path("/worker")
@Consumes(MediaType.APPLICATION_JSON)
@ResourceFilters(ConfigResourceFilter.class)
public Response setWorkerConfig(
final WorkerBehaviorConfig workerBehaviorConfig,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
@Context final HttpServletRequest req
)
{
final SetResult setResult = configManager.set(// update the configuration
WorkerBehaviorConfig.CONFIG_KEY,
workerBehaviorConfig,
new AuditInfo(author, comment, req.getRemoteAddr())
);
if (setResult.isOk()) {
log.info("Updating Worker configs: %s", workerBehaviorConfig);
return Response.ok().build();
} else {
return Response.status(Response.Status.BAD_REQUEST).build();
}
}
public <T> SetResult set(final String key, final ConfigSerde<T> serde, final T obj)
{
if (obj == null || !started) {
if (obj == null) {
return SetResult.fail(new IllegalAccessException("input obj is null"));
} else {
return SetResult.fail(new IllegalStateException("configManager is not started yet"));
}
}
final byte[] newBytes = serde.serialize(obj);
try {// a dedicated single-thread executor is used here, presumably to keep concurrent callers from updating the database too frequently
exec.submit(
() -> {// write the submitted configuration to the database
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
final ConfigHolder configHolder = watchedConfigs.get(key);
if (configHolder != null) {// update the ConfigHolder, i.e. the AtomicReference held by consumers
configHolder.swapIfNew(newBytes);
}
return true;
}
).get();
return SetResult.ok();
}
catch (Exception e) {
log.warn(e, "Failed to set[%s]", key);
return SetResult.fail(e);
}
}
The Overlord receives tasks
Only the leader node accepts task submissions; the other standby nodes return a 503 status code.
TaskMaster watches for leadership changes; once a node becomes leader it creates a TaskQueue to hold tasks and a TaskRunner to run them.
The submitted task is inserted into the MySQL druid_tasks table with status RUNNING, and added to the in-memory tasks list for the periodic management loop to process.
The task configuration (specifically its payload) is written to the task table, with the task status set to RUNNING.
The task is also added to the tasks list and to the TaskLockbox.
@POST
@Path("/task")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response taskPost(
final Task task,
@Context final HttpServletRequest req
)
{
return asLeaderWith(// only the leader node accepts task submissions; other standby nodes return a 503 code
taskMaster.getTaskQueue(),// TaskMaster watches for leadership changes; once this node becomes leader it creates a TaskQueue to hold tasks and a TaskRunner to run them
new Function<TaskQueue, Response>()
{
@Override
public Response apply(TaskQueue taskQueue)
{
try {
taskQueue.add(task);// insert the submitted task into the MySQL druid_tasks table with status RUNNING, and add it to the tasks list for the management loop to process
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (EntryExistsException e) {
...;
}
}
}
);
}
TaskQueue manages tasks through the database
Adding a task: the HTTP handler writes the task to the database and appends it to the in-memory task list.
A scheduled thread periodically syncs the active tasks from the database into the in-memory task list.
For the tasks added above it calls RemoteTaskRunner.run and attaches asynchronous callbacks,
so that when a task finishes its status is synced back to the database.
/**
* Main task runner management loop. Meant to run forever, or, at least until we're stopped.
*/
private void manage() throws InterruptedException
{
log.info("Beginning management in %s.", config.getStartDelay());
Thread.sleep(config.getStartDelay().getMillis());
// Ignore return value- we'll get the IDs and futures from getKnownTasks later.
taskRunner.restore();
while (active) {
giant.lock();
try {
// Task futures available from the taskRunner
final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = Maps.newHashMap();
for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
}
// Attain futures for all active tasks (assuming they are ready to run).
// Copy tasks list, as notifyStatus may modify it.
for (final Task task : ImmutableList.copyOf(tasks)) {
if (!taskFutures.containsKey(task.getId())) {
final ListenableFuture<TaskStatus> runnerTaskFuture;
if (runnerTaskFutures.containsKey(task.getId())) {
runnerTaskFuture = runnerTaskFutures.get(task.getId());
} else {
// Task should be running, so run it.
final boolean taskIsReady;
try {
taskIsReady = task.isReady(taskActionClientFactory.create(task));
}
catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
notifyStatus(task, TaskStatus.failure(task.getId()));
continue;
}
if (taskIsReady) {
log.info("Asking taskRunner to run: %s", task.getId());
runnerTaskFuture = taskRunner.run(task);// calls RemoteTaskRunner.run, which returns a ListenableFuture<TaskStatus> that listeners can be attached to
} else {
continue;
}
}
taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));// attach callbacks that sync the task status to the database when the task succeeds or fails
}
}
// Kill tasks that shouldn't be running
final Set<String> tasksToKill = Sets.difference(
runnerTaskFutures.keySet(),
ImmutableSet.copyOf(
Lists.transform(
tasks,
new Function<Task, Object>()
{
@Override
public String apply(Task task)
{
return task.getId();
}
}
)
)
);
if (!tasksToKill.isEmpty()) {
log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
for (final String taskId : tasksToKill) {
try {
taskRunner.shutdown(taskId);
}
catch (Exception e) {
log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
}
}
}
// awaitNanos because management may become necessary without this condition signalling,
// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
}
finally {
giant.unlock();
}
}
}
private void notifyStatus(final Task task, final TaskStatus taskStatus)
{
giant.lock();
try {
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(taskStatus, "status");
Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkArgument(
task.getId().equals(taskStatus.getId()),
"Mismatching task ids[%s/%s]",
task.getId(),
taskStatus.getId()
);
// Inform taskRunner that this task can be shut down
try {
taskRunner.shutdown(task.getId());// for a task that is still running, issue an HTTP request to the worker (/task/%s/shutdown) to stop it
}
catch (Exception e) {
log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
}
// Remove from running tasks
int removed = 0;
for (int i = tasks.size() - 1; i >= 0; i--) {
if (tasks.get(i).getId().equals(task.getId())) {
removed++;
removeTaskInternal(tasks.get(i));// remove the completed task from memory
break;
}
}
if (removed == 0) {
log.warn("Unknown task completed: %s", task.getId());
} else if (removed > 1) {
log.makeAlert("Removed multiple copies of task").addData("count", removed).addData("task", task.getId()).emit();
}
// Remove from futures list
taskFutures.remove(task.getId());
if (removed > 0) {
// If we thought this task should be running, save status to DB
try {
final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else {
taskStorage.setStatus(taskStatus);// sync the task status to the database
log.info("Task done: %s", task);
managementMayBeNecessary.signalAll();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to persist status for task")
.addData("task", task.getId())
.addData("statusCode", taskStatus.getStatusCode())
.emit();
}
}
}
finally {
giant.unlock();
}
}
RemoteTaskRunner manages tasks by interacting with ZooKeeper
It watches the znode /druid/indexer/announcements:
- Worker added: addWorker is called, which registers a listener on the status path /druid/indexer/status/worker.getHost().
- Worker updated: if the worker's configuration changed, the ZkWorker is updated; the existing watch stays in place.
- Worker removed: all tasks assigned to that worker are collected and scheduled for cleanup after a delay of config.getTaskCleanupTimeout (15 min); the cleanup deletes the znodes /druid/indexer/status/worker.getHost() and /druid/indexer/tasks/worker.getHost()/taskId, and finally removes the worker's entry under /druid/indexer/status; the watch on the ZkWorker's status znode is stopped.
// Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
final Worker worker;
switch (event.getType()) {
case CHILD_ADDED:// a worker was added
worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
synchronized (waitingForMonitor) {
waitingFor.increment();
}
Futures.addCallback(
addWorker(worker),
new FutureCallback<ZkWorker>()
{
@Override
public void onSuccess(ZkWorker zkWorker)
{
synchronized (waitingForMonitor) {
waitingFor.decrement();
waitingForMonitor.notifyAll();
}
}
@Override
public void onFailure(Throwable throwable)
{
synchronized (waitingForMonitor) {
waitingFor.decrement();
waitingForMonitor.notifyAll();
}
}
}
);
break;
case CHILD_UPDATED:
worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
updateWorker(worker);
break;
case CHILD_REMOVED:
worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
removeWorker(worker);
break;
case INITIALIZED:
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
List<String> workers;
try {
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
}
catch (KeeperException.NoNodeException e) {
// statusPath doesn't exist yet; can occur if no middleManagers have started.
workers = ImmutableList.of();
}
for (String workerId : workers) {
final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
try {
scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
}
catch (Exception e) {
log.warn(
e,
"Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
workerId,
workerStatusPath
);
}
}
}
synchronized (waitingForMonitor) {
waitingFor.decrement();
waitingForMonitor.notifyAll();
}
break;
case CONNECTION_SUSPENDED:
case CONNECTION_RECONNECTED:
case CONNECTION_LOST:
// do nothing
}
}
}
);
workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
The ZkWorker watches the znode /druid/indexer/status:
- Task status added or updated: if the location changed, the task location is updated (the location is simply the node the task runs on); when the task completes, the ZkWorker's last-completed-task time and consecutive-failure count are updated, and the task is moved from runningTasks to completeTasks.
- znode removed: the task has disappeared and is removed from runningTasks.
- Initialization complete: the ZkWorker is added to zkWorkers, and runPendingTasks() is called to run the pending tasks.
// Add status listener to the watcher for status changes
zkWorker.addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final String taskId;
final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) {
try {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:// the task status changed
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskAnnouncement announcement = jsonMapper.readValue(
event.getData().getData(), TaskAnnouncement.class
);
log.info(
"Worker[%s] wrote %s status for task [%s] on [%s]",
zkWorker.getWorker().getHost(),
announcement.getTaskStatus().getStatusCode(),
taskId,
announcement.getTaskLocation()
);
// Synchronizing state with ZK
statusLock.notifyAll();
final RemoteTaskRunnerWorkItem tmp;
if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp;
} else {
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
announcement.getTaskType(),
zkWorker.getWorker(),
TaskLocation.unknown(),
announcement.getTaskDataSource()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
newTaskRunnerWorkItem
);
if (existingItem == null) {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = newTaskRunnerWorkItem;
} else {
taskRunnerWorkItem = existingItem;
}
}
if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
}
if (announcement.getTaskStatus().isComplete()) {// the task has completed
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
runPendingTasks();
}
break;
case CHILD_REMOVED:// the task disappeared
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
} else {
log.info("Task[%s] went bye bye.", taskId);
}
break;
case INITIALIZED:
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
retVal.set(zkWorker);
} else {
final String message = StringUtils.format(
"WTF?! Tried to add already-existing worker[%s]",
worker.getHost()
);
log.makeAlert(message)
.addData("workerHost", worker.getHost())
.addData("workerIp", worker.getIp())
.emit();
retVal.setException(new IllegalStateException(message));
}
runPendingTasks();
break;
case CONNECTION_SUSPENDED:
case CONNECTION_RECONNECTED:
case CONNECTION_LOST:
// do nothing
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("znode", event.getData().getPath())
.emit();
}
}
}
}
);
zkWorker.start();
Running pending tasks: runPendingTasks()
Sort the pending tasks by insertion time.
Assign and run each task via tryAssignTask(task, taskRunnerWorkItem).
private void runPendingTasks()
{
runPendingTasksExec.submit(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try {
// make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
// into running status
List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
sortByInsertionTime(copy);// sort the tasks by insertion time
for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
try {
//this can still be null due to race from explicit task shutdown request
//or if another thread steals and completes this task right after this thread makes copy
//of pending tasks. See https://github.com/druid-io/druid/issues/2842 .
Task task = pendingTaskPayloads.get(taskId);
if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {// assign and run the task
pendingTaskPayloads.remove(taskId);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception while trying to assign task")
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
if (workItem != null) {
taskComplete(workItem, null, TaskStatus.failure(taskId));// on completion, update the ZkWorker's last-completed time and consecutive-failure count; move the task from runningTasks to completeTasks
}
}
finally {
tryAssignTasks.remove(taskId);
}
}
}
}
catch (Exception e) {
log.makeAlert(e, "Exception in running pending tasks").emit();
}
return null;
}
}
);
}
Assigning and running a task: tryAssignTask
A worker is selected according to the strategy configured in workerBehaviorConfig.
The task is then assigned to that worker through ZooKeeper via announceTask(task, assignedWorker, taskRunnerWorkItem).
/**
* Ensures no workers are already running a task before assigning the task to a worker.
* It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR
* needs to bootstrap after a restart.
*
* @param taskRunnerWorkItem - the task to assign
*
* @return true iff the task is now assigned
*/
private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
log.info("Task[%s] already running.", task.getId()); // some worker is already running this task
return true;
} else {
// Nothing running this task, announce it in ZK for a worker to run it
WorkerBehaviorConfig workerConfig = workerConfigRef.get();// dynamic configuration: changes are picked up promptly
WorkerSelectStrategy strategy;
if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;// the default worker selection strategy, which ranks all workers by their remaining task capacity
log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
} else {
strategy = workerConfig.getSelectStrategy();
}
ZkWorker assignedWorker = null;
final ImmutableWorkerInfo immutableZkWorker;
try {
synchronized (workersWithUnacknowledgedTask) {
immutableZkWorker = strategy.findWorkerForTask(// select a worker
config,
ImmutableMap.copyOf(
Maps.transformEntries(
Maps.filterEntries(
zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
{
@Override
public boolean apply(Map.Entry<String, ZkWorker> input)
{
return !lazyWorkers.containsKey(input.getKey()) &&
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
!blackListedWorkers.contains(input.getValue());
}
}
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableWorkerInfo transformEntry(
String key, ZkWorker value
)
{
return value.toImmutable();
}
}
)
),
task
);
if (immutableZkWorker != null &&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
}
}
if (assignedWorker != null) {
return announceTask(task, assignedWorker, taskRunnerWorkItem);// assign the task to the worker through ZooKeeper
} else {
log.debug(
"Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
task.getId(),
zkWorkers.values(),
workersWithUnacknowledgedTask
);
}
return false;
}
finally {
if (assignedWorker != null) {
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
//if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
runPendingTasks();
}
}
}
}
Assigning a task to a worker through ZooKeeper: announceTask(task, assignedWorker, taskRunnerWorkItem)
Its main job is to create a new znode /druid/indexer/tasks/worker/taskId in ZooKeeper.
It synchronizes on statusLock and waits until the task has been accepted by the worker before the assignment is considered successful;
only after the current assignment completes does it move on to assigning the next task.
/**
* Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
* removing the task ZK entry and creating a task status ZK entry.
*
* @param theZkWorker The worker the task is assigned to
* @param taskRunnerWorkItem The task to be assigned
*
* @return boolean indicating whether the task was successfully assigned or not
*/
private boolean announceTask(
final Task task,
final ZkWorker theZkWorker,
final RemoteTaskRunnerWorkItem taskRunnerWorkItem
) throws Exception
{
Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
final String worker = theZkWorker.getWorker().getHost();
synchronized (statusLock) {
if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
// the worker might have been killed or marked as lazy
log.info("Not assigning task to already removed worker[%s]", worker);
return false;
}
log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());
CuratorUtils.createIfNotExists(// create the znode /druid/indexer/tasks/worker/taskId
cf,
JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
CreateMode.EPHEMERAL,
jsonMapper.writeValueAsBytes(task),
config.getMaxZnodeBytes()
);
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());// remove the pending task from pendingTasks
if (workItem == null) {
log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
.addData("taskId", task.getId())
.emit();
return false;
}
RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
runningTasks.put(task.getId(), newWorkItem);// add it to runningTasks
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = Stopwatch.createStarted();
while (!isWorkerRunningTask(theZkWorker, task.getId())) {// important: under statusLock, wait until the worker has accepted the task before treating the assignment as successful
final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= waitMs) {
log.makeAlert(
"Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
worker,
task.getId(),
elapsed,
config.getTaskAssignmentTimeout()
).emit();
taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
break;
}
}
return true;
}
}
Selecting the worker node
If there is a dedicated affinity configuration that pins certain datasources to certain worker nodes, those nodes are preferred for that datasource.
This can be configured via JS here.
If the designated worker nodes have no capacity left to run the task, a worker node not explicitly bound to any datasource is chosen.
/**
* Helper for {@link WorkerSelectStrategy} implementations.
*
* @param allWorkers map of all workers, in the style provided to {@link WorkerSelectStrategy}
* @param affinityConfig affinity config, or null
* @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run
* the task, and worker satisfies the affinity config. may return null.
*
* @return selected worker from "allWorkers", or null.
*/
@Nullable
public static ImmutableWorkerInfo selectWorker(
final Task task,
final Map<String, ImmutableWorkerInfo> allWorkers,
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
@Nullable final AffinityConfig affinityConfig,
final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
)
{
// Workers that could potentially run this task, ignoring affinityConfig.
final Map<String, ImmutableWorkerInfo> runnableWorkers = allWorkers
.values()
.stream()
.filter(worker -> worker.canRunTask(task)
&& worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
if (affinityConfig == null) {
// All runnable workers are valid.
return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers));// no dedicated affinity config; apply the worker selection strategy directly
} else {
// Workers assigned to the affinity pool for our task.
final Set<String> dataSourceWorkers = affinityConfig.getAffinity().get(task.getDataSource());
if (dataSourceWorkers == null) {
// No affinity config for this dataSource; use non-affinity workers.
return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));
} else {// certain datasources are pinned to certain workers; prefer assigning the datasource to those nodes
// Get runnable, affinity workers.
final ImmutableMap<String, ImmutableWorkerInfo> dataSourceWorkerMap =
ImmutableMap.copyOf(Maps.filterKeys(runnableWorkers, dataSourceWorkers::contains));
final ImmutableWorkerInfo selected = workerSelector.apply(dataSourceWorkerMap);
if (selected != null) {
return selected;
} else if (affinityConfig.isStrong()) {
return null;
} else {
// Weak affinity allows us to use nonAffinityWorkers for this dataSource, if no affinity workers
// are available.
return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));// if the designated workers have no available capacity, fall back to workers not bound to any datasource
}
}
}
}
In practice, the equal-distribution strategy is generally used.
EqualDistributionWorkerSelectStrategy ranks all worker nodes by how many more tasks they can still run and picks the one with the largest remaining capacity.
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
affinityConfig,
EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
);
}
private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity)
).orElse(null);
}
Worker nodes receive tasks and report task status via ZooKeeper
WorkerCuratorCoordinator creates the znodes and announces the currently available worker (a minimal Curator sketch follows this list):
- Creates the persistent znode /druid/indexer/tasks/workerhost (the task node), used to receive tasks assigned by the Overlord
- Creates the persistent znode /druid/indexer/status/workerhost (the task status node); the worker updates it and the Overlord watches it to learn about task status changes
- The Announcer creates /druid/indexer/announcements/workerhost (the worker node), declaring the worker as available
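A minimal Curator sketch of this registration, under the assumptions that the announcement node is ephemeral and that the ZooKeeper address, worker host, and capacity payload are placeholders (Druid's own Announcer and WorkerCuratorCoordinator additionally handle retries, Worker serialization, and re-announcement after session loss):
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class WorkerRegistrationSketch
{
  public static void main(String[] args) throws Exception
  {
    final String workerHost = "middlemanager-host:8091"; // placeholder
    CuratorFramework cf = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    cf.start();
    // persistent task node: the overlord creates child znodes under it to assign tasks
    cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
      .forPath("/druid/indexer/tasks/" + workerHost);
    // persistent status node: the worker writes task status under it, the overlord watches it
    cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
      .forPath("/druid/indexer/status/" + workerHost);
    // announcement node: declares this worker as available; ephemeral so it disappears with the session
    cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
      .forPath(
          "/druid/indexer/announcements/" + workerHost,
          "{\"host\":\"middlemanager-host:8091\",\"capacity\":3}".getBytes(StandardCharsets.UTF_8)
      );
    Thread.sleep(Long.MAX_VALUE); // keep the session (and the ephemeral announcement) alive
  }
}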
WorkerTaskMonitor listens for newly assigned tasks
It watches the znode /druid/indexer/tasks/workerhost for newly added child nodes.
The node content is deserialized into a Task object and assignTask(Task task) is called: ForkingTaskRunner.run is invoked asynchronously to run the task, and the task is added to the running queue.
At this point the assignment is considered successful: the status znode /druid/indexer/status/workerhost is updated to the RUNNING state, and the task znode under /druid/indexer/tasks/workerhost is deleted.
public void assignTask(Task task)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started");
synchronized (lock) {
if (assignedTasks.containsKey(task.getId())
|| runningTasks.containsKey(task.getId())
|| completedTasks.containsKey(task.getId())) {
log.info("Assign task[%s] request ignored because it exists already.", task.getId());
return;
}
try {
jsonMapper.writeValue(new File(getAssignedTaskDir(), task.getId()), task);
assignedTasks.put(task.getId(), task);
}
catch (IOException ex) {
log.error(ex, "Error while trying to persist assigned task[%s]", task.getId());
throw new ISE("Assign Task[%s] Request failed because [%s].", task.getId(), ex.getMessage());
}
changeHistory.addChangeRequest(// record the change history
new WorkerHistoryItem.TaskUpdate(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId()),
TaskLocation.unknown()
)
)
);
}
submitNoticeToExec(new RunNotice(task));// submit the task to run
}
private class RunNotice implements Notice
{
private final Task task;
public RunNotice(Task task)
{
this.task = task;
}
@Override
public String getTaskId()
{
return task.getId();
}
@Override
public void handle() throws Exception
{
TaskAnnouncement announcement = null;
synchronized (lock) {
if (runningTasks.containsKey(task.getId()) || completedTasks.containsKey(task.getId())) {
log.warn(
"Got run notice for task [%s] that I am already running or completed...",
task.getId()
);
taskStarted(task.getId());
return;
}
final ListenableFuture<TaskStatus> future = taskRunner.run(task);// asynchronously invoke ForkingTaskRunner.run to run the task
addRunningTask(task, future);// add the task to the running queue
announcement = TaskAnnouncement.create(
task,
TaskStatus.running(task.getId()),
TaskLocation.unknown()
);
changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(announcement));
cleanupAssignedTask(task);
log.info("Task[%s] started.", task.getId());
}
taskAnnouncementChanged(announcement);// update the status znode /druid/indexer/status/workerhost to the RUNNING state
taskStarted(task.getId());// delete the task znode under /druid/indexer/tasks/workerhost
}
}
Adding listeners for the task Location and Status
workerTaskMonitor.taskAnnouncementChanged(TaskAnnouncement announcement) is invoked
to update the znode.
private void registerLocationListener()
{
taskRunner.registerListener(
new TaskRunnerListener()
{
@Override
public String getListenerId()
{
return "WorkerTaskManager";
}
@Override
public void locationChanged(final String taskId, final TaskLocation newLocation)
{
submitNoticeToExec(new LocationNotice(taskId, newLocation));
}
@Override
public void statusChanged(final String taskId, final TaskStatus status)
{
// do nothing
}
},
MoreExecutors.sameThreadExecutor()
);
}
private void addRunningTask(final Task task, final ListenableFuture<TaskStatus> future)
{
runningTasks.put(task.getId(), new TaskDetails(task));
Futures.addCallback(
future,
new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(TaskStatus result)
{
submitNoticeToExec(new StatusNotice(task, result));
}
@Override
public void onFailure(Throwable t)
{
submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
}
}
);
}
A scheduled job cleans up completed tasks: scheduleCompletedTasksCleanup()
private void scheduleCompletedTasksCleanup()
{
completedTasksCleanupExecutor.scheduleAtFixedRate(
() -> {
try {
if (completedTasks.isEmpty()) {
log.debug("Skipping completed tasks cleanup. Its empty.");
return;
}
ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
Map<String, TaskStatus> taskStatusesFromOverlord = null;
try {
FullResponseHolder fullResponseHolder = overlordClient.go(// DruidLeaderClient resolves the Overlord leader node
overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
.setContent(jsonMapper.writeValueAsBytes(taskIds))
.addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
.addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
);
if (fullResponseHolder.getStatus().getCode() == 200) {
String responseContent = fullResponseHolder.getContent();// fetch completed task statuses from the Overlord
taskStatusesFromOverlord = jsonMapper.readValue(responseContent, new TypeReference<Map<String, TaskStatus>>()
{
});
log.debug("Received completed task status response [%s].", responseContent);
} else if (fullResponseHolder.getStatus().getCode() == 404) {
// NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
// this if clause should be removed in a future release.
log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
taskStatusesFromOverlord = ImmutableMap.of();
} else {
log.info(
"Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
fullResponseHolder.getStatus().getCode()
);
}
}
catch (Exception ex) {
log.info(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
if (taskStatusesFromOverlord == null) {
return;
}
for (String taskId : taskIds) {
TaskStatus status = taskStatusesFromOverlord.get(taskId);
if (status == null || status.isComplete()) {
log.info(
"Deleting completed task[%s] information, overlord task status[%s].",
taskId,
status == null ? "unknown" : status.getStatusCode()
);
completedTasks.remove(taskId);
File taskFile = new File(getCompletedTaskDir(), taskId);
try {
Files.deleteIfExists(taskFile.toPath());// delete the completed task's temporary file
changeHistory.addChangeRequest(new WorkerHistoryItem.TaskRemoval(taskId));
}
catch (IOException ex) {
log.error(ex, "Failed to delete completed task from disk [%s].", taskFile);
}
}
}
}
catch (Throwable th) {
log.error(th, "WTF! Got unknown exception while running the scheduled cleanup.");
}
},
1,
5,
TimeUnit.MINUTES
);
}
Restoring interrupted tasks after a restart: restore()
Read ${baseTaskDir} (var/druid/task in the current cluster configuration).
Read each task's ${taskId}/task.json file and deserialize it into a Task object.
Asynchronously invoke ForkingTaskRunner.run to run the task and add it to the running queue.
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
final File restoreFile = getRestoreFile();// read the ${baseTaskDir}/task/restore.json file to get the IDs of tasks that need to be restored
final TaskRestoreInfo taskRestoreInfo;
if (restoreFile.exists()) {
try {
taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
}
catch (Exception e) {
log.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
return ImmutableList.of();
}
} else {
return ImmutableList.of();
}
final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = Lists.newArrayList();
for (final String taskId : taskRestoreInfo.getRunningTasks()) {
try {// read the task's ${baseTaskDir}/task/${taskId}/task.json file and deserialize it into a Task object
final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json");
final Task task = jsonMapper.readValue(taskFile, Task.class);
if (!task.getId().equals(taskId)) {
throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
}
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
log.info("Restoring task[%s].", task.getId());
retVal.add(Pair.of(task, run(task)));// asynchronously invoke ForkingTaskRunner.run to run the task and add it to the running queue
}
}
catch (Exception e) {
log.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
}
}
log.info("Restored %,d tasks.", retVal.size());
return retVal;
}
ForkingTaskRunner launches the peon process
Allocating an HTTP port
PortFinder searches for an available port starting from 8100 (per the configuration) and maintains a list of ports already in use; when a peon process exits, its port is reclaimed.
It calls new ServerSocket(portNum).close(); if no exception is thrown the port is considered available; on a BindException it keeps trying other ports.
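A minimal sketch of this probing approach (not the actual PortFinder class; it only mirrors the behavior described above: start at a configured port, skip ports already handed out, and release a port when the peon exits):
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.HashSet;
import java.util.Set;
public class PortFinderSketch
{
  private final Set<Integer> usedPorts = new HashSet<>();
  private final int startPort;
  public PortFinderSketch(int startPort)
  {
    this.startPort = startPort; // e.g. 8100 in the default configuration
  }
  public synchronized int findUnusedPort()
  {
    int port = startPort;
    while (true) {
      if (!usedPorts.contains(port) && canBind(port)) {
        usedPorts.add(port);
        return port;
      }
      port++; // BindException or already handed out: try the next port
    }
  }
  public synchronized void markPortUnused(int port)
  {
    usedPorts.remove(port); // reclaimed when the peon process exits
  }
  private static boolean canBind(int port)
  {
    try {
      new ServerSocket(port).close(); // no exception: the port is currently free
      return true;
    }
    catch (BindException e) {
      return false;
    }
    catch (IOException e) {
      return false;
    }
  }
}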
Task launch parameters (assembled into the command sketched after this list)
The classpath is inherited from the worker process
druid.indexer.runner.javaOpts (parameters configured on the worker)
"context" : {
"druid.indexer.runner.javaOpts" : "-Xms1024m -Xmx2048m -XX:MaxDirectMemorySize=5g", (per-task configuration)
}
Key-value pairs from the configuration file (-D%s=%s)
Key-value pairs starting with druid.indexer.fork.property. from the configuration file (-D%s=%s, overriding same-key parameters above)
"context" : {
"druid.indexer.fork.property.druid.query.groupBy.maxOnDiskStorage" : 4294967296 (per-task configuration, overrides the same key above)
}
Host, port, and other parameters
io.druid.cli.Main internal peon (the peon main class)
Paths for taskFile, statusFile, and reportsFile
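Putting the list above together, the command that ForkingTaskRunner builds looks roughly like the following sketch (memory settings, property names, and file paths are illustrative placeholders; the real implementation also forwards every relevant runtime property and allocates the port via PortFinder):
import java.util.ArrayList;
import java.util.List;
public class PeonCommandSketch
{
  public static void main(String[] args)
  {
    List<String> command = new ArrayList<>();
    command.add("java");
    command.add("-cp");
    command.add(System.getProperty("java.class.path"));   // classpath inherited from the worker process
    command.add("-Xms1024m");                              // from druid.indexer.runner.javaOpts / task context
    command.add("-Xmx2048m");
    command.add("-Ddruid.host=middlemanager-host");        // host parameter (placeholder)
    command.add("-Ddruid.port=8100");                      // HTTP port chosen by PortFinder
    command.add("io.druid.cli.Main");                      // peon main class
    command.add("internal");
    command.add("peon");
    command.add("var/druid/task/<taskId>/task.json");      // taskFile (placeholder path)
    command.add("var/druid/task/<taskId>/status.json");    // statusFile (placeholder path)
    command.add("var/druid/task/<taskId>/report.json");    // reportsFile (placeholder path)
    System.out.println(String.join(" ", command));
  }
}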
Launching the peon process with ProcessBuilder
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start() starts the process.
int statusCode = processHolder.process.waitFor(); the current thread blocks until the process exits, and a return value of 0 means the task succeeded.
When a task needs to be killed, process.destroy() is called to terminate the process. This cannot forcibly terminate a task that has already hit OOM: Java's Process.destroy() cannot kill an OOMed task process, so the task never ends and keeps occupying resources on the MiddleManager node.
Task trace
Overlord logs after a MiddleManager node suddenly died:
2019-05-21T03:01:14,381 INFO [qtp631349266-164] io.druid.indexing.overlord.MetadataTaskStorage - Inserting task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 with status: TaskStatus{id=index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1, status=RUNNING, duration=-1, errorMsg=null}
2019-05-21T03:01:14,478 INFO [qtp631349266-164] io.druid.indexing.overlord.TaskLockbox - Adding task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] to activeTasks
2019-05-21T03:01:14,479 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:01:14,479 INFO [TaskQueue-Manager] io.druid.indexing.overlord.RemoteTaskRunner - Added pending task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:01:15,107 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.RemoteTaskRunner - Coordinator asking Worker[st3-mdrd-41.prod.yiran.com:8091] to add task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]
2019-05-21T03:01:15,113 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.RemoteTaskRunner - Task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 switched from pending to running (on [st3-mdrd-41.prod.yiran.com:8091])
2019-05-21T03:01:15,113 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] status changed to [RUNNING].
2019-05-21T03:01:15,128 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[st3-mdrd-41.prod.yiran.com:8091] wrote RUNNING status for task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] on [TaskLocation{host='null', port=-1, tlsPort=-1}]
2019-05-21T03:01:15,133 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[st3-mdrd-41.prod.yiran.com:8091] wrote RUNNING status for task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] on [TaskLocation{host='st3-mdrd-41.prod.yiran.com', port=8106, tlsPort=-1}]
2019-05-21T03:01:15,133 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] location changed to [TaskLocation{host='st3-mdrd-41.prod.yiran.com', port=8106, tlsPort=-1}].
2019-05-21T03:01:27,823 INFO [qtp631349266-144] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]: LockAcquireAction{lockType=EXCLUSIVE, interval=2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z, timeoutMs=300000}
2019-05-21T03:01:28,310 INFO [qtp631349266-144] io.druid.indexing.overlord.TaskLockbox - Added task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] to TaskLock[index_realtime_mktdm_msg_push]
2019-05-21T03:01:28,310 INFO [qtp631349266-144] io.druid.indexing.overlord.MetadataTaskStorage - Adding lock on interval[2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z] version[2019-05-20T19:01:20.769Z] for task: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:01:28,515 INFO [qtp631349266-174] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]: LockAcquireAction{lockType=EXCLUSIVE, interval=2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z, timeoutMs=300000}
2019-05-21T03:01:28,704 INFO [qtp631349266-174] io.druid.indexing.overlord.TaskLockbox - Task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] already present in TaskLock[index_realtime_mktdm_msg_push]
2019-05-21T03:36:24,982 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.RemoteTaskRunner - [st3-mdrd-41.prod.yiran.com:8091]: Found [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] running
2019-05-21T03:51:25,105 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.RemoteTaskRunner - Failing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]
2019-05-21T03:51:25,105 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Received FAILED status for task: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.RemoteTaskRunner - Can't shutdown! No worker running task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskLockbox - Removing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] from activeTasks
2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskLockbox - Removing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] from TaskLock[index_realtime_mktdm_msg_push]
2019-05-21T03:51:25,129 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.MetadataTaskStorage - Updating task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 to status: TaskStatus{id=index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1, status=FAILED, duration=-1, errorMsg=null}
2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Task done: AbstractTask{id='index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1', groupId='index_realtime_mktdm_msg_push', taskResource=TaskResource{availabilityGroup='mktdm_msg_push-2019-05-20T19:00:00.000Z-0028', requiredCapacity=1}, dataSource='mktdm_msg_push', context={}}
2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Task FAILED: AbstractTask{id='index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1', groupId='index_realtime_mktdm_msg_push', taskResource=TaskResource{availabilityGroup='mktdm_msg_push-2019-05-20T19:00:00.000Z-0028', requiredCapacity=1}, dataSource='mktdm_msg_push', context={}} (-1 run duration)
2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] status changed to [FAILED].
Summary of the task assignment process
The Overlord accepts tasks, syncs task status to the database, and assigns tasks to workers through ZooKeeper.
Points worth noting:
The launch parameters of a single peon process can be task-level, i.e. each task can have its own launch parameters.
The task's HTTP port is allocated on the worker, so port conflicts are possible.
A task that has OOMed cannot be removed by calling its shutdown endpoint.
The Overlord supports dynamic configuration and can pin a datasource to a fixed set of workers.
After a task OOMs it hangs indefinitely: its status in ZooKeeper stays RUNNING, so the Overlord believes it is still running. Tranquility therefore cannot detect the task's real state in time; data sent to the OOMed task keeps failing and is retried for a long time, and messages pile up instead of being sent promptly to the healthy replica task.