美文网首页程序员Kafka文字欲
无镜--kafka之消费者(三)

无镜--kafka之消费者(三)

作者: 绍圣 | 来源:发表于2018-09-14 11:15 被阅读5次

    消费者轮询通过拉取器(Fetcher)发送拉取请求,拉取器会调用消费者网络客户端的发送方法(send)和网络轮询方法(poll)。在拉取器的层面拉取请求是没有真正发送到服务端的。发送方法只是把请求存在到变量中,真正发送到服务端是调用了消费者网络客户端对象的网络轮询方法(poll)。

    消费者网络客户端:ConsumerNetworkClient,对NetworkClient的一层封装。

    消费者网络客户端发送方法

    // 发送请求,只是把请求暂时存放在unsent变量中

    private RequestFuture<ClientResponse> send(Node node, ApiKeys api, short version, AbstractRequest request) {

    long now = time.milliseconds();

    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();

    RequestHeader header = client.nextRequestHeader(api, version);

    RequestSend send = new RequestSend(node.idString(),

    header, request.toStruct());

    put(node, new ClientRequest(now, true, send, completionHandler)); // 没有真正发送 client.wakeup();

    return completionHandler.future;

    }

    private void put(Node node, ClientRequest request) {

    synchronized (this) {

    List<ClientRequest> nodeUnsent = unsent.get(node);

    if (nodeUnsent == null) {

    nodeUnsent = new ArrayList<>();

    unsent.put(node, nodeUnsent);

    }

    nodeUnsent.add(request);

    }

    }

    保存到变量unsent(Map<Node,List<ClientRequest>>)中的对象是ClientRequest。

    ClientRequest相关类图:

    请求对象相关的类

    消费者网络客户端异步发送请求涉及的相关类说明:

    1,异步请求完成处理器(RequestFutureCompletionHandler)

    2,异步请求(RequestFuture):客户端调用发送请求的返回值。当异步请求完成时,可以获取异步请求的结果。

    3,异步请求监听器(RequestFutureListener):异步请求添加监听器,当异步请求有结果时,调用监听器的onSuccess方法。

    以拉取器发送拉取请求为例,看看监听器的使用方式:

    1,消费者客户端调用ConsumerNetworkClient发送拉取请求返回异步请求对象(RequestFuture)。同时也创建了异步请求完成处理器(RequestFutureCompletionHandler),RequestFutureCompletionHandler实例持有RequestFuture实例,返回其实也是返回的RequestFutureCompletionHandler持有的RequestFuture实例。

    2,在返回的异步请求对象上添加异步请求监听器,监听器会处理拉取的到结果。

    3,客户端轮询,在收到拉取请求结果后,调用回调处理器的onComplete方法(RequestFutureCompletionHandler.onComplete(ClientResponse))。

    4,触发监听器onSuccess方法(RequestFutureListener.onSuccess(ClientResponse))

    调用发送方法(send)返回的是一个异步请求对象:RequestFuture。RequestFuture封装了请求相关的信息,表示异步请求的结果。发送方法中还会创建一个异步请求的完成处理器(RequestFutureCompletionHandler:当请求被服务器处理完成,并返回响应结果给客户端,客户端会根据响应结果执行具体逻辑)

    网络客户端轮询到拉取请求结果后的处理流程图:

    网络客户端轮询到拉取请求结果后的处理流程

    网络客户端轮询到拉取请求结果后的处理时序图:

    网络客户端轮询到拉取请求结果后的处理时序图

    组合加适配器模式(compose+adapter)

    异步请求(RequestFuture)为客户端提供了调用自定义业务处理逻辑的的入口,除了添加监听器的方式(类似拉取请求),还可以使用组合加适配器模式:监听器+适配器(对客户端响应结果做进行解析成客户想要的数据格式)。所以一句话概括组合加适配器:监听器+转换响应结果成客户想要的数据格式。组合表示组装一个监听器,适配器表示对客户端响应结果进行适配。

    普通模式

    伪代码(发送拉取请求):为异步请求添加监听器,当请求完成时,会调用监听器的回调方法会对响应进行处理。

    client.send(fetchTarget, ApiKeys.FETCH, request)

    .addListener(new RequestFutureListener() { // 监听器

    public void onSuccess(ClientResponse resp) {

    }

    }

    组合加适配器模式

    伪代码(发送列举偏移量请求给分区的主节点):适配器中对响应结果进行适配。在组合方法(RequestFuture.compose())中,创建一个新的异步请求对象S,在旧的异步请求对象T(调用compose方法的异步请求对象)添加监听器,并传递新的异步请求对象S到适配器的onSuccess方法中。在适配器的onSuccess方法中进行消息的转换。

    client.send(node, ApiKeys.LIST_OFFSETS, request)

    .compose(new RequestFutureAdapter>() { // 适配器

    public void onSuccess(ClientResponse response, RequestFuture> future) {

    future.complete(); // 新建的异步请求的complete方法 触发调用新建的异步请求的监听器的onSuccess方法

    }

    });

    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {

    // 在异步请求T里,新建了一个异步请求S

    final RequestFuture<S> adapted = new RequestFuture<>();

    addListener(new RequestFutureListener<T>() { // 为T旧的异步请求添加监听器

    // 客户端轮询到结果时,会调用监听器的回调方法

    public void onSuccess(T value) {

    adapter.onSuccess(value, adapted); // 调用适配器的回调方法 进行消息转换

    }

    public void onFailure(RuntimeException e) {

    adapter.onFailure(e, adapted);

    }

    });

    return adapted; // 返回新建的异步请求对象S

    }

    组合+适配器模式的调用流程(列举偏移量请求)

    组合+适配器模式的调用流程(列举偏移量请求)

    普通模式和组合加适配器模式的区别:

    普通模式使用监听器,并将异步请求的结果穿给监听器的回调方法。组合模式的监听器,对异步请求的结果在适配器中做一次转换。

    ConsumerNetworkClient.polll(RequestFuture)和ConsumerNetworkClient.poll(timeout)

    ConsumerNetworkClient.polll(RequestFuture):必须等到异步请求完成才会结束,在轮询结束后可以获取异步请求的结果。

    ConsumerNetworkClient.poll(timeout):不管异步请求有没有完成,都会在给定的超时时间内返回。这时在轮询完成后获取异步请求的结果不一定有结果。

    异步请求的链式模式

    将另一个异步请求链接起来。

    链式调用和组合模式的区别

    1,组合模式和链接模式都会为当前异步请求添加一个监听器。

    2,组合模式会创建一个新的异步请求,链接模式则传入一个已有的异步请求。

    3,组合模式返回新异步请求,链接模式返回当前异步请求,不是返回传入的异步请求。

    // 链路模式 @param future 已有的异步请求

    public void chain(final RequestFuture future) {

    addListener(new RequestFutureListener() {

    public void onSuccess(T value) { // value是当前异步请求的结果

    // 用当前异步请求的结果作为传入的异步请求的的结果

    future.complete(value); // 调用异步请求的完成方法

    }

    public void onFailure(RuntimeException e) { future.raise(e);

    }

    }); // 没有返回值,所以调用方还是使用的当前的异步请求对象,而不是传入的的异步请求

    }

    组合模式会调用新异步请求的complete方法。链式模式会调用传入已有异步请求的complete方法。

    组合模式和链式模式的异步请求调用流程区别

    组合模式:在当前异步请求完成时,调用当前异步请求的监听器回调,在监听器回调中将新异步对象传给适配器的回调方法。在适配器的回调方法中会调用新异步请求的complete方法,完成新异步请求。即组合模式返回的异步请求(新的)

    链式模式:在当前异步请求完成时,调用当前异步请求的监听器回调,在监听器回调中调用传入的异步请求的complete方法,完成传入的异步请求。没有返回值。

    链式模式运用

    消费者加入消费组就是运用了监听器 + 组合模式 + 链式模式,保证业务逻辑的准确和异步请求的调用顺序。

    消费者加入消费组:加入消费组必须完成同步组,加入消费组请求必须比同步组请求先发送,同步组的异步请求完成后加入组的异步请求才能完成。

    消费者加入消费组步骤:

    1,客户端发送加入组请求JoinGroup,采用组合模式返回加入组的异步请求。

    2,在加入组的适配器处理中,发送同步组请求,采用组合模式返回同步组的异步请求。

    3,将同步组的异步请求使用链式模式链接上加入组的异步请求,为同步组的异步请求添加一个监听器。

    4,当同步组请求收到客户端响应结果,完成同步组的异步请求。

    5,调用同步组异步请求的监听器回调方法,完成加入组的异步请求。

    6,加入组请求完成,获取加入组异步请求的结果。

    伪代码:

    第一段:

    RequestFuture joinFuture = sendJoinGroupRequest();

    client.poll(joinFuture);

    ByteBuffer byteBuffer = future.value(); // 第六步:获取JoinGroup异步请求的结果

    第二段:

    // 采用组合模式发送加入组请求

    private RequestFuture sendJoinGroupRequest() {

    return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); // 第一步:发送JoinGroup请求

    }

    第三段:

    private class JoinGroupResponseHandler extends RequestFutureAdapter {

    public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:JoinGroup的异步请求

    SyncGroupRequest syncGroupRequest = new SyncGroupRequest();

    RequestFuture syncFuture = sendSyncGroupRequest(syncGroupRequest); // 发送同步组请求

    syncFuture.chain(joinFuture); // 第三步:将JoinGroup异步请求链接到SyncGroup异步请求

    }

    }

    第四段:

    // 采用组合模式发送同步组请求

    private RequestFuture sendSyncGroupRequest(SyncGroupRequest request) {

    return client.send(coordinator, ApiKeys.SYNC_GROUP, request) .compose(new SyncGroupResponseHandler()); // 第二步:发送SyncGroup请求

    }

    第五段:

    private class SyncGroupResponseHandler extends RequestFutureAdapter {

    public void onSuccess(ClientResponse clientResponse, RequestFuture future) { // future:SyncGroup的异步请求

    future.complete(future) // 第四步:完成SyncGroup的异步请求

    }

    }

    第六段:

    public void chain(final RequestFuture future) {

    addListener(new RequestFutureListener() {

    public void onSuccess(T value) {

    future.complete(value); // 第五步:完成JoinGroup异步请求

    }

    以上重点分析了异步请求的相关调用流程,后面将重点分析消费者网络客户端的轮询。

    参考资料:

    Kafka技术内幕:图文详解Kafka源码设计与实现

    相关文章

      网友评论

        本文标题:无镜--kafka之消费者(三)

        本文链接:https://www.haomeiwen.com/subject/yhrawftx.html