作者:京東零售 范錫軍
1、引言
spring的spring-tx模塊提供了對事務管理支持,使用spring事務可以讓我們從復雜的事務處理中得到解脫,無需要去處理獲得連接、關閉連接、事務提交和回滾等這些操作。
spring事務有編程式事務和聲明式事務兩種實現方式。編程式事務是通過編寫代碼來管理事務的提交、回滾、以及事務的邊界。這意味著開發者需要在代碼中顯式地調用事務的開始、提交和回滾。聲明式事務是通過配置來管理事務,您可以使用注解或XML配置來定義事務的邊界和屬性,而無需顯式編寫事務管理的代碼。
下面我們逐步分析spring源代碼,理解spring事務的實現原理。
2、編程式事務
2.1 使用示例
// transactionManager是某一個具體的PlatformTransactionManager實現類的對象 private PlatformTransactionManager transactionManager; // 定義事務屬性 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); // 獲取事務 TransactionStatus status = transactionManager.getTransaction(def); try { // 執行數據庫操作 // ... // 提交事務 transactionManager.commit(status); } catch (Exception ex) { // 回滾事務 transactionManager.rollback(status); }
在使用編程式事務處理的過程中,利用 DefaultTransactionDefinition 對象來持有事務處理屬性。同時,在創建事務的過程中得到一個 TransactionStatus 對象,然后通過直接調用 transactionManager 對象 的 commit() 和 rollback()方法 來完成事務處理。
2.2 PlatformTransactionManager核心接口
?
PlatformTransactionManager是Spring事務管理的核心接口,通過 PlatformTransactionManager 接口設計了一系列與事務處理息息相關的接口方法,如 getTransaction()、commit()、rollback() 這些和事務處理相關的統一接口。對于這些接口的實現,很大一部分是由 AbstractTransactionManager 抽象類來完成的。
AbstractPlatformManager 封裝了 Spring 事務處理中通用的處理部分,比如事務的創建、提交、回滾,事務狀態和信息的處理,與線程的綁定等,有了這些通用處理的支持,對于具體的事務管理器而言,它們只需要處理和具體數據源相關的組件設置就可以了,比如在DataSourceTransactionManager中,就只需要配置好和DataSource事務處理相關的接口以及相關的設置。
2.3 事務的創建
PlatformTransactionManager的getTransaction()方法,封裝了底層事務的創建,并生成一個 TransactionStatus對象。AbstractPlatformTransactionManager提供了創建事務的模板,這個模板會被具體的事務處理器所使用。從下面的代碼中可以看到,AbstractPlatformTransactionManager會根據事務屬性配置和當前進程綁定的事務信息,對事務是否需要創建,怎樣創建 進行一些通用的處理,然后把事務創建的底層工作交給具體的事務處理器完成,如:DataSourceTransactionManager、HibernateTransactionManager。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (isExistingTransaction(transaction)) { return handleExistingTransaction(def, transaction, debugEnabled); } if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { return startTransaction(def, transaction, false, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } } private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = this.getTransactionSynchronization() != SYNCHRONIZATION_NEVER; DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); this.doBegin(transaction, definition); this.prepareSynchronization(status, definition); return status; }
事務創建的結果是生成一個TransactionStatus對象,通過這個對象來保存事務處理需要的基本信息,TransactionStatus的創建過程如下:
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
以上是創建一個全新事務的過程,還有另一種情況是:在創建當前事務時,線程中已經有事務存在了。這種情況會涉及事務傳播行為的處理。spring中七種事務傳播行為如下:
事務傳播行為類型 | 說明 |
PROPAGATION_REQUIRED | 如果當前沒有事務,就新建一個事務,如果已經存在一個事務中,加入到這個事務中。這是最常見的選擇。 |
PROPAGATION_SUPPORTS | 支持當前事務,如果當前沒有事務,就以非事務方式執行。 |
PROPAGATION_MANDATORY | 使用當前的事務,如果當前沒有事務,就拋出異常。 |
PROPAGATION_REQUIRES_NEW | 新建事務,如果當前存在事務,把當前事務掛起。 |
PROPAGATION_NOT_SUPPORTED | 以非事務方式執行操作,如果當前存在事務,就把當前事務掛起。 |
PROPAGATION_NEVER | 以非事務方式執行,如果當前存在事務,則拋出異常。 |
PROPAGATION_NESTED | 如果當前存在事務,則在嵌套事務內執行。如果當前沒有事務,則執行與PROPAGATION_REQUIRED類似的操作。 |
如果檢測到已存在事務,handleExistingTransaction()方法將根據不同的事務傳播行為類型執行相應邏輯。
PROPAGATION_NEVER
即當前方法需要在非事務的環境下執行,如果有事務存在,那么拋出異常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); }
PROPAGATION_NOT_SUPPORTED
與前者的區別在于,如果有事務存在,那么將事務掛起,而不是拋出異常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); }
PROPAGATION_REQUIRES_NEW
新建事務,如果當前存在事務,把當前事務掛起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { SuspendedResourcesHolder suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; }
PROPAGATION_NESTED
開始一個 "嵌套的" 事務, 它是已經存在事務的一個真正的子事務. 嵌套事務開始執行時, 它將取得一個 savepoint. 如果這個嵌套事務失敗, 我們將回滾到此 savepoint. 嵌套事務是外部事務的一部分, 只有外部事務結束后它才會被提交。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = newTransactionStatus( definition, transaction, false, false, true, debugEnabled, null); this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status)); try { status.createAndHoldSavepoint(); } catch (RuntimeException | Error ex) { this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex)); throw ex; } this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null)); return status; } else { return startTransaction(definition, transaction, true, debugEnabled, null); } }
2.4 事務掛起
事務掛起在AbstractTransactionManager.suspend()中處理,該方法內部將調用具體事務管理器的doSuspend()方法。以DataSourceTransactionManager為例,將ConnectionHolder設為null,因為一個ConnectionHolder對象就代表了一個數據庫連接,將ConnectionHolder設為null就意味著我們下次要使用連接時,將重新從連接池獲取。
protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null); return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
unbindResource()方法最終會調用TransactionSynchronizationManager.doUnbindResource()方法,該方法將移除當前線程與事務對象的綁定。
private static Object doUnbindResource(Object actualKey) { Map map = resources.get(); if (map == null) { return null; } Object value = map.remove(actualKey); if (map.isEmpty()) { resources.remove(); } if (value instanceof ResourceHolder resourceHolder && resourceHolder.isVoid()) { value = null; } return value; }
而被掛起的事務的各種狀態最終會保存在TransactionStatus對象中。
2.5 事務提交&回滾
主要是對jdbc的封裝、源碼邏輯較清晰,不展開細說。
?
3、聲明式事務
其底層建立在 AOP 的基礎之上,對方法前后進行攔截,然后在目標方法開始之前創建或者加入一個事務,在執行完目標方法之后根據執行情況提交或者回滾事務。通過聲明式事物,無需在業務邏輯代碼中摻雜事務管理的代碼,只需在配置文件中做相關的事務規則聲明(或通過等價的基于標注的方式),便可以將事務規則應用到業務邏輯中。
3.1 使用示例
配置:
/bean?> /bean?>
代碼:
@Transactional public void addOrder() { // 執行數據庫操作 }
3.2 自定義標簽解析
先從配置文件開始入手,找到處理annotation-driven標簽的類TxNamespaceHandler。TxNamespaceHandler實現了NamespaceHandler接口,定義了如何解析和處理自定義XML標簽。
@Override public void init() { registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); }
AnnotationDrivenBeanDefinitionParser里的parse()方法,對XML標簽annotation-driven進行解析。
@Override public BeanDefinition parse(Element element, ParserContext parserContext) { registerTransactionalEventListenerFactory(parserContext); String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { // mode="aspectj" registerTransactionAspect(element, parserContext); if (ClassUtils.isPresent("jakarta.transaction.Transactional", getClass().getClassLoader())) { registerJtaTransactionAspect(element, parserContext); } } else { // mode="proxy" AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null; }
以默認mode配置為例,執行configureAutoProxyCreator()方法,將在Spring容器中注冊了3個bean:
BeanFactoryTransactionAttributeSourceAdvisor、TransactionInterceptor、AnnotationTransactionAttributeSource。同時會將TransactionInterceptor的BeanName傳入到Advisor中,然后將AnnotationTransactionAttributeSource這個Bean注入到Advisor中。之后動態代理的時候會使用這個Advisor去尋找每個Bean是否需要動態代理。
// Create the TransactionAttributeSourceAdvisor definition. RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class); advisorDef.setSource(eleSource); advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName)); advisorDef.getPropertyValues().add("adviceBeanName", interceptorName); if (element.hasAttribute("order")) { advisorDef.getPropertyValues().add("order", element.getAttribute("order")); } parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef); CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource); compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName)); compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName)); compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName)); parserContext.registerComponent(compositeDef);
3.3 Advisor
回顧AOP用法,Advisor可用于定義一個切面,它包含切點(Pointcut)和通知(Advice),用于在特定的連接點上執行特定的操作。spring事務實現了一個Advisor: BeanFactoryTransactionAttributeSourceAdvisor。
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor { private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut(); public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) { this.pointcut.setTransactionAttributeSource(transactionAttributeSource); } public void setClassFilter(ClassFilter classFilter) { this.pointcut.setClassFilter(classFilter); } @Override public Pointcut getPointcut() { return this.pointcut; } }
BeanFactoryTransactionAttributeSourceAdvisor其實是一個PointcutAdvisor,是否匹配到切入點取決于Pointcut。Pointcut的核心在于其ClassFilter和MethodMatcher。
ClassFilter:
TransactionAttributeSourcePointcut內部私有類 TransactionAttributeSourceClassFilter,實現了Spring框架中的ClassFilter接口。在matches方法中,它首先檢查傳入的類clazz 否為TransactionalProxy、TransactionManager或PersistenceExceptionTranslator的子類,如果不是,則獲取當前的 TransactionAttributeSource 并檢查其是否允許該類作為候選類。
private class TransactionAttributeSourceClassFilter implements ClassFilter { @Override public boolean matches(Class??> clazz) { if (TransactionalProxy.class.isAssignableFrom(clazz) || TransactionManager.class.isAssignableFrom(clazz) || PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) { return false; } return (transactionAttributeSource == null || transactionAttributeSource.isCandidateClass(clazz)); } }
MethodMatcher:
TransactionAttributeSourcePointcut.matches:
@Override public boolean matches(Method method, Class??> targetClass) { return (this.transactionAttributeSource == null || this.transactionAttributeSource.getTransactionAttribute(method, targetClass) != null); }
getTransactionAttribute()方法最終會調用至AbstractFallbackTransactionAttributeSource.computeTransactionAttribute()方法,該方法將先去方法上查找是否有相應的事務注解(比如@Transactional),如果沒有,那么再去類上查找。
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class??> targetClass) { // Don't allow non-public methods, as configured. if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } // The method may be on an interface, but we need attributes from the target class. // If the target class is null, the method will be unchanged. Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // First try is the method in the target class. TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } // Second try is the transaction attribute on the target class. txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } if (specificMethod != method) { // Fallback is to look at the original method. txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback is the class of the original method. txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null; }
3.4 TransactionInterceptor
TransactionInterceptor是spring事務提供的AOP攔截器,實現了AOP Alliance的MethodInterceptor接口,是一種通知(advice)。其可以用于在方法調用前后進行事務管理。
@Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class??> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { @Override @Nullable public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } @Override public Object getTarget() { return invocation.getThis(); } @Override public Object[] getArguments() { return invocation.getArguments(); } }); }
invokeWithinTransaction()方法會根據目標方法上的事務配置,來決定是開啟新事務、加入已有事務,還是直接執行邏輯(如果沒有事務)。其代碼簡化如下(僅保留PlatformTransactionManager部分):
protected Object invokeWithinTransaction(Method method, @Nullable Class??> targetClass, final InvocationCallback invocation) { // If the transaction attribute is null, the method is non-transactional. final TransactionAttribute txAttr = getTransactionAttributeSource() .getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); ObjectretVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwableex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throwex; } finally { cleanupTransactionInfo(txInfo); } commitTransactionAfterReturning(txInfo); returnretVal; } } 審核編輯 黃宇
-
XML
+關注
關注
0文章
188瀏覽量
33104 -
代碼
+關注
關注
30文章
4802瀏覽量
68740 -
spring
+關注
關注
0文章
340瀏覽量
14356
發布評論請先 登錄
相關推薦
評論