UDN-企业互联网技术人气社区

板块导航

浏览  : 1180
回复  : 0

[讨论交流] Java实现基于Redis的分布式锁

[复制链接]
白青青的头像 楼主
发表于 2016-9-10 10:38:14 | 显示全部楼层 |阅读模式
  单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式。其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去这里看看原理,我就不翻译了,免得变味了,看懂了之后看代码应该就容易理解了。

  我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是

  最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)
  1. package cc.lixiaohui.lock;

  2. import java.util.concurrent.TimeUnit;

  3. public interface Lock {

  4.         /**
  5.          * 阻塞性的获取锁, 不响应中断
  6.          */
  7.         void lock;
  8.         
  9.         /**
  10.          * 阻塞性的获取锁, 响应中断
  11.          *
  12.          * @throws InterruptedException
  13.          */
  14.         void lockInterruptibly throws InterruptedException;
  15.         
  16.         /**
  17.          * 尝试获取锁, 获取不到立即返回, 不阻塞
  18.          */
  19.         boolean tryLock;
  20.         
  21.         /**
  22.          * 超时自动返回的阻塞性的获取锁, 不响应中断
  23.          *
  24.          * @param time
  25.          * @param unit
  26.          * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未���取到锁
  27.      *
  28.          */
  29.         boolean tryLock(long time, TimeUnit unit);
  30.         
  31.         /**
  32.          * 超时自动返回的阻塞性的获取锁, 响应中断
  33.          *
  34.          * @param time
  35.          * @param unit
  36.          * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁
  37.          * @throws InterruptedException 在尝试获取锁的当前线程被中断
  38.          */
  39.         boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;
  40.         
  41.         /**
  42.          * 释放锁
  43.          */
  44.         void unlock;
  45.         
  46. }
复制代码

  看其抽象实现:
  1. package cc.lixiaohui.lock;

  2. import java.util.concurrent.TimeUnit;

  3. /**
  4. * 锁的骨架实现, 真正的获取锁的步骤由子类去实现.
  5. *
  6. * @author lixiaohui
  7. *
  8. */
  9. public abstract class AbstractLock implements Lock {

  10.         /**
  11.          * <pre>
  12.          * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,
  13.          * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性
  14.          * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.
  15.          * </pre>
  16.          */
  17.         protected volatile boolean locked;

  18.         /**
  19.          * 当前jvm内持有该锁的线程(if have one)
  20.          */
  21.         private Thread exclusiveOwnerThread;

  22.         public void lock {
  23.                 try {
  24.                         lock(false, 0, null, false);
  25.                 } catch (InterruptedException e) {
  26.                         // TODO ignore
  27.                 }
  28.         }

  29.         public void lockInterruptibly throws InterruptedException {
  30.                 lock(false, 0, null, true);
  31.         }

  32.         public boolean tryLock(long time, TimeUnit unit) {
  33.                 try {
  34.                         return lock(true, time, unit, false);
  35.                 } catch (InterruptedException e) {
  36.                         // TODO ignore
  37.                 }
  38.                 return false;
  39.         }

  40.         public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
  41.                 return lock(true, time, unit, true);
  42.         }

  43.         public void unlock {
  44.                 // TODO 检查当前线程是否持有锁
  45.                 if (Thread.currentThread != getExclusiveOwnerThread) {
  46.                         throw new IllegalMonitorStateException("current thread does not hold the lock");
  47.                 }
  48.                
  49.                 unlock0;
  50.                 setExclusiveOwnerThread(null);
  51.         }

  52.         protected void setExclusiveOwnerThread(Thread thread) {
  53.                 exclusiveOwnerThread = thread;
  54.         }

  55.         protected final Thread getExclusiveOwnerThread {
  56.                 return exclusiveOwnerThread;
  57.         }

  58.         protected abstract void unlock0;
  59.         
  60.         /**
  61.          * 阻塞式获取锁的实现
  62.          *
  63.          * @param useTimeout
  64.          * @param time
  65.          * @param unit
  66.          * @param interrupt 是否响应中断
  67.          * @return
  68.          * @throws InterruptedException
  69.          */
  70.         protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;

  71. }
