Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
Expand All @@ -47,12 +49,15 @@
import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.process.ProcessService;
import org.apache.amoro.server.process.TableProcessFactoryManager;
import org.apache.amoro.server.process.executor.ExecuteEngineManager;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.DefaultOptimizerManager;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
import org.apache.amoro.server.table.DefaultTableManager;
import org.apache.amoro.server.table.DefaultTableRuntimeFactory;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
Expand All @@ -61,6 +66,8 @@
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.server.utils.ThriftServiceProxy;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -75,6 +82,7 @@
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.amoro.table.TableRuntimeFactory;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.JacksonUtil;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -96,6 +104,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

public class AmoroServiceContainer {

Expand Down Expand Up @@ -231,14 +240,32 @@ public void transitionToFollower() {
public void startOptimizingService() throws Exception {
TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
tableRuntimeFactoryManager.initialize();
List<TableRuntimeFactory> tableRuntimeFactories = tableRuntimeFactoryManager.installedPlugins();
Preconditions.checkArgument(
tableRuntimeFactories.size() == 1, "Only one table runtime factory is supported");
TableRuntimeFactory tableRuntimeFactory = new DefaultTableRuntimeFactory();

TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
tableProcessFactoryManager.initialize();
List<ProcessFactory> processFactories = tableProcessFactoryManager.installedPlugins();
tableRuntimeFactory.initialize(processFactories);

List<ActionCoordinator> actionCoordinators =
tableRuntimeFactoryManager.installedPlugins().stream()
.flatMap(f -> f.supportedCoordinators().stream())
.collect(Collectors.toList());

ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();

tableService =
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
new DefaultTableService(
serviceConfig, catalogManager, Lists.newArrayList(tableRuntimeFactory));

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);

processService = new ProcessService(serviceConfig, tableService);
processService =
new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager);

LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.CompatibleTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -142,16 +142,16 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
private void loadOptimizingQueues(List<CompatibleTableRuntime> tableRuntimeList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<DefaultTableRuntime>> groupToTableRuntimes =
Map<String, List<CompatibleTableRuntime>> groupToTableRuntimes =
tableRuntimeList.stream().collect(Collectors.groupingBy(TableRuntime::getGroupName));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<DefaultTableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
List<CompatibleTableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
catalogManager,
Expand Down Expand Up @@ -275,7 +275,7 @@ public boolean cancelProcess(long processId) {
return false;
}
long tableId = processMeta.getTableId();
DefaultTableRuntime tableRuntime = (DefaultTableRuntime) tableService.getRuntime(tableId);
CompatibleTableRuntime tableRuntime = (CompatibleTableRuntime) tableService.getRuntime(tableId);
if (tableRuntime == null) {
return false;
}
Expand Down Expand Up @@ -368,7 +368,7 @@ private class TableRuntimeHandlerImpl extends RuntimeHandlerChain {

@Override
public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) {
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
CompatibleTableRuntime defaultTableRuntime = (CompatibleTableRuntime) tableRuntime;
if (!defaultTableRuntime.getOptimizingStatus().isProcessing()) {
getOptionalQueueByGroup(defaultTableRuntime.getGroupName())
.ifPresent(q -> q.refreshTable(defaultTableRuntime));
Expand All @@ -377,7 +377,7 @@ public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus orig

@Override
public void handleConfigChanged(TableRuntime runtime, TableConfiguration originalConfig) {
DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
CompatibleTableRuntime tableRuntime = (CompatibleTableRuntime) runtime;
String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup();
if (!tableRuntime.getGroupName().equals(originalGroup)) {
getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime));
Expand All @@ -388,14 +388,14 @@ public void handleConfigChanged(TableRuntime runtime, TableConfiguration origina

@Override
public void handleTableAdded(AmoroTable<?> table, TableRuntime runtime) {
DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
CompatibleTableRuntime tableRuntime = (CompatibleTableRuntime) runtime;
getOptionalQueueByGroup(tableRuntime.getGroupName())
.ifPresent(q -> q.refreshTable(tableRuntime));
}

@Override
public void handleTableRemoved(TableRuntime runtime) {
DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
CompatibleTableRuntime tableRuntime = (CompatibleTableRuntime) runtime;
getOptionalQueueByGroup(tableRuntime.getGroupName())
.ifPresent(queue -> queue.releaseTable(tableRuntime));
}
Expand All @@ -405,8 +405,8 @@ protected void initHandler(List<TableRuntime> tableRuntimeList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(
tableRuntimeList.stream()
.filter(t -> t instanceof DefaultTableRuntime)
.map(t -> (DefaultTableRuntime) t)
.filter(t -> t instanceof CompatibleTableRuntime)
.map(t -> (CompatibleTableRuntime) t)
.collect(Collectors.toList()));
optimizerKeeper.start();
optimizingConfigWatcher.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.CompatibleTableRuntime;
import org.apache.amoro.server.table.blocker.TableBlocker;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -112,7 +112,7 @@ public OptimizingQueue(
ResourceGroup optimizerGroup,
QuotaProvider quotaProvider,
Executor planExecutor,
List<DefaultTableRuntime> tableRuntimeList,
List<CompatibleTableRuntime> tableRuntimeList,
int maxPlanningParallelism) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
Expand All @@ -128,7 +128,7 @@ public OptimizingQueue(
tableRuntimeList.forEach(this::initTableRuntime);
}

private void initTableRuntime(DefaultTableRuntime tableRuntime) {
private void initTableRuntime(CompatibleTableRuntime tableRuntime) {
TableOptimizingProcess process = null;
if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
TableProcessMeta meta =
Expand Down Expand Up @@ -172,7 +172,7 @@ public String getContainerName() {
return optimizerGroup.getContainer();
}

public void refreshTable(DefaultTableRuntime tableRuntime) {
public void refreshTable(CompatibleTableRuntime tableRuntime) {
if (tableRuntime.getOptimizingConfig().isEnabled()
&& !tableRuntime.getOptimizingStatus().isProcessing()) {
LOG.info(
Expand All @@ -185,7 +185,7 @@ public void refreshTable(DefaultTableRuntime tableRuntime) {
}
}

public void releaseTable(DefaultTableRuntime tableRuntime) {
public void releaseTable(CompatibleTableRuntime tableRuntime) {
scheduler.removeTable(tableRuntime);
List<OptimizingProcess> processList =
tableQueue.stream()
Expand Down Expand Up @@ -280,7 +280,7 @@ private void skipBlockedTables(Set<ServerTableIdentifier> skipTables) {
}

private void triggerAsyncPlanning(
DefaultTableRuntime tableRuntime, Set<ServerTableIdentifier> skipTables, long startTime) {
CompatibleTableRuntime tableRuntime, Set<ServerTableIdentifier> skipTables, long startTime) {
LOG.info(
"Trigger planning table {} by policy {}",
tableRuntime.getTableIdentifier(),
Expand Down Expand Up @@ -325,7 +325,7 @@ private void triggerAsyncPlanning(
});
}

private TableOptimizingProcess planInternal(DefaultTableRuntime tableRuntime) {
private TableOptimizingProcess planInternal(CompatibleTableRuntime tableRuntime) {
tableRuntime.beginPlanning();
try {
ServerTableIdentifier identifier = tableRuntime.getTableIdentifier();
Expand Down Expand Up @@ -427,7 +427,7 @@ private class TableOptimizingProcess implements OptimizingProcess {
private final Lock lock = new ReentrantLock();
private final long processId;
private final OptimizingType optimizingType;
private final DefaultTableRuntime tableRuntime;
private final CompatibleTableRuntime tableRuntime;
private final long planTime;
private final long targetSnapshotId;
private final long targetChangeSnapshotId;
Expand Down Expand Up @@ -471,7 +471,7 @@ public TaskRuntime<?> poll(OptimizerThread thread, boolean needQuotaChecking) {
}

public TableOptimizingProcess(
AbstractOptimizingPlanner planner, DefaultTableRuntime tableRuntime) {
AbstractOptimizingPlanner planner, CompatibleTableRuntime tableRuntime) {
processId = planner.getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = planner.getOptimizingType();
Expand All @@ -485,7 +485,7 @@ public TableOptimizingProcess(
}

public TableOptimizingProcess(
DefaultTableRuntime tableRuntime,
CompatibleTableRuntime tableRuntime,
TableProcessMeta processMeta,
OptimizingProcessState processState) {
this.tableRuntime = tableRuntime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter;
import org.apache.amoro.server.optimizing.sorter.SorterFactory;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.CompatibleTableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.slf4j.Logger;
Expand All @@ -45,7 +45,8 @@ public class SchedulingPolicy {

private static final String SCHEDULING_POLICY_PROPERTY_NAME = "scheduling-policy";

private final Map<ServerTableIdentifier, DefaultTableRuntime> tableRuntimeMap = new HashMap<>();
private final Map<ServerTableIdentifier, CompatibleTableRuntime> tableRuntimeMap =
new HashMap<>();
private volatile String policyName;
private final Lock tableLock = new ReentrantLock();
private static final Map<String, SorterFactory> sorterFactoryCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -84,7 +85,7 @@ public String name() {
return policyName;
}

public DefaultTableRuntime scheduleTable(Set<ServerTableIdentifier> skipSet) {
public CompatibleTableRuntime scheduleTable(Set<ServerTableIdentifier> skipSet) {
tableLock.lock();
try {
fillSkipSet(skipSet);
Expand All @@ -97,7 +98,7 @@ public DefaultTableRuntime scheduleTable(Set<ServerTableIdentifier> skipSet) {
}
}

private Comparator<DefaultTableRuntime> createSorterByPolicy() {
private Comparator<CompatibleTableRuntime> createSorterByPolicy() {
if (sorterFactoryCache.get(policyName) != null) {
SorterFactory sorterFactory = sorterFactoryCache.get(policyName);
return sorterFactory.createComparator();
Expand All @@ -117,14 +118,14 @@ private void fillSkipSet(Set<ServerTableIdentifier> originalSet) {
.forEach(tableRuntime -> originalSet.add(tableRuntime.getTableIdentifier()));
}

private boolean isTablePending(DefaultTableRuntime tableRuntime) {
private boolean isTablePending(CompatibleTableRuntime tableRuntime) {
return tableRuntime.getOptimizingStatus() == OptimizingStatus.PENDING
&& (tableRuntime.getLastOptimizedSnapshotId() != tableRuntime.getCurrentSnapshotId()
|| tableRuntime.getLastOptimizedChangeSnapshotId()
!= tableRuntime.getCurrentChangeSnapshotId());
}

public void addTable(DefaultTableRuntime tableRuntime) {
public void addTable(CompatibleTableRuntime tableRuntime) {
tableLock.lock();
try {
tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime);
Expand All @@ -133,7 +134,7 @@ public void addTable(DefaultTableRuntime tableRuntime) {
}
}

public void removeTable(DefaultTableRuntime tableRuntime) {
public void removeTable(CompatibleTableRuntime tableRuntime) {
tableLock.lock();
try {
tableRuntimeMap.remove(tableRuntime.getTableIdentifier());
Expand All @@ -143,7 +144,7 @@ public void removeTable(DefaultTableRuntime tableRuntime) {
}

@VisibleForTesting
Map<ServerTableIdentifier, DefaultTableRuntime> getTableRuntimeMap() {
Map<ServerTableIdentifier, CompatibleTableRuntime> getTableRuntimeMap() {
return tableRuntimeMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.amoro.maintainer.MaintainerMetrics;
import org.apache.amoro.maintainer.OptimizingInfo;
import org.apache.amoro.maintainer.TableMaintainerContext;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.CompatibleTableRuntime;
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
import org.apache.amoro.server.utils.HiveLocationUtil;
import org.apache.amoro.table.MixedTable;
Expand All @@ -31,20 +31,20 @@
import java.util.Set;

/**
* Default implementation of TableMaintainerContext for AMS. Adapts DefaultTableRuntime to
* Default implementation of TableMaintainerContext for AMS. Adapts CompatibleTableRuntime to
* TableMaintainerContext interface.
*/
public class DefaultTableMaintainerContext implements TableMaintainerContext {

private final DefaultTableRuntime tableRuntime;
private final CompatibleTableRuntime tableRuntime;
private final MixedTable mixedTable;

public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime) {
public DefaultTableMaintainerContext(CompatibleTableRuntime tableRuntime) {
this.tableRuntime = tableRuntime;
this.mixedTable = null;
}

public DefaultTableMaintainerContext(DefaultTableRuntime tableRuntime, MixedTable mixedTable) {
public DefaultTableMaintainerContext(CompatibleTableRuntime tableRuntime, MixedTable mixedTable) {
this.tableRuntime = tableRuntime;
this.mixedTable = mixedTable;
}
Expand Down Expand Up @@ -84,12 +84,12 @@ public Set<String> getHiveLocationPaths() {
return HiveLocationUtil.getHiveLocation(mixedTable);
}

/** OptimizingInfo implementation based on DefaultTableRuntime. */
/** OptimizingInfo implementation based on CompatibleTableRuntime. */
private static class DefaultOptimizingInfo implements OptimizingInfo {

private final DefaultTableRuntime tableRuntime;
private final CompatibleTableRuntime tableRuntime;

DefaultOptimizingInfo(DefaultTableRuntime tableRuntime) {
DefaultOptimizingInfo(CompatibleTableRuntime tableRuntime) {
this.tableRuntime = tableRuntime;
}

Expand Down
Loading
Loading