Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,25 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.utils.ExceptionUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand All @@ -48,6 +56,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
protected final Logger logger = LoggerFactory.getLogger(getClass());

private static final long START_DELAY = 10 * 1000L;
private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
private static final String EXTERNAL_PROCESS_IDENTIFIER = "";
private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator();

private final PersistenceHelper persistenceHelper = new PersistenceHelper();

protected final Set<ServerTableIdentifier> scheduledTables =
Collections.synchronizedSet(new HashSet<>());
Expand Down Expand Up @@ -123,16 +137,30 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
}

private void executeTask(TableRuntime tableRuntime) {
TableProcessMeta cleanProcessMeta = null;
CleanupOperation cleanupOperation = null;
Throwable executionError = null;

try {
if (isExecutable(tableRuntime)) {
cleanupOperation = getCleanupOperation();
cleanProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation);

execute(tableRuntime);
// Different tables take different amounts of time to execute the end of execute(),
// so you need to perform the update operation separately for each table.
persistUpdatingCleanupTime(tableRuntime);
}
} catch (Exception e) {
logger.error("exception when schedule for table: {}", tableRuntime.getTableIdentifier(), e);
} catch (Throwable t) {
executionError = t;
Comment thread
zhangwl9 marked this conversation as resolved.
logger.error(
"Failed to execute cleanup operation {} for table {}",
cleanupOperation,
tableRuntime.getTableIdentifier(),
t);
} finally {
persistCleanupResult(tableRuntime, cleanupOperation, cleanProcessMeta, executionError);

scheduledTables.remove(tableRuntime.getTableIdentifier());
scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
}
Expand Down Expand Up @@ -178,6 +206,124 @@ private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
}
}

@VisibleForTesting
protected TableProcessMeta createCleanupProcessInfo(
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {

if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
return null;
}

TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation);
persistenceHelper.beginAndPersistCleanupProcess(cleanProcessMeta);

logger.debug(
"Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
cleanProcessMeta.getProcessId(),
cleanProcessMeta.getTableId(),
cleanProcessMeta.getProcessType());

return cleanProcessMeta;
}

@VisibleForTesting
protected void persistCleanupResult(
TableRuntime tableRuntime,
CleanupOperation cleanupOperation,
TableProcessMeta cleanProcessMeta,
Throwable executionError) {

Comment thread
zhangwl9 marked this conversation as resolved.
if (cleanProcessMeta == null) {
return;
}

if (executionError != null) {
cleanProcessMeta.setStatus(ProcessStatus.FAILED);
cleanProcessMeta.setFailMessage(ExceptionUtil.getErrorMessage(executionError, 4000));
} else {
Comment thread
zhangwl9 marked this conversation as resolved.
cleanProcessMeta.setStatus(ProcessStatus.SUCCESS);
}

long endTime = System.currentTimeMillis();
persistenceHelper.persistAndSetCompleted(
tableRuntime, cleanupOperation, cleanProcessMeta, endTime);

logger.debug(
"Successfully updated lastCleanTime and cleanupProcess for table {} with cleanup operation {}",
tableRuntime.getTableIdentifier().getTableName(),
cleanupOperation);
}

private TableProcessMeta buildProcessMeta(
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {

TableProcessMeta cleanProcessMeta = new TableProcessMeta();
cleanProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId());
cleanProcessMeta.setProcessId(ID_GENERATOR.generateId());
cleanProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);
cleanProcessMeta.setStatus(ProcessStatus.RUNNING);
cleanProcessMeta.setProcessType(cleanupOperation.name());
cleanProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE);
cleanProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);
cleanProcessMeta.setRetryNumber(0);
cleanProcessMeta.setCreateTime(System.currentTimeMillis());
cleanProcessMeta.setProcessParameters(new HashMap<>());
cleanProcessMeta.setSummary(new HashMap<>());

return cleanProcessMeta;
}

