Overlord distributes and executes tasks via ZooKeeper

Author: sydt2011 | Published 2019-12-14 12:00

    In Druid, the Overlord node is responsible for distributing tasks; it manages task creation, removal, and so on through the creation and deletion of znodes in ZooKeeper.

    Dynamic configuration: ConfigManager

    Druid supports modifying configuration dynamically. The main idea is:
    The ConfigManager module periodically reads the watched configuration entries from the database and updates the content of an AtomicReference for each of them.
    Consumers read the configuration through that AtomicReference.
    An interface is exposed for modifying configuration; it writes the updated configuration to the database and registers the configuration key with ConfigManager so that it is watched.
    For example, Druid lets you use a JavaScript strategy to pin different tasks to different MiddleManager nodes; the concrete JS configuration can be inspected in the database. A consumer-side sketch follows this list.
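    As a rough illustration of the consumer side, the sketch below assumes JacksonConfigManager's watch() method (which registers the key and returns the AtomicReference that poll() keeps fresh); the package names follow the pre-0.13 io.druid.* layout and the reader class itself is hypothetical.

    import java.util.concurrent.atomic.AtomicReference;

    import io.druid.common.config.JacksonConfigManager;
    import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
    import io.druid.indexing.overlord.setup.WorkerSelectStrategy;

    public class WorkerConfigReader
    {
      private final AtomicReference<WorkerBehaviorConfig> workerConfigRef;

      public WorkerConfigReader(JacksonConfigManager configManager)
      {
        // Registering the key makes ConfigManager.poll() refresh this reference from the database.
        this.workerConfigRef = configManager.watch(
            WorkerBehaviorConfig.CONFIG_KEY,
            WorkerBehaviorConfig.class,
            null
        );
      }

      public WorkerSelectStrategy currentStrategy()
      {
        // Always read through the reference so database updates are visible without a restart.
        final WorkerBehaviorConfig config = workerConfigRef.get();
        return config == null ? WorkerBehaviorConfig.DEFAULT_STRATEGY : config.getSelectStrategy();
      }
    }

    The ConfigManager internals behind this are shown below.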

    //poll() is invoked on a schedule
    private void poll()
    {//  ConcurrentMap<String, ConfigHolder> watchedConfigs: maps a config key to its container ConfigHolder
      for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
        try {// read the config from the database and update the ConfigHolder's reference
          if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
            log.info("New value for key[%s] seen.", entry.getKey());
          }
        }
        catch (Exception e) {
          log.warn(e, "Exception when checking property[%s]", entry.getKey());
        }
      }
    }
    private static class ConfigHolder<T>
    {
      private final AtomicReference<byte[]> rawBytes;
      private final ConfigSerde<T> serde;
      private final AtomicReference<T> reference;
     
      ConfigHolder(
          byte[] rawBytes,
          ConfigSerde<T> serde
      )
      {
        this.rawBytes = new AtomicReference<byte[]>(rawBytes);
        this.serde = serde;
        this.reference = new AtomicReference<T>(serde.deserialize(rawBytes));// consumers of the config hold this AtomicReference
      }
     
      public AtomicReference<T> getReference()
      {
        return reference;
      }
     
      public boolean swapIfNew(byte[] newBytes)
      {
        if (!Arrays.equals(newBytes, rawBytes.get())) {
          reference.set(serde.deserialize(newBytes));// update the content of reference so consumers can see the config change
          rawBytes.set(newBytes);
          return true;
        }
        return false;
      }
    }
    

    JacksonConfigManagerProvider adds the configuration key to ConfigManager so that changes to the corresponding configuration are watched; the watched AtomicReference is injected into the Guice framework so consumers can obtain it easily.
    http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker

    Submitting a POST request to this endpoint modifies the task configuration workerBehaviorConfig.
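    As a concrete illustration, the sketch below posts an equalDistribution strategy with an affinity mapping using plain java.net.HttpURLConnection. The Overlord address, datasource, worker host, and the exact payload shape are assumptions to adapt to the actual cluster; the audit header names are assumed from the AuditManager constants used in the handler shown afterwards.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class WorkerConfigPoster
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical Overlord address; replace with the real <OVERLORD_IP>:<port>.
        final URL url = new URL("http://localhost:8090/druid/indexer/v1/worker");

        // Assumed payload shape for WorkerBehaviorConfig: an equalDistribution select strategy
        // with an affinity mapping that pins a datasource to a specific MiddleManager host.
        final String payload = "{\n"
            + "  \"selectStrategy\": {\n"
            + "    \"type\": \"equalDistribution\",\n"
            + "    \"affinityConfig\": {\n"
            + "      \"affinity\": { \"some_datasource\": [\"middlemanager-01:8091\"] }\n"
            + "    }\n"
            + "  }\n"
            + "}";

        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        // Optional audit headers read by the handler below (assumed to be X-Druid-Author / X-Druid-Comment).
        conn.setRequestProperty("X-Druid-Author", "ops");
        conn.setRequestProperty("X-Druid-Comment", "pin some_datasource to a fixed worker");

        try (OutputStream os = conn.getOutputStream()) {
          os.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }

    The Overlord-side handler that receives this request is shown below.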

    @POST
      @Path("/worker")
      @Consumes(MediaType.APPLICATION_JSON)
      @ResourceFilters(ConfigResourceFilter.class)
      public Response setWorkerConfig(
          final WorkerBehaviorConfig workerBehaviorConfig,
          @HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
          @HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
          @Context final HttpServletRequest req
      )
      {
        final SetResult setResult = configManager.set(// update the config
            WorkerBehaviorConfig.CONFIG_KEY,
            workerBehaviorConfig,
            new AuditInfo(author, comment, req.getRemoteAddr())
        );
        if (setResult.isOk()) {
          log.info("Updating Worker configs: %s", workerBehaviorConfig);
     
          return Response.ok().build();
        } else {
          return Response.status(Response.Status.BAD_REQUEST).build();
        }
      }
      
    public <T> SetResult set(final String key, final ConfigSerde<T> serde, final T obj)
    {
      if (obj == null || !started) {
        if (obj == null) {
          return SetResult.fail(new IllegalAccessException("input obj is null"));
        } else {
          return SetResult.fail(new IllegalStateException("configManager is not started yet"));
        }
      }
     
      final byte[] newBytes = serde.serialize(obj);
     
      try {// a dedicated single-thread executor is used here, presumably to keep concurrent callers from updating the database too frequently
        exec.submit(
            () -> {// write the submitted config to the database
              dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);

              final ConfigHolder configHolder = watchedConfigs.get(key);
              if (configHolder != null) {// update the ConfigHolder, i.e. the AtomicReference held by consumers
                configHolder.swapIfNew(newBytes);
              }
     
              return true;
            }
        ).get();
        return SetResult.ok();
      }
      catch (Exception e) {
        log.warn(e, "Failed to set[%s]", key);
        return SetResult.fail(e);
      }
    }
    

    Overlord receives tasks

    Only the leader (master) node accepts task submissions; other (slave) nodes return a 503 status code.
    TaskMaster watches for leadership changes; once a node becomes the leader, it creates a TaskQueue to hold tasks and a TaskRunner to run them.
    The submitted task is inserted into the MySQL druid_tasks table with status RUNNING, and the task is added to the in-memory tasks list for the periodic management loop to process.
    The task configuration (specifically the payload) is written to the task table, with the task status set to RUNNING.
    The task is also added to the tasks list and to the TaskLockbox.

    @POST
    @Path("/task")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response taskPost(
        final Task task,
        @Context final HttpServletRequest req
    )
    {
     
      return asLeaderWith(// only the leader node accepts task submissions; other nodes return a 503 code
          taskMaster.getTaskQueue(),// TaskMaster watches for leadership changes; once leader, it creates the TaskQueue to hold tasks and the TaskRunner to run them
          new Function<TaskQueue, Response>()
          {
            @Override
            public Response apply(TaskQueue taskQueue)
            {
              try {
                taskQueue.add(task);// insert the submitted task into the MySQL druid_tasks table with status RUNNING; add it to the tasks list for the management loop to process
                return Response.ok(ImmutableMap.of("task", task.getId())).build();
              }
              catch (EntryExistsException e) {
                ...;
              }
            }
          }
      );
    }

    TaskQueue manages tasks via the database

    Adding a task (the HTTP request path) writes the task to the database and adds it to the in-memory task list.
    A scheduled thread periodically syncs the active tasks in the database into the in-memory task list.
    For the tasks added above, RemoteTaskRunner.run is invoked and an asynchronous callback is attached,
    so that when a task finishes, its status is synced back to the database.

    /**
     * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
     */
    private void manage() throws InterruptedException
    {
      log.info("Beginning management in %s.", config.getStartDelay());
      Thread.sleep(config.getStartDelay().getMillis());
     
      // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
      taskRunner.restore();
     
      while (active) {
        giant.lock();
     
        try {
          // Task futures available from the taskRunner
          final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = Maps.newHashMap();
          for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
            runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
          }
          // Attain futures for all active tasks (assuming they are ready to run).
          // Copy tasks list, as notifyStatus may modify it.
          for (final Task task : ImmutableList.copyOf(tasks)) {
            if (!taskFutures.containsKey(task.getId())) {
              final ListenableFuture<TaskStatus> runnerTaskFuture;
              if (runnerTaskFutures.containsKey(task.getId())) {
                runnerTaskFuture = runnerTaskFutures.get(task.getId());
              } else {
                // Task should be running, so run it.
                final boolean taskIsReady;
                try {
                  taskIsReady = task.isReady(taskActionClientFactory.create(task));
                }
                catch (Exception e) {
                  log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
                  notifyStatus(task, TaskStatus.failure(task.getId()));
                  continue;
                }
                if (taskIsReady) {
                  log.info("Asking taskRunner to run: %s", task.getId());
                  runnerTaskFuture = taskRunner.run(task);// invoke RemoteTaskRunner.run, which returns a ListenableFuture<TaskStatus> that listeners can be attached to
                } else {
                  continue;
                }
              }
              taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));// attach callbacks that sync the task status to the database when the task succeeds or fails
            }
          }
          // Kill tasks that shouldn't be running
          final Set<String> tasksToKill = Sets.difference(
              runnerTaskFutures.keySet(),
              ImmutableSet.copyOf(
                  Lists.transform(
                      tasks,
                      new Function<Task, Object>()
                      {
                        @Override
                        public String apply(Task task)
                        {
                          return task.getId();
                        }
                      }
                  )
              )
          );
          if (!tasksToKill.isEmpty()) {
            log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
            for (final String taskId : tasksToKill) {
              try {
                taskRunner.shutdown(taskId);
              }
              catch (Exception e) {
                log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
              }
            }
          }
          // awaitNanos because management may become necessary without this condition signalling,
          // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
          managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
        }
        finally {
          giant.unlock();
        }
      }
    }
      
      
    private void notifyStatus(final Task task, final TaskStatus taskStatus)
    {
      giant.lock();
     
      try {
        Preconditions.checkNotNull(task, "task");
        Preconditions.checkNotNull(taskStatus, "status");
        Preconditions.checkState(active, "Queue is not active!");
        Preconditions.checkArgument(
            task.getId().equals(taskStatus.getId()),
            "Mismatching task ids[%s/%s]",
            task.getId(),
            taskStatus.getId()
        );
        // Inform taskRunner that this task can be shut down
        try {
          taskRunner.shutdown(task.getId());// if the task is currently running, an HTTP request to /task/%s/shutdown is sent to the worker to stop it
        }
        catch (Exception e) {
          log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
        }
        // Remove from running tasks
        int removed = 0;
        for (int i = tasks.size() - 1; i >= 0; i--) {
          if (tasks.get(i).getId().equals(task.getId())) {
            removed++;
            removeTaskInternal(tasks.get(i));// clean up the completed task from memory
            break;
          }
        }
        if (removed == 0) {
          log.warn("Unknown task completed: %s", task.getId());
        } else if (removed > 1) {
          log.makeAlert("Removed multiple copies of task").addData("count", removed).addData("task", task.getId()).emit();
        }
        // Remove from futures list
        taskFutures.remove(task.getId());
        if (removed > 0) {
          // If we thought this task should be running, save status to DB
          try {
            final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
            if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
              log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
            } else {
              taskStorage.setStatus(taskStatus);// persist the task status to the database
              log.info("Task done: %s", task);
              managementMayBeNecessary.signalAll();
            }
          }
          catch (Exception e) {
            log.makeAlert(e, "Failed to persist status for task")
               .addData("task", task.getId())
               .addData("statusCode", taskStatus.getStatusCode())
               .emit();
          }
        }
      }
      finally {
        giant.unlock();
      }
    }
    

    RemoteTaskRunner manages tasks by interacting with ZooKeeper

    Watching the znode /druid/indexer/announcements

    • Worker added
      addWorker is called, and a listener is added on the status path /druid/indexer/status/worker.getHost()
    • ZkWorker updated
      If the worker's configuration was updated, the ZkWorker is updated; the existing watch stays in place
    • Worker removed
      All tasks assigned to this worker are collected and scheduled for cleanup after a delay of config.getTaskCleanupTimeout() (15 min); the znodes
      /druid/indexer/status/worker.getHost() and /druid/indexer/tasks/worker.getHost()/taskId are removed, and finally /druid/indexer/status is deleted;
      the status watch on the ZkWorker's znode is then stopped (a sketch of the delayed cleanup follows this list)
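    The cleanup scheduling itself (scheduleTasksCleanupForWorker) is not shown in the listener below. As a rough, self-contained sketch of the idea only (class and method bodies here are hypothetical simplifications; the real code marks each task FAILED, removes its status znode, and cancels the scheduled cleanup if the worker re-announces itself):

    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch of "delayed cleanup": when a worker's announcement znode disappears,
    // its tasks are not failed immediately but only after a grace period (taskCleanupTimeout,
    // 15 minutes by default), in case the worker comes back.
    public class WorkerCleanupSketch
    {
      private final ScheduledExecutorService cleanupExec = Executors.newSingleThreadScheduledExecutor();

      public ScheduledFuture<?> scheduleTasksCleanupForWorker(
          final String workerHost,
          final List<String> assignedTaskIds,
          final long cleanupTimeoutMillis
      )
      {
        // If the worker re-announces itself before the timeout fires, the returned future is cancelled.
        return cleanupExec.schedule(
            () -> {
              for (String taskId : assignedTaskIds) {
                // In the real RemoteTaskRunner this marks the task FAILED and removes the
                // corresponding /druid/indexer/status/<workerHost>/<taskId> znode.
                System.out.printf("Failing task[%s]; worker[%s] has been gone too long.%n", taskId, workerHost);
              }
            },
            cleanupTimeoutMillis,     // e.g. 15 * 60 * 1000
            TimeUnit.MILLISECONDS
        );
      }
    }

    The Curator listener that reacts to these worker add/update/remove events is shown below.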
    // Add listener for creation/deletion of workers
    workerPathCache.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
          {
            final Worker worker;
            switch (event.getType()) {
              case CHILD_ADDED:// a worker was added
                worker = jsonMapper.readValue(
                    event.getData().getData(),
                    Worker.class
                );
                synchronized (waitingForMonitor) {
                  waitingFor.increment();
                }
                Futures.addCallback(
                    addWorker(worker),
                    new FutureCallback<ZkWorker>()
                    {
                      @Override
                      public void onSuccess(ZkWorker zkWorker)
                      {
                        synchronized (waitingForMonitor) {
                          waitingFor.decrement();
                          waitingForMonitor.notifyAll();
                        }
                      }
     
                      @Override
                      public void onFailure(Throwable throwable)
                      {
                        synchronized (waitingForMonitor) {
                          waitingFor.decrement();
                          waitingForMonitor.notifyAll();
                        }
                      }
                    }
                );
                break;
              case CHILD_UPDATED:
                worker = jsonMapper.readValue(
                    event.getData().getData(),
                    Worker.class
                );
                updateWorker(worker);
                break;
     
              case CHILD_REMOVED:
                worker = jsonMapper.readValue(
                    event.getData().getData(),
                    Worker.class
                );
                removeWorker(worker);
                break;
              case INITIALIZED:
                // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
                List<String> workers;
                try {
                  workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
                }
                catch (KeeperException.NoNodeException e) {
                  // statusPath doesn't exist yet; can occur if no middleManagers have started.
                  workers = ImmutableList.of();
                }
                for (String workerId : workers) {
                  final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
                  final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
                  if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
                    try {
                      scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
                    }
                    catch (Exception e) {
                      log.warn(
                          e,
                          "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
                          workerId,
                          workerStatusPath
                      );
                    }
                  }
                }
                synchronized (waitingForMonitor) {
                  waitingFor.decrement();
                  waitingForMonitor.notifyAll();
                }
                break;
              case CONNECTION_SUSPENDED:
              case CONNECTION_RECONNECTED:
              case CONNECTION_LOST:
                // do nothing
            }
          }
        }
    );
    workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    

    ZkWorker watches the znode /druid/indexer/status

    • Task status added or updated
      If the location changed, the task's location is updated (the location is simply which node the task is running on).
      When a task completes, the ZkWorker's last-completed-task time and consecutive-failure count are updated; the task is removed from runningTasks and added to completeTasks.
    • Znode removed
      The task has disappeared, so it is removed from runningTasks.
    • Initialization finished
      The ZkWorker is added to zkWorkers and runPendingTasks() is called to run pending tasks.
     // Add status listener to the watcher for status changes
    zkWorker.addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
          {
            final String taskId;
            final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
            synchronized (statusLock) {
              try {
                switch (event.getType()) {
                  case CHILD_ADDED:
                  case CHILD_UPDATED:// the task's status changed
                    taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
                    final TaskAnnouncement announcement = jsonMapper.readValue(
                        event.getData().getData(), TaskAnnouncement.class
                    );
     
                    log.info(
                        "Worker[%s] wrote %s status for task [%s] on [%s]",
                        zkWorker.getWorker().getHost(),
                        announcement.getTaskStatus().getStatusCode(),
                        taskId,
                        announcement.getTaskLocation()
                    );
     
                    // Synchronizing state with ZK
                    statusLock.notifyAll();
     
                    final RemoteTaskRunnerWorkItem tmp;
                    if ((tmp = runningTasks.get(taskId)) != null) {
                      taskRunnerWorkItem = tmp;
                    } else {
                      final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
                          taskId,
                          announcement.getTaskType(),
                          zkWorker.getWorker(),
                          TaskLocation.unknown(),
                          announcement.getTaskDataSource()
                      );
                      final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
                          taskId,
                          newTaskRunnerWorkItem
                      );
                      if (existingItem == null) {
                        log.warn(
                            "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
                            zkWorker.getWorker().getHost(),
                            taskId
                        );
                        taskRunnerWorkItem = newTaskRunnerWorkItem;
                      } else {
                        taskRunnerWorkItem = existingItem;
                      }
                    }
     
                    if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
                      taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
                      TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
                    }
     
                    if (announcement.getTaskStatus().isComplete()) {// the task completed
                      taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
                      runPendingTasks();
                    }
                    break;
                  case CHILD_REMOVED:// the task disappeared
                    taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
                    taskRunnerWorkItem = runningTasks.remove(taskId);
                    if (taskRunnerWorkItem != null) {
                      log.info("Task[%s] just disappeared!", taskId);
                      taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
                      TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
                    } else {
                      log.info("Task[%s] went bye bye.", taskId);
                    }
                    break;
                  case INITIALIZED:
                    if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
                      retVal.set(zkWorker);
                    } else {
                      final String message = StringUtils.format(
                          "WTF?! Tried to add already-existing worker[%s]",
                          worker.getHost()
                      );
                      log.makeAlert(message)
                         .addData("workerHost", worker.getHost())
                         .addData("workerIp", worker.getIp())
                         .emit();
                      retVal.setException(new IllegalStateException(message));
                    }
                    runPendingTasks();
                    break;
                  case CONNECTION_SUSPENDED:
                  case CONNECTION_RECONNECTED:
                  case CONNECTION_LOST:
                    // do nothing
                }
              }
              catch (Exception e) {
                log.makeAlert(e, "Failed to handle new worker status")
                   .addData("worker", zkWorker.getWorker().getHost())
                   .addData("znode", event.getData().getPath())
                   .emit();
              }
            }
          }
        }
    );
    zkWorker.start();
    

    Running pending tasks: runPendingTasks()

    Sort the pending tasks by insertion time.
    Assign and run each task via tryAssignTask(task, taskRunnerWorkItem).

    private void runPendingTasks()
    {
      runPendingTasksExec.submit(
          new Callable<Void>()
          {
            @Override
            public Void call() throws Exception
            {
              try {
                // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
                // into running status
                List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
                sortByInsertionTime(copy);// sort the tasks by insertion time
                for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
                  String taskId = taskRunnerWorkItem.getTaskId();
                  if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
                    try {
                      //this can still be null due to race from explicit task shutdown request
                      //or if another thread steals and completes this task right after this thread makes copy
                      //of pending tasks. See https://github.com/druid-io/druid/issues/2842 .
                      Task task = pendingTaskPayloads.get(taskId);
                      if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {// assign and run the task
                        pendingTaskPayloads.remove(taskId);
                      }
                    }
                    catch (Exception e) {
                      log.makeAlert(e, "Exception while trying to assign task")
                         .addData("taskId", taskRunnerWorkItem.getTaskId())
                         .emit();
                      RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
                      if (workItem != null) {
                        taskComplete(workItem, null, TaskStatus.failure(taskId));// task is complete: update the ZkWorker's latest completion time and consecutive-failure count; remove the task from runningTasks and add it to completeTasks
                      }
                    }
                    finally {
                      tryAssignTasks.remove(taskId);
                    }
                  }
                }
              }
              catch (Exception e) {
                log.makeAlert(e, "Exception in running pending tasks").emit();
              }
              return null;
            }
          }
      );
    }
    
    

    Assigning and running a task: tryAssignTask

    A worker is chosen according to the strategy configured in workerBehaviorConfig.
    The task is then assigned to that worker through ZooKeeper via announceTask(task, assignedWorker, taskRunnerWorkItem).

     /**
     * Ensures no workers are already running a task before assigning the task to a worker.
     * It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR
     * needs to bootstrap after a restart.
     *
     * @param taskRunnerWorkItem - the task to assign
     *
     * @return true iff the task is now assigned
     */
    private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
    {
      Preconditions.checkNotNull(task, "task");
      Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
      Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
     
      if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
        log.info("Task[%s] already running.", task.getId()); // a worker is already running this task
        return true;
      } else {
        // Nothing running this task, announce it in ZK for a worker to run it
        WorkerBehaviorConfig workerConfig = workerConfigRef.get();// dynamic config; changes are picked up promptly
        WorkerSelectStrategy strategy;
        if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
          strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;// the default worker selection strategy ranks workers by remaining available task capacity
          log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
        } else {
          strategy = workerConfig.getSelectStrategy();
        }
     
        ZkWorker assignedWorker = null;
        final ImmutableWorkerInfo immutableZkWorker;
        try {
          synchronized (workersWithUnacknowledgedTask) {
            immutableZkWorker = strategy.findWorkerForTask(// pick a worker
                config,
                ImmutableMap.copyOf(
                    Maps.transformEntries(
                        Maps.filterEntries(
                            zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
                            {
                              @Override
                              public boolean apply(Map.Entry<String, ZkWorker> input)
                              {
                                return !lazyWorkers.containsKey(input.getKey()) &&
                                       !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
                                       !blackListedWorkers.contains(input.getValue());
                              }
                            }
                        ),
                        new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
                        {
                          @Override
                          public ImmutableWorkerInfo transformEntry(
                              String key, ZkWorker value
                          )
                          {
                            return value.toImmutable();
                          }
                        }
                    )
                ),
                task
            );
     
            if (immutableZkWorker != null &&
                workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())
                  == null) {
              assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
            }
          }
     
          if (assignedWorker != null) {
            return announceTask(task, assignedWorker, taskRunnerWorkItem);// assign the task to the worker via ZooKeeper
          } else {
            log.debug(
                "Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
                task.getId(),
                zkWorkers.values(),
                workersWithUnacknowledgedTask
            );
          }
     
          return false;
        }
        finally {
          if (assignedWorker != null) {
            workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
            //if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
            runPendingTasks();
          }
        }
      }
    }
    

    Assigning the task to a worker via ZooKeeper: announceTask(task, assignedWorker, taskRunnerWorkItem)
    Its main job is to create a new znode /druid/indexer/tasks/worker/taskId in ZooKeeper.
    It synchronizes on statusLock and waits until the task has been accepted by the worker before considering the assignment successful;
    only after the current task's assignment completes does it move on to assigning the next one.

    /**
     * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
     * removing the task ZK entry and creating a task status ZK entry.
     *
     * @param theZkWorker        The worker the task is assigned to
     * @param taskRunnerWorkItem The task to be assigned
     *
     * @return boolean indicating whether the task was successfully assigned or not
     */
    private boolean announceTask(
        final Task task,
        final ZkWorker theZkWorker,
        final RemoteTaskRunnerWorkItem taskRunnerWorkItem
    ) throws Exception
    {
      Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
      final String worker = theZkWorker.getWorker().getHost();
      synchronized (statusLock) {
        if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
          // the worker might have been killed or marked as lazy
          log.info("Not assigning task to already removed worker[%s]", worker);
          return false;
        }
        log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());
     
        CuratorUtils.createIfNotExists(// create the znode /druid/indexer/tasks/worker/taskId
            cf,
            JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
            CreateMode.EPHEMERAL,
            jsonMapper.writeValueAsBytes(task),
            config.getMaxZnodeBytes()
        );
     
        RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());// remove the pending task from pendingTasks
        if (workItem == null) {
          log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
             .addData("taskId", task.getId())
             .emit();
          return false;
        }
     
        RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
        runningTasks.put(task.getId(), newWorkItem);// add it to runningTasks
        log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
        TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
     
        // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
        // on a worker - this avoids overflowing a worker with tasks
        Stopwatch timeoutStopwatch = Stopwatch.createStarted();
        while (!isWorkerRunningTask(theZkWorker, task.getId())) {// important: statusLock is held here, and we wait (bounded by the assignment timeout) until the worker has accepted the task before treating the assignment as successful
          final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
          statusLock.wait(waitMs);
          long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
          if (elapsed >= waitMs) {
            log.makeAlert(
                "Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
                worker,
                task.getId(),
                elapsed,
                config.getTaskAssignmentTimeout()
            ).emit();
            taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
            break;
          }
        }
        return true;
      }
    }
    

    Selecting a worker node
    If there is a dedicated affinity configuration specifying that certain datasources should run on certain worker nodes, those datasources are preferentially assigned to those nodes.
    This is configured through the dynamic worker config (for example via the JS strategy mentioned earlier).
    If the specified worker nodes have no capacity available to run the task, a worker node with no explicitly assigned datasource is used instead.

     
    /**
     * Helper for {@link WorkerSelectStrategy} implementations.
     *
     * @param allWorkers     map of all workers, in the style provided to {@link WorkerSelectStrategy}
     * @param affinityConfig affinity config, or null
     * @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run
     *                       the task, and worker satisfies the affinity config. may return null.
     *
     * @return selected worker from "allWorkers", or null.
     */
    @Nullable
    public static ImmutableWorkerInfo selectWorker(
        final Task task,
        final Map<String, ImmutableWorkerInfo> allWorkers,
        final WorkerTaskRunnerConfig workerTaskRunnerConfig,
        @Nullable final AffinityConfig affinityConfig,
        final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
    )
    {
      // Workers that could potentially run this task, ignoring affinityConfig.
      final Map<String, ImmutableWorkerInfo> runnableWorkers = allWorkers
          .values()
          .stream()
          .filter(worker -> worker.canRunTask(task)
                            && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
          .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
     
      if (affinityConfig == null) {
        // All runnable workers are valid.
        return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers));// no affinity config: apply the worker selection strategy directly
      } else {
        // Workers assigned to the affinity pool for our task.
        final Set<String> dataSourceWorkers = affinityConfig.getAffinity().get(task.getDataSource());
     
        if (dataSourceWorkers == null) {
          // No affinity config for this dataSource; use non-affinity workers.
          return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));
        } else {// the affinity config pins certain datasources to certain workers; prefer assigning to those nodes
          // Get runnable, affinity workers.
          final ImmutableMap<String, ImmutableWorkerInfo> dataSourceWorkerMap =
              ImmutableMap.copyOf(Maps.filterKeys(runnableWorkers, dataSourceWorkers::contains));
     
          final ImmutableWorkerInfo selected = workerSelector.apply(dataSourceWorkerMap);
     
          if (selected != null) {
            return selected;
          } else if (affinityConfig.isStrong()) {
            return null;
          } else {
            // Weak affinity allows us to use nonAffinityWorkers for this dataSource, if no affinity workers
            // are available.
            return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));// if none of the designated workers can take the task, fall back to workers with no designated datasource
          }
        }
      }
    }
    

    In practice, the equal-distribution strategy is normally used.
    EqualDistributionWorkerSelectStrategy sorts all worker nodes by their remaining available capacity and picks the node with the most capacity left.

    public ImmutableWorkerInfo findWorkerForTask(
        final WorkerTaskRunnerConfig config,
        final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
        final Task task
    )
    {
      return WorkerSelectUtils.selectWorker(
          task,
          zkWorkers,
          config,
          affinityConfig,
          EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
      );
    }
     
    private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
    {
      return eligibleWorkers.values().stream().max(
          Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity)
      ).orElse(null);
    }
    

    Worker nodes receive tasks and update task status via ZooKeeper

    WorkerCuratorCoordinator creates znodes and advertises the currently available worker

    • Creates the persistent znode /druid/indexer/tasks/workerhost (task node), used to receive tasks assigned by the Overlord
    • Creates the persistent znode /druid/indexer/status/workerhost (task status node); the worker updates it, and the Overlord watches it to receive task status changes
    • The Announcer creates /druid/indexer/announcements/workerhost (worker node) to declare the worker as available (a minimal sketch of this znode layout follows this list)
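    As a rough illustration of this znode layout only (the real WorkerCuratorCoordinator goes through Druid's Announcer and CuratorUtils, handles retries, and serializes a Worker object as the payload; the ZooKeeper address, worker host, and JSON payload below are assumptions):

    import java.nio.charset.StandardCharsets;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class WorkerZnodeSketch
    {
      public static void main(String[] args) throws Exception
      {
        final String workerHost = "middlemanager-01:8091";   // hypothetical worker host:port
        final CuratorFramework cf = CuratorFrameworkFactory.newClient(
            "localhost:2181",                                 // hypothetical ZK connect string
            new ExponentialBackoffRetry(1000, 3)
        );
        cf.start();

        // Persistent task node: the Overlord creates children here to assign tasks to this worker.
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
          .forPath("/druid/indexer/tasks/" + workerHost);

        // Persistent status node: the worker writes task status children here; the Overlord watches them.
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
          .forPath("/druid/indexer/status/" + workerHost);

        // Ephemeral announcement node: declares this worker as alive and available. In the real system
        // the payload is a JSON-serialized Worker (host, capacity, version, ...).
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
          .forPath(
              "/druid/indexer/announcements/" + workerHost,
              "{\"host\":\"middlemanager-01:8091\",\"capacity\":3}".getBytes(StandardCharsets.UTF_8)
          );
      }
    }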

    WorkerTaskMonitor listens for newly assigned tasks

    It watches for new child nodes under the znode /druid/indexer/tasks/workerhost.
    The node content is deserialized into a Task object and assignTask(Task task) is called: ForkingTaskRunner.run is invoked asynchronously to run the task, and the task is added to the running queue.
    At that point the assignment is considered successful: the status znode /druid/indexer/status/workerhost is updated to RUNNING, and the task znode under /druid/indexer/tasks/workerhost is deleted.

    public void assignTask(Task task)
    {
      Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started");
     
      synchronized (lock) {
        if (assignedTasks.containsKey(task.getId())
            || runningTasks.containsKey(task.getId())
            || completedTasks.containsKey(task.getId())) {
          log.info("Assign task[%s] request ignored because it exists already.", task.getId());
          return;
        }
     
        try {
          jsonMapper.writeValue(new File(getAssignedTaskDir(), task.getId()), task);
          assignedTasks.put(task.getId(), task);
        }
        catch (IOException ex) {
          log.error(ex, "Error while trying to persist assigned task[%s]", task.getId());
          throw new ISE("Assign Task[%s] Request failed because [%s].", task.getId(), ex.getMessage());
        }
     
        changeHistory.addChangeRequest(// record the change history
            new WorkerHistoryItem.TaskUpdate(
                TaskAnnouncement.create(
                    task,
                    TaskStatus.running(task.getId()),
                    TaskLocation.unknown()
                )
            )
        );
      }
     
      submitNoticeToExec(new RunNotice(task));// submit the task for execution
    }
    private class RunNotice implements Notice
    {
      private final Task task;
     
      public RunNotice(Task task)
      {
        this.task = task;
      }
     
      @Override
      public String getTaskId()
      {
        return task.getId();
      }
     
      @Override
      public void handle() throws Exception
      {
        TaskAnnouncement announcement = null;
        synchronized (lock) {
          if (runningTasks.containsKey(task.getId()) || completedTasks.containsKey(task.getId())) {
            log.warn(
                "Got run notice for task [%s] that I am already running or completed...",
                task.getId()
            );
     
            taskStarted(task.getId());
            return;
          }
     
          final ListenableFuture<TaskStatus> future = taskRunner.run(task);// asynchronously run the task via ForkingTaskRunner.run
          addRunningTask(task, future);// add the task to the running queue
     
          announcement = TaskAnnouncement.create(
              task,
              TaskStatus.running(task.getId()),
              TaskLocation.unknown()
          );
     
          changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(announcement));
     
          cleanupAssignedTask(task);
          log.info("Task[%s] started.", task.getId());
        }
     
        taskAnnouncementChanged(announcement);// update the status znode /druid/indexer/status/workerhost to RUNNING
        taskStarted(task.getId());// delete the task znode under /druid/indexer/tasks/workerhost
     
      }
    }
    

    Adding listeners for task Location and Status

    workerTaskMonitor.taskAnnouncementChanged(TaskAnnouncement announcement) is called
    to update the corresponding znode.

     private void registerLocationListener()
    {
      taskRunner.registerListener(
          new TaskRunnerListener()
          {
            @Override
            public String getListenerId()
            {
              return "WorkerTaskManager";
            }
     
            @Override
            public void locationChanged(final String taskId, final TaskLocation newLocation)
            {
              submitNoticeToExec(new LocationNotice(taskId, newLocation));
            }
     
            @Override
            public void statusChanged(final String taskId, final TaskStatus status)
            {
              // do nothing
            }
          },
          MoreExecutors.sameThreadExecutor()
      );
    }
     
    private void addRunningTask(final Task task, final ListenableFuture<TaskStatus> future)
    {
      runningTasks.put(task.getId(), new TaskDetails(task));
      Futures.addCallback(
          future,
          new FutureCallback<TaskStatus>()
          {
            @Override
            public void onSuccess(TaskStatus result)
            {
              submitNoticeToExec(new StatusNotice(task, result));
            }
     
            @Override
            public void onFailure(Throwable t)
            {
              submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
            }
          }
      );
    }
    

    A scheduled job cleans up completed tasks: scheduleCompletedTasksCleanup()

    private void scheduleCompletedTasksCleanup()
    {
      completedTasksCleanupExecutor.scheduleAtFixedRate(
          () -> {
            try {
              if (completedTasks.isEmpty()) {
                log.debug("Skipping completed tasks cleanup. Its empty.");
                return;
              }
     
              ImmutableSet<String> taskIds = ImmutableSet.copyOf(completedTasks.keySet());
              Map<String, TaskStatus> taskStatusesFromOverlord = null;
     
              try {
                FullResponseHolder fullResponseHolder = overlordClient.go(// DruidLeaderClient resolves the Overlord leader node
                    overlordClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
                                  .setContent(jsonMapper.writeValueAsBytes(taskIds))
                                  .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
                                  .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
     
                );
                if (fullResponseHolder.getStatus().getCode() == 200) {
                  String responseContent = fullResponseHolder.getContent();// fetch completed task statuses from the Overlord
                  taskStatusesFromOverlord = jsonMapper.readValue(responseContent, new TypeReference<Map<String, TaskStatus>>()
                  {
                  });
                  log.debug("Received completed task status response [%s].", responseContent);
                } else if (fullResponseHolder.getStatus().getCode() == 404) {
                  // NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
                  // this if clause should be removed in a future release.
                  log.debug("Deleting all completed tasks. Overlord appears to be running on older version.");
                  taskStatusesFromOverlord = ImmutableMap.of();
                } else {
                  log.info(
                      "Got non-success code[%s] from overlord while getting active tasks. will retry on next scheduled run.",
                      fullResponseHolder.getStatus().getCode()
                  );
                }
              }
              catch (Exception ex) {
                log.info(ex, "Exception while getting active tasks from overlord. will retry on next scheduled run.");
     
                if (ex instanceof InterruptedException) {
                  Thread.currentThread().interrupt();
                }
              }
     
              if (taskStatusesFromOverlord == null) {
                return;
              }
     
              for (String taskId : taskIds) {
                TaskStatus status = taskStatusesFromOverlord.get(taskId);
                if (status == null || status.isComplete()) {
     
                  log.info(
                      "Deleting completed task[%s] information, overlord task status[%s].",
                      taskId,
                      status == null ? "unknown" : status.getStatusCode()
                  );
     
                  completedTasks.remove(taskId);
                  File taskFile = new File(getCompletedTaskDir(), taskId);
                  try {
                    Files.deleteIfExists(taskFile.toPath());// delete the completed task's temporary file
                    changeHistory.addChangeRequest(new WorkerHistoryItem.TaskRemoval(taskId));
                  }
                  catch (IOException ex) {
                    log.error(ex, "Failed to delete completed task from disk [%s].", taskFile);
                  }
     
                }
              }
            }
            catch (Throwable th) {
              log.error(th, "WTF! Got unknown exception while running the scheduled cleanup.");
            }
          },
          1,
          5,
          TimeUnit.MINUTES
      );
    }
    

    Restoring interrupted tasks after a restart: restoreRestorableTask

    Read the {baseTaskDir}/restore.json file to obtain the IDs of the tasks that need to be restored; in the current cluster, {baseTaskDir} is configured as var/druid/task.
    Read each task's {baseTaskDir}/{taskId}/task.json file and deserialize it into a Task object.

    ForkingTaskRunner.run is then invoked asynchronously to run the task, and the task is added to the running queue.

    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
    {
      final File restoreFile = getRestoreFile();// read the ${baseTaskDir}/task/restore.json file to obtain the IDs of tasks to restore
      final TaskRestoreInfo taskRestoreInfo;
      if (restoreFile.exists()) {
        try {
          taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
        }
        catch (Exception e) {
          log.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
          return ImmutableList.of();
        }
      } else {
        return ImmutableList.of();
      }
     
      final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = Lists.newArrayList();
      for (final String taskId : taskRestoreInfo.getRunningTasks()) {
        try {// read the task's ${baseTaskDir}/task/${taskId}/task.json file and deserialize it into a Task object
          final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json");
          final Task task = jsonMapper.readValue(taskFile, Task.class);
     
          if (!task.getId().equals(taskId)) {
            throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
          }
     
          if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
            log.info("Restoring task[%s].", task.getId());
            retVal.add(Pair.of(task, run(task)));// asynchronously run the task via ForkingTaskRunner.run and add it to the running queue
          }
        }
        catch (Exception e) {
          log.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
        }
      }
     
      log.info("Restored %,d tasks.", retVal.size());
     
      return retVal;
    }
    

    ForkingTaskRunner launches the peon process

    Allocating an HTTP port

    PortFinder starts from port 8100 (per configuration) and searches for an available port, maintaining a list of ports already in use; when a peon process exits, its port is reclaimed.

    It executes new ServerSocket(portNum).close(); if no exception is thrown, the port is considered available; on a BindException it keeps retrying with other ports (a minimal sketch follows below).

    For a real-world case, see "Druid task fails to start because its port is already in use".
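    A minimal sketch of this probing approach (not Druid's actual PortFinder class; the starting port and the used-port bookkeeping are simplified assumptions):

    import java.io.IOException;
    import java.net.BindException;
    import java.net.ServerSocket;
    import java.util.HashSet;
    import java.util.Set;

    // Minimal port-probing sketch: try to bind, treat success as "free" and BindException as "in use".
    public class PortFinderSketch
    {
      private final Set<Integer> usedPorts = new HashSet<>();
      private final int startPort;

      public PortFinderSketch(int startPort)
      {
        this.startPort = startPort;   // Druid probes upward from 8100 by default
      }

      public synchronized int findUnusedPort()
      {
        int port = startPort;
        while (true) {
          if (!usedPorts.contains(port) && canBind(port)) {
            usedPorts.add(port);      // remember it so concurrent peons don't get the same port
            return port;
          }
          port++;
        }
      }

      public synchronized void markPortUnused(int port)
      {
        usedPorts.remove(port);       // reclaimed when the peon process exits
      }

      private static boolean canBind(int portNum)
      {
        try {
          new ServerSocket(portNum).close();   // the probe described above
          return true;
        }
        catch (BindException e) {
          return false;                        // port already bound; try the next one
        }
        catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }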

    Task launch parameter configuration

    The classpath is inherited from the worker process's classpath.

    druid.indexer.runner.javaOpts (configured on the worker)

    "context" : {

    "druid.indexer.runner.javaOpts" : "-Xms1024m  -Xmx2048m  -XX:MaxDirectMemorySize=5g",   (per-task override)

    }

    Key-value pairs from the configuration file (passed as -D%s=%s)

    Key-value pairs from the configuration file starting with druid.indexer.fork.property. (passed as -D%s=%s, overriding parameters with the same key above)

    "context" : {

    "druid.indexer.fork.property.druid.query.groupBy.maxOnDiskStorage" : 4294967296   (per-task override of the same key)

    }

    Host, port, and other parameters

    io.druid.cli.Main internal peon as the launch class

    The paths of the taskFile, statusFile, and reportsFile

    ProcessBuilder launches the peon process

    The process is started with new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start().

    int statusCode = processHolder.process.waitFor(); the current thread blocks until the process exits, and a return value of 0 means the task succeeded.

    When a task needs to be killed, process.destroy() is called to terminate the process. This cannot forcibly terminate a task that has already hit an OOM: Java's Process.destroy() fails to kill an OOMed task process, so the task never finishes and keeps occupying resources on the MiddleManager node (a sketch of the launch-and-wait pattern follows).
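    A minimal, self-contained sketch of this launch-and-wait pattern (the command line below is a hypothetical, heavily trimmed stand-in for the real peon command that ForkingTaskRunner builds):

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import com.google.common.collect.ImmutableList;

    public class PeonLauncherSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical, trimmed-down peon command; the real one also carries the inherited classpath,
        // javaOpts, -D properties, host/port, and the taskFile/statusFile/reportsFile paths.
        final List<String> command = ImmutableList.of(
            "java",
            "-Xms1024m", "-Xmx2048m",
            "-Ddruid.indexer.task.baseTaskDir=var/druid/task",
            "io.druid.cli.Main", "internal", "peon",
            "var/druid/task/some_task_id/task.json",
            "var/druid/task/some_task_id/status.json"
        );

        final Process process = new ProcessBuilder(ImmutableList.copyOf(command))
            .redirectErrorStream(true)   // merge stderr into stdout, as ForkingTaskRunner does
            .start();

        // waitFor() blocks the calling thread until the peon exits; 0 means the task succeeded.
        final int statusCode = process.waitFor();
        System.out.println("Peon exited with status " + statusCode);

        // Shutdown path: destroy() sends a normal termination signal (a no-op here since the process
        // already exited). As noted above, a peon stuck in OOM may ignore it, which is why such tasks
        // can hang on the MiddleManager.
        process.destroy();
        process.waitFor(10, TimeUnit.SECONDS);
      }
    }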

    Task log trace

    Overlord logs when a MiddleManager node suddenly went down:

    2019-05-21T03:01:14,381 INFO [qtp631349266-164] io.druid.indexing.overlord.MetadataTaskStorage - Inserting task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 with status: TaskStatus{id=index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1, status=RUNNING, duration=-1, errorMsg=null}
    2019-05-21T03:01:14,478 INFO [qtp631349266-164] io.druid.indexing.overlord.TaskLockbox - Adding task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] to activeTasks
    2019-05-21T03:01:14,479 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
    2019-05-21T03:01:14,479 INFO [TaskQueue-Manager] io.druid.indexing.overlord.RemoteTaskRunner - Added pending task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
    2019-05-21T03:01:15,107 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.RemoteTaskRunner - Coordinator asking Worker[st3-mdrd-41.prod.yiran.com:8091] to add task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]
    2019-05-21T03:01:15,113 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.RemoteTaskRunner - Task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 switched from pending to running (on [st3-mdrd-41.prod.yiran.com:8091])
    2019-05-21T03:01:15,113 INFO [rtr-pending-tasks-runner-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] status changed to [RUNNING].
    2019-05-21T03:01:15,128 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[st3-mdrd-41.prod.yiran.com:8091] wrote RUNNING status for task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] on [TaskLocation{host='null', port=-1, tlsPort=-1}]
    2019-05-21T03:01:15,133 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[st3-mdrd-41.prod.yiran.com:8091] wrote RUNNING status for task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] on [TaskLocation{host='st3-mdrd-41.prod.yiran.com', port=8106, tlsPort=-1}]
    2019-05-21T03:01:15,133 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] location changed to [TaskLocation{host='st3-mdrd-41.prod.yiran.com', port=8106, tlsPort=-1}].
    2019-05-21T03:01:27,823 INFO [qtp631349266-144] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]: LockAcquireAction{lockType=EXCLUSIVE, interval=2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z, timeoutMs=300000}
    2019-05-21T03:01:28,310 INFO [qtp631349266-144] io.druid.indexing.overlord.TaskLockbox - Added task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] to TaskLock[index_realtime_mktdm_msg_push]
    2019-05-21T03:01:28,310 INFO [qtp631349266-144] io.druid.indexing.overlord.MetadataTaskStorage - Adding lock on interval[2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z] version[2019-05-20T19:01:20.769Z] for task: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
    2019-05-21T03:01:28,515 INFO [qtp631349266-174] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]: LockAcquireAction{lockType=EXCLUSIVE, interval=2019-05-20T19:00:00.000Z/2019-05-20T20:00:00.000Z, timeoutMs=300000}
    2019-05-21T03:01:28,704 INFO [qtp631349266-174] io.druid.indexing.overlord.TaskLockbox - Task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] already present in TaskLock[index_realtime_mktdm_msg_push]
    2019-05-21T03:36:24,982 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.RemoteTaskRunner - [st3-mdrd-41.prod.yiran.com:8091]: Found [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] running
    2019-05-21T03:51:25,105 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.RemoteTaskRunner - Failing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1]
    2019-05-21T03:51:25,105 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Received FAILED status for task: index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
    2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.RemoteTaskRunner - Can't shutdown! No worker running task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1
    2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskLockbox - Removing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] from activeTasks
    2019-05-21T03:51:25,108 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskLockbox - Removing task[index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] from TaskLock[index_realtime_mktdm_msg_push]
    2019-05-21T03:51:25,129 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.MetadataTaskStorage - Updating task index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1 to status: TaskStatus{id=index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1, status=FAILED, duration=-1, errorMsg=null}
    2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Task done: AbstractTask{id='index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1', groupId='index_realtime_mktdm_msg_push', taskResource=TaskResource{availabilityGroup='mktdm_msg_push-2019-05-20T19:00:00.000Z-0028', requiredCapacity=1}, dataSource='mktdm_msg_push', context={}}
    2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskQueue - Task FAILED: AbstractTask{id='index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1', groupId='index_realtime_mktdm_msg_push', taskResource=TaskResource{availabilityGroup='mktdm_msg_push-2019-05-20T19:00:00.000Z-0028', requiredCapacity=1}, dataSource='mktdm_msg_push', context={}} (-1 run duration)
    2019-05-21T03:51:25,137 INFO [RemoteTaskRunner-Scheduled-Cleanup--0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_mktdm_msg_push_2019-05-20T19:00:00.000Z_28_1] status changed to [FAILED].
    
    

    Summary of the task assignment process
    The Overlord receives tasks, syncs task status to the database, and assigns tasks to workers with the help of ZooKeeper.

    Points worth noting:

    The launch parameters of a single peon process can be set per task, i.e. each task can have its own launch parameters.
    The task's HTTP port is allocated on the worker, so port conflicts are possible.
    A task that has OOMed cannot be removed by calling the task's shutdown endpoint.
    The Overlord supports dynamic configuration, which can be used to pin a datasource to specific workers.
    After a task OOMs it can hang indefinitely: its status in ZooKeeper remains RUNNING and the Overlord believes it is still running. Tranquility therefore cannot detect the task's real state in time; data sent to the OOMed task keeps failing and is retried for a long time, so messages pile up and cannot be delivered promptly to the healthy replica task.
