Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
Expand All @@ -47,6 +49,8 @@
import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.process.ProcessService;
import org.apache.amoro.server.process.ProcessService.ExecuteEngineManager;
import org.apache.amoro.server.process.TableProcessFactoryManager;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.DefaultOptimizerManager;
Expand Down Expand Up @@ -75,6 +79,7 @@
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.amoro.table.TableRuntimeFactory;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.JacksonUtil;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -96,6 +101,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

public class AmoroServiceContainer {

Expand Down Expand Up @@ -238,7 +244,23 @@ public void startOptimizingService() throws Exception {
optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);

processService = new ProcessService(serviceConfig, tableService);
// Load process factories and build action coordinators from all table runtime factories.
TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
tableProcessFactoryManager.initialize();
List<ProcessFactory> processFactories = tableProcessFactoryManager.installedPlugins();

List<TableRuntimeFactory> tableRuntimeFactories = tableRuntimeFactoryManager.installedPlugins();
tableRuntimeFactories.forEach(factory -> factory.initialize(processFactories));

List<ActionCoordinator> actionCoordinators =
tableRuntimeFactories.stream()
.flatMap(factory -> factory.supportedCoordinators().stream())
.collect(Collectors.toList());

ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();

processService =
new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager);

LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public class ProcessService extends PersistentBase {
new ConcurrentHashMap<>();
private final Map<EngineType, ExecuteEngine> executeEngines = new ConcurrentHashMap<>();

private final ActionCoordinatorManager actionCoordinatorManager;
private final ExecuteEngineManager executeEngineManager;
private final List<ActionCoordinator> actionCoordinatorList;
private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler();
private final ThreadPoolExecutor processExecutionPool =
new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Expand All @@ -74,16 +74,25 @@ public class ProcessService extends PersistentBase {
new ConcurrentHashMap<>();

public ProcessService(Configurations serviceConfig, TableService tableService) {
this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager());
this(serviceConfig, tableService, Collections.emptyList(), new ExecuteEngineManager());
}

@Deprecated
public ProcessService(
Configurations serviceConfig,
TableService tableService,
ActionCoordinatorManager actionCoordinatorManager,
ExecuteEngineManager executeEngineManager) {
this(serviceConfig, tableService, Collections.emptyList(), executeEngineManager);
}

public ProcessService(
Configurations serviceConfig,
TableService tableService,
List<ActionCoordinator> actionCoordinators,
ExecuteEngineManager executeEngineManager) {
this.tableService = tableService;
this.actionCoordinatorManager = actionCoordinatorManager;
this.actionCoordinatorList = actionCoordinators;
this.executeEngineManager = executeEngineManager;
}

Expand Down Expand Up @@ -148,24 +157,19 @@ public void cancel(TableProcess process) {

/** Dispose the service, shutdown engines and clear active processes. */
public void dispose() {
actionCoordinatorManager.close();
executeEngineManager.close();
processExecutionPool.shutdown();
activeTableProcess.clear();
}

private void initialize(List<TableRuntime> tableRuntimes) {
LOG.info("Initializing process service");
actionCoordinatorManager.initialize();
actionCoordinatorManager
.installedPlugins()
.forEach(
actionCoordinator -> {
actionCoordinators.put(
actionCoordinator.action().getName(),
new ActionCoordinatorScheduler(
actionCoordinator, tableService, ProcessService.this));
});
// Pre-configured coordinators built from TableRuntimeFactory / ProcessFactory
for (ActionCoordinator actionCoordinator : actionCoordinatorList) {
actionCoordinators.put(
actionCoordinator.action().getName(),
new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this));
}
executeEngineManager.initialize();
executeEngineManager
.installedPlugins()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.process;

import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.manager.AbstractPluginManager;

/**
* Plugin manager for {@link ProcessFactory} implementations.
*
* <p>Process factories are configured via {@code plugins/process-factories.yaml} and are
* responsible for describing how different {@code TableFormat} / {@code Action} combinations should
* be scheduled and executed.
*/
public class TableProcessFactoryManager extends AbstractPluginManager<ProcessFactory> {

public TableProcessFactoryManager() {
super("process-factories");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.Action;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.ProcessTriggerStrategy;
import org.apache.amoro.process.RecoverProcessFailedException;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;

import java.util.Map;
import java.util.Optional;

/**
* Default implementation of {@link ActionCoordinator} that bridges {@link ProcessFactory}
* declarations to the AMS scheduling framework.
*/
public class DefaultActionCoordinator implements ActionCoordinator {

private final Action action;
private final TableFormat format;
private final ProcessFactory factory;
private final ProcessTriggerStrategy strategy;

public DefaultActionCoordinator(TableFormat format, Action action, ProcessFactory factory) {
this.action = action;
this.format = format;
this.factory = factory;
this.strategy = factory.triggerStrategy(format, action);
Preconditions.checkArgument(
strategy != null,
"ProcessTriggerStrategy cannot be null for format %s, action %s, factory %s",
format,
action,
factory.name());
}

@Override
public String name() {
// No need to be globally unique, this coordinator is not discovered via plugin manager.
return String.format("%s-%s-coordinator", format.name().toLowerCase(), action.getName());
}

@Override
public void open(Map<String, String> properties) {
// No-op: lifecycle is managed by owning TableRuntimeFactory.
}

@Override
public void close() {
// No-op: nothing to close.
}

@Override
public boolean formatSupported(TableFormat format) {
return this.format.equals(format);
}

@Override
public int parallelism() {
return strategy.getTriggerParallelism();
}

@Override
public Action action() {
return action;
}

@Override
public long getNextExecutingTime(TableRuntime tableRuntime) {
// Fixed-rate scheduling based on configured trigger interval.
return strategy.getTriggerInterval().toMillis();
}

@Override
public boolean enabled(TableRuntime tableRuntime) {
return formatSupported(tableRuntime.getFormat());
}

@Override
public long getExecutorDelay() {
return strategy.getTriggerInterval().toMillis();
}

@Override
public Optional<TableProcess> trigger(TableRuntime tableRuntime) {
return factory.trigger(tableRuntime, action);
}

@Override
public TableProcess recoverTableProcess(
TableRuntime tableRuntime, TableProcessStore processStore) {
try {
return factory.recover(tableRuntime, processStore);
} catch (RecoverProcessFailedException e) {
throw new IllegalStateException(
String.format(
"Failed to recover table process for format %s, action %s, table %s",
format, action, tableRuntime.getTableIdentifier()),
e);
}
}
}
Loading