美文网首页
Flink-7.Flink 广告点击黑名单

Flink-7.Flink 广告点击黑名单

作者: 笨鸡 | 来源:发表于2022-03-10 16:13 被阅读0次
    package com.ctgu.flink.project;
    
    import com.ctgu.flink.entity.BlackInfo;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    
    public class Flink_Sql_AdClick {
        /**
         * Builds and runs the ad-click job.
         *
         * <p>Pipeline, in order:
         * <ol>
         *   <li>Registers {@code data/AdClickLog.csv} as the SQL table {@code source}.</li>
         *   <li>Derives a millisecond column {@code timestamp} ({@code ts * 1000}) and converts
         *       the result to a {@code DataStream<Row>}.</li>
         *   <li>Re-imports that stream with a computed {@code time_ltz} column and a watermark
         *       lagging it by 5 seconds, then converts back to a stream.</li>
         *   <li>Drops rows whose {@code userId} or {@code adId} is null, keys by
         *       (userId, adId) and runs {@link MyProcessFunction} with a threshold of 2.</li>
         *   <li>Prints the {@code blackList} side output.</li>
         *   <li>Keys the surviving rows by {@code province}, counts them in 1-hour tumbling
         *       event-time windows and prints (province, windowEnd, count).</li>
         * </ol>
         * Finally prints the elapsed wall-clock time in whole seconds.
         *
         * @param args unused
         * @throws Exception if job construction or execution fails
         */
        public static void main(String[] args) throws Exception {

            final long startMillis = System.currentTimeMillis();

            // --- environments -------------------------------------------------
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            EnvironmentSettings settings =
                    EnvironmentSettings.newInstance().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

            // --- source table -------------------------------------------------
            String sourceDdl =
                    "CREATE TABLE source " +
                            "    (" +
                            "    `userId` BIGINT," +
                            "    `adId` BIGINT," +
                            "    `province` STRING," +
                            "    `city` STRING," +
                            "    `ts` BIGINT" +
                            "    )" +
                            "    WITH (" +
                            "       'connector'='filesystem'," +
                            "       'format'='csv'," +
                            "       'csv.field-delimiter'=','," +
                            "       'path'='data/AdClickLog.csv'" +
                            "    )";
            tableEnv.executeSql(sourceDdl);

            // `ts` is multiplied by 1000 so that TO_TIMESTAMP_LTZ(..., 3) below can
            // interpret it with millisecond precision.
            Table withMillis = tableEnv.sqlQuery("select *, ts * 1000 as `timestamp` from source");
            DataStream<Row> rawRows = tableEnv.toDataStream(withMillis);

            // --- attach event time + watermark --------------------------------
            Schema eventTimeSchema = Schema.newBuilder()
                    .columnByExpression("time_ltz", "TO_TIMESTAMP_LTZ(`timestamp`, 3)")
                    .watermark("time_ltz", "time_ltz - INTERVAL '5' SECOND")
                    .build();
            Table timedTable = tableEnv.fromDataStream(rawRows, eventTimeSchema);
            DataStream<Row> dataStream = tableEnv.toDataStream(timedTable);

            // --- blacklist filtering per (userId, adId) -----------------------
            KeySelector<Row, Tuple2<Long, Long>> byUserAndAd =
                    new KeySelector<Row, Tuple2<Long, Long>>() {

                        @Override
                        public Tuple2<Long, Long> getKey(Row row) throws Exception {
                            Long userId = (Long) row.getField("userId");
                            Long adId = (Long) row.getField("adId");
                            return new Tuple2<>(userId, adId);
                        }
                    };

            SingleOutputStreamOperator<Row> filterDataStream = dataStream
                    .filter(row -> !(row.getField("userId") == null || row.getField("adId") == null))
                    .keyBy(byUserAndAd)
                    .process(new MyProcessFunction(2L));

            // The tag id must match the one used inside MyProcessFunction.
            OutputTag<BlackInfo> blackListTag = new OutputTag<BlackInfo>("blackList"){};
            DataStream<BlackInfo> blackList = filterDataStream.getSideOutput(blackListTag);
            blackList.print();

            // --- hourly click count per province ------------------------------
            KeySelector<Row, String> byProvince = new KeySelector<Row, String>() {

                @Override
                public String getKey(Row row) throws Exception {
                    return (String) row.getField("province");
                }
            };

            filterDataStream
                    .keyBy(byProvince)
                    .window(TumblingEventTimeWindows.of(Time.hours(1)))
                    .aggregate(new AverageAggregate(), new MyWindowFunction())
                    .print();

            env.execute("Table SQL");

            long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
            System.out.println("耗时: " + elapsedSeconds);
        }
    
        /**
         * Incremental aggregate that counts the rows added to it.
         *
         * <p>Note: despite the class name, no average is computed. The accumulator
         * starts at zero, grows by one for every row, and is returned unchanged as
         * the result.
         */
        private static class AverageAggregate
                implements AggregateFunction<Row, Long, Long> {

            /** @return a fresh accumulator holding zero */
            @Override
            public Long createAccumulator() {
                return Long.valueOf(0L);
            }

            /**
             * @param row   the incoming row; its contents are not inspected
             * @param count the count accumulated so far
             * @return {@code count} incremented by one
             */
            @Override
            public Long add(Row row, Long count) {
                long next = count + 1;
                return next;
            }

            /** @return the accumulated count as-is */
            @Override
            public Long getResult(Long count) {
                return count;
            }

            /** @return the sum of the two partial counts */
            @Override
            public Long merge(Long left, Long right) {
                return Long.sum(left, right);
            }
        }
    
        /**
         * Window function that tags a pre-aggregated value with its key and the
         * end timestamp of the window it belongs to.
         */
        private static class MyWindowFunction
                implements WindowFunction<Long, Tuple3<String, Long, Long>, String, TimeWindow> {

            /**
             * Emits a single {@code (key, windowEnd, value)} tuple.
             *
             * @param key        the window's key
             * @param timeWindow the window; only its end timestamp is used
             * @param iterable   the window contents; only the first element is read
             * @param collector  receives the resulting tuple
             */
            @Override
            public void apply(String key,
                              TimeWindow timeWindow,
                              Iterable<Long> iterable,
                              Collector<Tuple3<String, Long, Long>> collector) throws Exception {
                final long windowEnd = timeWindow.getEnd();
                final Long firstValue = iterable.iterator().next();

                Tuple3<String, Long, Long> result = new Tuple3<>(key, windowEnd, firstValue);
                collector.collect(result);
            }
        }
    
        /**
         * Per-key click limiter with blacklist side output.
         *
         * <p>For each key the function keeps a click counter and a "blacklisted" flag:
         * <ul>
         *   <li>While the counter is below {@code threshold}, the row is forwarded and the
         *       counter is incremented.</li>
         *   <li>The first row that arrives once the counter has reached {@code threshold}
         *       sets the flag and emits one {@link BlackInfo} to the side output with id
         *       {@code "blackList"}; that row is not forwarded.</li>
         *   <li>While the flag is set, rows are dropped silently.</li>
         * </ul>
         * When the counter is zero, a processing-time timer is registered for the next
         * multiple of 24 hours since the epoch; when it fires, both states are cleared.
         */
        private static class MyProcessFunction
                extends KeyedProcessFunction<Tuple2<Long, Long>, Row, Row> {

            /** One day in milliseconds, as a long so the timer arithmetic never uses int math. */
            private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

            /**
             * Side-output tag for blacklist notifications. Created once instead of per
             * element; consumers match on the id {@code "blackList"}.
             */
            private static final OutputTag<BlackInfo> BLACK_LIST_TAG =
                    new OutputTag<BlackInfo>("blackList") {};

            /** Number of rows forwarded per key before the key is blacklisted. */
            private final long threshold;

            /** Rows forwarded so far for the current key; {@code null} means zero. */
            private transient ValueState<Long> clickCount;

            /** Whether the current key is blacklisted; {@code null} means not blacklisted. */
            private transient ValueState<Boolean> isBlack;

            /**
             * @param threshold number of rows to forward per key before blacklisting;
             *                  must not be {@code null}
             */
            public MyProcessFunction(Long threshold) {
                this.threshold = threshold;
            }

            /**
             * Acquires the keyed state handles. The descriptors carry no default value
             * (that constructor is deprecated); absent state is treated as 0 / false
             * where it is read.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                clickCount = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Long.class));
                isBlack = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("isBlack", Boolean.class));
            }

            /**
             * Applies the limiter to one row.
             *
             * @param row       the incoming row; {@code userId}, {@code adId} and
             *                  {@code province} are read when a blacklist entry is emitted
             * @param context   gives access to the timer service and side output
             * @param collector receives rows that are still within the threshold
             */
            @Override
            public void processElement(Row row, Context context, Collector<Row> collector) throws Exception {
                Long storedCount = clickCount.value();
                long curCount = storedCount == null ? 0L : storedCount;

                if (curCount == 0L) {
                    // Round the current processing time up to the next 24h boundary.
                    long now = context.timerService().currentProcessingTime();
                    long nextDayStart = (now / MILLIS_PER_DAY + 1) * MILLIS_PER_DAY;
                    context.timerService().registerProcessingTimeTimer(nextDayStart);
                }

                if (Boolean.TRUE.equals(isBlack.value())) {
                    // Already reported for this key: drop without further output.
                    return;
                }

                if (curCount >= threshold) {
                    isBlack.update(true);
                    context.output(BLACK_LIST_TAG,
                            new BlackInfo((Long) row.getField("userId"),
                                    (Long) row.getField("adId"),
                                    (String) row.getField("province")));
                } else {
                    clickCount.update(curCount + 1);
                    collector.collect(row);
                }
            }

            /** Clears the counter and the blacklist flag for the key whose timer fired. */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
                isBlack.clear();
                clickCount.clear();
            }
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:Flink-7.Flink 广告点击黑名单

          本文链接:https://www.haomeiwen.com/subject/bnofdrtx.html