Hmily实现TCC

流程介绍

try:
try幂等校验
try悬挂处理
检查余额是否够扣减金额
扣减金额

confirm:


cacel:
cancel幂等校验
cancel空回滚处理
增加可用余额

数据库

每个数据库都要创建try、confirm、cacel三张日志表

CREATE TABLE `local_try_log` (
`tx_no` varchar(64) NOT NULL COMMENT '事务ID',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `local_confirm_log` (
`tx_no` varchar(64) NOT NULL COMMENT '事务ID',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `local_cancel_log` (
`tx_no` varchar(64) NOT NULL COMMENT '事务ID',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

伪代码

使用伪代码方式实现TCC流程

远程调用接口

@FeignClient(value="tcc-demo-bank2")
public interface Bank2Client {

@GetMapping("/bank2/transfer")
@Hmily // 将全局事务信息带到bank2
Boolean transfer(@RequestParam("amount"), Double amount);
}

发起方,扣减金额

/**
* Bank1微服务
*/
@Slf4j
@Service
public AccountInfoServiceImpl implements AccountInfoService {

@Autowire
AccountInfoDao accountInfoDao;

@Autowire
Bank2Client bank2Client; // Feign接口

/**
* 账户扣款,就是TCC的try方法
*/
@Override
@Transactional
@Hmily(confirmMethod="commit", cancelMethod="rollback")
public void updateAccountBalance(String accountNo, Double amount) {
// 获取全局事务ID
String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId();
log.info("bank1 try begin 开始执行... xid: {}", transId);
// try幂等校验,判断local_try_log表中是否有try日志记录,如果有则不执行
if (accountInfoDao.isExistTry(transId) > 0) {
log.info("bank1 try 已经执行,无需重复执行, xid: {}", transId);
return;
}
// try悬挂处理,如果cacel、confirm有一个已经执行了,try不再执行
if (accountInfoDao.isExistConfirm(transId) > 0 || accountInfoDao.isExistCancel(transId) > 0) {
log.info("bank1 try悬挂处理 cancel或confirm已经执行, 不允许执行try, xid: {}", transId);
return;
}
// 扣减金额,检查余额是足够 SET banlance = banlance- #{amount} WHERE banlance >= #{amount}
if (accountInfoDao.substractAccountBalance(accountNo, amount) <= 0) {
log.error("bank1 扣减失败");
throw new RuntimeException("bank1 try 扣减金额失败, xid: " + transId);
}
// 新增 try 执行记录,用于幂等判断
accountInfoDao.addTry(transId);

// 远程调用李四,转账
if (bank2Client.transfer(amount)) {
throw new RuntimeException("bank1 远程调用 bank2 微服务失败, xid: " + transId);
}
}

/**
* confirm方法
*/
public void commit(String accountNo, Double amount) {
// 获取全局事务ID
String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId();
log.info("bank1 comfirm begin 开始执行... xid: {}", transId);
}

/**
* cancel方法
*/
@Transactional
public void rollback(String accountNo, Double amount) {
// 获取全局事务ID
String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId();
log.info("bank1 try cancel 开始执行... xid: {}", transId);
// cancel幂等校验
if (accountInfoDao.isExistCancel(transId) > 0) {
log.info("bank2 cancel 已经执行,无需重复执行, xid: {}", transId);
return;
}
// cancel空回滚处理,如果try没有执行,cancel不允许执行
if (accountInfoDao.isExistTry(transId) <= 0) {
log.info("bank2 空回滚处理, try 没有执行,不允许cancel执行, xid: {}", transId);
return;
}
// 增加可用余额
accountInfoDao.addAcountBalance(accountNo, amount);
// 增加一条 cancel 的执行记录
accountInfoDao.addCancel(transId);
}
}

被调用方,增加金额

/**
* Bank2微服务
*/
@Slf4j
@Service
public AccountInfoServiceImpl implements AccountInfoService {

@Autowire
AccountInfoDao accountInfoDao;

/**
* 账户扣款,就是TCC的try方法
*/
@Override
@Hmily(confirmMethod="commit", cancelMethod="rollback")
public void updateAccountBalance(String accountNo, Double amount) {
// 获取全局事务ID
String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId();
log.info("bank2 try begin 开始执行... xid: {}", transId);
}

/**
* confirm方法
*/
@Transactional
public void commit(String accountNo, Double amount) {
// 获取全局事务ID
String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId();
log.info("bank1 comfirm begin 开始执行... xid: {}", transId);
if (accountInfoDao.isExistConfirm() > 0) {
log.info("bank2 comfirm 已经执行, 无需重复执行 xid: {}", transId);
return;
}
// 增加金额
accountInfoDao.addAcountBalance(accountNo, amount);
// 新增 confirm 执行记录,用于幂等判断
accountInfoDao.addConfirm(transId);
}

/**
* cancel方法
*/
public void rollback(String accountNo, Double amount) {
// 获取全局事务ID
String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId();
log.info("bank1 cancel begin 开始执行... xid: {}", transId);
}
}

Seata实现TCC

为了实现空回滚、防止业务悬挂,以及幂等性要求。我们必须在数据库记录冻结金额的同时,记录当前事务id和执行状态,为此我们设计了一张表:

CREATE TABLE `account_freeze_tbl` (
`xid` varchar (128) NOT NULL,
`user_id` varchar (255) DEFAULT NULL COMMENT '用户id',
`freeze_money` int(11) unsigned DEFAULT '0' COMMENT,冻结金额’,
`state` int (1) DEFAULT NULL COMMENT #事务状态, 0:try, 1: confirm, 2:cancel',
PRIMARY KEY (`xid`) USING BTREE
) ENGINE=InnODB DEFAULT CHARSET-utf8 ROW_FORMAT=COMPACT;

流程介绍

Try业务

  • 记录冻结金额和事务状态到account_freeze
  • 扣减account表可用

Confirm业务

  • 根据xid删除account_freeze表的冻结记录

Cancel业务

  • 修改account_freeze表,冻冻结金额为0,state为2
  • 修改account表,恢复可用金额

如何判断是否空回滚

  • cancel业务中,根据xid查询account_freeze,如果为null则说明try还没做,需要空回滚

如何避免业务悬挂

  • try业务中,根据xid查询account_freeze,如果 已经存在则证明Cancel已经执行,拒绝执行try业务

伪代码

TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明,语法如下:

@LocalTCC
public interface AccountTCCService {
/**
* Try逻辑,@TwoPhaseBusinessAction中的name属性要与当前方法名一致,用于指定Try逻辑对应的方法
*/
@TwoPhaseBusinessAction(name="deduct", commitMethod="confirm", rollbackMethod="cancel")
void deduct(@BusinessActionContextParameter(paramName="param") String userId,
@BusinessActionContextParameter(paramName="money") double money);

/**
* 二阶段confirm确认方法,可以另命名,但要保证与commitMethod一致
*
* @param context 上下文,可以传递 try 方法的参数
* @return boolean 执行是否成功
*/
boolean confirm(BusinessActionContext context);

/**
* 二阶段回滚方法,但要保证与rollbackMethod一致
*
* @param context 上下文,可以传递 try 方法的参数
* @return boolean 执行是否成功
*/
boolean cancel(BusinessActionContext context);

}

实现类:

@Slf4j
@Service
public class AccountTCCServiceImpl implements AccountTCCService {

@Autowired
private AccountMapper accountMapper;

@Autowired
private AccountFreezeMapper accountFreezeMapper;

@Override
@Transactional(rollbackFor = Exception.class)
void deduct(String userId, double money) {
// 获取事务ID
String xid = RootContext.getXID();
// 判断freeze是否有冻结记录,如果有一定是CANCEL执行过了
AccountFreeze oldFreeze = accountFreezeMapper.selectById(xid);
if (oldFreeze != null) {
// CANCEL执行过,拒绝业务
return;
}
// 扣减可用余额,余额的数据库字段为 unsigned,如果扣减为负报异常,直接回滚
accountMapper.deduct(userId, money);
// 记录冻结金额,事务状态
AccountFreeze freeze = new AccountFreeze();
freeze.setUserId(userId);
freeze.setFreeMoney(money);
freeze.setState(AccountFreeze.State.TRY);
freeze.setXid(xid);
accountFreezeMapper.insert(freeze);
}

@Override
boolean confirm(BusinessActionContext ctx) {
// 获取事务ID
String xid = RootContext.getXID();
// 删除冻结金额
int count = accountFreezeMapper.deleteById(xid);
return count == 1;
}

/**
* 二阶段回滚方法,但要保证与rollbackMethod一致
*
* @param context 上下文,可以传递 try 方法的参数
* @return boolean 执行是否成功
*/
@Override
boolean cancel(BusinessActionContext ctx) {
// 获取事务ID
String xid = RootContext.getXID();
AccountFreeze freeze = accountFreezeMapper.selectById(xid);
// 判断空回滚,判断freeze是否为null,为null证明try没执行,需要空回滚
if (freeze == null) {
String userId = ctx.getActionContext("userId").toString()
AccountFreeze freeze = new AccountFreeze();
freeze.setUserId(userId);
freeze.setFreeMoney(0);
freeze.setState(AccountFreeze.State.CANCEL);
freeze.setXid(xid);
accountFreezeMapper.insert(freeze);
return true;
}
// 幂等判断
if (freeze.getState() == AccountFreeze.State.CANCEL) {
// 已经处理过,无需处理
return true;
}
// 恢复可用金额
accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney());
// 清空冻结金额,状态置为CANCEL,避免业务悬挂
freeze.setFreeMoney(0);
freeze.setState(AccountFreeze.State.CANCEL);
int count = accountFreezeMapper.updateById(freeze);
return count == 1;
}
}

小结

如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能

而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

由于从业务服务是同步调用,其结果会影响到主业务服务的决策,因此通用型 TCC 分布式事务解决方案适用于执行时间确定且较短的业务,比如互联网金融企业最核心的三个服务:交易、支付、账务: