Redlock 算法和底层源码分析

Redlock 算法和底层源码分析

面试考点

  • 按照 JUC 里面 java.util.concurrent.locks.Lock 接口规范编写

  • lock() 加锁关键逻辑

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if(time != -1L) {
            this.expireTime = unit.toSeconds(time);
        }
        String script =
                "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
                        "redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                        "redis.call('expire',KEYS[1],ARGV[2]) " +
                        "return 1 " +
                "else " +
                        "return 0 " +
                "end";
        while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
            TimeUnit.MILLISECONDS.sleep(50);
        }
        renewExpire();
        return true;
    }

    思路:

    • 加锁的 Lua 脚本,通过 Redis 里面的 Hash 数据结构,加锁和可重入性都要保证
    • 加锁不成,需要 while 进行重试并自旋
    • 自动续期

    功能:

    • 加锁:加锁实际就是在 Redis 中,给 Key 键设置一个值,为避免死锁,并给定一个过期时间。
    • 自旋
    • 续期
  • unlock() 解锁关键逻辑

    @Override
    public void unlock() {
        String script =
                "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
                "   return nil " +
                "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
                "   return redis.call('del',KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        // nil = false 1 = true 0 = false
        Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime));
        if(flag == null) {
            throw new RuntimeException("This lock doesn't EXIST");
        }
     
    }

    思路

    • 考虑可重入性的递减,加锁几次就要减锁几次;
    • 最后到 0 了,直接 del 删除。

    功能:

    • 将 Key 键删除。但也不能乱删,不能说客户端 1 的请求将客户端 2 的锁给删除掉,只能自己删除自己的锁。

Redlock 红锁算法

官网说明

https://redis.io/docs/manual/patterns/distributed-locks/ (opens in a new tab)

Distributed Locks with Redis

A distributed lock pattern with Redis

Distributed locks are a very useful primitive in many environments where different processes must operate with shared resources in a mutually exclusive way.

There are a number of libraries and blog posts describing how to implement a DLM (Distributed Lock Manager) with Redis, but every library uses a different approach, and many use a simple approach with lower guarantees compared to what can be achieved with slightly more complex designs.

This page describes a more canonical algorithm to implement distributed locks with Redis. We propose an algorithm, called Redlock, which implements a DLM which we believe to be safer than the vanilla single instance approach. We hope that the community will analyze it, provide feedback, and use it as a starting point for the implementations or more complex or alternative designs.

为什么学习这个?怎么产生的?

上一章节写的分布式锁的缺点

Why Failover-based Implementations Are Not Enough

To understand what we want to improve, let’s analyze the current state of affairs with most Redis-based distributed lock libraries.

The simplest way to use Redis to lock a resource is to create a key in an instance. The key is usually created with a limited time to live, using the Redis expires feature, so that eventually it will get released (property 2 in our list). When the client needs to release the resource, it deletes the key.

Superficially this works well, but there is a problem: this is a single point of failure in our architecture. What happens if the Redis master goes down? Well, let’s add a replica! And use it if the master is unavailable. This is unfortunately not viable. By doing so we can’t implement our safety property of mutual exclusion, because Redis replication is asynchronous.

There is a race condition with this model:

  1. Client A acquires the lock in the master.
  2. The master crashes before the write to the key is transmitted to the replica.
  3. The replica gets promoted to master.
  4. Client B acquires the lock to the same resource A already holds a lock for. SAFETY VIOLATION!

Redis-手写-redis-分布式锁潜在问题

  • 客户 A 通过 Redis 的 set 命令建分布式锁成功并持有锁;
  • 正常情况主从机都有这分布式锁;
  • 突然出故障了,master 还没来得同步数据给 slave,此时 slave 机器上没有对应锁的信息;
  • 从机 slave 上位,变成新的 master;
  • 客户 B 照样可以同样的建锁成功,出现一锁被多建多用
CAP 里面的 CP 遭到了破坏,而且 Redis 无论单机、主从、哨兵和主从均有这样风险

线程 1 首先获取锁成功,将键值对写入 Redis 的 master 节点,在 Redis 将该键值对同步到 slave 节点之前,master 发生了故障;Redis 触发故障转移,其中一个 slave 升级为新的 master,此时新上位的 master 并不包含线程 1 写入的键值对,因此线程 2 尝试获取锁也可以成功拿到锁,此时相当于有两个线程获取到了锁,可能会导致各种预期之外的情况发生,例如最常见的脏数据。

我们加的是排它独占锁,同一时间只能有一个建 Redis 锁成功并持有锁,严禁出现 2 个以上的请求线程拿到锁。

Redlock 算法设计理念

Redis 之父提出了 Redlock 算法解决这个问题

Redis 也提供了 Redlock 算法,用来实现基于多个实例的分布式锁。锁变量由多个实例维护,即使有实例发生了故障,锁变量仍然是存在的,客户端还是可以完成锁操作。Redlock 算法是实现高可靠分布式锁的一种有效解决方案,可以在实际开发中使用。

The Redlock Algorithm

In the distributed version of the algorithm we assume we have N Redis masters. Those nodes are totally independent, so we don’t use replication or any other implicit coordination system. We already described how to acquire and release the lock safely in a single instance. We take for granted that the algorithm will use this method to acquire and release the lock in a single instance. In our examples we set N=5, which is a reasonable value, so we need to run 5 Redis masters on different computers or virtual machines in order to ensure that they’ll fail in a mostly independent way.

In order to acquire the lock, the client performs the following operations:

  1. It gets the current time in milliseconds.
  2. It tries to acquire the lock in all the N instances sequentially, using the same key name and random value in all the instances. During step 2, when setting the lock in each instance, the client uses a timeout which is small compared to the total lock auto-release time in order to acquire it. For example if the auto-release time is 10 seconds, the timeout could be in the ~ 5-50 milliseconds range. This prevents the client from remaining blocked for a long time trying to talk with a Redis node which is down: if an instance is not available, we should try to talk with the next instance ASAP.
  3. The client computes how much time elapsed in order to acquire the lock, by subtracting from the current time the timestamp obtained in step 1. If and only if the client was able to acquire the lock in the majority of the instances (at least 3), and the total time elapsed to acquire the lock is less than lock validity time, the lock is considered to be acquired.
  4. If the lock was acquired, its validity time is considered to be the initial validity time minus the time elapsed, as computed in step 3.
  5. If the client failed to acquire the lock for some reason (either it was not able to lock N/2+1 instances or the validity time is negative), it will try to unlock all the instances (even the instances it believed it was not able to lock).

设计理念

该方案也是基于(set 加锁、Lua 脚本解锁)进行改良的,所以 Redis 之父 antirez 只描述了差异的地方,大致方案如下:

假设我们有 N 个 Redis 主节点,例如 N = 5 这些节点是完全独立的,我们不使用复制或任何其他隐式协调系统,为了取到锁客户端执行以下操作:

步骤详细过程
1获取当前时间,以毫秒为单位;
2依次尝试从 5 个实例,使用相同的 key 和随机值(例如 UUID)获取锁。当向 Redis 请求获取锁时,客户端应该设置一个超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以防止客户端在试图与一个宕机的 Redis 节点对话时长时间处于阻塞状态。如果一个实例不可用,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁;
3客户端通过当前时间减去步骤 1 记录的时间来计算获取锁使用的时间。当且仅当从大多数(N/2+1,这里是 3 个节点)的 Redis 节点都取到锁,并且获取锁使用的时间小于锁失效时间时,锁才算获取成功;
4如果取到了锁,其真正有效时间等于初始有效时间减去获取锁所使用的时间(步骤 3 计算的结果);
5如果由于某些原因未能获得锁(无法在至少 N/2 + 1 个 Redis 实例获取锁、或获取锁的时间超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

该方案为了解决数据不一致的问题,直接舍弃了异步复制只使用 master 节点,同时由于舍弃了 slave,为了保证可用性,引入了 N 个节点,官方建议 N = 5。

客户端只有在满足下面的这两个条件时,才能认为是加锁成功。
  • 条件1:客户端从超过半数(大于等于 N/2+1)的 Redis 实例上成功获取到了锁;
  • 条件2:客户端获取锁的总耗时没有超过锁的有效时间。

解决方案

使用容错公式

redis 只支持 AP,为了解决 CP 的风险,采用 N 个节点,N 为基数,N 个 master 各自完全独立,不是主从或者集群。

为什么 N 是奇数?N = 2X + 1(N 是最终部署机器数,X 是容错机器数)

1 先知道什么是容错

  • 失败了多少个机器实例后我还是可以容忍的,所谓的容忍就是数据一致性还是可以 OK 的,CP 数据一致性还是可以满足
  • 加入在集群环境中,redis 失败 1 台,可接受。2X+1 = 2 * 1+1 =3,部署 3 台,死了 1 个剩下 2 个可以正常工作,那就部署 3 台。
  • 加入在集群环境中,redis 失败 2 台,可接受。2X+1 = 2 * 2+1 =5,部署 5 台,死了 2 个剩下 3 个可以正常工作,那就部署 5 台。

2 为什么是奇数?

  • 最少的机器,最多的产出效果
  • 加入在集群环境中,redis 失败 1 台,可接受。2N+2= 2 * 1+2 =4,部署 4 台。
  • 加入在集群环境中,redis 失败 2 台,可接受。2N+2 = 2 * 2+2 =6,部署 6 台。

落地实现(Redisson)

使用 Redisson 改造手写 Redis 分布式锁

使用方法

https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers (opens in a new tab)

V9 版修改

<!--redisson-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.4</version>
</dependency>
RedisConfig.java
package com.atguigu.redislock.config;
 
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
 
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        //设置key序列化方式string
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        //设置value的序列化方式json
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
 
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
 
        redisTemplate.afterPropertiesSet();
 
        return redisTemplate;
    }
 
    //单Redis节点模式
    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.111.175:6379").setDatabase(0).setPassword("111111");
        return (Redisson) Redisson.create(config);
    }
}
InventoryController.java
package com.atguigu.redislock.controller;
 
