概述

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁的实现:

  • 基于数据库实现分布式锁
  • 基于缓存(Redis等)
  • 基于Zookeeper

分布式锁的条件:

  • 互斥性,在任意时刻,只有一个客户端能持有锁
  • 不会发生死锁,即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁
  • 加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了
  • 加锁和解锁必须具有原子性

实现原理

使用Redis中set命令的NX参数时,当key不存在时,可正常将value存入Redis中并返回1,而如果key已存在,则无法存入会返回0。那么我们可以将一个key作为标识,如果成功存入,则代表加锁成功,可进行业务操作,并在操作结束后删除这个key来代表解锁。反之,如果未成功存入,则代表获取锁失败,正在有其他线程进行操作,因当进行循环等待

--EX:key的超时秒数
--设置锁的过期时间

--NX:只有在key不存在时设置key的值
--如果key不存在则设置key的值,并返回1,代表获取所成功
--如果key已存在则无法设置key的值,并返回0,代表获取锁失败

set <key> <value> [EX seconds|PX milliseconds|KEEPTTL] [NX|XX]

简单实现

image.png

注意事项:

  • 需要设置锁的超时时间防止服务阻塞
  • 不能使用setnx和expire操作来进行加锁,因为两次操作无法保证原子性
  • 应当使用lua脚本执行加锁和解锁的操作来保证原子性
  • 应当使用UUID为redis中的value并在解锁时进行判断,防止误删
public void testLockLua() {
    //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
    String uuid = UUID.randomUUID().toString();
    //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
    String lockName = "lockName";

    // 3 获取锁
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockName, uuid, 3, TimeUnit.SECONDS);

    // 如果true,代表成功获取到锁
    if (lock) {
        // 执行的业务逻辑开始
        System.out.println("业务逻辑");
        // 定义lua 脚本
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis执行lua执行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为Long
        // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
        // 那么返回字符串与0 会有发生错误。
        redisScript.setResultType(Long.class);
        // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList(lockName), uuid);
    } else {
        // 获取锁失败,进行等待
        try {
            // 睡眠
            Thread.sleep(1000);
            // 睡醒了之后,重新获取锁
            testLockLua();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

基于Redisson实现

在上面代码中,虽然成功设置了分布式锁,但使用起来过于繁琐

Redisson提供了许多关于分布式锁的方法,因此我们可以基于Redisson来实现

-- 使用案例

public void testLockLua() {
    //定义一个锁名
    String lockName = "lockName";
    //获取Redisson提供的锁
    RLock lock = redisson.getLock(lockName);
    try {
        //加锁,并设置过期时间
        lock.lock(3,TimeUnit.SECONDS);
        // 执行的业务逻辑开始
        System.out.println("业务逻辑");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //解锁
        lock.unlock();
    }
}
  • Redisson加锁逻辑Lua
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);

-- Redisson解锁逻辑Lua

-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1; 
end;
 
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end; 
 
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1;
end;
 
return nil;

参考文档