当前位置:首页 >探索 >Redisson 高性能 Redis 分布式锁源码分析 续期需要注意的分布分析是

Redisson 高性能 Redis 分布式锁源码分析 续期需要注意的分布分析是

2024-06-30 23:43:23 [百科] 来源:避面尹邢网

Redisson 高性能 Redis 高性分布式锁源码分析

作者:老郑 数据库 Redis 锁的续期是在 org.redisson.RedissonLock#tryAcquireAsync方法中调用 scheduleExpirationRenewal实现的。续期需要注意的分布分析是,看门狗是式锁设置在主线程的延迟队列的线程中。

Redisson 实现分布式锁的源码机制如下:

图片

Redisson 高性能 Redis 分布式锁源码分析 续期需要注意的分布分析是

原理描述

  • 先线程 1 获取锁,如果获取锁成功,高性那么会开启一个后台线程,分布分析每次间隔 10 秒进行续期。式锁
  • 并发情况,源码线程 2 会进行加锁,高性如果无法获取锁,分布分析那么就会进行自旋等待,式锁等待到达一定次数过后,源码就会进行线程阻塞,高性并且订阅解锁消息。分布分析
  • 当线程 1 释放锁之后,式锁会触发 redis 的解锁消息,消息的观察者会观察到然后去唤醒解锁的逻辑,线程 2 继续竞争锁。
  • 对于锁的重入,Redisson 是通过 hash 为数据类型的,会存储当前线程的 tid (本质是生成的 uuid 唯一id)。

测试代码

下面我们将以一个秒杀的例子来说明:

Redisson 高性能 Redis 分布式锁源码分析 续期需要注意的分布分析是

依赖版本

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

测试代码

下面是模拟一个商品秒杀的场景,示例代码如下:

Redisson 高性能 Redis 分布式锁源码分析 续期需要注意的分布分析是

public class RedissonTest { 

public static void main(String[] args) {
//1. 配置部分
Config config = new Config();
String address = "redis://127.0.0.1:6379";
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress(address);
serverConfig.setDatabase(0);
config.setLockWatchdogTimeout(5000);
Redisson redisson = (Redisson) Redisson.create(config);


RLock rLock = redisson.getLock("goods:1000:1");
//2. 加锁
rLock.lock();
try {
System.out.println("todo 逻辑处理 1000000.");
} finally {
if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
//3. 解锁
rLock.unlock();
}
}

}
}

加锁设计

rLock.lock();​是加锁的核心代码,我们一起来看看调用栈

图片

加锁的核心方法是:​org.redisson.RedissonLock#tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { 
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

其实它的本质是调用一段 LUA 脚本进行加锁, 需要注意的是这个地方使用的数据类型是 hash。这里是用 hash 的好处就是可以通过同一个 key 来存储重入的 tid

锁续期设计

锁的续期是在 org.redisson.RedissonLock#tryAcquireAsync​方法中调用 scheduleExpirationRenewal实现的。

续期需要注意的是,看门狗是设置在主线程的延迟队列的线程中。

这里的好处就是如果我在一个进程中,同时加了 1000 把锁,我们不需要启动 1000 个子线程去续期,只需要创建 1000 个续期任务对象即可,在到达续期时间才会唤醒续期线程。

tryAcquireAsync 代码如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { 
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}

CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 锁过期时间续期
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}

锁续期 scheduleExpirationRenewal代码如下:

protected void scheduleExpirationRenewal(long threadId) { 
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}

然后在调用 renewExpiration(); 执行续期逻辑, 其实这里是一个定时任务 + 递归的方式实现续期的,用定时任务的好处就是不用去开 N 个字线程,只需要创建对应的任务对象即可。

备注:如果超级极端的情况下 N 把锁,同时加锁,同时需求。我们可以考虑在锁的有效期上,给它加一个浮动时间比如 100 - 500ms. 这样就能一定程度上避免 (参考的是缓存失效/击穿的解决方案)

private void renewExpiration() { 
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

// 创建延迟任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}

// 真正的续期,调用 LUA 脚本续期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}

// 如果续期成功
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

这里还有一个小的点,就是续期的时间是 1/3 为什么呢?保证在下次续期的时候锁不过期,如果是 1/2 可能在下次定时任务执行的时候 key 已经过期,如果小于 1/3 会导致频繁续期,任务代价/收益比不高。

renewExpirationAsync方法, 里面还是一段 LUA 脚本,进行重新设置锁的过期时间。

protected RFuture<Boolean> renewExpirationAsync(long threadId) { 
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}

锁的自旋重试

org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)​在执行获取锁失败的时候,会进入重试。其实这里就会执行 18 行以后的 while (true)逻辑

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { 
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}

// 订阅锁过期的消息
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}

try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}

// waiting for message
if (ttl >= 0) {
try {
// 阻塞锁的超时时间,等锁过期后再尝试加锁
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);​其实这里就是一个间歇性自旋。等到上次锁过期的时间,在唤醒进行抢锁 entry.getLatch().acquire();

订阅锁失效

还有一个逻辑就是

CompletableFuture future = subscribe(threadId);

这里其实是会订阅一个消息,如果解锁过后,会发布解锁的消息。然后再唤醒当前多次竞争锁进入休眠的线程。

解锁设计

rLock.unlock(); 的核心就是释放锁,撤销续期和唤醒在等待加锁的线程(发布解锁成功消息)。

核心方法(解锁): org.redisson.RedissonLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) { 
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
// 发布解锁成功消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

还是 LUA 的执行方式。

撤销锁续期

核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)

