前言
春节期间,新的非典型性肺炎肆虐。开工后,团队有的远程办公,有的公司上班。公司很多对外工作只能延后。也趁这个时间,有时间商讨公司架构的未来走向。年前我写了些总结,这两天经过商讨,我们基本定下了平台的核心是一个olap系统。针对olap系统,我就找到了flink和druid。先研究flink吧。研究清楚了flink就可以了解现有的市面上是如何看待实时或者准实时的分布式流式计算的,在此基础之上再对公司需要的计算集合进行定义。于是就有了本次探索。
昨天翻了半天的flink官网。对flink有了基础的了解。flink既支持在线计算,也支持离线计算。而且基于kafka支持分布式计算。同时它还有个webui用来对其进行监控和管理。另外就是,它的计算进程似乎是独立的,默认是不包含springboot的,粗略的搜了下,集成似乎也是存在一定问题的。也是需要研究,然后再弄清楚是否需要集成,或者怎么集成。这大概就是本次探索的目的吧。
安装
安装时参考官网的。虽然英文也是读得懂,但是还是慢,google翻译不错。(https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html
)
由于我机器上早就安装了open-jdk11,jdk的安装就跳过了。
加压,启动,然后,就可以访问webui了
tar xzvf flink-1.9.2-bin-scala_2.12.tgz
mv flink-1.9.2 /opt/flink
cd /opt/flink
./bin/start-cluster.sh
访问 http://192.168.137.12:8081 即可看到了。
本来想用supervisor配置开机启动及进程守护的,但是这个脚本执行了是后台启动,没搞定怎么弄。就算了。
demo
创建maven项目
创建项目真是一波三折呀。官网提供了一个创建的maven骨架,然后我发现用idea来创建卡在那里半个多小时也完不成,直接用命令执行,它会找我先要一个pom文件。网上搜了一大圈也没有用成功。然后我在官网又看见了给出的几个直接的maven依赖。。。然后我就放弃了骨架。依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.9.2</version>
</dependency>
demo代码
package net.naturetribe.flink.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("192.168.137.12", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
这是经过我的本地调整后的官方的demo。全项目就这一个java文件。
将项目的jar包发到服务器上,然后执行命令:
/opt/flink/bin/flink run --class net.naturetribe.flink.demo.SocketWindowWordCount /home/jobs/flink-demo.jar --port 9000
需要注意的是要指定--class,早先我没有指定,报错说找不到运行的主函数入口。
这里会遇见一个坑,因为指定的9000端口是没有任何进程监听的。需要先安装nc命令并进行监听这个任务才能正常执行。
官方用了一个叫做nc的命令,查了下,是一个叫做netcat的软件包,先装上吧,然后再说怎么用的,官网也给了需要的命令。
dnf install nc//centos8把yum换成了dnf,这里换成yum也是一样的
nc -l 9000
这样就监听好了,启动任务,在这个进程里随意输入一些词,就可以进行词频统计了。
也可以通过web端上传jar包,设置启动参数进行调用,就不用去服务器写命令了。
结果嘛,需要在flink目录下的log目录里面的日志中看。只要你启动flink进程,就会出现一个模式为flink--taskexecutor-.out的日志文件,tail它,你就可以看到相应的词频显示结果了。
不过这个例子太初级了,后面我们构造一个实用一些的例子。
网友评论