Dubbo 的注册中心虽然提供了多种实现,但生产上的事实标准基本上都是 基于 Zookeeper 实现的。这种注册中心的实现方法也是 Dubbo 最为推荐的。为了易于理解 Zookeeper 在 Dubbo 中的应用,我们先简单看一下 zookeeper。

由于 Dubbo 是一个分布式 RPC 开源框架,各服务之间单独部署,往往会出现资源之间数据不一致的问题,比如:某一个服务增加或减少了几台机器,某个服务提供者变更了服务地址,那么服务消费者是很难感知到这种变化的。而 Zookeeper 本身就有保证分布式数据一致性的特性。那么 Dubbo 服务是如何被 Zookeeper 的数据结构存储管理的呢,zookeeper 采用的是树形结构来组织数据节点,它类似于一个标准的文件系统,如下图所示。

avatar

该图展示了 dubbo 在 zookeeper 中存储的形式以及节点层级。dubbo 的 Root 层是根目录,通过的“group”来设置 zookeeper 的根节点,缺省值是“dubbo”。Service 层是服务接口的全名。Type 层是分类,一共有四种分类,分别是 providers 服务提供者列表、consumers 服务消费者列表、routes 路由规则列表、configurations 配置规则列表。URL 层 根据不同的 Type 目录:可以有服务提供者 URL 、服务消费者 URL 、路由规则 URL 、配置规则 URL 。不同的 Type 关注的 URL 不同。

zookeeper 以斜杠来分割每一层的 znode 节点,比如第一层根节点 dubbo 就是“/dubbo”,而第二层的 Service 层就是/dubbo/com.foo.Barservice,zookeeper 的每个节点通过路径来表示以及访问,例如服务提供者启动时,向/dubbo/com.foo.Barservice/providers 目录下写入自己的 URL 地址。

dubbo-registry-zookeeper 模块的工程结构如下图所示,里面就俩类,非常简单。

avatar

ZookeeperRegistry

