一.NacosPropertySourceLocator

  1. 当前类实现了PropertySourceLocator,主要用于配置文件的拓展,从配置中心读取配置
  1. private void loadApplicationConfiguration(CompositePropertySource compositePropertySource, String dataIdPrefix, NacosConfigProperties properties, Environment environment) {
  2. String fileExtension = properties.getFileExtension();
  3. String nacosGroup = properties.getGroup();
  4. //1.1读取配置
  5. this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);
  6. this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + "." + fileExtension, nacosGroup, fileExtension, true);
  7. String[] var7 = environment.getActiveProfiles();
  8. int var8 = var7.length;
  9. for(int var9 = 0; var9 < var8; ++var9) {
  10. String profile = var7[var9];
  11. String dataId = dataIdPrefix + "-" + profile + "." + fileExtension;
  12. this.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);
  13. }
  14. }

二.RpcClient

  1. 在获取客户端配置的时候会启动Rpc客户端
  1. public final void start() throws NacosException {
  2. //将当前客户端跟改为启动状态
  3. boolean success = this.rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
  4. if (success) {
  5. //创建定时任务线程池(包含两个守护线程)
  6. this.clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
  7. public Thread newThread(Runnable r) {
  8. Thread t = new Thread(r);
  9. t.setName("com.alibaba.nacos.client.remote.worker");
  10. t.setDaemon(true);
  11. return t;
  12. }
  13. });
  14. this.clientEventExecutor.submit(new Runnable() {
  15. public void run() {
  16. while(!RpcClient.this.clientEventExecutor.isTerminated() && !RpcClient.this.clientEventExecutor.isShutdown()) {
  17. RpcClient.ConnectionEvent take = null;
  18. try {
  19. take = (RpcClient.ConnectionEvent)RpcClient.this.eventLinkedBlockingQueue.take();
  20. if (take.isConnected()) {
  21. RpcClient.this.notifyConnected();
  22. } else if (take.isDisConnected()) {
  23. RpcClient.this.notifyDisConnected();
  24. }
  25. } catch (Throwable var3) {
  26. }
  27. }
  28. }
  29. });
  30. this.clientEventExecutor.submit(new Runnable() {
  31. public void run() {
  32. while(true) {
  33. while(true) {
  34. while(true) {
  35. while(true) {
  36. while(true) {
  37. try {
  38. RpcClient.ReconnectContext reconnectContext;
  39. boolean serverExist;
  40. label69: {
  41. if (!RpcClient.this.isShutdown()) {
  42. reconnectContext = (RpcClient.ReconnectContext)RpcClient.this.reconnectionSignal.poll(RpcClient.this.keepAliveTime, TimeUnit.MILLISECONDS);
  43. if (reconnectContext != null) {
  44. break label69;
  45. }
  46. if (System.currentTimeMillis() - RpcClient.this.lastActiveTimeStamp < RpcClient.this.keepAliveTime) {
  47. continue;
  48. }
  49. serverExist = RpcClient.this.healthCheck();
  50. if (serverExist) {
  51. RpcClient.this.lastActiveTimeStamp = System.currentTimeMillis();
  52. continue;
  53. }
  54. if (RpcClient.this.currentConnection == null) {
  55. continue;
  56. }
  57. LoggerUtils.printIfInfoEnabled(RpcClient.LOGGER, "[{}]Server healthy check fail,currentConnection={}", new Object[]{RpcClient.this.name, RpcClient.this.currentConnection.getConnectionId()});
  58. RpcClientStatus rpcClientStatus = (RpcClientStatus)RpcClient.this.rpcClientStatus.get();
  59. if (!RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
  60. boolean success = RpcClient.this.rpcClientStatus.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
  61. if (!success) {
  62. continue;
  63. }
  64. reconnectContext = RpcClient.this.new ReconnectContext((RpcClient.ServerInfo)null, false);
  65. break label69;
  66. }
  67. }
  68. return;
  69. }
  70. if (reconnectContext.serverInfo != null) {
  71. serverExist = false;
  72. Iterator var7 = RpcClient.this.getServerListFactory().getServerList().iterator();
  73. while(var7.hasNext()) {
  74. String server = (String)var7.next();
  75. RpcClient.ServerInfo serverInfo = RpcClient.this.resolveServerInfo(server);
  76. if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
  77. serverExist = true;
  78. reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
  79. break;
  80. }
  81. }
  82. if (!serverExist) {
  83. LoggerUtils.printIfInfoEnabled(RpcClient.LOGGER, "[{}] Recommend server is not in server list ,ignore recommend server {}", new Object[]{RpcClient.this.name, reconnectContext.serverInfo.getAddress()});
  84. reconnectContext.serverInfo = null;
  85. }
  86. }
  87. RpcClient.this.reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
  88. } catch (Throwable var6) {
  89. }
  90. }
  91. }
  92. }
  93. }
  94. }
  95. }
  96. });
  97. Connection connectToServer = null;
  98. this.rpcClientStatus.set(RpcClientStatus.STARTING);
  99. //失败重试次数
  100. int startUpRetryTimes = 3;
  101. while(startUpRetryTimes > 0 && connectToServer == null) {
  102. try {
  103. --startUpRetryTimes;
  104. RpcClient.ServerInfo serverInfo = this.nextRpcServer();
  105. LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", new Object[]{this.name, serverInfo});
  106. connectToServer = this.connectToServer(serverInfo);
  107. } catch (Throwable var5) {
  108. LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}", new Object[]{this.name, var5.getMessage(), startUpRetryTimes});
  109. }
  110. }
  111. if (connectToServer != null) {
  112. LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up,connectionId={}", new Object[]{this.name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId()});
  113. this.currentConnection = connectToServer;
  114. this.rpcClientStatus.set(RpcClientStatus.RUNNING);
  115. this.eventLinkedBlockingQueue.offer(new RpcClient.ConnectionEvent(1));
  116. } else {
  117. this.switchServerAsync();
  118. }
  119. this.registerServerRequestHandler(new RpcClient.ConnectResetRequestHandler());
  120. this.registerServerRequestHandler(new ServerRequestHandler() {
  121. public Response requestReply(Request request) {
  122. return request instanceof ClientDetectionRequest ? new ClientDetectionResponse() : null;
  123. }
  124. });
  125. }
  126. }

