什么是最大努力通知

下面是一个充值的例子:

最大努力通知与可靠消息一致性有什么不同?

可靠消息一致性有什么不同?

1、解决方案思想不同
可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方 ,消息的可靠性关键由发起通知方来保证。

最大努力通知,发起通知方尽最大的努力将业务处理结果通知到接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果 ,通知的可靠性关键在接收通知方。

2、两者的业务应用场暑不同

可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。
最大努力通知关注的是交易后的通知事务 ,即将交易结果可靠的通知出去。

3、技术解决方向不同

可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
最大努力通知无法保证消息从发出到接收的一致性 ,只提供消息接收的可靠性机制。可靠机制是,最大努力的。

解决方案

采用MQ的ack机制就可以实现最大努力通知

方案一

本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:

  1. 发起通知方将通知发送给MQ,使用普通消息机制发送。

    注意:如果消息没有发送出去可由接收方主动请求发起通知方查询业务执行结果

  2. 接收方监听MQ。

  3. 接收方接收消息,业务处理完成回应ack。

  4. 接收通知方若没有回应ack则MQ会重复通知。

    MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用rocketMQ,在broker中可进行设置),直到达到通知要求的时间窗口上限。

  5. 接收通知方可通过消息校对接口来校对消息一致性。

方案二

方案一和方案二的不同点:

  1. 方案一种接收通知方与MQ接口,即接收通知方监听MQ,此方案主要应用于内部应用之间的通知
  2. 方案二由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

RocketMQ实现方案一

本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是bank数据库,其中有张三账户。充值系统的数据库使用bank_pay数据库,记录了账户的充值记录。

数据库

bank_pay数据库,记录充值信息

