一.NacosPropertySourceLocator
当前类实现了PropertySourceLocator,主要用于配置文件的拓展,从配置中心读取配置
private void loadApplicationConfiguration(CompositePropertySource compositePropertySource, String dataIdPrefix, NacosConfigProperties properties, Environment environment) {String fileExtension = properties.getFileExtension();String nacosGroup = properties.getGroup();//1.1读取配置this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + "." + fileExtension, nacosGroup, fileExtension, true);String[] var7 = environment.getActiveProfiles();int var8 = var7.length;for(int var9 = 0; var9 < var8; ++var9) {String profile = var7[var9];String dataId = dataIdPrefix + "-" + profile + "." + fileExtension;this.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);}}
二.RpcClient
在获取客户端配置的时候会启动Rpc客户端
public final void start() throws NacosException {//将当前客户端跟改为启动状态boolean success = this.rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);if (success) {//创建定时任务线程池(包含两个守护线程)this.clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {public Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.remote.worker");t.setDaemon(true);return t;}});this.clientEventExecutor.submit(new Runnable() {public void run() {while(!RpcClient.this.clientEventExecutor.isTerminated() && !RpcClient.this.clientEventExecutor.isShutdown()) {RpcClient.ConnectionEvent take = null;try {take = (RpcClient.ConnectionEvent)RpcClient.this.eventLinkedBlockingQueue.take();if (take.isConnected()) {RpcClient.this.notifyConnected();} else if (take.isDisConnected()) {RpcClient.this.notifyDisConnected();}} catch (Throwable var3) {}}}});this.clientEventExecutor.submit(new Runnable() {public void run() {while(true) {while(true) {while(true) {while(true) {while(true) {try {RpcClient.ReconnectContext reconnectContext;boolean serverExist;label69: {if (!RpcClient.this.isShutdown()) {reconnectContext = (RpcClient.ReconnectContext)RpcClient.this.reconnectionSignal.poll(RpcClient.this.keepAliveTime, TimeUnit.MILLISECONDS);if (reconnectContext != null) {break label69;}if (System.currentTimeMillis() - RpcClient.this.lastActiveTimeStamp < RpcClient.this.keepAliveTime) {continue;}serverExist = RpcClient.this.healthCheck();if (serverExist) {RpcClient.this.lastActiveTimeStamp = System.currentTimeMillis();continue;}if (RpcClient.this.currentConnection == null) {continue;}LoggerUtils.printIfInfoEnabled(RpcClient.LOGGER, "[{}]Server healthy check fail,currentConnection={}", new Object[]{RpcClient.this.name, RpcClient.this.currentConnection.getConnectionId()});RpcClientStatus rpcClientStatus = (RpcClientStatus)RpcClient.this.rpcClientStatus.get();if (!RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {boolean success = RpcClient.this.rpcClientStatus.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);if (!success) {continue;}reconnectContext = RpcClient.this.new ReconnectContext((RpcClient.ServerInfo)null, false);break label69;}}return;}if (reconnectContext.serverInfo != null) {serverExist = false;Iterator var7 = RpcClient.this.getServerListFactory().getServerList().iterator();while(var7.hasNext()) {String server = (String)var7.next();RpcClient.ServerInfo serverInfo = RpcClient.this.resolveServerInfo(server);if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {serverExist = true;reconnectContext.serverInfo.serverPort = serverInfo.serverPort;break;}}if (!serverExist) {LoggerUtils.printIfInfoEnabled(RpcClient.LOGGER, "[{}] Recommend server is not in server list ,ignore recommend server {}", new Object[]{RpcClient.this.name, reconnectContext.serverInfo.getAddress()});reconnectContext.serverInfo = null;}}RpcClient.this.reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);} catch (Throwable var6) {}}}}}}}});Connection connectToServer = null;this.rpcClientStatus.set(RpcClientStatus.STARTING);//失败重试次数int startUpRetryTimes = 3;while(startUpRetryTimes > 0 && connectToServer == null) {try {--startUpRetryTimes;RpcClient.ServerInfo serverInfo = this.nextRpcServer();LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", new Object[]{this.name, serverInfo});connectToServer = this.connectToServer(serverInfo);} catch (Throwable var5) {LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}", new Object[]{this.name, var5.getMessage(), startUpRetryTimes});}}if (connectToServer != null) {LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up,connectionId={}", new Object[]{this.name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId()});this.currentConnection = connectToServer;this.rpcClientStatus.set(RpcClientStatus.RUNNING);this.eventLinkedBlockingQueue.offer(new RpcClient.ConnectionEvent(1));} else {this.switchServerAsync();}this.registerServerRequestHandler(new RpcClient.ConnectResetRequestHandler());this.registerServerRequestHandler(new ServerRequestHandler() {public Response requestReply(Request request) {return request instanceof ClientDetectionRequest ? new ClientDetectionResponse() : null;}});}}
GrpcClient
public Connection connectToServer(ServerInfo serverInfo) {try {int port;if (this.grpcExecutor == null) {port = ThreadUtils.getSuitableThreadCount(8);this.grpcExecutor = new ThreadPoolExecutor(port, port, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue(10000), (new ThreadFactoryBuilder()).setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d").build());this.grpcExecutor.allowCoreThreadTimeOut(true);}port = serverInfo.getServerPort() + this.rpcPortOffset();RequestFutureStub newChannelStubTemp = this.createNewChannelStub(serverInfo.getServerIp(), port);if (newChannelStubTemp != null) {Response response = this.serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);if (response != null && response instanceof ServerCheckResponse) {BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());GrpcConnection grpcConn = new GrpcConnection(serverInfo, this.grpcExecutor);grpcConn.setConnectionId(((ServerCheckResponse)response).getConnectionId());StreamObserver<Payload> payloadStreamObserver = this.bindRequestStream(biRequestStreamStub, grpcConn);grpcConn.setPayloadStreamObserver(payloadStreamObserver);grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);grpcConn.setChannel((ManagedChannel)newChannelStubTemp.getChannel());ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());conSetupRequest.setLabels(super.getLabels());conSetupRequest.setAbilities(super.clientAbilities);conSetupRequest.setTenant(super.getTenant());grpcConn.sendRequest(conSetupRequest);Thread.sleep(100L);return grpcConn;} else {this.shuntDownChannel((ManagedChannel)newChannelStubTemp.getChannel());return null;}} else {return null;}} catch (Exception var9) {LOGGER.error("[{}]Fail to connect to server!,error={}", this.getName(), var9);return null;}}
NacosWatch
SmartLifecycle 接口。当Spring容器加载所有bean并完成初始化之后,会接着回调实现该接口的类中对应的方法NacosWatch实现了该接口,主要作用试发布事件
NacosServiceRegistry
#spring容器完成之后会发布事件,当前这个接口实现了服务注册接口AbstractAutoServiceRegistrationpublic class NacosServiceRegistry implements ServiceRegistry<Registration> {private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);private final NacosDiscoveryProperties nacosDiscoveryProperties;private final NamingService namingService;public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {this.nacosDiscoveryProperties = nacosDiscoveryProperties;this.namingService = nacosDiscoveryProperties.namingServiceInstance();}#Registration在实例化的时候会把ip超时时间,心跳超时时间等一些数据放入metadata中public void register(Registration registration) {if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");} else {String serviceId = registration.getServiceId();Instance instance = this.getNacosInstanceFromRegistration(registration);try {this.namingService.registerInstance(serviceId, instance);log.info("nacos registry, {} {}:{} register finished", new Object[]{serviceId, instance.getIp(), instance.getPort()});} catch (Exception var5) {log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var5});}}}}
一、创建心跳信息信息,执行心跳定时任务
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {BeatInfo beatInfo = new BeatInfo();beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);long instanceInterval = instance.getInstanceHeartBeatInterval();// 设置心跳检测的时间周期,默认是5s// 可以通过属性spring.cloud.nacos.discovery.heartBeatInterval配置beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);//创建心跳定时任务this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);}this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);}

1.1创建心跳定时任务(默认为5秒)
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);//将服务名 ip 端口作为key 心跳信息作为value存入mapthis.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);//创建定时任务this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);#创建prometheus监视器监控心跳MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());}
1.2线程池的创建
#默认取namingClientBeatThreadCount数量如果没有根据下面算法得出#线程数的定义算法Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1#线程池的创建是在对象创建的构造器注入的,默认为8个(守护线程)public BeatReactor(NamingProxy serverProxy, int threadCount) {this.dom2Beat = new ConcurrentHashMap();this.serverProxy = serverProxy;this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {public Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.beat.sender");return thread;}});}
1.3线程任务的创建
class BeatTask implements Runnable {BeatInfo beatInfo;public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}public void run() {if (!this.beatInfo.isStopped()) {//向客户端的心跳接口请求/instance/beatlong result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo);//获取下一次的请求时间long nextTime = result > 0L ? result : this.beatInfo.getPeriod();//将任务放入定时任务池中(守护线程)BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);}}}
2、创建监视器
private static Gauge nacosMonitor = (Gauge)((Builder)((Builder)((Builder)Gauge.build().name("nacos_monitor")).labelNames(new String[]{"module", "name"})).help("nacos_monitor")).register();//设置监视器public static Child getDom2BeatSizeMonitor() {return (Child)nacosMonitor.labels(new String[]{"naming", "dom2BeatSize"});}
二.注册服务调用
2.1参数封装
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});Map<String, String> params = new HashMap(9);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("groupName", groupName);params.put("clusterName", instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JSON.toJSONString(instance.getMetadata()));this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");}
2.2采用故障转移算法,请求调用注册地址/nacos/v1/ns/instance
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {params.put("namespaceId", this.getNamespaceId());if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {throw new IllegalArgumentException("no server available");} else {Exception exception = new Exception();if (servers != null && !servers.isEmpty()) {Random random = new Random(System.currentTimeMillis());//根据当前时间创建随机数int index = random.nextInt(servers.size());//根据服务数量获取随机数for(int i = 0; i < servers.size(); ++i) {String server = (String)servers.get(index);//获取随机的注册地址try {return this.callServer(api, params, server, method);//调用nacos注册中心} catch (NacosException var11) {exception = var11;LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);} catch (Exception var12) {exception = var12;LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);}index = (index + 1) % servers.size();//出现异常请求下一个}throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());} else {int i = 0;while(i < 3) {try {return this.callServer(api, params, this.nacosDomain);} catch (Exception var13) {exception = var13;LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);++i;}}throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());}}}
public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {long start = System.currentTimeMillis();long end = 0L;this.checkSignature(params);List<String> headers = this.builderHeaders();String url;//请求路径为:/nacos/v1/ns/instanceif (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {if (!curServer.contains(":")) {curServer = curServer + ":" + this.serverPort;}url = HttpClient.getPrefix() + curServer + api;} else {url = curServer + api;}HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);end = System.currentTimeMillis();MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));if (200 == result.code) {return result.content;} else if (304 == result.code) {return "";} else {throw new NacosException(500, "failed to req API:" + curServer + api + ". code:" + result.code + " msg: " + result.content);}}
3.Nacos Server端接收/nacos/v1/ns/instance请求
@CanDistro@PostMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String register(HttpServletRequest request) throws Exception {final String namespaceId = WebUtils//根据namespaceId获取命名空间,如果没有则为默认public.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);//根据serviceName获取服务名NamingUtils.checkServiceNameFormat(serviceName);//服务名格式为DEFAULT_GROUP@@nacos-testfinal Instance instance = HttpRequestInstanceBuilder.newBuilder()//根据当前请求创建临时实例.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();getInstanceOperator().registerInstance(namespaceId, serviceName, instance);//注册实例return "ok";}
这里是客户端请求会进入InstanceOperatorClientImpl
boolean ephemeral = instance.isEphemeral();//判断是否是临时实例String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);//根据ip获取clientId——》127.0.0.1:8181#truecreateIpPortClientIfAbsent(clientId);//如果容器已有当前ip实例建立连接Service service = getService(namespaceId, serviceName, ephemeral);//创建服务实例clientOperationService.registerInstance(service, instance, clientId);//注册客户端实例
@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {Service singleton = ServiceManager.getInstance().getSingleton(service);//获取服务单实例Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {//判断连接是否已经断开或者不是临时连接return;}InstancePublishInfo instanceInfo = getPublishInfo(instance);//创建事件实例client.addServiceInstance(singleton, instanceInfo);client.setLastUpdatedTime();NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));//发布事件}
