Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 52 additions & 20 deletions modules/rntuple.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ function getTypeByteSize(coltype) {
case ENTupleColumnType.kSplitInt64:
case ENTupleColumnType.kSplitUInt64:
case ENTupleColumnType.kSplitReal64:
case ENTupleColumnType.kSplitIndex64:
return 8;

case ENTupleColumnType.kReal32:
Expand All @@ -168,7 +169,6 @@ function getTypeByteSize(coltype) {
case ENTupleColumnType.kSplitUInt32:
case ENTupleColumnType.kSplitReal32:
case ENTupleColumnType.kSplitIndex32:
case ENTupleColumnType.kSplitIndex64:
return 4;
case ENTupleColumnType.kInt16:
case ENTupleColumnType.kUInt16:
Expand Down Expand Up @@ -198,7 +198,9 @@ function recontructUnsplitBuffer(blob, columnDescriptor) {
coltype === ENTupleColumnType.kSplitUInt64 ||
coltype === ENTupleColumnType.kSplitReal16 ||
coltype === ENTupleColumnType.kSplitReal32 ||
coltype === ENTupleColumnType.kSplitReal64
coltype === ENTupleColumnType.kSplitReal64 ||
coltype === ENTupleColumnType.kSplitIndex32 ||
coltype === ENTupleColumnType.kSplitIndex64
) {
const byteSize = getTypeByteSize(coltype),
splitView = new DataView(blob.buffer, blob.byteOffset, blob.byteLength),
Expand Down Expand Up @@ -255,6 +257,29 @@ function recontructUnsplitBuffer(blob, columnDescriptor) {
}


/**
 * @summary Decode a reconstructed index buffer (32- or 64-bit deltas to absolute indices)
 * @desc Each stored element is a signed delta relative to the previous decoded value;
 * the first element is taken as is. Elements are read through a DataView, so the
 * input may start at any byte offset inside its underlying ArrayBuffer (a typed-array
 * view would throw a RangeError on an unaligned offset). Values are read explicitly as
 * little-endian, which is what a typed-array view yields on little-endian hosts.
 * NOTE(review): little-endian is assumed to be the on-disk order - confirm against the format spec.
 * Trailing bytes that do not form a complete element are ignored.
 * @param {ArrayBuffer|ArrayBufferView} blob - raw bytes holding the delta-encoded values
 * @param {number} coltype - ENTupleColumnType.kIndex32 or ENTupleColumnType.kIndex64
 * @return {{blob: Int32Array|BigInt64Array, coltype: number}} decoded absolute indices and the unchanged column type
 * @throws {Error} when coltype is neither kIndex32 nor kIndex64
 * @private
 */
function DecodeDeltaIndex(blob, coltype) {
   const is32 = coltype === ENTupleColumnType.kIndex32;

   if (!is32 && coltype !== ENTupleColumnType.kIndex64)
      throw new Error(`DecodeDeltaIndex: unsupported column type ${coltype}`);

   const itemSize = is32 ? 4 : 8,
         count = Math.floor(blob.byteLength / itemSize),
         // blob may be a bare ArrayBuffer (no .buffer / .byteOffset) or any view on one
         view = new DataView(blob.buffer || blob, blob.byteOffset || 0, count * itemSize),
         result = is32 ? new Int32Array(count) : new BigInt64Array(count);

   for (let i = 0; i < count; ++i) {
      const delta = is32 ? view.getInt32(i * itemSize, true) : view.getBigInt64(i * itemSize, true);
      // storing into the typed array wraps the running sum to 32 / 64 bits
      result[i] = (i === 0) ? delta : result[i - 1] + delta;
   }

   return { blob: result, coltype };
}


// Envelope Types
// TODO: Define usage logic for envelope types in future
// const kEnvelopeTypeHeader = 0x01,
Expand Down Expand Up @@ -686,13 +711,20 @@ class RNTupleDescriptorBuilder {

// Example Of Deserializing Page Content
deserializePage(blob, columnDescriptor) {
const {
blob: processedBlob,
coltype
} = recontructUnsplitBuffer(blob, columnDescriptor),
byteSize = getTypeByteSize(coltype),
reader = new RBufferReader(processedBlob),
values = [];
const originalColtype = columnDescriptor.coltype,
{ coltype } = recontructUnsplitBuffer(blob, columnDescriptor);
let { blob: processedBlob } = recontructUnsplitBuffer(blob, columnDescriptor);


// Handle split index types
if (originalColtype === ENTupleColumnType.kSplitIndex32 || originalColtype=== ENTupleColumnType.kSplitIndex64) {
const { blob: decodedArray } = DecodeDeltaIndex(processedBlob, coltype);
processedBlob = decodedArray;
}

const byteSize = getTypeByteSize(coltype),
reader = new RBufferReader(processedBlob),
values = [];

if (!byteSize)
throw new Error('Invalid or unsupported column type: cannot determine byte size');
Expand All @@ -710,20 +742,19 @@ class RNTupleDescriptorBuilder {
val = reader.readF32();
break;
case ENTupleColumnType.kInt64:
val = reader.readI64();
val = reader.readS64();
break;
case ENTupleColumnType.kUInt64:
val = reader.readU64();
break;
case ENTupleColumnType.kInt32:
case ENTupleColumnType.kIndex32:
val = reader.readU32();
val = reader.readS32();
break;
case ENTupleColumnType.kUInt32:
val = reader.readU32();
break;
case ENTupleColumnType.kInt16:
val = reader.readI16();
val = reader.readS16();
break;
case ENTupleColumnType.kUInt16:
val = reader.readU16();
Expand All @@ -733,18 +764,14 @@ class RNTupleDescriptorBuilder {
break;
case ENTupleColumnType.kUInt8:
case ENTupleColumnType.kByte:
case ENTupleColumnType.kByteArray:
case ENTupleColumnType.kIndexArrayU8:
val = reader.readU8();
break;
case ENTupleColumnType.kChar:
val = String.fromCharCode(reader.readS8());
break;
case ENTupleColumnType.kIndex32:
case ENTupleColumnType.kIndex64:
val = reader.readU64();
break;
case ENTupleColumnType.kSplitIndex64:
val = reader.readU32();
val = processedBlob[i];
break;
default:
throw new Error(`Unsupported column type: ${columnDescriptor.coltype}`);
Expand Down Expand Up @@ -954,7 +981,12 @@ function readNextCluster(rntuple, selector) {

// splitting string fields into offset and payload components
if (field.typeName === 'std::string') {
if (colDesc.coltype === ENTupleColumnType.kIndex64) // Index64/Index32
if (
colDesc.coltype === ENTupleColumnType.kIndex64 ||
colDesc.coltype === ENTupleColumnType.kIndex32 ||
colDesc.coltype === ENTupleColumnType.kSplitIndex64 ||
colDesc.coltype === ENTupleColumnType.kSplitIndex32
) // Index64/Index32
rntuple._clusterData[field.fieldName][0] = values; // Offsets
else if (colDesc.coltype === ENTupleColumnType.kChar)
rntuple._clusterData[field.fieldName][1] = values; // Payload
Expand Down
Loading