博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用redis分布式锁实现一个秒杀业务
阅读量:4036 次
发布时间:2019-05-24

本文共 11903 字,大约阅读时间需要 39 分钟。

无论是秒杀还是其他的减库存业务,一个最基本的原则就是不能超卖和少卖。

物品的库存都在一个篮子里,要想不超卖,就必须得加锁控制,而对于分布式系统中,使用redis实现分布式锁也不是什么新鲜事物了。但是控制不好,即便使用redis进行分布式锁的话也有可能在高并发的情况下出现问题,比如超卖、redis连接不上等

在这里使用原生的Jedis和RedisTemplate分别写了两个简单的示例,模拟300并发下,抢100个库存物品。

在此之前先说一下我工程的maven环境,有时候jar包版本不一样也会产生不一样的问题。

pom.xml中主要的一些依赖的版本情况:

org.springframework.boot
spring-boot-starter-parent
2.2.2.RELEASE
org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-logging
redis.clients
jedis
3.1.0
org.springframework.data
spring-data-redis
2.2.3.RELEASE

在redis的RedisConnectionUtils,有使用log:

所以你需要指定一种log方式,我使用的是logback.xml, 因为

spring-boot-starter-logging的依赖中会自动引入logback的依赖,所以无需单独引入logback的依赖。

logback.xml:

%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n

如果level设置的是debug,那么在redistemplate使用的时候就会打印类似的日志:

通过查看RedisTemplate源码可以看到,关键的方法如下:

@Nullable	public 
T execute(RedisCallback
action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = getRequiredConnectionFactory(); RedisConnection conn = null; try { if (enableTransactionSupport) { // only bind resources in case of potential transaction synchronization conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if (pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); T result = action.doInRedis(connToExpose); // close pipeline if (pipeline && !pipelineStatus) { connToUse.closePipeline(); } // TODO: any other connection processing? return postProcessResult(result, connToUse, existingConnection); } finally { RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport); } }

比较关键的是:

conn = RedisConnectionUtils.getConnection(factory); //创建redis连接
T result = action.doInRedis(connToExpose); //执行redis命令,可以指定pipeline方式,本文不介绍
RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);//关闭redis连接

创建redis连接和关闭redis连接的时候,有如下代码:

如果是debug级别,就会打印,不是就不会打印。

 

 

1、直接使用Jedis

public class JedisSecKill {    static AtomicInteger integer = new AtomicInteger();    public static void secKillHandle() throws Exception {        String lockKey = "key:123456"; //分布式锁使用的key        String stockKey = "key:123456:info:stock"; //物品库存的key        String value = UUID.randomUUID().toString();        Jedis jedis = RedisPool.getJedis();        try{            jedis.set(stockKey,"100"); //初试化库存100个        }finally {            jedis.close();        }        //模拟300并发来抢100个物品        CountDownLatch cb = new CountDownLatch(1);        for(int i=0; i< 300; i++){            new Thread(new Runnable() {                @Override                public void run() {                    try {                        cb.await();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    try {                        doHandle(lockKey, value, stockKey);                    } catch (Exception e) {                        e.printStackTrace();                    }                }            }).start();        }        cb.countDown();    }    private static void doHandle(String lockKey, String value, String stockKey) throws Exception {        Jedis jedis = RedisPool.getJedis();        try{            String lockFlag = null;            do{                lockFlag = jedis.set(lockKey, value, new SetParams().nx().ex(10)); //获取锁,设置10s                if(null !=lockFlag && lockFlag.equals("OK")) {                    Long currStock = jedis.incrBy(stockKey, -1); //直接扣减库存,原子操作                    if (currStock < 0) {                        System.out.println("东西已被抢完!");                        break;                    } else {                        //只有初始化的库存扣减了方可创建订单                        try {                            Thread.sleep(20);                            int i = integer.incrementAndGet();                            System.out.println("订单已经生成,第"+i+"单");                        } catch (InterruptedException e) {                            e.printStackTrace();                        }                    }                }else {                    System.out.println("没抢到,1s后继续抢");                    Thread.sleep(1000);                }            }while (lockFlag == null);        }finally {            safeUnlock2(lockKey,value,jedis);            jedis.close();        }    }    /**     * 使用lua脚本删除指定的key和value     * @param key     * @param value     * @param jedis     */    private static void safeUnlock2(String key, String value, Jedis jedis){        String lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";        Object result = jedis.eval(lua, Arrays.asList(key), Arrays.asList(value));        System.out.println("result==============="+result);    }}
public class RedisPool extends JedisPool{	/*private static final String HOST = ForResourceFiles.getValByKey("redis.properties", "redis.host");	private static final String PORT = ForResourceFiles.getValByKey("redis.properties", "redis.port");	private static final String AUTH = ForResourceFiles.getValByKey("redis.properties", "redis.auth");	*/				private static JedisPool pool = null;	private static boolean inited=false;			/*	static {		JedisPoolConfig config = new JedisPoolConfig();		config.setMaxIdle(50);		config.setMaxWaitMillis(10000L);		config.setMaxTotal(1024);		if (StringUtils.isBlank(AUTH)) {			pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT*50);		} else {			pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT*50, AUTH);		}	}	*/		/**	 * 是否初始化	 * @return	 */	public static boolean isInited() {		return inited;	}    /**     * 初始化在类BohConfiguration,spring初始化完成之后     */	public static void init() {				String HOST = "127.0.0.1";		String PORT = "6379";		String AUTH = "root";				JedisPoolConfig config = new JedisPoolConfig();		config.setMaxIdle(50);		config.setMaxWaitMillis(10000L);		config.setMaxTotal(1024);		if (StringUtils.isBlank(AUTH)) {			pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT);		} else {			pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT, AUTH);		}				inited=true;	}			public static JedisPool getPool()	{		if(null==pool)			init();		return pool;			}			public static Jedis getJedis() throws Exception {		return getPool().getResource();	}	/**	 * Jedis3.0后,jedisPool.returnResource(jedis) -> jedis.close();	 */	public static void close(Jedis jedis) {		if (jedis != null) {			jedis.close();		}	}	/**	 * Jedis3.0后,jedisPool.returnBrokenResource(jedis) -> jedis.close();	 */	public static void closeBroken(Jedis jedis) {		if (jedis != null) {			jedis.close();		}	}}

 

2、使用RedisTemplate:

public class RedisTemplateNoThreadLocalSecKill extends RedisConnectionUtils{    static StringRedisTemplate redisTemplate = new StringRedisTemplate();    static AtomicInteger integer = new AtomicInteger();    static StringRedisSerializer stringRedisSerializer=new StringRedisSerializer();    public static void secKillHandle(){        //初始化库存        initStock(initTemplate());        int total = 300;        //模拟300并发抢物品        CountDownLatch cb = new CountDownLatch(1);        for(int i=0; i< total; i++){            new Thread(new Runnable() {                @Override                public void run() {                    try {                        cb.await();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    try {                        doHandle(redisTemplate);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }).start();        }        cb.countDown();    }    private static StringRedisTemplate initTemplate(){        JedisConnectionFactory jedisConnectionFactory = connectRedis();        jedisConnectionFactory.afterPropertiesSet();        redisTemplate.setConnectionFactory(jedisConnectionFactory);        //设置默认序列化的方式        redisTemplate.setDefaultSerializer(stringRedisSerializer);        redisTemplate.afterPropertiesSet();        return redisTemplate;    }    private static void initStock(StringRedisTemplate redisTemplate){        redisTemplate.opsForHash().put("key:123456:info",                "stock", "100");    }    private static JedisConnectionFactory connectRedis(){        RedisPassword redisPassword = RedisPassword.of("root");        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration("127.0.0.1",6379);        redisStandaloneConfiguration.setPassword(redisPassword);        return new JedisConnectionFactory(redisStandaloneConfiguration);    }    private static void doHandle(StringRedisTemplate redisTemplate) throws InterruptedException {        String key = "key:123456";        String value = UUID.randomUUID().toString();        try{            Boolean lockFlag = false;            do{                lockFlag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);                if(lockFlag) {                    Long currStock = redisTemplate.opsForHash().increment("key:123456:info","stock", -1);                    if (currStock < 0) {                        System.out.println("东西已被抢完!");                        break;                    } else {                        try {                            Thread.sleep(20);                            int i = integer.incrementAndGet();                            System.out.println("订单已经生成,第"+i+"单");                        } catch (InterruptedException e) {                            e.printStackTrace();                        }                    }                }else{                    System.out.println("没抢到,1s后继续抢");                    Thread.sleep(1000);                }            }while(!lockFlag);        }finally {            safeUnlock(key,value,redisTemplate);        }    }    private static void safeUnlock(String key, String value, StringRedisTemplate redisTemplate){        String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 'OK' end";        DefaultRedisScript defaultRedisScript = new DefaultRedisScript();        defaultRedisScript.setScriptText(luaScript);        defaultRedisScript.setResultType(Long.class);        List
list = new ArrayList<>(); list.add(key); redisTemplate.execute(defaultRedisScript, list, value); }

 

两种方式都是使用lua脚本实现解锁。原生jedis比较简单,不做过多介绍。 RedisTemplate底层也是用的jedis。

在这里

用redisTemplate每执行一个命令,都会进行redis连接创建及redis连接释放的操作。追踪源码的话,这些命令都会执行RedisTemplate的execute方法(上文已经介绍)。

 

PS遇到的坑:

1、在redisTemplate.setConnectionFactory的时候,其参数值必须是明确的示例对象引用,一开始的时候,我写的是:

结果运行的时候就有很多连接报redis reset的的异常。

2、运行完redis里面的库存值是-200

一开始以为超卖了,想想是正常的。因为300个人抢,都会去扣库存,库存数初始化就100,所以扣完就是-200,只是有200个人抢不到而已。

代码控制主要是这里:

为什么直接用increment的值进行判断呢,因为increment是个原子性的,如果再添加一步get的话,两步合起来就不是原子性的了,高并发下会出意想不到的问题。

 

文中的代码因为是示例,有些粗糙,有不对的地方还望批评指正。

 

 

 

转载地址:http://jqcdi.baihongyu.com/

你可能感兴趣的文章
scrapy:xpath string(.)非常注意问题
查看>>
yuv to rgb 转换失败呀。天呀。谁来帮帮我呀。
查看>>
yuv420 format
查看>>
YUV420只绘制Y通道
查看>>
yuv420 还原为RGB图像
查看>>
LED恒流驱动芯片
查看>>
驱动TFT要SDRAM做为显示缓存
查看>>
使用file查看可执行文件的平台性,x86 or arm ?
查看>>
qt5 everywhere 编译summary
查看>>
qt5 everywhere编译完成后,找不到qmake
查看>>
arm-linux开机读取硬件时钟,设置系统时钟。
查看>>
交叉编译在x86上调试好的qt程序
查看>>
qt 创建异形窗体
查看>>
可重入函数与不可重入函数
查看>>
简单Linux C线程池
查看>>
内存池
查看>>
输入设备节点自动生成
查看>>
opencv test code-1
查看>>
eclipse 导入先前存在的项目
查看>>
GNU hello代码分析
查看>>