Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -54,9 +55,7 @@ public final class ConsistencyCheckJobItemProgressContext implements PipelineJob

private volatile Long checkEndTimeMillis;

private final Map<String, Object> sourceTableCheckPositions = new ConcurrentHashMap<>();

private final Map<String, Object> targetTableCheckPositions = new ConcurrentHashMap<>();
private final List<TableCheckRangePosition> tableCheckRangePositions = new ArrayList<>();

private final String sourceDatabaseType;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;

/**
* Table check range position.
*/
@RequiredArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
public final class TableCheckRangePosition {

private final Integer splittingItem;

private final String sourceDataNode;

private final String logicTableName;

private final PrimaryKeyIngestPosition<?> sourceRange;

private final PrimaryKeyIngestPosition<?> targetRange;

private final String queryCondition;

private volatile Object sourcePosition;

private volatile Object targetPosition;

private volatile boolean finished;

private volatile Boolean matched;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;

/**
* Yaml table check range position.
*/
@Getter
@Setter
@EqualsAndHashCode
public final class YamlTableCheckRangePosition implements YamlConfiguration {

private Integer splittingItem;

private String sourceDataNode;

private String logicTableName;

private String sourceRange;

private String targetRange;

private String queryCondition;

private Object sourcePosition;

private Object targetPosition;

private boolean finished;

private Boolean matched;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;

import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;

/**
* Yaml table check range position swapper.
*/
public final class YamlTableCheckRangePositionSwapper implements YamlConfigurationSwapper<YamlTableCheckRangePosition, TableCheckRangePosition> {

@Override
public YamlTableCheckRangePosition swapToYamlConfiguration(final TableCheckRangePosition data) {
YamlTableCheckRangePosition result = new YamlTableCheckRangePosition();
result.setSplittingItem(data.getSplittingItem());
result.setSourceDataNode(data.getSourceDataNode());
result.setLogicTableName(data.getLogicTableName());
result.setSourceRange(data.getSourceRange().toString());
result.setTargetRange(data.getTargetRange().toString());
result.setQueryCondition(data.getQueryCondition());
result.setSourcePosition(data.getSourcePosition());
result.setTargetPosition(data.getTargetPosition());
result.setFinished(data.isFinished());
result.setMatched(data.getMatched());
return result;
}

@Override
public TableCheckRangePosition swapToObject(final YamlTableCheckRangePosition yamlConfig) {
return new TableCheckRangePosition(yamlConfig.getSplittingItem(), yamlConfig.getSourceDataNode(), yamlConfig.getLogicTableName(),
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getSourceRange()), PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getTargetRange()),
yamlConfig.getQueryCondition(), yamlConfig.getSourcePosition(), yamlConfig.getTargetPosition(), yamlConfig.isFinished(), yamlConfig.getMatched());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
Expand All @@ -31,18 +32,14 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.sql.SQLException;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -78,18 +75,21 @@ public TableDataConsistencyCheckResult checkSingleTableInventoryData() {

private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY);
sourceParam.setQueryRange(new QueryRange(param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()), true, null));
param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition());
TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
sourceParam.setQueryRange(new QueryRange(null != checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition() : checkRangePosition.getSourceRange().getBeginValue(),
true, checkRangePosition.getSourceRange().getEndValue()));
SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY);
targetParam.setQueryRange(new QueryRange(param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()), true, null));
param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition());
targetParam.setQueryRange(new QueryRange(null != checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition() : checkRangePosition.getTargetRange().getBeginValue(),
true, checkRangePosition.getTargetRange().getEndValue()));
SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
this.sourceCalculator = sourceCalculator;
SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
this.targetCalculator = targetCalculator;
try {
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
} finally {
QuietlyCloser.close(sourceParam.getCalculationContext());
Expand All @@ -107,46 +107,31 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter
if (null != param.getReadRateLimitAlgorithm()) {
param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
}
SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
SingleTableInventoryCalculatedResult sourceCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next));
SingleTableInventoryCalculatedResult targetCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next));
if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
checkResult.setMatched(false);
log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
break;
}
TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
checkRangePosition.setSourcePosition(sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(), targetCalculatedResult.getMaxUniqueKeyValue().get());
checkRangePosition.setTargetPosition(targetCalculatedResult.getMaxUniqueKeyValue().get());
}
param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
}
if (sourceCalculatedResults.hasNext()) {
TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
checkRangePosition.setFinished(true);
if (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext()) {
checkResult.setMatched(false);
return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
if (targetCalculatedResults.hasNext()) {
checkResult.setMatched(false);
return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
checkRangePosition.setMatched(checkResult.isMatched());
return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}

private <T> T waitFuture(final Future<T> future) {
try {
return future.get();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new SQLWrapperException(new SQLException(ex));
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof PipelineSQLException) {
throw (PipelineSQLException) ex.getCause();
}
throw new SQLWrapperException(new SQLException(ex));
}
}

protected abstract SingleTableInventoryCalculator buildSingleTableInventoryCalculator();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public final class TableInventoryCheckParameter {

private final String jobId;

private final int splittingItem;

private final PipelineDataSource sourceDataSource;

private final PipelineDataSource targetDataSource;
Expand All @@ -51,4 +53,14 @@ public final class TableInventoryCheckParameter {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;

private final ConsistencyCheckJobItemProgressContext progressContext;

private final String queryCondition;

public TableInventoryCheckParameter(final String jobId, final PipelineDataSource sourceDataSource, final PipelineDataSource targetDataSource,
final QualifiedTable sourceTable, final QualifiedTable targetTable,
final List<String> columnNames, final List<PipelineColumnMetaData> uniqueKeys,
final JobRateLimitAlgorithm readRateLimitAlgorithm, final ConsistencyCheckJobItemProgressContext progressContext) {
this(jobId, 0, sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private void setParameters(final PreparedStatement preparedStatement, final Sing

private SingleTableInventoryCalculateParameter buildPointRangeQueryCalculateParameter(final SingleTableInventoryCalculateParameter param, final Object uniqueKeyValue) {
SingleTableInventoryCalculateParameter result = new SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(), param.getColumnNames(),
Collections.singletonList(param.getFirstUniqueKey()), QueryType.POINT_QUERY);
Collections.singletonList(param.getFirstUniqueKey()), QueryType.POINT_QUERY, param.getQueryCondition());
result.setUniqueKeysValues(Collections.singletonList(uniqueKeyValue));
result.setShardingColumnsNames(param.getShardingColumnsNames());
result.setShardingColumnsValues(param.getShardingColumnsValues());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public final class SingleTableInventoryCalculateParameter {

private final QueryType queryType;

private final String queryCondition;

/**
* Get database type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.List;

/**
* Data consistency check job item progress.
Expand All @@ -48,9 +49,7 @@ public final class ConsistencyCheckJobItemProgress implements PipelineJobItemPro

private final Long checkEndTimeMillis;

private final Map<String, Object> sourceTableCheckPositions = new LinkedHashMap<>();

private final Map<String, Object> targetTableCheckPositions = new LinkedHashMap<>();
private final List<TableCheckRangePosition> tableCheckRangePositions = new ArrayList<>();

private final String sourceDatabaseType;

Expand All @@ -64,8 +63,7 @@ public ConsistencyCheckJobItemProgress(final ConsistencyCheckJobItemProgressCont
recordsCount = context.getRecordsCount();
checkBeginTimeMillis = context.getCheckBeginTimeMillis();
checkEndTimeMillis = context.getCheckEndTimeMillis();
sourceTableCheckPositions.putAll(context.getSourceTableCheckPositions());
targetTableCheckPositions.putAll(context.getTargetTableCheckPositions());
tableCheckRangePositions.addAll(context.getTableCheckRangePositions());
sourceDatabaseType = context.getSourceDatabaseType();
}
}
Loading