动手实现RPC
最近利用业于时间实现一个了一个简简单单的 RPC demo,在这里过程中,遇到了几个问题,也收获一些东西,分享一下这个过程.
前期准备
RPC 是我们在企业开发中比较常见的,同时也是比较熟悉的,但是对于开发一个 RPC 来说,我们需要掌握一些基本的理论知识.
1、RPC 是网络通信(进程之间的通信),所以相比单进程的通信来的慢
2、网络传输的是 字节 ,而不是 字符,或者其他的传输媒介.
3、了解or掌握了 动态代理的运用。
本质是传输信息,将 我(消费端) 所知的信息通过通信告诉你 (服务端) ,然后你给我返回最终的信息.
动手实现
有了上述的前期准备之后,先剖析一下我们是如何使用 RPC ,服务提供端需要将服务暴露出去,服务消费端需要接入需要的服务。这个过程如下所示,我的讲述也围绕这个过程展开.
服务端暴露服务 —> 消费端引用服务 —> 动态代理 —> 序列化 —> 网络请求 —-> 服务端处理 —> 序列化返回 —> 消费端返回结果.
服务暴露
服务暴露就是将服务发布到(zookeeper)上,提供给其他的消费者访问,当消费者拿到这个信息之后,就可以连接服务端,并发起请求。
暴露就是需要将该服务的基本信息发布出去,这里列举的主要信息有 服务机器host 服务监听端口 服务接口 服务序列化方式 服务权重 服务版本 等
代码如下
1、服务暴露的信息
public class Provider implements Serializable, Cloneable {private String serviceName;private String host;private Integer port;private String version;private Integer weight;private String serialization;get and set}
2、暴露服务的过程,这里以 zookeeper 为例,主要是将服务以节点的形式添加到 zk 上
@Overridepublic void registerService(List<Provider> providerList) {assert zkClient != null;providerList.parallelStream().forEach(provider -> {String host = provider.getHost();Integer port = provider.getPort();String serviceName = provider.getServiceName();String version = provider.getVersion();Integer weight = provider.getWeight();String serverPath = root_path + "/" + serviceName + root_provider;if (!zkClient.exists(serverPath)) {zkClient.createPersistent(serverPath, true);}String finalInfo = host + split + port + split + serviceName + split + version + split + weight + split + provider.getSerialization();String path = serverPath + "/" + finalInfo;if (!zkClient.exists(path)) {log.info("注册服务:{}到ZooKeeper", serverPath);zkClient.createEphemeral(path);} else {log.warn("服务:{}已被注册", serverPath);}});}
然后可以通过 zkCli 命令查看zk上服务的状况.
[zk: localhost:2181(CONNECTED) 0] ls /fuck/top.huzhurong.fuck.UserService/provider[]
消费端引用服务和动态代理
消费端引用服务端方式大多数还是通过 Spring 的自定义标签引入.使用方式如下.
<fuck:reference id="test" interface="top.huzhurong.fuck.UserService" version="0.0.1"/>
这样就引用了 top.huzhurong.fuck.UserService 这个版本为 0.0.1 的服务. 在使用Spring的时候,我们需要通过使用动态代理去走网络,调用服务上的接口,那么就不可能想普通的 bean
一样去配置,一般说来都是通过 FactoryBean 来进行配置,因为在 getObject 中可以进行 bean的定制化(大部分的框架也是FactoryBean引入的)。
FactoryBean的使用方式如下
public class ProxyBean implements FactoryBean, InitializingBean {private Class name;private Object object;@Overridepublic Object getObject() {return object;}@Overridepublic Class<?> getObjectType() {return name;}@Overridepublic void afterPropertiesSet() {this.build();}private void build() {//动态代理产生一个代理beanthis.object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{this.name}, (proxy, method, args) -> {//在invoke中定制我们的服务,可以走tcp/http等等if (method.getName().equalsIgnoreCase("name")) {return "调用name方法";}if (method.getName().equalsIgnoreCase("toString")) {return "调用toString方法";}return "111";});}}
这里只是一个简单的说明服务引用的过程,具体的过程如下
- 注册消费节点到zookeeper
- 获取服务者列表
- 订阅该服务接口
- 搭建动态代理
篇幅有限,我就介绍下搭建动态代理,其他三个还挺简单的。
Object object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{Class.forName(this.interfaceName)}, new FuckRpcInvocationHandler(this));class FuckRpcInvocationHandler implements InvocationHandler {//字段//构造方法@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//获取服务列表List<Provider> all = ProviderSet.getAll(this.className);//软负载均衡Provider provider = loadBalance.getProvider(all);String info = provider.buildIfno();SocketChannel channel = ChannelMap.get(info);if (channel == null) {//建立TCP连接Client client = new NettyClient(provider, this.serialization);client.connect(provider.getHost(), provider.getPort());channel = ChannelMap.get(info);}SocketChannel finalChannel = channel;Future<Response> submit = TempResultSet.executorService.submit(() -> {//请求写入tcp通道finalChannel.writeAndFlush(request);//这里可以改造成 CountDownLatch,可以比 ;; 循环要好for (; ; ) {Response response = TempResultSet.get(request.getRequestId());if (response != null) {return response;}}});Response response = submit.get(this.timeout, TimeUnit.SECONDS);}}
动态代理做的主要是,获取服务列表,软件负载处理,建立tcp连接,通信这几步.
动态代理仅仅是实际调用的时候才会进入invoke方法,实例化不进入,每一次代理调用都会进入
invoke
序列化和网络请求
序列化网上都文章很多,我也只是支持了 protostuff 和 jdk 序列化方式. 本身很难,都是封装之后还是ok的,重点介绍一下网络传输的处理
当解决完序列化之后,就是网络传输了,都知道网络传输的是字节,但是怎么去使用网络进行透明传输我们都很头痛,而netty在使用上降低了我们的入门难度,其简单的api可以快速的进行上手,如果你还没有通过,可以看看netty的example就可以动手写了。
在网络传输部分的处理中,一个是协议的处理,一个是如何将处理完的请求结果赋值给正确的请求对象.
1、协议的处理也非常简单,就是一个常规的length + data , 作为自定义协议,这种处理可以让我们快速的处理而不会纠结于协议的正确性
@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {if (byteBuf.readableBytes() <= HEAD_LENGTH) {return;}byteBuf.markReaderIndex();//标记位置int dataLength = byteBuf.readInt();if (byteBuf.readableBytes() < dataLength) {byteBuf.resetReaderIndex();//可读取的数据不够return;}byte[] dataArray = new byte[dataLength];byteBuf.readBytes(dataArray);Request request = serialization.deSerialize(dataArray, Request.class);if (log.isDebugEnabled()) {log.debug("接受到消费者请求:{},请求内容:{}", ctx.channel().toString(), request);}list.add(request);}
2、如何将处理结果返回给正确的请求对象,我们都知道在执行请求的时候都需要一个 RI --> requestId 去标示这个请求,在我们的rpc当中也是如此,确保请求和响应的是一家人
public class Request implements Serializable {private String requestId;//请求标示private String serviceName;//服务名称private String methodName;//方法名,Method不能被序列化private Class<?>[] parameters;//参数类型,用于获取对于的执行方法private Object[] args;//实际参数}
3、服务处理,可以选择新建立线程池而不是直接使用 netty 的work io 线程池。
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {if (serializable instanceof Request) {Request request = (Request) serializable;responseTask.execute(new ResponseTask(request, channelHandlerContext, this.applicationContext));}}@Overridepublic void run() {String serviceName = request.getServiceName();String methodName = request.getMethodName();Class<?>[] parameters = request.getParameters();Object[] args = request.getArgs();Response response = new Response();response.setRequestId(request.getRequestId());response.setSuccess(false);try {//获取服务Object service = ServiceCache.getService(serviceName);if (service == null) {Class<?> aClass = ClassUtils.forName(serviceName, ClassUtils.getDefaultClassLoader());service = applicationContext.getBean(aClass);ServiceCache.put(serviceName, service);}//获取方法Method method = service.getClass().getDeclaredMethod(methodName, parameters);Object invoke = method.invoke(service, args);System.out.println("invoke:" + invoke);response.setSuccess(true);response.setObject(invoke);} catch (ClassNotFoundException | IllegalAccessException e) {response.setException(e);} catch (NoSuchMethodException e) {e.printStackTrace();response.setException(e);} catch (InvocationTargetException e) {e.printStackTrace();response.setException(e.getTargetException());}//写入channel中channelHandlerContext.writeAndFlush(response);}
消费端返回结果
当服务端写入数据的时候,如果网络通畅,基本上一下就到了客户端,我们的处理也很简单,就是放到一个Map中,而消费线程一直在map中找这个数据
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {if (serializable instanceof Response) {Response response = (Response) serializable;//写入mapTempResultSet.put(response.getRequestId(), response);}}//循环写入for (; ; ) {Response response = TempResultSet.get(request.getRequestId());if (response != null) {return response;}}
到这里一个简单的rpc调用就可以起来了。
小结
一个简单的 rpc 就新鲜出炉了,但是其实还有很多可以改造的点,例如拦截(责任链+SPI),例如观察者模式的运用,不过也是从中学习到了一个RPC基本的使用方法,还需要深入到了解线程池使用,这里的使用也是很粗制滥造的。不过能在几天之内写完还是很开心,项目在这里 ——> fuck-rpc 的简单实现,你也可以试试哦
2018-12-05