CREATE DATABASE `bank_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_pay` (
`id` varchar(64) NOT NULL,
`account_no` varchar(100) NULL DEFAULT NULL COMMENT '账户',
`pay_amount` double NULL DEFAULT NULL COMMENT '充值金额',
`result` varchar(20) DEFAULT NULL COMMENT '充值结果:success, fail',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

伪代码

充值服务(发送方)

充值微服务实现如下功能:

  1. 充值接口
  2. 充值结果查询接口

Dao

@Mapper
@Component
public interface AccountPayDao {
@Insert("insert into account_pay(id, account_no, pay_amount, result) values(#{id}, #{accountNo}, #{payAmount}, #{result})")
int insertAccountPay(@Param("id") String id, @Param("accountNo") String accountNo, @Param("payAmount") Double payAmount, @Param("result") String result);

@Select("select id, account_no accountNo, pay_amount payAmount,result from account_pay where id=#{txNo}")
AccountPay findByIdTxNo(@Param("txNo") String txNo);
}

Service

AccountPayService
public interface AccountPayService {
// 充值
public AccountPay insertAccountPay(AccountPay accountPay);

// 查询充值结果
public AccountPay getAccountPay(String txNo);
}

实现类

AccountPayServiceImpl
@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService {

@Autowire
RocketMQTemplate rocketMQTemplate;

@Autowire
AccountPayDao accountPayDao;

@Transactional
@Override
public AccountPay insertAccountPay(AccountPay accountPay) {
int result = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
if (result > 0) {
// 发送通知
accountPay.setResult("success");
rocketMQTemplate.convertAndSend("topic_notifymsg", accountPay);
return accountPay;
}
return null;
}

@Override
public AccountPay getAccountPay(String txNo) {
return accountPayDao.findByIdTxNo(txNo);
}
}

Controller

AccountPayController
@RestController
public class AccountPayController {
@Autowired
AccountPayService accountPayService;

@GetMapping("/paydo")
public AccountPay pay(AccountPay accountPay) {
String txNo = UUID.randomUUID.toString();
accountPay.setId(txNo);
return accountPayService.insertAccountPay(accountPay);
}

@GetMapping("/payresult/{txNo}")
public AccountPay payresult(@Pathvariable("txNo") String txNo) {
return accountPayService.getAccountPay(txNo);
}
}

账户服务(接收方)

账户微服务实现如下功能:

  1. 监听MQ,接收充值结果,根据充值结果完成账户金额修改。
  2. 主动查询充值系统,根据充值结果完成账户修改。

Dao

AccountInfoDao
@Mapper
@Component
public interface AccountInfoDao {
// 修改账户金额
@Update("update account_info set account_balance= account_balance+#{amount} where account_no = #{accountNo}")
int updateAccountBalance(@Param("id") String id, @Param("accountNo") String accountNo, @Param("amount") Double amount);

// 查询幂等记录,用于幂等控制
@Select("select count(1) from de_duplication where tx_no=#{txNo}")
int isExistTx(@Param("txNo") String txNo);

// 添加事务记录,用于幂等控制
@Insert("insert into de_duplication values(#{txNo}, now())")
int addTx(@Param("txNo") String txNo);
}

Client

通过Feign远程调用充值系统的结果查询接口

PayClient
/**
* 远程调用充值系统查询接口
*/
@FeignClient(value = "bank-pay", fallback = PayFallback.class)
public interface PayClient {
@GetMapping("/pay/payresult/{txNo}")
public AccountPay payresult(@Pathvariable("txNo") String txNo);
}

服务降级方法

PayFallback
@Component
public class PayFallback implements PayClient {
@Override
public AccountPay payresult(String txNo) {
AccountPay accountPay = new AccountPay();
accountPay.setResult("fail");
return accountPay;
}
}

Service

AccountInfoService
public interface AccountInfoService {
public void updateAccountBalance(AccountChangeEvent accountChange);
public AccountPay queryPayResult(String txNo);
}

实现该接口

AccountInfoServiceImpl
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

@Autowire
PayClient payClient;

@Autowire
AccountPayDao accountInfoDao;

/**
* 更新账户余额
*/
@Transactional
@Override
public void updateAccountBalance(AccountChangeEvent accountChange) {
// 幂等校验
int existTx = accountInfoDao.isExistTx(accountChange.getTxNo());
if (existTx > 0) {
log.info("该消息已处理: {}", JSONObject.toJSONStrong(accountChange));
return;
}
// 添加事务记录
accountInfoDao.addTx(accountChange.getTxNo());
// 更新账户金额
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
}

/**
* 主动查询充值结果
*
* @param txNo 事务ID
*/
@Override
public AccountPay queryPayResult(String txNo) {
// 主动请求充值系统查询充值结果
AccountPay accountPay = payClient.queryPayResult(txNo);
// 充值结果
String result = accountPay.getResult();
if ("success".equals(result)) {
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(accountPay.getAccountNo());
accountChangeEvent.setAmount(accountPay.getPayAmount());
accountChangeEvent.setTxNo(accountPay.getId());
// TODO: 注意这里事务会失效,同类中非事务调用事务
updateAccountBalance(accountChangeEvent);
}
return accountPay;
}
}

Listener

监听MQ,接收充值服务发送的消息,成功后更新数据库。

NotifyMsgListerner
// 监听MQ
@RocketMQMessageListener(topic = "topic_notifymsg", consumerGroup = "consumer_gruop_notifymsg_bank")
public class NotifyMsgListerner implements RocketMQListener<AccountPay> {

@Autowire
AccountInfoService accountInfoService;

@Override
public void onMessage(AccountPay accountPay) {
if ("success".equals(result)) {
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(accountPay.getAccountNo());
accountChangeEvent.setAmount(accountPay.getPayAmount());
accountChangeEvent.setTxNo(accountPay.getId());
// TODO: 注意这里事务会失效,同类中非事务调用事务
accountInfoService.updateAccountBalance(accountChangeEvent);
}
}
}

Controller

接收方可以主动查询通知方事务结果

AccountInfoController
@RestController
public class AccountInfoController {
@Autowired
AccountInfoService accountInfoService;

// 主动查询充值结果
@GetMapping("/payresult/{txNo}")
public AccountPay result(@Pathvariable("txNo") String txNo) {
return accountInfoService.queryPayResult(txNo);
}
}