复制代码

  基于Redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:
  1. package cc.lixiaohui.lock;

  2. import java.util.concurrent.TimeUnit;

  3. import redis.clients.jedis.Jedis;

  4. /**
  5. * <pre>
  6. * 基于Redis的SETNX操作实现的分布式锁
  7. *
  8. * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞
  9. *
  10. * <a href="http://redis.io/commands/setnx">SETNC操作参考资料</a>
  11. * </pre>
  12. *
  13. * @author lixiaohui
  14. *
  15. */
  16. public class RedisBasedDistributedLock extends AbstractLock {
  17.         
  18.         private Jedis jedis;
  19.         
  20.         // 锁的名字
  21.         protected String lockKey;
  22.         
  23.         // 锁的有效时长(毫秒)
  24.         protected long lockExpires;
  25.         
  26.         public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {
  27.                 this.jedis = jedis;
  28.                 this.lockKey = lockKey;
  29.                 this.lockExpires = lockExpires;
  30.         }

  31.         // 阻塞式获取锁的实现
  32.         protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{
  33.                 if (interrupt) {
  34.                         checkInterruption;
  35.                 }
  36.                
  37.                 long start = System.currentTimeMillis;
  38.                 long timeout = unit.toMillis(time); // if !useTimeout, then it's useless
  39.                
  40.                 while (useTimeout ? isTimeout(start, timeout) : true) {
  41.                         if (interrupt) {
  42.                                 checkInterruption;
  43.                         }
  44.                         
  45.                         long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间
  46.                         String stringOfLockExpireTime = String.valueOf(lockExpireTime);
  47.                         
  48.                         if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
  49.                                 // TODO 成功获取到锁, 设置相关标识
  50.                                 locked = true;
  51.                                 setExclusiveOwnerThread(Thread.currentThread);
  52.                                 return true;
  53.                         }
  54.                         
  55.                         String value = jedis.get(lockKey);
  56.                         if (value != null && isTimeExpired(value)) { // lock is expired
  57.                                 // 假设多个线程(非单jvm)同时走到这里
  58.                                 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
  59.                                 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
  60.                                 // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了
  61.                                 if (oldValue != null && isTimeExpired(oldValue)) {
  62.                                         // TODO 成功获取到锁, 设置相关标识
  63.                                         locked = true;
  64.                                         setExclusiveOwnerThread(Thread.currentThread);
  65.                                         return true;
  66.                                 }
  67.                         } else {
  68.                                 // TODO lock is not expired, enter next loop retrying
  69.                         }
  70.                 }
  71.                 return false;
  72.         }
  73.         
  74.         public boolean tryLock {
  75.                 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间
  76.                 String stringOfLockExpireTime = String.valueOf(lockExpireTime);
  77.                
  78.                 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
  79.                         // TODO 成功获取到锁, 设置相关标识
  80.                         locked = true;
  81.                         setExclusiveOwnerThread(Thread.currentThread);
  82.                         return true;
  83.                 }
  84.                
  85.                 String value = jedis.get(lockKey);
  86.                 if (value != null && isTimeExpired(value)) { // lock is expired
  87.                         // 假设多个线程(非单jvm)同时走到这里
  88.                         String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
  89.                         // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
  90.                         // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了
  91.                         if (oldValue != null && isTimeExpired(oldValue)) {
  92.                                 // TODO 成功获取到锁, 设置相关标识
  93.                                 locked = true;
  94.                                 setExclusiveOwnerThread(Thread.currentThread);
  95.                                 return true;
  96.                         }
  97.                 } else {
  98.                         // TODO lock is not expired, enter next loop retrying
  99.                 }
  100.                
  101.                 return false;
  102.         }
  103.         
  104.         /**
  105.          * Queries if this lock is held by any thread.
  106.          *
  107.          * @return {@code true} if any thread holds this lock and
  108.      *         {@code false} otherwise
  109.          */
  110.         public boolean isLocked {
  111.                 if (locked) {
  112.                         return true;
  113.                 } else {
  114.                         String value = jedis.get(lockKey);
  115.                         // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,
  116.                         // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断
  117.                         // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就
  118.                         // 不是同步控制, 它只是一种锁状态的报告.
  119.                         return !isTimeExpired(value);
  120.                 }
  121.         }

  122.         @Override
  123.         protected void unlock0 {
  124.                 // TODO 判断锁是否过期
  125.                 String value = jedis.get(lockKey);
  126.                 if (!isTimeExpired(value)) {
  127.                         doUnlock;
  128.                 }
  129.         }

  130.         private void checkInterruption throws InterruptedException {
  131.                 if(Thread.currentThread.isInterrupted) {
  132.                         throw new InterruptedException;
  133.                 }
  134.         }
  135.         
  136.         private boolean isTimeExpired(String value) {
  137.                 return Long.parseLong(value) < System.currentTimeMillis;
  138.         }
  139.         
  140.         private boolean isTimeout(long start, long timeout) {
  141.                 return start + timeout > System.currentTimeMillis;
  142.         }
  143.         
  144.         private void doUnlock {
  145.                 jedis.del(lockKey);
  146.         }

  147. }
