可靠消息最终一致性是AP的解决方案

是什么?

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致

此方案是利用消息中间件完成:

如下图: 事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消 息中间件之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。

因此可靠消息最终一致性方亲要解决以下几个问题:

本地事务与消息发送的原子性问题

本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。

先来尝试下这种操作,先发送消息,再操作数据库:

BEGIN TRANSACTION;
-- 发送MQ
-- 数据库操作
COMMIT TRANSACTION;

这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。

第二种方案,先进行数据库操作,再发送消息:

BEGIN TRANSACTION;
-- 数据库操作
-- 发送MQ
COMMIT TRANSACTION;

这种情况下貌似没有问题,如果发送MQ消息失败,就会拋出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但MQ其实已经正常发送了,同样会导致不一致。

事务参与方接收消息的可靠性

事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

消息重复消费的问题

由于网路存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消理,就导致了消息的重复消费。

要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

解决方案

本地消息方案

RocketMQ事务消息方案

RocketMQ事务消息设计则主要是为了解决Producer端的消息发送与本地事务执行的原子性问题,RocketMQ的设计中broker与producer端的双向通信能力,使得broker天生可以作为一个事务协调者存在;而RocketMQ本身提供的存储机制为事务消息提供了持久化能力;RocketMQ的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证事务的最终一致性。

执行流程如下:

为了方便理解,以注册送积分的例子来描述整个流程。

Producer即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。

1、Producer发送事务消息
Producer(MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费消息的。本例中,Producer发送“增加积分消息”到MQ Server。

2、MQ Server回应消息发送成功
MQ Server接收到Producer发送的消息则回应发送成功表示MQ已接收到消息。

3、Producer执行本地事务
Producer端执行业务代码逻辑,通过本地数据库事务控制。本例中,Producer执行添加用户操作。

4、消息投递
若Producer本地事务执行成功则自动向MQ Server发送commit消息,MQ Server接收到commit消息后将“增加积分消息”状态标记为可消费,此时MQ订阅方(积分服务)可正常消息消息。

若Producer本地事务执行失败则自动向MQ Server发送rollback消息,MQ Server接收到rollback消息后将删除“增加积分消息”。

以上主干流程已由RocketMQ实现,对于开发者来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需要关注本地事务的执行状态即可。

Rocket MQ提供RocketMQLocalTransactionListener接口:

public interface RocketMQLocalTransactionListener {
/**
* 发送prepare消息成功此被回调,该方法用于执行本地事务
* @param msg 回传的消息,利用transactionId即可获取该消息的唯一Id
* @param arg 调用send方法时传递的参数,当send时若有额外的参数可以传递到send中,这里能获取到
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOWN:回调,主动查询
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);

/**
* 主动查询事务状态
* @param msg 利用transactionId来判断这条消息的本地事务执行状态
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOWN:回调,主动查询
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}

RocketMQ实现实例

业务说明

本实例通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟两个账户的转账交易过程。

两个账户在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。

上述交易步骤,张三扣减金额给bank2发转账消息,两个操作必须是一个整体性事务。

数据库

de_duplication表,交易记录表,用于交易幂等控制。

DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication` {
`tx_no` varchar(64) NOT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
} ENGINE=InnoDB;

伪代码

消息传输的实体类对象

AccountChangeEvent
@Data
public class AccountChangeEvent implements Serializanble {
/**
* 账号
*/
private String accountNo;

/**
* 变动金额
*/
private double amount;

/**
* 事务号
*/
private String txNo;
}

发送方

发送方Producer业务逻辑代码

AccountInfoService
public interface AccountInfoService {
/**
* 向MQ发送转账消息
*/
public void sendUpdateAccountBalance (AccountChangeEvent accountChangeEvent);

/**
* 扣减金额,由MQ回调执行
*/
public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent);
}

实现接口类

AccountInfoServiceImpl
@Service
public AccountInfoServiceImpl implements AccountInfoService {

@Autowire
AccountInfoDao accountInfoDao;

@Autowire
RocketMQTemplate rocketMQTemplate;

/**
* 向 MQ 发送转账消息
*/
@Override
public void sendUpdateAccountBalance (AccountChangeEvent accountChangeEvent) {

// 将accountChangeEvent转为JSON
JSONObject jsonObject = new JSONObject();
jsonObject.put("accountChange", accountChangeEvent);

// 生成message类型
Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONStrong()).build();
// 发送事务消息
/**
* String txProducerGroup 生产组
* String destnation topic
* Message<?> message, 消息内容
* Object arg 参数
*/
rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message, null);
}

