/**
* 实现简单的本地求和功能
*/
public class LocalSumTopology {
/**
* Spout组件
* 产生数据并且发送
*/
public static class SumSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
/**
* 初始化操作
* @param conf 初始化配置项
* @param context 上下文
* @param collector 数据发射器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector=collector;
}
int number=0;
/**
* 发射数据
* 该方法是一个死循环
*/
@Override
public void nextTuple() {
//Values类实现了ArrayList
collector.emit(new Values(++number));
System.out.println("Spout number: "+number);
//防止数据产生太快,睡眠一秒
Utils.sleep(1000);
}
/**
* 定义输出端字段
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//与上面的number变量对应
declarer.declare(new Fields("num"));
}
}
/**
* Bolt组件
* 实现业务的逻辑处理,这里求和
*/
public static class SumBolt extends BaseRichBolt{
/**
* 因为 这里接收数据之后不需要再发送给下一个Bolt,因此再初始化collector发射器
* @param stormConf
* @param context
* @param collector
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
int sum=0;
/**
* 执行业务逻辑的处理
* 该方法也是一个死循环
* @param input
*/
@Override
public void execute(Tuple input) {
//可以通过字段名或者下标索引获取
Integer value=input.getIntegerByField("num");
sum+=value;
System.out.println("Bolt sum: "+sum);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
//使用TopologyBuilder设置Spout和Bolt,并且将其关联 在一起
//创建Topology
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("SumSpout",new SumSpout());
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("SumSpout");
//使用本地模式
LocalCluster localCluster=new LocalCluster();
localCluster.submitTopology("LocalSumTopology",new Config(),builder.createTopology());
}
}
提交到集群运行
public static void main (String[] args){
//TopologyBuilder根据spout和bolt来构建Topology
//storm中任何一个作业都是通过Topology方式进行提交的
//Topology中需要指定spout和bolt的执行顺序
TopologyBuilder tb = new TopologyBuilder();
tb.setSpout("SumSpout", new SumSpout());
//SumBolt以随机分组的方式从DataSourceSpout中接收数据
tb.setBolt("SumBolt", new SumBolt()).shuffleGrouping("SumSpout");
//代码提交到storm集群上运行
try {
StormSubmitter.submitTopology("ClusterSumStormTopology", new Config(), tb.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
运行命令
./storm jar /root/untitled5.jar com.neusoft.LocalSumTopology
查看输出
image.png
网友评论