Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions doc/pyfive_class_diagram.pu
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
@startuml
skinparam classAttributeIconSize 0
'skinparam linetype ortho
'Use portrait layout
top to bottom direction
skinparam ClassBackgroundColor palegoldenrod
skinparam ClassBorderColor Black


skinparam stereotypeCBackgroundColor white


' Spot-letter stereotypes for b-tree (B) and heap (H) classes.
!define BTREE_COLOR white
!define BTREE_STEREO <<(B,BTREE_COLOR)>>
!define HEAP_COLOR white
!define HEAP_STEREO <<(H,HEAP_COLOR)>>


' ------------------------------------------------------------------
' Packages
' ------------------------------------------------------------------

package pyfive {

    package "high_level" {
        class File {
            + filename : str
            + mode : str
            + __init__(filename, mode, metadata_buffer_size)
            + close()
            + __enter__()
            + __exit__()
            + consolidated_metadata : bool
        }

        class Group {
            + name : str
            + attrs : dict
            + __init__(name, dataobjects, parent)
            + __getitem__(name)
            + visit(func)
            + visititems(func, noindex)
            + get_lazy_view(name)
        }

        class Dataset {
            + name : str
            + shape : tuple
            + dtype : dtype
            + size : int
            + chunks : tuple
            + __init__(name, datasetid, parent)
            + __getitem__(args)
        }
    }

    package "h5d" {
        class ChunkRead {
            + set_parallelism(thread_count, cat_range_allowed)
            + _select_chunks(indexer, out, dtype)
        }

        class DatasetID {
            + rank : int
            + shape : tuple
            + dtype : dtype
            + chunks : tuple
            + layout_class : int
            + __init__(dataobjects)
            + get_data(args, fillvalue)
            + get_chunk_info(index)
            + get_num_chunks()
            + _build_index()
            + _make_btree_fetch_fn()
        }

        ' Fixed: use the same two-dash extension arrow as the rest of
        ' the diagram (was the short form "ChunkRead <|- DatasetID").
        DatasetID --|> ChunkRead
    }

    package "dataobjects" {
        class DataObjects {
            + offset : int
            + is_group : bool
            + is_dataset : bool
            + get_attributes()
            + get_links()
        }
    }

    package "misc_low_level" {
        class SuperBlock {
            + version : int
            + offset_to_dataobjects : int
        }

        class Heap HEAP_STEREO
        class GlobalHeap HEAP_STEREO
        class FractalHeap HEAP_STEREO
        class SymbolTable

        SymbolTable -[hidden]- Heap
        SymbolTable -[hidden]- GlobalHeap
        SymbolTable -[hidden]- FractalHeap
    }

    package "btree" {
        ' Names containing "\n" (display line breaks) must be quoted in
        ' PlantUML; "as" aliases keep the relationship lines readable.
        class AbstractBTree BTREE_STEREO
        class BTreeV1 BTREE_STEREO
        class "BTreeV1\nGroups" as BTreeV1Groups BTREE_STEREO
        class "BTreeV1\nRawDataChunks" as BTreeV1RawDataChunks BTREE_STEREO {
            + fh : int
            + offset : int
            + dims : int
            + fetch_fn()
        }
        class BTreeV2 BTREE_STEREO
        class "BTreeV2\nGroupNames" as BTreeV2GroupNames BTREE_STEREO
        class "BTreeV2\nGroupOrders" as BTreeV2GroupOrders BTREE_STEREO
        class "BTreeV2\nAttrCreationOrder" as BTreeV2AttrCreationOrder BTREE_STEREO
        class "BTreeV2\nAttrNames" as BTreeV2AttrNames BTREE_STEREO

        BTreeV1 --|> AbstractBTree
        BTreeV2 --|> AbstractBTree
        BTreeV1Groups --|> BTreeV1
        BTreeV1RawDataChunks --|> BTreeV1
        BTreeV2GroupNames --|> BTreeV2
        BTreeV2GroupOrders --|> BTreeV2
        ' Fixed: was the inconsistent short arrow "-|>".
        BTreeV2AttrCreationOrder --|> BTreeV2
        BTreeV2AttrNames --|> BTreeV2


    }

    package "utilities" {
        class MetadataBufferingWrapper
    }

    ' ------------------------------------------------------------------
    ' Relationships
    ' ------------------------------------------------------------------

    ' High-Level Hierarchy
    File --|> Group
    Group o-- Dataset
    Group o-- Group

    ' Composition
    File *-- SuperBlock : parses >
    SuperBlock --> DataObjects : points to (root)
    Group *-- DataObjects : uses >
    Dataset *-- DatasetID : wraps >
    DatasetID --> DataObjects : uses metadata >

    ' Lower Level internal mechanics
    DataObjects ..> Heap : reads names
    DataObjects ..> GlobalHeap : reads strings
    DataObjects ..> FractalHeap : reads messages
    DataObjects ..> SymbolTable : reads entries

    ' B-Tree usage
    DataObjects ..> BTreeV1Groups : iterates children
    DataObjects ..> BTreeV2GroupNames : iterates children
    DataObjects ..> BTreeV2GroupOrders : iterates children
    DataObjects ..> BTreeV2AttrCreationOrder : iterates attrs
    DataObjects ..> BTreeV2AttrNames : iterates attrs
    DatasetID ..> BTreeV1RawDataChunks : finds\ndata\nchunks
    File ..> MetadataBufferingWrapper : wraps remote handles

    note as N1
        b-tree
        - built by <i>_build_index</i>
        - parallelism passed
        via the <i>fetch_fn</i> to
        <i>BTreeV1RawDataChunks</i>
    end note

    DatasetID .. N1
    N1 .. BTreeV1RawDataChunks

}