GrpcClient

  1. public Connection connectToServer(ServerInfo serverInfo) {
  2. try {
  3. int port;
  4. if (this.grpcExecutor == null) {
  5. port = ThreadUtils.getSuitableThreadCount(8);
  6. this.grpcExecutor = new ThreadPoolExecutor(port, port, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue(10000), (new ThreadFactoryBuilder()).setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d").build());
  7. this.grpcExecutor.allowCoreThreadTimeOut(true);
  8. }
  9. port = serverInfo.getServerPort() + this.rpcPortOffset();
  10. RequestFutureStub newChannelStubTemp = this.createNewChannelStub(serverInfo.getServerIp(), port);
  11. if (newChannelStubTemp != null) {
  12. Response response = this.serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
  13. if (response != null && response instanceof ServerCheckResponse) {
  14. BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());
  15. GrpcConnection grpcConn = new GrpcConnection(serverInfo, this.grpcExecutor);
  16. grpcConn.setConnectionId(((ServerCheckResponse)response).getConnectionId());
  17. StreamObserver<Payload> payloadStreamObserver = this.bindRequestStream(biRequestStreamStub, grpcConn);
  18. grpcConn.setPayloadStreamObserver(payloadStreamObserver);
  19. grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
  20. grpcConn.setChannel((ManagedChannel)newChannelStubTemp.getChannel());
  21. ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
  22. conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
  23. conSetupRequest.setLabels(super.getLabels());
  24. conSetupRequest.setAbilities(super.clientAbilities);
  25. conSetupRequest.setTenant(super.getTenant());
  26. grpcConn.sendRequest(conSetupRequest);
  27. Thread.sleep(100L);
  28. return grpcConn;
  29. } else {
  30. this.shuntDownChannel((ManagedChannel)newChannelStubTemp.getChannel());
  31. return null;
  32. }
  33. } else {
  34. return null;
  35. }
  36. } catch (Exception var9) {
  37. LOGGER.error("[{}]Fail to connect to server!,error={}", this.getName(), var9);
  38. return null;
  39. }
  40. }

NacosWatch

  1. SmartLifecycle 接口。当Spring容器加载所有bean并完成初始化之后,会接着回调实现该接口的类中对应的方法
  2. NacosWatch实现了该接口,主要作用试发布事件

