美文网首页
Storm HelloWorld

Storm HelloWorld

作者: Counting_S | 来源:发表于2016-12-23 09:29 被阅读167次

示例简介


一个简单的Storm的示例,拓扑结构由一个Spout和一个Bolt组成,Spout负责从Redis中读取经纬度点(Coordinate)的数据,并且在这些点中随机挑选一个放入tuple中发射给BoltBolt负责处理当前的点与上一个接收到的点之间的距离,并存储到Redis中的另一张表中。

Maven依赖


Demo使用的Storm版本为0.9.6,同时使用了Jedis来直接与Redis交互。在pom文件中添加如下依赖:

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.6</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.8.1</version>
</dependency>

创建Spout文件


Spout创建时的核心方法是 public void nextTuple()方法,Storm在这里将信息发送到collectorStream中。

public void nextTuple() {
  Utils.sleep(100);
  Integer r = rand.nextInt(points.length);
  String coor = points[r];
  collector.emit(new Values(coor));
}

完整的代码如下:

package wech.storm_starter_demo;

import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import redis.clients.jedis.Jedis;

public class PointReader implements IRichSpout {

/**
*
*/
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private Random rand;
private Jedis jedis;
private String[] points;
private boolean complete = false;
private static String POINTS_KEY = "points";

public boolean isDistributed() {
return false;
}

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.rand = new Random();
this.jedis = new Jedis();
List<String>list = jedis.lrange(POINTS_KEY, 0, -1);
this.points = list.toArray(new String[1]);
}

public void ack(Object object) {
System.out.println("OK" + object);
}

public void activate() {
// TODO Auto-generated method stub

}

public void close() {
// TODO Auto-generated method stub

}

public void deactivate() {
// TODO Auto-generated method stub

}

public void fail(Object object) {
System.out.println("Fail" + object);

}

public void nextTuple() {
Utils.sleep(100);
Integer r = rand.nextInt(points.length);
String coor = points[r];
collector.emit(new Values(coor));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("point"));
}

public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

}

创建Bolt


bolt主要负责接收stream中的tuple信息并且进行处理,然后选择传递到下一个bolt或者通知Spout处理完成(失败)。代码如下:

package wech.storm_starter_demo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

public class PointHandle implements IRichBolt {
private OutputCollector collector;
private Jedis jedis;
private static String DISTANCE_KEY = "distancekey";
private String lastPoint;
private static Double R = 6378137.0;
public void cleanup() {
// TODO Auto-generated method stub

}

public Double[] splitPoint(String pointStr) {
String[] temp = pointStr.split(",");
Double[] fin = new Double[]{Double.parseDouble(temp[0]) , Double.parseDouble(temp[1])};
return fin;
}

public Double distance(Double lon1, Double lat1, Double lon2, Double lat2) {
Double a;
Double b;
lat1 = lat1 * Math.PI / 180.0;
lat2 = lat2 * Math.PI / 180.0;
a = lat1 - lat2;
b = (lon1 - lon2) * Math.PI / 180.0;
Double distance;
Double sa2,sb2;
sa2 = Math.sin(a / 2.0);
sb2 = Math.sin(b / 2.0);
distance = 2 * R * Math.asin(Math.sqrt(sa2 * sa2 + Math.cos(lat1)
* Math.cos(lat2) * sb2 * sb2));
return distance;

}

public void execute(Tuple input) {
String currentPoint = input.getString(0);
if (lastPoint.isEmpty()) {
lastPoint = currentPoint;
return;
} else {
Double[] lastCoor = this.splitPoint(lastPoint);
Double[] currentCoor = this.splitPoint(currentPoint);
Double distance = this.distance(lastCoor[0], lastCoor[1], currentCoor[0], currentCoor[1]);
String result = lastPoint + " -> " + currentPoint + " = " + distance;
System.out.println(result);
jedis.rpush(DISTANCE_KEY, result);
}

collector.ack(input);
}

public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
this.collector = collector;
jedis = new Jedis();
lastPoint = "";
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));

}

public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

}

创建Topology


创建一个Topology结构,将SpoutBolt添加进去。代码如下:

package wech.storm_starter_demo;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class PointTopology {

public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new PointReader(),5);
builder.setBolt("bolt", new PointHandle(),8).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);

StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("point-handle", conf, builder.createTopology());

Thread.sleep(10000);

cluster.shutdown();
};
}

}

相关文章

  • Storm HelloWorld

    示例简介 一个简单的Storm的示例,拓扑结构由一个Spout和一个Bolt组成,Spout负责从Redis中读取...

  • 无标题文章

    helloworld helloworld helloworld helloworld helloworld he...

  • 2019-01-05

    HelloWorld HelloWorld HelloWorld HelloWorld HelloWorld He...

  • Storm入门

    Storm 基本介绍 什么是 Storm 首先Storm是Apache顶级项目之一Storm 官网 Storm 是...

  • java大数据之storm

    一、Storm简介 1.1 Storm是什么 Apache Storm(http://storm.apache.o...

  • Apache Storm

    Apache Storm Apache Storm Use Cases Real Time Storm Proje...

  • Storm 性能优化

    目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...

  • Storm(三) storm-starter

    原文链接storm-starter storm-starter就是Storm工程里边一个专门用来学习使用Storm...

  • CMake —— 获取环境变量

    $ENV{HELLOWORLD} 注意事项:set(HELLOWORLD $ENV{HELLOWORLD}) 这是...

  • storm 启动 uimbus ui supervisor卡死解

    storm uimbus执行以后,卡死,jps中有nimbus,如下图 storm ui 、storm super...

网友评论

      本文标题:Storm HelloWorld

      本文链接:https://www.haomeiwen.com/subject/zufkvttx.html