美文网首页
flume dualchannel

flume dualchannel

作者: 背麻袋的袋鼠 | 来源:发表于2019-04-08 09:27 被阅读0次
public class DualChannel extends BasicChannelSemantics {

private static final Logger logger = LoggerFactory.getLogger(DualChannel.class);

/****************************** fileChannel **********************************/

private volatile boolean open = false;
private static final String PUT = "PUT";
private static final String TAKE = "TAKE";


/**************************** else ************************************/

private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

private FileChannel fileChannel;
private MemoryChannel memoryChannel;

private FileChannel.FileBackedTransaction fileBackedTransaction;
private MemoryChannel.MemoryTransaction memoryTransaction;

public DualChannel() {
    this.fileChannel = new FileChannel();
    this.memoryChannel = new MemoryChannel();
}

@Override
protected BasicTransactionSemantics createTransaction() {
    return null;
}


@Override
protected BasicTransactionSemantics createTransaction(String action) {
    logger.info("method  createTransaction ( DualTransaction  ) before......");
    BasicTransactionSemantics basicTransactionSemantics = null;
    if (open) {
        if (PUT.equals(action)) {//获取
            if (putToMemChannel.get()) {//
                basicTransactionSemantics = memoryChannel.createTransaction();
                memoryTransaction = (MemoryChannel.MemoryTransaction) basicTransactionSemantics;
            } else {
                basicTransactionSemantics = fileChannel.createTransaction();
                fileBackedTransaction = (FileChannel.FileBackedTransaction) fileChannel.createTransaction();
            }
        } else if (TAKE.equals(action)) {
            if (takeFromMemChannel.get()) {
                basicTransactionSemantics = memoryChannel.createTransaction();
                memoryTransaction = (MemoryChannel.MemoryTransaction) basicTransactionSemantics;
            } else {
                basicTransactionSemantics = fileChannel.createTransaction();
                fileBackedTransaction = (FileChannel.FileBackedTransaction) fileChannel.createTransaction();
            }
        }
    }
    return basicTransactionSemantics;
}

@Override
public synchronized void setName(String name) {
    logger.info("method  setName  before......" + name);
    fileChannel.setName(name);
    memoryChannel.setName(name);
    super.setName(name);
}

@Override
public void configure(Context context) {
    logger.info("method  configgure  before......" + context.toString());
    this.memoryChannel.configure(context);
    this.fileChannel.configure(context);

}

@Override
public synchronized void start() {
    super.start();
    open = true;
    logger.info("method  start  before......");
    this.fileChannel.start();
    this.memoryChannel.start();

    logger.info("method  start  after......");

}

@Override
public synchronized void stop() {
    super.stop();
    open = false;
    logger.info("method  stop  before......");
    this.fileChannel.stop();
    this.memoryChannel.stop();
    logger.info("method  stop  after......");
}


@Override
public void put(Event event) throws ChannelException {
    logger.info("method  put  before......");
    if (open) {
        if (putToMemChannel.get()) {
            //往memChannel中写数据
            memoryTransaction.put(event);
            if (memoryChannel.isFull()) {
                putToMemChannel.set(false);
            }
        } else {
            //往fileChannel中写数据
            fileBackedTransaction.put(event);
        }
    }
    logger.info("method  put  after......");
}

@Override
public Event take() throws ChannelException {//put 和 take 事务紊乱
    logger.info("method  take  before......");
    Event event = null;
    if (open) {
        if (takeFromMemChannel.get()) {
            //从memChannel中取数据
            event = memoryTransaction.take();
            if (event == null) {
                takeFromMemChannel.set(false);
            }
        } else {
            //从fileChannel中取数据
            event = fileBackedTransaction.take();
            if (event == null) {
                takeFromMemChannel.set(true);

                putToMemChannel.set(true);
            }
        }
        logger.info("method  take  after......");
    }
    return event;
}

}

相关文章

  • flume dualchannel

    }

  • Flume01

    Flume架构组成 Flume 负载均衡 Flume Agent内部原理 启动 Flume 监听

  • Flume

    总结 一、Flume的定义 1、flume的优势 2、flume的组成 3、flume的架构 二、 flume部署...

  • 玩转大数据计算之Flume

    Flume版本:我们使用Flume最新的版本:Flume NG 1.7.0 Flume架构Flume是一个分布式的...

  • Flume 入门

    一:Flume是什么: 二:特点: 三:Flume版本介绍 四:Flume NG基本架构 五:Flume NG核心...

  • flume的部署和测试

    1 flume 安装 flume下载:http://flume.apache.org/download.htmlf...

  • 091-BigData-19Flume与Flume之间数据传递

    上一篇:090-BigData-18Flume Flume与Flume之间数据传递 一、单Flume多Channe...

  • java大数据之flume

    一、Flume简介 1.1 Flume的位置 1.2 Flume是什么 (1)Flume提供一种分布式的,可靠地,...

  • Flume(一)概述

    Flume图标 Flume图标 Flume定义 Apache Flume是一个分布式,可靠且可用的系统,用于有效地...

  • Flume pull方式和push方式整合

    Pull方式 Flume Agent 编写 启动Flume Push方式 Flume Agent的编写 启动flu...

网友评论

      本文标题:flume dualchannel

      本文链接:https://www.haomeiwen.com/subject/vxgyiqtx.html