@Override
@Transactional(rollbackFor = Exception.class)
public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
// 幂等判断
if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo() > 0)) {
return;
}
// 扣减金额
accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount() * -1);
// 添加事务日志
accountInfoDao.addTx(accountChangeEvent.getTxNo());
if (accountChangeEvent.getAmount() == 3) {
throw new RuntimeException("人为制造异常");
}

}

}

发送方Producer发送消息后,由MQ Server回调的方法实现

@Component
@RocketMQTransactionListener
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {

@Autowire
AccountInfoService accountInfoService;

@Autowire
AccountInfoDao accountInfoDao;

/**
* 事务消息发送后的回调,当消息发送给 MQ 成功,此方法会被回调
*/
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executedTransaction(Message message, Object arg) {
try {
// 解析 Message, 转成 AccountChangeEvent
String messageString = new String(byte[] message.getPayload());
String accountChange = JSONObject.parseObject(messageString).getString("accountChange");
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChange, AccountChangeEvent.class);

// 执行本地事务
accountInfoService.doUpdateAccountBalance(accountChangeEvent);
// 向 MQ 发送 commit 消息,MQ 将消息改为可消费状态
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrance();
return RocketMQLocalTransactionState.ROLLBACK;
}
// 返回成功类型
}

/**
* 事务状态回查,查询张三是否扣减金额
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
// 解析 Message, 转成 AccountChangeEvent
String messageString = new String(byte[] message.getPayload());
String accountChange = JSONObject.parseObject(messageString).getString("accountChange");
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChange, AccountChangeEvent.class);

// 判断事物是否执行
if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo()) > 0) {
return RocketMQLocalTransactionSta te.COMMIT;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}

发送方提供一个转账的接口,用于向MQ发送一条转账消息(Prepared状态)

@RestController
public class AccountInfoController {
@Autowire
private AccountInfoService accountInfoService;

@GetMapping("/transfer")
public String transfer(@RequestParam("accountNo") String accountNo, @RequestParam("amount") Double amount) {
// 创建一个事务ID,作为消息发送到 MQ
String tx_no = UUID.randomUUID().toString();
AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo, amount, tx_no);

// 发送消息
accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
return "转账成功";
}
}

消费方

定义消费方更新账户的方法

AccountInfoService
public interface AccountInfoService {
/**
* 更新账户,增加金额
*/
public void addAccountInfoBalance (AccountChangeEvent accountChangeEvent);
}

实现上述接口,该方法可能会重复执行,需要做好幂等判断

AccountInfoServiceImpl
@Slf4j
@Service
public AccountInfoServiceImpl implements AccountInfoService {

@Autowire
AccountInfoDao accountInfoDao;

@Override
@Transactional(rollbackFor = Exception.class)
public void addAccountInfoBalance (AccountChangeEvent accountChangeEvent) {
log.info("bank2更新本地账号, 账号: {}, 金额: {}", accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount())
// 幂等判断
if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo()) > 0) {
return;
}
// 增加金额
accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount());
// 添加事务记录
accountInfoDao.addTx(accountChangeEvent.getTxNo());
if (accountChangeEvent.getAmount() == 4) {
throw new RuntimeException("人为制造异常");
}
}
}

消费方监听MQ消息

TxmsgConsumer
@Slf4j
@Component
@RocketMQMessageListener(comsumerGroup = "consumer_group_txmsg_bank2", topic = "topic_txmsg")
public class TxmsgConsumer implements RocketMQListener<String> {

@Autowire
private AccountInfoService accountInfoService;

@Override
public void onMessage(String message) {
log.info("开始消费消息: {}", message);
// 解析 Message, 转成 AccountChangeEvent
String messageString = new String(byte[] message.getPayload());
String accountChange = JSONObject.parseObject(messageString).getString("accountChange");
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChange, AccountChangeEvent.class);

// 增加金额
accountChangeEvent.setAccountNo("2"); // 账户设置为李四的
accountInfoService.addAccountInfoBalance(accountChangeEvent);
}
}

小结

可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了RacketMQ 作为消息中间件,RocketMQ主要解决了两个功能:

  1. 本地事务与消息发送的原子性问题。
  2. 事务参与方接收消息的可靠性。

可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作,避免了分布式事务中的同步阻塞操作的影响,并实现两个服务的解耦。