import com.atguigu.redislock.service.InventoryService;
import com.atguigu.redislock.service.InventoryService2;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
@Api(tags = "redis分布式锁测试")
public class InventoryController {
    @Autowired
    private InventoryService inventoryService;
 
    @ApiOperation("扣减库存,一次卖一个")
    @GetMapping(value = "/inventory/sale")
    public String sale() {
        return inventoryService.sale();
    }
 
    @ApiOperation("扣减库存saleByRedisson,一次卖一个")
    @GetMapping(value = "/inventory/saleByRedisson")
    public String saleByRedisson() {
        return inventoryService.saleByRedisson();
    }
}
InventoryService2.java
package com.atguigu.redislock.service;
 
import cn.hutool.core.util.IdUtil;
import com.atguigu.redislock.mylock.DistributedLockFactory;
import com.atguigu.redislock.mylock.RedisDistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
 
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
 
@Service
@Slf4j
public class InventoryService2 {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Value("${server.port}")
    private String port;
    @Autowired
    private DistributedLockFactory distributedLockFactory;
 
    @Autowired
    private Redisson redisson;
    public String saleByRedisson() {
        String retMessage = "";
        String key = "RedisLock";
        RLock redissonLock = redisson.getLock(key);
        redissonLock.lock();
        try {
            //1 查询库存信息
            String result = stringRedisTemplate.opsForValue().get("inventory001");
            //2 判断库存是否足够
            Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
            //3 扣减库存
            if (inventoryNumber > 0) {
                stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
                retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber;
                System.out.println(retMessage);
            } else {
                retMessage = "商品卖完了,o(╥﹏╥)o";
            }
        } finally {
          redissonLock.unlock();
        }
        return retMessage+"\t"+"服务端口号:"+port;
    }
}

测试

单机

成功

JMeter
Bug
java.lang.IllegalMonitorStateException Create breakpoint: attempt tp unlock lock, not locked by current thread by node id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx thread-id: 105
解决方法 V9.1 版
InventoryService.java
package com.atguigu.redislock.service;
 
import cn.hutool.core.util.IdUtil;
import com.atguigu.redislock.mylock.DistributedLockFactory;
import com.atguigu.redislock.mylock.RedisDistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
 
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
 
@Service
@Slf4j
public class InventoryService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Value("${server.port}")
    private String port;
    @Autowired
    private DistributedLockFactory distributedLockFactory;
 
    @Autowired
    private Redisson redisson;
    public String saleByRedisson() {
        String retMessage = "";
        String key = "RedisLock";
        RLock redissonLock = redisson.getLock(key);
        redissonLock.lock();
        try {
            //1 查询库存信息
            String result = stringRedisTemplate.opsForValue().get("inventory001");
            //2 判断库存是否足够
            Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
            //3 扣减库存
            if (inventoryNumber > 0) {
                stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
                retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber;
                System.out.println(retMessage);
            } else {
                retMessage = "商品卖完了,o(╥﹏╥)o";
            }
        } finally {
            if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) {
                redissonLock.unlock();
            }
        }
        return retMessage+"\t"+"服务端口号:"+port;
    }
}

Redisson 源码解析

  • 加锁
  • 可重入
  • 自动续期
  • 解锁

分析步骤

守护线程“续命”

