Spring06-事务源码

事物概念解析

什么是事务

​ 事务是逻辑上的一组执行单元,要么都执行,要么都不执行.

事务的特性(ACID)

image-20210711230028299

什么是ACID

ACID是指数据库管理系统DBMS中事物所具有四个特性

  1. ​ 原子性:Automicity

    操作不能背分割,要么都成功,要么都失败,若事务出错了,那么事务就会回滚。

  2. ​ 一致性:Consistency

    数据库一直处于一致的状态,事务开始前是一个一致状态,结束后是另一个一致状态

  3. ​ 隔离性:Isolation

    一个事务的影响在该事务提交前对其它事务是不可见的

  4. ​ 持久性:Durablility

    若事务已经提交了,那么就回在数据库中永久的保存下来

Spring事务三大接口介绍

PlatformTransactionManager:事务管理器

​ Spring并不直接管理事务,而是提供了多种事务管理器 ,他们将事务管理的职责委托给Hibernate或者JTA等持久化机制所提供的相关平台框架的事务来实现。Spring事务管理器的接口是:
​ org.springframework.transaction.PlatformTransactionManager ,
​ 通过这个接口,Spring为各个平台如JDBC、Hibernate等都提供了对应的事务管理器,但是具体的实现就是各个平台自己的事情了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface PlatformTransactionManager {
/**
*获取事物状态
*/
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
/**
*事物提交
*/
void commit(TransactionStatus status) throws TransactionException;
/**
*事物回滚
*/
void rollback(TransactionStatus status) throws TransactionException;
}

TransactionDefinition:事物属性的定义

​ 事务定义信息(事务隔离级别、传播行为、超时、只读、回滚)

​ org.springframework.transaction.TransactionDefinition

​ TransactionDefinition接口中定义了5个方法以及一些表示事务属性的常量比如隔离级别、传播行为等等的常量。下面只是列出了TransactionDefinition接口中的方法而没有给出接口中定义的常量,该接口中的常量信息会在后面依次介绍到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public interface TransactionDefinition {

/**
* 支持当前事物,若当前没有事物就创建一个事物
* */
int PROPAGATION_REQUIRED = 0;

/**
* 如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行
* */
int PROPAGATION_SUPPORTS = 1;

/**
*如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常
*/
int PROPAGATION_MANDATORY = 2;

/**
*创建一个新的事务,如果当前存在事务,则把当前事务挂起
**/
int PROPAGATION_REQUIRES_NEW = 3;

/**
* 以非事务方式运行,如果当前存在事务,则把当前事务挂起
* */
int PROPAGATION_NOT_SUPPORTED = 4;

/**
* 以非事务方式运行,如果当前存在事务,则抛出异常。
* */
int PROPAGATION_NEVER = 5;

/**
* 表示如果当前正有一个事务在运行中,则该方法应该运行在 一个嵌套的事务中,
被嵌套的事务可以独立于封装事务进行提交或者回滚(保存点),
如果封装事务不存在,行为就像 PROPAGATION_REQUIRES NEW
* */
int PROPAGATION_NESTED = 6;


/**
*使用后端数据库默认的隔离级别,Mysql 默认采用的 REPEATABLE_READ隔离级别 Oracle 默认采用的 READ_COMMITTED隔离级别
*/
int ISOLATION_DEFAULT = -1;

/**
*最低的隔离级别,允许读取尚未提交的数据变更,可能会导致脏读、幻读或不可重复读
*/
int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

/**
*允许读取并发事务已经提交的数据,可以阻止脏读,但是幻读或不可重复读仍有可能发生
*/
int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

/**
*对同一字段的多次读取结果都是一致的,除非数据是被本身事务自己所修改,可以阻止脏读和不可重复读,但幻读仍有可能发生
*/
int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

/**
*最高的隔离级别,完全服从ACID的隔离级别。所有的事务依次逐个执行,这样事务之间就完全不可能产生干扰,
* 也就是说,该级别可以防止脏读、不可重复读以及幻读。但是这将严重影响程序的性能通常情况下也不会用到该级别
*/
int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;


/**
*使用默认的超时时间
*/
int TIMEOUT_DEFAULT = -1;


/**
*获取事物的传播行为
*/
int getPropagationBehavior();

/**
*获取事物的隔离级别
*/
int getIsolationLevel();

/**
*返回事物的超时时间
*/
int getTimeout();

/**
*返回当前是否为只读事物
*/
boolean isReadOnly();

/**
*获取事物的名称
*/
@Nullable
String getName();

}

image-20210712001245156

TransactionStatus:事务运行状态

​ TransactionStatus接口用来记录事务的状态,该接口定义了一组方法,用来获取或判断事务的相应状态信息.

