-
Notifications
You must be signed in to change notification settings - Fork 382
[AMORO-4048] Saving cleanup opeartion process info in table_process #4077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -24,17 +24,25 @@ | |||||||||||||||||
| import org.apache.amoro.ServerTableIdentifier; | ||||||||||||||||||
| import org.apache.amoro.TableRuntime; | ||||||||||||||||||
| import org.apache.amoro.config.TableConfiguration; | ||||||||||||||||||
| import org.apache.amoro.process.ProcessStatus; | ||||||||||||||||||
| import org.apache.amoro.server.optimizing.OptimizingStatus; | ||||||||||||||||||
| import org.apache.amoro.server.persistence.PersistentBase; | ||||||||||||||||||
| import org.apache.amoro.server.persistence.mapper.TableProcessMapper; | ||||||||||||||||||
| import org.apache.amoro.server.process.TableProcessMeta; | ||||||||||||||||||
| import org.apache.amoro.server.table.DefaultTableRuntime; | ||||||||||||||||||
| import org.apache.amoro.server.table.RuntimeHandlerChain; | ||||||||||||||||||
| import org.apache.amoro.server.table.TableService; | ||||||||||||||||||
| import org.apache.amoro.server.table.cleanup.CleanupOperation; | ||||||||||||||||||
| import org.apache.amoro.server.utils.SnowflakeIdGenerator; | ||||||||||||||||||
| import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; | ||||||||||||||||||
| import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; | ||||||||||||||||||
| import org.apache.amoro.utils.ExceptionUtil; | ||||||||||||||||||
| import org.apache.commons.lang3.StringUtils; | ||||||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||||||||||
|
|
||||||||||||||||||
| import java.util.Collections; | ||||||||||||||||||
| import java.util.HashMap; | ||||||||||||||||||
| import java.util.HashSet; | ||||||||||||||||||
| import java.util.List; | ||||||||||||||||||
| import java.util.Locale; | ||||||||||||||||||
|
|
@@ -48,6 +56,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain { | |||||||||||||||||
| protected final Logger logger = LoggerFactory.getLogger(getClass()); | ||||||||||||||||||
|
|
||||||||||||||||||
| private static final long START_DELAY = 10 * 1000L; | ||||||||||||||||||
| private static final String CLEANUP_EXECUTION_ENGINE = "AMORO"; | ||||||||||||||||||
| private static final String CLEANUP_PROCESS_STAGE = "CLEANUP"; | ||||||||||||||||||
| private static final String EXTERNAL_PROCESS_IDENTIFIER = ""; | ||||||||||||||||||
| private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator(); | ||||||||||||||||||
|
|
||||||||||||||||||
| private final PersistenceHelper persistenceHelper = new PersistenceHelper(); | ||||||||||||||||||
|
|
||||||||||||||||||
| protected final Set<ServerTableIdentifier> scheduledTables = | ||||||||||||||||||
| Collections.synchronizedSet(new HashSet<>()); | ||||||||||||||||||
|
|
@@ -123,16 +137,30 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) { | |||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private void executeTask(TableRuntime tableRuntime) { | ||||||||||||||||||
| TableProcessMeta cleanProcessMeta = null; | ||||||||||||||||||
| CleanupOperation cleanupOperation = null; | ||||||||||||||||||
| Throwable executionError = null; | ||||||||||||||||||
|
|
||||||||||||||||||
| try { | ||||||||||||||||||
| if (isExecutable(tableRuntime)) { | ||||||||||||||||||
| cleanupOperation = getCleanupOperation(); | ||||||||||||||||||
| cleanProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation); | ||||||||||||||||||
|
|
||||||||||||||||||
| execute(tableRuntime); | ||||||||||||||||||
| // Different tables take different amounts of time to execute the end of execute(), | ||||||||||||||||||
| // so you need to perform the update operation separately for each table. | ||||||||||||||||||
| persistUpdatingCleanupTime(tableRuntime); | ||||||||||||||||||
| } | ||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||
| logger.error("exception when schedule for table: {}", tableRuntime.getTableIdentifier(), e); | ||||||||||||||||||
| } catch (Throwable t) { | ||||||||||||||||||
| executionError = t; | ||||||||||||||||||
| logger.error( | ||||||||||||||||||
| "Failed to execute cleanup operation {} for table {}", | ||||||||||||||||||
| cleanupOperation, | ||||||||||||||||||
| tableRuntime.getTableIdentifier(), | ||||||||||||||||||
| t); | ||||||||||||||||||
| } finally { | ||||||||||||||||||
| persistCleanupResult(tableRuntime, cleanupOperation, cleanProcessMeta, executionError); | ||||||||||||||||||
|
|
||||||||||||||||||
| scheduledTables.remove(tableRuntime.getTableIdentifier()); | ||||||||||||||||||
| scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime)); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
@@ -178,6 +206,124 @@ private void persistUpdatingCleanupTime(TableRuntime tableRuntime) { | |||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| @VisibleForTesting | ||||||||||||||||||
| protected TableProcessMeta createCleanupProcessInfo( | ||||||||||||||||||
| TableRuntime tableRuntime, CleanupOperation cleanupOperation) { | ||||||||||||||||||
|
|
||||||||||||||||||
| if (shouldSkipOperation(tableRuntime, cleanupOperation)) { | ||||||||||||||||||
| return null; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation); | ||||||||||||||||||
| persistenceHelper.beginAndPersistCleanupProcess(cleanProcessMeta); | ||||||||||||||||||
|
|
||||||||||||||||||
| logger.debug( | ||||||||||||||||||
| "Successfully persist cleanup process [processId={}, tableId={}, processType={}]", | ||||||||||||||||||
| cleanProcessMeta.getProcessId(), | ||||||||||||||||||
| cleanProcessMeta.getTableId(), | ||||||||||||||||||
| cleanProcessMeta.getProcessType()); | ||||||||||||||||||
|
|
||||||||||||||||||
| return cleanProcessMeta; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| @VisibleForTesting | ||||||||||||||||||
| protected void persistCleanupResult( | ||||||||||||||||||
| TableRuntime tableRuntime, | ||||||||||||||||||
| CleanupOperation cleanupOperation, | ||||||||||||||||||
| TableProcessMeta cleanProcessMeta, | ||||||||||||||||||
| Throwable executionError) { | ||||||||||||||||||
|
|
||||||||||||||||||
|
zhangwl9 marked this conversation as resolved.
|
||||||||||||||||||
| if (cleanProcessMeta == null) { | ||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if (executionError != null) { | ||||||||||||||||||
| cleanProcessMeta.setStatus(ProcessStatus.FAILED); | ||||||||||||||||||
| cleanProcessMeta.setFailMessage(ExceptionUtil.getErrorMessage(executionError, 4000)); | ||||||||||||||||||
| } else { | ||||||||||||||||||
|
zhangwl9 marked this conversation as resolved.
|
||||||||||||||||||
| cleanProcessMeta.setStatus(ProcessStatus.SUCCESS); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| long endTime = System.currentTimeMillis(); | ||||||||||||||||||
| persistenceHelper.persistAndSetCompleted( | ||||||||||||||||||
| tableRuntime, cleanupOperation, cleanProcessMeta, endTime); | ||||||||||||||||||
|
|
||||||||||||||||||
| logger.debug( | ||||||||||||||||||
| "Successfully updated lastCleanTime and cleanupProcess for table {} with cleanup operation {}", | ||||||||||||||||||
| tableRuntime.getTableIdentifier().getTableName(), | ||||||||||||||||||
| cleanupOperation); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private TableProcessMeta buildProcessMeta( | ||||||||||||||||||
| TableRuntime tableRuntime, CleanupOperation cleanupOperation) { | ||||||||||||||||||
|
|
||||||||||||||||||
| TableProcessMeta cleanProcessMeta = new TableProcessMeta(); | ||||||||||||||||||
| cleanProcessMeta.setTableId(tableRuntime.getTableIdentifier().getId()); | ||||||||||||||||||
| cleanProcessMeta.setProcessId(ID_GENERATOR.generateId()); | ||||||||||||||||||
| cleanProcessMeta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER); | ||||||||||||||||||
| cleanProcessMeta.setStatus(ProcessStatus.RUNNING); | ||||||||||||||||||
| cleanProcessMeta.setProcessType(cleanupOperation.name()); | ||||||||||||||||||
| cleanProcessMeta.setProcessStage(CLEANUP_PROCESS_STAGE); | ||||||||||||||||||
| cleanProcessMeta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE); | ||||||||||||||||||
| cleanProcessMeta.setRetryNumber(0); | ||||||||||||||||||
| cleanProcessMeta.setCreateTime(System.currentTimeMillis()); | ||||||||||||||||||
| cleanProcessMeta.setProcessParameters(new HashMap<>()); | ||||||||||||||||||
| cleanProcessMeta.setSummary(new HashMap<>()); | ||||||||||||||||||
|
|
||||||||||||||||||
| return cleanProcessMeta; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private static class PersistenceHelper extends PersistentBase { | ||||||||||||||||||
|
|
||||||||||||||||||
| public PersistenceHelper() {} | ||||||||||||||||||
|
|
||||||||||||||||||
| private void beginAndPersistCleanupProcess(TableProcessMeta meta) { | ||||||||||||||||||
| doAsTransaction( | ||||||||||||||||||
| () -> | ||||||||||||||||||
| doAs( | ||||||||||||||||||
| TableProcessMapper.class, | ||||||||||||||||||
| mapper -> | ||||||||||||||||||
| mapper.insertProcess( | ||||||||||||||||||
| meta.getTableId(), | ||||||||||||||||||
| meta.getProcessId(), | ||||||||||||||||||
| meta.getExternalProcessIdentifier(), | ||||||||||||||||||
| meta.getStatus(), | ||||||||||||||||||
| meta.getProcessType(), | ||||||||||||||||||
| meta.getProcessStage(), | ||||||||||||||||||
| meta.getExecutionEngine(), | ||||||||||||||||||
| meta.getRetryNumber(), | ||||||||||||||||||
| meta.getCreateTime(), | ||||||||||||||||||
| meta.getProcessParameters(), | ||||||||||||||||||
| meta.getSummary()))); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private void persistAndSetCompleted( | ||||||||||||||||||
| TableRuntime tableRuntime, | ||||||||||||||||||
| CleanupOperation cleanupOperation, | ||||||||||||||||||
| TableProcessMeta meta, | ||||||||||||||||||
| long endTime) { | ||||||||||||||||||
|
|
||||||||||||||||||
| doAsTransaction( | ||||||||||||||||||
| () -> | ||||||||||||||||||
| doAs( | ||||||||||||||||||
| TableProcessMapper.class, | ||||||||||||||||||
| mapper -> | ||||||||||||||||||
| mapper.updateProcess( | ||||||||||||||||||
| meta.getTableId(), | ||||||||||||||||||
| meta.getProcessId(), | ||||||||||||||||||
| meta.getExternalProcessIdentifier(), | ||||||||||||||||||
| meta.getStatus(), | ||||||||||||||||||
| meta.getProcessStage(), | ||||||||||||||||||
| meta.getRetryNumber(), | ||||||||||||||||||
| endTime, | ||||||||||||||||||
| meta.getFailMessage(), | ||||||||||||||||||
| meta.getProcessParameters(), | ||||||||||||||||||
| meta.getSummary())), | ||||||||||||||||||
| () -> | ||||||||||||||||||
| ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, endTime)); | ||||||||||||||||||
|
Comment on lines
+322
to
+323
|
||||||||||||||||||
| () -> | |
| ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, endTime)); | |
| () -> { | |
| if (meta.getStatus() == ProcessStatus.SUCCESS) { | |
| ((DefaultTableRuntime) tableRuntime) | |
| .updateLastCleanTime(cleanupOperation, endTime); | |
| } | |
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,27 +19,41 @@ | |
| package org.apache.amoro.server.scheduler.inline; | ||
|
|
||
| import org.apache.amoro.TableRuntime; | ||
| import org.apache.amoro.server.process.TableProcessMeta; | ||
| import org.apache.amoro.server.scheduler.PeriodicTableScheduler; | ||
| import org.apache.amoro.server.table.TableService; | ||
| import org.apache.amoro.server.table.cleanup.CleanupOperation; | ||
|
|
||
| import java.lang.reflect.InvocationTargetException; | ||
| import java.lang.reflect.Method; | ||
|
|
||
| /** | ||
| * Test table executor implementation for testing PeriodicTableScheduler functionality. This class | ||
| * allows configuration of cleanup operations and enabled state for testing purposes. | ||
| */ | ||
| class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { | ||
| private final CleanupOperation cleanupOperation; | ||
| private final boolean enabled; | ||
| private final RuntimeException executionException; | ||
| private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour | ||
| private static final long ORPHAN_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; // 1 day | ||
| private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; | ||
| private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour | ||
|
|
||
| public PeriodicTableSchedulerTestBase( | ||
| TableService tableService, CleanupOperation cleanupOperation, boolean enabled) { | ||
| this(tableService, cleanupOperation, enabled, null); | ||
| } | ||
|
|
||
| public PeriodicTableSchedulerTestBase( | ||
| TableService tableService, | ||
| CleanupOperation cleanupOperation, | ||
| boolean enabled, | ||
| RuntimeException executionException) { | ||
| super(tableService, 1); | ||
| this.cleanupOperation = cleanupOperation; | ||
| this.enabled = enabled; | ||
| this.executionException = executionException; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -59,7 +73,9 @@ protected boolean enabled(TableRuntime tableRuntime) { | |
|
|
||
| @Override | ||
| protected void execute(TableRuntime tableRuntime) { | ||
| // Do nothing in test | ||
| if (executionException != null) { | ||
| throw executionException; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -88,4 +104,37 @@ public boolean shouldExecuteTaskForTest( | |
| TableRuntime tableRuntime, CleanupOperation cleanupOperation) { | ||
| return shouldExecuteTask(tableRuntime, cleanupOperation); | ||
| } | ||
|
|
||
| public TableProcessMeta createCleanupProcessInfoForTest( | ||
| TableRuntime tableRuntime, CleanupOperation operation) { | ||
| return createCleanupProcessInfo(tableRuntime, operation); | ||
| } | ||
|
|
||
| public void persistCleanupResultForTest( | ||
| TableRuntime tableRuntime, | ||
| CleanupOperation operation, | ||
| TableProcessMeta meta, | ||
| Throwable error) { | ||
| persistCleanupResult(tableRuntime, operation, meta, error); | ||
| } | ||
|
|
||
| public void executeTaskForTest(TableRuntime tableRuntime) { | ||
| try { | ||
| Method executeTask = | ||
| PeriodicTableScheduler.class.getDeclaredMethod("executeTask", TableRuntime.class); | ||
| executeTask.setAccessible(true); | ||
| executeTask.invoke(this, tableRuntime); | ||
| } catch (InvocationTargetException e) { | ||
| Throwable cause = e.getCause(); | ||
| if (cause instanceof RuntimeException) { | ||
| throw (RuntimeException) cause; | ||
| } | ||
| if (cause instanceof Error) { | ||
| throw (Error) cause; | ||
| } | ||
| throw new RuntimeException(cause); | ||
| } catch (ReflectiveOperationException e) { | ||
| throw new RuntimeException("Failed to invoke executeTask for test", e); | ||
| } | ||
| } | ||
|
Comment on lines
+121
to
+139
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.