机缘巧合下在面试中被问到库存超卖现象的并发场景解决方案,因主观思想作祟一直觉得既然是锁,对性能肯定有损耗,遂回答的解决点包括延迟双删、队列、令牌桶,都是基于缓存与数据库数据一致性的解决方案或是并发限流的解决方案,而真正能解决并发场景数据一致性问题的关键就是分布式锁。
市面上实现分布式锁的方式,汇总起来大概包括基于数据库实现的锁(基于数据库表、排它锁、共享锁)、基于 redis 实现的锁,及基于 Zookeeper 实现的锁,本篇文章着重针对 redis 实现的分布式锁进行分析处理。
使用 redis 作为分布式锁,一般可以用 redis 的 setnx 、expire 方法来实现,原理是通过 setnx 来获取锁,并通过 expire 来给锁加上过期时间。(redis 2.8 版本前只能通过 setnx 及 expire 来实现,2.8 版本后可直接使用 set 的 nx+ex 选项直接来实现分布式锁,也是laravel内置的解决方案)
- setnx()
setnx 的含义就是 SET if Not Exists,其主要有两个参数 setnx (key, value)。该方法是原子的,如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。
- expire()
expire 设置过期时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能通过 expire () 来对 key 设置。
- 具体步骤
- setnx (lockKey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功
- expire () 命令对 lockKey 设置超时时间,为的是避免死锁问题。
- 执行完业务代码后,可以通过 delete 命令删除 key。
这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步 setnx 执行成功后,在 expire () 命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用 redis 的 setnx 、get 和 getset 方法来实现分布式锁。
- getset()
这个命令主要有两个参数 getset (key,newValue)。该方法是原子的,对 key 设置 newValue 这个值,并且返回 key 原来的旧值。假设 key 原来是不存在的,那么多次执行这个命令,会出现下边的效果:
- getset (key, "value1") 返回 null 此时 key 的值会被设置为 value1
- getset (key, "value2") 返回 value1 此时 key 的值会被设置为 value2
- 依次类推!
- 使用步骤
- setnx (lockKey, 当前时间 + 过期超时时间),如果返回 1,则获取锁成功;如果返回 0 则没有获取到锁,转到步骤 2。
- get (lockKey) 获取值,值是当前 lockKey 的过期时间用 oldExpireTime 代表 ,并将这个 oldExpireTime 与当前的系统时间进行比较,如果早于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向 步骤 3,否则等待指定时间后返回步骤 2 重新开始判定。
- 计算 newExpireTime = 当前时间 + 过期超时时间,然后 getset (lockKey, newExpireTime) 会返回当前 lockKey 之前设置的旧值 currentExpireTime。
- 判断 currentExpireTime 与 oldExpireTime 是否相等,如果相等,说明当前进程 getset 设置锁成功,获取到了锁。如果不相等,说明这个锁已经被别的进程获取走了,那么当前请求可以根据具体需求逻辑直接返回失败,或者返回步骤 2 继续重试。
- 在获取到锁之后,当前进程可以开始自己的业务处理,当处理完毕后,比较当前理时间和对锁设置的超时时间,如果小于锁设置的超时时间,则直接执行 delete 释放锁;如果大于锁设置的超时时间,锁可能已由其他进程获得,这时执行 delete 释放锁的操作会导致把其他进程已获得的锁释放掉。
下面是用 PHP 代码实现的 Redis 分布式锁,关于 Redis 部分使用的是伪代码,请根据自己的情况用 Redis 连接对象替代其中的伪代码。
/**
* 获取Redis分布式锁
*
* @param $lockKey
* @return bool
*/
function getRedisDistributedLock(string $lockKey) : bool
{
$lockTimeout = 2000;// 锁的超时时间2000毫秒
$now = intval(microtime(true) * 1000);
$lockExpireTime = $now + $lockTimeout;
$lockResult = Redis::setnx($lockKey, $lockExpireTime);
if ($lockResult) {
// 当前进程设置锁成功
return true;
} else {
$oldLockExpireTime = Redis::get($lockKey);
if ($now > $oldLockExpireTime && $oldLockExpireTime == Redis::getset($lockKey, $lockExpireTime)) {
return true;
}
}
return false;
}
/**
* 串行执行程序
*
* @param string $lockKey Key for lock
* @param Closure $closure 获得锁后进程要执行的闭包
* @return mixed
*/
function serialProcessing(string $lockKey, Closure $closure)
{
if (getRedisDistributedLock($lockKey)) {
$result = $closure();
$now = intval(microtime(true) * 1000);
if ($now < Redis::get($lockKey)) {
Redis::del($lockKey);
}
} else {
// 延迟200毫秒再执行
usleep(200 * 1000);
return serialProcessing($lockKey, $closure);
}
return $result;
}
上面 serialProcessing 方法里当前进程设置锁成功,获取了代码块的执行权后就会执行闭包参数 $closure 里的代码块,通过传递闭包给方法,让我们可以在项目任何需要确保程序串行执行的地方使用 serialProcessing 方法给程序加分布式锁解决并发请求的问题。
上面代码实现用面向过程的方式是为了能简单明了的描述怎么设置分布式锁,读者可以针对自己的情况执行设计实现代码。针对于大型系统使用集群 Redis 的情况,设置分布式锁的步骤更复杂,有兴趣的可以查看 Redlock 算法和 redisson 分布式锁组件。