美文网首页
store模块阅读22:HA(1):HAConnection

store模块阅读22:HA(1):HAConnection

作者: 赤子心_d709 | 来源:发表于2017-12-13 21:33 被阅读31次

    简介

    HAConnection用于描述master和slave用于同步数据的连接
    两个service如下
      ReadSocketService :读来自 Slave节点 的数据。
      WriteSocketService :写到往 Slave节点 的数据。
    重要方法
      ReadSocketService#run
      ReadSocketService#processReadEvent
      WriteSocketService#run
      WriteSocketService#transferData
    

    部分涉及HAService的代码后面再讲

    master和slave同步数据的协议如下


    参照refer

    属性

        private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
        private final HAService haService;//上层ha服务
        private final SocketChannel socketChannel;
        private final String clientAddr;//slave地址
        private WriteSocketService writeSocketService;//内部类
        private ReadSocketService readSocketService;//内部类
    
        private volatile long slaveRequestOffset = -1;//slave第一次请求的offset
        private volatile long slaveAckOffset = -1;//确认slave的最大位置
    

    函数

    构造函数和状态处理函数比较简单

        public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
            this.haService = haService;
            this.socketChannel = socketChannel;
            this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
            this.socketChannel.configureBlocking(false);
            this.socketChannel.socket().setSoLinger(false, -1);
            this.socketChannel.socket().setTcpNoDelay(true);
            this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
            this.socketChannel.socket().setSendBufferSize(1024 * 64);
            this.writeSocketService = new WriteSocketService(this.socketChannel);
            this.readSocketService = new ReadSocketService(this.socketChannel);
            this.haService.getConnectionCount().incrementAndGet();//ha服务连接数+1
        }
    
        public void start() {
            this.readSocketService.start();
            this.writeSocketService.start();
        }
    
        public void shutdown() {
            this.writeSocketService.shutdown(true);
            this.readSocketService.shutdown(true);
            this.close();
        }
    
        public void close() {
            if (this.socketChannel != null) {
                try {
                    this.socketChannel.close();
                } catch (IOException e) {
                    HAConnection.log.error("", e);
                }
            }
        }
    
        public SocketChannel getSocketChannel() {
            return socketChannel;
        }
    

    内部类

    ReadSocketService

    读socket服务

    属性以及构造函数

            private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
            private final Selector selector;
            private final SocketChannel socketChannel;
            private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);//读的buffer,分配1M
            private int processPostion = 0;//byteBufferRead处理到的位置
            private volatile long lastReadTimestamp = System.currentTimeMillis();//记录最后读取的时间,20s超时
    
            public ReadSocketService(final SocketChannel socketChannel) throws IOException {
                this.selector = RemotingUtil.openSelector();
                this.socketChannel = socketChannel;
                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                this.thread.setDaemon(true);
            }
    

    run方法

    代码,注释如下

            /**
             * while循环中
             *  1.调用processReadEvent,如果出错则break
             *  2.如果读的间隔超过了指定时间(默认20s)则break
             * 退出了break
             *  1.读写service stop
             *  2.ha服务移除当前HAConnection记录
             *  3.selector,channel关闭,清理
             */
            @Override
            public void run() {
                HAConnection.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        this.selector.select(1000);
                        boolean ok = this.processReadEvent();//处理读事件
                        if (!ok) {//处理错误
                            HAConnection.log.error("processReadEvent error");
                            break;
                        }
    
                        long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {//心跳间隔超过了指定间隔,默认20s
                            log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                            break;
                        }
                    } catch (Exception e) {
                        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                        break;
                    }
                }
    
                this.makeStop();//读线程stop
    
                writeSocketService.makeStop();//写线程stop
    
                //ha服务移除当前HAConnection记录
                haService.removeConnection(HAConnection.this);
    
                //ha服务的连接数-1
                HAConnection.this.haService.getConnectionCount().decrementAndGet();
    
                //取消注册的key
                SelectionKey sk = this.socketChannel.keyFor(this.selector);
                if (sk != null) {
                    sk.cancel();
                }
                //关闭socket以及channel
                try {
                    this.selector.close();
                    this.socketChannel.close();
                } catch (IOException e) {
                    HAConnection.log.error("", e);
                }
    
                HAConnection.log.info(this.getServiceName() + " service end");
            }
    

    processReadEvent

            /**
             * 处理读事件
             * 1.如果byteBufferRead写满了,就flip准备重新写,更新processPostion
             * 2.只要buffer还能写
             *  从socketChannel内容写到byteBufferRead中
             *  更新lastReadTimestamp
             *  找到pos之前最近%8==0的位置(每次传输的是一个long,刚好8个字节)
             *  读取之前8个字节,记录slaveAckOffset,代表slave发送过来的offset,即slave同步到的offset
             *  notifyTransferSome。master通知slave: "已经知道slave同步到slaveAckOffset这个位置了"
             */
            private boolean processReadEvent() {
                int readSizeZeroTimes = 0;
    
                if (!this.byteBufferRead.hasRemaining()) {//没有什么要读了
                    this.byteBufferRead.flip();//从头读
                    this.processPostion = 0;
                }
    
                while (this.byteBufferRead.hasRemaining()) {
                    try {
                        int readSize = this.socketChannel.read(this.byteBufferRead);//channel中的内容读到byteBufferRead中
                        if (readSize > 0) {
                            readSizeZeroTimes = 0;
                            //最后读取时间
                            this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                            if ((this.byteBufferRead.position() - this.processPostion) >= 8) {//buffer最新的位置离处理的位置超过了8字节(一个long占的位置)
                                int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);//找到对应的整
                                long readOffset = this.byteBufferRead.getLong(pos - 8);//读取这个long
                                this.processPostion = pos;//记录处理到的位置
                                // 设置已确认的最大位置
                                HAConnection.this.slaveAckOffset = readOffset;
                                if (HAConnection.this.slaveRequestOffset < 0) {//初始请求的位置,代表是slave第一次请求
                                    HAConnection.this.slaveRequestOffset = readOffset;
                                    log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                                }
                                // 通知目前Slave进度
                                HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                            }
                        } else if (readSize == 0) {
                            if (++readSizeZeroTimes >= 3) {
                                break;
                            }
                        } else {
                            log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                            return false;
                        }
                    } catch (IOException e) {
                        log.error("processReadEvent exception", e);
                        return false;
                    }
                }
    
                return true;
            }
    

    WriteSocketService

    写socket服务,完成master向slave发送数据

    属性以及构造函数

            private final Selector selector;
            private final SocketChannel socketChannel;
            
            private final int headerSize = 8 + 4;//头部信息 包含 physical offset(long) + bodySize(int)
            private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);//master给slave同步数据时的头部buffer
            private long nextTransferFromWhere = -1;//传递数据从哪开始
            private SelectMappedBufferResult selectMappedBufferResult;//master给slave同步数据的内容,即body
            private boolean lastWriteOver = true;
            private long lastWriteTimestamp = System.currentTimeMillis();//最后一次写的时间
    
            public WriteSocketService(final SocketChannel socketChannel) throws IOException {
                this.selector = RemotingUtil.openSelector();
                this.socketChannel = socketChannel;
                this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
                this.thread.setDaemon(true);
            }
    

    run方法

            /**
             * 正常情况下while循环
             *  1.等到slave请求,读取服务中更新slaveRequestOffset
             *  2.计算nextTransferFromWhere
             *      2.1如果为首次连接,那么slave传递的offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
             *      2.2否则为slave同步过来的slaveRequestOffset
             *  3.如果之前发送给slave内容没有完成,那么一直接着发送
             *  4.如果之前发送给slave内容发完了,而且过了心跳时间(默认5s),那么只传递一个header过去(header中记录的bodySize为0)
             *  5.根据nextTransferFromWhere去mappedFile找合适大小的mappedFile内容,记录在selectMappedBufferResult
             *  6.调用transferData把头部byteBufferHeader以及body内容selectMappedBufferResult发送过去
             * 如果遇到异常,break处while循环
             *  1.读写service stop
             *  2.haService移除当前记录
             *  3.socket,channel关闭,清理
             *
             */
            @Override
            public void run() {
                HAConnection.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        this.selector.select(1000);
    
                        if (-1 == HAConnection.this.slaveRequestOffset) {//slave还未请求过
                            Thread.sleep(10);
                            continue;
                        }
    
                        if (-1 == this.nextTransferFromWhere) {
                            //如果为首次连接,那么offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
                            if (0 == HAConnection.this.slaveRequestOffset) {
                                long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                                masterOffset =
                                    masterOffset
                                        - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                        .getMapedFileSizeCommitLog());
    
                                if (masterOffset < 0) {
                                    masterOffset = 0;
                                }
                                this.nextTransferFromWhere = masterOffset;
                            } else {//否则从指定的offset开始同步数据给slave
                                this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;//如果不是首次连接,就从确认的slave的offset开始
                            }
    
                            log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                                + "], and slave request " + HAConnection.this.slaveRequestOffset);
                        }
    
                        if (this.lastWriteOver) {//上次写完了
    
                            long interval =
                                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                            //5s一次心跳,如果超时,记录的bodySize为0
                            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                .getHaSendHeartbeatInterval()) {
    
                                // Build Header
                                this.byteBufferHeader.position(0);
                                this.byteBufferHeader.limit(headerSize);
                                this.byteBufferHeader.putLong(this.nextTransferFromWhere);//头部信息 phyOffset为nextTransferFromWhere
                                this.byteBufferHeader.putInt(0);//头部信息 bodySize为0
                                this.byteBufferHeader.flip();
    
                                this.lastWriteOver = this.transferData();
                                if (!this.lastWriteOver)
                                    continue;
                            }
                        } else {
                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver)
                                continue;
                        }
    
                        //获取指定位置之后的buffer
                        SelectMappedBufferResult selectResult =
                            HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                        if (selectResult != null) {
                            int size = selectResult.getSize();
                            //同步数据不得超过默认32k
                            if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                                size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                            }
    
                            long thisOffset = this.nextTransferFromWhere;
                            this.nextTransferFromWhere += size;//下次传输的位置
    
                            selectResult.getByteBuffer().limit(size);
                            this.selectMappedBufferResult = selectResult;
    
                            // Build Header
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(headerSize);//头部12字节
                            this.byteBufferHeader.putLong(thisOffset);//开始位置
                            this.byteBufferHeader.putInt(size);//bodySize大小
                            this.byteBufferHeader.flip();
    
                            this.lastWriteOver = this.transferData();
                        } else {
                            // 没新的消息,挂起等待
                            HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                        }
                    } catch (Exception e) {
    
                        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                        break;
                    }
                }
                //break出来,代表异常
                if (this.selectMappedBufferResult != null) {
                    this.selectMappedBufferResult.release();
                }
                //停止读写service
                this.makeStop();
    
                readSocketService.makeStop();
    
                haService.removeConnection(HAConnection.this);//移除该记录
    
                SelectionKey sk = this.socketChannel.keyFor(this.selector);
                if (sk != null) {
                    sk.cancel();
                }
    
                try {
                    this.selector.close();
                    this.socketChannel.close();
                } catch (IOException e) {
                    HAConnection.log.error("", e);
                }
    
                HAConnection.log.info(this.getServiceName() + " service end");
            }
    

    transferData

    master给slave写数据

            /**
             * 传输数据
             * 1.写头部
             * 2.写body
             */
            private boolean transferData() throws Exception {
                int writeSizeZeroTimes = 0;//长度为0的写的次数
                // Write Header,写头部
                while (this.byteBufferHeader.hasRemaining()) {
                    int writeSize = this.socketChannel.write(this.byteBufferHeader);
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        //更新写时间
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write header error < 0");
                    }
                }
    
                if (null == this.selectMappedBufferResult) {
                    return !this.byteBufferHeader.hasRemaining();
                }
    
                writeSizeZeroTimes = 0;
    
                // Write Body,写内容
                if (!this.byteBufferHeader.hasRemaining()) {//头部写完了
                    while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                        int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());//把master的内容传递给slave
                        if (writeSize > 0) {
                            writeSizeZeroTimes = 0;//清空次数
                            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        } else if (writeSize == 0) {
                            if (++writeSizeZeroTimes >= 3) {
                                break;
                            }
                        } else {
                            throw new Exception("ha master write body error < 0");
                        }
                    }
                }
    
                boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
    
                if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    this.selectMappedBufferResult.release();
                    this.selectMappedBufferResult = null;//发送完了之后置为null
                }
    
                return result;
            }
    

    思考

    两个线程run方法的流程,思路

    都写在注释上了

    processReadEvent时如果byteBufferRead一次读入了大量数据怎么办

    源码上


    image.png

    如果一下子读的内容比processPostion 大了几十几百,会怎么办,为什么只读最后8个字节就行
    这里应该是HAClient的发送内容保证的,最后发送的offset一定是最大的,因此之前的offset,即使收到了没有处理,也没有关系

    WriteSocketService和ReadSocketService的异同

    同:都继承ServiceThread,都记录一个最后读(写)时间,出现问题时都需要关闭两个Service,处理socket等
    异:

    1.同步协议不一样
    读数据只要读一个long,代表maxPhyOffset
    写要有一个12字节的header,后面是同步的内容
    2.检测的时长不一样
    读数据是20s没有读算超时(houseKeep)
    写是5s没有写算超时(heartBeat)
    

    问题

    processReadEvent代码重复

    image.png

    我并不知道为啥要这样写

    备注

    思考中第二个对于HAClient的讲解,源码还没看

    refer

    http://blog.csdn.net/quhongwei_zhanqiu/article/details/39144469
    http://technoboy.iteye.com/blog/2368458

    相关文章

      网友评论

          本文标题:store模块阅读22:HA(1):HAConnection

          本文链接:https://www.haomeiwen.com/subject/bhswixtx.html