美文网首页
storm Trident编程的分组策略

storm Trident编程的分组策略

作者: 先生_吕 | 来源:发表于2017-04-26 11:49 被阅读38次

    Trident编程中的=数据分组策略演示

    代码

    public class StrategyTopology {
        
        public static class WriteFunction extends BaseFunction {
            private static final Log log = LogFactory.getLog(WriteBolt.class);
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                // 获取上一个组件所声明的Filed
                String text = tuple.getStringByField("sub");
                //打印结果
                System.out.println(text);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static StormTopology buildTopology(){
            TridentTopology topology = new TridentTopology();
            //设定数据源
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(
                    new Fields("sub"), //声明输出的域字段为“sub”
                    4,                  //设置逼出来大小为4
                    //设置数据源内容
                    new Values("java"),
                    new Values("python"),
                    new Values("php"),
                    new Values("c++"),
                    new Values("ruby")
                    );
            //指定是否循环
            spout.setCycle(true);
            //指定输入源spout
            Stream inputStream = topology.newStream("spout", spout);
            /**
             * 要实现sqout - bolt的模式 在trident里使用each来完成
             * each方法参数:
             *      1,输入源参数名称
             *      2,需要流转执行的function对象(就是bolt):new WriteFunction(),此function要求自己编写类
             *      3,指定function对象里的输出参数名称,没有则不在继续流向
             * */
            inputStream.
            //随机分组:shuffle
            shuffle().
            //分区分组:partitionBy
            //partitionBy(new Fields("sub")).
            //全局分组:global
            //global().
            //广播分组:broadcast
            //broadcast().
            each(new Fields("sub"), new WriteFunction(),new Fields()).parallelismHint(4);//parallelismHint设置并行度
            return topology.build();
        }
    
        
        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setNumWorkers(2);
            conf.setMaxSpoutPending(20);
            if(args.length == 0){
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("trident-filter", conf, buildTopology());
                Thread.sleep(10000);
                cluster.shutdown();
            }else{
                StormSubmitter.submitTopology(args[0], conf, buildTopology());
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:storm Trident编程的分组策略

          本文链接:https://www.haomeiwen.com/subject/nzrizttx.html