壹影博客.
我在下午4点钟开始想你
剖析高并发场景下超卖BUG及Resisson部分源码分析
  • 2024-2-15日
  • 0评论
  • 340围观

剖析高并发场景下超卖BUG及Resisson部分源码分析

前言:本篇文章将探讨分析我们在实际开发中扣减库存时,方法内潜在的问题,并对潜在问题进行处理优化,最后再来分析用市面上成熟的框架Redisson如何解决这些潜在问题

一、扣减库存超卖BUG分析

 如下是 常见的controller扣减库存的代码,以及潜在问题

@RequestMapping("/deduct_stock")
public String deductStock(){
   //查询redis里的库存 假设查询结果是200
   int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

   //如果库存结果大于0
   if(stock > 0){
       int realStock = stock - 1; //库存-1  当在高并发情况下多个线程都扣减1 200-1结果为199
       stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); //重新设置库存
       System.out.println("扣减成功,剩余库存:" + realStock); //线程1、线程2...都输出199
       //出现数据不一致的BUG问题
   }else{
       //否则库存扣减失败
       System.out.println("扣减失败,库存不足");
   }
   return "end";
}

如上代码出现了扣减库存数据不一致的情况,其实本质上是多个线程同时扣减库存所导致的,那么我们能不能控制多个线程,只让一个线程去操作呢,我们很快会想到利用synchronized关键字去加锁,让其只能同时有一个线程进行去修改redis的数据

处理问题:确保只有一个线程修改数据
具体情况:多个线程同时对数据进行修改

代码如下

@RequestMapping("/deduct_stock")
public String deductStock(){
    //加锁-同时只能有一个线程对数据进行操作
    synchronized (this){
        //查询redis里的库存 假设查询结果是200
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        //如果库存结果大于0
        if(stock > 0){
            int realStock = stock - 1; //库存-1  结果为199
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); //重新设置库存
            System.out.println("扣减成功,剩余库存:" + realStock); //输出199
        }else{
            //否则库存扣减失败
            System.out.println("扣减失败,库存不足");
        }
        return "end";
    }
}

/**
 * 针对扣减库存的数据不一致的BUG进行的优化
 * 新增了synchronized锁住代码块
 * 但是依旧存在问题:synchronized是在是单机状态下并发问题解决方案
 * 但是在分布式集群的环境下synchronized就会出现问题
 * 解决方案:使用分布式锁
 */

针对扣减库存的数据不一致的BUG进行的优化,新增了synchronized锁住代码块,但是依旧存在问题:synchronized是在是单机服务下并发问题解决方案,在分布式集群的环境下synchronized就会出现问题

处理问题:集群服务下,保证线程数据安全
具体情况:高并发集群服务,多个线程线程改数据的安全问题

解决方案:使用分布式锁 --使用Redis的SETNX命令去解决
然后我们进步优化 -见如下代码:

@RequestMapping("/deduct_stock")
public String deductStock(){

  //根据商品ID生产一个KEY,商品ID是前端传过来的
  String lockKey="lock:product:id"; 

  //添加分布式锁进行加锁  -- 如果lockKey 不存在 则设置值返回true,如果存在则不会做任何操作返回false
  //添加分布式锁setNX
  Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "yyge");

  //如果加锁失败则返回前端提示
  if (Boolean.FALSE.equals(result)){return "biz_code";}

  //查询redis里的库存 假设查询结果是200
  int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

  //如果库存结果大于0
  if(stock > 0){
    int realStock = stock - 1; //库存-1 结果为199
    stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); //重新设置库存
    System.out.println("扣减成功,剩余库存:" + realStock); //输出199
  }else{
    //否则库存扣减失败
    System.out.println("扣减失败,库存不足");
  }

  //删除分布式锁
  stringRedisTemplate.delete(lockKey);
  return "end";
}

/**
 * 进一步对代码进行优化-使用redis的SETNX 命令来实现分布式锁
 * stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "yyge");
 *
 * 但是即使如此依旧存在问题
 * 在高并发情况下,有一种场景,分布式锁添加成功
 * 但是在处理业务逻辑中发生了异常,导致未能正常执行释放锁的操作
 * 此时这个锁就会一直存在....
 *
 * 解决方案:捕获异常
 */

对扣减库存的操作设置了分布式锁后,依然存在问题,比如在高并发情况下,虽然redis加锁成功,在执行业务逻辑时出现异常,会导致锁未释放的bug问题,针对这个问题我们还需要对代码块进行异常捕获,在finally中,无论是否发生异常都需要将锁删掉

处理问题:加分布式锁成功,锁未释放
具体情况:业务逻辑代码抛出异常,导致锁未释放

