您当前的位置:首页 > 电脑百科 > 数据库 > Redis

基于redis分布式锁实现“秒杀”

时间:2019-08-21 11:29:24  来源:  作者:



最近在项目中遇到了类似“秒杀”的业务场景,在本篇文章中,我将用一个非常简单的demo,阐述实现所谓“秒杀”的基本思路。

业务场景

所谓秒杀,从业务角度看,是短时间内多个用户“争抢”资源,这里的资源在大部分秒杀场景里是商品;将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。

 

一些可能的实现

刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可以不加思索的想到下面的一些方法:

1、秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,在不考虑缓存的情况下应该是要操作数据库的。那么最简单直接的实现就是在这个方法上加上synchronized关键字,通俗的讲就是锁住整个方法;

2、锁住整个方法这个策略简单方便,但是似乎有点粗暴。可以稍微优化一下,只锁住秒杀的代码块,比如写数据库的部分;

3、既然有并发问题,那我就让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作,自然不会有并发问题。

 

上面所述的方法都是有效的,但是都不好。为什么?第一和第二种方法本质上是“加锁”,但是锁粒度依然比较高。什么意思?试想一下,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,从业务上讲应该是可以同时进行的,但是如果采用第一二种方法,这两个线程也会去争抢同一个锁,这其实是不必要的。第三种方法也没有解决上面说的问题。

 

那么如何将锁控制在更细的粒度上呢?可以考虑为每个商品设置一个互斥锁,以和商品ID相关的字符串为唯一标识,这样就可以做到只有争抢同一件商品的线程互斥,不会导致所有的线程互斥。分布式锁恰好可以帮助我们解决这个问题。

 

何为分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品。我们来根据这个简单的业务场景来解释一下分布式锁。

通常具有秒杀场景的业务系统都比较复杂,承载的业务量非常巨大,并发量也很高。这样的系统往往采用分布式的架构来均衡负载。那么这1000个并发就会是从不同的地方过来,商品库存就是共享的资源,也是这1000个并发争抢的资源,这个时候我们需要将并发互斥管理起来。这就是分布式锁的应用。

而key-value存储系统,如redis,因为其一些特性,是实现分布式锁的重要工具。

 

具体的实现

先来看看一些redis的基本命令:

SETNX key value

如果key不存在,就设置key对应字符串value。在这种情况下,该命令和SET一样。当key已经存在时,就不做任何操作。SETNX是”SET if Not eXists”。

expire KEY seconds

设置key的过期时间。如果key已过期,将会被自动删除。

del KEY

删除key

由于笔者的实现只用到这三个命令,就只介绍这三个命令,更多的命令以及redis的特性和使用,可以参考redis官网。

 

需要考虑的问题

1、用什么操作redis?幸亏redis已经提供了jedis客户端用于JAVA应用程序,直接调用jedis API即可。

2、怎么实现加锁?“锁”其实是一个抽象的概念,将这个抽象概念变为具体的东西,就是一个存储在redis里的key-value对,key是于商品ID相关的字符串来唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个商品已经上锁。

3、如何释放锁?既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。

4、阻塞还是非阻塞?笔者采用了阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。

5、如何处理异常情况?比如一个线程把一个商品上了锁,但是由于各种原因,没有完成操作(在上面的业务场景里就是没有将库存-1写入数据库),自然没有释放锁,这个情况笔者加入了锁超时机制,利用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁(可以认为超时释放锁是一个异步操作,由redis完成,应用程序只需要根据系统特点设置超时时间即可)。

 

talk is cheap,show me the code

在代码实现层面,注解有并发的方法和参数,通过动态代理获取注解的方法和参数,在代理中加锁,执行完被代理的方法后释放锁。

几个注解定义:

cachelock是方法级的注解,用于注解会产生并发问题的方法:

  •  
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
 String lockedPrefix() default "";//redis 锁key的前缀
 long timeOut() default 2000;//轮询锁的时间
 int expireTime() default 1000;//key在redis里存在的时间,1000S
}

lockedObject是参数级的注解,用于注解商品ID等基本类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
 //不需要值
}

LockedComplexObject也是参数级的注解,用于注解自定义类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
 String field() default "";//含有成员变量的复杂对象中需要加锁的成员变量,如一个商品对象的商品ID
}

CacheLockInterceptor实现InvocationHandler接口,在invoke方法中获取注解的方法和参数,在执行注解的方法前加锁,执行被注解的方法后释放锁:

public class CacheLockInterceptor implements InvocationHandler{
 public static int ERROR_COUNT = 0;
 private Object proxied;
 public CacheLockInterceptor(Object proxied) {
 this.proxied = proxied;
 }
 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 CacheLock cacheLock = method.getAnnotation(CacheLock.class);
 //没有cacheLock注解,pass
 if(null == cacheLock){
 System.out.println("no cacheLock annotation"); 
 return method.invoke(proxied, args);
 }
 //获得方法中参数的注解
 Annotation[][] annotations = method.getParameterAnnotations();
 //根据获取到的参数注解和参数列表获得加锁的参数
 Object lockedObject = getLockedObject(annotations,args);
 String objectValue = lockedObject.toString();
 //新建一个锁
 RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
 //加锁
 boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
 if(!result){//取锁失败
 ERROR_COUNT += 1;
 throw new CacheLockException("get lock fail");
 }
 try{
 //加锁成功,执行方法
 return method.invoke(proxied, args);
 }finally{
 lock.unlock();//释放锁
 }
 }
 /**
 * 
 * @param annotations
 * @param args
 * @return
 * @throws CacheLockException
 */
 private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
 if(null == args || args.length == 0){
 throw new CacheLockException("方法参数为空,没有被锁定的对象");
 }
 if(null == annotations || annotations.length == 0){
 throw new CacheLockException("没有被注解的参数");
 }
 //不支持多个参数加锁,只支持第一个注解为lockedObject或者lockedComplexObject的参数
 int index = -1;//标记参数的位置指针
 for(int i = 0;i < annotations.length;i++){
 for(int j = 0;j < annotations[i].length;j++){
 if(annotations[i][j] instanceof LockedComplexObject){//注解为LockedComplexObject
 index = i;
 try {
 return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
 } catch (NoSuchFieldException | SecurityException e) {
 throw new CacheLockException("注解对象中没有该属性" + ((LockedComplexObject)annotations[i][j]).field());
 }
 }
 if(annotations[i][j] instanceof LockedObject){
 index = i;
 break;
 }
 }
 //找到第一个后直接break,不支持多参数加锁
 if(index != -1){
 break;
 }
 }
 if(index == -1){
 throw new CacheLockException("请指定被锁定参数");
 }
 return args[index];
 }
}

最关键的RedisLock类中的lock方法和unlock方法:

/**
 * 加锁
 * 使用方式为:
 * lock();
 * try{
 * executeMethod();
 * }finally{
 * unlock();
 * }
 * @param timeout timeout的时间范围内轮询锁
 * @param expire 设置锁超时时间
 * @return 成功 or 失败
 */
 public boolean lock(long timeout,int expire){
 long nanoTime = System.nanoTime();
 timeout *= MILLI_NANO_TIME;
 try {
 //在timeout的时间范围内不断轮询锁
 while (System.nanoTime() - nanoTime < timeout) {
 //锁不存在的话,设置锁并设置锁过期时间,即加锁
 if (this.redisClient.setnx(this.key, LOCKED) == 1) {
 this.redisClient.expire(key, expire);//设置锁过期时间是为了在没有释放
 //锁的情况下锁过期后消失,不会造成永久阻塞
 this.lock = true;
 return this.lock;
 }
 System.out.println("出现锁等待");
 //短暂休眠,避免可能的活锁
 Thread.sleep(3, RANDOM.nextInt(30));
 } 
 } catch (Exception e) {
 throw new RuntimeException("locking error",e);
 }
 return false;
 }
 public void unlock() {
 try {
 if(this.lock){
 redisClient.delKey(key);//直接删除
 }
 } catch (Throwable e) {
 }
 }

上述的代码是框架性的代码,现在来讲解如何使用上面的简单框架来写一个秒杀函数。

先定义一个接口,接口里定义了一个秒杀方法:

public interface SeckillInterface {
/**
*现在暂时只支持在接口方法上注解
*/
 //cacheLock注解可能产生并发的方法
 @CacheLock(lockedPrefix="TEST_PREFIX")
 public void secKill(String userID,@LockedObject Long commidityID);//最简单的秒杀方法,参数是用户ID和商品ID。可能有多个线程争抢一个商品,所以商品ID加上LockedObject注解
}

上述SeckillInterface接口的实现类,即秒杀的具体实现:

public class SecKillImpl implements SeckillInterface{
 static Map<Long, Long> inventory ;
 static{
 inventory = new HashMap<>();
 inventory.put(10000001L, 10000l);
 inventory.put(10000002L, 10000l);
 }
 @Override
 public void secKill(String arg1, Long arg2) {
 //最简单的秒杀,这里仅作为demo示例
 reduceInventory(arg2);
 }
 //模拟秒杀操作,姑且认为一个秒杀就是将库存减一,实际情景要复杂的多
 public Long reduceInventory(Long commodityId){
 inventory.put(commodityId,inventory.get(commodityId) - 1);
 return inventory.get(commodityId);
 }
}

