美文网首页
flume dualchannel

flume dualchannel

作者: 背麻袋的袋鼠 | 来源:发表于2019-04-08 09:27 被阅读0次
    /**
     * A channel that delegates to an in-memory channel and a file-backed channel.
     *
     * <p>Two flags select the delegate: {@code putToMemChannel} for writes and
     * {@code takeFromMemChannel} for reads. Writes switch to the file channel once
     * {@code memoryChannel.isFull()} reports true; reads switch to the file channel
     * once a take from the memory channel returns {@code null}; when a take from the
     * file channel returns {@code null}, both flags switch back to the memory channel.
     *
     * <p>NOTE(review): the most recently created transaction of each kind is cached in
     * plain instance fields rather than per thread, so concurrent sources and sinks can
     * overwrite each other's transaction (the original author flagged this on
     * {@code take} as "put and take transactions get mixed up"). Confirm the intended
     * threading model before relying on this class under concurrency.
     */
    public class DualChannel extends BasicChannelSemantics {

        private static final Logger logger = LoggerFactory.getLogger(DualChannel.class);

        /** Action name selecting the write-side delegate in {@link #createTransaction(String)}. */
        private static final String PUT = "PUT";
        /** Action name selecting the read-side delegate in {@link #createTransaction(String)}. */
        private static final String TAKE = "TAKE";

        /** True between {@link #start()} and {@link #stop()}; gates every operation. */
        private volatile boolean open = false;

        /** When true, puts go to the memory channel; otherwise to the file channel. */
        private final AtomicBoolean putToMemChannel = new AtomicBoolean(true);
        /** When true, takes come from the memory channel; otherwise from the file channel. */
        private final AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

        private final FileChannel fileChannel;
        private final MemoryChannel memoryChannel;

        /** Last file-channel transaction handed out by {@link #createTransaction(String)}. */
        private FileChannel.FileBackedTransaction fileBackedTransaction;
        /** Last memory-channel transaction handed out by {@link #createTransaction(String)}. */
        private MemoryChannel.MemoryTransaction memoryTransaction;

        /** Creates the channel together with its two (not yet configured or started) delegates. */
        public DualChannel() {
            this.fileChannel = new FileChannel();
            this.memoryChannel = new MemoryChannel();
        }

        /**
         * Always returns {@code null}.
         *
         * <p>NOTE(review): presumably the base class in this code base calls
         * {@link #createTransaction(String)} instead; confirm, because a caller that
         * uses this overload receives no transaction.
         */
        @Override
        protected BasicTransactionSemantics createTransaction() {
            return null;
        }

        /**
         * Creates a transaction on the delegate currently selected for {@code action}
         * and caches it for the following {@link #put(Event)} / {@link #take()} calls.
         *
         * @param action {@code "PUT"} or {@code "TAKE"}
         * @return the new delegate transaction, or {@code null} when the channel is not
         *         open or {@code action} is neither {@code "PUT"} nor {@code "TAKE"}
         */
        @Override
        protected BasicTransactionSemantics createTransaction(String action) {
            logger.info("method  createTransaction ( DualTransaction  ) before......");
            if (!open) {
                return null;
            }
            if (PUT.equals(action)) {
                return openTransaction(putToMemChannel.get());
            }
            if (TAKE.equals(action)) {
                return openTransaction(takeFromMemChannel.get());
            }
            return null;
        }

        /**
         * Creates exactly one transaction on the chosen delegate, caches it and returns
         * that same instance, so the transaction the caller manages is the one that
         * {@link #put(Event)} and {@link #take()} operate on.
         */
        private BasicTransactionSemantics openTransaction(boolean useMemoryChannel) {
            if (useMemoryChannel) {
                BasicTransactionSemantics transaction = memoryChannel.createTransaction();
                memoryTransaction = (MemoryChannel.MemoryTransaction) transaction;
                return transaction;
            }
            BasicTransactionSemantics transaction = fileChannel.createTransaction();
            fileBackedTransaction = (FileChannel.FileBackedTransaction) transaction;
            return transaction;
        }

        /** Returns {@code transaction}, or fails with a clear message when none was created yet. */
        private static <T> T requireTransaction(T transaction, String channelKind) {
            if (transaction == null) {
                throw new IllegalStateException(
                        "No " + channelKind + " channel transaction exists; create a transaction first");
            }
            return transaction;
        }

        /** Applies {@code name} to both delegates and to this channel. */
        @Override
        public synchronized void setName(String name) {
            logger.info("method  setName  before......{}", name);
            fileChannel.setName(name);
            memoryChannel.setName(name);
            super.setName(name);
        }

        /** Passes the same {@code context} to the memory channel and then the file channel. */
        @Override
        public void configure(Context context) {
            logger.info("method  configgure  before......{}", context);
            this.memoryChannel.configure(context);
            this.fileChannel.configure(context);
        }

        /** Starts both delegates and then opens this channel for transactions. */
        @Override
        public synchronized void start() {
            super.start();
            logger.info("method  start  before......");
            this.fileChannel.start();
            this.memoryChannel.start();
            // Open only after both delegates are started so that no put/take can
            // reach a delegate that has not been started yet.
            open = true;
            logger.info("method  start  after......");
        }

        /** Closes this channel for new operations and then stops both delegates. */
        @Override
        public synchronized void stop() {
            super.stop();
            open = false;
            logger.info("method  stop  before......");
            this.fileChannel.stop();
            this.memoryChannel.stop();
            logger.info("method  stop  after......");
        }

        /**
         * Writes {@code event} through the cached transaction of the delegate currently
         * selected for puts.
         *
         * <p>NOTE(review): when the channel is not open the event is dropped without
         * any error; confirm that this is intended.
         *
         * @param event the event to store
         * @throws ChannelException if the delegate transaction rejects the event
         * @throws IllegalStateException if no transaction was created on the selected delegate
         */
        @Override
        public void put(Event event) throws ChannelException {
            logger.info("method  put  before......");
            if (open) {
                if (putToMemChannel.get()) {
                    // Write the event to the memory channel.
                    requireTransaction(memoryTransaction, "memory").put(event);
                    if (memoryChannel.isFull()) {
                        // Memory channel reports full: send subsequent puts to the file channel.
                        putToMemChannel.set(false);
                    }
                } else {
                    // Write the event to the file channel.
                    requireTransaction(fileBackedTransaction, "file").put(event);
                }
            }
            logger.info("method  put  after......");
        }

        /**
         * Reads one event through the cached transaction of the delegate currently
         * selected for takes.
         *
         * @return the event, or {@code null} when the channel is not open or the
         *         selected delegate returned no event
         * @throws ChannelException if the delegate transaction fails
         * @throws IllegalStateException if no transaction was created on the selected delegate
         */
        @Override
        public Event take() throws ChannelException {
            logger.info("method  take  before......");
            Event event = null;
            if (open) {
                if (takeFromMemChannel.get()) {
                    // Take the event from the memory channel.
                    event = requireTransaction(memoryTransaction, "memory").take();
                    if (event == null) {
                        // Nothing returned by the memory channel: read from the file channel next.
                        takeFromMemChannel.set(false);
                    }
                } else {
                    // Take the event from the file channel.
                    event = requireTransaction(fileBackedTransaction, "file").take();
                    if (event == null) {
                        // Nothing returned by the file channel: route both reads and
                        // writes back to the memory channel.
                        takeFromMemChannel.set(true);
                        putToMemChannel.set(true);
                    }
                }
                logger.info("method  take  after......");
            }
            return event;
        }

    }

    相关文章

      网友评论

          本文标题:flume dualchannel

          本文链接:https://www.haomeiwen.com/subject/vxgyiqtx.html