Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions tree/ntuple/inc/ROOT/RMiniFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ class TVirtualStreamerInfo;

namespace ROOT {

class RNTupleWriteOptions;

namespace Internal {

class RNTupleFileWriter;
class RPageSource;
class RRawFile;
}

class RNTupleWriteOptions;
TDirectory *GetUnderlyingDirectory(ROOT::Internal::RNTupleFileWriter &writer);

namespace Internal {
/// Holds status information of an open ROOT file during writing
struct RTFileControlBlock;

Expand All @@ -53,6 +56,8 @@ RNTuple data keys.
*/
// clang-format on
class RMiniFileReader {
friend ROOT::Internal::RPageSource;

private:
/// The raw file used to read byte ranges
ROOT::Internal::RRawFile *fRawFile = nullptr;
Expand All @@ -68,9 +73,6 @@ private:
/// Used when the file turns out to be a TFile container. The ntuplePath variable is either the ntuple name
/// or an ntuple name preceded by a directory (`myNtuple` or `foo/bar/myNtuple` or `/foo/bar/myNtuple`)
RResult<RNTuple> GetNTupleProper(std::string_view ntuplePath);
/// Loads an RNTuple anchor from a TFile at the given file offset (unzipping it if necessary).
RResult<RNTuple>
GetNTupleProperAtOffset(std::uint64_t payloadOffset, std::uint64_t compSize, std::uint64_t uncompLen);

/// Searches for a key with the given name and type in the key index of the directory starting at offsetDir.
/// The offset points to the start of the TDirectory DATA section, without the key and without the name and title
Expand All @@ -84,6 +86,9 @@ public:
explicit RMiniFileReader(ROOT::Internal::RRawFile *rawFile);
/// Extracts header and footer location for the RNTuple identified by ntupleName
RResult<RNTuple> GetNTuple(std::string_view ntupleName);
/// Loads an RNTuple anchor from a TFile at the given file offset (unzipping it if necessary).
RResult<RNTuple>
GetNTupleProperAtOffset(std::uint64_t payloadOffset, std::uint64_t compSize, std::uint64_t uncompLen);
/// Reads a given byte range from the file into the provided memory buffer.
/// If `nbytes > fMaxKeySize` it will perform chunked read from multiple blobs,
/// whose addresses are listed at the end of the first chunk.
Expand All @@ -109,6 +114,8 @@ A stand-alone version of RNTuple can remove the TFile based writer.
*/
// clang-format on
class RNTupleFileWriter {
friend TDirectory *ROOT::Internal::GetUnderlyingDirectory(ROOT::Internal::RNTupleFileWriter &writer);

public:
/// The key length of a blob. It is always a big key (version > 1000) with class name RBlob.
static constexpr std::size_t kBlobKeyLen = 42;
Expand Down Expand Up @@ -254,7 +261,7 @@ public:
void WriteIntoReservedBlob(const void *buffer, size_t nbytes, std::int64_t offset);
/// Ensures that the streamer info records passed as argument are written to the file
void UpdateStreamerInfos(const ROOT::Internal::RNTupleSerializer::StreamerInfoMap_t &streamerInfos);
/// Writes the RNTuple key to the file so that the header and footer keys can be found
/// Writes the RNTuple key to the file so that the header and footer keys can be found.
void Commit(int compression = RCompressionSetting::EDefaults::kUseGeneralPurpose);
};

Expand Down
5 changes: 4 additions & 1 deletion tree/ntuple/inc/ROOT/RNTupleFillContext.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private:
std::size_t FillImpl(Entry &entry)
{
ROOT::RNTupleFillStatus status;
FillNoFlush(entry, status);
FillNoFlushImpl(entry, status);
if (status.ShouldFlushCluster())
FlushCluster();
return status.GetLastEntrySize();
Expand All @@ -114,6 +114,9 @@ private:
RNTupleFillContext &operator=(const RNTupleFillContext &) = delete;

public:
RNTupleFillContext(RNTupleFillContext &&) = default;
RNTupleFillContext &operator=(RNTupleFillContext &&) = default;

~RNTupleFillContext();

/// Fill an entry into this context, but don't commit the cluster. The calling code must pass an RNTupleFillStatus
Expand Down
2 changes: 2 additions & 0 deletions tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public:
void CommitDatasetImpl() final;

RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;

/// Forwards the query to the inner sink (fInnerSink) and returns whatever directory pointer it reports.
TDirectory *GetUnderlyingDirectory() const final { return fInnerSink->GetUnderlyingDirectory(); }
}; // RPageSinkBuf

} // namespace Internal
Expand Down
8 changes: 6 additions & 2 deletions tree/ntuple/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ namespace ROOT {
class RNTupleModel;

namespace Internal {

class RPageAllocator;
class RColumn;
class RMiniFileReader;
class RPageAllocator;
struct RNTupleModelChangeset;

enum class EPageStorageType {
Expand Down Expand Up @@ -313,6 +313,8 @@ public:

virtual ROOT::NTupleSize_t GetNEntries() const = 0;

/// Returns the TDirectory associated with this sink, if there is one.
/// The default implementation returns nullptr; derived sinks may override it.
virtual TDirectory *GetUnderlyingDirectory() const { return nullptr; }

/// Physically creates the storage container to hold the ntuple (e.g., keys in a TFile or an S3 bucket)
/// Init() associates column handles to the columns referenced by the model
void Init(RNTupleModel &model)
Expand Down Expand Up @@ -808,6 +810,8 @@ public:
virtual std::vector<std::unique_ptr<ROOT::Internal::RCluster>>
LoadClusters(std::span<ROOT::Internal::RCluster::RKey> clusterKeys) = 0;

/// Returns the RMiniFileReader used by this source, if there is one.
/// The default implementation returns nullptr; derived sources may override it.
virtual RMiniFileReader *GetUnderlyingReader() { return nullptr; }

/// Parallel decompression and unpacking of the pages in the given cluster. The unzipped pages are supposed
/// to be preloaded in a page pool attached to the source. The method is triggered by the cluster pool's
/// unzip thread. It is an optional optimization, the method can safely do nothing. In particular, the
Expand Down
11 changes: 11 additions & 0 deletions tree/ntuple/inc/ROOT/RPageStorageFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public:
RPageSinkFile(RPageSinkFile &&) = default;
RPageSinkFile &operator=(RPageSinkFile &&) = default;
~RPageSinkFile() override;

/// Returns the directory reported by Internal::GetUnderlyingDirectory() for the writer held in fWriter.
/// Declared with `final` only: `final` already implies the function is virtual and overrides a base declaration.
TDirectory *GetUnderlyingDirectory() const final { return Internal::GetUnderlyingDirectory(*fWriter); }

/// Returns the raw pointer obtained from fWriter.get(); fWriter itself is left untouched.
/// NOTE(review): presumably fWriter is a smart pointer that keeps ownership of the writer - confirm.
ROOT::Internal::RNTupleFileWriter *GetUnderlyingWriter() const { return fWriter.get(); }
}; // class RPageSinkFile

// clang-format off
Expand Down Expand Up @@ -149,6 +153,8 @@ private:
std::unique_ptr<ROOT::Internal::RCluster>
PrepareSingleCluster(const ROOT::Internal::RCluster::RKey &clusterKey, std::vector<RRawFile::RIOVec> &readRequests);

/// Returns the address of this source's fReader member.
RMiniFileReader *GetUnderlyingReader() final { return &fReader; }

protected:
void LoadStructureImpl() final;
ROOT::RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode) final;
Expand All @@ -173,6 +179,11 @@ public:
RPageSourceFile &operator=(RPageSourceFile &&) = delete;
~RPageSourceFile() override;

/// Creates a new PageSourceFile using the same underlying file as this but referring to a different RNTuple,
/// represented by `anchor`.
std::unique_ptr<RPageSourceFile>
OpenWithDifferentAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options = ROOT::RNTupleReadOptions());

void
LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) final;

Expand Down
8 changes: 8 additions & 0 deletions tree/ntuple/src/RMiniFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1608,3 +1608,11 @@ void ROOT::Internal::RNTupleFileWriter::WriteTFileSkeleton(int defaultCompressio
fileSimple.Write(&padding, sizeof(padding));
fileSimple.fKeyOffset = fileSimple.fFilePos;
}

/// Returns the fDirectory of the writer's RFileProper alternative, or nullptr when the writer's
/// fFile variant currently holds a different alternative.
TDirectory *ROOT::Internal::GetUnderlyingDirectory(ROOT::Internal::RNTupleFileWriter &writer)
{
   using FileProper_t = ROOT::Internal::RNTupleFileWriter::RFileProper;

   auto *fileProper = std::get_if<FileProper_t>(&writer.fFile);
   return fileProper ? fileProper->fDirectory : nullptr;
}
5 changes: 3 additions & 2 deletions tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ROOT/RPageAllocator.hxx>
#include <ROOT/RPageSinkBuf.hxx>
#include <ROOT/RPageStorageFile.hxx>
#include <ROOT/RNTupleReader.hxx>
#ifdef R__ENABLE_DAOS
#include <ROOT/RPageStorageDaos.hxx>
#endif
Expand Down Expand Up @@ -1198,8 +1199,8 @@ void ROOT::Internal::RPagePersistentSink::CommitStagedClusters(std::span<RStaged
if (!columnInfo.fIsSuppressed)
continue;
const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
// For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
// cluster. This information has been determined for the committed cluster descriptor through
// For suppressed columns, we need to reset the first element index to the first element of the next
// (upcoming) cluster. This information has been determined for the committed cluster descriptor through
// CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
Expand Down
32 changes: 20 additions & 12 deletions tree/ntuple/src/RPageStorageFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,15 @@ ROOT::Internal::RPageSourceFile::CreateFromAnchor(const RNTuple &anchor, const R
return pageSource;
}

/// Builds a new RPageSourceFile that reads through a clone of this source's raw file (fFile->Clone())
/// and uses `anchor` as its RNTuple anchor. The new source is returned to the caller; this source is
/// not modified.
std::unique_ptr<ROOT::Internal::RPageSourceFile>
ROOT::Internal::RPageSourceFile::OpenWithDifferentAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options)
{
   // The sibling source gets its own clone of the raw file rather than sharing fFile.
   auto clonedFile = fFile->Clone();
   auto sibling = std::make_unique<RPageSourceFile>("", std::move(clonedFile), options);

   sibling->fAnchor = anchor;
   // NOTE(review): the name is read from the sibling's own descriptor builder before the sibling has been
   // attached; presumably this yields an empty name at this point - confirm.
   sibling->fNTupleName = sibling->fDescriptorBuilder.GetDescriptor().GetName();
   return sibling;
}

ROOT::Internal::RPageSourceFile::~RPageSourceFile() = default;

void ROOT::Internal::RPageSourceFile::LoadStructureImpl()
Expand Down Expand Up @@ -504,18 +513,17 @@ ROOT::Internal::RPageSourceFile::PrepareSingleCluster(const RCluster::RKey &clus
std::vector<ROnDiskPageLocator> onDiskPages;
auto activeSize = 0;
auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
PrepareLoadCluster(clusterKey, *pageZeroMap,
[&](ROOT::DescriptorId_t physicalColumnId, ROOT::NTupleSize_t pageNo,
const ROOT::RClusterDescriptor::RPageInfo &pageInfo) {
const auto &pageLocator = pageInfo.GetLocator();
if (pageLocator.GetType() == RNTupleLocator::kTypeUnknown)
throw RException(R__FAIL("tried to read a page with an unknown locator"));
const auto nBytes =
pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
activeSize += nBytes;
onDiskPages.push_back(
{physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
});
PrepareLoadCluster(
clusterKey, *pageZeroMap,
[&](ROOT::DescriptorId_t physicalColumnId, ROOT::NTupleSize_t pageNo,
const ROOT::RClusterDescriptor::RPageInfo &pageInfo) {
const auto &pageLocator = pageInfo.GetLocator();
if (pageLocator.GetType() == RNTupleLocator::kTypeUnknown)
throw RException(R__FAIL("tried to read a page with an unknown locator"));
const auto nBytes = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
activeSize += nBytes;
onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
});

// Linearize the page requests by file offset
std::sort(onDiskPages.begin(), onDiskPages.end(),
Expand Down
56 changes: 56 additions & 0 deletions tree/ntuple/test/ntuple_storage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1128,3 +1128,59 @@ TEST(RPageSinkFile, StreamerInfo)
}
FAIL() << "not all streamer infos found! ";
}

/// Writes two RNTuples with different schemas into one TFile, opens a page source on the first one and
/// checks that OpenWithDifferentAnchor() yields an independent page source for the second one that
/// remains usable after the first source has been destroyed.
TEST(RPageSourceFile, OpenDifferentAnchor)
{
   FileRaii fileGuard("test_ntuple_open_diff_anchor.root");

   auto model = RNTupleModel::Create();
   auto pF = model->MakeField<float>("f");
   auto file = std::unique_ptr<TFile>(TFile::Open(fileGuard.GetPath().c_str(), "RECREATE"));
   {
      // First RNTuple: 100 entries with a single float field "f".
      auto writer = RNTupleWriter::Append(std::move(model), "ntpl1", *file);
      for (auto i = 0; i < 100; ++i) {
         *pF = i;
         writer->Fill();
      }
   }
   {
      // Second RNTuple: 20 entries with fields "i" and "c".
      model = RNTupleModel::Create();
      auto pI = model->MakeField<int>("i");
      auto pC = model->MakeField<char>("c");

      auto writer = RNTupleWriter::Append(std::move(model), "ntpl2", *file);
      for (auto i = 0; i < 20; ++i) {
         *pI = i;
         *pC = i;
         writer->Fill();
      }
   }

   auto source = std::make_unique<RPageSourceFile>("ntpl1", fileGuard.GetPath(), RNTupleReadOptions());
   source->Attach();
   EXPECT_EQ(source->GetNEntries(), 100);
   {
      // Keep the descriptor guard scoped: it is obtained from `source` and must be released before
      // `source` is destroyed by the reset() further down.
      auto desc = source->GetSharedDescriptorGuard();
      EXPECT_NE(desc->FindFieldId("f"), ROOT::kInvalidDescriptorId);
   }

   // Take ownership of the anchor returned by Get<>() so that it is freed at the end of the test.
   auto anchor2 = std::unique_ptr<ROOT::RNTuple>(file->Get<ROOT::RNTuple>("ntpl2"));
   ASSERT_NE(anchor2, nullptr);
   auto source2 = source->OpenWithDifferentAnchor(*anchor2);
   source2->Attach();
   EXPECT_EQ(source2->GetNEntries(), 20);
   {
      auto desc2 = source2->GetSharedDescriptorGuard();
      EXPECT_EQ(desc2->FindFieldId("f"), ROOT::kInvalidDescriptorId);
      EXPECT_NE(desc2->FindFieldId("i"), ROOT::kInvalidDescriptorId);
      EXPECT_NE(desc2->FindFieldId("c"), ROOT::kInvalidDescriptorId);
   }

   source.reset();
   // source2 should still be valid after dropping the first source.
   EXPECT_EQ(source2->GetNEntries(), 20);
   {
      auto desc2 = source2->GetSharedDescriptorGuard();
      EXPECT_EQ(desc2->FindFieldId("f"), ROOT::kInvalidDescriptorId);
      EXPECT_NE(desc2->FindFieldId("i"), ROOT::kInvalidDescriptorId);
      EXPECT_NE(desc2->FindFieldId("c"), ROOT::kInvalidDescriptorId);
   }
}
Loading