美文网首页
Storm应用系列之——集成Kafka

Storm应用系列之——集成Kafka

作者: 大时代_f479 | 来源:发表于2017-11-21 22:51 被阅读0次

    问题导读:

    Kafka集群中的Broker地址,有哪两种方法指定?

    TransactionalTridentKafkaSpout的作用是什么?

    本地模式无法保存Offset该如何解决?

    前言Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。

    Kafka的基本介绍:什么是Kafka

    准备工作

    KafkaSpout其实网上已经有人写了,在github上开源了,不用我们自己造轮子。只是要注意版本问题:

    0.7版本的Kafka,对应KafkaSpout可以使用Storm-contrib下面的例子

    源码:https://github.com/nathanmarz/st ... /master/storm-kafka

    Maven依赖:https://clojars.org/storm/storm-kafka

    0.8版本的Kafka在API上和底层Offset的处理方式上发生了重大变化,所以老的KafkaSpout不再适用,必须使用新的KafkaAPI

    源码:https://github.com/wurstmeister/storm-kafka-0.8-plus

    这里因为0.8版本的Kafka必然是将来主流,所以我就不介绍0.7 的了,使用方式基本上是类似的。

    PS:

    是人写的,就会有bug,何况是别人分享出来的。所以,遇到bug,还请去github上提交一个issue告诉作者修正。

    2014/7/29 更新:

    wurstmeister/storm-kafka-0.8-plus 现在合并到Apache Storm了,在其external/storm-kakfa目录

    Maven依赖直接更新成:

    org.apache.storm

    storm-kafka

    0.9.2-incubating

    复制代码

    但是storm似乎没有直接把external的包加载到classpath,所以使用时,还得手动把该jar包从external/storm-kafka/下拷到storm的lib目录。

    当然,也可以在maven中加上compile,直接把该jar打到你项目一起。

    使用KafkaSpout一个KafkaSpout只能去处理一个topic的内容,所以,它要求初始化时提供如下与topic相关信息:

    Kafka集群中的Broker地址 (IP+Port)

    有两种方法指定:

    1. 使用静态地址,即直接给定Kafka集群中所有Broker信息

    GlobalPartitionInformation info = new GlobalPartitionInformation();

    info.addPartition(0, new Broker("10.1.110.24",9092));

    info.addPartition(0, new Broker("10.1.110.21",9092));

    BrokerHosts brokerHosts = new StaticHosts(info);

    复制代码

    2. 从Zookeeper动态读取

    BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");

    复制代码

    推荐使用这种方法,因为Kafka的Broker可能会动态的增减

    topic名字

    当前spout的唯一标识Id (以下代称$spout_id)

    zookeeper上用于存储当前处理到哪个Offset了 (以下代称$zk_root)

    当前topic中数据如何解码

    了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端自己管理的。所以,后面两个的目的,其实是在zookeeper上建立一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。

    在Topology中加入Spout的代码:

    String topic = "test";

    String zkRoot = "kafkastorm";

    String spoutId = "myKafka";

    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);

    spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new KafkaSpout(spoutConfig), spoutNum);

    复制代码

    其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据

    public class TestMessageScheme implements Scheme {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);

    @Override

    public List deserialize(byte[] bytes) {

    try {

    String msg = new String(bytes, "UTF-8");

    return new Values(msg);

    } catch (InvalidProtocolBufferException e) {

    LOGGER.error("Cannot parse the provided message!");

    }

    //TODO: what happend if returns null?

    return null;

    }

    @Override

    public Fields getOutputFields() {

    return new Fields("msg");

    }

    }

    复制代码

    这个解码方式是与Producer端生成时塞入数据的编码方式配套的。这里我Producer端塞入的是String的byte,所以这里也还原成String,定义输出为一个名叫"msg"的field。

    后面就可以自己添加Bolt处理tuple中该field的数据了。

    使用TransactionalTridentKafkaSpoutTransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不同。

    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic, spoutId);

    kafkaConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

    TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

    TridentTopology topology = new TridentTopology();

    topology.newStream("test_str", kafkaSpout).shuffle().each(new Fields("msg", new PrintFunction());

    复制代码

    看到它并没有要求我们提供zkRoot,因为直接代码里面写死了…… -_-T

    地址是 /transactional//,在上面的例子中,就是  /transactional/test_str/myKafaka

    常见问题1. 本地模式无法保存Offset

    KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。

    本地模式,要显示的去配置

    spoutConfig.zkServers = new ArrayList(){{

    add("10.1.110.20");

    add("10.1.110.21");

    add("10.1.110.24");

    }};

    spoutConfig.zkPort = 2181;

    复制代码

    net.wurstmeister.storm

    storm-kafka-0.8-plus

    0.2.0

    org.slf4j

    slf4j-simple

    复制代码

    相关文章

      网友评论

          本文标题:Storm应用系列之——集成Kafka

          本文链接:https://www.haomeiwen.com/subject/sbugvxtx.html