概述
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁的实现:
- 基于数据库实现分布式锁
- 基于缓存(Redis等)
- 基于Zookeeper
分布式锁的条件:
- 互斥性,在任意时刻,只有一个客户端能持有锁
- 不会发生死锁,即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁
- 加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了
- 加锁和解锁必须具有原子性
实现原理
使用Redis中set命令的NX参数时,当key不存在时,可正常将value存入Redis中并返回1,而如果key已存在,则无法存入会返回0。那么我们可以将一个key作为标识,如果成功存入,则代表加锁成功,可进行业务操作,并在操作结束后删除这个key来代表解锁。反之,如果未成功存入,则代表获取锁失败,正在有其他线程进行操作,因当进行循环等待
--EX:key的超时秒数
--设置锁的过期时间
--NX:只有在key不存在时设置key的值
--如果key不存在则设置key的值,并返回1,代表获取所成功
--如果key已存在则无法设置key的值,并返回0,代表获取锁失败
set <key> <value> [EX seconds|PX milliseconds|KEEPTTL] [NX|XX]
简单实现
注意事项:
- 需要设置锁的超时时间防止服务阻塞
- 不能使用setnx和expire操作来进行加锁,因为两次操作无法保证原子性
- 应当使用lua脚本执行加锁和解锁的操作来保证原子性
- 应当使用UUID为redis中的value并在解锁时进行判断,防止误删
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String lockName = "lockName";
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, uuid, 3, TimeUnit.SECONDS);
// 如果true,代表成功获取到锁
if (lock) {
// 执行的业务逻辑开始
System.out.println("业务逻辑");
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(lockName), uuid);
} else {
// 获取锁失败,进行等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,重新获取锁
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
基于Redisson实现
在上面代码中,虽然成功设置了分布式锁,但使用起来过于繁琐
Redisson提供了许多关于分布式锁的方法,因此我们可以基于Redisson来实现
-- 使用案例
public void testLockLua() {
//定义一个锁名
String lockName = "lockName";
//获取Redisson提供的锁
RLock lock = redisson.getLock(lockName);
try {
//加锁,并设置过期时间
lock.lock(3,TimeUnit.SECONDS);
// 执行的业务逻辑开始
System.out.println("业务逻辑");
} catch (Exception e) {
e.printStackTrace();
} finally {
//解锁
lock.unlock();
}
}
- Redisson加锁逻辑Lua
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
-- Redisson解锁逻辑Lua
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;