
Commit ad5dd96

Fix boolean column handling by unpacking bits and adjusting decompression size
1 parent 3adec85 commit ad5dd96

File tree

1 file changed

modules/rntuple.mjs

Lines changed: 129 additions & 82 deletions
```diff
@@ -181,6 +181,8 @@ function getTypeByteSize(coltype) {
       case ENTupleColumnType.kByte:
       case ENTupleColumnType.kChar:
          return 1;
+      case ENTupleColumnType.kBit:
+         return 1/8;
       default:
          throw new Error(`Unsupported coltype for byte size: ${coltype} (0x${coltype.toString(16).padStart(2, '0')})`);
    }
```
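With kBit mapped to 1/8, the generic element-count arithmetic used later in deserializePage (numValues = processedBlob.byteLength / byteSize) counts one value per stored bit. A quick sanity check of that arithmetic, with made-up numbers:

```js
// Why a fractional byte size works for kBit: dividing by 1/8 multiplies by 8.
const byteSize = 1 / 8;             // what getTypeByteSize returns for kBit
const byteLength = 3;               // hypothetical 3-byte page payload
console.log(byteLength / byteSize); // 24 -> one value per stored bit
```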
```diff
@@ -747,87 +749,108 @@ class RNTupleDescriptorBuilder {
       this.pageLocations = clusterPageLocations;
    }
 
-   // Example Of Deserializing Page Content
-   deserializePage(blob, columnDescriptor) {
-      const originalColtype = columnDescriptor.coltype,
-            { coltype } = recontructUnsplitBuffer(blob, columnDescriptor);
-      let { blob: processedBlob } = recontructUnsplitBuffer(blob, columnDescriptor);
+   // Example Of Deserializing Page Content
+   deserializePage(blob, columnDescriptor) {
+      const originalColtype = columnDescriptor.coltype,
+            { coltype } = recontructUnsplitBuffer(blob, columnDescriptor);
+      let { blob: processedBlob } = recontructUnsplitBuffer(blob, columnDescriptor);
 
-
-      // Handle split index types
+
+      // Handle split index types
       if (originalColtype === ENTupleColumnType.kSplitIndex32 || originalColtype === ENTupleColumnType.kSplitIndex64) {
-         const { blob: decodedArray } = DecodeDeltaIndex(processedBlob, coltype);
-         processedBlob = decodedArray;
-      }
+         const { blob: decodedArray } = DecodeDeltaIndex(processedBlob, coltype);
+         processedBlob = decodedArray;
+      }
 
-      // Handle Split Signed Int types
-      if (originalColtype === ENTupleColumnType.kSplitInt16 || originalColtype === ENTupleColumnType.kSplitInt32 || originalColtype === ENTupleColumnType.kSplitInt64) {
-         const { blob: decodedArray } = decodeZigzag(processedBlob, coltype);
-         processedBlob = decodedArray;
-      }
+      // Handle Split Signed Int types
+      if (originalColtype === ENTupleColumnType.kSplitInt16 || originalColtype === ENTupleColumnType.kSplitInt32 || originalColtype === ENTupleColumnType.kSplitInt64) {
+         const { blob: decodedArray } = decodeZigzag(processedBlob, coltype);
+         processedBlob = decodedArray;
+      }
 
-      const byteSize = getTypeByteSize(coltype),
-            reader = new RBufferReader(processedBlob),
-            values = [];
+      const byteSize = getTypeByteSize(coltype),
+            reader = new RBufferReader(processedBlob),
+            values = [];
 
-      if (!byteSize)
-         throw new Error('Invalid or unsupported column type: cannot determine byte size');
+      if (!byteSize)
+         throw new Error('Invalid or unsupported column type: cannot determine byte size');
 
-      const numValues = processedBlob.byteLength / byteSize;
+      const numValues = processedBlob.byteLength / byteSize;
 
-      for (let i = 0; i < (numValues ?? processedBlob.byteLength); ++i) {
-         let val;
+      switch (coltype) {
+         case ENTupleColumnType.kBit: {
+            let bitCount = 0;
+            const totalBitsInBuffer = processedBlob.byteLength * 8;
 
-         switch (coltype) {
-            case ENTupleColumnType.kReal64:
-               val = reader.readF64();
-               break;
-            case ENTupleColumnType.kReal32:
-               val = reader.readF32();
-               break;
-            case ENTupleColumnType.kInt64:
-               val = reader.readS64();
-               break;
-            case ENTupleColumnType.kUInt64:
-               val = reader.readU64();
-               break;
-            case ENTupleColumnType.kInt32:
-               val = reader.readS32();
-               break;
-            case ENTupleColumnType.kUInt32:
-               val = reader.readU32();
-               break;
-            case ENTupleColumnType.kInt16:
-               val = reader.readS16();
-               break;
-            case ENTupleColumnType.kUInt16:
-               val = reader.readU16();
-               break;
-            case ENTupleColumnType.kInt8:
-               val = reader.readS8();
-               break;
-            case ENTupleColumnType.kUInt8:
-            case ENTupleColumnType.kByte:
-               val = reader.readU8();
-               break;
-            case ENTupleColumnType.kChar:
-               val = String.fromCharCode(reader.readS8());
-               break;
-            case ENTupleColumnType.kIndex32:
-               val = reader.readS32();
-               break;
-            case ENTupleColumnType.kIndex64:
-               val = reader.readS64();
-               break;
-            default:
-               throw new Error(`Unsupported column type: ${columnDescriptor.coltype}`);
+            for (let byteIndex = 0; byteIndex < processedBlob.byteLength; ++byteIndex) {
+               const byte = reader.readU8();
+
+               // Extract 8 bits from this byte
+               for (let bitPos = 0; bitPos < 8 && bitCount < totalBitsInBuffer; ++bitPos, ++bitCount) {
+                  const bitValue = (byte >>> bitPos) & 1,
+                        boolValue = bitValue === 1;
+                  values.push(boolValue);
+               }
+            }
          }
-         values.push(val);
+            break;
       }
 
-      return values;
+         default: {
+            for (let i = 0; i < (numValues ?? processedBlob.byteLength); ++i) {
+               let val;
+
+               switch (coltype) {
+                  case ENTupleColumnType.kReal64:
+                     val = reader.readF64();
+                     break;
+                  case ENTupleColumnType.kReal32:
+                     val = reader.readF32();
+                     break;
+                  case ENTupleColumnType.kInt64:
+                     val = reader.readS64();
+                     break;
+                  case ENTupleColumnType.kUInt64:
+                     val = reader.readU64();
+                     break;
+                  case ENTupleColumnType.kInt32:
+                     val = reader.readS32();
+                     break;
+                  case ENTupleColumnType.kUInt32:
+                     val = reader.readU32();
+                     break;
+                  case ENTupleColumnType.kInt16:
+                     val = reader.readS16();
+                     break;
+                  case ENTupleColumnType.kUInt16:
+                     val = reader.readU16();
+                     break;
+                  case ENTupleColumnType.kInt8:
+                     val = reader.readS8();
+                     break;
+                  case ENTupleColumnType.kUInt8:
+                  case ENTupleColumnType.kByte:
+                     val = reader.readU8();
+                     break;
+                  case ENTupleColumnType.kChar:
+                     val = String.fromCharCode(reader.readS8());
+                     break;
+                  case ENTupleColumnType.kIndex32:
+                     val = reader.readS32();
+                     break;
+                  case ENTupleColumnType.kIndex64:
+                     val = reader.readS64();
+                     break;
+                  default:
+                     throw new Error(`Unsupported column type: ${columnDescriptor.coltype}`);
+               }
+               values.push(val);
+            }
+         }
+      }
    }
 
+      return values;
+   }
+
 } // class RNTupleDescriptorBuilder
 
 
```
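The new kBit branch unpacks each byte least-significant-bit first and pushes one boolean per bit, stopping at totalBitsInBuffer (the buffer's full bit count, so padding bits in a partially filled last byte are emitted too). Here is a self-contained sketch of the same unpacking; the unpackBits helper and its arguments are illustrative, not part of the commit:

```js
// LSB-first bit unpacking, mirroring the new kBit branch.
// `unpackBits`, `packed` and `numValues` are hypothetical names for illustration.
function unpackBits(packed, numValues) {
   const out = [];
   for (let i = 0; i < numValues; ++i) {
      const byte = packed[i >> 3],        // byte that holds bit i
            bit = (byte >>> (i & 7)) & 1; // bit i within that byte, LSB first
      out.push(bit === 1);
   }
   return out;
}

// 0b00000101 stores [true, false, true, false, ...] LSB-first
console.log(unpackBits(new Uint8Array([0b00000101]), 4)); // [ true, false, true, false ]
```

Capping at an explicit numValues, as this sketch does, is one way a caller could drop the padding bits that the committed loop may leave at the end of values.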
```diff
@@ -990,22 +1013,46 @@ function readNextCluster(rntuple, selector) {
 
    // Build flat array of [offset, size, offset, size, ...] to read pages
    const dataToRead = pages.flatMap(p =>
-      [Number(p.page.locator.offset), Number(p.page.locator.size)]
-   );
+      [Number(p.page.locator.offset), Number(p.page.locator.size)]
+   );
 
    return rntuple.$file.readBuffer(dataToRead).then(blobsRaw => {
       const blobs = Array.isArray(blobsRaw) ? blobsRaw : [blobsRaw],
-         unzipPromises = blobs.map((blob, idx) => {
-            const { page, colDesc } = pages[idx],
+         unzipPromises = blobs.map((blob, idx) => {
+            const { page, colDesc } = pages[idx],
               colEntry = builder.pageLocations[clusterIndex][colDesc.index], // Access column entry
-               numElements = Number(page.numElements),
-               elementSize = colDesc.bitsOnStorage / 8;
+               numElements = Number(page.numElements),
+               elementSize = colDesc.bitsOnStorage / 8;
+
+            // Check if data is compressed
+            if (colEntry.compression === 0)
+               return Promise.resolve(blob); // Uncompressed: use blob directly
+            const expectedSize = numElements * elementSize;
+
+            // Special handling for boolean fields
+            if (colDesc.coltype === ENTupleColumnType.kBit) {
+               const expectedBoolSize = Math.ceil(numElements / 8);
+               if (blob.byteLength === expectedBoolSize)
+                  return Promise.resolve(blob);
+               // Try decompression but catch errors for boolean fields
+               return R__unzip(blob, expectedBoolSize).catch(err => {
+                  throw new Error(`Failed to unzip boolean page ${idx}: ${err.message}`);
+               });
+            }
 
-            // Check if data is compressed
-            if (colEntry.compression === 0)
-               return Promise.resolve(blob); // Uncompressed: use blob directly
-            return R__unzip(blob, numElements * elementSize);
+            // If the blob is already the expected size, treat as uncompressed
+            if (blob.byteLength === expectedSize)
+               return Promise.resolve(blob);
+
+            // Try decompression
+            return R__unzip(blob, expectedSize).then(result => {
+               if (!result)
+                  return blob; // Fallback to original blob
+               return result;
+            }).catch(err => {
+               throw new Error(`Failed to unzip page ${idx}: ${err.message}`);
             });
+         });
 
       return Promise.all(unzipPromises).then(unzipBlobs => {
          rntuple._clusterData = {}; // store deserialized data per field
```
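R__unzip is given the expected decompressed size up front. For the other column types that is numElements * elementSize, but for packed booleans the product can be fractional, while the stored payload is always a whole number of bytes; hence Math.ceil(numElements / 8) in the new branch. A worked check with made-up numbers, assuming bitsOnStorage is 1 for kBit:

```js
// Expected decompressed size for a boolean page with 1001 entries.
const numElements = 1001;                // hypothetical page element count
const elementSize = 1 / 8;               // colDesc.bitsOnStorage / 8 for kBit
console.log(numElements * elementSize);  // 125.125 -> not a valid buffer size
console.log(Math.ceil(numElements / 8)); // 126 -> whole bytes, as stored on disk
```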
```diff
@@ -1028,10 +1075,10 @@ function readNextCluster(rntuple, selector) {
             // splitting string fields into offset and payload components
             if (field.typeName === 'std::string') {
                if (
-                  colDesc.coltype === ENTupleColumnType.kIndex64 ||
-                  colDesc.coltype === ENTupleColumnType.kIndex32 ||
-                  colDesc.coltype === ENTupleColumnType.kSplitIndex64 ||
-                  colDesc.coltype === ENTupleColumnType.kSplitIndex32
+                  colDesc.coltype === ENTupleColumnType.kIndex64 ||
+                  colDesc.coltype === ENTupleColumnType.kIndex32 ||
+                  colDesc.coltype === ENTupleColumnType.kSplitIndex64 ||
+                  colDesc.coltype === ENTupleColumnType.kSplitIndex32
                ) // Index64/Index32
                   rntuple._clusterData[field.fieldName][0] = values; // Offsets
                else if (colDesc.coltype === ENTupleColumnType.kChar)
```
