ActiveMQ在支付业务模块中应用

1 支付成功通知

支付模块利用消息队列通知订单系统,支付成功

在gmall-payment支付模块中配置application.properties

spring.activemq.broker-url=tcp://192.168.67.204:61616
spring.activemq.pool.enabled=true
activemq.listener.enable=true

 

在PaymentServiceImpl中增加发送方法:

接口:发送消息,给activemq 支付结果!success,fail

public void sendPaymentResult(PaymentInfo paymentInfo,String result);

 

//   添加发送方法
public void sendPaymentResult(PaymentInfo paymentInfo,String result){
    Connection connection = activeMQUtil.getConnection();
    try {
        connection.start();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        // 创建队列
       
Queue paymentResultQueue = session.createQueue("PAYMENT_RESULT_QUEUE");
        MessageProducer producer = session.createProducer(paymentResultQueue);
        MapMessage mapMessage = new ActiveMQMapMessage();
        mapMessage.setString("orderId",paymentInfo.getOrderId());
        mapMessage.setString("result",result);
        producer.send(mapMessage);
        session.commit();
        producer.close();
        session.close();
        connection.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

 

在PaymentController中增加一个方法用来测试

// 发送验证
@RequestMapping("sendPaymentResult")
@ResponseBody
public String sendPaymentResult(PaymentInfo paymentInfo,@RequestParam("result") String result){
    paymentService.sendPaymentResult(paymentInfo,result);
    return "sent payment result";
}

 

 

ActiveMQ在支付业务模块中应用

在浏览器中访问:

ActiveMQ在支付业务模块中应用

查看队列内容:有一个在队列中没有被消费的消息。

ActiveMQ在支付业务模块中应用

2 订单模块消费消息

在订单模块中【gmall-order-service】新添加一个消费类

配置文件添加

spring.activemq.broker-url=tcp://192.168.67.204:61616
spring.activemq.pool.enabled=true
activemq.listener.enable=true

消费类

package com.atguigu.gmall.order.mq;

import com.alibaba.dubbo.config.annotation.Reference;
import com.atguigu.gmall.bean.enums.ProcessStatus;
import com.atguigu.gmall.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.MapMessage;

@Component
public class OrderConsumer {

    @Autowired
    OrderService orderService;

    @JmsListener(destination = "PAYMENT_RESULT_QUEUE",containerFactory = "jmsQueueListener")
    public  void  consumerPaymentResult(MapMessage mapMessage) throws JMSException {
        String orderId = mapMessage.getString("orderId");
        String result = mapMessage.getString("result");
        System.out.println("result = " + result);
        System.out.println("orderId = " + orderId);
        if ("success".equals(result)){
            orderService.updateOrderStatus(orderId, ProcessStatus.PAID);
        }
    }
}

 

实现类

public void updateOrderStatus(String orderId,ProcessStatus processStatus){
    OrderInfo orderInfo = new OrderInfo();
    orderInfo.setId(orderId);
    orderInfo.setProcessStatus(processStatus);
    orderInfo.setOrderStatus(processStatus.getOrderStatus());
    orderInfoMapper.updateByPrimaryKeySelective(orderInfo);
}

 

此处:记得在配置文件中添加

# application.properties

mapper.enum-as-simple-type=true

 

3 订单模块发送减库存通知

订单模块除了接收到请求改变单据状态,还要发送库存系统

查看看《库存管理系统接口手册》中【减库存的消息队列消费端接口】中的描述,组织相应的消息数据进行传递。

创建数据库表:

create table  ware_order_task

(

       id                bigint /*auto_increment*/ not null,

       order_id          bigint(20),

       consignee         VARCHAR(100),

       consignee_tel     VARCHAR(20),

       delivery_address  VARCHAR(1000),

       order_comment     VARCHAR(200),

       payment_way       VARCHAR(2),

       task_status       VARCHAR(20),

       order_body        VARCHAR(200),

       tracking_no       VARCHAR(200),

       create_time       DATETIME,

       ware_id           bigint,

       task_comment      VARCHAR(500)

);

alter  table ware_order_task

       add constraint PK_ware_order_task_id primary key (id);

 

create table  ware_order_task_detail

(

       id                bigint /*auto_increment*/ not null,

       sku_id            bigint,

       sku_name          VARCHAR(200),

       sku_num           INTEGER,

       task_id           bigint

);

alter  table ware_order_task_detail

       add constraint PK_ware_orail_id6CBB primary key (id);

 

OrderConsumer.java

@Component
public class OrderConsumer {

    @Reference
    OrderService orderService;

    @JmsListener(destination = "PAYMENT_RESULT_QUEUE",containerFactory = "jmsQueueListener")
    public  void  consumerPaymentResult(MapMessage mapMessage) throws JMSException {
        String orderId = mapMessage.getString("orderId");
        String result = mapMessage.getString("result");
        System.out.println("result = " + result);
        System.out.println("orderId = " + orderId);
        if ("success".equals(result)){
            // 更新支付状态
          
orderService.updateOrderStatus(orderId, ProcessStatus.PAID);
           // 通知减库存
           
orderService.sendOrderStatus(orderId);
            orderService.updateOrderStatus(orderId, ProcessStatus.DELEVERED);
        }else {
          orderService.updateOrderStatus(orderId,ProcessStatus.UNPAID);
        }
    }
}

 

OrderService接口

public void sendOrderStatus(String orderId);

 

OrderServiceImpl实现类

public void sendOrderStatus(String orderId){
    Connection connection = activeMQUtil.getConnection();
    String orderJson = initWareOrder(orderId);
    try {
        connection.start();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue order_result_queue = session.createQueue("ORDER_RESULT_QUEUE");
        MessageProducer producer = session.createProducer(order_result_queue);

        ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText(orderJson);
        producer.send(textMessage);
        session.commit();
        session.close();
        producer.close();
        connection.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

public String initWareOrder(String orderId){
    OrderInfo orderInfo = getOrderInfo(orderId);
    Map map = initWareOrder(orderInfo);
    return JSON.toJSONString(map);
}
// 设置初始化仓库信息方法
public Map  initWareOrder (OrderInfo orderInfo){
    Map<String,Object> map = new HashMap<>();
    map.put("orderId",orderInfo.getId());
    map.put("consignee", orderInfo.getConsignee());
    map.put("consigneeTel",orderInfo.getConsigneeTel());
    map.put("orderComment",orderInfo.getOrderComment());
    map.put("orderBody",orderInfo.getTradeBody());
    map.put("deliveryAddress",orderInfo.getDeliveryAddress());
    map.put("paymentWay","2");
    map.put("wareId",orderInfo.getWareId());

    // 组合json
   
List detailList = new ArrayList();
    List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
    for (OrderDetail orderDetail : orderDetailList) {
        Map detailMap = new HashMap();
        detailMap.put("skuId",orderDetail.getSkuId());
        detailMap.put("skuName",orderDetail.getSkuName());
        detailMap.put("skuNum",orderDetail.getSkuNum());
        detailList.add(detailMap);
    }
    map.put("details",detailList);
    return map;
}

 

 

注意:getOrderInfo(orderId); 方法中一定要根据orderId取得到orderDetail。
注意:需要在仓库系统中

@Component
public class WareConsumer

类上加个注解

在库存系统中将activemq的配置打开

spring.activemq.broker-url=tcp://192.168.67.200:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false

重启仓库系统

减库存:如果你的商品明细中的商品,在不同的仓库。则减库存会失败!

异常:需要拆单!因为我的数据,在两个仓库中

ActiveMQ在支付业务模块中应用

4 消费减库存结果

给仓库系统发送减库存消息后,还要接受减库存成功或者失败的消息。

同样根据《库存管理系统接口手册》中【商品减库结果消息】的说明完成。消费该消息的消息队列监听程序。

接受到消息后主要做的工作就是更新订单状态。

 

@JmsListener(destination = "SKU_DEDUCT_QUEUE",containerFactory = "jmsQueueListener")
public void consumeSkuDeduct(MapMessage mapMessage) throws JMSException {
    String orderId = mapMessage.getString("orderId");
    String  status = mapMessage.getString("status");
    if("DEDUCTED".equals(status)){
        orderService.updateOrderStatus(  orderId , ProcessStatus.WAITING_DELEVER);
    }else{
        orderService.updateOrderStatus(  orderId , ProcessStatus.STOCK_EXCEPTION);
    }
}

ActiveMQ在支付业务模块中应用