博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CuratorFramework Zookeeper 分布式锁的使用及手动实现
阅读量:4109 次
发布时间:2019-05-25

本文共 6100 字,大约阅读时间需要 20 分钟。

CuratorFramework Zookeeper 分布式锁的使用及手动实现

CuratorFramework 已经实现好的分布式锁

  • InterProcessLock (接口)
  • InterProcessMutex (可重入互斥锁)
  • InterProcessMultiLock (多个锁的管理容器)
  • InterProcessReadWriteLock (读写锁)
  • InterProcessSemaphoreMutex (不可重入的互斥锁)

使用很简单,拿InterProcessMutex 做个例子:

org.apache.curator
curator-recipes
4.2.0
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(                10, 5000));                        client.start();//启动客户端别忘记了                InterProcessMutex lock = new InterProcessMutex(client, "/mylock");        CountDownLatch countDownLatch = new CountDownLatch(2);        new Thread(()->{
try {
if (lock.acquire(5, TimeUnit.SECONDS)){
System.out.println(Thread.currentThread().getName()+" 获得了锁"); lock.release(); }else System.out.println(Thread.currentThread().getName()+" 没拿到锁"); } catch (Exception e) {
e.printStackTrace(); } countDownLatch.countDown(); },"一号线程").start(); new Thread(()->{
try {
if (lock.acquire(5, TimeUnit.SECONDS)){
System.out.println(Thread.currentThread().getName()+" 获得了锁"); lock.release(); }else System.out.println(Thread.currentThread().getName()+" 没拿到锁"); } catch (Exception e) {
e.printStackTrace(); } countDownLatch.countDown(); },"二号线程").start(); countDownLatch.await();

手动实现一个简单的不可重入锁

创建EPHEMERAL临时节点

释放时再删除就可以了
其中要注意一点,代码中没有写,为了直观看原理
在分布式中如果请求时间超过阈值会触发retry,但retry的机器也许不会是同一台,在你触发熔断时,可能已经获取到锁了,但是没有人去释放,因为下面的代码为了提高性能,不重复创建连接,client是在单例模式中永久开启的,除非宕机,否则就会死锁,在实际使用中要注意设置锁获取的超时时间,或给每个线程使用TLS声明client(不推荐),或使用MQ,在消费消息时使用分布式锁。

import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.cache.PathChildrenCache;import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;import org.apache.curator.retry.RetryNTimes;import org.apache.zookeeper.CreateMode;import org.springframework.stereotype.Component;@Componentpublic class MyLock {
private static final String LOCK_NAMESPACE = "/lock"; private static final String LOCK_SEPARATOR = "/"; private final CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.111", new RetryNTimes(5, 5000)); private final PathChildrenCache cache = new PathChildrenCache(client, LOCK_NAMESPACE,false); //监视器不在这初始化,因为如果使用tryLock没有必要添加监视器 private final ThreadLocal
cacheListener = new ThreadLocal<>(); public MyLock() throws Exception {
client.start(); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } public String tryLock(String lockName) {
final String lockFullPath = LOCK_NAMESPACE + LOCK_SEPARATOR+ lockName; try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockFullPath, null); return lockFullPath; } catch (Exception e) {
return null; } } public String acquire(String lockName) throws InterruptedException {
final String lockFullPath = LOCK_NAMESPACE + LOCK_SEPARATOR+ lockName; try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockFullPath, null); return lockFullPath; } catch (Exception ignored) {
} //添加监视器,用来进行重试通知 //但是需要注意里面的幂等性问题 //也许设置监视器过程中其他worker释放了锁 //所以无法监视到节点被移除,一直在wait等待。 if (cacheListener.get()==null){
cacheListener.set((CuratorFramework client, PathChildrenCacheEvent event)->{
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath(); System.out.println("上一个节点 " + oldPath + " 已经被移除"); if (oldPath.equals(lockFullPath)){
synchronized (this) {
//通知 this.notify(); } } } }); cache.getListenable().addListener(cacheListener.get()); try {
//设置完监听器 再尝试一次,避免因为设置监视器的期间,锁被其他worker释放造成无法通知wait client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockFullPath, null); //移除监视器 如果创建没有异常,说明设置监视器期间,锁被释放,我们获取到了锁,这时移除监视器,返回即可。 if (cacheListener.get()!=null) {
cache.getListenable().removeListener(cacheListener.get()); cacheListener.remove(); } return lockFullPath; } catch (Exception ignored) {
} } synchronized (this){
this.wait(); } return acquire(lockName);}public void release(String lockFullPath) {
try {
if (client.checkExists().forPath(lockFullPath) != null) {
//移除监视器 if (cacheListener.get()!=null) {
cache.getListenable().removeListener(cacheListener.get()); cacheListener.remove(); } //移除节点 client.delete().forPath(lockFullPath); } } catch (Exception ignored) {
}}}

可优化的部分:

为了避免羊群效应可以使用有序的临时节点,这样只需要监听自己之前的节点即可。懒得写了。

为什么使用ZK做分布式锁?

  1. 因为可以不设置过期时间,不会导致可能任务没结束其他服务拿到锁,并且临时节点的特性将不会因为获得锁后宕机,导致其他所有服务无法获取锁。
    有兴趣可以看我另外一篇redis分布式锁的实现,里面有一些关于幂等性和死锁问题的小细节。
  2. 因为Redis实现需要不停的setnx尝试,而Zookeeper可以设置一个Watcher,这样在获取锁失败的时候,可以不需要不停的尝试,在Watcher监视到节点被删除后,再尝试获取锁即可。
  3. Redis分布式锁有没有办法不需要不停的尝试?
    有,redis的pubsub同样可以做到锁的移除通知,所以没有必要纠结用什么方式。

总结: 其实分布式锁Redis,关系型数据库的行锁,Zookeeper等等都可以实现,甚至AMQP都能拿来做分布式锁,看情况看需要,如果单纯的trylock这种方式来说肯定是Redis>Zookeeper>关系型数据库,如果是普通的lock获取不到一直等待这种方式,Zookeeper>Redis>关系型数据库

转载地址:http://fxlsi.baihongyu.com/

你可能感兴趣的文章
Java中实例对象的状态转换简图
查看>>
设置MyEclipse的默认编码方式为"UTF-8"
查看>>
Java实现WebSocket聊天
查看>>
java实体类实现序列化的意义
查看>>
Spring MVC 框架搭建及详解
查看>>
FreeMarker帮助手册
查看>>
开启root远程连接mysql
查看>>
a标签中href常见参数的区别
查看>>
SpringMVC 基础教程 框架分析
查看>>
SpringMVC 文件上传配置,多文件上传,使用的MultipartFile
查看>>
SpringMVC+ibatis+maven+shiro环境搭建过程中的基本注意事项
查看>>
C#程序打包成安装项目详解
查看>>
Java中的基本数据类型
查看>>
Webpack Bundle Analyzer ,可缩放可视化文件大小配置
查看>>
Cannot read property compilation of undefined vue 打包运行npm run build 报错
查看>>
vue解决 有ts语法,vscode提示错误问题
查看>>
vue打包时候提示:Cannot read property compilation of undefined
查看>>
推荐:vue解决引入外部css问题和解决css引入背景问题
查看>>
vue.js页面加载数据时加载动画过渡效果
查看>>
推荐Node.js跨域问题实用解决方法
查看>>