Skip to content

Commit f62dd9f

Browse files
committed
Resharding: Data elements split/join utils for list, set, and map types
1 parent a7140bd commit f62dd9f

File tree

46 files changed

+2134
-341
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2134
-341
lines changed

docs/advanced-topics.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ The number of bits used to represent a field which is one of the types (`INT`, `
372372

373373
32 bits are used to represent a `FLOAT`, and 64 bits are used to represent a `DOUBLE`.
374374

375-
`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of number of bits required to represent the maximum offset, plus one.
375+
`STRING` and `BYTES` fields each get a separate byte array, into which the values for all records are packed. The fixed-length value in these fields are offsets into the field’s byte array where the record’s value ends. In order to determine the begin byte for the record with ordinal n, the offset encoded into the record with ordinal (n-1) is read. The number of fixed length bits used to represent the offsets is exactly equal to the number of bits required to represent the maximum offset, plus one.
376376

377377
Each field type may be assigned a null value. For `INT`, `LONG`, and `REFERENCE` fields, null is encoded as a value with all ones. For `FLOAT` and `DOUBLE` fields, null is encoded as special bit sequences. For `STRING` and `BYTES` fields, null is encoded by setting a designated null bit at the beginning of each field, followed by the end offset of the last populated value for that field.
378378

hollow-diff-ui/src/test/java/com/netflix/hollow/diffview/HollowDiffUIServerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public class HollowDiffUIServerTest {
1010
public void test() throws Exception {
1111
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();
1212

13-
HollowDiffUIServer server = new HollowDiffUIServer();
13+
HollowDiffUIServer server = new HollowDiffUIServer(0);
1414

1515
server.addDiff("diff", testDiff);
1616

@@ -22,7 +22,7 @@ public void test() throws Exception {
2222
public void testBackwardsCompatibiltyWithJettyImplementation() throws Exception {
2323
HollowDiff testDiff = new FakeHollowDiffGenerator().createFakeDiff();
2424

25-
com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer();
25+
com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer server = new com.netflix.hollow.diff.ui.jetty.HollowDiffUIServer(0);
2626

2727
server.addDiff("diff", testDiff);
2828

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.netflix.hollow.core.read.engine;
2+
3+
import com.netflix.hollow.core.memory.MemoryMode;
4+
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
5+
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
6+
import com.netflix.hollow.core.read.HollowBlobInput;
7+
import com.netflix.hollow.core.read.engine.map.HollowMapTypeDataElements;
8+
import java.io.IOException;
9+
10+
public abstract class AbstractHollowTypeDataElements {
11+
12+
public int maxOrdinal;
13+
14+
public GapEncodedVariableLengthIntegerReader encodedAdditions;
15+
public GapEncodedVariableLengthIntegerReader encodedRemovals;
16+
17+
public final ArraySegmentRecycler memoryRecycler;
18+
public final MemoryMode memoryMode;
19+
20+
public AbstractHollowTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
21+
this.memoryMode = memoryMode;
22+
this.memoryRecycler = memoryRecycler;
23+
}
24+
25+
public abstract void destroy();
26+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.netflix.hollow.core.read.engine;
2+
3+
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
4+
5+
public abstract class AbstractHollowTypeDataElementsJoiner <T extends AbstractHollowTypeDataElements> {
6+
public final int fromMask;
7+
public final int fromOrdinalShift;
8+
public final T[] from;
9+
10+
public T to;
11+
12+
public AbstractHollowTypeDataElementsJoiner(T[] from) {
13+
this.from = from;
14+
this.fromMask = from.length - 1;
15+
this.fromOrdinalShift = 31 - Integer.numberOfLeadingZeros(from.length);
16+
17+
if (from.length<=0 || !((from.length&(from.length-1))==0)) {
18+
throw new IllegalStateException("No. of DataElements to be joined must be a power of 2");
19+
}
20+
21+
for (int i=0;i<from.length;i++) {
22+
if (from[i].maxOrdinal == -1) {
23+
continue;
24+
}
25+
if (from[i].maxOrdinal > (1<<29)
26+
|| from[i].maxOrdinal != 0 && (from.length > (1<<29)/from[i].maxOrdinal)
27+
|| from[i].maxOrdinal * from.length + i > (1<<29)) {
28+
throw new IllegalArgumentException("Too large to join, maxOrdinal would exceed 2<<29");
29+
}
30+
}
31+
32+
for (AbstractHollowTypeDataElements elements : from) {
33+
if (elements.encodedAdditions != null) {
34+
throw new IllegalStateException("Encountered encodedAdditions in data elements joiner- this is not expected " +
35+
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
36+
"delta data elements are never split/joined");
37+
}
38+
}
39+
}
40+
41+
public T join() {
42+
43+
initToElements();
44+
to.maxOrdinal = -1;
45+
46+
populateStats();
47+
48+
copyRecords();
49+
50+
GapEncodedVariableLengthIntegerReader[] fromRemovals = new GapEncodedVariableLengthIntegerReader[from.length];
51+
for (int i=0;i<from.length;i++) {
52+
fromRemovals[i] = from[i].encodedRemovals;
53+
}
54+
to.encodedRemovals = GapEncodedVariableLengthIntegerReader.join(fromRemovals);
55+
56+
return to;
57+
}
58+
59+
/**
60+
* Initialize the target data elements.
61+
*/
62+
public abstract void initToElements();
63+
64+
/**
65+
* Populate the stats of the target data elements.
66+
*/
67+
public abstract void populateStats();
68+
69+
/**
70+
* Copy records from the source data elements to the target data elements.
71+
*/
72+
public abstract void copyRecords();
73+
74+
75+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.netflix.hollow.core.read.engine;
2+
3+
import com.netflix.hollow.core.memory.encoding.GapEncodedVariableLengthIntegerReader;
4+
5+
/**
6+
* Split a single {@code AbstractHollowTypeDataElements} into multiple data elements of the same type.
7+
* Ordinals are remapped and corresponding data is copied over.
8+
* The original data elements are not destroyed.
9+
* The number of splits must be a power of 2.
10+
*/
11+
public abstract class AbstractHollowTypeDataElementsSplitter<T extends AbstractHollowTypeDataElements> {
12+
public final int numSplits;
13+
public final int toMask;
14+
public final int toOrdinalShift;
15+
public final T from;
16+
17+
public T[] to;
18+
19+
public AbstractHollowTypeDataElementsSplitter(T from, int numSplits) {
20+
this.from = from;
21+
this.numSplits = numSplits;
22+
this.toMask = numSplits - 1;
23+
this.toOrdinalShift = 31 - Integer.numberOfLeadingZeros(numSplits);
24+
25+
if (numSplits<=0 || !((numSplits&(numSplits-1))==0)) {
26+
throw new IllegalStateException("Must split by power of 2");
27+
}
28+
29+
if (from.encodedAdditions != null) {
30+
throw new IllegalStateException("Encountered encodedAdditions in data elements splitter- this is not expected " +
31+
"since encodedAdditions only exist on delta data elements and they dont carry over to target data elements, " +
32+
"delta data elements are never split/joined");
33+
}
34+
}
35+
36+
public T[] split() {
37+
38+
initToElements();
39+
for(int i=0;i<to.length;i++) {
40+
to[i].maxOrdinal = -1;
41+
}
42+
43+
populateStats();
44+
45+
copyRecords();
46+
47+
if (from.encodedRemovals != null) {
48+
GapEncodedVariableLengthIntegerReader[] splitRemovals = from.encodedRemovals.split(numSplits);
49+
for(int i=0;i<to.length;i++) {
50+
to[i].encodedRemovals = splitRemovals[i];
51+
}
52+
}
53+
54+
return to;
55+
}
56+
57+
/**
58+
* Initialize the target data elements.
59+
*/
60+
public abstract void initToElements();
61+
62+
/**
63+
* Populate the stats of the target data elements.
64+
*/
65+
public abstract void populateStats();
66+
67+
/**
68+
* Copy records from the source data elements to the target data elements.
69+
*/
70+
public abstract void copyRecords();
71+
72+
73+
}

hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListDeltaHistoricalStateCreator.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,11 @@ private void populateStats() {
100100
historicalDataElements.maxOrdinal = removedEntryCount - 1;
101101
historicalDataElements.totalNumberOfElements = totalElementCount;
102102
historicalDataElements.bitsPerListPointer = totalElementCount == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalElementCount);
103-
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement;
104-
103+
for (int i=0;i<stateEngineDataElements.length;i++) {
104+
if (stateEngineDataElements[i].bitsPerElement > historicalDataElements.bitsPerElement) {
105+
historicalDataElements.bitsPerElement = stateEngineDataElements[i].bitsPerElement;
106+
}
107+
}
105108
ordinalMapping = new IntMap(removedEntryCount);
106109
}
107110

@@ -110,8 +113,8 @@ private void copyRecord(int ordinal) {
110113
int shardOrdinal = ordinal >> shardOrdinalShift;
111114

112115
long bitsPerElement = stateEngineDataElements[shard].bitsPerElement;
113-
long fromStartElement = shardOrdinal == 0 ? 0 : stateEngineDataElements[shard].listPointerData.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
114-
long fromEndElement = stateEngineDataElements[shard].listPointerData.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
116+
long fromStartElement = stateEngineDataElements[shard].getStartElement(shardOrdinal);
117+
long fromEndElement = stateEngineDataElements[shard].getEndElement(shardOrdinal);
115118
long size = fromEndElement - fromStartElement;
116119

117120
historicalDataElements.elementData.copyBits(stateEngineDataElements[shard].elementData, fromStartElement * bitsPerElement, nextStartElement * bitsPerElement, size * bitsPerElement);

hollow/src/main/java/com/netflix/hollow/core/read/engine/list/HollowListTypeDataElements.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.netflix.hollow.core.memory.encoding.VarInt;
2424
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
2525
import com.netflix.hollow.core.read.HollowBlobInput;
26+
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElements;
2627
import java.io.IOException;
2728

2829
/**
@@ -31,30 +32,21 @@
3132
* During a delta, the HollowListTypeReadState will create a new HollowListTypeDataElements and atomically swap
3233
* with the existing one to make sure a consistent view of the data is always available.
3334
*/
34-
public class HollowListTypeDataElements {
35-
36-
int maxOrdinal;
35+
public class HollowListTypeDataElements extends AbstractHollowTypeDataElements {
3736

3837
FixedLengthData listPointerData;
3938
FixedLengthData elementData;
4039

41-
GapEncodedVariableLengthIntegerReader encodedAdditions;
42-
GapEncodedVariableLengthIntegerReader encodedRemovals;
43-
4440
int bitsPerListPointer;
45-
int bitsPerElement;
41+
int bitsPerElement = 0;
4642
long totalNumberOfElements = 0;
4743

48-
final ArraySegmentRecycler memoryRecycler;
49-
final MemoryMode memoryMode;
50-
5144
public HollowListTypeDataElements(ArraySegmentRecycler memoryRecycler) {
5245
this(MemoryMode.ON_HEAP, memoryRecycler);
5346
}
5447

5548
public HollowListTypeDataElements(MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler) {
56-
this.memoryMode = memoryMode;
57-
this.memoryRecycler = memoryRecycler;
49+
super(memoryMode, memoryRecycler);
5850
}
5951

6052
void readSnapshot(HollowBlobInput in) throws IOException {
@@ -109,9 +101,31 @@ public void applyDelta(HollowListTypeDataElements fromData, HollowListTypeDataEl
109101
new HollowListDeltaApplicator(fromData, deltaData, this).applyDelta();
110102
}
111103

104+
@Override
112105
public void destroy() {
113106
FixedLengthDataFactory.destroy(listPointerData, memoryRecycler);
114107
FixedLengthDataFactory.destroy(elementData, memoryRecycler);
115108
}
116109

110+
long getStartElement(int ordinal) {
111+
return ordinal == 0 ? 0 : listPointerData.getElementValue(((long)(ordinal-1) * bitsPerListPointer), bitsPerListPointer);
112+
}
113+
114+
long getEndElement(int ordinal) {
115+
return listPointerData.getElementValue((long)ordinal * bitsPerListPointer, bitsPerListPointer);
116+
}
117+
118+
void copyElementsFrom(long startElement, HollowListTypeDataElements src, long srcStartElement, long srcEndElement) {
119+
if (bitsPerElement == src.bitsPerElement) {
120+
// fast path can bulk copy elements
121+
long numElements = srcEndElement - srcStartElement;
122+
elementData.copyBits(src.elementData, srcStartElement * bitsPerElement, startElement * bitsPerElement, numElements * bitsPerElement);
123+
} else {
124+
for (long element=srcStartElement;element<srcEndElement;element++) {
125+
long elementVal = src.elementData.getElementValue(element * src.bitsPerElement, src.bitsPerElement);
126+
elementData.setElementValue(startElement * bitsPerElement, bitsPerElement, elementVal);
127+
startElement++;
128+
}
129+
}
130+
}
117131
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.netflix.hollow.core.read.engine.list;
2+
3+
import com.netflix.hollow.core.memory.FixedLengthDataFactory;
4+
import com.netflix.hollow.core.read.engine.AbstractHollowTypeDataElementsJoiner;
5+
6+
7+
/**
8+
* Join multiple {@code HollowListTypeDataElements}s into 1 {@code HollowListTypeDataElements}.
9+
* Ordinals are remapped and corresponding data is copied over.
10+
* The original data elements are not destroyed.
11+
* The no. of passed data elements must be a power of 2.
12+
*/
13+
class HollowListTypeDataElementsJoiner extends AbstractHollowTypeDataElementsJoiner<HollowListTypeDataElements> {
14+
15+
public HollowListTypeDataElementsJoiner(HollowListTypeDataElements[] from) {
16+
super(from);
17+
}
18+
19+
@Override
20+
public void initToElements() {
21+
this.to = new HollowListTypeDataElements(from[0].memoryMode, from[0].memoryRecycler);
22+
}
23+
24+
@Override
25+
public void populateStats() {
26+
for(int fromIndex=0;fromIndex<from.length;fromIndex++) {
27+
int mappedMaxOrdinal = from[fromIndex].maxOrdinal == -1 ? -1 : (from[fromIndex].maxOrdinal * from.length) + fromIndex;
28+
to.maxOrdinal = Math.max(to.maxOrdinal, mappedMaxOrdinal);
29+
if (from[fromIndex].bitsPerElement > to.bitsPerElement) {
30+
// uneven bitsPerElement could be the case for consumers that skip type shards with no additions, so pick max across all shards
31+
to.bitsPerElement = from[fromIndex].bitsPerElement;
32+
}
33+
}
34+
35+
long totalOfListSizes = 0;
36+
for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
37+
int fromIndex = ordinal & fromMask;
38+
int fromOrdinal = ordinal >> fromOrdinalShift;
39+
40+
long startElement = from[fromIndex].getStartElement(fromOrdinal);
41+
long endElement = from[fromIndex].getEndElement(fromOrdinal);
42+
long numElements = endElement - startElement;
43+
totalOfListSizes += numElements;
44+
45+
}
46+
to.bitsPerListPointer = totalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalOfListSizes);
47+
to.totalNumberOfElements = totalOfListSizes;
48+
}
49+
50+
@Override
51+
public void copyRecords() {
52+
long elementCounter = 0;
53+
54+
to.listPointerData = FixedLengthDataFactory.get((long)to.bitsPerListPointer * (to.maxOrdinal + 1), to.memoryMode, to.memoryRecycler);
55+
to.elementData = FixedLengthDataFactory.get(to.bitsPerElement * to.totalNumberOfElements, to.memoryMode, to.memoryRecycler);
56+
57+
for(int ordinal=0;ordinal<=to.maxOrdinal;ordinal++) {
58+
int fromIndex = ordinal & fromMask;
59+
int fromOrdinal = ordinal >> fromOrdinalShift;
60+
61+
if (fromOrdinal <= from[fromIndex].maxOrdinal) { // else lopsided shard for e.g. when consumers skip type shards with no additions
62+
HollowListTypeDataElements source = from[fromIndex];
63+
long startElement = source.getStartElement(fromOrdinal);
64+
long endElement = source.getEndElement(fromOrdinal);
65+
66+
long numElements = endElement - startElement;
67+
to.copyElementsFrom(elementCounter, source, startElement, endElement);
68+
elementCounter += numElements;
69+
}
70+
to.listPointerData.setElementValue((long)to.bitsPerListPointer * ordinal, to.bitsPerListPointer, elementCounter);
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)