美文网首页
Storm从入门到精通11:Storm与JDBC集成

Storm从入门到精通11:Storm与JDBC集成

作者: 金字塔下的小蜗牛 | 来源:发表于2020-04-04 23:15 被阅读0次

    1.导入Jar包

    将Storm处理的结果存放到MySQL数据库中,需要依赖下面一些Jar包:

    $STORM_HOME\external\sql\storm-sql-core*.jar
    $STORM_HOME\external\storm-jdbc\storm-jdbc-1.0.3.jar
    mysql-connector-java-5.1.7-bin.jar
    commons-lang3-3.1.jar

    2.示例

    将Storm的计算结果存入MySQL:以Storm的WordCount程序为例

    2.1创建Spout

    创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源

    /**
     * Spout that serves as the data source of the WordCount topology.
     *
     * <p>Every call to {@link #nextTuple()} waits three seconds and then emits one
     * sentence, picked at random from a fixed in-memory sample, on the single
     * output field {@code "sentence"}.
     */
    public class WordCountSpout extends BaseRichSpout{
        // Simulated input data
        private String[] data = {"I love Beijing",
                                 "I love China",
                                 "Beijing is the capital of China"};
        // Used to send tuples to the next component
        private SpoutOutputCollector collector;
        // Single generator reused for every tuple instead of allocating one per call
        private final Random random = new Random();

        /**
         * Spout initialization: keeps the collector for later emits.
         *
         * @param arg0      topology configuration (unused)
         * @param arg1      topology context (unused)
         * @param collector collector used by {@link #nextTuple()} to emit sentences
         */
        @Override
        public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector){
            this.collector = collector;
        }

        /**
         * Called by the Storm framework to pull data from the (simulated) external
         * source; emits one randomly chosen sample sentence.
         */
        @Override
        public void nextTuple(){
            // Throttle the spout: one sentence every three seconds
            Utils.sleep(3000);
            // Bound the index by the array length so the sample data can change freely
            String sentence = data[random.nextInt(data.length)];
            this.collector.emit(new Values(sentence));
        }

        /**
         * Declares the single output field {@code "sentence"}.
         *
         * @param declarer declarer receiving the output schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer){
            declarer.declare(new Fields("sentence"));
        }
    }
    

    2.2创建分词Bolt

    创建Bolt(WordCountSplitBolt)组件进行分词操作

    /**
     * Bolt that splits each incoming sentence into words.
     *
     * <p>Reads the {@code "sentence"} field of the input tuple, splits it on single
     * spaces and emits one {@code (word, 1)} tuple per resulting token.
     */
    public class WordCountSplitBolt extends BaseRichBolt{
        // Collector used to send tuples to the next component
        private OutputCollector collector;

        /**
         * Bolt initialization: keeps the collector for later emits.
         *
         * @param arg0      topology configuration (unused)
         * @param arg1      topology context (unused)
         * @param collector collector used by {@link #execute(Tuple)}
         */
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector){
            this.collector = collector;
        }

        /**
         * Tokenizes the sentence carried by the tuple and emits every token with a
         * count of one.
         *
         * @param tuple input tuple carrying the {@code "sentence"} field
         */
        @Override
        public void execute(Tuple tuple){
            String[] tokens = tuple.getStringByField("sentence").split(" ");
            for(int i = 0; i < tokens.length; i++){
                collector.emit(new Values(tokens[i],1));
            }
        }

        /**
         * Declares the output fields {@code "word"} and {@code "count"}.
         *
         * @param declarer declarer receiving the output schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer){
            Fields outputFields = new Fields("word","count");
            declarer.declare(outputFields);
        }
    }
    

    2.3创建计数Bolt

    创建Bolt(WordCountBoltCount)组件进行单词计数操作

    /**
     * Bolt that keeps a running total per word.
     *
     * <p>For every incoming {@code (word, count)} tuple the count is added to the
     * total stored for that word in an in-memory map, and the updated
     * {@code (word, total)} pair is emitted so a downstream bolt can persist it.
     */
    public class WordCountBoltCount extends BaseRichBolt{
        // Running totals, keyed by word
        private Map<String, Integer> result = new HashMap<String, Integer>();
        // Collector used to send tuples to the next component
        private OutputCollector collector;

        /**
         * Adds the tuple's count to the word's running total and emits the new total.
         *
         * @param tuple input tuple carrying the {@code "word"} and {@code "count"} fields
         */
        @Override
        public void execute(Tuple tuple){
            String word = tuple.getStringByField("word");
            int count = tuple.getIntegerByField("count");
            // A word seen for the first time starts from the incoming count,
            // not from a hard-coded 1, so counts other than 1 are totalled correctly.
            Integer previous = result.get(word);
            int total = (previous == null) ? count : previous + count;
            result.put(word,total);
            // Send the updated total to the next bolt, i.e. the one inserting into MySQL
            this.collector.emit(new Values(word,total));
        }

        /**
         * Bolt initialization: keeps the collector for later emits.
         *
         * @param arg0      topology configuration (unused)
         * @param arg1      topology context (unused)
         * @param collector collector used by {@link #execute(Tuple)}
         */
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector){
            this.collector = collector;
        }

        /**
         * Declares the output fields {@code "word"} and {@code "total"}.
         *
         * @param declarer declarer receiving the output schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer){
            declarer.declare(new Fields("word","total"));
        }
    }
    

    2.4创建主程序Topology

    创建主程序Topology(WordCountTopology)

    /**
     * Main program: wires the WordCount components into a topology and submits it
     * to the Storm cluster.
     *
     * <p>Pipeline: {@code wordcount_spout} -> {@code wordcount_splitbolt} ->
     * {@code wordcount_count} -> {@code wordcount_jdbcBolt}.
     */
    public static class WordCountTopology{
        // Name of the MySQL table receiving the (word, total) rows
        private static final String RESULT_TABLE = "result";

        /**
         * Creates the JDBC insert bolt. The table {@code result} must be created in
         * MySQL beforehand.
         *
         * <p>NOTE(review): the mapper is presumably expected to match tuple fields to
         * table columns by name, so the table would need columns {@code word} and
         * {@code total} - confirm against the storm-jdbc documentation.
         *
         * @return bolt that inserts every incoming tuple into the result table
         */
        private static IRichBolt createJDBCBolt(){
            ConnectionProvider connectionProvider = new MyConnectionProvider();
            // The mapper must describe the same table the bolt inserts into
            JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(RESULT_TABLE,connectionProvider);
            return new JdbcInsertBolt(connectionProvider,simpleJdbcMapper)
                       .withTableName(RESULT_TABLE).withQueryTimeoutSecs(30);
        }

        /**
         * Builds and submits the topology.
         *
         * @param args {@code args[0]} is the name under which the topology is submitted
         * @throws IllegalArgumentException if no topology name is supplied
         * @throws Exception if the submission is rejected by the cluster
         */
        public static void main(String[] args) throws Exception{
            if(args == null || args.length < 1){
                throw new IllegalArgumentException("Usage: WordCountTopology <topology-name>");
            }
            TopologyBuilder builder = new TopologyBuilder();
            // Spout: data source
            builder.setSpout("wordcount_spout",new WordCountSpout());
            // First bolt: splits sentences into words
            builder.setBolt("wordcount_splitbolt",new WordCountSplitBolt())
                   .shuffleGrouping("wordcount_spout");
            // Second bolt: counts words; grouped by "word" so the same word always
            // reaches the same counting task
            builder.setBolt("wordcount_count",new WordCountBoltCount())
                   .fieldsGrouping("wordcount_splitbolt",new Fields("word"));
            // Third bolt: writes the running totals to MySQL
            builder.setBolt("wordcount_jdbcBolt",createJDBCBolt())
                   .shuffleGrouping("wordcount_count");
            // Create the topology
            StormTopology wc = builder.createTopology();
            Config config = new Config();
            // Submit to the Storm cluster
            StormSubmitter.submitTopology(args[0],config,wc);
        }
    }
    

    2.5实现ConnectionProvider接口

    /**
     * {@link ConnectionProvider} that hands out MySQL connections obtained from
     * {@link DriverManager}.
     *
     * <p>The JDBC driver class is loaded once when this class is initialized; every
     * call to {@link #getConnection()} opens a new connection.
     */
    class MyConnectionProvider implements ConnectionProvider{
        private static final String DRIVER = "com.mysql.jdbc.Driver";
        private static final String URL = "jdbc:mysql://192.168.126.110:3306/demo";
        // NOTE(review): credentials are hard-coded for the demo; load them from
        // configuration before using this outside of a tutorial.
        private static final String USER = "root";
        private static final String PASSWORD = "123456";

        static{
            try{
                // Register the MySQL driver with DriverManager
                Class.forName(DRIVER);
            }catch(ClassNotFoundException e){
                throw new ExceptionInInitializerError(e);
            }
        }

        /** No per-instance setup is required. */
        @Override
        public void prepare(){}

        /**
         * Opens a new connection to the configured database.
         *
         * @return a freshly opened connection, never {@code null}
         * @throws IllegalStateException if the connection cannot be established
         */
        @Override
        public Connection getConnection(){
            try{
                return DriverManager.getConnection(URL,USER,PASSWORD);
            }catch(SQLException e){
                // Fail loudly with the cause instead of returning null, which would
                // only surface later as a NullPointerException in the caller
                throw new IllegalStateException("Failed to open JDBC connection to "+URL,e);
            }
        }

        /** Nothing to release: connections are not pooled here. */
        @Override
        public void cleanup(){}
    }
    

    相关文章

      网友评论

          本文标题:Storm从入门到精通11:Storm与JDBC集成

          本文链接:https://www.haomeiwen.com/subject/adjkdhtx.html