1. Environment
getExecutionEnvironment
Returns the execution environment for the context in which the program is currently running: a local environment when run standalone, a cluster environment when submitted to a cluster.
createLocalEnvironment
Returns a local execution environment; the default parallelism can be passed as an argument (otherwise it defaults to the number of CPU cores).
createRemoteEnvironment
Returns a cluster execution environment and submits the Jar to a remote server. You must specify the JobManager's host and port, and the Jar file(s) to run on the cluster.
Execution mode
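The heading above refers to Flink's runtime execution modes (available since Flink 1.12): STREAMING (the default), BATCH, and AUTOMATIC. A minimal sketch of setting the mode in code; it can also be set on the command line with `-Dexecution.runtime-mode=BATCH`, which keeps the jar mode-agnostic:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH: results are emitted only after the bounded input is fully processed.
        // STREAMING (the default) emits incremental results continuously.
        // AUTOMATIC lets Flink choose based on whether all sources are bounded.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }
}
```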
2. Source
Reading from a file
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Source {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // read from a file
        DataStreamSource<String> textSource = env.readTextFile("src/main/resources/b.txt");
        textSource.print();
        env.execute();
    }
}
Reading from a collection
fromCollection and fromElements
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class Source {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        ArrayList<String> list = new ArrayList<>();
        list.add("小米");
        list.add("小蓝");
        list.add("小黄");
        env.fromCollection(list).print();
        env.fromElements(
            new Event("小米", "0001", 100000L),
            new Event("小米", "0001", 200000L)
        ).print();
        env.execute();
    }
}
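The examples in this section construct Event objects, but the Event class itself is never shown. A minimal sketch of a Flink-compatible POJO matching the (user, url, timestamp) constructor used here; the field names are assumptions:

```java
// Hypothetical Event class assumed by the examples in this section.
// Flink treats it as a POJO: public class, public no-arg constructor,
// and public fields (or getters/setters), so it gets efficient serialization.
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    // no-arg constructor required by Flink's POJO serializer
    public Event() {}

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
    }
}
```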
Socket text stream
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Source {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop1", 7777);
        socketTextStream.print();
        env.execute();
    }
}
Reading from Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop100:9092");
        properties.setProperty("group.id", "consumer_group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        env.addSource(new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(), properties)).print();
        env.execute();
    }
}
Custom Source
A source that implements SourceFunction can only run with parallelism 1. To read from a data source in parallel, implement the ParallelSourceFunction interface instead.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class addSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.addSource(new mySource());
        source.print();
        env.execute();
    }
}
class mySource implements SourceFunction<Event> {
    // volatile so the cancel() call from another thread is visible to run()
    private volatile boolean flag = true;
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        String[] users = new String[]{"1", "2", "3"};
        String[] urls = new String[]{"click", "pjv", "slide"};
        while (flag) {
            long millis = System.currentTimeMillis();
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            // emit one random Event per second
            sourceContext.collect(new Event(user, url, millis));
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        flag = false;
    }
}
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
public class addSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.addSource(new myParallelSource()).setParallelism(3);
        source.print();
        env.execute();
    }
}
class myParallelSource implements ParallelSourceFunction<Event> {
    // volatile so the cancel() call from another thread is visible to run()
    private volatile boolean flag = true;
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        String[] users = new String[]{"1", "2", "3"};
        String[] urls = new String[]{"click", "pjv", "slide"};
        while (flag) {
            long millis = System.currentTimeMillis();
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            // each parallel subtask emits one random Event per second
            sourceContext.collect(new Event(user, url, millis));
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        flag = false;
    }
}
Data types supported by Flink
- Basic types
- Array types (primitive arrays and object arrays)
- Composite types
- Auxiliary types: Option, Either, List, Map
- Generic types
Flink has a type-extraction system that analyzes a function's input and return types to obtain type information automatically, and from it the matching serializers and deserializers. However, because of generic type erasure in Java, the automatically extracted information is sometimes not precise enough, for example in lambda expressions. To solve this, the Java API provides explicit type hints.
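As a concrete illustration of the type-hint mechanism: with a lambda such as `word -> Tuple2.of(word, 1L)`, erasure leaves Flink seeing only a raw Tuple2, so the full `Tuple2<String, Long>` type must be supplied via `returns()`. A minimal sketch (the sample elements are assumptions):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TypeHintDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "a")
           // erasure: Flink cannot infer Tuple2<String, Long> from the lambda alone
           .map(word -> Tuple2.of(word, 1L))
           // the type hint restores the full generic type information
           .returns(Types.TUPLE(Types.STRING, Types.LONG))
           .print();
        env.execute();
    }
}
```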