diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java index 0a986c4768aea..cfd2810435b00 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java @@ -20,13 +20,14 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition; import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener; import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress; import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService; +import java.util.ArrayList; import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.List; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; @@ -54,9 +55,7 @@ public final class ConsistencyCheckJobItemProgressContext implements PipelineJob private volatile Long checkEndTimeMillis; - private final Map sourceTableCheckPositions = new ConcurrentHashMap<>(); - - private final Map targetTableCheckPositions = new ConcurrentHashMap<>(); + private final List tableCheckRangePositions = new ArrayList<>(); private final String sourceDatabaseType; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java new file mode 100644 index 0000000000000..430bc6f4e4eff --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition; + +/** + * Table check range position. + */ +@RequiredArgsConstructor +@AllArgsConstructor +@Getter +@Setter +@ToString +public final class TableCheckRangePosition { + + private final Integer splittingItem; + + private final String sourceDataNode; + + private final String logicTableName; + + private final PrimaryKeyIngestPosition sourceRange; + + private final PrimaryKeyIngestPosition targetRange; + + private final String queryCondition; + + private volatile Object sourcePosition; + + private volatile Object targetPosition; + + private volatile boolean finished; + + private volatile Boolean matched; +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java new file mode 100644 index 0000000000000..7dcb5a7f007c2 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; + +/** + * Yaml table check range position. + */ +@Getter +@Setter +@EqualsAndHashCode +public final class YamlTableCheckRangePosition implements YamlConfiguration { + + private Integer splittingItem; + + private String sourceDataNode; + + private String logicTableName; + + private String sourceRange; + + private String targetRange; + + private String queryCondition; + + private Object sourcePosition; + + private Object targetPosition; + + private boolean finished; + + private Boolean matched; +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java new file mode 100644 index 0000000000000..1236cd785fcce --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml; + +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory; +import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; + +/** + * Yaml table check range position swapper. + */ +public final class YamlTableCheckRangePositionSwapper implements YamlConfigurationSwapper { + + @Override + public YamlTableCheckRangePosition swapToYamlConfiguration(final TableCheckRangePosition data) { + YamlTableCheckRangePosition result = new YamlTableCheckRangePosition(); + result.setSplittingItem(data.getSplittingItem()); + result.setSourceDataNode(data.getSourceDataNode()); + result.setLogicTableName(data.getLogicTableName()); + result.setSourceRange(data.getSourceRange().toString()); + result.setTargetRange(data.getTargetRange().toString()); + result.setQueryCondition(data.getQueryCondition()); + result.setSourcePosition(data.getSourcePosition()); + result.setTargetPosition(data.getTargetPosition()); + result.setFinished(data.isFinished()); + result.setMatched(data.getMatched()); + return result; + } + + @Override + public TableCheckRangePosition swapToObject(final YamlTableCheckRangePosition yamlConfig) { + return new TableCheckRangePosition(yamlConfig.getSplittingItem(), yamlConfig.getSourceDataNode(), yamlConfig.getLogicTableName(), + PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getSourceRange()), PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getTargetRange()), + yamlConfig.getQueryCondition(), yamlConfig.getSourcePosition(), yamlConfig.getTargetPosition(), yamlConfig.isFinished(), yamlConfig.getMatched()); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java index 4198729ac461b..bcd622ee34a93 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult; @@ -31,18 +32,14 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange; import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress; -import org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException; -import org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException; +import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils; import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder; import org.apache.shardingsphere.infra.util.close.QuietlyCloser; -import java.sql.SQLException; import java.util.Iterator; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -78,18 +75,21 @@ public TableDataConsistencyCheckResult checkSingleTableInventoryData() { private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) { SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(), - param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY); - sourceParam.setQueryRange(new QueryRange(param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()), true, null)); + param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition()); + TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem()); + sourceParam.setQueryRange(new QueryRange(null != checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition() : checkRangePosition.getSourceRange().getBeginValue(), + true, checkRangePosition.getSourceRange().getEndValue())); SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(), - param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY); - targetParam.setQueryRange(new QueryRange(param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()), true, null)); + param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition()); + targetParam.setQueryRange(new QueryRange(null != checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition() : checkRangePosition.getTargetRange().getBeginValue(), + true, checkRangePosition.getTargetRange().getEndValue())); SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator(); this.sourceCalculator = sourceCalculator; SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator(); this.targetCalculator = targetCalculator; try { - Iterator sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator(); - Iterator targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator(); + Iterator sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator(); + Iterator targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator(); return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor); } finally { QuietlyCloser.close(sourceParam.getCalculationContext()); @@ -107,46 +107,31 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter if (null != param.getReadRateLimitAlgorithm()) { param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1); } - SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next)); - SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next)); + SingleTableInventoryCalculatedResult sourceCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next)); + SingleTableInventoryCalculatedResult targetCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next)); if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) { checkResult.setMatched(false); log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys()); break; } + TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem()); if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) { - param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(), sourceCalculatedResult.getMaxUniqueKeyValue().get()); + checkRangePosition.setSourcePosition(sourceCalculatedResult.getMaxUniqueKeyValue().get()); } if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) { - param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(), targetCalculatedResult.getMaxUniqueKeyValue().get()); + checkRangePosition.setTargetPosition(targetCalculatedResult.getMaxUniqueKeyValue().get()); } param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount())); } - if (sourceCalculatedResults.hasNext()) { + TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem()); + checkRangePosition.setFinished(true); + if (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext()) { checkResult.setMatched(false); - return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult); - } - if (targetCalculatedResults.hasNext()) { - checkResult.setMatched(false); - return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult); } + checkRangePosition.setMatched(checkResult.isMatched()); return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult); } - private T waitFuture(final Future future) { - try { - return future.get(); - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new SQLWrapperException(new SQLException(ex)); - } catch (final ExecutionException ex) { - if (ex.getCause() instanceof PipelineSQLException) { - throw (PipelineSQLException) ex.getCause(); - } - throw new SQLWrapperException(new SQLException(ex)); - } - } - protected abstract SingleTableInventoryCalculator buildSingleTableInventoryCalculator(); @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java index 4419b643093a8..3cf1f8f37f2ba 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java @@ -36,6 +36,8 @@ public final class TableInventoryCheckParameter { private final String jobId; + private final int splittingItem; + private final PipelineDataSource sourceDataSource; private final PipelineDataSource targetDataSource; @@ -51,4 +53,14 @@ public final class TableInventoryCheckParameter { private final JobRateLimitAlgorithm readRateLimitAlgorithm; private final ConsistencyCheckJobItemProgressContext progressContext; + + private final String queryCondition; + + public TableInventoryCheckParameter(final String jobId, final PipelineDataSource sourceDataSource, final PipelineDataSource targetDataSource, + final QualifiedTable sourceTable, final QualifiedTable targetTable, + final List columnNames, final List uniqueKeys, + final JobRateLimitAlgorithm readRateLimitAlgorithm, final ConsistencyCheckJobItemProgressContext progressContext) { + this(jobId, 0, sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext, + null); + } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java index 9c07cc5965dd2..d1a29f0250290 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java @@ -311,7 +311,7 @@ private void setParameters(final PreparedStatement preparedStatement, final Sing private SingleTableInventoryCalculateParameter buildPointRangeQueryCalculateParameter(final SingleTableInventoryCalculateParameter param, final Object uniqueKeyValue) { SingleTableInventoryCalculateParameter result = new SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(), param.getColumnNames(), - Collections.singletonList(param.getFirstUniqueKey()), QueryType.POINT_QUERY); + Collections.singletonList(param.getFirstUniqueKey()), QueryType.POINT_QUERY, param.getQueryCondition()); result.setUniqueKeysValues(Collections.singletonList(uniqueKeyValue)); result.setShardingColumnsNames(param.getShardingColumnsNames()); result.setShardingColumnsValues(param.getShardingColumnsValues()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java index 103ebba3d976c..8fe9e813bcc2b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java @@ -68,6 +68,8 @@ public final class SingleTableInventoryCalculateParameter { private final QueryType queryType; + private final String queryCondition; + /** * Get database type. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java index 4607d4c822622..e21b7c3a711e4 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java @@ -21,11 +21,12 @@ import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition; +import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; /** * Data consistency check job item progress. @@ -48,9 +49,7 @@ public final class ConsistencyCheckJobItemProgress implements PipelineJobItemPro private final Long checkEndTimeMillis; - private final Map sourceTableCheckPositions = new LinkedHashMap<>(); - - private final Map targetTableCheckPositions = new LinkedHashMap<>(); + private final List tableCheckRangePositions = new ArrayList<>(); private final String sourceDatabaseType; @@ -64,8 +63,7 @@ public ConsistencyCheckJobItemProgress(final ConsistencyCheckJobItemProgressCont recordsCount = context.getRecordsCount(); checkBeginTimeMillis = context.getCheckBeginTimeMillis(); checkEndTimeMillis = context.getCheckEndTimeMillis(); - sourceTableCheckPositions.putAll(context.getSourceTableCheckPositions()); - targetTableCheckPositions.putAll(context.getTargetTableCheckPositions()); + tableCheckRangePositions.addAll(context.getTableCheckRangePositions()); sourceDatabaseType = context.getSourceDatabaseType(); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java index 49f7580454f42..a0df0d7ae1397 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java @@ -19,8 +19,9 @@ import lombok.Getter; import lombok.Setter; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition; -import java.util.Map; +import java.util.List; /** * Yaml data consistency check job item progress. @@ -43,9 +44,7 @@ public final class YamlConsistencyCheckJobItemProgress implements YamlPipelineJo private Long checkEndTimeMillis; - private Map sourceTableCheckPositions; - - private Map targetTableCheckPositions; + private List tableCheckRangePositions; private String sourceDatabaseType; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java index 7f4e8d2215689..2c8ce5fccfd31 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java @@ -17,15 +17,20 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlConsistencyCheckJobItemProgress; +import java.util.stream.Collectors; + /** * YAML data check job item progress swapper. */ public final class YamlConsistencyCheckJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper { + private final YamlTableCheckRangePositionSwapper tableCheckPositionSwapper = new YamlTableCheckRangePositionSwapper(); + @Override public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final ConsistencyCheckJobItemProgress data) { YamlConsistencyCheckJobItemProgress result = new YamlConsistencyCheckJobItemProgress(); @@ -36,8 +41,7 @@ public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final Consist result.setRecordsCount(data.getRecordsCount()); result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis()); result.setCheckEndTimeMillis(data.getCheckEndTimeMillis()); - result.setSourceTableCheckPositions(data.getSourceTableCheckPositions()); - result.setTargetTableCheckPositions(data.getTargetTableCheckPositions()); + result.setTableCheckRangePositions(data.getTableCheckRangePositions().stream().map(tableCheckPositionSwapper::swapToYamlConfiguration).collect(Collectors.toList())); result.setSourceDatabaseType(data.getSourceDatabaseType()); return result; } @@ -46,11 +50,8 @@ public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final Consist public ConsistencyCheckJobItemProgress swapToObject(final YamlConsistencyCheckJobItemProgress yamlConfig) { ConsistencyCheckJobItemProgress result = new ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(), yamlConfig.getIgnoredTableNames(), yamlConfig.getCheckedRecordsCount(), yamlConfig.getRecordsCount(), yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(), yamlConfig.getSourceDatabaseType()); - if (null != yamlConfig.getSourceTableCheckPositions()) { - result.getSourceTableCheckPositions().putAll(yamlConfig.getSourceTableCheckPositions()); - } - if (null != yamlConfig.getTargetTableCheckPositions()) { - result.getTargetTableCheckPositions().putAll(yamlConfig.getTargetTableCheckPositions()); + if (null != yamlConfig.getTableCheckRangePositions()) { + result.getTableCheckRangePositions().addAll(yamlConfig.getTableCheckRangePositions().stream().map(tableCheckPositionSwapper::swapToObject).collect(Collectors.toList())); } result.setStatus(JobStatus.valueOf(yamlConfig.getStatus())); return result; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java index a4ae090d78683..acf9205e648f4 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java @@ -19,12 +19,18 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; +import org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException; +import org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException; +import java.sql.SQLException; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * Pipeline task utilities. @@ -57,4 +63,26 @@ public static IncrementalTaskProgress createIncrementalTaskProgress(final Ingest } return result; } + + /** + * Wait future. + * + * @param future future to wait + * @param result type + * @return execution result + * @throws SQLWrapperException if the future execution fails + */ + public static T waitFuture(final Future future) { + try { + return future.get(); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new SQLWrapperException(new SQLException(ex)); + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof PipelineSQLException || ex.getCause() instanceof PipelineJobCancelingException) { + throw (PipelineSQLException) ex.getCause(); + } + throw new SQLWrapperException(new SQLException(ex)); + } + } } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java index e2b78e3733b25..597b1e215ad7e 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java @@ -66,7 +66,8 @@ class CRC32SingleTableInventoryCalculatorTest { void setUp() throws SQLException { DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "FIXTURE"); List uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true)); - parameter = new SingleTableInventoryCalculateParameter(pipelineDataSource, new QualifiedTable(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"), uniqueKeys, QueryType.RANGE_QUERY); + parameter = new SingleTableInventoryCalculateParameter(pipelineDataSource, new QualifiedTable(null, "foo_tbl"), + Arrays.asList("foo_col", "bar_col"), uniqueKeys, QueryType.RANGE_QUERY, null); when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType); when(pipelineDataSource.getConnection()).thenReturn(connection); } diff --git a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java index 39a88d0bdc0cd..bb56b31df9779 100644 --- a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java +++ b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java @@ -19,11 +19,11 @@ import lombok.Getter; import lombok.Setter; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.core.context.PipelineProcessContext; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; import java.util.Optional; @@ -59,8 +59,7 @@ public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration job progressContext = new ConsistencyCheckJobItemProgressContext(jobId, shardingItem, jobConfig.getSourceDatabaseType().getType()); if (null != jobItemProgress) { progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L)); - Optional.ofNullable(jobItemProgress.getSourceTableCheckPositions()).ifPresent(progressContext.getSourceTableCheckPositions()::putAll); - Optional.ofNullable(jobItemProgress.getTargetTableCheckPositions()).ifPresent(progressContext.getTargetTableCheckPositions()::putAll); + progressContext.getTableCheckRangePositions().addAll(jobItemProgress.getTableCheckRangePositions()); } processContext = new ConsistencyCheckProcessContext(jobId); } diff --git a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java index 1b95d6c53b115..aae4d9512b8f5 100644 --- a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java +++ b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java @@ -17,6 +17,9 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; @@ -25,10 +28,13 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; class ConsistencyCheckJobItemContextTest { + private static final String DATA_NODE = "ds_0.t_order"; + private static final String TABLE = "t_order"; private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "H2"); @@ -38,20 +44,40 @@ void assertConstructWithEmptyValues() { ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, "H2"); ConsistencyCheckJobItemContext actual = new ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", "DATA_MATCH", null, databaseType), 0, JobStatus.RUNNING, jobItemProgress); - assertThat(actual.getProgressContext().getSourceTableCheckPositions().size(), is(0)); - assertThat(actual.getProgressContext().getTargetTableCheckPositions().size(), is(0)); + assertThat(actual.getProgressContext().getTableCheckRangePositions().size(), is(0)); } @Test void assertConstructWithNonEmptyValues() { ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, "H2"); - jobItemProgress.getSourceTableCheckPositions().put(TABLE, 6); - jobItemProgress.getTargetTableCheckPositions().put(TABLE, 5); + jobItemProgress.getTableCheckRangePositions().add(new TableCheckRangePosition(0, DATA_NODE, TABLE, new IntegerPrimaryKeyIngestPosition(1, 100), + new IntegerPrimaryKeyIngestPosition(1, 101), null, 11, 11, false, null)); + jobItemProgress.getTableCheckRangePositions().add(new TableCheckRangePosition(1, DATA_NODE, TABLE, new IntegerPrimaryKeyIngestPosition(101, 200), + new IntegerPrimaryKeyIngestPosition(101, 203), null, 132, 132, false, null)); ConsistencyCheckJobItemContext actual = new ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", "DATA_MATCH", null, databaseType), 0, JobStatus.RUNNING, jobItemProgress); - assertThat(actual.getProgressContext().getSourceTableCheckPositions().size(), is(1)); - assertThat(actual.getProgressContext().getTargetTableCheckPositions().size(), is(1)); - assertThat(actual.getProgressContext().getSourceTableCheckPositions().get(TABLE), is(6)); - assertThat(actual.getProgressContext().getTargetTableCheckPositions().get(TABLE), is(5)); + assertThat(actual.getProgressContext().getTableCheckRangePositions().size(), is(2)); + assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(0), + new TableCheckRangePosition(0, DATA_NODE, TABLE, new IntegerPrimaryKeyIngestPosition(1, 100), new IntegerPrimaryKeyIngestPosition(1, 101), null, 11, 11, false, null)); + assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(1), + new TableCheckRangePosition(1, DATA_NODE, TABLE, new IntegerPrimaryKeyIngestPosition(101, 200), new IntegerPrimaryKeyIngestPosition(101, 203), null, 132, 132, false, null)); + } + + private void assertTableCheckRangePosition(final TableCheckRangePosition actual, final TableCheckRangePosition expected) { + assertRange(actual.getSourceRange(), expected.getSourceRange()); + assertRange(actual.getTargetRange(), expected.getTargetRange()); + assertThat(actual.getSplittingItem(), is(expected.getSplittingItem())); + assertThat(actual.getSourceDataNode(), is(expected.getSourceDataNode())); + assertThat(actual.getLogicTableName(), is(expected.getLogicTableName())); + assertThat(actual.getSourcePosition(), is(expected.getSourcePosition())); + assertThat(actual.getTargetPosition(), is(expected.getTargetPosition())); + assertThat(actual.isFinished(), is(expected.isFinished())); + } + + private void assertRange(final PrimaryKeyIngestPosition actual, final PrimaryKeyIngestPosition expected) { + assertThat(actual.getClass(), is(expected.getClass())); + assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class)); + assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(), is(((IntegerPrimaryKeyIngestPosition) expected).getBeginValue())); + assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(), is(((IntegerPrimaryKeyIngestPosition) expected).getEndValue())); } } diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java index d97f6f6bf2bb3..bc9b35f1b9026 100644 --- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java +++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java @@ -21,17 +21,20 @@ import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker; import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext; +import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils; import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition; import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; @@ -96,12 +99,28 @@ public Map check(final String algorithm PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager(); TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) { PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(), (ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()); - for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) { - if (checkTableInventoryDataUnmatchedAndBreak(each, tableChecker, checkResultMap, dataSourceManager)) { - return checkResultMap.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().format(), Entry::getValue)); + if (progressContext.getTableCheckRangePositions().isEmpty()) { + progressContext.getTableCheckRangePositions().addAll(splitCrossTables()); + } + for (TableCheckRangePosition each : progressContext.getTableCheckRangePositions()) { + TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(each, tableChecker, dataSourceManager); + log.info("checkResult: {}, table: {}, checkRangePosition: {}", checkResult, each.getSourceDataNode(), each); + DataNode dataNode = DataNodeUtils.parseWithSchema(each.getSourceDataNode()); + QualifiedTable sourceTable = new QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName()); + checkResultMap.put(sourceTable, checkResult); + if (checkResult.isIgnored()) { + progressContext.getIgnoredTableNames().add(sourceTable.format()); + log.info("Table '{}' is ignored, ignore type: {}", each.getSourceDataNode(), checkResult.getIgnoredType()); + continue; + } + if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) { + log.info("Unmatched on table '{}', ignore left tables", each.getSourceDataNode()); + cancel(); + return checkResultMap.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue)); } } } + log.info("check done, jobId={}", jobConfig.getJobId()); return checkResultMap.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().format(), Entry::getValue)); } @@ -110,42 +129,38 @@ private long getRecordsCount() { return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getInventoryRecordsCount).sum(); } - private boolean checkTableInventoryDataUnmatchedAndBreak(final JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker, - final Map checkResultMap, - final PipelineDataSourceManager dataSourceManager) { - for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) { - for (DataNode each : entry.getDataNodes()) { - TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager); - QualifiedTable sourceTable = new QualifiedTable(each.getSchemaName(), each.getTableName()); - checkResultMap.put(sourceTable, checkResult); - if (checkResult.isIgnored()) { - progressContext.getIgnoredTableNames().add(sourceTable.format()); - log.info("Table '{}' is ignored, ignore type: {}", each.format(), checkResult.getIgnoredType()); - continue; - } - if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) { - log.info("Unmatched on table '{}', ignore left tables", each.format()); - return true; + private List splitCrossTables() { + List result = new LinkedList<>(); + int splittingItem = 0; + for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) { + for (JobDataNodeEntry entry : each.getEntries()) { + for (DataNode dataNode : entry.getDataNodes()) { + result.add(new TableCheckRangePosition(splittingItem++, dataNode.format(), entry.getLogicTableName(), + new UnsupportedKeyIngestPosition(), new UnsupportedKeyIngestPosition(), null)); } } } - return false; + return result; } - private TableDataConsistencyCheckResult checkSingleTableInventoryData(final String targetTableName, final DataNode dataNode, + private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableCheckRangePosition checkRangePosition, final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager dataSourceManager) { + log.info("checkSingleTableInventoryData, jobId: {}, checkRangePosition: {}", jobConfig.getJobId(), checkRangePosition); + DataNode dataNode = DataNodeUtils.parseWithSchema(checkRangePosition.getSourceDataNode()); QualifiedTable sourceTable = new QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName()); PipelineDataSource sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName())); PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource); PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dataNode.getSchemaName(), dataNode.getTableName()); ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(new QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName()))); + String targetTableName = checkRangePosition.getLogicTableName(); List columnNames = tableMetaData.getColumnNames(); List uniqueKeys = PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName(), sourceTable.getTableName(), metaDataLoader); QualifiedTable targetTable = new QualifiedTable(dataNode.getSchemaName(), targetTableName); PipelineDataSource targetDataSource = dataSourceManager.getDataSource(jobConfig.getTarget()); TableInventoryCheckParameter param = new TableInventoryCheckParameter( - jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext); + jobConfig.getJobId(), checkRangePosition.getSplittingItem(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, + readRateLimitAlgorithm, progressContext, checkRangePosition.getQueryCondition()); TableInventoryChecker tableInventoryChecker = tableChecker.buildTableInventoryChecker(param); currentTableInventoryChecker.set(tableInventoryChecker); Optional preCheckResult = tableInventoryChecker.preCheck(); diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java index ea5d170683db1..72c02e9bf768b 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java @@ -28,10 +28,12 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData; @@ -191,6 +193,8 @@ private void assertDataMatched(final PipelineDataSource sourceDataSource, final PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(qualifiedTable.getSchemaName(), qualifiedTable.getTableName()); List uniqueKeys = Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0))); ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0, sourceDataSource.getDatabaseType().getType()); + progressContext.getTableCheckRangePositions().add(new TableCheckRangePosition(0, null, qualifiedTable.getTableName(), + new UnsupportedKeyIngestPosition(), new UnsupportedKeyIngestPosition(), null)); TableInventoryCheckParameter param = new TableInventoryCheckParameter("", sourceDataSource, targetDataSource, qualifiedTable, qualifiedTable, tableMetaData.getColumnNames(), uniqueKeys, null, progressContext); TableDataConsistencyChecker tableChecker = TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new Properties()); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java index fb999a36a4c67..2b397612e4d4a 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java @@ -107,7 +107,7 @@ private static void insertRecord(final PreparedStatement preparedStatement, fina void assertCalculateOfRangeQueryFromBeginWithOrderIdUniqueKey(final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(4, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildOrderIdUniqueKey(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildOrderIdUniqueKey(), QueryType.RANGE_QUERY, null); assertQueryRangeCalculatedResult(calculator, param, new QueryRange(0, false, null), 4, 4); } @@ -116,7 +116,7 @@ void assertCalculateOfRangeQueryFromBeginWithOrderIdUniqueKey(final String strea void assertCalculateOfRangeQueryFromMiddleWithOrderIdUniqueKey(final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(4, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildOrderIdUniqueKey(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildOrderIdUniqueKey(), QueryType.RANGE_QUERY, null); assertQueryRangeCalculatedResult(calculator, param, new QueryRange(4, false, null), 4, 8); } @@ -125,7 +125,7 @@ void assertCalculateOfRangeQueryFromMiddleWithOrderIdUniqueKey(final String stre void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys(final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(1000, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY, null); assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, true, 6), 8, 6); assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, false, 6), 3, 6); } @@ -153,7 +153,7 @@ void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys2(final String streamin } RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(1000, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "test3"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY, null); assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, true, 4), 4, 4); assertQueryRangeCalculatedResult(calculator, param, new QueryRange(5, true, 6), 4, 6); assertQueryRangeCalculatedResult(calculator, param, new QueryRange(5, true, 7), 5, 7); @@ -164,7 +164,7 @@ void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys2(final String streamin void assertCalculateOfReservedRangeQueryWithMultiColumnUniqueKeys(final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(1000, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY, null); param.setQueryRange(new QueryRange(3, true, 2)); Optional calculatedResult = calculator.calculateChunk(param); QuietlyCloser.close(param.getCalculationContext()); @@ -179,7 +179,7 @@ void assertCalculateOfRangeQueryOnEmptyTableWithSingleColumnUniqueKey(final Stri connection.createStatement().execute("CREATE TABLE test1 (user_id INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id))"); } SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "test1"), - Collections.emptyList(), buildUserIdUniqueKey(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildUserIdUniqueKey(), QueryType.RANGE_QUERY, null); param.setQueryRange(new QueryRange(0, false, null)); RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(5, StreamingRangeType.valueOf(streamingRangeType)); Optional calculateResult = calculator.calculateChunk(param); @@ -195,7 +195,7 @@ void assertCalculateOfRangeQueryOnEmptyTableWithMultiColumnUniqueKeys(final Stri connection.createStatement().execute("CREATE TABLE test2 (user_id INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))"); } SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "test2"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY, null); param.setQueryRange(new QueryRange(null, false, null)); RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(5, StreamingRangeType.valueOf(streamingRangeType)); Optional calculateResult = calculator.calculateChunk(param); @@ -208,7 +208,7 @@ void assertCalculateOfRangeQueryOnEmptyTableWithMultiColumnUniqueKeys(final Stri void assertCalculateOfRangeQueryAllWithOrderIdUniqueKeyWith3x(final int streamingChunkCount, final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(3, streamingChunkCount, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildOrderIdUniqueKey(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildOrderIdUniqueKey(), QueryType.RANGE_QUERY, null); param.setQueryRange(new QueryRange(null, false, null)); Iterator resultIterator = calculator.calculate(param).iterator(); RecordSingleTableInventoryCalculatedResult actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next(); @@ -246,7 +246,7 @@ void assertCalculateOfRangeQueryAllWithOrderIdUniqueKeyWith3x(final int streamin void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith50x100(final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(50, 100, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY, null); param.setQueryRange(new QueryRange(null, false, null)); Iterator resultIterator = calculator.calculate(param).iterator(); RecordSingleTableInventoryCalculatedResult actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next(); @@ -272,7 +272,7 @@ void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith50x100(final Str void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith3x(final int streamingChunkCount, final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(3, streamingChunkCount, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY, null); param.setQueryRange(new QueryRange(null, false, null)); Iterator resultIterator = calculator.calculate(param).iterator(); RecordSingleTableInventoryCalculatedResult actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next(); @@ -310,7 +310,7 @@ void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith3x(final int str void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith2x(final int streamingChunkCount, final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(2, streamingChunkCount, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.RANGE_QUERY, null); param.setQueryRange(new QueryRange(null, false, null)); Iterator resultIterator = calculator.calculate(param).iterator(); RecordSingleTableInventoryCalculatedResult actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next(); @@ -357,7 +357,7 @@ private void assertRecord(final Map recordMap, final int userId, void assertCalculateOfPointQuery(final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(3, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.POINT_QUERY); + Collections.emptyList(), buildMultiColumnUniqueKeys(), QueryType.POINT_QUERY, null); param.setUniqueKeysValues(Arrays.asList(3, 3)); Optional calculatedResult = calculator.calculateChunk(param); QuietlyCloser.close(param.getCalculationContext()); @@ -373,7 +373,7 @@ void assertCalculateOfPointQuery(final String streamingRangeType) { void assertCalculateOfPointRangeQuery(final String streamingRangeType) { RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(3, StreamingRangeType.valueOf(streamingRangeType)); SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "t_order"), - Collections.emptyList(), buildUserIdUniqueKey(), QueryType.POINT_QUERY); + Collections.emptyList(), buildUserIdUniqueKey(), QueryType.POINT_QUERY, null); param.setUniqueKeysValues(Collections.singleton(3)); Optional calculatedResult = calculator.calculateChunk(param); QuietlyCloser.close(param.getCalculationContext()); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java index d34d41400aeab..32c4db66fd488 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java @@ -17,7 +17,10 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; @@ -36,8 +39,9 @@ import org.junit.jupiter.api.Test; import java.util.Collections; -import java.util.Map; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -53,23 +57,39 @@ static void beforeClass() { void assertBuildPipelineJobItemContext() { ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new PipelineContextKey(InstanceType.PROXY), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId()); String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId); - Map expectTableCheckPosition = Collections.singletonMap("t_order", 100); + List expectedYamlTableCheckRangePositions = Collections.singletonList(createYamlTableCheckRangePosition()); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0, - YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition))); + YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions))); ConsistencyCheckJobExecutorCallback callback = new ConsistencyCheckJobExecutorCallback(); ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId)); PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(new ConsistencyCheckJobType().getYamlJobItemProgressSwapper()); Optional jobItemProgress = jobItemManager.getProgress(jobConfig.getJobId(), 0); ConsistencyCheckJobItemContext actual = callback.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null, null); - assertThat(actual.getProgressContext().getSourceTableCheckPositions(), is(expectTableCheckPosition)); - assertThat(actual.getProgressContext().getTargetTableCheckPositions(), is(expectTableCheckPosition)); + YamlTableCheckRangePositionSwapper tableCheckPositionSwapper = new YamlTableCheckRangePositionSwapper(); + List actualYamlTableCheckPositions = actual.getProgressContext().getTableCheckRangePositions().stream() + .map(tableCheckPositionSwapper::swapToYamlConfiguration).collect(Collectors.toList()); + assertThat(actualYamlTableCheckPositions.size(), is(expectedYamlTableCheckRangePositions.size())); + for (int i = 0; i < actualYamlTableCheckPositions.size(); i++) { + assertThat(actualYamlTableCheckPositions.get(i), is(expectedYamlTableCheckRangePositions.get(i))); + } } - private YamlConsistencyCheckJobItemProgress createYamlConsistencyCheckJobItemProgress(final Map expectTableCheckPosition) { + private YamlTableCheckRangePosition createYamlTableCheckRangePosition() { + YamlTableCheckRangePosition result = new YamlTableCheckRangePosition(); + result.setSplittingItem(0); + result.setSourceDataNode("ds_0.t_order"); + result.setLogicTableName("t_order"); + result.setSourceRange(new UnsupportedKeyIngestPosition().toString()); + result.setTargetRange(new UnsupportedKeyIngestPosition().toString()); + result.setSourcePosition(100); + result.setTargetPosition(100); + return result; + } + + private YamlConsistencyCheckJobItemProgress createYamlConsistencyCheckJobItemProgress(final List yamlTableCheckRangePositions) { YamlConsistencyCheckJobItemProgress result = new YamlConsistencyCheckJobItemProgress(); result.setStatus(JobStatus.RUNNING.name()); - result.setSourceTableCheckPositions(expectTableCheckPosition); - result.setTargetTableCheckPositions(expectTableCheckPosition); + result.setTableCheckRangePositions(yamlTableCheckRangePositions); return result; }