当前位置:首页 >探索 >Redis分布式锁抽丝剥茧 例如库存只剩1个商品

Redis分布式锁抽丝剥茧 例如库存只剩1个商品

2024-06-30 20:14:48 [百科] 来源:避面尹邢网

Redis分布式锁抽丝剥茧

作者: 小码甲 存储 存储软件 分布式 Redis 布式最近首度应用"分布式锁",现在想想,锁抽丝剥分布式锁不是布式孤立的技能点,这其实就是锁抽丝剥跨主机的线程同步。

[[405785]]

分布式锁是布式"线程同步"的延续

最近首度应用"分布式锁",现在想想,锁抽丝剥分布式锁不是布式孤立的技能点,这其实就是锁抽丝剥跨主机的线程同步。

进程内跨进程跨主机
Lock/Monitor、SemaphoreSlimMetux、Semaphore分布式锁
用户态线程安全内核态线程安全 

单机服务器可以通过共享某堆内存来标记上锁/解锁,线程同步说到底是锁抽丝剥建立在单机操作系统的用户态/内核态对共享内存的访问控制。

Redis分布式锁抽丝剥茧 例如库存只剩1个商品

而分布式服务器不是布式在同一台机器上:跨主机,因此需要将锁标记存储在所有机器进程都能看到的锁抽丝剥地方。

Redis分布式锁抽丝剥茧 例如库存只剩1个商品

在开发很多业务场景会使用到锁,布式例如库存控制,抽奖等。

Redis分布式锁抽丝剥茧 例如库存只剩1个商品

例如库存只剩1个商品,有三个用户同时打算购买,谁先购买库存立即清零,不能让其他二人也购买成功。

解读分布式锁

我们常说的线程安全、线程同步方案,包括此次的分布式锁都是基于

“多线程/多进程对特定资源同时有更新操作”。

基本考量

1.分布式系统,一个锁在同一时间只能被一个服务器获取 (这是分布式锁的基础)

2.具备锁失效机制,防止死锁 (防止某些意外,锁没有得到释放,别人也无法得到锁)

Redis SET resource-name anystring NX EX max-lock-time

是一种最简单的分布式锁实现方案。

SET 命令支持多个参数:

  • EX seconds-- 设置过期时间(s)
  • NX -- 如果key不存在,则设置 ......

因为SET命令参数可以替代SETNX,SETEX,GETSET,这些命令在未来可能被废弃。

上面的命令返回OK(或经过重试),客户端就获取到这个锁;

使用DEL命令解锁;到达超时时间会自动释放锁。

在解锁时,增加一些设计,让系统更加健壮:

3.不要使用固定的String值作为锁标记值,而是使用一个不易被猜中的随机值, 业内称为token

4.不使用DEL命令释放锁,而是发送script去移除key

第3、4点是为了解决 :“锁提前过期,客户端A还没有执行完,然后客户端B获取了锁,这时客户端A执行完了,会不会在删锁的时候把B的锁给删掉” -- 4是3技术上的推荐实现。

脚本如下:

  1. if redis.call("get",KEYS1] ==ARGV[1]) 
  2. then 
  3.    return  redis.call("DEL",KEYS[1]) 
  4. else 
  5.   return 0 
  6. end 

下面使用StackExchange.Redis 写了基于以上考量的代码示例:

  1. /// <summary> 
  2. /// Acquires the lock. 
  3. /// </summary> 
  4. /// <param name="key"></param> 
  5. /// <param name="token">随机值</param> 
  6. /// <param name="expireSecond"></param> 
  7.  /// <param name="waitLockSeconds">非阻塞锁</param> 
  8. static bool Lock(string key, string token,int expireSecond=10, double waitLockSeconds = 0) 
  9. {  
  10.     var waitIntervalMs = 50; 
  11.     bool isLock; 
  12.              
  13.     DateTime begin = DateTime.Now; 
  14.     do 
  15.     {  
  16.          isLock = Connection.GetDatabase().StringSet(key, token, TimeSpan.FromSeconds(expireSecond), When.NotExists); 
  17.          if (isLock) 
  18.              return true; 
  19.              //不等待锁则返回 
  20.              if (waitLockSeconds == 0) break; 
  21.              //超过等待时间,则不再等待 
  22.              if ((DateTime.Now - begin).TotalSeconds >= waitLockSeconds) break; 
  23.              Thread.Sleep(waitIntervalMs); 
  24.      } while (!isLock); 
  25.      return false; 
  26.  } 
  27.         
  28. /// <summary>   
  29. /// Releases the lock.   
  30. /// </summary>   
  31. /// <returns><c>true</c>, if lock was released, <c>false</c> otherwise.</returns>   
  32. /// <param name="key">Key.</param>   
  33. /// <param name="value">value</param>   
  34. static bool UnLock(string key, string value) 
  35. {  
  36.     string lua_script = @"   
  37.     if (redis.call('GET', KEYS[1]) == ARGV[1]) then   
  38.          redis.call('DEL', KEYS[1])   
  39.           return true   
  40.           else   
  41.           return false   
  42.         end   
  43.       "; 
  44.      try 
  45.      {  
  46.           var res = Connection.GetDatabase().ScriptEvaluate(lua_script, 
  47.                                                            new RedisKey[] {  key }, 
  48.                                                            new RedisValue[] {  value }); 
  49.             return (bool)res; 
  50.       } 
  51.      catch (Exception ex) 
  52.      {  
  53.           Console.WriteLine($"ReleaseLock lock fail...{ ex.Message}"); 
  54.           return false; 
  55.      } 
  56.          
  57.         private static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() => 
  58.         {  
  59.             ConfigurationOptions configuration = new ConfigurationOptions 
  60.             {  
  61.                 AbortOnConnectFail = false, 
  62.                 ConnectTimeout = 5000, 
  63.             }; 
  64.             configuration.EndPoints.Add("10.100.219.9", 6379); 
  65.             return ConnectionMultiplexer.Connect(configuration.ToString()); 
  66.         });  
  67.         public static ConnectionMultiplexer Connection => lazyConnection.Value; 

以上代码新增了第五点考量:

5. 为避免无限制抢锁,增加了非阻塞锁:轮询_s等待锁,未等到则不再抢锁

使用方式:

下面并行开启三个任务,同时减少库存:

  1. static void Main(string[] args) 
  2. {  
  3.      // 尝试并行执行3个任务 
  4.      Parallel.For(0, 3, x => 
  5.      {  
  6.            string token = $"loki:{ x}"; 
  7.            bool isLocked = Lock("loki", token, 5, 10); 
  8.              
  9.            if (isLocked) 
  10.            {  
  11.                Console.WriteLine($"{ token} begin reduce stocks (with lock) at { DateTime.Now}."); 
  12.                Thread.Sleep(1000); 
  13.                Console.WriteLine($"{ token} release lock { UnLock("loki", token)} at { DateTime.Now}. "); 
  14.            } 
  15.            else 
  16.            {  
  17.              Console.WriteLine($"{ token} begin reduce stocks at { DateTime.Now}."); 
  18.            } 
  19.        }); 

可以看到三个并行任务依次获取/释放锁

输出总结

 

本文从基础的线程安全、线程同步,认识到分布式锁是跨主机的资源线程/进程同步方案, 以步步为营的风格 演示了RedisSET命令做分布式锁的设计考量,好记性不如烂笔头。

 

责任编辑:武晓燕 来源: 精益码农 Redis分布式锁

(责任编辑:焦点)

    推荐文章
    热点阅读