1 支付成功通知
支付成功后,支付模块通过消息队列通知订单系统。
在gmall-payment支付模块中配置application.properties
spring.activemq.broker-url=tcp://192.168.67.204:61616
spring.activemq.pool.enabled=true
activemq.listener.enable=true
|
在PaymentServiceImpl中增加发送方法:
接口:向 ActiveMQ 发送支付结果消息,result 取值为 success 或 fail
/**
 * Sends the payment result of an order to the message queue.
 *
 * @param paymentInfo payment record; the implementation shown in this document reads only its order id
 * @param result      payment outcome; this document uses the values "success" and "fail"
 */
public void sendPaymentResult(PaymentInfo paymentInfo,String result);
|
// 添加发送方法
/**
 * Publishes the payment result of an order to {@code PAYMENT_RESULT_QUEUE}.
 *
 * <p>The message is a {@code MapMessage} with two string entries: {@code orderId}
 * (taken from {@code paymentInfo}) and {@code result}. It is sent inside a
 * transacted session and becomes visible to consumers only after the commit.
 *
 * <p>JMS failures are printed to stderr and not propagated to the caller.
 *
 * @param paymentInfo payment record; only its order id is read
 * @param result      payment outcome to forward
 */
public void sendPaymentResult(PaymentInfo paymentInfo,String result){
    Connection connection = activeMQUtil.getConnection();
    Session session = null;
    try {
        connection.start();
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        // Create (or look up) the destination queue
        Queue paymentResultQueue = session.createQueue("PAYMENT_RESULT_QUEUE");
        MessageProducer producer = session.createProducer(paymentResultQueue);
        MapMessage mapMessage = new ActiveMQMapMessage();
        mapMessage.setString("orderId",paymentInfo.getOrderId());
        mapMessage.setString("result",result);
        producer.send(mapMessage);
        session.commit();
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Always release JMS resources, including on the failure path.
        // Closing the session also closes its producer, and per the JMS contract
        // an uncommitted transaction on a transacted session is rolled back on close.
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
|
在PaymentController中增加一个方法用来测试
// 发送验证
/**
 * Test endpoint: forwards the bound payment info and the {@code result} request
 * parameter to the payment service and returns a fixed acknowledgement string.
 */
@RequestMapping("sendPaymentResult")
@ResponseBody
public String sendPaymentResult(PaymentInfo paymentInfo,@RequestParam("result") String result){
    final String acknowledgement = "sent payment result";
    paymentService.sendPaymentResult(paymentInfo, result);
    return acknowledgement;
}
|
|
在浏览器中访问:
查看队列内容:有一个在队列中没有被消费的消息。
2 订单模块消费消息
在订单模块中【gmall-order-service】新添加一个消费类
配置文件添加
spring.activemq.broker-url=tcp://192.168.67.204:61616
spring.activemq.pool.enabled=true
activemq.listener.enable=true
|
消费类
package com.atguigu.gmall.order.mq;
import com.alibaba.dubbo.config.annotation.Reference;
import com.atguigu.gmall.bean.enums.ProcessStatus;
import com.atguigu.gmall.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.MapMessage;
/**
 * JMS listener for payment results published on {@code PAYMENT_RESULT_QUEUE}.
 */
@Component
public class OrderConsumer {

    @Autowired
    OrderService orderService;

    /**
     * Reads {@code orderId} and {@code result} from the message, prints both, and
     * marks the order as {@code PAID} when the result equals {@code "success"}.
     * Any other result leaves the order untouched.
     *
     * @param mapMessage payment-result message
     * @throws JMSException if a field cannot be read from the message
     */
    @JmsListener(destination = "PAYMENT_RESULT_QUEUE",containerFactory = "jmsQueueListener")
    public void consumerPaymentResult(MapMessage mapMessage) throws JMSException {
        final String paidOrderId = mapMessage.getString("orderId");
        final String paymentOutcome = mapMessage.getString("result");
        System.out.println("result = " + paymentOutcome);
        System.out.println("orderId = " + paidOrderId);

        // Nothing to do unless the payment succeeded.
        if (!"success".equals(paymentOutcome)) {
            return;
        }
        orderService.updateOrderStatus(paidOrderId, ProcessStatus.PAID);
    }
}
|
实现类
/**
 * Updates the process status of an order, together with the order status derived from it.
 *
 * @param orderId       primary key of the order to update
 * @param processStatus new process status; its {@code getOrderStatus()} supplies the order status
 */
public void updateOrderStatus(String orderId,ProcessStatus processStatus){
    // Carrier object holding only the key and the two status fields.
    // NOTE(review): presumably updateByPrimaryKeySelective writes only the non-null fields - confirm against the mapper.
    OrderInfo statusPatch = new OrderInfo();
    statusPatch.setId(orderId);
    statusPatch.setOrderStatus(processStatus.getOrderStatus());
    statusPatch.setProcessStatus(processStatus);
    orderInfoMapper.updateByPrimaryKeySelective(statusPatch);
}
|
此处:记得在配置文件中添加
# application.properties
mapper.enum-as-simple-type=true
|
3 订单模块发送减库存通知
订单模块除了接收到请求改变单据状态,还要发送消息通知库存系统
查看《库存管理系统接口手册》中【减库存的消息队列消费端接口】中的描述,组织相应的消息数据进行传递。
创建数据库表:
-- Warehouse task table: one row per order-level stock task.
-- Holds the order id, consignee/delivery data, payment way, task status,
-- tracking number, creation time, warehouse id and a task comment.
-- Note: auto_increment on id is commented out, so this DDL does not make id auto-generated.
create table ware_order_task
(
id bigint /*auto_increment*/ not null,
order_id bigint(20),
consignee VARCHAR(100),
consignee_tel VARCHAR(20),
delivery_address VARCHAR(1000),
order_comment VARCHAR(200),
payment_way VARCHAR(2),
task_status VARCHAR(20),
order_body VARCHAR(200),
tracking_no VARCHAR(200),
create_time DATETIME,
ware_id bigint,
task_comment VARCHAR(500)
);
-- The primary key on id is added as a separate statement.
alter table ware_order_task
add constraint PK_ware_order_task_id primary key (id);
|
-- Warehouse task detail table: one row per SKU line (sku id, name, quantity).
-- task_id presumably points at ware_order_task.id; no foreign key is declared here - confirm.
-- Note: auto_increment on id is commented out, so this DDL does not make id auto-generated.
create table ware_order_task_detail
(
id bigint /*auto_increment*/ not null,
sku_id bigint,
sku_name VARCHAR(200),
sku_num INTEGER,
task_id bigint
);
-- NOTE(review): the constraint name below looks truncated or garbled compared with
-- PK_ware_order_task_id used for the sibling table - confirm the intended name.
alter table ware_order_task_detail
add constraint PK_ware_orail_id6CBB primary key (id);
|
OrderConsumer.java
/**
 * JMS listener for payment results published on {@code PAYMENT_RESULT_QUEUE}.
 * On a successful payment it also triggers the stock-deduction notification.
 */
@Component
public class OrderConsumer {

    @Reference
    OrderService orderService;

    /**
     * Reads {@code orderId} and {@code result} from the message and prints both.
     * A result other than {@code "success"} sets the order to {@code UNPAID}.
     * On {@code "success"} the order is set to {@code PAID}, the stock-deduction
     * message is sent, and the order is then set to {@code DELEVERED}.
     *
     * @param mapMessage payment-result message
     * @throws JMSException if a field cannot be read from the message
     */
    @JmsListener(destination = "PAYMENT_RESULT_QUEUE",containerFactory = "jmsQueueListener")
    public void consumerPaymentResult(MapMessage mapMessage) throws JMSException {
        final String paidOrderId = mapMessage.getString("orderId");
        final String paymentOutcome = mapMessage.getString("result");
        System.out.println("result = " + paymentOutcome);
        System.out.println("orderId = " + paidOrderId);

        // Failure path first: anything but "success" resets the order to UNPAID.
        if (!"success".equals(paymentOutcome)) {
            orderService.updateOrderStatus(paidOrderId, ProcessStatus.UNPAID);
            return;
        }

        // Record the payment
        orderService.updateOrderStatus(paidOrderId, ProcessStatus.PAID);
        // Ask the warehouse system to deduct stock
        orderService.sendOrderStatus(paidOrderId);
        // NOTE(review): the order is marked DELEVERED right after the warehouse is notified,
        // before any stock-deduction result has been consumed - confirm this is the intended status.
        orderService.updateOrderStatus(paidOrderId, ProcessStatus.DELEVERED);
    }
}
|
OrderService接口
/**
 * Sends the stock-deduction notification for the given order to the message queue.
 *
 * @param orderId id of the order whose data is sent
 */
public void sendOrderStatus(String orderId);
|
OrderServiceImpl实现类
/**
 * Publishes the stock-deduction request for an order to {@code ORDER_RESULT_QUEUE}.
 *
 * <p>The payload is the JSON string produced by {@code initWareOrder(orderId)},
 * sent as a text message inside a transacted session.
 *
 * <p>JMS failures are printed to stderr and not propagated to the caller.
 *
 * @param orderId id of the order to send
 */
public void sendOrderStatus(String orderId){
    // Build the payload before obtaining a connection, so that a failure while
    // loading the order cannot leave an open connection behind.
    String orderJson = initWareOrder(orderId);
    Connection connection = activeMQUtil.getConnection();
    Session session = null;
    try {
        connection.start();
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue orderResultQueue = session.createQueue("ORDER_RESULT_QUEUE");
        MessageProducer producer = session.createProducer(orderResultQueue);
        ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText(orderJson);
        producer.send(textMessage);
        session.commit();
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Always release JMS resources, including on the failure path.
        // Closing the session also closes its producer, and per the JMS contract
        // an uncommitted transaction on a transacted session is rolled back on close.
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Loads the order by id and returns its warehouse payload serialized as JSON.
 *
 * @param orderId id of the order to load
 * @return JSON string of the map built by {@code initWareOrder(OrderInfo)}
 */
public String initWareOrder(String orderId){
    return JSON.toJSONString(initWareOrder(getOrderInfo(orderId)));
}
// 设置初始化仓库信息方法
/**
 * Builds the warehouse payload for an order as a map.
 *
 * <p>Top-level keys: {@code orderId}, {@code consignee}, {@code consigneeTel},
 * {@code orderComment}, {@code orderBody}, {@code deliveryAddress},
 * {@code paymentWay} (always {@code "2"}), {@code wareId} and {@code details}.
 * {@code details} is a list with one map per order detail, holding
 * {@code skuId}, {@code skuName} and {@code skuNum}.
 *
 * @param orderInfo order to convert; its order detail list must be populated
 * @return map ready to be serialized to JSON
 */
public Map<String, Object> initWareOrder (OrderInfo orderInfo){
    Map<String, Object> map = new HashMap<>();
    map.put("orderId",orderInfo.getId());
    map.put("consignee", orderInfo.getConsignee());
    map.put("consigneeTel",orderInfo.getConsigneeTel());
    map.put("orderComment",orderInfo.getOrderComment());
    map.put("orderBody",orderInfo.getTradeBody());
    map.put("deliveryAddress",orderInfo.getDeliveryAddress());
    // NOTE(review): payment way is hard-coded to "2"; its meaning is not shown here - confirm against the interface manual.
    map.put("paymentWay","2");
    map.put("wareId",orderInfo.getWareId());

    // Assemble the per-SKU detail entries
    List<Map<String, Object>> detailList = new ArrayList<>();
    List<OrderDetail> orderDetailList = orderInfo.getOrderDetailList();
    for (OrderDetail orderDetail : orderDetailList) {
        Map<String, Object> detailMap = new HashMap<>();
        detailMap.put("skuId",orderDetail.getSkuId());
        detailMap.put("skuName",orderDetail.getSkuName());
        detailMap.put("skuNum",orderDetail.getSkuNum());
        detailList.add(detailMap);
    }
    map.put("details",detailList);
    return map;
}
|
注意:getOrderInfo(orderId); 方法中一定要根据orderId取得到orderDetail。
注意:需要在仓库系统中
@Component
public class WareConsumer
类上加个注解
在库存系统中将activemq的配置打开(注意:broker-url 必须与支付、订单模块连接的是同一个 ActiveMQ 服务;前文使用的地址是 192.168.67.204,请按实际环境核对下面的地址)
spring.activemq.broker-url=tcp://192.168.67.200:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
|
重启仓库系统
减库存:如果你的商品明细中的商品,在不同的仓库。则减库存会失败!
异常:需要拆单!因为我的数据,在两个仓库中
4 消费减库存结果
给仓库系统发送减库存消息后,还要接收减库存成功或者失败的消息。
同样根据《库存管理系统接口手册》中【商品减库结果消息】的说明完成。消费该消息的消息队列监听程序。
接收到消息后主要做的工作就是更新订单状态。
/**
 * Consumes stock-deduction results from {@code SKU_DEDUCT_QUEUE}.
 * A status of {@code "DEDUCTED"} moves the order to {@code WAITING_DELEVER};
 * any other status moves it to {@code STOCK_EXCEPTION}.
 *
 * @param mapMessage message carrying {@code orderId} and {@code status}
 * @throws JMSException if a field cannot be read from the message
 */
@JmsListener(destination = "SKU_DEDUCT_QUEUE",containerFactory = "jmsQueueListener")
public void consumeSkuDeduct(MapMessage mapMessage) throws JMSException {
    final String deductedOrderId = mapMessage.getString("orderId");
    final String deductStatus = mapMessage.getString("status");

    // Pick the target status first, then apply it with a single update call.
    final ProcessStatus nextStatus = "DEDUCTED".equals(deductStatus)
            ? ProcessStatus.WAITING_DELEVER
            : ProcessStatus.STOCK_EXCEPTION;
    orderService.updateOrderStatus(deductedOrderId, nextStatus);
}
|