美文网首页
storm+mysql集成

storm+mysql集成

作者: 数据萌新 | 来源:发表于2018-10-10 16:49 被阅读0次

    使用storm自带的JdbcInsertBolt插入数据

    maven

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
     
      <groupId>com.sid.bigdata</groupId>
      <artifactId>storm</artifactId>
      <version>0.0.1</version>
      <packaging>jar</packaging>
     
      <name>storm</name>
      <url>http://maven.apache.org</url>
     
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single property so storm-core and storm-jdbc always share one version. -->
        <storm.version>1.1.1</storm.version>
      </properties>
     
      <dependencies>
        
      <!-- Storm runtime; needed on the compile classpath for local-mode demos.
           NOTE(review): when submitting to a real cluster this is usually
           declared with <scope>provided</scope> - confirm before deploying. -->
      <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version> 
      </dependency>
        <!-- Provides JdbcInsertBolt, SimpleJdbcMapper and HikariCPConnectionProvider. -->
        <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-jdbc</artifactId>
        <version>${storm.version}</version> 
      </dependency>
      
      <!-- MySQL JDBC driver loaded by the HikariCP pool at runtime. -->
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.31</version>
    </dependency>
      
      </dependencies>
    </project>
    
    

    mysql

    -- Target table for the topology's JdbcInsertBolt. The column names must
    -- match the output fields declared by CountBolt ("word", "count"), since
    -- SimpleJdbcMapper maps tuple fields to columns by name.
    CREATE TABLE `wordcount` (
      `word` varchar(100) DEFAULT NULL,
      `count` int(11) DEFAULT NULL
    ) ENGINE=MyISAM DEFAULT CHARSET=utf8;
    

    spout

    
     
    package com.neusoft;
    
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    
    public class WordCountSpout extends BaseRichSpout{
    
        private SpoutOutputCollector collector;
    
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
    
        public static final String[] words = new String[]{"aaa","bbb","ccc","aa","bb","a"};
    
        /**
         * 1.把每一行数据发射出去
         * */
        public void nextTuple() {
            Random random = new Random();
            String word =words[random.nextInt(words.length)];                       //获取文件中的每行内容
            //发射出去
            this.collector.emit(new Values(word));
    
            System.out.println("emit: "+word);
    
            Utils.sleep(50);
        }
    
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    
    }
    
    
    

    CountBolt

    
    package com.neusoft;
    
    /**
     * Created by ttc on 2018/9/28.
     */
    
    import java.util.Map;
    
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    
    public class CountBolt extends BaseRichBolt{
    
        private OutputCollector collector;
    
    
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
    
        }
    
    
        /**
         * 业务逻辑
    
         * */
        public void execute(Tuple input) {
    
            collector.emit(new Values(input.getStringByField("word"),1));
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //后面jdbc insert bolt直接把这里的输出写Mysql里去了,所以这里的fileds的名字要跟mysql表的字段名字对应
            declarer.declare(new Fields("word","count"));
        }
    
    
    }
    
    
    
    

    LocalWordCountStormJdbcTopology

    package com.neusoft;
    
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    
    import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
    import org.apache.storm.jdbc.common.ConnectionProvider;
    import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
    
    import org.apache.storm.jdbc.mapper.JdbcMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    import org.apache.storm.shade.com.google.common.collect.Maps;
    import org.apache.storm.topology.TopologyBuilder;
    
    
    
    /**
     * Runs the word-count-to-MySQL demo in Storm local mode:
     * WordCountSpout -> CountBolt -> JdbcInsertBolt.
     * Local mode spins up an in-process cluster, so no Storm installation is needed.
     */
    public class LocalWordCountStormJdbcTopology {
    
        public static void main(String[] args) {
            // Wire the processing graph; bolts pull from their upstream
            // component via shuffle grouping.
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("DataSourceSpout", new WordCountSpout());
            builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("DataSourceSpout");
    
            // HikariCP pool configuration for the local MySQL "scott" database.
            // NOTE(review): credentials are hardcoded for the tutorial only.
            Map hikariConfig = Maps.newHashMap();
            hikariConfig.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
            hikariConfig.put("dataSource.url", "jdbc:mysql://localhost/scott");
            hikariConfig.put("dataSource.user","root");
            hikariConfig.put("dataSource.password","root");
            ConnectionProvider provider = new HikariCPConnectionProvider(hikariConfig);
    
            // SimpleJdbcMapper reads the table's column metadata and maps tuple
            // fields to columns by name, so CountBolt's fields must match them.
            String table = "wordcount";
            JdbcMapper mapper = new SimpleJdbcMapper(table, provider);
            JdbcInsertBolt insertBolt = new JdbcInsertBolt(provider, mapper)
                    .withTableName(table)
                    .withQueryTimeoutSecs(30);
            builder.setBolt("JdbcInsertBolt", insertBolt).shuffleGrouping("CountBolt");
    
            // Submit to an in-process local cluster; the demo runs until killed.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountStormJdbcTopology", new Config(), builder.createTopology());
    
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:storm+mysql集成

          本文链接:https://www.haomeiwen.com/subject/ofqmaftx.html