博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【转载】zookeeper 分布式锁 实现
阅读量:4362 次
发布时间:2019-06-07

本文共 23114 字,大约阅读时间需要 77 分钟。

           

博客分类:
 

背景

 继续上一篇文章: ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。

 

 这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制

 

算法

可以预先看一下zookeeper的官方文档: 

 

lock操作过程:
  • 首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
  • 每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
  • 进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
  • 按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
  • 如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
unlock操作过程:
  • 将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出
 
其中的几个关键点:
  1. node节点选择为EPHEMERAL_SEQUENTIAL很重要。 * 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。 * 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
  2. 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
 
注意:
  • 使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
  • 同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。
没有两全其美的做法,两者取其一,选择自己一个能接受的即可

 

代码

Java代码  
  1. public class DistributedLock {  
  2.   
  3.     private static final byte[]  data      = { 0x12, 0x34 };  
  4.     private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();  
  5.     private final String         root;                                     //根节点路径  
  6.     private String               id;  
  7.     private LockNode             idName;  
  8.     private String               ownerId;  
  9.     private String               lastChildId;  
  10.     private Throwable            other     = null;  
  11.     private KeeperException      exception = null;  
  12.     private InterruptedException interrupt = null;  
  13.   
  14.     public DistributedLock(String root) {  
  15.         this.root = root;  
  16.         ensureExists(root);  
  17.     }  
  18.   
  19.     /** 
  20.      * 尝试获取锁操作,阻塞式可被中断 
  21.      */  
  22.     public void lock() throws InterruptedException, KeeperException {  
  23.         // 可能初始化的时候就失败了  
  24.         if (exception != null) {  
  25.             throw exception;  
  26.         }  
  27.   
  28.         if (interrupt != null) {  
  29.             throw interrupt;  
  30.         }  
  31.   
  32.         if (other != null) {  
  33.             throw new NestableRuntimeException(other);  
  34.         }  
  35.   
  36.         if (isOwner()) {
    //锁重入  
  37.             return;  
  38.         }  
  39.   
  40.         BooleanMutex mutex = new BooleanMutex();  
  41.         acquireLock(mutex);  
  42.         // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试  
  43.         try {  
  44.             mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true  
  45.             // mutex.get();  
  46.         } catch (TimeoutException e) {  
  47.             if (!mutex.state()) {  
  48.                 lock();  
  49.             }  
  50.         }  
  51.   
  52.         if (exception != null) {  
  53.             throw exception;  
  54.         }  
  55.   
  56.         if (interrupt != null) {  
  57.             throw interrupt;  
  58.         }  
  59.   
  60.         if (other != null) {  
  61.             throw new NestableRuntimeException(other);  
  62.         }  
  63.     }  
  64.   
  65.     /** 
  66.      * 尝试获取锁对象, 不会阻塞 
  67.      *  
  68.      * @throws InterruptedException 
  69.      * @throws KeeperException 
  70.      */  
  71.     public boolean tryLock() throws KeeperException {  
  72.         // 可能初始化的时候就失败了  
  73.         if (exception != null) {  
  74.             throw exception;  
  75.         }  
  76.   
  77.         if (isOwner()) {
    //锁重入  
  78.             return true;  
  79.         }  
  80.   
  81.         acquireLock(null);  
  82.   
  83.         if (exception != null) {  
  84.             throw exception;  
  85.         }  
  86.   
  87.         if (interrupt != null) {  
  88.             Thread.currentThread().interrupt();  
  89.         }  
  90.   
  91.         if (other != null) {  
  92.             throw new NestableRuntimeException(other);  
  93.         }  
  94.   
  95.         return isOwner();  
  96.     }  
  97.   
  98.     /** 
  99.      * 释放锁对象 
  100.      */  
  101.     public void unlock() throws KeeperException {  
  102.         if (id != null) {  
  103.             try {  
  104.                 zookeeper.delete(root + "/" + id, -1);  
  105.             } catch (InterruptedException e) {  
  106.                 Thread.currentThread().interrupt();  
  107.             } catch (KeeperException.NoNodeException e) {  
  108.                 // do nothing  
  109.             } finally {  
  110.                 id = null;  
  111.             }  
  112.         } else {  
  113.             //do nothing  
  114.         }  
  115.     }  
  116.   
  117.     private void ensureExists(final String path) {  
  118.         try {  
  119.             Stat stat = zookeeper.exists(path, false);  
  120.             if (stat != null) {  
  121.                 return;  
  122.             }  
  123.   
  124.             zookeeper.create(path, data, CreateMode.PERSISTENT);  
  125.         } catch (KeeperException e) {  
  126.             exception = e;  
  127.         } catch (InterruptedException e) {  
  128.             Thread.currentThread().interrupt();  
  129.             interrupt = e;  
  130.         }  
  131.     }  
  132.   
  133.     /** 
  134.      * 返回锁对象对应的path 
  135.      */  
  136.     public String getRoot() {  
  137.         return root;  
  138.     }  
  139.   
  140.     /** 
  141.      * 判断当前是不是锁的owner 
  142.      */  
  143.     public boolean isOwner() {  
  144.         return id != null && ownerId != null && id.equals(ownerId);  
  145.     }  
  146.   
  147.     /** 
  148.      * 返回当前的节点id 
  149.      */  
  150.     public String getId() {  
  151.         return this.id;  
  152.     }  
  153.   
  154.     // ===================== helper method =============================  
  155.   
  156.     /** 
  157.      * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作 
  158.      */  
  159.     private Boolean acquireLock(final BooleanMutex mutex) {  
  160.         try {  
  161.             do {  
  162.                 if (id == null) {
    //构建当前lock的唯一标识  
  163.                     long sessionId = zookeeper.getDelegate().getSessionId();  
  164.                     String prefix = "x-" + sessionId + "-";  
  165.                     //如果第一次,则创建一个节点  
  166.                     String path = zookeeper.create(root + "/" + prefix, data,  
  167.                             CreateMode.EPHEMERAL_SEQUENTIAL);  
  168.                     int index = path.lastIndexOf("/");  
  169.                     id = StringUtils.substring(path, index + 1);  
  170.                     idName = new LockNode(id);  
  171.                 }  
  172.   
  173.                 if (id != null) {  
  174.                     List<String> names = zookeeper.getChildren(root, false);  
  175.                     if (names.isEmpty()) {  
  176.                         id = null;//异常情况,重新创建一个  
  177.                     } else {  
  178.                         //对节点进行排序  
  179.                         SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();  
  180.                         for (String name : names) {  
  181.                             sortedNames.add(new LockNode(name));  
  182.                         }  
  183.   
  184.                         if (sortedNames.contains(idName) == false) {  
  185.                             id = null;//清空为null,重新创建一个  
  186.                             continue;  
  187.                         }  
  188.   
  189.                         //将第一个节点做为ownerId  
  190.                         ownerId = sortedNames.first().getName();  
  191.                         if (mutex != null && isOwner()) {  
  192.                             mutex.set(true);//直接更新状态,返回  
  193.                             return true;  
  194.                         } else if (mutex == null) {  
  195.                             return isOwner();  
  196.                         }  
  197.   
  198.                         SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);  
  199.                         if (!lessThanMe.isEmpty()) {  
  200.                             //关注一下排队在自己之前的最近的一个节点  
  201.                             LockNode lastChildName = lessThanMe.last();  
  202.                             lastChildId = lastChildName.getName();  
  203.                             //异步watcher处理  
  204.                             zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {  
  205.   
  206.                                 public void asyncProcess(WatchedEvent event) {  
  207.                                     acquireLock(mutex);  
  208.                                 }  
  209.   
  210.                             });  
  211.   
  212.                             if (stat == null) {  
  213.                                 acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去  
  214.                             }  
  215.                         } else {  
  216.                             if (isOwner()) {  
  217.                                 mutex.set(true);  
  218.                             } else {  
  219.                                 id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同  
  220.                             }  
  221.                         }  
  222.                     }  
  223.                 }  
  224.             } while (id == null);  
  225.         } catch (KeeperException e) {  
  226.             exception = e;  
  227.             if (mutex != null) {  
  228.                 mutex.set(true);  
  229.             }  
  230.         } catch (InterruptedException e) {  
  231.             interrupt = e;  
  232.             if (mutex != null) {  
  233.                 mutex.set(true);  
  234.             }  
  235.         } catch (Throwable e) {  
  236.             other = e;  
  237.             if (mutex != null) {  
  238.                 mutex.set(true);  
  239.             }  
  240.         }  
  241.   
  242.         if (isOwner() && mutex != null) {  
  243.             mutex.set(true);  
  244.         }  
  245.         return Boolean.FALSE;  
  246.     }  
  247. }  
