14-第13课:分布式事务

  1. 为什么需要分布式事务?
  2. 应用场景
    1. 支付
    2. 在线下单
    3. 银行转账
  3. SpringBoot 集成 Atomikos 实现分布式事务
    1. Atomikos 简介
    2. 具体实现

首先我们应知道,事务是为了保证数据的一致性而产生的。那么分布式事务,顾名思义,就是我们要保证分布在不同数据库、不同服务器、不同应用之间的数据一致性。

为什么需要分布式事务?

最传统的架构是单一架构,数据是存放在一个数据库上的,采用数据库的事务就能满足我们的要求。随着业务的不断扩张,数据的不断增加,单一数据库已经到达了一个瓶颈,因此我们需要对数据库进行分库分表。为了保证数据的一致性,可能需要不同的数据库之间的数据要么同时成功,要么同时失败,否则可能导致产生一些脏数据,也可能滋生 Bug。

在这种情况下,分布式事务思想应运而生。

应用场景

分布式事务的应用场景很广,我也无法一一举例,本文列举出比较常见的场景,以便于读者在实际项目中,在用到了一些场景时即可考虑分布式事务。

支付

最经典的场景就是支付了,一笔支付,是对买家账户进行扣款,同时对卖家账户进行加钱,这些操作必须在一个事务里执行,要么全部成功,要么全部失败。而对于买家账户属于买家中心,对应的是买家数据库,而卖家账户属于卖家中心,对应的是卖家数据库,对不同数据库的操作必然需要引入分布式事务。

在线下单

买家在电商平台下单,往往会涉及到两个动作,一个是扣库存,第二个是更新订单状态,库存和订单一般属于不同的数据库,需要使用分布式事务保证数据一致性。

银行转账

账户 A 转账到账户 B,实际操作是账户 A 减去相应金额,账户 B 增加相应金额,在分库分表的前提下,账户 A 和账户 B 可能分别存储在不同的数据库中,这时需要使用分布式事务保证数据库一致性。否则可能导致的后果是 A 扣了钱 B 却没有增加钱,或者 B 增加了钱 A 却没有扣钱。

SpringBoot 集成 Atomikos 实现分布式事务

Atomikos 简介

Atomikos 是一个为 Java 平台提供增值服务的开源类事务管理器。

以下是包括在这个开源版本中的一些功能:

  • 全面崩溃 / 重启恢复
  • 兼容标准的 SUN 公司 JTA API;
  • 嵌套事务
  • 为 XA 和非 XA 提供内置的 JDBC 适配器。

注释:XA 协议由 Tuxedo 首先提出的,并交给 X/Open 组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2 和 Sybase 等各大数据库厂家都提供对 XA 的支持。XA 协议采用两阶段提交方式来管理分布式事务。XA 接口提供资源管理器与事务管理器之间进行通信的标准接口。XA 协议包括两套函数,以 xa_ 开头的及以 ax_ 开头的。

具体实现

主要两步:添加jta-atomikos依赖、通过Atomikos创建XA事务的数据源

  1. 在本地创建两个数据库:test01,test02,并且创建相同的数据库表:

    create database if not exists test01;
    use test01;
    create table if not exists test_user (
      id   int auto_increment
      primary key,
      name char(20) not null,
      age  tinyint  not null
    );
    
    create database if not exists test02;
    use test02;
    create table if not exists test_user (
      id   int auto_increment
      primary key,
      name char(20) not null,
      age  tinyint  not null
    );
  2. 改造上篇的工程,在 pom.xml 增加以下依赖:

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    
    <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.40</version>
    </dependency>
  3. 修改配置文件 application.yml 如下:

    mysql:
      datasource:
        test1:
          url: jdbc:mysql://localhost:3306/test01?useUnicode=true&characterEncoding=utf-8
          username: root
          password: 1qaz2wsx
          minPoolSize: 3
          maxPoolSize: 25
          maxLifetime: 20000
          borrowConnectionTimeout: 30
          loginTimeout: 30
          maintenanceInterval: 60
          maxIdleTime: 60
          testQuery: select 1
        test2:
          url: jdbc:mysql://localhost:3306/test02?useUnicode=true&characterEncoding=utf-8
          username: root
          password: 1qaz2wsx
          minPoolSize: 3
          maxPoolSize: 25
          maxLifetime: 20000
          borrowConnectionTimeout: 30
          loginTimeout: 30
          maintenanceInterval: 60
          maxIdleTime: 60
          testQuery: select 1
  4. 创建以下类:

    • 创建配置类
    @Data
    public class DBConfig {
        private String url;
        private String username;
        private String password;
        private int minPoolSize;
        private int maxPoolSize;
        private int maxLifetime;
        private int borrowConnectionTimeout;
        private int loginTimeout;
        private int maintenanceInterval;
        private int maxIdleTime;
        private String testQuery;
    }
    @ConfigurationProperties(prefix = "mysql.datasource.test1")
    @SpringBootConfiguration
    public class DBConfig1 extends DBConfig {
    }
    • 通过Atomikos创建XA事务的数据源
    @SpringBootConfiguration
    @MapperScan(basePackages = "com.lynn.demo.test01", sqlSessionTemplateRef = "sqlSessionTemplate1")
    public class MyBatisConfig1 {
      // 配置数据源
      @Primary
      @Bean(name = "dataSource1")
      public DataSource dataSource(DBConfig1 testConfig) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(testConfig.getUrl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("testDataSource1");
        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
      }
    
      @Primary
      @Bean(name = "sqlSessionFactory1")
      public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource1") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
      }
    
      @Primary
      @Bean(name = "sqlSessionTemplate1")
      public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory1") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
      }
    }

    在 com.lynn.demo.test01 和 com.lynn.demo.test02 中分别创建以下 mapper:

    @Mapper
    public interface UserMapper1 {
        @Insert("insert into test_user(name, age) values(#{name}, #{age})")
        void addUser(@Param("name")String name, @Param("age") int age);
    }

    创建 service 类:

    @Service
    public class UserService {
      @Autowired
      private UserMapper1 userMapper1;
      @Autowired
      private UserMapper2 userMapper2;
    
      @Transactional
      public void addUser(User user) throws Exception {
        userMapper1.addUser(user.getName(), user.getAge());
        userMapper2.addUser(user.getName(), user.getAge());
      }
    }
  5. 创建单元测试类进行测试:

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    public class TestDB {
      @Autowired
      private UserService userService;
    
      @Test
      public void test() throws Exception {
        User user = new User();
        user.setName("lynn");
        user.setAge(10);
        userService.addUser(user);
      }
    }

    经过测试,如果没有报错,则数据被分别添加到两个数据库表中,如果有报错,则数据不会增加。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 tuyrk@qq.com

文章标题:14-第13课:分布式事务

文章字数:1.5k

本文作者:神秘的小岛岛

发布时间:2020-07-06, 00:55:00

最后更新:2020-07-14, 23:05:09

原始链接:https://www.tuyrk.cn/gitchat/springcloud-quickly/14-springcloud-atomikos/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