Skip to content

Commit 59ec9fc

Browse files
committed
Implement a geoparquet writer
1 parent b622606 commit 59ec9fc

File tree

10 files changed

+364
-49
lines changed

10 files changed

+364
-49
lines changed

baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,16 @@ public void setup() throws IOException {
8383
@SuppressWarnings({"squid:S1481", "squid:S2201"})
8484
@Benchmark
8585
public void read() {
86-
GeoParquetReader reader = new GeoParquetReader(directory.toUri());
86+
var path = new org.apache.hadoop.fs.Path(directory.toUri());
87+
GeoParquetReader reader = new GeoParquetReader(path);
8788
reader.read().count();
8889
}
8990

9091
@SuppressWarnings({"squid:S1481", "squid:S2201"})
9192
@Benchmark
9293
public void readParallel() {
93-
GeoParquetReader reader = new GeoParquetReader(directory.toUri());
94+
var path = new org.apache.hadoop.fs.Path(directory.toUri());
95+
GeoParquetReader reader = new GeoParquetReader(path);
9496
reader.readParallel().count();
9597
}
9698
}

baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,17 @@ public void setup() throws IOException {
6161
@SuppressWarnings({"squid:S1481", "squid:S2201"})
6262
@Benchmark
6363
public void read() {
64-
GeoParquetReader reader =
65-
new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
64+
var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
65+
GeoParquetReader reader = new GeoParquetReader(path);
6666
reader.read().count();
6767
}
6868

6969
@SuppressWarnings({"squid:S1481", "squid:S2201"})
7070
@Benchmark
7171
public void readParallel() {
72+
var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
7273
GeoParquetReader reader =
73-
new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
74+
new GeoParquetReader(path);
7475
reader.readParallel().count();
7576
}
7677
}

baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.baremaps.data.storage.*;
2525
import org.apache.baremaps.geoparquet.GeoParquetException;
2626
import org.apache.baremaps.geoparquet.GeoParquetReader;
27+
import org.apache.hadoop.fs.Path;
2728

