Skip to content

Commit 98b0b3f

Browse files
chore: make FlatRecordTraversalNode concurrent safe
1 parent a7140bd commit 98b0b3f

File tree

7 files changed

+480
-315
lines changed

7 files changed

+480
-315
lines changed
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
package com.netflix.hollow.core.write.objectmapper.flatrecords;
2+
3+
import com.netflix.hollow.core.memory.encoding.VarInt;
4+
import com.netflix.hollow.core.memory.encoding.ZigZag;
5+
import com.netflix.hollow.core.schema.HollowObjectSchema;
6+
import com.netflix.hollow.core.schema.HollowSchema;
7+
import com.netflix.hollow.core.util.IntList;
8+
import com.netflix.hollow.core.write.HollowObjectWriteRecord;
9+
10+
public class FlatRecordOrdinalReader {
11+
private final FlatRecord record;
12+
private final IntList ordinalOffsets = new IntList();
13+
14+
public FlatRecordOrdinalReader(FlatRecord record) {
15+
this.record = record;
16+
populateOrdinalOffset();
17+
}
18+
19+
private void populateOrdinalOffset() {
20+
int offset = record.dataStartByte;
21+
while (offset < record.dataEndByte) {
22+
ordinalOffsets.add(offset);
23+
offset += sizeOfOrdinal(ordinalOffsets.size() - 1);
24+
}
25+
}
26+
27+
private int getOrdinalOffset(int ordinal) {
28+
return ordinalOffsets.get(ordinal);
29+
}
30+
31+
public int getOrdinalCount() {
32+
return ordinalOffsets.size();
33+
}
34+
35+
public HollowSchema readSchema(int ordinal) {
36+
int schemaId = VarInt.readVInt(record.data, getOrdinalOffset(ordinal));
37+
return record.schemaIdMapper.getSchema(schemaId);
38+
}
39+
40+
public int readSize(int ordinal) {
41+
int offset = getOrdinalOffset(ordinal);
42+
43+
int schemaId = VarInt.readVInt(record.data, offset);
44+
offset += VarInt.sizeOfVInt(schemaId);
45+
46+
HollowSchema schema = record.schemaIdMapper.getSchema(schemaId);
47+
if (schema.getSchemaType() != HollowSchema.SchemaType.LIST &&
48+
schema.getSchemaType() != HollowSchema.SchemaType.SET &&
49+
schema.getSchemaType() != HollowSchema.SchemaType.MAP) {
50+
throw new IllegalArgumentException(String.format("Ordinal %d is not a LIST, SET, or MAP type (found %s)", ordinal, schema.getSchemaType()));
51+
}
52+
53+
return VarInt.readVInt(record.data, offset);
54+
}
55+
56+
public void readListElementsInto(int ordinal, int[] elements) {
57+
int offset = getOrdinalOffset(ordinal);
58+
59+
int schemaId = VarInt.readVInt(record.data, offset);
60+
offset += VarInt.sizeOfVInt(schemaId);
61+
62+
HollowSchema schema = record.schemaIdMapper.getSchema(schemaId);
63+
if (schema.getSchemaType() != HollowSchema.SchemaType.LIST) {
64+
throw new IllegalArgumentException(String.format("Ordinal %d is not a LIST type (found %s)", ordinal, schema.getSchemaType()));
65+
}
66+
67+
int size = VarInt.readVInt(record.data, offset);
68+
offset += VarInt.sizeOfVInt(size);
69+
70+
for (int i = 0; i < size; i++) {
71+
elements[i] = VarInt.readVInt(record.data, offset);
72+
offset += VarInt.sizeOfVInt(elements[i]);
73+
}
74+
}
75+
76+
public void readSetElementsInto(int ordinal, int[] elements) {
77+
int offset = getOrdinalOffset(ordinal);
78+
79+
int schemaId = VarInt.readVInt(record.data, offset);
80+
offset += VarInt.sizeOfVInt(schemaId);
81+
82+
HollowSchema schema = record.schemaIdMapper.getSchema(schemaId);
83+
if (schema.getSchemaType() != HollowSchema.SchemaType.SET) {
84+
throw new IllegalArgumentException(String.format("Ordinal %d is not a SET type (found %s)", ordinal, schema.getSchemaType()));
85+
}
86+
87+
int size = VarInt.readVInt(record.data, offset);
88+
offset += VarInt.sizeOfVInt(size);
89+
90+
int elementOrdinal = 0;
91+
for (int i = 0; i < size; i++) {
92+
int elementOrdinalDelta = VarInt.readVInt(record.data, offset);
93+
offset += VarInt.sizeOfVInt(elementOrdinalDelta);
94+
elementOrdinal += elementOrdinalDelta;
95+
elements[i] = elementOrdinal;
96+
}
97+
}
98+
99+
public void readMapElementsInto(int ordinal, int[] keys, int[] values) {
100+
int offset = getOrdinalOffset(ordinal);
101+
102+
int schemaId = VarInt.readVInt(record.data, offset);
103+
offset += VarInt.sizeOfVInt(schemaId);
104+
105+
HollowSchema schema = record.schemaIdMapper.getSchema(schemaId);
106+
if (schema.getSchemaType() != HollowSchema.SchemaType.MAP) {
107+
throw new IllegalArgumentException(String.format("Ordinal %d is not a MAP type (found %s)", ordinal, schema.getSchemaType()));
108+
}
109+
110+
int size = VarInt.readVInt(record.data, offset);
111+
offset += VarInt.sizeOfVInt(size);
112+
113+
int keyOrdinal = 0;
114+
for (int i = 0; i < size; i++) {
115+
int keyOrdinalDelta = VarInt.readVInt(record.data, offset);
116+
offset += VarInt.sizeOfVInt(keyOrdinalDelta);
117+
keyOrdinal += keyOrdinalDelta;
118+
keys[i] = keyOrdinal;
119+
values[i] = VarInt.readVInt(record.data, offset);
120+
offset += VarInt.sizeOfVInt(values[i]);
121+
}
122+
}
123+
124+
public int readFieldReference(int ordinal, String field) {
125+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.REFERENCE, field);
126+
if (offset == -1) {
127+
return -1;
128+
}
129+
130+
if (VarInt.readVNull(record.data, offset)) {
131+
return -1;
132+
}
133+
134+
return VarInt.readVInt(record.data, offset);
135+
}
136+
137+
public Boolean readFieldBoolean(int ordinal, String field) {
138+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.BOOLEAN, field);
139+
if (offset == -1) {
140+
return null;
141+
}
142+
143+
if (VarInt.readVNull(record.data, offset)) {
144+
return null;
145+
}
146+
147+
int value = record.data.get(offset);
148+
return value == 1 ? Boolean.TRUE : Boolean.FALSE;
149+
}
150+
151+
public int readFieldInt(int ordinal, String field) {
152+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.INT, field);
153+
if (offset == -1) {
154+
return Integer.MIN_VALUE;
155+
}
156+
157+
if (VarInt.readVNull(record.data, offset)) {
158+
return Integer.MIN_VALUE;
159+
}
160+
161+
int value = VarInt.readVInt(record.data, offset);
162+
return ZigZag.decodeInt(value);
163+
}
164+
165+
public long readFieldLong(int ordinal, String field) {
166+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.LONG, field);
167+
if (offset == -1) {
168+
return Long.MIN_VALUE;
169+
}
170+
171+
if (VarInt.readVNull(record.data, offset)) {
172+
return Long.MIN_VALUE;
173+
}
174+
175+
long value = VarInt.readVLong(record.data, offset);
176+
return ZigZag.decodeLong(value);
177+
}
178+
179+
public float readFieldFloat(int ordinal, String field) {
180+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.FLOAT, field);
181+
if (offset == -1) {
182+
return Float.NaN;
183+
}
184+
185+
int value = record.data.readIntBits(offset);
186+
if (value == HollowObjectWriteRecord.NULL_FLOAT_BITS) {
187+
return Float.NaN;
188+
}
189+
190+
return Float.intBitsToFloat(value);
191+
}
192+
193+
public double readFieldDouble(int ordinal, String field) {
194+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.DOUBLE, field);
195+
if (offset == -1) {
196+
return Double.NaN;
197+
}
198+
199+
long value = record.data.readLongBits(offset);
200+
if (value == HollowObjectWriteRecord.NULL_DOUBLE_BITS) {
201+
return Double.NaN;
202+
}
203+
204+
return Double.longBitsToDouble(value);
205+
}
206+
207+
public String readFieldString(int ordinal, String field) {
208+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.STRING, field);
209+
if (offset == -1) {
210+
return null;
211+
}
212+
213+
if (VarInt.readVNull(record.data, offset)) {
214+
return null;
215+
}
216+
217+
int length = VarInt.readVInt(record.data, offset);
218+
offset += VarInt.sizeOfVInt(length);
219+
220+
int cLength = VarInt.countVarIntsInRange(record.data, offset, length);
221+
char[] s = new char[cLength];
222+
for (int i = 0; i < cLength; i++) {
223+
int charValue = VarInt.readVInt(record.data, offset);
224+
s[i] = (char) charValue;
225+
offset += VarInt.sizeOfVInt(charValue);
226+
}
227+
228+
return new String(s);
229+
}
230+
231+
public byte[] readFieldBytes(int ordinal, String field) {
232+
int offset = skipToField(ordinal, HollowObjectSchema.FieldType.BYTES, field);
233+
if (offset == -1) {
234+
return null;
235+
}
236+
237+
if (VarInt.readVNull(record.data, offset)) {
238+
return null;
239+
}
240+
241+
int length = VarInt.readVInt(record.data, offset);
242+
offset += VarInt.sizeOfVInt(length);
243+
244+
byte[] b = new byte[length];
245+
for (int i = 0; i < length; i++) {
246+
b[i] = record.data.get(offset++);
247+
}
248+
249+
return b;
250+
}
251+
252+
private int skipToField(int ordinal, HollowObjectSchema.FieldType fieldType, String field) {
253+
int offset = getOrdinalOffset(ordinal);
254+
255+
int schemaId = VarInt.readVInt(record.data, offset);
256+
offset += VarInt.sizeOfVInt(schemaId);
257+
258+
HollowSchema schema = record.schemaIdMapper.getSchema(schemaId);
259+
if (schema.getSchemaType() != HollowSchema.SchemaType.OBJECT) {
260+
throw new IllegalArgumentException(String.format("Ordinal %d is not an OBJECT type (found %s)", ordinal, schema.getSchemaType()));
261+
}
262+
HollowObjectSchema objectSchema = (HollowObjectSchema) schema;
263+
264+
int fieldIndex = objectSchema.getPosition(field);
265+
if (fieldIndex == -1) {
266+
return -1;
267+
}
268+
269+
if (fieldType != objectSchema.getFieldType(fieldIndex)) {
270+
throw new IllegalArgumentException(String.format("Field %s is not of type %s", field, fieldType));
271+
}
272+
273+
for (int i = 0; i < fieldIndex; i++) {
274+
offset += sizeOfFieldValue(objectSchema.getFieldType(i), offset);
275+
}
276+
277+
return offset;
278+
}
279+
280+
private int sizeOfOrdinal(int ordinal) {
281+
int offset = getOrdinalOffset(ordinal);
282+
int start = offset;
283+
284+
int schemaId = VarInt.readVInt(record.data, offset);
285+
offset += VarInt.sizeOfVInt(schemaId);
286+
287+
HollowSchema schema = record.schemaIdMapper.getSchema(schemaId);
288+
switch (schema.getSchemaType()) {
289+
case OBJECT: {
290+
HollowObjectSchema objectSchema = (HollowObjectSchema) schema;
291+
for (int i = 0; i < objectSchema.numFields(); i++) {
292+
offset += sizeOfFieldValue(objectSchema.getFieldType(i), offset);
293+
}
294+
break;
295+
}
296+
case LIST:
297+
case SET: {
298+
int size = VarInt.readVInt(record.data, offset);
299+
offset += VarInt.sizeOfVInt(size);
300+
for (int i = 0; i < size; i++) {
301+
offset += VarInt.nextVLongSize(record.data, offset);
302+
}
303+
break;
304+
}
305+
case MAP: {
306+
int size = VarInt.readVInt(record.data, offset);
307+
offset += VarInt.sizeOfVInt(size);
308+
for (int i = 0; i < size; i++) {
309+
offset += VarInt.nextVLongSize(record.data, offset); // key
310+
offset += VarInt.nextVLongSize(record.data, offset); // value
311+
}
312+
break;
313+
}
314+
}
315+
316+
return offset - start;
317+
}
318+
319+
private int sizeOfFieldValue(HollowObjectSchema.FieldType fieldType, int offset) {
320+
switch (fieldType) {
321+
case INT:
322+
case LONG:
323+
case REFERENCE:
324+
return VarInt.nextVLongSize(record.data, offset);
325+
case BYTES:
326+
case STRING:
327+
if (VarInt.readVNull(record.data, offset)) {
328+
return 1;
329+
}
330+
int fieldLength = VarInt.readVInt(record.data, offset);
331+
return VarInt.sizeOfVInt(fieldLength) + fieldLength;
332+
case BOOLEAN:
333+
return 1;
334+
case DOUBLE:
335+
return 8;
336+
case FLOAT:
337+
return 4;
338+
default:
339+
throw new IllegalArgumentException("Unsupported field type: " + fieldType);
340+
}
341+
}
342+
}

0 commit comments

Comments
 (0)