美文网首页
尚硅谷大数据技术之Flume

尚硅谷大数据技术之Flume

作者: 尚硅谷教育 | 来源:发表于2018-12-06 15:06 被阅读18次

    5.4.4 MySQLSource
    代码实现:
    package com.atguigu.source;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.nio.charset.StandardCharsets;
    import java.text.ParseException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;

    public class SQLSource extends AbstractSource implements Configurable, PollableSource {

    //打印日志
    

    private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

    //定义sqlHelper
    private SQLSourceHelper sqlSourceHelper;
    
    
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }
    
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
    
    @Override
    

    public void configure(Context context) {

        try {
            //初始化
            sqlSourceHelper = new SQLSourceHelper(context);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
    
    @Override
    

    public Status process() throws EventDeliveryException {

        try {
            //查询数据表
            List<List<Object>> result = sqlSourceHelper.executeQuery();
    
            //存放event的集合
            List<Event> events = new ArrayList<>();
    
            //存放event头集合
            HashMap<String, String> header = new HashMap<>();
    
            //如果有返回数据,则将数据封装为event
            if (!result.isEmpty()) {
    
                List<String> allRows = sqlSourceHelper.getAllRows(result);
    
                Event event = null;
    
                for (String row : allRows) {
                    event = new SimpleEvent();
                    event.setBody(row.getBytes());
                    event.setHeaders(header);
                    events.add(event);
                }
    
                //将event写入channel
                this.getChannelProcessor().processEventBatch(events);
    
                //更新数据表中的offset信息
                sqlSourceHelper.updateOffset2DB(result.size());
            }
    
            //等待时长
            Thread.sleep(sqlSourceHelper.getRunQueryDelay());
    
            return Status.READY;
        } catch (InterruptedException e) {
            LOG.error("Error procesing row", e);
    
            return Status.BACKOFF;
        }
    }
    
    @Override
    

    public synchronized void stop() {

        LOG.info("Stopping sql source {} ...", getName());
    
        try {
            //关闭资源
            sqlSourceHelper.close();
        } finally {
            super.stop();
        }
    }
    

    }

    本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

    相关文章

      网友评论

          本文标题:尚硅谷大数据技术之Flume

          本文链接:https://www.haomeiwen.com/subject/eejocqtx.html