@enduml
122 changes: 115 additions & 7 deletions pyfive/btree.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,81 @@ class BTreeV1RawDataChunks(BTreeV1):

NODE_TYPE = 1 # type: ignore[assignment]

def __init__(self, fh, offset, dims):
def __init__(self, fh, offset, dims, fetch_fn=None):
    """Initialize a raw-data-chunk b-tree reader.

    Parameters
    ----------
    fh : file-like object the b-tree nodes are read from.
    offset : byte offset of the b-tree root node within ``fh``.
    dims : number of dimensions in each chunk-offset key.
    fetch_fn : optional callable used by ``_read_children`` to fetch
        batches of leaf nodes at once; when ``None`` the sequential
        superclass traversal is used instead.
    """
    self.dims = dims
    self._fetch_fn = fetch_fn
    super().__init__(fh, offset)

def _read_children(self):
    """Read children; fetch leaf nodes via fetch_fn when provided.

    When ``self._fetch_fn`` is None this defers entirely to the
    sequential superclass implementation.  Otherwise internal
    (non-leaf) levels are still read one node at a time, but all
    level-0 leaf nodes are fetched in batches through
    ``self._fetch_fn(addresses, size)`` so the caller can coalesce
    or parallelise those reads.
    """
    if self._fetch_fn is None:
        super()._read_children()
        return

    # Leaf-level root node: already read in _read_root_node.
    if self.depth == 0:
        return

    # Traverse internal levels sequentially; each level depends on the prior.
    for node_level in range(self.depth, 0, -1):
        for parent_node in self.all_nodes[node_level]:
            for child_addr in parent_node["addresses"]:
                # Only materialise internal children here; level-0
                # (leaf) children are batch-fetched below.
                if node_level - 1 > 0:
                    child_node = self._read_node(child_addr, node_level - 1)
                    self._add_node(child_node)

    # Collect leaf addresses from the lowest internal level.
    leaf_addresses = []
    for node in self.all_nodes.get(1, []):
        leaf_addresses.extend(node["addresses"])

    if not leaf_addresses:
        return

    # Leaf nodes can differ in entries_used, so fetch just headers first,
    # then group addresses by exact node size.
    header_size = struct.calcsize("<" + "".join(self.B_LINK_NODE.values()))
    header_buffers = self._fetch_fn(leaf_addresses, header_size)

    size_to_addresses = OrderedDict()
    # Per-entry size: uint32 chunk_size + uint32 filter_mask (8 bytes),
    # one uint64 per chunk dimension, then a uint64 chunk address.
    entry_size = 8 + self.dims * 8 + 8
    for addr, header in zip(leaf_addresses, header_buffers):
        # entries_used is the uint16 at byte 6 of the node header
        # (after the 4-byte signature and two 1-byte fields).
        entries_used = struct.unpack_from("<H", header, 6)[0]
        node_size = header_size + entries_used * entry_size
        size_to_addresses.setdefault(node_size, []).append(addr)

    # Fetch the full nodes, batched by identical size, and parse each
    # buffer as a leaf (level-0) node.
    for node_size, addresses in size_to_addresses.items():
        raw_buffers = self._fetch_fn(addresses, node_size)
        for addr, raw in zip(addresses, raw_buffers):
            node = self._parse_node_from_buffer(raw, addr, node_level=0)
            self._add_node(node)

