美文网首页
04 kafka与storm的整合

04 kafka与storm的整合

作者: 张力的程序园 | 来源:发表于2020-06-29 13:47 被阅读0次

    本节将展示storm作为kafka的消费者。

    1、前提约束

    2、操作步骤

    • 在idea或者eclipse中创建一个maven项目,加入以下依赖:
    <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>log4j-over-slf4j</artifactId>
                    </exclusion>
                </exclusions>
                <version>1.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka-client</artifactId>
                <version>1.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>1.2.1</version>
            </dependency>
    
    • 在项目的src/main/java文件夹下创建PrintBolt.java
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    
    public class PrintBolt extends BaseBasicBolt {
        /**
         * execute会被storm一直调用
         *
         * @param tuple
         * @param basicOutputCollector
         */
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            System.err.println(tuple.getValue(4));
        }
    
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
        }
    }
    
    • 在项目的src/main/java文件夹下创建MainTopology.java
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    
    public class MainTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            KafkaSpoutConfig.Builder<String, String> kafkaBuilder = KafkaSpoutConfig.builder("192.168.100.141:9092", "kafka-storm");
            // 设置kafka属于哪个组
            kafkaBuilder.setGroupId("testgroup");
            // 创建kafkaspoutConfig
            KafkaSpoutConfig<String, String> build = kafkaBuilder.build();
            // 通过kafkaspoutConfig获得kafkaspout
            KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(build);
            // 设置5个线程接收数据
            builder.setSpout("kafkaSpout", kafkaSpout, 1);
            // 设置2个线程处理数据
            builder.setBolt("printBolt", new PrintBolt(), 1).localOrShuffleGrouping("kafkaSpout");
            Config config = new Config();
            if (args.length > 0) {
                // 集群提交模式
                config.setDebug(false);
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } else {
                // 本地测试模式
                config.setDebug(true);
                // 设置2个进程
                config.setNumWorkers(2);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("kafkaSpout", config, builder.createTopology());
            }
        }
    }
    
    • 启动该main方法
    • 在192.168.100.141服务器上执行以下命令:
    # 启动zookeeper
    cd /root/zookeeper-3.4.11
    ./zkServer.sh start
    # 启动kafka
    cd /root/kafka_2.11-2.2.1
    bin/kafka-server-start.sh config/server.properties
    # 创建topic
    bin/kafka-topics.sh --create --bootstrap-server 192.168.100.141:9092 --replication-factor 1 --partitions 1 --topic  kafka-storm
    # 启动storm的nimbus和supervisor
    cd /root/apache-storm-1.2.1
    ./storm nimbus
    ./storm supervisor
    # 启动kafka的生产者
    bin/kafka-console-producer.sh --broker-list  192.1100.141:9092  --topic kafka-storm
    
    • 测试
      (1)在192.168.100.141的生产者命令行中输入字符串,在idea或者eclipse的命令行中就能看到该字符串。
      (2)也可以把上述项目打包,假设该包名称为kafka-maven-1.0-SNAPSHOT.jar,上传到/root/apache-storm-1.2.1,以集群的方式启动,注意这个过程可能缺少jar包,需要上传到/root/apache-storm-1.2.1/lib。
    cd /root/apache-storm-1.2.1/bin
    ./storm jar  kafka-maven-1.0-SNAPSHOT.jar  MainTopology  wordcount
    

    在192.168.100.141的生产者命令行中输入字符串,在apache-storm-1.2.1的日志文件中也能看到输入的字符串。
    以上就是kafka与storm整合的过程。

    相关文章

      网友评论

          本文标题:04 kafka与storm的整合

          本文链接:https://www.haomeiwen.com/subject/bnzwfktx.html