模拟秒杀场景,1000个线程来争抢两个商品:

@Test
 public void testSecKill(){
 int threadCount = 1000;
 int splitPoint = 500;
 CountDownLatch endCount = new CountDownLatch(threadCount);
 CountDownLatch beginCount = new CountDownLatch(1);
 SecKillImpl testClass = new SecKillImpl();
 Thread[] threads = new Thread[threadCount];
 //起500个线程,秒杀第一个商品
 for(int i= 0;i < splitPoint;i++){
 threads[i] = new Thread(new Runnable() {
 public void run() {
 try {
 //等待在一个信号量上,挂起
 beginCount.await();
 //用动态代理的方式调用secKill方法
 SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
 new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
 proxy.secKill("test", commidityId1);
 endCount.countDown();
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 }
 });
 threads[i].start();
 }
 //再起500个线程,秒杀第二件商品
 for(int i= splitPoint;i < threadCount;i++){
 threads[i] = new Thread(new Runnable() {
 public void run() {
 try {
 //等待在一个信号量上,挂起
 beginCount.await();
 //用动态代理的方式调用secKill方法
 SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
 new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
 proxy.secKill("test", commidityId2);
 //testClass.testFunc("test", 10000001L);
 endCount.countDown();
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 }
 });
 threads[i].start();
 }
 long startTime = System.currentTimeMillis();
 //主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性
 beginCount.countDown();
 try {
 //主线程等待结束信号量
 endCount.await();
 //观察秒杀结果是否正确
 System.out.println(SecKillImpl.inventory.get(commidityId1));
 System.out.println(SecKillImpl.inventory.get(commidityId2));
 System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
 System.out.println("total cost " + (System.currentTimeMillis() - startTime));
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 }

在正确的预想下,应该每个商品的库存都减少了500,在多次试验后,实际情况符合预想。如果不采用锁机制,会出现库存减少499,498的情况。

这里采用了动态代理的方法,利用注解和反射机制得到分布式锁ID,进行加锁和释放锁操作。当然也可以直接在方法进行这些操作,采用动态代理也是为了能够将锁操作代码集中在代理中,便于维护。

通常秒杀场景发生在web项目中,可以考虑利用spring的AOP特性将锁操作代码置于切面中,当然AOP本质上也是动态代理。

 

小结

这篇文章从业务场景出发,从抽象到实现阐述了如何利用redis实现分布式锁,完成简单的秒杀功能,也记录了笔者思考的过程,希望能给阅读到本篇文章的人一些启发。

原文:https://blog.csdn.net/u010359884/article/details/50310387

作者:Isfire

来源:CSDN



Tags:分布式锁   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
概述以前参加过一个库存系统,由于其业务复杂性,搞了很多个应用来支撑。这样的话一份库存数据就有可能同时有多个应用来修改库存数据。比如说,有定时任务域xx.cron,和SystemA域...【详细内容】
2021-11-05  Tags: 分布式锁  点击:(31)  评论:(0)  加入收藏
作者 | mushishi来源 | urlify.cn/Mry6biredis分布式锁基本原理采用 redis 实现分布式锁,主要是利用其单线程命令执行的特性,一般是 setnx, 只会有一个线程会执行成功,也就是只...【详细内容】
2021-07-07  Tags: 分布式锁  点击:(105)  评论:(0)  加入收藏
前言之前的文章中通过电商场景中秒杀的例子和大家分享了单体架构中锁的使用方式,但是现在很多应用系统都是相当庞大的,很多应用系统都是微服务的架构体系,那么在这种跨jvm的场...【详细内容】
2021-01-05  Tags: 分布式锁  点击:(142)  评论:(0)  加入收藏
Nginx+Redis+MQ+DB下秒杀实现原理 Nginx+Redis+MQ+DB下限购实现原理 Nginx+Redis+MQ+DB下亿级流量实现原理 Redis在架构中的意义 分布式微服务是快了还是慢了 高可用和可用...【详细内容】
2020-12-22  Tags: 分布式锁  点击:(365)  评论:(0)  加入收藏
应用场景分布式系统中,面对高并发场景,又对数据一致性有一定要求的情况下,使用分布式锁。例如商城中下单扣库存这种情况。解决方案基于数据库例如:select * from mall_spu where...【详细内容】
2020-11-09  Tags: 分布式锁  点击:(174)  评论:(0)  加入收藏
当下在互联网技术架构中,最流行的莫过于分布式架构了。为什么大家纷纷都采用分布式架构呢? 1、高效低廉,将部署在高性能机的程序分散在多个小型机中部署;2、扩展性强,可随着业务...【详细内容】
2020-11-05  Tags: 分布式锁  点击:(96)  评论:(0)  加入收藏
分布式锁使用场景现在的系统都是集群部署,每个服务都不是单节点的了。比如库存服务,可能部署到3台机器上分别命名为节点1,节点2,节点3。库存服务需要扣减库存,扣减库存肯定需要锁...【详细内容】
2020-09-27  Tags: 分布式锁  点击:(140)  评论:(0)  加入收藏
在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。...【详细内容】
2020-09-01  Tags: 分布式锁  点击:(44)  评论:(0)  加入收藏
分布式锁通常有很多选择,基于 Redis 的,基于 Zookeeper 的,基于数据库等等方案。Redis 用于缓存数据,在项目中都有使用,所以使用 Redis 来做分布式锁的会稍微多些。如果用 Redis...【详细内容】
2020-09-01  Tags: 分布式锁  点击:(103)  评论:(0)  加入收藏
分布式锁三种实现方式:1. 基于数据库实现分布式锁;2. 基于缓存(Redis等)实现分布式锁;3. 基于Zookeeper实现分布式锁; 一, 基于数据库实现分布式锁 1. 悲观锁利用select &hellip; w...【详细内容】
2020-07-30  Tags: 分布式锁  点击:(84)  评论:(0)  加入收藏
▌简易百科推荐
来源: my.oschina.net/xiaomu0082/blog/2990388首先说下问题现象:内网sandbox环境API持续1周出现应用卡死,所有api无响应现象刚开始当测试抱怨环境响应慢的时候 ,我们重启一下应...【详细内容】
2021-12-08  Java识堂    Tags:Redis   点击:(18)  评论:(0)  加入收藏
我不知道为什么你会选择对特定数量的“错误”(或警告)如此具体。听起来您正在寻找将要发布到 Yahoo! 的某些文章的内容。 Insider (N Foos to Blah for the BlahBlah)。那说:...【详细内容】
2021-12-07  富集云科技有限公司    Tags:Redis   点击:(14)  评论:(0)  加入收藏
目录 一、背景 二、步骤 0.理论支持 1、获取数据 2、结果 3、分析数据并评估大小 三、关于repl-backlog-size 一、背景 repl-backlog-size控制这个环形缓冲区. ​ 主从断...【详细内容】
2021-11-05  弈秋的美好生活    Tags:redis   点击:(41)  评论:(0)  加入收藏
Redis 性能测试是通过同时执行多个命令实现的。1,Redis-benchmarkRedis性能命令:redis性能命令格式: redis-benchmark [option] [option value] redis 性能测试工具可选参数如...【详细内容】
2021-11-02  川石信息    Tags:Redis   点击:(41)  评论:(0)  加入收藏
1 概述数据结构和内部编码 无传统关系型数据库的 Table 模型schema 所对应的db仅以编号区分。同一 db 内,key 作为顶层模型,它的值是扁平化的。即 db 就是key的命名空间。 key...【详细内容】
2021-11-01  JavaEdge    Tags:Redis   点击:(28)  评论:(0)  加入收藏
普通java中使用引用Java redis 驱动,即可连接:import redis.clients.jedis.Jedis; public class RedisTestJava { public static void main(String[] args) { //连...【详细内容】
2021-10-13  faesuite    Tags:Redis   点击:(34)  评论:(0)  加入收藏
Redis常用的数据结构有 string list set zset hashstringstring 是 Redis 的基本的数据类型,一个 key 对应一个 value。string 类型是二进制安全的,Redis的string可以包含任...【详细内容】
2021-10-12  语霖    Tags:Redis   点击:(36)  评论:(0)  加入收藏
列表类型可以存储一组按插入顺序排序的字符串,它非常灵活,支持在两端插入、弹出数据,可以充当栈和队列的角色。> LPUSH fruit apple(integer) 1> RPUSH fruit banana(integer)...【详细内容】
2021-09-17  深夜敲代码    Tags:Redis   点击:(54)  评论:(0)  加入收藏
Redis持久化意义 是做灾难恢复,数据恢复,也可以归类到高可用的一个环节里面去,比如你的redis整个挂了,然后redis就不可用了,你要做的事情是让redis变得可用,尽快变得可用 大量的请...【详细内容】
2021-08-12  小李说IT    Tags:Redis   点击:(77)  评论:(0)  加入收藏
当查询Redis中没有的数据时,该查询会下沉到数据库层,同时数据库层也没有该数据,当这种情况大量出现或被恶意攻击时,接口的访问全部透过Redis访问数据库,而数据库中也没有这些数据...【详细内容】
2021-07-30  随便t    Tags:缓存穿透   点击:(91)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条