def _read_node(self, offset, node_level):
"""Return a single node in the b-tree located at a give offset."""
node = self._read_node_header(offset, node_level)

keys = []
addresses = []
for _ in range(node["entries_used"]):
chunk_size, filter_mask = struct.unpack("<II", self.fh.read(8))
fmt = "<" + "Q" * self.dims
fmt_size = struct.calcsize(fmt)
chunk_offset = struct.unpack(fmt, self.fh.read(fmt_size))
chunk_address = struct.unpack("<Q", self.fh.read(8))[0]
n_entries = node["entries_used"]
entry_size = 8 + self.dims * 8 + 8
read_size = n_entries * entry_size
node_data = self.fh.read(read_size)
mem_view = memoryview(node_data)
offset_fmt = f"<{self.dims}Q"
offset_size = self.dims * 8

pos = 0
for _ in range(n_entries):
# chunk_size, filter_mask
chunk_size, filter_mask = struct.unpack_from("<II", mem_view, pos)
pos += 8
# chunk_offset dims × uint64
chunk_offset = struct.unpack_from(offset_fmt, mem_view, pos)
pos += offset_size
# address
(chunk_address,) = struct.unpack_from("<Q", mem_view, pos)
pos += 8

keys.append(
OrderedDict(
Expand All @@ -158,6 +217,55 @@ def _read_node(self, offset, node_level):
self.last_offset = max(offset, self.last_offset)
return node

def _leaf_node_size(self):
"""Compute the byte size of a leaf node by peeking at the first one."""
header_size = struct.calcsize("<" + "".join(self.B_LINK_NODE.values()))
entry_size = 8 + 8 * self.dims + 8

first_leaf_addr = self.all_nodes[1][0]["addresses"][0]
self.fh.seek(first_leaf_addr)
header_bytes = self.fh.read(header_size)
entries_used = struct.unpack_from("<H", header_bytes, 6)[0]
return header_size + entries_used * entry_size

def _parse_node_from_buffer(self, raw, addr, node_level):
    """Parse a raw-data-chunk b-tree node from an in-memory bytes buffer.

    Parameters
    ----------
    raw : bytes of one complete node (fixed header followed by its
        entries).
    addr : file offset the buffer was read from; folded into
        ``self.last_offset`` for bookkeeping.
    node_level : expected level of this node (0 for leaves); checked
        against the parsed header.

    Returns the header dict extended with ``keys`` and ``addresses``
    lists, matching the structure built by ``_read_node``.
    """
    node = _unpack_struct_from(self.B_LINK_NODE, raw)
    assert node["signature"] == b"TREE"
    assert node["node_type"] == self.NODE_TYPE
    assert node["node_level"] == node_level

    keys = []
    addresses = []
    # memoryview lets unpack_from step through the buffer without
    # copying slices.
    mv = memoryview(raw)
    offset_fmt = f"<{self.dims}Q"
    offset_fmt_size = self.dims * 8
    # Entries start immediately after the fixed-size node header.
    cursor = struct.calcsize("<" + "".join(self.B_LINK_NODE.values()))

    for _ in range(node["entries_used"]):
        # Key part 1: uint32 chunk size + uint32 filter mask.
        chunk_size, filter_mask = struct.unpack_from("<II", mv, cursor)
        cursor += 8
        # Key part 2: one uint64 offset per chunk dimension.
        chunk_offset = struct.unpack_from(offset_fmt, mv, cursor)
        cursor += offset_fmt_size
        # Child pointer: uint64 address of the chunk data.
        chunk_address = struct.unpack_from("<Q", mv, cursor)[0]
        cursor += 8

        keys.append(
            OrderedDict(
                (
                    ("chunk_size", chunk_size),
                    ("filter_mask", filter_mask),
                    ("chunk_offset", chunk_offset),
                )
            )
        )
        addresses.append(chunk_address)

    node["keys"] = keys
    node["addresses"] = addresses
    self.last_offset = max(addr, self.last_offset)
    return node

@classmethod
def _filter_chunk(cls, chunk_buffer, filter_mask, filter_pipeline, itemsize):
"""Apply decompression filters to a chunk of data."""
Expand Down
Loading
Loading