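ZooKeeper Use Cases
I. Distributed Cluster Management
- Query the online service nodes on demand
- Check the resource usage of each service node
- Send a notification when a service goes offline
- Send a notification when a service's resources (CPU, memory, disk) exceed a threshold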
Architecture design:
[Figure: architecture diagram]

Node structure:
server-manager // root node
  server00000001: // service node
  server00000002: // service node
  server00000003: // service node
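Service state information:
a. ip
b. cpu
c. memory
d. disk
Implementation
Each service node generates and reports its own data
The cluster management center queries the nodes on demand (pull)
Watches deliver node changes passively, and rules trigger real-time alerts (push)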
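
The rule-based alerting mentioned above could be sketched as a plain threshold check over the reported state. This is a minimal sketch rather than the article's implementation: the class `ThresholdRule`, its method names, and the threshold values are hypothetical, and all three metrics are assumed to be reported as percentages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical alert rule; names and threshold values are illustrative assumptions.
class ThresholdRule {
    private final double maxCpu;
    private final double maxMemory;
    private final double maxDisk;

    ThresholdRule(double maxCpu, double maxMemory, double maxDisk) {
        this.maxCpu = maxCpu;
        this.maxMemory = maxMemory;
        this.maxDisk = maxDisk;
    }

    // Returns one alert message per metric (in percent) that exceeds its threshold.
    List<String> check(String ip, double cpu, double memory, double disk) {
        List<String> alerts = new ArrayList<>();
        addIfOver(alerts, ip, "cpu", cpu, maxCpu);
        addIfOver(alerts, ip, "memory", memory, maxMemory);
        addIfOver(alerts, ip, "disk", disk, maxDisk);
        return alerts;
    }

    private static void addIfOver(List<String> alerts, String ip, String metric,
                                  double value, double max) {
        if (value > max) {
            alerts.add(ip + " " + metric + " " + value + "% exceeds " + max + "%");
        }
    }

    public static void main(String[] args) {
        ThresholdRule rule = new ThresholdRule(80, 90, 95);
        // cpu and disk are over their limits here, memory is not.
        rule.check("192.168.136.1", 91.5, 40, 97).forEach(System.out::println);
    }
}
```

In a real deployment the management center would presumably call such a check from its data-change listener each time a service node rewrites its state.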
Key sample code:

    import com.alibaba.fastjson.JSON;      // assumed: JSON.toJSONString comes from fastjson
    import org.I0Itec.zkclient.ZkClient;   // assumed: the I0Itec ZkClient library

    import java.lang.instrument.Instrumentation;
    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryUsage;
    import java.net.InetAddress;
    import java.net.UnknownHostException;

    // Reports this JVM's state to ZooKeeper. OsBean and CPUMonitorCalc are
    // helper classes of the original project that are not shown here.
    public class Agent {
        private static final String rootPath = "/server-manager";
        private static final String servicePath = rootPath + "/server";
        private static Agent instance;

        private String server = "192.168.136.1:2181";
        private ZkClient zkClient;
        private String nodePath;
        private Thread stateThread;

        public static void main(String[] args) throws InterruptedException {
            Agent.premain(null, null);
            // The reporting thread is a daemon, so keep the JVM alive here
            // instead of spinning in an empty loop inside init().
            Thread.currentThread().join();
        }

        // Java agent entry point; args may carry the ZooKeeper address.
        public static void premain(String args, Instrumentation instrumentation) {
            instance = new Agent();
            if (args != null) {
                instance.server = args;
            }
            instance.init();
        }

        public void init() {
            zkClient = new ZkClient(server);
            System.out.println("Connected to ZooKeeper: " + server);
            buildRoot();
            createServerNode();
            // Refresh the node data every 2 seconds.
            stateThread = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    updateServerNode();
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "zk_stateThread");
            stateThread.setDaemon(true);
            stateThread.start();
        }

        public void buildRoot() {
            if (!zkClient.exists(rootPath)) {
                zkClient.createPersistent(rootPath);
            }
        }

        // Ephemeral sequential node: it disappears when this service's session ends.
        public void createServerNode() {
            nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
            System.out.println("Created node: " + nodePath);
        }

        public void updateServerNode() {
            zkClient.writeData(nodePath, getOsInfo());
        }

        // Builds the JSON state payload for this node.
        public String getOsInfo() {
            OsBean bean = new OsBean();
            bean.lastUpdateTime = System.currentTimeMillis();
            bean.ip = getLocalIp();
            bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
            MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
            // Heap figures in MB; note that usableMemorySize holds the used heap.
            bean.usableMemorySize = memoryUsage.getUsed() / 1024 / 1024;
            bean.maxMemorySize = memoryUsage.getMax() / 1024 / 1024;
            return JSON.toJSONString(bean);
        }

        public void updateNode(String path, Object data) {
            if (zkClient.exists(path)) {
                zkClient.writeData(path, data);
            } else {
                zkClient.createEphemeral(path, data);
            }
        }

        public static String getLocalIp() {
            try {
                return InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                throw new RuntimeException(e);
            }
        }
    }

[Figure: screenshot of the result]

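II. Distributed Registry (Service Registration and Discovery)
In a monolithic service, multiple clients typically call a single service, so configuring the one service address in each client is enough. After moving to a distributed architecture the number of service nodes grows; at large companies such as Alibaba it can reach tens of thousands. Configuring that many nodes by hand in every client is not feasible, so an intermediary service is needed whose sole job is to help clients find service nodes. This is the service discovery that many technical books describe.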

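A complete registry provides the following features:
- Service registration: when a provider comes online, it submits the services it offers to the registry.
- Service deregistration: the registry is told that a provider is going offline.
- Service subscription: consumers receive service change messages dynamically and in real time.
- Reliability: the registry itself runs as a cluster and stores data redundantly, which avoids single points of failure and data loss.
- Fault tolerance: when a provider fails abruptly, for example through a crash or power loss, the registry detects it and notifies clients of the provider's state.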
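
The first three features (registration, deregistration, subscription) can be illustrated with an in-memory stand-in. This is only a sketch under stated assumptions: `SimpleRegistry` and its methods are hypothetical names, nothing here talks to ZooKeeper, and the reliability and fault-tolerance features are exactly what a real ZooKeeper ensemble with ephemeral nodes would add on top.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory registry: register, unregister, subscribe.
class SimpleRegistry {
    private final Map<String, Set<String>> providers = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<Set<String>>>> subscribers = new ConcurrentHashMap<>();

    void register(String service, String address) {
        providers.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(address);
        notifySubscribers(service);
    }

    void unregister(String service, String address) {
        Set<String> addresses = providers.get(service);
        if (addresses != null && addresses.remove(address)) {
            notifySubscribers(service);
        }
    }

    // The listener receives the current provider list at once and again on every change.
    void subscribe(String service, Consumer<Set<String>> listener) {
        subscribers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
        listener.accept(lookup(service));
    }

    // Returns a sorted snapshot of the provider addresses for a service.
    Set<String> lookup(String service) {
        Set<String> current = providers.getOrDefault(service, Collections.emptySet());
        return new TreeSet<>(current);
    }

    private void notifySubscribers(String service) {
        Set<String> snapshot = lookup(service);
        List<Consumer<Set<String>>> listeners =
                subscribers.getOrDefault(service, Collections.emptyList());
        for (Consumer<Set<String>> listener : listeners) {
            listener.accept(snapshot);
        }
    }

    public static void main(String[] args) {
        SimpleRegistry registry = new SimpleRegistry();
        registry.subscribe("com.foo.BarService", s -> System.out.println("providers: " + s));
        registry.register("com.foo.BarService", "10.0.0.1:20880");
        registry.unregister("com.foo.BarService", "10.0.0.1:20880");
    }
}
```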
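How Dubbo uses ZooKeeper
Dubbo, Alibaba's well-known open-source project, is a Java-based RPC framework. Its registry, an essential component, can be built on several third-party components, but the official recommendation is to use ZooKeeper as the registry service.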

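Node description:
| Category | Node type | Description |
| --- | --- | --- |
| Root | Persistent node | Root node name; the default is "dubbo" |
| Service | Persistent node | Service name: the fully qualified class name of the service |
| Type | Persistent node | One of: providers, consumers, configurators (dynamic configuration), routers |
| URL | Ephemeral node | URL name; contains the provider's IP, port, configuration, and other information |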
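Flow description
- When a service provider starts: it writes its own URL under /dubbo/com.foo.BarService/providers.
- When a service consumer starts: it subscribes to the provider URLs under /dubbo/com.foo.BarService/providers and writes its own URL under /dubbo/com.foo.BarService/consumers.
- When the monitoring center starts: it subscribes to all provider and consumer URLs under /dubbo/com.foo.BarService.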
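
The node layout above can be made concrete with a small path builder. This is a sketch, not Dubbo's own code: `DubboPaths` and its methods are hypothetical, and the sample provider URL is made up. It assumes that the URL is percent-encoded before it is used as a node name, since a raw URL contains `/` characters that would otherwise be read as path separators.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that builds registry paths of the form /root/service/type/url.
class DubboPaths {
    static String categoryPath(String root, String service, String type) {
        return "/" + root + "/" + service + "/" + type;
    }

    // Percent-encodes the URL so it can serve as a single node name.
    static String urlNodePath(String root, String service, String type, String url) {
        try {
            return categoryPath(root, service, type) + "/" + URLEncoder.encode(url, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    public static void main(String[] args) {
        String service = "com.foo.BarService";
        // Illustrative provider URL, not taken from a real deployment.
        String providerUrl = "dubbo://10.20.153.10:20880/com.foo.BarService";
        System.out.println(categoryPath("dubbo", service, "providers"));
        System.out.println(urlNodePath("dubbo", service, "providers", providerUrl));
    }
}
```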
Differences from other service registration and discovery solutions
Reference

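III. Distributed Jobs

- Among multiple service nodes, only one master node is allowed to run the job.
- When the master node goes down, another node is automatically promoted to master and continues running the job.
Node structure:
- server-master
- server0001:master
- server0002:slave
- server000n:slave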
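Election flow:
On service startup:
- Create an ephemeral sequential server child node under server-master with the value slave
- Get all child nodes under server-master
- Check whether a master node exists
- If there is none, set itself as the master node
When a child-node deletion event fires:
- Get all child nodes under server-master
- Check whether a master node exists
- If there is none, set the node with the smallest sequence number as the master node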
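
The decision made in the deletion-event branch can be sketched as a pure function over the child list. This is a simplified illustration with hypothetical names (`MasterElection`, `nodeToPromote`); it assumes all children share one name prefix, so that sorting by name also sorts by sequence number, and it leaves out the ZooKeeper calls that would read the children and write the new value.

```java
import java.util.TreeMap;

// Hypothetical election rule: promote the smallest node when no master exists.
class MasterElection {
    // children maps node name -> value ("master" or "slave"); TreeMap keeps names sorted.
    // Returns the node to promote, or null when a master already exists or there are no nodes.
    static String nodeToPromote(TreeMap<String, String> children) {
        if (children.isEmpty() || children.containsValue("master")) {
            return null;
        }
        return children.firstKey();
    }

    public static void main(String[] args) {
        TreeMap<String, String> children = new TreeMap<>();
        // server0000000001 was the master and has just gone away.
        children.put("server0000000003", "slave");
        children.put("server0000000002", "slave");
        String promoted = nodeToPromote(children);
        children.put(promoted, "master");
        System.out.println(children);
    }
}
```

Applying the same smallest-sequence rule at startup as well would keep two nodes that start at the same moment from both declaring themselves master.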
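IV. Distributed Locks
Basic lock concepts:
Locks are a familiar idea in development: when multiple threads or processes compete for a resource, a lock assigns ownership of the resource in an orderly way. In a monolithic application we can use synchronized or ReentrantLock. In a distributed system synchronized alone is not enough, because it only coordinates threads inside one JVM, so a third-party component is needed. A simple approach is to use row-level locks in a relational database for mutual exclusion between processes, but the performance bottleneck of a large distributed system usually sits in database operations. For better performance, distributed locks are built on components such as Redis or ZooKeeper.
Shared lock: also called a read lock. Once one party holds a shared lock, other parties can also obtain shared locks, but they may only read. No party can obtain a write lock until all shared locks are released.
Exclusive lock: also called a write lock. The holder can both read and write the data. Until it is released, no other party can obtain any lock.
Acquiring a lock:
A bank account's information can be read by several parties at once, but the account data must not be modified while it is being read. The account ID is 888.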
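Read lock acquisition flow:
1. Create an ephemeral sequential read-lock node based on the resource ID: /lock/888.R0000000002 Read
2. Get all child nodes under /lock. If no write-lock node has a smaller sequence number than this node, the lock is acquired. Checking only whether the smallest node is a read lock is not enough: a reader queued behind a waiting writer would pass that check and could still hold its lock when the writer becomes the smallest node.
3. Otherwise, block and wait, and add a watch for child-node changes under /lock.
4. When the watch fires, go back to step 2.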

[Figure: data structure]

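Write lock acquisition flow:
1. Create an ephemeral sequential write-lock node based on the resource ID: /lock/888.R0000000002 Write
2. Get all child nodes under /lock. If the node with the smallest sequence number is this node, the lock is acquired.
3. If the smallest node is not this node, block and wait, and add a watch for child-node changes under /lock.
4. When the watch fires, go back to step 2.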
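
Step 2 of both flows can be sketched as one check over the children of /lock. The sketch is hedged in several ways: `LockRule` is a hypothetical name, the lock type is assumed to be stored as the node data ("Read" or "Write"), and the reader rule used here is the strict one in which a reader waits for every earlier writer. The sequence number is taken from the last 10 characters of the name, which is the zero-padded counter ZooKeeper appends to sequential nodes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical acquisition check for read and write locks.
class LockRule {
    // Sequence number = the 10 digits ZooKeeper appends to a sequential node name.
    static long seq(String name) {
        return Long.parseLong(name.substring(name.length() - 10));
    }

    // children maps node name -> node data ("Read" or "Write").
    static boolean acquired(Map<String, String> children, String self) {
        boolean selfIsWrite = "Write".equals(children.get(self));
        for (Map.Entry<String, String> e : children.entrySet()) {
            if (seq(e.getKey()) >= seq(self)) {
                continue; // ignore this node itself and everything queued after it
            }
            // A writer waits for every earlier node; a reader only for earlier writers.
            if (selfIsWrite || "Write".equals(e.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> children = new LinkedHashMap<>();
        children.put("888.R0000000001", "Read");
        children.put("888.R0000000002", "Write");
        children.put("888.R0000000003", "Read");
        for (String node : children.keySet()) {
            System.out.println(node + " acquired: " + acquired(children, node));
        }
    }
}
```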
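Releasing the lock:
After reading is finished, delete the ephemeral node manually. If the holder crashes while holding the lock, the node is deleted automatically once the session expires.
About the herd effect:
While waiting for the lock, every waiting node watches the lock node. As soon as the lock node changes, all waiting nodes are triggered and then query the lock node's children at the same time. If the waiting queue is long, this puts very heavy traffic on ZooKeeper.
To improve this, use a watch chain: each waiter watches only the node in front of it (a read-lock waiter watches the nearest write lock before it) and is notified only when that node releases the lock. The waiters thus form a chain of watches.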
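
Choosing the single node to watch can be sketched as follows. As before, this is an illustration with hypothetical names (`WatchChain`, `nodeToWatch`) that assumes the lock type is stored as node data; a writer watches its immediate predecessor and a reader watches the nearest earlier writer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical selection of the one predecessor a waiter should watch.
class WatchChain {
    static long seq(String name) {
        return Long.parseLong(name.substring(name.length() - 10));
    }

    // children maps node name -> node data ("Read" or "Write").
    // Returns the node to watch, or null when nothing blocks self.
    static String nodeToWatch(Map<String, String> children, String self) {
        boolean selfIsWrite = "Write".equals(children.get(self));
        String best = null;
        for (Map.Entry<String, String> e : children.entrySet()) {
            String name = e.getKey();
            if (seq(name) >= seq(self)) {
                continue; // only earlier nodes can block self
            }
            boolean blocks = selfIsWrite || "Write".equals(e.getValue());
            // Keep the blocking node closest to self in the queue.
            if (blocks && (best == null || seq(name) > seq(best))) {
                best = name;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, String> children = new LinkedHashMap<>();
        children.put("888.R0000000001", "Read");
        children.put("888.R0000000002", "Write");
        children.put("888.R0000000003", "Read");
        for (String node : children.keySet()) {
            System.out.println(node + " watches " + nodeToWatch(children, node));
        }
    }
}
```

Because each deletion wakes at most the waiters that named that node, a release no longer makes the whole queue re-read the children at once.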


Custom distributed lock with ZkClient
Custom implementation

    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;

    import java.util.List;
    import java.util.stream.Collectors;

    public class ZookeeperLock {
        private static final String rootPath = "/tuling-lock";
        private String server = "192.168.0.149:2181";
        private ZkClient zkClient;

        public ZookeeperLock() {
            zkClient = new ZkClient(server, 5000, 20000);
            buildRoot();
        }

        public void buildRoot() {
            if (!zkClient.exists(rootPath)) {
                zkClient.createPersistent(rootPath);
            }
        }

        // Blocks until the lock is acquired or the timeout (ms) elapses.
        public Lock lock(String lockId, long timeout) {
            Lock lockNode = createLockNode(lockId);
            lockNode = tryActiveLock(lockNode);
            try {
                synchronized (lockNode) {
                    // Check inside the monitor so a notify() cannot slip in before wait().
                    if (!lockNode.isActive()) {
                        lockNode.wait(timeout);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (!lockNode.isActive()) {
                throw new RuntimeException("lock timeout");
            }
            return lockNode;
        }

        public void unlock(Lock lock) {
            if (lock.isActive()) {
                zkClient.delete(lock.getPath());
            }
        }

        private Lock tryActiveLock(Lock lockNode) {
            List<String> list = zkClient.getChildren(rootPath)
                    .stream()
                    .sorted()
                    .map(p -> rootPath + "/" + p)
                    .collect(Collectors.toList());
            String firstNodePath = list.get(0);
            if (firstNodePath.equals(lockNode.getPath())) {
                lockNode.setActive(true);
            } else {
                // Watch only the node directly in front of this one.
                // Flaw: that node may already be gone before the watch is registered.
                String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
                zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                    }

                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        System.out.println("Node deleted: " + dataPath);
                        Lock lock = tryActiveLock(lockNode);
                        synchronized (lockNode) {
                            if (lock.isActive()) {
                                lockNode.notify();
                            }
                        }
                        zkClient.unsubscribeDataChanges(upNodePath, this);
                    }
                });
            }
            return lockNode;
        }

        // All lock nodes share one name prefix so that sorting by name follows
        // the sequence number; lockId is kept only on the Lock object.
        public Lock createLockNode(String lockId) {
            String nodePath = zkClient.createEphemeralSequential(rootPath + "/", "lock");
            return new Lock(lockId, nodePath);
        }
    }

    public class Lock {
        private String lockId;
        private String path;
        // volatile: written by the ZkClient event thread, read by the waiting thread.
        // A primitive avoids the NullPointerException that unboxing a null Boolean causes.
        private volatile boolean isActive;

        public Lock(String lockId, String path) {
            this.lockId = lockId;
            this.path = path;
        }

        public String getPath() {
            return path;
        }

        public void setPath(String path) {
            this.path = path;
        }

        public boolean isActive() {
            return isActive;
        }

        public void setActive(boolean active) {
            isActive = active;
        }

        public boolean getActive() {
            return isActive;
        }
    }
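Flaws in this code
1. The watched node may be released before the watch is added, so the waiting thread is never woken.
2. Even if the code checks that the node exists before adding the watch, the node may be released while the watch is being added, so the waiting thread is again never woken.
Custom implementation 1.0: wait/notify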

    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;

    public class ZookeeperLock1 {
        private static final String rootPath = "/distribute-lock";
        private String server = "zookeeper01.com:2181";
        private ZkClient zkClient;
        private Logger logger = LoggerFactory.getLogger(ZookeeperLock1.class);
        private Random random = new Random();

        public ZookeeperLock1() {
            zkClient = new ZkClient(server, 5000, 20000);
            buildRoot();
        }

        public void buildRoot() {
            if (!zkClient.exists(rootPath)) {
                zkClient.createPersistent(rootPath);
            }
        }

        public Lock lock(String lockId, long timeout) throws Exception {
            Lock lockNode = createLockNode(lockId);
            lockNode = tryActiveLock(lockNode);
            try {
                synchronized (lockNode) {
                    // Check inside the monitor so a notify() cannot slip in before wait().
                    if (!lockNode.isActive()) {
                        logger.warn("wait()");
                        lockNode.wait(timeout);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (!lockNode.isActive()) {
                // Remove our node so it does not block the waiters queued behind it.
                zkClient.delete(lockNode.getPath());
                throw new Exception("lock timeout");
            }
            return lockNode;
        }

        public void unlock(Lock lock) {
            if (lock == null) {
                return; // lock() failed, nothing to release
            }
            logger.info("Releasing lock: " + lock.getPath());
            zkClient.delete(lock.getPath());
        }

        private Lock tryActiveLock(Lock lockNode) {
            List<String> list = zkClient.getChildren(rootPath)
                    .stream()
                    .sorted()
                    .map(p -> rootPath + "/" + p)
                    .collect(Collectors.toList());
            int index = list.indexOf(lockNode.getPath());
            if (index < 0) {
                return lockNode; // our node is gone, e.g. removed after a timeout
            }
            if (index == 0) {
                lockNode.setActive(true);
                logger.info(lockNode.getPath() + " acquired the lock");
            } else {
                String upNodePath = list.get(index - 1);
                // Random delay that widens the race window for demonstration purposes.
                try {
                    Thread.sleep(random.nextInt(10) * 200L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                doListenPreNode(lockNode, upNodePath);
                logger.info(lockNode.getPath() + " is now watching " + upNodePath);
                // Re-check after subscribing: the node may have vanished in the meantime.
                if (!zkClient.exists(upNodePath)) {
                    logger.info(upNodePath + " was already deleted, the watch is useless, retrying");
                    return tryActiveLock(lockNode);
                }
            }
            return lockNode;
        }

        private void doListenPreNode(Lock lockNode, String upNodePath) {
            zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }

                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    logger.info("Node deletion detected: " + dataPath);
                    Lock lock = tryActiveLock(lockNode);
                    synchronized (lockNode) {
                        if (lock.isActive()) {
                            logger.info("After deletion of " + dataPath + ", lock acquired by " + lock.getPath());
                            lockNode.notify();
                        }
                    }
                    zkClient.unsubscribeDataChanges(upNodePath, this);
                }
            });
        }

        public Lock createLockNode(String lockId) {
            String nodePath = zkClient.createEphemeralSequential(rootPath + "/", "lock");
            return new Lock(lockId, nodePath);
        }
    }

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class LockTest {
        private static int i = 0;

        public static void main(String[] args) throws InterruptedException {
            Logger logger = LoggerFactory.getLogger(LockTest.class);
            // Without a lock: 1000 threads race on i, so updates can be lost.
            for (int j = 0; j < 1000; j++) {
                new Thread(() -> {
                    i = i + 1;
                }).start();
            }
            Thread.sleep(100);
            i = 0;
            // With the ZooKeeper lock: each task adds its own number under the lock.
            ZookeeperLock1 zookeeperLock1 = new ZookeeperLock1();
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int j = 1; j <= 1000; j++) {
                String lockId = j + "";
                executorService.submit(() -> {
                    Lock lock1 = null;
                    try {
                        lock1 = zookeeperLock1.lock(lockId, 5000L);
                        i = i + Integer.parseInt(lockId);
                        logger.info("i:" + i);
                    } catch (Exception e) {
                        e.printStackTrace();
                        logger.info("Execution failed");
                    } finally {
                        // lock1 stays null when lock() threw, so guard the release.
                        if (lock1 != null) {
                            zookeeperLock1.unlock(lock1);
                        }
                    }
                });
            }
            executorService.shutdown();
        }
    }
Custom implementation 2.0: blocking with CountDownLatch

    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;

    public class ZookeeperLock {
        private static final String rootPath = "/distribute-lock";
        private String server = "zookeeper01.com:2181";
        private ZkClient zkClient;

        public ZookeeperLock() {
            zkClient = new ZkClient(server, 50000, 50000);
            buildRoot();
        }

        public void buildRoot() {
            if (!zkClient.exists(rootPath)) {
                zkClient.createPersistent(rootPath);
            }
        }

        // Blocks until the lock is acquired; returns the lock node path.
        public String lock(String lockId) throws InterruptedException {
            String lockNode = createLockNode(lockId);
            tryGetDisLock(lockNode);
            return lockNode;
        }

        // Returns the lock node path, or null if the lock was not acquired within timeout ms.
        public String lock(String lockId, long timeout) throws Exception {
            String lockNode = createLockNode(lockId);
            if (tryGetDisLock(lockNode, timeout)) {
                return lockNode;
            }
            // Give up our place so the waiters queued behind us are not blocked.
            zkClient.delete(lockNode);
            return null;
        }

        public void unlock(String lock) {
            System.out.println("Releasing: " + lock);
            zkClient.delete(lock);
        }

        private List<String> sortedChildren() {
            return zkClient.getChildren(rootPath)
                    .stream()
                    .sorted()
                    .map(p -> rootPath + "/" + p)
                    .collect(Collectors.toList());
        }

        // Waits until upNodePath is deleted; a negative timeout waits without limit.
        private boolean awaitDeletion(String upNodePath, long timeout) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }

                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println("Detected deletion of " + dataPath);
                    countDownLatch.countDown();
                }
            };
            zkClient.subscribeDataChanges(upNodePath, listener);
            try {
                // The node may have been deleted before the watch was registered.
                if (!zkClient.exists(upNodePath)) {
                    return true;
                }
                if (timeout < 0) {
                    countDownLatch.await();
                    return true;
                }
                return countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            } finally {
                zkClient.unsubscribeDataChanges(upNodePath, listener);
            }
        }

        private void tryGetDisLock(String lockNode) throws InterruptedException {
            List<String> list = sortedChildren();
            if (list.get(0).equals(lockNode)) {
                return;
            }
            awaitDeletion(list.get(list.indexOf(lockNode) - 1), -1);
            tryGetDisLock(lockNode);
        }

        public boolean tryGetDisLock(String lockNode, long timeout) throws InterruptedException {
            long begin = System.currentTimeMillis();
            List<String> list = sortedChildren();
            if (list.get(0).equals(lockNode)) {
                return true;
            }
            if (!awaitDeletion(list.get(list.indexOf(lockNode) - 1), timeout)) {
                return false;
            }
            // Remaining budget = original timeout minus the time already spent.
            long remaining = timeout - (System.currentTimeMillis() - begin);
            return remaining > 0 && tryGetDisLock(lockNode, remaining);
        }

        public String createLockNode(String lockId) {
            return zkClient.createEphemeralSequential(rootPath + "/", "lock");
        }
    }
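
When a timed wait is retried, each retry needs the time that is left, meaning the original timeout minus the time already spent, not the elapsed time itself. One way to keep that bookkeeping simple is to fix a deadline once and derive the remaining time from it before every wait. The sketch below shows the idea with plain `CountDownLatch` objects standing in for successive predecessor nodes; `DeadlineWait` and `awaitAll` are hypothetical names, and no ZooKeeper calls are involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: wait on several latches in turn under one overall deadline.
class DeadlineWait {
    // Returns true only if every latch reached zero before the deadline.
    static boolean awaitAll(List<CountDownLatch> latches, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            for (CountDownLatch latch : latches) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0 || !latch.await(remaining, TimeUnit.NANOSECONDS)) {
                    return false; // budget used up
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch released = new CountDownLatch(0); // predecessor already gone
        CountDownLatch stuck = new CountDownLatch(1);    // predecessor never releases
        System.out.println(awaitAll(Arrays.asList(released), 1000));
        System.out.println(awaitAll(Arrays.asList(released, stuck), 50));
    }
}
```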

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class LockTest {
        private static int i = 0;

        public static void main(String[] args) throws InterruptedException {
            // Without a lock: 1000 threads race on i, so the printed total may be below 1000.
            for (int j = 0; j < 1000; j++) {
                new Thread(() -> {
                    i = i + 1;
                }).start();
            }
            Thread.sleep(100);
            System.out.println(i);
            i = 0;
            ZookeeperLock zookeeperLock = new ZookeeperLock();
            ExecutorService executorService = Executors.newCachedThreadPool();

            for (int j = 1; j <= 1000; j++) {
                String lockId = j + "";
                executorService.submit(() -> {
                    String lock = null;
                    try {
                        lock = zookeeperLock.lock(lockId, 15000L);
                        if (lock != null) {
                            i = i + Integer.parseInt(lockId);
                            System.out.println("lockId:" + lockId);
                            System.out.println("lock:" + lock);
                            System.out.println("i:" + i);
                        } else {
                            System.out.println("lockId:" + lockId + " timed out waiting for the lock");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // Release in finally so a failure inside the critical section cannot leak the lock.
                        if (lock != null) {
                            zookeeperLock.unlock(lock);
                        }
                    }
                });
            }
            executorService.shutdown();
        }
    }
Curator, a mature framework for ZooKeeper distributed locks

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>4.3.0</version>
    </dependency>

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class CuratorLock {
        private int i;

        public void setI(int i) {
            this.i = i;
        }

        public int getI() {
            return i;
        }

        public static void main(String[] args) throws Exception {
            CuratorLock curatorLock = new CuratorLock();
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("zookeeper01.com:2181")
                    .connectionTimeoutMs(10000)
                    .sessionTimeoutMs(10000)
                    .retryPolicy(new ExponentialBackoffRetry(3000, 4))
                    .build();
            client.start();
            InterProcessMutex lock = new InterProcessMutex(client, "/distributedLock");
            ExecutorService pool = Executors.newCachedThreadPool();
            for (int n = 0; n < 100; n++) {
                pool.submit(() -> {
                    try {
                        lock.acquire();
                        try {
                            curatorLock.setI(curatorLock.getI() + 1);
                        } finally {
                            lock.release(); // always release, even if the update throws
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            // Wait for the tasks to finish instead of sleeping for a fixed time.
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            System.out.println(curatorLock.getI());
            client.close();
        }
    }