重拾-Spring Transaction

tangxiong0 2019-07-01

问题

面试中是不是有时经常会被问到 “Spring 事务如何管理的了解吗?” ,“Spring 事务的传播性有哪些,能聊聊它们的使用场景吗?”, “事务回滚的时候是所有异常下都会回滚吗?”; 下面我们就带着这些问题来看看 Spring 事务是如何实现的吧。

实现分析

首先我们还是先通过一个使用示例,先看下 Spring 事务是如何工作的。

使用示例

本文我们先采用 TransactionProxyFactoryBean 配置的方式来看下, Spring 事务如何实现
<beans>
    <!-- 配置数据源 -->
    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" >
        <property name="driverClassName">
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property name="url">
            <value>url</value>
        </property>
        <property name="username">
            <value>username</value>
        </property>
        <property name="password">
            <value>password</value>
        </property>
    </bean>

    <!-- 配置事务管理 -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource">
            <ref bean="dataSource"></ref>
        </property>
    </bean>

    <!-- 配置 jdbcTemplate -->
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource">
            <ref bean="dataSource"/>
        </property>
    </bean>

    <bean id="userServiceTarget" class="org.springframework.transaction.UserServiceImpl">
        <property name="jdbcTemplate">
            <ref bean="jdbcTemplate"></ref>
        </property>
    </bean>

    <!--
        TransactionProxyFactoryBean 实现了接口 InitializingBean,在初始化过程中会调用 afterPropertiesSet
        1 : 创建事务拦截器
        2 : 创建事务 advisor (事务的拦截切面)
        3 : 创建代理
    -->
    <bean id="userService" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
        <property name="transactionManager">
            <ref bean="transactionManager" />
        </property>

        <property name="target">
            <ref bean="userServiceTarget"></ref>
        </property>

        <property name="proxyInterfaces">
            <value>org.springframework.transaction.UserService</value>
        </property>
        <!--
            配置事务属性 传播行为, 事务隔离级别, 是否只读, 回滚规则(哪些异常下执行回滚),key 为配置需要事务管理的方法名;
            在代理目标执行的时候会通过该属性判断方法是否需要事务管理
        -->
        <property name="transactionAttributes">
            <props>
                <prop key="*">PROPAGATION_REQUIRED</prop>
            </props>
        </property>
    </bean>
</beans>

TransactionProxyFactoryBean 的属性配置中如果您对 transactionAttributes 属性不熟悉的话,是不是会感觉一头雾水呢? 这个玩意怎么配置的? 配置格式又是什么样的呢? 配置值有哪些呢 ?; 下面将会通过对 TransactionProxyFactoryBean 的源码分析来一一解答。

源码分析

类结构

重拾-Spring Transaction

TransactionProxyFactoryBean 类结构我们知道,其实现了接口 InitializingBeanFactoryBean; 那么也就是在 TransactionProxyFactoryBean 实例化后会调用方法 afterPropertiesSet, 在获取目标对象实例时会调用方法 getObject; 下面将主要看下这两个方法的实现。

afterPropertiesSet-创建目标代理对象

public void afterPropertiesSet() throws AopConfigException {
    // 校验 Target 目标对象
    if (this.target == null) {
        throw new AopConfigException("Target must be set");
    }

    // 校验事务属性定义,从抛出的异常信息可以看出 Spring 在此做了强校验;
    // 也就是说如果没有需要 Spring 事务管理的方法,就不要采用 TransactionProxyFactoryBean 了
    // 那么 transactionAttributeSource 是怎么来的呢? 见下文分析
    if (this.transactionAttributeSource == null) {
        throw new AopConfigException("Either 'transactionAttributeSource' or 'transactionAttributes' is required: " +
                                                                 "If there are no transactional methods, don't use a transactional proxy.");
    }

    // 创建事务拦截器 transactionInterceptor
    TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
    transactionInterceptor.setTransactionManager(this.transactionManager);
    transactionInterceptor.setTransactionAttributeSource(this.transactionAttributeSource);
    transactionInterceptor.afterPropertiesSet();

    ProxyFactory proxyFactory = new ProxyFactory();

    // 是否配置了前置拦截
    if (this.preInterceptors != null) {
        for (int i = 0; i < this.preInterceptors.length; i++) {
            proxyFactory.addAdvisor(GlobalAdvisorAdapterRegistry.getInstance().wrap(this.preInterceptors[i]));
        }
    }

    if (this.pointcut != null) {
        // 如果配置了 pointcut 切入点,则按配置的 pointcut 创建 advisor
        Advisor advice = new DefaultPointcutAdvisor(this.pointcut, transactionInterceptor);
        proxyFactory.addAdvisor(advice);
    }
    else {
        // rely on default pointcut
        // 创建事务拦截切面 advisor
        proxyFactory.addAdvisor(new TransactionAttributeSourceAdvisor(transactionInterceptor));
        // could just do the following, but it's usually less efficient because of AOP advice chain caching
        // proxyFactory.addInterceptor(transactionInterceptor);
    }

    // 是否配置了后置拦截
    if (this.postInterceptors != null) {
        for (int i = 0; i < this.postInterceptors.length; i++) {
            proxyFactory.addAdvisor(GlobalAdvisorAdapterRegistry.getInstance().wrap(this.postInterceptors[i]));
        }
    }

    proxyFactory.copyFrom(this);

    proxyFactory.setTargetSource(createTargetSource(this.target));
    // 设置代理的接口
    if (this.proxyInterfaces != null) {
        proxyFactory.setInterfaces(this.proxyInterfaces);
    }
    else if (!getProxyTargetClass()) {
        // rely on AOP infrastructure to tell us what interfaces to proxy
        proxyFactory.setInterfaces(AopUtils.getAllInterfaces(this.target));
    }
    // 创建目标对象的代理对象
    this.proxy = proxyFactory.getProxy();
}

从源码中我们知道 afterPropertiesSet 主要做以下几件事:

  • 参数有效性校验; 校验目标对象,事务属性定义
  • 设置代理的 advisor chain, 包括用户自定义的前置拦截, 内置的事务拦截器,用户自定义的后置拦截
  • 创建目标代理对象
afterPropertiesSet 的实现中有个针对 transactionAttributeSource 的非空校验,那么这个变量是何时赋值的呢 ? 还记得使用示例中的关于事务属性的定义 transactionAttributes 吗 ?

setTransactionAttributes-设置事务属性定义

public void setTransactionAttributes(Properties transactionAttributes) {
    NameMatchTransactionAttributeSource tas = new NameMatchTransactionAttributeSource();
    tas.setProperties(transactionAttributes);
    this.transactionAttributeSource = tas;
}

public void setProperties(Properties transactionAttributes) {
    TransactionAttributeEditor tae = new TransactionAttributeEditor();
    // 遍历 properties
    for (Iterator it = transactionAttributes.keySet().iterator(); it.hasNext(); ) {
        // key 为匹配的方法名
        String methodName = (String) it.next();
        String value = transactionAttributes.getProperty(methodName);
        // 解析 value
        tae.setAsText(value);
        TransactionAttribute attr = (TransactionAttribute) tae.getValue();
        // 将方法名与事务属性定义匹配关联
        addTransactionalMethod(methodName, attr);
    }
}

下面我们就看下 setAsText 方法是如何解析事务属性的配置

/**
 * Format is PROPAGATION_NAME,ISOLATION_NAME,readOnly,+Exception1,-Exception2.
 * Null or the empty string means that the method is non transactional.
 * @see java.beans.PropertyEditor#setAsText(java.lang.String)
 */