复制代码

  如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0方法即可(所谓抽象嘛)

  测试

  模拟全局ID增长器,设计一个IDGenerator类,该类负责生成全局递增ID,其代码如下:
  1. package cc.lixiaohui.lock;

  2. import java.math.BigInteger;
  3. import java.util.concurrent.TimeUnit;

  4. /**
  5. * 模拟ID生成
  6. * @author lixiaohui
  7. *
  8. */
  9. public class IDGenerator {

  10.         private static BigInteger id = BigInteger.valueOf(0);

  11.         private final Lock lock;

  12.         private static final BigInteger INCREMENT = BigInteger.valueOf(1);

  13.         public IDGenerator(Lock lock) {
  14.                 this.lock = lock;
  15.         }
  16.         
  17.         public String getAndIncrement {
  18.                 if (lock.tryLock(3, TimeUnit.SECONDS)) {
  19.                         try {
  20.                                 // TODO 这里获取到锁, 访问临界区资源
  21.                                 return getAndIncrement0;
  22.                         } finally {
  23.                                 lock.unlock;
  24.                         }
  25.                 }
  26.                 return null;
  27.                 //return getAndIncrement0;
  28.         }

  29.         private String getAndIncrement0 {
  30.                 String s = id.toString;
  31.                 id = id.add(INCREMENT);
  32.                 return s;
  33.         }
  34. }
复制代码

  测试主逻辑:同一个JVM内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取ID(我这里并不是死循环而是跑20s),获取到ID存到同一个Set里面,在存之前先检查该ID在set中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同JVM(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:
  1. package cc.lixiaohui.DistributedLock.DistributedLock;

  2. import java.util.HashSet;
  3. import java.util.Set;

  4. import org.junit.Test;

  5. import redis.clients.jedis.Jedis;
  6. import cc.lixiaohui.lock.IDGenerator;
  7. import cc.lixiaohui.lock.Lock;
  8. import cc.lixiaohui.lock.RedisBasedDistributedLock;

  9. public class IDGeneratorTest {
  10.         
  11.         private static Set<String> generatedIds = new HashSet<String>;
  12.         
  13.         private static final String LOCK_KEY = "lock.lock";
  14.         private static final long LOCK_EXPIRE = 5 * 1000;
  15.         
  16.         @Test
  17.         public void test throws InterruptedException {
  18.                 Jedis jedis1 = new Jedis("localhost", 6379);
  19.                 Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE);
  20.                 IDGenerator g1 = new IDGenerator(lock1);
  21.                 IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");
  22.                
  23.                 Jedis jedis2 = new Jedis("localhost", 6379);
  24.                 Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE);
  25.                 IDGenerator g2 = new IDGenerator(lock2);
  26.                 IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");
  27.                
  28.                 Thread t1 = new Thread(consume1);
  29.                 Thread t2 = new Thread(consume2);
  30.                 t1.start;
  31.                 t2.start;
  32.                
  33.                 Thread.sleep(20 * 1000); //让两个线程跑20秒
  34.                
  35.                 IDConsumeMission.stop;
  36.                
  37.                 t1.join;
  38.                 t2.join;
  39.         }
  40.         
  41.         static String time {
  42.                 return String.valueOf(System.currentTimeMillis / 1000);
  43.         }
  44.         
  45.         static class IDConsumeMission implements Runnable {

  46.                 private IDGenerator idGenerator;
  47.                
  48.                 private String name;
  49.                
  50.                 private static volatile boolean stop;
  51.                
  52.                 public IDConsumeMission(IDGenerator idGenerator, String name) {
  53.                         this.idGenerator = idGenerator;
  54.                         this.name = name;
  55.                 }
  56.                
  57.                 public static void stop {
  58.                         stop = true;
  59.                 }
  60.                
  61.                 public void run {
  62.                         System.out.println(time + ": consume " + name + " start ");
  63.                         while (!stop) {
  64.                                 String id = idGenerator.getAndIncrement;
  65.                                 if(generatedIds.contains(id)) {
  66.                                         System.out.println(time + ": duplicate id generated, id = " + id);
  67.                                         stop = true;
  68.                                         continue;
  69.                                 }
  70.                                 
  71.                                 generatedIds.add(id);
  72.                                 System.out.println(time + ": consume " + name + " add id = " + id);
  73.                         }
  74.                         System.out.println(time + ": consume " + name + " done ");
  75.                 }
  76.                
  77.         }
  78.         
  79. }
复制代码

  说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。

  测试结果

  跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:
1.jpg

  当IDGererator没有加锁(即IDGererator的getAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

  这个1秒都不到:
1.jpg

  这个也1秒都不到:
1.jpg


原文作者:屌丝程序员的自我对白 来源:http://www.58maisui.com/

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部