Introduction to Flink: Tech Share, Part 3 (English Lecture Notes)

Author: LittleMagic | Published 2020-07-17 21:52

Today I spent half an hour sharing some extended basics of windowing in Flink Streaming (incremental aggregation, triggers, and two-stream joins). The Markdown version of the lecture notes is pasted below.


Introducing Apache Flink - Part 3

Extended Usage of DataStream Windowing


Section A - Revision

Time Characteristics

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // EventTime / IngestionTime

Event Time & Watermarking

  • A watermark containing timestamp T declares that all data with event time t <= T have arrived
DataStream<OrderDoneLogRecord> watermarkedStream = recordStream
  .assignTimestampsAndWatermarks(  // AssignerWith[Periodic / Punctuated]Watermarks
    // Allows a bounded tolerance interval for out-of-order events
    new BoundedOutOfOrdernessTimestampExtractor<OrderDoneLogRecord>(Time.seconds(10)) {
      @Override
      public long extractTimestamp(OrderDoneLogRecord element) {
        return element.getTs();
      }
    }
  );
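  • For comparison, the punctuated flavor decides per record whether to emit a watermark. A minimal sketch, assuming a hypothetical isWatermarkMarker() flag on the record (not part of the original class):
DataStream<OrderDoneLogRecord> punctuatedStream = recordStream
  .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<OrderDoneLogRecord>() {
    @Override
    public long extractTimestamp(OrderDoneLogRecord element, long previousElementTimestamp) {
      return element.getTs();
    }

    @Override
    public Watermark checkAndGetNextWatermark(OrderDoneLogRecord lastElement, long extractedTimestamp) {
      // isWatermarkMarker() is a hypothetical flag on marker records;
      // returning null means no watermark is emitted for this element
      return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
  });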

Windowing Basics

  • Windows split the unbounded stream into bounded 'buckets' of finite size, over which users can apply computations
  • 3 types [Tumbling / Sliding / Session] with 2 time characteristics [Processing time / Event time] (tumbling & session assigners are sketched after the snippet below)
  • Keyed windows are more common in real-world applications
KeyedStream<OrderDoneLogRecord, Tuple> siteKeyedStream = watermarkedStream
  .keyBy("siteId", "siteName");
WindowedStream<OrderDoneLogRecord, Tuple, TimeWindow> siteWindowedStream = siteKeyedStream
  .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
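  • For reference, the other two window types listed above would be assigned like this (window sizes and the session gap are illustrative):
WindowedStream<OrderDoneLogRecord, Tuple, TimeWindow> tumblingStream = siteKeyedStream
  .window(TumblingEventTimeWindows.of(Time.seconds(10)));       // fixed 10s windows, no overlap
WindowedStream<OrderDoneLogRecord, Tuple, TimeWindow> sessionStream = siteKeyedStream
  .window(EventTimeSessionWindows.withGap(Time.seconds(30)));   // closes after 30s of inactivity per key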

Section B - Window Aggregation

  • Aggregation is the most common use case for keyed stream windowing
  • (e.g. calculating buyer count & GMV grouped by site)
  • Core API: WindowedStream.aggregate(AggregateFunction, WindowFunction)

AggregateFunction

  • 3 type parameters:
    • IN (input data)
    • ACC (accumulator)
    • OUT (output result)
  • 4 methods to implement:
    • createAccumulator()
    • add() (adds an input record to the accumulator instance)
    • getResult()
    • merge() (merges two accumulator instances into one)
  • Accumulator & result instances are both simple POJOs
@Getter
@Setter
public class BuyerAndGmvAccumulator {
  private Set<Long> buyerIds;
  private long gmv;

  public BuyerAndGmvAccumulator() {
    buyerIds = new HashSet<>();
    gmv = 0;
  }

  public void addGmv(long gmv) { this.gmv += gmv; }

  public void addBuyerId(long buyerId) { this.buyerIds.add(buyerId); }

  public void addBuyerIds(Collection<Long> buyerIds) { this.buyerIds.addAll(buyerIds); }
}

@Getter
@Setter
@NoArgsConstructor
@ToString
public class BuyerAndGmvResult {
  private long siteId;
  private String siteName;
  private long buyerCount;
  private long gmv;
  private long windowStartTs;
  private long windowEndTs;
}
  • Let's fill in some blanks...
private static class BuyerAndGmvAggregateFunc
  implements AggregateFunction<OrderDoneLogRecord, BuyerAndGmvAccumulator, BuyerAndGmvResult> {
  @Override
  public BuyerAndGmvAccumulator createAccumulator() {
    return new BuyerAndGmvAccumulator();
  }

  @Override
  public BuyerAndGmvAccumulator add(OrderDoneLogRecord record, BuyerAndGmvAccumulator acc) {
    acc.addBuyerId(record.getUserId());
    acc.addGmv(record.getQuantity() * record.getMerchandisePrice());
    return acc;
  }

  @Override
  public BuyerAndGmvResult getResult(BuyerAndGmvAccumulator acc) {
    BuyerAndGmvResult result = new BuyerAndGmvResult();
    result.setBuyerCount(acc.getBuyerIds().size());
    result.setGmv(acc.getGmv());
    return result;
  }

  @Override
  public BuyerAndGmvAccumulator merge(BuyerAndGmvAccumulator acc1, BuyerAndGmvAccumulator acc2) {
    acc1.addBuyerIds(acc2.getBuyerIds());
    acc1.addGmv(acc2.getGmv());
    return acc1;
  }
}

WindowFunction

  • The result of AggregateFunction doesn't seem to have any 'metadata' about the window
  • Calls for a WindowFunction, which needs 4 type parameters:
    • IN (input data)
    • OUT (output result)
    • KEY (type of the key, determined by the KeySelector; with field-expression keyBy it is a Tuple)
    • W (type of window, usually a TimeWindow)
  • Only 1 method apply() to implement
private static class BuyerAndGmvResultWindowFunc
  implements WindowFunction<BuyerAndGmvResult, BuyerAndGmvResult, Tuple, TimeWindow> {
  @Override
  public void apply(
    Tuple keys,
    TimeWindow window,
    Iterable<BuyerAndGmvResult> agg,
    Collector<BuyerAndGmvResult> out
  ) throws Exception {
    // Fetch the result produced by AggregateFunction above
    BuyerAndGmvResult result = agg.iterator().next();
    // Explicit casts: keyBy("siteId", "siteName") yields a Tuple2<Long, String> key
    result.setSiteId(((Tuple2<Long, String>) keys).f0);
    result.setSiteName(((Tuple2<Long, String>) keys).f1);
    // Window borders, converted from milliseconds to seconds
    result.setWindowStartTs(window.getStart() / 1000);
    result.setWindowEndTs(window.getEnd() / 1000);
    // Emit the 'true' result
    out.collect(result);
  }
}

Do Aggregation

DataStream<BuyerAndGmvResult> gmvResultStream = siteWindowedStream
  .aggregate(new BuyerAndGmvAggregateFunc(), new BuyerAndGmvResultWindowFunc());
  • Records flowing into AggregateFunction are computed incrementally, so only the accumulator instance is kept per window
  • If we use WindowFunction alone, all records will be kept in memory until evaluation
  • When the window fires, the result of AggregateFunction is provided to WindowFunction
  • The result stream can be keyed or processed (e.g. with a ProcessFunction) afterwards
private static class GmvTopProcessFunc
  extends KeyedProcessFunction<Tuple, BuyerAndGmvResult, String> {
  private final int topN;
  private PriorityQueue<BuyerAndGmvResult> minHeap;

  public GmvTopProcessFunc(int topN) {
    this.topN = topN;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    minHeap = new PriorityQueue<>(topN, Comparator.comparingLong(BuyerAndGmvResult::getGmv));
  }

  @Override
  public void close() throws Exception {
    minHeap.clear();
    super.close();
  }

  @Override
  public void processElement(BuyerAndGmvResult value, Context ctx, Collector<String> out) throws Exception {
    if (minHeap.size() < topN) {
      minHeap.offer(value);
    } else if (minHeap.peek().getGmv() < value.getGmv()) {
      // The new result ranks higher than the current minimum; replace it
      minHeap.poll();
      minHeap.offer(value);
    }
    // windowEndTs is in seconds (see the WindowFunction above); timers expect milliseconds
    ctx.timerService().registerEventTimeTimer(value.getWindowEndTs() * 1000 + 1);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    List<BuyerAndGmvResult> ranking = new ArrayList<>();
    for (int k = 0; k < topN && !minHeap.isEmpty(); k++) {
      ranking.add(minHeap.poll());
    }
    Collections.reverse(ranking);

    StringBuilder output = new StringBuilder();
    output.append("-----------------\n");
    for (BuyerAndGmvResult result : ranking) {
      output.append(result).append('\n');
    }
    output.append("-----------------\n");
    out.collect(output.toString());
  }
}
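  • A possible wiring of the function above, keying by windowEndTs so that all results of the same window are ranked together (topN = 3 is illustrative):
DataStream<String> topGmvStream = gmvResultStream
  .keyBy("windowEndTs")   // all sites of one window go to the same ranking
  .process(new GmvTopProcessFunc(3));
topGmvStream.print().setParallelism(1);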
  • ProcessFunction involves the function lifecycle, state & timers, so it won't be discussed further in this part

Section C - Window Trigger

Revisit Window Life Cycle

  • By default, a window is evaluated once the processing time / watermark passes its end timestamp
  • Triggers enable an early-fire mechanism for (especially long) windows
  • Call the trigger() method on WindowedStream; both built-in triggers and custom ones are available

Built-in Triggers

  • ContinuousProcessingTimeTrigger/ContinuousEventTimeTrigger

    • A trigger that continuously fires based on a given time interval (according to timestamps/watermarks)
    • siteIdWindowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(3)))
  • CountTrigger

    • A trigger that fires once the count of elements in a window pane reaches a given limit
    • siteIdWindowedStream.trigger(CountTrigger.of(100))
  • DeltaTrigger

    • A trigger that fires based on a DeltaFunction and a threshold
    • The DeltaFunction calculates an offset between the data point that last triggered and the currently arrived data point...
    • ...and fires if the offset is higher than the threshold, e.g.
siteIdWindowedStream.trigger(DeltaTrigger.of(
  100.0,   // Order ID offset threshold of 100
  (oldPoint, newPoint) -> newPoint.getOrderId() - oldPoint.getOrderId(),
  TypeInformation.of(OrderDoneLogRecord.class).createSerializer(env.getConfig())
));

Customize Trigger

  • TBD =。=
  • Please refer to the official documentation for details
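  • As a taste, here is a minimal sketch of a custom trigger (the class name and threshold are illustrative; it is modeled after the built-in CountTrigger and EventTimeTrigger):
public class CountWithEventTimeTrigger extends Trigger<Object, TimeWindow> {
  private final long maxCount;
  // Per-key, per-window counter kept in partitioned state
  private final ReducingStateDescriptor<Long> countDesc =
    new ReducingStateDescriptor<>("count", (a, b) -> a + b, LongSerializer.INSTANCE);

  public CountWithEventTimeTrigger(long maxCount) { this.maxCount = maxCount; }

  @Override
  public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    // Keep the end-of-window timer registered so the window still fires at its end
    ctx.registerEventTimeTimer(window.maxTimestamp());
    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
      count.clear();
      return TriggerResult.FIRE;  // early fire, keep window contents for the final evaluation
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
    ctx.getPartitionedState(countDesc).clear();
  }
}
  • Usage: siteIdWindowedStream.trigger(new CountWithEventTimeTrigger(100))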

Section D - Window Joining

  • Join operations exist in batch processing as well as in streaming
  • Windowing chops the unbounded stream into finite blocks that can be joined
  • Only equi-joins are supported

Inner Join

  • With tumbling window
  • With sliding window
  • Using join() API with JoinFunction
clickRecordStream
  .join(orderRecordStream)
  .where(record -> record.getMerchandiseId())    // key from left stream
  .equalTo(record -> record.getMerchandiseId())  // key from right stream
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
      return StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t');
    }
  });

Interval Inner Join

  • The two streams may fall out of step with regard to event time
  • Interval inner join allows relative time association
  • i.e. right.timestamp ∈ [left.timestamp + lowerBound, left.timestamp + upperBound]
  • No explicit windowing is needed; instead use the intervalJoin() API with a ProcessJoinFunction
clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  .between(Time.seconds(-5), Time.seconds(15))    // lower & upper bounds
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
      collector.collect(StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t'));
    }
  });

Left/Right Outer Join

  • No native implementation; coGroup() serves as an alternative
  • It co-groups two data streams on a given key and a common window
  • The left outer join logic is illustrated below
clickRecordStream
  .coGroup(orderRecordStream)
  .where(record -> record.getMerchandiseId())    // key from left stream
  .equalTo(record -> record.getMerchandiseId())  // key from right stream
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
    @Override
    public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {
      for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
        boolean isMatched = false;
        for (OrderDoneLogRecord orderRecord : orderRecords) {
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
          isMatched = true;
        }
        if (!isMatched) {
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
        }
      }
    }
  });
  • A naive nested-loop join: iterate through both sides, emitting matched records, plus a null-padded record for each left record without a match

THE END

  • Up next: state & fault tolerance
    • Keyed state & operator state
    • Usage of managed states
    • Checkpointing & state backends
    • Checkpointing internals: Chandy-Lamport algorithm, ABS (Asynchronous Barrier Snapshotting) mechanism
