Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 92 additions & 16 deletions tree/ntuple/src/RNTupleMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,38 @@ struct RChangeCompressionFunc {
RPageStorage::RSealedPage &fSealedPage;
ROOT::Internal::RPageAllocator &fPageAlloc;
std::uint8_t *fBuffer;
std::size_t fBufSize;

void operator()() const
{
assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier());

const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements());
// TODO: this buffer could be kept and reused across pages
auto unzipBuf = MakeUninitArray<unsigned char>(bytesPacked);
ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked,
unzipBuf.get());

// TODO: maybe we should check writeOpts.fWriteChecksum rather than fSealedPage.GetHasChecksum()...
const auto checksumSize = fSealedPage.GetHasChecksum() * sizeof(std::uint64_t);
assert(fBufSize >= bytesPacked + checksumSize);
auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked,
fMergeOptions.fCompressionSettings.value(), fBuffer);

fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()};
fSealedPage.ChecksumIfEnabled();
}
};

struct RResealFunc {
const RColumnElementBase &fSrcColElement;
const RColumnElementBase &fDstColElement;
const RNTupleMergeOptions &fMergeOptions;

RPageStorage::RSealedPage &fSealedPage;
ROOT::Internal::RPageAllocator &fPageAlloc;
std::uint8_t *fBuffer;
std::size_t fBufSize;

void operator()() const
{
Expand All @@ -277,11 +309,25 @@ struct RChangeCompressionFunc {
sealConf.fBuffer = fBuffer;
sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings;
sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t));
auto refSealedPage = RPageSink::SealPage(sealConf);
fSealedPage = refSealedPage;
}
};

struct RTaskVisitor {
std::optional<ROOT::Experimental::TTaskGroup> &fGroup;

template <typename T>
void operator()(T &f)
{
if (fGroup)
fGroup->Run(f);
else
f();
}
};

struct RCommonField {
const ROOT::RFieldDescriptor *fSrc;
const ROOT::RFieldDescriptor *fDst;
Expand Down Expand Up @@ -778,15 +824,23 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,

// Each column range potentially has a distinct compression settings
const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value();
// If either the compression or the encoding of the source doesn't match that of the destination, we need
// to reseal the page. Otherwise, if both match, we can fast merge.

// Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
// L1: compression and encoding of src and dest both match: we can simply copy the page
// L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
// resealing it.
// L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
// it.
const bool compressionIsDifferent =
colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value();
const bool needsResealing =
colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value() ||
srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
const bool needsRecompressing = compressionIsDifferent || needsResealing;

if (needsResealing && mergeData.fMergeOpts.fExtraVerbose) {
if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) {
R__LOG_INFO(NTupleMergeLog())
<< "Resealing column " << column.fColumnName << ": { compression: " << colRangeCompressionSettings << " => "
<< (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName
<< ": { compression: " << colRangeCompressionSettings << " => "
<< mergeData.fMergeOpts.fCompressionSettings.value()
<< ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType)
<< " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType);
Expand All @@ -795,7 +849,7 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
// If the column range already has the right compression we don't need to allocate any new buffer, so we don't
// bother reserving memory for them.
if (needsResealing)
if (needsRecompressing)
sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size());

// If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
Expand Down Expand Up @@ -831,18 +885,40 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));

if (needsResealing) {
if (needsRecompressing) {
std::optional<std::variant<RChangeCompressionFunc, RResealFunc>> task;
const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
buffer = MakeUninitArray<std::uint8_t>(uncompressedSize + checksumSize);
RChangeCompressionFunc compressTask{
*srcColElement, *dstColElement, mergeData.fMergeOpts, sealedPage, *fPageAlloc, buffer.get(),
};

if (fTaskGroup)
fTaskGroup->Run(compressTask);
else
compressTask();
const auto bufSize = uncompressedSize + checksumSize;
// NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
// We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
// the actual data size after recompressing.
buffer = MakeUninitArray<std::uint8_t>(bufSize);

if (needsResealing) {
task.emplace(RResealFunc{
*srcColElement,
*dstColElement,
mergeData.fMergeOpts,
sealedPage,
*fPageAlloc,
buffer.get(),
bufSize,
});
} else {
task.emplace(RChangeCompressionFunc{
*srcColElement,
*dstColElement,
mergeData.fMergeOpts,
sealedPage,
*fPageAlloc,
buffer.get(),
bufSize,
});
}

assert(task.has_value());
std::visit(RTaskVisitor{fTaskGroup}, *task);
}

++pageIdx;
Expand Down
Loading