在支付完成后可以主动向支付宝查询支付的结果,用于更新支付记录和订单的状态;支付成功,通过消息队列的方式异步通知学习中心微服务,将选课记录添加到我的课表中。

接口信息

路径地址 http://localhost:63020/orders/payresult
请求方式 GET
请求参数 String
返回结果 PayRecordVO

RabbitMQ

消息队列是异步调用的实现方式之一,本项目选择RabbitMQ框架完成异步通知:

  1. 发送消息前,将消息保存到本地数据库;
  2. 消息成功发送到交换机(exchange)时,删除数据库消息记录,即保存到历史消息表中;
  3. 当消息从交换机(exchange)到队列(queue)时发生异常,也要将消息保存到数据库

安装RabbitMQ

使用Docker方式进行安装

docker pull rabbitmq

使用下面命令启动MQ容器

docker run \
-d \
--hostname mq1 \
--name mq \
-p 15672:15672 \
-p 5672:5672 \
-e RABBITMQ_DEFAULT_USER=swcode \
-e RABBITMQ_DEFAULT_PASS=123321 \
rabbitmq:3-management

Maven依赖

<!-- 消息队列 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

发布订阅

常见exchange类型包括:

  • Fanout:广播

  • Direct:路由

  • Topic:话题

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    消息成功投递到交换机,返回ack
    消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
    注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

再配置文件中开启:

spring:
rabbitmq:
publisher-confirm-type: correlated
# 开启publisher-confirm,且选择correlated:【异步】回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true # 开启publish-return功能
template:
mandatory: true # 定义当消息从交换机路由到队列失败时的策略。【true,则调用ReturnCallback;false:则直接丢弃消息】

消息队列配置

创建交换机和队列,并绑定队列和交换机,同时在回调中处理失败消息。

learning-online-orders-service工程中(生产者)创建配置类 PayNotifyConfig