见如下优化后的代码

@RequestMapping("/deduct_stock")
public String deductStock(){

  //根据商品ID生产一个KEY,商品ID是前端传过来的
  String lockKey="lock:product:id"; 

  //添加分布式锁进行加锁  -- 如果lockKey 不存在 则设置值返回true,如果存在则不会做任何操作返回false
  //添加分布式锁setNX
  Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "yyge");

  //如果加锁失败则返回前端提示
  if (Boolean.FALSE.equals(result)){return "biz_code";}

  try {
    //查询redis里的库存 假设查询结果是200
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    //如果库存结果大于0
    if(stock > 0){
      int realStock = stock - 1; //库存-1 结果为199
      stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); //重新设置库存
      System.out.println("扣减成功,剩余库存:" + realStock); //输出199
    }else{
      //否则库存扣减失败
      System.out.println("扣减失败,库存不足");
    }
  }finally {
    //删除分布式锁
    stringRedisTemplate.delete(lockKey);
  }
  return "end";
}

/**
 * 进一步优化:利用try,finally捕获异常,无论是否发生异常
 * 都需要将锁删掉
 *
 * 这样操作还是会存在问题:
 * 当线程获取到了锁,在执行业务逻辑时服务器直接宕机了
 * 此时finally内的代码无法执行到,导致锁未释放
 *
 * 解决方案:设置过期时间
 */

对代码进行处理后,解决了抛出异常的问题,但是如果是服务器宕机的情况finally内的释放锁的代码还是不能够执行,所以我们还需要新的解决方案

处理问题:加分布式锁成功,锁未释放
具体情况:执行业务逻辑代码过程中,服务器直接宕机,导致锁未释放

解决方案:设置过期时间

 见如下进一步优化的代码

@RequestMapping("/deduct_stock")
public String deductStock(){
  String lockKey="lock:product:id"; //根据商品ID生产一个KEY,商品ID是前端传过来的
  //添加分布式锁进行加锁  -- 如果lockKey 不存在 则设置值返回true,如果存在则不会做任何操作返回false
  //添加分布式锁setNX,并设置过期时间10秒钟
  Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "yyge",10,TimeUnit.SECONDS);

  //如果加锁失败则返回前端提示
  if (Boolean.FALSE.equals(result)){return "biz_code";}

  try {
    //查询redis里的库存 假设查询结果是200
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    //如果库存结果大于0
    if(stock > 0){
      int realStock = stock - 1; //库存-1 结果为199
      stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); //重新设置库存
      System.out.println("扣减成功,剩余库存:" + realStock); //输出199
    } else {
      //否则库存扣减失败
      System.out.println("扣减失败,库存不足");
    }
  } finally {
    //删除分布式锁
    stringRedisTemplate.delete(lockKey);
  }
  return "end";
}

/**
 * 进一步优化:为了解决在执行业务逻辑代码时服务器宕机导致分布式锁未释放的问题
 * 我们在加锁的时候设置一个过期时间来防止这个问题
 *
 * 如此操作还是有问题:
 * 我们的业务流程收到各个方面的影响,执行时间是无法预料的
 * 如果业务代码的执行时间超过了锁的过期时间,下一个线程过来的时候由于之前的锁已经过期,所以会获得锁成功,然后线程1执行完毕释放锁,此时释放的其实的线程2的锁,那么就会出现其他线程释放锁的问题
 * 也就是说 上锁和释放锁的不是一个线程
 *
 * 解决方案:将线程ID或者UUID设置为锁的值
 */

此时我们为锁设置了过期时间,但还是存在一些问题,如果业务代码的执行时间超过了锁的过期时间,下一个线程过来的时候由于之前的锁已经过期,所以会获得锁成功,然后线程1执行完毕释放锁,此时释放的其实的线程2的锁,那么就会出现其他线程释放锁的问题

处理问题:加分布式锁成功,锁释放不是同一个线程
具体情况:执行业务逻辑代码过程中,执行时间超过了锁的过期时间,导致其他线程获取到了锁,释放锁也不是同一个线程

解决方案:将当前的线程ID设置为锁的值

见如下优化结果代码

