40Storm

作者: 文茶君 | 来源:发表于2020-02-24 22:25 被阅读0次

Storm框架介绍
流式处理框架
storm是个实时的,分布以及具备高容错的计算系统

  • storm进程常驻内存
  • storm数据不经过磁盘,在内存中处理

Twitter开源的分布式实时大数据处理框架,最早开源于github

架构

  • Nimbus主节点
  • Supervisor从节点
  • Worker 进程

编程模型

  • DAG(Topology)
  • Spout
  • Bolt

高可靠性

  • 异常处理
  • 消息可靠性保障机制(ACK)

流式处理

  • 流式处理(异步与同步)
    客户端提交数据进行结算,并不会等待数据计算结果
  • 逐条处理
    例:ETL(数据清洗)extracted transform load
  • 统计分析
    例:计算PV,UV,访问热点以及某些数据的聚合,加和,平均等
  • 客户端提交数据之后,计算完成结果存储到Redis,hbase,MySQL或者其他MQ中
    客户端并不关心最终结果是多少

实时请求

  • 实时请求应答服务(同步)
    客户端提交数据请求之后,立刻取得计算结果并返回客户端
  • Drpc
  • 实时请求处理
    例:图片特征提取

storm:进程,线程常驻内存运行,数据不进入磁盘,数据通过网络传递

storm:纯流式处理

  • 专门为流式处理设计
  • 数据传输模式更为简单,很多地方也更为高效
  • 并不是不能做批处理,他也可以用来做微批处理,来提高吞吐

Spark Streaming:微批处理

  • 将RDD做的很小来用小的批处理来接近流式处理
  • 基于内存和DAG可以把处理任务做的很快

Topology – DAG有向无环图的实现

  • 对于Storm实时计算逻辑的封装
  • 即,由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构
  • 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止
    (区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)

Tuple – 元组
Stream中最小数据组成单元

Stream – 数据流

  • 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做Stream
  • Stream声明时需给其指定一个Id(默认为Default)
    实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId

Spout – 数据源

  • 拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中
  • 一个Spout可以发送多个数据流(Stream)
    可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过-
    SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
  • Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算

Bolt – 数据流处理组件

  • 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往需要多个Bolt分多个步骤完成
  • 一个Bolt可以发送多个数据流(Stream)
    可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
  • Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑

Stream Grouping – 数据流分组(即数据分发策略)

  1. Shuffle Grouping
    随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
    轮询,平均分配

  2. Fields Grouping
    按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。

  3. All Grouping
    广播发送,对于每一个tuple,所有的bolts都会收到

  4. Global Grouping
    全局分组,把tuple分配给task id最低的task 。

  5. None Grouping
    不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。

  6. Direct Grouping
    指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)

  7. Local or shuffle grouping
    本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致

customGrouping
自定义,相当于mapreduce那里自己去实现一个partition一样。

接下来代码讲grouping
为了方便就在代码里注释写上



track.log

public class MySpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;//输入流,转换流,缓冲流

    SpoutOutputCollector collector = null;
    String str = null;

    @Override
    public void nextTuple() {
        try {
            while ((str = this.br.readLine()) != null) {
                // 过滤动作
                collector.emit(new Values(str, str.split("\t")[1]));//相当于发送了两个,一个是当前行,一个是当前行的第二个元素,即id
            }
        } catch (Exception e) {
        }

    }

    @Override
    public void close() {
        try {
            br.close();
            isr.close();
            fis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.collector = collector;
            this.fis = new FileInputStream("track.log");//文件放在项目下所以没盘符
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log", "session_id"));
    }//声明几个发送结果,要匹配

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("spout ack:" + msgId.toString());
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:" + msgId.toString());
    }

}

mybolt接收上游数据

public class MyBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    int num = 0;
    String valueString = null;

    @Override
    public void cleanup() {

    }

    @Override
    public void execute(Tuple input) {
        try {
            valueString = input.getStringByField("log");//获取值

            if (valueString != null) {
                num++;
                System.err.println(input.getSourceStreamId() + " " + Thread.currentThread().getName() + "--id="
                        + Thread.currentThread().getId() + "   lines  :" + num + "   session_id:"
                        + valueString.split("\t")[1]);//打印流的id,当前线程名称id
            }
            collector.ack(input);
            // Thread.sleep(2000);
        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(""));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

public class Main {

    /**
     * @param args
     */
    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new MySpout(), 1);

        // shuffleGrouping其实就是随机往下游去发,不自觉的做到了负载均衡
//      builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");

        // fieldsGrouping其实就是MapReduce里面理解的Shuffle,根据fields求hash来取模
//      builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id"));

        // 只往一个里面发,往taskId小的那个里面去发送
//      builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");

        // 等于shuffleGrouping
//      builder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");

        // 广播
        builder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");

        // Map conf = new HashMap();
        // conf.put(Config.TOPOLOGY_WORKERS, 4);
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMessageTimeoutSecs(30);

        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());//不传参数就以本地方式执行
        }

    }

}


storm架构



storm任务提交流程


图片2.png
Storm 本地目录树
图片3.png
Storm Zookeeper目录树 图片4.png

相关文章

  • 40Storm

    Storm框架介绍流式处理框架storm是个实时的,分布以及具备高容错的计算系统 storm进程常驻内存 stor...

网友评论

      本文标题:40Storm

      本文链接:https://www.haomeiwen.com/subject/wjtjqhtx.html