额外起一个线程,定期检查线程是否还持有锁,如果有则延长过期时间。

Redisson 里面就实现了这个方案,使用“看门狗”定期检查(每 1/3 的锁时间检查 1 次),如果线程还持有锁,则刷新过期时间。

WatchDog

在获取锁成功后,给锁加一个 watchdog,watchdog 会起一个定时任务,在锁没有被释放且快要过期的时候会续期。

https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers (opens in a new tab)

If Redisson instance which acquired lock crashes then such lock could hang forever in acquired state. To avoid this Redisson maintains lock watchdog, it prolongs lock expiration while lock holder Redisson instance is alive. By default lock watchdog timeout is 30 seconds and can be changed through Config.lockWatchdogTimeout (opens in a new tab) setting.

源码分析 1

通过 Redisson 新建出来的锁 Key,默认是 30 秒

RedissonLock.java 文件中 this.internalLockLeaseTime 对应的 getLockWatchdogTimeout() 所获取的 Redisson 默认的 Config.java 类中 lockWatchdogTimeout 变量 30 秒。

// 构造函数
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.internalLockLeaseTime = getServiceManager().getCfg().getLockWatchdogTimeout();
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
 
@Override
public void lock() {
    try {
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}
public long getLockWatchdogTimeout() {
    return lockWatchdogTimeout;
}
private long lockWatchdogTimeout = 30 * 1000;

源码分析 2

RedissonLock.java 中,锁的调用顺序 lock() → tryAcquire() → tryAcquireAsync()

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);
 
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

加锁的逻辑会进入到 org.redisson.RedissonLock#tryAcquireAsync 中,在获取锁成功后,会进入 scheduleExpirationRenewal

源码分析 3

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

流程解释

  • 通过 exists 判断,如果锁不存在,则设置值和过期时间,加锁成功;
  • 通过 hexists 判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功;
  • 如果锁已经存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间(代表了锁 key 的剩余生存时间),加锁失败。

源码分析 4

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

这里面初始化一个定时器,delay 的时间是 internalLockLeaseTime / 3 。在 Redisson 中,internalLockLeaseTime30s ,也就是每隔 10s 续期一次,每次 30s。

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
 
    Timeout task = getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
 
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
 
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
 
    ee.setTimeout(task);
}
  • Watch dog 自动延期机制

    客户端 A 加锁成功,就会启动一个 watch dog 看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端 A 还持有锁 Key,那么就会不断的延长锁 Key 的生存时间,默认每次续命又从 30 秒新开始

  • 自动续期 Lua 脚本分析

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,
                "if ((redis.call('exists', KEYS[1]) == 0) " +
                            "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return nil;",
            Arrays.asList(getRawName(), getChannelName()),
            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand());
}
  • 如果释放锁的线程和已存在锁的线程不是同一个线程,返回 null
  • 通过 hincrby 递减 1,先释放一次锁。若剩余次数还大于 0,则证明当前锁是重入锁,刷新过期时间;
  • 若剩余次数小于 0,删除 Key 并发布锁释放的消息,解锁成功。

多机案例

理论参考来源

官网

The Redlock Algorithm

In the distributed version of the algorithm we assume we have N Redis masters. Those nodes are totally independent, so we don’t use replication or any other implicit coordination system. We already described how to acquire and release the lock safely in a single instance. We take for granted that the algorithm will use this method to acquire and release the lock in a single instance. In our examples we set N=5, which is a reasonable value, so we need to run 5 Redis masters on different computers or virtual machines in order to ensure that they’ll fail in a mostly independent way.

In order to acquire the lock, the client performs the following operations:

  1. It gets the current time in milliseconds.
  2. It tries to acquire the lock in all the N instances sequentially, using the same key name and random value in all the instances. During step 2, when setting the lock in each instance, the client uses a timeout which is small compared to the total lock auto-release time in order to acquire it. For example if the auto-release time is 10 seconds, the timeout could be in the ~ 5-50 milliseconds range. This prevents the client from remaining blocked for a long time trying to talk with a Redis node which is down: if an instance is not available, we should try to talk with the next instance ASAP.
  3. The client computes how much time elapsed in order to acquire the lock, by subtracting from the current time the timestamp obtained in step 1. If and only if the client was able to acquire the lock in the majority of the instances (at least 3), and the total time elapsed to acquire the lock is less than lock validity time, the lock is considered to be acquired.
  4. If the lock was acquired, its validity time is considered to be the initial validity time minus the time elapsed, as computed in step 3.
  5. If the client failed to acquire the lock for some reason (either it was not able to lock N/2+1 instances or the validity time is negative), it will try to unlock all the instances (even the instances it believed it was not able to lock).

