Appearance
一、redis开源客户端框架redisson
基本介绍
Redisson是在redis基础上实现的一套开源解决方案,不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,宗旨是促进使用者对redis的关注分离,更多的关注业务逻辑的处理上
Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能
还在此基础上融入了更高级的应用方案,不但将原生的RedisHash,List,Set,String,Geo,HyperLogLog等数据结构封装为Java里大家最熟悉的映射(Map),列表(List),集(Set),通用对象桶(Object Bucket),地理空间对象桶(Geospatial Bucket),基数估计算法(HyperLogLog)等结构
在这基础上还提供了分布式的多值映射(Multimap),本地缓存映射(LocalCachedMap),有序集(SortedSet),计分排序集(ScoredSortedSet),字典排序集(LexSortedSet),列队(Queue),阻塞队列(Blocking Queue),有界阻塞列队(Bounded Blocking Queue),双端队列(Deque),阻塞双端列队(Blocking Deque),阻塞公平列队(Blocking Fair Queue),延迟列队(Delayed Queue),布隆过滤器(Bloom Filter),原子整长形(AtomicLong),原子双精度浮点数(AtomicDouble),BitSet等Redis原本没有的分布式数据结构
不仅如此,Redisson还实现了Redis文档中提到像分布式锁Lock这样的更高阶应用场景。事实上Redisson并没有不止步于此,在分布式锁的基础上还提供了联锁(MultiLock),读写锁(ReadWriteLock),公平锁(Fair Lock),红锁(RedLock),信号量(Semaphore),可过期性信号量(PermitExpirableSemaphore)和闭锁(CountDownLatch)这些实际当中对多线程高并发应用至关重要的基本部件
看门狗介绍
分布式系统里面,如果多个机器上的服务要同时对一个共享资源(比如说修改数据库里的一份数据),此时的话,某台机器就需要先获取一个针对那个资源(数据库里的某一行数据)的分布式锁
获取到了分布式锁之后,就可以任由你查询那条数据,修改那条数据,在这个期间,没有任何其他的客户端可以来修改这条数据,获取了一个分布式锁之后,就对某个共享的数据获取了一定时间范围内的独享的操作
其他的客户端如果同时要修改那条数据,尝试去获取分布式锁,就会被卡住,他需要等待第一个客户端先操作完了之后释放锁
而在redisson在客户端里实现了一个看门狗,watchdog,主要是监控持有一把锁的客户端是否还存活着,如果还存活着,那么看门狗会不断的延长这个锁的过期时间
可以指定一个leaseTime,你获取了一把锁之后,可能你在锁定的期间,执行的操作特别的耗时,可能长达10分钟,1个小时。你就可以在获取锁的时候指定一个leaseTime,比如说,指定好,如果我自己1分钟之内没释放这把锁,redisson自动释放这把锁,让别的客户端可以获取锁来做一些操作
二、redis分布式锁(一):可重入锁(Reentrant Lock)
基本介绍
基于Redis的Redisson分布式可重入锁RLock,Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口,常见使用案例如下:
java
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
Redisson同时还为分布式锁提供了异步执行的相关方法:
java
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。但是如果遇到需要其他进程也能解锁的情况,则需要使用分布式信号量Semaphore 对象
加锁逻辑和Lua分析
主要加锁Lua脚本
java
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
根据锁key计算出 slot,找到执行Lua脚本的节点
需要先找到当前锁key需要存放到哪个slot,即在集群中哪个节点进行操作,后续不同客户端或不同线程再使用这个锁key进行上锁,也需要到对应的节点的slot中进行加锁操作
java
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
// 根据锁key找到对应的redis节点
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
private NodeSource getNodeSource(String key) {
// 计算锁key对应的slot
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
}
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
// 使用 CRC16 算法来计算 slot,其中 MAX_SLOT 就是 16384,redis集群规定最多有 16384 个slot。
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
watchdog看门狗监听延长锁
一个定时任务task,也就是10秒左右,当成功加锁之后开启定时调度的任务,初次执行是在10秒以后,这个定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,如果发现expirationRenewalMap中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作
java
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
Timeout task =
this.commandExecutor.getConnectionManager()
.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future =
RedissonLock.this.renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), new RedissonLock.ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
}
//RFuture<Boolean> future = renewExpirationAsync(threadId);
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;"
)
}
加锁小结
lua脚本可以保证复杂判断和复杂操作的原子性,加锁主要步骤如下:
- 处理加锁Lua脚本
- 根据Key计算出slot,找到执行Lua脚本的节点
- 若是没有传入过期时间,默认启用看门狗续命持有锁
释放锁逻辑和Lua分析
主要释放锁Lua脚本
java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.asList(this.getName(), this.getChannelName()),
new Object[]{
LockPubSub.unlockMessage,
this.internalLockLeaseTime,
this.getLockName(threadId)
}
);
}
释放锁取消看门狗逻辑
当线程完全释放锁后,就会调用cancelExpirationRenewal()方法取消"看门狗"的续时线程,引用expirationRenewalMap中进行取消
java
void cancelExpirationRenewal(Long threadId) {
RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)expirationRenewalMap.get(this.getEntryName());
if (task != null && (threadId == null || task.getThreadId() == threadId)) {
expirationRenewalMap.remove(this.getEntryName());
task.getTimeout().cancel();
}
}
释放锁小结
还是基于 lua 脚本来完成的,lua脚本可以保证复杂判断和复杂操作的原子性,解锁主要步骤如下:
- 处理解锁Lua脚本
- 根据Key计算出slot,找到执行Lua脚本的节点
- 解锁成功,取消看门狗任务
可重入锁的特征总结
加锁,释放锁,以及lock的方式进行枷锁,tryLock的方式进行加锁,可以接下来做一个总结: (1)加锁:在redis里设置hash数据结构,生存周期是30000毫秒 (2)维持加锁:代码里一直加锁,redis里的key会一直保持存活,后台每隔10秒的定时任务(watchdog)不断的检查,只要客户端还在加锁,就刷新key的生存周期为30000毫秒 (3)可重入锁:同一个线程可以多次加锁,就是在hash数据结构中将加锁次数累加1 (4)锁互斥:不同客户端,或者不同线程,尝试加锁陷入死循环等待 (5)手动释放锁:可重入锁自动递减加锁次数,全部释放锁之后删除锁key (6)宕机自动释放锁:如果持有锁的客户端宕机了,那么此时后台的watchdog定时调度任务也没了,不会刷新锁key的生存周期,此时redis里的锁key会自动释放 (7)尝试加锁超时:在指定时间内没有成功加锁就自动退出死循环,标识本次尝试加锁失败 (8)超时锁自动释放:获取锁之后,在一定时间内没有手动释放锁,则redis里的key自动过期,自动释放锁
结合可重入锁来进行说明常见分布式锁隐患问题
redis加锁,本质还是在redis集群中挑选一个master实例来加锁,master -> slave,实现了高可用的机制,如果master宕机,slave会自动切换为master
假设客户端刚刚在master写入一个锁,此时发生了master的宕机,但是master还没来得及将那个锁key异步同步到slave,slave就切换成了新的master。此时别的客户端在新的master上也尝试获取同一个锁,会成功获取锁
此时两个客户端,都会获取同一把分布式锁,可能有的时候就会导致一些数据的问题,redisson的分布式锁,隐患主要就是在这里
三、redis分布式锁(二):公平锁(Fair Lock)
基本介绍
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒
通过公平锁,可以保证说,客户端获取锁的顺序,就跟他们请求获取锁的顺序是一样的,公平锁排队,谁先申请获取这把锁,谁就可以先获取到这把锁,这个是按照顺序来的,不是胡乱的争抢的。可重入非公平锁、公平锁,他们在整体的技术实现上都是一样的,只不过唯一不同的一点就是在于加锁的逻辑那里
非公平锁加锁是比计较简单粗暴,争抢;公平锁,在加锁的逻辑里,要加入这个排队的机制,保证说各个客户端排队,按照顺序获取锁,不是胡乱争抢的
java
RLock fairLock = redisson.getFairLock("fairLock");
// 最常见的使用方法
fairLock.lock();
加锁逻辑和Lua分析
java
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
long currentTime = System.currentTimeMillis();
// 根据不同的命令类型执行不同的LUA脚本
// EVAL_LONG,重点关注这个
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
/**
* 第一部分: 死循环的作用主要是用于清理过期的等待线程,主要避免下面场景,避免无效客户端占用等待队列资源
* 1、获取锁失败,然后进入等待队列,但是网络出现问题,那么后续很有可能就不能继续正常获取锁了。
* 2、获取锁失败,然后进入等待队列,但是之后客户端所在服务器宕机了。
*/
// 开启死循环
"while true do " +
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// a、通过zrem指令从redisson_lock_timeout:{fairLock}超时集合中删除第一个等待线程ID对应的元素
"redis.call('zrem', KEYS[3], firstThreadId2);" +
// b、通过lpop指令从redisson_lock_queue:{fairLock}等待队列中移除第一个等待线程ID对应的元素
"redis.call('lpop', KEYS[2]);" +
"else " +
// 如果超时时间戳 大于 当前时间,说明还没超时,则直接跳出循环
"break;" +
"end;" +
"end;" +
/**
* 第二部分: 检查是否可以获取锁
* 满足下面两个条件之一可以获取锁:
* 1、当前锁不存在(锁未被获取) and 等待队列不存在
* 2、当前锁不存在(锁未被获取) and 等待队列存在 and 等待队列中的第一个等待线程就是当前客户端当前线程
*/
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
// 默认超时时间:30秒
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
// 返回nil,表示获取锁成功,如果执行到这里,就return结束了,不会执行下面的第三、四、五部分
"return nil;" +
"end;" +
/**
* 第三部分: 检查锁是否已经被持有,公平锁重入
*/
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
/**
* 第四部分: 检查线程是否已经在等待队列中,如果当前线程本就在等待队列中,返回等待时间
*/
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
/**
* 第五部分: 将线程添加到队列末尾,并在timeout set中设置其超时时间为队列中前一个线程的超时时间(如果队列为空则为锁的超时时间)加上threadWaitTime
*/
// 获取等待队列redisson_lock_queue:{fairLock}最后一个元素,即等待队列中最后一个等待的线程
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
// 返回ttl
"return ttl;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);
}
throw new IllegalArgumentException();
}
Lua脚本第一部分:删除过期的等待线程
死循环的作用主要用于清理过期的等待线程,主要避免下面场景,避免无效客户端占用等待队列资源
- 获取锁失败,然后进入等待队列,但是网络出现问题,那么后续很有可能就不能继续正常获取锁了
- 获取锁失败,然后进入等待队列,但是之后客户端所在服务器宕机了
Lua脚本第二部分:检查是否可以获取锁
满足下面两个条件之一可以获取锁:
- 当前锁不存在(锁未被获取) and 等待队列不存在
- 当前锁不存在(锁未被获取) and 等待队列存在 and 等待队列中的第一个等待线程就是当前客户端当前线程
Lua脚本第三部分:检查锁是否已经被持有,公平锁重入
通过hexists指令判断当前持有锁的线程是不是自己,如果是自己的锁,则执行重入,增加加锁次数,并且刷新锁的过期时间
Lua脚本第四部分:检查线程是否已经在等待队列中
检查线程是否已经在等待队列中,如果当前线程本就在等待队列中,返回等待时间
Lua脚本第五部分: 将线程添加到队列末尾
将线程添加到队列末尾,并在timeout set中设置其超时时间为队列中前一个线程的超时时间(如果队列为空则为锁的超时时间)加上threadWaitTime
释放锁逻辑和Lua分析
java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
/**
* 第一部分:开启死循环,移除超时的那些线程
*/
"while true do "+
// 获取等待队列中第一个等待的线程
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;"+
"end; "+
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"+
"if timeout <= tonumber(ARGV[4]) then "+
"redis.call('zrem', KEYS[3], firstThreadId2); "+
"redis.call('lpop', KEYS[2]); "+
"else "+
"break;"+
"end; "+
"end;"+
/**
* 第二部分:锁已经被释放,通知等待队列中第一个线程
* 场景:
* 1、成功获取锁线程重复调用释放锁的方法,第二次释放时,锁已不存在,就去通知等待队列中的第一个元素
* 2、又或者一个极端场景:当前线程未能成功获取锁,但是调用了释放锁的方法,并且刚好此时锁被释放
*/
"if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
/**
* 第三部分: 释放锁
*/
// 判断当前释放的锁是不是自己的,如果不是自己的,直接返回nil
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 通过hincrby更新hash表中锁的重入次数减一
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
"redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
Lua脚本第一部分:开启死循环,移除超时的那些线程
这一段跟公平锁加锁的第一部分一致,都是开启一个死循环,然后移除等待队列中超时的线程
主要流程
- 开启死循环;
- 利用 lindex 命令判断等待队列中第一个元素是否存在,如果存在,直接跳出循环。核心代码:lidex redisson_lock_queue:{myLock} 0
- 如果等待队列中第一个元素不为空(例如返回了LockName,即客户端UUID拼接线程ID),利用 zscore 在 超时记录集合(sorted set) 中获取对应的超时时间;核心代码:zscore redisson_lock_timeout:{myLock} UUID:threadId
- 如果超时时间已经小于当前时间,那么首先从超时集合中移除该节点,接着也在等待队列中弹出第一个节点;
- 核心代码:zrem redisson_lock_timeout:{myLock} UUID:threadId
- 核心代码:lpop redisson_lock_queue:
- 如果等待队列中的第一个元素还未超时,直接退出死循环
Lua脚本第二部分:锁已经被释放,通知等待队列中第一个线程
- 成功获取锁线程重复调用释放锁的方法,第二次释放时,锁已不存在,就去通知等待队列中的第一个元素
- 又或者一个极端场景:当前线程未能成功获取锁,但是调用了释放锁的方法,并且刚好此时锁被释放
主要流程
- 利用 exists 命令判断锁是否存在;核心代码:exists myLock
- 如果锁不存在,利用 lindex 获取等待队列中的第一个元素;核心代码:lindex redisson_lock_queue:{myLock} 0
- 如果等待列表中第一个元素不为空,即还存在等待线程,往等待线程的订阅channel发送消息,通知其可以尝试获取锁了;核心代码:publish redisson_lock__channel:{myLock}:UUID:threadId 0
注意,RedissonLock 所有等待线程都是订阅锁的同一个channel:redisson_lock__channel:{myLock},当有线程释放锁的时候,会往这个通道发送消息,此时所有等待现成都可以订阅消费这条消息,从而从等待状态中释放出来,重新尝试获取锁。
而 RedissonFairLock 不太一样,因为它要支持公平获取锁,即先到先得。所以每个等待线程订阅的都是不同的channel:redisson_lock__channel:{myLock}:UUID:threadId。当某个线程释放锁的时候,只会往等待队列中第一个线程对应订阅的channel发送消息
Lua脚本第二部分:释放锁,通知等待队列中第一个线程
主要流程
- 利用 hexists 命令判断加锁记录集合中,是否存在当前客户端当前线程,如果加锁记录不存在当前线程,返回nil;hexists myLock UUID:threadId,主要是兼容当前线程未能成功获取锁,但是调用了释放锁的方法
- 利用 hincrby 扣减当前线程的加锁次数;核心代码:hincrby myLock UUID:threadId -1
- 如果扣减后次数还是大于0,证明是重复获取锁,所以此时只需要重新刷新锁的过期时间,然后返回0;核心代码:expire myLock 30000
- 如果扣减后次数还是等于0,证明当前线程持有锁,并且只加锁一次,则利用 del 命令删除锁对应 redis key;核心代码:del myLock
- 往等待线程的订阅channel发送消息,通知等待队列中第一个线程可以尝试获取锁了;
- 利用 lindex 获取等待队列中的第一个线程,核心代码:lindex redisson_lock_queue:{myLock} 0
- 利用订阅channel给等待线程发送通知,核心代码:publish redisson_lock__channel:{myLock}:UUID:threadId 0
- 跟RedissonLock一致,释放锁之后,还需要执行取消看门狗定时任务等逻辑
四、redis分布式锁(三):联锁(MultiLock)
基本介绍
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例
java
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
加锁逻辑分析
重点:遍历所有的锁,依次加锁
java
ListIterator iterator = this.locks.listIterator();
while(iterator.hasNext()) {
RLock lock = (RLock)iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1L && leaseTime == -1L) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (Exception var21) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
this.unlockInner(acquiredLocks);
if (waitTime == -1L && leaseTime == -1L) {
return false;
}
failedLocksLimit = this.failedLocksLimit();
acquiredLocks.clear();
while(iterator.hasPrevious()) {
iterator.previous();
}
} else {
--failedLocksLimit;
}
}
if (remainTime != -1L) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0L) {
this.unlockInner(acquiredLocks);
return false;
}
}
}
lock是底层的RedissonLock,加锁逻辑就和可重入锁加锁并无区别了。所以 Lua 脚本就不进行分析了。他没有使用lock.lock(),用的是tryLock(),指定了获取锁等待超时的时间,4500毫秒必须获取到这个锁,如果获取不到这个锁,就退出标记为获取锁失败
哪怕是获取到锁之后,这个锁在多长时间内会自动释放,因为你的newLeaseTime是-1,所以说如果获取到了锁,会启动一个lock watchdog不断的刷新你的锁key的生存时间为30000毫秒。加锁成功,则将成功的锁放进 acquiredLocks 集合中;加锁失败,需要判断 failedLocksLimit,因为这里是 0所以会直接对成功加锁集合 acquiredLocks 中的所有锁执行锁释放,同时清空成功集合,恢复迭代器
加锁失败释放锁逻辑代码
java
protected void unlockInner(Collection<RLock> locks) {
List<RFuture<Void>> futures = new ArrayList(locks.size());
Iterator var3 = locks.iterator();
while(var3.hasNext()) {
RLock lock = (RLock)var3.next();
futures.add(lock.unlockAsync());
}
var3 = futures.iterator();
while(var3.hasNext()) {
RFuture<Void> unlockFuture = (RFuture)var3.next();
unlockFuture.awaitUninterruptibly();
}
}
解析remainTime负数逻辑判断原因
经过了一个lock的获取,可能会消耗掉了比如说20毫秒,100毫秒,500毫秒,耗费了500毫秒,而我们的remainTime = 4500毫秒 - 500毫秒 = 4000毫秒,time = 当前时间
如果remainTime <= 0,意味着什么呢?获取锁的时间已经超过了4500毫秒了,迄今为止,你获取到这些所的时间,已经超过了预设的4500毫秒了,相当于是你获取多个锁的时间,最多不能超过4500毫秒
如果一旦获取各个锁的时间超过了4500毫秒,此时就会释放掉所有已经获取的锁,然后返回一个false,再次进入while true中的一个死循环,尝试走上述一模一样的流程。获取了三把锁,耗时了1000毫秒,此时remainTime还剩下3500毫秒,可以计算得到获取锁花费了多少秒
释放锁主要逻辑分析
java
public void unlock() {
List<RFuture<Void>> futures = new ArrayList(this.locks.size());
Iterator var2 = this.locks.iterator();
while(var2.hasNext()) {
RLock lock = (RLock)var2.next();
futures.add(lock.unlockAsync());
}
var2 = futures.iterator();
while(var2.hasNext()) {
RFuture<Void> future = (RFuture)var2.next();
future.syncUninterruptibly();
}
}
释放锁的话,就是依次调用所有的锁的释放的逻辑,lua脚本,同步等待所有的锁释放完毕,才会返回
默认的行为之下,你包裹了几把锁,就会锁数量 * 1500毫秒,获取所有的锁必须在多长时间之内就要结束,如果超时就会重新再次死循环尝试获取锁。使用的是各个锁的tryLock()方法,指定了说在获取每个单独的锁的时候,会有一个获取超时退出的时间
总体而言,就是将 key1、key2、key3 …… keyN 放到一个 List 集合中,然后迭代循环加锁,直到所有的都成功。解锁的时候就是再遍历锁进行释放锁
五、redis分布式锁(四):红锁(RedLock)
基本介绍
RedLock是基于redis实现的分布式锁,它能够保证以下特性: 互斥性:在任何时候,只能有一个客户端能够持有锁; 避免死锁:当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间) 容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁;而非redLock是无法满足互斥性的
RedLock算法
RedLock实现主要是RedLock算法,就是说假设有N个redis的master节点,这些节点是相互独立的(不需要主从或者其他协调的系统)。N推荐为奇数
客户端执行以下步骤获取一把锁:
- 获取当前时间戳,以毫秒为单位
- 使用相同的lockName和lockValue,尝试从N个节点获取锁
(在获取锁时,要求等待获取锁的时间远小于锁的释放时间,如锁的lease_time为10s,那么wait_time应该为5-50毫秒;避免因为redis实例挂掉,客户端需要等待更长的时间才能返回,即需要让客户端能够fast_fail;如果一个redis实例不可用,那么需要继续从下个redis实例获取锁)
- 当从N个节点获取锁结束后,如果客户端能够从多数节点(N/2 + 1)中成功获取锁,比如5个节点就要求是3个节点(n / 2 +1)。且获取锁的时间小于失效时间,那么可认为,客户端成功获得了锁。(获取锁的时间=当前时间戳 - 步骤1的时间戳)
- 客户端成功获得锁后,那么锁的实际有效时间 = 设置锁的有效时间 - 获取锁的时间
- 客户端获取锁失败后,N个节点的redis实例都会释放锁,即使未能加锁成功
java
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
主要加锁逻辑分析
java
public class RedissonRedLock extends RedissonMultiLock {
public RedissonRedLock(RLock... locks) {
super(locks);
}
protected int failedLocksLimit() {
return this.locks.size() - this.minLocksAmount(this.locks);
}
protected int minLocksAmount(List<RLock> locks) {
return locks.size() / 2 + 1;
}
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / (long)this.locks.size(), 1L);
}
public void unlock() {
this.unlockInner(this.locks);
}
}
可以看到RedissonRedLock 完全是 RedissonMultiLock 的子类!只不过是重写 failedLocksLimit 方法。在 MultiLock 中,要所有的锁都锁成功才可以。在 RedLock 中,要一半以上的锁成功
重写代码逻辑分析
java
//比如你有3个lock,那么至少要成功加上3 / 2 + 1 = 2个lock
minLocksAmount(locks)里面就是用的大多数节点的一个算法,n / 2 + 1
//就是failedLocksLimit,默认是0,就是一个锁都不能容忍加失败了
//现在最新的是n - (n / 2 + 1),3个lock,最多可以容忍1个lock加锁失败
failedLocksLimit,locks.size() - minLocksAmount(locks) = 1
//这个东西算出来的时间,是说在对每个lock进行加锁的时候,有一个尝试获取锁超时的时间
//原来默认的就是remainTime,4500毫秒,4500毫秒 / 3 = 1500毫秒
//每个小lock获取锁超时的时间改成了1500毫秒
alcLockWaitTime
//对每个小lock尝试加锁的时候,能够容忍的最大超时时间,就是1500毫秒,1500毫秒之内必须加成功这个小锁,否则就是加锁失败
waitTime = 1500
之前讲解的MultiLock的话,只要你有任何一个锁加锁失败,此次加这个MultiLock就会标记为失败,再重来一次
但是现在的话呢,使用的是RedLock,faildLocksLimit(3个小lock的时候,这个值是1),可以容忍一个锁加锁失败的,此时就会将failedLockLimit--,从1变为了0,也就是说之前可以容忍一次lock加锁失败,若是继续失败,则不能再容忍加锁失败了
也就是说针对多个lock进行加锁,每个lock都有一个1500毫秒的加锁超时时间,如果在4500毫秒内,成功的对n / 2 + 1个lock加锁成功了,就可以认为这个RedLock加锁成功了,不要求所有的lock都加锁成功的
小结
RedLock是一个锁,只不过是在各个不同的master实例上进行加锁,但是现在说RedLock合并了多个小lock。也就是说,如果你有3个redis master实例,你就用lock1、lock2、lock3三个锁key,人家本来就是分布在3个不同的redis master实例上的
加这个RedLock,相当于是3个redis master实例上的key,有2个加成功了,就算这个RedLock加锁成功了 此时别人如果要来加锁,用一样的key,人家是无法成功加锁的,锁被你占用了,人家就会卡在那儿,死循环,不断的尝试加锁,直到你释放锁å
红锁其实也并不能解决根本问题,只是降低问题发生的概率。完全相互独立的redis,每一台至少也要保证高可用,还是会有主从节点。既然有主从节点,在持续的高并发下,master还是可能会宕机,从节点可能还没来得及同步锁的数据。很有可能多个主节点也发生这样的情况,那么问题还是回到一开始的问题,红锁只是降低了发生的概率
六、redis分布式锁(五):读写锁(ReadWriteLock)
基础介绍
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态
java
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
当多个客户端同时加读锁,是不会互斥的,多个客户端可以同时加这个读锁,读锁和读锁是不互斥的。如果有人加了读锁,此时就不能加写锁,任何人都不能加写锁了,读锁和写锁是互斥的,如果有人加了写锁,此时任何人都不能加写锁了,写锁和写锁也是互斥的
读锁主要加锁逻辑
加锁源码基本和之前的可重入锁加锁无区别,唯一的差异就是在 Lua 脚本
java
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
首次加锁处理分析
通过hget获取锁,当锁不存在,继续执行下面的逻辑:
- hset设置锁 anyRWLock 的 hash里kv值,key = mode ,value = read,表示这是个读锁
- hset设置锁 anyRWLock 的 hash里kv值,key = e70b1307-9ddd-43de-ac9d-9c42b5c99a0d:1,value= 1
- set设置锁,key字符串: {anyRWLock}:e70b1307-9ddd-43de-ac9d-9c42b5c99a0d:1:rwlock_timeout:1 ,value = 1
- pexpire设置两个 RedisKey 的过期时间
同时不要忘了,我们还有一个watchdog看门狗,会每隔10秒钟去执行一段lua脚本,判断一下当前这个线程是否还持有着这个锁,如果还持有锁,更新一下锁key的生存时间为30000毫秒,保持redis的锁key和java代码中持有的锁是保持同步的