@Override
public RFuture<Void> unlockAsync(long threadId) {
// 解锁
RFuture<Boolean> future = unlockInnerAsync(threadId);

// 撤销续期
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);

if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}

return null;
});

return new CompletableFutureWrapper<>(f);
}

解锁成功唤排队线程

在 org.redisson.pubsub.LockPubSub#onMessage中回去唤醒阻塞的线程,让执行前面的锁自旋逻辑,具体代码如下:

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}

value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}

value.getLatch().release(value.getLatch().getQueueLength());
}
}
责任编辑:武晓燕 来源: 运维开发故事 Redis分布式锁Redisson

(责任编辑:百科)

    推荐文章
    • 云南省国资委印发《关于加快推进绿色国资建设的实施意见》 推进示范引领

      云南省国资委印发《关于加快推进绿色国资建设的实施意见》 推进示范引领为推动国资国企坚定不移走生态优先、绿色发展之路,以绿色国资建设助力国资国企高质量发展,云南省国资委印发《关于加快推进绿色国资建设的实施意见》(以下简称《意见》)。《意见》强调,全云南国资国企要全面践行 ...[详细]
    • 搜狗输入法怎么开启双拼

      搜狗输入法怎么开启双拼搜狗输入法是一款很好用的打字键盘软件,有很多的小伙伴在使用的时候比较好奇搜狗输入法怎么开启双拼呢?现在就来看一下小编带来的搜狗输入法开启双拼的方法吧。1、首先打开搜狗输入法进入到首页之后点击右下角的【 ...[详细]
    • 一加7T Pro真旗舰 你值得考虑的高颜值高性能手机 -

      一加7T Pro真旗舰 你值得考虑的高颜值高性能手机 -【手机中国行情】一年一度的双12已经拉开序幕,众多的手机厂商纷纷推出各种优惠活动。但是,手机的型号太多款式太多,你可能会挑花了眼。不用担心,我给你的建议是买手机一定要买性能强劲的手机,运行稳定速度又快 ...[详细]
    • 丰田新款卡罗拉锐放上市 最大功率为126千瓦 12.98万起 -

      丰田新款卡罗拉锐放上市 最大功率为126千瓦 12.98万起 -【智车派新闻】近日,智车派从一汽丰田官方了解到,2023款卡罗拉锐放正式上市。新车共推出7款车型,售价区间为12.98万-16.98万元。新款丰田卡罗拉锐放外观设计方面,新款卡罗拉锐放基本延续了202 ...[详细]
    • 微粒贷分两次借款怎么还 具体情况是怎样的?

      微粒贷分两次借款怎么还 具体情况是怎样的?微粒贷是微众银行推出的信用贷款,借款人可以直接在微信钱包上申请。有不少人在微粒贷不止借了一次,不知道是分开还款还是要在同一天内还款。那么,微粒贷分两次借款怎么还?这里就来介绍下微粒贷还款相关内容。微粒 ...[详细]
    • 剪映怎么设置视频封面

      剪映怎么设置视频封面我们在使用剪映软件编辑视频的时候,想要设置一下视频的封面,但是不知道要怎么样来操作才可以,那么现在就来看一下小编给大家带来的设置视频封面的方法吧。1.首先打开剪映软件进入到首页之后点击【开始创作】;2 ...[详细]
    • 京东怎么删除自己评价

      京东怎么删除自己评价我们在使用京东软件的时候,购买过的商品已经收货之后可以对商品进行评价,那么想要删除自己评价的记录要怎么操作呢?现在就来看一下删除评价的教程吧。1.首先打开京东软件进入到首页之后点击右下角的【我的】;2 ...[详细]
    • 新款宝马X6正式上市!百公里加速5.4秒 79.99万元起 -

      新款宝马X6正式上市!百公里加速5.4秒 79.99万元起 -【智车派新闻】日前,智车派从宝马官方了解到,新款宝马X6正式上市,共推出2款车型,分别为xDrive30i M运动套装,售价79.99万元,xDrive40i尊享型M运动套装,售价97.59万元。新款 ...[详细]
    • 受跟踪指数下调影响 “10月最惨基金”一月跌超30%

      受跟踪指数下调影响 “10月最惨基金”一月跌超30%受跟踪指数下调影响,近日一只基金净值接连“受挫”。据Wind最新数据显示,11月2日当天,建信易盛郑商所能源化工期货ETF联接A/C份额的单日净值跌幅达3.66%、3.67%, ...[详细]
    • 为什么有时候马赛克会打在人的眼睛上?蚂蚁庄园8.4日答案

      为什么有时候马赛克会打在人的眼睛上?蚂蚁庄园8.4日答案在蚂蚁庄园小课堂中有关于8月4日问题的答案已经更新了,下面就和小编一起来看一下“为什么有时候马赛克会打在人的眼睛上?这个问题的正确答案是什么吧。问题:为什么有时候马赛克会打在人的眼睛上?选项:A、眼睛 ...[详细]
    热点阅读