Hmily实现TCC
流程介绍
try: try幂等校验 try悬挂处理 检查余额是否够扣减金额 扣减金额
confirm: 空
cacel: cancel幂等校验 cancel空回滚处理 增加可用余额
|
数据库
每个数据库都要创建try、confirm、cacel三张日志表
CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务ID', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务ID', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务ID', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
|
伪代码
使用伪代码方式实现TCC流程
远程调用接口
@FeignClient(value="tcc-demo-bank2") public interface Bank2Client {
@GetMapping("/bank2/transfer") @Hmily Boolean transfer(@RequestParam("amount"), Double amount); }
|
发起方,扣减金额
@Slf4j @Service public AccountInfoServiceImpl implements AccountInfoService { @Autowire AccountInfoDao accountInfoDao;
@Autowire Bank2Client bank2Client;
@Override @Transactional @Hmily(confirmMethod="commit", cancelMethod="rollback") public void updateAccountBalance(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId(); log.info("bank1 try begin 开始执行... xid: {}", transId); if (accountInfoDao.isExistTry(transId) > 0) { log.info("bank1 try 已经执行,无需重复执行, xid: {}", transId); return; } if (accountInfoDao.isExistConfirm(transId) > 0 || accountInfoDao.isExistCancel(transId) > 0) { log.info("bank1 try悬挂处理 cancel或confirm已经执行, 不允许执行try, xid: {}", transId); return; } if (accountInfoDao.substractAccountBalance(accountNo, amount) <= 0) { log.error("bank1 扣减失败"); throw new RuntimeException("bank1 try 扣减金额失败, xid: " + transId); } accountInfoDao.addTry(transId);
if (bank2Client.transfer(amount)) { throw new RuntimeException("bank1 远程调用 bank2 微服务失败, xid: " + transId); } }
public void commit(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId(); log.info("bank1 comfirm begin 开始执行... xid: {}", transId); }
@Transactional public void rollback(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId(); log.info("bank1 try cancel 开始执行... xid: {}", transId); if (accountInfoDao.isExistCancel(transId) > 0) { log.info("bank2 cancel 已经执行,无需重复执行, xid: {}", transId); return; } if (accountInfoDao.isExistTry(transId) <= 0) { log.info("bank2 空回滚处理, try 没有执行,不允许cancel执行, xid: {}", transId); return; } accountInfoDao.addAcountBalance(accountNo, amount); accountInfoDao.addCancel(transId); } }
|
被调用方,增加金额
@Slf4j @Service public AccountInfoServiceImpl implements AccountInfoService { @Autowire AccountInfoDao accountInfoDao;
@Override @Hmily(confirmMethod="commit", cancelMethod="rollback") public void updateAccountBalance(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId(); log.info("bank2 try begin 开始执行... xid: {}", transId); }
@Transactional public void commit(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId(); log.info("bank1 comfirm begin 开始执行... xid: {}", transId); if (accountInfoDao.isExistConfirm() > 0) { log.info("bank2 comfirm 已经执行, 无需重复执行 xid: {}", transId); return; } accountInfoDao.addAcountBalance(accountNo, amount); accountInfoDao.addConfirm(transId); }
public void rollback(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransactionId(); log.info("bank1 cancel begin 开始执行... xid: {}", transId); } }
|
Seata实现TCC
为了实现空回滚、防止业务悬挂,以及幂等性要求。我们必须在数据库记录冻结金额的同时,记录当前事务id和执行状态,为此我们设计了一张表:
CREATE TABLE `account_freeze_tbl` ( `xid` varchar (128) NOT NULL, `user_id` varchar (255) DEFAULT NULL COMMENT '用户id', `freeze_money` int(11) unsigned DEFAULT '0' COMMENT,冻结金额’, `state` int (1) DEFAULT NULL COMMENT #事务状态, 0:try, 1: confirm, 2:cancel', PRIMARY KEY (`xid`) USING BTREE ) ENGINE=InnODB DEFAULT CHARSET-utf8 ROW_FORMAT=COMPACT;
|
流程介绍
Try业务
- 记录冻结金额和事务状态到
account_freeze
表
- 扣减account表可用
Confirm业务
- 根据xid删除
account_freeze
表的冻结记录
Cancel业务
- 修改
account_freeze
表,冻冻结金额为0,state为2
- 修改
account
表,恢复可用金额
如何判断是否空回滚
- cancel业务中,根据xid查询
account_freeze
,如果为null则说明try还没做,需要空回滚
如何避免业务悬挂
- try业务中,根据xid查询
account_freeze
,如果 已经存在则证明Cancel已经执行,拒绝执行try业务
伪代码
TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明,语法如下:
@LocalTCC public interface AccountTCCService {
@TwoPhaseBusinessAction(name="deduct", commitMethod="confirm", rollbackMethod="cancel") void deduct(@BusinessActionContextParameter(paramName="param") String userId, @BusinessActionContextParameter(paramName="money") double money);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context);
}
|
实现类:
@Slf4j @Service public class AccountTCCServiceImpl implements AccountTCCService {
@Autowired private AccountMapper accountMapper;
@Autowired private AccountFreezeMapper accountFreezeMapper; @Override @Transactional(rollbackFor = Exception.class) void deduct(String userId, double money) { String xid = RootContext.getXID(); AccountFreeze oldFreeze = accountFreezeMapper.selectById(xid); if (oldFreeze != null) { return; } accountMapper.deduct(userId, money); AccountFreeze freeze = new AccountFreeze(); freeze.setUserId(userId); freeze.setFreeMoney(money); freeze.setState(AccountFreeze.State.TRY); freeze.setXid(xid); accountFreezeMapper.insert(freeze); }
@Override boolean confirm(BusinessActionContext ctx) { String xid = RootContext.getXID(); int count = accountFreezeMapper.deleteById(xid); return count == 1; }
@Override boolean cancel(BusinessActionContext ctx) { String xid = RootContext.getXID(); AccountFreeze freeze = accountFreezeMapper.selectById(xid); if (freeze == null) { String userId = ctx.getActionContext("userId").toString() AccountFreeze freeze = new AccountFreeze(); freeze.setUserId(userId); freeze.setFreeMoney(0); freeze.setState(AccountFreeze.State.CANCEL); freeze.setXid(xid); accountFreezeMapper.insert(freeze); return true; } if (freeze.getState() == AccountFreeze.State.CANCEL) { return true; } accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney()); freeze.setFreeMoney(0); freeze.setState(AccountFreeze.State.CANCEL); int count = accountFreezeMapper.updateById(freeze); return count == 1; } }
|
小结
如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。
由于从业务服务是同步调用,其结果会影响到主业务服务的决策,因此通用型 TCC 分布式事务解决方案适用于执行时间确定且较短的业务,比如互联网金融企业最核心的三个服务:交易、支付、账务: