是什么?
可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致
此方案是利用消息中间件完成:
如下图: 事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消 息中间件之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。
因此可靠消息最终一致性方亲要解决以下几个问题:
本地事务与消息发送的原子性问题
本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。
先来尝试下这种操作,先发送消息,再操作数据库:
BEGIN TRANSACTION; COMMIT TRANSACTION;
|
这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。
第二种方案,先进行数据库操作,再发送消息:
BEGIN TRANSACTION; COMMIT TRANSACTION;
|
这种情况下貌似没有问题,如果发送MQ消息失败,就会拋出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但MQ其实已经正常发送了,同样会导致不一致。
事务参与方接收消息的可靠性
事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
消息重复消费的问题
由于网路存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消理,就导致了消息的重复消费。
要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
解决方案
本地消息方案
RocketMQ事务消息方案
RocketMQ事务消息设计则主要是为了解决Producer端的消息发送与本地事务执行的原子性问题,RocketMQ的设计中broker与producer端的双向通信能力,使得broker天生可以作为一个事务协调者存在;而RocketMQ本身提供的存储机制为事务消息提供了持久化能力;RocketMQ的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证事务的最终一致性。
执行流程如下:
为了方便理解,以注册送积分的例子来描述整个流程。
Producer即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。
1、Producer发送事务消息
Producer(MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费消息的。本例中,Producer发送“增加积分消息”到MQ Server。
2、MQ Server回应消息发送成功
MQ Server接收到Producer发送的消息则回应发送成功表示MQ已接收到消息。
3、Producer执行本地事务
Producer端执行业务代码逻辑,通过本地数据库事务控制。本例中,Producer执行添加用户操作。
4、消息投递
若Producer本地事务执行成功则自动向MQ Server发送commit消息,MQ Server接收到commit消息后将“增加积分消息”状态标记为可消费,此时MQ订阅方(积分服务)可正常消息消息。
若Producer本地事务执行失败则自动向MQ Server发送rollback消息,MQ Server接收到rollback消息后将删除“增加积分消息”。
以上主干流程已由RocketMQ实现,对于开发者来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需要关注本地事务的执行状态即可。
Rocket MQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener {
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
RocketMQLocalTransactionState checkLocalTransaction(Message msg); }
|
RocketMQ实现实例
业务说明
本实例通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟两个账户的转账交易过程。
两个账户在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。
上述交易步骤,张三扣减金额给bank2发转账消息,两个操作必须是一个整体性事务。
数据库
de_duplication表,交易记录表,用于交易幂等控制。
DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` { `tx_no` varchar(64) NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE } ENGINE=InnoDB;
|
伪代码
消息传输的实体类对象
AccountChangeEvent@Data public class AccountChangeEvent implements Serializanble {
private String accountNo;
private double amount;
private String txNo; }
|
发送方
发送方Producer业务逻辑代码
AccountInfoServicepublic interface AccountInfoService {
public void sendUpdateAccountBalance (AccountChangeEvent accountChangeEvent);
public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent); }
|
实现接口类
AccountInfoServiceImpl@Service public AccountInfoServiceImpl implements AccountInfoService {
@Autowire AccountInfoDao accountInfoDao;
@Autowire RocketMQTemplate rocketMQTemplate;
@Override public void sendUpdateAccountBalance (AccountChangeEvent accountChangeEvent) {
JSONObject jsonObject = new JSONObject(); jsonObject.put("accountChange", accountChangeEvent); Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONStrong()).build();
rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message, null); }
@Override @Transactional(rollbackFor = Exception.class) public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo() > 0)) { return; } accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount() * -1); accountInfoDao.addTx(accountChangeEvent.getTxNo()); if (accountChangeEvent.getAmount() == 3) { throw new RuntimeException("人为制造异常"); }
}
}
|
发送方Producer发送消息后,由MQ Server回调的方法实现
@Component @RocketMQTransactionListener public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
@Autowire AccountInfoService accountInfoService;
@Autowire AccountInfoDao accountInfoDao;
@Override @Transactional(rollbackFor = Exception.class) public RocketMQLocalTransactionState executedTransaction(Message message, Object arg) { try { String messageString = new String(byte[] message.getPayload()); String accountChange = JSONObject.parseObject(messageString).getString("accountChange"); AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChange, AccountChangeEvent.class);
accountInfoService.doUpdateAccountBalance(accountChangeEvent); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrance(); return RocketMQLocalTransactionState.ROLLBACK; } }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String messageString = new String(byte[] message.getPayload()); String accountChange = JSONObject.parseObject(messageString).getString("accountChange"); AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChange, AccountChangeEvent.class); if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo()) > 0) { return RocketMQLocalTransactionSta te.COMMIT; } return RocketMQLocalTransactionState.UNKNOWN; } }
|
发送方提供一个转账的接口,用于向MQ发送一条转账消息(Prepared状态)
@RestController public class AccountInfoController { @Autowire private AccountInfoService accountInfoService;
@GetMapping("/transfer") public String transfer(@RequestParam("accountNo") String accountNo, @RequestParam("amount") Double amount) { String tx_no = UUID.randomUUID().toString(); AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo, amount, tx_no);
accountInfoService.sendUpdateAccountBalance(accountChangeEvent); return "转账成功"; } }
|
消费方
定义消费方更新账户的方法
AccountInfoServicepublic interface AccountInfoService {
public void addAccountInfoBalance (AccountChangeEvent accountChangeEvent); }
|
实现上述接口,该方法可能会重复执行,需要做好幂等判断
AccountInfoServiceImpl@Slf4j @Service public AccountInfoServiceImpl implements AccountInfoService {
@Autowire AccountInfoDao accountInfoDao;
@Override @Transactional(rollbackFor = Exception.class) public void addAccountInfoBalance (AccountChangeEvent accountChangeEvent) { log.info("bank2更新本地账号, 账号: {}, 金额: {}", accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount()) if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo()) > 0) { return; } accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount()); accountInfoDao.addTx(accountChangeEvent.getTxNo()); if (accountChangeEvent.getAmount() == 4) { throw new RuntimeException("人为制造异常"); } } }
|
消费方监听MQ消息
TxmsgConsumer@Slf4j @Component @RocketMQMessageListener(comsumerGroup = "consumer_group_txmsg_bank2", topic = "topic_txmsg") public class TxmsgConsumer implements RocketMQListener<String> {
@Autowire private AccountInfoService accountInfoService;
@Override public void onMessage(String message) { log.info("开始消费消息: {}", message); String messageString = new String(byte[] message.getPayload()); String accountChange = JSONObject.parseObject(messageString).getString("accountChange"); AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChange, AccountChangeEvent.class); accountChangeEvent.setAccountNo("2"); accountInfoService.addAccountInfoBalance(accountChangeEvent); } }
|
小结
可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了RacketMQ 作为消息中间件,RocketMQ主要解决了两个功能:
- 本地事务与消息发送的原子性问题。
- 事务参与方接收消息的可靠性。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作,避免了分布式事务中的同步阻塞操作的影响,并实现两个服务的解耦。