public void setAsText(String s) throws IllegalArgumentException {
    if (s == null || "".equals(s)) {
        setValue(null);
    }
    else {
        // tokenize it with ","
        // 按 , 分割配置信息
        String[] tokens = StringUtils.commaDelimitedListToStringArray(s);
        RuleBasedTransactionAttribute attr = new RuleBasedTransactionAttribute();

        for (int i = 0; i < tokens.length; i++) {
            String token = tokens[i];
            // 以 PROPAGATION 开头,则配置事务传播性
            if (token.startsWith(TransactionDefinition.PROPAGATION_CONSTANT_PREFIX)) {
                attr.setPropagationBehaviorName(tokens[i]);
            }
            // 以 ISOLATION 开头,则配置事务隔离级别
            else if (token.startsWith(TransactionDefinition.ISOLATION_CONSTANT_PREFIX)) {
                attr.setIsolationLevelName(tokens[i]);
            }
            // 以 timeout_ 开头,则设置事务超时时间
            else if (token.startsWith(DefaultTransactionAttribute.TIMEOUT_PREFIX)) {
                String value = token.substring(DefaultTransactionAttribute.TIMEOUT_PREFIX.length());
                attr.setTimeout(Integer.parseInt(value));
            }
            // 若等于 readOnly 则配置事务只读
            else if (token.equals(DefaultTransactionAttribute.READ_ONLY_MARKER)) {
                attr.setReadOnly(true);
            }
            // 以 + 开头,则配置哪些异常下不回滚
            else if (token.startsWith(DefaultTransactionAttribute.COMMIT_RULE_PREFIX)) {
                attr.getRollbackRules().add(new NoRollbackRuleAttribute(token.substring(1)));
            }
            // 以 - 开头,则配置哪些异常下回滚
            else if (token.startsWith(DefaultTransactionAttribute.ROLLBACK_RULE_PREFIX)) {
                attr.getRollbackRules().add(new RollbackRuleAttribute(token.substring(1)));
            }
            else {
                throw new IllegalArgumentException("Illegal transaction token: " + token);
            }
        }

        setValue(attr);
    }
}

setAsText 方法的实现我们就可以搞明白在配置文件中 transactionAttributes 如何配置了,譬如:

<property name="transactionAttributes">
    <props>
        <prop key="*">PROPAGATION_REQUIRED, ISOLATION_DEFAULT, readOnly</prop>
    </props>
</property>

也可以这样配置:

<property name="transactionAttributes">
    <props>
        <prop key="*">readOnly, ISOLATION_DEFAULT, PROPAGATION_REQUIRED</prop>
    </props>
</property>

也就是说 transactionAttributes 的配置只要保证 token 格式正确即可,顺序无关;但是从规范来讲建议还是保持 PROPAGATION_NAME,ISOLATION_NAME,readOnly,+Exception1,-Exception2. 的格式。

getObject-获取代理对象

public Object getObject() {
    // proxy 对象在 afterPropertiesSet 方法执行时产生
    return this.proxy;
}

代理执行

是否支持事务

重拾-Spring AOP 中我们知道,当代理对象在执行的时候会先获取当前方法所匹配的 advisor (参见类 JdkDynamicAopProxy); 而 TransactionProxyFactoryBean 在创建代理对象的时候会将 TransactionInterceptor 绑定到 TransactionAttributeSourceAdvisor 上,那么我就看下 TransactionAttributeSourceAdvisor 是如何匹配方法的。

public class TransactionAttributeSourceAdvisor extends StaticMethodMatcherPointcutAdvisor {

    private TransactionAttributeSource transactionAttributeSource;

    public TransactionAttributeSourceAdvisor(TransactionInterceptor ti) {
        super(ti);
        if (ti.getTransactionAttributeSource() == null) {
            throw new AopConfigException("Cannot construct a TransactionAttributeSourceAdvisor using a " +
                                                                     "TransactionInterceptor that has no TransactionAttributeSource configured");
        }
        this.transactionAttributeSource = ti.getTransactionAttributeSource();
    }

    public boolean matches(Method m, Class targetClass) {
        return (this.transactionAttributeSource.getTransactionAttribute(m, targetClass) != null);
    }
}

TransactionAttributeSourceAdvisor 判断方法是否匹配时,实际是由 NameMatchTransactionAttributeSource 的方法 getTransactionAttribute 来处理。

