
Realtime task: how the Peon process creates segments

Author: sydt2011 | Published 2019-12-14 15:38

    Overview of the segment creation process

    The Peon process is started by the MiddleManager process.
    It exposes an HTTP endpoint that receives raw data.
    Incoming rows are merged in memory; when the row count reaches maxRowsInMemory (50,000 in the current cluster) or intermediatePersistPeriod (10 minutes) elapses, the in-memory data is serialized into a segment file and persisted to local disk (see the sketch below).
    After the time window closes, the task stops accepting data and merges the segments created above into one large segment.
    The newly created segment is handed off to historical nodes via HDFS.
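
    To make the persist trigger concrete, here is a minimal sketch of the decision the ingestion loop makes. The IngestBuffer class and its names are hypothetical; the real logic lives in RealtimePlumber.add() and Sink.canAppendRow(), shown later in this article.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical illustration of the maxRowsInMemory / intermediatePersistPeriod trigger.
    class IngestBuffer
    {
      private static final int MAX_ROWS_IN_MEMORY = 50_000;                        // current cluster setting
      private static final long INTERMEDIATE_PERSIST_PERIOD_MS = 10 * 60 * 1000L;  // 10 minutes

      private final List<String> rows = new ArrayList<>();
      private long lastPersistTime = System.currentTimeMillis();

      void add(String row)
      {
        rows.add(row);
        if (rows.size() >= MAX_ROWS_IN_MEMORY
            || System.currentTimeMillis() - lastPersistTime > INTERMEDIATE_PERSIST_PERIOD_MS) {
          persistToDisk();
        }
      }

      private void persistToDisk()
      {
        // In Druid this step serializes the in-memory index into an intermediate segment file on local disk.
        rows.clear();
        lastPersistTime = System.currentTimeMillis();
      }
    }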

    Starting the Peon process

    After the MiddleManager receives a task assigned by the Overlord, it creates a thread that builds and runs the JVM command for the Peon: classpath, heap and off-heap memory settings, the Druid port (used by the Peon to serve queries externally; the way this port is allocated caused a port-already-in-use bug), the task JSON configuration, and other preparation.
    After launching the JVM command, the thread blocks until the task completes.
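
    As a minimal sketch (not the actual MiddleManager code) of "build the JVM command and wait for it", the fork can be pictured with java.lang.ProcessBuilder. The classpath, memory flags, port and file paths below are placeholders, and the main-class arguments depend on the Druid version.

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;

    public class ForkPeonExample
    {
      public static void main(String[] args) throws Exception
      {
        // Placeholder command line; the real MiddleManager assembles it from its configuration.
        List<String> command = Arrays.asList(
            "java",
            "-cp", "/placeholder/druid/lib/*",
            "-Xms1g", "-Xmx1g", "-XX:MaxDirectMemorySize=2g",
            "-Ddruid.port=8100",
            "org.apache.druid.cli.Main", "internal", "peon",
            "/placeholder/task/task.json", "/placeholder/task/status.json"
        );

        Process peon = new ProcessBuilder(command)
            .redirectErrorStream(true)
            .redirectOutput(new File("/tmp/peon.log"))
            .start();

        // The forking thread simply blocks until the peon JVM exits.
        int exitCode = peon.waitFor();
        System.out.println("Peon exited with code " + exitCode);
      }
    }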
    The import-related startup steps of the Peon process are:

    • Create the HTTP endpoint that receives raw data; the current cluster is configured with EventReceiverFirehoseFactory.
    • Create and start ExecutorLifecycle, which builds the RealtimeIndexTask from the task JSON, creates a lock file to prevent the same task from running twice on one machine, and hands the task to ThreadPoolTaskRunner:
    @LifecycleStart
    public void start() throws InterruptedException
    {
      final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile");
      final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile");
      final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream");
     
      try {
        task = jsonMapper.readValue(taskFile, Task.class);
     
        log.info(
            "Running with task: %s",
            jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task)
        );
      }
      catch (IOException e) {
        throw Throwables.propagate(e);
      }
     
      // Avoid running the same task twice on the same machine by locking the task base directory.
     
      final File taskLockFile = taskConfig.getTaskLockFile(task.getId());
     
      try {
        synchronized (this) {
          if (taskLockChannel == null && taskLockFileLock == null) {
            taskLockChannel = FileChannel.open(
                taskLockFile.toPath(),
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE
            );
     
            log.info("Attempting to lock file[%s].", taskLockFile);
            final long startLocking = System.currentTimeMillis();
            final long timeout = DateTimes.utc(startLocking).plus(taskConfig.getDirectoryLockTimeout()).getMillis();
            while (taskLockFileLock == null && System.currentTimeMillis() < timeout) {
              taskLockFileLock = taskLockChannel.tryLock();
              if (taskLockFileLock == null) {
                Thread.sleep(100);
              }
            }
     
            if (taskLockFileLock == null) {
              throw new ISE("Could not acquire lock file[%s] within %,dms.", taskLockFile, timeout - startLocking);
            } else {
              log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking);
            }
          } else {
            throw new ISE("Already started!");
          }
        }
      }
      catch (IOException e) {
        throw Throwables.propagate(e);
      }
     
      if (taskExecutorConfig.isParentStreamDefined()) {
        // Spawn monitor thread to keep a watch on parent's stdin
        // If stdin reaches eof, the parent is gone, and we should shut down
        parentMonitorExec.submit(
            new Runnable()
            {
              @Override
              public void run()
              {
                try {
                  while (parentStream.read() != -1) {
                    // Toss the byte
                  }
                }
                catch (Exception e) {
                  log.error(e, "Failed to read from stdin");
                }
     
                // Kind of gross, but best way to kill the JVM as far as I know
                log.info("Triggering JVM shutdown.");
                System.exit(2);
              }
            }
        );
      }
     
      // Won't hurt in remote mode, and is required for setting up locks in local mode:
      try {
        if (!task.isReady(taskActionClientFactory.create(task))) {
          throw new ISE("Task[%s] is not ready to run yet!", task.getId());
        }
      }
      catch (Exception e) {
        throw new ISE(e, "Failed to run task[%s] isReady", task.getId());
      }
     
      statusFuture = Futures.transform(
          taskRunner.run(task),
          new Function<TaskStatus, TaskStatus>()
          {
            @Override
            public TaskStatus apply(TaskStatus taskStatus)
            {
              try {
                log.info(
                    "Task completed with status: %s",
                    jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
                );
     
                final File statusFileParent = statusFile.getParentFile();
                if (statusFileParent != null) {
                  statusFileParent.mkdirs();
                }
                jsonMapper.writeValue(statusFile, taskStatus);
     
                return taskStatus;
              }
              catch (Exception e) {
                throw Throwables.propagate(e);
              }
            }
          }
      );
    }
    

    ThreadPoolTaskRunner

    (Figure: runner.png)
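
    The original figure is not reproduced here. The core idea of ThreadPoolTaskRunner is that run() submits the task to an executor and immediately returns a ListenableFuture of the final status, which is what the Futures.transform() call above consumes. Below is a heavily simplified sketch with placeholder types, not the actual Druid class:

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.Executors;

    // Simplified stand-ins for Druid's Task / TaskStatus types.
    interface SimpleTask
    {
      String run() throws Exception; // e.g. "SUCCESS" or "FAILED"
    }

    class SimpleThreadPoolTaskRunner
    {
      private final ListeningExecutorService exec =
          MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

      // Asynchronous: returns immediately with a future of the final status.
      ListenableFuture<String> run(SimpleTask task)
      {
        return exec.submit(task::run);
      }

      void shutdown()
      {
        exec.shutdown();
      }
    }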

    Data ingestion

    After the Peon process starts, the task itself is executed in RealtimeIndexTask.run().

    • RealtimePlumber.startJob() performs the task's initialization: it loads any existing intermediate segment files from disk and starts the persist, merge, and segment push (hand-off) threads.
    • EventReceiverFirehose receives data rows, and OnheapIncrementalIndex builds the incremental index.

    Starting the task process

    Ingesting data into OnheapIncrementalIndex

    EventReceiverFirehose exposes the HTTP endpoint /push-events and receives batches of data submitted by Tranquility.
    EventReceiverFirehose handles the data as follows:
    it parses the received events (a parse error fails the request immediately) and puts the parsed rows into a blocking queue.
    The queue size defaults to 100,000; if the task thread does not drain the queue fast enough, the endpoint blocks.

    @POST
    @Path("/push-events")
    @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
    @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
    public Response addAll(
        InputStream in,
        @Context final HttpServletRequest req
    )
    {
      // Determine the request/response content types from the request headers.
      final String reqContentType = req.getContentType();
      final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
      final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;

      ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
     
      Optional<Response> producerSequenceResponse = checkProducerSequence(req, reqContentType, objectMapper);
      if (producerSequenceResponse.isPresent()) {
        return producerSequenceResponse.get();
      }
     
      CountingInputStream countingInputStream = new CountingInputStream(in);
      Collection<Map<String, Object>> events = null;
      try {
        events = objectMapper.readValue(
            countingInputStream, new TypeReference<Collection<Map<String, Object>>>()
            {
            }
        );
      }
      catch (IOException e) {
        return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
      }
      finally {
        bytesReceived.addAndGet(countingInputStream.getCount());
      }
      log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
     
      final List<InputRow> rows = Lists.newArrayList();
      for (final Map<String, Object> event : events) {
        // Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
        rows.addAll(parser.parseBatch(event));
      }
     
      try {
        addRows(rows);
        return Response.ok(
            objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
            contentType
        ).build();
      }
      catch (InterruptedException e) {
        // ... handling of InterruptedException elided
      }
    }
      
    public void addRows(Iterable<InputRow> rows) throws InterruptedException
    {
      for (final InputRow row : rows) {
        boolean added = false;
        while (!closed && !added) {
          added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
          if (!added) {
            long currTime = System.currentTimeMillis();
            long lastTime = lastBufferAddFailMsgTime.get();
            if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
              log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
            }
          }
        }
     
        if (!added) {
          throw new IllegalStateException("Cannot add events to closed firehose!");
        }
      }
    }
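
    For completeness, a minimal client-side sketch of pushing a batch to /push-events is shown below. The host, port and service name are hypothetical, and the /druid/worker/v1/chat/<serviceName> prefix is an assumption about how the chat handler is registered; only the /push-events path comes from the resource above.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class PushEventsExample
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical peon host/port and service name.
        URL url = new URL("http://peon-host:8100/druid/worker/v1/chat/example-service/push-events");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // A batch is a JSON array of event maps, matching Collection<Map<String, Object>> on the server side.
        String batch = "[{\"timestamp\":\"2019-12-14T15:00:00Z\",\"country\":\"CN\",\"clicks\":1}]";
        try (OutputStream out = conn.getOutputStream()) {
          out.write(batch.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("HTTP " + conn.getResponseCode()); // a successful call returns {"eventCount":1}
      }
    }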
    

    RealtimeIndexTask holds the firehose instance described above, drains its buffer queue, and hands the rows to RealtimePlumber:
    1. Look up the sink by the row's timestamp. Each interval maps to one sink, and a Peon process usually has just one sink;
    a sink contains multiple FireHydrants, one of which both serves queries and receives newly ingested data, while the rest only serve queries.
    2. Add the row to the sink's current FireHydrant, whose OnheapIncrementalIndex builds the incremental index.
    3. Check the number of rows already in the current FireHydrant; if it reaches the configured maxRowsInMemory, or the elapsed time exceeds the configured intermediatePersistPeriod, persist the current FireHydrant's data to disk.

    public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
    {
      long messageTimestamp = row.getTimestampFromEpoch();
      final Sink sink = getSink(messageTimestamp);
      metrics.reportMessageMaxTimestamp(messageTimestamp);
      if (sink == null) {
        return -1;
      }
     
      final IncrementalIndexAddResult addResult = sink.add(row, false);
      if (config.isReportParseExceptions() && addResult.getParseException() != null) {
        throw addResult.getParseException();
      }
     
      if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
        persist(committerSupplier.get());
      }
     
      return addResult.getRowCount();
    }
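
    As a rough illustration of step 1 above (one sink per interval, looked up by the row's timestamp), the sketch below truncates the timestamp to the segment granularity and keeps one sink per interval start. SimpleSink and SinkLookup are placeholders, the granularity is assumed to be HOUR, and the real getSink() additionally returns null for rows that fall outside the window period.

    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    import java.util.HashMap;
    import java.util.Map;

    // Placeholder sink: in Druid, a Sink holds the FireHydrants for one interval.
    class SimpleSink
    {
      final Instant intervalStart;

      SimpleSink(Instant intervalStart)
      {
        this.intervalStart = intervalStart;
      }
    }

    class SinkLookup
    {
      // One sink per interval start; with hourly segmentGranularity a peon usually holds a single entry.
      private final Map<Instant, SimpleSink> sinks = new HashMap<>();

      SimpleSink getSink(long timestampMillis)
      {
        // Truncate the row timestamp to the segment granularity (assumed HOUR here).
        Instant intervalStart = Instant.ofEpochMilli(timestampMillis).truncatedTo(ChronoUnit.HOURS);
        return sinks.computeIfAbsent(intervalStart, SimpleSink::new);
      }
    }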
    

    A sink holds the data of the segment that will eventually be produced. Because a segment is large and cannot be kept entirely in memory, data is persisted to disk whenever a threshold is reached during segment creation, leaving a series of small intermediate segment files, each represented by a FireHydrant. A FireHydrant whose data has already been persisted to disk only serves queries, while the sink's current FireHydrant both serves queries and keeps accepting new rows.
    If no sink exists for the interval containing the row's timestamp, a new sink object is created, and creating the sink also creates its current FireHydrant:

    private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
    {
      final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
          .withMinTimestamp(minTimestamp)
          .withTimestampSpec(schema.getParser())
          .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
          .withDimensionsSpec(schema.getParser())
          .withMetrics(schema.getAggregators())
          .withRollup(schema.getGranularitySpec().isRollup())
          .build();
      // Create the OnheapIncrementalIndex
      final IncrementalIndex newIndex = new IncrementalIndex.Builder()
          .setIndexSchema(indexSchema)
          .setReportParseExceptions(reportParseExceptions)
          .setMaxRowCount(maxRowsInMemory)
          .buildOnheap();
     
      final FireHydrant old;
      synchronized (hydrantLock) {
        if (writable) {
          old = currHydrant;
          int newCount = 0;
          int numHydrants = hydrants.size();
          if (numHydrants > 0) {
            FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
            newCount = lastHydrant.getCount() + 1;
            if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) {
              Map<String, ColumnCapabilitiesImpl> oldCapabilities;
              if (lastHydrant.hasSwapped()) {
                oldCapabilities = Maps.newHashMap();
                ReferenceCountingSegment segment = lastHydrant.getIncrementedSegment();
                try {
                  QueryableIndex oldIndex = segment.asQueryableIndex();
                  for (String dim : oldIndex.getAvailableDimensions()) {
                    dimOrder.add(dim);
                    oldCapabilities.put(dim, (ColumnCapabilitiesImpl) oldIndex.getColumn(dim).getCapabilities());
                  }
                }
                finally {
                  segment.decrement();
                }
              } else {
                IncrementalIndex oldIndex = lastHydrant.getIndex();
                dimOrder.addAll(oldIndex.getDimensionOrder());
                oldCapabilities = oldIndex.getColumnCapabilities();
              }
              newIndex.loadDimensionIterable(dimOrder, oldCapabilities);
            }
          }
          currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier());
          if (old != null) {
            numRowsExcludingCurrIndex.addAndGet(old.getIndex().size());
          }
          // Add the newly created FireHydrant to the sink
          hydrants.add(currHydrant);
        } else {
          // Oops, someone called finishWriting while we were making this new index.
          newIndex.close();
          throw new ISE("finishWriting() called during swap");
        }
      }
     
      return old;
    }
    

    Row insertion ultimately happens in OnheapIncrementalIndex.addToFacts().
    OnheapIncrementalIndex can be thought of as a map keyed by the dimension columns plus the time column (TimeAndDims), with the metric columns as values. When a new row arrives, the key (TimeAndDims) determines the corresponding Aggregator objects; a null dimension value is treated as a distinct value in the dimension dictionary encoding, and the Aggregators accumulate the row's metric values.
    During aggregation, the metric values of each row are read through a ColumnSelectorFactory. Unlike query time, where column values are read through a cursor, here the row is passed via a ThreadLocal: before each aggregation, IncrementalIndex puts the InputRow into the ThreadLocal.
    Presumably this design, rather than passing the metric values into the aggregators directly, exists because metric values need type conversion and other transformations before aggregation, and that logic lives in the ColumnSelector. IncrementalIndex then only has to place the row into the ThreadLocal, and the Aggregator only has to read the values to aggregate from the ColumnSelector.
    RollupFactsHolder (a skip list) records the mapping dimension values -> row index, and the aggregators table records row index -> array of metric accumulators.

    protected AddToFactsResult addToFacts(
        AggregatorFactory[] metrics,
        boolean deserializeComplexMetrics,
        boolean reportParseExceptions,
        InputRow row,
        AtomicInteger numEntries,
        TimeAndDims key,
        ThreadLocal<InputRow> rowContainer,
        Supplier<InputRow> rowSupplier,
        boolean skipMaxRowsInMemoryCheck
    ) throws IndexSizeExceededException
    {
      List<String> parseExceptionMessages;
      final int priorIndex = facts.getPriorIndex(key);
     
      Aggregator[] aggs;
     
      if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) {
        aggs = concurrentGet(priorIndex);
        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
      } else {
        aggs = new Aggregator[metrics.length];
        factorizeAggs(metrics, aggs, rowContainer, row);
        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
     
        final int rowIndex = indexIncrement.getAndIncrement();
        concurrentSet(rowIndex, aggs);
     
        // Last ditch sanity checks
        if (numEntries.get() >= maxRowCount
            && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX
            && !skipMaxRowsInMemoryCheck) {
          throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
        }
        final int prev = facts.putIfAbsent(key, rowIndex);
        if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
          numEntries.incrementAndGet();
        } else {
          // We lost a race
          aggs = concurrentGet(prev);
          parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
          // Free up the misfire
          concurrentRemove(rowIndex);
          // This is expected to occur ~80% of the time in the worst scenarios
        }
      }
     
      return new AddToFactsResult(numEntries.get(), parseExceptionMessages);
    }
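
    To make the facts/aggregators layout concrete, here is a toy rollup sketch. Plain Java collections stand in for RollupFactsHolder and the aggregator table, and a string key stands in for TimeAndDims; it only demonstrates the mapping described above, not Druid's actual implementation.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class ToyRollup
    {
      // Stands in for RollupFactsHolder: (time bucket + dimensions) -> row index, skip-list backed.
      private final ConcurrentSkipListMap<String, Integer> facts = new ConcurrentSkipListMap<>();
      // Stands in for the aggregator table: row index -> metric accumulators.
      private final List<long[]> aggregators = new ArrayList<>();

      void add(long truncatedTimestamp, String dims, long metricValue)
      {
        String key = truncatedTimestamp + "|" + dims;   // simplified TimeAndDims
        Integer rowIndex = facts.get(key);
        if (rowIndex == null) {
          rowIndex = aggregators.size();
          aggregators.add(new long[]{0});               // one longSum-style aggregator
          facts.put(key, rowIndex);
        }
        aggregators.get(rowIndex)[0] += metricValue;
      }

      public static void main(String[] args)
      {
        ToyRollup index = new ToyRollup();
        // Two rows in the same hour bucket with identical dimensions roll up into a single fact row.
        index.add(3_600_000L, "country=CN", 5);
        index.add(3_600_000L, "country=CN", 7);
        System.out.println(index.facts.size() + " fact row(s), sum=" + index.aggregators.get(0)[0]); // 1 fact row(s), sum=12
      }
    }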
    

    If the number of rows already in the current FireHydrant reaches the configured maxRowsInMemory, or the elapsed time exceeds the configured intermediatePersistPeriod, the current FireHydrant's data is persisted to disk.

    Task hand-off

    (Figure: 任务移交.png — task hand-off)
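
    The hand-off figure is not reproduced here. In outline, the persist/merge/push threads started by RealtimePlumber.startJob() merge the persisted FireHydrants into one segment, push it to deep storage (HDFS in this cluster), and publish its metadata so a historical node can load it, after which the peon can drop its copy. The sketch below only illustrates that ordering; the three interfaces are hypothetical placeholders, not Druid's actual APIs.

    import java.io.File;
    import java.util.List;

    // Hypothetical collaborators standing in for Druid's merger, deep-storage pusher and metadata publisher.
    interface SegmentMerger { File merge(List<File> persistedHydrants); }
    interface DeepStoragePusher { String push(File mergedSegment); }                 // returns e.g. an HDFS path
    interface SegmentPublisher { void publish(String dataSource, String location); }

    class HandOffSketch
    {
      private final SegmentMerger merger;
      private final DeepStoragePusher pusher;
      private final SegmentPublisher publisher;

      HandOffSketch(SegmentMerger merger, DeepStoragePusher pusher, SegmentPublisher publisher)
      {
        this.merger = merger;
        this.pusher = pusher;
        this.publisher = publisher;
      }

      void handOff(String dataSource, List<File> persistedHydrants)
      {
        File merged = merger.merge(persistedHydrants);   // merge intermediate files into one segment
        String location = pusher.push(merged);           // upload to deep storage (HDFS)
        publisher.publish(dataSource, location);         // record metadata; a historical loads it, then the peon drops it
      }
    }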

    Source: https://www.haomeiwen.com/subject/yjicnctx.html