美文网首页
14_Flume之事务

14_Flume之事务

作者: K__3f8b | 来源:发表于2023-09-26 01:16 被阅读0次

Flume使用两个独立的事务分别负责从soucrce到channel,以及从channel到sink的event传递。一旦事务中所有的event全部传递到channel且提交成功,那么source就将该文件标记为完成。同理,事务以类似的方式处理从channel到sink的传递过程,如果因为某种原因使得event无法记录,那么事务将会回滚,且所有的event都会保持到channel中,等待重新传递。

Flume的事务机制保证了source产生的每个event都会传送到sink中(如果失败会无限重试),flume采用的是At-least-once的提交方式,这样就造成每个source产生的event至少到达sink一次,这种方式保证了数据的可靠性,但数据可能重复。

Transaction接口定义如下:

public void begin();

public void commit();

public void rollback();

public void close();

以MemoryTransaction介绍介绍下事务机制:

MemoryTransaction是MemoryChannel中的一个内部类,内部有2个阻塞队列putList和takeList,MemoryChannel内部有个queue阻塞队列。putList接收Source交给Channel的event数据,takeList保存Channel交给Sink的event数据。

  1. 如果Source交给Channel任务完成,进行commit时,会把putList中的所有event放到MemoryChannel中的queue。
  2. 如果Source交给Channel任务失败,进行rollback时,程序就不会继续走下去,比如KafkaSource需要commitOffsets,如果任务失败就不会commitOffsets。
  3. 如果Sink处理完Channel带来的event,进行commit的时,会清空takeList中的event数据,因为已经没consume。
  4. 如果Sink处理Channel带来的event失败,进行rollback的时,会把takeList中的event写回到queue中。

commit的关键代码:

@Override
protected void doCommit() throws InterruptedException {
  int puts = putList.size();
  int takes = takeList.size();
  synchronized(queueLock) {
    if(puts > 0 ) {
      // 清空putList,丢到外部类MemoryChannel中的queue队列里
      while(!putList.isEmpty()) {
        // MemoryChannel中的queue队列
        if(!queue.offer(putList.removeFirst())) {
          throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
        }
      }
    }
    putList.clear();
    takeList.clear();
  }
}

rollback的关键代码

@Override
protected void doRollback() {
  int takes = takeList.size();
  synchronized(queueLock) {
    Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
        "queue to rollback takes. This should never happen, please report");
    // 把takeList中的数据放回到queue中
    while(!takeList.isEmpty()) {
      queue.addFirst(takeList.removeLast());
    }
    putList.clear();
  }
}

相关文章

  • golang mysql事务

    Golang之Mysql事务

  • Redis事务

    转载自Redis之Redis事务 Redis事务的概念: Redis 事务的本质是一组命令的集合。事务支持一次执行...

  • SAP MM 事务代码MI31之思考之续集

    SAP MM 事务代码MI31之思考之续集 上周写了一篇文章,关于事务代码MI31之思考的(https://www...

  • 十九、spring事务之创建事务

    接上面一节十八、spring 事务之事务执行流程,Spring获取事务管理器后,就开始创建事务信息,这里面的逻辑就...

  • Spring事务之编程式事务

    一.Spring对编程式事务的支持 Spring中的事务分为物理事务和逻辑事务;物理事务:就是底层数据库提供的事务...

  • PL/SQL事务

    开始/结束事务事务都有开始和结束。事务开始时有下列事件之一: 连接到数据库后执行的第一个SQL语句。 在事务完成之...

  • 2022-06-28

    思考与行动之间的可怕“沼泽”(GTD:搞定之54) 事务最艰巨的任务,就是“明确你的事务” 在明确自己的事务时,我...

  • MySQL之:事务

    并发控制 锁粒度: 表级锁 行级锁 锁: 读锁:共享锁,只读不可写,多个读互相不阻塞 写锁:独占锁,排它锁,一个写...

  • Spring0001

    1.【Spring学习34】Spring事务(4):事务属性之7种传播行为 https://blog.csdn.n...

  • Spring 事务传播行为和隔离级别的应用

    一言蔽之 对Spring事务了解一直处于理论阶段,几个事务传播行为(propagation behaviors),...

网友评论

      本文标题:14_Flume之事务

      本文链接:https://www.haomeiwen.com/subject/lzoxjdtx.html