美文网首页
RocketMQ系列2:主从同步

RocketMQ系列2:主从同步

作者: 过去今天和未来 | 来源:发表于2021-01-12 20:25 被阅读0次

    前提netty常用方法

         通过学习RocketMQ源码发现其中涉及到很多netty编程,下面先简单梳理一下Netty 常用类和方法,本次重点说一下ByteBuffer和SocketChannel的API

    1、ByteBuffer

         netty涉及到所有网络通信字节流都是通过ByteBuffer来完成的,ByteBuffer特性包括:动态扩容、读写操作采用不同指针不需要随意切换、支持缓冲池等特性。
         netty创建内存空间是堆外内存,最终调用的则是Unsafe#allocateMemory完成内存创建

     ByteBuffer reportOffset = ByteBuffer.allocate(8);
    

    ByteBuffer有4个属性,mark<=position<=limit<=capacity

    • capacity:容纳的最大数据量
    • limit:缓冲区的当前终点
    • position:下一个被读或写的元素的索引,每次读写都会改变值
    • mark:标记位置,数据读取关键位置,reset方法方便回退
      基于这几个属性值,可以实现以下方法
    limit()        // 设定ByteBuffer空间容量大小
    reset()        // position设置为mark位置,回退到已读取数据的位置
    clear()        //初始化,但不影响byte数据内容
    flip()         // 翻转就是将存数据状态转变为获取数据状态  
    hasRemaining() //返回是否还是未读的数据内容 limit - position>0
    get()          //从position位置读取Byte数据,position加1
    get(int index) //读取底层下标值
    put(byte b)    //写数据,向position指向的地址写字节
    

    2、SocketChannel

    • 创建服务端Channel:本质通过反射技术调用JDK底层Channel
    • 初始化Channel:设置Socket参数和用户自定义属性,并在pipeline上添加两个特殊handler。设置是否使用阻塞模式:true/false。configureBlocking(false)
    • 注册服务端Channel:调用底层JDK将channel注册到Selector上。register(Selector,int) 第一个参数可以传入一个选择器对象,第二个可传入SelectionKey代表操作类型的四个静态整型常量中的一个,表示该选择器关心的操作类型。包括OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE
    • 端口绑定:调用底层JDK将端口绑定在channel,同时将OP_ACCEPT事件添加到Channel事件中。

    主从同步流程

    1、master启动建立监听
         HAService#start 启动流程主要包括 acceptSocketService用于服务端接受连接线程实现类、groupTransferService 判断主从是否同步复制完成、HAClient HA客户端实现。

    public void start() throws Exception {
        // 建立HA服务端监听服务,并启动监听逻辑
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        this.groupTransferService.start();
        // HA客户端线程
        this.haClient.start();
    }
    

         首先看一下 AcceptSocketService 具体实现, beginAccept()核心是创建ServerSocketChannel、创建selector、端口绑定、设置为非阻塞、并在Select上注册连接事件。
    2、处理连接请求
         AcceptSocketService# run() 则每隔1s进行处理连接事件,这里只处理OP_ACCEPT事件, 当有SocketChannel 返回说明有一个slave和master建立连接请求,Master则为每一个连接创建一个HAConnetction。

     // AcceptSocketService#beginAccept  
    public void beginAccept() throws Exception {
        this.serverSocketChannel = ServerSocketChannel.open();   // 
        this.selector = RemotingUtil.openSelector();
        this.serverSocketChannel.socket().setReuseAddress(true);
        this.serverSocketChannel.socket().bind(this.socketAddressListen);
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
    }
    @Override
    public void run() {
        while (!this.isStopped()) { 
                // select进行轮训获取监听数据
                this.selector.select(1000);
                Set<SelectionKey> selected = this.selector.selectedKeys();
                if (selected != null) {
                    for (SelectionKey k : selected) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                            if (sc != null) {
                                try {
                                    // 创建 HAConnection对象,主要复制M-S数据同步
                                    HAConnection conn = new HAConnection(HAService.this, sc);
                                    conn.start();
                                    HAService.this.addConnection(conn);
                                } 
                            }
                        }
                    }
                    selected.clear();
                }
            } 
        }
    }
    

    slave端启动并同步偏移量

         1、启动 HAClient线程HAService #start :HAClient执行run方法 这里是while循环,主要做了三件事情。

    • 连接master节点,没有连接成功等待5s。
    • 判断是否需要向master上报当前节点commitlog的消息偏移量。this.isTimeToReportOffset() 主要进行判断了拉取间隔是否大于haSendHeartbeatInterval默认是5s。
    • 上报currentReportedOffset偏移量。
           public void run() {
               while (!this.isStopped()) {
                   try {
                       if (this.connectMaster()) {
                           if (this.isTimeToReportOffset()) {
                               boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                               if (!result) {
                                   this.closeMaster();
                               }
                           }
                       } else {
                           this.waitForRunning(1000 * 5);
                       }
                   } 
               }
           }
    

         2、HAClient#connectMaster 连接master:过程是创建socketChannel注册到selector上并注册OP_READ事件。对于slave怎么知道master地址的?这里BrokerController启动的时候 主要通过向NameServer 上报元数据调用registerBrokerAll时返回的RegisterBrokerResult值,其中就包含了masterAddr。
    开始初始化当前节点 commitlog的最大偏移量和当前时间戳,开始进行数据同步操作。

    private boolean connectMaster() throws ClosedChannelException {
        if (null == socketChannel) {
            String addr = this.masterAddress.get();
            if (addr != null) {
                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                if (socketAddress != null) {
                    this.socketChannel = RemotingUtil.connect(socketAddress);
                    if (this.socketChannel != null) {
                        this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    }
                }
            }
            this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            this.lastWriteTimestamp = System.currentTimeMillis();
        }
        return this.socketChannel != null;
    }
    

         3、上报偏移量HAClient#reportSlaveMaxOffset(this.currentReportedOffset):则是socketChannel.write 发送网络请求将slave偏移量上报给master节点。该方法 其实就是ByteBuffer操作,可以结合开篇ByteBuffer API来看。

    private boolean reportSlaveMaxOffset(final long maxOffset) {
                this.reportOffset.position(0);
                this.reportOffset.limit(8);
                this.reportOffset.putLong(maxOffset);
                this.reportOffset.position(0);
                this.reportOffset.limit(8);
                for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                    try {
                        this.socketChannel.write(this.reportOffset);
                    }  
                }
                return !this.reportOffset.hasRemaining();
    }
    

    master节点处理请求

    master端更新slave最大偏移量并同步消息

         HAConnection 按照固定端口监听客户端连接,当客户端构建好channel后封装HAConnection,开始执行任务,主要包括两个核心内部类进行网络操作ReadSocketService读操作和WriteSocketService写操作
          1、HAConnection#ReadSocketService: 创建选择器后注册读事件。run 主要从从服务器拉取请求,每隔1s循环一次,获取从服务器最大偏移量是多少并更新。读取3次后数据大小为0则结束本次读取

    public void run() {
              while (!this.isStopped()) {
                  try {
                      this.selector.select(1000);
                      boolean ok = this.processReadEvent();
                   } 
              }      
          }
    private boolean processReadEvent() {
              int readSizeZeroTimes = 0;
              if (!this.byteBufferRead.hasRemaining()) {
                  this.byteBufferRead.flip();
                  this.processPosition = 0;
              }
              while (this.byteBufferRead.hasRemaining()) {
                  try {
                      int readSize = this.socketChannel.read(this.byteBufferRead);
                      if (readSize > 0) {
                          readSizeZeroTimes = 0;
                          this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                          if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                              int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                              long readOffset = this.byteBufferRead.getLong(pos - 8);
                              this.processPosition = pos;
                              HAConnection.this.slaveAckOffset = readOffset;
                              HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                          }
                      } else if (readSize == 0) {
                          if (++readSizeZeroTimes >= 3) {
                              break;
                          }
                      }
                  } 
              }
              return true;
          }
    

          2、HAConnection$WriteSocketService:根据ReadSocketService获取从服务器更新的最大偏移量,然后将主服务器增量的数据同步。
          当nextTransferFromWhere为-1 则表示初次进行数据传输。否则按照当前master的下一个消息的偏移量减去slave最大偏移量区间。从commitLog中获取消息 按照socketChannel将消息写给slave。同时在写操作时并不是一次就将数据写完,会分多次网络写操作。

    if (this.lastWriteOver) {
        long interval =
        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {
              this.byteBufferHeader.position(0);
              this.byteBufferHeader.limit(headerSize);
              this.byteBufferHeader.putLong(this.nextTransferFromWhere);
              this.byteBufferHeader.putInt(0);
              this.byteBufferHeader.flip();
              this.lastWriteOver = this.transferData();
         } else {
              this.lastWriteOver = this.transferData();
       }
    // this.transferData
    // Write Body
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            }  
        }
    }   
    

    Slave处理master返回的数据

    HAService$HAClient#processReadEvent
         Slave端开始进行处理从master传回的消息数据,当读取的数据大于0,调用dispatchReadRequest()转发处理请求。否则连续3次从网络读取的数据为0结束本次读操作。

    private boolean processReadEvent() {
     int readSizeZeroTimes = 0;
     while (this.byteBufferRead.hasRemaining()) {
         try {
             int readSize = this.socketChannel.read(this.byteBufferRead);
             if (readSize > 0) {
                 readSizeZeroTimes = 0;
                 boolean result = this.dispatchReadRequest();
             } else if (readSize == 0) {
                 if (++readSizeZeroTimes >= 3) {
                     break;
                 }
             } 
         } 
     }
     return true;
    }
    

         HAService$HAClient# dispatchReadRequest主要从byteBufferRead中解析消息,并将消息存储在commitlog文件中。整个过程主要包括三件事情
    1. msgHeaderSize = 8+4,表示消息的物理偏移量和消息的长度,根据这个判断是否读到一个完整的消息。否则将本次数据先备份一下,等待下一个读取网络数据
    2. 获取当前消息文件的最大物理偏移量,如果slave的最大物理偏移量与master给的偏移量不相等,则返回false
    3. DefaultMessageStore#appendToCommitLog方法将消息内容追加到消息内存映射文件中,然后唤醒ReputMessageService实时将消息转发给消息消费队列与索引文件,更新dispatchPosition,并向服务端及时反馈当前已存储进度。

    private boolean dispatchReadRequest() {
        final int msgHeaderSize = 8 + 4; // phyoffset + size
        int readSocketPos = this.byteBufferRead.position();
        while (true) {
            int diff = this.byteBufferRead.position() - this.dispatchPosition;
            if (diff >= msgHeaderSize) {
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                if (diff >= (msgHeaderSize + bodySize)) {
                    byte[] bodyData = new byte[bodySize];
                    this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                    this.byteBufferRead.get(bodyData);
                    // 进行消息同步,将日志消息写入byteBuffer中
                  HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                    this.byteBufferRead.position(readSocketPos);
                    this.dispatchPosition += msgHeaderSize + bodySize;
                    continue;
                }
            }
        }
        return true;
    }
    

    常见问题

    brokerIP1和BrokerIP2区别

    BrokerIP1: 当前Broker监听的IP
    BrokerIP2: broker主从时,在Broker主节点配置,broker从节点会连主节点IP2进行同步。

    主从服务器都在运行过程中,消息消费者是从主拉取消息还是从从拉取?

    答:默认情况下,RocketMQ消息消费者从主服务器拉取,当主服务器积压的消息超过了物理内存的40%,则建议从从服务器拉取。但如果slaveReadEnable为false,表示从服务器不可读,从服务器也不会接管消息拉取。

    相关文章

      网友评论

          本文标题:RocketMQ系列2:主从同步

          本文链接:https://www.haomeiwen.com/subject/orweaktx.html