2022-04-04-Flink-44(三)
Author: 冰菓_ | Published 2022-04-04 23:54

    1. Environment

    getExecutionEnvironment

    Returns the execution environment that matches the context in which the program is currently running: a local environment when run inside an IDE, or a cluster environment when the job is submitted to a cluster.

    createLocalEnvironment

    Returns a local execution environment. The default parallelism can be specified when calling it; if omitted, it defaults to the number of available CPU cores.

    createRemoteEnvironment

    返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包。
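
    A minimal sketch of the three factory methods described above; the parallelism, host, port, and jar path passed to createLocalEnvironment/createRemoteEnvironment are placeholders, not values taken from this post:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnvDemo {
        public static void main(String[] args) {
            // picks a local or a cluster environment depending on where the program runs
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // explicit local environment with a default parallelism of 2
            StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(2);

            // explicit remote environment: JobManager host/port plus the jar(s) to ship (placeholders)
            StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
                    "hadoop1", 8081, "path/to/your-job.jar");
        }
    }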

    Execution mode

    Since Flink 1.12 a job can run in STREAMING mode (the default), in BATCH mode, or let Flink decide with AUTOMATIC. The mode can be set in code or with the execution.runtime-mode option when submitting the job.
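
    A minimal sketch of choosing the mode in code; BATCH is used purely as an illustration (STREAMING is the default):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExecutionModeDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // process bounded input as a batch job instead of a continuous stream
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }
    }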

    2. Source

    Reading from a file
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class Source {
    
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            // read each line of the file as a String element
            DataStreamSource<String> textSource = env.readTextFile("src/main/resources/b.txt");
            textSource.print();
    
    
            env.execute();
        }
    }
    
    Reading from a collection

    fromCollection and fromElements

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.ArrayList;
    
    public class Source {
    
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
    
    
            ArrayList<String> list = new ArrayList<>();
            list.add("小米");
            list.add("小蓝");
            list.add("小黄");
            env.fromCollection(list).print();
    
            env.fromElements(
                    new Event("小米","0001",100000L),
                    new Event("小米","0001",200000L)
            ).print();
            env.execute();
        }
    }
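
    The examples in this post construct Event objects, but the class itself is not shown. Below is a minimal sketch consistent with the three-argument constructor used above; the field names user, url, and timestamp are assumptions:

    public class Event {
        // field names are inferred from how the constructor is used in this post
        public String user;
        public String url;
        public Long timestamp;

        // Flink treats this as a POJO: public no-argument constructor plus public fields
        public Event() {
        }

        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Event{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
        }
    }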
    
    Socket text stream
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class Source {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);

            // read lines from a socket; start a listener on hadoop1:7777 first, e.g. with `nc -lk 7777`
            DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop1", 7777);
            socketTextStream.print();
            env.execute();
        }
    }
    
    Reading data from Kafka
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import java.util.Properties;
    
    public class kafkaSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hadoop100:9092");
            properties.setProperty("group.id", "consumer_group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");

            env.addSource(new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(), properties)).print();
            env.execute();
        }
    }
    
    Custom Source

    A source that implements SourceFunction can only run with a parallelism of 1. To read a parallel data source, implement the ParallelSourceFunction interface (or extend the RichParallelSourceFunction abstract class) instead.

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import java.util.Random;

    public class addSource {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Event> source = env.addSource(new MySource());
            source.print();
            env.execute();
        }
    }

    class MySource implements SourceFunction<Event> {
        // volatile so that cancel(), called from another thread, is seen by the run() loop
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();

            String[] users = new String[]{"1", "2", "3"};
            String[] urls = new String[]{"click", "pjv", "slide"};
            while (flag) {
                // emit one random Event per second until the source is cancelled
                long millis = System.currentTimeMillis();
                String user = users[random.nextInt(users.length)];
                String url = urls[random.nextInt(urls.length)];
                sourceContext.collect(new Event(user, url, millis));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
    
    
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    import java.util.Random;

    public class addSource {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Event> source = env.addSource(new MyParallelSource()).setParallelism(3);
            source.print();
            env.execute();
        }
    }

    class MyParallelSource implements ParallelSourceFunction<Event> {
        // volatile so that cancel(), called from another thread, is seen by the run() loop
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();

            String[] users = new String[]{"1", "2", "3"};
            String[] urls = new String[]{"click", "pjv", "slide"};
            while (flag) {
                // each parallel instance emits one random Event per second
                long millis = System.currentTimeMillis();
                String user = users[random.nextInt(users.length)];
                String url = urls[random.nextInt(urls.length)];
                sourceContext.collect(new Event(user, url, millis));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
    
    
    Data types supported by Flink
    1. Basic data types
    2. Array types (arrays of primitive types, arrays of objects)
    3. Composite data types, such as Java tuples and POJOs (see the sketch after this list)
    4. Auxiliary types: Option, Either, List, Map
    5. Generic types
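
    A minimal sketch of streams over a few of these categories; the concrete values are only illustrative, and Event is the POJO sketched earlier in this post:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TypesDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // basic type
            env.fromElements(1, 2, 3).print();

            // composite type: Flink's Tuple2
            env.fromElements(Tuple2.of("小米", 1L), Tuple2.of("小蓝", 2L)).print();

            // composite type: POJO
            env.fromElements(new Event("小米", "0001", 100000L)).print();

            env.execute();
        }
    }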

    Flink has a type extraction system that analyzes the input and return types of a function and obtains the type information automatically, from which it derives the corresponding serializers and deserializers. However, because of type erasure in Java generics, the automatically extracted information is not precise enough in some special cases, such as lambda expressions. To deal with this, the Java API provides dedicated type hints.
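
    A minimal sketch of such a type hint: the lambda's Tuple2 result type is erased at runtime, so .returns(...) supplies the type information explicitly (the mapping itself is only an illustration):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TypeHintDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("小米", "小蓝", "小黄")
                    // the Tuple2<String, Long> return type of the lambda is erased...
                    .map(user -> Tuple2.of(user, 1L))
                    // ...so a type hint tells Flink the exact result type
                    .returns(Types.TUPLE(Types.STRING, Types.LONG))
                    .print();

            env.execute();
        }
    }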
