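Background
Continuing from the previous article: the project needs to schedule distributed tasks, so a distributed lock implementation was unavoidable.
This week, building on BooleanMutex, I implemented a ZooKeeper-based distributed lock that controls locking across multiple processes and multiple threads.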
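Algorithm
It is worth reading the official ZooKeeper documentation first.
The lock procedure:
- First, for each locking scenario, designate a root node in ZooKeeper under which the contention for the resource is recorded.
- Each lock object, once created, lazily creates a node in ZooKeeper that identifies it in the contention. (Tip: the node is EPHEMERAL_SEQUENTIAL, an ephemeral node with an auto-incrementing sequence number.)
- To lock, fetch all child nodes under the lock's root node, that is, the identifiers of everyone currently competing.
- Following the fair-lock principle, sort the children by sequence number and take the node with the smallest number as the lock owner. If your own node id is the owner id, return: the lock has been acquired.
- If you are not the owner, find the id immediately ahead of yours in the sorted order and watch for its release (an exists watcher). This forms a chain of triggers.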
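The owner and predecessor selection in the steps above can be sketched without a ZooKeeper connection. This is a minimal illustration, not the article's code: the class `LockQueueSketch` and its methods are hypothetical, and it assumes node names of the form `x-<sessionId>-<sequence>`, where ZooKeeper appends a zero-padded 10-digit counter to sequential nodes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of the article's code: picks the lock owner and
// the node to watch from the children of the lock root.
class LockQueueSketch {

    // The sequence number is the text after the last '-' in the node name.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    // Sort by sequence number only; the session-id part must not affect the order.
    static List<String> sorted(List<String> children) {
        List<String> copy = new ArrayList<>(children);
        copy.sort(Comparator.comparingLong(LockQueueSketch::sequenceOf));
        return copy;
    }

    // The node with the smallest sequence number owns the lock (children must be non-empty).
    static String owner(List<String> children) {
        return sorted(children).get(0);
    }

    // The node directly ahead of myId, whose deletion should be watched.
    // Empty when myId is the owner or is no longer among the children.
    static Optional<String> nodeToWatch(List<String> children, String myId) {
        List<String> queue = sorted(children);
        int index = queue.indexOf(myId);
        return index > 0 ? Optional.of(queue.get(index - 1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // getChildren returns names in no particular order.
        List<String> children = List.of(
                "x-300-0000000012", "x-100-0000000010", "x-200-0000000011");
        System.out.println("owner: " + owner(children));
        System.out.println("x-300 watches: " + nodeToWatch(children, "x-300-0000000012"));
    }
}
```

Sorting on the numeric suffix rather than on the whole name matters because the session id in the prefix differs between clients.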
The unlock procedure:
- Simply delete the node for your own id. The next node in the queue receives the Watcher event, wakes up, acquires the lock, and returns.
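To see why a delete wakes exactly one waiter, here is a small in-memory simulation. It is only a sketch: `ChainWakeupSketch` is a hypothetical stand-in for the lock root that models nothing beyond "one watcher per node, fired when that node is deleted", and the node names are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the lock root (hypothetical, no ZooKeeper involved).
class ChainWakeupSketch {
    private final List<String> queue = new ArrayList<>();          // nodes in sequence order
    private final Map<String, String> watchedBy = new HashMap<>(); // watched node -> watching node
    final List<String> wakeups = new ArrayList<>();                // log of nodes that were woken

    // Enqueue a node; every node except the first watches its direct predecessor.
    void join(String node) {
        if (!queue.isEmpty()) {
            watchedBy.put(queue.get(queue.size() - 1), node);
        }
        queue.add(node);
    }

    // Unlock (or session loss): delete the node and fire the single watcher set on it.
    void delete(String node) {
        int index = queue.indexOf(node);
        if (index < 0) {
            return;
        }
        queue.remove(index);
        String watcher = watchedBy.remove(node);
        String predecessor = index > 0 ? queue.get(index - 1) : null;
        if (predecessor != null) {
            watchedBy.remove(predecessor); // the deleted node's own watch is gone
        }
        if (watcher != null) {
            wakeups.add(watcher);
            if (predecessor != null) {
                // The woken node re-checks the queue, is still not first,
                // and watches its new predecessor.
                watchedBy.put(predecessor, watcher);
            }
        }
    }

    String owner() {
        return queue.isEmpty() ? null : queue.get(0);
    }

    public static void main(String[] args) {
        ChainWakeupSketch root = new ChainWakeupSketch();
        for (String node : List.of("n1", "n2", "n3", "n4")) {
            root.join(node);
        }
        root.delete("n1"); // the owner unlocks
        System.out.println("woken: " + root.wakeups + ", owner: " + root.owner());
    }
}
```

Only the node directly behind the deleted one is notified; the others keep waiting on their own predecessors.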
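Key points:
- Choosing EPHEMERAL_SEQUENTIAL for the node matters.
  * The sequential property makes it easy to build a fair lock: each node wakes only the one behind it, forming a chain of triggers. This avoids the "herd effect" (one release waking every waiting thread); waking waiters selectively improves performance.
  * The ephemeral property covers failures. Talking to ZooKeeper is a network operation with many uncontrollable factors; if the network goes down, for example, the previous node's release will fail. An ephemeral node is tied to its session, so it disappears once the session times out or the client exits abnormally, much like the handling of an interrupted Thread in ReentrantLock's wait queue.
- Acquiring the lock is a blocking operation, while the Watcher fires as an asynchronous event, so a signal is needed for notification. The BooleanMutex from the previous article fits well here and also makes lock re-entry easy to handle. (Lock re-entry can be thought of as repeated reads, and lock release as a preempting write.)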
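The blocking-caller/asynchronous-watcher handoff can be illustrated with a minimal boolean latch. The class below is a hypothetical stand-in written for this example, not the BooleanMutex from the previous article; it only mirrors the three calls used by DistributedLock (`get` with a timeout, `set`, `state`).

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for BooleanMutex: callers block in get() until some
// other thread (for example a watcher callback) calls set(true).
class BooleanMutexSketch {
    private boolean value = false;

    // Block until the value is true, or fail with TimeoutException.
    synchronized void get(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!value) { // loop guards against spurious wake-ups
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                throw new TimeoutException();
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
    }

    // Setting true releases every waiting caller.
    synchronized void set(boolean newValue) {
        value = newValue;
        if (newValue) {
            notifyAll();
        }
    }

    synchronized boolean state() {
        return value;
    }

    public static void main(String[] args) throws Exception {
        BooleanMutexSketch mutex = new BooleanMutexSketch();
        // Simulates the watcher callback arriving on another thread.
        Thread watcher = new Thread(() -> mutex.set(true));
        watcher.start();
        mutex.get(5, TimeUnit.SECONDS); // returns once the simulated watcher has fired
        System.out.println("state: " + mutex.state());
    }
}
```

The timeout on `get` is what lets a caller re-check the queue if a watcher event is ever lost.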
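Caveats:
- Using EPHEMERAL introduces a risk: under abnormal conditions, high network latency can cause a session timeout. ZooKeeper then treats the client as closed and destroys its node, so the next id in line acquires the lock. Two processes may then hold the lock and run the task at the same time, so choosing a suitable session timeout is important.
- Using PERSISTENT instead carries a deadlock risk: if a process exits abnormally, its node is never deleted, and the next id can never acquire the lock.
There is no way to have both; pick whichever trade-off you can accept.
Code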
- public class DistributedLock {
- private static final byte[] data = { 0x12, 0x34 };
- private ZooKeeperx zookeeper = ZooKeeperClient.getInstance();
- private final String root; // root node path
- private String id;
- private LockNode idName;
- private String ownerId;
- private String lastChildId;
- private Throwable other = null;
- private KeeperException exception = null;
- private InterruptedException interrupt = null;
- public DistributedLock(String root) {
- this.root = root;
- ensureExists(root);
- }
- /**
- * Acquire the lock; blocks, and can be interrupted
- */
- public void lock() throws InterruptedException, KeeperException {
- // initialization may already have failed
- if (exception != null) {
- throw exception;
- }
- if (interrupt != null) {
- throw interrupt;
- }
- if (other != null) {
- throw new NestableRuntimeException(other);
- }
- if (isOwner()) { // lock re-entry
- return;
- }
- BooleanMutex mutex = new BooleanMutex();
- acquireLock(mutex);
- // retry on timeout: a ZooKeeper restart can lose the watcher, which would otherwise deadlock
- try {
- mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS); // block until the value is true
- // mutex.get();
- } catch (TimeoutException e) {
- if (!mutex.state()) {
- lock();
- }
- }
- if (exception != null) {
- throw exception;
- }
- if (interrupt != null) {
- throw interrupt;
- }
- if (other != null) {
- throw new NestableRuntimeException(other);
- }
- }
- /**
- * Try to acquire the lock without blocking
- *
- * @throws KeeperException
- */
- public boolean tryLock() throws KeeperException {
- // initialization may already have failed
- if (exception != null) {
- throw exception;
- }
- if (isOwner()) { // lock re-entry
- return true;
- }
- acquireLock(null);
- if (exception != null) {
- throw exception;
- }
- if (interrupt != null) {
- Thread.currentThread().interrupt();
- }
- if (other != null) {
- throw new NestableRuntimeException(other);
- }
- return isOwner();
- }
- /**
- * Release the lock
- */
- public void unlock() throws KeeperException {
- if (id != null) {
- try {
- zookeeper.delete(root + "/" + id, -1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (KeeperException.NoNodeException e) {
- // do nothing
- } finally {
- id = null;
- }
- } else {
- //do nothing
- }
- }
- private void ensureExists(final String path) {
- try {
- Stat stat = zookeeper.exists(path, false);
- if (stat != null) {
- return;
- }
- zookeeper.create(path, data, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- exception = e;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- interrupt = e;
- }
- }
- /**
- * Return the path of the lock's root node
- */
- public String getRoot() {
- return root;
- }
- /**
- * Check whether this instance currently owns the lock
- */
- public boolean isOwner() {
- return id != null && ownerId != null && id.equals(ownerId);
- }
- /**
- * Return the current node id
- */
- public String getId() {
- return this.id;
- }
- // ===================== helper method =============================
- /**
- * Run the lock operation; the mutex argument (null for tryLock) controls whether the caller blocks
- */
- private Boolean acquireLock(final BooleanMutex mutex) {
- try {
- do {
- if (id == null) { // build this lock's unique identifier
- long sessionId = zookeeper.getDelegate().getSessionId();
- String prefix = "x-" + sessionId + "-";
- // on the first attempt, create a node
- String path = zookeeper.create(root + "/" + prefix, data,
- CreateMode.EPHEMERAL_SEQUENTIAL);
- int index = path.lastIndexOf("/");
- id = StringUtils.substring(path, index + 1);
- idName = new LockNode(id);
- }
- if (id != null) {
- List<String> names = zookeeper.getChildren(root, false);
- if (names.isEmpty()) {
- id = null; // abnormal case: create a new node
- } else {
- // sort the nodes
- SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
- for (String name : names) {
- sortedNames.add(new LockNode(name));
- }
- if (sortedNames.contains(idName) == false) {
- id = null; // reset to null and create a new node
- continue;
- }
- // the first node is the owner
- ownerId = sortedNames.first().getName();
- if (mutex != null && isOwner()) {
- mutex.set(true); // update the state directly and return
- return true;
- } else if (mutex == null) {
- return isOwner();
- }
- SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
- if (!lessThanMe.isEmpty()) {
- // watch the nearest node queued ahead of us
- LockNode lastChildName = lessThanMe.last();
- lastChildId = lastChildName.getName();
- // asynchronous watcher handling
- Stat stat = zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {
- public void asyncProcess(WatchedEvent event) {
- acquireLock(mutex);
- }
- });
- if (stat == null) {
- acquireLock(mutex); // the node no longer exists, so re-trigger ourselves; the watcher will not be attached
- }
- } else {
- if (isOwner()) {
- mutex.set(true);
- } else {
- id = null; // our own node may have timed out and gone, so id and ownerId differ
- }
- }
- }
- }
- } while (id == null);
- } catch (KeeperException e) {
- exception = e;
- if (mutex != null) {
- mutex.set(true);
- }
- } catch (InterruptedException e) {
- interrupt = e;
- if (mutex != null) {
- mutex.set(true);
- }
- } catch (Throwable e) {
- other = e;
- if (mutex != null) {
- mutex.set(true);
- }
- }
- if (isOwner() && mutex != null) {
- mutex.set(true);
- }
- return Boolean.FALSE;
- }
- }
Test code:
- @Test
- public void test_lock() {
- ExecutorService exeucotr = Executors.newCachedThreadPool();
- final int count = 50;
- final CountDownLatch latch = new CountDownLatch(count);
- final DistributedLock[] nodes = new DistributedLock[count];
- for (int i = 0; i < count; i++) {
- final DistributedLock node = new DistributedLock(dir);
- nodes[i] = node;
- exeucotr.submit(new Runnable() {
- public void run() {
- try {
- Thread.sleep(1000);
- node.lock(); // acquire the lock
- Thread.sleep(100 + RandomUtils.nextInt(100));
- System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
- } catch (InterruptedException e) {
- want.fail();
- } catch (KeeperException e) {
- want.fail();
- } finally {
- latch.countDown();
- try {
- node.unlock();
- } catch (KeeperException e) {
- want.fail();
- }
- }
- }
- });
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- want.fail();
- }
- exeucotr.shutdown();
- }
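Upgraded version
A distributed lock solves synchronization between processes. But when the requirement covers both multiple threads and multiple processes, having every thread in a single JVM talk to ZooKeeper over the network is rather costly. So, on top of DistributedLock, I implemented a two-level distributed lock.
The basic idea is to combine ReentrantLock with DistributedLock.
- When multiple threads in one JVM compete, a thread must first acquire the first-level ReentrantLock.
- Having acquired it, that thread then competes with threads in other JVMs for the distributed lock, and starts processing the task once it holds that lock as well.
Release runs in the reverse order: release DistributedLock first, then ReentrantLock. Consider the alternative: if ReentrantLock were released first and contention for it within this JVM were high, lock contenders in other JVMs could easily be starved.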
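The acquire and release ordering can be exercised without ZooKeeper. The sketch below is an illustration under assumptions: `TwoLevelLockSketch` is a hypothetical class, and the second level is any `java.util.concurrent.locks.Lock` standing in for the ZooKeeper-backed DistributedLock. The `trace` list exists only to make the ordering visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: a local first-level lock wrapped around a stand-in second-level lock.
class TwoLevelLockSketch {
    private final ReentrantLock local = new ReentrantLock(); // first level: threads in this JVM
    private final Lock remote;                               // second level: stand-in for the distributed lock
    final List<String> trace = new ArrayList<>();            // order of operations, written while holding local

    TwoLevelLockSketch(Lock remote) {
        this.remote = remote;
    }

    void lock() {
        local.lock();
        trace.add("lock local");
        try {
            remote.lock();
            trace.add("lock remote");
        } catch (RuntimeException e) {
            local.unlock(); // do not keep the first level if the second one failed
            throw e;
        }
    }

    void unlock() {
        try {
            trace.add("unlock remote");
            remote.unlock(); // release the second level first
        } finally {
            trace.add("unlock local");
            local.unlock();  // then the first level, even if the remote release failed
        }
    }

    public static void main(String[] args) {
        TwoLevelLockSketch lock = new TwoLevelLockSketch(new ReentrantLock());
        lock.lock();
        try {
            System.out.println("working while holding both levels");
        } finally {
            lock.unlock();
        }
        System.out.println(lock.trace);
    }
}
```

The `try`/`finally` in `unlock` is a design choice of this sketch: it keeps a failed remote release from leaving the local lock held forever.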
Code:
- public class DistributedReentrantLock extends DistributedLock {
- private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
- private ReentrantLock reentrantLock = new ReentrantLock();
- public DistributedReentrantLock(String root) {
- super(root);
- }
- public void lock() throws InterruptedException, KeeperException {
- reentrantLock.lock(); // with multiple threads competing, take the first-level lock first
- super.lock();
- }
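- public boolean tryLock() throws KeeperException {
- // with multiple threads competing, take the first-level lock first
- if (!reentrantLock.tryLock()) {
- return false;
- }
- boolean acquired = super.tryLock();
- if (!acquired) {
- reentrantLock.unlock(); // a failed attempt must not leave the first-level lock held
- }
- return acquired;
- }
- public void unlock() throws KeeperException {
- try {
- super.unlock();
- } finally {
- reentrantLock.unlock(); // release the outer lock last, even if the delete failed
- }
- }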
- @Override
- public String getId() {
- return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
- }
- @Override
- public boolean isOwner() {
- return reentrantLock.isHeldByCurrentThread() && super.isOwner();
- }
- }
Test code:
- @Test
- public void test_lock() {
- ExecutorService exeucotr = Executors.newCachedThreadPool();
- final int count = 50;
- final CountDownLatch latch = new CountDownLatch(count);
- final DistributedReentrantLock lock = new DistributedReentrantLock(dir); // a single lock shared by all threads
- for (int i = 0; i < count; i++) {
- exeucotr.submit(new Runnable() {
- public void run() {
- try {
- Thread.sleep(1000);
- lock.lock();
- Thread.sleep(100 + RandomUtils.nextInt(100));
- System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
- } catch (InterruptedException e) {
- want.fail();
- } catch (KeeperException e) {
- want.fail();
- } finally {
- latch.countDown();
- try {
- lock.unlock();
- } catch (KeeperException e) {
- want.fail();
- }
- }
- }
- });
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- want.fail();
- }
- exeucotr.shutdown();
- }
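Finally
The idea can be taken one step further to implement a distributed read/write lock along much the same lines. When the project is finished and I have time, I may write it up.
Rough idea:
- Contention node identifiers: read_<sequence id>, write_<sequence id>
- First sort by sequence id. If the nodes at the head of the queue are all read nodes, all of those reads acquire the lock. If the head of the queue is a write node, that first write node acquires the lock.
- Watchers: a read node watches for the existence of the nearest write node ahead of it; a write node watches the nearest node ahead of it (read or write).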
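The read/write rules above can be tried out on plain lists. This is a sketch of the idea only, not an implementation the article provides: `ReadWriteQueueSketch` is hypothetical, and it assumes the node names have already been sorted by sequence id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch of the read/write queue rules; the input list is assumed to be
// sorted by sequence id already.
class ReadWriteQueueSketch {

    static boolean isWrite(String node) {
        return node.startsWith("write_");
    }

    // Nodes that hold the lock: every read ahead of the first write,
    // or the first write alone when it is at the head of the queue.
    static List<String> holders(List<String> sortedNodes) {
        List<String> result = new ArrayList<>();
        for (String node : sortedNodes) {
            if (isWrite(node)) {
                if (result.isEmpty()) {
                    result.add(node);
                }
                break;
            }
            result.add(node);
        }
        return result;
    }

    // Node that `me` should watch: a read watches the nearest write ahead of it,
    // a write watches the nearest node of either kind ahead of it.
    static Optional<String> nodeToWatch(List<String> sortedNodes, String me) {
        int index = sortedNodes.indexOf(me);
        for (int i = index - 1; i >= 0; i--) {
            String candidate = sortedNodes.get(i);
            if (isWrite(me) || isWrite(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> queue = List.of(
                "read_0000000001", "read_0000000002", "write_0000000003", "read_0000000004");
        System.out.println("holders: " + holders(queue));
        System.out.println("read_4 watches: " + nodeToWatch(queue, "read_0000000004"));
        System.out.println("write_3 watches: " + nodeToWatch(queue, "write_0000000003"));
    }
}
```

A woken node would re-read the queue and apply the same two functions again, in the same way the exclusive lock re-checks after each watcher event.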
- 2011-09-30 17:00