美文网首页
分布式流式计算-jstorm部署

分布式流式计算-jstorm部署

作者: 史圣杰 | 来源:发表于2019-01-10 21:01 被阅读0次

jstorm是阿里巴巴使用java语言重写的storm,可以用来做流式计算,我们使用jstorm从kafka中读取数据,然后汇总单词数量

1.wordcount编写

jstorm的官网较为详细的介绍了jstorm的各个组件,与Kafka集成时,只需要替换spout即可。

处理流程

在编写代码时,我们首先要引入jstorm的jar包和storm的kafka包

<dependency>
      <groupId>com.alibaba.jstorm</groupId>
      <artifactId>jstorm-core</artifactId>
      <version>${jstorm.version}</version>

      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-nop</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-jdk14</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>


    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.9.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

1.1 Spout

Kafka的Spout需要指定Kafka的zookeeper地址和broker路径,topic的名称,将这些信息放在SpoutConfig中,然后创建KafkaSpout即可。

private static KafkaSpout kafkaSpout(){
        String brokerZkStr = "106.12.196.74:2181";
        String brokerZkPath = "/brokers";
        String topic = "jianshu-topic-new";
        String zkRoot = "";
        //id 可以随意命名
        String id = "jstormspout";

        ZkHosts zk = new ZkHosts(brokerZkStr,brokerZkPath);
        SpoutConfig spoutConf = new SpoutConfig(zk, topic, zkRoot, id);

        List<String> zkServices = new ArrayList<String>();

        for(String str : zk.brokerZkStr.split(",")){
            zkServices.add(str.split(":")[0]);
        }
        spoutConf.zkServers = zkServices;
        spoutConf.zkPort = 2181;
        spoutConf.forceFromStart = true;
        spoutConf.socketTimeoutMs = 60 * 1000;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(spoutConf);
    }

1.2 拆分Bolt

拆分的逻辑比较简单,根据空格拆分即可,然后分别发送每个word

public class SplitBolt extends BaseRichBolt {
    OutputCollector outputCollector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0); // getStringByField("sentence");
        String rs[] = sentence.split("\\s+");
        for (String sen : rs){
            outputCollector.emit(new Values(sen));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

1.3 统计Bolt

使用一个map统计每个单词出现的次数

public class TotalCount extends BaseRichBolt {
    OutputCollector outputCollector;
    Map<String,Integer> map ;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        this.map = new ConcurrentHashMap<String, Integer>() {
        };
    }

    @Override
    public void execute(Tuple tuple) {
        String  word = tuple.getStringByField("word");
        Object obj = map.get(word);
        int size = 0;
        if(obj != null){
            size = Integer.valueOf(obj.toString());
        }
        map.put(word , size+1);
        System.out.println(String.format("[%s] is %d",word,size+1));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("count"));
    }
}

1.4 构建拓扑

// spout
SpoutDeclarer spout = builder.setSpout(KafkaTopology.SEQUENCE_SPOUT_NAME,
             kafkaSpout(), spoutParal);
// bolt,使用shuffleGrouping
builder.setBolt(SPLIT_BOLT_NAME,new SplitBolt()).shuffleGrouping(SEQUENCE_SPOUT_NAME);
// bolt,使用fieldsGrouping,相同的word会被发送到同一个bolt
BoltDeclarer totalBolt = builder.setBolt(KafkaTopology.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).fieldsGrouping(KafkaTopology.SPLIT_BOLT_NAME , new Fields("word"));

相关文章

  • 分布式流式计算-jstorm部署

    jstorm是阿里巴巴使用java语言重写的storm,可以用来做流式计算,我们使用jstorm从kafka中读取...

  • JStorm和Storm比较

    1、What——JStorm是什么? 概述: JStorm 是一个分布式实时计算引擎,类似Hadoop MapRe...

  • JStorm学习笔记 - 基本概念

    JStorm 是一个分布式实时计算引擎。JStorm 是一个类似Hadoop MapReduce的系统, 用户按照...

  • 大数据学习技术指南

    一、大数据通用处理平台1、Spark2、Flink3、Hadoop 二、流式计算 1、Storm/JStorm 2...

  • 分布式流式计算-Kafka部署

    Kafka是一个高性能的流式消息队列,适用于大数据场景下的消息传输、消息处理和消息存储。在学习过程中,我们通常使用...

  • 分布式计算入门知识

    分布式计算基础课程:分布式计算入门 (课程针对大数据分步式计算中的相关技术进行讲解,核心讲解流式计算和内存计算技术...

  • Lightweight Asynchronous Snapsho

    摘要 分布式状态流式处理能够使得持久化计算能够大规模部署到云上进行执行,达到低延迟和高吞吐的目标。不过所面临的最大...

  • 【译】使用Apache Kafka构建流式数据平台(1)

    前言:前段时间接触过一个流式计算的任务,使用了阿里巴巴集团的JStorm,发现这个领域值得探索,就发现了这篇文章—...

  • 流式计算

    流式计算 kafka + spark streaming进行流式计算

  • 数据亲和架构--流式计算

    关于计算有很多名词,比如实时计算、分布式计算,以及这里提到流式计算等等。他们是从计算形势的不同维度来描述,不必争议...

网友评论

      本文标题:分布式流式计算-jstorm部署

      本文链接:https://www.haomeiwen.com/subject/jzwarqtx.html