PayNotifyConfig
@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {

// 交换机
public static final String PAY_NOTIFY_EXCHANGE_FANOUT = "pay_notify_exchange_fanout";
// 支付结果通知消息类型
public static final String Message_TYPE = "pay_result_notify";
// 支付通知队列
public static final String PAY_NOTIFY_QUEUE = "pay_notify_queue";

// 声明交换机,且持久化
@Bean(PAY_NOTIFY_EXCHANGE_FANOUT)
public FanoutExchange payNotifyExchangeFanout() {
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new FanoutExchange(PAY_NOTIFY_EXCHANGE_FANOUT, true, false);
}

// 支付通知队列
@Bean(PAY_NOTIFY_QUEUE)
public Queue coursePublishQueue() {
return QueueBuilder.durable(PAY_NOTIFY_QUEUE).build();
}

// 交换机和支付队列绑定
@Bean
public Binding bindingCoursePublishQueue(@Qualifier(PAY_NOTIFY_QUEUE) Queue queue, @Qualifier(PAY_NOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

/**
* 当消息投递失败时,将消息写入到本地消息表
* exchange到queue失败
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 消息处理service
MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
// 交换机到队列投递失败,记录日志
log.info("消息发送失败, 应答码: {}, 原因: {}, 交换机: {}, 路由键: {}, 消息: {}",
replyCode, replyText, exchange, routingKey, message);
MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
// 将消息添加到消息表
mqMessageService.addMessage(mqMessage.getMessageType(), mqMessage.getBusinessKey1(), mqMessage.getBusinessKey2(), mqMessage.getBusinessKey3());
}));
}
}

每个 RabbitTemplate 只能配置一个ReturnCallback,因此需要在项目加载时添加配置,为此需要实现ApplicationContextAware(实现了ApplicationContextAware接口的实现类,在Spring容器的Bean工厂创建完毕后会通知该实现类)

ReturnCallback的回调函数:当消息成功发送到交换机,但是没有成功发送到消息队列时,回退到回调函数,应该如何处理?就是回调函数里面的内容

learning-online-learning-service工程中(消费者)创建配置类 PayNotifyConfig

消费端配置一份,保证消费端先启动时不会报错

PayNotifyConfig
/**
* 消费端配置一份,保证消费端先启动时不会报错
*/
@Configuration
public class PayNotifyConfig {

// 交换机
public static final String PAY_NOTIFY_EXCHANGE_FANOUT = "pay_notify_exchange_fanout";
// 支付结果通知消息类型
public static final String Message_TYPE = "pay_result_notify";
// 支付通知队列
public static final String PAY_NOTIFY_QUEUE = "pay_notify_queue";

// 声明交换机,且持久化
@Bean(PAY_NOTIFY_EXCHANGE_FANOUT)
public FanoutExchange payNotifyExchangeFanout() {
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new FanoutExchange(PAY_NOTIFY_EXCHANGE_FANOUT, true, false);
}

// 支付通知队列
@Bean(PAY_NOTIFY_QUEUE)
public Queue coursePublishQueue() {
return QueueBuilder.durable(PAY_NOTIFY_QUEUE).build();
}

// 交换机和支付队列绑定
@Bean
public Binding bindingCoursePublishQueue(@Qualifier(PAY_NOTIFY_QUEUE) Queue queue, @Qualifier(PAY_NOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}

查询service

com.swx.orders.service包下的 OrderService 中定义查询结果方法:

OrderService
/**
* 查询支付结果
*
* @param payNo 支付交易号
* @return 交付结果
*/
PayRecordVO queryRequestPay(String payNo);

在实现类中实现该方法

ConfirmCallback【可以在发送消息时指定】因为每个业务处理confirm成功或失败的逻辑不一定相同

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

XcPayRecordServiceImpl
/**
* <p>
* 服务实现类
* </p>
*
* @author sw-code
* @since 2023-09-04
*/
@Service
public class XcPayRecordServiceImpl extends ServiceImpl<XcPayRecordMapper, XcPayRecord> implements XcPayRecordService {

private final XcOrdersService xcOrdersService;
private final XcPayRecordService xcPayRecordService;
private final TransactionTemplate transactionTemplate;
private final RabbitTemplate rabbitTemplate;
private final MqMessageService mqMessageService;

@Value("${pay.qrcodeUrl}")
private String qrcodeUrl;

public OrderServiceImpl(XcOrdersService xcOrdersService, XcPayRecordService xcPayRecordService,
TransactionTemplate transactionTemplate, RabbitTemplate rabbitTemplate,
MqMessageService mqMessageService) {
this.xcOrdersService = xcOrdersService;
this.xcPayRecordService = xcPayRecordService;
this.transactionTemplate = transactionTemplate;
this.rabbitTemplate = rabbitTemplate;
this.mqMessageService = mqMessageService;
}

/**
* 查询支付结果
*
* @param payNo 支付交易号
* @return 交付结果
*/
@Override
public PayRecordVO queryRequestPay(String payNo) {
PayStatusDTO payStatusDTO = queryPayResultFromAlipay(payNo);
// 更新
transactionTemplate.executeWithoutResult(status -> {
try {
saveAlipayStatus(payStatusDTO);
} catch (Exception e) {
status.setRollbackOnly();
}
});
XcPayRecord oneByPayNo = xcPayRecordService.getOneByPayNo(payNo);
PayRecordVO payRecordVO = new PayRecordVO();
BeanUtils.copyProperties(oneByPayNo, payRecordVO);
return payRecordVO;
}

/**
* 更新支付记录和订单状态
* @param dto 支付宝支付状态
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void saveAlipayStatus(PayStatusDTO dto) {
String payNo = dto.getOut_trade_no();
XcPayRecord oneByPayNo = xcPayRecordService.getOneByPayNo(payNo);
if (oneByPayNo == null) {
throw new BizException("找不到相关的支付记录");
}
Long orderId = oneByPayNo.getOrderId();
XcOrders xcOrder = xcOrdersService.getById(orderId);
if (xcOrder == null) {
throw new BizException("找不到相关连的订单");
}
String status = oneByPayNo.getStatus();
if (status.equals("601002")) {
return;
}
String tradeStatus = dto.getTrade_status();
if (!tradeStatus.equals("TRADE_SUCCESS")) {
return;
}
oneByPayNo.setStatus("601002");
// 支付宝订单号
oneByPayNo.setOutPayNo(dto.getTrade_no());
oneByPayNo.setOutPayChannel("Alipay");
oneByPayNo.setPaySuccessTime(LocalDateTime.now());
xcPayRecordService.updateById(oneByPayNo);

// 更新订单表状态
xcOrder.setStatus("600002");
xcOrdersService.updateById(xcOrder);

// 消息写入数据库, 消息类型,选课ID,购买课程(订单类型),null
MqMessage mqMessage = mqMessageService.addMessage("pay_result_notify", xcOrder.getOutBusinessId(), xcOrder.getOrderType(), null);
// 发送消息
notifyPayResult(mqMessage);
}

/**
* 通知支付结果
*
* @param mqMessage 结果
*/
@Override
public void notifyPayResult(MqMessage mqMessage) {
// 1. 将消息体转为Json
String jsonMsg = JSON.toJSONString(mqMessage);
// 2. 设消息的持久化方式为PERSISTENT,即消息会被持久化到磁盘上,确保即使在RabbitMQ服务器重启后也能够恢复消息。
Message msgObj = MessageBuilder.withBody(jsonMsg.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 3. 封装CorrelationData,用于跟踪消息的相关信息
CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());
// 3.1 添加一个Callback对象,该对象用于在消息确认时处理消息的结果
correlationData.getFuture().addCallback(result -> {
if (result != null && result.isAck()) {
// 3.2 消息成功发送到交换机,删除消息表中的记录
log.debug("消息发送成功:{}", jsonMsg);
mqMessageService.completed(mqMessage.getId());
} else {
// 3.3 消息发送失败
log.error("消息发送失败,id:{},原因:{}", mqMessage.getId(), result);
}
}, ex -> {
// 3.4 消息异常
log.error("消息发送异常,id:{},原因:{}", mqMessage.getId(), ex.getMessage());
});
// 4. 发送消息
// 发送消息
rabbitTemplate.convertAndSend(PayNotifyConfig.PAY_NOTIFY_EXCHANGE_FANOUT, "", msgObj , correlationData);
}

/**
* 查询支付宝,获取支付状态
*
* @param payNo 支付交易号
* @return 交付结果
*/
public PayStatusDTO queryPayResultFromAlipay(String payNo) {
AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.URL, APP_ID, APP_PRIVATE_KEY, AlipayConfig.FORMAT, AlipayConfig.CHARSET, ALIPAY_PUBLIC_KEY, AlipayConfig.SIGNTYPE);
AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();
JSONObject bizContent = new JSONObject();
bizContent.put("out_trade_no", payNo);
request.setBizContent(bizContent.toString());
AlipayTradeQueryResponse response = null;
try {
response = alipayClient.execute(request);
} catch (AlipayApiException e) {
throw new BizException("请求支付查询支付结果异常");
}
if (!response.isSuccess()) {
throw new BizException("请求支付查询支付结果失败");
}

String body = response.getBody();
// 解析支付结果
Map bodyMap = JSON.parseObject(body, Map.class);
Map alipayTradeQueryResponse = (Map) bodyMap.get("alipay_trade_query_response");

String tradeNo = (String) alipayTradeQueryResponse.get("trade_no");
String tradeStatus = (String) alipayTradeQueryResponse.get("trade_status");
String totalAmount = (String) alipayTradeQueryResponse.get("total_amount");
PayStatusDTO payStatusDTO = new PayStatusDTO();
payStatusDTO.setOut_trade_no(payNo);
payStatusDTO.setTrade_no(tradeNo);
payStatusDTO.setTrade_status(tradeStatus);
payStatusDTO.setApp_id(APP_ID);
payStatusDTO.setTotal_amount(totalAmount);
return payStatusDTO;
}
}

学习中心微服务

在学习中心微服务中添加消息队列的依赖信息,并添加yaml配置

<!-- 消息队列 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.swx</groupId>
<artifactId>learning-online-message-sdk</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
shared-configs:
- data-id: rabbitmq-${spring.profiles.active}.yaml
group: learning-online-common
refresh: true

监听消息队列

learning-online-learning-service工程的service.impl包下创建监听类ReceivePayNotifyService

ReceivePayNotifyService
@Slf4j
@Service
public class ReceivePayNotifyService {

private final MyCourseTablesService myCourseTablesService;

public ReceivePayNotifyService(MyCourseTablesService myCourseTablesService) {
this.myCourseTablesService = myCourseTablesService;
}

@RabbitListener(queues = PayNotifyConfig.PAY_NOTIFY_QUEUE)
public void receive(Message message) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.debug("消费睡眠异常:", e);
}

// 解析消息,转为对象
String jsonMessage = new String(message.getBody());
MqMessage mqMessage = JSON.parseObject(jsonMessage, MqMessage.class);
// 订单类型
String orderType = mqMessage.getBusinessKey2();
if (!orderType.equals("60201")) {
// 不是购买课程的订单类型
return;
}
String chooseCourseId = mqMessage.getBusinessKey1();
// 更新选课记录表
boolean b = myCourseTablesService.payChooseCourseSuccess(chooseCourseId);
if (b) {
throw new BizException("保存选课记录状态失败");
}
}
}

查询Controller

OrderController
@ApiOperation("查询支付结果")
@RequestMapping("/payresult")
public PayRecordVO payResult(String payNo) {
return orderService.queryRequestPay(payNo);
}