diff --git a/pom.xml b/pom.xml
index f1d7c50b5d127..6a0c2a77c4adf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@
4.12.0
3.4.0
19.3.0.0
- 1.43
+ 1.45
2.13.1
diff --git a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java
index fcc2404926d91..ec30b024c58e7 100644
--- a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java
+++ b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.hive;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
@@ -24,6 +27,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class CacheQuotaRequirement
{
public static final CacheQuotaRequirement NO_CACHE_REQUIREMENT = new CacheQuotaRequirement(GLOBAL, Optional.empty());
@@ -32,6 +36,7 @@ public class CacheQuotaRequirement
private final Optional quota;
@JsonCreator
+ @ThriftConstructor
public CacheQuotaRequirement(
@JsonProperty("cacheQuotaScope") CacheQuotaScope cacheQuotaScope,
@JsonProperty("quota") Optional quota)
@@ -41,12 +46,14 @@ public CacheQuotaRequirement(
}
@JsonProperty
+ @ThriftField(1)
public CacheQuotaScope getCacheQuotaScope()
{
return cacheQuotaScope;
}
@JsonProperty
+ @ThriftField(2)
public Optional getQuota()
{
return quota;
diff --git a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java
index 2c8dddd3b6ba7..c10f05acebd4b 100644
--- a/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java
+++ b/presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java
@@ -13,7 +13,28 @@
*/
package com.facebook.presto.hive;
+import com.facebook.drift.annotations.ThriftEnum;
+import com.facebook.drift.annotations.ThriftEnumValue;
+
+@ThriftEnum
public enum CacheQuotaScope
{
- GLOBAL, SCHEMA, TABLE, PARTITION
+ GLOBAL(0),
+ SCHEMA(1),
+ TABLE(2),
+ PARTITION(3),
+ /**/;
+
+ private final int value;
+
+ CacheQuotaScope(int value)
+ {
+ this.value = value;
+ }
+
+ @ThriftEnumValue
+ public int getValue()
+ {
+ return value;
+ }
}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java
new file mode 100644
index 0000000000000..fb5179d141507
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorCodecManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.connector;
+
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
+import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorCodecProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.thrift.RemoteCodecProvider;
+import com.google.inject.Provider;
+
+import javax.inject.Inject;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
+import static java.util.Objects.requireNonNull;
+
+public class ConnectorCodecManager
+{
+ private final Map connectorCodecProviders = new ConcurrentHashMap<>();
+
+ @Inject
+ public ConnectorCodecManager(Provider thriftCodecManagerProvider)
+ {
+ requireNonNull(thriftCodecManagerProvider, "thriftCodecManager is null");
+
+ connectorCodecProviders.put(REMOTE_CONNECTOR_ID.toString(), new RemoteCodecProvider(thriftCodecManagerProvider));
+ }
+
+ public void addConnectorCodecProvider(ConnectorId connectorId, ConnectorCodecProvider connectorCodecProvider)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ requireNonNull(connectorCodecProvider, "connectorThriftCodecProvider is null");
+ connectorCodecProviders.put(connectorId.getCatalogName(), connectorCodecProvider);
+ }
+
+ public Optional> getConnectorSplitCodec(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorSplitCodec);
+ }
+
+ public Optional> getTransactionHandleCodec(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTransactionHandleCodec);
+ }
+
+ public Optional> getOutputTableHandleCodec(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorOutputTableHandleCodec);
+ }
+
+ public Optional> getInsertTableHandleCodec(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorInsertTableHandleCodec);
+ }
+
+ public Optional> getDeleteTableHandleCodec(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorDeleteTableHandleCodec);
+ }
+
+ public Optional> getTableLayoutHandleCodec(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableLayoutHandleCodec);
+ }
+
+ public Optional> getTableHandleCodec(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ return Optional.ofNullable(connectorCodecProviders.get(connectorId)).flatMap(ConnectorCodecProvider::getConnectorTableHandleCodec);
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java
index 32fada27a9b7b..b1747993fe0fe 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java
@@ -40,6 +40,7 @@
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorAccessControl;
+import com.facebook.presto.spi.connector.ConnectorCodecProvider;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.connector.ConnectorIndexProvider;
@@ -118,6 +119,7 @@ public class ConnectorManager
private final FilterStatsCalculator filterStatsCalculator;
private final BlockEncodingSerde blockEncodingSerde;
private final ConnectorSystemConfig connectorSystemConfig;
+ private final ConnectorCodecManager connectorCodecManager;
@GuardedBy("this")
private final ConcurrentMap connectorFactories = new ConcurrentHashMap<>();
@@ -151,7 +153,8 @@ public ConnectorManager(
DeterminismEvaluator determinismEvaluator,
FilterStatsCalculator filterStatsCalculator,
BlockEncodingSerde blockEncodingSerde,
- FeaturesConfig featuresConfig)
+ FeaturesConfig featuresConfig,
+ ConnectorCodecManager connectorCodecManager)
{
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
@@ -176,6 +179,7 @@ public ConnectorManager(
this.filterStatsCalculator = requireNonNull(filterStatsCalculator, "filterStatsCalculator is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.connectorSystemConfig = () -> featuresConfig.isNativeExecutionEnabled();
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
}
@PreDestroy
@@ -303,7 +307,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
connector.getPlanOptimizerProvider()
.ifPresent(planOptimizerProvider -> connectorPlanOptimizerManager.addPlanOptimizerProvider(connectorId, planOptimizerProvider));
}
-
+ connector.getConnectorCodecProvider().ifPresent(connectorCodecProvider -> connectorCodecManager.addConnectorCodecProvider(connectorId, connectorCodecProvider));
metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());
connector.getAccessControl()
@@ -392,6 +396,7 @@ private static class MaterializedConnector
private final Optional indexProvider;
private final Optional partitioningProvider;
private final Optional planOptimizerProvider;
+ private final Optional connectorCodecProvider;
private final Optional accessControl;
private final List> sessionProperties;
private final List> tableProperties;
@@ -472,6 +477,15 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
}
this.planOptimizerProvider = Optional.ofNullable(planOptimizerProvider);
+ ConnectorCodecProvider connectorCodecProvider = null;
+ try {
+ connectorCodecProvider = connector.getConnectorCodecProvider();
+ requireNonNull(connectorCodecProvider, format("Connector %s returned null connector specific codec provider", connectorId));
+ }
+ catch (UnsupportedOperationException ignored) {
+ }
+ this.connectorCodecProvider = Optional.ofNullable(connectorCodecProvider);
+
ConnectorAccessControl accessControl = null;
try {
accessControl = connector.getAccessControl();
@@ -580,5 +594,10 @@ public List> getAnalyzeProperties()
{
return analyzeProperties;
}
+
+ public Optional getConnectorCodecProvider()
+ {
+ return connectorCodecProvider;
+ }
}
}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java b/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java
index ed5e59718992c..fc93ee63c9458 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/Location.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.execution;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -20,17 +23,20 @@
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class Location
{
private final String location;
@JsonCreator
+ @ThriftConstructor
public Location(@JsonProperty("location") String location)
{
this.location = requireNonNull(location, "location is null");
}
@JsonProperty
+ @ThriftField(1)
public String getLocation()
{
return location;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java
index 2b0c9e5167dac..2704ab5ab8960 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java
@@ -14,6 +14,9 @@
package com.facebook.presto.execution.scheduler;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.metadata.DeleteTableHandle;
import com.facebook.presto.metadata.InsertTableHandle;
import com.facebook.presto.metadata.OutputTableHandle;
@@ -36,6 +39,7 @@
@SuppressWarnings({"EmptyClass", "ClassMayBeInterface"})
public abstract class ExecutionWriterTarget
{
+ @ThriftStruct
public static class CreateHandle
extends ExecutionWriterTarget
{
@@ -43,6 +47,7 @@ public static class CreateHandle
private final SchemaTableName schemaTableName;
@JsonCreator
+ @ThriftConstructor
public CreateHandle(
@JsonProperty("handle") OutputTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@@ -52,12 +57,14 @@ public CreateHandle(
}
@JsonProperty
+ @ThriftField(1)
public OutputTableHandle getHandle()
{
return handle;
}
@JsonProperty
+ @ThriftField(2)
public SchemaTableName getSchemaTableName()
{
return schemaTableName;
@@ -70,6 +77,7 @@ public String toString()
}
}
+ @ThriftStruct
public static class InsertHandle
extends ExecutionWriterTarget
{
@@ -77,6 +85,7 @@ public static class InsertHandle
private final SchemaTableName schemaTableName;
@JsonCreator
+ @ThriftConstructor
public InsertHandle(
@JsonProperty("handle") InsertTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@@ -86,12 +95,14 @@ public InsertHandle(
}
@JsonProperty
+ @ThriftField(1)
public InsertTableHandle getHandle()
{
return handle;
}
@JsonProperty
+ @ThriftField(2)
public SchemaTableName getSchemaTableName()
{
return schemaTableName;
@@ -104,6 +115,7 @@ public String toString()
}
}
+ @ThriftStruct
public static class DeleteHandle
extends ExecutionWriterTarget
{
@@ -111,6 +123,7 @@ public static class DeleteHandle
private final SchemaTableName schemaTableName;
@JsonCreator
+ @ThriftConstructor
public DeleteHandle(
@JsonProperty("handle") DeleteTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@@ -120,12 +133,14 @@ public DeleteHandle(
}
@JsonProperty
+ @ThriftField(1)
public DeleteTableHandle getHandle()
{
return handle;
}
@JsonProperty
+ @ThriftField(2)
public SchemaTableName getSchemaTableName()
{
return schemaTableName;
@@ -138,6 +153,7 @@ public String toString()
}
}
+ @ThriftStruct
public static class RefreshMaterializedViewHandle
extends ExecutionWriterTarget
{
@@ -145,6 +161,7 @@ public static class RefreshMaterializedViewHandle
private final SchemaTableName schemaTableName;
@JsonCreator
+ @ThriftConstructor
public RefreshMaterializedViewHandle(
@JsonProperty("handle") InsertTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@@ -154,12 +171,14 @@ public RefreshMaterializedViewHandle(
}
@JsonProperty
+ @ThriftField(1)
public InsertTableHandle getHandle()
{
return handle;
}
@JsonProperty
+ @ThriftField(2)
public SchemaTableName getSchemaTableName()
{
return schemaTableName;
@@ -172,6 +191,7 @@ public String toString()
}
}
+ @ThriftStruct
public static class UpdateHandle
extends ExecutionWriterTarget
{
@@ -179,6 +199,7 @@ public static class UpdateHandle
private final SchemaTableName schemaTableName;
@JsonCreator
+ @ThriftConstructor
public UpdateHandle(
@JsonProperty("handle") TableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@@ -188,12 +209,14 @@ public UpdateHandle(
}
@JsonProperty
+ @ThriftField(1)
public TableHandle getHandle()
{
return handle;
}
@JsonProperty
+ @ThriftField(2)
public SchemaTableName getSchemaTableName()
{
return schemaTableName;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java
new file mode 100644
index 0000000000000..f61bd8228d1f1
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTargetUnion.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.execution.scheduler;
+
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftUnion;
+import com.facebook.drift.annotations.ThriftUnionId;
+
+import static java.util.Objects.requireNonNull;
+
+@ThriftUnion
+public class ExecutionWriterTargetUnion
+{
+ private short id;
+ private ExecutionWriterTarget.CreateHandle createHandle;
+ private ExecutionWriterTarget.InsertHandle insertHandle;
+ private ExecutionWriterTarget.DeleteHandle deleteHandle;
+ private ExecutionWriterTarget.RefreshMaterializedViewHandle refreshMaterializedViewHandle;
+ private ExecutionWriterTarget.UpdateHandle updateHandle;
+
+ @ThriftConstructor
+ public ExecutionWriterTargetUnion()
+ {
+ this.id = 0;
+ }
+
+ @ThriftConstructor
+ public ExecutionWriterTargetUnion(ExecutionWriterTarget.CreateHandle createHandle)
+ {
+ this.id = 1;
+ this.createHandle = createHandle;
+ }
+
+ @ThriftField(1)
+ public ExecutionWriterTarget.CreateHandle getCreateHandle()
+ {
+ return createHandle;
+ }
+
+ @ThriftConstructor
+ public ExecutionWriterTargetUnion(ExecutionWriterTarget.InsertHandle insertHandle)
+ {
+ this.id = 2;
+ this.insertHandle = insertHandle;
+ }
+
+ @ThriftField(2)
+ public ExecutionWriterTarget.InsertHandle getInsertHandle()
+ {
+ return insertHandle;
+ }
+
+ @ThriftConstructor
+ public ExecutionWriterTargetUnion(ExecutionWriterTarget.DeleteHandle deleteHandle)
+ {
+ this.id = 3;
+ this.deleteHandle = deleteHandle;
+ }
+
+ @ThriftField(3)
+ public ExecutionWriterTarget.DeleteHandle getDeleteHandle()
+ {
+ return deleteHandle;
+ }
+
+ @ThriftConstructor
+ public ExecutionWriterTargetUnion(ExecutionWriterTarget.RefreshMaterializedViewHandle refreshMaterializedViewHandle)
+ {
+ this.id = 4;
+ this.refreshMaterializedViewHandle = refreshMaterializedViewHandle;
+ }
+
+ @ThriftField(4)
+ public ExecutionWriterTarget.RefreshMaterializedViewHandle getRefreshMaterializedViewHandle()
+ {
+ return refreshMaterializedViewHandle;
+ }
+
+ @ThriftConstructor
+ public ExecutionWriterTargetUnion(ExecutionWriterTarget.UpdateHandle updateHandle)
+ {
+ this.id = 5;
+ this.updateHandle = updateHandle;
+ }
+
+ @ThriftField(5)
+ public ExecutionWriterTarget.UpdateHandle getUpdateHandle()
+ {
+ return updateHandle;
+ }
+
+ @ThriftUnionId
+ public short getId()
+ {
+ return id;
+ }
+
+ public static ExecutionWriterTarget toExecutionWriterTarget(ExecutionWriterTargetUnion executionWriterTargetUnion)
+ {
+ requireNonNull(executionWriterTargetUnion, "executionWriterTargetUnion is null");
+ if (executionWriterTargetUnion.getCreateHandle() != null) {
+ return executionWriterTargetUnion.getCreateHandle();
+ }
+ else if (executionWriterTargetUnion.getInsertHandle() != null) {
+ return executionWriterTargetUnion.getInsertHandle();
+ }
+ else if (executionWriterTargetUnion.getDeleteHandle() != null) {
+ return executionWriterTargetUnion.getDeleteHandle();
+ }
+ else if (executionWriterTargetUnion.getRefreshMaterializedViewHandle() != null) {
+ return executionWriterTargetUnion.getRefreshMaterializedViewHandle();
+ }
+ else if (executionWriterTargetUnion.getUpdateHandle() != null) {
+ return executionWriterTargetUnion.getUpdateHandle();
+ }
+ else {
+ throw new IllegalArgumentException("Unrecognized execution writer target: " + executionWriterTargetUnion);
+ }
+ }
+
+ public static ExecutionWriterTargetUnion fromExecutionWriterTarget(ExecutionWriterTarget executionWriterTarget)
+ {
+ requireNonNull(executionWriterTarget, "executionWriterTarget is null");
+
+ if (executionWriterTarget instanceof ExecutionWriterTarget.CreateHandle) {
+ return new ExecutionWriterTargetUnion((ExecutionWriterTarget.CreateHandle) executionWriterTarget);
+ }
+ else if (executionWriterTarget instanceof ExecutionWriterTarget.InsertHandle) {
+ return new ExecutionWriterTargetUnion((ExecutionWriterTarget.InsertHandle) executionWriterTarget);
+ }
+ else if (executionWriterTarget instanceof ExecutionWriterTarget.DeleteHandle) {
+ return new ExecutionWriterTargetUnion((ExecutionWriterTarget.DeleteHandle) executionWriterTarget);
+ }
+ else if (executionWriterTarget instanceof ExecutionWriterTarget.RefreshMaterializedViewHandle) {
+ return new ExecutionWriterTargetUnion((ExecutionWriterTarget.RefreshMaterializedViewHandle) executionWriterTarget);
+ }
+ else if (executionWriterTarget instanceof ExecutionWriterTarget.UpdateHandle) {
+ return new ExecutionWriterTargetUnion((ExecutionWriterTarget.UpdateHandle) executionWriterTarget);
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported execution writer target: " + executionWriterTarget);
+ }
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java
index 63d952c1c0ce1..2d2fa96b6882e 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java
@@ -14,6 +14,9 @@
package com.facebook.presto.execution.scheduler;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.Session;
import com.facebook.presto.metadata.AnalyzeTableHandle;
import com.facebook.presto.metadata.Metadata;
@@ -38,6 +41,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class TableWriteInfo
{
private final Optional writerTarget;
@@ -53,6 +57,12 @@ public TableWriteInfo(
checkArgument(!analyzeTableHandle.isPresent() || !writerTarget.isPresent(), "analyzeTableHandle is present, so no other fields should be present");
}
+ @ThriftConstructor
+ public TableWriteInfo(ExecutionWriterTargetUnion writerTargetUnion, Optional analyzeTableHandle)
+ {
+ this(Optional.ofNullable(writerTargetUnion).map(ExecutionWriterTargetUnion::toExecutionWriterTarget), analyzeTableHandle == null ? Optional.empty() : analyzeTableHandle);
+ }
+
public static TableWriteInfo createTableWriteInfo(StreamingSubPlan plan, Metadata metadata, Session session)
{
Optional writerTarget = createWriterTarget(plan, metadata, session);
@@ -159,7 +169,14 @@ public Optional getWriterTarget()
return writerTarget;
}
+ @ThriftField(value = 1, name = "writerTargetUnion")
+ public ExecutionWriterTargetUnion getWriterTargetUnion()
+ {
+ return writerTarget.map(ExecutionWriterTargetUnion::fromExecutionWriterTarget).orElse(null);
+ }
+
@JsonProperty
+ @ThriftField(2)
public Optional getAnalyzeTableHandle()
{
return analyzeTableHandle;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java
index 5e77adeaa1818..df97d5f1eb735 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/AnalyzeTableHandle.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.metadata;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
@@ -23,6 +26,7 @@
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class AnalyzeTableHandle
{
private final ConnectorId connectorId;
@@ -30,6 +34,7 @@ public class AnalyzeTableHandle
private final ConnectorTableHandle connectorHandle;
@JsonCreator
+ @ThriftConstructor
public AnalyzeTableHandle(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@@ -41,18 +46,21 @@ public AnalyzeTableHandle(
}
@JsonProperty
+ @ThriftField(1)
public ConnectorId getConnectorId()
{
return connectorId;
}
@JsonProperty
+ @ThriftField(3)
public ConnectorTableHandle getConnectorHandle()
{
return connectorHandle;
}
@JsonProperty
+ @ThriftField(2)
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java
index ea05b223843d5..de2868576bb3f 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.metadata;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
@@ -23,6 +26,7 @@
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public final class DeleteTableHandle
{
private final ConnectorId connectorId;
@@ -30,6 +34,7 @@ public final class DeleteTableHandle
private final ConnectorDeleteTableHandle connectorHandle;
@JsonCreator
+ @ThriftConstructor
public DeleteTableHandle(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@@ -41,18 +46,21 @@ public DeleteTableHandle(
}
@JsonProperty
+ @ThriftField(1)
public ConnectorId getConnectorId()
{
return connectorId;
}
@JsonProperty
+ @ThriftField(2)
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}
@JsonProperty
+ @ThriftField(3)
public ConnectorDeleteTableHandle getConnectorHandle()
{
return connectorHandle;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java
index 6cf3e0d2525d9..bb286e048185f 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/InsertTableHandle.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.metadata;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
@@ -23,6 +26,7 @@
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public final class InsertTableHandle
{
private final ConnectorId connectorId;
@@ -30,6 +34,7 @@ public final class InsertTableHandle
private final ConnectorInsertTableHandle connectorHandle;
@JsonCreator
+ @ThriftConstructor
public InsertTableHandle(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@@ -41,18 +46,21 @@ public InsertTableHandle(
}
@JsonProperty
+ @ThriftField(1)
public ConnectorId getConnectorId()
{
return connectorId;
}
@JsonProperty
+ @ThriftField(2)
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}
@JsonProperty
+ @ThriftField(3)
public ConnectorInsertTableHandle getConnectorHandle()
{
return connectorHandle;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java
index 8d3d3d2f6dc87..f88dabb0c243b 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/OutputTableHandle.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.metadata;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
@@ -23,6 +26,7 @@
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public final class OutputTableHandle
{
private final ConnectorId connectorId;
@@ -30,6 +34,7 @@ public final class OutputTableHandle
private final ConnectorOutputTableHandle connectorHandle;
@JsonCreator
+ @ThriftConstructor
public OutputTableHandle(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@@ -41,18 +46,21 @@ public OutputTableHandle(
}
@JsonProperty
+ @ThriftField(1)
public ConnectorId getConnectorId()
{
return connectorId;
}
@JsonProperty
+ @ThriftField(2)
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}
@JsonProperty
+ @ThriftField(3)
public ConnectorOutputTableHandle getConnectorHandle()
{
return connectorHandle;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java
index 2f89509efa0ee..9f2e60061dcdf 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/RemoteTransactionHandle.java
@@ -13,14 +13,18 @@
*/
package com.facebook.presto.metadata;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+@ThriftStruct
public class RemoteTransactionHandle
implements ConnectorTransactionHandle
{
@JsonCreator
+ @ThriftConstructor
public RemoteTransactionHandle()
{
}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java
index d8581dc1730db..ba93e58a16efc 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/Split.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.metadata;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSplit;
@@ -33,6 +36,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public final class Split
{
private final ConnectorId connectorId;
@@ -48,6 +52,7 @@ public Split(ConnectorId connectorId, ConnectorTransactionHandle transactionHand
}
@JsonCreator
+ @ThriftConstructor
public Split(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@@ -63,30 +68,35 @@ public Split(
}
@JsonProperty
+ @ThriftField(1)
public ConnectorId getConnectorId()
{
return connectorId;
}
@JsonProperty
+ @ThriftField(2)
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}
@JsonProperty
+ @ThriftField(3)
public ConnectorSplit getConnectorSplit()
{
return connectorSplit;
}
@JsonProperty
+ @ThriftField(4)
public Lifespan getLifespan()
{
return lifespan;
}
@JsonProperty
+ @ThriftField(5)
public SplitContext getSplitContext()
{
return splitContext;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/AbstractTypedThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/AbstractTypedThriftCodec.java
new file mode 100644
index 0000000000000..f5d394b6cda7d
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/AbstractTypedThriftCodec.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.airlift.log.Logger;
+import com.facebook.drift.annotations.ThriftField.Requiredness;
+import com.facebook.drift.codec.ThriftCodec;
+import com.facebook.drift.codec.metadata.DefaultThriftTypeReference;
+import com.facebook.drift.codec.metadata.FieldKind;
+import com.facebook.drift.codec.metadata.ThriftFieldMetadata;
+import com.facebook.drift.codec.metadata.ThriftMethodInjection;
+import com.facebook.drift.codec.metadata.ThriftStructMetadata;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TField;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.drift.protocol.TStruct;
+import com.facebook.drift.protocol.TType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static java.util.Objects.requireNonNull;
+
+public abstract class AbstractTypedThriftCodec
+ implements ThriftCodec
+{
+ private static final Set NON_THRIFT_CONNECTOR = new HashSet<>();
+ private static final Logger log = Logger.get(AbstractTypedThriftCodec.class);
+ private static final String TYPE_VALUE = "connectorId";
+ private static final String CUSTOM_SERIALIZED_VALUE = "customSerializedValue";
+ private static final String JSON_VALUE = "jsonValue";
+ private static final short TYPE_FIELD_ID = 1;
+ private static final short CUSTOM_FIELD_ID = 2;
+ private static final short JSON_FIELD_ID = 3;
+
+ private final Class baseClass;
+ private final JsonCodec jsonCodec;
+ private final Function nameResolver;
+ private final Function> classResolver;
+
+ protected AbstractTypedThriftCodec(Class baseClass,
+ JsonCodec jsonCodec,
+ Function nameResolver,
+ Function> classResolver)
+ {
+ this.baseClass = requireNonNull(baseClass, "baseClass is null");
+ this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
+ this.nameResolver = requireNonNull(nameResolver, "nameResolver is null");
+ this.classResolver = requireNonNull(classResolver, "classResolver is null");
+ }
+
+ @Override
+ public abstract ThriftType getType();
+
+ protected static ThriftType createThriftType(Class> baseClass)
+ {
+ List fields = new ArrayList<>();
+ try {
+ fields.add(new ThriftFieldMetadata(
+ TYPE_FIELD_ID,
+ false, false, Requiredness.OPTIONAL, ImmutableMap.of(),
+ new DefaultThriftTypeReference(ThriftType.STRING),
+ TYPE_VALUE,
+ FieldKind.THRIFT_FIELD,
+ ImmutableList.of(),
+ Optional.empty(),
+ // Drift requires at least one of the three arguments below, so we provide a dummy method here as a workaround.
+ // https://github.com/airlift/drift/blob/master/drift-codec/src/main/java/io/airlift/drift/codec/metadata/ThriftFieldMetadata.java#L99
+ Optional.of(new ThriftMethodInjection(AbstractTypedThriftCodec.class.getDeclaredMethod("getTypeField"), ImmutableList.of())),
+ Optional.empty(),
+ Optional.empty()));
+ fields.add(new ThriftFieldMetadata(
+ CUSTOM_FIELD_ID,
+ false, false, Requiredness.OPTIONAL, ImmutableMap.of(),
+ new DefaultThriftTypeReference(ThriftType.BINARY),
+ CUSTOM_SERIALIZED_VALUE,
+ FieldKind.THRIFT_FIELD,
+ ImmutableList.of(),
+ Optional.empty(),
+ // Drift requires at least one of the three arguments below, so we provide a dummy method here as a workaround.
+ // https://github.com/airlift/drift/blob/master/drift-codec/src/main/java/io/airlift/drift/codec/metadata/ThriftFieldMetadata.java#L99
+ Optional.of(new ThriftMethodInjection(AbstractTypedThriftCodec.class.getDeclaredMethod("getCustomField"), ImmutableList.of())),
+ Optional.empty(),
+ Optional.empty()));
+ // TODO: This field will be cleaned up: https://github.com/prestodb/presto/issues/25671
+ fields.add(new ThriftFieldMetadata(
+ JSON_FIELD_ID,
+ false, false, Requiredness.OPTIONAL, ImmutableMap.of(),
+ new DefaultThriftTypeReference(ThriftType.STRING),
+ JSON_VALUE,
+ FieldKind.THRIFT_FIELD,
+ ImmutableList.of(),
+ Optional.empty(),
+ // Drift requires at least one of the three arguments below, so we provide a dummy method here as a workaround.
+ // https://github.com/airlift/drift/blob/master/drift-codec/src/main/java/io/airlift/drift/codec/metadata/ThriftFieldMetadata.java#L99
+ Optional.of(new ThriftMethodInjection(AbstractTypedThriftCodec.class.getDeclaredMethod("getJsonField"), ImmutableList.of())),
+ Optional.empty(),
+ Optional.empty()));
+ }
+ catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("Failed to create ThriftFieldMetadata", e);
+ }
+
+ return ThriftType.struct(new ThriftStructMetadata(
+ baseClass.getSimpleName(),
+ ImmutableMap.of(), baseClass, null, ThriftStructMetadata.MetadataType.STRUCT,
+ Optional.empty(), ImmutableList.of(), fields, Optional.empty(), ImmutableList.of()));
+ }
+
+ @Override
+ public T read(TProtocolReader reader)
+ throws Exception
+ {
+ String connectorId = null;
+ T value = null;
+ String jsonValue = null;
+
+ reader.readStructBegin();
+ while (true) {
+ TField field = reader.readFieldBegin();
+ if (field.getType() == TType.STOP) {
+ break;
+ }
+ switch (field.getId()) {
+ case JSON_FIELD_ID:
+ jsonValue = reader.readString();
+ break;
+ case TYPE_FIELD_ID:
+ connectorId = reader.readString();
+ break;
+ case CUSTOM_FIELD_ID:
+ requireNonNull(connectorId, "connectorId is null");
+ Class extends T> concreteClass = classResolver.apply(connectorId);
+ requireNonNull(concreteClass, "concreteClass is null");
+ value = readConcreteValue(connectorId, reader);
+ break;
+ }
+ reader.readFieldEnd();
+ }
+ reader.readStructEnd();
+
+ if (jsonValue != null) {
+ return jsonCodec.fromJson(jsonValue);
+ }
+ if (value != null) {
+ return value;
+ }
+ throw new IllegalStateException("Neither thrift nor json value was present");
+ }
+
+ public abstract T readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception;
+
+ public abstract void writeConcreteValue(String connectorId, T value, TProtocolWriter writer)
+ throws Exception;
+
+ public abstract boolean isThriftCodecAvailable(String connectorId);
+
+ @Override
+ public void write(T value, TProtocolWriter writer)
+ throws Exception
+ {
+ if (value == null) {
+ return;
+ }
+ String connectorId = nameResolver.apply(value);
+ requireNonNull(connectorId, "connectorId is null");
+
+ writer.writeStructBegin(new TStruct(baseClass.getSimpleName()));
+ if (isThriftCodecAvailable(connectorId)) {
+ writer.writeFieldBegin(new TField(TYPE_VALUE, TType.STRING, TYPE_FIELD_ID));
+ writer.writeString(connectorId);
+ writer.writeFieldEnd();
+
+ writer.writeFieldBegin(new TField(CUSTOM_SERIALIZED_VALUE, TType.STRING, CUSTOM_FIELD_ID));
+ writeConcreteValue(connectorId, value, writer);
+ writer.writeFieldEnd();
+ }
+ else {
+ // If thrift codec is not available for this connector, fall back to its json
+ writer.writeFieldBegin(new TField(JSON_VALUE, TType.STRING, JSON_FIELD_ID));
+ writer.writeString(jsonCodec.toJson(value));
+ writer.writeFieldEnd();
+ }
+ writer.writeFieldStop();
+ writer.writeStructEnd();
+ }
+
+ private String getTypeField()
+ {
+ return "getTypeField";
+ }
+
+ private String getCustomField()
+ {
+ return "getCustomField";
+ }
+
+ private String getJsonField()
+ {
+ return "getJsonField";
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ConnectorSplitThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ConnectorSplitThriftCodec.java
new file mode 100644
index 0000000000000..234a53611f421
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ConnectorSplitThriftCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.drift.codec.CodecThriftType;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.presto.connector.ConnectorCodecManager;
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class ConnectorSplitThriftCodec
+ extends AbstractTypedThriftCodec
+{
+ private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorSplit.class);
+ private final ConnectorCodecManager connectorCodecManager;
+
+ @Inject
+ public ConnectorSplitThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec)
+ {
+ super(ConnectorSplit.class,
+ requireNonNull(jsonCodec, "jsonCodec is null"),
+ requireNonNull(handleResolver, "handleResolver is null")::getId,
+ handleResolver::getSplitClass);
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
+ }
+
+ @CodecThriftType
+ public static ThriftType getThriftType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ThriftType getType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ConnectorSplit readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception
+ {
+ ByteBuffer byteBuffer = reader.readBinary();
+ assert (byteBuffer.position() == 0);
+ byte[] bytes = byteBuffer.array();
+ return connectorCodecManager.getConnectorSplitCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
+ }
+
+ @Override
+ public void writeConcreteValue(String connectorId, ConnectorSplit value, TProtocolWriter writer)
+ throws Exception
+ {
+ requireNonNull(value, "value is null");
+ writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getConnectorSplitCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Cannot serialize " + value))));
+ }
+
+ @Override
+ public boolean isThriftCodecAvailable(String connectorId)
+ {
+ return connectorCodecManager.getConnectorSplitCodec(connectorId).isPresent();
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java
deleted file mode 100644
index 483c0c3def17d..0000000000000
--- a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.server.thrift;
-
-import com.facebook.airlift.json.JsonCodec;
-import com.facebook.drift.TException;
-import com.facebook.drift.codec.metadata.DefaultThriftTypeReference;
-import com.facebook.drift.codec.metadata.FieldKind;
-import com.facebook.drift.codec.metadata.ThriftFieldExtractor;
-import com.facebook.drift.codec.metadata.ThriftFieldMetadata;
-import com.facebook.drift.codec.metadata.ThriftStructMetadata;
-import com.facebook.drift.codec.metadata.ThriftType;
-import com.facebook.drift.protocol.TField;
-import com.facebook.drift.protocol.TProtocolException;
-import com.facebook.drift.protocol.TProtocolReader;
-import com.facebook.drift.protocol.TProtocolWriter;
-import com.facebook.drift.protocol.TStruct;
-import com.facebook.drift.protocol.TType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Optional;
-
-import static com.facebook.drift.annotations.ThriftField.Requiredness.NONE;
-import static java.lang.String.format;
-
-/***
- * When we need a custom codec for a primitive type, we need a wrapper to pass the needsCodec check within ThriftCodecByteCodeGenerator.java
- */
-public class CustomCodecUtils
-{
- private CustomCodecUtils() {}
-
- public static ThriftStructMetadata createSyntheticMetadata(short fieldId, String fieldName, Class> originalType, Class> referencedType, ThriftType thriftType)
- {
- ThriftFieldMetadata fieldMetaData = new ThriftFieldMetadata(
- fieldId,
- false, false, NONE, ImmutableMap.of(),
- new DefaultThriftTypeReference(thriftType),
- fieldName,
- FieldKind.THRIFT_FIELD,
- ImmutableList.of(),
- Optional.empty(),
- Optional.empty(),
- Optional.of(new ThriftFieldExtractor(
- fieldId,
- fieldName,
- FieldKind.THRIFT_FIELD,
- originalType.getDeclaredFields()[0], // Any field should work since we are handing extraction in codec on our own
- referencedType)),
- Optional.empty());
- return new ThriftStructMetadata(
- originalType.getSimpleName() + "Wrapper",
- ImmutableMap.of(),
- originalType, null,
- ThriftStructMetadata.MetadataType.STRUCT,
- Optional.empty(), ImmutableList.of(), ImmutableList.of(fieldMetaData), Optional.empty(), ImmutableList.of());
- }
-
- public static T readSingleJsonField(TProtocolReader protocol, JsonCodec jsonCodec, short fieldId, String fieldName)
- throws TException
- {
- protocol.readStructBegin();
- String jsonValue = null;
- TField field = protocol.readFieldBegin();
- while (field.getType() != TType.STOP) {
- if (field.getId() == fieldId) {
- if (field.getType() == TType.STRING) {
- jsonValue = protocol.readString();
- }
- else {
- throw new TProtocolException(format("Unexpected field type: %s for field %s", field.getType(), fieldName));
- }
- }
- protocol.readFieldEnd();
- field = protocol.readFieldBegin();
- }
- protocol.readStructEnd();
-
- if (jsonValue == null) {
- throw new TProtocolException(format("Required field '%s' was not found", fieldName));
- }
- return jsonCodec.fromJson(jsonValue);
- }
-
- public static void writeSingleJsonField(T value, TProtocolWriter protocol, JsonCodec jsonCodec, short fieldId, String fieldName, String structName)
- throws TException
- {
- protocol.writeStructBegin(new TStruct(structName));
-
- protocol.writeFieldBegin(new TField(fieldName, TType.STRING, fieldId));
- protocol.writeString(jsonCodec.toJson(value));
- protocol.writeFieldEnd();
-
- protocol.writeFieldStop();
- protocol.writeStructEnd();
- }
-}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/DeleteTableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/DeleteTableHandleThriftCodec.java
new file mode 100644
index 0000000000000..37232f59e1fba
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/DeleteTableHandleThriftCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.drift.codec.CodecThriftType;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.presto.connector.ConnectorCodecManager;
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class DeleteTableHandleThriftCodec
+ extends AbstractTypedThriftCodec
+{
+ private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorDeleteTableHandle.class);
+ private final ConnectorCodecManager connectorCodecManager;
+
+ @Inject
+ public DeleteTableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec)
+ {
+ super(ConnectorDeleteTableHandle.class,
+ requireNonNull(jsonCodec, "jsonCodec is null"),
+ requireNonNull(handleResolver, "handleResolver is null")::getId,
+ handleResolver::getDeleteTableHandleClass);
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
+ }
+
+ @CodecThriftType
+ public static ThriftType getThriftType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ThriftType getType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ConnectorDeleteTableHandle readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception
+ {
+ ByteBuffer byteBuffer = reader.readBinary();
+ assert (byteBuffer.position() == 0);
+ byte[] bytes = byteBuffer.array();
+ return connectorCodecManager.getDeleteTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
+ }
+
+ @Override
+ public void writeConcreteValue(String connectorId, ConnectorDeleteTableHandle value, TProtocolWriter writer)
+ throws Exception
+ {
+ requireNonNull(value, "value is null");
+ writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getDeleteTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
+ }
+
+ @Override
+ public boolean isThriftCodecAvailable(String connectorId)
+ {
+ return connectorCodecManager.getDeleteTableHandleCodec(connectorId).isPresent();
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java
new file mode 100644
index 0000000000000..6d4c169e82032
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/HandleThriftModule.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static com.facebook.drift.codec.guice.ThriftCodecBinder.thriftCodecBinder;
+
+public class HandleThriftModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
+
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
+
+ binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/InsertTableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/InsertTableHandleThriftCodec.java
new file mode 100644
index 0000000000000..c01480a5c535e
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/InsertTableHandleThriftCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.drift.codec.CodecThriftType;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.presto.connector.ConnectorCodecManager;
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class InsertTableHandleThriftCodec
+ extends AbstractTypedThriftCodec
+{
+ private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorInsertTableHandle.class);
+ private final ConnectorCodecManager connectorCodecManager;
+
+ @Inject
+ public InsertTableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec)
+ {
+ super(ConnectorInsertTableHandle.class,
+ requireNonNull(jsonCodec, "jsonCodec is null"),
+ requireNonNull(handleResolver, "handleResolver is null")::getId,
+ handleResolver::getInsertTableHandleClass);
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
+ }
+
+ @CodecThriftType
+ public static ThriftType getThriftType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ThriftType getType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ConnectorInsertTableHandle readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception
+ {
+ ByteBuffer byteBuffer = reader.readBinary();
+ assert (byteBuffer.position() == 0);
+ byte[] bytes = byteBuffer.array();
+ return connectorCodecManager.getInsertTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
+ }
+
+ @Override
+ public void writeConcreteValue(String connectorId, ConnectorInsertTableHandle value, TProtocolWriter writer)
+ throws Exception
+ {
+ requireNonNull(value, "value is null");
+ writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getInsertTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
+ }
+
+ @Override
+ public boolean isThriftCodecAvailable(String connectorId)
+ {
+ return connectorCodecManager.getInsertTableHandleCodec(connectorId).isPresent();
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/OutputTableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/OutputTableHandleThriftCodec.java
new file mode 100644
index 0000000000000..b364dc694f8b7
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/OutputTableHandleThriftCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.drift.codec.CodecThriftType;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.presto.connector.ConnectorCodecManager;
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class OutputTableHandleThriftCodec
+ extends AbstractTypedThriftCodec
+{
+ private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorOutputTableHandle.class);
+ private final ConnectorCodecManager connectorCodecManager;
+
+ @Inject
+ public OutputTableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec)
+ {
+ super(ConnectorOutputTableHandle.class,
+ requireNonNull(jsonCodec, "jsonCodec is null"),
+ requireNonNull(handleResolver, "handleResolver is null")::getId,
+ handleResolver::getOutputTableHandleClass);
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
+ }
+
+ @CodecThriftType
+ public static ThriftType getThriftType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ThriftType getType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ConnectorOutputTableHandle readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception
+ {
+ ByteBuffer byteBuffer = reader.readBinary();
+ assert (byteBuffer.position() == 0);
+ byte[] bytes = byteBuffer.array();
+ return connectorCodecManager.getOutputTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
+ }
+
+ @Override
+ public void writeConcreteValue(String connectorId, ConnectorOutputTableHandle value, TProtocolWriter writer)
+ throws Exception
+ {
+ requireNonNull(value, "value is null");
+ writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getOutputTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
+ }
+
+ @Override
+ public boolean isThriftCodecAvailable(String connectorId)
+ {
+ return connectorCodecManager.getOutputTableHandleCodec(connectorId).isPresent();
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java
deleted file mode 100644
index e47fdcf9b3e19..0000000000000
--- a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.server.thrift;
-
-import com.facebook.airlift.json.JsonCodec;
-import com.facebook.drift.codec.CodecThriftType;
-import com.facebook.drift.codec.ThriftCodec;
-import com.facebook.drift.codec.metadata.ThriftCatalog;
-import com.facebook.drift.codec.metadata.ThriftType;
-import com.facebook.drift.protocol.TProtocolReader;
-import com.facebook.drift.protocol.TProtocolWriter;
-import com.facebook.presto.metadata.Split;
-
-import javax.inject.Inject;
-
-import static com.facebook.presto.server.thrift.CustomCodecUtils.createSyntheticMetadata;
-import static com.facebook.presto.server.thrift.CustomCodecUtils.readSingleJsonField;
-import static com.facebook.presto.server.thrift.CustomCodecUtils.writeSingleJsonField;
-import static java.util.Objects.requireNonNull;
-
-public class SplitCodec
- implements ThriftCodec
-{
- private static final short SPLIT_DATA_FIELD_ID = 1;
- private static final String SPLIT_DATA_FIELD_NAME = "split";
- private static final String SPLIT_DATA_STRUCT_NAME = "Split";
- private static final ThriftType SYNTHETIC_STRUCT_TYPE = ThriftType.struct(createSyntheticMetadata(SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME, Split.class, String.class, ThriftType.STRING));
-
- private final JsonCodec jsonCodec;
-
- @Inject
- public SplitCodec(JsonCodec jsonCodec, ThriftCatalog thriftCatalog)
- {
- this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
- thriftCatalog.addThriftType(SYNTHETIC_STRUCT_TYPE);
- }
-
- @CodecThriftType
- public static ThriftType getThriftType()
- {
- return SYNTHETIC_STRUCT_TYPE;
- }
-
- @Override
- public ThriftType getType()
- {
- return SYNTHETIC_STRUCT_TYPE;
- }
-
- @Override
- public Split read(TProtocolReader protocol)
- throws Exception
- {
- return readSingleJsonField(protocol, jsonCodec, SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME);
- }
-
- @Override
- public void write(Split value, TProtocolWriter protocol)
- throws Exception
- {
- writeSingleJsonField(value, protocol, jsonCodec, SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME, SPLIT_DATA_STRUCT_NAME);
- }
-}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableHandleThriftCodec.java
new file mode 100644
index 0000000000000..95b209879c762
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableHandleThriftCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.drift.codec.CodecThriftType;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.presto.connector.ConnectorCodecManager;
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.ConnectorTableHandle;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class TableHandleThriftCodec
+ extends AbstractTypedThriftCodec
+{
+ private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorTableHandle.class);
+ private final ConnectorCodecManager connectorCodecManager;
+
+ @Inject
+ public TableHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec)
+ {
+ super(ConnectorTableHandle.class,
+ requireNonNull(jsonCodec, "jsonCodec is null"),
+ requireNonNull(handleResolver, "handleResolver is null")::getId,
+ handleResolver::getTableHandleClass);
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
+ }
+
+ @CodecThriftType
+ public static ThriftType getThriftType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ThriftType getType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ConnectorTableHandle readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception
+ {
+ ByteBuffer byteBuffer = reader.readBinary();
+ assert (byteBuffer.position() == 0);
+ byte[] bytes = byteBuffer.array();
+ return connectorCodecManager.getTableHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
+ }
+
+ @Override
+ public void writeConcreteValue(String connectorId, ConnectorTableHandle value, TProtocolWriter writer)
+ throws Exception
+ {
+ requireNonNull(value, "value is null");
+ writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getTableHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
+ }
+
+ @Override
+ public boolean isThriftCodecAvailable(String connectorId)
+ {
+ return connectorCodecManager.getTableHandleCodec(connectorId).isPresent();
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableLayoutHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableLayoutHandleThriftCodec.java
new file mode 100644
index 0000000000000..9387a6c0058b5
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableLayoutHandleThriftCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.drift.codec.CodecThriftType;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.presto.connector.ConnectorCodecManager;
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class TableLayoutHandleThriftCodec
+ extends AbstractTypedThriftCodec
+{
+ private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorTableLayoutHandle.class);
+ private final ConnectorCodecManager connectorCodecManager;
+
+ @Inject
+ public TableLayoutHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec)
+ {
+ super(ConnectorTableLayoutHandle.class,
+ requireNonNull(jsonCodec, "jsonCodec is null"),
+ requireNonNull(handleResolver, "handleResolver is null")::getId,
+ handleResolver::getTableLayoutHandleClass);
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
+ }
+
+ @CodecThriftType
+ public static ThriftType getThriftType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ThriftType getType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ConnectorTableLayoutHandle readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception
+ {
+ ByteBuffer byteBuffer = reader.readBinary();
+ assert (byteBuffer.position() == 0);
+ byte[] bytes = byteBuffer.array();
+ return connectorCodecManager.getTableLayoutHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
+ }
+
+ @Override
+ public void writeConcreteValue(String connectorId, ConnectorTableLayoutHandle value, TProtocolWriter writer)
+ throws Exception
+ {
+ requireNonNull(value, "value is null");
+ writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getTableLayoutHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
+ }
+
+ @Override
+ public boolean isThriftCodecAvailable(String connectorId)
+ {
+ return connectorCodecManager.getTableLayoutHandleCodec(connectorId).isPresent();
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java
deleted file mode 100644
index 50754a0593bae..0000000000000
--- a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.server.thrift;
-
-import com.facebook.airlift.json.JsonCodec;
-import com.facebook.drift.codec.CodecThriftType;
-import com.facebook.drift.codec.ThriftCodec;
-import com.facebook.drift.codec.metadata.ThriftCatalog;
-import com.facebook.drift.codec.metadata.ThriftType;
-import com.facebook.drift.protocol.TProtocolReader;
-import com.facebook.drift.protocol.TProtocolWriter;
-import com.facebook.presto.execution.scheduler.TableWriteInfo;
-
-import javax.inject.Inject;
-
-import static com.facebook.presto.server.thrift.CustomCodecUtils.createSyntheticMetadata;
-import static com.facebook.presto.server.thrift.CustomCodecUtils.readSingleJsonField;
-import static com.facebook.presto.server.thrift.CustomCodecUtils.writeSingleJsonField;
-import static java.util.Objects.requireNonNull;
-
-public class TableWriteInfoCodec
- implements ThriftCodec
-{
- private static final short TABLE_WRITE_INFO_DATA_FIELD_ID = 1;
- private static final String TABLE_WRITE_INFO_DATA_FIELD_NAME = "tableWriteInfo";
- private static final String TABLE_WRITE_INFO_STRUCT_NAME = "TableWriteInfo";
- private static final ThriftType SYNTHETIC_STRUCT_TYPE = ThriftType.struct(createSyntheticMetadata(TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME, TableWriteInfo.class, String.class, ThriftType.STRING));
-
- private final JsonCodec jsonCodec;
-
- @Inject
- public TableWriteInfoCodec(JsonCodec jsonCodec, ThriftCatalog thriftCatalog)
- {
- this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
- thriftCatalog.addThriftType(SYNTHETIC_STRUCT_TYPE);
- }
-
- @CodecThriftType
- public static ThriftType getThriftType()
- {
- return SYNTHETIC_STRUCT_TYPE;
- }
-
- @Override
- public ThriftType getType()
- {
- return SYNTHETIC_STRUCT_TYPE;
- }
-
- @Override
- public TableWriteInfo read(TProtocolReader protocol)
- throws Exception
- {
- return readSingleJsonField(protocol, jsonCodec, TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME);
- }
-
- @Override
- public void write(TableWriteInfo value, TProtocolWriter protocol)
- throws Exception
- {
- writeSingleJsonField(value, protocol, jsonCodec, TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME, TABLE_WRITE_INFO_STRUCT_NAME);
- }
-}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecUtils.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecUtils.java
new file mode 100644
index 0000000000000..4819ac2a00a40
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.drift.codec.ThriftCodec;
+import com.facebook.drift.protocol.TBinaryProtocol;
+import com.facebook.drift.protocol.TMemoryBuffer;
+import com.facebook.drift.protocol.TMemoryBufferWriteOnly;
+import com.facebook.drift.protocol.TProtocolException;
+
+public class ThriftCodecUtils
+{
+ private ThriftCodecUtils() {}
+
+ public static T fromThrift(byte[] bytes, ThriftCodec thriftCodec)
+ throws TProtocolException
+ {
+ try {
+ TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
+ transport.write(bytes);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ return thriftCodec.read(protocol);
+ }
+ catch (Exception e) {
+ throw new TProtocolException("Can not deserialize the data", e);
+ }
+ }
+
+ public static byte[] toThrift(T value, ThriftCodec thriftCodec)
+ throws TProtocolException
+ {
+ TMemoryBufferWriteOnly transport = new TMemoryBufferWriteOnly(1024);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ try {
+ thriftCodec.write(value, protocol);
+ return transport.getBytes();
+ }
+ catch (Exception e) {
+ throw new TProtocolException("Can not serialize the data", e);
+ }
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TransactionHandleThriftCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TransactionHandleThriftCodec.java
new file mode 100644
index 0000000000000..65a96ec5f863d
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TransactionHandleThriftCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.server.thrift;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.drift.codec.CodecThriftType;
+import com.facebook.drift.codec.metadata.ThriftType;
+import com.facebook.drift.protocol.TProtocolReader;
+import com.facebook.drift.protocol.TProtocolWriter;
+import com.facebook.presto.connector.ConnectorCodecManager;
+import com.facebook.presto.metadata.HandleResolver;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class TransactionHandleThriftCodec
+ extends AbstractTypedThriftCodec
+{
+ private static final ThriftType THRIFT_TYPE = createThriftType(ConnectorTransactionHandle.class);
+ private final ConnectorCodecManager connectorCodecManager;
+
+ @Inject
+ public TransactionHandleThriftCodec(HandleResolver handleResolver, ConnectorCodecManager connectorCodecManager, JsonCodec jsonCodec)
+ {
+ super(ConnectorTransactionHandle.class,
+ requireNonNull(jsonCodec, "jsonCodec is null"),
+ requireNonNull(handleResolver, "handleResolver is null")::getId,
+ handleResolver::getTransactionHandleClass);
+ this.connectorCodecManager = requireNonNull(connectorCodecManager, "connectorThriftCodecManager is null");
+ }
+
+ @CodecThriftType
+ public static ThriftType getThriftType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ThriftType getType()
+ {
+ return THRIFT_TYPE;
+ }
+
+ @Override
+ public ConnectorTransactionHandle readConcreteValue(String connectorId, TProtocolReader reader)
+ throws Exception
+ {
+ ByteBuffer byteBuffer = reader.readBinary();
+ assert (byteBuffer.position() == 0);
+ byte[] bytes = byteBuffer.array();
+ return connectorCodecManager.getTransactionHandleCodec(connectorId).map(codec -> codec.deserialize(bytes)).orElse(null);
+ }
+
+ @Override
+ public void writeConcreteValue(String connectorId, ConnectorTransactionHandle value, TProtocolWriter writer)
+ throws Exception
+ {
+ requireNonNull(value, "value is null");
+ writer.writeBinary(ByteBuffer.wrap(connectorCodecManager.getTransactionHandleCodec(connectorId).map(codec -> codec.serialize(value)).orElseThrow(() -> new IllegalArgumentException("Can not serialize " + value))));
+ }
+
+ @Override
+ public boolean isThriftCodecAvailable(String connectorId)
+ {
+ return connectorCodecManager.getTransactionHandleCodec(connectorId).isPresent();
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java b/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java
index c91d7f1e38395..b87429a880615 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/split/RemoteSplit.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.split;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.execution.Location;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.spi.ConnectorSplit;
@@ -29,6 +32,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class RemoteSplit
implements ConnectorSplit
{
@@ -36,6 +40,7 @@ public class RemoteSplit
private final TaskId remoteSourceTaskId;
@JsonCreator
+ @ThriftConstructor
public RemoteSplit(@JsonProperty("location") Location location, @JsonProperty("remoteSourceTaskId") TaskId remoteSourceTaskId)
{
this.location = requireNonNull(location, "location is null");
@@ -43,12 +48,14 @@ public RemoteSplit(@JsonProperty("location") Location location, @JsonProperty("r
}
@JsonProperty
+ @ThriftField(1)
public Location getLocation()
{
return location;
}
@JsonProperty
+ @ThriftField(2)
public TaskId getRemoteSourceTaskId()
{
return remoteSourceTaskId;
diff --git a/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java
index 7d92bc2d27139..67e1c396d3c4b 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java
@@ -14,6 +14,7 @@
package com.facebook.presto.testing;
import com.facebook.airlift.node.NodeInfo;
+import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.presto.ClientRequestFilterManager;
import com.facebook.presto.GroupByHashPageIndexerFactory;
import com.facebook.presto.PagesIndexPageSorter;
@@ -26,6 +27,7 @@
import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.Type;
+import com.facebook.presto.connector.ConnectorCodecManager;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.connector.system.AnalyzePropertiesSystemTable;
import com.facebook.presto.connector.system.CatalogSystemTable;
@@ -506,7 +508,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
new RowExpressionDeterminismEvaluator(metadata.getFunctionAndTypeManager()),
new FilterStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer),
blockEncodingManager,
- featuresConfig);
+ featuresConfig,
+ new ConnectorCodecManager(ThriftCodecManager::new));
GlobalSystemConnectorFactory globalSystemConnectorFactory = new GlobalSystemConnectorFactory(ImmutableSet.of(
new NodeSystemTable(nodeManager),
diff --git a/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteCodecProvider.java b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteCodecProvider.java
new file mode 100644
index 0000000000000..fa68ecc23d993
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteCodecProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.thrift;
+
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.connector.ConnectorCodecProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Provider;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class RemoteCodecProvider
+ implements ConnectorCodecProvider
+{
+ private final Provider thriftCodecManagerProvider;
+
+ public RemoteCodecProvider(Provider thriftCodecManagerProvider)
+ {
+ this.thriftCodecManagerProvider = requireNonNull(thriftCodecManagerProvider, "thriftCodecManagerProvider is null");
+ }
+
+ @Override
+ public Optional> getConnectorSplitCodec()
+ {
+ return Optional.of(new RemoteSplitCodec(thriftCodecManagerProvider));
+ }
+
+ @Override
+ public Optional> getConnectorTransactionHandleCodec()
+ {
+ return Optional.of(new RemoteTransactionHandleCodec(thriftCodecManagerProvider));
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteSplitCodec.java b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteSplitCodec.java
new file mode 100644
index 0000000000000..f8d1c90f1c35f
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteSplitCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.thrift;
+
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.drift.protocol.TProtocolException;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.split.RemoteSplit;
+import com.google.inject.Provider;
+
+import static com.facebook.presto.server.thrift.ThriftCodecUtils.fromThrift;
+import static com.facebook.presto.server.thrift.ThriftCodecUtils.toThrift;
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
+import static java.util.Objects.requireNonNull;
+
+public class RemoteSplitCodec
+ implements ConnectorCodec
+{
+ private final Provider thriftCodecManagerProvider;
+
+ public RemoteSplitCodec(Provider thriftCodecManagerProvider)
+ {
+ this.thriftCodecManagerProvider = requireNonNull(thriftCodecManagerProvider, "thriftCodecManagerProvider is null");
+ }
+
+ @Override
+ public byte[] serialize(ConnectorSplit split)
+ {
+ try {
+ return toThrift((RemoteSplit) split, thriftCodecManagerProvider.get().getCodec(RemoteSplit.class));
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize remote split", e);
+ }
+ }
+
+ @Override
+ public ConnectorSplit deserialize(byte[] bytes)
+ {
+ try {
+ return fromThrift(bytes, thriftCodecManagerProvider.get().getCodec(RemoteSplit.class));
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize remote split", e);
+ }
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteTransactionHandleCodec.java b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteTransactionHandleCodec.java
new file mode 100644
index 0000000000000..b3a664042764a
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/thrift/RemoteTransactionHandleCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.thrift;
+
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.drift.protocol.TProtocolException;
+import com.facebook.presto.metadata.RemoteTransactionHandle;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Provider;
+
+import static com.facebook.presto.server.thrift.ThriftCodecUtils.fromThrift;
+import static com.facebook.presto.server.thrift.ThriftCodecUtils.toThrift;
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
+import static java.util.Objects.requireNonNull;
+
+public class RemoteTransactionHandleCodec
+ implements ConnectorCodec
+{
+ private final Provider thriftCodecManagerProvider;
+
+ public RemoteTransactionHandleCodec(Provider thriftCodecManagerProvider)
+ {
+ this.thriftCodecManagerProvider = requireNonNull(thriftCodecManagerProvider, "thriftCodecManagerProvider is null");
+ }
+
+ @Override
+ public byte[] serialize(ConnectorTransactionHandle handle)
+ {
+ try {
+ return toThrift((RemoteTransactionHandle) handle, thriftCodecManagerProvider.get().getCodec(RemoteTransactionHandle.class));
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize remote transaction handle", e);
+ }
+ }
+
+ @Override
+ public ConnectorTransactionHandle deserialize(byte[] bytes)
+ {
+ try {
+ return fromThrift(bytes, thriftCodecManagerProvider.get().getCodec(RemoteTransactionHandle.class));
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize remote transaction handle", e);
+ }
+ }
+}
diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java
index 3e3fbf6089d8d..d29241c1531a0 100644
--- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java
+++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestGroupInnerJoinsByConnectorRuleSet.java
@@ -201,7 +201,7 @@ public void testDPartialPushDownTwoDifferentConnectors()
.matches(
project(
filter(
- "a1 = b1 and a1 = c1 and true",
+ "a1 = b1 and a1 = c1 and true",
join(
JoinTableScanMatcher.tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle1, "a1", "a2", "c1", "c2"),
JoinTableScanMatcher.tableScan(LOCAL, tableHandle2, "b1", "b2")))));
@@ -277,7 +277,7 @@ public void testJoinPushDownHappenedWithFilters()
.matches(
project(
filter(
- "a1 = a2 and a1 > b1 and true",
+ "a1 = a2 and a1 > b1 and true",
JoinTableScanMatcher.tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle, "a1", "a2", "b1"))));
}
@@ -339,11 +339,11 @@ public void testPushDownWithTwoDifferentConnectors()
tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, "b1", "b2"),
tableScan(OTHER_CATALOG_SUPPORTING_JOIN_PUSHDOWN, "c1", "c2"),
new EquiJoinClause(newBigintVariable("b1"), newBigintVariable("c1"))),
- new EquiJoinClause(newBigintVariable("c1"), newBigintVariable("d1"))))
+ new EquiJoinClause(newBigintVariable("c1"), newBigintVariable("d1"))))
.matches(
project(
filter(
- "((a1 = b1 and a1 = d1) and (b1 = c1 and c1 = d1)) and true",
+ "((a1 = b1 and a1 = d1) and (b1 = c1 and c1 = d1)) and true",
join(
JoinTableScanMatcher.tableScan(CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle1, "a1", "b1"),
JoinTableScanMatcher.tableScan(OTHER_CATALOG_SUPPORTING_JOIN_PUSHDOWN, tableHandle2, "c1", "d1")))));
@@ -352,9 +352,7 @@ public void testPushDownWithTwoDifferentConnectors()
private RuleAssert assertGroupInnerJoinsByConnectorRuleSet()
{
// For testing, we do not wish to push down pulled up predicates
- return tester.assertThat(new GroupInnerJoinsByConnectorRuleSet.OnlyJoinRule(tester.getMetadata(),
- (plan, session, types, variableAllocator, idAllocator, warningCollector) ->
- PlanOptimizerResult.optimizerResult(plan, false)),
+ return tester.assertThat(new GroupInnerJoinsByConnectorRuleSet.OnlyJoinRule(tester.getMetadata(), (plan, session, types, variableAllocator, idAllocator, warningCollector) -> PlanOptimizerResult.optimizerResult(plan, false)),
ImmutableList.of(CATALOG_SUPPORTING_JOIN_PUSHDOWN, OTHER_CATALOG_SUPPORTING_JOIN_PUSHDOWN));
}
@@ -473,8 +471,8 @@ public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session ses
ConnectorTableHandle connectorHandle = otherTable.getConnectorHandle();
if (connectorId.equals(otherTable.getConnectorId()) && Objects.equals(otherTable.getConnectorId(), this.tableHandle.getConnectorId()) &&
- Objects.equals(otherTable.getConnectorHandle(), this.tableHandle.getConnectorHandle()) &&
- Objects.equals(otherTable.getLayout().isPresent(), this.tableHandle.getLayout().isPresent())) {
+ Objects.equals(otherTable.getConnectorHandle(), this.tableHandle.getConnectorHandle()) &&
+ Objects.equals(otherTable.getLayout().isPresent(), this.tableHandle.getLayout().isPresent())) {
return MatchResult.match(SymbolAliases.builder().putAll(Arrays.stream(columns).collect(toMap(identity(), SymbolReference::new))).build());
}
diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java
index 2b2772d7d0b51..ddb834d818dfc 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java
@@ -41,6 +41,7 @@
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.connector.ConnectorCodecManager;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.connector.system.SystemConnectorModule;
import com.facebook.presto.cost.FilterStatsCalculator;
@@ -98,7 +99,6 @@
import com.facebook.presto.metadata.SchemaPropertyManager;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.metadata.SessionPropertyProviderConfig;
-import com.facebook.presto.metadata.Split;
import com.facebook.presto.metadata.StaticCatalogStore;
import com.facebook.presto.metadata.StaticCatalogStoreConfig;
import com.facebook.presto.metadata.StaticFunctionNamespaceStore;
@@ -141,8 +141,7 @@
import com.facebook.presto.resourcemanager.ResourceManagerResourceGroupService;
import com.facebook.presto.server.remotetask.HttpLocationFactory;
import com.facebook.presto.server.thrift.FixedAddressSelector;
-import com.facebook.presto.server.thrift.SplitCodec;
-import com.facebook.presto.server.thrift.TableWriteInfoCodec;
+import com.facebook.presto.server.thrift.HandleThriftModule;
import com.facebook.presto.server.thrift.ThriftServerInfoClient;
import com.facebook.presto.server.thrift.ThriftServerInfoService;
import com.facebook.presto.server.thrift.ThriftTaskClient;
@@ -426,10 +425,13 @@ else if (serverConfig.isCoordinator()) {
binder.bind(TaskManagementExecutor.class).in(Scopes.SINGLETON);
install(new DefaultThriftCodecsModule());
+ // handle resolve for thrift
+ binder.install(new HandleThriftModule());
+
thriftCodecBinder(binder).bindCustomThriftCodec(SqlInvokedFunctionCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(SqlFunctionIdCodec.class);
- thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class);
- thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class);
+
+ binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON);
jsonCodecBinder(binder).bindListJsonCodec(TaskMemoryReservationSummary.class);
binder.bind(SqlTaskManager.class).in(Scopes.SINGLETON);
@@ -563,7 +565,6 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
jsonCodecBinder(binder).bindJsonCodec(TableCommitContext.class);
jsonCodecBinder(binder).bindJsonCodec(SqlInvokedFunction.class);
jsonCodecBinder(binder).bindJsonCodec(TaskSource.class);
- jsonCodecBinder(binder).bindJsonCodec(Split.class);
jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class);
smileCodecBinder(binder).bindSmileCodec(TaskStatus.class);
smileCodecBinder(binder).bindSmileCodec(TaskInfo.class);
diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java
index 5947963043a7c..a5a5a1bf88a65 100644
--- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java
+++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java
@@ -33,6 +33,7 @@
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.connector.ConnectorCodecManager;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryManagerConfig;
@@ -56,9 +57,21 @@
import com.facebook.presto.metadata.Split;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.server.TaskUpdateRequest;
-import com.facebook.presto.server.thrift.SplitCodec;
-import com.facebook.presto.server.thrift.TableWriteInfoCodec;
+import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec;
+import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec;
+import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec;
+import com.facebook.presto.server.thrift.OutputTableHandleThriftCodec;
+import com.facebook.presto.server.thrift.TableHandleThriftCodec;
+import com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec;
+import com.facebook.presto.server.thrift.TransactionHandleThriftCodec;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.Serialization;
@@ -74,6 +87,7 @@
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
+import com.google.inject.Scopes;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.testng.annotations.DataProvider;
@@ -370,14 +384,28 @@ public void configure(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
jsonCodecBinder(binder).bindJsonCodec(PlanFragment.class);
jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class);
- jsonCodecBinder(binder).bindJsonCodec(Split.class);
jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class);
jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
+
+ binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON);
+
+ thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class);
thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class);
- thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class);
- thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(LocaleToLanguageTagCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(JodaDateTimeToEpochMillisThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(DurationToMillisThriftCodec.class);
diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java
index d38892df6df33..e97cd8e37a2aa 100644
--- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java
+++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java
@@ -32,6 +32,7 @@
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.connector.ConnectorCodecManager;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryManagerConfig;
@@ -54,9 +55,21 @@
import com.facebook.presto.metadata.Split;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.server.TaskUpdateRequest;
-import com.facebook.presto.server.thrift.SplitCodec;
-import com.facebook.presto.server.thrift.TableWriteInfoCodec;
+import com.facebook.presto.server.thrift.ConnectorSplitThriftCodec;
+import com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec;
+import com.facebook.presto.server.thrift.InsertTableHandleThriftCodec;
+import com.facebook.presto.server.thrift.OutputTableHandleThriftCodec;
+import com.facebook.presto.server.thrift.TableHandleThriftCodec;
+import com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec;
+import com.facebook.presto.server.thrift.TransactionHandleThriftCodec;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.Serialization;
@@ -73,6 +86,7 @@
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
+import com.google.inject.Scopes;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.testng.annotations.DataProvider;
@@ -378,14 +392,28 @@ public void configure(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
jsonCodecBinder(binder).bindJsonCodec(PlanFragment.class);
jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class);
- jsonCodecBinder(binder).bindJsonCodec(Split.class);
jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class);
jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorSplit.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTransactionHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorOutputTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorDeleteTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorInsertTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTableHandle.class);
+ jsonCodecBinder(binder).bindJsonCodec(ConnectorTableLayoutHandle.class);
+
+ binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON);
+
+ thriftCodecBinder(binder).bindCustomThriftCodec(ConnectorSplitThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TransactionHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(OutputTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(InsertTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(DeleteTableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TableHandleThriftCodec.class);
+ thriftCodecBinder(binder).bindCustomThriftCodec(TableLayoutHandleThriftCodec.class);
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class);
thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class);
- thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class);
- thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(LocaleToLanguageTagCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(JodaDateTimeToEpochMillisThriftCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(DurationToMillisThriftCodec.class);
diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java
index eeb8d1b8b6fc2..3fddc68925c89 100644
--- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java
+++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java
@@ -70,7 +70,8 @@
public class PrestoNativeQueryRunnerUtils
{
- public enum QueryRunnerType {
+ public enum QueryRunnerType
+ {
JAVA,
NATIVE
}
@@ -176,8 +177,8 @@ public HiveQueryRunnerBuilder setFailOnNestedLoopJoin(boolean failOnNestedLoopJo
public HiveQueryRunnerBuilder setUseThrift(boolean useThrift)
{
- this.extraProperties
- .put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift));
+ this.extraProperties.put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift));
+ this.extraProperties.put("experimental.internal-communication.task-info-thrift-transport-enabled", String.valueOf(useThrift));
return this;
}
diff --git a/presto-spark-base/pom.xml b/presto-spark-base/pom.xml
index 6926105a4fcaa..e2bcff9c8d4de 100644
--- a/presto-spark-base/pom.xml
+++ b/presto-spark-base/pom.xml
@@ -203,6 +203,11 @@
commons-text
+
+ com.facebook.drift
+ drift-codec
+
+
com.facebook.presto
presto-testng-services
diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java
index ab050bb2c63c2..53abdce4a89ff 100644
--- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java
+++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java
@@ -19,6 +19,7 @@
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.airlift.node.NodeConfig;
import com.facebook.airlift.node.NodeInfo;
+import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.presto.ClientRequestFilterManager;
import com.facebook.presto.GroupByHashPageIndexerFactory;
import com.facebook.presto.PagesIndexPageSorter;
@@ -32,6 +33,7 @@
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.connector.ConnectorCodecManager;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.connector.system.SystemConnectorModule;
import com.facebook.presto.cost.CostCalculator;
@@ -426,6 +428,10 @@ protected void setup(Binder binder)
binder.bind(PageSourceManager.class).in(Scopes.SINGLETON);
binder.bind(PageSourceProvider.class).to(PageSourceManager.class).in(Scopes.SINGLETON);
+ // for thrift serde
+ binder.bind(ThriftCodecManager.class).toInstance(new ThriftCodecManager());
+ binder.bind(ConnectorCodecManager.class).in(Scopes.SINGLETON);
+
// page sink provider
binder.bind(PageSinkManager.class).in(Scopes.SINGLETON);
binder.bind(PageSinkProvider.class).to(PageSinkManager.class).in(Scopes.SINGLETON);
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorCodec.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorCodec.java
new file mode 100644
index 0000000000000..baded6e02eb8e
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorCodec.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.spi;
+
+import com.facebook.presto.spi.api.Experimental;
+
+@Experimental
+public interface ConnectorCodec
+{
+ byte[] serialize(T value);
+
+ T deserialize(byte[] bytes);
+}
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java b/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java
index 349c41e7eb205..b34f1309e9b28 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/SchemaTableName.java
@@ -49,14 +49,14 @@ public static SchemaTableName valueOf(String schemaTableName)
}
@JsonProperty("schema")
- @ThriftField(1)
+ @ThriftField(value = 1, name = "schema")
public String getSchemaName()
{
return schemaName;
}
@JsonProperty("table")
- @ThriftField(2)
+ @ThriftField(value = 2, name = "table")
public String getTableName()
{
return tableName;
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java b/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java
index 16ccbfa188a9d..9b686fd7fdaf9 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/SplitContext.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.spi;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.common.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -22,6 +25,7 @@
import static java.util.Objects.requireNonNull;
// TODO: Use builder pattern for SplitContext if we are to add optional field
+@ThriftStruct
public class SplitContext
{
public static final SplitContext NON_CACHEABLE = new SplitContext(false);
@@ -31,6 +35,7 @@ public class SplitContext
private final Optional> dynamicFilterPredicate;
@JsonCreator
+ @ThriftConstructor
public SplitContext(@JsonProperty boolean cacheable)
{
this(cacheable, Optional.empty());
@@ -48,6 +53,7 @@ private SplitContext(boolean cacheable, Optional> dyna
}
@JsonProperty
+ @ThriftField(1)
public boolean isCacheable()
{
return cacheable;
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java b/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java
index cc8289de8d312..b7f5843fa375f 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/SplitWeight.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.spi;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
@@ -23,6 +26,7 @@
import static java.lang.Math.addExact;
import static java.lang.Math.multiplyExact;
+@ThriftStruct
public final class SplitWeight
{
private static final long UNIT_VALUE = 100;
@@ -31,7 +35,8 @@ public final class SplitWeight
private final long value;
- private SplitWeight(long value)
+ @ThriftConstructor
+ public SplitWeight(long value)
{
if (value <= 0) {
throw new IllegalArgumentException("value must be > 0, found: " + value);
@@ -43,6 +48,7 @@ private SplitWeight(long value)
* @return The internal integer representation for this weight value
*/
@JsonValue
+ @ThriftField(value = 1, name = "value")
public long getRawValue()
{
return value;
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java b/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java
index bbaa4a9422f3c..51df365849ee0 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/TableHandle.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.spi;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -24,6 +27,7 @@
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public final class TableHandle
{
private final ConnectorId connectorId;
@@ -47,6 +51,16 @@ public TableHandle(
this(connectorId, connectorHandle, transaction, layout, Optional.empty());
}
+ @ThriftConstructor
+ public TableHandle(
+ ConnectorId connectorId,
+ ConnectorTableHandle connectorHandle,
+ ConnectorTransactionHandle transaction,
+ ConnectorTableLayoutHandle connectorTableLayout)
+ {
+ this(connectorId, connectorHandle, transaction, Optional.of(connectorTableLayout), Optional.empty());
+ }
+
public TableHandle(
ConnectorId connectorId,
ConnectorTableHandle connectorHandle,
@@ -62,18 +76,21 @@ public TableHandle(
}
@JsonProperty
+ @ThriftField(1)
public ConnectorId getConnectorId()
{
return connectorId;
}
@JsonProperty
+ @ThriftField(2)
public ConnectorTableHandle getConnectorHandle()
{
return connectorHandle;
}
@JsonProperty
+ @ThriftField(3)
public ConnectorTransactionHandle getTransaction()
{
return transaction;
@@ -85,6 +102,12 @@ public Optional getLayout()
return layout;
}
+ @ThriftField(value = 4, name = "connectorTableLayout")
+ public ConnectorTableLayoutHandle getLayoutHandle()
+ {
+ return layout.orElse(null);
+ }
+
public Optional>> getDynamicFilter()
{
return dynamicFilter;
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java
index f4603e66016c1..6fd46075ff1c8 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/Connector.java
@@ -85,6 +85,14 @@ default ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider()
throw new UnsupportedOperationException();
}
+ /**
+ * @throws UnsupportedOperationException if this connector does not support connector specific codec
+ */
+ default ConnectorCodecProvider getConnectorCodecProvider()
+ {
+ throw new UnsupportedOperationException();
+ }
+
/**
* @return the set of system tables provided by this connector
*/
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java
new file mode 100644
index 0000000000000..4bd2d81d456b4
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorCodecProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.spi.connector;
+
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+
+import java.util.Optional;
+
+public interface ConnectorCodecProvider
+{
+ default Optional> getConnectorSplitCodec()
+ {
+ return Optional.empty();
+ }
+
+ default Optional> getConnectorTransactionHandleCodec()
+ {
+ return Optional.empty();
+ }
+
+ default Optional> getConnectorOutputTableHandleCodec()
+ {
+ return Optional.empty();
+ }
+
+ default Optional> getConnectorInsertTableHandleCodec()
+ {
+ return Optional.empty();
+ }
+
+ default Optional> getConnectorDeleteTableHandleCodec()
+ {
+ return Optional.empty();
+ }
+
+ default Optional> getConnectorTableLayoutHandleCodec()
+ {
+ return Optional.empty();
+ }
+
+ default Optional> getConnectorTableHandleCodec()
+ {
+ return Optional.empty();
+ }
+}
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java b/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java
index 384d5b98e7999..06930f8730853 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/schedule/NodeSelectionStrategy.java
@@ -13,9 +13,27 @@
*/
package com.facebook.presto.spi.schedule;
+import com.facebook.drift.annotations.ThriftEnum;
+import com.facebook.drift.annotations.ThriftEnumValue;
+
+@ThriftEnum
public enum NodeSelectionStrategy
{
- HARD_AFFINITY,
- SOFT_AFFINITY,
- NO_PREFERENCE
+ HARD_AFFINITY(0),
+ SOFT_AFFINITY(1),
+ NO_PREFERENCE(2),
+ /**/;
+
+ private final int value;
+
+ NodeSelectionStrategy(int value)
+ {
+ this.value = value;
+ }
+
+ @ThriftEnumValue
+ public int getValue()
+ {
+ return value;
+ }
}
diff --git a/presto-thrift-spec/pom.xml b/presto-thrift-spec/pom.xml
index 6b4e3e4baed8c..a403c718fbf18 100644
--- a/presto-thrift-spec/pom.xml
+++ b/presto-thrift-spec/pom.xml
@@ -20,6 +20,7 @@
com.facebook.presto
presto-common
+
com.facebook.presto
presto-main-base
@@ -53,14 +54,30 @@
com.facebook.presto.execution.TaskStatus
com.facebook.presto.execution.TaskInfo
com.facebook.presto.server.TaskUpdateRequest
+ com.facebook.presto.spi.ConnectorSplit
+ com.facebook.presto.spi.connector.ConnectorTransactionHandle
+ com.facebook.presto.spi.ConnectorOutputTableHandle
+ com.facebook.presto.spi.ConnectorDeleteTableHandle
+ com.facebook.presto.spi.ConnectorInsertTableHandle
+ com.facebook.presto.spi.ConnectorTableHandle
+ com.facebook.presto.spi.ConnectorTableLayoutHandle
+
+ facebook.presto.thrift
+
- com.facebook.presto.server.thrift.SplitCodec
- com.facebook.presto.server.thrift.TableWriteInfoCodec
+ com.facebook.presto.server.thrift.ConnectorSplitThriftCodec
+ com.facebook.presto.server.thrift.TransactionHandleThriftCodec
+ com.facebook.presto.server.thrift.OutputTableHandleThriftCodec
+ com.facebook.presto.server.thrift.DeleteTableHandleThriftCodec
+ com.facebook.presto.server.thrift.InsertTableHandleThriftCodec
+ com.facebook.presto.server.thrift.TableHandleThriftCodec
+ com.facebook.presto.server.thrift.TableLayoutHandleThriftCodec
com.facebook.drift.codec.utils.DurationToMillisThriftCodec
com.facebook.drift.codec.utils.DataSizeToBytesThriftCodec
com.facebook.drift.codec.utils.JodaDateTimeToEpochMillisThriftCodec
com.facebook.drift.codec.utils.LocaleToLanguageTagCodec
+ com.facebook.drift.codec.utils.UuidToLeachSalzBinaryEncodingThriftCodec
com.facebook.drift.codec.internal.builtin.OptionalIntThriftCodec
com.facebook.drift.codec.internal.builtin.OptionalLongThriftCodec
com.facebook.drift.codec.internal.builtin.OptionalDoubleThriftCodec
diff --git a/presto-tpcds/pom.xml b/presto-tpcds/pom.xml
index a1f4befb094cd..6d2d68f8f7c00 100644
--- a/presto-tpcds/pom.xml
+++ b/presto-tpcds/pom.xml
@@ -42,6 +42,16 @@
jackson-datatype-jdk8
+
+ com.facebook.drift
+ drift-codec
+
+
+
+ com.facebook.drift
+ drift-protocol
+
+
com.facebook.presto
@@ -67,6 +77,12 @@
provided
+
+ io.airlift
+ units
+ provided
+
+
com.fasterxml.jackson.core
jackson-annotations
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java
index 092ded05a523c..0e3880c20c022 100644
--- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsConnectorFactory.java
@@ -13,9 +13,11 @@
*/
package com.facebook.presto.tpcds;
+import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorCodecProvider;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.connector.ConnectorMetadata;
@@ -24,6 +26,7 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.transaction.IsolationLevel;
+import com.facebook.presto.tpcds.thrift.TpcdsCodecProvider;
import java.util.Map;
@@ -103,6 +106,12 @@ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
{
return new TpcdsNodePartitioningProvider(nodeManager, splitsPerNode);
}
+
+ @Override
+ public ConnectorCodecProvider getConnectorCodecProvider()
+ {
+ return new TpcdsCodecProvider(new ThriftCodecManager());
+ }
};
}
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java
index 466a9b96302cc..7ad33fb467e7a 100644
--- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.tpcds;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
@@ -29,6 +32,7 @@
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class TpcdsSplit
implements ConnectorSplit
{
@@ -39,6 +43,7 @@ public class TpcdsSplit
private final boolean noSexism;
@JsonCreator
+ @ThriftConstructor
public TpcdsSplit(
@JsonProperty("tableHandle") TpcdsTableHandle tableHandle,
@JsonProperty("partNumber") int partNumber,
@@ -60,18 +65,21 @@ public TpcdsSplit(
}
@JsonProperty
+ @ThriftField(1)
public TpcdsTableHandle getTableHandle()
{
return tableHandle;
}
@JsonProperty
+ @ThriftField(2)
public int getTotalParts()
{
return totalParts;
}
@JsonProperty
+ @ThriftField(3)
public int getPartNumber()
{
return partNumber;
@@ -90,6 +98,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}
@JsonProperty
+ @ThriftField(4)
public List getAddresses()
{
return addresses;
@@ -102,6 +111,7 @@ public List getPreferredNodes(NodeProvider nodeProvider)
}
@JsonProperty
+ @ThriftField(5)
public boolean isNoSexism()
{
return noSexism;
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java
index e2434dc254d78..581b091c1d52c 100644
--- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java
@@ -13,6 +13,9 @@
*/
package com.facebook.presto.tpcds;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -22,6 +25,7 @@
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class TpcdsTableHandle
implements ConnectorTableHandle
{
@@ -29,6 +33,7 @@ public class TpcdsTableHandle
private final double scaleFactor;
@JsonCreator
+ @ThriftConstructor
public TpcdsTableHandle(@JsonProperty("tableName") String tableName, @JsonProperty("scaleFactor") double scaleFactor)
{
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -37,12 +42,14 @@ public TpcdsTableHandle(@JsonProperty("tableName") String tableName, @JsonProper
}
@JsonProperty
+ @ThriftField(1)
public String getTableName()
{
return tableName;
}
@JsonProperty
+ @ThriftField(2)
public double getScaleFactor()
{
return scaleFactor;
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java
index 04b2ed175f482..45e8d9cd1f221 100644
--- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java
@@ -13,24 +13,30 @@
*/
package com.facebook.presto.tpcds;
+import com.facebook.drift.annotations.ThriftConstructor;
+import com.facebook.drift.annotations.ThriftField;
+import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import static java.util.Objects.requireNonNull;
+@ThriftStruct
public class TpcdsTableLayoutHandle
implements ConnectorTableLayoutHandle
{
private final TpcdsTableHandle table;
@JsonCreator
+ @ThriftConstructor
public TpcdsTableLayoutHandle(@JsonProperty("table") TpcdsTableHandle table)
{
this.table = requireNonNull(table, "table is null");
}
@JsonProperty
+ @ThriftField(1)
public TpcdsTableHandle getTable()
{
return table;
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java
index 1e8e4d0c99e27..7f362324ec9a8 100644
--- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTransactionHandle.java
@@ -13,10 +13,26 @@
*/
package com.facebook.presto.tpcds;
+import com.facebook.drift.annotations.ThriftEnum;
+import com.facebook.drift.annotations.ThriftEnumValue;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+@ThriftEnum
public enum TpcdsTransactionHandle
implements ConnectorTransactionHandle
{
- INSTANCE
+ INSTANCE(1);
+
+ private final int value;
+
+ TpcdsTransactionHandle(int value)
+ {
+ this.value = value;
+ }
+
+ @ThriftEnumValue
+ public int getValue()
+ {
+ return value;
+ }
}
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/ThriftCodecUtils.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/ThriftCodecUtils.java
new file mode 100644
index 0000000000000..4dbff7ef02038
--- /dev/null
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/ThriftCodecUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.tpcds.thrift;
+
+import com.facebook.drift.codec.ThriftCodec;
+import com.facebook.drift.protocol.TBinaryProtocol;
+import com.facebook.drift.protocol.TMemoryBuffer;
+import com.facebook.drift.protocol.TMemoryBufferWriteOnly;
+import com.facebook.drift.protocol.TProtocolException;
+
+public class ThriftCodecUtils
+{
+ private ThriftCodecUtils() {}
+
+ public static T fromThrift(byte[] bytes, ThriftCodec thriftCodec)
+ throws TProtocolException
+ {
+ try {
+ TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
+ transport.write(bytes);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ return thriftCodec.read(protocol);
+ }
+ catch (Exception e) {
+ throw new TProtocolException("Can not deserialize the data", e);
+ }
+ }
+
+ public static byte[] toThrift(T value, ThriftCodec thriftCodec)
+ throws TProtocolException
+ {
+ TMemoryBufferWriteOnly transport = new TMemoryBufferWriteOnly(1024);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ try {
+ thriftCodec.write(value, protocol);
+ return transport.getBytes();
+ }
+ catch (Exception e) {
+ throw new TProtocolException("Can not serialize the data", e);
+ }
+ }
+}
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java
new file mode 100644
index 0000000000000..09545c6b96fe7
--- /dev/null
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsCodecProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.tpcds.thrift;
+
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorCodecProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class TpcdsCodecProvider
+ implements ConnectorCodecProvider
+{
+ private final ThriftCodecManager thriftCodecManager;
+
+ public TpcdsCodecProvider(ThriftCodecManager thriftCodecManager)
+ {
+ this.thriftCodecManager = requireNonNull(thriftCodecManager, "thriftCodecManager is null");
+ }
+
+ @Override
+ public Optional> getConnectorSplitCodec()
+ {
+ return Optional.of(new TpcdsSplitCodec(thriftCodecManager));
+ }
+
+ @Override
+ public Optional> getConnectorTransactionHandleCodec()
+ {
+ return Optional.of(new TpcdsTransactionHandleCodec(thriftCodecManager));
+ }
+
+ @Override
+ public Optional> getConnectorTableLayoutHandleCodec()
+ {
+ return Optional.of(new TpcdsTableLayoutHandleCodec(thriftCodecManager));
+ }
+
+ @Override
+ public Optional> getConnectorTableHandleCodec()
+ {
+ return Optional.of(new TpcdsTableHandleCodec(thriftCodecManager));
+ }
+}
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsSplitCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsSplitCodec.java
new file mode 100644
index 0000000000000..32e45525de417
--- /dev/null
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsSplitCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.tpcds.thrift;
+
+import com.facebook.drift.codec.ThriftCodec;
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.drift.protocol.TProtocolException;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.tpcds.TpcdsSplit;
+
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift;
+import static java.util.Objects.requireNonNull;
+
+public class TpcdsSplitCodec
+ implements ConnectorCodec
+{
+ private final ThriftCodec thriftCodec;
+
+ public TpcdsSplitCodec(ThriftCodecManager thriftCodecManager)
+ {
+ this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsSplit.class);
+ }
+
+ @Override
+ public byte[] serialize(ConnectorSplit split)
+ {
+ try {
+ return toThrift((TpcdsSplit) split, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds split", e);
+ }
+ }
+
+ @Override
+ public ConnectorSplit deserialize(byte[] bytes)
+ {
+ try {
+ return fromThrift(bytes, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds split", e);
+ }
+ }
+}
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableHandleCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableHandleCodec.java
new file mode 100644
index 0000000000000..815d981bc1059
--- /dev/null
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableHandleCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.tpcds.thrift;
+
+import com.facebook.drift.codec.ThriftCodec;
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.drift.protocol.TProtocolException;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.tpcds.TpcdsTableHandle;
+
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift;
+import static java.util.Objects.requireNonNull;
+
+public class TpcdsTableHandleCodec
+ implements ConnectorCodec
+{
+ private final ThriftCodec thriftCodec;
+
+ public TpcdsTableHandleCodec(ThriftCodecManager thriftCodecManager)
+ {
+ this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsTableHandle.class);
+ }
+
+ @Override
+ public byte[] serialize(ConnectorTableHandle handle)
+ {
+ try {
+ return toThrift((TpcdsTableHandle) handle, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds table handle", e);
+ }
+ }
+
+ @Override
+ public ConnectorTableHandle deserialize(byte[] bytes)
+ {
+ try {
+ return fromThrift(bytes, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds table handle", e);
+ }
+ }
+}
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableLayoutHandleCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableLayoutHandleCodec.java
new file mode 100644
index 0000000000000..c7a84e168d3ce
--- /dev/null
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTableLayoutHandleCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.tpcds.thrift;
+
+import com.facebook.drift.codec.ThriftCodec;
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.drift.protocol.TProtocolException;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.tpcds.TpcdsTableLayoutHandle;
+
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift;
+import static java.util.Objects.requireNonNull;
+
+public class TpcdsTableLayoutHandleCodec
+ implements ConnectorCodec
+{
+ private final ThriftCodec thriftCodec;
+
+ public TpcdsTableLayoutHandleCodec(ThriftCodecManager thriftCodecManager)
+ {
+ this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsTableLayoutHandle.class);
+ }
+
+ @Override
+ public byte[] serialize(ConnectorTableLayoutHandle handle)
+ {
+ try {
+ return toThrift((TpcdsTableLayoutHandle) handle, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds table Layout handle", e);
+ }
+ }
+
+ @Override
+ public ConnectorTableLayoutHandle deserialize(byte[] bytes)
+ {
+ try {
+ return fromThrift(bytes, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds table Layout handle", e);
+ }
+ }
+}
diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTransactionHandleCodec.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTransactionHandleCodec.java
new file mode 100644
index 0000000000000..0f98eefb4d40d
--- /dev/null
+++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/thrift/TpcdsTransactionHandleCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.tpcds.thrift;
+
+import com.facebook.drift.codec.ThriftCodec;
+import com.facebook.drift.codec.ThriftCodecManager;
+import com.facebook.drift.protocol.TProtocolException;
+import com.facebook.presto.spi.ConnectorCodec;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.tpcds.TpcdsTransactionHandle;
+
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.fromThrift;
+import static com.facebook.presto.tpcds.thrift.ThriftCodecUtils.toThrift;
+import static java.util.Objects.requireNonNull;
+
+public class TpcdsTransactionHandleCodec
+ implements ConnectorCodec
+{
+ private final ThriftCodec thriftCodec;
+
+ public TpcdsTransactionHandleCodec(ThriftCodecManager thriftCodecManager)
+ {
+ this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(TpcdsTransactionHandle.class);
+ }
+
+ @Override
+ public byte[] serialize(ConnectorTransactionHandle handle)
+ {
+ try {
+ return toThrift((TpcdsTransactionHandle) handle, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not serialize tpcds transaction handle", e);
+ }
+ }
+
+ @Override
+ public ConnectorTransactionHandle deserialize(byte[] bytes)
+ {
+ try {
+ return fromThrift(bytes, thriftCodec);
+ }
+ catch (TProtocolException e) {
+ throw new PrestoException(INVALID_ARGUMENTS, "Can not deserialize tpcds transaction handle", e);
+ }
+ }
+}
diff --git a/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithThrift.java b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithThrift.java
new file mode 100644
index 0000000000000..5ce27d9be0815
--- /dev/null
+++ b/presto-tpcds/src/test/java/com/facebook/presto/tpcds/TestTpcdsWithThrift.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.tpcds;
+
+import com.facebook.presto.testing.QueryRunner;
+import com.google.common.collect.ImmutableMap;
+
+public class TestTpcdsWithThrift
+ extends AbstractTestTpcds
+{
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ return TpcdsQueryRunner.createQueryRunner(ImmutableMap.builder()
+ .put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true")
+ .put("experimental.internal-communication.task-update-request-thrift-serde-enabled", "true")
+ .build());
+ }
+}