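This article builds a distributed-transaction demo on Seata 1.7.0 that implements a simple money-transfer service. Nacos serves as both the configuration center and the registry, and the microservices call each other through Feign.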

The Git repository for this demo project is: https://gitee.com/SX-Code/dtx-seata-demo

Business flow

Transfer flow

Seata execution flow

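Here the TC is a separately deployed server, while the TM and RM are clients embedded in the applications.

Details:

  • The TM asks the TC to begin a global transaction. The TC generates an XID that identifies this global transaction.

    The XID is propagated along the microservice call chain, which ties the sub-transactions of the individual microservices together.

  • Each RM asks the TC to register its local transaction as a branch of the global transaction, associated through the global transaction's XID.

  • The TM tells the TC whether the global transaction identified by the XID should be committed or rolled back.

  • The TC drives the RMs to commit or roll back their own local transactions that belong to the XID.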
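The four steps above can be sketched as a toy, in-memory model. This is only an illustration of the message flow, not Seata's real API: `ToyCoordinator`, `Branch`, and the method names are made up for this example, and a real TC persists its sessions and talks to clients over the network.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Toy, in-memory stand-in for the TC; hypothetical names, not Seata's API. */
class ToyCoordinator {
    /** A branch is modelled as two callbacks: what to do on commit and on rollback. */
    interface Branch {
        void commit();
        void rollback();
    }

    private final Map<String, List<Branch>> branchesByXid = new HashMap<>();

    /** Step 1: the TM asks the TC to begin a global transaction and receives an XID. */
    String begin() {
        String xid = UUID.randomUUID().toString();
        branchesByXid.put(xid, new ArrayList<>());
        return xid;
    }

    /** Step 2: an RM registers its local transaction as a branch under the XID. */
    void registerBranch(String xid, Branch branch) {
        branchesByXid.get(xid).add(branch);
    }

    /** Steps 3 and 4: the TM reports the outcome, then the TC drives every branch. */
    void end(String xid, boolean commit) {
        for (Branch branch : branchesByXid.remove(xid)) {
            if (commit) {
                branch.commit();
            } else {
                branch.rollback();
            }
        }
    }
}

class SeataFlowSketch {
    /** Builds a branch that simply records which callback the TC invoked. */
    static ToyCoordinator.Branch branch(String name, List<String> events) {
        return new ToyCoordinator.Branch() {
            public void commit() { events.add(name + ":commit"); }
            public void rollback() { events.add(name + ":rollback"); }
        };
    }

    public static void main(String[] args) {
        ToyCoordinator tc = new ToyCoordinator();
        List<String> events = new ArrayList<>();

        String xid = tc.begin();                          // TM -> TC
        tc.registerBranch(xid, branch("bank1", events));  // RM of bank1 -> TC
        tc.registerBranch(xid, branch("bank2", events));  // RM of bank2 -> TC
        tc.end(xid, false);                               // TM decides: roll back

        System.out.println(events); // [bank1:rollback, bank2:rollback]
    }
}
```

The point of the sketch is that the decision is made once, by the TM, and every branch registered under the same XID receives the same outcome.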

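Setting up the Seata Server

The server uses Nacos as both the configuration center and the registry, with MySQL as the database.

Download Seata

Seata TC server download page:

https://github.com/seata/seata/releases

Pick the Seata version you want. Here we use v1.7.0, the latest version at the time of writing.

Go to the /opt directory:

cd /opt

Download the archive:

wget https://download.yzuu.cf/seata/seata/releases/download/v1.7.0/seata-server-1.7.0.tar.gz

Extract it:

tar -zxvf seata-server-1.7.0.tar.gz

Inspect the directory: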

$ cd seata
$ ls -ls
total 32
-rw-r--r--@ 1 502 staff 1388 1 6 2023 Dockerfile
-rw-r--r--@ 1 502 staff 12079 1 6 2023 LICENSE
drwxr-xr-x 5 root wheel 160 8 22 21:58 bin # startup scripts
drwxr-xr-x 6 root wheel 192 8 22 21:58 conf # configuration files
drwxr-xr-x 3 root wheel 96 8 22 21:58 ext
drwxr-xr-x@ 225 502 staff 7200 7 11 16:46 lib # seata-*.jar + dependency libraries
drwxr-xr-x@ 14 502 staff 448 8 23 10:52 logs
drwxr-xr-x 5 root wheel 160 8 22 21:58 script # database scripts and other configuration files
drwxr-xr-x 3 root wheel 96 8 22 21:58 target

Configure Seata

Go to /opt/seata/conf/ and edit application.yml. A reference configuration is provided in application.example.yml in the same directory.

application.yml
server:
  port: 7091

spring:
  application:
    name: seata-tc-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${user.home}/logs/seata
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata
seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: nacos
    nacos:
      server-addr: xxx.xxx.xxx.xxx:8848
      namespace:
      group: SEATA_GROUP
      username: nacos
      password: nacos
      data-id: seataServer.properties # Data ID of the file in the Nacos configuration center
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    nacos:
      application: seata-tc-server
      server-addr: xxx.xxx.xxx.xxx:8848 # Nacos address
      group: SEATA_GROUP # group
      namespace:
      cluster: SH # cluster
      username: nacos
      password: nacos
  # server:
  #   service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login

The remaining settings are stored in the Nacos configuration center.

All configurable items are listed in /opt/seata/script/config-center/config.txt.

  • Data ID: seataServer.properties

  • Group: SEATA_GROUP

  • Content:

    store.mode=db
    store.db.datasource=druid
    store.db.dbType=mysql
    store.db.driverClassName=com.mysql.cj.jdbc.Driver
    store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useSSL=false&serverTimezone=UTC&characterEncoding=utf-8&allowPublicKeyRetrieval=true
    store.db.user=root
    store.db.password=root
    store.db.minConn=10
    store.db.maxConn=100
    store.db.globalTable=global_table
    store.db.branchTable=branch_table
    store.db.lockTable=lock_table
    store.db.distributedLockTable=distributed_lock
    store.db.queryLimit=1000
    store.db.maxWait=5000

Initialize the database

Create a database named seata and initialize it for the Seata TC Server with the mysql.sql script, located at /opt/seata/script/server/db/mysql.sql. The script content is as follows:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

Start the TC Server

Go to /opt/seata/bin and run:

sudo ./seata-server.sh

View the log with:

cat /opt/seata/logs/start.out

The service should now be visible in Nacos, with the cluster reported as healthy.

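Prepare the databases

Prepare two databases, bank1 and bank2. Each contains two tables, account_info and undo_log, with identical structures.

Script for the account_info table in database bank1: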

CREATE TABLE `account_info` (
`id` bigint NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '账户密码',
`account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
INSERT INTO `account_info` VALUES (1, '张三的账户', '1', NULL, 1000.00);

Script for the account_info table in database bank2:

CREATE TABLE `account_info` (
`id` bigint NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '账户密码',
`account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
INSERT INTO `account_info` VALUES (1, '李四的账户', '2', NULL, 0.00);

The undo_log table, created in both databases, is as follows:

CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
`xid` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`branch_id`,`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

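Prepare the microservice project

The project is a multi-module microservice build; the two services call each other remotely through Feign.

Project structure

Create a parent project, dtx-seata-demo, for version management, with two child modules underneath that provide the services: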

dtx-seata-demo
├── dtx-seata-bank1
└── dtx-seata-bank2

Version management

Add the version management information to the pom file of the parent project dtx-seata-demo:

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.swx</groupId>
    <artifactId>dtx-seata-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <modules>
        <module>dtx-seata-bank1</module>
        <module>dtx-seata-bank2</module>
    </modules>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.3.9.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR10</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.2.5.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.23</version>
            </dependency>
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-seata</artifactId>
                <version>2.2.0.RELEASE</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

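Build the transfer service

Open the child module dtx-seata-bank1, which operates on database bank1. When it deducts the amount from its own account, it uses Feign to call the remote service that adds the amount in bank2.

Dependencies

Add the dependencies the service needs: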

pom.xml
<dependencies>
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud context -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-context</artifactId>
    </dependency>
    <!-- Feign remote calls -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <!-- Feign connection pool -->
    <dependency>
        <groupId>io.github.openfeign</groupId>
        <artifactId>feign-httpclient</artifactId>
    </dependency>
    <!-- Nacos registry -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!-- Database driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- MyBatis-Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <!-- Seata distributed transactions -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-seata</artifactId>
    </dependency>
</dependencies>

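Configuration

Create bootstrap.yml under resources in the dtx-seata-bank1 module. It configures the port and context path, the data source, the Nacos registry, Feign circuit breaking and fallback, and the Seata settings needed to locate the Seata TC.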

bootstrap.yml
server:
  port: 56081
  servlet:
    context-path: /bank1

spring:
  application:
    name: seata-demo-bank1
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql:///bank1?useSSL=false&serverTimezone=UTC&characterEncoding=utf-8&allowPublicKeyRetrieval=true
    username: root
    password: swx852345
  cloud:
    nacos:
      discovery:
        server-addr: 124.221.23.47:8848

feign:
  hystrix:
    enabled: true
  httpclient:
    enabled: true

seata:
  registry:
    type: nacos
    nacos:
      application: seata-tc-server
      server-addr: 124.221.23.47:8848
      namespace: ""
      group: SEATA_GROUP
      username: nacos
      password: nacos
  # The transaction group seata-bank, together with the group mapping, resolves to the cluster SH
  tx-service-group: seata-bank
  service:
    vgroup-mapping:
      seata-bank: SH
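To make the comment on tx-service-group concrete, here is a minimal sketch of the lookup it describes: transaction group, then cluster name, then the TC instances registered in that cluster. It is a plain-Java model, not Seata's client code; the `Instance` class, the `resolve` method, and the addresses are assumptions made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class VgroupMappingSketch {
    /** Hypothetical view of a TC instance as a registry might list it. */
    static final class Instance {
        final String address;
        final String cluster;

        Instance(String address, String cluster) {
            this.address = address;
            this.cluster = cluster;
        }
    }

    /** tx-service-group -> cluster name -> addresses of TC instances in that cluster. */
    static List<String> resolve(String txServiceGroup,
                                Map<String, String> vgroupMapping,
                                List<Instance> registered) {
        String cluster = vgroupMapping.get(txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroup-mapping for " + txServiceGroup);
        }
        return registered.stream()
                .filter(instance -> instance.cluster.equals(cluster))
                .map(instance -> instance.address)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same mapping as in bootstrap.yml: seata-bank -> SH
        Map<String, String> mapping = Map.of("seata-bank", "SH");
        // Made-up addresses; only the first one is in cluster SH
        List<Instance> registered = List.of(
                new Instance("10.0.0.1:8091", "SH"),
                new Instance("10.0.0.2:8091", "HZ"));

        System.out.println(resolve("seata-bank", mapping, registered)); // [10.0.0.1:8091]
    }
}
```

If the cluster name in vgroup-mapping does not match the cluster value the TC registered with (SH in this demo), the lookup comes back empty, so the two values have to agree.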

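Application class

Create the application class in the package com.swx.bank1:

Bank1Application
package com.swx.bank1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication
@EnableTransactionManagement
@EnableFeignClients(basePackages = {"com.swx.bank1.spring"})
public class Bank1Application {
    public static void main(String[] args) {
        SpringApplication.run(Bank1Application.class, args);
    }
}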

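Entity class

Create the AccountInfo entity class in the package com.swx.bank1.entry:

AccountInfo
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("account_info")
public class AccountInfo {

    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    @TableField("account_name")
    private String accountName;

    @TableField("account_no")
    private String accountNo;

    @TableField("account_password")
    private String accountPassword;

    @TableField("account_balance")
    private Double accountBalance;
}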

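DAO layer

This layer operates on the database. Create the AccountInfoMapper interface in the package com.swx.bank1.mapper:

AccountInfoMapper
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

@Mapper
public interface AccountInfoMapper {
    // Adds a signed amount to the balance; pass a negative amount to deduct
    @Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
}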

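This balance-update SQL is full of holes (for example, it never checks that the balance is sufficient), but that does not affect testing Seata distributed transactions, so please ignore it.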
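For readers who want to see what two of those holes look like, the following is a small, standalone sketch and not part of the demo project. It assumes the issues in question are the use of Double for money and the missing balance check; the class and method names are invented for illustration.

```java
import java.math.BigDecimal;

class BalanceGuardSketch {
    /**
     * Applies a signed delta to a balance and refuses to go below zero.
     * This mirrors what a guarded SQL update would enforce, for example an
     * extra condition such as "AND account_balance + delta >= 0".
     */
    static BigDecimal apply(BigDecimal balance, BigDecimal delta) {
        BigDecimal result = balance.add(delta);
        if (result.signum() < 0) {
            throw new IllegalArgumentException("insufficient balance");
        }
        return result;
    }

    public static void main(String[] args) {
        // Binary floating point cannot represent 0.1 or 0.2 exactly
        System.out.println(0.1 + 0.2);                                        // 0.30000000000000004
        // BigDecimal built from strings keeps decimal amounts exact
        System.out.println(new BigDecimal("0.1").add(new BigDecimal("0.2"))); // 0.3

        // Zhang San's seed balance minus a 100.00 transfer
        System.out.println(apply(new BigDecimal("1000.00"), new BigDecimal("-100.00"))); // 900.00
    }
}
```

In a real service the same guard would normally live in the SQL statement itself, so that the check and the update happen atomically.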

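FeignClient

This layer handles remote calls. Create the Bank2Client interface in the package com.swx.bank1.spring:

Bank2Client
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

// Feign integrates Ribbon and fetches instances from the registry, so the service name is all that is needed here
@FeignClient(value = "seata-demo-bank2", fallback = Bank2ClientFallback.class)
public interface Bank2Client {
    // Remote call to Li Si's microservice
    @GetMapping("/bank2/transfer")
    String transfer(@RequestParam("amount") Double amount);
}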

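Add a circuit-breaker fallback that defines what is returned when the request fails:

import org.springframework.stereotype.Component;

@Component
public class Bank2ClientFallback implements Bank2Client {
    @Override
    public String transfer(Double amount) {
        return "error fallback";
    }
}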

Service layer

This layer manages transactions. Create the AccountInfoService interface in the package com.swx.bank1.service:

AccountInfoService
public interface AccountInfoService {
    void updateAccountBalance(String accountNo, Double amount);
}

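Implement the interface in the package com.swx.bank1.service.impl:

@Transactional(rollbackFor = Exception.class) starts a local transaction.
@GlobalTransactional starts a global (Seata) transaction.

AccountInfoServiceImpl
import com.swx.bank1.mapper.AccountInfoMapper;
import com.swx.bank1.service.AccountInfoService;
import com.swx.bank1.spring.Bank2Client;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
public class AccountInfoServiceImpl implements AccountInfoService {

    private final AccountInfoMapper accountInfoMapper;
    private final Bank2Client bank2Client;

    public AccountInfoServiceImpl(AccountInfoMapper accountInfoMapper, Bank2Client bank2Client) {
        this.accountInfoMapper = accountInfoMapper;
        this.bank2Client = bank2Client;
    }

    @Transactional(rollbackFor = Exception.class)
    @GlobalTransactional
    @Override
    public void updateAccountBalance(String accountNo, Double amount) {
        log.info("bank1 service begin, XID: {}", RootContext.getXID());
        // Deduct the amount from Zhang San's account
        accountInfoMapper.updateAccountBalance(accountNo, amount * -1);
        // Call Li Si's microservice to complete the transfer
        String transfer = bank2Client.transfer(amount);
        // Constant-first equals avoids a NullPointerException if the reply is null
        if ("error fallback".equals(transfer)) {
            // The fallback was hit: throw so that the global transaction rolls back
            throw new RuntimeException("failed to call Li Si's microservice");
        }
        // Simulated caller-side failure for the amount=2 test case
        if (amount == 2) {
            throw new RuntimeException("bank1 make exception.");
        }
    }
}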

Controller layer

Bank1Controller
import com.swx.bank1.service.AccountInfoService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Bank1Controller {
    private final AccountInfoService accountInfoService;

    public Bank1Controller(AccountInfoService accountInfoService) {
        this.accountInfoService = accountInfoService;
    }

    @GetMapping("/transfer")
    public String transfer(Double amount) {
        // Zhang San transfers money
        accountInfoService.updateAccountBalance("1", amount);
        return "bank1:" + amount;
    }
}

Copy the transfer service

Copy the transfer code into dtx-seata-bank2 and change every bank1 to bank2.

For example, change the configuration file as follows:

bootstrap.yml
server:
  port: 56082
  servlet:
    context-path: /bank2

spring:
  application:
    name: seata-demo-bank2
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql:///bank2?useSSL=false&serverTimezone=UTC&characterEncoding=utf-8&allowPublicKeyRetrieval=true
    username: root
    password: swx852345

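Change the service as follows:

AccountInfoServiceImpl
import com.swx.bank2.mapper.AccountInfoMapper;
import com.swx.bank2.service.AccountInfoService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
public class AccountInfoServiceImpl implements AccountInfoService {

    private final AccountInfoMapper accountInfoMapper;

    public AccountInfoServiceImpl(AccountInfoMapper accountInfoMapper) {
        this.accountInfoMapper = accountInfoMapper;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void updateAccountBalance(String accountNo, Double amount) {
        log.info("bank2 service begin, XID: {}", RootContext.getXID());
        // Add the amount to Li Si's account
        accountInfoMapper.updateAccountBalance(accountNo, amount);
        // Simulated callee-side failure for the amount=3 test case
        if (amount == 3) {
            throw new RuntimeException("bank2 make exception.");
        }
    }
}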

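Test the transactions

Normal flow

Open in a browser: http://localhost:56081/bank1/transfer?amount=100

Caller throws an exception

Open in a browser: http://localhost:56081/bank1/transfer?amount=2

Callee throws an exception

Open in a browser: http://localhost:56081/bank1/transfer?amount=3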
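The outcome one would expect from these three requests can be traced with a small in-memory simulation. This is a sketch under assumptions, not Seata itself: the balances start from the seed data (1000 and 0), saved copies of the balances stand in for the undo_log before-images, and the "bank2:" reply string is assumed by analogy with Bank1Controller.

```java
class TransferScenarioSketch {
    int bank1Balance = 1000; // Zhang San, as in the bank1 seed data
    int bank2Balance = 0;    // Li Si, as in the bank2 seed data

    /** Returns true if the simulated global transaction commits, false if it rolls back. */
    boolean transfer(int amount) {
        // Stand-in for the undo_log before-images of both branches
        int before1 = bank1Balance;
        int before2 = bank2Balance;
        try {
            bank1Balance -= amount;
            String reply = callBank2(amount);
            if ("error fallback".equals(reply)) {
                throw new RuntimeException("failed to call Li Si's microservice");
            }
            if (amount == 2) {
                throw new RuntimeException("bank1 make exception.");
            }
            return true;
        } catch (RuntimeException e) {
            // Global rollback: every branch is restored, including bank2's
            bank1Balance = before1;
            bank2Balance = before2;
            return false;
        }
    }

    /** Simulates the Feign call to bank2 together with its fallback. */
    private String callBank2(int amount) {
        int before = bank2Balance;
        try {
            bank2Balance += amount;
            if (amount == 3) {
                throw new RuntimeException("bank2 make exception.");
            }
            return "bank2:" + amount; // assumed reply format
        } catch (RuntimeException e) {
            bank2Balance = before;   // bank2's local transaction rolls back
            return "error fallback"; // what Bank2ClientFallback returns
        }
    }

    public static void main(String[] args) {
        TransferScenarioSketch demo = new TransferScenarioSketch();
        for (int amount : new int[] {100, 2, 3}) {
            boolean committed = demo.transfer(amount);
            System.out.println("amount=" + amount + " committed=" + committed
                    + " bank1=" + demo.bank1Balance + " bank2=" + demo.bank2Balance);
        }
    }
}
```

Only the amount=100 request changes the balances (to 900 and 100). In the amount=2 case bank2 has already finished its local update when bank1 throws, which is exactly the situation a global rollback exists for.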

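When the distributed transaction does not take effect

Feign causes the XID to be null

With Feign remote calls, the XID printed by the called method is null. Check your Maven dependencies and make sure you have included: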

<!-- Seata distributed transactions -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
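Why a missing integration module leads to a null XID can be illustrated with a plain-Java sketch. It rests on assumptions: the XID lives in a thread-local on each side, the integration module copies it into an HTTP header on the caller and binds it again on the callee, and the header name is "TX_XID" (believed to match Seata's RootContext.KEY_XID, but treat it as an assumption). All class and method names below are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class XidPropagationSketch {
    // Assumed header name; believed to match Seata's RootContext.KEY_XID
    static final String XID_HEADER = "TX_XID";

    /** Stand-in for Seata's RootContext: the XID bound to the current thread. */
    static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    /** Caller side: copy the current XID into the outgoing request headers. */
    static Map<String, String> outgoingHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = CURRENT_XID.get();
        if (xid != null) {
            headers.put(XID_HEADER, xid);
        }
        return headers;
    }

    /** Callee side: bind the XID from the incoming headers before business code runs. */
    static void bindFromHeaders(Map<String, String> headers) {
        String xid = headers.get(XID_HEADER);
        if (xid != null) {
            CURRENT_XID.set(xid);
        }
    }

    /** Runs the "callee" on its own thread and reports the XID its business code sees. */
    static String calleeSees(Map<String, String> headers, boolean interceptorInstalled) {
        String[] seen = new String[1];
        Thread callee = new Thread(() -> {
            if (interceptorInstalled) {
                bindFromHeaders(headers);
            }
            seen[0] = CURRENT_XID.get();
        });
        callee.start();
        try {
            callee.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        CURRENT_XID.set("xid-123"); // made-up XID bound on the caller
        Map<String, String> headers = outgoingHeaders();

        System.out.println(calleeSees(headers, false)); // null
        System.out.println(calleeSees(headers, true));  // xid-123
    }
}
```

A thread-local value does not cross a network call by itself, so without the piece that reads the header on the callee side the called method sees no XID, which matches the symptom described above.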