Skip to content

Commit 4d7b458

Browse files
authored
Merge pull request #36630 from FlyingZC/dev0919-trans1
Refactor transaction context
2 parents 8671f78 + 45783c7 commit 4d7b458

File tree

12 files changed

+84
-55
lines changed

12 files changed

+84
-55
lines changed

infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public <T> List<T> execute(final ExecutionGroupContext<JDBCExecutionUnit> execut
6363
public <T> List<T> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
6464
final JDBCExecutorCallback<T> firstCallback, final JDBCExecutorCallback<T> callback) throws SQLException {
6565
try {
66-
return executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionContext().isInDistributedTransaction());
66+
return executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionContext().isDistributedTransactionStarted());
6767
} catch (final SQLException ex) {
6868
SQLExecutorExceptionHandler.handleException(ex);
6969
return Collections.emptyList();

infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContext.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ public void beginTransaction(final String transactionType, final TransactionMana
6161
*
6262
* @return in distributed transaction or not
6363
*/
64-
public boolean isInDistributedTransaction() {
65-
return inTransaction && ("XA".equals(transactionType) || "BASE".equals(transactionType));
64+
public boolean isDistributedTransactionStarted() {
65+
return isTransactionStarted() && ("XA".equals(transactionType) || "BASE".equals(transactionType));
6666
}
6767

6868
/**
@@ -92,6 +92,15 @@ public Optional<TransactionManager> getTransactionManager() {
9292
return null == transactionManager ? Optional.empty() : Optional.ofNullable(transactionManager.get());
9393
}
9494

95+
/**
96+
* Judge transaction is started or not.
97+
*
98+
* @return whether transaction is started or not
99+
*/
100+
public boolean isTransactionStarted() {
101+
return inTransaction;
102+
}
103+
95104
@Override
96105
public void close() {
97106
transactionType = null;

infra/session/src/test/java/org/apache/shardingsphere/infra/session/connection/transaction/TransactionConnectionContextTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,25 @@ void assertBeginTransaction() {
4040

4141
@Test
4242
void assertIsNotInDistributedTransactionWhenNotBegin() {
43-
assertFalse(transactionConnectionContext.isInDistributedTransaction());
43+
assertFalse(transactionConnectionContext.isDistributedTransactionStarted());
4444
}
4545

4646
@Test
4747
void assertIsNotInDistributedTransactionWithLocal() {
4848
transactionConnectionContext.beginTransaction("LOCAL", mock(TransactionManager.class));
49-
assertFalse(transactionConnectionContext.isInDistributedTransaction());
49+
assertFalse(transactionConnectionContext.isDistributedTransactionStarted());
5050
}
5151

5252
@Test
5353
void assertIsInDistributedTransactionWithXA() {
5454
transactionConnectionContext.beginTransaction("XA", mock(TransactionManager.class));
55-
assertTrue(transactionConnectionContext.isInDistributedTransaction());
55+
assertTrue(transactionConnectionContext.isDistributedTransactionStarted());
5656
}
5757

5858
@Test
5959
void assertIsInDistributedTransactionWithBASE() {
6060
transactionConnectionContext.beginTransaction("BASE", mock(TransactionManager.class));
61-
assertTrue(transactionConnectionContext.isInDistributedTransaction());
61+
assertTrue(transactionConnectionContext.isDistributedTransactionStarted());
6262
}
6363

6464
@Test

jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/transaction/DriverTransactionalExecutor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public <T> T execute(final ShardingSphereDatabase database, final ExecutionConte
5858

5959
private <T> T executeWithImplicitCommit(final ShardingSphereDatabase database, final ImplicitTransactionCallback<T> callback) throws SQLException {
6060
try {
61-
connection.setAutoCommit(false);
61+
connection.getDatabaseConnectionManager().begin();
6262
T result = callback.execute();
6363
connection.commit();
6464
return result;
@@ -67,8 +67,6 @@ private <T> T executeWithImplicitCommit(final ShardingSphereDatabase database, f
6767
// CHECKSTYLE:ON
6868
connection.rollback();
6969
throw SQLExceptionTransformEngine.toSQLException(ex, database.getProtocolType());
70-
} finally {
71-
connection.setAutoCommit(true);
7270
}
7371
}
7472
}

jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.shardingsphere.mode.manager.ContextManager;
3434
import org.apache.shardingsphere.transaction.savepoint.ConnectionSavepointManager;
3535
import org.apache.shardingsphere.transaction.ConnectionTransaction;
36-
import org.apache.shardingsphere.transaction.api.TransactionType;
3736
import org.apache.shardingsphere.transaction.rule.TransactionRule;
3837

3938
import javax.sql.DataSource;
@@ -130,11 +129,17 @@ public void clearCachedConnections() throws SQLException {
130129
*/
131130
public void begin() throws SQLException {
132131
ConnectionTransaction connectionTransaction = getConnectionTransaction();
133-
if (TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType())) {
132+
connectionContext.getTransactionContext().beginTransaction(connectionTransaction.getTransactionType().name(), connectionTransaction.getDistributedTransactionManager());
133+
doBegin(connectionTransaction);
134+
}
135+
136+
private void doBegin(final ConnectionTransaction connectionTransaction) throws SQLException {
137+
if (connectionTransaction.isLocalTransaction()) {
138+
setAutoCommit(false);
139+
} else {
134140
close();
135141
connectionTransaction.begin();
136142
}
137-
connectionContext.getTransactionContext().beginTransaction(String.valueOf(connectionTransaction.getTransactionType()), connectionTransaction.getDistributedTransactionManager());
138143
}
139144

140145
/**
@@ -144,6 +149,9 @@ public void begin() throws SQLException {
144149
*/
145150
public void commit() throws SQLException {
146151
ConnectionTransaction connectionTransaction = getConnectionTransaction();
152+
if (!connectionContext.getTransactionContext().isInTransaction() && !connectionTransaction.isInDistributedTransaction()) {
153+
return;
154+
}
147155
try {
148156
if (connectionTransaction.isLocalTransaction() && connectionContext.getTransactionContext().isExceptionOccur()) {
149157
forceExecuteTemplate.execute(getCachedConnections(), Connection::rollback);
@@ -153,12 +161,7 @@ public void commit() throws SQLException {
153161
connectionTransaction.commit();
154162
}
155163
} finally {
156-
methodInvocationRecorder.remove("setSavepoint");
157-
for (Connection each : getCachedConnections()) {
158-
ConnectionSavepointManager.getInstance().transactionFinished(each);
159-
}
160-
connectionContext.close();
161-
clearCachedConnections();
164+
clear();
162165
}
163166
}
164167

@@ -169,19 +172,17 @@ public void commit() throws SQLException {
169172
*/
170173
public void rollback() throws SQLException {
171174
ConnectionTransaction connectionTransaction = getConnectionTransaction();
175+
if (!connectionContext.getTransactionContext().isInTransaction() && !connectionTransaction.isInDistributedTransaction()) {
176+
return;
177+
}
172178
try {
173179
if (connectionTransaction.isLocalTransaction()) {
174180
forceExecuteTemplate.execute(getCachedConnections(), Connection::rollback);
175181
} else {
176182
connectionTransaction.rollback();
177183
}
178184
} finally {
179-
methodInvocationRecorder.remove("setSavepoint");
180-
for (Connection each : getCachedConnections()) {
181-
ConnectionSavepointManager.getInstance().transactionFinished(each);
182-
}
183-
connectionContext.close();
184-
clearCachedConnections();
185+
clear();
185186
}
186187
}
187188

@@ -197,6 +198,14 @@ public void rollback(final Savepoint savepoint) throws SQLException {
197198
}
198199
}
199200

201+
private void clear() {
202+
methodInvocationRecorder.remove("setSavepoint");
203+
for (Connection each : getCachedConnections()) {
204+
ConnectionSavepointManager.getInstance().transactionFinished(each);
205+
}
206+
connectionContext.close();
207+
}
208+
200209
/**
201210
* Set savepoint.
202211
*

kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public ConnectionTransaction(final TransactionRule rule, final TransactionConnec
5757
* @return in distributed transaction or not
5858
*/
5959
public boolean isInDistributedTransaction(final TransactionConnectionContext transactionContext) {
60-
return transactionContext.isInTransaction() && isInDistributedTransaction();
60+
return transactionContext.isTransactionStarted() && isInDistributedTransaction();
6161
}
6262

6363
/**

proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ private List<Connection> createNewConnections(final String databaseName, final S
132132
for (Connection each : result) {
133133
replayTransactionOption(each);
134134
}
135-
if (connectionSession.getTransactionStatus().isInTransaction()) {
135+
if (connectionSession.getConnectionContext().getTransactionContext().isTransactionStarted()) {
136136
for (Connection each : result) {
137137
replayMethodsInvocation(each);
138138
}

proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@ public final class BackendTransactionManager implements TransactionManager {
5656

5757
private final Map<ShardingSphereRule, TransactionHook> transactionHooks;
5858

59+
private final TransactionConnectionContext transactionContext;
60+
5961
public BackendTransactionManager(final ProxyDatabaseConnectionManager databaseConnectionManager) {
6062
connection = databaseConnectionManager;
6163
localTransactionManager = new LocalTransactionManager(databaseConnectionManager);
6264
TransactionRule transactionRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class);
63-
TransactionConnectionContext transactionContext = getTransactionContext();
65+
transactionContext = connection.getConnectionSession().getConnectionContext().getTransactionContext();
6466
transactionType = transactionRule.getDefaultType();
6567
ShardingSphereTransactionManagerEngine engine = transactionRule.getResource();
6668
if (transactionContext.getTransactionManager().isPresent()) {
@@ -75,21 +77,25 @@ public BackendTransactionManager(final ProxyDatabaseConnectionManager databaseCo
7577
public void begin() {
7678
if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) {
7779
connection.getConnectionSession().getTransactionStatus().setInTransaction(true);
78-
getTransactionContext().beginTransaction(transactionType.name(), distributedTransactionManager);
79-
connection.closeHandlers(true);
80-
connection.closeConnections(false);
80+
transactionContext.beginTransaction(transactionType.name(), distributedTransactionManager);
8181
}
82+
doBegin();
83+
}
84+
85+
private void doBegin() {
86+
connection.closeHandlers(true);
87+
connection.closeConnections(false);
8288
DatabaseType databaseType = ProxyContext.getInstance().getDatabaseType();
8389
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
84-
entry.getValue().beforeBegin(entry.getKey(), databaseType, getTransactionContext());
90+
entry.getValue().beforeBegin(entry.getKey(), databaseType, transactionContext);
8591
}
8692
if (TransactionType.LOCAL == transactionType || null == distributedTransactionManager) {
8793
localTransactionManager.begin();
8894
} else {
8995
distributedTransactionManager.begin();
9096
}
9197
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
92-
entry.getValue().afterBegin(entry.getKey(), databaseType, getTransactionContext());
98+
entry.getValue().afterBegin(entry.getKey(), databaseType, transactionContext);
9399
}
94100
}
95101

@@ -112,22 +118,15 @@ private void commit(final DatabaseType databaseType) throws SQLException {
112118
try {
113119
// FIXME if timeout when lock required, TSO not assigned, but commit will continue, solution is use redis lock in impl to instead of reg center's lock. #35041
114120
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
115-
entry.getValue().beforeCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext());
121+
entry.getValue().beforeCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), transactionContext);
116122
}
117-
if (TransactionType.LOCAL == TransactionUtils.getTransactionType(getTransactionContext()) || null == distributedTransactionManager) {
123+
if (TransactionType.LOCAL == TransactionUtils.getTransactionType(transactionContext) || null == distributedTransactionManager) {
118124
localTransactionManager.commit();
119125
} else {
120126
distributedTransactionManager.commit(getTransactionContext().isExceptionOccur());
121127
}
122128
} finally {
123-
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
124-
entry.getValue().afterCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext());
125-
}
126-
for (Connection each : connection.getCachedConnections().values()) {
127-
ConnectionSavepointManager.getInstance().transactionFinished(each);
128-
}
129-
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
130-
connection.getConnectionSession().getConnectionContext().close();
129+
clear();
131130
}
132131
}
133132

@@ -140,28 +139,35 @@ private boolean isNeedLockWhenCommit() {
140139
return false;
141140
}
142141

142+
private void clear() {
143+
for (Connection each : connection.getCachedConnections().values()) {
144+
ConnectionSavepointManager.getInstance().transactionFinished(each);
145+
}
146+
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
147+
connection.getConnectionSession().getConnectionContext().close();
148+
}
149+
143150
@Override
144151
public void rollback() throws SQLException {
152+
if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) {
153+
return;
154+
}
145155
DatabaseType databaseType = ProxyContext.getInstance().getDatabaseType();
146156
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
147-
entry.getValue().beforeRollback(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext());
157+
entry.getValue().beforeRollback(entry.getKey(), databaseType, connection.getCachedConnections().values(), transactionContext);
148158
}
149159
if (connection.getConnectionSession().getTransactionStatus().isInTransaction()) {
150160
try {
151-
if (TransactionType.LOCAL == TransactionUtils.getTransactionType(getTransactionContext()) || null == distributedTransactionManager) {
161+
if (TransactionType.LOCAL == TransactionUtils.getTransactionType(transactionContext) || null == distributedTransactionManager) {
152162
localTransactionManager.rollback();
153163
} else {
154164
distributedTransactionManager.rollback();
155165
}
156166
} finally {
157167
for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
158-
entry.getValue().afterRollback(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext());
159-
}
160-
for (Connection each : connection.getCachedConnections().values()) {
161-
ConnectionSavepointManager.getInstance().transactionFinished(each);
168+
entry.getValue().afterRollback(entry.getKey(), databaseType, connection.getCachedConnections().values(), transactionContext);
162169
}
163-
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
164-
connection.getConnectionSession().getConnectionContext().close();
170+
clear();
165171
}
166172
}
167173
}
@@ -188,8 +194,8 @@ public void rollbackTo(final String savepointName) throws SQLException {
188194
result.add(ex);
189195
}
190196
}
191-
if (result.isEmpty() && getTransactionContext().isExceptionOccur()) {
192-
getTransactionContext().setExceptionOccur(false);
197+
if (result.isEmpty() && transactionContext.isExceptionOccur()) {
198+
transactionContext.setExceptionOccur(false);
193199
}
194200
throwSQLExceptionIfNecessary(result);
195201
}

proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManagerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
3030
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
3131
import org.apache.shardingsphere.infra.metadata.statistics.builder.ShardingSphereStatisticsFactory;
32+
import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
3233
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
3334
import org.apache.shardingsphere.mode.manager.ContextManager;
3435
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -107,6 +108,7 @@ void setUp() {
107108
ContextManager contextManager = mockContextManager();
108109
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
109110
when(ProxyContext.getInstance().getBackendDataSource()).thenReturn(backendDataSource);
111+
when(connectionSession.getConnectionContext().getTransactionContext()).thenReturn(new TransactionConnectionContext());
110112
when(connectionSession.getTransactionStatus()).thenReturn(new TransactionStatus());
111113
when(connectionSession.getUsedDatabaseName()).thenReturn(String.format(SCHEMA_PATTERN, 0));
112114
databaseConnectionManager = new ProxyDatabaseConnectionManager(connectionSession);
@@ -172,6 +174,7 @@ void assertGetConnectionSizeGreaterThanCache() throws SQLException {
172174
@Test
173175
void assertGetConnectionWithConnectionPostProcessors() throws SQLException {
174176
connectionSession.getTransactionStatus().setInTransaction(true);
177+
connectionSession.getConnectionContext().getTransactionContext().beginTransaction(TransactionType.LOCAL.name(), null);
175178
when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtils.mockNewConnections(2));
176179
setConnectionPostProcessors();
177180
List<Connection> actualConnections = databaseConnectionManager.getConnections("foo_db", "ds1", 0, 2, ConnectionMode.MEMORY_STRICTLY);

proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ void assertBeginForDistributedTransaction() {
113113
newBackendTransactionManager(TransactionType.XA, true);
114114
backendTransactionManager.begin();
115115
verify(transactionStatus, times(0)).setInTransaction(true);
116-
verify(databaseConnectionManager, times(0)).closeConnections(false);
116+
verify(databaseConnectionManager, times(1)).closeConnections(false);
117117
verify(distributedTransactionManager).begin();
118118
}
119119

0 commit comments

Comments
 (0)