美文网首页
KafkaConsumer RequestFuture异步流程控

KafkaConsumer RequestFuture异步流程控

作者: 不存在的里皮 | 来源:发表于2020-06-30 11:37 被阅读0次

org.apache.kafka.clients.consumer.internals包内的RequestFuture类可用来定义异步流程,常用的addListener、compose作用如图所示:

addListener

addListener可以使一个流程添加到RequestFuture后


compose

compose利用addListener,使其挂在RequestFuture<T>完成后的流程上。同时返回一个新创建的RequestFuture<S>。

  • 用户需要实现RequestFutureAdapter的接口onSucess/onFailure,进一步加工这个异步流程。当onSucess/onFailure被调用时,上游流程已经完成,那么在你实现的方法中:
    • 既可以选择完成RequestFuture<S>
    • 又可以在RequestFuture<S>前增添其它的异步流程。
    • 我们将在下文举例说明这两种用法。
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        // 创建了一个RequestFuture<S>并在方法结束时返回,但并没有调用其complete或raise方法。
        final RequestFuture<S> adapted = new RequestFuture<>();
        addListener(new RequestFutureListener<T>() {
            @Override
            public void onSuccess(T value) {
                adapter.onSuccess(value, adapted);  // 在用户实现的onSuccess中,可以完成adapted,也可以为它添加前置流程
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });
        return adapted;
    }

compose的效果如下图所示,加了listener2是出于严谨考虑,因为compose调用了addListener方法。



什么叫"完成RequestFuture<S>"? 比如下面的实现,在onSuccess中可以调用future.complete



什么叫"也可以在RequestFuture<S>前增添其它的异步流程"?这是第二种用法。
我们先看CoordinatorResponseHandler,onSuccess会调用handle接口。


再看它的一个实现类JoinGroupResponseHandler,调用onJoinLeader新创建了一个RequestFuture,并调用chain,将handle方法参数中的future接在了新建RequestFuture的流程后面。这样,我们就为future添加了前置流程

以上两种用法图示就相当于下图:


相关文章

网友评论

      本文标题:KafkaConsumer RequestFuture异步流程控

      本文链接:https://www.haomeiwen.com/subject/lywtqktx.html