Skip to content

Commit 58c0a15

Browse files
committed
Use thrift serde for split
1 parent a12b523 commit 58c0a15

File tree

24 files changed

+750
-91
lines changed

24 files changed

+750
-91
lines changed

presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package com.facebook.presto.hive;
1515

16+
import com.facebook.drift.annotations.ThriftConstructor;
17+
import com.facebook.drift.annotations.ThriftField;
18+
import com.facebook.drift.annotations.ThriftStruct;
1619
import com.fasterxml.jackson.annotation.JsonCreator;
1720
import com.fasterxml.jackson.annotation.JsonProperty;
1821
import io.airlift.units.DataSize;
@@ -24,6 +27,7 @@
2427
import static com.google.common.base.MoreObjects.toStringHelper;
2528
import static java.util.Objects.requireNonNull;
2629

30+
@ThriftStruct
2731
public class CacheQuotaRequirement
2832
{
2933
public static final CacheQuotaRequirement NO_CACHE_REQUIREMENT = new CacheQuotaRequirement(GLOBAL, Optional.empty());
@@ -32,6 +36,7 @@ public class CacheQuotaRequirement
3236
private final Optional<DataSize> quota;
3337

3438
@JsonCreator
39+
@ThriftConstructor
3540
public CacheQuotaRequirement(
3641
@JsonProperty("cacheQuotaScope") CacheQuotaScope cacheQuotaScope,
3742
@JsonProperty("quota") Optional<DataSize> quota)
@@ -41,12 +46,14 @@ public CacheQuotaRequirement(
4146
}
4247

4348
@JsonProperty
49+
@ThriftField(1)
4450
public CacheQuotaScope getCacheQuotaScope()
4551
{
4652
return cacheQuotaScope;
4753
}
4854

4955
@JsonProperty
56+
@ThriftField(2)
5057
public Optional<DataSize> getQuota()
5158
{
5259
return quota;

presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,28 @@
1313
*/
1414
package com.facebook.presto.hive;
1515

16+
import com.facebook.drift.annotations.ThriftEnum;
17+
import com.facebook.drift.annotations.ThriftEnumValue;
18+
19+
@ThriftEnum
1620
public enum CacheQuotaScope
1721
{
18-
GLOBAL, SCHEMA, TABLE, PARTITION
22+
GLOBAL(1),
23+
SCHEMA(2),
24+
TABLE(3),
25+
PARTITION(4),
26+
/**/;
27+
28+
private final int value;
29+
30+
CacheQuotaScope(int value)
31+
{
32+
this.value = value;
33+
}
34+
35+
@ThriftEnumValue
36+
public int getValue()
37+
{
38+
return value;
39+
}
1940
}

presto-hive/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
<artifactId>drift-codec</artifactId>
2929
</dependency>
3030

31+
<dependency>
32+
<groupId>com.facebook.drift</groupId>
33+
<artifactId>drift-protocol</artifactId>
34+
</dependency>
35+
3136
<dependency>
3237
<groupId>com.facebook.drift</groupId>
3338
<artifactId>drift-codec-utils</artifactId>

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
7575
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
7676
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
77+
import com.facebook.presto.spi.connector.ConnectorSpecificCodecProvider;
7778
import com.facebook.presto.spi.connector.ConnectorSplitManager;
7879
import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider;
7980
import com.google.common.cache.Cache;
@@ -179,6 +180,7 @@ public void configure(Binder binder)
179180
binder.bind(ConnectorPlanOptimizerProvider.class).to(HivePlanOptimizerProvider.class).in(Scopes.SINGLETON);
180181
binder.bind(ConnectorMetadataUpdaterProvider.class).to(HiveMetadataUpdaterProvider.class).in(Scopes.SINGLETON);
181182
binder.bind(ConnectorTypeSerdeProvider.class).to(HiveConnectorTypeSerdeProvider.class).in(Scopes.SINGLETON);
183+
binder.bind(ConnectorSpecificCodecProvider.class).to(HiveConnectorCodecProvider.class).in(Scopes.SINGLETON);
182184
binder.install(new ThriftCodecModule());
183185
binder.install(new DefaultThriftCodecsModule());
184186
thriftCodecBinder(binder).bindThriftCodec(HiveMetadataUpdateHandle.class);
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive;
15+
16+
import com.facebook.drift.codec.ThriftCodecManager;
17+
import com.facebook.presto.spi.ConnectorSpecificCodec;
18+
import com.facebook.presto.spi.ConnectorSplit;
19+
import com.facebook.presto.spi.connector.ConnectorSpecificCodecProvider;
20+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
21+
22+
import javax.inject.Inject;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public class HiveConnectorCodecProvider
27+
implements ConnectorSpecificCodecProvider
28+
{
29+
private final ThriftCodecManager thriftCodecManager;
30+
31+
@Inject
32+
public HiveConnectorCodecProvider(ThriftCodecManager thriftCodecManager)
33+
{
34+
this.thriftCodecManager = requireNonNull(thriftCodecManager, "thriftCodecManager is null");
35+
}
36+
37+
@Override
38+
public ConnectorSpecificCodec<ConnectorSplit> getConnectorSplitCodec()
39+
{
40+
return new HiveSplitCodec(thriftCodecManager);
41+
}
42+
43+
@Override
44+
public ConnectorSpecificCodec<ConnectorTransactionHandle> getConnectorTransactionHandleCodec()
45+
{
46+
return new HiveTransactionHandleCodec(thriftCodecManager);
47+
}
48+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive;
15+
16+
import com.facebook.drift.codec.ThriftCodec;
17+
import com.facebook.drift.codec.ThriftCodecManager;
18+
import com.facebook.drift.protocol.TBinaryProtocol;
19+
import com.facebook.drift.protocol.TMemoryBuffer;
20+
import com.facebook.presto.spi.ConnectorSpecificCodec;
21+
import com.facebook.presto.spi.ConnectorSplit;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
public class HiveSplitCodec
26+
implements ConnectorSpecificCodec<ConnectorSplit>
27+
{
28+
private final ThriftCodec<HiveSplit> thriftCodec;
29+
30+
public HiveSplitCodec(ThriftCodecManager thriftCodecManager)
31+
{
32+
this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(HiveSplit.class);
33+
}
34+
35+
@Override
36+
public byte[] serialize(ConnectorSplit split)
37+
{
38+
HiveSplit hiveSplit = (HiveSplit) split;
39+
TMemoryBuffer transport = new TMemoryBuffer(1024);
40+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
41+
try {
42+
thriftCodec.write(hiveSplit, protocol);
43+
return transport.getBytes();
44+
}
45+
catch (Exception e) {
46+
throw new RuntimeException(e);
47+
}
48+
}
49+
50+
@Override
51+
public ConnectorSplit deserialize(byte[] bytes)
52+
{
53+
try {
54+
TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
55+
transport.write(bytes);
56+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
57+
return thriftCodec.read(protocol);
58+
}
59+
catch (Exception e) {
60+
throw new RuntimeException(e);
61+
}
62+
}
63+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive;
15+
16+
import com.facebook.drift.codec.ThriftCodec;
17+
import com.facebook.drift.codec.ThriftCodecManager;
18+
import com.facebook.drift.protocol.TBinaryProtocol;
19+
import com.facebook.drift.protocol.TMemoryBuffer;
20+
import com.facebook.presto.spi.ConnectorSpecificCodec;
21+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
public class HiveTransactionHandleCodec
26+
implements ConnectorSpecificCodec<ConnectorTransactionHandle>
27+
{
28+
private final ThriftCodec<HiveTransactionHandle> thriftCodec;
29+
30+
public HiveTransactionHandleCodec(ThriftCodecManager thriftCodecManager)
31+
{
32+
this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(HiveTransactionHandle.class);
33+
}
34+
35+
@Override
36+
public byte[] serialize(ConnectorTransactionHandle transactionHandle)
37+
{
38+
HiveTransactionHandle hiveTransactionHandle = (HiveTransactionHandle) transactionHandle;
39+
TMemoryBuffer transport = new TMemoryBuffer(1024);
40+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
41+
try {
42+
thriftCodec.write(hiveTransactionHandle, protocol);
43+
return transport.getBytes();
44+
}
45+
catch (Exception e) {
46+
throw new RuntimeException(e);
47+
}
48+
}
49+
50+
@Override
51+
public ConnectorTransactionHandle deserialize(byte[] bytes)
52+
{
53+
try {
54+
TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
55+
transport.write(bytes);
56+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
57+
return thriftCodec.read(protocol);
58+
}
59+
catch (Exception e) {
60+
throw new RuntimeException(e);
61+
}
62+
}
63+
}

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
5151
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
5252
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
53+
import com.facebook.presto.spi.connector.ConnectorSpecificCodecProvider;
5354
import com.facebook.presto.spi.connector.ConnectorSplitManager;
5455
import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider;
5556
import com.facebook.presto.spi.procedure.Procedure;
@@ -123,6 +124,7 @@ public class ConnectorManager
123124
private final FilterStatsCalculator filterStatsCalculator;
124125
private final BlockEncodingSerde blockEncodingSerde;
125126
private final ConnectorSystemConfig connectorSystemConfig;
127+
private final ConnectorSpecificCodecManager connectorSpecificCodecManager;
126128

127129
@GuardedBy("this")
128130
private final ConcurrentMap<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
@@ -158,7 +160,8 @@ public ConnectorManager(
158160
DeterminismEvaluator determinismEvaluator,
159161
FilterStatsCalculator filterStatsCalculator,
160162
BlockEncodingSerde blockEncodingSerde,
161-
FeaturesConfig featuresConfig)
163+
FeaturesConfig featuresConfig,
164+
ConnectorSpecificCodecManager connectorSpecificCodecManager)
162165
{
163166
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
164167
this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
@@ -185,6 +188,7 @@ public ConnectorManager(
185188
this.filterStatsCalculator = requireNonNull(filterStatsCalculator, "filterStatsCalculator is null");
186189
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
187190
this.connectorSystemConfig = () -> featuresConfig.isNativeExecutionEnabled();
191+
this.connectorSpecificCodecManager = requireNonNull(connectorSpecificCodecManager, "connectorSpecificCodecManager is null");
188192
}
189193

190194
@PreDestroy
@@ -317,10 +321,12 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
317321
.ifPresent(metadataUpdaterProvider -> connectorMetadataUpdaterManager.addMetadataUpdaterProvider(connectorId, metadataUpdaterProvider));
318322

319323
connector.getConnectorTypeSerdeProvider()
320-
.ifPresent(
321-
connectorTypeSerdeProvider ->
324+
.ifPresent(
325+
connectorTypeSerdeProvider ->
322326
connectorTypeSerdeManager.addConnectorTypeSerdeProvider(connectorId, connectorTypeSerdeProvider));
323327

328+
connector.getConnectorSpecificCodecProvider().ifPresent(connectorSpecificCodecProvider -> connectorSpecificCodecManager.addConnectorSpecificCodecProvider(connectorId, connectorSpecificCodecProvider));
329+
324330
metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());
325331

326332
connector.getAccessControl()
@@ -412,6 +418,7 @@ private static class MaterializedConnector
412418
private final Optional<ConnectorPlanOptimizerProvider> planOptimizerProvider;
413419
private final Optional<ConnectorMetadataUpdaterProvider> metadataUpdaterProvider;
414420
private final Optional<ConnectorTypeSerdeProvider> connectorTypeSerdeProvider;
421+
private final Optional<ConnectorSpecificCodecProvider> connectorSpecificCodecProvider;
415422
private final Optional<ConnectorAccessControl> accessControl;
416423
private final List<PropertyMetadata<?>> sessionProperties;
417424
private final List<PropertyMetadata<?>> tableProperties;
@@ -510,6 +517,15 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
510517
}
511518
this.connectorTypeSerdeProvider = Optional.ofNullable(connectorTypeSerdeProvider);
512519

520+
ConnectorSpecificCodecProvider connectorSpecificCodecProvider = null;
521+
try {
522+
connectorSpecificCodecProvider = connector.getConnectorSpecificCodecProvider();
523+
requireNonNull(connectorSpecificCodecProvider, format("Connector %s returned null connector specific codec provider", connectorId));
524+
}
525+
catch (UnsupportedOperationException ignored) {
526+
}
527+
this.connectorSpecificCodecProvider = Optional.ofNullable(connectorSpecificCodecProvider);
528+
513529
ConnectorAccessControl accessControl = null;
514530
try {
515531
accessControl = connector.getAccessControl();
@@ -628,5 +644,10 @@ public List<PropertyMetadata<?>> getAnalyzeProperties()
628644
{
629645
return analyzeProperties;
630646
}
647+
648+
public Optional<ConnectorSpecificCodecProvider> getConnectorSpecificCodecProvider()
649+
{
650+
return connectorSpecificCodecProvider;
651+
}
631652
}
632653
}

0 commit comments

Comments
 (0)