Trident 提供了一种数据过滤机制:实现类需要继承 BaseFilter 类并重写 isKeep 方法,isKeep 返回 true 的元组会被保留,返回 false 的元组会被丢弃。
下面的示例对数据进行隔行过滤,代码如下:
/**
 * Trident "function" demo: each input tuple (a, b, c, d) is passed through
 * {@link SumFunction}, which appends a field {@code sum = a + b}, and then through
 * {@link Result}, which prints all five fields.
 *
 * <p>Trident simplifies the spout -> bolt wiring of plain Storm: each {@code each(...)}
 * call in {@link #buildTopology()} plays the role of a bolt.
 *
 * <p>NOTE(review): the accompanying text describes a {@code BaseFilter}/{@code isKeep}
 * example in which four rows are sent and only two are received, but this topology contains
 * no filter step, so all four sample rows reach {@link Result}. Confirm whether a filter
 * step is missing here.
 *
 * @author mis
 */
public class TridentFunction {

    /**
     * Emits the sum of the first two fields of the incoming tuple as one new value.
     * The emitted value is appended to the tuple under the output field name declared
     * in the corresponding {@code each(...)} call ({@code "sum"} in this topology).
     */
    public static class SumFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            System.out.println("传入进来的内容为:" + tuple);
            // Positional access: index 0 is "a" and index 1 is "b", i.e. the first two
            // fields selected by each(new Fields("a","b","c","d"), ...).
            int a = tuple.getInteger(0);
            int b = tuple.getInteger(1);
            int sum = a + b;
            // Emit a single value; downstream it is visible as the "sum" field.
            collector.emit(new Values(sum));
        }
    }

    /** Terminal step: prints fields a, b, c, d and sum of each tuple and emits nothing. */
    public static class Result extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Read the input fields by name.
            Integer a = tuple.getIntegerByField("a");
            Integer b = tuple.getIntegerByField("b");
            Integer c = tuple.getIntegerByField("c");
            Integer d = tuple.getIntegerByField("d");
            Integer sum = tuple.getIntegerByField("sum");
            System.out.println("a: " + a + ", b: " + b + ", c: " + c + ", d: " + d + ", sum: " + sum);
        }
    }

    /**
     * Builds the demo topology: a non-cycling fixed-batch spout feeding {@link SumFunction},
     * followed by {@link Result}.
     *
     * @return the {@code StormTopology} to submit to a local or remote cluster
     */
    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();

        // Test data source. The suppression presumably silences the unchecked warning from
        // the spout's generic varargs constructor -- TODO confirm against the Storm version in use.
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(
                new Fields("a", "b", "c", "d"), // field names of each emitted row
                4,                              // batch size
                // Sample rows (test data).
                new Values(1, 4, 7, 10),
                new Values(1, 1, 3, 11),
                new Values(2, 2, 7, 1),
                new Values(2, 5, 7, 2));
        // Do not cycle: the sample rows are not replayed in a loop.
        spout.setCycle(false);
        // Register the spout as the input stream.
        Stream inputStream = topology.newStream("spout", spout);

        // In Trident, each(...) takes the place of a bolt. Its arguments are:
        //   1. the input fields handed to the function ("a","b","c","d"),
        //   2. the function (bolt) to run (new SumFunction()),
        //   3. the names of the fields the function emits ("sum").
        inputStream.each(new Fields("a", "b", "c", "d"), new SumFunction(), new Fields("sum"))
                // Chain the next function: Result reads the original fields plus "sum" and
                // declares no output fields, making it the last step of the stream.
                .each(new Fields("a", "b", "c", "d", "sum"), new Result(), new Fields());
        return topology.build();
    }

    /**
     * Entry point. With no arguments, runs the topology for ten seconds in an in-process
     * {@code LocalCluster}. Otherwise submits it to a real cluster under the name {@code args[0]}.
     *
     * @param args optional topology name; when absent the topology runs locally
     * @throws Exception if topology submission fails or the local run is interrupted
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            // Local mode.
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("trident-function", conf, buildTopology());
                Thread.sleep(10000);
            } finally {
                // Always stop the local cluster, even if submission fails or the sleep is
                // interrupted; otherwise its background services keep running.
                cluster.shutdown();
            }
        } else {
            // Cluster mode.
            StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
    }
}
运行结果:发送了四行数据,过滤后只接收到两行数据。
![运行结果截图](2017-04-26_114105.png)
网友评论