diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java deleted file mode 100644 index f7b7557819..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.persistence.mapper; - -import org.apache.amoro.process.TableProcessState; -import org.apache.ibatis.annotations.Insert; -import org.apache.ibatis.annotations.Options; -import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Result; -import org.apache.ibatis.annotations.ResultMap; -import org.apache.ibatis.annotations.Results; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; - -import java.util.Map; - -public interface ProcessStateMapper { - - @Insert( - "INSERT INTO table_process_state " - + "(process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary) " - + "VALUES " - + "(#{id}, #{action}, #{tableId}, #{retryNumber}, #{status}, #{startTime}, #{endTime}, #{failedReason}, #{summary})") - @Options(useGeneratedKeys = true, keyProperty = "id") - void createProcessState(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, start_time = #{startTime} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessRunning(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, end_time = #{endTime} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessCompleted(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessFailed(TableProcessState state); - - @Select( - "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " - + "FROM table_process_state " - + "WHERE process_id = #{processId}") - @Results( - id = "TableProcessStateResultMap", - value = { - @Result(property = "id", column = "process_id"), - @Result(property = "action", column = "action"), - @Result(property = "tableId", column = "table_id"), - @Result(property = "retryNumber", column = "retry_num"), - @Result(property = "status", column = "status"), - @Result(property = "startTime", column = "start_time"), - @Result(property = "endTime", column = "end_time"), - @Result(property = "failedReason", column = "fail_reason"), - @Result(property = "summary", column = "summary", javaType = Map.class) - }) - TableProcessState getProcessStateById(@Param("processId") long processId); - - /** Query TableProcessState by table_id */ - @Select( - "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " - + "FROM table_process_state " - + "WHERE table_id = #{tableId}") - @ResultMap("TableProcessStateResultMap") - TableProcessState getProcessStateByTableId(@Param("tableId") long tableId); -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java index 83574c6636..daf5869413 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java @@ -20,6 +20,7 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; @@ -27,6 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; + /** * Periodic scheduler that delegates scheduling decisions to an {@link ActionCoordinator}. It * creates, recovers and retries table processes via {@link ProcessService}. @@ -95,8 +98,8 @@ protected boolean enabled(TableRuntime tableRuntime) { */ @Override protected void execute(TableRuntime tableRuntime) { - TableProcess process = coordinator.createTableProcess(tableRuntime); - processService.register(tableRuntime, process); + Optional process = coordinator.trigger(tableRuntime); + process.ifPresent(p -> processService.register(tableRuntime, p)); } /** @@ -110,16 +113,6 @@ protected void recover(TableRuntime tableRuntime, TableProcessStore processStore processService.recover(tableRuntime, process); } - /** - * Retry a failed table process. - * - * @param process process to retry - */ - protected void retry(TableProcess process) { - process = coordinator.retryTableProcess(process); - processService.retry(process); - } - /** * Get executor delay from coordinator. * diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index ee9f136dc6..252d1574bf 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -25,6 +25,7 @@ import org.apache.amoro.TableRuntime; import org.apache.amoro.config.Configurations; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.ProcessEvent; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.process.TableProcess; @@ -242,7 +243,7 @@ private void executeOrTraceProcess(TableProcess process) { "Regular Retry.", process.getProcessParameters(), process.getSummary()); - scheduler.retry(process); + executeOrTraceProcess(process); } else { untrackTableProcessInstance( process.getTableRuntime().getTableIdentifier(), process.getId()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java index 311b60e035..6890c25c10 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.process; import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.TableProcessState; import org.apache.amoro.process.TableProcessStore; import java.util.HashMap; @@ -189,25 +188,6 @@ public static TableProcessMeta fromTableProcessStore(TableProcessStore tableProc return tableProcessMeta; } - @Deprecated - public static TableProcessMeta fromTableProcessState(TableProcessState tableProcessState) { - TableProcessMeta tableProcessMeta = new TableProcessMeta(); - tableProcessMeta.setProcessId(tableProcessState.getId()); - tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId()); - tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier()); - tableProcessMeta.setStatus(tableProcessState.getStatus()); - tableProcessMeta.setProcessType(tableProcessState.getAction().getName()); - tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc()); - tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine()); - tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber()); - tableProcessMeta.setCreateTime(tableProcessState.getStartTime()); - tableProcessMeta.setFinishTime(tableProcessState.getEndTime()); - tableProcessMeta.setFailMessage(tableProcessState.getFailedReason()); - tableProcessMeta.setProcessParameters(tableProcessState.getProcessParameters()); - tableProcessMeta.setSummary(tableProcessState.getSummary()); - return tableProcessMeta; - } - public static TableProcessMeta of( long processId, long tableId, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java deleted file mode 100644 index 346f02772b..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.scheduler; - -import org.apache.amoro.Action; -import org.apache.amoro.SupportsProcessPlugins; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.process.AmoroProcess; -import org.apache.amoro.process.ManagedProcess; -import org.apache.amoro.process.ProcessFactory; -import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.SimpleFuture; -import org.apache.amoro.process.TableProcess; -import org.apache.amoro.process.TableProcessState; -import org.apache.amoro.process.TableProcessStore; -import org.apache.amoro.resource.ExternalResourceContainer; -import org.apache.amoro.resource.Resource; -import org.apache.amoro.resource.ResourceManager; -import org.apache.amoro.server.persistence.PersistentBase; -import org.apache.amoro.server.persistence.mapper.TableProcessMapper; -import org.apache.amoro.server.process.DefaultTableProcessStore; -import org.apache.amoro.server.process.TableProcessMeta; -import org.apache.amoro.server.table.TableService; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; - -import java.util.List; -import java.util.Optional; - -public abstract class PeriodicExternalScheduler extends PeriodicTableScheduler { - - private final ExternalResourceContainer resourceContainer; - private final ResourceManager resourceManager; - private final ProcessFactory processFactory; - - public PeriodicExternalScheduler( - ResourceManager resourceManager, - ExternalResourceContainer resourceContainer, - Action action, - TableService tableService, - int poolSize) { - super(action, tableService, poolSize); - this.resourceContainer = resourceContainer; - this.resourceManager = resourceManager; - this.processFactory = generateProcessFactory(); - } - - @Override - protected void initHandler(List tableRuntimeList) { - tableRuntimeList.stream() - .filter(t -> t instanceof SupportsProcessPlugins) - .map(t -> (SupportsProcessPlugins) t) - .forEach(tableRuntime -> tableRuntime.install(getAction(), processFactory)); - super.initHandler(tableRuntimeList); - } - - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return Optional.of(tableRuntime) - .filter(t -> t instanceof SupportsProcessPlugins) - .map(t -> (SupportsProcessPlugins) t) - .map(t -> t.enabled(getAction())) - .orElse(false); - } - - @Override - protected void execute(TableRuntime tableRuntime) { - Preconditions.checkArgument(tableRuntime instanceof SupportsProcessPlugins); - SupportsProcessPlugins runtimeSupportProcessPlugin = (SupportsProcessPlugins) tableRuntime; - // Trigger a table process and check conflicts by table runtime - // Update process state after process completed, the callback must be register first - AmoroProcess process = runtimeSupportProcessPlugin.trigger(getAction()); - process.getCompleteFuture().whenCompleted(() -> persistTableProcess(process)); - ManagedProcess managedProcess = new ManagedTableProcess(process); - - // Submit the table process to resource manager, this is a sync operation - // update process completed and delete related resources - managedProcess.submit(); - - // Trace the table process by async framework so that process can be called back when completed - trace(process); - } - - protected int getMaxRetryNumber() { - return 1; - } - - protected abstract void trace(AmoroProcess process); - - protected ProcessFactory generateProcessFactory() { - return new ExternalProcessFactory(); - } - - protected void persistTableProcess(AmoroProcess process) { - TableProcessStore store = process.store(); - if (store.getStatus() == ProcessStatus.SUBMITTED) { - new PersistencyHelper().createProcessState(store); - } else { - new PersistencyHelper().updateProcessStatus(store); - } - } - - private static class PersistencyHelper extends PersistentBase { - - void createProcessState(TableProcessStore store) { - TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store); - doAs( - TableProcessMapper.class, - mapper -> - mapper.updateProcess( - meta.getTableId(), - meta.getProcessId(), - meta.getExternalProcessIdentifier(), - meta.getStatus(), - meta.getProcessStage(), - meta.getRetryNumber(), - meta.getFinishTime(), - meta.getFailMessage(), - meta.getProcessParameters(), - meta.getSummary())); - } - - void updateProcessStatus(TableProcessStore store) { - TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store); - doAs( - TableProcessMapper.class, - mapper -> - mapper.updateProcess( - meta.getTableId(), - meta.getProcessId(), - meta.getExternalProcessIdentifier(), - meta.getStatus(), - meta.getProcessStage(), - meta.getRetryNumber(), - meta.getFinishTime(), - meta.getFailMessage(), - meta.getProcessParameters(), - meta.getSummary())); - } - } - - private class ExternalTableProcess extends TableProcess { - - ExternalTableProcess(TableRuntime tableRuntime) { - super( - tableRuntime, - new DefaultTableProcessStore( - tableRuntime, new TableProcessMeta(), PeriodicExternalScheduler.this.getAction())); - } - - ExternalTableProcess(TableRuntime tableRuntime, TableProcessState state) { - super( - tableRuntime, - new DefaultTableProcessStore( - tableRuntime, - TableProcessMeta.fromTableProcessState(state), - PeriodicExternalScheduler.this.getAction())); - } - - @Override - protected void closeInternal() {} - } - - private class ExternalProcessFactory implements ProcessFactory { - - @Override - public AmoroProcess create(TableRuntime tableRuntime, Action action) { - return new ExternalTableProcess(tableRuntime); - } - - @Override - public AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state) { - return new ExternalTableProcess(tableRuntime, state); - } - } - - protected class ManagedTableProcess implements ManagedProcess { - - private final AmoroProcess process; - - ManagedTableProcess(AmoroProcess process) { - this.process = process; - } - - @Override - public void submit() { - Resource resource = resourceContainer.submit(this); - if (resource == null) { - throw new IllegalStateException("Submit table process can not return null resource"); - } - persistTableProcess(this); - resourceManager.createResource(resource); - getCompleteFuture() - .whenCompleted( - () -> { - resourceManager.deleteResource(resource.getResourceId()); - if (store().getStatus() == ProcessStatus.FAILED - && store().getRetryNumber() < getMaxRetryNumber()) { - retry(); - } - }); - store().begin().updateTableProcessStatus(ProcessStatus.SUBMITTED).commit(); - getSubmitFuture().complete(); - } - - @Override - public void complete() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.SUCCESS) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public void complete(String failedReason) { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.FAILED) - .updateTableProcessFailMessage(failedReason) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public void retry() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.PENDING) - .updateRetryNumber(store().getRetryNumber() + 1) - .updateExternalProcessIdentifier("") - .commit(); - submit(); - } - - @Override - public void kill() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.KILLED) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public SimpleFuture getSubmitFuture() { - return process.getSubmitFuture(); - } - - @Override - public SimpleFuture getCompleteFuture() { - return process.getCompleteFuture(); - } - - @Override - public TableProcessStore store() { - return process.store(); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java index e07cd4432d..05e17adc09 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java @@ -18,17 +18,26 @@ package org.apache.amoro.server.table; +import org.apache.amoro.Action; import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.SupportsProcessPlugins; -import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.amoro.table.TableRuntimeStore; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + public abstract class AbstractTableRuntime extends PersistentBase - implements TableRuntime, SupportsProcessPlugins { + implements SupportProcessManagement { private final TableRuntimeStore store; + private final Map processContainerMap = Maps.newConcurrentMap(); protected AbstractTableRuntime(TableRuntimeStore store) { this.store = store; @@ -48,6 +57,49 @@ public TableConfiguration getTableConfiguration() { return TableConfigurations.parseTableConfig(store().getTableConfig()); } + @Override + public Map getTableConfig() { + return store().getTableConfig(); + } + + @Override + public List getProcessStates() { + return processContainerMap.values().stream() + .flatMap(container -> container.getProcessStates().stream()) + .collect(Collectors.toList()); + } + + @Override + public List getProcessStates(Action action) { + return processContainerMap.get(action).getProcessStates(); + } + + @Override + public void registerProcess(TableProcessStore processStore) { + processContainerMap + .computeIfAbsent(processStore.getAction(), k -> new TableProcessContainer()) + .processLock + .lock(); + try { + processContainerMap + .get(processStore.getAction()) + .processMap + .put(processStore.getProcessId(), processStore); + } finally { + processContainerMap.get(processStore.getAction()).processLock.unlock(); + } + } + + @Override + public void removeProcess(TableProcessStore processStore) { + processContainerMap.computeIfPresent( + processStore.getAction(), + (action, container) -> { + container.processMap.remove(processStore.getProcessId()); + return container; + }); + } + @Override public String getGroupName() { return store().getGroupName(); @@ -61,4 +113,13 @@ public int getStatusCode() { public void dispose() { store().dispose(); } + + private static class TableProcessContainer { + private final Lock processLock = new ReentrantLock(); + private final Map processMap = Maps.newConcurrentMap(); + + public List getProcessStates() { + return Lists.newArrayList(processMap.values()); + } + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index 11c1a21cff..8a337451c4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -18,10 +18,7 @@ package org.apache.amoro.server.table; -import org.apache.amoro.Action; import org.apache.amoro.AmoroTable; -import org.apache.amoro.SupportsProcessPlugins; -import org.apache.amoro.TableRuntime; import org.apache.amoro.api.BlockableOperation; import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.config.TableConfiguration; @@ -30,10 +27,7 @@ import org.apache.amoro.optimizing.OptimizingType; import org.apache.amoro.optimizing.TableRuntimeOptimizingState; import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator; -import org.apache.amoro.process.AmoroProcess; -import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.AmoroServiceConstants; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingStatus; @@ -48,7 +42,6 @@ import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.server.utils.SnowflakeIdGenerator; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; -import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.amoro.table.BaseTable; import org.apache.amoro.table.ChangeTable; import org.apache.amoro.table.MixedTable; @@ -64,13 +57,9 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; /** Default table runtime implementation. */ -public class DefaultTableRuntime extends AbstractTableRuntime - implements TableRuntime, SupportsProcessPlugins { +public class DefaultTableRuntime extends AbstractTableRuntime { private static final Logger LOG = LoggerFactory.getLogger(DefaultTableRuntime.class); @@ -95,8 +84,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime public static final List> REQUIRED_STATES = Lists.newArrayList( OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, CLEANUP_STATE_KEY); - - private final Map processContainerMap = Maps.newConcurrentMap(); private final TableOptimizingMetrics optimizingMetrics; private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; private final TableSummaryMetrics tableSummaryMetrics; @@ -132,39 +119,6 @@ public void registerMetric(MetricRegistry metricRegistry) { this.tableSummaryMetrics.register(metricRegistry); } - @Override - public AmoroProcess trigger(Action action) { - return Optional.ofNullable(processContainerMap.get(action)) - .map(container -> container.trigger(action)) - // Define a related exception - .orElseThrow(() -> new IllegalArgumentException("No ProcessFactory for action " + action)); - } - - @Override - public void install(Action action, ProcessFactory processFactory) { - if (processContainerMap.putIfAbsent(action, new TableProcessContainer(processFactory)) - != null) { - throw new IllegalStateException("ProcessFactory for action " + action + " already exists"); - } - } - - @Override - public boolean enabled(Action action) { - return processContainerMap.get(action) != null; - } - - @Override - public List getProcessStates() { - return processContainerMap.values().stream() - .flatMap(container -> container.getProcessStates().stream()) - .collect(Collectors.toList()); - } - - @Override - public List getProcessStates(Action action) { - return processContainerMap.get(action).getProcessStates(); - } - public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() { return orphanFilesCleaningMetrics; } @@ -586,30 +540,4 @@ private long doRefreshSnapshots(UnkeyedTable table) { return currentSnapshotId; } - - private class TableProcessContainer { - private final Lock processLock = new ReentrantLock(); - private final ProcessFactory processFactory; - private final Map processMap = Maps.newConcurrentMap(); - - TableProcessContainer(ProcessFactory processFactory) { - this.processFactory = processFactory; - } - - public AmoroProcess trigger(Action action) { - processLock.lock(); - try { - AmoroProcess process = processFactory.create(DefaultTableRuntime.this, action); - process.getCompleteFuture().whenCompleted(() -> processMap.remove(process.getId())); - processMap.put(process.getId(), process); - return process; - } finally { - processLock.unlock(); - } - } - - public List getProcessStates() { - return processMap.values().stream().map(AmoroProcess::store).collect(Collectors.toList()); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index 86e748cc54..41c8041f22 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -21,6 +21,9 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.StateKey; import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableRuntimeStore; @@ -41,6 +44,14 @@ public String name() { return "default"; } + @Override + public List supportedCoordinators() { + return Lists.newArrayList(); + } + + @Override + public void initialize(List factories) {} + @Override public Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java new file mode 100644 index 0000000000..b6ff971b8b --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/SupportProcessManagement.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.TableProcessStore; + +/** Interface for AMS inner used. */ +public interface SupportProcessManagement extends TableRuntime { + + /** + * Register a process store to the table runtime. + * + * @param processStore the process store to register + */ + void registerProcess(TableProcessStore processStore); + + /** + * Remove a process store from the table runtime. + * + * @param processStore the process store to remove + */ + void removeProcess(TableProcessStore processStore); +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java index da3617a148..9d0f89b6b9 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java @@ -21,18 +21,20 @@ import org.apache.amoro.Action; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.utils.SnowflakeIdGenerator; import java.util.HashMap; import java.util.Map; +import java.util.Optional; /** Mock implementation of {@link ActionCoordinator} used in tests. */ public class MockActionCoordinator implements ActionCoordinator { public static final int PROCESS_MAX_POOL_SIZE = 1000; - private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] {TableFormat.PAIMON}; - - public static final Action DEFAULT_ACTION = new Action(DEFAULT_FORMATS, 0, "default_action"); + public static final Action DEFAULT_ACTION = Action.register("default_action"); + public static final SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator(); /** * Whether the format is supported. @@ -61,12 +63,6 @@ public Action action() { return DEFAULT_ACTION; } - /** Get execution engine name. */ - @Override - public String executionEngine() { - return "default"; - } - /** Next executing time. */ @Override public long getNextExecutingTime(TableRuntime tableRuntime) { @@ -92,19 +88,19 @@ public long getExecutorDelay() { * @return mock process */ @Override - public TableProcess createTableProcess(TableRuntime tableRuntime) { + public Optional trigger(TableRuntime tableRuntime) { TableProcessMeta tableProcessMeta = TableProcessMeta.of( SNOWFLAKE_ID_GENERATOR.generateId(), tableRuntime.getTableIdentifier().getId(), action().getName(), - executionEngine(), + "default", new HashMap<>()); TableProcessStore tableProcessStore = new DefaultTableProcessStore( tableProcessMeta.getProcessId(), tableRuntime, tableProcessMeta, action(), 3); MockTableProcess mockTableProcess = new MockTableProcess(tableRuntime, tableProcessStore); - return mockTableProcess; + return Optional.of(mockTableProcess); } /** @@ -120,18 +116,6 @@ public TableProcess recoverTableProcess( return new MockTableProcess(tableRuntime, processStore); } - /** Return same process to cancel. */ - @Override - public TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess process) { - return process; - } - - /** Return same process to retry. */ - @Override - public TableProcess retryTableProcess(TableProcess process) { - return process; - } - /** Open plugin. */ @Override public void open(Map properties) {} diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java b/amoro-common/src/main/java/org/apache/amoro/Action.java index 42671a6e1d..1d5f81b3bf 100644 --- a/amoro-common/src/main/java/org/apache/amoro/Action.java +++ b/amoro-common/src/main/java/org/apache/amoro/Action.java @@ -20,38 +20,33 @@ import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import java.util.Arrays; +import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; public final class Action { private static final int MAX_NAME_LENGTH = 16; - - /** supported table formats of this action */ - private final TableFormat[] formats; + private static final Map registeredActions = new ConcurrentHashMap<>(); private final String name; - /** - * the weight number of this action, the bigger the weight number, the higher positions of - * schedulers or front pages - */ - private final int weight; - public Action(TableFormat[] formats, int weight, String name) { - Preconditions.checkArgument( - name.length() <= MAX_NAME_LENGTH, - "Action name length should be less than " + MAX_NAME_LENGTH); - this.formats = formats; - this.name = name; - this.weight = weight; + public static Action register(String name) { + final String regularName = name.trim().toUpperCase(Locale.ROOT); + return registeredActions.computeIfAbsent(regularName, s -> new Action(regularName)); } - public int getWeight() { - return weight; + public static Action valueOf(String name) { + final String regularName = name.trim().toUpperCase(Locale.ROOT); + return registeredActions.get(regularName); } - public TableFormat[] supportedFormats() { - return formats; + private Action(String name) { + Preconditions.checkArgument( + name.length() <= MAX_NAME_LENGTH, + "Action name length should be less than " + MAX_NAME_LENGTH); + this.name = name; } public String getName() { @@ -67,13 +62,11 @@ public boolean equals(Object o) { return false; } Action action = (Action) o; - return Objects.equals(name, action.name) && Arrays.equals(formats, action.formats); + return Objects.equals(name, action.name); } @Override public int hashCode() { - int result = Objects.hash(name); - result = 31 * result + Arrays.hashCode(formats); - return result; + return Objects.hash(name); } } diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 76f470d98c..c75c5ac8d0 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -23,9 +23,9 @@ public class IcebergActions { private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE}; - public static final Action SYSTEM = new Action(DEFAULT_FORMATS, 0, "system"); - public static final Action REWRITE = new Action(DEFAULT_FORMATS, 10, "rewrite"); - public static final Action DELETE_ORPHANS = new Action(DEFAULT_FORMATS, 2, "delete-orphans"); - public static final Action SYNC_HIVE = new Action(DEFAULT_FORMATS, 3, "sync-hive"); - public static final Action EXPIRE_DATA = new Action(DEFAULT_FORMATS, 1, "expire-data"); + public static final Action SYSTEM = Action.register("system"); + public static final Action REWRITE = Action.register("rewrite"); + public static final Action DELETE_ORPHANS = Action.register("delete-orphans"); + public static final Action SYNC_HIVE = Action.register("sync-hive"); + public static final Action EXPIRE_DATA = Action.register("expire-data"); } diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java index 31ea74f1fe..ae2f610cc4 100644 --- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java +++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java @@ -24,6 +24,7 @@ import org.apache.amoro.process.TableProcessStore; import java.util.List; +import java.util.Map; /** * TableRuntime is the key interface for the AMS framework to interact with the table. Typically, it @@ -59,12 +60,20 @@ public interface TableRuntime { ServerTableIdentifier getTableIdentifier(); /** - * Get the table configuration. + * Get the table configuration. @Deprecated use {@link #getTableConfig()} instead. * * @return the table configuration */ + @Deprecated TableConfiguration getTableConfiguration(); + /** + * Get the table configuration. + * + * @return the table configuration + */ + Map getTableConfig(); + /** * Register the metric of the table runtime. * diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java similarity index 72% rename from amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java rename to amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java index 5e55154cbd..1f00a1c4f7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java @@ -16,15 +16,14 @@ * limitations under the License. */ -package org.apache.amoro.server.process; +package org.apache.amoro.process; import org.apache.amoro.Action; import org.apache.amoro.ActivePlugin; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; -import org.apache.amoro.process.TableProcess; -import org.apache.amoro.process.TableProcessStore; -import org.apache.amoro.server.utils.SnowflakeIdGenerator; + +import java.util.Optional; /** * Coordinator for a specific {@link org.apache.amoro.Action} to manage table processes. Provides @@ -32,10 +31,6 @@ */ public interface ActionCoordinator extends ActivePlugin { - String PROPERTY_PARALLELISM = "parallelism"; - - SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator(); - /** * Check whether the given table format is supported by this coordinator. * @@ -58,13 +53,6 @@ public interface ActionCoordinator extends ActivePlugin { */ Action action(); - /** - * Get the execution engine name used by this coordinator. - * - * @return execution engine name - */ - String executionEngine(); - /** * Calculate the next executing time for the given table runtime. * @@ -94,7 +82,7 @@ public interface ActionCoordinator extends ActivePlugin { * @param tableRuntime table runtime * @return a new table process */ - TableProcess createTableProcess(TableRuntime tableRuntime); + Optional trigger(TableRuntime tableRuntime); /** * Recover a {@link TableProcess} from persisted store. @@ -104,21 +92,4 @@ public interface ActionCoordinator extends ActivePlugin { * @return recovered table process */ TableProcess recoverTableProcess(TableRuntime tableRuntime, TableProcessStore processStore); - - /** - * Prepare a {@link TableProcess} for cancellation. - * - * @param tableRuntime table runtime - * @param process table process to cancel - * @return the process instance to be canceled - */ - TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess process); - - /** - * Prepare a {@link TableProcess} for retrying. - * - * @param process table process to retry - * @return the process instance to be retried - */ - TableProcess retryTableProcess(TableProcess process); } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java index cf0ffd50bf..6ff5b6cfb6 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java @@ -47,7 +47,7 @@ public interface AmoroProcess { SimpleFuture getCompleteFuture(); /** - * Get {@link ProcessState} of the process + * Get {@link TableProcessStore} of the process * * @return the state of the process */ diff --git a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java deleted file mode 100644 index c4a900dcac..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.StateField; - -/** The state of the optimizing process. */ -public abstract class OptimizingState extends TableProcessState { - - @StateField private volatile long targetSnapshotId; - @StateField private volatile long watermark; - @StateField private volatile ProcessStage stage; - @StateField private volatile long currentStageStartTime; - - public OptimizingState(Action action, ServerTableIdentifier tableIdentifier) { - super(action, tableIdentifier); - } - - public OptimizingState(long id, Action action, ServerTableIdentifier tableIdentifier) { - super(id, action, tableIdentifier); - } - - protected void setStage(ProcessStage stage) { - this.stage = stage; - this.currentStageStartTime = System.currentTimeMillis(); - } - - protected void setStage(ProcessStage stage, long stageStartTime) { - this.stage = stage; - this.currentStageStartTime = stageStartTime; - } - - protected void setTargetSnapshotId(long targetSnapshotId) { - this.targetSnapshotId = targetSnapshotId; - } - - protected void setWatermark(long watermark) { - this.watermark = watermark; - } - - public long getWatermark() { - return watermark; - } - - @Override - public ProcessStage getStage() { - return stage; - } - - public long getTargetSnapshotId() { - return targetSnapshotId; - } - - public long getCurrentStageStartTime() { - return currentStageStartTime; - } - - @Override - public String getName() { - return stage.getDesc(); - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java index a2df0a99f3..c2e5c838a8 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java @@ -19,30 +19,53 @@ package org.apache.amoro.process; import org.apache.amoro.Action; +import org.apache.amoro.ActivePlugin; +import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.table.StateKey; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; /** * A factory to create a process. Normally, There will be default ProcessFactories for each action * and used by default scheduler. Meanwhile, user could extend external ProcessFactory to run jobs * on external resources like Yarn. */ -public interface ProcessFactory { +public interface ProcessFactory extends ActivePlugin { + + default List> requiredStates() { + return Lists.newArrayList(); + } + + /** Get supported actions for each table format. */ + Map> supportedActions(); + + /** How to trigger a process for the action. */ + default ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action) { + return ProcessTriggerStrategy.METADATA_TRIGGER; + } /** - * Create a process for the action. + * Try trigger a process for the action. * * @param tableRuntime table runtime * @param action action type * @return target process which has not been submitted yet. */ - AmoroProcess create(TableRuntime tableRuntime, Action action); + Optional trigger(TableRuntime tableRuntime, Action action); /** * Recover a process for the action from a state. * * @param tableRuntime table runtime - * @param state state of the process + * @param store storage of the process * @return target process which has not been submitted yet. + * @throws RecoverProcessFailedException if recover failed */ - AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state); + TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) + throws RecoverProcessFailedException; } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java deleted file mode 100644 index 91f76fcaef..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; - -import java.util.Map; - -/** - * ProcessState contains information in any {@link AmoroProcess} which must be persistent and {@link - * ProcessFactory} will use to recover {@link AmoroProcess}. - */ -public interface ProcessState { - - /** @return unique identifier of the process. */ - long getId(); - - /** - * @return the name of the state. If multiple stages are involved, it should be the name of the - * current stage. - */ - String getName(); - - /** @return start time of the process. */ - long getStartTime(); - - /** @return the action of the process. */ - Action getAction(); - - /** @return the status of the process. */ - ProcessStatus getStatus(); - - /** - * Get the string encoded summary of the process, this could be a simple description or a POJO - * encoded by JSON - * - * @return the summary of the process - */ - Map getSummary(); - - /** @return the reason of process failure, null if the process has not failed yet. */ - String getFailedReason(); - - /** - * Total millisecond running time of all tasks in the process. - * - * @return actual quota runtime of the process. - */ - long getQuotaRuntime(); - - /** - * Quota value is calculated by the total millisecond running time of all tasks in the process - * divided by the total millisecond from the start time to the current time. It is used to - * evaluate the actual runtime concurrence of the process. - * - * @return the quota value of the process. - */ - default double getQuotaValue() { - return (double) getQuotaRuntime() / (System.currentTimeMillis() - getStartTime()); - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java new file mode 100644 index 0000000000..43bf5aed80 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.process; + +import java.time.Duration; + +/** Process trigger strategy. */ +public final class ProcessTriggerStrategy { + + public static final ProcessTriggerStrategy METADATA_TRIGGER = + new ProcessTriggerStrategy(Duration.ofDays(1), true, 1); + + private final Duration triggerInterval; + + private final boolean triggerOnNewSnapshot; + + private final int triggerParallelism; + + public ProcessTriggerStrategy( + Duration triggerInterval, boolean triggerOnNewSnapshot, int triggerParallelism) { + this.triggerInterval = triggerInterval; + this.triggerOnNewSnapshot = triggerOnNewSnapshot; + this.triggerParallelism = triggerParallelism; + } + + public static ProcessTriggerStrategy triggerAtFixRate(Duration triggerInterval) { + return new ProcessTriggerStrategy(triggerInterval, false, 1); + } + + public Duration getTriggerInterval() { + return triggerInterval; + } + + public boolean isTriggerOnNewSnapshot() { + return triggerOnNewSnapshot; + } + + public int getTriggerParallelism() { + return triggerParallelism; + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java b/amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java new file mode 100644 index 0000000000..5e9c0c64dd --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/process/RecoverProcessFailedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.process; + +/** Exception thrown when a process recovery fails. */ +public class RecoverProcessFailedException extends RuntimeException { + public RecoverProcessFailedException(String message) { + super(message); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java deleted file mode 100644 index c4e800ed1c..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.StateField; - -import java.util.Map; - -/** A common state of a table process. */ -public class TableProcessState implements ProcessState { - - @StateField private volatile long id; - @StateField private volatile String externalProcessIdentifier; - private final Action action; - private final ServerTableIdentifier tableIdentifier; - private String executionEngine; - @StateField private int retryNumber = 0; - @StateField private long startTime = -1L; - @StateField private long endTime = -1L; - @StateField private ProcessStatus status = ProcessStatus.PENDING; - @StateField private volatile String failedReason; - private volatile Map processParameters; - private volatile Map summary; - - public TableProcessState(Action action, ServerTableIdentifier tableIdentifier) { - this.action = action; - this.tableIdentifier = tableIdentifier; - } - - public TableProcessState(long id, Action action, ServerTableIdentifier tableIdentifier) { - this.id = id; - this.action = action; - this.tableIdentifier = tableIdentifier; - } - - public TableProcessState( - long id, Action action, ServerTableIdentifier tableIdentifier, String executionEngine) { - this.id = id; - this.action = action; - this.tableIdentifier = tableIdentifier; - this.executionEngine = executionEngine; - } - - @Override - public long getId() { - return id; - } - - public String getExternalProcessIdentifier() { - return externalProcessIdentifier; - } - - public String getName() { - return action.getName(); - } - - public Action getAction() { - return action; - } - - public long getStartTime() { - return startTime; - } - - public long getEndTime() { - return endTime; - } - - public ProcessStatus getStatus() { - return status; - } - - public String getExecutionEngine() { - return executionEngine; - } - - @Override - public Map getSummary() { - return summary; - } - - @Override - public long getQuotaRuntime() { - return getDuration(); - } - - @Override - public double getQuotaValue() { - return 1; - } - - public long getDuration() { - return endTime > 0 ? endTime - startTime : System.currentTimeMillis() - startTime; - } - - public ServerTableIdentifier getTableIdentifier() { - return tableIdentifier; - } - - public void setExternalProcessIdentifier(String externalProcessIdentifier) { - this.externalProcessIdentifier = externalProcessIdentifier; - } - - public void setExecutionEngine(String executionEngine) { - this.executionEngine = executionEngine; - } - - protected void setSummary(Map summary) { - this.summary = summary; - } - - protected void setStartTime(long startTime) { - this.startTime = startTime; - } - - public void setStatus(ProcessStatus status) { - if (status == ProcessStatus.SUCCESS - || status == ProcessStatus.FAILED - || status == ProcessStatus.KILLED) { - endTime = System.currentTimeMillis(); - } else if (this.status != ProcessStatus.SUBMITTED && status == ProcessStatus.SUBMITTED) { - endTime = -1L; - failedReason = null; - summary = null; - } - this.status = status; - } - - public String getFailedReason() { - return failedReason; - } - - public ProcessStage getStage() { - return status.toStage(); - } - - protected void setId(long processId) { - this.id = processId; - } - - public Map getProcessParameters() { - return processParameters; - } - - public void setProcessParameters(Map processParameters) { - this.processParameters = processParameters; - } - - public void setSubmitted(String externalProcessIdentifier) { - this.status = ProcessStatus.SUBMITTED; - setExternalProcessIdentifier(externalProcessIdentifier); - this.startTime = System.currentTimeMillis(); - } - - public void setSubmitted() { - this.status = ProcessStatus.SUBMITTED; - this.startTime = System.currentTimeMillis(); - } - - public void setRunning() { - this.status = ProcessStatus.RUNNING; - } - - public void setCanceling() { - this.status = ProcessStatus.CANCELING; - } - - public void addRetryNumber() { - this.retryNumber += 1; - this.status = ProcessStatus.PENDING; - this.externalProcessIdentifier = ""; - this.failedReason = null; - } - - public void resetRetryNumber() { - this.retryNumber = 0; - } - - public void setCanceled() { - this.status = ProcessStatus.CANCELED; - } - - public void setCompleted() { - this.status = ProcessStatus.SUCCESS; - this.endTime = System.currentTimeMillis(); - } - - public void setKilled() { - this.status = ProcessStatus.KILLED; - this.endTime = System.currentTimeMillis(); - } - - public void setCompleted(String failedReason) { - this.status = ProcessStatus.FAILED; - this.failedReason = failedReason; - this.endTime = System.currentTimeMillis(); - } - - public int getRetryNumber() { - return retryNumber; - } - - public void setRetryNumber(int retryNumber) { - this.retryNumber = retryNumber; - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java index 7654af7104..90e26bd4b6 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java @@ -21,6 +21,8 @@ import org.apache.amoro.ActivePlugin; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import java.util.List; import java.util.Map; @@ -29,6 +31,10 @@ /** Table runtime factory. */ public interface TableRuntimeFactory extends ActivePlugin { + List supportedCoordinators(); + + void initialize(List factories); + Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties);