参考 :https://gongfukangee.github.io/2019/05/19/Hash/
缓存集群
假设现在我们需要使用 Redis 存储图片资源,存储的格式为键值对,key 为图片名称,value 为该图片所在文件服务器的路径,我们需要根据文件名查询该文件所在文件服务器上的路径,假设数据有 100W,部署 4 台缓存服务器,每台服务器大约存储 25W 数据,如果按照随机分配的规则,某一条数据可能会存储在任何一台缓存服务器上,这样,假设查找 beijing.jpg 图片,则最坏需要 4 次查询才能找到,效率很低。
余数 Hash
如果在取数据时能够提前知道某条数据存储在哪个服务器上,则仅需要一次查询就能找到,效率提高很多。这种方案可以通过余数 Hash 来实现,在存储数据时,首先得到数据 key 值的 HashCode,然后对缓存服务器数量取余,得到的余数就是要存储的服务器编号。由于 HashCode 是具有随机性的,因此余数 Hash 算法可以保证缓存数据在整个服务器集群中比较均匀分布。假设图片 beijing.jpg 的 hashcode 值是 10,则存放时放在 10 % 4 = 2 编号为 2 的缓存服务器上,下次取 beijing.jpg 时,可以直接根据 hashcode 值,去编号为 2 的缓存服务器取,只需要一次查询。
Hash 算法问题
这种方法虽然提升了性能,但是还是存在一些缺陷,主要体现在服务器数量发生变动时,缓存的位置就需要发生改变。假设现在 4 台服务器已经满足不了我们的需求,需要添加到 5 台服务器,则还是以 beijing.jpg 为例,之前存储的时候存储在编号为 2 的缓存服务器,现在在取图片时,10 % 5 = 0 ,根据 hashcode 值计算出的缓存服务器编号为 0,无法命中缓存。也就说说缓存服务器的数量发生变动时,大部分缓存一定时间内是失效的,在缓存重建的这段时间内,所有的请求都会从数据库获取数据,容易导致缓存雪崩问题。
一致性 Hash 算法原理
一致性 Hash 算法通过构建环状的 Hash 空间替线性 Hash 空间的方法解决了这个问题,整个 Hash 空间被构建成一个首位相接的环。
其具体的构造过程为:
- 先构造一个长度为 2^32 的一致性 Hash 环
- 计算每个缓存服务器的 Hash 值,并记录,这就是它们在 Hash 环上的位置
- 对于每个图片,先根据 key 的 hashcode 得到它在 Hash 环上的位置,然后在 Hash 环上顺时针查找距离这个 Key 的 Hash 值最近的缓存服务器节点,这就是该图片所要存储的缓存服务器
当缓存服务器需要扩容的时候,只需要将新加入缓存服务器的 Hash 值放入一致性 Hash 环中,由于 Key 是顺时针查找距离其最近的节点,因此新加入的几点只影响整个环中的一小段。加入新节点 NODE3 后,原来的 Key 大部分还能继续计算到原来的节点,只有 Key3、Key0 从原来的 NODE1 重新计算到 NODE3,这样就能保证大部分被缓存数据还可以命中。当节点被删除时,其他节点在环上的映射不会发生改变,只是原来打在对应节点的 key 现在会转移到顺时针方向的下一个节点上。
Hash 环数据倾斜
新加入的节点 NODE3 只影响了原来的节点 NODE1,也就是说一部分原来需要访问 NODE1 的缓存数据现在需要访问 NODE3(概率上是 50%)但是原来的节点 NODE0 和 NODE2 不受影响,这就意味着 NODE0 和 NODE2 缓存数据量和负载压力是 NODE1 与 NODE3 的两倍。
虚拟节点
为了解决这个问题,最好的办法就是扩展整个环上的节点数量,可以将每台物理缓存服务器虚拟为一组虚拟缓存服务器,使得 Hash 环在空间上的分割更加均匀。
这样只是将虚拟节点的 Hash 值放置在 Hash 环上,在查找时,首先根据 Key 值找到环上的虚拟节点,然后再根据虚拟节点找到真实额缓存服务器。虚拟节点的数目足够多,就会使得节点在 Hash 环上的分布更加随机化,也就是增加或者删除一台缓存服务器时,都会较为均匀的影响原来集群中已经存在的缓存服务器。
一致性 Hash 在 Dubbo 中的应用
在 Dubbo 中如果同一环境下服务提供者的数量大于 1,就会出现服务消费者要选择哪个 Provider 进行调用的问题。为了避免集群中部分服务器压力过大,另一些服务器比较空闲的情况,就需要通过负载均衡,让负载 “均摊” 到不同的机器上。Dubbo 中就提供了基于一致性 Hash 的负载均衡算法,只是将缓存服务器替换成 Provider,根据 Provider 和 Consumer 的 IP 地址获取 hashcode。
一致性 Hash 算法的缺陷
虽然一致性 Hash 算法已经十分完善,但是还是有很多不足的地方
- Hash 环上的节点非常多或者更新频繁时,查询效率比较低下
- 整个 Hash 环需要一个服务路由来做负载均衡,存在单点问题
针对这两个问题,Redis 在实现自己的分布式集群方案时,采用了基于 P2P 结构的哈希槽算法
- 使用哈希槽
- Redis Cluster 通过分片的方式将整个缓存划分为 16384 个槽,每个缓存节点就相当于 Hash 换上的一个节点,接入集群时,所有实例都将均匀占有这些哈希槽,当需要查询一个 Key 是,首先根据 Key 的 hashcode 对 16384 取余来得到 Key 属于哪个槽,并映射到缓存实例上。
- 为了方便描述,将 16384 抽象为 20 个哈希槽单位,在增加缓存实例时,假设原来有 4 个节点,分配的哈希槽为 0-4,5-9,10-14,15-19 现在增加一个节点,Redis Cluster 的做法是将之前每台服务器上的一部分哈希槽移动到第四个节点上,更新后的哈希槽分配为 1-4,6-9,11-14,16-19 第四个节点为 0,5,10,15。删除也是同理
去中心化
- 每个节点都保存有完整的哈希槽 - 节点的映射表,也就是说,每个节点都知道自己拥有哪些哈希槽,以及某个确定的哈希槽究竟对应着哪个节点。
- 无论向哪个节点发出寻找 Key 的请求,该节点都会通过求余计算该 Key 究竟存在于哪个哈希槽,并将请求转发至该哈希槽所在的节点。
一致性 Hash 算法实现
数据结构选取
一致性 Hash 算法首先需要考虑的问题就是:构造一个长度为 2^32 的整数环,根据节点名称的 Hash 值将服务器节点放置在这个 Hash 环上。排序 + List
算出所有待加入的数据结构的节点名称的 Hash 值放入一个数组中,然后使用排序算法将其从小到大进行排序,最后将排序后的数据放入 List 中。之后,待路由的节点,只需要在 List 中找到第一个 Hash 值比它大的服务器节点就可以了。比如服务器节点的 Hash 值是 [0,2,4,6,8,10],待路由的节点是 7,则只需要找到第一个比 7 大的整数,也就是 8,就是我们要找的最终需要路由过去的服务器节点。
直接使用 Arrays#sort() 时间复杂度为 O (NLogN),则总的时间复杂度为:
最好情况,一次找到 O (1) + O (NLogN)
- 最坏情况,最后一次找到 O (N) + O (NLogN)
遍历 + List
由于排序比较消耗性能,那么可以选择不排序,直接遍历的方式:
- 服务器节点不排序,其 Hash 值全部放到一个 List 中
- 等待路由的节点,算出其 Hash 值,由于指明了顺时针,因此遍历 List,比待路由节点 Hash 值大的算出差值并记录,比待路由节点 Hash 值小的忽略
- 算出所有的差值之后,最小的那个,就是最终需要路由过去的节点
现在算下时间复杂度:
- 最好的情况是只有一个服务器节点的 Hash 值大于带路由节点的 Hash 值,O (N) + O (1)
- 最坏的情况是所有服务器节点的 Hash 值都大于待路由节点的 Hash 值,时间复杂度是 O (N) + (N)
红黑树
选用红黑树的原因
- 红黑树主要的作用是用于存储有序的数据,因此相当于省略了排序这步骤,效率很高
- JDK 里面提供了红黑树的代码实现 TreeMap 和 TreeSet
- 红黑树提供了一个方法 ceilingEntry (Integer key) 可以直接获取 key 右边的第一个节点,如果节点为空,表示已经到尾,则直接取树的第一个节点
Hash 值重新计算
服务器节点一般用字符串表示,比如 192.168.1.1,根据字符串得到其 Hash 值,所以一个重要的问题就是 Hash 值重新计算,首先看下直接使用 String#hashCode() 的结果
public static void main(String[] args) {System.out.println("192.168.0.0:111 的哈希值:" + Math.abs("192.168.0.0:111".hashCode()));System.out.println("192.168.0.1:111 的哈希值:" + Math.abs("192.168.0.1:111".hashCode()));System.out.println("192.168.0.2:111 的哈希值:" + Math.abs("192.168.0.2:111".hashCode()));System.out.println("192.168.0.3:111 的哈希值:" + Math.abs("192.168.0.3:111".hashCode()));System.out.println("192.168.0.4:111 的哈希值:" + Math.abs("192.168.0.4:111".hashCode()));}/** output **/192.168.0.0:111 的哈希值:771739798192.168.0.1:111 的哈希值:770816277192.168.0.2:111 的哈希值:769892756192.168.0.3:111 的哈希值:768969235192.168.0.4:111 的哈希值:768045714
明显可以看到,在 [0,2^23 - 1] 这个大区间中,5 和 HashCode 值仅仅分布在一个小区间,导致某个服务器的负载会特别大。因此需要一种新的计算 Hash 值的方法, 这种重新计算 Hash 值的算法有很多,比如 CRC32_HASH、FNV1_32_HASH、KETAMA_HASH 等,其中 KETAMA_HASH 是默认的 Reids 推荐的一致性 Hash 算法,用别的 Hash 算法也可以,比如 FNV1_32_HASH 算法的计算效率就会高一些,这里选用 FNV1_32_HASH 算法
虚拟节点
没有虚拟节点,就会出现前文提到的 Hash 环数据倾斜问题,某种程度上来说,这样就失去了负载均衡的意义,因为负载均衡的目的就是为了使得目标服务器均分所有的请求
可以通过引入虚拟节点减小负载均衡问题,查阅资料了解到,物理服务器很少,则需要更多的虚拟节点;反之,物理服务器比较多,虚拟节点就少一点。
构造虚拟节点需要考虑两个问题:
- 一个真实的节点如何对应多个虚拟节点
- 虚拟节点找到后如何还原成真实的节点
参考其他博客有个简单的方法,给每个真实节点机上后缀再取 Hash 值,比如:”192.168.0.0:111” 就可以变成 “192.168.0.0:111&&VN0”,”192.168.0.0:111&&VN1”,”192.168.0.0:111&&VN2”,”192.168.0.0:111&&VN3”,还原的时候只需要从头截取字符串到 ”&&“ 位置就可以了
代码实现
通用部分
通用接口
public interface LoadBalancer {// 添加服务器节点public void addServerNode(String serverNodeName);// 删除服务器节点public HashMap<String, String> delServerNode(String serverNodeName);// 选择服务器节点public String selectServerNode(String requestURL);}
使用 FNVI_32_HASH 算法计算 Hash 值
public class GetHashCode {private static final long FNV_32_INIT = 2166136261L;private static final int FNV_32_PRIME = 16777619;public int getHashCode(String origin) {final int p = FNV_32_PRIME;int hash = (int)FNV_32_INIT;for (int i = 0; i < origin.length(); i++) {hash = (hash ^ origin.charAt(i)) * p;}hash += hash << 13;hash ^= hash >> 7;hash += hash << 3;hash ^= hash >> 17;hash += hash << 5;hash = Math.abs(hash);return hash;}}
请求 IP 构造
public class IPAddressGenerate {public String[] getIPAddress(int num) {String[] res = new String[num];Random random = new Random();for (int i = 0; i < num; i++) {res[i] = String.valueOf(random.nextInt(256)) + "." + String.valueOf(random.nextInt(256)) + "."+ String.valueOf(random.nextInt(256)) + "." + String.valueOf(random.nextInt(256)) + ":"+ String.valueOf(random.nextInt(9999));}return res;}}
请求在每台服务器上分布的数量方差计算
public class Analysis {// <节点,服务器>public double analysis(HashMap<String, String> map) {Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();// 计数 map <Server, Count>ConcurrentHashMap<String, Integer> countMap = new ConcurrentHashMap<>();while (iterator.hasNext()) {Map.Entry<String, String> entry = iterator.next();String server = entry.getValue();if (countMap.containsKey(server)) {countMap.put(server, countMap.get(server) + 1);} else {countMap.put(server, 1);}}Collection<Integer> values = countMap.values();Iterator<Integer> val = values.iterator();int count = 0;int[] res = new int[values.size()];while (val.hasNext()) {res[count] = val.next();}return variance(res);}// 求方差private static double variance(int[] arr) {int m = arr.length;double sum = 0;for (int i = 0; i < m; i++) {sum += arr[i];}double dAve = sum / m;double dVar = 0;for (int i = 0; i < m; i++) {dVar += (arr[i] - dAve) * (arr[i] - dAve);}return dVar / m;}}
不带虚拟节点实现
public class SortedMapWithoutVirtualNode implements LoadBalancer {private static Logger logger = LoggerFactory.getLogger(SortedMapWithoutVirtualNode.class);private TreeMap<Integer, String> treeMapHash;@Overridepublic void addServerNode(String serverNodeName) {int hash = new GetHashCode().getHashCode(serverNodeName);treeMapHash.put(hash, serverNodeName);}@Overridepublic void delServerNode(String serverNodeName) {int hash = new GetHashCode().getHashCode(serverNodeName);treeMapHash.remove(hash);logger.info("服务器节点:{} 下线", serverNodeName);}@Overridepublic String selectServerNode(String requestURL) {int hash = new GetHashCode().getHashCode(requestURL);// 向右寻找第一个 keyMap.Entry<Integer, String> subEntry= treeMapHash.ceilingEntry(hash);// 设置成一个环,如果超过尾部,则取第一个点subEntry = subEntry == null ? treeMapHash.firstEntry() : subEntry;return subEntry.getValue();}// 构建 Hash 环public SortedMap<Integer, String> buildHash(TreeMap<Integer, String> treeMap) {this.treeMapHash = treeMap;return treeMapHash;}}
测试代码
public class SortedMapWithoutVirtualNodeTest {private static Logger logger = LoggerFactory.getLogger(SortedMapWithoutVirtualNodeTest.class);private static SortedMapWithoutVirtualNode sortedMapWithoutVirtualNode = new SortedMapWithoutVirtualNode();// Hash 环private static SortedMap<Integer, String> treeMapHash;// 服务器总数private static final int SERVERS_NUM = 100;// 待加入 Hash 环的服务器列表private static ArrayList<String> servers = new ArrayList<>();private static void init() {// 构造服务器数据for (int i = 0; i < SERVERS_NUM; i++) {StringBuilder stringBuilder = new StringBuilder();servers.add(stringBuilder.append("192.168.0.").append(String.valueOf(i)).toString());}// 构建 Hash 环treeMapHash = sortedMapWithoutVirtualNode.buildHash(new TreeMap<Integer, String>());// 将服务器添加到 Hash 环中for (int i = 0; i < SERVERS_NUM; i++) {sortedMapWithoutVirtualNode.addServerNode(servers.get(i));}}public static void main(String[] args) {init();// 请求节点String[] nodes = new IPAddressGenerate().getIPAddress(10000);// <节点,服务器>HashMap<String, String> map = new HashMap<>();for (int i = 0; i < nodes.length; i++) {// 选择服务器String serverIP = sortedMapWithoutVirtualNode.selectServerNode(nodes[i]);// 记录服务器信息map.put(nodes[i], serverIP);}logger.info("初始方差: " + new Analysis().analysis(map));}}
带虚拟节点实现
public class SortedMapWithVirtualNode implements LoadBalancer {private static Logger logger = LoggerFactory.getLogger(SortedMapWithVirtualNode.class);private TreeMap<Integer, String> treeMapHash;@Overridepublic void addServerNode(String serverNodeName) {int hash = new GetHashCode().getHashCode(serverNodeName);treeMapHash.put(hash, serverNodeName);// logger.info("服务器节点:{} 上线", serverNodeName);}@Overridepublic void delServerNode(String serverNodeName) {int hash = new GetHashCode().getHashCode(serverNodeName);treeMapHash.remove(hash);logger.info("服务器节点:{} 下线", serverNodeName);}@Overridepublic String selectServerNode(String requestURL) {int hash = new GetHashCode().getHashCode(requestURL);// 向右寻找第一个 keyMap.Entry<Integer, String> subEntry = treeMapHash.ceilingEntry(hash);// 设置成一个环,如果超过尾部,则取第一个点subEntry = subEntry == null ? treeMapHash.firstEntry() : subEntry;String VNNode = subEntry.getValue();return VNNode.substring(0, VNNode.indexOf("&&"));}// 构建 Hash 环public SortedMap<Integer, String> buildHash(TreeMap<Integer, String> treeMap) {this.treeMapHash = treeMap;return treeMapHash;}}
测试代码
public class SortedMapWithVirtualNodeTest {private static Logger logger = LoggerFactory.getLogger(SortedMapWithVirtualNodeTest.class);private static SortedMapWithVirtualNode sortedMapWithVirtualNode = new SortedMapWithVirtualNode();// Hash 环private static SortedMap<Integer, String> treeMapHash;// 服务器总数private static final int SERVERS_NUM = 100;// 每台服务器需要设置的虚拟节点数private static final int VIRTUAL_NODES = 10;// 待加入 Hash 环的服务器列表private static ArrayList<String> serverList = new ArrayList<>();private static void init() {// 构造服务器数据for (int i = 0; i < SERVERS_NUM; i++) {String s = new StringBuilder().append("192.168.0.").append(String.valueOf(i)).toString();serverList.add(s);}// 构建 Hash 环treeMapHash = sortedMapWithVirtualNode.buildHash(new TreeMap<Integer, String>());// 将服务器的虚拟节点添加到 Hash 环中for (String s : serverList) {for (int i = 0; i < VIRTUAL_NODES; i++) {String VNNode = s + "&&VN" + String.valueOf(i);sortedMapWithVirtualNode.addServerNode(VNNode);}}}public static void main(String[] args) {init();// <节点,服务器>HashMap<String, String> map = new HashMap<>();// 请求节点String[] nodes = new IPAddressGenerate().getIPAddress(10000);// <节点,服务器>for (int i = 0; i < nodes.length; i++) {// 选择服务器String serverIP = sortedMapWithVirtualNode.selectServerNode(nodes[i]);// 记录服务器信息map.put(nodes[i], serverIP);}logger.info("虚拟节点,初始方差: " + new Analysis().analysis(map));}}
性能分析
模拟 1000 个请求,100 台服务器,每台服务器 100 个虚拟节点,比较方差为:
- 不带虚拟节点:143.98
- 带虚拟节点为:51.32
参考其他博客,使用 FNVI_32_HASH 算法计算 Hash 值,在服务器增加后,缓存的命中率为 78% 左右
