侧边栏壁纸
博主头像
coydone博主等级

记录学习,分享生活的个人站点

  • 累计撰写 306 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

ActiveMQ秒杀功能实现

coydone
2022-06-17 / 0 评论 / 0 点赞 / 401 阅读 / 10,356 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-01,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

使用Redis+ActiveMQ+MySQL实现秒杀功能。

准备工作

1、使用Docker启动Redis

docker run -d --name redis -p 6379:6379 redis --requirepass "123456"

2、准备ActiveMQ环境

准备mysql数据库

-- 创建数据库miaosan 字符集:utf8mb4 排序规则:utf8mb4_unicode_ci

DROP TABLE IF EXISTS `t_goods`;
CREATE TABLE `t_goods`  (
  `goods_id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '商品ID',
  `goods_name` varchar(300) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '商品名称',
  `shop_id` bigint(20) NULL DEFAULT NULL COMMENT '店铺id',
  `ori_price` decimal(15, 2) NULL DEFAULT 0.00 COMMENT '原价',
  `price` decimal(15, 2) NULL DEFAULT NULL COMMENT '现价',
  `brief` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '简要描述,卖点等',
  `content` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '详细描述',
  `picture` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品主图',
  `imgs` varchar(1000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品图片,以,分割',
  `status` int(1) NULL DEFAULT 0 COMMENT '默认是1,表示正常状态, -1表示删除, 0下架',
  `category_id` bigint(20) UNSIGNED NULL DEFAULT NULL COMMENT '商品分类',
  `sold_num` int(11) NULL DEFAULT NULL COMMENT '销量',
  `total_stocks` int(11) NULL DEFAULT 0 COMMENT '总库存',
  `delivery_mode` json NULL COMMENT '配送方式json见TransportModeVO',
  `delivery_template_id` bigint(20) NULL DEFAULT NULL COMMENT '运费模板id',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '录入时间',
  `update_time` datetime(0) NULL DEFAULT NULL COMMENT '修改时间',
  `putaway_time` datetime(0) NULL DEFAULT NULL COMMENT '上架时间',
  `version` int(11) NULL DEFAULT NULL COMMENT '版本 乐观锁',
  `ismiaosha` int(11) NULL DEFAULT 0 COMMENT '是否参与秒杀1是0否',
  PRIMARY KEY (`goods_id`) USING BTREE,
  INDEX `shop_id`(`shop_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 95 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '商品' ROW_FORMAT = Dynamic;

DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `userid` int(11) NULL DEFAULT NULL,
  `goodsid` int(11) NULL DEFAULT NULL,
  `createtime` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

表结构:

流程图:

创建项目并配置环境

1、创建SpringBoot项目,选择Lombok、SpringWeb、JDBC API、Mybatis Framework、MySQL Driver、Spring Data Redis(Access+Driver)、Spring for Apache ActiveMQ 5。

2、修改pom.xml引入Druid数据源。

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.21</version>
</dependency>

3、修改yaml。

server:
  port: 8080
#数据源的配置
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/miaosha?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: 123456
    #注入数据源的类型   默认的为HikariDataSource
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      #      filters: log4j,stat
      max-active: 20
      min-idle: 5
      validation-query: select x
      initial-size: 3
      max-wait: 5000
#      stat-view-servlet:
#        login-username: root
#        login-password: 123456
#        allow:
#        deny:
#        url-pattern: /druid/*
#        enabled: true  #启用数据源监控
  #redis配置
  redis:
    host: 127.0.0.1
    password: 123456
    port: 6390
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: false   #false代表队列[默认]  true代表主题

#mybatis的配置
mybatis:
  mapper-locations: classpath:mapper/*Mapper.xml  #配置mapper.xml的扫描
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4、MBG自动生成实体类entity、mapper接口和映射文件。

实现秒杀业务

创建ActiveMQConfig配置类。

@Configuration
public class ActiveMQConfig {
    public static final String QUEUE_NAME="seconds-kill-queue";
    @Bean
    public Queue queue(){
        return new ActiveMQQueue(QUEUE_NAME);
    }
}

创建Sender发送消息。

@Service
@Log4j2
public class Sender {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    private int i;
    /**
     * 如查用户抢到商品就发送到队列,信息包含商品ID和用户ID
     * @param goodsId
     * @param userId
     */
    public void sendDirectQueue(String goodsId,String userId){
        log.info(">>>>>>>>>>>>>>秒杀请求已发送,商品ID为:"+goodsId+"--用户ID:"+userId);
        try {
            Map<String,String> map=new HashMap<>();
            map.put("goodsId",goodsId);
            map.put("userId",userId);
            //第一个参数是指要发送到哪个队列里面, 第二个参数是指要发送的内容
            jmsMessagingTemplate.convertAndSend(ActiveMQConfig.QUEUE_NAME, map);
            //此处为了记录并发请求下,请求的次数及消息传递的次数
            log.info("发送请求>>>>>>>>>>>>>"+i++);
        } catch (Exception e) {
            log.error("请求发送异常:"+e.getMessage());
            e.printStackTrace();
        }
    }
}

创建Receiver异步接收消息。

@Service
@Log4j2
public class Receiver {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private GoodsService goodsService;

    @Autowired
    private OrderService orderService;
    
    private int i;

    @JmsListener(destination = ActiveMQConfig.QUEUE_NAME)
    public void receive(MapMessage mapMessage){
        try {
            //取到商品ID判断redis里面的该商品库存是否为0
            String goodsId = mapMessage.getString("goodsId");
            ValueOperations<String, String> opsForValue = redisTemplate.opsForValue();
            long num = opsForValue.decrement(goodsId).longValue();//递减
            if(num<0){
                /**
                 * 此处不能判断等于0,因为当商品库存为1时,Redis执行递减返回为0
                 * 如果判断为0商品最后不能卖完也就是当库存为1时此处就抛异常了
                 */
                throw new RuntimeException("库存不足啦,不能再抢了");
            }
            log.info("接收时>>>>>>>>>>>"+i++);
            Goods goods=new Goods();
            goods.setGoodsId(Long.valueOf(goodsId));
            //哪果不为0=则减小mysql里面该商品的库存
            goods.setTotalStocks((int)num);
            //根据商品的id和库存同步数据到MySQL
            if(!goodsService.updateByPrimaryKeySelective(goods)) {
                throw new RuntimeException("同步到商品表异常!");
            }
            ////生成订单
            String uid=mapMessage.getString("userId");
            log.info("成功了>>>>>>>>>>>"+uid+"  抢到了商品 O(∩_∩)O哈哈~");
            Order order=new Order();
            order.setGoodsid(Integer.valueOf(goodsId));
            order.setUserid(Integer.valueOf(uid));
            order.setCreatetime(new Date());
            orderService.insert(order);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

创建GoodsService及其实现类。

public interface GoodsService {
    boolean updateByPrimaryKeySelective(Goods goods);
}

@Transactional
@Service
public class GoodsServiceImpl implements GoodsService {
    @Autowired
    private GoodsMapper goodsMapper;
    
    @Override
    public boolean updateByPrimaryKeySelective(Goods goods){
        return goodsMapper.updateByPrimaryKeySelective(goods)>0;
    }
}

创建OrderService及其实现类。

public interface OrderService{
    void insert(Order order);
}

@Transactional
@Service
public class OrderServiceImpl implements OrderService{
    @Autowired
    private OrderMapper orderMapper;
    
    @Override
    public void insert(Order order){
        orderMapper.insert(order);
    }
}

程序启动时加载要秒杀的商品到Redis

创建ApplicationInitListener。

@Component
@Scope("singleton")
@Log4j2
public class ApplicationInitListener implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private GoodsService goodsService;

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if(event.getApplicationContext().getParent() == null) {
            ValueOperations<String, String> opsForValue = redisTemplate.opsForValue();
            log.info(">>>>>>>>>>>>项目初始化完成,执行监听器中逻辑");
            //mapper中的sql,返回全部上架(支持秒杀)的商品集合
            List<Goods> list = goodsService.selectGoodsToMiaoSha();
            Iterator<Goods> it = list.iterator();
            while(it.hasNext()) {
                Goods p = it.next();
                try {
                    opsForValue.set(String.valueOf(p.getGoodsId()), String.valueOf(p.getTotalStocks()));
                    log.info("商品放成Redis成功ID:"+p.getGoodsId()+"商品库存:"+p.getTotalStocks());
                } catch (Exception e) {
                    log.error("当前商品ID:"+p.getGoodsId()+"库存:"+p.getTotalStocks()+"放入Redis缓存异常<<<<<<<<<<<<<<<<<<<<");
                    e.printStackTrace();
                }
            }
        }
    }
}

修改GoodsService及其实现类。

public interface GoodsService {
    boolean updateByPrimaryKeySelective(Goods goods);
    
    List<Goods> selectGoodsToMiaoSha();
}

@Transactional
@Service
public class GoodsServiceImpl implements GoodsService {
    @Autowired
    private GoodsMapper goodsMapper;
    
    @Override
    public boolean updateByPrimaryKeySelective(Goods goods){
        return goodsMapper.updateByPrimaryKeySelective(goods)>0;
    }
    @Override
    public List<Goods> selectGoodsToMiaoSha(){
        GoodsExample example = new GoodsExample();
        GoodsExample.Criteria criteria = example.createCriteria();
        criteria.andIsmiaoshaEqualTo(1);//只找要秒杀的
        List<Goods> goods = this.goodMapper.selectByExample(example);
        return goods;
    }
}

创建统一返回的数据实体ResultObj。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResultObj {
    private Integer code;
    private String msg;
}

创建ActiveController。

@RestController
public class ActiveController {
    @Autowired
    private GoodsService goodsService;

    /**
     * 秒杀入口
     * @param pid -商品id,做检查库存使用
     * @param userId -用户id,做订单和用户关联使用(比如生成成功秒杀商品的用户订单表)
     *                这里没做多余的逻辑,只看了相关情况的返回结果,有需要的可以自己去实现
     */
    @RequestMapping(value = "secondsKill")
    public ResultObj secondsKill(String pid, String userId) {
        try {
            //模拟发送100次请求,库存设置为少于100查看结果,此100次请求为顺序请求
            //for(int i=0; i<100; i++) {
            boolean result = goodsService.secondsKill(pid, userId);
            if(result) {
                return new ResultObj(1,"秒杀成功,请稍后去订单查询");
            }
            //}
        } catch (Exception e) {
            e.printStackTrace();
            return new ResultObj(-1,"秒杀失败,原因:"+e.getMessage());
        }
        return null;
    }
}

修改GoodsService及其实现类。

public interface GoodsService {
    boolean updateByPrimaryKeySelective(Goods goods);
    List<Goods> selectGoodsToMiaoSha();
    boolean secondsKill(String pid,String userId);
}

package com.coydone.service.impl;

@Transactional
@Service
public class GoodsServiceImpl implements GoodsService {

    @Autowired
    private GoodsMapper goodsMapper;

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private Sender sender;

    int i=0;

    @Override
    public boolean updateByPrimaryKeySelective(Goods goods) {
        return goodsMapper.updateByPrimaryKeySelective(goods)>0;
    }

    @Override
    public List<Goods> selectGoodsToMiaoSha() {
        GoodsExample example=new GoodsExample();
        GoodsExample.Criteria criteria = example.createCriteria();
        criteria.andIsmiaoshaEqualTo(1);//只找要秒杀的
        List<Goods> goods = this.goodsMapper.selectByExample(example);
        return goods;
    }

    @Override
    public boolean secondsKill(String pid, String userId) {
        ValueOperations<String, String> opsForValue = redisTemplate.opsForValue();
        boolean result = true;
        //根据商品id获取Redis中的库存数量
        String num =opsForValue.get(pid);
        System.out.println("redis>>>>>>>>>>"+num);
        if(new Long(num) <= 0) {
            result = false;
            throw new RuntimeException("库存不足");
        }
        //消息入队,调用相关方法
        sender.sendDirectQueue(pid,userId);
        //只为验证请求及发送消息次数
        System.out.println("service>>>>>>>>>>"+i++);
        return result;
    }
}

启动测试:http://127.0.0.1:8080/secondsKill?pid=18&userId=1

0

评论区