public TransactionAttribute getTransactionAttribute(Method method, Class targetClass) {
    // 获取目标方法名
    String methodName = method.getName();
    // 获取目标方法匹配的事务属性定义
    TransactionAttribute attr = (TransactionAttribute) this.nameMap.get(methodName);
    // 如果 attr 不为空说明当前方法配置了事务属性定义,也就是当前方法需要事务管理
    if (attr != null) {
        return attr;
    }
    else {
        // look up most specific name match
        String bestNameMatch = null;
        for (Iterator it = this.nameMap.keySet().iterator(); it.hasNext();) {
            // 判断当前方法是否匹配通配符的方式
            String mappedName = (String) it.next();
            if (isMatch(methodName, mappedName) &&
                    (bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) {
                attr = (TransactionAttribute) this.nameMap.get(mappedName);
                bestNameMatch = mappedName;
            }
        }
        return attr;
    }
}

protected boolean isMatch(String methodName, String mappedName) {
    return (mappedName.endsWith("*") && methodName.startsWith(mappedName.substring(0, mappedName.length() - 1))) ||
            (mappedName.startsWith("*") && methodName.endsWith(mappedName.substring(1, mappedName.length())));
}
TransactionInterceptor-事务拦截

在完成判断当前方法是否需要事务管理后,如果需要事务管理最终会调用 TransactionInterceptor 执行事务拦截的处理。

public final Object invoke(MethodInvocation invocation) throws Throwable {
    Class targetClass = (invocation.getThis() != null) ? invocation.getThis().getClass() : null;
    // if the transaction attribute is null, the method is non-transactional
    // 获取当前方法所支持的事务配置属性,若不存在则说明当前方法不需要事务管理
    TransactionAttribute transAtt = this.transactionAttributeSource.getTransactionAttribute(invocation.getMethod(), targetClass);
    TransactionStatus status = null;
    TransactionStatus oldTransactionStatus = null;
    
    // create transaction if necessary
    if (transAtt != null) {        
        // the transaction manager will flag an error if an incompatible tx already exists
        // 通过事务管理获取事务,该事务可能是新创建的也可能是当前上下文已存在的事务
        // 返回事务状态
        status = this.transactionManager.getTransaction(transAtt);
        
        // make the TransactionStatus available to callees
        oldTransactionStatus = (TransactionStatus) currentTransactionStatus.get();
        currentTransactionStatus.set(status);
    }
    else {
        // it isn't a transactional method
        
    }

    Object retVal = null;
    try {
        // 目标方法执行
        retVal = invocation.proceed();
    }
    catch (Throwable ex) {
        // target invocation exception
        if (status != null) {
            // 异常处理 可能会执行事务的回滚
            onThrowable(invocation, transAtt, status, ex);
        }
        throw ex;
    }
    finally {
        if (transAtt != null) {
            // use stack to restore old transaction status if one was set
            currentTransactionStatus.set(oldTransactionStatus);
        }
    }
    if (status != null) {
        // 通过事务管理执行事务提交
        this.transactionManager.commit(status);
    }
    return retVal;
}
private void onThrowable(MethodInvocation invocation, TransactionAttribute txAtt,
                             TransactionStatus status, Throwable ex) {
    // 判断异常是否需要回滚
    if (txAtt.rollbackOn(ex)) {
        try {
            // 通过事务管理执行回滚
            this.transactionManager.rollback(status);
        }
        catch (TransactionException tex) {
            logger.error("Application exception overridden by rollback exception", ex);
            throw tex;
        }
    }
    else {
        // Will still roll back if rollbackOnly is true
        // 异常不需要回滚的话 则提交事务
        this.transactionManager.commit(status);
    }
}

TransactionInterceptor 的处理逻辑来看,我们知道其主要做以下事情:

  • 获取当前方法所定义的事务属性
  • 通过事务管理器 Transaction Manager 来获取事务
  • 目标方法执行
  • 执行异常处理,如异常需要回滚则通过事务管理器执行事务 rollback,反之执行事务 commit
  • 方法执行成功则执行事务 commit
也就是说 TransactionInterceptor (事务拦截器) 主要是将事务相关的动作委托给 TransactionManager (事务管理器)处理
TransactionManager-事务管理
本文是以 DataSourceTransactionManager 为例来分析事务的管理实现
getTransaction-获取事务
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // 获取事务
    Object transaction = doGetTransaction();
    if (definition == null) {
        // 若 definition == null 则采用默认的事务定义
        definition = new DefaultTransactionDefinition();
    }

    // 判断当前上下文是否开启过事务
    if (isExistingTransaction(transaction)) {
        // 当前上下文开启过事务
        // 如果当前方法匹配的事务传播性为 PROPAGATION_NEVER 说明不需要事务则抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException("Transaction propagation 'never' but existing transaction found");
        }

        // 如果当前方法匹配的事务传播性为 PROPAGATION_NOT_SUPPORTED 说明该方法不应该运行在事务中,则将当前事务挂起
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            // 将当前事务挂起
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (this.transactionSynchronization == SYNCHRONIZATION_ALWAYS);
            // 返回的事务状态为 不需要事务
            return newTransactionStatus(null, false, newSynchronization,
                                        definition.isReadOnly(), debugEnabled, suspendedResources);
        }
        // 如果当前方法匹配的事务传播性为 PROPAGATION_REQUIRES_NEW 表示当前方法必须运行在它自己的事务中;将已存在的事务挂起,重新开启事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Creating new transaction, suspending current one");
            }
            // 挂起当前事务
            Object suspendedResources = suspend(transaction);
            // 重新开启个事务
            doBegin(transaction, definition);
            boolean newSynchronization = (this.transactionSynchronization != SYNCHRONIZATION_NEVER);
            // 返回的事务状态为 新建事务
            return newTransactionStatus(transaction, true, newSynchronization,
                                        definition.isReadOnly(), debugEnabled, suspendedResources);
        }
        else {
            boolean newSynchronization = (this.transactionSynchronization != SYNCHRONIZATION_NEVER);
            // 其他的传播行为 表示在已存在的事务中执行
            return newTransactionStatus(transaction, false, newSynchronization,
                                        definition.isReadOnly(), debugEnabled, null);
        }
    }

    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // 如果传播性为 PROPAGATION_MANDATORY 说明必须在事务中执行,若当前没有事务的话则抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException("Transaction propagation 'mandatory' but no existing transaction found");
    }

    // 当前上下文不存在事务
    // 若传播性为 PROPAGATION_REQUIRED 或 PROPAGATION_REQUIRES_NEW 则开启新的事务执行
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // 开启新的 connection 并取消自动提交,将 connection 绑定当前线程
        doBegin(transaction, definition);
        boolean newSynchronization = (this.transactionSynchronization != SYNCHRONIZATION_NEVER);
        return newTransactionStatus(transaction, true, newSynchronization,
                                    definition.isReadOnly(), debugEnabled, null);
    }
    else {
        // "empty" (-> no) transaction
        boolean newSynchronization = (this.transactionSynchronization == SYNCHRONIZATION_ALWAYS);
        // 返回事务状态为 不需要事务
        return newTransactionStatus(null, false, newSynchronization,
                                    definition.isReadOnly(), debugEnabled, null);
    }
}

