How to Integrate Storm with Kafka Programmatically

Author: 大时代_f479 | Published 2017-11-21 22:46

Guiding questions

1. How do you integrate Storm with Kafka programmatically?

2. How is the Topology implemented in Storm?

3. How do you verify that the integration works?

I. The Model

Data flow:

1. A Kafka producer publishes messages to the topic1 topic.

2. A Storm topology contains three components: KafkaSpout, SenqueceBolt, and KafkaBolt. KafkaSpout subscribes to topic1 and passes each message to SenqueceBolt for processing; KafkaBolt then publishes the processed data back to Kafka as topic2 messages.

3. A Kafka consumer consumes the messages on topic2.
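Stripped of Storm and Kafka entirely, the three-stage flow above can be sketched with plain in-memory queues standing in for the two topics (class and variable names here are hypothetical, for illustration only):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Storm-free sketch of the data flow:
// topic1 -> KafkaSpout (decode bytes) -> SenqueceBolt (transform) -> topic2
public class FlowSketch {
    public static void main(String[] args) throws Exception {
        Queue<byte[]> topic1 = new ArrayDeque<byte[]>();
        Queue<String> topic2 = new ArrayDeque<String>();

        // The producer writes a raw message to topic1.
        topic1.add("storm".getBytes("UTF-8"));

        // Spout decodes, bolt transforms, KafkaBolt publishes to topic2.
        while (!topic1.isEmpty()) {
            String msg = new String(topic1.poll(), "UTF-8"); // KafkaSpout + Scheme
            String out = "I'm " + msg + "!";                 // SenqueceBolt
            topic2.add(out);                                 // KafkaBolt
        }

        System.out.println(topic2.poll()); // prints I'm storm!
    }
}
```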

II. Implementing the Topology

1. Create a Maven project and configure pom.xml

The project depends on three artifacts: storm-core, kafka_2.10, and storm-kafka.

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.2-incubating</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2. KafkaSpout

KafkaSpout is a spout that ships with Storm; its source lives at https://github.com/apache/incubator-storm/tree/master/external

To use KafkaSpout you must implement the Scheme interface yourself; it is responsible for parsing the fields you need out of the raw message stream.

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {

    /* (non-Javadoc)
     * @see backtype.storm.spout.Scheme#deserialize(byte[])
     */
    public List<Object> deserialize(byte[] ser) {
        try {
            // Decode the raw Kafka message bytes as a UTF-8 string.
            String msg = new String(ser, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available; returning null drops the message.
            return null;
        }
    }

    /* (non-Javadoc)
     * @see backtype.storm.spout.Scheme#getOutputFields()
     */
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
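The only real work in the scheme is the UTF-8 decode; that step can be checked outside Storm with plain Java (the class name is hypothetical):

```java
public class DecodeCheck {
    public static void main(String[] args) throws Exception {
        // Raw bytes as they would arrive from Kafka.
        byte[] ser = "hello kafka".getBytes("UTF-8");
        // The same conversion MessageScheme.deserialize performs.
        String msg = new String(ser, "UTF-8");
        System.out.println(msg); // prints hello kafka
    }
}
```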


3. SenqueceBolt

SenqueceBolt is trivial: it prepends "I'm " to each message received from the spout.

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SenqueceBolt extends BaseBasicBolt {

    /* (non-Javadoc)
     * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "I'm " + word + "!";
        System.out.println("out=" + out);
        collector.emit(new Values(out));
    }

    /* (non-Javadoc)
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}


4. KafkaBolt

KafkaBolt is a bolt that ships with Storm; it publishes messages to a Kafka topic.
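KafkaBolt picks up the target topic (the "topic" key) and the broker settings ("kafka.broker.properties") from the topology configuration and publishes the first field of each incoming tuple. A Storm-free sketch of that behaviour, with FakeProducer standing in for the real Kafka producer (both class names are hypothetical, not the storm-kafka API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the Kafka producer; records what would be sent.
class FakeProducer {
    final List<String> sent = new ArrayList<String>();
    void send(String topic, String message) {
        sent.add(topic + ":" + message);
    }
}

public class KafkaBoltSketch {
    public static void main(String[] args) {
        // KafkaBolt reads these keys from the Storm Config when it prepares.
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put("topic", "topic2");

        FakeProducer producer = new FakeProducer();
        // The first tuple field, as emitted by SenqueceBolt.
        String field = "I'm storm!";

        // For each tuple, publish the field value to the configured topic.
        producer.send((String) conf.get("topic"), field);
        System.out.println(producer.sent.get(0)); // prints topic2:I'm storm!
    }
}
```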

5. The Topology

import java.util.HashMap;
import java.util.Map;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class StormKafkaTopo {

    public static void main(String[] args) throws Exception {
        // Zookeeper ensemble addresses
        BrokerHosts brokerHosts = new ZkHosts("node04:2181,node05:2181,node06:2181");
        // Topic to subscribe to, plus the Zookeeper node path and id used for offset storage
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout", "kafkaspout");
        // kafka.broker.properties consumed by KafkaBolt
        Config conf = new Config();
        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker address
        map.put("metadata.broker.list", "node04:9092");
        // serializer.class is the message serializer
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that KafkaBolt publishes to
        conf.put("topic", "topic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
        builder.setBolt("kafkabolt", new KafkaBolt()).shuffleGrouping("bolt");

        if (args != null && args.length > 0) {
            // Submit to a cluster under the name given on the command line
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Run in local mode for about 100 seconds, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topo", conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology("Topo");
            cluster.shutdown();
        }
    }
}


III. Testing and Verification

1. Use the Kafka console client as the producer, writing to the topic1 topic:

bin/kafka-console-producer.sh --broker-list node04:9092 --topic topic1

2. Use the Kafka console client as the consumer, subscribing to the topic2 topic:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning

3. Run the Storm topology:

bin/storm jar storm-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar StormKafkaTopo KafkaStorm

4. Results

Original article; when reposting, please credit the source: http://www.cnblogs.com/tovin/p/3974417.html

