美文网首页flink入门
flink学习之十-window&ProcessingTime

flink学习之十-window&ProcessingTime

作者: AlanKim | 来源:发表于2019-03-19 09:59 被阅读33次

    这里先使用Processing Time,使用window来处理,看下demo

    package myflink.job;
    
    import com.alibaba.fastjson.JSON;
    import myflink.model.UrlInfo;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.lang3.time.DateFormatUtils;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    
    import java.util.Date;
    import java.util.Properties;
    
    public class WindowTest {
    
        /**
         * Reads UrlInfo JSON records from Kafka, keys them by domain, and counts
         * records per domain over 30-second processing-time tumbling windows,
         * printing one aggregated record per domain per window.
         */
        public static void main(String[] args) throws Exception {
    
            // Obtain the streaming environment (reads data from Kafka below).
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Set the time characteristic FIRST, before defining any sources or
            // transformations: timeWindow() picks its window assigner based on the
            // characteristic in effect at the point it is called, so setting it
            // late (as the original did, after source/keyBy) is fragile.
            // ProcessingTime is Flink's default, but stating it documents intent.
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("zookeeper.connect", "localhost:2181");
            properties.put("group.id", "metric-group");
            properties.put("auto.offset.reset", "latest");
            // NOTE(review): FlinkKafkaConsumer010 deserializes via the
            // DeserializationSchema passed to its constructor; these two entries
            // are ignored by the Flink consumer. Kept for compatibility, but they
            // have no effect here.
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                    new FlinkKafkaConsumer010<String>(
                            "testjin",// topic
                            new SimpleStringSchema(),
                            properties
                    )
            ).setParallelism(1)
                    // map: convert each raw JSON string into a UrlInfo and derive
                    // its domain so the stream can be keyed by it below.
                    .map(string -> {
                        UrlInfo urlInfo = JSON.parseObject(string, UrlInfo.class);
                        urlInfo.setDomain(urlInfo.generateDomain());
                        return urlInfo;
                    });
    
            // Partition the stream by domain; each window then aggregates the
            // records of exactly one domain.
            KeyedStream<UrlInfo, String> keyedStream = dataStreamSource.keyBy(new KeySelector<UrlInfo, String>() {
                @Override
                public String getKey(UrlInfo urlInfo) throws Exception {
                    return urlInfo.getDomain();
                }
            });
    
            // 30-second tumbling window + reduce: one output record per key per
            // window. t1 is the running aggregate, t2 the incoming element.
            SingleOutputStreamOperator<UrlInfo> windowReduceStream = keyedStream.timeWindow(Time.seconds(30))
            .reduce((ReduceFunction<UrlInfo>) (t1, t2) -> {
                UrlInfo urlInfo = new UrlInfo();
    
                // All elements in this window share the same domain (keyed stream),
                // so taking t1's domain is safe.
                urlInfo.setDomain(t1.getDomain());
                urlInfo.setUrl(urlInfo.getDomain() + "/reduce/" + DateFormatUtils.format(new Date(),"yyyy-MM-dd'T'HH:mm:ss"));
                urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));
    
                // Counting by incrementing the running aggregate. NOTE(review):
                // this assumes each incoming element contributes exactly 1 and
                // ignores t2's own count; if UrlInfo records can arrive with
                // count > 1, this should be t1.getCount() + t2.getCount() —
                // TODO confirm UrlInfo's initial count value.
                urlInfo.setCount(t1.getCount() + 1);
    
                return urlInfo;
            }).returns(UrlInfo.class);
    
            // Print each window's aggregate to stdout.
            windowReduceStream.addSink(new PrintSinkFunction<>());
    
            env.execute("execute window reduce info");
        }
    }
    
    

    可以看到,这里使用window,在window内,每隔30秒做一次reduce,统计窗口内总共的数据个数。

    由于用了window+reduce,这里30秒只有一个结果出来。

    运行后,看下结果:

    2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:35:56, hash=e7b48416a083727b703df80008dfe4e8, domain=so.com, count=16)
    2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:35:59, hash=e478c32f727bd95507a409d6c6b08146, domain=baidu.com, count=6)
    
    2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:26, hash=b22e6462ab7f2a263eb7934fa0fe110f, domain=baidu.com, count=3)
    2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:29, hash=7da591487d9c624ae7209b7c2028eec0, domain=so.com, count=5)
    
    2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:59, hash=f2a7487a54a4fb193d5acbac00a0d539, domain=so.com, count=5)
    2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:53, hash=ce326552180fe4e1465a90ac7baeb380, domain=baidu.com, count=3)
    

    相关文章

      网友评论

本文标题:flink学习之十-window&ProcessingTime

        本文链接:https://www.haomeiwen.com/subject/qvxouqtx.html