​ PlatformTransactionManager.getTransaction(…) 方法返回一个 TransactionStatus 对象。返回的TransactionStatus 对象可能代表一个新的或已经存在的事务(如果在当前调用堆栈有一个符合条件的事物

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public interface TransactionStatus extends SavepointManager, Flushable {

/**
* 是否为新事物
* */
boolean isNewTransaction();

/**
*是否有保存点
*/
boolean hasSavepoint();

/**
*设置为只回滚
*/
void setRollbackOnly();

/**
*是否为只回滚
*/
boolean isRollbackOnly();

/**
*属性
*/
@Override
void flush();

/**
*判断当前事物是否已经完成
*/
boolean isCompleted();

}

@EnableTransactionManagement

注入了哪些组件

​ 1、注解导入了TransactionManagementConfigurationSelector

​ 2、TransactionManagementConfigurationSelector导入了

​ 1)AutoProxyRegistrar:导入了 InfrastructureAdvisorAutoProxyCreator

​ 2)ProxyTransactionManagementConfiguration

​ BeanFactoryTransactionAttributeSourceAdvisor

​ TransactionAttributeSource

​ TransactionInterceptor

image-20210712105211686

因此,EnableTransactionManagement,主要帮我们导入了(默认)4个组件

  1. InfrastructureAdvisorAutoProxyCreator
    1. 注意,如果开启了@EnableAspectJAutoProxy,则实际上使用的还是AnnotationAwareAspectJAutoProxyCreator(优先级高)
    2. image-20210712152123224
  2. TransactionAttributeSource 事务属性源
  3. TransactionInterceptor 事务拦截器
  4. BeanFactoryTransactionAttributeSourceAdvisor 事务属性源工厂**==增强器==**

InfrastructureAdvisorAutoProxyCreator

类结构图如下,发现与@EnableAspectJAutoProxy导入的:AnnotationAwareAspectJAutoProxyCreator类似

image-20210712105627407

结合具体源码得到流程图如下:发现结构和也和AnnotationAwareAspectJAutoProxyCreator类似

image-20210712110900830

因此Spring事务注解的启动原理与AOP类似

事务源码解析

Spring事务方法调用了流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
new AnnotationConfigApplicationContext
refresh()
// 创建剩余的组件
finishBeanFactoryInitialization(beanFactory);
// 第一次运行时缓存所有@Aspect的增强器且 每次都寻找容器中的Advisor
AbstractAutoProxyCreator.postProcessBeforeInstantiation()
shouskip()
//找到符合条件的增强器
findEligibleAdvisors
//找到候选的增强器
findCandidateAdvisors()
//从第一步中获取能用的增强器
findAdvisorsThatCanApply()
getTransactionAttributeSource()
computeTransactionAttribute()
//查找@Transational注解
findTransactionAttribute
1、实现类的目标方法上找
2、实现类上找
3、原方法上找
4、原类上找
// 初始化后,判断是否存在增强器,存在则创建动态代理对象
AbstractAutoProxyCreator.postProcessAfterInitialization()

事务代码运行调用流程

以下是某个事务方法执行大致过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//动态代理+责任链+递归 
JdkDynamicAopProxy.invoke->MethodInterceptor.invoke()
->ReflectiveMethodInvocation.proceed()
->TransactionInterceptor.invoke()
->invokeWithinTransaction
//获取事务属性源
getTransactionAttributeSource();
try{
//在事务执行
invocation.proceedWithInvocation();
}catch{
//可回滚事务
completeTransactionAfterThrowing(txInfo, ex);
}
//提交事务
commitTransactionAfterReturning(txInfo);


实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public interface PayService {
void pay(String accountId,double money);
void updateProductStore(Integer productId);
}

@Component
public class PayServiceImpl implements PayService {
@Autowired
private AccountInfoDao accountInfoDao;
@Autowired
private ProductInfoDao productInfoDao;

@Transactional
@Override
public void pay(String accountId, double money) {
//查询余额
double blance = accountInfoDao.qryBlanceByUserId(accountId);
//余额不足正常逻辑
if(new BigDecimal(blance).compareTo(new BigDecimal(money))<0) {
throw new RuntimeException("余额不足");
}
((PayService) AopContext.currentProxy()).updateProductStore(1);
//更新余额
int retVal = accountInfoDao.updateAccountBlance(accountId,money);
System.out.println(1/0);
}

@Transactional(propagation =Propagation.REQUIRES_NEW)
@Override
public void updateProductStore(Integer productId) {
try{
productInfoDao.updateProductInfo(productId);
}
catch (Exception e) {
throw new RuntimeException("内部异常");
}
}
}

@Configuration
@EnableAspectJAutoProxy
@EnableTransactionManagement
@ComponentScan(basePackages = {"com.study.tx"})
public class TxConfig {
@Bean
public DataSource dataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUsername("root");
dataSource.setPassword("123456");
dataSource.setUrl("jdbc:mysql://localhost:3306/spring-study");
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
return dataSource;
}

@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}

@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}

public class TxMain {
public static void main(String[] args) {
AnnotationConfigApplicationContext ioc = new AnnotationConfigApplicationContext(TxConfig.class);
String[] names = ioc.getBeanDefinitionNames();
for (String name : names) {
System.out.println(name);
}
PayService payService = ioc.getBean(PayService.class);
payService.pay("123456789",10);
}
}

