返回介绍

10.3.1 创建事务

发布于 2025-04-22 22:09:16 字数 20490 浏览 0 评论 0 收藏

我们先分析事务创建的过程。

protected TransactionInfo createTransactionIfNecessary(

PlatformTransactionManager tm, TransactionAttribute txAttr, final String

joinpointIdentification) {

  // If no name specified, apply method identification as transaction name.

 //如果没有名称指定则使用方法唯一标识,并使用 DelegatingTransactionAttribute 封装 txAttr

  if (txAttr != null && txAttr.getName() == null) {

   txAttr = new DelegatingTransactionAttribute(txAttr) {

   @Override

    public String getName() {

     return joinpointIdentification;

   }

  };

 }

  TransactionStatus status = null;

  if (txAttr != null) {

   if (tm != null) {

   //获取 TransactionStatus

    status = tm.getTransaction(txAttr);

  }

   else {

    if (logger.isDebugEnabled()) {

     logger.debug("Skipping transactional joinpoint [" + joinpoint

     Identification +

     "] because no transaction manager has been configured");

   }

  }

 }

 //根据指定的属性与 status 准备一个 TransactionInfo

  return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

}

对于 createTransactionIfNecessar 函数主要做了这样几件事情。

(1)使用 DelegatingTransactionAttribute 封装传入的 TransactionAttribute 实例。

对于传入的 TransactionAttribute 类型的参数 txAttr,当前的实际类型是 RuleBasedTransaction Attribute ,是由获取事务属性时生成,主要用于数据承载,而这里之所以使用 Delegating TransactionAttribute 进行封装,当然是提供了更多的功能。

(2)获取事务。

事务处理当然是以事务为核心,那么获取事务就是最重要的事情。

(3)构建事务信息。

根据之前几个步骤获取的信息构建 TransactionInfo 并返回。

我们分别对以上步骤进行详细的解析。

1.获取事务

Spring 中使用 getTransaction 来处理事务的准备工作,包括事务获取以及信息的构建。

public final TransactionStatus getTransaction(TransactionDefinition definition) throws