@RequestMapping("/deduct_stock")
public String deductStock(){
  String lockKey="lock:product:id"; //根据商品ID生产一个KEY,商品ID是前端传过来的
  //获取当前线程ID !!!这里也可以换成UUID
  String theadId = String.valueOf(Thread.currentThread().getId());

  //添加分布式锁进行加锁  -- 如果lockKey 不存在 则设置值返回true,如果存在则不会做任何操作返回false
  //添加分布式锁setNX,并设置过期时间10秒钟
  Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, theadId,10,TimeUnit.SECONDS);
  if (Boolean.FALSE.equals(result)){
    //如果加锁失败则返回前端提示
    return "biz_code";
  }
  try {
    //查询redis里的库存 假设查询结果是200
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    //如果库存结果大于0
    if(stock > 0){
      int realStock = stock - 1; //库存-1  结果为199
      stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); //重新设置库存
      System.out.println("扣减成功,剩余库存:" + realStock); //输出199
    }else{
      //否则库存扣减失败
      System.out.println("扣减失败,库存不足");
    }
  } finally {
    //判断当前的线程ID是否是当前Key的value值,如果是则删除锁
    if(theadId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
      //删除分布式锁
      stringRedisTemplate.delete(lockKey);
    }

  }
  return "end";
}
/**
 * 我们进一步优化:为了解决在分布式情况下,当前的锁被别的线程释放的问题
 * 我们在设置锁的时候存入线程ID作为值
 *
 * 依然存在问题:
 * 当我们判断线程ID的时候如果服务器宕机,会导致锁一直不会被释放
 *
 * 解决方案:使用Lua保证原子性...
 */

优化到了这里虽然对于中小型公司的业务需求而言,是足够使用的,但对于大于大数据的高并发其实还是存在一些问题的,因为我们判断线程ID和删除锁的操作并不是一个原子操作,此时我们需要将执行的代码做成一个原子操作,此时市面上有许多比较成熟且完善的开源框架可以做到这一点,如Redisson,该框架帮我们做了如上的操作,并且帮我去封装调用了Lua代码来保证执行的原子性...

二、Resisson框架的基本使用及部分源码分析

Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

官网:https://redisson.org/
 GitHub:https://github.com/redisson/redisson

1.简单使用

 ①导入依赖 - pom.xml

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.6.5</version>
</dependency>

②简单配置初始化(也可以自定义配置类) - 启动类

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
  }

  @Bean
  public Redisson redisson() {
      //此为单机模式
      Config config = new Config();
      config.useSingleServer().setAddress(" edis://localhost:6379").setDatabase(0);
      //config.setLockWatchdogTimeout(10080);//设置分布式锁watch dog超时时间
      return (Redisson) Redisson.create(config);
  }
}

③使用

@Autowired
Redisson redisson;

@Autowired
StringRedisTemplate stringRedisTemplate;

@RequestMapping("/deduct_stock")
public String deductStock(){
  String lockKey="lock:product:id"; //根据商品ID生产一个KEY,商品ID是前端传过来的

  //获得锁对象
  RLock redissonLock = redisson.getLock(lockKey);
  redissonLock.lock(); //加锁
  try {
    //查询redis里的库存 假设查询结果是200
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); 
    //如果库存结果大于0
    if(stock > 0){
      int realStock = stock - 1; //库存-1  结果为199
      stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); //重新设置库存
      System.out.println("扣减成功,剩余库存:" + realStock); //输出199
    }else{
      //否则库存扣减失败
      System.out.println("扣减失败,库存不足");
    }
  }finally {
    redissonLock.unlock(); //释放锁
  }
  return "end";
}

如上就是Resisson的使用,来解决我们之前的问题,Resisson为什么可以解决我们之前的问题呢,我们下面来来分析分析它的源码

2.Resisson锁实现原理

如果所示,在当有两个线程 线程1和线程2,都调用了Redisson的Lock方法来加锁,在Redisson底层就执行了类似于setNx的操作,假如线程1抢占到了锁,线程1加锁成功后(在redisson中锁过期时间默认是30秒),redisson会在后台开启一个线程(守护线程),每间隔10秒检查是否还持有锁,如果持有的则延长锁的时间,此时另外一个线程(线程2)如果是也想要去加锁,那么会通过while自旋的形式 间歇性的尝试加锁,当线程1释放锁之后,线程2才会尝试加锁成功

3.Redisson核心源码分析

通过跟进redisson的加锁方法内部(redissonLock.lock(); //加锁)

在RedissonLock.java类中定位到tryAcquireOnceAsync
方法看到如下代码:

// 参数:leaseTime - 锁的租期时长,unit - 时间单位,threadId - 当前尝试获取锁的线程ID
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 如果指定了锁的租期(不为-1)
   if (leaseTime != -1) { 
    // 异步尝试获取具有指定租期的锁,并返回结果
     return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);                                                                                                             
   }
   RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);  
   
   // 为锁的获取结果添加监听器
   ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
     @Override
     public void operationComplete(Future<Boolean> future) throws Exception {
       // 如果获取锁的操作没有成功
       if (!future.isSuccess()) {
        // 直接返回,不执行任何操作
        return;
       }

       Boolean ttlRemaining = future.getNow();
       // lock acquired                                                                                                                                                                              
       if (ttlRemaining) {     
         //刷新过期时间(续时)                                                                                                                                                                      
         scheduleExpirationRenewal(threadId);                                                                                                                                                      
       }                                                                                                                                                                                             
     }                                                                                                                                                                                                 
   });
    return ttlRemainingFuture;                                                                                                                                                                            
}