@Configuration
@EnableAspectJAutoProxy
@EnableTransactionManagement
@ComponentScan(basePackages = {"com.study.tx"})
public class TxConfig {
@Bean
public DataSource dataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUsername("root");
dataSource.setPassword("123456");
dataSource.setUrl("jdbc:mysql://localhost:3306/spring-study");
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
return dataSource;
}

@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}

@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}

如代码所示

​ payservice.pay() 隔离级别是 Propagation.REQUIRED,加入事务:如果不存在事务,新建事务,存在事务则加入该事务

​ payService.updateProductStore() 隔离级别是 Propagation.REQUIRES_NEW 新建事务:不管是否存在事务,新建一个事务

具体执行过程如下

事物流程简图

穷举所有情况:

事物代理对象调用流程图

事务提交后执行的原理

案例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public interface VoidSupplier {
/**
* Gets a result.
*
* @throws Exception
*/
void get();
}

@Slf4j
public class ThreadPoolUtil {

/**
* 线程池execute
*
* @param threadPoolName
* @param supplier
*/
public static void execute(String threadPoolName, VoidSupplier supplier) {
ThreadPoolTaskExecutor executor = SpringUtil.getBean(threadPoolName, ThreadPoolTaskExecutor.class);
executor.execute(() -> {
try {
supplier.get();
} catch (Throwable e) {
log.error("Unexpected error occurred invoking thread pool execute", e);
}
});
}
}

@Slf4j
@Component
public class TransactionalUtil {
@Resource
private DataSource dataSource;

private static DataSource MY_DATA_SOURCE;

@PostConstruct
public void init() {
MY_DATA_SOURCE = dataSource;
}

/**
*
* @param supplier 执行任务
*/
public static void afterCommit(VoidSupplier supplier) throws Exception {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
supplier.get();
return;
}

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
//本线程跑supplier
supplier.get();
}
});
}



/**
* 事务提交后异步操作
* 可指定线程池
*
* @param threadPoolName 线程池名称
* @param supplier 执行任务
*/
public static void asyncAfterCommit(String threadPoolName,VoidSupplier supplier) {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
ThreadPoolUtil.execute(threadPoolName, supplier);
return;
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
//另起一个线程跑supplier
ThreadPoolUtil.execute(threadPoolName, supplier);
}
});
}
}

其中关键代码TransactionalUtil.afterCommit的原理是什么,我们来一步一步分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
	/**
* 1. 如果当前线程没有开启事务,将立即进行操作
* 2. 事务提交后,在当前线程执行任务操作
* @param supplier 执行任务
*/
public static void afterCommit(VoidSupplier supplier) throws Exception {
//1、判断当前线程是否开启了事务
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
supplier.get();
return;
}

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
//2. 事务提交后,在当前线程执行任务操作
@Override
public void afterCommit() {
supplier.get();
}
});
}

public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}

第1处标注的原理是,以payservice.pay 举例,方法开启了事务,则在执行代理对象执行时

  • invokeWithinTransaction
    • createTransactionIfNecessary
      • status = tm.getTransaction(txAttr);
        • doGetTransaction
          • isExistingTransaction(transaction)
          • suspend(null)(挂机事务)由于不存在事务,不需要挂起当前的
          • newTransactionStatus创建一个事务状态
            • doBegin
            • prepareSynchronization
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
//激活事务
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
//事务隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
//判断是否为只读
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly())
//事务名称
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
//初始化同步:就是在这里初始化事务同步回调接口
TransactionSynchronizationManager.initSynchronization();
}
}

public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}

第2处标注的原理是

invokeWithinTransaction

  • createTransactionIfNecessary
  • retVal = invocation.proceedWithInvocation();
  • commitTransactionAfterReturning(txInfo); 目标方法执行成功,触发事务提交
    • txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
      • processCommit(defStatus);
        • triggerAfterCommit(status);
          • TransactionSynchronizationUtils.triggerAfterCommit();
            • invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
        • cleanupAfterCompletion(status);
1
2
3
4
5
6
7
8
public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
//触发执行我们手动新增的TransactionSynchronizationAdaptor的afterCommit
synchronization.afterCommit();
}
}
}

小坑:最好不要在TransactionSynchronization.afterCommit方法里执行DML操作,如果在事务提交后抛出异常,这个时候对于

DataSourceTransactionManager
cleanupAfterCompletion(status) 会把连接设置为自动提交,导致DML操作回滚不了(原事务已提交,afterCommit 方法报错,afterCommit 的数据操作回滚不了)

JPASourceTransactionManager

​ 正常回滚

最佳时间:

​ 1、尽量不在aftercommit里做事务操作

​ 2、如果有,也是现实声明为 @Transactional(propagation =Propagation.REQUIRES_NEW),详细看PayServiceImpl中updateProductStoreInNewTx,updateProductStore这两个方法,前者事务正常回滚,后者不行

​ 3、在aftercommit异步执行业务