NacosServiceRegistry

  1. #spring容器完成之后会发布事件,当前这个接口实现了服务注册接口AbstractAutoServiceRegistration
  2. public class NacosServiceRegistry implements ServiceRegistry<Registration> {
  3. private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);
  4. private final NacosDiscoveryProperties nacosDiscoveryProperties;
  5. private final NamingService namingService;
  6. public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
  7. this.nacosDiscoveryProperties = nacosDiscoveryProperties;
  8. this.namingService = nacosDiscoveryProperties.namingServiceInstance();
  9. }
  10. #Registration在实例化的时候会把ip超时时间,心跳超时时间等一些数据放入metadata
  11. public void register(Registration registration) {
  12. if (StringUtils.isEmpty(registration.getServiceId())) {
  13. log.warn("No service to register for nacos client...");
  14. } else {
  15. String serviceId = registration.getServiceId();
  16. Instance instance = this.getNacosInstanceFromRegistration(registration);
  17. try {
  18. this.namingService.registerInstance(serviceId, instance);
  19. log.info("nacos registry, {} {}:{} register finished", new Object[]{serviceId, instance.getIp(), instance.getPort()});
  20. } catch (Exception var5) {
  21. log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var5});
  22. }
  23. }
  24. }
  25. }

一、创建心跳信息信息,执行心跳定时任务

  1. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  2. if (instance.isEphemeral()) {
  3. BeatInfo beatInfo = new BeatInfo();
  4. beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
  5. beatInfo.setIp(instance.getIp());
  6. beatInfo.setPort(instance.getPort());
  7. beatInfo.setCluster(instance.getClusterName());
  8. beatInfo.setWeight(instance.getWeight());
  9. beatInfo.setMetadata(instance.getMetadata());
  10. beatInfo.setScheduled(false);
  11. long instanceInterval = instance.getInstanceHeartBeatInterval();
  12. // 设置心跳检测的时间周期,默认是5s
  13. // 可以通过属性spring.cloud.nacos.discovery.heartBeatInterval配置
  14. beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
  15. //创建心跳定时任务
  16. this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
  17. }
  18. this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
  19. }

Nacos源码2.03二、服务注册 - 图1
1.1创建心跳定时任务(默认为5秒)

  1. public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
  2. LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
  3. //将服务名 ip 端口作为key 心跳信息作为value存入map
  4. this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
  5. //创建定时任务
  6. this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
  7. #创建prometheus监视器监控心跳
  8. MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
  9. }

1.2线程池的创建

  1. #默认取namingClientBeatThreadCount数量如果没有根据下面算法得出
  2. #线程数的定义算法
  3. Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1
  4. #线程池的创建是在对象创建的构造器注入的,默认为8个(守护线程)
  5. public BeatReactor(NamingProxy serverProxy, int threadCount) {
  6. this.dom2Beat = new ConcurrentHashMap();
  7. this.serverProxy = serverProxy;
  8. this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
  9. public Thread newThread(Runnable r) {
  10. Thread thread = new Thread(r);
  11. thread.setDaemon(true);
  12. thread.setName("com.alibaba.nacos.naming.beat.sender");
  13. return thread;
  14. }
  15. });
  16. }

1.3线程任务的创建

  1. class BeatTask implements Runnable {
  2. BeatInfo beatInfo;
  3. public BeatTask(BeatInfo beatInfo) {
  4. this.beatInfo = beatInfo;
  5. }
  6. public void run() {
  7. if (!this.beatInfo.isStopped()) {
  8. //向客户端的心跳接口请求/instance/beat
  9. long result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo);
  10. //获取下一次的请求时间
  11. long nextTime = result > 0L ? result : this.beatInfo.getPeriod();
  12. //将任务放入定时任务池中(守护线程)
  13. BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
  14. }
  15. }
  16. }

2、创建监视器

  1. private static Gauge nacosMonitor = (Gauge)((Builder)((Builder)((Builder)Gauge.build().name("nacos_monitor")).labelNames(new String[]{"module", "name"})).help("nacos_monitor")).register();
  2. //设置监视器
  3. public static Child getDom2BeatSizeMonitor() {
  4. return (Child)nacosMonitor.labels(new String[]{"naming", "dom2BeatSize"});
  5. }

二.注册服务调用

2.1参数封装

  1. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  2. LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
  3. Map<String, String> params = new HashMap(9);
  4. params.put("namespaceId", this.namespaceId);
  5. params.put("serviceName", serviceName);
  6. params.put("groupName", groupName);
  7. params.put("clusterName", instance.getClusterName());
  8. params.put("ip", instance.getIp());
  9. params.put("port", String.valueOf(instance.getPort()));
  10. params.put("weight", String.valueOf(instance.getWeight()));
  11. params.put("enable", String.valueOf(instance.isEnabled()));
  12. params.put("healthy", String.valueOf(instance.isHealthy()));
  13. params.put("ephemeral", String.valueOf(instance.isEphemeral()));
  14. params.put("metadata", JSON.toJSONString(instance.getMetadata()));
  15. this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
  16. }

