美文网首页
Zookeeper(四)-客户端-消息处理流程

Zookeeper(四)-客户端-消息处理流程

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-12-23 15:11 被阅读0次

概述

本节分析下客户端消息的处理,重点关注Watcher及DataCallback;本节以getData的同步和异步方法为例进行分析;


RPC方法流程.png

处理流程

客户端请求处理流程.png
1.示例代码
public class DemoTest implements Watcher, AsyncCallback.DataCallback {
    private static Stat stat = new Stat();

    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
        DemoTest demoTest = new DemoTest();
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000, demoTest);
        // 同步
        byte[] bytes = zooKeeper.getData("/test1", demoTest, stat);
        // 异步
        zooKeeper.getData("/test1", demoTest, demoTest, "异步回调需要传递的数据");
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("接收到watch通知:" + event);
    }

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println("结果回调:" + path + "----" + ctx);
    }
}
  • 实现Watcher接口,重写process方法用于watch事件回调;
  • 实现AsyncCallback接口,重写processResult方法用于异步结果回调;

2.queuePacket构造packet入队outgoingQueue

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration){
    Packet packet = null;
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        // AsyncCallback默认为空
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // 数据包入队 需要发送的队列
            outgoingQueue.add(packet);
        }
    }
    // selector.wakeup();
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
  • new Packet构造Packet(包含Request/Response/Watcher等);
  • outgoingQueue.add(packet)入队outgoingQueue,SendThread.run中进行处理;
  • wakeupCnxn()唤醒多路复用器立即进行select;

3.同步处理流程阻塞

public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
    synchronized (packet) {
        // packet响应后置为true,否则 wait
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}
  • packet.wait()阻塞等待packet响应后进行notify,然后同步返回;

4.异步处理流程结束

  • 异步流程同步结束,等待服务端响应后回调processResult;

5.doTransport

  • doTransport先write请求包再read响应包,具体流程跟启动流程类似,参考上一节的分析;

6.readResponse处理响应

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    // 反序列化响应头
    replyHdr.deserialize(bbia, "header");
    
    ......

    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response to the first request!
     * 由于请求是按顺序处理的,因此响应也要按顺序处理
     */
    try {
        // 比对响应xid 跟 原请求xid是否相等,保证顺序性
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid "
                    + replyHdr.getXid() + " with err " +
                    + replyHdr.getErr() +
                    " expected Xid "
                    + packet.requestHeader.getXid()
                    + " for a packet with details: "
                    + packet );
        }
        // 设置响应头
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        // 反序列化响应体
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        // packet响应反序列化后,处理watch注册
        finishPacket(packet);
    }
}
  • replyHdr.deserialize反序列化ReplyHeader;
  • pendingQueue.remove()从pendingQueue中移除Packet;
  • packet.requestHeader.getXid() != replyHdr.getXid()比对响应xid 跟 原请求xid是否相等,不相等抛出异常,保证请求处理的顺序性;
  • packet.response.deserialize反序列化响应体;

7.finishPacket

private void finishPacket(Packet p) {
    // submitRequest时的wcb,watcher不为空时p.watchRegistration != null
    if (p.watchRegistration != null) {
        // 注册watcher
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    // 异步回调AsyncCallback为空,需要同步返回
    if (p.cb == null) {
        synchronized (p) {
            // finished置为true,
            p.finished = true;
            // 唤醒submitRequest时线程
            p.notifyAll();
        }
    } else {
        // 异步返回,通过eventThread线程处理
        p.finished = true;
        // 加入阻塞队列waitingEvents (LinkedBlockingQueue)
        eventThread.queuePacket(p);
    }
}
  • p.watchRegistration.register注册watcher,分别放到ZKWatchManager的三个Map中,用于在服务端触发相应事件时回调自定义的process方法;
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
  • p.notifyAll()p.cb == null即异步回调AsyncCallback为空,唤醒submitRequest时线程,同步返回;
  • eventThread.queuePacket(p)加入阻塞队列waitingEvents (LinkedBlockingQueue线程安全,不需要synchronized),触发EventThread.run方法,异步返回;

8.processEvent处理事件

private void processEvent(Object event) {
      try {
              Packet p = (Packet) event;
              int rc = 0;
              String clientPath = p.clientPath;
              if (p.replyHeader.getErr() != 0) {
                  rc = p.replyHeader.getErr();
              }
              
              ......
              
              else if (p.response instanceof GetDataResponse) {
                  DataCallback cb = (DataCallback) p.cb;
                  GetDataResponse rsp = (GetDataResponse) p.response;
                  if (rc == 0) {
                      cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
                  } else {
                      cb.processResult(rc, clientPath, p.ctx, null, null);
                  }
              } 
          }
      } catch (Throwable t) {
          LOG.error("Caught unexpected throwable", t);
      }
}
  • p.response instanceof GetDataResponse判断当前事件时什么类型;
  • cb.processResult回调processResult,执行用户自定义逻辑;
    -------over-------

相关文章

  • Zookeeper(四)-客户端-消息处理流程

    概述 本节分析下客户端消息的处理,重点关注Watcher及DataCallback;本节以getData的同步和异...

  • Hbase 知识要点归总

    一、HBase Get 流程 1.1、客户端流程解析 客户端首先会根据配置文件中zookeeper地址连接zook...

  • Zookeeper(四)-客户端-启动流程

    概述 本节重点分析下客户端启动流程 启动流程 1. 构造ZooKeeper,如下代码为例: 2.构造hostPro...

  • zookeeper源码分析(2)-客户端启动流程

    zookeeper原生客户端启动流程 客户端整体结构如下: 客户端的入口,负责启动整个客户端。持有 和 的实例,提...

  • 打造自己的通信框架四——NettyServer搭建

    前言 从客户端发出一条消息到服务端接收并处理这条消息,大概可以分成下面的流程 黄色部分为客户端逻辑,蓝色为网络传输...

  • Zookeeper -- 消息处理链

    对于客户端消息,zk创建了一系列的RequestProcessor来对消息进行链式处理。zk服务承担不同角色时,消...

  • 查询处理 - 查询处理流程

    查询处理 熟悉查询处理流程 查询处理流程 查询流程由下面5个后端进程处理。 Parser从客户端接收SQL语句,解...

  • 音视频处理流程

    分为2个流程 直播客户端的处理流程 音频数据的流转 直播客户端的处理流程 音频数据的流转 PCM:是设备采集到数据...

  • Zookeeper之客户端连接源码分析

    客户端连接 通过Zookeeper客户端类库连接org.apache.zookeeper.ZooKeeper 1、...

  • Future模式

    Future模式 概念 处理流程 传统处理流程 客户端发出call请求,这个请求需要很长一段时间才能返回。客户端一...

网友评论

      本文标题:Zookeeper(四)-客户端-消息处理流程

      本文链接:https://www.haomeiwen.com/subject/naoigktx.html