该类继承了 FailbackRegistry 抽象类,针对注册中心核心的 服务注册、服务订阅、取消注册、取消订阅,查询注册列表进行展开,这里用到了 模板方法设计模式,FailbackRegistry 中定义了 register()、subscribe()等模板方法和 doRegister()、doSubscribe()抽象方法,ZookeeperRegistry 基于 zookeeper 对这些抽象方法进行了实现。其实你会发现 zookeeper 虽然是最被推荐的,反而它的实现逻辑相对简单,因为调用了 zookeeper 服务组件,很多的逻辑不需要在 dubbo 中自己去实现。

  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package com.alibaba.dubbo.registry.zookeeper;
  18. import com.alibaba.dubbo.common.Constants;
  19. import com.alibaba.dubbo.common.URL;
  20. import com.alibaba.dubbo.common.logger.Logger;
  21. import com.alibaba.dubbo.common.logger.LoggerFactory;
  22. import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
  23. import com.alibaba.dubbo.common.utils.UrlUtils;
  24. import com.alibaba.dubbo.registry.NotifyListener;
  25. import com.alibaba.dubbo.registry.support.FailbackRegistry;
  26. import com.alibaba.dubbo.remoting.zookeeper.ChildListener;
  27. import com.alibaba.dubbo.remoting.zookeeper.StateListener;
  28. import com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient;
  29. import com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter;
  30. import com.alibaba.dubbo.rpc.RpcException;
  31. import java.util.ArrayList;
  32. import java.util.List;
  33. import java.util.Set;
  34. import java.util.concurrent.ConcurrentHashMap;
  35. import java.util.concurrent.ConcurrentMap;
  36. /**
  37. * ZookeeperRegistry
  38. *
  39. * Zookeeper Registry 实现类
  40. */
  41. public class ZookeeperRegistry extends FailbackRegistry {
  42. private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
  43. /**
  44. * 默认端口
  45. */
  46. private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
  47. /**
  48. * 默认 Zookeeper 根节点
  49. */
  50. private final static String DEFAULT_ROOT = "dubbo";
  51. /**
  52. * Zookeeper 根节点
  53. */
  54. private final String root;
  55. /**
  56. * Service 接口全名集合
  57. */
  58. private final Set<String> anyServices = new ConcurrentHashSet<String>();
  59. /**
  60. * 监听器集合
  61. */
  62. private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
  63. /**
  64. * Zookeeper 客户端
  65. */
  66. private final ZookeeperClient zkClient;
  67. public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
  68. super(url);
  69. if (url.isAnyHost()) {
  70. throw new IllegalStateException("registry address == null");
  71. }
  72. // 获得 Zookeeper 根节点
  73. String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 参数值
  74. if (!group.startsWith(Constants.PATH_SEPARATOR)) {
  75. group = Constants.PATH_SEPARATOR + group;
  76. }
  77. this.root = group;
  78. // 创建 Zookeeper Client
  79. zkClient = zookeeperTransporter.connect(url);
  80. // 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。
  81. zkClient.addStateListener(new StateListener() {
  82. public void stateChanged(int state) {
  83. if (state == RECONNECTED) {
  84. try {
  85. recover();
  86. } catch (Exception e) {
  87. logger.error(e.getMessage(), e);
  88. }
  89. }
  90. }
  91. });
  92. }
  93. // 目前只有测试方法使用
  94. static String appendDefaultPort(String address) {
  95. if (address != null && address.length() > 0) {
  96. int i = address.indexOf(':');
  97. if (i < 0) {
  98. return address + ":" + DEFAULT_ZOOKEEPER_PORT;
  99. } else if (Integer.parseInt(address.substring(i + 1)) == 0) {
  100. return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT;
  101. }
  102. }
  103. return address;
  104. }
  105. @Override
  106. public boolean isAvailable() {
  107. return zkClient.isConnected();
  108. }
  109. @Override
  110. public void destroy() {
  111. // 调用父方法,取消注册和订阅
  112. super.destroy();
  113. try {
  114. // 关闭 Zookeeper 客户端连接
  115. zkClient.close();
  116. } catch (Exception e) {
  117. logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
  118. }
  119. }
  120. @Override
  121. protected void doRegister(URL url) {
  122. try {
  123. zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
  124. } catch (Throwable e) {
  125. throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  126. }
  127. }
  128. @Override
  129. protected void doUnregister(URL url) {
  130. try {
  131. zkClient.delete(toUrlPath(url));
  132. } catch (Throwable e) {
  133. throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  134. }
  135. }
  136. @Override
  137. protected void doSubscribe(final URL url, final NotifyListener listener) {
  138. try {
  139. // 处理所有 Service 层的发起订阅,例如监控中心的订阅
  140. if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
  141. String root = toRootPath();
  142. // 获得 url 对应的监听器集合
  143. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  144. if (listeners == null) { // 不存在,进行创建
  145. zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
  146. listeners = zkListeners.get(url);
  147. }
  148. // 获得 ChildListener 对象
  149. ChildListener zkListener = listeners.get(listener);
  150. if (zkListener == null) { // 不存在 ChildListener 对象,进行创建 ChildListener 对象
  151. listeners.putIfAbsent(listener, new ChildListener() {
  152. public void childChanged(String parentPath, List<String> currentChilds) {
  153. for (String child : currentChilds) {
  154. child = URL.decode(child);
  155. // 新增 Service 接口全名时(即新增服务),发起该 Service 层的订阅
  156. if (!anyServices.contains(child)) {
  157. anyServices.add(child);
  158. subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
  159. Constants.CHECK_KEY, String.valueOf(false)), listener);
  160. }
  161. }
  162. }
  163. });
  164. zkListener = listeners.get(listener);
  165. }
  166. // 创建 Service 节点。该节点为持久节点。
  167. zkClient.create(root, false);
  168. // 向 Zookeeper ,Service 节点,发起订阅
  169. List<String> services = zkClient.addChildListener(root, zkListener);
  170. // 首次全量数据获取完成时,循环 Service 接口全名数组,发起该 Service 层的订阅
  171. if (services != null && !services.isEmpty()) {
  172. for (String service : services) {
  173. service = URL.decode(service);
  174. anyServices.add(service);
  175. subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
  176. Constants.CHECK_KEY, String.valueOf(false)), listener);
  177. }
  178. }
  179. // 处理指定 Service 层的发起订阅,例如服务消费者的订阅
  180. } else {
  181. // 子节点数据数组
  182. List<URL> urls = new ArrayList<URL>();
  183. // 循环分类数组
  184. for (String path : toCategoriesPath(url)) {
  185. // 获得 url 对应的监听器集合
  186. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  187. if (listeners == null) { // 不存在,进行创建
  188. zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
  189. listeners = zkListeners.get(url);
  190. }
  191. // 获得 ChildListener 对象
  192. ChildListener zkListener = listeners.get(listener);
  193. if (zkListener == null) { // 不存在 ChildListener 对象,进行创建 ChildListener 对象
  194. listeners.putIfAbsent(listener, new ChildListener() {
  195. public void childChanged(String parentPath, List<String> currentChilds) {
  196. // 变更时,调用 `#notify(...)` 方法,回调 NotifyListener
  197. ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
  198. }
  199. });
  200. zkListener = listeners.get(listener);
  201. }
  202. // 创建 Type 节点。该节点为持久节点。
  203. zkClient.create(path, false);
  204. // 向 Zookeeper ,PATH 节点,发起订阅
  205. List<String> children = zkClient.addChildListener(path, zkListener);
  206. // 添加到 `urls` 中
  207. if (children != null) {
  208. urls.addAll(toUrlsWithEmpty(url, path, children));
  209. }
  210. }
  211. // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener
  212. notify(url, listener, urls);
  213. }
  214. } catch (Throwable e) {
  215. throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  216. }
  217. }
  218. @Override
  219. protected void doUnsubscribe(URL url, NotifyListener listener) {
  220. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  221. if (listeners != null) {
  222. ChildListener zkListener = listeners.get(listener);
  223. if (zkListener != null) {
  224. // 向 Zookeeper ,移除订阅
  225. zkClient.removeChildListener(toUrlPath(url), zkListener);
  226. }
  227. }
  228. }
  229. @Override
  230. public List<URL> lookup(URL url) {
  231. if (url == null) {
  232. throw new IllegalArgumentException("lookup url == null");
  233. }
  234. try {
  235. // 循环分类数组,获得所有的 URL 数组
  236. List<String> providers = new ArrayList<String>();
  237. for (String path : toCategoriesPath(url)) {
  238. List<String> children = zkClient.getChildren(path);
  239. if (children != null) {
  240. providers.addAll(children);
  241. }
  242. }
  243. // 匹配
  244. return toUrlsWithoutEmpty(url, providers);
  245. } catch (Throwable e) {
  246. throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  247. }
  248. }
  249. /**
  250. * 获得根目录
  251. * @return 路径
  252. */
  253. private String toRootDir() {
  254. if (root.equals(Constants.PATH_SEPARATOR)) {
  255. return root;
  256. }
  257. return root + Constants.PATH_SEPARATOR;
  258. }
  259. /**
  260. * @return 根路径
  261. */
  262. private String toRootPath() {
  263. return root;
  264. }
  265. /**
  266. * 获得服务路径
  267. *
  268. * Root + Type
  269. *
  270. * @param url URL
  271. * @return 服务路径
  272. */
  273. private String toServicePath(URL url) {
  274. String name = url.getServiceInterface();
  275. if (Constants.ANY_VALUE.equals(name)) {
  276. return toRootPath();
  277. }
  278. return toRootDir() + URL.encode(name);
  279. }
  280. /**
  281. * 获得分类路径数组
  282. *
  283. * Root + Service + Type
  284. *
  285. * @param url URL
  286. * @return 分类路径数组
  287. */
  288. private String[] toCategoriesPath(URL url) {
  289. // 获得分类数组
  290. String[] categories;
  291. if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) { // * 时,
  292. categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
  293. Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
  294. } else {
  295. categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
  296. }
  297. // 获得分类路径数组
  298. String[] paths = new String[categories.length];
  299. for (int i = 0; i < categories.length; i++) {
  300. paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
  301. }
  302. return paths;
  303. }
  304. /**
  305. * 获得分类路径
  306. *
  307. * Root + Service + Type
  308. *
  309. * @param url URL
  310. * @return 分类路径
  311. */
  312. private String toCategoryPath(URL url) {
  313. return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
  314. }
  315. /**
  316. * 获得 URL 的路径
  317. *
  318. * Root + Service + Type + URL
  319. *
  320. * 被 {@link #doRegister(URL)} 和 {@link #doUnregister(URL)} 调用
  321. *
  322. * @param url URL
  323. * @return 路径
  324. */
  325. private String toUrlPath(URL url) {
  326. return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
  327. }
  328. /**
  329. * 获得 providers 中,和 consumer 匹配的 URL 数组
  330. *
  331. * @param consumer 用于匹配 URL
  332. * @param providers 被匹配的 URL 的字符串
  333. * @return 匹配的 URL 数组
  334. */
  335. private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
  336. List<URL> urls = new ArrayList<URL>();
  337. if (providers != null && !providers.isEmpty()) {
  338. for (String provider : providers) {
  339. provider = URL.decode(provider);
  340. if (provider.contains("://")) { // 是 url
  341. URL url = URL.valueOf(provider); // 将字符串转化成 URL
  342. if (UrlUtils.isMatch(consumer, url)) { // 匹配
  343. urls.add(url);
  344. }
  345. }
  346. }
  347. }
  348. return urls;
  349. }
  350. /**
  351. * 获得 providers 中,和 consumer 匹配的 URL 数组
  352. *
  353. * 若不存在匹配,则创建 `empty://` 的 URL返回。通过这样的方式,可以处理类似服务提供者为空的情况。
  354. *
  355. * @param consumer 用于匹配 URL
  356. * @param path 被匹配的 URL 的字符串
  357. * @param providers 匹配的 URL 数组
  358. * @return 匹配的 URL 数组
  359. */
  360. private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
  361. // 获得 providers 中,和 consumer 匹配的 URL 数组
  362. List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
  363. // 若不存在匹配,则创建 `empty://` 的 URL返回
  364. if (urls == null || urls.isEmpty()) {
  365. int i = path.lastIndexOf('/');
  366. String category = i < 0 ? path : path.substring(i + 1);
  367. URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
  368. urls.add(empty);
  369. }
  370. return urls;
  371. }
  372. }

ZookeeperRegistryFactory

ZookeeperRegistryFactory 继承了 AbstractRegistryFactory 抽象类,实现了其中的抽象方法 如 createRegistry(),源码如下。

  1. /**
  2. * Zookeeper Registry 工厂
  3. */
  4. public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
  5. /**
  6. * Zookeeper 工厂
  7. */
  8. private ZookeeperTransporter zookeeperTransporter;
  9. /**
  10. * 设置 Zookeeper 工厂,该方法,通过 Dubbo SPI 注入
  11. *
  12. * @param zookeeperTransporter Zookeeper 工厂对象
  13. */
  14. public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
  15. this.zookeeperTransporter = zookeeperTransporter;
  16. }
  17. @Override
  18. public Registry createRegistry(URL url) {
  19. return new ZookeeperRegistry(url, zookeeperTransporter);
  20. }
  21. }