protected Object doGetTransaction() {
    // 判断当前线程是否开启过事务
    if (TransactionSynchronizationManager.hasResource(this.dataSource)) {
        // 获取当前已存在的 connectoin holder
        ConnectionHolder holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        return new DataSourceTransactionObject(holder);
    }
    else {
        return new DataSourceTransactionObject();
    }
}

看到了这里,是不是突然明白 PROPAGATION (事务传播性) 是干什么的了;

简单来说, PROPAGATION 就是为了告诉 Spring 当前方法需不需要事务,是在已存在的事务中执行,还是新开启事务执行;也可以认为是继承上个方法栈的事务,还是拥有自己的事务。

TransactionManager 获取事务的过程实际就是通过当前方法定义的 PROPAGATION (事务传播性) 和当前上下文是否存在事务来判断是否需要事务,是否需要开启新的事务或者是使用当前已存在的事务。

下面看下如何开启新的事务 doBegin

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

    // cache to avoid repeated checks
    boolean debugEnabled = logger.isDebugEnabled();

    // 判断 connection holder 是否为空
    // 两种场景下可能为空:
    // 1. 上下文不存在事务的时候
    // 2. 上下文已存在的事务被挂起的时候
    if (txObject.getConnectionHolder() == null) {
        if (debugEnabled) {
            logger.debug("Opening new connection for JDBC transaction");
        }
        // 开启新的 connection
        Connection con = DataSourceUtils.getConnection(this.dataSource, false);
        txObject.setConnectionHolder(new ConnectionHolder(con));
    }

    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        // apply read-only
        if (definition.isReadOnly()) {
            try {
                // 如果定义了只读,设置 connection 为只读
                con.setReadOnly(true);
            }
            catch (Exception ex) {
                // SQLException or UnsupportedOperationException
                logger.warn("Could not set JDBC connection read-only", ex);
            }
        }

        // apply isolation level
        // 设置事务隔离级别
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            txObject.setPreviousIsolationLevel(new Integer(con.getTransactionIsolation()));
            con.setTransactionIsolation(definition.getIsolationLevel());
        }

        // 若 connection 为自动提交则取消
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            con.setAutoCommit(false);
        }

        // 设置超时时间
        if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(definition.getTimeout());
        }

        // 将当前 connection holder 绑定到当前上下文
        TransactionSynchronizationManager.bindResource(this.dataSource, txObject.getConnectionHolder());
    }
    catch (SQLException ex) {
        throw new CannotCreateTransactionException("Could not configure connection", ex);
    }
}

