先实现服务端代码
1. RPC 服务端框架实现
package com.zhang.frame;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Minimal blocking RPC server.
 *
 * <p>For every accepted TCP connection a worker reads one call description
 * (service name, method name, parameter types, argument values) with Java
 * serialization, looks the service name up in the registry, invokes the method
 * reflectively on a fresh instance of the registered class and writes the
 * result back on the same connection.
 *
 * <p>SECURITY NOTE(review): requests are read with {@link ObjectInputStream},
 * i.e. native Java deserialization of whatever the peer sends. That is unsafe
 * for untrusted clients; restrict access to the port or install an
 * {@code ObjectInputFilter} before exposing this beyond a demo.
 */
public class RpcServerFrame {
    // Thread factory for the worker pool that handles client sockets.
    ThreadFactory threadFactory = new ThreadFactory() {
        private final AtomicInteger sequence = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            // getAndIncrement (not get) so each worker receives its own number
            // instead of every thread being called "thread1".
            return new Thread(r, "thread" + sequence.getAndIncrement());
        }
    };

    // Worker pool: 10 core / 15 max threads, at most 1024 queued connections,
    // anything beyond that is rejected (AbortPolicy).
    private final ExecutorService executorService = new ThreadPoolExecutor(10, 15, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy()
    );

    // Service registry: service name -> implementation class. It is static (shared by
    // all server instances) and read from worker threads while callers may still be
    // registering, hence a concurrent map.
    private static final Map<String, Class<?>> serverHolder = new ConcurrentHashMap<>();

    private final int port;

    /**
     * @param port TCP port that {@link #startService()} binds to
     */
    public RpcServerFrame(int port) {
        this.port = port;
    }

    /**
     * Registers a service implementation.
     *
     * @param className name the client sends to address the service
     * @param impl      implementation class; a new instance is created through its
     *                  no-argument constructor for every call
     * @throws NullPointerException if either argument is {@code null}
     */
    public void registServer(String className, Class impl) {
        serverHolder.put(className, impl);
    }

    /** Handles exactly one request/response exchange on one client socket. */
    private static class ServerTask implements Runnable {
        private final Socket socket;

        public ServerTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // The socket itself is a managed resource so it is closed even when
            // creating one of the object streams fails.
            try (Socket client = socket;
                 ObjectOutputStream outputStream = new ObjectOutputStream(client.getOutputStream());
                 ObjectInputStream inputStream = new ObjectInputStream(client.getInputStream())
            ) {
                // Name under which the service was registered
                String serviceName = inputStream.readUTF();
                // Method name
                String methodName = inputStream.readUTF();
                // Parameter types
                Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject();
                // Argument values. The client writes the proxy's Object[] (null for
                // no-arg methods), so this must not be narrowed to String[].
                Object[] args = (Object[]) inputStream.readObject();
                // Look the implementation class up in the registry
                Class<?> serviceClass = serverHolder.get(serviceName);
                // Unknown service: report as class-not-found
                if (serviceClass == null) {
                    throw new ClassNotFoundException(serviceName + " not found");
                }
                // Resolve the method by name and parameter types
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                // Invoke reflectively on a fresh instance
                Object target = serviceClass.getDeclaredConstructor().newInstance();
                Object result = method.invoke(target, args);
                outputStream.writeObject(result);
                outputStream.flush();
            } catch (Exception e) {
                // No error reply exists in this protocol: the failure is printed and the
                // connection is closed without a response.
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds to the configured port and serves requests until accepting fails.
     * This call blocks the calling thread.
     *
     * @throws IOException if the port cannot be bound or accepting a connection fails
     */
    public void startService() throws IOException {
        // try-with-resources also releases the socket when bind() itself fails.
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("RPC server on:" + port + " run");
            while (true) {
                Socket client = serverSocket.accept();
                try {
                    executorService.execute(new ServerTask(client));
                } catch (RejectedExecutionException e) {
                    // Pool and queue are full: drop this one connection instead of
                    // letting the exception terminate the whole accept loop.
                    e.printStackTrace();
                    closeQuietly(client);
                }
            }
        }
    }

    /** Closes a socket that will not be served; a failure to close is not actionable. */
    private static void closeQuietly(Socket client) {
        try {
            client.close();
        } catch (IOException ignored) {
            // best effort: the connection is being discarded anyway
        }
    }
}
2.创建服务端service接口与实体,供客户端调用
/**
 * Service contract exposed over RPC. The server registers an implementation
 * under this interface's fully-qualified name.
 */
public interface UserService {
/**
 * @return the user supplied by the implementation
 */
User getUser();
}
/**
 * Server-side implementation of {@link UserService} that always answers with
 * the same fixed user.
 */
public class UserServiceImpl implements UserService {
    /**
     * @return a newly created user named "张三" aged 18
     */
    @Override
    public User getUser() {
        return new User("张三", 18);
    }
}
/**
 * Immutable user value that is sent across the wire with Java serialization.
 * No explicit {@code serialVersionUID} is declared.
 */
public class User implements Serializable {
    private final String username;
    private final Integer age;

    /**
     * @param username the user's name
     * @param age      the user's age
     */
    public User(String username, Integer age) {
        this.age = age;
        this.username = username;
    }

    /** @return the name given at construction */
    public String getUsername() {
        return this.username;
    }

