RPC框架远程调用的实现方式在原理上是比较简单的,即将调用的方法(接口名、方法名、参数类型、参数)序列化之后发送到远端,在远端反序列化之后调用接口的实现类的方法(接口主要是为了使用动态代理)。
所以我们在实现RPC框架的时候需要选择合适的序列化与反序列化方式
常见的序列化与反序列化协议
- Java内置的序列化
生成的字节数组较大、不能跨语言、效率低下,不建议使用 - Hessian
简单高效特点而使用广泛 - protobuf或者protostuff
google开源的protobuf采用更为紧凑的二进制数组,表现更加优异,不过其使用方法极其繁琐,需要编写proto文件,然后使用protobuf的编译工具生成pojo类
protostuff基于protobuf的序列化,在运行只需要添加模式schema即可序列化对象,schema可由RuntimeSchema生成,然后调用ProtostuffIOUtil的toByteArray()方法;反序列化也需要添加模式schema,然后调用ProtostuffIOUtil的mergeFrom()方法即可将字节还原成对象 - Kryo
速度快,序列化后体积小,跨语言支持较复杂。spark推荐的序列化方式,需要注册对象 - Avro
Avro 是属于 Hadoop 的一个子项目 - Thrift
和protobuf一样不支持动态特性
其中Hessian和Kryo使用最简单,protobuf/protostuff应该是效率最高的,Thrift应该是跨语言支持最好的,目前我知道的在大型开源项目中使用Avro只有Hadoop,感觉应该和Kyro类似。
基于Netty开发简单的RPC框架
首先说明,此框架不是我写的,这是Github上某user的思考成果。不偷不抢不吹牛逼,看大神代码,一起成长
版权请见https://github.com/tang-jie/NettyRPC
消息体MessageRequest与MessageResponse
/**
* @filename
* @Description RPC请求体
* @Author
* @Date
*/
public class MessageRequest implements Serializable {
private String messageId; //唯一ID
private String className; //接口名
private String methodName; //方法名
private Class<?>[] typeParameters; //参数类型
private Object[] parametersVal; //参数
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getTypeParameters() {
return typeParameters;
}
public void setTypeParameters(Class<?>[] typeParameters) {
this.typeParameters = typeParameters;
}
public Object[] getParameters() {
return parametersVal;
}
public void setParameters(Object[] parametersVal) {
this.parametersVal = parametersVal;
}
public String toString() {
return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"});
}
}
/**
* @filename
* @Description RPC响应体
* @Author
* @Date
*/
public class MessageResponse implements Serializable {
private String messageId; //唯一ID
private String error; //错误消息
private Object resultDesc; //方法调用结果
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Object getResult() {
return resultDesc;
}
public void setResult(Object resultDesc) {
this.resultDesc = resultDesc;
}
public String toString() {
return ReflectionToStringBuilder.toString(this);
}
}
/**
* @filename
* @Description RPC服务映射容器
* @Author
* @Date
*/
public class MessageKeyVal {
private Map<String, Object> messageKeyVal;
public void setMessageKeyVal(Map<String, Object> messageKeyVal) {
this.messageKeyVal = messageKeyVal;
}
public Map<String, Object> getMessageKeyVal() {
return messageKeyVal;
}
}
自定义线程池和线程工厂
作者可能觉得java内置的线程池和线程工厂不满足自己的需求,所以实现了自己的线程池和线程工厂,将调度任务放在特定的线程池去工作。
自定义线程池其实还比较简单,我在http://www.jianshu.com/p/4f368625294b有过简单的介绍,线程池异常策略有AbortPolicy(默认的,直接抛出一个RejectedExecutionException异常)、DiscardPolicy(rejectedExecution直接是空方法,什么也不干,如果队列满了,后续的任务都抛弃掉)、DiscardOldestPolicy(将等待队列里最旧的任务踢走,让新任务得以执行)、CallerRunsPolicy(既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务)。
/**
* @filename
* @Description 自定义线程工厂
* @Author
* @Date
*/
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger threadNumber = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String prefix;
private final boolean daemoThread;
private final ThreadGroup threadGroup;
public NamedThreadFactory() {
this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemo) {
this.prefix = prefix + "-thread-";
daemoThread = daemo;
SecurityManager s = System.getSecurityManager();
threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
public Thread newThread(Runnable runnable) {
String name = prefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(threadGroup, runnable, name, 0);
ret.setDaemon(daemoThread);
return ret;
}
public ThreadGroup getThreadGroup() {
return threadGroup;
}
}
/**
* @filename
* @Description 自定义线程池
* @Author
* @Date
*/
public class RpcThreadPool {
public static Executor getExecutor(int threads, int queues) {
String name = "RpcThreadPool";
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>()
: (queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
}
/**
* @filename
* @Description 线程池异常策略
* @Author
* @Date
*/
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
private final String threadName;
public AbortPolicyWithReport(String threadName) {
this.threadName = threadName;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("RpcServer["
+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
System.out.println(msg);
throw new RejectedExecutionException(msg);
}
}
序列化与反序列化接口
/**
* @filename
* @Description RPC消息序列化/反序列化接口定义
* @Author
* @Date
*/
public interface RpcSerialize {
void serialize(OutputStream output, Object object) throws IOException;
Object deserialize(InputStream input) throws IOException;
}
/**
* @filename
* @Description RPC消息序序列化协议选择器接口
* @Author
* @Date
*/
public interface RpcSerializeFrame {
void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline);
}
/**
* @filename
* @Description 消息编码器
* @Author
* @Date
*/
public class MessageEncoder extends MessageToByteEncoder<Object> {
private MessageCodecUtil util = null;
public MessageEncoder(final MessageCodecUtil util) {
this.util = util;
}
protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
util.encode(out, msg);
}
}
/**
* @filename
* @Description 消息解码器
* @Author
* @Date
*/
public class MessageDecoder extends ByteToMessageDecoder {
final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH;
private MessageCodecUtil util = null;
public MessageDecoder(final MessageCodecUtil util) {
this.util = util;
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < MessageDecoder.MESSAGE_LENGTH) {
return;
}
in.markReaderIndex();
int messageLength = in.readInt();
if (messageLength < 0) {
ctx.close();
}
if (in.readableBytes() < messageLength) {
in.resetReaderIndex();
} else {
byte[] messageBody = new byte[messageLength];
in.readBytes(messageBody);
try {
Object obj = util.decode(messageBody);
out.add(obj);
} catch (IOException ex) {
Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
/**
* @filename
* @Description RPC消息序序列化协议类型
* @Author
* @Date
*/
public enum RpcSerializeProtocol {
JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");
private String serializeProtocol;
RpcSerializeProtocol(String serializeProtocol) {
this.serializeProtocol = serializeProtocol;
}
public String toString() {
ReflectionToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
return ReflectionToStringBuilder.toString(this);
}
public String getProtocol() {
return serializeProtocol;
}
}
Hessian序列化与反序列化的具体实现
- Hessian序列化/反序列化
/**
* @filename
* @Description Hessian序列化/反序列化实现
* @Author
* @Date
*/
public class HessianSerialize implements RpcSerialize {
public void serialize(OutputStream output, Object object) {
Hessian2Output ho = new Hessian2Output(output);
try {
ho.startMessage();
ho.writeObject(object);
ho.completeMessage();
ho.close();
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public Object deserialize(InputStream input) {
Object result = null;
try {
Hessian2Input hi = new Hessian2Input(input);
hi.startMessage();
result = hi.readObject();
hi.completeMessage();
hi.close();
input.close();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
}
- Hessian编解码工具类
/**
* @filename
* @Description Hessian编解码工具类
* @Author
* @Date
*/
public class HessianCodecUtil implements MessageCodecUtil {
HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance();
private static Closer closer = Closer.create();
public HessianCodecUtil() {
}
public void encode(final ByteBuf out, final Object message) throws IOException {
try {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
closer.register(byteArrayOutputStream);
HessianSerialize hessianSerialization = pool.borrow();
hessianSerialization.serialize(byteArrayOutputStream, message);
byte[] body = byteArrayOutputStream.toByteArray();
int dataLength = body.length;
out.writeInt(dataLength);
out.writeBytes(body);
pool.restore(hessianSerialization);
} finally {
closer.close();
}
}
public Object decode(byte[] body) throws IOException {
try {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
closer.register(byteArrayInputStream);
HessianSerialize hessianSerialization = pool.borrow();
Object object = hessianSerialization.deserialize(byteArrayInputStream);
pool.restore(hessianSerialization);
return object;
} finally {
closer.close();
}
}
}
- 对象的池化
这里并不是将Hessian对象池化,而是将序列化/反序列化工具对象池化,池化用的是commons.pool2,具体使用看这里http://ju.outofmemory.cn/entry/75642
/**
* @filename
* @Description Hessian序列化/反序列化对象工厂池
* @Author
* @Date
*/
public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> {
public HessianSerialize create() throws Exception {
return createHessian();
}
public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) {
return new DefaultPooledObject<>(hessian);
}
private HessianSerialize createHessian() {
return new HessianSerialize();
}
}
/**
* @filename
* @Description Hessian序列化/反序列化池
* @Author
* @Date
*/
public class HessianSerializePool {
private GenericObjectPool<HessianSerialize> hessianPool;
private static HessianSerializePool poolFactory = null;
private HessianSerializePool() {
hessianPool = new GenericObjectPool<>(new HessianSerializeFactory());
}
public static HessianSerializePool getHessianPoolInstance() {
if (poolFactory == null) {
synchronized (HessianSerializePool.class) {
if (poolFactory == null) {
poolFactory = new HessianSerializePool();
}
}
}
return poolFactory;
}
public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
hessianPool = new GenericObjectPool<>(new HessianSerializeFactory());
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(maxTotal);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWaitMillis);
config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
hessianPool.setConfig(config);
}
public HessianSerialize borrow() {
try {
return getHessianPool().borrowObject();
} catch (final Exception ex) {
ex.printStackTrace();
return null;
}
}
public void restore(final HessianSerialize object) {
getHessianPool().returnObject(object);
}
public GenericObjectPool<HessianSerialize> getHessianPool() {
return hessianPool;
}
}
- Hessian编码器
/**
* @filename
* @Description Hessian编码器
* @Author
* @Date
*/
public class HessianEncoder extends MessageEncoder {
public HessianEncoder(MessageCodecUtil util) {
super(util);
}
}
- Hessian解码器
/**
* @filename
* @Description Hessian解码器
* @Author
* @Date
*/
public class HessianDecoder extends MessageDecoder {
public HessianDecoder(MessageCodecUtil util) {
super(util);
}
}
** Kryo的编解码与Hessian的几乎一样,不同之处是Kyro有自己的池化KryoPool **
RPC服务端的实现
- Rpc服务器执行模块(依赖于Spring)
/**
* @filename
* @Description Rpc服务器执行模块
* @Author
* @Date
*/
public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {
private String serverAddress;
private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;
private final static String DELIMITER = ":";
private Map<String, Object> handlerMap = new ConcurrentHashMap<>();
private static ListeningExecutorService threadPoolExecutor;
public MessageRecvExecutor(String serverAddress, String serializeProtocol) {
this.serverAddress = serverAddress;
this.serializeProtocol = Enum.valueOf(RpcSerializeProtocol.class, serializeProtocol);
}
/**
* 异步执行,并将结果response返回给调用方
* @param task
* @param ctx
* @param request
* @param response
*/
public static void submit(Callable<Boolean> task, final ChannelHandlerContext ctx, final MessageRequest request, final MessageResponse response) {
if (threadPoolExecutor == null) {
synchronized (MessageRecvExecutor.class) {
if (threadPoolExecutor == null) {
threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
}
}
}
ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(task);
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean result) {
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("RPC Server Send message-id respone:" + request.getMessageId());
}
});
}
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, threadPoolExecutor);
}
/**
* 获取接口与实现类的对应
* @param ctx
* @throws BeansException
*/
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
try {
MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("org.microframe.rpc.model.MessageKeyVal"));
Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();
Set s = rpcServiceObject.entrySet();
Iterator<Map.Entry<String, Object>> it = s.iterator();
Map.Entry<String, Object> entry;
while (it.hasNext()) {
entry = it.next();
handlerMap.put(entry.getKey(), entry.getValue());
}
} catch (ClassNotFoundException ex) {
java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
}
}
/**
* 经典的netty服务端套路
* @throws Exception
*/
public void afterPropertiesSet() throws Exception {
ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
int parallel = Runtime.getRuntime().availableProcessors() * 2;
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new MessageRecvChannelInitializer(handlerMap).buildRpcSerializeProtocol(serializeProtocol)) //通过serializeProtocol选择MessageRecvHandler
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);
if (ipAddr.length == 2) {
String host = ipAddr[0];
int port = Integer.parseInt(ipAddr[1]);
ChannelFuture future = bootstrap.bind(host, port).sync();
System.out.printf("Netty RPC Server start success!\nip:%s\nport:%d\nprotocol:%s\n\n", host, port, serializeProtocol);
future.channel().closeFuture().sync();
} else {
System.out.printf("Netty RPC Server start fail!\n");
}
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
- Rpc服务端管道初始化
/**
* @filename
* @Description Rpc服务器执行模块
* @Author
* @Date
*/
public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {
private RpcSerializeProtocol protocol;
private RpcRecvSerializeFrame frame = null;
MessageRecvChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
this.protocol = protocol;
return this;
}
MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
frame = new RpcRecvSerializeFrame(handlerMap);
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
frame.select(protocol, pipeline);
}
}
- RPC服务端消息序列化协议框架
/**
* @filename
* @Description RPC服务端消息序列化协议框架
* @Author
* @Date
*/
public class RpcRecvSerializeFrame implements RpcSerializeFrame {
private Map<String, Object> handlerMap = null;
public RpcRecvSerializeFrame(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
switch (protocol) {
case JDKSERIALIZE: {
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new MessageRecvHandler(handlerMap));
break;
}
case KRYOSERIALIZE: {
KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
pipeline.addLast(new KryoEncoder(util));
pipeline.addLast(new KryoDecoder(util));
pipeline.addLast(new MessageRecvHandler(handlerMap));
break;
}
case HESSIANSERIALIZE: {
HessianCodecUtil util = new HessianCodecUtil();
pipeline.addLast(new HessianEncoder(util));
pipeline.addLast(new HessianDecoder(util));
pipeline.addLast(new MessageRecvHandler(handlerMap));
break;
}
}
}
}
- Rpc服务器消息处理
/**
* @filename
* @Description Rpc服务器消息处理
* @Author
* @Date
*/
public class MessageRecvHandler extends ChannelInboundHandlerAdapter {
private final Map<String, Object> handlerMap;
public MessageRecvHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRequest request = (MessageRequest) msg;
MessageResponse response = new MessageResponse();
MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap);
MessageRecvExecutor.submit(recvTask, ctx, request, response);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
- Rpc服务器消息线程任务处理
/**
* @filename
* @Description Rpc服务器消息线程任务处理
* @Author
* @Date
*/
public class MessageRecvInitializeTask implements Callable<Boolean> {
private MessageRequest request = null;
private MessageResponse response = null;
private Map<String, Object> handlerMap = null;
private ChannelHandlerContext ctx = null;
public MessageResponse getResponse() {
return response;
}
public MessageRequest getRequest() {
return request;
}
public void setRequest(MessageRequest request) {
this.request = request;
}
MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap) {
this.request = request;
this.response = response;
this.handlerMap = handlerMap;
this.ctx = ctx;
}
public Boolean call() {
response.setMessageId(request.getMessageId());
try {
Object result = reflect(request);
response.setResult(result);
return Boolean.TRUE;
} catch (Throwable t) {
response.setError(t.toString());
t.printStackTrace();
System.err.printf("RPC Server invoke error!\n");
return Boolean.FALSE;
}
}
/**
* 服务端调用本地方法
* @param request
* @return
* @throws Throwable
*/
private Object reflect(MessageRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
}
}
RPC客户端的实现
- Rpc客户端执行模块 实际上都把任务交给了RpcServerLoader
/**
* @filename
* @Description Rpc客户端执行模块
* @Author
* @Date
*/
public class MessageSendExecutor {
private RpcServerLoader loader = RpcServerLoader.getInstance();
public MessageSendExecutor() {
}
public MessageSendExecutor(String serverAddress, RpcSerializeProtocol serializeProtocol) {
loader.load(serverAddress, serializeProtocol);
}
/**
* 启动客户端netty
* @param serverAddress
* @param serializeProtocol
*/
public void setRpcServerLoader(String serverAddress, RpcSerializeProtocol serializeProtocol) {
loader.load(serverAddress, serializeProtocol);
}
public void stop() {
loader.unLoad();
}
/**
* 动态代理
* @param rpcInterface 必须是一个接口类
* @param <T>
* @return
*/
public static <T> T execute(Class<T> rpcInterface) {
return Reflection.newProxy(rpcInterface, new MessageSendProxy());
}
}
- rpc客户端配置加载
/**
* @filename
* @Description rpc服务器配置加载
* @Author
* @Date
*/
public class RpcServerLoader {
private volatile static RpcServerLoader rpcServerLoader;
private final static String DELIMITER = ":";
private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;
private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
private static ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
private MessageSendHandler messageSendHandler = null;
private Lock lock = new ReentrantLock();
private Condition connectStatus = lock.newCondition();
private Condition handlerStatus = lock.newCondition();
private RpcServerLoader() {
}
public static RpcServerLoader getInstance() {
if (rpcServerLoader == null) {
synchronized (RpcServerLoader.class) {
if (rpcServerLoader == null) {
rpcServerLoader = new RpcServerLoader();
}
}
}
return rpcServerLoader;
}
/**
* 客户端启动
* @param serverAddress
* @param serializeProtocol
*/
public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {
String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
if (ipAddr.length == 2) {
String host = ipAddr[0];
int port = Integer.parseInt(ipAddr[1]);
final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, serializeProtocol));
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean result) {
try {
lock.lock();
if (messageSendHandler == null) {
handlerStatus.await();
}
if (result == Boolean.TRUE && messageSendHandler != null) {
connectStatus.signalAll();
}
} catch (InterruptedException ex) {
Logger.getLogger(RpcServerLoader.class.getName()).log(Level.SEVERE, null, ex);
} finally {
lock.unlock();
}
}
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, threadPoolExecutor);
}
}
public void setMessageSendHandler(MessageSendHandler messageInHandler) {
try {
lock.lock();
this.messageSendHandler = messageInHandler;
handlerStatus.signal();
} finally {
lock.unlock();
}
}
public MessageSendHandler getMessageSendHandler() throws InterruptedException {
try {
lock.lock();
if (messageSendHandler == null) {
connectStatus.await();
}
return messageSendHandler;
} finally {
lock.unlock();
}
}
public void unLoad() {
messageSendHandler.close();
threadPoolExecutor.shutdown();
eventLoopGroup.shutdownGracefully();
}
public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {
this.serializeProtocol = serializeProtocol;
}
}
- Rpc客户端线程任务处理
/**
* @filename
* @Description Rpc客户端线程任务处理
* @Author
* @Date
*/
public class MessageSendInitializeTask implements Callable<Boolean> {
private EventLoopGroup eventLoopGroup = null;
private InetSocketAddress serverAddress = null;
private RpcSerializeProtocol protocol;
MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcSerializeProtocol protocol) {
this.eventLoopGroup = eventLoopGroup;
this.serverAddress = serverAddress;
this.protocol = protocol;
}
@Override
public Boolean call() {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new MessageSendChannelInitializer().buildRpcSerializeProtocol(protocol));
ChannelFuture channelFuture = b.connect(serverAddress);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
RpcServerLoader.getInstance().setMessageSendHandler(handler);
}
}
});
return Boolean.TRUE;
}
}
- Rpc客户端管道初始化
/**
* @filename
* @Description Rpc客户端管道初始化
* @Author
* @Date
*/
public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {
private RpcSerializeProtocol protocol;
private RpcSendSerializeFrame frame = new RpcSendSerializeFrame();
MessageSendChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
this.protocol = protocol;
return this;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
frame.select(protocol, pipeline);
}
}
- RPC客户端消息序列化协议
/**
* @filename
* @Description RPC客户端消息序列化协议框架
* @Author
* @Date
*/
public class RpcSendSerializeFrame implements RpcSerializeFrame {
public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
switch (protocol) {
case JDKSERIALIZE: {
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new MessageSendHandler());
break;
}
case KRYOSERIALIZE: {
KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
pipeline.addLast(new KryoEncoder(util));
pipeline.addLast(new KryoDecoder(util));
pipeline.addLast(new MessageSendHandler());
break;
}
case HESSIANSERIALIZE: {
HessianCodecUtil util = new HessianCodecUtil();
pipeline.addLast(new HessianEncoder(util));
pipeline.addLast(new HessianDecoder(util));
pipeline.addLast(new MessageSendHandler());
break;
}
}
}
}
- Rpc消息回调
/**
* @filename
* @Description Rpc消息回调
* @Author
* @Date
*/
public class MessageCallBack {
private MessageRequest request;
private MessageResponse response;
private Lock lock = new ReentrantLock();
private Condition finish = lock.newCondition();
public MessageCallBack(MessageRequest request) {
this.request = request;
}
public Object start() throws InterruptedException {
try {
lock.lock();
finish.await(10*1000, TimeUnit.MILLISECONDS);
if (this.response != null) {
return this.response.getResult();
} else {
return null;
}
} finally {
lock.unlock();
}
}
public void over(MessageResponse reponse) {
try {
lock.lock();
finish.signal();
this.response = reponse;
} finally {
lock.unlock();
}
}
}
- Rpc客户端代理
** 然而,只有当代理开始执行方法时,客户端才会发送request,然后异步等待执行结果 **
/**
* @filename
* @Description Rpc客户端代理
* @Author
* @Date
*/
public class MessageSendProxy extends AbstractInvocationHandler {
public Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
MessageRequest request = new MessageRequest();
request.setMessageId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setTypeParameters(method.getParameterTypes());
request.setParameters(args);
MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
//将request序列化发送到netty server端并异步获取结果
MessageCallBack callBack = handler.sendRequest(request);
return callBack.start();
}
}
网友评论