# Flink demo 开发环境搭建
Flink 入门:在 IDEA 中运行的 demo。Flink 提供了比较方便的创建 Flink 工程的方法。
1、Windows 环境下,打开 git bash,执行以下命令:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0
图片.png
2、batch demo
/**
 * Batch word-count demo built on the Flink DataSet API.
 *
 * <p>Splits a few in-memory lines into lower-cased words, counts the occurrences of each word
 * and prints the resulting (word, count) pairs.
 */
public class BatchJob {

    /**
     * Builds and runs the word-count job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or running the job fails
     */
    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> text = env.fromElements(
                "Flink flink flink ",
                "spark spark spark",
                "Spark Spark Spark");

        AggregateOperator<Tuple2<String, Integer>> sum = text.flatMap(new LineSplitter())
                .groupBy(0) // group by the word (tuple field 0)
                .sum(1);    // add up the per-word counts (tuple field 1)

        // In the DataSet API, print() itself triggers execution of the job, so no
        // explicit env.execute(...) call is made afterwards.
        sum.print();
    }

    /**
     * Splits a line into lower-cased words and emits a {@code (word, 1)} pair for each word.
     *
     * <p>Words are separated by runs of non-word characters ({@code \W+}).
     */
    public static final class LineSplitter
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Tokenizes one input line.
         *
         * @param s the input line
         * @param collector receives one {@code (word, 1)} tuple per word found in the line
         * @throws Exception declared by the {@code FlatMapFunction} contract
         */
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
                throws Exception {
            for (String token : s.toLowerCase().split("\\W+")) {
                // String.split keeps a leading empty element when the line is empty or starts
                // with a separator; skip it so that "" is never counted as a word.
                if (!token.isEmpty()) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
3、table sql demo
/**
 * Word-count demo using the Flink batch Table / SQL API.
 *
 * <p>Turns a fixed sentence into {@link WC} records, registers them as the temporary view
 * {@code test}, aggregates the counts per word with a SQL query and prints the result to
 * standard error.
 */
public class SqlDemo {

    /**
     * Builds the table from an in-memory collection, runs the aggregation query and prints
     * the resulting rows.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or running the job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // One WC record with count 1 per token of the sentence.
        String sentence = "hello to flink flink";
        String[] tokens = sentence.split("\\W+");
        ArrayList<WC> records = new ArrayList<>();
        for (int i = 0; i < tokens.length; i++) {
            records.add(new WC(tokens[i], 1L));
        }

        // Expose the records as a table with the columns "word" and "num".
        DataSource<WC> input = env.fromCollection(records);
        Table words = tableEnv.fromDataSet(input, "word,num");
        words.printSchema();

        // Register the table under the name used by the SQL query below.
        tableEnv.createTemporaryView("test", words);
        Table counts = tableEnv.sqlQuery("select word,sum(num) as num from test group by word");

        // Map the result rows back onto WC and print them to standard error.
        DataSet<WC> result = tableEnv.toDataSet(counts, WC.class);
        result.printToErr();
    }

    /**
     * A word together with its count.
     *
     * <p>The fields are public and a no-argument constructor is provided; the class is used
     * both as input records and as the target type of {@code toDataSet}.
     */
    public static class WC {
        /** The word. */
        public String word;
        /** The count associated with the word. */
        public Long num;

        /** Creates an empty record. */
        public WC() {
        }

        /**
         * Creates a record for the given word and count.
         *
         * @param word the word
         * @param num the count
         */
        public WC(String word, Long num) {
            this.word = word;
            this.num = num;
        }

        /** Returns a string of the form {@code WC{word='...', num=...}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("WC{");
            text.append("word='").append(word).append('\'');
            text.append(", num=").append(num);
            text.append('}');
            return text.toString();
        }
    }
}
pom文件依赖
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- The <scope>provided</scope> lines below are commented out, so these dependencies use
         the default (compile) scope; presumably this is so the demos can be run directly
         from the IDE. Re-enable "provided" if they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>

    <!-- Table API / SQL dependencies. The Scala suffix uses ${scala.binary.version}, like
         flink-streaming-java above, so that all artifacts share one Scala binary version. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
网友评论