美文网首页
【easy-rpc】二、优化日志:支持Reactor模式IO多路

【easy-rpc】二、优化日志:支持Reactor模式IO多路

作者: 大栗几 | 来源:发表于2020-08-15 20:58 被阅读0次

本文为原创文章,转载请注明出处
查看[easy-rpc]系列内容请点击:https://www.jianshu.com/nb/47424623

源码地址:这里

大家可以用在自己小型的项目上了,我自己测了,针对200个并发量,每个线程500个数字的数组排序的任务,在多线程情况下,仅需1.414秒~

默认情况下,easy-rpc的server端是以单线程模式启动的,启动后事务执行具有原子性。我们可以通过server的configuration来配置,使其以Reactor主从多线程方式运行,只需要按照如下代码:

server.getConfiguration().setThreadType(ThreadType.MULTI);

系统就会以多线程模式运行啦~

另外,这次对项目的结构做了一次优化。主要分为四个包:

  • server:服务器相关内容
  • client:客户端相关内容
  • core:服务器和客户端都需要的工具类
  • samples:一些示例可以从这里找哦~

在server中,IO多路复用机制主要定义在com.codelifeliwan.rpc.server.nio包中,入口类就是·MultiDataAcceptEventLoop·类,对应的单线程模式入口类是:SingleDataAcceptEventLoop

在该包下有一个子包reactor,其中包含了以下几个文件:

  • Reactor.java:Reactor模式的主类
  • Acceptor.java:理论上的读写IO主类(这里暂时读写IO不放在这里,而是放在了 SocketHandler.java中)
  • SocketHandler.java:数据处理主类

下面一一说明。

Reactor.java

Reactor模式的主类,只包含一个线程,主要负责对服务器端口的监听,其中有一个主要的对象ServerSocketChannel对象,以单线程模式监听服务器端口,一旦监听到事件,就会发送到Acceptor中进行处理。

一个Reactor包含了多个Acceptor,分别运行在不同的线程中,在Reactor中通过数组和线程池来管理Acceptor

除了负责监听端口外,Reactor还负责对于Accestor线程池和SocketHandler线程池的初始化工作。

// Reactor.java
package com.codelifeliwan.rpc.server.nio.reactor;

import com.codelifeliwan.rpc.server.config.Configuration;
import org.apache.log4j.Logger;

import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.concurrent.*;

/**
 * @author LiWan
 * <p>
 * 主从Reactor多线程 模式处理,本类是主Reactor,只有一个线程
 * 主Reactor线程只负责连接的建立工作,具体的服务器通讯和IO操作放在Acceptor里面实现
 * <p>
 * 默认的Acceptor工作线程数(即Acceptor个数)为CPU线程数*2,可自主设置
 */
public class Reactor extends Thread {
    private static final Logger log = Logger.getLogger(Reactor.class);

    private volatile boolean started = false;

    /**
     * 业务处理的线程池
     * 默认为自动伸缩的线程池
     */
    private Executor processThreadPool;

    /**
     * 执行Accestor的线程池
     */
    private Executor accestorThreadPool;

    /**
     * 服务器配置信息
     */
    private Configuration configuration;

    private Acceptor[] acceptors;
    private volatile int acceptorCount = 1;

    /**
     * 服务器事件监听,只监听 ACCEPT 事件
     */
    private ServerSocketChannel channel;

    public Reactor(Configuration configuration, int acceptorCount) throws Exception {
        this(null, configuration, acceptorCount);
    }

    public Reactor(Configuration configuration) throws Exception {
        this(null, configuration);
    }

    public Reactor(Executor processThreadPool, Configuration configuration) throws Exception {
        this(processThreadPool, configuration, Runtime.getRuntime().availableProcessors() * 2);
    }

    public Reactor(Executor processThreadPool, Configuration configuration, int acceptorCount) throws Exception {
        if (processThreadPool != null) {
            this.processThreadPool = processThreadPool;
        } else {
            // 默认创建线程池中的线程个数从 cpu线程数 到 cpu线程数*5,线程过期时间1分钟
            this.processThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                    Runtime.getRuntime().availableProcessors() * 5,
                    1,
                    TimeUnit.MINUTES,
                    new LinkedBlockingDeque<>());
        }
        this.configuration = configuration;
        this.acceptorCount = acceptorCount;
    }

    /**
     * 初始化Reactor
     * 初始化IO线程池(Acceptor线程池)并开始执行
     *
     * @throws Exception
     */
    private synchronized void init() throws Exception {
        channel = ServerSocketChannel.open();
        // channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(configuration.getListeningPort()));

        acceptors = new Acceptor[this.acceptorCount];
        for (int i = 0; i < acceptors.length; i++) {
            acceptors[i] = new Acceptor(processThreadPool, configuration);
        }

        // 初始化并执行IO线程池
        accestorThreadPool = Executors.newFixedThreadPool(acceptors.length);
        for (Acceptor acceptor : acceptors) {
            accestorThreadPool.execute(acceptor);
        }
    }


    /**
     * 复写run方法
     */
    public synchronized void run() {
        if (started) {
            log.info("Thread " + Thread.currentThread().getId() + " already started.");
            return;
        }

        started = true;

        try {
            init();

            // 轮流使用Acceptor执行IO任务
            int round = 0; // 下一个要使用的Acceptor指针
            acceptorCount = acceptors.length;
            while (started && (!Thread.currentThread().isInterrupted())) {

                // 此处会阻塞到有连接请求为止
                SocketChannel c = channel.accept();

                // 将该请求转发到对应的线程上执行IO操作
                c.configureBlocking(false);
                acceptors[round++].registerConnectChannel(c);

                round = round % acceptorCount;
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.getMessage());
        }
    }

    public void shutdown() throws Exception {
        for (Acceptor acceptor : acceptors) acceptor.close();
        started = false;
        channel.close();
        Thread.currentThread().interrupt();
    }
}

