Skip to content

Commit 1b7461c

Browse files
committed
Add ExclusiveOperation instead of GlobalLock
1 parent 8ac6505 commit 1b7461c

File tree

24 files changed

+347
-106
lines changed

24 files changed

+347
-106
lines changed

agent/plugins/metrics/type/prometheus/src/test/java/org/apache/shardingsphere/agent/plugin/metrics/prometheus/PrometheusPluginLifecycleServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
2828
import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
2929
import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
30-
import org.apache.shardingsphere.mode.lock.LockContext;
30+
import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorContext;
3131
import org.apache.shardingsphere.mode.manager.ContextManager;
3232
import org.apache.shardingsphere.mode.manager.standalone.workerid.StandaloneWorkerIdGenerator;
3333
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -74,6 +74,6 @@ private ContextManager mockContextManager() {
7474
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
7575
new ComputeNodeInstance(mock(InstanceMetaData.class)), new ModeConfiguration("Standalone", null), new EventBusContext());
7676
computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator());
77-
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(LockContext.class), mock(PersistRepository.class));
77+
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class), mock(ExclusiveOperatorContext.class));
7878
}
7979
}

kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,7 @@
6767
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
6868
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
6969
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
70-
import org.apache.shardingsphere.mode.lock.LockDefinition;
7170
import org.apache.shardingsphere.mode.manager.ContextManager;
72-
import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
7371
import org.apache.shardingsphere.parser.rule.SQLParserRule;
7472

7573
import java.sql.SQLException;
@@ -115,26 +113,16 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem
115113
if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
116114
jobItemManager.persistProgress(jobItemContext);
117115
}
118-
LockDefinition lockDefinition = new GlobalLockDefinition(new MigrationPrepareLock(jobConfig.getJobId()));
119-
long startTimeMillis = System.currentTimeMillis();
120-
if (contextManager.getLockContext().tryLock(lockDefinition, 600 * 1000L)) {
121-
log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
122-
try {
123-
PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
124-
JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
125-
if (!offsetInfo.isTargetSchemaTableCreated()) {
126-
jobItemContext.setStatus(JobStatus.PREPARING);
127-
jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
128-
prepareAndCheckTarget(jobItemContext, contextManager);
129-
offsetRepository.persist(jobId, new JobOffsetInfo(true));
130-
}
131-
} finally {
132-
log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
133-
contextManager.getLockContext().unlock(lockDefinition);
116+
contextManager.getExclusiveOperatorEngine().operate(new MigrationPrepareOperation(jobId), 600 * 1000L, () -> {
117+
PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
118+
JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
119+
if (!offsetInfo.isTargetSchemaTableCreated()) {
120+
jobItemContext.setStatus(JobStatus.PREPARING);
121+
jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
122+
prepareAndCheckTarget(jobItemContext, contextManager);
123+
offsetRepository.persist(jobId, new JobOffsetInfo(true));
134124
}
135-
} else {
136-
log.warn("Lock failed, jobId={}, shardingItem={}.", jobId, jobItemContext.getShardingItem());
137-
}
125+
});
138126
}
139127

140128
private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
1919

2020
import lombok.RequiredArgsConstructor;
21-
import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
21+
import org.apache.shardingsphere.mode.exclusive.ExclusiveOperation;
2222