2829
public class GeoParquetDataTable implements DataTable {
2930

@@ -35,7 +36,7 @@ public class GeoParquetDataTable implements DataTable {
3536

3637
public GeoParquetDataTable(URI path) {
3738
this.path = path;
38-
this.reader = new GeoParquetReader(path);
39+
this.reader = new GeoParquetReader(new Path(path));
3940
}
4041

4142
@Override

baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public int getFieldRepetitionCount(int fieldIndex) {
105105
}
106106
}
107107

108-
private Object getValue(int fieldIndex, int index) {
108+
Object getValue(int fieldIndex, int index) {
109109
Object value = data[fieldIndex];
110110
if (value instanceof List<?>list) {
111111
return list.get(index);

baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.fasterxml.jackson.databind.DeserializationFeature;
2121
import com.fasterxml.jackson.databind.ObjectMapper;
2222
import java.io.IOException;
23-
import java.net.URI;
2423
import java.util.*;
2524
import java.util.concurrent.atomic.AtomicLong;
2625
import java.util.stream.Collectors;
@@ -53,31 +52,31 @@ public class GeoParquetReader {
5352
/**
5453
* Constructs a new {@code GeoParquetReader}.
5554
*
56-
* @param uri the URI to read from
55+
* @param path the path to read from
5756
*/
58-
public GeoParquetReader(URI uri) {
59-
this(uri, null, new Configuration());
57+
public GeoParquetReader(Path path) {
58+
this(path, null, new Configuration());
6059
}
6160

6261
/**
6362
* Constructs a new {@code GeoParquetReader}.
6463
*
65-
* @param uri the URI to read from
64+
* @param path the path to read from
6665
* @param envelope the envelope to filter records
6766
*/
68-
public GeoParquetReader(URI uri, Envelope envelope) {
69-
this(uri, envelope, new Configuration());
67+
public GeoParquetReader(Path path, Envelope envelope) {
68+
this(path, envelope, new Configuration());
7069
}
7170

7271
/**
7372
* Constructs a new {@code GeoParquetReader}.
7473
*
75-
* @param uri the URI to read from
74+
* @param path the path to read from
7675
* @param envelope the envelope to filter records
* @param configuration the configuration
7776
*/
78-
public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) {
77+
public GeoParquetReader(Path path, Envelope envelope, Configuration configuration) {
7978
this.configuration = configuration;
80-
this.files = initializeFiles(uri, configuration);
79+
this.files = initializeFiles(path, configuration);
8180
this.envelope = envelope;
8281
}
8382

@@ -168,11 +167,10 @@ private FileInfo getFileInfo(FileStatus fileStatus) {
168167
}
169168
}
170169

171-
private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) {
170+
private static List<FileStatus> initializeFiles(Path path, Configuration configuration) {
172171
try {
173-
Path globPath = new Path(uri.getPath());
174-
FileSystem fileSystem = FileSystem.get(uri, configuration);
175-
FileStatus[] fileStatuses = fileSystem.globStatus(globPath);
172+
FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
173+
FileStatus[] fileStatuses = fileSystem.globStatus(path);
176174
if (fileStatuses == null) {
177175
throw new GeoParquetException("No files found at the specified URI.");
178176
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.baremaps.geoparquet;
19+
20+
import com.fasterxml.jackson.core.JsonProcessingException;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.parquet.hadoop.api.WriteSupport;
26+
import org.apache.parquet.io.api.Binary;
27+
import org.apache.parquet.io.api.RecordConsumer;
28+
import org.apache.parquet.schema.*;
29+
30+
/**
31+
* WriteSupport implementation for writing GeoParquetGroup instances to Parquet.
32+
*/
33+
public class GeoParquetWriteSupport extends WriteSupport<GeoParquetGroup> {
34+
35+
private RecordConsumer recordConsumer;
36+
private final MessageType schema;
37+
private final GeoParquetMetadata metadata;
38+
private final ObjectMapper objectMapper = new ObjectMapper();
39+
40+
/**
41+
* Constructs a new GeoParquetWriteSupport.
42+
*
43+
* @param schema the Parquet schema
44+
* @param metadata the GeoParquet metadata
45+
*/
46+
public GeoParquetWriteSupport(MessageType schema, GeoParquetMetadata metadata) {
47+
this.schema = schema;
48+
this.metadata = metadata;
49+
}
50+
51+
@Override
52+
public WriteContext init(Configuration configuration) {
53+
Map<String, String> extraMetadata = new HashMap<>();
54+
// Serialize the GeoParquet metadata to JSON and add it to the file metadata
55+
String geoMetadataJson = serializeMetadata(metadata);
56+
extraMetadata.put("geo", geoMetadataJson);
57+
58+
return new WriteContext(schema, extraMetadata);
59+
}
60+
61+
@Override
62+
public void prepareForWrite(RecordConsumer recordConsumer) {
63+
this.recordConsumer = recordConsumer;
64+
}
65+
66+
@Override
67+
public void write(GeoParquetGroup group) {
68+
writeGroup(group, schema);
69+
}
70+
71+
private void writeGroup(GeoParquetGroup group, GroupType groupType) {
72+
recordConsumer.startMessage();
73+
for (int i = 0; i < groupType.getFieldCount(); i++) {
74+
Type fieldType = groupType.getType(i);
75+
String fieldName = fieldType.getName();
76+
int repetitionCount = group.getFieldRepetitionCount(i);
77+
if (repetitionCount == 0) {
78+
continue; // Skip if no values are present
79+
}
80+
for (int j = 0; j < repetitionCount; j++) {
81+
recordConsumer.startField(fieldName, i);
82+
if (fieldType.isPrimitive()) {
83+
Object value = group.getValue(i, j);
84+
writePrimitive(value, fieldType.asPrimitiveType());
85+
} else {
86+
GeoParquetGroup childGroup = group.getGroup(i, j);
87+
writeGroup(childGroup, fieldType.asGroupType());
88+
}
89+
recordConsumer.endField(fieldName, i);
90+
}
91+
}
92+
recordConsumer.endMessage();
93+
}
94+
95+
private void writePrimitive(Object value, PrimitiveType primitiveType) {
96+
if (value == null) {
97+
// The Parquet format does not support writing null values directly.
98+
// If the field is optional and the value is null, we simply do not write it.
99+
return;
100+
}
101+
switch (primitiveType.getPrimitiveTypeName()) {
102+
case INT32:
103+
recordConsumer.addInteger((Integer) value);
104+
break;
105+
case INT64:
106+
recordConsumer.addLong((Long) value);
107+
break;
108+
case FLOAT:
109+
recordConsumer.addFloat((Float) value);
110+
break;
111+
case DOUBLE:
112+
recordConsumer.addDouble((Double) value);
113+
break;
114+
case BOOLEAN:
115+
recordConsumer.addBoolean((Boolean) value);
116+
break;
117+
case BINARY, FIXED_LEN_BYTE_ARRAY:
118+
recordConsumer.addBinary((Binary) value);
119+
break;
120+
default:
121+
throw new GeoParquetException(
122+
"Unsupported type: " + primitiveType.getPrimitiveTypeName());
123+
}
124+
}
125+
126+
private String serializeMetadata(GeoParquetMetadata metadata) {
127+
try {
128+
return objectMapper.writeValueAsString(metadata);
129+
} catch (JsonProcessingException e) {
130+
throw new RuntimeException("Failed to serialize GeoParquet metadata", e);
131+
}
132+
}
133+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.baremaps.geoparquet;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.parquet.column.ParquetProperties.WriterVersion;
24+
import org.apache.parquet.hadoop.ParquetWriter;
25+
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
26+
import org.apache.parquet.schema.MessageType;
27+
28+
/**
29+
* A writer for GeoParquet files that writes GeoParquetGroup instances to a Parquet file.
30+
*/
31+
public class GeoParquetWriter implements AutoCloseable {
32+
33+
private final ParquetWriter<GeoParquetGroup> parquetWriter;
34+
35+
/**
36+
* Constructs a new GeoParquetWriter.
37+
*
38+
* @param outputFile the output file
39+
* @param schema the Parquet schema
40+
* @param metadata the GeoParquet metadata
41+
* @throws IOException if an I/O error occurs
42+
*/
43+
public GeoParquetWriter(Path outputFile, MessageType schema, GeoParquetMetadata metadata)
44+
throws IOException {
45+
this.parquetWriter = new ParquetWriter<>(
46+
outputFile,
47+
new GeoParquetWriteSupport(schema, metadata),
48+
CompressionCodecName.UNCOMPRESSED,
49+
ParquetWriter.DEFAULT_BLOCK_SIZE,
50+
ParquetWriter.DEFAULT_PAGE_SIZE,
51+
ParquetWriter.DEFAULT_PAGE_SIZE,
52+
ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
53+
ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
54+
WriterVersion.PARQUET_2_0,
55+
new Configuration());
56+
}
57+
58+
/**
59+
* Writes a GeoParquetGroup to the Parquet file.
60+
*
61+
* @param group the GeoParquetGroup to write
62+
* @throws IOException if an I/O error occurs
63+
*/
64+
public void write(GeoParquetGroup group) throws IOException {
65+
parquetWriter.write(group);
66+
}
67+
68+
/**
69+
* Closes the writer and releases any system resources associated with it.
70+
*
71+
* @throws IOException if an I/O error occurs
72+
*/
73+
public void close() throws IOException {
74+
parquetWriter.close();
75+
}
76+
}

baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,45 +19,45 @@
1919

2020
import static org.junit.jupiter.api.Assertions.*;
2121

22-
import java.net.URI;
2322
import org.apache.baremaps.testing.TestFiles;
23+
import org.apache.hadoop.fs.Path;
2424
import org.junit.jupiter.api.Test;
2525
import org.locationtech.jts.geom.Envelope;
2626

2727
class GeoParquetReaderTest {
2828

2929
@Test
3030
void read() {
31-
URI geoParquet = TestFiles.GEOPARQUET.toUri();
31+
Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
3232
GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
3333
assertEquals(5, geoParquetReader.read().count());
3434
}
3535

3636
@Test
3737
void readFiltered() {
38-
URI geoParquet = TestFiles.GEOPARQUET.toUri();
38+
Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
3939
GeoParquetReader geoParquetReader =
4040
new GeoParquetReader(geoParquet, new Envelope(-172, -65, 18, 72));
4141
assertEquals(1, geoParquetReader.read().count());
4242
}
4343

4444
@Test
4545
void size() {
46-
URI geoParquet = TestFiles.GEOPARQUET.toUri();
46+
Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
4747
GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
4848
assertEquals(5, geoParquetReader.size());
4949
}
5050

5151
@Test
5252
void count() {
53-
URI geoParquet = TestFiles.GEOPARQUET.toUri();
53+
Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
5454
GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
5555
assertEquals(5, geoParquetReader.read().count());
5656
}
5757

5858
@Test
5959
void validateSchemas() {
60-
URI geoParquet = TestFiles.GEOPARQUET.toUri();
60+
Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
6161
GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
6262
assertTrue(geoParquetReader.validateSchemasAreIdentical());
6363
}

0 commit comments

Comments
 (0)