- 前言
- 第一部分 核心实现
- 第 1 章 Spring 整体架构和环境搭建
- 第 2 章 容器的基本实现
- 第 3 章 默认标签的解析
- 第 4 章 自定义标签的解析
- 第 5 章 bean 的加载
- 第 6 章 容器的功能扩展
- 第 7 章 AOP
- 第二部分 企业应用
- 第 8 章 数据库连接 JDBC
- 第 9 章 整合 MyBatis
- 第 10 章 事务
- 第 11 章 SpringMVC
- 第 12 章 远程服务
- 第 13 章 Spring 消息
10.3.1 创建事务
我们先分析事务创建的过程。
protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String
joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
//如果没有名称指定则使用方法唯一标识,并使用 DelegatingTransactionAttribute 封装 txAttr
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//获取 TransactionStatus
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpoint
Identification +
"] because no transaction manager has been configured");
}
}
}
//根据指定的属性与 status 准备一个 TransactionInfo
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
对于 createTransactionIfNecessar 函数主要做了这样几件事情。
(1)使用 DelegatingTransactionAttribute 封装传入的 TransactionAttribute 实例。
对于传入的 TransactionAttribute 类型的参数 txAttr,当前的实际类型是 RuleBasedTransaction Attribute ,是由获取事务属性时生成,主要用于数据承载,而这里之所以使用 Delegating TransactionAttribute 进行封装,当然是提供了更多的功能。
(2)获取事务。
事务处理当然是以事务为核心,那么获取事务就是最重要的事情。
(3)构建事务信息。
根据之前几个步骤获取的信息构建 TransactionInfo 并返回。
我们分别对以上步骤进行详细的解析。
1.获取事务
Spring 中使用 getTransaction 来处理事务的准备工作,包括事务获取以及信息的构建。
public final TransactionStatus getTransaction(TransactionDefinition definition) throws
TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
//判断当前线程是否存在事务,判读依据为当前线程记录的连接不为空且连接中(connectionHolder) 中的
transactionActive 属性不为空
if (isExistingTransaction(transaction)) {
//当前线程已经存在事务
return handleExistingTransaction(definition, transaction, debugEnabled);
}
//事务超时设置验证
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout",
definition.getTimeout());
}
//如果当前线程不存在事务,但是 propagationBehavior 却被声明为 PROPAGATION_MANDATORY 抛
出异常
if (definition.getPropagationBehavior() == TransactionDefinition. PROPAGATION_
MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with
propagation 'mandatory'");
}else if (definition.getPropagationBehavior() == TransactionDefinition.
PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition. PROPAGATION_
REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 都需要
新建事务
//空挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.
getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() !=
SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled,
suspendedResources);
/*
* 构造 transaction,包括设置 ConnectionHolder、隔离级别、timout
* 如果是新连接,绑定到当前线程
*/
doBegin(transaction, definition);
//新同步事务的设置,针对于当前线程的设置
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially
synchronization.
boolean newSynchronization = (getTransactionSynchronization() ==
SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization,
debugEnabled, null);
}
}
当然,在 Spring 中每个复杂的功能实现,并不是一次完成的,而是会通过入口函数进行一个框架的搭建,初步构建完整的逻辑,而将实现细节分摊给不同的函数。那么,让我们看看事务的准备工作都包括哪些。
(1)获取事务
创建对应的事务实例,这里使用的是 DataSourceTransactionManager 中的 doGetTransaction 方法,创建基于 JDBC 的事务实例。如果当前线程中存在关于 dataSource 的连接,那么直接使用。这里有一个对保存点的设置,是否开启允许保存点取决于是否设置了允许嵌入式事务。
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//如果当前线程已经记录数据库连接则使用原有连接
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
//false 表示非新创建连接。
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
(2)如果当先线程存在事务,则转向嵌套事务的处理。
(3)事务超时设置验证。
(4)事务 propagationBehavior 属性的设置验证。
(5)构建 DefaultTransactionStatus。
(6)完善 transaction,包括设置 ConnectionHolder、隔离级别、timeout,如果是新连接,则绑定到当前线程。
对于一些隔离级别、timeout 等功能的设置并不是由 Spring 来完成的,而是委托给底层的数据库连接去做的,而对于数据库连接的设置就是在 doBegin 函数中处理的。
/**
* 构造 transaction,包括设置 ConnectionHolder、隔离级别、timeout
* 如果是新连接,绑定到当前线程
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//设置隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnection
ForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
//更改自动提交设置,由 Spring 控制提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
//设置判断当前线程是否存在事务的依据
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the session holder to the thread.
if (txObject.isNewConnectionHolder()) {
//将当前获取到的连接绑定到当前线程
TransactionSynchronizationManager.bindResource(getDataSource(),
txObject.getConnectionHolder());
}
}
catch (Exception ex) {
DataSourceUtils.releaseConnection(con, this.dataSource);
throw new CannotCreateTransactionException("Could not open JDBC Connection
for transaction", ex);
}
}
可以说事务是从这个函数开始的,因为在这个函数中已经开始尝试了对数据库连接的获取,当然,在获取数据库连接的同时,一些必要的设置也是需要同步设置的。
n 尝试获取连接。
当然并不是每次都会获取新的连接,如果当前线程中的 connectionHolder 已经存在,则没有必要再次获取,或者,对于事务同步表示设置为 true 的需要重新获取连接。
o 设置隔离级别以及只读标识。
你是否有过这样的错觉?事务中的只读配置是 Spring 中做了一些处理呢?Spring 中确实是针对只读操作做了一些处理,但是核心的实现是设置 connection 上的 readOnly 属性。同样,对于隔离级别的控制也是交由 connection 去控制的。
p 更改默认的提交设置。
如果事务属性是自动提交,那么需要改变这种设置,而将提交操作委托给 Spring 来处理。
q 设置标志位,标识当前连接已经被事务激活。
r 设置过期时间。
s 将 connectionHolder 绑定到当前线程。
设置隔离级别的 prepareConnectionForTransaction 函数用于负责对底层数据库连接的设置,当然,只是包含只读标识和隔离级别的设置。由于强大的日志及异常处理,显得函数代码量比较大,但是单从业务角度去看,关键代码其实是不多的。
public static Integer prepareConnectionForTransaction(Connection con, Transaction Definition
definition)
throws SQLException {
Assert.notNull(con, "No Connection specified");
//设置数据连接的只读标识
if (definition != null && definition.isReadOnly()) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Setting JDBC Connection [" + con + "] read-only");
}
con.setReadOnly(true);
}
catch (SQLException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
// Assume it's a connection timeout that would otherwise get
lost: e.g. from JDBC 4.0
throw ex;
}
exToCheck = exToCheck.getCause();
}
logger.debug("Could not set JDBC Connection read-only", ex);
}
catch (RuntimeException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
throw ex;
}
exToCheck = exToCheck.getCause();
}
logger.debug("Could not set JDBC Connection read-only", ex);
}
}
//设置数据库连接的隔离级别
Integer previousIsolationLevel = null;
if (definition != null && definition.getIsolationLevel() != Transaction
Definition.ISOLATION_DEFAULT) {
if (logger.isDebugEnabled()) {
logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
definition.getIsolationLevel());
}
int currentIsolation = con.getTransactionIsolation();
if (currentIsolation != definition.getIsolationLevel()) {
previousIsolationLevel = currentIsolation;
con.setTransactionIsolation(definition.getIsolationLevel());
}
}
return previousIsolationLevel;
}
(7)将事务信息记录在当前线程中。
protected void prepareSynchronization(DefaultTransactionStatus status, Transaction
Definition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.
hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_
DEFAULT) ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly (definition.
isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.
getName());
TransactionSynchronizationManager.initSynchronization();
}
}
2.处理已经存在的事务
之前讲述了普通事务建立的过程,但是 Spring 中支持多种事务的传播规则,比如 PROPAGATION_NESTED、PROPAGATION_REQUIRES_NEW 等,这些都是在已经存在事务的基础上进行进一步的处理,那么,对于已经存在的事务,准备操作是如何进行的呢?
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation
'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_
SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_
ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled,
suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_
REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction
with name [" +
definition.getName() + "]");
}
//新事务的建立
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() !=
SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled,
suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
//嵌入式事务的处理
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_
NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by
default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" +
definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
//如果没有可以使用保存点的方式控制事务回滚,那么在嵌入式事务的建立初始建立保存点
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false,
false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}else {
//有些情况是不能使用保存点操作,比如 JTA,那么建立新事物
boolean newSynchronization = (getTransactionSynchronization() !=
SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition,transaction, true, newSynchronization,
debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.PROPAGATION_
REQUIRES_NEW) {
Integer currentIsolationLevel = TransactionSynchronizationManager.
getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel !=
definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction
with definition [" +
definition + "] specifies isolation level which is
incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel,
DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if
(TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction
with definition [" +
definition + "] is not marked as read-only but existing
transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_
NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization,
debugEnabled, null);
}
对于已经存在事务的处理过程中,我们看到了很多熟悉的操作,但是,也有些不同的地方,函数中对已经存在的事务处理考虑两种情况。
(1)PROPAGATION_REQUIRES_NEW 表示当前方法必须在它自己的事务里运行,一个新的事务将被启动,而如果有一个事务正在运行的话,则在这个方法运行期间被挂起。而 Spring 中对于此种传播方式的处理与新事务建立最大的不同点在于使用 suspend 方法将原事务挂起。将信息挂起的目的当然是为了在当前事务执行完毕后在将原事务还原。
(2)PROPAGATION_NESTED 表示如果当前正有一个事务在运行中,则该方法应该运行在一个嵌套的事务中,被嵌套的事务可以独立于封装事务进行提交或者回滚,如果封装事务不存在,行为就像 PROPAGATION_REQUIRES_NEW。对于嵌入式事务的处理,Spring 中主要考虑了两种方式的处理。
Spring 中允许嵌入事务的时候,则首选设置保存点的方式作为异常处理的回滚。
对于其他方式,比如 JTA 无法使用保存点的方式,那么处理方式与 PROPAGATION_REQUIRES_NEW 相同,而一旦出现异常,则由 Spring 的事务异常处理机制去完成后续操作。
对于挂起操作的主要目的是记录原有事务的状态,以便于后续操作对事务的恢复:
protected final SuspendedResourcesHolder suspend(Object transaction) throws Transaction
Exception {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspend
Synchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager. GetCurrent Transaction
Name();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransaction
ReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrent
TransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolation
Level(null);
boolean wasActive = TransactionSynchronizationManager.isActual
TransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly,
isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch (Error err) {
doResumeSynchronization(suspendedSynchronizations);
throw err;
}
}
else if (transaction != null) {
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
return null;
}
}
3.准备事务信息
当已经建立事务连接并完成了事务信息的提取后,我们需要将所有的事务信息统一记录在 TransactionInfo 类型的实例中,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring 会通过 TransactionInfo 类型的实例中的信息来进行回滚等后续工作。
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
TransactionAttribute txAttr, String joinpointIdentification, Transaction
Status status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpoint
Identification() + "]");
}
//记录事务状态
txInfo.newTransactionStatus(status);
}
else {
if (logger.isTraceEnabled())
logger.trace("Don't need to create transaction for [" + joinpoint
Identification +
"]: This method isn't transactional.");
}
txInfo.bindToThread();
return txInfo;
}
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论