Skip to content

Commit 8b437b7

Browse files
committed
Use thrift serde for split
1 parent a12b523 commit 8b437b7

File tree

27 files changed

+747
-95
lines changed

27 files changed

+747
-95
lines changed

presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package com.facebook.presto.hive;
1515

16+
import com.facebook.drift.annotations.ThriftConstructor;
17+
import com.facebook.drift.annotations.ThriftField;
18+
import com.facebook.drift.annotations.ThriftStruct;
1619
import com.fasterxml.jackson.annotation.JsonCreator;
1720
import com.fasterxml.jackson.annotation.JsonProperty;
1821
import io.airlift.units.DataSize;
@@ -24,6 +27,7 @@
2427
import static com.google.common.base.MoreObjects.toStringHelper;
2528
import static java.util.Objects.requireNonNull;
2629

30+
@ThriftStruct
2731
public class CacheQuotaRequirement
2832
{
2933
public static final CacheQuotaRequirement NO_CACHE_REQUIREMENT = new CacheQuotaRequirement(GLOBAL, Optional.empty());
@@ -32,6 +36,7 @@ public class CacheQuotaRequirement
3236
private final Optional<DataSize> quota;
3337

3438
@JsonCreator
39+
@ThriftConstructor
3540
public CacheQuotaRequirement(
3641
@JsonProperty("cacheQuotaScope") CacheQuotaScope cacheQuotaScope,
3742
@JsonProperty("quota") Optional<DataSize> quota)
@@ -41,12 +46,14 @@ public CacheQuotaRequirement(
4146
}
4247

4348
@JsonProperty
49+
@ThriftField(1)
4450
public CacheQuotaScope getCacheQuotaScope()
4551
{
4652
return cacheQuotaScope;
4753
}
4854

4955
@JsonProperty
56+
@ThriftField(2)
5057
public Optional<DataSize> getQuota()
5158
{
5259
return quota;

presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,28 @@
1313
*/
1414
package com.facebook.presto.hive;
1515

16+
import com.facebook.drift.annotations.ThriftEnum;
17+
import com.facebook.drift.annotations.ThriftEnumValue;
18+
19+
@ThriftEnum
1620
public enum CacheQuotaScope
1721
{
18-
GLOBAL, SCHEMA, TABLE, PARTITION
22+
GLOBAL(1),
23+
SCHEMA(2),
24+
TABLE(3),
25+
PARTITION(4),
26+
/**/;
27+
28+
private final int value;
29+
30+
CacheQuotaScope(int value)
31+
{
32+
this.value = value;
33+
}
34+
35+
@ThriftEnumValue
36+
public int getValue()
37+
{
38+
return value;
39+
}
1940
}

presto-hive/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
<artifactId>drift-codec</artifactId>
2929
</dependency>
3030

31+
<dependency>
32+
<groupId>com.facebook.drift</groupId>
33+
<artifactId>drift-protocol</artifactId>
34+
</dependency>
35+
3136
<dependency>
3237
<groupId>com.facebook.drift</groupId>
3338
<artifactId>drift-codec-utils</artifactId>

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
7575
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
7676
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
77+
import com.facebook.presto.spi.connector.ConnectorSpecificCodecProvider;
7778
import com.facebook.presto.spi.connector.ConnectorSplitManager;
7879
import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider;
7980
import com.google.common.cache.Cache;
@@ -179,6 +180,7 @@ public void configure(Binder binder)
179180
binder.bind(ConnectorPlanOptimizerProvider.class).to(HivePlanOptimizerProvider.class).in(Scopes.SINGLETON);
180181
binder.bind(ConnectorMetadataUpdaterProvider.class).to(HiveMetadataUpdaterProvider.class).in(Scopes.SINGLETON);
181182
binder.bind(ConnectorTypeSerdeProvider.class).to(HiveConnectorTypeSerdeProvider.class).in(Scopes.SINGLETON);
183+
binder.bind(ConnectorSpecificCodecProvider.class).to(HiveConnectorCodecProvider.class).in(Scopes.SINGLETON);
182184
binder.install(new ThriftCodecModule());
183185
binder.install(new DefaultThriftCodecsModule());
184186
thriftCodecBinder(binder).bindThriftCodec(HiveMetadataUpdateHandle.class);

presto-hive/src/main/java/com/facebook/presto/hive/HiveConnector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
2828
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
2929
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
30+
import com.facebook.presto.spi.connector.ConnectorSpecificCodecProvider;
3031
import com.facebook.presto.spi.connector.ConnectorSplitManager;
3132
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
3233
import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider;
@@ -96,7 +97,8 @@ public HiveConnector(
9697
ConnectorPlanOptimizerProvider planOptimizerProvider,
9798
ConnectorMetadataUpdaterProvider metadataUpdaterProvider,
9899
ConnectorTypeSerdeProvider connectorTypeSerdeProvider,
99-
ClassLoader classLoader)
100+
ClassLoader classLoader,
101+
ConnectorSpecificCodecProvider connectorSpecificCodecProvider)
100102
{
101103
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
102104
this.metadataFactory = requireNonNull(metadataFactory, "metadata is null");
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive;
15+
16+
import com.facebook.drift.codec.ThriftCodecManager;
17+
import com.facebook.presto.spi.ConnectorSpecificCodec;
18+
import com.facebook.presto.spi.ConnectorSplit;
19+
import com.facebook.presto.spi.connector.ConnectorSpecificCodecProvider;
20+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
21+
22+
import javax.inject.Inject;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public class HiveConnectorCodecProvider
27+
implements ConnectorSpecificCodecProvider
28+
{
29+
private final ThriftCodecManager thriftCodecManager;
30+
31+
@Inject
32+
public HiveConnectorCodecProvider(ThriftCodecManager thriftCodecManager)
33+
{
34+
this.thriftCodecManager = requireNonNull(thriftCodecManager, "thriftCodecManager is null");
35+
}
36+
37+
@Override
38+
public ConnectorSpecificCodec<ConnectorSplit> getConnectorSplitCodec()
39+
{
40+
return new HiveSplitCodec(thriftCodecManager);
41+
}
42+
43+
@Override
44+
public ConnectorSpecificCodec<ConnectorTransactionHandle> getConnectorTransactionHandleCodec()
45+
{
46+
return new HiveTransactionHandleCodec(thriftCodecManager);
47+
}
48+
}

presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
4444
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
4545
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
46+
import com.facebook.presto.spi.connector.ConnectorSpecificCodecProvider;
4647
import com.facebook.presto.spi.connector.ConnectorSplitManager;
4748
import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider;
4849
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
@@ -160,6 +161,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
160161
ConnectorPlanOptimizerProvider planOptimizerProvider = injector.getInstance(ConnectorPlanOptimizerProvider.class);
161162
ConnectorMetadataUpdaterProvider metadataUpdaterProvider = injector.getInstance(ConnectorMetadataUpdaterProvider.class);
162163
ConnectorTypeSerdeProvider connectorTypeSerdeProvider = injector.getInstance(ConnectorTypeSerdeProvider.class);
164+
ConnectorSpecificCodecProvider connectorSpecificCodecProvider = injector.getInstance(ConnectorSpecificCodecProvider.class);
163165

164166
List<PropertyMetadata<?>> allSessionProperties = new ArrayList<>(hiveSessionProperties.getSessionProperties());
165167
allSessionProperties.addAll(hiveCommonSessionProperties.getSessionProperties());
@@ -182,7 +184,8 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
182184
planOptimizerProvider,
183185
metadataUpdaterProvider,
184186
connectorTypeSerdeProvider,
185-
classLoader);
187+
classLoader,
188+
connectorSpecificCodecProvider);
186189
}
187190
catch (Exception e) {
188191
throwIfUnchecked(e);
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive;
15+
16+
import com.facebook.drift.codec.ThriftCodec;
17+
import com.facebook.drift.codec.ThriftCodecManager;
18+
import com.facebook.drift.protocol.TBinaryProtocol;
19+
import com.facebook.drift.protocol.TMemoryBuffer;
20+
import com.facebook.presto.spi.ConnectorSpecificCodec;
21+
import com.facebook.presto.spi.ConnectorSplit;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
public class HiveSplitCodec
26+
implements ConnectorSpecificCodec<ConnectorSplit>
27+
{
28+
private final ThriftCodec<HiveSplit> thriftCodec;
29+
30+
public HiveSplitCodec(ThriftCodecManager thriftCodecManager)
31+
{
32+
this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(HiveSplit.class);
33+
}
34+
35+
@Override
36+
public byte[] serialize(ConnectorSplit split)
37+
{
38+
HiveSplit hiveSplit = (HiveSplit) split;
39+
TMemoryBuffer transport = new TMemoryBuffer(1024);
40+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
41+
try {
42+
thriftCodec.write(hiveSplit, protocol);
43+
return transport.getBytes();
44+
}
45+
catch (Exception e) {
46+
throw new RuntimeException(e);
47+
}
48+
}
49+
50+
@Override
51+
public ConnectorSplit deserialize(byte[] bytes)
52+
{
53+
try {
54+
TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
55+
transport.write(bytes);
56+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
57+
return thriftCodec.read(protocol);
58+
}
59+
catch (Exception e) {
60+
throw new RuntimeException(e);
61+
}
62+
}
63+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive;
15+
16+
import com.facebook.drift.codec.ThriftCodec;
17+
import com.facebook.drift.codec.ThriftCodecManager;
18+
import com.facebook.drift.protocol.TBinaryProtocol;
19+
import com.facebook.drift.protocol.TMemoryBuffer;
20+
import com.facebook.presto.spi.ConnectorSpecificCodec;
21+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
public class HiveTransactionHandleCodec
26+
implements ConnectorSpecificCodec<ConnectorTransactionHandle>
27+
{
28+
private final ThriftCodec<HiveTransactionHandle> thriftCodec;
29+
30+
public HiveTransactionHandleCodec(ThriftCodecManager thriftCodecManager)
31+
{
32+
this.thriftCodec = requireNonNull(thriftCodecManager, "thriftCodecManager is null").getCodec(HiveTransactionHandle.class);
33+
}
34+
35+
@Override
36+
public byte[] serialize(ConnectorTransactionHandle transactionHandle)
37+
{
38+
HiveTransactionHandle hiveTransactionHandle = (HiveTransactionHandle) transactionHandle;
39+
TMemoryBuffer transport = new TMemoryBuffer(1024);
40+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
41+
try {
42+
thriftCodec.write(hiveTransactionHandle, protocol);
43+
return transport.getBytes();
44+
}
45+
catch (Exception e) {
46+
throw new RuntimeException(e);
47+
}
48+
}
49+
50+
@Override
51+
public ConnectorTransactionHandle deserialize(byte[] bytes)
52+
{
53+
try {
54+
TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
55+
transport.write(bytes);
56+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
57+
return thriftCodec.read(protocol);
58+
}
59+
catch (Exception e) {
60+
throw new RuntimeException(e);
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)