public class DistributedLock {    private static final byte[]  data      = { 0x12, 0x34 };    private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();    private final String         root;                                     //根节点路径    private String               id;    private LockNode             idName;    private String               ownerId;    private String               lastChildId;    private Throwable            other     = null;    private KeeperException      exception = null;    private InterruptedException interrupt = null;    public DistributedLock(String root) {        this.root = root;        ensureExists(root);    }    /**     * 尝试获取锁操作,阻塞式可被中断     */    public void lock() throws InterruptedException, KeeperException {        // 可能初始化的时候就失败了        if (exception != null) {            throw exception;        }        if (interrupt != null) {            throw interrupt;        }        if (other != null) {            throw new NestableRuntimeException(other);        }        if (isOwner()) {//锁重入            return;        }        BooleanMutex mutex = new BooleanMutex();        acquireLock(mutex);        // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试        try {            mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true            // mutex.get();        } catch (TimeoutException e) {            if (!mutex.state()) {                lock();            }        }        if (exception != null) {            throw exception;        }        if (interrupt != null) {            throw interrupt;        }        if (other != null) {            throw new NestableRuntimeException(other);        }    }    /**     * 尝试获取锁对象, 不会阻塞     *      * @throws InterruptedException     * @throws KeeperException     */    public boolean tryLock() throws KeeperException {        // 可能初始化的时候就失败了        if (exception != null) {            throw exception;        }        if (isOwner()) {//锁重入            return true;        }        acquireLock(null);        if (exception != null) {            throw exception;        }        if (interrupt != null) {            Thread.currentThread().interrupt();        }        if (other != null) {            throw new NestableRuntimeException(other);        }        return isOwner();    }    /**     * 释放锁对象     */    public void unlock() throws KeeperException {        if (id != null) {            try {                zookeeper.delete(root + "/" + id, -1);            } catch (InterruptedException e) {                Thread.currentThread().interrupt();            } catch (KeeperException.NoNodeException e) {                // do nothing            } finally {                id = null;            }        } else {            //do nothing        }    }    private void ensureExists(final String path) {        try {            Stat stat = zookeeper.exists(path, false);            if (stat != null) {                return;            }            zookeeper.create(path, data, CreateMode.PERSISTENT);        } catch (KeeperException e) {            exception = e;        } catch (InterruptedException e) {            Thread.currentThread().interrupt();            interrupt = e;        }    }    /**     * 返回锁对象对应的path     */    public String getRoot() {        return root;    }    /**     * 判断当前是不是锁的owner     */    public boolean isOwner() {        return id != null && ownerId != null && id.equals(ownerId);    }    /**     * 返回当前的节点id     */    public String getId() {        return this.id;    }    // ===================== helper method =============================    /**     * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作     */    private Boolean acquireLock(final BooleanMutex mutex) {        try {            do {                if (id == null) {//构建当前lock的唯一标识                    long sessionId = zookeeper.getDelegate().getSessionId();                    String prefix = "x-" + sessionId + "-";                    //如果第一次,则创建一个节点                    String path = zookeeper.create(root + "/" + prefix, data,                            CreateMode.EPHEMERAL_SEQUENTIAL);                    int index = path.lastIndexOf("/");                    id = StringUtils.substring(path, index + 1);                    idName = new LockNode(id);                }                if (id != null) {                    List
names = zookeeper.getChildren(root, false); if (names.isEmpty()) { id = null;//异常情况,重新创建一个 } else { //对节点进行排序 SortedSet
sortedNames = new TreeSet
(); for (String name : names) { sortedNames.add(new LockNode(name)); } if (sortedNames.contains(idName) == false) { id = null;//清空为null,重新创建一个 continue; } //将第一个节点做为ownerId ownerId = sortedNames.first().getName(); if (mutex != null && isOwner()) { mutex.set(true);//直接更新状态,返回 return true; } else if (mutex == null) { return isOwner(); } SortedSet
lessThanMe = sortedNames.headSet(idName); if (!lessThanMe.isEmpty()) { //关注一下排队在自己之前的最近的一个节点 LockNode lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); //异步watcher处理 zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() { public void asyncProcess(WatchedEvent event) { acquireLock(mutex); } }); if (stat == null) { acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去 } } else { if (isOwner()) { mutex.set(true); } else { id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同 } } } } } while (id == null); } catch (KeeperException e) { exception = e; if (mutex != null) { mutex.set(true); } } catch (InterruptedException e) { interrupt = e; if (mutex != null) { mutex.set(true); } } catch (Throwable e) { other = e; if (mutex != null) { mutex.set(true); } } if (isOwner() && mutex != null) { mutex.set(true); } return Boolean.FALSE; }}

相关说明:

 

 

测试代码:

 

Java代码  
  1. @Test  
  2.     public void test_lock() {  
  3.         ExecutorService exeucotr = Executors.newCachedThreadPool();  
  4.         final int count = 50;  
  5.         final CountDownLatch latch = new CountDownLatch(count);  
  6.         final DistributedLock[] nodes = new DistributedLock[count];  
  7.         for (int i = 0; i < count; i++) {  
  8.             final DistributedLock node = new DistributedLock(dir);  
  9.             nodes[i] = node;  
  10.             exeucotr.submit(new Runnable() {  
  11.   
  12.                 public void run() {  
  13.                     try {  
  14.                         Thread.sleep(1000);  
  15.                         node.lock(); //获取锁  
  16.                         Thread.sleep(100 + RandomUtils.nextInt(100));  
  17.   
  18.                         System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());  
  19.                     } catch (InterruptedException e) {  
  20.                         want.fail();  
  21.                     } catch (KeeperException e) {  
  22.                         want.fail();  
  23.                     } finally {  
  24.                         latch.countDown();  
  25.                         try {  
  26.                             node.unlock();  
  27.                         } catch (KeeperException e) {  
  28.                             want.fail();  
  29.                         }  
  30.                     }  
  31.   
  32.                 }  
  33.             });  
  34.         }  
  35.   
  36.         try {  
  37.             latch.await();  
  38.         } catch (InterruptedException e) {  
  39.             want.fail();  
  40.         }  
  41.   
  42.         exeucotr.shutdown();  
  43.     }  
@Test    public void test_lock() {        ExecutorService exeucotr = Executors.newCachedThreadPool();        final int count = 50;        final CountDownLatch latch = new CountDownLatch(count);        final DistributedLock[] nodes = new DistributedLock[count];        for (int i = 0; i < count; i++) {            final DistributedLock node = new DistributedLock(dir);            nodes[i] = node;            exeucotr.submit(new Runnable() {                public void run() {                    try {                        Thread.sleep(1000);                        node.lock(); //获取锁                        Thread.sleep(100 + RandomUtils.nextInt(100));                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());                    } catch (InterruptedException e) {                        want.fail();                    } catch (KeeperException e) {                        want.fail();                    } finally {                        latch.countDown();                        try {                            node.unlock();                        } catch (KeeperException e) {                            want.fail();                        }                    }                }            });        }        try {            latch.await();        } catch (InterruptedException e) {            want.fail();        }        exeucotr.shutdown();    }

 

升级版

 实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。

 

大致原理就是ReentrantLock 和 DistributedLock的一个结合。

 

 

  •  单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
  • 拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。
锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。
 
代码:
Java代码  
  1. public class DistributedReentrantLock extends DistributedLock {  
  2.   
  3.     private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";  
  4.     private ReentrantLock       reentrantLock = new ReentrantLock();  
  5.   
  6.     public DistributedReentrantLock(String root) {  
  7.         super(root);  
  8.     }  
  9.   
  10.     public void lock() throws InterruptedException, KeeperException {  
  11.         reentrantLock.lock();//多线程竞争时,先拿到第一层锁  
  12.         super.lock();  
  13.     }  
  14.   
  15.     public boolean tryLock() throws KeeperException {  
  16.         //多线程竞争时,先拿到第一层锁  
  17.         return reentrantLock.tryLock() && super.tryLock();  
  18.     }  
  19.   
  20.     public void unlock() throws KeeperException {  
  21.         super.unlock();  
  22.         reentrantLock.unlock();//多线程竞争时,释放最外层锁  
  23.     }  
  24.   
  25.     @Override  
  26.     public String getId() {  
  27.         return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());  
  28.     }  
  29.   
  30.     @Override  
  31.     public boolean isOwner() {  
  32.         return reentrantLock.isHeldByCurrentThread() && super.isOwner();  
  33.     }  
  34.   
  35. }  
public class DistributedReentrantLock extends DistributedLock {    private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";    private ReentrantLock       reentrantLock = new ReentrantLock();    public DistributedReentrantLock(String root) {        super(root);    }    public void lock() throws InterruptedException, KeeperException {        reentrantLock.lock();//多线程竞争时,先拿到第一层锁        super.lock();    }    public boolean tryLock() throws KeeperException {        //多线程竞争时,先拿到第一层锁        return reentrantLock.tryLock() && super.tryLock();    }    public void unlock() throws KeeperException {        super.unlock();        reentrantLock.unlock();//多线程竞争时,释放最外层锁    }    @Override    public String getId() {        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());    }    @Override    public boolean isOwner() {        return reentrantLock.isHeldByCurrentThread() && super.isOwner();    }}
 
测试代码:
Java代码  
  1. @Test  
  2.     public void test_lock() {  
  3.         ExecutorService exeucotr = Executors.newCachedThreadPool();  
  4.         final int count = 50;  
  5.         final CountDownLatch latch = new CountDownLatch(count);  
  6.   
  7.         final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁  
  8.         for (int i = 0; i < count; i++) {  
  9.             exeucotr.submit(new Runnable() {  
  10.   
  11.                 public void run() {  
  12.                     try {  
  13.                         Thread.sleep(1000);  
  14.                         lock.lock();  
  15.                         Thread.sleep(100 + RandomUtils.nextInt(100));  
  16.   
  17.                         System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());  
  18.                     } catch (InterruptedException e) {  
  19.                         want.fail();  
  20.                     } catch (KeeperException e) {  
  21.                         want.fail();  
  22.                     } finally {  
  23.                         latch.countDown();  
  24.                         try {  
  25.                             lock.unlock();  
  26.                         } catch (KeeperException e) {  
  27.                             want.fail();  
  28.                         }  
  29.                     }  
  30.   
  31.                 }  
  32.             });  
  33.         }  
  34.   
  35.         try {  
  36.             latch.await();  
  37.         } catch (InterruptedException e) {  
  38.             want.fail();  
  39.         }  
  40.   
  41.         exeucotr.shutdown();  
  42.     }  
@Test    public void test_lock() {        ExecutorService exeucotr = Executors.newCachedThreadPool();        final int count = 50;        final CountDownLatch latch = new CountDownLatch(count);        final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁        for (int i = 0; i < count; i++) {            exeucotr.submit(new Runnable() {                public void run() {                    try {                        Thread.sleep(1000);                        lock.lock();                        Thread.sleep(100 + RandomUtils.nextInt(100));                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());                    } catch (InterruptedException e) {                        want.fail();                    } catch (KeeperException e) {                        want.fail();                    } finally {                        latch.countDown();                        try {                            lock.unlock();                        } catch (KeeperException e) {                            want.fail();                        }                    }                }            });        }        try {            latch.await();        } catch (InterruptedException e) {            want.fail();        }        exeucotr.shutdown();    }

最后

其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下

 

大致思路:

 

  1. 竞争资源标示:  read_自增id , write_自增id
  2. 首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
  3. watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)

 

 

  • 大小: 74.4 KB
    
分享到:                  
       |           
  • 2011-09-30 17:00       
  • 浏览 6709       
  •                   
  • 分类:            
  •     
评论
    
3 楼         2013-11-18            
accp_huangxin 写道
大哥,能提供一下LockNode这个对象结构吗
Java代码  
  1. public class LockNode implements Comparable<LockNode> {  
  2.   
  3.     private final String name;  
  4.     private String       prefix;  
  5.     private int          sequence = -1;  
  6.   
  7.     public LockNode(String name){  
  8.         Assert.notNull(name, "id cannot be null");  
  9.         this.name = name;  
  10.         this.prefix = name;  
  11.         int idx = name.lastIndexOf('-');  
  12.         if (idx >= 0) {  
  13.             this.prefix = name.substring(0, idx);  
  14.             try {  
  15.                 this.sequence = Integer.parseInt(name.substring(idx + 1));  
  16.             } catch (Exception e) {  
  17.                 // ignore  
  18.             }  
  19.         }  
  20.     }  
  21.   
  22.     public int compareTo(LockNode that) {  
  23.         int s1 = this.sequence;  
  24.         int s2 = that.sequence;  
  25.         if (s1 == -1 && s2 == -1) {  
  26.             return this.name.compareTo(that.name);  
  27.         }  
  28.   
  29.         if (s1 == -1) {  
  30.             return -1;  
  31.         } else if (s2 == -1) {  
  32.             return 1;  
  33.         } else {  
  34.             return s1 - s2;  
  35.         }  
  36.     }  
  37.   
  38.     public String getName() {  
  39.         return name;  
  40.     }  
  41.   
  42.     public int getSequence() {  
  43.         return sequence;  
  44.     }  
  45.   
  46.     public String getPrefix() {  
  47.         return prefix;  
  48.     }  
  49.   
  50.     public String toString() {  
  51.         return name.toString();  
  52.     }  
  53.   
  54.     // ==================== hashcode & equals方法=======================  
  55.   
  56.     @Override  
  57.     public int hashCode() {  
  58.         final int prime = 31;  
  59.         int result = 1;  
  60.         result = prime * result + ((name == null) ? 0 : name.hashCode());  
  61.         return result;  
  62.     }  
  63.   
  64.     @Override  
  65.     public boolean equals(Object obj) {  
  66.         if (this == obj) {  
  67.             return true;  
  68.         }  
  69.         if (obj == null) {  
  70.             return false;  
  71.         }  
  72.         if (getClass() != obj.getClass()) {  
  73.             return false;  
  74.         }  
  75.         LockNode other = (LockNode) obj;  
  76.         if (name == null) {  
  77.             if (other.name != null) {  
  78.                 return false;  
  79.             }  
  80.         } else if (!name.equals(other.name)) {  
  81.             return false;  
  82.         }  
  83.         return true;  
  84.     }  
  85.   
  86. }  
public class LockNode implements Comparable
{ private final String name; private String prefix; private int sequence = -1; public LockNode(String name){ Assert.notNull(name, "id cannot be null"); this.name = name; this.prefix = name; int idx = name.lastIndexOf('-'); if (idx >= 0) { this.prefix = name.substring(0, idx); try { this.sequence = Integer.parseInt(name.substring(idx + 1)); } catch (Exception e) { // ignore } } } public int compareTo(LockNode that) { int s1 = this.sequence; int s2 = that.sequence; if (s1 == -1 && s2 == -1) { return this.name.compareTo(that.name); } if (s1 == -1) { return -1; } else if (s2 == -1) { return 1; } else { return s1 - s2; } } public String getName() { return name; } public int getSequence() { return sequence; } public String getPrefix() { return prefix; } public String toString() { return name.toString(); } // ==================== hashcode & equals方法======================= @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((name == null) ? 0 : name.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } LockNode other = (LockNode) obj; if (name == null) { if (other.name != null) { return false; } } else if (!name.equals(other.name)) { return false; } return true; }}
   

 

        
        
 

 

     
            

转载于:https://www.cnblogs.com/hansongjiang/p/3918152.html

你可能感兴趣的文章
1. 自动化运维系列之Cobbler自动装机
查看>>
ASP.NET MVC Model绑定(二)
查看>>
一步一步写算法(之hash表)
查看>>
漫谈并发编程(一) - 并发简单介绍
查看>>
JDBC连接MySQL数据库及演示样例
查看>>
Beta 冲刺(1/7)
查看>>
修改 Vultr 登录密码
查看>>
CSS学习
查看>>
Centos 安装lnmp完整版
查看>>
【转】Eclipse和PyDev搭建完美Python开发环境(Ubuntu篇)
查看>>
redis安装和配置
查看>>
2016424王启元 Exp5 msf基础应用
查看>>
Differences between page and segment
查看>>
Jdk与Tomcat配置与安装
查看>>
关于一个Java web与JFrame的深度结合
查看>>
VB连数据库conn.open的参数
查看>>
《信息安全系统设计基础》实验三
查看>>
SpringBoot Docs
查看>>
解决sublime text 2总是在新窗口中打开文件(标签中打开)
查看>>
VUE AntDesign DatePicker设置默认显示当前日期
查看>>