    /** @return the age given at construction */
    public Integer getAge() {
        return this.age;
    }
}
3.服务端启动
/**
 * Entry point of the demo server: registers {@code UserServiceImpl} under the
 * {@code UserService} interface name and serves it on port 8001.
 */
public class UserRpcServer {
    public static void main(String[] args) {
        // The blocking accept loop runs on its own thread rather than on main.
        Runnable bootstrap = new Runnable() {
            @Override
            public void run() {
                try {
                    RpcServerFrame frame = new RpcServerFrame(8001);
                    String serviceKey = UserService.class.getName();
                    frame.registServer(serviceKey, UserServiceImpl.class);
                    frame.startService();
                } catch (IOException ioFailure) {
                    ioFailure.printStackTrace();
                }
            }
        };
        Thread serverThread = new Thread(bootstrap);
        serverThread.start();
    }
}
客户端代码实现
1. 客户端 RPC 框架实现
package com.zhang.frame;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
 * Client side of the RPC demo: creates dynamic proxies whose interface method
 * calls are serialized, sent to a remote server over a new TCP connection per
 * call, and answered with the deserialized result.
 */
public class RpcClientFrame {
    /**
     * Creates a proxy for a remote service.
     *
     * @param serviceInterface interface to proxy; its fully-qualified name is sent to
     *                         the server to identify the service
     * @param host             server host name or address
     * @param port             server port
     * @param <T>              type the caller assigns the proxy to; it must be
     *                         {@code serviceInterface} (or a supertype), otherwise the
     *                         caller gets a {@code ClassCastException}
     * @return a proxy implementing {@code serviceInterface}
     */
    @SuppressWarnings("unchecked") // the proxy implements serviceInterface; T is chosen by the caller
    public static <T> T getRemoteProxyObj(Class<?> serviceInterface, String host, int port) {
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
                new DynProxy(serviceInterface, new InetSocketAddress(host, port)));
    }

    /** Invocation handler that forwards each interface call to the remote server. */
    private static class DynProxy implements InvocationHandler {
        private final Class<?> serviceInterface;
        private final InetSocketAddress socketAddress;

        public DynProxy(Class<?> serviceInterface, InetSocketAddress socketAddress) {
            this.serviceInterface = serviceInterface;
            this.socketAddress = socketAddress;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // equals/hashCode/toString are also routed through the handler. Answer them
            // locally: sending them over the wire would cost a round trip for a log
            // statement, and equals would try to serialize the proxy itself.
            if (method.getDeclaringClass() == Object.class) {
                return invokeObjectMethod(proxy, method, args);
            }
            // Only the socket is a managed resource: closing it releases both streams,
            // and a failure while closing can no longer mask the original exception.
            try (Socket socket = new Socket()) {
                socket.connect(socketAddress);
                ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                // Interface name
                outputStream.writeUTF(serviceInterface.getName());
                // Method name
                outputStream.writeUTF(method.getName());
                // Parameter types
                outputStream.writeObject(method.getParameterTypes());
                // Argument values (null when the method takes none)
                outputStream.writeObject(args);
                outputStream.flush();
                // Opened only after the request is flushed; the constructor blocks until
                // the server's stream header arrives.
                ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                return inputStream.readObject();
            }
        }

        /** Local implementations of the {@code java.lang.Object} methods for the proxy. */
        private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
            String name = method.getName();
            if ("equals".equals(name)) {
                return proxy == args[0];
            }
            if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            }
            // toString
            return serviceInterface.getName() + "@" + socketAddress;
        }
    }
}
2. 客户端加入需要调用的服务端接口(用于生成动态代理)和实体类(用于反序列化);它们的包名和类名必须与服务端保持一致,因为服务端按接口全限定名查找服务,返回对象也按类的全限定名反序列化
/**
 * Client-side copy of the remote service contract; the client obtains a
 * dynamic proxy for this interface to call the server.
 */
public interface UserService {
    /**
     * @return the user returned by the remote implementation
     */
    User getUser();
}
/**
 * Client-side copy of the immutable user value; instances are produced by
 * deserializing the server's reply. No explicit {@code serialVersionUID} is
 * declared.
 */
public class User implements Serializable {
    private final String username;
    private final Integer age;

    /**
     * @param username the user's name
     * @param age      the user's age
     */
    public User(String username, Integer age) {
        this.age = age;
        this.username = username;
    }

    /** @return the name given at construction */
    public String getUsername() {
        return this.username;
    }

    /** @return the age given at construction */
    public Integer getAge() {
        return this.age;
    }
}
3.客户端调用代码
package com.zhang;
import com.zhang.frame.RpcClientFrame;
import com.zhang.service.UserService;
/**
 * Demo client: obtains a proxy for {@code UserService} on 127.0.0.1:8001 and
 * prints the remote user's name and age.
 */
public class UserRpcClient {
    public static void main(String[] args) {
        UserService remoteUsers = RpcClientFrame.getRemoteProxyObj(UserService.class, "127.0.0.1", 8001);
        // getUser() is invoked twice, once per printed field, in this order.
        String username = remoteUsers.getUser().getUsername();
        Integer age = remoteUsers.getUser().getAge();
        System.out.println(username + "------" + age);
    }
}
大功告成,下一步要将注册中心提出来!
网友评论