美文网首页
Flink(7) 自定义数据源

Flink(7) 自定义数据源

作者: hk_faith | 来源:发表于2021-03-14 14:38 被阅读0次

    简介

    只要实现 SourceFunction 接口对应的方法就可以自定义数据源
    1.创建环境

     public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<SensorReading> streamSource = env.addSource(new MySensorSource());
    
            streamSource.print();
    
            env.execute();
    
        }
    

    2.实现 SourceFunction 接口

     public static class MySensorSource implements SourceFunction<SensorReading> {
    
            //定义一个标识位用来控制数据
            private boolean running = true;
            //定义一个随机数发生器
            Random random = new Random();
    
            public void run(SourceContext<SensorReading> ctx) throws Exception {
    
                //设置10个传感器的初识温度
                HashMap<String, Double> sensorTempMap = new HashMap<String, Double>();
                for (int i = 0; i < 10; i++) {
                    sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
                }
    
                while (running) {
    
                    for(String sensorId : sensorTempMap.keySet()){
                        //在当前的温度基础上随机波动
                        Double newtemp = sensorTempMap.get(sensorId) + random.nextGaussian();
    
                        sensorTempMap.put(sensorId,newtemp);
    
                        ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newtemp));
                    }
                    //控制输出频率
                    Thread.sleep(1000L);
                }
    
            }
    
            public void cancel() {
                running = false;
            }
        }
    

    相关文章

      网友评论

          本文标题:Flink(7) 自定义数据源

          本文链接:https://www.haomeiwen.com/subject/wpvqcltx.html