
How the Overlord distributes and executes tasks via ZooKeeper

Author: sydt2011 | Published 2019-12-14 12:00

In Druid, the Overlord node is responsible for distributing tasks; it manages task creation and removal through the creation and deletion of ZooKeeper znodes.

Dynamic configuration: ConfigManager

Druid supports modifying certain configurations at runtime. The main idea:

  • The ConfigManager module periodically reads the watched configuration from the database and updates the content of an AtomicReference.
  • Consumers read the configuration through that AtomicReference.
  • An endpoint is exposed for modifying the configuration; it writes the new value to the database and registers the configuration key with ConfigManager for watching.

For example, Druid lets you route different tasks to different MiddleManager nodes through a JavaScript select strategy; the concrete JS configuration can be inspected in the database (an example payload is shown below).

// poll() is scheduled to run periodically
private void poll()
{// ConcurrentMap<String, ConfigHolder> watchedConfigs maps each config key to its ConfigHolder container
  for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
    try {// read the latest value from the database and update the ConfigHolder's reference
      if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
        log.info("New value for key[%s] seen.", entry.getKey());
      }
    }
    catch (Exception e) {
      log.warn(e, "Exception when checking property[%s]", entry.getKey());
    }
  }
}
private static class ConfigHolder<T>
{
  private final AtomicReference<byte[]> rawBytes;
  private final ConfigSerde<T> serde;
  private final AtomicReference<T> reference;
 
  ConfigHolder(
      byte[] rawBytes,
      ConfigSerde<T> serde
  )
  {
    this.rawBytes = new AtomicReference<byte[]>(rawBytes);
    this.serde = serde;
    this.reference = new AtomicReference<T>(serde.deserialize(rawBytes));// consumers of this config hold the AtomicReference
  }
 
  public AtomicReference<T> getReference()
  {
    return reference;
  }
 
  public boolean swapIfNew(byte[] newBytes)
  {
    if (!Arrays.equals(newBytes, rawBytes.get())) {
      reference.set(serde.deserialize(newBytes));// update the reference so consumers see the config change
      rawBytes.set(newBytes);
      return true;
    }
    return false;
  }
}

JacksonConfigManagerProvider adds the configuration key to ConfigManager so that the corresponding configuration is watched, and the watched AtomicReference is bound into the Guice injector so consumers can obtain it easily.
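As a rough sketch of how a consumer obtains that watched reference (the package names and the watch() signature are assumptions based on the io.druid-era JacksonConfigManager, not code quoted in this article):

import com.google.inject.Inject;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import java.util.concurrent.atomic.AtomicReference;

// Minimal consumer sketch: register the key for watching and keep the returned AtomicReference.
public class WorkerConfigConsumer
{
  private final AtomicReference<WorkerBehaviorConfig> workerConfigRef;

  @Inject
  public WorkerConfigConsumer(JacksonConfigManager configManager)
  {
    // watch() registers WorkerBehaviorConfig.CONFIG_KEY with ConfigManager; the poll() loop shown
    // above then keeps the returned reference up to date.
    this.workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class);
  }

  public WorkerBehaviorConfig currentConfig()
  {
    // Always reflects the most recently polled value.
    return workerConfigRef.get();
  }
}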
http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker

Submitting a POST request to the endpoint above modifies the worker configuration (workerBehaviorConfig).
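A possible request body (the hostname and datasource name are placeholders; the equalDistribution/affinityConfig shape follows the documented workerBehaviorConfig format, and "javascript" can be used as the selectStrategy type instead):

{
  "selectStrategy": {
    "type": "equalDistribution",
    "affinityConfig": {
      "affinity": {
        "example_datasource": ["middlemanager-01.example.com:8091"]
      },
      "strong": false
    }
  },
  "autoScaler": null
}

The Overlord handler that accepts this request: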

@POST
  @Path("/worker")
  @Consumes(MediaType.APPLICATION_JSON)
  @ResourceFilters(ConfigResourceFilter.class)
  public Response setWorkerConfig(
      final WorkerBehaviorConfig workerBehaviorConfig,
      @HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
      @HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
      @Context final HttpServletRequest req
  )
  {
    final SetResult setResult = configManager.set(// update the config
        WorkerBehaviorConfig.CONFIG_KEY,
        workerBehaviorConfig,
        new AuditInfo(author, comment, req.getRemoteAddr())
    );
    if (setResult.isOk()) {
      log.info("Updating Worker configs: %s", workerBehaviorConfig);
 
      return Response.ok().build();
    } else {
      return Response.status(Response.Status.BAD_REQUEST).build();
    }
  }
  
public <T> SetResult set(final String key, final ConfigSerde<T> serde, final T obj)
{
  if (obj == null || !started) {
    if (obj == null) {
      return SetResult.fail(new IllegalAccessException("input obj is null"));
    } else {
      return SetResult.fail(new IllegalStateException("configManager is not started yet"));
    }
  }
 
  final byte[] newBytes = serde.serialize(obj);
 
  try {// a dedicated single-threaded executor is used here, presumably to keep concurrent callers from updating the database too frequently
    exec.submit(
        () -> {// persist the submitted config to the database
          dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
 
          final ConfigHolder configHolder = watchedConfigs.get(key);
          if (configHolder != null) {// update the ConfigHolder, i.e. the AtomicReference held by consumers
            configHolder.swapIfNew(newBytes);
          }
 
          return true;
        }
    ).get();
    return SetResult.ok();
  }
  catch (Exception e) {
    log.warn(e, "Failed to set[%s]", key);
    return SetResult.fail(e);
  }
}

The Overlord receives tasks

Only the leader node accepts task submissions; the other (standby) nodes return a 503 status code.
TaskMaster watches for leader changes; when a node becomes leader it creates the TaskQueue that holds tasks and the TaskRunner that runs them.
A submitted task is inserted into the MySQL druid_tasks table (its payload is stored there) with status RUNNING,
and the task is added to the in-memory tasks list, to be processed by the management loop, as well as to the TaskLockbox.

@POST
@Path("/task")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response taskPost(
    final Task task,
    @Context final HttpServletRequest req
)
{
 
  return asLeaderWith(// only the leader accepts task submissions; other nodes return a 503 code
      taskMaster.getTaskQueue(),// TaskMaster watches for leader changes; on becoming leader it creates the TaskQueue to hold tasks and the TaskRunner to run them
      new Function<TaskQueue, Response>()
      {
        @Override
        public Response apply(TaskQueue taskQueue)
        {
          try {
            taskQueue.add(task);// insert the task into the druid_tasks table with status RUNNING and add it to the in-memory tasks list for the management loop
            return Response.ok(ImmutableMap.of("task", task.getId())).build();
          }
          catch (EntryExistsException e) {
            ...;
          }
        }
      }
  );
}

TaskQueue manages tasks through the database

  • Adding a task (the HTTP call above) writes the task to the database and appends it to the in-memory task list.
  • A scheduled thread periodically syncs the active tasks in the database into the in-memory task list.
  • For each task added this way, RemoteTaskRunner.run is invoked and an asynchronous callback is attached,
    so that when the task finishes its status is synced back to the database.

/**
 * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
 */
private void manage() throws InterruptedException
{
  log.info("Beginning management in %s.", config.getStartDelay());
  Thread.sleep(config.getStartDelay().getMillis());
 
  // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
  taskRunner.restore();
 
  while (active) {
    giant.lock();
 
    try {
      // Task futures available from the taskRunner
      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = Maps.newHashMap();
      for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
        runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
      }
      // Attain futures for all active tasks (assuming they are ready to run).
      // Copy tasks list, as notifyStatus may modify it.
      for (final Task task : ImmutableList.copyOf(tasks)) {
        if (!taskFutures.containsKey(task.getId())) {
          final ListenableFuture<TaskStatus> runnerTaskFuture;
          if (runnerTaskFutures.containsKey(task.getId())) {
            runnerTaskFuture = runnerTaskFutures.get(task.getId());
          } else {
            // Task should be running, so run it.
            final boolean taskIsReady;
            try {
              taskIsReady = task.isReady(taskActionClientFactory.create(task));
            }
            catch (Exception e) {
              log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
              notifyStatus(task, TaskStatus.failure(task.getId()));
              continue;
            }
            if (taskIsReady) {
              log.info("Asking taskRunner to run: %s", task.getId());
              runnerTaskFuture = taskRunner.run(task);// calls RemoteTaskRunner.run, which returns a ListenableFuture<TaskStatus> that listeners can be attached to
            } else {
              continue;
            }
          }
          taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));// attach callbacks that sync the task status to the database when the task succeeds or fails
        }
      }
      // Kill tasks that shouldn't be running
      final Set<String> tasksToKill = Sets.difference(
          runnerTaskFutures.keySet(),
          ImmutableSet.copyOf(
              Lists.transform(
                  tasks,
                  new Function<Task, Object>()
                  {
                    @Override
                    public String apply(Task task)
                    {
                      return task.getId();
                    }
                  }
              )
          )
      );
      if (!tasksToKill.isEmpty()) {
        log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
        for (final String taskId : tasksToKill) {
          try {
            taskRunner.shutdown(taskId);
          }
          catch (Exception e) {
            log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
          }
        }
      }
      // awaitNanos because management may become necessary without this condition signalling,
      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
      managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
    }
    finally {
      giant.unlock();
    }
  }
}
  
  
private void notifyStatus(final Task task, final TaskStatus taskStatus)
{
  giant.lock();
 
  try {
    Preconditions.checkNotNull(task, "task");
    Preconditions.checkNotNull(taskStatus, "status");
    Preconditions.checkState(active, "Queue is not active!");
    Preconditions.checkArgument(
        task.getId().equals(taskStatus.getId()),
        "Mismatching task ids[%s/%s]",
        task.getId(),
        taskStatus.getId()
    );
    // Inform taskRunner that this task can be shut down
    try {
      taskRunner.shutdown(task.getId());// if the task is still running, send an HTTP request /task/%s/shutdown to the worker to stop it
    }
    catch (Exception e) {
      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
    }
    // Remove from running tasks
    int removed = 0;
    for (int i = tasks.size() - 1; i >= 0; i--) {
      if (tasks.get(i).getId().equals(task.getId())) {
        removed++;
        removeTaskInternal(tasks.get(i));// remove the completed task from the in-memory list
        break;
      }
    }
    if (removed == 0) {
      log.warn("Unknown task completed: %s", task.getId());
    } else if (removed > 1) {
      log.makeAlert("Removed multiple copies of task").addData("count", removed).addData("task", task.getId()).emit();
    }
    // Remove from futures list
    taskFutures.remove(task.getId());
    if (removed > 0) {
      // If we thought this task should be running, save status to DB
      try {
        final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
        if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
          log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
        } else {
          taskStorage.setStatus(taskStatus);// persist the task status to the database
          log.info("Task done: %s", task);
          managementMayBeNecessary.signalAll();
        }
      }
      catch (Exception e) {
        log.makeAlert(e, "Failed to persist status for task")
           .addData("task", task.getId())
           .addData("statusCode", taskStatus.getStatusCode())
           .emit();
      }
    }
  }
  finally {
    giant.unlock();
  }
}

RemoteTaskRunner manages tasks by interacting with ZooKeeper

It watches the znode /druid/indexer/announcements:

  • Worker added
    addWorker is called, which adds a watch on that worker's status path /druid/indexer/status/worker.getHost()
  • Worker updated
    If the worker's configuration changed, the ZkWorker is updated; the existing watches stay in place
  • Worker removed
    All tasks assigned to that worker are collected and scheduled for cleanup after config.getTaskCleanupTimeout() (15 min); the znodes
    /druid/indexer/status/worker.getHost() and /druid/indexer/tasks/worker.getHost()/taskId are deleted, then the worker's status znode is removed
    and the ZkWorker's watch on its status path is stopped
// Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener(
    new PathChildrenCacheListener()
    {
      @Override
      public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
      {
        final Worker worker;
        switch (event.getType()) {
          case CHILD_ADDED:// a new worker was added
            worker = jsonMapper.readValue(
                event.getData().getData(),
                Worker.class
            );
            synchronized (waitingForMonitor) {
              waitingFor.increment();
            }
            Futures.addCallback(
                addWorker(worker),
                new FutureCallback<ZkWorker>()
                {
                  @Override
                  public void onSuccess(ZkWorker zkWorker)
                  {
                    synchronized (waitingForMonitor) {
                      waitingFor.decrement();
                      waitingForMonitor.notifyAll();
                    }
                  }
 
                  @Override
                  public void onFailure(Throwable throwable)
                  {
                    synchronized (waitingForMonitor) {
                      waitingFor.decrement();
                      waitingForMonitor.notifyAll();
                    }
                  }
                }
            );
            break;
          case CHILD_UPDATED:
            worker = jsonMapper.readValue(
                event.getData().getData(),
                Worker.class
            );
            updateWorker(worker);
            break;
 
          case CHILD_REMOVED:
            worker = jsonMapper.readValue(
                event.getData().getData(),
                Worker.class
            );
            removeWorker(worker);
            break;
          case INITIALIZED:
            // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
            List<String> workers;
            try {
              workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
            }
            catch (KeeperException.NoNodeException e) {
              // statusPath doesn't exist yet; can occur if no middleManagers have started.
              workers = ImmutableList.of();
            }
            for (String workerId : workers) {
              final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
              final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
              if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
                try {
                  scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
                }
                catch (Exception e) {
                  log.warn(
                      e,
                      "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
                      workerId,
                      workerStatusPath
                  );
                }
              }
            }
            synchronized (waitingForMonitor) {
              waitingFor.decrement();
              waitingForMonitor.notifyAll();
            }
            break;
          case CONNECTION_SUSPENDED:
          case CONNECTION_RECONNECTED:
          case CONNECTION_LOST:
            // do nothing
        }
      }
    }
);
workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

ZkWorker watches the znode /druid/indexer/status

  • Task status added or updated
    If the location changed, the task's location is updated (the "location" is the node the task is running on).
    If the task completed, the ZkWorker's last-completed-task time and consecutive-failure count are updated; the task is removed from runningTasks and added to completeTasks.
  • Znode removed
    The task disappeared, so it is removed from runningTasks.
  • Initialization complete
    The ZkWorker is added to zkWorkers and runPendingTasks() is called to run the pending tasks.
 // Add status listener to the watcher for status changes
zkWorker.addListener(
    new PathChildrenCacheListener()
    {
      @Override
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
      {
        final String taskId;
        final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
        synchronized (statusLock) {
          try {
            switch (event.getType()) {
              case CHILD_ADDED:
              case CHILD_UPDATED:// the task's status changed
                taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
                final TaskAnnouncement announcement = jsonMapper.readValue(
                    event.getData().getData(), TaskAnnouncement.class
                );
 
                log.info(
                    "Worker[%s] wrote %s status for task [%s] on [%s]",
                    zkWorker.getWorker().getHost(),
                    announcement.getTaskStatus().getStatusCode(),
                    taskId,
                    announcement.getTaskLocation()
                );
 
                // Synchronizing state with ZK
                statusLock.notifyAll();
 
                final RemoteTaskRunnerWorkItem tmp;
                if ((tmp = runningTasks.get(taskId)) != null) {
                  taskRunnerWorkItem = tmp;
                } else {
                  final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
                      taskId,
                      announcement.getTaskType(),
                      zkWorker.getWorker(),
                      TaskLocation.unknown(),
                      announcement.getTaskDataSource()
                  );
                  final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
                      taskId,
                      newTaskRunnerWorkItem
                  );
                  if (existingItem == null) {
                    log.warn(
                        "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
                        zkWorker.getWorker().getHost(),
                        taskId
                    );
                    taskRunnerWorkItem = newTaskRunnerWorkItem;
                  } else {
                    taskRunnerWorkItem = existingItem;
                  }
                }
 
                if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
                  taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
                  TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
                }
 
                if (announcement.getTaskStatus().isComplete()) {// the task completed
                  taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
                  runPendingTasks();
                }
                break;
              case CHILD_REMOVED:// the task's status znode disappeared
                taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
                taskRunnerWorkItem = runningTasks.remove(taskId);
                if (taskRunnerWorkItem != null) {
                  log.info("Task[%s] just disappeared!", taskId);
                  taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
                  TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
                } else {
                  log.info("Task[%s] went bye bye.", taskId);
                }
                break;
              case INITIALIZED:
                if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
                  retVal.set(zkWorker);
                } else {
                  final String message = StringUtils.format(
                      "WTF?! Tried to add already-existing worker[%s]",
                      worker.getHost()
                  );
                  log.makeAlert(message)
                     .addData("workerHost", worker.getHost())
                     .addData("workerIp", worker.getIp())
                     .emit();
                  retVal.setException(new IllegalStateException(message));
                }
                runPendingTasks();
                break;
              case CONNECTION_SUSPENDED:
              case CONNECTION_RECONNECTED:
              case CONNECTION_LOST:
                // do nothing
            }
          }
          catch (Exception e) {
            log.makeAlert(e, "Failed to handle new worker status")
               .addData("worker", zkWorker.getWorker().getHost())
               .addData("znode", event.getData().getPath())
               .emit();
          }
        }
      }
    }
);
zkWorker.start();

Running pending tasks: runPendingTasks()

  • Sort the pending tasks by insertion time.
  • Assign and run each task via tryAssignTask(task, taskRunnerWorkItem).

private void runPendingTasks()
{
  runPendingTasksExec.submit(
      new Callable<Void>()
      {
        @Override
        public Void call() throws Exception
        {
          try {
            // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
            // into running status
            List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
            sortByInsertionTime(copy);// sort the tasks by insertion time
            for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
              String taskId = taskRunnerWorkItem.getTaskId();
              if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
                try {
                  //this can still be null due to race from explicit task shutdown request
                  //or if another thread steals and completes this task right after this thread makes copy
                  //of pending tasks. See https://github.com/druid-io/druid/issues/2842 .
                  Task task = pendingTaskPayloads.get(taskId);
                  if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {// assign and run the task
                    pendingTaskPayloads.remove(taskId);
                  }
                }
                catch (Exception e) {
                  log.makeAlert(e, "Exception while trying to assign task")
                     .addData("taskId", taskRunnerWorkItem.getTaskId())
                     .emit();
                  RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
                  if (workItem != null) {
                    // task completion: update the ZkWorker's last-completed time and consecutive-failure count,
                    // and move the task from runningTasks to completeTasks
                    taskComplete(workItem, null, TaskStatus.failure(taskId));
                  }
                }
                finally {
                  tryAssignTasks.remove(taskId);
                }
              }
            }
          }
          catch (Exception e) {
            log.makeAlert(e, "Exception in running pending tasks").emit();
          }
          return null;
        }
      }
  );
}

Assigning and running a task: tryAssignTask

  • Select a worker according to the strategy in workerBehaviorConfig.
  • Assign the task to that worker through ZooKeeper via announceTask(task, assignedWorker, taskRunnerWorkItem).

 /**
 * Ensures no workers are already running a task before assigning the task to a worker.
 * It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR
 * needs to bootstrap after a restart.
 *
 * @param taskRunnerWorkItem - the task to assign
 *
 * @return true iff the task is now assigned
 */
private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
{
  Preconditions.checkNotNull(task, "task");
  Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
  Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
 
  if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
    log.info("Task[%s] already running.", task.getId()); // 已经有worker在运行这个任务
    return true;
  } else {
    // Nothing running this task, announce it in ZK for a worker to run it
    WorkerBehaviorConfig workerConfig = workerConfigRef.get();// dynamic config: changes made via the endpoint above are picked up here
    WorkerSelectStrategy strategy;
    if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
      strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;// default worker selection strategy: rank workers by available capacity
      log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
    } else {
      strategy = workerConfig.getSelectStrategy();
    }
 
    ZkWorker assignedWorker = null;
    final ImmutableWorkerInfo immutableZkWorker;
    try {
      synchronized (workersWithUnacknowledgedTask) {
        immutableZkWorker = strategy.findWorkerForTask(// pick a worker for this task
            config,
            ImmutableMap.copyOf(
                Maps.transformEntries(
                    Maps.filterEntries(
                        zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
                        {
                          @Override
                          public boolean apply(Map.Entry<String, ZkWorker> input)
                          {
                            return !lazyWorkers.containsKey(input.getKey()) &&
                                   !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
                                   !blackListedWorkers.contains(input.getValue());
                          }
                        }
                    ),
                    new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
                    {
                      @Override
                      public ImmutableWorkerInfo transformEntry(
                          String key, ZkWorker value
                      )
                      {
                        return value.toImmutable();
                      }
                    }
                )
            ),
            task
        );
 
        if (immutableZkWorker != null &&
            workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())
              == null) {
          assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
        }
      }
 
      if (assignedWorker != null) {
        return announceTask(task, assignedWorker, taskRunnerWorkItem);// assign the task to the worker via ZooKeeper
      } else {
        log.debug(
            "Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
            task.getId(),
            zkWorkers.values(),
            workersWithUnacknowledgedTask
        );
      }
 
      return false;
    }
    finally {
      if (assignedWorker != null) {
        workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
        //if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
        runPendingTasks();
      }
    }
  }
}

Assigning the task to a worker via ZooKeeper: announceTask(task, assignedWorker, taskRunnerWorkItem)
The main work is to create a new znode /druid/indexer/tasks/{worker}/{taskId} in ZooKeeper.
Holding statusLock, the method then waits until the worker has actually picked up the task before the assignment is considered successful;
only once the current task has been assigned does it move on to assigning the next one.

/**
 * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
 * removing the task ZK entry and creating a task status ZK entry.
 *
 * @param theZkWorker        The worker the task is assigned to
 * @param taskRunnerWorkItem The task to be assigned
 *
 * @return boolean indicating whether the task was successfully assigned or not
 */
private boolean announceTask(
    final Task task,
    final ZkWorker theZkWorker,
    final RemoteTaskRunnerWorkItem taskRunnerWorkItem
) throws Exception
{
  Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
  final String worker = theZkWorker.getWorker().getHost();
  synchronized (statusLock) {
    if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
      // the worker might have been killed or marked as lazy
      log.info("Not assigning task to already removed worker[%s]", worker);
      return false;
    }
    log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());
 
    CuratorUtils.createIfNotExists(// create the znode /druid/indexer/tasks/{worker}/{taskId}
        cf,
        JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
        CreateMode.EPHEMERAL,
        jsonMapper.writeValueAsBytes(task),
        config.getMaxZnodeBytes()
    );
 
    RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());// remove the task from pendingTasks
    if (workItem == null) {
      log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
         .addData("taskId", task.getId())
         .emit();
      return false;
    }
 
    RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
    runningTasks.put(task.getId(), newWorkItem);// add it to runningTasks
    log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
    TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
 
    // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
    // on a worker - this avoids overflowing a worker with tasks
    Stopwatch timeoutStopwatch = Stopwatch.createStarted();
    while (!isWorkerRunningTask(theZkWorker, task.getId())) {// important: under statusLock, wait until the worker has accepted the task before the assignment is considered successful
      final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
      statusLock.wait(waitMs);
      long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
      if (elapsed >= waitMs) {
        log.makeAlert(
            "Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
            worker,
            task.getId(),
            elapsed,
            config.getTaskAssignmentTimeout()
        ).emit();
        taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
        break;
      }
    }
    return true;
  }
}

Choosing which worker gets the task
If an affinity config specifies that certain datasources should go to particular worker nodes, those datasources are preferentially assigned to those nodes
(this is part of the dynamic worker config described above, which can also be expressed through the JS strategy).
If the specified workers have no capacity left to run the task, it is assigned to a worker with no explicitly specified datasource.

 
/**
 * Helper for {@link WorkerSelectStrategy} implementations.
 *
 * @param allWorkers     map of all workers, in the style provided to {@link WorkerSelectStrategy}
 * @param affinityConfig affinity config, or null
 * @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run
 *                       the task, and worker satisfies the affinity config. may return null.
 *
 * @return selected worker from "allWorkers", or null.
 */
@Nullable
public static ImmutableWorkerInfo selectWorker(
    final Task task,
    final Map<String, ImmutableWorkerInfo> allWorkers,
    final WorkerTaskRunnerConfig workerTaskRunnerConfig,
    @Nullable final AffinityConfig affinityConfig,
    final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
)
{
  // Workers that could potentially run this task, ignoring affinityConfig.
  final Map<String, ImmutableWorkerInfo> runnableWorkers = allWorkers
      .values()
      .stream()
      .filter(worker -> worker.canRunTask(task)
                        && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
      .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
 
  if (affinityConfig == null) {
    // All runnable workers are valid.
    return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers));// no affinity config: apply the worker selection strategy directly
  } else {
    // Workers assigned to the affinity pool for our task.
    final Set<String> dataSourceWorkers = affinityConfig.getAffinity().get(task.getDataSource());
 
    if (dataSourceWorkers == null) {
      // No affinity config for this dataSource; use non-affinity workers.
      return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));
    } else {// the affinity config says which workers this datasource prefers; try those nodes first
      // Get runnable, affinity workers.
      final ImmutableMap<String, ImmutableWorkerInfo> dataSourceWorkerMap =
          ImmutableMap.copyOf(Maps.filterKeys(runnableWorkers, dataSourceWorkers::contains));
 
      final ImmutableWorkerInfo selected = workerSelector.apply(dataSourceWorkerMap);
 
      if (selected != null) {
        return selected;
      } else if (affinityConfig.isStrong()) {
        return null;
      } else {
        // Weak affinity allows us to use nonAffinityWorkers for this dataSource, if no affinity workers
        // are available.
        return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));// if none of the preferred workers can run the task, fall back to workers with no explicitly specified datasource
      }
    }
  }
}

In practice the equal-distribution strategy is usually used.
EqualDistributionWorkerSelectStrategy ranks all workers by their remaining available capacity and picks the one with the most capacity left:

public ImmutableWorkerInfo findWorkerForTask(
    final WorkerTaskRunnerConfig config,
    final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
    final Task task
)
{
  return WorkerSelectUtils.selectWorker(
      task,
      zkWorkers,
      config,
      affinityConfig,
      EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
  );
}
 
private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
  return eligibleWorkers.values().stream().max(
      Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity)
  ).orElse(null);
}

Worker nodes receive tasks and update task status via ZooKeeper

WorkerCuratorCoordinator creates the znodes and advertises the worker as available (a minimal Curator sketch follows the list):

  • A persistent znode /druid/indexer/tasks/{workerhost} (the task path), used to receive tasks assigned by the Overlord
  • A persistent znode /druid/indexer/status/{workerhost} (the task-status path); the worker updates it and the Overlord watches it to receive task status changes
  • The Announcer creates /druid/indexer/announcements/{workerhost}, declaring the worker as available
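A minimal Curator sketch of what this boils down to (the ZooKeeper address, host name, and announcement payload are placeholders; the real WorkerCuratorCoordinator also handles serialization, size limits, and retries):

import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class WorkerZkBootstrapSketch
{
  public static void main(String[] args) throws Exception
  {
    try (CuratorFramework cf = CuratorFrameworkFactory.newClient(
        "zk-host:2181", new ExponentialBackoffRetry(1000, 3))) {
      cf.start();
      String host = "middlemanager-01.example.com:8091";

      // Persistent paths: the Overlord writes task assignments here, the worker writes statuses here.
      cf.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT).forPath("/druid/indexer/tasks/" + host);
      cf.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT).forPath("/druid/indexer/status/" + host);

      // Ephemeral announcement: it disappears automatically if the worker's ZK session dies,
      // which is what triggers the Overlord's CHILD_REMOVED cleanup described above.
      cf.create().creatingParentsIfNeeded()
        .withMode(CreateMode.EPHEMERAL)
        .forPath("/druid/indexer/announcements/" + host,
                 "{\"host\":\"...\"}".getBytes(StandardCharsets.UTF_8));
    }
  }
}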

WorkerTaskMonitor listens for newly assigned tasks

It watches for new children of the znode /druid/indexer/tasks/{workerhost}.
Each new node's content is deserialized into a Task object and assignTask(Task task) is called, which asynchronously runs the task via ForkingTaskRunner.run and adds it to the running queue.
At that point the assignment is considered successful: the status znode /druid/indexer/status/{workerhost} is updated to the RUNNING state, and the task znode under /druid/indexer/tasks/{workerhost} is deleted.

public void assignTask(Task task)
{
  Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started");
 
  synchronized (lock) {
    if (assignedTasks.containsKey(task.getId())
        || runningTasks.containsKey(task.getId())
        || completedTasks.containsKey(task.getId())) {
      log.info("Assign task[%s] request ignored because it exists already.", task.getId());
      return;
    }
 
    try {
      jsonMapper.writeValue(new File(getAssignedTaskDir(), task.getId()), task);
      assignedTasks.put(task.getId(), task);
    }
    catch (IOException ex) {
      log.error(ex, "Error while trying to persist assigned task[%s]", task.getId());
      throw new ISE("Assign Task[%s] Request failed because [%s].", task.getId(), ex.getMessage());
    }
 
    changeHistory.addChangeRequest(// record the change in the change history
        new WorkerHistoryItem.TaskUpdate(
            TaskAnnouncement.create(
                task,
                TaskStatus.running(task.getId()),
                TaskLocation.unknown()
            )
        )
    );
  }
 
  submitNoticeToExec(new RunNotice(task));// submit the task for execution
}
private class RunNotice implements Notice
{
  private final Task task;
 
  public RunNotice(Task task)
  {
    this.task = task;
  }
 
  @Override
  public String getTaskId()
  {
    return task.getId();
  }
 
  @Override
  public void handle() throws Exception
  {
    TaskAnnouncement announcement = null;
    synchronized (lock) {
      if (runningTasks.containsKey(task.getId()) || completedTasks.containsKey(task.getId())) {
        log.warn(
            "Got run notice for task [%s] that I am already running or completed...",
            task.getId()
        );
 
        taskStarted(task.getId());
        return;
      }
 
      final ListenableFuture<TaskStatus> future = taskRunner.run(task);//异步调用ForkingTaskRunner.run运行任务
      addRunningTask(task, future);//将任务添加到running队列
 
      announcement = TaskAnnouncement.create(
          task,
          TaskStatus.running(task.getId()),
          TaskLocation.unknown()
      );
 
      changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(announcement));
 
      cleanupAssignedTask(task);
      log.info("Task[%s] started.", task.getId());
    }
 
    taskAnnouncementChanged(announcement);// update the status znode /druid/indexer/status/{workerhost} to the RUNNING state
    taskStarted(task.getId());// delete the task znode under /druid/indexer/tasks/{workerhost}
 
  }
}

Listening for changes to a task's Location and Status

workerTaskMonitor.taskAnnouncementChanged(TaskAnnouncement announcement) is called to update the corresponding znode.

 private void registerLocationListener()
{
  taskRunner.registerListener(
      new TaskRunnerListener()
      {
        @Override
        public String getListenerId()
        {
          return "WorkerTaskManager";
        }
 
        @Override
        public void locationChanged(final String taskId, final TaskLocation newLocation)
        {
          submitNoticeToExec(new LocationNotice(taskId, newLocation));
        }
 
        @Override
        public void statusChanged(final String taskId, final TaskStatus status)
        {
          // do nothing
        }
      },
      MoreExecutors.sameThreadExecutor()
  );
}
 
private void addRunningTask(final Task task, final ListenableFuture<TaskStatus> future)
{
  runningTasks.put(task.getId(), new TaskDetails(task));
  Futures.addCallback(
      future,
      new FutureCallback<TaskStatus>()
      {
        @Override
        public void onSuccess(TaskStatus result)
        {
          submitNoticeToExec(new StatusNotice(task, result));
        }
 
        @Override
        public void onFailure(Throwable t)
        {
          submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
        }
      }
  );
}

A scheduled job cleans up completed tasks: scheduleCompletedTasksCleanup()

private void scheduleCompletedTasksCleanup()
{
  completedTasksCleanupExecutor.scheduleAtFixedRate(
      () -> {
        try {
          if (completedTasks.isEmpty()) {
            log.debug("Skipping completed tasks cleanup. Its empty.");
            return;
          }
 
          ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
          Map<String, TaskStatus> taskStatusesFromOverlord = null;
 
          try {
            FullResponseHolder fullResponseHolder = overlordClient.go(// DruidLeaderClient resolves the current Overlord leader
                overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
                              .setContent(jsonMapper.writeValueAsBytes(taskIds))
                              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
                              .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
 
            );
            if (fullResponseHolder.getStatus().getCode() == 200) {
              String responseContent = fullResponseHolder.getContent();// statuses of the completed tasks as known to the Overlord
              taskStatusesFromOverlord = jsonMapper.readValue(responseContent, new TypeReference<Map<String, TaskStatus>>()
              {
              });
              log.debug("Received completed task status response [%s].", responseContent);
            } else if (fullResponseHolder.getStatus().getCode() == 404) {
              // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
              // this if clause should be removed in a future release.
              log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
              taskStatusesFromOverlord = ImmutableMap.of();
            } else {
              log.info(
                  "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
                  fullResponseHolder.getStatus().getCode()
              );
            }
          }
          catch (Exception ex) {
            log.info(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
 
            if (ex instanceof InterruptedException) {
              Thread.currentThread().interrupt();
            }
          }
 
          if (taskStatusesFromOverlord == null) {
            return;
          }
 
          for (String taskId : taskIds) {
            TaskStatus status = taskStatusesFromOverlord.get(taskId);
            if (status == null || status.isComplete()) {
 
              log.info(
                  "Deleting completed task[%s] information, overlord task status[%s].",
                  taskId,
                  status == null ? "unknown" : status.getStatusCode()
              );
 
              completedTasks.remove(taskId);
              File taskFile = new File(getCompletedTaskDir(), taskId);
              try {
                Files.deleteIfExists(taskFile.toPath());// delete the completed task's local file
                changeHistory.addChangeRequest(new WorkerHistoryItem.TaskRemoval(taskId));
              }
              catch (IOException ex) {
                log.error(ex, "Failed to delete completed task from disk [%s].", taskFile);
              }
 
            }
          }
        }
        catch (Throwable th) {
          log.error(th, "WTF! Got unknown exception while running the scheduled cleanup.");
        }
      },
      1,
      5,
      TimeUnit.MINUTES
  );
}

Restoring interrupted tasks after a restart: restoreRestorableTask

  • Read the {baseTaskDir}/restore.json file to get the IDs of the tasks to restore ({baseTaskDir} is var/druid/task in the current cluster configuration).
  • Read each task's {baseTaskDir}/{taskId}/task.json file and deserialize it into a Task object.
  • Asynchronously run the task via ForkingTaskRunner.run and add it to the running queue.

public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
  final File restoreFile = getRestoreFile();// read the ${baseTaskDir}/task/restore.json file to get the IDs of the tasks to restore
  final TaskRestoreInfo taskRestoreInfo;
  if (restoreFile.exists()) {
    try {
      taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
    }
    catch (Exception e) {
      log.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
      return ImmutableList.of();
    }
  } else {
    return ImmutableList.of();
  }
 
  final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = Lists.newArrayList();
  for (final String taskId : taskRestoreInfo.getRunningTasks()) {
    try {// read the task's ${baseTaskDir}/task/${taskId}/task.json file and deserialize it into a Task object
      final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json");
      final Task task = jsonMapper.readValue(taskFile, Task.class);
 
      if (!task.getId().equals(taskId)) {
        throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
      }
 
      if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
        log.info("Restoring task[%s].", task.getId());
        retVal.add(Pair.of(task, run(task)));// asynchronously run the task via ForkingTaskRunner.run and add it to the running queue
      }
    }
    catch (Exception e) {
      log.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
    }
  }
 
  log.info("Restored %,d tasks.", retVal.size());
 
  return retVal;
}

ForkingTaskRunner launches the peon process

Allocating an HTTP port

PortFinder looks for a free port starting from the configured base port (8100) and maintains a list of ports already in use; when a peon process exits, its port is reclaimed.

A port is probed by executing new ServerSocket(portNum).close(); if no exception is thrown, the port is considered free; if a BindException is thrown, the next port is tried.
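A simplified sketch of this probing logic (the real PortFinder also tracks used ports and the configurable start port):

import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

public final class PortProbeSketch
{
  public static int findUnusedPort(int startPort)
  {
    int port = startPort;
    while (true) {
      try {
        // If we can bind and close without an exception, the port is considered free.
        new ServerSocket(port).close();
        return port;
      }
      catch (BindException e) {
        // Port already in use (or not bindable); try the next one.
        port++;
      }
      catch (IOException e) {
        throw new RuntimeException("Unexpected IOException while probing port " + port, e);
      }
    }
  }

  public static void main(String[] args)
  {
    System.out.println("First free port from 8100: " + findUnusedPort(8100));
  }
}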

For a real-world case, see "Druid task fails to start because its port is already in use".

Task launch parameters

The classpath is inherited from the worker process.

druid.indexer.runner.javaOpts (configured on the worker), which a single task can override in its context:

"context" : {

"druid.indexer.runner.javaOpts" : "-Xms1024m -Xmx2048m -XX:MaxDirectMemorySize=5g"

}

Key-value pairs from the configuration file (passed as -D%s=%s).

Key-value pairs prefixed with druid.indexer.fork.property. (passed as -D%s=%s, overriding any same-named property above), which a single task can also override in its context:

"context" : {

"druid.indexer.fork.property.druid.query.groupBy.maxOnDiskStorage" : 4294967296

}

Host, port, and related parameters.

The launcher class: io.druid.cli.Main internal peon

Paths to taskFile, statusFile, and reportsFile. The assembled command looks roughly like the sketch below.
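An illustrative assembly of the peon command line (not the actual ForkingTaskRunner code; the method signature, map names, and all values are placeholders showing how the pieces listed above are layered):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PeonCommandSketch
{
  public static List<String> buildCommand(
      String javaOpts,                    // druid.indexer.runner.javaOpts or the task's context override
      Map<String, String> runtimeProps,   // key-value pairs from the worker's configuration file
      Map<String, String> forkProps,      // druid.indexer.fork.property.* entries with the prefix stripped
      String taskFile, String statusFile, String reportsFile)
  {
    List<String> command = new ArrayList<>();
    command.add("java");
    command.add("-cp");
    command.add(System.getProperty("java.class.path")); // classpath inherited from the worker process

    for (String opt : javaOpts.trim().split("\\s+")) {  // e.g. -Xms1024m -Xmx2048m ...
      command.add(opt);
    }
    runtimeProps.forEach((k, v) -> command.add("-D" + k + "=" + v));
    forkProps.forEach((k, v) -> command.add("-D" + k + "=" + v)); // later -D wins, overriding same-named props above

    command.add("io.druid.cli.Main");
    command.add("internal");
    command.add("peon");
    command.add(taskFile);
    command.add(statusFile);
    command.add(reportsFile);
    return command;
  }
}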

Starting the peon process with ProcessBuilder

The process is started via new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start().

int statusCode = processHolder.process.waitFor(); the current thread blocks until the process exits, and an exit code of 0 means the task ran successfully.

When a task needs to be killed, process.destroy() is called to terminate the process. This cannot forcibly terminate a task that has already hit OOM: Java's Process.destroy() fails to kill an OOM'd task process, so the task never finishes and keeps occupying slots on the MiddleManager node.
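A bare-bones sketch of this launch/wait/kill cycle (not the actual ForkingTaskRunner; the 30-second grace period is hypothetical, and Java 8's Process.destroyForcibly(), or an external kill -9, is the usual workaround for the OOM-hang problem described above):

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PeonProcessSketch
{
  public static int runPeon(List<String> command) throws Exception
  {
    Process process = new ProcessBuilder(ImmutableList.copyOf(command))
        .redirectErrorStream(true)
        .start();
    // Blocks until the peon exits; 0 means the task ran to completion successfully.
    return process.waitFor();
  }

  public static void shutdownPeon(Process process) throws InterruptedException
  {
    process.destroy();                            // the polite signal, which ForkingTaskRunner relies on
    if (!process.waitFor(30, TimeUnit.SECONDS)) { // hypothetical grace period
      process.destroyForcibly();                  // Java 8+: roughly equivalent to kill -9
    }
  }
}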

Task trace

Overlord logs from an incident where a MiddleManager node suddenly went down:

2019-05-21T03:01:14,381 INFO [qtp631349266-164] io.druid.indexing.overlord.MetadataTaskStorage - Inserting task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 with status: TaskStatus{id=index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1, status=RUNNING, duration=-1, errorMsg=null}
2019-05-21T03:01:14,478 INFO [qtp631349266-164] io.druid.indexing.overlord.TaskLockbox - Adding task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] to activeTasks
2019-05-21T03:01:14,479 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:01:14,479 INFO [TaskQueue-Manager] io.druid.indexing.overlord.RemoteTaskRunner - Added pending task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:01:15,107 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.RemoteTaskRunner - Coordinator asking Worker[st3-mdrd-41.prod.yiran.com:8091] to add task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]
2019-05-21T03:01:15,113 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.RemoteTaskRunner - Task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 switched from pending to running (on [st3-mdrd-41.prod.yiran.com:8091])
2019-05-21T03:01:15,113 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] status changed to [RUNNING].
2019-05-21T03:01:15,128 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[st3-mdrd-41.prod.yiran.com:8091] wrote RUNNING status for task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] on [TaskLocation{host='null', port=-1, tlsPort=-1}]
2019-05-21T03:01:15,133 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[st3-mdrd-41.prod.yiran.com:8091] wrote RUNNING status for task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] on [TaskLocation{host='st3-mdrd-41.prod.yiran.com', port=8106, tlsPort=-1}]
2019-05-21T03:01:15,133 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] location changed to [TaskLocation{host='st3-mdrd-41.prod.yiran.com', port=8106, tlsPort=-1}].
2019-05-21T03:01:27,823 INFO [qtp631349266-144] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]: LockAcquireAction{lockType=EXCLUSIVE, interval=2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z, timeoutMs=300000}
2019-05-21T03:01:28,310 INFO [qtp631349266-144] io.druid.indexing.overlord.TaskLockbox - Added task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] to TaskLock[index_realtime_mktdm_msg_push]
2019-05-21T03:01:28,310 INFO [qtp631349266-144] io.druid.indexing.overlord.MetadataTaskStorage - Adding lock on interval[2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z] version[2019-05-20T19:01:20.769Z] for task: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:01:28,515 INFO [qtp631349266-174] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]: LockAcquireAction{lockType=EXCLUSIVE, interval=2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z, timeoutMs=300000}
2019-05-21T03:01:28,704 INFO [qtp631349266-174] io.druid.indexing.overlord.TaskLockbox - Task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] already present in TaskLock[index_realtime_mktdm_msg_push]
2019-05-21T03:36:24,982 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.RemoteTaskRunner - [st3-mdrd-41.prod.yiran.com:8091]: Found [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] running
2019-05-21T03:51:25,105 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.RemoteTaskRunner - Failing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]
2019-05-21T03:51:25,105 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Received FAILED status for task: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.RemoteTaskRunner - Can't shutdown! No worker running task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskLockbox - Removing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] from activeTasks
2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskLockbox - Removing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] from TaskLock[index_realtime_mktdm_msg_push]
2019-05-21T03:51:25,129 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.MetadataTaskStorage - Updating task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 to status: TaskStatus{id=index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1, status=FAILED, duration=-1, errorMsg=null}
2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Task done: AbstractTask{id='index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1', groupId='index_realtime_mktdm_msg_push', taskResource=TaskResource{availabilityGroup='mktdm_msg_push-2019-05-20T19:00:00.000Z-0028', requiredCapacity=1}, dataSource='mktdm_msg_push', context={}}
2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Task FAILED: AbstractTask{id='index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1', groupId='index_realtime_mktdm_msg_push', taskResource=TaskResource{availabilityGroup='mktdm_msg_push-2019-05-20T19:00:00.000Z-0028', requiredCapacity=1}, dataSource='mktdm_msg_push', context={}} (-1 run duration)
2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] status changed to [FAILED].

Summary of the task assignment flow
The Overlord receives tasks, syncs task status to the database, and assigns tasks to workers through ZooKeeper.

Points worth noting:

  • The launch parameters of a peon process can be set per task, i.e. each task may have its own startup options.
  • The task's HTTP port is allocated on the worker, so port conflicts are possible.
  • A task that has hit OOM cannot be removed by calling the task's shutdown endpoint.
  • The Overlord supports dynamic configuration and can pin a datasource to specific workers.
  • After an OOM the task hangs while its state in ZooKeeper stays RUNNING, so the Overlord still thinks it is running; Tranquility therefore cannot detect the task's state in time, data sent to the OOM'd task keeps failing and being retried for a long time, and messages back up instead of being delivered promptly to the healthy replica task.
