14-第13课:分布式事务
首先我们应知道,事务是为了保证数据的一致性而产生的。那么分布式事务,顾名思义,就是我们要保证分布在不同数据库、不同服务器、不同应用之间的数据一致性。
为什么需要分布式事务?
最传统的架构是单一架构,数据是存放在一个数据库上的,采用数据库的事务就能满足我们的要求。随着业务的不断扩张,数据的不断增加,单一数据库已经到达了一个瓶颈,因此我们需要对数据库进行分库分表。为了保证数据的一致性,可能需要不同的数据库之间的数据要么同时成功,要么同时失败,否则可能导致产生一些脏数据,也可能滋生 Bug。
在这种情况下,分布式事务思想应运而生。
应用场景
分布式事务的应用场景很广,我也无法一一举例,本文列举出比较常见的场景,以便于读者在实际项目中,在用到了一些场景时即可考虑分布式事务。
支付
最经典的场景就是支付了,一笔支付,是对买家账户进行扣款,同时对卖家账户进行加钱,这些操作必须在一个事务里执行,要么全部成功,要么全部失败。而对于买家账户属于买家中心,对应的是买家数据库,而卖家账户属于卖家中心,对应的是卖家数据库,对不同数据库的操作必然需要引入分布式事务。
在线下单
买家在电商平台下单,往往会涉及到两个动作,一个是扣库存,第二个是更新订单状态,库存和订单一般属于不同的数据库,需要使用分布式事务保证数据一致性。
银行转账
账户 A 转账到账户 B,实际操作是账户 A 减去相应金额,账户 B 增加相应金额,在分库分表的前提下,账户 A 和账户 B 可能分别存储在不同的数据库中,这时需要使用分布式事务保证数据库一致性。否则可能导致的后果是 A 扣了钱 B 却没有增加钱,或者 B 增加了钱 A 却没有扣钱。
SpringBoot 集成 Atomikos 实现分布式事务
Atomikos 简介
Atomikos 是一个为 Java 平台提供增值服务的开源类事务管理器。
以下是包括在这个开源版本中的一些功能:
- 全面崩溃 / 重启恢复;
- 兼容标准的 SUN 公司 JTA API;
- 嵌套事务;
- 为 XA 和非 XA 提供内置的 JDBC 适配器。
注释:XA 协议由 Tuxedo 首先提出的,并交给 X/Open 组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2 和 Sybase 等各大数据库厂家都提供对 XA 的支持。XA 协议采用两阶段提交方式来管理分布式事务。XA 接口提供资源管理器与事务管理器之间进行通信的标准接口。XA 协议包括两套函数,以
xa_
开头的及以ax_
开头的。
具体实现
主要两步:添加jta-atomikos依赖、通过Atomikos创建XA事务的数据源
在本地创建两个数据库:test01,test02,并且创建相同的数据库表:
create database if not exists test01; use test01; create table if not exists test_user ( id int auto_increment primary key, name char(20) not null, age tinyint not null ); create database if not exists test02; use test02; create table if not exists test_user ( id int auto_increment primary key, name char(20) not null, age tinyint not null );
改造上篇的工程,在 pom.xml 增加以下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.40</version> </dependency>
修改配置文件 application.yml 如下:
mysql: datasource: test1: url: jdbc:mysql://localhost:3306/test01?useUnicode=true&characterEncoding=utf-8 username: root password: 1qaz2wsx minPoolSize: 3 maxPoolSize: 25 maxLifetime: 20000 borrowConnectionTimeout: 30 loginTimeout: 30 maintenanceInterval: 60 maxIdleTime: 60 testQuery: select 1 test2: url: jdbc:mysql://localhost:3306/test02?useUnicode=true&characterEncoding=utf-8 username: root password: 1qaz2wsx minPoolSize: 3 maxPoolSize: 25 maxLifetime: 20000 borrowConnectionTimeout: 30 loginTimeout: 30 maintenanceInterval: 60 maxIdleTime: 60 testQuery: select 1
创建以下类:
- 创建配置类
@Data public class DBConfig { private String url; private String username; private String password; private int minPoolSize; private int maxPoolSize; private int maxLifetime; private int borrowConnectionTimeout; private int loginTimeout; private int maintenanceInterval; private int maxIdleTime; private String testQuery; }
@ConfigurationProperties(prefix = "mysql.datasource.test1") @SpringBootConfiguration public class DBConfig1 extends DBConfig { }
- 通过Atomikos创建XA事务的数据源
@SpringBootConfiguration @MapperScan(basePackages = "com.lynn.demo.test01", sqlSessionTemplateRef = "sqlSessionTemplate1") public class MyBatisConfig1 { // 配置数据源 @Primary @Bean(name = "dataSource1") public DataSource dataSource(DBConfig1 testConfig) throws SQLException { MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(testConfig.getUrl()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); mysqlXaDataSource.setPassword(testConfig.getPassword()); mysqlXaDataSource.setUser(testConfig.getUsername()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName("testDataSource1"); xaDataSource.setMinPoolSize(testConfig.getMinPoolSize()); xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize()); xaDataSource.setMaxLifetime(testConfig.getMaxLifetime()); xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout()); xaDataSource.setLoginTimeout(testConfig.getLoginTimeout()); xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval()); xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime()); xaDataSource.setTestQuery(testConfig.getTestQuery()); return xaDataSource; } @Primary @Bean(name = "sqlSessionFactory1") public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource1") DataSource dataSource) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); return bean.getObject(); } @Primary @Bean(name = "sqlSessionTemplate1") public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory1") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory); } }
在 com.lynn.demo.test01 和 com.lynn.demo.test02 中分别创建以下 mapper:
@Mapper public interface UserMapper1 { @Insert("insert into test_user(name, age) values(#{name}, #{age})") void addUser(@Param("name")String name, @Param("age") int age); }
创建 service 类:
@Service public class UserService { @Autowired private UserMapper1 userMapper1; @Autowired private UserMapper2 userMapper2; @Transactional public void addUser(User user) throws Exception { userMapper1.addUser(user.getName(), user.getAge()); userMapper2.addUser(user.getName(), user.getAge()); } }
创建单元测试类进行测试:
@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class TestDB { @Autowired private UserService userService; @Test public void test() throws Exception { User user = new User(); user.setName("lynn"); user.setAge(10); userService.addUser(user); } }
经过测试,如果没有报错,则数据被分别添加到两个数据库表中,如果有报错,则数据不会增加。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 tuyrk@qq.com
文章标题:14-第13课:分布式事务
文章字数:1.5k
本文作者:神秘的小岛岛
发布时间:2020-07-06, 00:55:00
最后更新:2020-07-14, 23:05:09
原始链接:https://www.tuyrk.cn/gitchat/springcloud-quickly/14-springcloud-atomikos/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。