private static class PersistenceHelper extends PersistentBase {

public PersistenceHelper() {}

private void beginAndPersistCleanupProcess(TableProcessMeta meta) {
doAsTransaction(
() ->
doAs(
TableProcessMapper.class,
mapper ->
mapper.insertProcess(
meta.getTableId(),
meta.getProcessId(),
meta.getExternalProcessIdentifier(),
meta.getStatus(),
meta.getProcessType(),
meta.getProcessStage(),
meta.getExecutionEngine(),
meta.getRetryNumber(),
meta.getCreateTime(),
meta.getProcessParameters(),
meta.getSummary())));
}

private void persistAndSetCompleted(
TableRuntime tableRuntime,
CleanupOperation cleanupOperation,
TableProcessMeta meta,
long endTime) {

doAsTransaction(
() ->
doAs(
TableProcessMapper.class,
mapper ->
mapper.updateProcess(
meta.getTableId(),
meta.getProcessId(),
meta.getExternalProcessIdentifier(),
meta.getStatus(),
meta.getProcessStage(),
meta.getRetryNumber(),
endTime,
meta.getFailMessage(),
meta.getProcessParameters(),
meta.getSummary())),
() ->
((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, endTime));
Comment on lines +322 to +323
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

persistAndSetCompleted always updates lastCleanTime regardless of whether the process meta ended in SUCCESS or FAILED. Given persistCleanupResult sets FAILED on errors, this will still advance lastCleanTime on failures, which is likely incorrect for scheduling/visibility. Consider moving updateLastCleanTime behind a success-only condition, or passing a flag/status into this helper to decide whether lastCleanTime should be updated.

Suggested change
() ->
((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, endTime));
() -> {
if (meta.getStatus() == ProcessStatus.SUCCESS) {
((DefaultTableRuntime) tableRuntime)
.updateLastCleanTime(cleanupOperation, endTime);
}
});

Copilot uses AI. Check for mistakes.
}
}

/**
* Get cleanup operation. Default is NONE, subclasses should override this method to provide
* specific operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,41 @@
package org.apache.amoro.server.scheduler.inline;

import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* Test table executor implementation for testing PeriodicTableScheduler functionality. This class
* allows configuration of cleanup operations and enabled state for testing purposes.
*/
class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler {
private final CleanupOperation cleanupOperation;
private final boolean enabled;
private final RuntimeException executionException;
private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour
private static final long ORPHAN_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; // 1 day
private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L;
private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour

public PeriodicTableSchedulerTestBase(
TableService tableService, CleanupOperation cleanupOperation, boolean enabled) {
this(tableService, cleanupOperation, enabled, null);
}

public PeriodicTableSchedulerTestBase(
TableService tableService,
CleanupOperation cleanupOperation,
boolean enabled,
RuntimeException executionException) {
super(tableService, 1);
this.cleanupOperation = cleanupOperation;
this.enabled = enabled;
this.executionException = executionException;
}

@Override
Expand All @@ -59,7 +73,9 @@ protected boolean enabled(TableRuntime tableRuntime) {

@Override
protected void execute(TableRuntime tableRuntime) {
// Do nothing in test
if (executionException != null) {
throw executionException;
}
}

@Override
Expand Down Expand Up @@ -88,4 +104,37 @@ public boolean shouldExecuteTaskForTest(
TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
return shouldExecuteTask(tableRuntime, cleanupOperation);
}

public TableProcessMeta createCleanupProcessInfoForTest(
TableRuntime tableRuntime, CleanupOperation operation) {
return createCleanupProcessInfo(tableRuntime, operation);
}

public void persistCleanupResultForTest(
TableRuntime tableRuntime,
CleanupOperation operation,
TableProcessMeta meta,
Throwable error) {
persistCleanupResult(tableRuntime, operation, meta, error);
}

public void executeTaskForTest(TableRuntime tableRuntime) {
try {
Method executeTask =
PeriodicTableScheduler.class.getDeclaredMethod("executeTask", TableRuntime.class);
executeTask.setAccessible(true);
executeTask.invoke(this, tableRuntime);
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new RuntimeException(cause);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Failed to invoke executeTask for test", e);
}
}
Comment on lines +121 to +139
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test relies on reflective access to a private method (executeTask) via setAccessible(true), which is brittle under stricter JVM access rules and can fail in certain build/test environments. Consider making executeTask protected (or package-private) and annotating it with @VisibleForTesting, similar to createCleanupProcessInfo/persistCleanupResult, so tests can call it directly without reflection.

Copilot uses AI. Check for mistakes.
}
Loading
Loading