TransactionException {

  Object transaction = doGetTransaction();

  // Cache debug flag to avoid repeated checks.

  boolean debugEnabled = logger.isDebugEnabled();

  if (definition == null) {

   // Use defaults if no transaction definition given.

   definition = new DefaultTransactionDefinition();

 }

//判断当前线程是否存在事务,判读依据为当前线程记录的连接不为空且连接中(connectionHolder) 中的

transactionActive 属性不为空

  if (isExistingTransaction(transaction)) {

  //当前线程已经存在事务

   return handleExistingTransaction(definition, transaction, debugEnabled);

 }

 //事务超时设置验证

  if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {

   throw new InvalidTimeoutException("Invalid transaction timeout",

  definition.getTimeout());

 }

 //如果当前线程不存在事务,但是 propagationBehavior 却被声明为 PROPAGATION_MANDATORY 抛

 出异常

  if (definition.getPropagationBehavior() == TransactionDefinition. PROPAGATION_

 MANDATORY) {

   throw new IllegalTransactionStateException(

   "No existing transaction found for transaction marked with

   propagation 'mandatory'");

  }else if (definition.getPropagationBehavior() == TransactionDefinition.

  PROPAGATION_REQUIRED ||

  definition.getPropagationBehavior() == TransactionDefinition. PROPAGATION_

 REQUIRES_NEW ||

  definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

  //PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 都需要

  新建事务

  //空挂起

   SuspendedResourcesHolder suspendedResources = suspend(null);

   if (debugEnabled) {

    logger.debug("Creating new transaction with name [" + definition.

    getName() + "]: " + definition);

  }

   try {

    boolean newSynchronization = (getTransactionSynchronization() !=

   SYNCHRONIZATION_NEVER);

    DefaultTransactionStatus status = newTransactionStatus(

    definition, transaction, true, newSynchronization, debugEnabled,

   suspendedResources);

   /*

    * 构造 transaction,包括设置 ConnectionHolder、隔离级别、timout

    * 如果是新连接,绑定到当前线程

    */

   doBegin(transaction, definition);

   //新同步事务的设置,针对于当前线程的设置

   prepareSynchronization(status, definition);

    return status;

  }

   catch (RuntimeException ex) {

    resume(null, suspendedResources);

    throw ex;

  }

   catch (Error err) {

    resume(null, suspendedResources);

    throw err;

  }

 }

  else {

   // Create "empty" transaction: no actual transaction, but potentially

  synchronization.

   boolean newSynchronization = (getTransactionSynchronization() ==

  SYNCHRONIZATION_ALWAYS);

  return prepareTransactionStatus(definition, null, true, newSynchronization,

  debugEnabled, null);

 }

}

当然,在 Spring 中每个复杂的功能实现,并不是一次完成的,而是会通过入口函数进行一个框架的搭建,初步构建完整的逻辑,而将实现细节分摊给不同的函数。那么,让我们看看事务的准备工作都包括哪些。

(1)获取事务

创建对应的事务实例,这里使用的是 DataSourceTransactionManager 中的 doGetTransaction 方法,创建基于 JDBC 的事务实例。如果当前线程中存在关于 dataSource 的连接,那么直接使用。这里有一个对保存点的设置,是否开启允许保存点取决于是否设置了允许嵌入式事务。

protected Object doGetTransaction() {

  DataSourceTransactionObject txObject = new DataSourceTransactionObject();

 txObject.setSavepointAllowed(isNestedTransactionAllowed());

 //如果当前线程已经记录数据库连接则使用原有连接

  ConnectionHolder conHolder =

  (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);

  //false 表示非新创建连接。

  txObject.setConnectionHolder(conHolder, false);

  return txObject;

}

(2)如果当先线程存在事务,则转向嵌套事务的处理。

(3)事务超时设置验证。

(4)事务 propagationBehavior 属性的设置验证。

(5)构建 DefaultTransactionStatus。

(6)完善 transaction,包括设置 ConnectionHolder、隔离级别、timeout,如果是新连接,则绑定到当前线程。

对于一些隔离级别、timeout 等功能的设置并不是由 Spring 来完成的,而是委托给底层的数据库连接去做的,而对于数据库连接的设置就是在 doBegin 函数中处理的。

/**

 * 构造 transaction,包括设置 ConnectionHolder、隔离级别、timeout

 * 如果是新连接,绑定到当前线程

 */

@Override

protected void doBegin(Object transaction, TransactionDefinition definition) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

  Connection con = null;

  try {

   if (txObject.getConnectionHolder() == null ||

   txObject.getConnectionHolder().isSynchronizedWithTransaction()) {

   Connection newCon = this.dataSource.getConnection();

   if (logger.isDebugEnabled()) {

    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");

  }

   txObject.setConnectionHolder(new ConnectionHolder(newCon), true);

  }

  txObject.getConnectionHolder().setSynchronizedWithTransaction(true);

   con = txObject.getConnectionHolder().getConnection();

  //设置隔离级别

   Integer previousIsolationLevel = DataSourceUtils.prepareConnection

   ForTransaction(con, definition);

  txObject.setPreviousIsolationLevel(previousIsolationLevel);

  //更改自动提交设置,由 Spring 控制提交

   if (con.getAutoCommit()) {

   txObject.setMustRestoreAutoCommit(true);

    if (logger.isDebugEnabled()) {

     logger.debug("Switching JDBC Connection [" + con + "] to manual commit");

   }

   con.setAutoCommit(false);

  }

  //设置判断当前线程是否存在事务的依据

  txObject.getConnectionHolder().setTransactionActive(true);

   int timeout = determineTimeout(definition);

   if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {

   txObject.getConnectionHolder().setTimeoutInSeconds(timeout);

  }

   // Bind the session holder to the thread.

   if (txObject.isNewConnectionHolder()) {

   //将当前获取到的连接绑定到当前线程

   TransactionSynchronizationManager.bindResource(getDataSource(),

   txObject.getConnectionHolder());

  }

 }

  catch (Exception ex) {

   DataSourceUtils.releaseConnection(con, this.dataSource);

   throw new CannotCreateTransactionException("Could not open JDBC Connection

   for transaction", ex);

 }

}

可以说事务是从这个函数开始的,因为在这个函数中已经开始尝试了对数据库连接的获取,当然,在获取数据库连接的同时,一些必要的设置也是需要同步设置的。

n 尝试获取连接。

当然并不是每次都会获取新的连接,如果当前线程中的 connectionHolder 已经存在,则没有必要再次获取,或者,对于事务同步表示设置为 true 的需要重新获取连接。

o 设置隔离级别以及只读标识。

你是否有过这样的错觉?事务中的只读配置是 Spring 中做了一些处理呢?Spring 中确实是针对只读操作做了一些处理,但是核心的实现是设置 connection 上的 readOnly 属性。同样,对于隔离级别的控制也是交由 connection 去控制的。

p 更改默认的提交设置。

如果事务属性是自动提交,那么需要改变这种设置,而将提交操作委托给 Spring 来处理。

q 设置标志位,标识当前连接已经被事务激活。

r 设置过期时间。

s 将 connectionHolder 绑定到当前线程。

设置隔离级别的 prepareConnectionForTransaction 函数用于负责对底层数据库连接的设置,当然,只是包含只读标识和隔离级别的设置。由于强大的日志及异常处理,显得函数代码量比较大,但是单从业务角度去看,关键代码其实是不多的。

public static Integer prepareConnectionForTransaction(Connection con, Transaction Definition

definition)

  throws SQLException {

Assert.notNull(con, "No Connection specified");

//设置数据连接的只读标识

if (definition != null && definition.isReadOnly()) {

  try {

   if (logger.isDebugEnabled()) {

    logger.debug("Setting JDBC Connection [" + con + "] read-only");

  }

  con.setReadOnly(true);

 }

  catch (SQLException ex) {

   Throwable exToCheck = ex;

   while (exToCheck != null) {

    if (exToCheck.getClass().getSimpleName().contains("Timeout")) {

     // Assume it's a connection timeout that would otherwise get

     lost: e.g. from JDBC 4.0

     throw ex;

   }

    exToCheck = exToCheck.getCause();

  }

   logger.debug("Could not set JDBC Connection read-only", ex);

 }

  catch (RuntimeException ex) {

   Throwable exToCheck = ex;

   while (exToCheck != null) {

    if (exToCheck.getClass().getSimpleName().contains("Timeout")) {

     throw ex;

   }

    exToCheck = exToCheck.getCause();

  }

   logger.debug("Could not set JDBC Connection read-only", ex);

 }

}

//设置数据库连接的隔离级别

Integer previousIsolationLevel = null;

if (definition != null && definition.getIsolationLevel() != Transaction

Definition.ISOLATION_DEFAULT) {

  if (logger.isDebugEnabled()) {

   logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +

  definition.getIsolationLevel());

  }

   int currentIsolation = con.getTransactionIsolation();

   if (currentIsolation != definition.getIsolationLevel()) {

    previousIsolationLevel = currentIsolation;

   con.setTransactionIsolation(definition.getIsolationLevel());

  }

 }

  return previousIsolationLevel;

}