在该方法内我们可以看到有续时的操作,我们继续跟进方法tryLockInnerAsync,可以看到如下核心代码

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  internalLockLeaseTime = unit.toMillis(leaseTime);                                                          
                                                                                                                
  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,                              
   "if (redis.call('exists', KEYS[1]) == 0) then " +                                                
       "redis.call('hset', KEYS[1], ARGV[2], 1); " +                                                
       "redis.call('pexpire', KEYS[1], ARGV[1]); " +                                                
       "return nil; " +                                                                             
   "end; " +                                                                                        
   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                                      
       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                                             
       "redis.call('pexpire', KEYS[1], ARGV[1]); " +                                                
       "return nil; " +                                                                             
   "end; " +                                                                                        
   "return redis.call('pttl', KEYS[1]);",                                                           
    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));   
}                                                                                                              

如上代码Redisson将Lua脚本丢到Redis服务端中去执行,由于Lua脚本是单线程执行,能够确保执行的原子性。

--Lua脚本解释

--redis.call() 执行redis命令
--KEYS[1]、ARGV[1]、ARGV[2]都是外部传进来的参数
--KEYS[1] 其实就是lockKey
--ARGV[2] 线程ID 
--ARGV[1] 过期时间 默认是30秒

--判断 lockKey 是否存在 判断锁是否存在  
if (redis.call('exists', KEYS[1]) == 0) then   
  --==0锁不存在
  --在redis中存入Hash结构 等同于 set(k,v)
  redis.call('hset', KEYS[1], ARGV[2], 1);  

  --设置过期时间                                           
  redis.call('pexpire', KEYS[1], ARGV[1]);                                             
  return nil;                                                                          
end

--实现锁充入逻辑
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then                                   
  redis.call('hincrby', KEYS[1], ARGV[2], 1)                                      
  redis.call('pexpire', KEYS[1], ARGV[1]);                                             
  return nil;                                                                          
end

看完核心的加锁操作后我们来看看这个,锁是如何进行续时的,我们回到在RedissonLock.java类中定位到tryAcquireOnceAsync方法,找到scheduleExpirationRenewal方法,跟进去看看

private void scheduleExpirationRenewal(final long threadId) {
 //检查是否需要续时
 //首先检查 expirationRenewalMap 中是否已经有了当前锁的续时任务。如果有了,就直接返回,不再重复创建任务。
  if (expirationRenewalMap.containsKey(getEntryName())) {
    return;
  }
  
  //创建一个新的超时任务,这个超时任务会在指定的时间(internalLockLeaseTime / 3)后执行。 
  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
      //利用lua代码来做续时操作,lua代码确保操作的原子性
      //返回布尔值表示是否续时成功
      RFuture < Boolean > future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return 1; " +
        "end; " +
        "return 0;",
        Collections. < Object > singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

        //为续时操作的 RFuture 添加一个监听器
        future.addListener(new FutureListener < Boolean > () {
          @Override
          public void operationComplete(Future < Boolean > future) throws Exception {
              //首先从 expirationRenewalMap 中移除该锁的续时任务,因为任务已经执行完毕。
              expirationRenewalMap.remove(getEntryName());

              //判断续时操作是否成功,如果失败则记录日志
              if (!future.isSuccess()) {
                  log.error("Can't update lock " + getName() + " expiration", future.cause());
                  return;
              }
              
              //如果续时成功,则重新调度该续时任务,确保锁的持续有效性。
              //递归调用自己
              if (future.getNow()) {
                  // reschedule itself                                                                                                                                                 
                  scheduleExpirationRenewal(threadId);
              }
          }
        });
    }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  
  //确保任务唯一性
  //尝试将新创建的续时任务添加到 expirationRenewalMap 中
  //如果 putIfAbsent 返回的值不为 null,说明之前已经有一个相同的续时任务存在,那么取消新创建的任务。
  if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
      task.cancel();
  }
}

通过分析源代码我们可以发现,该方法本质上是一个递归调用,如果续时成功则会递归调用自身续时,直到任务停止

以上就是本篇文章的全部分享了
最后小编还准备了本篇文章所涉及的源码

仅供参考学习

源码:点我下载

by壹影

 

 

 

发表评论