doBegin 执行开启事务的操作,在上下文不存在事务或者上下文事务被挂起的时候会新打开一个 connection, 并按照事务定义设置相关属性,譬如是否只读,取消自动提交,设置事务隔离级别,设置超时时间;最后会将 connection 绑定到当前上下文,也即当前线程。

doSuspend-事务挂起
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    // 将当前事务的 connection holder 置为空
    txObject.setConnectionHolder(null);
    // 并将当前事务与上下文解绑
    return TransactionSynchronizationManager.unbindResource(this.dataSource);
}

事务挂起既是将当前事务的连接持有者清空并与当前上下文解绑,保证后续能够重新开启事务。

数据库操作
针对数据库的操作,本文以 Spring 提供的 jdbcTemplate 工具类进行分析。
public Object execute(final StatementCallback action) {
    // 若当前需要事务管理的话,那么此时获取的 connection 则是 transaction manager bind 的 connection
    // 这样就保证数据库操作的时候所获得的的 connection 与 事务管理的一致
    Connection con = DataSourceUtils.getConnection(getDataSource());
    Statement stmt = null;
    // 以下代码省略 此处重点关注如何获取 connection
}
public static Connection getConnection(DataSource ds, boolean allowSynchronization)
        throws CannotGetJdbcConnectionException {
    // 从当前上下文获取 connection holder
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(ds);
    if (conHolder != null) {
        return conHolder.getConnection();
    }
    else {
        try {
            // 反之新打开一个 connection
            Connection con = ds.getConnection();
            if (allowSynchronization && TransactionSynchronizationManager.isSynchronizationActive()) {
                logger.debug("Registering transaction synchronization for JDBC connection");
                // use same Connection for further JDBC actions within the transaction
                // thread object will get removed by synchronization at transaction completion
                conHolder = new ConnectionHolder(con);
                TransactionSynchronizationManager.bindResource(ds, conHolder);
                TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(conHolder, ds));
            }
            return con;
        }
        catch (SQLException ex) {
            throw new CannotGetJdbcConnectionException("Could not get JDBC connection", ex);
        }
    }
}

从上述代码我们可以看到,当通过 jdbcTemplate 操作数据库时会先从当前上下文中获取 connection; 这样就保证了所获取的事务与事务拦截器的事务为同一个实例,也就是将事务交给了 Spring 来管理。