(7)将事务信息记录在当前线程中。

protected void prepareSynchronization(DefaultTransactionStatus status, Transaction

Definition definition) {

  if (status.isNewSynchronization()) {

  TransactionSynchronizationManager.setActualTransactionActive(status.

  hasTransaction());

  TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(

   (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_

  DEFAULT) ?

   definition.getIsolationLevel() : null);

   TransactionSynchronizationManager.setCurrentTransactionReadOnly (definition.

  isReadOnly());

  TransactionSynchronizationManager.setCurrentTransactionName(definition.

  getName());

  TransactionSynchronizationManager.initSynchronization();

 }

}

2.处理已经存在的事务

之前讲述了普通事务建立的过程,但是 Spring 中支持多种事务的传播规则,比如 PROPAGATION_NESTED、PROPAGATION_REQUIRES_NEW 等,这些都是在已经存在事务的基础上进行进一步的处理,那么,对于已经存在的事务,准备操作是如何进行的呢?

private TransactionStatus handleExistingTransaction(

TransactionDefinition definition, Object transaction, boolean debugEnabled)

throws TransactionException {

  if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {

   throw new IllegalTransactionStateException(

   "Existing transaction found for transaction marked with propagation

  'never'");

 }

  if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_

  SUPPORTED) {

   if (debugEnabled) {

    logger.debug("Suspending current transaction");

  }

   Object suspendedResources = suspend(transaction);

   boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_

  ALWAYS);

  return prepareTransactionStatus(

  definition, null, false, newSynchronization, debugEnabled,

 suspendedResources);

}

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_

