0.需求
实时输出当天的搜索词排名,即实时呈现热点搜索词
1.数据源读取
对接Kafka数据源,将消息转成实体类(实体类属性主要是关键词和搜索时间)
public class KafkaUtil {
public static FlinkKafkaConsumerBase<String> text(String topic) throws IOException {
return text(topic, "kafka.properties");
}
/**
* @param topic 主题名称
* @param configPath Kafka属性配置文件路径
*/
public static FlinkKafkaConsumerBase<String> text(String topic,String configPath) throws IOException {
//1.加载Kafka属性
Properties prop=new Properties();
//Class.getClassLoader.getResourceAsStream 默认是从ClassPath根下获取,path不能以“/"开头
InputStream in=KafkaUtil.class.getClassLoader().getResourceAsStream(configPath);
prop.load(in);
//2.构造FlinkKafkaConsumer
FlinkKafkaConsumerBase<String> consumer=new FlinkKafkaConsumer011<>(topic,new SimpleStringSchema(),prop);
//todo 可以进行消费者的相关配置
// 本地debug不提交offset consumer.setCommitOffsetsOnCheckpoints(false);
return consumer;
}
}
env.addSource(KafkaUtil.text("topic"));
2.搜索词统计
在进行窗口内统计时,首先需要根据yyyy-MM-dd的维度对搜索词消息进行keyby操作,形成KeyedStream,进一步调用实现了抽象类KeyedProcessFunction的方法。
2.1 KeyedProcessFunction介绍
顾名思义,KeyedProcessFunction,是针对具有相同key的stream进行元素处理的方法。
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
三个泛型分别是key的类型,流输入类型,流输出类型。
<1> 方法public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
处理流中的每一个元素,即每有一个元素进来,就会执行一次processElement方法。
该方法可以通过参数Collector<O> out来产生0个或者多个元素;
也可以通过Context ctx来更新state或者设置定时器timers。
ctx通过调用timerService()可以注册定时器。
Context的属性方法:
public abstract class Context {
/**
* Timestamp of the element currently being processed or timestamp of a firing timer.
*
* <p>This might be {@code null}, for example if the time characteristic of your program
* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
*/
public abstract Long timestamp();
/**
* A {@link TimerService} for querying time and registering timers.
*/
public abstract TimerService timerService();
/**
* Emits a record to the side output identified by the {@link OutputTag}.
*
* @param outputTag the {@code OutputTag} that identifies the side output to emit to.
* @param value The record to emit.
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
/**
* Get key of the element being processed.
*/
public abstract K getCurrentKey();
}
对于timestamp()方法,需要注意的是注释上说明可能会返回为null,如果设置的时间类型是ProcessingTime。
<2>方法public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
当通过某个定时器timer通过TimerService设置后,会调用onTimer方法。
2.2 具体实现
因为需要实时统计当天的搜索词热度,所以在KeyedProcessFunction实现方法中,需要使用到状态。
private MapState<String, DashboardKeyword> keywordState;
Map的键就是搜索词,值就是输出实体类(两个属性分别是关键词和搜索次数)
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StateTtlConfig retainOneDay = StateTtlUtil.retainOneDay();
MapStateDescriptor<String, DashboardKeyword> keywordStateDescriptor = new MapStateDescriptor<>(
"keyword",
Types.STRING,
Types.POJO(DashboardKeyword.class)
);
keywordStateDescriptor.enableTimeToLive(retainOneDay);
this.keywordState = getRuntimeContext().getMapState(keywordStateDescriptor);
}
KeyedProcessFunction继承了AbstractRichFunction,因此可以使用到RuntimeContext。也进一步可以使用到状态,在open方法里面定义State并且设置State的存活时间为1天。
@Override
public void processElement(Search search, Context context, Collector<Tuple2<Integer, DashboardKeyword>> collector) throws Exception {
Result result = DicAnalysis.parse(search.getKeyword());
for (Term term: result.getTerms()) {
String key = term.getName();
if (key.trim().length() < 2) continue;
DashboardKeyword value;
if (this.keywordState.get(key) != null) {
value = this.keywordState.get(key);
} else {
value = new DashboardKeyword();
value.setKeyword(key);
value.setFrequency(0L);
}
value.setFrequency(value.getFrequency() + 1L);
this.keywordState.put(key, value);
}
long coalescedTime = ((System.currentTimeMillis() + 5000) / 5000) * 5000;
context.timerService().registerProcessingTimeTimer(coalescedTime);
}
processElement里面的代码就是很常见的套路,如果MapState中已经存在当前搜索词了,即获取对应的value,并将其频次增加;如果MapState中并没有当前的搜索词,则将该关键词及对应value添加到Map当中。
这里是给定时器添加5s的时长,这里的写法((System.currentTimeMillis() + 5000) / 5000) * 5000
是计时器合并的目的,Flink对于每个键和时间戳都只会维护一个计时器(计时器太多会影响性能),需要通过降低计时器的精度来合并计时器,从而减少计时器的数量。
假设现在是15s,那么定时器为20s,利用上面的写法,16s,17s,18s,19s的定时器都是20s。
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Integer, DashboardKeyword>> collector) throws Exception {
java.util.stream.Collector<DashboardKeyword, ?, List<DashboardKeyword>> top50Collector = Comparators.greatest(
50,
Comparator.comparingLong(DashboardKeyword::getFrequency)
);
List<DashboardKeyword> top50 = Streams.stream(this.keywordState.values()).collect(top50Collector);
for (int rank = 0; rank < top50.size(); rank++) {
collector.collect(Tuple2.of(rank, top50.get(rank)));
}
}
统计前50搜索词。
3.数据存储
存储到redis中,
注意flink1.7版本之后,官方没有redis sink,可以去http://bahir.apache.org/(flink以及spark扩展库),
粘贴源码实现redis sink。
网友评论