commit-事务提交
public final void commit(TransactionStatus status) throws TransactionException {
    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // 省略
    else {
        try {
            try {
                triggerBeforeCommit(defStatus);
                triggerBeforeCompletion(defStatus);
                if (status.isNewTransaction()) {
                    logger.info("Initiating transaction commit");
                    // 执行事务提交
                    doCommit(defStatus);
                }
            }
            // 省略
        }
        finally {
            cleanupAfterCompletion(defStatus);
        }
    }
}

doCommit 执行事务提交

protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "]");
    }
    try {
        // 事务提交
        txObject.getConnectionHolder().getConnection().commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit", ex);
    }
}
resume-事务恢复

从上文的 commit 事务提交操作发现,在完成事务提交之后,还有个后置动作 cleanupAfterCompletion, 该方法会对挂起中的事务执行恢复操作。

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clearSynchronization();
    }
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }
    // 当存在挂起的事务时,执行恢复挂起的事务
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction");
        }
        resume(status.getTransaction(), status.getSuspendedResources());
    }
}
protected void doResume(Object transaction, Object suspendedResources) {
    // 将挂起的事务绑定的 connection 重新绑定到当前上下文
    ConnectionHolder conHolder = (ConnectionHolder) suspendedResources;
    TransactionSynchronizationManager.bindResource(this.dataSource, conHolder);
}

事务的 resume 就是将挂起的事务重新绑定到当前上下文中。

rollback-事务回滚

TransactionInterceptor 调用目标方法执行出现异常的时候,会进行异常处理执行方法 onThrowable

private void onThrowable(MethodInvocation invocation, TransactionAttribute txAtt,
                         TransactionStatus status, Throwable ex) {
    if (txAtt.rollbackOn(ex)) {
        try {
            // 异常需要回滚
            this.transactionManager.rollback(status);
        }
        catch (TransactionException tex) {
            throw tex;
        }
    }
    else {
        // 异常不需要回滚的话 则提交事务
        this.transactionManager.commit(status);
    }
}

onThrowable 方法会通过配置判断当前异常是否需要回滚。

public final void rollback(TransactionStatus status) throws TransactionException {
    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    try {
        try {
            triggerBeforeCompletion(defStatus);
            if (status.isNewTransaction()) {
                // 执行事务回滚
                logger.info("Initiating transaction rollback");
                doRollback(defStatus);
            }
            else if (defStatus.getTransaction() != null) {
                if (defStatus.isDebug()) {
                    logger.debug("Setting existing transaction rollback-only");
                }
                doSetRollbackOnly(defStatus);
            }
            else {
                logger.info("Should roll back transaction but cannot - no transaction available");
            }
        }
        catch (TransactionException ex) {
            triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_UNKNOWN, ex);
            throw ex;
        }
        triggerAfterCompletion(defStatus, TransactionSynchronization.STATUS_ROLLED_BACK, null);
    }
    finally {
        cleanupAfterCompletion(defStatus);
    }
}

protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    try {
        // 执行回滚
        txObject.getConnectionHolder().getConnection().rollback();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not rollback", ex);
    }
}

小结

此时我们基本明白了 Spring Transaction 的实现原理,下面对其实现做个小结:

  • Spring Transaction 是基于 Spring AOP 的一种实现
  • Spring Transaction 通过配置创建事务 advisor 并创建目标对象代理类
  • 目标方法执行时将会被 TransactionInterceptor 拦截
  • TransactionInterceptor 会委派 TransactionManager 执行事务的创建,事务提交,事务回滚的动作
  • TransactionManager 会根据当前方法配置的事务传播性及当前上下文是否存在事务来判断是否新建事务
  • TransactionManager 当新建事务时会将事务绑定到当前上下文,以保证目标方法执行时获取的事务为同一实例
  • TransactionManager 执行事务挂起时会将当前事务与当前上下文解除绑定关系
  • TransactionManager 执行事务恢复时会将已挂起的事务重新与当前上下文绑定

重拾-Spring Transaction

相关推荐

kyle00 / 0评论 2020-05-07