REQUIRES_NEW) {

  if (debugEnabled) {

   logger.debug("Suspending current transaction, creating new transaction

   with name [" +

   definition.getName() + "]");

 }

 //新事务的建立

  SuspendedResourcesHolder suspendedResources = suspend(transaction);

  try {

   boolean newSynchronization = (getTransactionSynchronization() !=

  SYNCHRONIZATION_NEVER);

   DefaultTransactionStatus status = newTransactionStatus(

   definition, transaction, true, newSynchronization, debugEnabled,

  suspendedResources);

   doBegin(transaction, definition);

   prepareSynchronization(status, definition);

   return status;

 }

  catch (RuntimeException beginEx) {

   resumeAfterBeginException(transaction, suspendedResources, beginEx);

   throw beginEx;

 }

  catch (Error beginErr) {

   resumeAfterBeginException(transaction, suspendedResources, beginErr);

   throw beginErr;

 }

}

//嵌入式事务的处理

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_

NESTED) {

  if (!isNestedTransactionAllowed()) {

   throw new NestedTransactionNotSupportedException(

   "Transaction manager does not allow nested transactions by

   default - " +

   "specify 'nestedTransactionAllowed' property with value 'true'");

 }

  if (debugEnabled) {

   logger.debug("Creating nested transaction with name [" +

   definition.getName() + "]");

 }

  if (useSavepointForNestedTransaction()) {

  //如果没有可以使用保存点的方式控制事务回滚,那么在嵌入式事务的建立初始建立保存点

   DefaultTransactionStatus status =

   prepareTransactionStatus(definition, transaction, false,

   false, debugEnabled, null);

  status.createAndHoldSavepoint();

   return status;

  }else {

  //有些情况是不能使用保存点操作,比如 JTA,那么建立新事物

   boolean newSynchronization = (getTransactionSynchronization() !=

  SYNCHRONIZATION_NEVER);

   DefaultTransactionStatus status = newTransactionStatus(

  definition,transaction, true, newSynchronization,

   debugEnabled, null);

   doBegin(transaction, definition);

   prepareSynchronization(status, definition);

   return status;

 }

}

if (debugEnabled) {

  logger.debug("Participating in existing transaction");

}

