diff --git a/tree/ntuple/src/RNTupleMerger.cxx b/tree/ntuple/src/RNTupleMerger.cxx index 9b5462fbf2446..9cf7593120315 100644 --- a/tree/ntuple/src/RNTupleMerger.cxx +++ b/tree/ntuple/src/RNTupleMerger.cxx @@ -267,6 +267,38 @@ struct RChangeCompressionFunc { RPageStorage::RSealedPage &fSealedPage; ROOT::Internal::RPageAllocator &fPageAlloc; std::uint8_t *fBuffer; + std::size_t fBufSize; + + void operator()() const + { + assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier()); + + const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements()); + // TODO: this buffer could be kept and reused across pages + auto unzipBuf = MakeUninitArray(bytesPacked); + ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked, + unzipBuf.get()); + + // TODO: maybe we should check writeOpts.fWriteChecksum rather than fSealedPage.GetHasChecksum()... + const auto checksumSize = fSealedPage.GetHasChecksum() * sizeof(std::uint64_t); + assert(fBufSize >= bytesPacked + checksumSize); + auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked, + fMergeOptions.fCompressionSettings.value(), fBuffer); + + fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()}; + fSealedPage.ChecksumIfEnabled(); + } +}; + +struct RResealFunc { + const RColumnElementBase &fSrcColElement; + const RColumnElementBase &fDstColElement; + const RNTupleMergeOptions &fMergeOptions; + + RPageStorage::RSealedPage &fSealedPage; + ROOT::Internal::RPageAllocator &fPageAlloc; + std::uint8_t *fBuffer; + std::size_t fBufSize; void operator()() const { @@ -277,11 +309,25 @@ struct RChangeCompressionFunc { sealConf.fBuffer = fBuffer; sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings; sealConf.fWriteChecksum = fSealedPage.GetHasChecksum(); + assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t)); auto refSealedPage = RPageSink::SealPage(sealConf); fSealedPage = refSealedPage; } }; +struct RTaskVisitor { + std::optional &fGroup; + + template + void operator()(T &f) + { + if (fGroup) + fGroup->Run(f); + else + f(); + } +}; + struct RCommonField { const ROOT::RFieldDescriptor *fSrc; const ROOT::RFieldDescriptor *fDst; @@ -778,15 +824,23 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, // Each column range potentially has a distinct compression settings const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value(); - // If either the compression or the encoding of the source doesn't match that of the destination, we need - // to reseal the page. Otherwise, if both match, we can fast merge. + + // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case: + // L1: compression and encoding of src and dest both match: we can simply copy the page + // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid + // resealing it. + // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing + // it. + const bool compressionIsDifferent = + colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value(); const bool needsResealing = - colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value() || srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType; + const bool needsRecompressing = compressionIsDifferent || needsResealing; - if (needsResealing && mergeData.fMergeOpts.fExtraVerbose) { + if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) { R__LOG_INFO(NTupleMergeLog()) - << "Resealing column " << column.fColumnName << ": { compression: " << colRangeCompressionSettings << " => " + << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName + << ": { compression: " << colRangeCompressionSettings << " => " << mergeData.fMergeOpts.fCompressionSettings.value() << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType) << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType); @@ -795,7 +849,7 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, size_t pageBufferBaseIdx = sealedPageData.fBuffers.size(); // If the column range already has the right compression we don't need to allocate any new buffer, so we don't // bother reserving memory for them. - if (needsResealing) + if (needsRecompressing) sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size()); // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry @@ -831,18 +885,40 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, sealedPage.VerifyChecksumIfEnabled().ThrowOnError(); R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize())); - if (needsResealing) { + if (needsRecompressing) { + std::optional> task; const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements(); auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx]; - buffer = MakeUninitArray(uncompressedSize + checksumSize); - RChangeCompressionFunc compressTask{ - *srcColElement, *dstColElement, mergeData.fMergeOpts, sealedPage, *fPageAlloc, buffer.get(), - }; - - if (fTaskGroup) - fTaskGroup->Run(compressTask); - else - compressTask(); + const auto bufSize = uncompressedSize + checksumSize; + // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward. + // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit + // the actual data size after recompressing. + buffer = MakeUninitArray(bufSize); + + if (needsResealing) { + task.emplace(RResealFunc{ + *srcColElement, + *dstColElement, + mergeData.fMergeOpts, + sealedPage, + *fPageAlloc, + buffer.get(), + bufSize, + }); + } else { + task.emplace(RChangeCompressionFunc{ + *srcColElement, + *dstColElement, + mergeData.fMergeOpts, + sealedPage, + *fPageAlloc, + buffer.get(), + bufSize, + }); + } + + assert(task.has_value()); + std::visit(RTaskVisitor{fTaskGroup}, *task); } ++pageIdx;