Appearance
一、可重入锁(Reentrant Lock)介绍
基于Redis的Redisson分布式可重入锁RLock,Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口,常见使用案例如下:
java
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
Redisson同时还为分布式锁提供了异步执行的相关方法:
java
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。但是如果遇到需要其他进程也能解锁的情况,则需要使用分布式信号量Semaphore 对象
二、大白话理解RedissonLock
分布式系统里面,如果多个机器上的服务要同时对一个共享资源(比如说修改数据库里的一份数据),此时的话,某台机器就需要先获取一个针对那个资源(数据库里的某一行数据)的分布式锁
获取到了分布式锁之后,就可以任由你查询那条数据,修改那条数据,在这个期间,没有任何其他的客户端可以来修改这条数据,获取了一个分布式锁之后,就对某个共享的数据获取了一定时间范围内的独享的操作
其他的客户端如果同时要修改那条数据,尝试去获取分布式锁,就会被卡住,他需要等待第一个客户端先操作完了之后释放锁
而在redisson在客户端里实现了一个看门狗,watchdog,主要是监控持有一把锁的客户端是否还存活着,如果还存活着,那么看门狗会不断的延长这个锁的过期时间
可以指定一个leaseTime,你获取了一把锁之后,可能你在锁定的期间,执行的操作特别的耗时,可能长达10分钟,1个小时。你就可以在获取锁的时候指定一个leaseTime,比如说,指定好,如果我自己1分钟之内没释放这把锁,redisson自动释放这把锁,让别的客户端可以获取锁来做一些操作。
java
// Acquire lock and release it automatically after 10 seconds
// if unlock method hasn't been invoked
lock.lock(10, TimeUnit.SECONDS);
// Wait for 100 seconds and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
lock.unlock();
客户端A已经获取了一把锁,此时客户端B尝试去获取这把锁,默认情况下是无限制的等待,但是这里你在获取锁的时候是可以指定一个时间的,最多等待100秒的时间
如果获取不到锁直接就返回,boolean res,这个res如果是false就代表你加锁失败了,在指定时间范围内,没有获取到锁 如果获取到了锁之后,在10秒之内,没有手动释放锁,那么就自动释放锁
java
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
如果是lock.lock()方法,是属于同步加锁,在这些代码执行的期间,如果等待锁什么的,都会被阻塞住,lock.lockAsync(),异步加锁,用了其他的线程去进行加锁,不会阻塞你当前主线程的执Future<Boolean> res
,不断的去查询这个feture对象的一些状态,看看异步加锁是否成功
你用哪个线程去加一把分布式锁,就必须用那个线程来对分布式锁进行释放,否则如果用不同的线程,会导致IllegalMonitorStateException
三、加锁小结
基本都是基于 lua 脚本来完成的 因为分布式锁肯定是具有比较复杂的判断逻辑,而lua脚本可以保证复杂判断和复杂操作的原子性,加锁主要步骤如下:
- 处理加锁Lua脚本
- 根据Key计算出slot,找到执行Lua脚本的节点
- 若是没有传入过期时间,默认启用看门狗续命持有锁
主要的加锁主逻辑源码
java
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(
this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
Long ttlRemaining = (Long)future.getNow();
if (ttlRemaining == null) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
主要的加锁动作,执行Lua脚本
java
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
- 第一个IF 判断一下,如果“anyLock”这个key不存在,那么就进行加锁
- 设置一个key value,相当于有一个anyLock的map结构,同时在map里面放着一对kv
- pexpire KEYS[1] ARGV[1],其实就是设置一个key的过期时间
- 第二个IF 判断KEYS[1](anyLock)这个名字的一个map,里面是否存在一个ARGV[2]的一个key
- 如果是存在的话,hincrby KEYS[1] ARGV[2] 1,将anyLock这个map中的这个key的值累加1
- pexpire KEYS[1] ARGV[1],其实就是重新设置key的过期时间
- pttl指令,就是返回anyLock这个key当前还剩下的一个有效的存活期
根据锁key计算出 slot,一个slot对应的是redis集群的一个节点
需要先找到当前锁key需要存放到哪个slot,即在集群中哪个节点进行操作,后续不同客户端或不同线程再使用这个锁key进行上锁,也需要到对应的节点的slot中进行加锁操作
java
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
// 根据锁key找到对应的redis节点
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
private NodeSource getNodeSource(String key) {
// 计算锁key对应的slot
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
}
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
// 使用 CRC16 算法来计算 slot,其中 MAX_SLOT 就是 16384,redis集群规定最多有 16384 个slot。
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
watchdog看门狗监听延长锁
加锁完成后给RFuture加了一个监听器,也就是说只要这个lua脚本执行完成,返回了pttl anyLock那个指令返回的一个剩余存活的时间之后,这个RFuture的监听器就会被触发执行的
java
RFuture<Long> ttlRemainingFuture =
this.tryLockInnerAsync(
this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(
new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
Long ttlRemaining = (Long)future.getNow();
if (ttlRemaining == null) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
创建了一个定时任务task,也就是10秒左右,当成功加锁之后开启定时调度的任务,初次执行是在10秒以后,这个定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,如果发现expirationRenewalMap中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作
java
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
Timeout task =
this.commandExecutor.getConnectionManager()
.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future =
RedissonLock.this.renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), new RedissonLock.ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
}
//RFuture<Boolean> future = renewExpirationAsync(threadId);
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;"
)
}
加锁操作流程图
其他的线程或者是其他客户端也加锁处理情况
如果说客户端A已经上锁了,还持有着这把锁,此时客户端B尝试加锁,此时就会直接执行pttl anyLock指令,返回这个key剩余的一个存活时间 如果是第一次获取锁的时候就会获取结果Null,ttl一定是null;如果是一个线程多次加锁,可重入锁的概念,此时ttl也一定是null,lua脚本里返回的就是nil;但是如果加锁没成功,锁被其他机器占用了,执行lua脚本直接获取到的是这个key对应的剩余时间
java
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
如果加锁不成功,直接会进入while(true)就是一个死循环内,在死循环内,再次执行尝试去获取这个分布式的锁,如果获取到了锁,证明ttl是null,此时就会退出死循环,如果ttl大于等于0,说明其他的客户端还是占据着这把锁。如果没有获取到一把分布式锁,可能就是等待那个ttl指定的时间,再次去尝试获取那把锁
四、释放锁小结
基本都是基于 lua 脚本来完成的 因为分布式锁肯定是具有比较复杂的判断逻辑,而lua脚本可以保证复杂判断和复杂操作的原子性,解锁主要步骤如下:
- 处理解锁Lua脚本
- 根据Key计算出slot,找到执行Lua脚本的节点
- 解锁成功,取消看门狗任务
主要的解锁主逻辑源码
java
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
主要的解锁动作,执行Lua脚本
java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.asList(this.getName(), this.getChannelName()),
new Object[]{
LockPubSub.unlockMessage,
this.internalLockLeaseTime,
this.getLockName(threadId)
}
);
}
- 第一IF 如果anyLock这个key不存在,则进行订阅通知
- publish 告诉其它订阅了这把锁的线程,我已经释放锁了,可以过来获取
- 释放锁成功,返回1
- 第二IF 判断一下当前这个锁key对应的hash数据结构中,判断锁是不是自己的
- 如果不是自己的锁,说明非法释放锁,返回nil
取消watchdog任务
当线程完全释放锁后,就会调用cancelExpirationRenewal()方法取消"看门狗"的续时线程,引用expirationRenewalMap中进行取消
java
void cancelExpirationRenewal(Long threadId) {
RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)expirationRenewalMap.get(this.getEntryName());
if (task != null && (threadId == null || task.getThreadId() == threadId)) {
expirationRenewalMap.remove(this.getEntryName());
task.getTimeout().cancel();
}
}
释放锁流程图
五、可重入锁的特征总结
加锁,释放锁,以及lock的方式进行枷锁,tryLock的方式进行加锁,可以接下来做一个总结: (1)加锁:在redis里设置hash数据结构,生存周期是30000毫秒 (2)维持加锁:代码里一直加锁,redis里的key会一直保持存活,后台每隔10秒的定时任务(watchdog)不断的检查,只要客户端还在加锁,就刷新key的生存周期为30000毫秒 (3)可重入锁:同一个线程可以多次加锁,就是在hash数据结构中将加锁次数累加1 (4)锁互斥:不同客户端,或者不同线程,尝试加锁陷入死循环等待 (5)手动释放锁:可重入锁自动递减加锁次数,全部释放锁之后删除锁key (6)宕机自动释放锁:如果持有锁的客户端宕机了,那么此时后台的watchdog定时调度任务也没了,不会刷新锁key的生存周期,此时redis里的锁key会自动释放 (7)尝试加锁超时:在指定时间内没有成功加锁就自动退出死循环,标识本次尝试加锁失败 (8)超时锁自动释放:获取锁之后,在一定时间内没有手动释放锁,则redis里的key自动过期,自动释放锁
六、结合可重入锁来进行分布式锁隐患问题
redis加锁,本质还是在redis集群中挑选一个master实例来加锁,master -> slave,实现了高可用的机制,如果master宕机,slave会自动切换为master
假设客户端刚刚在master写入一个锁,此时发生了master的宕机,但是master还没来得及将那个锁key异步同步到slave,slave就切换成了新的master。此时别的客户端在新的master上也尝试获取同一个锁,会成功获取锁
此时两个客户端,都会获取同一把分布式锁,可能有的时候就会导致一些数据的问题,redisson的分布式锁,隐患主要就是在这里
七、可重入锁细节解析文章
04_redis分布式锁(一):可重入锁源码剖析之使用场景介绍
05_redis分布式锁(二):可重入锁源码剖析之lua脚本加锁逻辑
06_redis分布式锁(三):可重入锁源码剖析之watchdog维持加锁
07_redis分布式锁(四):可重入锁源码剖析之可重入加锁