2323
/**
24-
* Migration prepare lock.
24+
* Migration prepare exclusive operation.
2525
*/
2626
@RequiredArgsConstructor
27-
public final class MigrationPrepareLock implements GlobalLock {
27+
public final class MigrationPrepareOperation implements ExclusiveOperation {
2828

2929
private final String jobId;
3030

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.mode.exclusive;
19+
20+
/**
21+
* Exclusive operation.
22+
*/
23+
public interface ExclusiveOperation {
24+
25+
/**
26+
* Get operation name.
27+
*
28+
* @return operation name
29+
*/
30+
String getName();
31+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.mode.exclusive;
19+
20+
import java.sql.SQLException;
21+
22+
@FunctionalInterface
23+
public interface ExclusiveOperationCallback {
24+
25+
/**
26+
* Execute.
27+
*
28+
* @throws SQLException SQL exception
29+
*/
30+
void execute() throws SQLException;
31+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.mode.exclusive;
19+
20+
/**
21+
* Exclusive operator context.
22+
*/
23+
public interface ExclusiveOperatorContext {
24+
25+
/**
26+
* Start exclusive operation.
27+
*
28+
* @param operation exclusive operation
29+
* @param timeoutMillis timeout milliseconds
30+
* @return is started or not
31+
*/
32+
boolean start(ExclusiveOperation operation, long timeoutMillis);
33+
34+
/**
35+
* Stop exclusive operation.
36+
*
37+
* @param operation exclusive operation
38+
*/
39+
void stop(ExclusiveOperation operation);
40+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.mode.exclusive;
19+
20+
import lombok.RequiredArgsConstructor;
21+
22+
import java.sql.SQLException;
23+
24+
/**
25+
* Exclusive operator engine.
26+
*/
27+
@RequiredArgsConstructor
28+
public final class ExclusiveOperatorEngine {
29+
30+
private final ExclusiveOperatorContext exclusiveOperatorContext;
31+
32+
/**
33+
* Operate with exclusive lock.
34+
*
35+
* @param operation exclusive operation
36+
* @param timeoutMillis timeout millis
37+
* @param callback callback
38+
* @throws SQLException SQL exception
39+
*/
40+
public void operate(final ExclusiveOperation operation, final long timeoutMillis, final ExclusiveOperationCallback callback) throws SQLException {
41+
if (exclusiveOperatorContext.start(operation, timeoutMillis)) {
42+
try {
43+
callback.execute();
44+
} finally {
45+
exclusiveOperatorContext.stop(operation);
46+
}
47+
}
48+
}
49+
}

mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
4040
import org.apache.shardingsphere.infra.metadata.statistics.builder.ShardingSphereStatisticsFactory;
4141
import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
42-
import org.apache.shardingsphere.mode.lock.LockContext;
42+
import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorContext;
43+
import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorEngine;
4344
import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListenerFactory;
4445
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
4546
import org.apache.shardingsphere.mode.metadata.factory.MetaDataContextsFactory;
@@ -65,8 +66,6 @@ public final class ContextManager implements AutoCloseable {
6566

6667
private final ComputeNodeInstanceContext computeNodeInstanceContext;
6768

68-
private final LockContext lockContext;
69-
7069
private final StateContext stateContext;
7170

7271
private final ExecutorEngine executorEngine;
@@ -75,14 +74,20 @@ public final class ContextManager implements AutoCloseable {
7574

7675
private final MetaDataContextManager metaDataContextManager;
7776

78-
public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final LockContext lockContext, final PersistRepository repository) {
77+
private final ExclusiveOperatorContext exclusiveOperatorContext;
78+
79+
private final ExclusiveOperatorEngine exclusiveOperatorEngine;
80+
81+
public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext,
82+
final PersistRepository repository, final ExclusiveOperatorContext exclusiveOperatorContext) {
7983
this.metaDataContexts = metaDataContexts;
8084
this.computeNodeInstanceContext = computeNodeInstanceContext;
81-
this.lockContext = lockContext;
85+
this.exclusiveOperatorContext = exclusiveOperatorContext;
8286
executorEngine = ExecutorEngine.createExecutorEngineWithSize(metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
8387
metaDataContextManager = new MetaDataContextManager(metaDataContexts, computeNodeInstanceContext, repository);
8488
persistServiceFacade = new PersistServiceFacade(repository, computeNodeInstanceContext.getModeConfiguration(), metaDataContextManager);
8589
stateContext = new StateContext(persistServiceFacade.getStateService().load());
90+
exclusiveOperatorEngine = new ExclusiveOperatorEngine(exclusiveOperatorContext);
8691
ContextManagerLifecycleListenerFactory.getListeners(this).forEach(each -> each.onInitialized(this));
8792
}
8893

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.mode.node.path.type.exclusive;
19+
20+
import lombok.Getter;
21+
import lombok.RequiredArgsConstructor;
22+
import org.apache.shardingsphere.mode.node.path.NodePath;
23+
import org.apache.shardingsphere.mode.node.path.NodePathEntity;
24+
25+
/**
26+
* Exclusive operation node path.
27+
*/
28+
@NodePathEntity("/exclusive_operation/${name}")
29+
@RequiredArgsConstructor
30+
@Getter
31+
public final class ExclusiveOperationNodePath implements NodePath {
32+
33+
private final String name;
34+
}

mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@
2727
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
2828
import org.apache.shardingsphere.mode.deliver.DeliverEventSubscriber;
2929
import org.apache.shardingsphere.mode.deliver.DeliverEventSubscriberRegistry;
30-
import org.apache.shardingsphere.mode.lock.LockContext;
3130
import org.apache.shardingsphere.mode.manager.ContextManager;
3231
import org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilder;
3332
import org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilderParameter;
3433
import org.apache.shardingsphere.mode.manager.cluster.dispatch.listener.DataChangedEventListenerRegistry;
3534
import org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
36-
import org.apache.shardingsphere.mode.manager.cluster.lock.ClusterLockContext;
35+
import org.apache.shardingsphere.mode.manager.cluster.exclusive.ClusterExclusiveOperatorContext;
3736
import org.apache.shardingsphere.mode.manager.cluster.persist.facade.ClusterPersistServiceFacade;
3837
import org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator;
3938
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -59,9 +58,8 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
5958
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(param.getInstanceMetaData(), param.getLabels()), modeConfig, eventBusContext);
6059
ClusterPersistRepository repository = getClusterPersistRepository(config, computeNodeInstanceContext);
6160
computeNodeInstanceContext.init(new ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()));
62-
LockContext lockContext = new ClusterLockContext(repository);
6361
MetaDataContexts metaDataContexts = new MetaDataContextsFactory(new MetaDataPersistFacade(repository), computeNodeInstanceContext).create(param);
64-
ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, lockContext, repository);
62+
ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository, new ClusterExclusiveOperatorContext(repository));
6563
registerOnline(computeNodeInstanceContext, param, result);
6664
new DeliverEventSubscriberRegistry(result.getComputeNodeInstanceContext().getEventBusContext()).register(createDeliverEventSubscribers(repository));
6765
return result;

0 commit comments

Comments
 (0)