在支付完成后可以主动向支付宝查询支付的结果,用于更新支付记录和订单的状态;支付成功,通过消息队列的方式异步通知学习中心微服务,将选课记录添加到我的课表中。
接口信息
RabbitMQ
消息队列是异步调用的实现方式之一,本项目选择RabbitMQ框架完成异步通知:
- 发送消息前,将消息保存到本地数据库;
- 消息成功发送到交换机(exchange)时,删除数据库消息记录,即保存到历史消息表中;
- 当消息从交换机(exchange)到队列(queue)时发生异常,也要将消息保存到数据库
安装RabbitMQ
使用Docker方式进行安装
使用下面命令启动MQ容器
docker run \ -d \ --hostname mq1 \ --name mq \ -p 15672:15672 \ -p 5672:5672 \ -e RABBITMQ_DEFAULT_USER=swcode \ -e RABBITMQ_DEFAULT_PASS=123321 \ rabbitmq:3-management
|
Maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
发布订阅
常见exchange类型包括:
Fanout:广播
Direct:路由
Topic:话题
注意:exchange负责消息路由,而不是存储,路由失败则消息丢失
生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
- publisher-return,发送者回执
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
再配置文件中开启:
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
|
消息队列配置
创建交换机和队列,并绑定队列和交换机,同时在回调中处理失败消息。
在learning-online-orders-service
工程中(生产者)创建配置类 PayNotifyConfig
PayNotifyConfig@Slf4j @Configuration public class PayNotifyConfig implements ApplicationContextAware {
public static final String PAY_NOTIFY_EXCHANGE_FANOUT = "pay_notify_exchange_fanout"; public static final String Message_TYPE = "pay_result_notify"; public static final String PAY_NOTIFY_QUEUE = "pay_notify_queue";
@Bean(PAY_NOTIFY_EXCHANGE_FANOUT) public FanoutExchange payNotifyExchangeFanout() { return new FanoutExchange(PAY_NOTIFY_EXCHANGE_FANOUT, true, false); }
@Bean(PAY_NOTIFY_QUEUE) public Queue coursePublishQueue() { return QueueBuilder.durable(PAY_NOTIFY_QUEUE).build(); }
@Bean public Binding bindingCoursePublishQueue(@Qualifier(PAY_NOTIFY_QUEUE) Queue queue, @Qualifier(PAY_NOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); }
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class); rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息发送失败, 应答码: {}, 原因: {}, 交换机: {}, 路由键: {}, 消息: {}", replyCode, replyText, exchange, routingKey, message); MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class); mqMessageService.addMessage(mqMessage.getMessageType(), mqMessage.getBusinessKey1(), mqMessage.getBusinessKey2(), mqMessage.getBusinessKey3()); })); } }
|
每个 RabbitTemplate 只能配置一个ReturnCallback,因此需要在项目加载时添加配置,为此需要实现ApplicationContextAware(实现了ApplicationContextAware接口的实现类,在Spring容器的Bean工厂创建完毕后会通知该实现类)
ReturnCallback的回调函数:当消息成功发送到交换机,但是没有成功发送到消息队列时,回退到回调函数,应该如何处理?就是回调函数里面的内容
在learning-online-learning-service
工程中(消费者)创建配置类 PayNotifyConfig
消费端配置一份,保证消费端先启动时不会报错
PayNotifyConfig
@Configuration public class PayNotifyConfig {
public static final String PAY_NOTIFY_EXCHANGE_FANOUT = "pay_notify_exchange_fanout"; public static final String Message_TYPE = "pay_result_notify"; public static final String PAY_NOTIFY_QUEUE = "pay_notify_queue";
@Bean(PAY_NOTIFY_EXCHANGE_FANOUT) public FanoutExchange payNotifyExchangeFanout() { return new FanoutExchange(PAY_NOTIFY_EXCHANGE_FANOUT, true, false); }
@Bean(PAY_NOTIFY_QUEUE) public Queue coursePublishQueue() { return QueueBuilder.durable(PAY_NOTIFY_QUEUE).build(); }
@Bean public Binding bindingCoursePublishQueue(@Qualifier(PAY_NOTIFY_QUEUE) Queue queue, @Qualifier(PAY_NOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }
|
查询service
在com.swx.orders.service
包下的 OrderService 中定义查询结果方法:
OrderService
PayRecordVO queryRequestPay(String payNo);
|
在实现类中实现该方法
ConfirmCallback【可以在发送消息时指定】因为每个业务处理confirm成功或失败的逻辑不一定相同
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
XcPayRecordServiceImpl
@Service public class XcPayRecordServiceImpl extends ServiceImpl<XcPayRecordMapper, XcPayRecord> implements XcPayRecordService { private final XcOrdersService xcOrdersService; private final XcPayRecordService xcPayRecordService; private final TransactionTemplate transactionTemplate; private final RabbitTemplate rabbitTemplate; private final MqMessageService mqMessageService;
@Value("${pay.qrcodeUrl}") private String qrcodeUrl;
public OrderServiceImpl(XcOrdersService xcOrdersService, XcPayRecordService xcPayRecordService, TransactionTemplate transactionTemplate, RabbitTemplate rabbitTemplate, MqMessageService mqMessageService) { this.xcOrdersService = xcOrdersService; this.xcPayRecordService = xcPayRecordService; this.transactionTemplate = transactionTemplate; this.rabbitTemplate = rabbitTemplate; this.mqMessageService = mqMessageService; }
@Override public PayRecordVO queryRequestPay(String payNo) { PayStatusDTO payStatusDTO = queryPayResultFromAlipay(payNo); transactionTemplate.executeWithoutResult(status -> { try { saveAlipayStatus(payStatusDTO); } catch (Exception e) { status.setRollbackOnly(); } }); XcPayRecord oneByPayNo = xcPayRecordService.getOneByPayNo(payNo); PayRecordVO payRecordVO = new PayRecordVO(); BeanUtils.copyProperties(oneByPayNo, payRecordVO); return payRecordVO; }
@Transactional(rollbackFor = Exception.class) @Override public void saveAlipayStatus(PayStatusDTO dto) { String payNo = dto.getOut_trade_no(); XcPayRecord oneByPayNo = xcPayRecordService.getOneByPayNo(payNo); if (oneByPayNo == null) { throw new BizException("找不到相关的支付记录"); } Long orderId = oneByPayNo.getOrderId(); XcOrders xcOrder = xcOrdersService.getById(orderId); if (xcOrder == null) { throw new BizException("找不到相关连的订单"); } String status = oneByPayNo.getStatus(); if (status.equals("601002")) { return; } String tradeStatus = dto.getTrade_status(); if (!tradeStatus.equals("TRADE_SUCCESS")) { return; } oneByPayNo.setStatus("601002"); oneByPayNo.setOutPayNo(dto.getTrade_no()); oneByPayNo.setOutPayChannel("Alipay"); oneByPayNo.setPaySuccessTime(LocalDateTime.now()); xcPayRecordService.updateById(oneByPayNo);
xcOrder.setStatus("600002"); xcOrdersService.updateById(xcOrder);
MqMessage mqMessage = mqMessageService.addMessage("pay_result_notify", xcOrder.getOutBusinessId(), xcOrder.getOrderType(), null); notifyPayResult(mqMessage); }
@Override public void notifyPayResult(MqMessage mqMessage) { String jsonMsg = JSON.toJSONString(mqMessage); Message msgObj = MessageBuilder.withBody(jsonMsg.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build(); CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString()); correlationData.getFuture().addCallback(result -> { if (result != null && result.isAck()) { log.debug("消息发送成功:{}", jsonMsg); mqMessageService.completed(mqMessage.getId()); } else { log.error("消息发送失败,id:{},原因:{}", mqMessage.getId(), result); } }, ex -> { log.error("消息发送异常,id:{},原因:{}", mqMessage.getId(), ex.getMessage()); }); rabbitTemplate.convertAndSend(PayNotifyConfig.PAY_NOTIFY_EXCHANGE_FANOUT, "", msgObj , correlationData); }
public PayStatusDTO queryPayResultFromAlipay(String payNo) { AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.URL, APP_ID, APP_PRIVATE_KEY, AlipayConfig.FORMAT, AlipayConfig.CHARSET, ALIPAY_PUBLIC_KEY, AlipayConfig.SIGNTYPE); AlipayTradeQueryRequest request = new AlipayTradeQueryRequest(); JSONObject bizContent = new JSONObject(); bizContent.put("out_trade_no", payNo); request.setBizContent(bizContent.toString()); AlipayTradeQueryResponse response = null; try { response = alipayClient.execute(request); } catch (AlipayApiException e) { throw new BizException("请求支付查询支付结果异常"); } if (!response.isSuccess()) { throw new BizException("请求支付查询支付结果失败"); }
String body = response.getBody(); Map bodyMap = JSON.parseObject(body, Map.class); Map alipayTradeQueryResponse = (Map) bodyMap.get("alipay_trade_query_response");
String tradeNo = (String) alipayTradeQueryResponse.get("trade_no"); String tradeStatus = (String) alipayTradeQueryResponse.get("trade_status"); String totalAmount = (String) alipayTradeQueryResponse.get("total_amount"); PayStatusDTO payStatusDTO = new PayStatusDTO(); payStatusDTO.setOut_trade_no(payNo); payStatusDTO.setTrade_no(tradeNo); payStatusDTO.setTrade_status(tradeStatus); payStatusDTO.setApp_id(APP_ID); payStatusDTO.setTotal_amount(totalAmount); return payStatusDTO; } }
|
学习中心微服务
在学习中心微服务中添加消息队列的依赖信息,并添加yaml配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.swx</groupId> <artifactId>learning-online-message-sdk</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
|
shared-configs: - data-id: rabbitmq-${spring.profiles.active}.yaml group: learning-online-common refresh: true
|
监听消息队列
在learning-online-learning-service
工程的service.impl
包下创建监听类ReceivePayNotifyService
ReceivePayNotifyService@Slf4j @Service public class ReceivePayNotifyService {
private final MyCourseTablesService myCourseTablesService;
public ReceivePayNotifyService(MyCourseTablesService myCourseTablesService) { this.myCourseTablesService = myCourseTablesService; }
@RabbitListener(queues = PayNotifyConfig.PAY_NOTIFY_QUEUE) public void receive(Message message) { try { Thread.sleep(5000); } catch (InterruptedException e) { log.debug("消费睡眠异常:", e); }
String jsonMessage = new String(message.getBody()); MqMessage mqMessage = JSON.parseObject(jsonMessage, MqMessage.class); String orderType = mqMessage.getBusinessKey2(); if (!orderType.equals("60201")) { return; } String chooseCourseId = mqMessage.getBusinessKey1(); boolean b = myCourseTablesService.payChooseCourseSuccess(chooseCourseId); if (b) { throw new BizException("保存选课记录状态失败"); } } }
|
查询Controller
OrderController@ApiOperation("查询支付结果") @RequestMapping("/payresult") public PayRecordVO payResult(String payNo) { return orderService.queryRequestPay(payNo); }
|