Acceptor.java

理论上来说,Accestor是对网络IO读写的操作,但是这里因为直接使用Socket连接来进行,所以目前Acceptor中只负责对于不同网络事件的轮询、转发和状态转换工作。

// Acceptor.java
package com.codelifeliwan.rpc.server.nio.reactor;

import com.codelifeliwan.rpc.server.config.Configuration;
import com.codelifeliwan.rpc.core.RPCByteBuffer;
import lombok.Getter;
import org.apache.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * @author LiWan
 * <p>
 * 负责具体的网络IO操作,并将执行下发到线程池中运行
 */
@Getter
public class Acceptor implements Closeable, Runnable {
    private static final Logger log = Logger.getLogger(Acceptor.class);

    /**
     * 具体处理的线程池,在一个Server内共享该线程池
     */
    private Executor processThreadPool;

    private volatile Selector selector;

    private Configuration configuration;

    private volatile boolean closed = true;

    public Acceptor(Executor processThreadPool, Configuration configuration) throws Exception {
        this.processThreadPool = processThreadPool;
        this.configuration = configuration;
        this.selector = Selector.open();
    }


    @Override
    public void close() throws IOException {
        selector.close();
        closed = true;
        Thread.currentThread().interrupt();
    }

    /**
     * 从主Reactor注册channel,这里只监听OP_CONNECT和OP_READ请求
     *
     * @param sc
     * @throws Exception
     */
    public void registerConnectChannel(SocketChannel sc) throws Exception {
        registerConnectChannel(sc, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    }

    public void registerConnectChannel(SocketChannel sc, int status) throws Exception {
        sc.register(selector, status, this);
        selector.wakeup();
    }

    /**
     * 读取并处理消息
     */
    public void handleExecutor(SocketChannel channel, RPCByteBuffer buffer) throws Exception {
        processThreadPool.execute(new SocketHandler(configuration, channel, buffer));
    }

    private void handleKey(SelectionKey key) throws Exception {
        if (key.isConnectable()) {
            handleConnectEvent(key);
        } else if (key.isAcceptable()) {
            handleAcceptEvent(key);
        } else if (key.isReadable()) {
            handleReadEvent(key);
        } else if (key.isWritable()) {
            handleOtherEvent(key);
        } else {
            handleOtherEvent(key);
        }
    }

    public void run() {
        closed = false;
        while (!closed && (!Thread.currentThread().isInterrupted())) {
            try {
                int eventCount = selector.select();
                if (eventCount == 0) continue;

                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iter = keys.iterator();

                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();

                    try {
                        if (!key.isValid()) {
                            key.channel().close();
                            key.cancel();
                            continue;
                        }
                        handleKey(key);
                    } catch (Exception e1) {
                        e1.printStackTrace();
                        log.error(e1.getMessage());
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
            }
        }
    }

    /**
     * 处理CONNECT事件
     *
     * @param key
     * @throws Exception
     */
    private void handleConnectEvent(SelectionKey key) throws Exception {
        log.info("*** connectable");
        SocketChannel ch = (SocketChannel) key.channel();
        ch.finishConnect();
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * 读取数据
     *
     * @param key
     * @throws Exception
     */
    private void handleReadEvent(SelectionKey key) throws Exception {
        log.info("*** readable");
        key.cancel();
        SocketChannel ch = (SocketChannel) key.channel();

        RPCByteBuffer buffer = RPCByteBuffer.fromChannel(ch);
        ch.shutdownInput();
        handleExecutor(ch, buffer);
    }

    /**
     * 测试此键的通道是否已准备好接受新的套接字连接
     * 本项目中,应该是在Reactor中使用,这里不使用
     *
     * @param key
     * @throws Exception
     */
    private void handleAcceptEvent(SelectionKey key) throws Exception {
        log.error("*** acceptable, this is impossiable");
        throw new Exception("no such event : accept");
    }

    /**
     * 写事件等,暂时不使用
     *
     * @param key
     * @throws Exception
     */
    private void handleOtherEvent(SelectionKey key) throws Exception {
        log.error("*** not used");
        throw new Exception("no such event : other");
    }
}

SocketHandler.java

SocketHandler负责对于服务器的方法执行操作,也是通过线程池来管理SocketHandler的:

// SocketHandler.java
package com.codelifeliwan.rpc.server.nio.reactor;

import com.codelifeliwan.rpc.core.RPCDefaultMessage;
import com.codelifeliwan.rpc.core.RPCStatus;
import com.codelifeliwan.rpc.core.serializer.MessageSerializer;
import com.codelifeliwan.rpc.server.config.BeanScope;
import com.codelifeliwan.rpc.server.config.ClassInfo;
import com.codelifeliwan.rpc.server.config.Configuration;
import com.codelifeliwan.rpc.core.RPCByteBuffer;
import com.google.gson.Gson;
import org.apache.log4j.Logger;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author LiWan
 * <p>
 * 负责具体的消息序列化、方法调用等操作
 */
public class SocketHandler implements Runnable {
    private static final Logger log = Logger.getLogger(SocketHandler.class);

    private Configuration configuration;
    private Map<String, ClassInfo<?>> beans;
    private RPCByteBuffer buffer;

    /**
     * Method的缓存
     * key = beanName-methodName
     */
    private static Map<String, Method> methodsCache = new ConcurrentHashMap<>();

    /**
     * 消息会从该socket解析,channel.socket()
     * 并通过该socket写回消息,用完后关闭
     */
    private SocketChannel channel;

    public SocketHandler(Configuration configuration, SocketChannel channel, RPCByteBuffer buffer) {
        this.configuration = configuration;
        this.channel = channel;
        this.beans = configuration.getBeanClasses();
        this.buffer = buffer;
    }

    @Override
    public void run() {
        MessageSerializer serializer = configuration.getMessageSerializer();

        try {
            byte[] bytes = this.buffer.getByteArray();
            RPCDefaultMessage fromMessage = (RPCDefaultMessage) serializer.unSerializeMessage(new String(bytes), null, null);

            if (!beans.containsKey(fromMessage.getBeanName())) {
                String warn = "unknown message : " + fromMessage.getBeanName();
                log.error(warn);
                throw new Exception(warn);
            }

            ClassInfo bean = beans.get(fromMessage.getBeanName());

            Object beanObj = null;
            if (bean.getScope() == BeanScope.SINGLETON) {
                // 单例模式
                beanObj = bean.getDefaultObject();
            } else {
                // 原型模式
                beanObj = bean.getClazz().getConstructor().newInstance();
            }

            // 实际方法调用处理过程, 这次重新序列化类型
            RPCDefaultMessage response = processMessage(beanObj, fromMessage);

            // 将返回结果写回
            String responseStr = serializer.serializeMessage(response);
            channel.write(ByteBuffer.wrap(responseStr.getBytes()));
        } catch (Exception e) {
            e.printStackTrace();
            RPCDefaultMessage response = new RPCDefaultMessage();
            response.setStatus(RPCStatus.GENERAL_ERRPR);
            response.setValue("error:" + e.getMessage());
            try {
                String responseStr = serializer.serializeMessage(response);
                channel.write(ByteBuffer.wrap(responseStr.getBytes()));
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } finally {
            try {
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
            }
        }
    }


    /**
     * 具体调用方法实现
     *
     * @param bean    实例化的bean
     * @param message 客户端传来的消息
     * @return
     * @throws Exception
     */
    private RPCDefaultMessage processMessage(Object bean, RPCDefaultMessage message) throws Exception {
        String key = message.getBeanName() + "-" + message.getMethodName();
        if (!methodsCache.containsKey(key)) {
            synchronized (SocketHandler.class) {
                if (!methodsCache.containsKey(key)) {
                    Method[] methods = bean.getClass().getDeclaredMethods();
                    if (methods != null) {
                        for (Method m : methods) {
                            if (m.getName().equalsIgnoreCase(message.getMethodName())) {
                                methodsCache.put(key, m);
                                break;
                            }
                        }
                    }
                }
            }
        }

        // 利用反射机制来实现方法调用
        if (!methodsCache.containsKey(key)) throw new Exception("no such method : " + message.getMethodName());
        Method method = methodsCache.get(key);

        RPCDefaultMessage response = new RPCDefaultMessage();

        // 参数类型转化,避免因为json传输造成的类型不匹配问题
        Gson gson = new Gson();
        Object[] params = message.getParamValues();
        if (params == null) params = new Object[0];
        Class[] paramTypes = method.getParameterTypes();
        if (paramTypes == null) paramTypes = new Class[0];

        if (params.length != paramTypes.length) throw new Exception("method param(s) not match.");
        for (int i = 0; i < params.length; i++) {
            params[i] = gson.fromJson(gson.toJson(params[i]), paramTypes[i]);
        }

        response.setValue(method.invoke(bean, params));
        return response;
    }
}

欢迎不懂的小伙伴留言~

相关文章

网友评论

      本文标题:【easy-rpc】二、优化日志:支持Reactor模式IO多路

      本文链接:https://www.haomeiwen.com/subject/jwxzdktx.html