美文网首页
Flume事务详解

Flume事务详解

作者: 吃货大米饭 | 来源:发表于2019-09-29 09:13 被阅读0次

本文基于AvroSource,MemoryChannel,HDFSSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。一般情况下,用MemoryChannel就好了,我们公司用的就是这个,FileChannel速度慢,虽然提供日志级别的数据恢复,但是一般情况下,不断电MemoryChannel是不会丢数据的。

Flume提供事物操作,保证用户的数据的可靠性,主要体现在:

  • 数据在传输到下个节点时(通常是批量数据),如果接收节点出现异常,比如网络异常,则回滚这一批数据。因此有可能导致数据重发
  • 同个节点内,Source写入数据到Channel,数据在一个批次内的数据出现异常,则不写入到Channel。已接收到的部分数据直接抛弃,靠上一个节点重发数据。


编程模型

一个事务的实现是在一个channel的实现中实现的,每一个连接到ChannelSourceSink必须获得一个事务对象。Source使用ChannelProcessor管理事务,Sink的事务管理则通过他们配置的Channel进行管理。Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  // Log exception, handle individual exceptions as needed

  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

Put事务流程

Put事务可以分为以下阶段:

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,抛弃数据,回滚event

我们从Source数据接收到写入Channel这个过程对Put事务进行分析。


AvroSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

public Status appendBatch(List<AvroFlumeEvent> events) {
    logger.debug("Avro source {}: Received avro event batch of {} events.",
        getName(), events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    sourceCounter.addToEventReceivedCount(events.size());

    List<Event> batch = new ArrayList<Event>();

    for (AvroFlumeEvent avroEvent : events) {
      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
          toStringMap(avroEvent.getHeaders()));

      batch.add(event);
    }

    try {
      //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel
      getChannelProcessor().processEventBatch(batch);
    } catch (Throwable t) {
      logger.error("Avro source " + getName() + ": Unable to process event " +
          "batch. Exception follows.", t);
      sourceCounter.incrementChannelWriteFail();
      if (t instanceof Error) {
        throw (Error) t;
      }
      return Status.FAILED;
    }

    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());

    return Status.OK;
  }

事务逻辑都在processEventBatch这个方法里:

public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");
    //预处理每行数据
    events = interceptorChain.intercept(events);

    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    //分类数据,划分不同的channel集合对应的数据
    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch : optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }

    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        //事务开始,tx即MemoryTransaction类实例
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          // 这个put操作实际调用的是transaction.doPut
          reqChannel.put(event);
        }
        //提交,将数据写入Channel的队列中
        tx.commit();
      } catch (Throwable t) {
        //回滚
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " + reqChannel, t);
          throw (Error) t;
        } else if (t instanceof ChannelException) {
          throw (ChannelException) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }

每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal<BasicTransactionSemantics>变量currentTransaction中。
那么,事务到底做了什么?


实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

  • putList
  • takeList
    对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。
    channel.put -> transaction.doPut:
protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      //计算数据字节大小
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      //写入临时缓冲区putList
      if (!putList.offer(event)) {
        throw new ChannelException(
            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

transaction.commit:

protected void doCommit() throws InterruptedException {
      //检查channel的队列剩余大小是否足够
      int remainingChange = takeList.size() - putList.size();
      if (remainingChange < 0) {
        if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
              "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
              "reached. Please increase heap space/byte capacity allocated to " +
              "the channel as the sinks may not be keeping up with the sources");
        }
        if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      synchronized (queueLock) {
        if (puts > 0) {
          while (!putList.isEmpty()) {
            //写入到channel的队列
            if (!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        //清除临时队列
        putList.clear();
        takeList.clear();
      }
      bytesRemaining.release(takeByteCounter);
      takeByteCounter = 0;
      putByteCounter = 0;

      queueStored.release(puts);
      if (remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }

      channelCounter.setChannelSize(queue.size());
    }

如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

protected void doRollback() {
      int takes = takeList.size();
      //检查内存队列空间大小,是否足够takeList写回去
      synchronized (queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
            "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        while (!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        //抛弃数据,没合并到channel的内存队列
        putList.clear();
      }
      putByteCounter = 0;
      takeByteCounter = 0;

      queueStored.release(takes);
      channelCounter.setChannelSize(queue.size());
    }

Take事务流程

Take事务分为以下阶段:

  • doTake:先将数据取到临时缓冲区takeList
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。


Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
     //事务开始
    transaction.begin();
    try {
      Set<BucketWriter> writers = new LinkedHashSet<>();
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        //take数据到临时缓冲区,实际调用的是transaction.doTake
        Event event = channel.take();
        if (event == null) {
          break;
        }

        // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);

        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
        BucketWriter bucketWriter;
        HDFSWriter hdfsWriter = null;
        // Callback to remove the reference to the bucket writer from the
        // sfWriters map so that all buffers used by the HDFS file
        // handles are garbage collected.
        WriterCallback closeCallback = new WriterCallback() {
          @Override
          public void run(String bucketPath) {
            LOG.info("Writer callback called.");
            synchronized (sfWritersLock) {
              sfWriters.remove(bucketPath);
            }
          }
        };
        synchronized (sfWritersLock) {
          bucketWriter = sfWriters.get(lookupPath);
          // we haven't seen this file yet, so open it and cache the handle
          if (bucketWriter == null) {
            hdfsWriter = writerFactory.getWriter(fileType);
            bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
            sfWriters.put(lookupPath, bucketWriter);
          }
        }

        // Write the data to HDFS
        try {
          //写数据到HDFS
          bucketWriter.append(event);
        } catch (BucketClosedException ex) {
          LOG.info("Bucket was closed while trying to append, " +
                   "reinitializing bucket and writing event.");
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
          synchronized (sfWritersLock) {
            sfWriters.put(lookupPath, bucketWriter);
          }
          bucketWriter.append(event);
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }
      }

      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }
      //事务提交
      transaction.commit();

      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      sinkCounter.incrementEventWriteFail();
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      sinkCounter.incrementEventWriteOrChannelFail(th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      //关闭事务
      transaction.close();
    }
  }

大致流程图:


接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

protected Event doTake() throws InterruptedException {
      channelCounter.incrementEventTakeAttemptCount();
      if (takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      synchronized (queueLock) {
        //从channel内存队列取数据
        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      //将数据放到临时缓冲区
      takeList.put(event);

      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      takeByteCounter += eventByteSize;

      return event;
    }

相关文章

  • Flume事务详解

    本文基于AvroSource,MemoryChannel,HDFSSink三个组件,对Flume数据传输的事务进行...

  • Spring之事务管理

    Spring事务管理(详解+实例)Spring详解(八)------事务管理 一. 概念 事务(Transacti...

  • 【Flume1】Flume组成,Put事务,Take事务

    1.4.1 Flume组成,Put事务,Take事务 1)taildir source (1)断点续传、多目录(2...

  • Flume(三)事务

    介绍 Flume是什么? Flume 是一个分布式、可靠且高可用的服务,用于有效地收集,聚合和移动大量日志数据。它...

  • Flume系列3-Flume事务与传输流程

    一. Flume 事务 流程图: Put 事务流程:doPut:将批数据先写入临时缓冲区 putListdoCom...

  • Spring事务

    JerryTse—— Spring事务传播行为详解

  • Flume使用详解

    使用Flume监听日志变化 进入到flume的conf目录下,创建文件,进行配置 配置好了就可以启动了 通过写一个...

  • 数据传输事务分析

    本文基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务...

  • Spring中的事务处理

    1. spring 的事务属性 1.1 什么是事务属性 1.2 如何添加事务属性 1.3 事务属性详解 1.3.1...

  • Index of contents

    一、事务 @TransactionalSpring事务传播行为详解Spring事务管理的两种方式try catch...

网友评论

      本文标题:Flume事务详解

      本文链接:https://www.haomeiwen.com/subject/kdgoyctx.html