
Parallel b-tree reading #216

@bnlawrence

Description

Parallel B-Tree Reading via cat_ranges

Much pyfive usage will be for remote access to data, where the data is presented to pyfive
via fsspec. fsspec brings a significant set of benefits, the most important of which is that
when byte-range requests are batched together via cat_ranges, rather than issued as a series
of individual requests each paying a round-trip latency, they are executed concurrently.
Tests with chunk reading show this can improve performance by a factor of 10 on remote
HTTP and S3 resources. Some of the same benefit ought to be possible for B-trees as well.
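The latency argument can be illustrated without fsspec at all. The sketch below simulates N range requests with a fixed round-trip latency and compares issuing them serially against issuing them concurrently, which is essentially what cat_ranges does under the hood (the `fetch` function and latency figure are purely illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY = 0.05  # simulated per-request round-trip latency (seconds)

def fetch(rng):
    """Stand-in for one remote range request: pay latency, return bytes."""
    start, stop = rng
    time.sleep(LATENCY)
    return b"x" * (stop - start)

ranges = [(i * 100, i * 100 + 100) for i in range(10)]

# Serial: each request pays the full round-trip latency.
t0 = time.perf_counter()
serial = [fetch(r) for r in ranges]
serial_time = time.perf_counter() - t0

# Batched: all requests issued concurrently, as fsspec's cat_ranges does.
t0 = time.perf_counter()
with ThreadPoolExecutor() as pool:
    batched = list(pool.map(fetch, ranges))
batched_time = time.perf_counter() - t0

assert serial == batched
print(f"serial {serial_time:.2f}s vs batched {batched_time:.2f}s")
```

With 10 requests the serial path pays roughly 10 latencies; the batched path pays roughly one.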

Background and Motivation

HDF5 B-tree traversal is currently sequential: each node is read via fh.seek() +
fh.read(), and child addresses are only known after the parent node has been parsed.
For remote files this means O(N nodes) serial round-trips.

The existing MetadataBufferingWrapper papers over this by eagerly reading the first
1 MiB on first access, which covers B-tree nodes for well-packed files. For
poorly-packed files (nodes scattered beyond 1 MiB) the fallback is individual remote
reads — exactly the pattern that cat_ranges is designed to replace.

The chunk data path already uses cat_ranges via _read_bulk_fsspec. This change
applies the same technique to B-tree node reads.


Key Insight: What Can Be Parallelised

B-tree traversal has a sequential dependency between levels: child addresses are
only discovered by parsing the parent. This cannot be avoided. However, once all
addresses at a given level are known, every read at that level is independent.

Level N (root):   sequential read          ← 1 read, unavoidable
Level N-1:        cat_ranges               ← all addresses known after root
...
Level 0 (leaves): cat_ranges               ← all addresses known after level 1

In practice HDF5 v1 B-trees are shallow (2–3 levels), so the internal nodes are few
and cheap. The leaves dominate, and that is where the win is largest.

The target optimisation: traverse internal nodes sequentially (cheap, few reads),
collect all leaf addresses, then fetch all leaf nodes in a single cat_ranges call.
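The traversal pattern can be sketched on a toy tree before touching pyfive's real node format. Here nodes are plain dicts keyed by address in a fake store (hypothetical structure, not pyfive's): internal nodes are read one at a time, leaf addresses are deferred, and all leaves arrive in a single batched call:

```python
# Fake on-disk store: address -> node. Root at address 0, depth 2.
store = {
    0: {"level": 2, "addresses": [10, 20]},   # root
    10: {"level": 1, "addresses": [100, 110]},
    20: {"level": 1, "addresses": [120]},
    100: {"level": 0, "addresses": []},       # leaves
    110: {"level": 0, "addresses": []},
    120: {"level": 0, "addresses": []},
}

def read_node(addr):
    """Stand-in for one sequential read + parse."""
    return store[addr]

def cat_ranges_bulk(addrs):
    """Stand-in for one cat_ranges call fetching every leaf at once."""
    return [store[a] for a in addrs]

# Phase 1: walk internal nodes sequentially, level by level.
frontier = [read_node(0)]
leaf_addresses = []
while frontier:
    nxt = []
    for node in frontier:
        for addr in node["addresses"]:
            if node["level"] - 1 > 0:
                nxt.append(read_node(addr))   # internal: sequential read
            else:
                leaf_addresses.append(addr)   # leaf: defer to bulk fetch
    frontier = nxt

# Phase 2: fetch all leaves in a single batched call.
leaves = cat_ranges_bulk(leaf_addresses)
print(leaf_addresses)  # [100, 110, 120]
```

Only two sequential reads (root plus one level of internals) happen before every leaf address is known.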


Scope

This change applies to BTreeV1RawDataChunks only — this is the B-tree used for
chunked raw data and is the hot path for array reads. BTreeV1Groups and the v2
B-trees are lower priority and should not be changed in this pass.


Implementation

1. Add a _get_cat_ranges_fs helper on AbstractBTree

@staticmethod
def _get_cat_ranges_fs(fh):
    """
    Return (fs, path) if fh supports cat_ranges, else (None, None).
    Reaches through MetadataBufferingWrapper the same way _read_bulk_fsspec does.
    """
    actual_fh = getattr(fh, "fh", fh)
    fs = getattr(actual_fh, "fs", None)
    path = getattr(actual_fh, "path", None)
    if fs is not None and path is not None and hasattr(fs, "cat_ranges"):
        return fs, path
    return None, None

2. Split BTreeV1RawDataChunks.__init__ traversal into two phases

The existing _read_children walks the tree level-by-level sequentially. Replace it
in BTreeV1RawDataChunks with a version that:

  • Phase 1: reads internal nodes (level > 0) sequentially as today
  • Phase 2: collects all leaf addresses from the parsed internal nodes, then fetches
    all leaf nodes in one cat_ranges call
def _read_children(self):
    """Override to parallelise leaf reads via cat_ranges when available."""
    fs, path = self._get_cat_ranges_fs(self.fh)

    if fs is None:
        # No cat_ranges support — fall back to original sequential traversal
        super()._read_children()
        return

    # Phase 1: traverse internal nodes sequentially (level > 0)
    # These are few in number; sequential reads are fine here, and we
    # cannot avoid the dependency chain between levels.
    for node_level in range(self.depth, 0, -1):
        for parent_node in self.all_nodes[node_level]:
            for child_addr in parent_node["addresses"]:
                if node_level - 1 > 0:
                    # Still an internal node — read sequentially as before
                    child_node = self._read_node(child_addr, node_level - 1)
                    self._add_node(child_node)
                # If node_level - 1 == 0 it's a leaf; collect below

    # Phase 2: collect all leaf addresses from the lowest internal level
    leaf_addresses = []
    lowest_internal = self.all_nodes.get(1, [])
    for node in lowest_internal:
        leaf_addresses.extend(node["addresses"])

    if not leaf_addresses:
        # Depth-0 tree (root is also leaf) — already read, nothing to do
        return

    # Phase 2 (continued): bulk-fetch all leaf nodes in one cat_ranges call
    leaf_size = self._estimate_leaf_node_size()
    starts = list(leaf_addresses)
    stops = [addr + leaf_size for addr in leaf_addresses]

    raw_leaves = fs.cat_ranges([path] * len(leaf_addresses), starts, stops)

    for addr, raw in zip(leaf_addresses, raw_leaves):
        node = self._parse_leaf_from_buffer(raw, addr)
        self._add_node(node)

3. Leaf node size estimation

HDF5 v1 B-tree node size is not stored in the node itself; it must be inferred.
The safest approach is to read the first leaf node sequentially to determine its
actual size, then use that for all subsequent reads.

def _estimate_leaf_node_size(self):
    """
    Return the byte size of a leaf node by reading the first one sequentially.

    HDF5 v1 B-tree nodes have no stored size field; the size is a function of
    the number of entries (entries_used in the header) and the per-entry record
    size (which depends on self.dims).  We read one node to get entries_used,
    compute the size, and assume all leaf nodes are the same size.

    Per-entry layout for NODE_TYPE=1 (raw data chunks):
        8 bytes  : chunk_size + filter_mask (two uint32)
        8 * dims : chunk_offset (dims uint64 values)
        8 bytes  : chunk_address (uint64)
    """
    # B_LINK_NODE header size: 4 + 1 + 1 + 2 + 8 + 8 = 24 bytes
    HEADER_SIZE = 24
    # Per-entry record size
    entry_size = 8 + 8 * self.dims + 8
    # N+1 keys are *not* present in NODE_TYPE=1 (unlike type 0 groups)

    # Peek at the first leaf node to get entries_used
    first_leaf_addr = self.all_nodes[1][0]["addresses"][0]
    self.fh.seek(first_leaf_addr)
    header_bytes = self.fh.read(HEADER_SIZE)
    entries_used = struct.unpack_from("<H", header_bytes, 6)[0]  # offset 6 in header

    return HEADER_SIZE + entries_used * entry_size

Note: If leaf nodes can have differing entries_used values (e.g. the last leaf
in a non-full tree), then using a fixed size will over-read the last few nodes.
Over-reading is harmless — _parse_leaf_from_buffer reads only entries_used
entries from the buffer — but wastes a small amount of bandwidth. An alternative
is to compute the size per-address by peeking all headers in a first cat_ranges
call (24 bytes each), then issuing a second cat_ranges for full node bodies.
For most files the single-size approach is sufficient.
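The two-pass alternative can be sketched concretely. Assuming the 24-byte header layout described above (entries_used as a little-endian uint16 at offset 6), the first cat_ranges peeks every header and each node's exact size follows from its own entries_used; the `fake_cat_ranges`, demo addresses, and helper name are all hypothetical:

```python
import struct

# B_LINK_NODE header: signature(4) + node_type(1) + node_level(1)
#                     + entries_used(2) + left/right sibling (2 x uint64)
HEADER_SIZE = 24

def leaf_sizes_two_pass(cat_ranges, path, leaf_addresses, dims):
    """Peek every 24-byte header in one batched call, then compute each
    node's exact size from its own entries_used."""
    starts = list(leaf_addresses)
    stops = [a + HEADER_SIZE for a in leaf_addresses]
    headers = cat_ranges([path] * len(starts), starts, stops)
    entry_size = 8 + 8 * dims + 8  # size/filter_mask + offsets + address
    sizes = []
    for hdr in headers:
        entries_used = struct.unpack_from("<H", hdr, 6)[0]
        sizes.append(HEADER_SIZE + entries_used * entry_size)
    return sizes

# Demo with a fake in-memory "file": two leaves with 3 and 1 entries, dims=2.
dims = 2
blob = bytearray(4096)
for addr, n in ((0, 3), (2048, 1)):
    struct.pack_into("<4sBBHQQ", blob, addr, b"TREE", 1, 0, n, 0, 0)

def fake_cat_ranges(paths, starts, stops):
    return [bytes(blob[s:e]) for s, e in zip(starts, stops)]

print(leaf_sizes_two_pass(fake_cat_ranges, "f", [0, 2048], dims))
# entry_size = 8 + 16 + 8 = 32, so sizes are [24 + 96, 24 + 56] = [120, 56]
```

A second cat_ranges with these exact sizes would then fetch each node with no over-read, at the cost of one extra round of concurrent requests.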

4. Parse a leaf node from a raw buffer

Extract the existing sequential parse logic from _read_node into a buffer-based
variant so it can be called on the raw bytes returned by cat_ranges:

def _parse_leaf_from_buffer(self, raw, addr):
    """
    Parse a leaf node (node_level=0) from a raw bytes buffer.
    Mirrors the logic in _read_node / _read_node_header but reads from
    a BytesIO rather than seeking self.fh.
    """
    import io
    buf = io.BytesIO(raw)

    # --- header (mirrors _read_node_header) ---
    node = _unpack_struct_from(self.B_LINK_NODE, buf.read(struct.calcsize(
        "<" + "".join(self.B_LINK_NODE.values())
    )))
    assert node["signature"] == b"TREE"
    assert node["node_type"] == self.NODE_TYPE
    assert node["node_level"] == 0

    # --- entries (mirrors _read_node body for NODE_TYPE=1) ---
    keys = []
    addresses = []
    fmt = "<" + "Q" * self.dims
    fmt_size = struct.calcsize(fmt)

    for _ in range(node["entries_used"]):
        chunk_size, filter_mask = struct.unpack("<II", buf.read(8))
        chunk_offset = struct.unpack(fmt, buf.read(fmt_size))
        chunk_address = struct.unpack("<Q", buf.read(8))[0]
        keys.append(OrderedDict((
            ("chunk_size",   chunk_size),
            ("filter_mask",  filter_mask),
            ("chunk_offset", chunk_offset),
        )))
        addresses.append(chunk_address)

    node["keys"] = keys
    node["addresses"] = addresses
    self.last_offset = max(addr, self.last_offset)
    return node

Implementation note: _unpack_struct_from already exists in core.py and
accepts a buffer argument — use it directly rather than duplicating the unpacking
logic. Check its exact signature before wiring up.
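For orientation, the helper is expected to look roughly like the sketch below (this is an assumption about core.py, not a verified copy; as noted above, check the actual signature before wiring up):

```python
import struct
from collections import OrderedDict

def _unpack_struct_from(structure, buf, offset=0):
    """Assumed shape of the core.py helper: unpack an OrderedDict of
    field name -> struct format character from a buffer at an offset."""
    fmt = "<" + "".join(structure.values())
    values = struct.unpack_from(fmt, buf, offset=offset)
    return OrderedDict(zip(structure.keys(), values))

# Demo against the B_LINK_NODE layout used above (field names illustrative).
B_LINK_NODE = OrderedDict((
    ("signature", "4s"), ("node_type", "B"), ("node_level", "B"),
    ("entries_used", "H"), ("left_sibling", "Q"), ("right_sibling", "Q"),
))
raw = struct.pack("<4sBBHQQ", b"TREE", 1, 0, 5, 0, 0)
node = _unpack_struct_from(B_LINK_NODE, raw)
print(node["entries_used"])  # 5
```

If the real helper already takes an offset argument, _parse_leaf_from_buffer can pass the raw bytes directly instead of wrapping them in a BytesIO.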


Interaction with MetadataBufferingWrapper

cat_ranges is called on actual_fh.fs (the underlying fsspec filesystem), bypassing
MetadataBufferingWrapper entirely — exactly as _read_bulk_fsspec does for chunks.
This is correct behaviour:

  • Well-packed files: the 1 MiB eager buffer already holds the leaf nodes.
    cat_ranges will hit the fsspec cache layer and cost nothing extra.
  • Poorly-packed files: cat_ranges issues parallel remote fetches for all
    scattered leaf nodes in one round-trip, which is far better than the current
    sequential fallback reads.

No changes to MetadataBufferingWrapper are required.


Fallback Behaviour

_read_children checks for cat_ranges support before doing anything new. If the
filesystem does not support cat_ranges (local files, in-memory buffers, custom
wrappers) the call falls through to super()._read_children() — the existing
sequential traversal — unchanged.


Testing

  1. Unit test — well-packed file on S3 (mocked): mock fs.cat_ranges to return
    pre-built leaf node bytes; assert it is called once with all leaf addresses; assert
    the resulting self.all_nodes[0] matches the sequential parse.

  2. Unit test — no cat_ranges: pass a plain BytesIO fh; assert the sequential
    path is taken and results are identical.

  3. Regression: run the existing B-tree test suite unchanged — all results must
    match before and after.

  4. Integration: open a real chunked HDF5 file via s3fs; assert chunk index
    contents are identical between serial and parallel paths.
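The mock wiring for test 1 can be sketched with unittest.mock (names and addresses here are hypothetical; the real test would drive BTreeV1RawDataChunks over a mocked fsspec file rather than calling cat_ranges directly):

```python
from unittest.mock import MagicMock

leaf_addresses = [1000, 2000, 3000]
leaf_size = 120

# Mock filesystem returning pre-built leaf node bytes.
fake_fs = MagicMock()
fake_fs.cat_ranges.return_value = [b"\x00" * leaf_size] * len(leaf_addresses)

# The code under test is expected to issue exactly one batched call:
starts = list(leaf_addresses)
stops = [a + leaf_size for a in leaf_addresses]
raw_leaves = fake_fs.cat_ranges(["file.h5"] * 3, starts, stops)

# Assertion shape: one call, covering every leaf address.
fake_fs.cat_ranges.assert_called_once_with(
    ["file.h5"] * 3, [1000, 2000, 3000], [1120, 2120, 3120])
assert len(raw_leaves) == 3
```

The same fake_fs, stuffed with real serialised leaf bytes, also supports the equality check against the sequential parse in self.all_nodes[0].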


Files to Change

  • pyfive/btree.py: add _get_cat_ranges_fs; override _read_children and add
    _estimate_leaf_node_size / _parse_leaf_from_buffer to BTreeV1RawDataChunks
  • tests/test_btree_parallel.py: new test file covering the cases above

No changes required to metadata_buffering_wrapper.py, chunk_read.py, or any caller
of BTreeV1RawDataChunks — the parallel path is entirely internal to the class.
