美文网首页
Flume事务详解

Flume事务详解

作者: 吃货大米饭 | 来源:发表于2019-09-29 09:13 被阅读0次

    本文基于AvroSource,MemoryChannel,HDFSSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。一般情况下,用MemoryChannel就好了,我们公司用的就是这个,FileChannel速度慢,虽然提供日志级别的数据恢复,但是一般情况下,不断电MemoryChannel是不会丢数据的。

    Flume提供事物操作,保证用户的数据的可靠性,主要体现在:

    • 数据在传输到下个节点时(通常是批量数据),如果接收节点出现异常,比如网络异常,则回滚这一批数据。因此有可能导致数据重发
    • 同个节点内,Source写入数据到Channel,数据在一个批次内的数据出现异常,则不写入到Channel。已接收到的部分数据直接抛弃,靠上一个节点重发数据。


    编程模型

    一个事务的实现是在一个channel的实现中实现的,每一个连接到ChannelSourceSink必须获得一个事务对象。Source使用ChannelProcessor管理事务,Sink的事务管理则通过他们配置的Channel进行管理。Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

    Channel ch = new MemoryChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do
    
      Event eventToStage = EventBuilder.withBody("Hello Flume!",
                           Charset.forName("UTF-8"));
      ch.put(eventToStage);
      // Event takenEvent = ch.take();
      // ...
      txn.commit();
    } catch (Throwable t) {
      txn.rollback();
    
      // Log exception, handle individual exceptions as needed
    
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    

    Put事务流程

    Put事务可以分为以下阶段:

    • doPut:将批数据先写入临时缓冲区putList
    • doCommit:检查channel内存队列是否足够合并。
    • doRollback:channel内存队列空间不足,抛弃数据,回滚event

    我们从Source数据接收到写入Channel这个过程对Put事务进行分析。


    AvroSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

    public Status appendBatch(List<AvroFlumeEvent> events) {
        logger.debug("Avro source {}: Received avro event batch of {} events.",
            getName(), events.size());
        sourceCounter.incrementAppendBatchReceivedCount();
        sourceCounter.addToEventReceivedCount(events.size());
    
        List<Event> batch = new ArrayList<Event>();
    
        for (AvroFlumeEvent avroEvent : events) {
          Event event = EventBuilder.withBody(avroEvent.getBody().array(),
              toStringMap(avroEvent.getHeaders()));
    
          batch.add(event);
        }
    
        try {
          //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel
          getChannelProcessor().processEventBatch(batch);
        } catch (Throwable t) {
          logger.error("Avro source " + getName() + ": Unable to process event " +
              "batch. Exception follows.", t);
          sourceCounter.incrementChannelWriteFail();
          if (t instanceof Error) {
            throw (Error) t;
          }
          return Status.FAILED;
        }
    
        sourceCounter.incrementAppendBatchAcceptedCount();
        sourceCounter.addToEventAcceptedCount(events.size());
    
        return Status.OK;
      }
    

    事务逻辑都在processEventBatch这个方法里:

    public void processEventBatch(List<Event> events) {
        Preconditions.checkNotNull(events, "Event list must not be null");
        //预处理每行数据
        events = interceptorChain.intercept(events);
    
        Map<Channel, List<Event>> reqChannelQueue =
            new LinkedHashMap<Channel, List<Event>>();
    
        Map<Channel, List<Event>> optChannelQueue =
            new LinkedHashMap<Channel, List<Event>>();
    
        //分类数据,划分不同的channel集合对应的数据
        for (Event event : events) {
          List<Channel> reqChannels = selector.getRequiredChannels(event);
    
          for (Channel ch : reqChannels) {
            List<Event> eventQueue = reqChannelQueue.get(ch);
            if (eventQueue == null) {
              eventQueue = new ArrayList<Event>();
              reqChannelQueue.put(ch, eventQueue);
            }
            eventQueue.add(event);
          }
    
          List<Channel> optChannels = selector.getOptionalChannels(event);
    
          for (Channel ch : optChannels) {
            List<Event> eventQueue = optChannelQueue.get(ch);
            if (eventQueue == null) {
              eventQueue = new ArrayList<Event>();
              optChannelQueue.put(ch, eventQueue);
            }
    
            eventQueue.add(event);
          }
        }
    
        // Process required channels
        for (Channel reqChannel : reqChannelQueue.keySet()) {
          Transaction tx = reqChannel.getTransaction();
          Preconditions.checkNotNull(tx, "Transaction object must not be null");
          try {
            //事务开始,tx即MemoryTransaction类实例
            tx.begin();
    
            List<Event> batch = reqChannelQueue.get(reqChannel);
    
            for (Event event : batch) {
              // 这个put操作实际调用的是transaction.doPut
              reqChannel.put(event);
            }
            //提交,将数据写入Channel的队列中
            tx.commit();
          } catch (Throwable t) {
            //回滚
            tx.rollback();
            if (t instanceof Error) {
              LOG.error("Error while writing to required channel: " + reqChannel, t);
              throw (Error) t;
            } else if (t instanceof ChannelException) {
              throw (ChannelException) t;
            } else {
              throw new ChannelException("Unable to put batch on required " +
                  "channel: " + reqChannel, t);
            }
          } finally {
            if (tx != null) {
              tx.close();
            }
          }
        }
    
        // Process optional channels
        for (Channel optChannel : optChannelQueue.keySet()) {
          Transaction tx = optChannel.getTransaction();
          Preconditions.checkNotNull(tx, "Transaction object must not be null");
          try {
            tx.begin();
    
            List<Event> batch = optChannelQueue.get(optChannel);
    
            for (Event event : batch) {
              optChannel.put(event);
            }
    
            tx.commit();
          } catch (Throwable t) {
            tx.rollback();
            LOG.error("Unable to put batch on optional channel: " + optChannel, t);
            if (t instanceof Error) {
              throw (Error) t;
            }
          } finally {
            if (tx != null) {
              tx.close();
            }
          }
        }
      }
    

    每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal<BasicTransactionSemantics>变量currentTransaction中。
    那么,事务到底做了什么?


    实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

    • putList
    • takeList
      对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。
      channel.put -> transaction.doPut:
    protected void doPut(Event event) throws InterruptedException {
          channelCounter.incrementEventPutAttemptCount();
          //计算数据字节大小
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
          //写入临时缓冲区putList
          if (!putList.offer(event)) {
            throw new ChannelException(
                "Put queue for MemoryTransaction of capacity " +
                putList.size() + " full, consider committing more frequently, " +
                "increasing capacity or increasing thread count");
          }
          putByteCounter += eventByteSize;
        }
    

    transaction.commit:

    protected void doCommit() throws InterruptedException {
          //检查channel的队列剩余大小是否足够
          int remainingChange = takeList.size() - putList.size();
          if (remainingChange < 0) {
            if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
              throw new ChannelException("Cannot commit transaction. Byte capacity " +
                  "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                  "reached. Please increase heap space/byte capacity allocated to " +
                  "the channel as the sinks may not be keeping up with the sources");
            }
            if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
              bytesRemaining.release(putByteCounter);
              throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
                  " Sinks are likely not keeping up with sources, or the buffer size is too tight");
            }
          }
          int puts = putList.size();
          int takes = takeList.size();
          synchronized (queueLock) {
            if (puts > 0) {
              while (!putList.isEmpty()) {
                //写入到channel的队列
                if (!queue.offer(putList.removeFirst())) {
                  throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                }
              }
            }
            //清除临时队列
            putList.clear();
            takeList.clear();
          }
          bytesRemaining.release(takeByteCounter);
          takeByteCounter = 0;
          putByteCounter = 0;
    
          queueStored.release(puts);
          if (remainingChange > 0) {
            queueRemaining.release(remainingChange);
          }
          if (puts > 0) {
            channelCounter.addToEventPutSuccessCount(puts);
          }
          if (takes > 0) {
            channelCounter.addToEventTakeSuccessCount(takes);
          }
    
          channelCounter.setChannelSize(queue.size());
        }
    

    如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

    protected void doRollback() {
          int takes = takeList.size();
          //检查内存队列空间大小,是否足够takeList写回去
          synchronized (queueLock) {
            Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
                "Not enough space in memory channel " +
                "queue to rollback takes. This should never happen, please report");
            while (!takeList.isEmpty()) {
              queue.addFirst(takeList.removeLast());
            }
            //抛弃数据,没合并到channel的内存队列
            putList.clear();
          }
          putByteCounter = 0;
          takeByteCounter = 0;
    
          queueStored.release(takes);
          channelCounter.setChannelSize(queue.size());
        }
    

    Take事务流程

    Take事务分为以下阶段:

    • doTake:先将数据取到临时缓冲区takeList
    • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
    • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。


    Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
         //事务开始
        transaction.begin();
        try {
          Set<BucketWriter> writers = new LinkedHashSet<>();
          int txnEventCount = 0;
          for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
            //take数据到临时缓冲区,实际调用的是transaction.doTake
            Event event = channel.take();
            if (event == null) {
              break;
            }
    
            // reconstruct the path name by substituting place holders
            String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
                timeZone, needRounding, roundUnit, roundValue, useLocalTime);
            String realName = BucketPath.escapeString(fileName, event.getHeaders(),
                timeZone, needRounding, roundUnit, roundValue, useLocalTime);
    
            String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
            BucketWriter bucketWriter;
            HDFSWriter hdfsWriter = null;
            // Callback to remove the reference to the bucket writer from the
            // sfWriters map so that all buffers used by the HDFS file
            // handles are garbage collected.
            WriterCallback closeCallback = new WriterCallback() {
              @Override
              public void run(String bucketPath) {
                LOG.info("Writer callback called.");
                synchronized (sfWritersLock) {
                  sfWriters.remove(bucketPath);
                }
              }
            };
            synchronized (sfWritersLock) {
              bucketWriter = sfWriters.get(lookupPath);
              // we haven't seen this file yet, so open it and cache the handle
              if (bucketWriter == null) {
                hdfsWriter = writerFactory.getWriter(fileType);
                bucketWriter = initializeBucketWriter(realPath, realName,
                  lookupPath, hdfsWriter, closeCallback);
                sfWriters.put(lookupPath, bucketWriter);
              }
            }
    
            // Write the data to HDFS
            try {
              //写数据到HDFS
              bucketWriter.append(event);
            } catch (BucketClosedException ex) {
              LOG.info("Bucket was closed while trying to append, " +
                       "reinitializing bucket and writing event.");
              hdfsWriter = writerFactory.getWriter(fileType);
              bucketWriter = initializeBucketWriter(realPath, realName,
                lookupPath, hdfsWriter, closeCallback);
              synchronized (sfWritersLock) {
                sfWriters.put(lookupPath, bucketWriter);
              }
              bucketWriter.append(event);
            }
    
            // track the buckets getting written in this transaction
            if (!writers.contains(bucketWriter)) {
              writers.add(bucketWriter);
            }
          }
    
          if (txnEventCount == 0) {
            sinkCounter.incrementBatchEmptyCount();
          } else if (txnEventCount == batchSize) {
            sinkCounter.incrementBatchCompleteCount();
          } else {
            sinkCounter.incrementBatchUnderflowCount();
          }
    
          // flush all pending buckets before committing the transaction
          for (BucketWriter bucketWriter : writers) {
            bucketWriter.flush();
          }
          //事务提交
          transaction.commit();
    
          if (txnEventCount < 1) {
            return Status.BACKOFF;
          } else {
            sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            return Status.READY;
          }
        } catch (IOException eIO) {
          transaction.rollback();
          LOG.warn("HDFS IO error", eIO);
          sinkCounter.incrementEventWriteFail();
          return Status.BACKOFF;
        } catch (Throwable th) {
          transaction.rollback();
          LOG.error("process failed", th);
          sinkCounter.incrementEventWriteOrChannelFail(th);
          if (th instanceof Error) {
            throw (Error) th;
          } else {
            throw new EventDeliveryException(th);
          }
        } finally {
          //关闭事务
          transaction.close();
        }
      }
    

    大致流程图:


    接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

    protected Event doTake() throws InterruptedException {
          channelCounter.incrementEventTakeAttemptCount();
          if (takeList.remainingCapacity() == 0) {
            throw new ChannelException("Take list for MemoryTransaction, capacity " +
                takeList.size() + " full, consider committing more frequently, " +
                "increasing capacity, or increasing thread count");
          }
          if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
            return null;
          }
          Event event;
          synchronized (queueLock) {
            //从channel内存队列取数据
            event = queue.poll();
          }
          Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
              "signalling existence of entry");
          //将数据放到临时缓冲区
          takeList.put(event);
    
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
          takeByteCounter += eventByteSize;
    
          return event;
        }
    

    相关文章

      网友评论

          本文标题:Flume事务详解

          本文链接:https://www.haomeiwen.com/subject/kdgoyctx.html