2.2采用故障转移算法,请求调用注册地址/nacos/v1/ns/instance

  1. public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
  2. params.put("namespaceId", this.getNamespaceId());
  3. if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
  4. throw new IllegalArgumentException("no server available");
  5. } else {
  6. Exception exception = new Exception();
  7. if (servers != null && !servers.isEmpty()) {
  8. Random random = new Random(System.currentTimeMillis());//根据当前时间创建随机数
  9. int index = random.nextInt(servers.size());//根据服务数量获取随机数
  10. for(int i = 0; i < servers.size(); ++i) {
  11. String server = (String)servers.get(index);//获取随机的注册地址
  12. try {
  13. return this.callServer(api, params, server, method);//调用nacos注册中心
  14. } catch (NacosException var11) {
  15. exception = var11;
  16. LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
  17. } catch (Exception var12) {
  18. exception = var12;
  19. LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
  20. }
  21. index = (index + 1) % servers.size();//出现异常请求下一个
  22. }
  23. throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
  24. } else {
  25. int i = 0;
  26. while(i < 3) {
  27. try {
  28. return this.callServer(api, params, this.nacosDomain);
  29. } catch (Exception var13) {
  30. exception = var13;
  31. LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
  32. ++i;
  33. }
  34. }
  35. throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
  36. }
  37. }
  38. }
  1. public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {
  2. long start = System.currentTimeMillis();
  3. long end = 0L;
  4. this.checkSignature(params);
  5. List<String> headers = this.builderHeaders();
  6. String url;//请求路径为:/nacos/v1/ns/instance
  7. if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
  8. if (!curServer.contains(":")) {
  9. curServer = curServer + ":" + this.serverPort;
  10. }
  11. url = HttpClient.getPrefix() + curServer + api;
  12. } else {
  13. url = curServer + api;
  14. }
  15. HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
  16. end = System.currentTimeMillis();
  17. MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
  18. if (200 == result.code) {
  19. return result.content;
  20. } else if (304 == result.code) {
  21. return "";
  22. } else {
  23. throw new NacosException(500, "failed to req API:" + curServer + api + ". code:" + result.code + " msg: " + result.content);
  24. }
  25. }

3.Nacos Server端接收/nacos/v1/ns/instance请求

  1. @CanDistro
  2. @PostMapping
  3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
  4. public String register(HttpServletRequest request) throws Exception {
  5. final String namespaceId = WebUtils//根据namespaceId获取命名空间,如果没有则为默认public
  6. .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
  7. final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);//根据serviceName获取服务名
  8. NamingUtils.checkServiceNameFormat(serviceName);//服务名格式为DEFAULT_GROUP@@nacos-test
  9. final Instance instance = HttpRequestInstanceBuilder.newBuilder()//根据当前请求创建临时实例
  10. .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
  11. getInstanceOperator().registerInstance(namespaceId, serviceName, instance);//注册实例
  12. return "ok";
  13. }

这里是客户端请求会进入InstanceOperatorClientImpl

  1. boolean ephemeral = instance.isEphemeral();//判断是否是临时实例
  2. String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);//根据ip获取clientId——》127.0.0.1:8181#true
  3. createIpPortClientIfAbsent(clientId);//如果容器已有当前ip实例建立连接
  4. Service service = getService(namespaceId, serviceName, ephemeral);//创建服务实例
  5. clientOperationService.registerInstance(service, instance, clientId);//注册客户端实例
  1. @Override
  2. public void registerInstance(Service service, Instance instance, String clientId) {
  3. Service singleton = ServiceManager.getInstance().getSingleton(service);//获取服务单实例
  4. Client client = clientManager.getClient(clientId);
  5. if (!clientIsLegal(client, clientId)) {//判断连接是否已经断开或者不是临时连接
  6. return;
  7. }
  8. InstancePublishInfo instanceInfo = getPublishInfo(instance);//创建事件实例
  9. client.addServiceInstance(singleton, instanceInfo);
  10. client.setLastUpdatedTime();
  11. NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
  12. NotifyCenter
  13. .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));//发布事件
  14. }