秒杀案例

使用Springboot+MyBatis+Redis+Rabbit+Mysql实现商品秒杀的功能。

项目的整体流程

在秒杀开始之前,将数据库中的参加秒杀的商品同步至Redis中。秒杀开始,用户发起请求,为了流程简洁,直接在网关层中的Controller中负责对部分请求进行过滤,与Redis进行交互。然后发送消息,即将用户请求信息放入Rabbitmq中。在serivce项目中消费消息,即获取到Rabbitmq中的用户请求信息,随后完成与Mysql的交互,将信息持久化到数据库中。

网关项目中的Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@RestController
public class SpikeController {

@Autowired
private MyBloomFilter bloomFilter;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/doSpike")
public String doSpike(Long productId, Long userId) {
String key = productId + ":" + userId;
// 使用bloomFilter快速判断key值是否存在
if (bloomFilter.isExist(key)) {
return "已经参与抢购,不能重复参加!";
}
// 请求次数累计
Long count = redisTemplate.opsForValue().increment("requestCount:productId:userId:" + productId + "_" + userId);
if (count != null && count > 500) {
return "该商品已被抢完!";
}
// 更新redis中的数据
Long finalStock = redisTemplate.opsForValue().decrement("product:" + productId);
if ( finalStock < 0) {
return "该商品已被抢完!";
}
// 将请求信息存入mq
Map<String, Object> map = new HashMap<>(16);
map.put("userId", userId);
map.put("productId", productId);
rabbitTemplate.convertAndSend("spike.queue", JSON.toJSONString(map));
return "抢购成功!";
}
}

布隆过滤器工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Component
public class MyBloomFilter {

@Autowired
private StringRedisTemplate redisTemplate;
/**
* 范围
*/
private int size = 2 << 29;
/**
* 质数的数组
*/
private int[] seeds = {7, 11, 13, 17, 19};
/**
* 特征函数的个数
*/
private final int HASH_FUN_NUM = 5;
/**
* 搞一个数组 专门放特征函数
*/
private HashFun[] hashFuns = new HashFun[HASH_FUN_NUM];

public MyBloomFilter() {
for (int i = 0; i < HASH_FUN_NUM; i++) {
hashFuns[i] = new HashFun(size, seeds[i]);
}
}
/**
* 把字符串做特征函数 然后添加到位图中去
* @param str
*/
public void addBloom(String str) {
// 循环特征函数的数组
for (HashFun hashFun : hashFuns) {
// 每次计算出一个点位
int pos = hashFun.getPos(str);
System.out.println(pos);
// 把点位存起来 存在redis的bitmap中去 map<String,BitMap>
redisTemplate.opsForValue().setBit("SPIKE", pos, true);
}
}

/**
* 判断是否重复
* @param str
* @return
*/
public Boolean isExist(String str) {
// 计算五次
for (HashFun hashFun : hashFuns) {
int pos = hashFun.getPos(str);
// 从位图中判断是否存在
Boolean flag = redisTemplate.opsForValue().getBit("SPIKE", pos);
if (null != flag && !flag) {
// 只要有一个点位 不存在 说明不重复
return false;
}
}
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class HashFun {
/**
* 范围
*/
private int size;

/**
* seed 种子
*/
private int seed;


public HashFun(int size, int seed) {
this.size = size;
this.seed = seed;
}

/**
* hash值
* @param str
* @return
*/
public int hash(String str) {
int h = 0;
if (!StringUtils.hasText(str)) {
// 空字符串的hash值就是0
return h;
}
char[] chars = str.toCharArray();
for (int i = 0; i < chars.length; i++) {
h = seed * h + chars[i];
}
return h;
}
/**
* 得到位置
* @param str
* @return
*/
public int getPos(String str) {
int hash = hash(str);
return hash & (size - 1);
}
}

Service项目

数据同步类

可写成任务调度的形式,需要在主启动类上添加注解@EnableScheduling,开启任务调度功能。

这里采用每次项目启动时进行数据同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Component
public class MysqlToRedis implements CommandLineRunner {

@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private ProductMapper productMapper;

/**
* 实现CommandLineRunner接口
* springboot启动后会自动调用重写的 run 方法
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
// 查询秒杀商品
List<Product> spikeProducts = productMapper.listSpikeProducts();
// 存入redis
if (CollectionUtils.isEmpty(spikeProducts)) {
return;
}
spikeProducts.forEach(p -> stringRedisTemplate.opsForValue().set("product:" + p.getId(), String.valueOf(p.getStock())));
}

// /**
// * 每天12点执行
// */
// @Scheduled(cron="0 0 12 * * ? *")
// public void spikeTask(){
//
// }

// /**
// * 项目执行完成后执行
// */
// @PostConstruct
// public void spikeTask(){
//
// }
}

消息监视类

用来监听消息和获取消息,从消息队列中获取请求信息,完成与数据库的交互。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class SpikeListener {

@Autowired
private IOrderService orderService;

@Autowired
private MyBloomFilter bloomFilter;

@RabbitListener(queues = "spike.queue",concurrency="3-5")
public void handleSpike(Channel channel, Message message) throws Exception {
// 获取消息队列中的数据
String msg = new String(message.getBody());
JSONObject jsonObject = JSON.parseObject(msg);
Long userId = jsonObject.getLong("userId");
Long productId = jsonObject.getLong("productId");
// 生成订单记录 减少库存
orderService.handleOrderAndStock(userId,productId);
// 将商品id和用户id信息对应的key加入bloomFilter
bloomFilter.addBloom(productId + ":" + userId);
// 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

处理秒杀业务方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Transactional(rollbackFor = RuntimeException.class)
public void handleOrderAndStock(Long userId, Long productId) {
int time = 0;
// 自旋拿锁
while (time < waitTime) {
// 使用redis分布式锁 防止线程安全问题
String lockKey = "lock:" + productId;
String uuid = UUIDUtils.getUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, uuid, 3, TimeUnit.SECONDS);
if (null != lock && !lock) {
// 没拿到锁
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
time++;
} else {
try {
Product product = productMapper.selectById(productId);
long finalStock = product.getStock() - 1;
if (finalStock < 0) {
log.info("库存不足!");
return;
} else {
product.setStock(finalStock);
productMapper.update(product);
insert(new Order(null, userId, productId));
return;
}
} finally {
// 释放锁 使用uuid校验防止误删
if (uuid.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
}
}
}
}
}

业务方法的优化

使用Redisson解决分布式锁问题,保证线程操作的原子性,防止线程释放掉其他线程获取的锁。

引入Redisson依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>

编写配置类

1
2
3
4
5
6
7
8
9
10
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setTimeout(50000).setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}

改写业务方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Transactional(rollbackFor = RuntimeException.class)
public void handleOrderAndStock(Long userId, Long productId) {

RLock lock = redissonClient.getLock("lock:" + productId);
lock.lock(30, TimeUnit.SECONDS);
try {
Product product = productMapper.selectById(productId);
long finalStock = product.getStock() - 1;
if (finalStock < 0) {
log.info("库存不足!");
return;
} else {
product.setStock(finalStock);
productMapper.update(product);
insert(new Order(null, userId, productId));
return;
}
} finally {
lock.unlock();
}
}