总结

这个锁的算法实现了多 redis 实例的情况,相对于单 redis 节点来说,优点在于防止了单节点故障造成整个服务停止运行的情况且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。

Redisson 分布式锁支持 MultiLock 机制可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁。

最低保证分布式锁的有效性及安全性的要求如下:
  • 1.互斥;任何时刻只能有一个 Client 获取锁

  • 2.释放死锁;即使锁定资源的服务崩溃或者分区,仍然能释放锁

  • 3.容错性;只要多数 Redis 节点(一半以上)在使用,client 就可以获取和释放锁

网上讲的基于故障转移实现的 Redis 主从无法真正实现 Redlock:

因为 Redis 在进行主从复制时是异步完成的,比如在 Client A 获取锁后,主 Redis 复制数据到从 Redis 过程中崩溃了,导致没有复制到从 Redis 中,然后从 Redis 选举出一个升级为主 Redis,造成新的主 Redis 没有 Client A 设置的锁,这是 Client B 尝试获取锁,并且能够成功获取锁,导致互斥失效。

代码参考来源

https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers (opens in a new tab)

❌2022 年第 8 章第 4 小节

8.4. 红锁(RedLock)

基于 Redis 的 Redisson 红锁 RedissonRedLock 对象实现了 Redlock 介绍的加锁算法。该对象也可以用来将多个 RLock 对象关联为一个红锁,每个 RLock 对象实例可以来自不同的 Redisson 实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance1.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
 
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功
lock.lock();
...
lock.unlock();

大家都知道,如果负责存储某些分布式锁的某些 Redis 节点宕机以后,而且这些锁正好处于锁住的状态时,这些锁会出现锁死的状态。为了避免这种情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断地延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。

另外 Redisson 还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 给 lock1,lock2,lock3 加锁,如果没有手动解开的话,10 秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
 
// 为加锁等待 100 秒时间,并在加锁成功 10 秒钟后自动解开
boolen res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

❌2023 年第 8 章第 4 小节

8.4. RedLock

This object is deprecated. Use RLock (opens in a new tab) or RFencedLock (opens in a new tab) instead.

2023 年第 8 章第 3 小节

8.3. MultiLock

Redis based distributed MultiLock object allows to group Lock (opens in a new tab) objects and handle them as a single lock. Each RLock object may belong to different Redisson instances.

If Redisson instance which acquired MultiLock crashes then such MultiLock could hang forever in acquired state. To avoid this Redisson maintains lock watchdog, it prolongs lock expiration while lock holder Redisson instance is alive. By default lock watchdog timeout is 30 seconds and can be changed through Config.lockWatchdogTimeout (opens in a new tab) setting.

leaseTime parameter during lock acquisition can be defined. After specified time interval locked lock will be released automatically.

MultiLock object behaves according to the Java Lock specification. It means only lock owner thread can unlock it otherwise IllegalMonitorStateException would be thrown. Otherwise consider to use RSemaphore (opens in a new tab) object.

Code example:

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
 
RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);
 
// traditional lock method
multiLock.lock();
 
// or acquire lock and automatically unlock it after 10 seconds
multiLock.lock(10, TimeUnit.SECONDS);
 
// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       multiLock.unlock();
   }
}

案例

准备工作

使用 Docker 起 3 台 Redis 的 master 机器,本次设置 3 台 master 各自独立无从属关系。

docker run -p 6381:6379 --name redis-master-1 -d redis
docker run -p 6382:6379 --name redis-master-2 -d redis
docker run -p 6383:6379 --name redis-master-3 -d redis

Demo

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
 
    <groupId>com.atguigu.redis.redlock</groupId>
    <artifactId>redis_redlock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
 
 
    <properties>
        <java.version>1.8</java.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
 
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.19.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--swagger-ui-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.11</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
 
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-configuration-processor</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
 
</project>
application.properties
server.port=9090
spring.application.name=redlock
 
spring.swagger2.enabled=true
 
spring.redis.database=0
spring.redis.password=
spring.redis.timeout=3000
spring.redis.mode=single
 
spring.redis.pool.conn-timeout=3000
spring.redis.pool.so-timeout=3000
spring.redis.pool.size=10
 
