美文网首页
Flink自定义StreamOperator

Flink自定义StreamOperator

作者: Flink实战剖析 | 来源:发表于2019-12-13 19:46 被阅读0次

    在上一篇StreamOperator源码简析从源码角度分析了StreamOperator以及其实现类,此篇幅主要分析一下如何自定义一个StreamOperator。

    StreamOperator接口提供了其生命周期的抽象方法,例如初始化方法setup、open、initializeState,checkpoint相关方法prepareSnapshotPreBarrier、snapshotState,但是我们没有必要去自己一一实现这些方法,可以继承其抽象类AbstractStreamOperator,覆盖一些我们需要重写的方法。在上一篇分析中提到对于source端不需要接受上游数据,也就不需要实现OneInputStreamOperator或者TwoInputStreamOperator接口,如果我们需要接收上游数据就必须实现这两个接口中的一个,主要看一个输入还是两个输入来选择。
    案例:假设我们现在需要实现一个通用的定时、定量的输出的StreamOperator。
    实现步骤:

    1. 继承AbstractStreamOperator抽象类,实现OneInputStreamOperator接口

    2. 重写open方法,调用flink 提供的定时接口,并且注册定时器

    3. 重写initializeState/snapshotState方法,由于批量写需要做缓存,那么需要保证数据的一致性,将缓存数据存在状态中

    4. 重写processElement方法,将数据存在缓存中,达到一定大小然后输出

    5. 由于需要做定时调用,那么需要有一个定时调用的回调方法,那么定义的类需要实现ProcessingTimeCallback接口,并且实现其onProcessingTime方法(关于flink定时可以参考定时系列文章)

    代码:

    1. publicabstractclassCommonSinkOperator<T extendsSerializable>extendsAbstractStreamOperator<Object>

    2. implementsProcessingTimeCallback,OneInputStreamOperator<T,Object>{

    3. privateList<T> list;

    4. privateListState<T> listState;

    5. privateint batchSize;

    6. privatelong interval;

    7. privateProcessingTimeService processingTimeService;

    8. publicCommonSinkOperator(){

    9. }

    10. publicCommonSinkOperator(int batchSize,long interval){

    11. this.chainingStrategy =ChainingStrategy.ALWAYS;

    12. this.batchSize = batchSize;

    13. this.interval = interval;

    14. }

    15. @Overridepublicvoid open()throwsException{

    16. super.open();

    17. if(interval >0&& batchSize >1){

    18. //获取AbstractStreamOperator里面的ProcessingTimeService, 该对象用来做定时调用

    19. //注册定时器将当前对象作为回调对象,需要实现ProcessingTimeCallback接口

    20. processingTimeService = getProcessingTimeService();

    21. long now = processingTimeService.getCurrentProcessingTime();

    22. processingTimeService.registerTimer(now + interval,this);

    23. }

    24. }

    25. //状态恢复

    26. @Overridepublicvoid initializeState(StateInitializationContext context)throwsException{

    27. super.initializeState(context);

    28. this.list =newArrayList<T>();

    29. listState = context.getOperatorStateStore().getSerializableListState("batch-interval-sink");

    30. if(context.isRestored()){

    31. listState.get().forEach(x ->{

    32. list.add(x);

    33. });

    34. }

    35. }

    36. @Overridepublicvoid processElement(StreamRecord<T> element)throwsException{

    37. list.add(element.getValue());

    38. if(list.size()>= batchSize){

    39. saveRecords(list);

    40. }

    41. }

    42. //checkpoint

    43. @Overridepublicvoid snapshotState(StateSnapshotContext context)throwsException{

    44. super.snapshotState(context);

    45. if(list.size()>0){

    46. listState.clear();

    47. listState.addAll(list);

    48. }

    49. }

    50. //定时回调

    51. @Overridepublicvoid onProcessingTime(long timestamp)throwsException{

    52. if(list.size()>0){

    53. saveRecords(list);

    54. list.clear();

    55. }

    56. long now = processingTimeService.getCurrentProcessingTime();

    57. processingTimeService.registerTimer(now + interval,this);//再次注册

    58. }

    59. publicabstractvoid saveRecords(List<T> datas);

    60. }

    如何调用?直接使用dataStream.transform方式即可。

    整体来说这个demo相对来说是比较简单的,但是这里面涉及的定时、状态管理也是值得研究,比喻说在这里定时我们直接选择ProcessingTimeService,而没有选择InternalTimerService来完成定时注册,主要是由于InternalTimerService会做定时调用状态保存,在窗口操作中需要任务失败重启仍然可以触发定时,但是在我们案例中不需要,直接下次启动重新注册即可,因此选择了ProcessingTimeService。

    推荐阅读

    1. Flink中延时调用设计与实现

    2. Flink维表关联系列之Hbase维表关联:LRU策略

    3. 你应该了解的Watermark

    4. Flink exactly-once系列之事务性输出实现

    5. Flink时间系统系列之实例讲解:如何做定时输出

    6. Flink实战:全局TopN分析与实现

    7. Flink per-Job模式InfluxdbReporter上报JobName

    8. Flink SQL自定义聚合函数

    image

    关注回复Flink获取更多信息~

    image

    相关文章

      网友评论

          本文标题:Flink自定义StreamOperator

          本文链接:https://www.haomeiwen.com/subject/yfrqnctx.html