美文网首页
storm整合kafka

storm整合kafka

作者: piziyang12138 | 发表于 2018-10-04 16:26 | 被阅读 0 次

    maven

    <!-- Build descriptor for a Storm + Kafka (+ MySQL driver) demo topology.
         Packaged as a plain jar and compiled for Java 1.7. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>storm-kafka-mysql</groupId>
        <artifactId>storm-kafka-mysql</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>storm-kafka-mysql</name>
        <description />
        <properties>
            <!-- Keep source files read as UTF-8 regardless of platform default. -->
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
            <!-- Java EE API: compile-time only, supplied by the container at runtime. -->
            <dependency>
                <groupId>javax</groupId>
                <artifactId>javaee-api</artifactId>
                <version>8.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.glassfish.web</groupId>
                <artifactId>javax.servlet.jsp.jstl</artifactId>
                <version>1.2.2</version>
            </dependency>
    
            <!-- Storm runtime. The 'provided' scope is deliberately commented out so
                 LocalCluster runs have storm-core on the classpath.
                 NOTE(review): a real Storm cluster already ships storm-core - confirm
                 and re-enable 'provided' before submitting to a cluster. -->
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>0.9.5</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <!-- storm-kafka supplies KafkaSpout/SpoutConfig used by MyKafkaTopology. -->
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>0.9.5</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <!-- Kafka client libraries (Scala 2.11 build). zookeeper and log4j are
                 excluded - presumably to avoid version clashes with the copies pulled
                 in transitively by storm-core; verify if either exclusion is lifted. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.8.2.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- JDBC driver; no code in this article uses it yet. -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.31</version>
            </dependency>
    
        </dependencies>
        <build>
            <plugins>
                <!-- Compile for Java 7 bytecode. -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                    </configuration>
                </plugin>
                <!--<plugin>-->
                    <!--<artifactId>maven-war-plugin</artifactId>-->
                    <!--<version>2.2</version>-->
                    <!--<configuration>-->
                        <!--<version>3.1</version>-->
                        <!--<failOnMissingWebXml>false</failOnMissingWebXml>-->
                    <!--</configuration>-->
                <!--</plugin>-->
            </plugins>
        </build>
    </project>
    
    

    MyKafkaTopology

    package com.neusoft;
    
    import java.util.Arrays;
    
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    /**
     * Wires a KafkaSpout reading the "orderMq" topic to a PrintBolt sink.
     *
     * <p>Run with an argument to submit to the Nimbus host named by {@code args[0]};
     * run without arguments to execute inside an in-process {@code LocalCluster}.
     */
    public class MyKafkaTopology {
    
         public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
              final String zkConnect = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
              final String kafkaTopic = "orderMq";
              final String consumerId = "wordtest";
    
              // NOTE(review): the zkRoot argument is the empty string, so consumer
              // offsets land directly under the ZooKeeper root - confirm intended.
              BrokerHosts hosts = new ZkHosts(zkConnect);
              SpoutConfig kafkaConf = new SpoutConfig(hosts, kafkaTopic, "", consumerId);
              kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
              kafkaConf.forceFromStart = false; // resume from last committed offset
              kafkaConf.zkServers = Arrays.asList("hadoop01", "hadoop02", "hadoop03");
              kafkaConf.zkPort = 2181;
    
              TopologyBuilder builder = new TopologyBuilder();
              // The topic was created with 2 partitions, so spout parallelism is 2.
              builder.setSpout("kafka-reader", new KafkaSpout(kafkaConf), 2);
              builder.setBolt("print-bolt", new PrintBolt(), 2).shuffleGrouping("kafka-reader");
    
              Config stormConf = new Config();
              String topologyName = MyKafkaTopology.class.getSimpleName();
              if (args == null || args.length == 0) {
                   // No nimbus host supplied: run locally in this JVM.
                   stormConf.setMaxTaskParallelism(3);
                   LocalCluster cluster = new LocalCluster();
                   cluster.submitTopology(topologyName, stormConf, builder.createTopology());
              }
              else {
                   // Nimbus host name passed from command line.
                   stormConf.put(Config.NIMBUS_HOST, args[0]);
                   stormConf.setNumWorkers(3);
                   StormSubmitter.submitTopologyWithProgressBar(topologyName, stormConf, builder.createTopology());
              }
         }
    }
    
    

    bolt

    package com.neusoft;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    
    import backtype.storm.tuple.Tuple;
    
    /**
     * Terminal bolt: logs and prints the first field of every incoming tuple.
     * It emits nothing downstream and declares no output fields.
     */
    public class PrintBolt extends BaseBasicBolt {
    
        public static final Log log = LogFactory.getLog(PrintBolt.class);
    
        public static final long serialVersionUID = 1L;
    
        /** Reads field 0 of the tuple (declared by the upstream component) and prints it. */
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String message = input.getString(0);
            log.info("message: " + message);
            System.out.println("message is : " + message);
            // Intentionally no emit: this bolt is a sink.
        }
    
        /** No output stream is produced, so nothing is declared. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    
    }
    

    相关文章

      网友评论

          本文标题:storm整合kafka

          本文链接:https://www.haomeiwen.com/subject/lfbxaftx.html