spring.redis.single.address1=192.168.111.185:6381
spring.redis.single.address2=192.168.111.185:6382
spring.redis.single.address3=192.168.111.185:6383
package com.atguigu.redis.redlock;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class RedisRedlockApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(RedisRedlockApplication.class, args);
    }
 
}
package com.atguigu.redis.redlock.config;
 
import org.apache.commons.lang3.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
 
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class CacheConfiguration {
 
    @Autowired
    RedisProperties redisProperties;
 
    @Bean
    RedissonClient redissonClient1() {
        Config config = new Config();
        String node = redisProperties.getSingle().getAddress1();
        node = node.startsWith("redis://") ? node : "redis://" + node;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(node)
                .setTimeout(redisProperties.getPool().getConnTimeout())
                .setConnectionPoolSize(redisProperties.getPool().getSize())
                .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
        if (StringUtils.isNotBlank(redisProperties.getPassword())) {
            serverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }
 
    @Bean
    RedissonClient redissonClient2() {
        Config config = new Config();
        String node = redisProperties.getSingle().getAddress2();
        node = node.startsWith("redis://") ? node : "redis://" + node;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(node)
                .setTimeout(redisProperties.getPool().getConnTimeout())
                .setConnectionPoolSize(redisProperties.getPool().getSize())
                .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
        if (StringUtils.isNotBlank(redisProperties.getPassword())) {
            serverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }
 
    @Bean
    RedissonClient redissonClient3() {
        Config config = new Config();
        String node = redisProperties.getSingle().getAddress3();
        node = node.startsWith("redis://") ? node : "redis://" + node;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(node)
                .setTimeout(redisProperties.getPool().getConnTimeout())
                .setConnectionPoolSize(redisProperties.getPool().getSize())
                .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
        if (StringUtils.isNotBlank(redisProperties.getPassword())) {
            serverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }
 
 
    /**
     * 单机
     * @return
     */
    /*@Bean
    public Redisson redisson()
    {
        Config config = new Config();
 
        config.useSingleServer().setAddress("redis://192.168.111.147:6379").setDatabase(0);
 
        return (Redisson) Redisson.create(config);
    }*/
 
}
package com.atguigu.redis.redlock.config;
 
import lombok.Data;
 
@Data
public class RedisPoolProperties {
 
    private int maxIdle;
 
    private int minIdle;
 
    private int maxActive;
 
    private int maxWait;
 
    private int connTimeout;
 
    private int soTimeout;
 
    /**
     * 池大小
     */
    private  int size;
 
}
RedisProperties.java
package com.atguigu.redis.redlock.config;
 
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
 
@ConfigurationProperties(prefix = "spring.redis", ignoreUnknownFields = false)
@Data
public class RedisProperties {
 
    private int database;
 
    /**
     * 等待节点回复命令的时间。该时间从命令发送成功时开始计时
     */
    private int timeout;
 
    private String password;
 
    private String mode;
 
    /**
     * 池配置
     */
    private RedisPoolProperties pool;
 
    /**
     * 单机信息配置
     */
    private RedisSingleProperties single;
}
RedisSingleProperties.java
package com.atguigu.redis.redlock.config;
 
import lombok.Data;
 
@Data
public class RedisSingleProperties {
    private  String address1;
    private  String address2;
    private  String address3;
}
RedLockController.java
package com.atguigu.redis.redlock.controller;
 
import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.RedissonMultiLock;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
@RestController
@Slf4j
public class RedLockController {
 
    public static final String CACHE_KEY_REDLOCK = "_REDLOCK";
 
    @Autowired
    RedissonClient redissonClient1;
 
    @Autowired
    RedissonClient redissonClient2;
 
    @Autowired
    RedissonClient redissonClient3;
 
    boolean isLockBoolean;
 
    @GetMapping(value = "/multiLock")
    public String getMultiLock() throws InterruptedException
    {
        String uuid =  IdUtil.simpleUUID();
        String uuidValue = uuid+":"+Thread.currentThread().getId();
 
        RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
        RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
        RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
 
        RedissonMultiLock redLock = new RedissonMultiLock(lock1, lock2, lock3);
        redLock.lock();
        try
        {
            System.out.println(uuidValue+"\t"+"---come in biz multiLock");
            try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(uuidValue+"\t"+"---task is over multiLock");
        } catch (Exception e) {
            e.printStackTrace();
            log.error("multiLock exception ",e);
        } finally {
            redLock.unlock();
            log.info("释放分布式锁成功key:{}", CACHE_KEY_REDLOCK);
        }
 
        return "multiLock task is over  "+uuidValue;
    }
}

测试