本文共 11903 字,大约阅读时间需要 39 分钟。
无论是秒杀还是其他的减库存业务,一个最基本的原则就是不能超卖和少卖。
物品的库存都在一个篮子里,要想不超卖,就必须得加锁控制,而对于分布式系统中,使用redis实现分布式锁也不是什么新鲜事物了。但是控制不好,即便使用redis进行分布式锁的话也有可能在高并发的情况下出现问题,比如超卖、redis连接不上等
在这里使用原生的Jedis和RedisTemplate分别写了两个简单的示例,模拟300并发下,抢100个库存物品。
在此之前先说一下我工程的maven环境,有时候jar包版本不一样也会产生不一样的问题。
pom.xml中主要的一些依赖的版本情况:
org.springframework.boot spring-boot-starter-parent 2.2.2.RELEASE org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-logging redis.clients jedis 3.1.0 org.springframework.data spring-data-redis 2.2.3.RELEASE
在redis的RedisConnectionUtils,有使用log:
所以你需要指定一种log方式,我使用的是logback.xml, 因为
spring-boot-starter-logging的依赖中会自动引入logback的依赖,所以无需单独引入logback的依赖。
logback.xml:
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
如果level设置的是debug,那么在redistemplate使用的时候就会打印类似的日志:
通过查看RedisTemplate源码可以看到,关键的方法如下:
@Nullable publicT execute(RedisCallback action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = getRequiredConnectionFactory(); RedisConnection conn = null; try { if (enableTransactionSupport) { // only bind resources in case of potential transaction synchronization conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if (pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); T result = action.doInRedis(connToExpose); // close pipeline if (pipeline && !pipelineStatus) { connToUse.closePipeline(); } // TODO: any other connection processing? return postProcessResult(result, connToUse, existingConnection); } finally { RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport); } }
比较关键的是:
conn = RedisConnectionUtils.getConnection(factory); //创建redis连接
T result = action.doInRedis(connToExpose); //执行redis命令,可以指定pipeline方式,本文不介绍
RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);//关闭redis连接
创建redis连接和关闭redis连接的时候,有如下代码:
如果是debug级别,就会打印,不是就不会打印。
1、直接使用Jedis
public class JedisSecKill { static AtomicInteger integer = new AtomicInteger(); public static void secKillHandle() throws Exception { String lockKey = "key:123456"; //分布式锁使用的key String stockKey = "key:123456:info:stock"; //物品库存的key String value = UUID.randomUUID().toString(); Jedis jedis = RedisPool.getJedis(); try{ jedis.set(stockKey,"100"); //初试化库存100个 }finally { jedis.close(); } //模拟300并发来抢100个物品 CountDownLatch cb = new CountDownLatch(1); for(int i=0; i< 300; i++){ new Thread(new Runnable() { @Override public void run() { try { cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { doHandle(lockKey, value, stockKey); } catch (Exception e) { e.printStackTrace(); } } }).start(); } cb.countDown(); } private static void doHandle(String lockKey, String value, String stockKey) throws Exception { Jedis jedis = RedisPool.getJedis(); try{ String lockFlag = null; do{ lockFlag = jedis.set(lockKey, value, new SetParams().nx().ex(10)); //获取锁,设置10s if(null !=lockFlag && lockFlag.equals("OK")) { Long currStock = jedis.incrBy(stockKey, -1); //直接扣减库存,原子操作 if (currStock < 0) { System.out.println("东西已被抢完!"); break; } else { //只有初始化的库存扣减了方可创建订单 try { Thread.sleep(20); int i = integer.incrementAndGet(); System.out.println("订单已经生成,第"+i+"单"); } catch (InterruptedException e) { e.printStackTrace(); } } }else { System.out.println("没抢到,1s后继续抢"); Thread.sleep(1000); } }while (lockFlag == null); }finally { safeUnlock2(lockKey,value,jedis); jedis.close(); } } /** * 使用lua脚本删除指定的key和value * @param key * @param value * @param jedis */ private static void safeUnlock2(String key, String value, Jedis jedis){ String lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end"; Object result = jedis.eval(lua, Arrays.asList(key), Arrays.asList(value)); System.out.println("result==============="+result); }}
public class RedisPool extends JedisPool{ /*private static final String HOST = ForResourceFiles.getValByKey("redis.properties", "redis.host"); private static final String PORT = ForResourceFiles.getValByKey("redis.properties", "redis.port"); private static final String AUTH = ForResourceFiles.getValByKey("redis.properties", "redis.auth"); */ private static JedisPool pool = null; private static boolean inited=false; /* static { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxIdle(50); config.setMaxWaitMillis(10000L); config.setMaxTotal(1024); if (StringUtils.isBlank(AUTH)) { pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT*50); } else { pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT*50, AUTH); } } */ /** * 是否初始化 * @return */ public static boolean isInited() { return inited; } /** * 初始化在类BohConfiguration,spring初始化完成之后 */ public static void init() { String HOST = "127.0.0.1"; String PORT = "6379"; String AUTH = "root"; JedisPoolConfig config = new JedisPoolConfig(); config.setMaxIdle(50); config.setMaxWaitMillis(10000L); config.setMaxTotal(1024); if (StringUtils.isBlank(AUTH)) { pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT); } else { pool = new JedisPool(config, HOST, Integer.parseInt(PORT), Protocol.DEFAULT_TIMEOUT, AUTH); } inited=true; } public static JedisPool getPool() { if(null==pool) init(); return pool; } public static Jedis getJedis() throws Exception { return getPool().getResource(); } /** * Jedis3.0后,jedisPool.returnResource(jedis) -> jedis.close(); */ public static void close(Jedis jedis) { if (jedis != null) { jedis.close(); } } /** * Jedis3.0后,jedisPool.returnBrokenResource(jedis) -> jedis.close(); */ public static void closeBroken(Jedis jedis) { if (jedis != null) { jedis.close(); } }}
2、使用RedisTemplate:
public class RedisTemplateNoThreadLocalSecKill extends RedisConnectionUtils{ static StringRedisTemplate redisTemplate = new StringRedisTemplate(); static AtomicInteger integer = new AtomicInteger(); static StringRedisSerializer stringRedisSerializer=new StringRedisSerializer(); public static void secKillHandle(){ //初始化库存 initStock(initTemplate()); int total = 300; //模拟300并发抢物品 CountDownLatch cb = new CountDownLatch(1); for(int i=0; i< total; i++){ new Thread(new Runnable() { @Override public void run() { try { cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { doHandle(redisTemplate); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } cb.countDown(); } private static StringRedisTemplate initTemplate(){ JedisConnectionFactory jedisConnectionFactory = connectRedis(); jedisConnectionFactory.afterPropertiesSet(); redisTemplate.setConnectionFactory(jedisConnectionFactory); //设置默认序列化的方式 redisTemplate.setDefaultSerializer(stringRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } private static void initStock(StringRedisTemplate redisTemplate){ redisTemplate.opsForHash().put("key:123456:info", "stock", "100"); } private static JedisConnectionFactory connectRedis(){ RedisPassword redisPassword = RedisPassword.of("root"); RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration("127.0.0.1",6379); redisStandaloneConfiguration.setPassword(redisPassword); return new JedisConnectionFactory(redisStandaloneConfiguration); } private static void doHandle(StringRedisTemplate redisTemplate) throws InterruptedException { String key = "key:123456"; String value = UUID.randomUUID().toString(); try{ Boolean lockFlag = false; do{ lockFlag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS); if(lockFlag) { Long currStock = redisTemplate.opsForHash().increment("key:123456:info","stock", -1); if (currStock < 0) { System.out.println("东西已被抢完!"); break; } else { try { Thread.sleep(20); int i = integer.incrementAndGet(); System.out.println("订单已经生成,第"+i+"单"); } catch (InterruptedException e) { e.printStackTrace(); } } }else{ System.out.println("没抢到,1s后继续抢"); Thread.sleep(1000); } }while(!lockFlag); }finally { safeUnlock(key,value,redisTemplate); } } private static void safeUnlock(String key, String value, StringRedisTemplate redisTemplate){ String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 'OK' end"; DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); defaultRedisScript.setScriptText(luaScript); defaultRedisScript.setResultType(Long.class); Listlist = new ArrayList<>(); list.add(key); redisTemplate.execute(defaultRedisScript, list, value); }
两种方式都是使用lua脚本实现解锁。原生jedis比较简单,不做过多介绍。 RedisTemplate底层也是用的jedis。
在这里
用redisTemplate每执行一个命令,都会进行redis连接创建及redis连接释放的操作。追踪源码的话,这些命令都会执行RedisTemplate的execute方法(上文已经介绍)。
PS遇到的坑:
1、在redisTemplate.setConnectionFactory的时候,其参数值必须是明确的示例对象引用,一开始的时候,我写的是:
结果运行的时候就有很多连接报redis reset的的异常。
2、运行完redis里面的库存值是-200
一开始以为超卖了,想想是正常的。因为300个人抢,都会去扣库存,库存数初始化就100,所以扣完就是-200,只是有200个人抢不到而已。
代码控制主要是这里:
为什么直接用increment的值进行判断呢,因为increment是个原子性的,如果再添加一步get的话,两步合起来就不是原子性的了,高并发下会出意想不到的问题。
文中的代码因为是示例,有些粗糙,有不对的地方还望批评指正。
转载地址:http://jqcdi.baihongyu.com/