美文网首页
为初学者而来~手工最简MQ(二)Broker

为初学者而来~手工最简MQ(二)Broker

作者: eSoo | 来源:发表于2020-10-16 15:07 被阅读0次

    本文仅展示核心代码,全部代码,请移步:git-soomq

    1,服务端

    服务端的设计就非常简单了,最核心的就是消息的存取,以及响应生产者和消费者的网络请求
    分为2部分:

    1.1 消息文件

    消息的存储我们参考kafka,并简化其逻辑,因为是最简单的mq,我们只考虑单机的情况的就行,每个topic存储2个文件

    topicname.index
    topicname.data

    .index 文件存储格式为:
    消息顺序号:消息截止位置
    .data 文件按照顺序存储具体的消息

    文件操作:

    package com.esoo.mq.server.message;
    
    import com.alibaba.fastjson.JSON;
    import com.esoo.mq.common.ProcessorCommand;
    
    import java.io.RandomAccessFile;
    
    /**
     * 为每个topic创建一个对象进行管理
     */
    public class MessageFile {
        private String topic;
        private Long offset;
        //索引文件
        private RandomAccessFile indexFile = null ;
        //数据文件
        private RandomAccessFile dataFile = null ;
    
        //追加消息(生产者进行调用)
        public ProcessorCommand appendMsg(ProcessorCommand in){
    
            try {
                //加锁,避免竞争,文件乱码
                synchronized (in.getResult().getTopic()) {
    
                    //读取index文件最后一行
                    String lastLine = readLastLine(indexFile, null);
                    int lastOffset = 1;
                    //消息体追加到data文件中,并返回文件末尾位置,作为本条消息的offset
                    long lastindex =  writeEndLine(dataFile, in.getResult().getBody());
                    if (lastLine != null && !lastLine.equals("")) {
                        String index[] = lastLine.split(":");
                        lastOffset = Integer.valueOf(index[0]);
                        lastOffset = lastOffset + 1;
                    }
                    //组装本条消息index 序列号:消息体末尾位置
                    String insertMsgIndex = lastOffset + ":" + lastindex + "\t\n";
                    writeEndLine(indexFile, insertMsgIndex.getBytes());
                    in.setSuccess(true);
                }
            }catch (Exception e){
                e.printStackTrace();
    
                in.setSuccess(false);
                in.setExmsg(e.getMessage());
            }
            return in;
    
        }
    
        //读取消息,消费者进行调用
        public ProcessorCommand readMsg(ProcessorCommand in){
    
    
            try {
                synchronized (in.getResult().getTopic()) {
                    // 消息定位位置
                    int seekIn = 0;
                    // 消息体大小
                    int bodySize = 0;
                    //先定位到开始
                    indexFile.seek(0);
                    String indesMap=null;
                    //遍历index文件,找到上一个消息 offset 与本消息offset 进行相减就是消息体大小
                    while ((indesMap = indexFile.readLine())!=null){
                        String index[] = indesMap.split(":");
                        int inNum = Integer.valueOf(String.valueOf(index[0]).trim());
                        int off = Integer.valueOf(String.valueOf(index[1]).trim());
                        if (inNum == in.getResult().getOffset()) {
                            seekIn = off;
                        }
                        if (inNum == (in.getResult().getOffset() + 1)) {
                            bodySize = off - seekIn;
                        }
                    }
                    if (bodySize == 0) {
                        in.setSuccess(false);
                        in.setExmsg("offset is end");
                        return in;
                    }
                    //定位到具体位置
                    dataFile.seek(seekIn);
    
                    //进行消息读取
                    byte[] b = new byte[bodySize];
                    dataFile.read(b);
                    in.getResult().setBody(b);
    
                    in.setSuccess(true);
                    System.out.println(" READ MSG IS: "+JSON.toJSONString(in));
                }
            }catch (Exception e){
                e.printStackTrace();
                in.setSuccess(false);
                in.setExmsg(e.getMessage());
            }
            return in;
    
        }
    
        //写消息到最后一行
        public static long writeEndLine(RandomAccessFile file, byte[] msg)
                throws Exception {
            // 文件长度,字节数
            long fileLength = file.length();
            // 将写文件指针移到文件尾。
            file.seek(fileLength);
            file.write(msg);
            return file.getFilePointer();
    
        }
    
        //读取最后一行的消息
        public static String readLastLine(RandomAccessFile file, String charset) throws Exception {
    
            long len = file.length();
            if (len == 0L) {
                return "";
            } else {
                long pos = len - 1;
                while (pos > 0) {
                    pos--;
                    file.seek(pos);
                    if (file.readByte() == '\n') {
                        break;
                    }
                }
                if (pos == 0) {
                    file.seek(0);
                }
                byte[] bytes = new byte[(int) (len - pos)];
                file.read(bytes);
                if (charset == null) {
                    return new String(bytes);
                } else {
                    return new String(bytes, charset);
                }
            }
    
        }
    
        public static String readByOffset(RandomAccessFile file, String charset) throws Exception {
    
            return null;
        }
    
    
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(String topic) {
            this.topic = topic;
        }
    
        public Long getOffset() {
            return offset;
        }
    
        public void setOffset(Long offset) {
            this.offset = offset;
        }
    
        public RandomAccessFile getIndexFile() {
            return indexFile;
        }
    
        public void setIndexFile(RandomAccessFile indexFile) {
            this.indexFile = indexFile;
        }
    
        public RandomAccessFile getDataFile() {
            return dataFile;
        }
    
        public void setDataFile(RandomAccessFile dataFile) {
            this.dataFile = dataFile;
        }
    }
    
    

    1.2 网络编程

    利用netty 开放端口,响应生产者与消费者,每个消息包装成一个commod,commod类型

    • 消息类型(消费/生产)
    • 消息topic
    • 消息体(生产时用)
    • 消息顺序号(消费时用)
    • 处理结果(成功/失败)
    • 处理消息(失败时添加原因)

    网络启动

    package com.esoo.mq.server;
    
    import com.esoo.mq.server.netty.handler.NettySooMqServerHandler;
    import com.esoo.mq.server.netty.handler.NettySooMqServerOutHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    public class SooMQServer {
        private static Integer serverPort=9870;
        ServerBootstrap b = new ServerBootstrap();
    
        public void start(){
            //创建reactor 线程组
            EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
    
            try {
                //1 设置reactor 线程组
                b.group(bossLoopGroup, workerLoopGroup);
                //2 设置nio类型的channel
                b.channel(NioServerSocketChannel.class);
                //3 设置监听端口
                b.localAddress(serverPort);
                //4 设置通道的参数
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    
                //5 装配子通道流水线
                b.childHandler(new ChannelInitializer<SocketChannel>() {
                    //有连接到达时会创建一个channel
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // pipeline管理子通道channel中的Handler
                        // 向子channel流水线添加一个handler处理器
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                ClassResolvers.cacheDisabled(null)));
                        ch.pipeline().addLast(new NettySooMqServerOutHandler());
                        ch.pipeline().addLast(new NettySooMqServerHandler());
                    }
                });
                // 6 开始绑定server
                // 通过调用sync同步方法阻塞直到绑定成功
                ChannelFuture channelFuture = b.bind().sync();
                System.out.println(" 服务器启动成功,监听端口: " +
                        channelFuture.channel().localAddress());
    
                // 7 等待通道关闭的异步任务结束
                // 服务监听通道会一直等待通道关闭的异步任务结束
                ChannelFuture closeFuture = channelFuture.channel().closeFuture();
                closeFuture.sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 8 优雅关闭EventLoopGroup,
                // 释放掉所有资源包括创建的线程
                workerLoopGroup.shutdownGracefully();
                bossLoopGroup.shutdownGracefully();
            }
        }
    
    }
    
    

    网络逻辑分发

    注意:回写给客户端的消息体类型必须与入参保持一致,否则netty无法解析


    netty
    package com.esoo.mq.server.netty.handler;
    
    
    import com.alibaba.fastjson.JSON;
    import com.esoo.mq.common.ProcessorCommand;
    import com.esoo.mq.server.processor.Processor;
    import com.esoo.mq.server.processor.ProcessorFactory;
    import io.netty.channel.*;
    
    @ChannelHandler.Sharable
    public class NettySooMqServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            try {
                ProcessorCommand command = (ProcessorCommand) msg;
                System.out.println("["+ctx.channel().remoteAddress()+"] msg:"+JSON.toJSONString(msg));
                Processor processor = ProcessorFactory.getProcessorInstantiate(command.getType());
                msg = processor.handle(command);
                ChannelFuture f = ctx.writeAndFlush(msg);
                f.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        System.out.println("msg ctx send");
                    }
                });
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println(cause.getMessage());
            ctx.close();
        }
    
    
    
    }
    
    

    生产者

    package com.esoo.mq.server.processor;
    
    import com.esoo.mq.common.Message;
    import com.esoo.mq.common.ProcessorCommand;
    import com.esoo.mq.server.message.MessageFile;
    import com.esoo.mq.server.message.MessageFileFactory;
    
    public class SendMessageProcessor implements Processor<Message,Message> {
    
        @Override
        public ProcessorCommand handle(ProcessorCommand task) {
            MessageFile file = MessageFileFactory.getTopicFile(task.getResult().getTopic());
            task = file.appendMsg(task);
            return task;
        }
    
    
    }
    
    

    消费者

    package com.esoo.mq.server.processor;
    
    import com.esoo.mq.common.Message;
    import com.esoo.mq.common.ProcessorCommand;
    import com.esoo.mq.server.message.MessageFile;
    import com.esoo.mq.server.message.MessageFileFactory;
    
    public class ReadMessageProcessor implements Processor<Message,Message> {
    
        @Override
        public ProcessorCommand handle(ProcessorCommand task) {
            Message msg = task.getResult();
            MessageFile file = MessageFileFactory.getTopicFile(msg.getTopic());
            task = file.readMsg(task);
            return task;
        }
    
    
    }
    
    

    相关文章

      网友评论

          本文标题:为初学者而来~手工最简MQ(二)Broker

          本文链接:https://www.haomeiwen.com/subject/pwbzpktx.html