if (isValidateExistingTransaction()) {

  if (definition.getIsolationLevel() != TransactionDefinition.PROPAGATION_

 REQUIRES_NEW) {

   Integer currentIsolationLevel = TransactionSynchronizationManager.

  getCurrentTransactionIsolationLevel();

   if (currentIsolationLevel == null || currentIsolationLevel !=

   definition.getIsolationLevel()) {

    Constants isoConstants = DefaultTransactionDefinition.constants;

    throw new IllegalTransactionStateException("Participating transaction

    with definition [" +

    definition + "] specifies isolation level which is

    incompatible with existing transaction: " +

    (currentIsolationLevel != null ?

   isoConstants.toCode(currentIsolationLevel,

    DefaultTransactionDefinition.PREFIX_ISOLATION) :

   "(unknown)"));

  }

 }

  if (!definition.isReadOnly()) {

  if

   (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {

    throw new IllegalTransactionStateException("Participating transaction

    with definition [" +

    definition + "] is not marked as read-only but existing

    transaction is");

  }

 }

 }

  boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_

 NEVER);

  return prepareTransactionStatus(definition, transaction, false, newSynchronization,

  debugEnabled, null);

}

对于已经存在事务的处理过程中,我们看到了很多熟悉的操作,但是,也有些不同的地方,函数中对已经存在的事务处理考虑两种情况。

(1)PROPAGATION_REQUIRES_NEW 表示当前方法必须在它自己的事务里运行,一个新的事务将被启动,而如果有一个事务正在运行的话,则在这个方法运行期间被挂起。而 Spring 中对于此种传播方式的处理与新事务建立最大的不同点在于使用 suspend 方法将原事务挂起。将信息挂起的目的当然是为了在当前事务执行完毕后在将原事务还原。

(2)PROPAGATION_NESTED 表示如果当前正有一个事务在运行中,则该方法应该运行在一个嵌套的事务中,被嵌套的事务可以独立于封装事务进行提交或者回滚,如果封装事务不存在,行为就像 PROPAGATION_REQUIRES_NEW。对于嵌入式事务的处理,Spring 中主要考虑了两种方式的处理。

Spring 中允许嵌入事务的时候,则首选设置保存点的方式作为异常处理的回滚。

对于其他方式,比如 JTA 无法使用保存点的方式,那么处理方式与 PROPAGATION_REQUIRES_NEW 相同,而一旦出现异常,则由 Spring 的事务异常处理机制去完成后续操作。

对于挂起操作的主要目的是记录原有事务的状态,以便于后续操作对事务的恢复:

protected final SuspendedResourcesHolder suspend(Object transaction) throws Transaction

Exception {

  if (TransactionSynchronizationManager.isSynchronizationActive()) {

   List<TransactionSynchronization> suspendedSynchronizations = doSuspend

  Synchronization();

   try {

    Object suspendedResources = null;

    if (transaction != null) {

     suspendedResources = doSuspend(transaction);

   }

    String name = TransactionSynchronizationManager. GetCurrent Transaction

   Name();

   TransactionSynchronizationManager.setCurrentTransactionName(null);

    boolean readOnly = TransactionSynchronizationManager.isCurrentTransaction

   ReadOnly();

   TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);

    Integer isolationLevel = TransactionSynchronizationManager.getCurrent

   TransactionIsolationLevel();

   TransactionSynchronizationManager.setCurrentTransactionIsolation

   Level(null);

    boolean wasActive = TransactionSynchronizationManager.isActual

   TransactionActive();

   TransactionSynchronizationManager.setActualTransactionActive(false);

    return new SuspendedResourcesHolder(

    suspendedResources, suspendedSynchronizations, name, readOnly,

    isolationLevel, wasActive);

  }

   catch (RuntimeException ex) {

    // doSuspend failed - original transaction is still active...

   doResumeSynchronization(suspendedSynchronizations);

    throw ex;

  }

   catch (Error err) {

   doResumeSynchronization(suspendedSynchronizations);

    throw err;

  }

 }

  else if (transaction != null) {

   Object suspendedResources = doSuspend(transaction);

   return new SuspendedResourcesHolder(suspendedResources);

 }

  else {

   return null;

 }

}

3.准备事务信息

当已经建立事务连接并完成了事务信息的提取后,我们需要将所有的事务信息统一记录在 TransactionInfo 类型的实例中,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring 会通过 TransactionInfo 类型的实例中的信息来进行回滚等后续工作。

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,

TransactionAttribute txAttr, String joinpointIdentification, Transaction

Status status) {

  TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);

  if (txAttr != null) {

   // We need a transaction for this method

   if (logger.isTraceEnabled()) {

    logger.trace("Getting transaction for [" + txInfo.getJoinpoint

    Identification() + "]");

  }

  //记录事务状态

  txInfo.newTransactionStatus(status);

 }

  else {

   if (logger.isTraceEnabled())

   logger.trace("Don't need to create transaction for [" + joinpoint

   Identification +

   "]: This method isn't transactional.");

 }

 txInfo.bindToThread();

  return txInfo;

}

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。