本文共 6100 字,大约阅读时间需要 20 分钟。
- InterProcessLock (接口)
- InterProcessMutex (可重入互斥锁)
- InterProcessMultiLock (多个锁的管理容器)
- InterProcessReadWriteLock (读写锁)
- InterProcessSemaphoreMutex (不可重入的互斥锁)
使用很简单,拿InterProcessMutex 做个例子:
org.apache.curator curator-recipes 4.2.0
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes( 10, 5000)); client.start();//启动客户端别忘记了 InterProcessMutex lock = new InterProcessMutex(client, "/mylock"); CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(()->{ try { if (lock.acquire(5, TimeUnit.SECONDS)){ System.out.println(Thread.currentThread().getName()+" 获得了锁"); lock.release(); }else System.out.println(Thread.currentThread().getName()+" 没拿到锁"); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); },"一号线程").start(); new Thread(()->{ try { if (lock.acquire(5, TimeUnit.SECONDS)){ System.out.println(Thread.currentThread().getName()+" 获得了锁"); lock.release(); }else System.out.println(Thread.currentThread().getName()+" 没拿到锁"); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); },"二号线程").start(); countDownLatch.await();
创建EPHEMERAL临时节点
释放时再删除就可以了 其中要注意一点
,代码中没有写,为了直观看原理 在分布式中如果请求时间超过阈值
,会触发retry
,但retry的机器也许不会是同一台,在你触发熔断时
,可能已经获取到锁了
,但是没有人去释放,因为下面的代码为了提高性能,不重复创建连接,client是在单例模式中永久开启的,除非宕机,否则就会死锁,在实际使用中要注意设置锁获取的超时时间,或给每个线程使用TLS声明client(不推荐),或使用MQ,在消费消息时使用分布式锁。
import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.cache.PathChildrenCache;import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;import org.apache.curator.retry.RetryNTimes;import org.apache.zookeeper.CreateMode;import org.springframework.stereotype.Component;@Componentpublic class MyLock { private static final String LOCK_NAMESPACE = "/lock"; private static final String LOCK_SEPARATOR = "/"; private final CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.111", new RetryNTimes(5, 5000)); private final PathChildrenCache cache = new PathChildrenCache(client, LOCK_NAMESPACE,false); //监视器不在这初始化,因为如果使用tryLock没有必要添加监视器 private final ThreadLocalcacheListener = new ThreadLocal<>(); public MyLock() throws Exception { client.start(); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } public String tryLock(String lockName) { final String lockFullPath = LOCK_NAMESPACE + LOCK_SEPARATOR+ lockName; try { client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockFullPath, null); return lockFullPath; } catch (Exception e) { return null; } } public String acquire(String lockName) throws InterruptedException { final String lockFullPath = LOCK_NAMESPACE + LOCK_SEPARATOR+ lockName; try { client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockFullPath, null); return lockFullPath; } catch (Exception ignored) { } //添加监视器,用来进行重试通知 //但是需要注意里面的幂等性问题 //也许设置监视器过程中其他worker释放了锁 //所以无法监视到节点被移除,一直在wait等待。 if (cacheListener.get()==null){ cacheListener.set((CuratorFramework client, PathChildrenCacheEvent event)->{ if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { String oldPath = event.getData().getPath(); System.out.println("上一个节点 " + oldPath + " 已经被移除"); if (oldPath.equals(lockFullPath)){ synchronized (this) { //通知 this.notify(); } } } }); cache.getListenable().addListener(cacheListener.get()); try { //设置完监听器 再尝试一次,避免因为设置监视器的期间,锁被其他worker释放造成无法通知wait client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockFullPath, null); //移除监视器 如果创建没有异常,说明设置监视器期间,锁被释放,我们获取到了锁,这时移除监视器,返回即可。 if (cacheListener.get()!=null) { cache.getListenable().removeListener(cacheListener.get()); cacheListener.remove(); } return lockFullPath; } catch (Exception ignored) { } } synchronized (this){ this.wait(); } return acquire(lockName);}public void release(String lockFullPath) { try { if (client.checkExists().forPath(lockFullPath) != null) { //移除监视器 if (cacheListener.get()!=null) { cache.getListenable().removeListener(cacheListener.get()); cacheListener.remove(); } //移除节点 client.delete().forPath(lockFullPath); } } catch (Exception ignored) { }}}
可优化的部分:
为了避免羊群效应可以使用有序的临时节点,这样只需要监听自己之前的节点即可。懒得写了。
为什么使用ZK做分布式锁?
- 因为可以不设置过期时间,不会导致可能任务没结束其他服务拿到锁,并且临时节点的特性将不会因为获得锁后宕机,导致其他所有服务无法获取锁。 有兴趣可以看我另外一篇redis分布式锁的实现,里面有一些关于幂等性和死锁问题的小细节。
- 因为Redis实现需要不停的setnx尝试,而Zookeeper可以设置一个Watcher,这样在获取锁失败的时候,可以不需要不停的尝试,在Watcher监视到节点被删除后,再尝试获取锁即可。
- Redis分布式锁有没有办法不需要不停的尝试? 有,redis的pubsub同样可以做到锁的移除通知,所以没有必要纠结用什么方式。
总结: 其实分布式锁Redis,关系型数据库的行锁,Zookeeper等等都可以实现,甚至AMQP都能拿来做分布式锁,看情况看需要,如果单纯的trylock这种方式来说肯定是Redis>Zookeeper>关系型数据库,如果是普通的lock获取不到一直等待这种方式,Zookeeper>Redis>关系型数据库
转载地址:http://fxlsi.baihongyu.com/