diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 4fd3e5e9af..5f3365c899 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -32,6 +32,8 @@ import org.apache.amoro.config.Configurations; import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; import org.apache.amoro.server.dashboard.DashboardServer; @@ -47,16 +49,18 @@ import org.apache.amoro.server.persistence.HttpSessionHandlerFactory; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.ProcessService.ExecuteEngineManager; +import org.apache.amoro.server.process.TableProcessFactoryManager; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.Containers; import org.apache.amoro.server.resource.DefaultOptimizerManager; import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.scheduler.inline.InlineTableExecutors; import org.apache.amoro.server.table.DefaultTableManager; +import org.apache.amoro.server.table.DefaultTableRuntimeFactory; import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; -import org.apache.amoro.server.table.TableRuntimeFactoryManager; import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.server.utils.ThriftServiceProxy; @@ -229,17 +233,23 @@ public void transitionToFollower() { } public void startOptimizingService() throws Exception { - TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager(); - tableRuntimeFactoryManager.initialize(); + // Load process factories and build action coordinators from default table runtime factory. + TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); + tableProcessFactoryManager.initialize(); + List processFactories = tableProcessFactoryManager.installedPlugins(); - tableService = - new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); + DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory(); + defaultRuntimeFactory.initialize(processFactories); + List actionCoordinators = defaultRuntimeFactory.supportedCoordinators(); + ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); + + tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory); + processService = + new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager); optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); - processService = new ProcessService(serviceConfig, tableService); - LOG.info("Setting up AMS table executors..."); InlineTableExecutors.getInstance().setup(tableService, serviceConfig); addHandlerChain(optimizingService.getTableRuntimeHandler()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index 252d1574bf..bf4ab6564b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -64,8 +64,8 @@ public class ProcessService extends PersistentBase { new ConcurrentHashMap<>(); private final Map executeEngines = new ConcurrentHashMap<>(); - private final ActionCoordinatorManager actionCoordinatorManager; private final ExecuteEngineManager executeEngineManager; + private final List actionCoordinatorList; private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler(); private final ThreadPoolExecutor processExecutionPool = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); @@ -74,16 +74,16 @@ public class ProcessService extends PersistentBase { new ConcurrentHashMap<>(); public ProcessService(Configurations serviceConfig, TableService tableService) { - this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager()); + this(serviceConfig, tableService, Collections.emptyList(), new ExecuteEngineManager()); } public ProcessService( Configurations serviceConfig, TableService tableService, - ActionCoordinatorManager actionCoordinatorManager, + List actionCoordinators, ExecuteEngineManager executeEngineManager) { this.tableService = tableService; - this.actionCoordinatorManager = actionCoordinatorManager; + this.actionCoordinatorList = actionCoordinators; this.executeEngineManager = executeEngineManager; } @@ -148,7 +148,6 @@ public void cancel(TableProcess process) { /** Dispose the service, shutdown engines and clear active processes. */ public void dispose() { - actionCoordinatorManager.close(); executeEngineManager.close(); processExecutionPool.shutdown(); activeTableProcess.clear(); @@ -156,16 +155,12 @@ public void dispose() { private void initialize(List tableRuntimes) { LOG.info("Initializing process service"); - actionCoordinatorManager.initialize(); - actionCoordinatorManager - .installedPlugins() - .forEach( - actionCoordinator -> { - actionCoordinators.put( - actionCoordinator.action().getName(), - new ActionCoordinatorScheduler( - actionCoordinator, tableService, ProcessService.this)); - }); + // Pre-configured coordinators built from TableRuntimeFactory / ProcessFactory + for (ActionCoordinator actionCoordinator : actionCoordinatorList) { + actionCoordinators.put( + actionCoordinator.action().getName(), + new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this)); + } executeEngineManager.initialize(); executeEngineManager .installedPlugins() @@ -553,13 +548,6 @@ protected void doDispose() { } } - /** Manager for {@link ActionCoordinator} plugins. */ - public static class ActionCoordinatorManager extends AbstractPluginManager { - public ActionCoordinatorManager() { - super("action-coordinators"); - } - } - /** Manager for {@link ExecuteEngine} plugins. */ public static class ExecuteEngineManager extends AbstractPluginManager { public ExecuteEngineManager() { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java old mode 100644 new mode 100755 similarity index 61% rename from amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java rename to amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java index 65a9061081..f54290937d --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java @@ -16,15 +16,21 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server.process; +import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.server.manager.AbstractPluginManager; -import org.apache.amoro.table.TableRuntimeFactory; -public class TableRuntimeFactoryManager extends AbstractPluginManager { - public static final String PLUGIN_CATEGORY = "table-runtime-factories"; +/** + * Plugin manager for {@link ProcessFactory} implementations. + * + *

Process factories are configured via {@code plugins/process-factories.yaml} and are + * responsible for describing how different {@code TableFormat} / {@code Action} combinations should + * be scheduled and executed. + */ +public class TableProcessFactoryManager extends AbstractPluginManager { - public TableRuntimeFactoryManager() { - super(PLUGIN_CATEGORY); + public TableProcessFactoryManager() { + super("process-factories"); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java new file mode 100755 index 0000000000..053e3c1cff --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.Action; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.process.ProcessTriggerStrategy; +import org.apache.amoro.process.RecoverProcessFailedException; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; + +import java.util.Map; +import java.util.Optional; + +/** + * Default implementation of {@link ActionCoordinator} that bridges {@link ProcessFactory} + * declarations to the AMS scheduling framework. + */ +public class DefaultActionCoordinator implements ActionCoordinator { + + private final Action action; + private final TableFormat format; + private final ProcessFactory factory; + private final ProcessTriggerStrategy strategy; + + public DefaultActionCoordinator(TableFormat format, Action action, ProcessFactory factory) { + this.action = action; + this.format = format; + this.factory = factory; + this.strategy = factory.triggerStrategy(format, action); + Preconditions.checkArgument( + strategy != null, + "ProcessTriggerStrategy cannot be null for format %s, action %s, factory %s", + format, + action, + factory.name()); + } + + @Override + public String name() { + // No need to be globally unique, this coordinator is not discovered via plugin manager. + return String.format("%s-%s-coordinator", format.name().toLowerCase(), action.getName()); + } + + @Override + public void open(Map properties) { + // No-op: lifecycle is managed by owning TableRuntimeFactory. + } + + @Override + public void close() { + // No-op: nothing to close. + } + + @Override + public boolean formatSupported(TableFormat format) { + return this.format.equals(format); + } + + @Override + public int parallelism() { + return strategy.getTriggerParallelism(); + } + + @Override + public Action action() { + return action; + } + + @Override + public long getNextExecutingTime(TableRuntime tableRuntime) { + // Fixed-rate scheduling based on configured trigger interval. + return strategy.getTriggerInterval().toMillis(); + } + + @Override + public boolean enabled(TableRuntime tableRuntime) { + return formatSupported(tableRuntime.getFormat()); + } + + @Override + public long getExecutorDelay() { + return strategy.getTriggerInterval().toMillis(); + } + + @Override + public Optional trigger(TableRuntime tableRuntime) { + return factory.trigger(tableRuntime, action); + } + + @Override + public TableProcess recoverTableProcess( + TableRuntime tableRuntime, TableProcessStore processStore) { + try { + return factory.recover(tableRuntime, processStore); + } catch (RecoverProcessFailedException e) { + throw new IllegalStateException( + String.format( + "Failed to recover table process for format %s, action %s, table %s", + format, action, tableRuntime.getTableIdentifier()), + e); + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index 41c8041f22..e148aca4b0 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.table; +import org.apache.amoro.Action; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; @@ -28,45 +29,108 @@ import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableRuntimeStore; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +/** + * Default {@link TableRuntimeFactory} implementation used by AMS. + * + *

Besides creating {@link DefaultTableRuntime} instances for mixed/iceberg formats, this factory + * also aggregates {@link ProcessFactory} declarations to expose {@link ActionCoordinator} plugins + * for different {@link TableFormat}/{@link Action} combinations. + */ public class DefaultTableRuntimeFactory implements TableRuntimeFactory { - @Override - public void open(Map properties) {} - @Override - public void close() {} + /** Mapping from table format to its supported actions and corresponding process factory. */ + private final Map> factoriesByFormat = new HashMap<>(); - @Override - public String name() { - return "default"; - } + /** Coordinators derived from all installed process factories. */ + private final List supportedCoordinators = Lists.newArrayList(); @Override public List supportedCoordinators() { - return Lists.newArrayList(); + return supportedCoordinators; } @Override - public void initialize(List factories) {} + public void initialize(List factories) { + factoriesByFormat.clear(); + supportedCoordinators.clear(); + + for (ProcessFactory factory : factories) { + Map> supported = factory.supportedActions(); + if (supported == null || supported.isEmpty()) { + continue; + } + + for (Map.Entry> entry : supported.entrySet()) { + TableFormat format = entry.getKey(); + Map byAction = + factoriesByFormat.computeIfAbsent(format, k -> new HashMap<>()); + + for (Action action : entry.getValue()) { + ProcessFactory existed = byAction.get(action); + if (existed != null && existed != factory) { + throw new IllegalArgumentException( + String.format( + "ProcessFactory conflict for format %s and action %s, existing: %s, new: %s", + format, action, existed.name(), factory.name())); + } + byAction.put(action, factory); + supportedCoordinators.add(new DefaultActionCoordinator(format, action, factory)); + } + } + } + } @Override public Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties) { - if (tableIdentifier - .getFormat() - .in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG)) { - return Optional.of(new TableRuntimeCreatorImpl()); + TableFormat format = tableIdentifier.getFormat(); + boolean defaultSupported = + format.in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG); + boolean hasProcessFactories = factoriesByFormat.containsKey(format); + + if (!defaultSupported && !hasProcessFactories) { + return Optional.empty(); } - return Optional.empty(); + + return Optional.of(new TableRuntimeCreatorImpl(format)); } - private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + private class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + + private final TableFormat format; + + private TableRuntimeCreatorImpl(TableFormat format) { + this.format = format; + } + @Override public List> requiredStateKeys() { - return DefaultTableRuntime.REQUIRED_STATES; + Map> merged = new LinkedHashMap<>(); + // 1) DefaultTableRuntime required states + for (StateKey stateKey : DefaultTableRuntime.REQUIRED_STATES) { + merged.put(stateKey.getKey(), stateKey); + } + + // 2) Extra states from all process factories for this format (if any) + Map byAction = factoriesByFormat.get(format); + if (byAction != null) { + byAction + .values() + .forEach( + factory -> + factory + .requiredStates() + .forEach(stateKey -> merged.put(stateKey.getKey(), stateKey))); + } + + return Lists.newArrayList(merged.values()); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index 464cd5601d..a247b18041 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -44,6 +44,7 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableSummary; import org.apache.amoro.utils.TablePropertyUtil; import org.slf4j.Logger; @@ -85,19 +86,19 @@ public class DefaultTableService extends PersistentBase implements TableService private final CompletableFuture initialized = new CompletableFuture<>(); private final Configurations serverConfiguration; private final CatalogManager catalogManager; - private final TableRuntimeFactoryManager tableRuntimeFactoryManager; + private final TableRuntimeFactory tableRuntimeFactory; private RuntimeHandlerChain headHandler; private ExecutorService tableExplorerExecutors; public DefaultTableService( Configurations configuration, CatalogManager catalogManager, - TableRuntimeFactoryManager tableRuntimeFactoryManager) { + TableRuntimeFactory tableRuntimeFactory) { this.catalogManager = catalogManager; this.externalCatalogRefreshingInterval = configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis(); this.serverConfiguration = configuration; - this.tableRuntimeFactoryManager = tableRuntimeFactoryManager; + this.tableRuntimeFactory = tableRuntimeFactory; } @Override @@ -515,21 +516,19 @@ private Optional createTableRuntime( ServerTableIdentifier identifier, TableRuntimeMeta runtimeMeta, List restoredStates) { - return tableRuntimeFactoryManager.installedPlugins().stream() - .map(f -> f.accept(identifier, runtimeMeta.getTableConfig())) - .filter(Optional::isPresent) - .map(Optional::get) - .findFirst() - .map( - creator -> { - DefaultTableRuntimeStore store = - new DefaultTableRuntimeStore( - identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates); - store.setRuntimeHandler(this); - TableRuntime tableRuntime = creator.create(store); - store.setTableRuntime(tableRuntime); - return tableRuntime; - }); + Optional creatorOpt = + tableRuntimeFactory.accept(identifier, runtimeMeta.getTableConfig()); + + return creatorOpt.map( + creator -> { + DefaultTableRuntimeStore store = + new DefaultTableRuntimeStore( + identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates); + store.setRuntimeHandler(this); + TableRuntime tableRuntime = creator.create(store); + store.setTableRuntime(tableRuntime); + return tableRuntime; + }); } private void revertTableRuntimeAdded( diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 9f3d25b3f7..bd8e4cdec8 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -26,12 +26,9 @@ import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.DefaultTableRuntimeFactory; import org.apache.amoro.server.table.DefaultTableService; -import org.apache.amoro.server.table.TableRuntimeFactoryManager; -import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.mockito.Mockito; import java.time.Duration; @@ -40,22 +37,16 @@ public abstract class AMSServiceTestBase extends AMSManagerTestBase { private static DefaultOptimizingService OPTIMIZING_SERVICE = null; private static ProcessService PROCESS_SERVICE = null; - private static TableRuntimeFactoryManager tableRuntimeFactoryManager = null; - @BeforeClass public static void initTableService() { DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory(); - tableRuntimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); - Mockito.when(tableRuntimeFactoryManager.installedPlugins()) - .thenReturn(Lists.newArrayList(runtimeFactory)); try { Configurations configurations = new Configurations(); configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L)); configurations.set( AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L)); TABLE_SERVICE = - new DefaultTableService( - new Configurations(), CATALOG_MANAGER, tableRuntimeFactoryManager); + new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); OPTIMIZING_SERVICE = new DefaultOptimizingService( configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); @@ -80,7 +71,6 @@ public static void disposeTableService() { TABLE_SERVICE.dispose(); MetricManager.dispose(); EventsManager.dispose(); - tableRuntimeFactoryManager = null; } protected DefaultTableService tableService() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java index 792faebcf7..898b3d0151 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java @@ -41,7 +41,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.mockito.Mockito; import java.util.List; @@ -49,7 +48,7 @@ public class TestDefaultTableRuntimeHandler extends AMSTableTestBase { private DefaultTableService tableService; - private final TableRuntimeFactoryManager runtimeFactoryManager; + private final DefaultTableRuntimeFactory runtimeFactory; @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { @@ -66,16 +65,12 @@ public static Object[] parameters() { public TestDefaultTableRuntimeHandler( CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { super(catalogTestHelper, tableTestHelper, false); - DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory(); - runtimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); - Mockito.when(runtimeFactoryManager.installedPlugins()) - .thenReturn(Lists.newArrayList(runtimeFactory)); + this.runtimeFactory = new DefaultTableRuntimeFactory(); } @Test public void testInitialize() throws Exception { - tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -94,8 +89,7 @@ public void testInitialize() throws Exception { Assert.assertTrue(handler.isDisposed()); // initialize with a history table - tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -132,8 +126,7 @@ public void testInitialize() throws Exception { @Test public void testRefreshUpdatesOptimizerGroup() throws Exception { - tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java index 90e26bd4b6..5799166c59 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java @@ -18,7 +18,6 @@ package org.apache.amoro.table; -import org.apache.amoro.ActivePlugin; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; @@ -29,7 +28,7 @@ import java.util.Optional; /** Table runtime factory. */ -public interface TableRuntimeFactory extends ActivePlugin { +public interface TableRuntimeFactory { List supportedCoordinators();