1、锁的分析互斥性
也就是说,在任意时刻,实n锁只能有一个客户端能获取到锁,机制不能同时有两个或多个客户端获取到锁。源码
简单来说,分析就比如上厕所,实n锁一个厕所只有一个坑位,机制只能一个人上,源码不能同时两个人或多个人上。
2、锁的同一性
也就是说,锁只能被持有该锁的客户端进行删除(释放锁),不能由其他客户端删除。
简单俩说,就是谁加的锁,就只能谁来解锁。也就是解铃还须系铃人。
3、锁的可重入性
也就是说,持有某个锁的客户端,可以继续对该锁进行加锁,实现锁的续租。
简单来说,就是你上厕所的按时间收费的,时间快到了会按照时间给你续租,而会给你价钱。
而Redisson则会增大的你的续租次数,也就是可重入次数。但绝不收费,因为Redis是开源的嘛。
4、锁的容错性
锁超过了最大续租时间后,会自动释放锁,其他客户端会继续获得该锁,从而防止死锁的发生。
简单来说,比如你上个厕所上了五小时,厕管员觉得不对劲,就来测试,发现你悄悄逃票了,此时测试会自动变成解锁状态,其他人就可以去上了,只是厕管员血亏5块大洋。
先回顾一下Redisson加解锁代码如何写的
public TestEntity getById2(Long id){ RLock lock = redissonClient.getLock("demo2_lock"); lock.lock(20, TimeUnit.SECONDS); index++; log.info("current index is : { }", index); TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10); log.info("模拟查询数据库:{ }", testEntity); lock.unlock(); return testEntity;}
@Overridepublic RLock getLock(String name) { return new RedissonLock(commandExecutor, name);}
其实就是创建一个RedissonLock对象, 所以加锁的逻辑就在RedissonLock.lock()中,解锁的逻辑就在RedissonLock.unlock()。
// RedissonLock.lock()的方法体public void lock(long leaseTime, TimeUnit unit) { try { // 调用了lock的重载方法 lock(leaseTime, unit, false); } catch (InterruptedException e) { throw new IllegalStateException(); }}
关注lock的重载方法
// leaseTime表示最大续时间,unit表示续约时间单位,interruptibly表示是否可以中断private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // 获取当前线程的线程ID long threadId = Thread.currentThread().getId(); // 尝试获取锁,结果为null表示此时没有客户端占用锁,绝不矫情,直接拿到锁就返回。 // 结果ttl>0的话,表示此时已经有了其他不识好歹的客户端暂用了锁,那么就只能绝望的等待了 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 等待时订阅一个渠道,如果锁被其他客户端释放了,会通过发布订阅模式在publish上发一个消息,表示锁已经释放了 CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { // 我干等这不是办法,我还是要不断去尝试看能不能获取锁 while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // 如果TTL为空了,表示获取到了锁,那还等什么,长驱直入就是。 if (ttl == null) { // 结束循环等待 break; } // 如果ttl还是大于0的,表示其他客户端真的是过于不识好歹,还不肯释放锁。但好歹还是说了它还要持有错多久。 if (ttl >= 0) { try { // 既然如此,那么我就等待你的时间到达吧,除非我突然有啥事被中断了,否则我就等到你过期 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // 如果传入了中断标识,直接抛出异常,中断了,干别的事情去 if (interruptibly) { throw e; } // 否则还是老老实实的继续等待时间到来 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } // 锁过期时间小于0, 表示那个杀千刀的客户端居然没有设置超时时间,它包场了,这可咋整。 else { // 如果不被中断,那么我也只有无期限的等待下去了,我不希望这个期限是一万年 if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { // 最后,不管如何,我无论如何都要去取消订阅这个publish的消息,因为这会浪费我的精力,这已经是我最后的坚持了。 // 其实是释放资源 unsubscribe(entry, threadId); }// get(lockAsync(leaseTime, unit));}
关注tryAcquire加锁方法
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}
该方法调用了tryAcquireAsync来实现的,所以我们关注tryAcquireAsync方法,继续跟进。
关注tryAcquireAsync加锁方法
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // 首先判断租约时间是否大于0 if (leaseTime > 0) { // 大于零,调用tryLockInnerAsync获取锁 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 否则,使用默认的租约时间 追溯下去发现private long lockWatchdogTimeout = 30 * 1000; 也就是30s的租约时间 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } // CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired // 结果为空,如果leaseTime大于哦,更新internalLockLeaseTime为指定的超时时间,并且不会启动看门狗(watch dog) if (ttlRemaining == null) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 使用定时任务,自动续约(使用看门狗(watch dog)) scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f);}
可以看到,加锁最终会调用tryLockInnerAsync进行加锁,而续约会使用scheduleExpirationRenewal进行续约。
关注tryLockInnerAsync实现真正的加锁逻辑
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
这里执行了一段lua脚本(整个lua脚本保障原子性),我们将脚本内容复制出来,详细解释一下。
-- KEYS[1] 加锁的对象(也就是我们传入的的锁名称)-- ARGV[1] 表示锁的过期时间-- ARGV[2]:UUID+当前线程id-- 如果锁不存在。 == 0表示不存在 == 1表示存在if (redis.call('exists', KEYS[1]) == 0) then -- 对我自己的锁执行一个incrby(自增,表示锁的可重入次数)操作 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 对key设置一个过期时间(过期时间就是保证锁的容错性) redis.call('pexpire', KEYS[1], ARGV[1]); -- 返回nil, 相当于null, 表示获取锁成功 return nil;end ;-- 继续判断锁名成+UUID+当前线程id是否存在,其实就是判断我自己有没有已经拿到锁(保证锁的可重入性)if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 自己已经持有锁,执行一个incrby(自增,表示锁的可重入次数)操作 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 重新设置过期时间 redis.call('pexpire', KEYS[1], ARGV[1]); -- 返回nil, 相当于null, 表示获取锁成功 return nil;end ;-- 都不是,表示已经有其他客户端获取到了锁,此时返回key的过期时间,也就是别人释放锁的时间(但其他客户端可能出现续约,存在会等待更久的可能)return redis.call('pttl', KEYS[1]);
整个lua脚本保障原子性,从而只会有一个客户端能够获取到锁,这样就保证了锁的互斥性。
打一个断点看获取到的锁信息
hash表中的第一个值表示UUID+线程ID,这二个值表示锁的重入次数,如果锁被多次获取,那么这个值就是大于1。
关注scheduleExpirationRenewal实现自动续约
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); // 不为空表示已经开启了续约操作 if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { // 如果没有开启续约操作 entry.addThreadId(threadId); try { // 自动续约 renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { cancelExpirationRenewal(threadId); } } }}
关注renewExpiration()方法
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 创建一个定时任务去实现自动续约 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { // 获取当前锁的ExpirationEntry 对象。 ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } // 获取第一个线程ID Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 锁续期 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 续约成功,递归自己无限续约下去 if (res) { // reschedule itself renewExpiration(); } else { // 续约失败,表示锁已释放,取消续约任务 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // internalLockLeaseTime / 3表示每隔锁时间的三分之一,去续约一次 ee.setTimeout(task);}
关注renewExpirationAsync方法
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));}
我们发现,又是一段lua脚本,还是复制出来,格式化后详细解释下代码。
-- KEYS[1] 加锁的对象(也就是我们传入的的锁名称)-- ARGV[1] 表示锁的过期时间-- ARGV[2]:UUID+当前线程id-- 使用hexists判断锁是不是自己持有的, == 1表示是自己持有,== 0 表示被其他客户端持有if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 重新设置过期时间 redis.call('pexpire', KEYS[1], ARGV[1]); -- 返回1 表示续约成功 return 1;end ;-- 返回0 表示续约失败,也意味着锁已经被释放或者被其他客户端获取了return 0;
所以续约的逻辑就是,启动一个定时任务,每隔续约时间的三分之一次就执行一次。尝试去续约,续约成功则会一直递归续约下去。续约失败表示锁已被释放,则停止续约任务。
而续约的操作就是,判断是否是自己持有锁,是的话就重新设置过期时间,并且返回1表示续约成功,否则返回0表示续约失败。
@Overridepublic void unlock() { try { // 其实就是调用了unlockAsync进行解锁 get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } // Future<Void> future = unlockAsync();// future.awaitUninterruptibly();// if (future.isSuccess()) { // return;// }// if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause();// }// throw commandExecutor.convertException(future);}
我们可以看到,会使用unlockAsync方法进行解锁,并且在这里传入了当前的线程ID。
关注unlockAsync方法
@Overridepublic RFuture<Void> unlockAsync(long threadId) { // 调用unlockInnerAsync实现异步解锁 RFuture<Boolean> future = unlockInnerAsync(threadId); // 释放之后再处理一些事情 CompletionStage<Void> f = future.handle((opStatus, e) -> { // 取消(停止)续约任务,这里也会停止watch dog cancelExpirationRenewal(threadId); if (e != null) { throw new CompletionException(e); } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException(cause); } return null; }); return new CompletableFutureWrapper<>(f);}
关注解锁的核心逻辑unlockInnerAsync方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
可以看到,其实又是一段lua脚本,继续复制出来分析一下。
-- KEYS[1] 加锁的对象(也就是我们传入的的锁名称)-- KEYS[2] 监听该锁的频道 也就是publish要发送锁被释放的频道,用于在锁释放时通知其他客户端可以重新获取锁了-- ARGV[1]:解锁消息-- ARGV[2] 表示锁的过期时间-- ARGV[3]:UUID+当前线程id-- 先判断自己的锁是不是已经释放了 ==0 表示key不存在了,也就是锁被释放了if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then -- 返回nil,也就是null, 表示释放锁成功 return nil;end ;-- 对锁的重入次数减一 因为重入一次counter会+1,所以释放时每次也只能-1,跟重入次数匹配local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);-- 如果重入次数仍然大于0,续约过期时间if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); -- 返回解说失败 return 0;else -- 表示重入次数已经为0了,删除锁的key redis.call('del', KEYS[1]); -- 使用publish发布一个消息,其他订阅了的客户端收到消息,就说明解锁成功了饿、然后可以重新获取锁了 redis.call('publish', KEYS[2], ARGV[1]); -- 返回1 表示解锁成功 return 1;end ;return nil;
其实就是在解锁的时候,已经解锁了直接返回成功,可重入次数没有到0,将会解锁失败,直到可重入次数重新减到0后,开始删除锁的key.
并且此时会使用publish发送一个消息在渠道上,订阅者们订阅到了,就说明锁已经被释放了,然后可以从重新获取锁了。
Redisson实现分布式锁,就是使用lua脚本保证原子性和互斥性的。每次都判断是不是自己持有锁,才进行操作,这就保证了同一性。
在加锁时使用incrby对key对应的value值进行自增,减锁时自减实现锁的可重入性。
使用redis的超时自动过期来保证锁的容错性,不会一直锁死下去。所以锁的最大续约时间是防止思索的一个有效的方法。
责任编辑:武晓燕 来源: 今日头条 Redisson锁机制源码(责任编辑:时尚)
外国人在华生活服务平台NIHAO(你好)获600万元天使轮融资
*ST康得(002450.SZ)2020年度实现归母净亏损32.05亿元 公司总资产81.01亿元
《Redemption Reapers》Steam页面上线 明年2月发售