一、RPC 基本介绍
- RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
- 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图 RPC标准流程)

过程:
- 调用者(Caller),调用远程API(Remote API)
- 调用远程API会通过一个RPC代理(RpcProxy)
- RPC代理再去调用RpcInvoker(这个是PRC的调用者)
- RpcInvoker通过RPC连接器(RpcConnector)
- RPC连接器用两台机器规定好的PRC协议(RpcProtocol)把数据进行编码
- 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)
- PRC接收器通过PRC协议进行解码拿到数据
- 然后将数据传给RpcProcessor
- RpcProcessor再传给RpcInvoker
- RpcInvoker调用Remote API
- 最后推给 被调用者(Callee)
- 常见的 RPC 框架有:比较知名的如阿里的 Dubbo、Google 的 gRPC、Go 语言的 rpcx、Apache 的 thrift,Spring 旗下的 SpringCloud。
二、RPC调用流程落地常用实现


一次完整的RPC调用流程(同步)如下:
1、消费方调用 以本地调用 的方式调用服务。(本地调用的方式例如dubbo调用的本地jar 包)
2、client stub 接收到调用方后,负责将方法、参数等组装成能够进行网络传输的消息体(dubbo 就是序列化)
3、client stub 找到服务器地址,并将消息发送到服务器端;
4、server stub 收到消息后进行解码;
5、server stub 根据解码结果调用本地服务;
6、本地服务执行并将结果返回给server stub;
7、server stub 将返回结果打包成消息并发送给消费方
8、client stub接收到消息,并进行解码
9、服务消费方得到最终结果。
RPC框架的目标就是实现2~8这些步骤,将其封装起来,这些细节对用户来说是透明的,不可见的。
stub 可以理解为是一个代理对象
三、自己实现 dubbo RPC(基于Netty)
3.1 需求说明
- Dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
模仿 Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.25.Final
3.2 设计说明
创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
- 开发的分析图
3.3 代码实现
3.3.1 封装的RPC部分
NettyClient
public class NettyClient {/*** 创建线程池*/private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static NettyClientHandler clientHandler;// 编写方法使用代理模式,获取一个代理对象public Object getBean(final Class<?> serviceClass, final String providerName) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {// 获取方法的执行结果@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 每调用一次hello,就会进入到该代码if (clientHandler == null) {initClient();}// 设置要发给服务器端的信息 providerName就是协议头(HelloService#hello#),arg[0]就是hello方法的参数clientHandler.setParam(providerName + args[0]);// 返回一个代理对象return executor.submit(clientHandler).get();}});}// 初始化客户端private static void initClient() {clientHandler = new NettyClientHandler();NioEventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(clientHandler);}});try {ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8989).sync();// 不能关闭// channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}}}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {private ChannelHandlerContext context; // 上下文private String result; // 返回的结果private String param; // 客户端调用方法时,传入参数/*** (1)* 与服务器的链接创建后,就会被调用,这个方法被第一个调用** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("NettyClientHandler channelActive 被调用");context = ctx; // 在其他方法会使用到 ctx}/*** (4)* 收到服务器的数据后,调用方法** @param ctx* @param msg* @throws Exception*/@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("NettyClientHandler channelRead 被调用");result = msg.toString();notify(); // 唤醒等待的线程}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("NettyClientHandler exceptionCaught 被调用");cause.printStackTrace();ctx.close();}/*** (3)--- 被唤醒后 --> (5)* 被 代理对象调用,发送数据给服务器,--> wait --> 等待被唤醒(channelRead) --> 返回结果** @return* @throws Exception*/@Overridepublic synchronized Object call() throws Exception {System.out.println("NettyClientHandler call(1) 被调用");context.writeAndFlush(param);// 等待 channelRead 方法获取到服务器等结果后,唤醒wait();// 服务器返回的结果System.out.println("NettyClientHandler call(2) 被调用");return result;}/*** (2)** @param param*/void setParam(String param) {System.out.println("NettyClientHandler setParam 被调用");this.param = param;}}
NettyServer
public class NettyServer {public static void startServer(String hostName, int port) {startServer0(hostName, port);}// 编写一个方法,完成对 NettyServer 的初始化和启动private static void startServer0(String hostname, int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());// 业务处理器pipeline.addLast(new NettyServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();System.out.println("服务提供方开始 提供服务........");channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
NettyServerHandler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 获取客户端发送的消息,并调用服务System.out.println("msg" + msg);// 客户端在调用服务器的 api 时,我们需要定义一个协议// 比如我们要求 每次发消息都必须以某个字符串开头 "HelloService#hello#"if(msg.toString().startsWith("HelloService#hello#")){// 服务端调用 服务端的方法String response = (new HelloServiceImpl()).hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));ctx.writeAndFlush(response);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);}}
3.3.2 接口
HelloService
/*** @description: 这个接口是服务提供方和服务消费方都需要的* @Author: wangchao* @Date: 2022/1/6*/public interface HelloService {String hello(String msg);}
3.3.3 提供端(provider)
HelloServiceImpl
public class HelloServiceImpl implements HelloService {// 验证每次调用的是不是用一个 serviceint count = 0;// 当有消费方调用该方法时,就返回一个结果@Overridepublic String hello(String msg) {System.out.println("收到客户端消息=" + msg);// 根据 msg 返回不同的结果if (msg != null) {return "你好客户端,我已经收到你的消息[" + msg + "] 第" + (++count) + "次";} else {return "你好客户端,我已经收到你的消息 ";}}}
ServerBootstrap
/*** @description: 会启用一个服务的提供者,就是NettyServer* @Author: wangchao* @Date: 2022/1/6*/public class ServerBootstrap {public static void main(String[] args) {NettyServer.startServer("127.0.0.1",8989);}}
3.3.4 消费端(customer)
ClientBootstrap
public class ClientBootstrap {public static final String providerName = "HelloService#hello#";public static void main(String[] args) throws InterruptedException {// 创建一个消费者NettyClient customer = new NettyClient();// 创建代理对象HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);for (; ; ) {// 验证每次调用的是不是用一个 service// 通过代理对象调用服务提供者的方法String hello = service.hello("你好 dubbo~");System.out.println("调用结果 res=" + hello);Thread.sleep(10 * 1000);}}}
3.3.5 效果
调用方
提供方
