一、什么是RPC(Remote Procedure Call)
RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
1.1 RPC 框架组件
- User 客户端
- User-stub 客户端存根
- RPCRuntime RPC 通信组件
- Server-stub 服务端存根
- Server 服务端
1.2 RPC 工作原理

- Client像调用本地服务似的调用远程服务;
- Client stub接收到调用后,将方法、参数序列化
- 客户端通过sockets将消息发送到服务端
- Server stub 收到消息后进行解码(将消息对象反序列化)
- Server stub 根据解码结果调用本地的服
- 本地服务执行(对于服务端来说是本地执行)并将结果返回给Server stub
- Server stub将返回结果打包成消息(将结果消息对象序列化)
- 服务端通过sockets将消息发送到客户端
- Client stub接收到结果消息,并进行解码(将结果消息反序列化)
- 客户端得到最终结果。
注意: RPC 调用分以下两种:
- 同步调用:客户方等待调用执行完成并返回结果。
- 异步调用:客户方调用后不用等待执行结果返回,但依然可以通过回调通知等方式获取返回结果。若客户方不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。
1.3 rpc 可用的序列化协议方案
- jdk 的序列化方法。(不推荐,不利于之后的跨语言调用)
- json 可读性强,但是序列化速度慢,体积大。
- protobuf
- kyro
- Hessian
1.4 rpc 动态代理方案
- jdk 动态代理
- cglib 动态代理
- javassist 动态代理
- ASM 字节码
- javassist 字节码
二、有那些RPC 框架的实现
- Spring Cloud
- Dubbo
- Thrift
- Rabbitmq RPC
三、手动实现一个RPC调用
这里采用简单的json 序列化方式,使用socket 寻址通信
3.1 对解码编码的包装
//编码 包装需要调用服务端的方法参数@Datapublic class RpcRequest implements Serializable {private static final long serialVersionUID = 1L;// 需要请求的类名private String className;// 需求请求的方法名private String methodName;// 请求方法的参数类型private Class<?>[] paramTypes;// 请求的参数值private Object[] params;}//解码 服务端响应结果包装@Datapublic class RpcResponse implements Serializable {private static final long serialVersionUID = 1L;// 可能抛出的异常private Throwable error;// 响应的内容或结果private Object result;}
3.2 客户端stub 代理调用服务端(JDK代理)
public class RpcServiceHandler implements InvocationHandler {private String host; // 服务端地址private int port; // 服务端口号public RpcServiceHandler(String host, int port) {this.host = host;this.port = port;}/*** 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。*/@Overridepublic Object invoke(Object obj, Method method, Object[] params) throws Throwable {// 调用前System.out.println("执行远程方法前,可以做些事情");// 封装参数,类似于序列化的过程RpcRequest request = new RpcRequest();request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParamTypes(method.getParameterTypes());request.setParams(params);// 连接服务器调用服务Object rst = execute(request, method);// 调用后System.out.println("执行远程方法后,也可以做些事情");return rst;}private Object execute(RpcRequest request, Method method) throws Throwable {// 打开远端服务连接Socket server = new Socket(host, port);OutputStream out = null;InputStream in = null;try {// 1. 服务端输出流,写入请求数据,发送请求数据out = server.getOutputStream();IoUtil.writeUtf8(out, false, JSONUtil.toJsonStr(request));IoUtil.flush(out);//告知服务端已写入完成server.shutdownOutput();// 2. 服务端输入流,获取返回数据,转换参数类型// 类似于反序列化的过程in = server.getInputStream();String json = IoUtil.read(in, "utf-8");RpcResponse response = JSONUtil.toBean(json, new TypeReference<RpcResponse>() {}, true);// 3. 返回服务端响应结果if (response.getError() != null) { // 服务器产生异常throw response.getError();}return JSONUtil.toBean((JSON) response.getResult(), new TypeReference<Object>() {@Overridepublic Type getType() {return TypeUtil.getReturnType(method);}}, true);} finally {IoUtil.close(in);IoUtil.close(out);IoUtil.close(server);}}}
3.3 服务端stub
public class RpcProvider {/*** 线程池*/private static final ExecutorService executorService = new ThreadPoolExecutor(10,10,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());/*** rpc 提供端,暴露服务*/public static <T> void provider(final T service, int port) throws IOException {//创建服务端的套接字,绑定端口portServerSocket serverSocket = new ServerSocket(port);while (true) {//开始接收客户端的消息,并以此创建套接字final Socket socket = serverSocket.accept();executorService.execute(new Handler(socket, service));}}/*** 响应调用端** @param oout* @param result*/private static void response(OutputStream oout, Object result, Throwable throwable) {RpcResponse response = new RpcResponse();response.setResult(result);response.setError(throwable);IoUtil.writeUtf8(oout, false, JSONUtil.toJsonStr(response));IoUtil.flush(oout);System.out.println(Thread.currentThread().getName() + "=====> 响应结果" + response);}public static class Handler<T> implements Runnable {private final Socket socket;private T service;public Handler(Socket socket, T service) {this.socket = socket;this.service = service;}@Overridepublic void run() {final Socket client = socket;//创建呢一个对内传输的对象流,并绑定套接字RpcRequest request = null;InputStream in;try {// 1. 获取流以待操作in = client.getInputStream();String json = IoUtil.read(in, "utf-8");System.out.println(Thread.currentThread().getName() + "<===== 接受rpc 调用端请求" + json);//2读取参数request = JSONUtil.toBean(json, RpcRequest.class);// 3. 执行服务方法, 返回结果Class<?> clazz = service.getClass();Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());Object result = method.invoke(service, request.getParams());System.out.println(Thread.currentThread().getName() + "<===== 处理结果" + result);// 4. 返回RPC响应,序列化RpcResponseresponse(client.getOutputStream(), result, null);} catch (Exception e) {try { //异常处理if (client.getOutputStream() != null) {response(client.getOutputStream(), null, e);}} catch (Exception e1) {e1.printStackTrace();}} finally {IoUtil.close(client);}}}}
3.4 测试
服务端启动 ```java public class RpcProviderApp {
public static void main(String[] args) throws IOException {
//实例化服务StudentService studentService = new StudentServiceImpl();//暴露服务RpcProvider.provider(studentService, 9999);
}
}
- 客户端启动```java//客户端启动public class RpcConsumerApp {public static void main(String[] args) {StudentService studentService = RpcServiceFactory.create(StudentService.class, "127.0.0.1", 9999);System.out.println(studentService.getInfo());}}
项目地址
https://github.com/h-dj/Spring-Learning
