什么是最大努力通知
下面是一个充值的例子:
最大努力通知与可靠消息一致性有什么不同?
可靠消息一致性有什么不同?
1、解决方案思想不同
可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方 ,消息的可靠性关键由发起通知方来保证。
最大努力通知,发起通知方尽最大的努力将业务处理结果通知到接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果 ,通知的可靠性关键在接收通知方。
2、两者的业务应用场暑不同
可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。
最大努力通知关注的是交易后的通知事务 ,即将交易结果可靠的通知出去。
3、技术解决方向不同
可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
最大努力通知无法保证消息从发出到接收的一致性 ,只提供消息接收的可靠性机制。可靠机制是,最大努力的。
解决方案
采用MQ的ack机制就可以实现最大努力通知
方案一
本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:
发起通知方将通知发送给MQ,使用普通消息机制发送。
注意:如果消息没有发送出去可由接收方主动请求发起通知方查询业务执行结果
接收方监听MQ。
接收方接收消息,业务处理完成回应ack。
接收通知方若没有回应ack则MQ会重复通知。
MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用rocketMQ,在broker中可进行设置),直到达到通知要求的时间窗口上限。
接收通知方可通过消息校对接口来校对消息一致性。
方案二
方案一和方案二的不同点:
- 方案一种接收通知方与MQ接口,即接收通知方监听MQ,此方案主要应用于内部应用之间的通知
- 方案二由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。
RocketMQ实现方案一
本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是bank数据库,其中有张三账户。充值系统的数据库使用bank_pay数据库,记录了账户的充值记录。
数据库
bank_pay数据库,记录充值信息
CREATE DATABASE `bank_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` ( `id` varchar(64) NOT NULL, `account_no` varchar(100) NULL DEFAULT NULL COMMENT '账户', `pay_amount` double NULL DEFAULT NULL COMMENT '充值金额', `result` varchar(20) DEFAULT NULL COMMENT '充值结果:success, fail', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
|
伪代码
充值服务(发送方)
充值微服务实现如下功能:
- 充值接口
- 充值结果查询接口
Dao
@Mapper @Component public interface AccountPayDao { @Insert("insert into account_pay(id, account_no, pay_amount, result) values(#{id}, #{accountNo}, #{payAmount}, #{result})") int insertAccountPay(@Param("id") String id, @Param("accountNo") String accountNo, @Param("payAmount") Double payAmount, @Param("result") String result);
@Select("select id, account_no accountNo, pay_amount payAmount,result from account_pay where id=#{txNo}") AccountPay findByIdTxNo(@Param("txNo") String txNo); }
|
Service
AccountPayServicepublic interface AccountPayService { public AccountPay insertAccountPay(AccountPay accountPay); public AccountPay getAccountPay(String txNo); }
|
实现类
AccountPayServiceImpl@Service @Slf4j public class AccountPayServiceImpl implements AccountPayService {
@Autowire RocketMQTemplate rocketMQTemplate;
@Autowire AccountPayDao accountPayDao;
@Transactional @Override public AccountPay insertAccountPay(AccountPay accountPay) { int result = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success"); if (result > 0) { accountPay.setResult("success"); rocketMQTemplate.convertAndSend("topic_notifymsg", accountPay); return accountPay; } return null; }
@Override public AccountPay getAccountPay(String txNo) { return accountPayDao.findByIdTxNo(txNo); } }
|
Controller
AccountPayController@RestController public class AccountPayController { @Autowired AccountPayService accountPayService;
@GetMapping("/paydo") public AccountPay pay(AccountPay accountPay) { String txNo = UUID.randomUUID.toString(); accountPay.setId(txNo); return accountPayService.insertAccountPay(accountPay); }
@GetMapping("/payresult/{txNo}") public AccountPay payresult(@Pathvariable("txNo") String txNo) { return accountPayService.getAccountPay(txNo); } }
|
账户服务(接收方)
账户微服务实现如下功能:
- 监听MQ,接收充值结果,根据充值结果完成账户金额修改。
- 主动查询充值系统,根据充值结果完成账户修改。
Dao
AccountInfoDao@Mapper @Component public interface AccountInfoDao { @Update("update account_info set account_balance= account_balance+#{amount} where account_no = #{accountNo}") int updateAccountBalance(@Param("id") String id, @Param("accountNo") String accountNo, @Param("amount") Double amount);
@Select("select count(1) from de_duplication where tx_no=#{txNo}") int isExistTx(@Param("txNo") String txNo);
@Insert("insert into de_duplication values(#{txNo}, now())") int addTx(@Param("txNo") String txNo); }
|
Client
通过Feign远程调用充值系统的结果查询接口
PayClient
@FeignClient(value = "bank-pay", fallback = PayFallback.class) public interface PayClient { @GetMapping("/pay/payresult/{txNo}") public AccountPay payresult(@Pathvariable("txNo") String txNo); }
|
服务降级方法
PayFallback@Component public class PayFallback implements PayClient { @Override public AccountPay payresult(String txNo) { AccountPay accountPay = new AccountPay(); accountPay.setResult("fail"); return accountPay; } }
|
Service
AccountInfoServicepublic interface AccountInfoService { public void updateAccountBalance(AccountChangeEvent accountChange); public AccountPay queryPayResult(String txNo); }
|
实现该接口
AccountInfoServiceImpl@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService {
@Autowire PayClient payClient;
@Autowire AccountPayDao accountInfoDao;
@Transactional @Override public void updateAccountBalance(AccountChangeEvent accountChange) { int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); if (existTx > 0) { log.info("该消息已处理: {}", JSONObject.toJSONStrong(accountChange)); return; } accountInfoDao.addTx(accountChange.getTxNo()); accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount()); }
@Override public AccountPay queryPayResult(String txNo) { AccountPay accountPay = payClient.queryPayResult(txNo); String result = accountPay.getResult(); if ("success".equals(result)) { AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(accountPay.getAccountNo()); accountChangeEvent.setAmount(accountPay.getPayAmount()); accountChangeEvent.setTxNo(accountPay.getId()); updateAccountBalance(accountChangeEvent); } return accountPay; } }
|
Listener
监听MQ,接收充值服务发送的消息,成功后更新数据库。
NotifyMsgListerner @RocketMQMessageListener(topic = "topic_notifymsg", consumerGroup = "consumer_gruop_notifymsg_bank") public class NotifyMsgListerner implements RocketMQListener<AccountPay> {
@Autowire AccountInfoService accountInfoService;
@Override public void onMessage(AccountPay accountPay) { if ("success".equals(result)) { AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(accountPay.getAccountNo()); accountChangeEvent.setAmount(accountPay.getPayAmount()); accountChangeEvent.setTxNo(accountPay.getId()); accountInfoService.updateAccountBalance(accountChangeEvent); } } }
|
Controller
接收方可以主动查询通知方事务结果
AccountInfoController@RestController public class AccountInfoController { @Autowired AccountInfoService accountInfoService;
@GetMapping("/payresult/{txNo}") public AccountPay result(@Pathvariable("txNo") String txNo) { return accountInfoService.queryPayResult(txNo); } }
|