from __future__ import annotations
import os
import shutil
import struct
import textwrap
from array import array
from dataclasses import dataclass
from typing import Any, Callable, Literal, Optional
import numpy as np
from numpy.typing import NDArray
kNewClassTag = 0xFFFFFFFF
kByteCountMask = 0x40000000
kClassMask = 0x80000000
kIsReferenced = 1 << 4
kStreamedMemberwise = 1 << 14
kMapOffset = 2
[docs]
def debug_print(*args, **kwargs):
pass
if "UPROOT_DEBUG" in os.environ:
debug_print = print
@dataclass
class _Reference:
type: Literal["object", "class"]
class_name: Optional[str] = None
object_index: Optional[int] = None
[docs]
class BinaryStream:
def __init__(
self,
data: NDArray[np.uint8],
offsets: NDArray[np.uint32],
initial_cursor_position: int,
repr_nbytes: int = 50,
):
if isinstance(data, bytes):
data = np.frombuffer(data, dtype=np.uint8)
self.data = data
self.offsets = offsets
self.cursor = 0
self.repr_nbytes = repr_nbytes
# Store the initial cursor position for reference (e.g., for calculating relative positions)
self.initial_cursor_position = initial_cursor_position
self.refs: dict[int, _Reference] = {}
@property
def entries(self):
return len(self.offsets) - 1
@property
def remaining_data(self):
return self.data[self.cursor :]
[docs]
def read_uint8(self) -> int:
val = struct.unpack_from(">B", self.data, self.cursor)[0]
self.cursor += 1
return val
[docs]
def read_uint16(self) -> int:
val = struct.unpack_from(">H", self.data, self.cursor)[0]
self.cursor += 2
return val
[docs]
def read_uint32(self) -> int:
val = struct.unpack_from(">I", self.data, self.cursor)[0]
self.cursor += 4
return val
[docs]
def read_uint64(self) -> int:
val = struct.unpack_from(">Q", self.data, self.cursor)[0]
self.cursor += 8
return val
[docs]
def read_int8(self) -> int:
val = struct.unpack_from(">b", self.data, self.cursor)[0]
self.cursor += 1
return val
[docs]
def read_int16(self) -> int:
val = struct.unpack_from(">h", self.data, self.cursor)[0]
self.cursor += 2
return val
[docs]
def read_int32(self) -> int:
val = struct.unpack_from(">i", self.data, self.cursor)[0]
self.cursor += 4
return val
[docs]
def read_int64(self) -> int:
val = struct.unpack_from(">q", self.data, self.cursor)[0]
self.cursor += 8
return val
[docs]
def read_float(self) -> float:
val = struct.unpack_from(">f", self.data, self.cursor)[0]
self.cursor += 4
return val
[docs]
def read_double(self) -> float:
val = struct.unpack_from(">d", self.data, self.cursor)[0]
self.cursor += 8
return val
[docs]
def read_bool(self) -> bool:
return bool(self.read_uint8())
[docs]
def read_fNBytes(self) -> np.uint32:
byte_count = self.read_uint32()
assert byte_count & kByteCountMask, f"Invalid byte count: {byte_count}"
return byte_count & (~kByteCountMask)
[docs]
def read_fVersion(self):
return self.read_int16()
[docs]
def read_null_terminated_string(self):
start = self.cursor
while self.data[self.cursor] != 0:
self.cursor += 1
res = self.data[start : self.cursor].tobytes().decode()
self.cursor += 1 # Skip the null terminator
return res
[docs]
def read_TString(self):
length = self.read_uint8()
if length == 255:
length = self.read_uint32()
start = self.cursor
self.cursor += length
return self.data[start : self.cursor].tobytes().decode()
[docs]
def skip(self, n: int):
self.cursor += n
[docs]
def skip_fNBytes(self):
self.read_fNBytes()
[docs]
def skip_fVersion(self):
self.skip(2)
[docs]
def skip_null_terminated_string(self):
while self.data[self.cursor] != 0:
self.cursor += 1
self.cursor += 1 # Skip the null terminator
[docs]
def skip_TObject(self):
self.skip_fVersion()
self.skip(4) # fUniqueID
fBits = self.read_uint32()
if fBits & kIsReferenced:
self.skip(2) # pidf
def __repr__(self):
res = ""
data_view = self.data[self.cursor : self.cursor + self.repr_nbytes]
for i in data_view:
res += f"{i:3d}, "
if len(data_view) < len(self.data[self.cursor :]):
res += "..."
else:
res = res[:-1]
width = 76
try:
width, _ = shutil.get_terminal_size()
width = max(40, width - 4)
except Exception:
# Ignore errors if terminal size cannot be determined; use default width
pass
wrapper = textwrap.TextWrapper(
width=width,
initial_indent="[ ",
subsequent_indent=" ",
break_long_words=False,
break_on_hyphens=False,
drop_whitespace=False,
replace_whitespace=False,
)
return "BinaryStream:\n" + wrapper.fill(res) + "]"
BinaryBuffer = BinaryStream
[docs]
class IReader:
def __init__(self, name: str):
self.name = name
[docs]
def read(self, stream: BinaryStream) -> None:
raise NotImplementedError
[docs]
def read_many(self, stream: BinaryStream, count: int) -> int:
for _ in range(count):
self.read(stream)
return count
[docs]
def read_until(self, stream: BinaryStream, end_pos: int) -> int:
count = 0
while stream.cursor < end_pos:
self.read(stream)
count += 1
return count
[docs]
def read_many_memberwise(self, stream: BinaryStream, count: int) -> int:
raise NotImplementedError(
f"{self.__class__.__name__}({self.name}).read_many_memberwise is not implemented"
)
[docs]
def data(self) -> Any:
raise NotImplementedError
DTYPE_TO_TYPECODE = {
"uint8": "B",
"uint16": "H",
"uint32": "I",
"uint64": "Q",
"int8": "b",
"int16": "h",
"int32": "i",
"int64": "q",
"float32": "f",
"float64": "d",
"bool": "B",
}
DTYPE_TO_READER: dict[str, Callable[[BinaryStream], int]] = {
"uint8": BinaryStream.read_uint8,
"uint16": BinaryStream.read_uint16,
"uint32": BinaryStream.read_uint32,
"uint64": BinaryStream.read_uint64,
"int8": BinaryStream.read_int8,
"int16": BinaryStream.read_int16,
"int32": BinaryStream.read_int32,
"int64": BinaryStream.read_int64,
"float32": BinaryStream.read_float,
"float64": BinaryStream.read_double,
"bool": BinaryStream.read_bool,
}
[docs]
class PrimitiveReader(IReader):
def __init__(
self,
name: str,
dtype: Literal[
"bool",
"uint8",
"uint16",
"uint32",
"uint64",
"int8",
"int16",
"int32",
"int64",
"float32",
"float64",
],
):
super().__init__(name)
self.dtype = dtype
self.typecode = DTYPE_TO_TYPECODE[dtype]
self._data = array(self.typecode)
self.buffer_reader = DTYPE_TO_READER[dtype]
[docs]
def read(self, stream):
self._data.append(self.buffer_reader(stream))
[docs]
def data(self):
return np.asarray(self._data, dtype=self.dtype)
[docs]
class TObjectReader(IReader):
def __init__(self, name: str, keep_data: bool = False):
super().__init__(name)
self.keep_data = keep_data
self.unique_id = array("i")
self.bits = array("I")
self.pidf = array("H")
self.pidf_offsets = array("q", [0])
[docs]
def read(self, stream):
stream.skip_fVersion()
fUniqueID = stream.read_int32()
fBits = stream.read_uint32()
if fBits & kIsReferenced:
if self.keep_data:
self.pidf.append(stream.read_uint16())
else:
stream.skip(2)
if self.keep_data:
self.unique_id.append(fUniqueID)
self.bits.append(fBits)
self.pidf_offsets.append(len(self.pidf))
[docs]
def data(self):
if not self.keep_data:
return None
unique_id_array = np.asarray(self.unique_id)
bits_array = np.asarray(self.bits)
pidf_array = np.asarray(self.pidf)
pidf_offsets_array = np.asarray(self.pidf_offsets)
return unique_id_array, bits_array, pidf_array, pidf_offsets_array
[docs]
class TStringReader(IReader):
def __init__(self, name: str, with_header: bool):
super().__init__(name)
self.with_header = with_header
self._data = array("B")
self.offsets = array("q", [0])
[docs]
def read(self, stream):
fSize = stream.read_uint8()
if fSize == 255:
fSize = stream.read_uint32()
for _ in range(fSize):
self._data.append(stream.read_uint8())
self.offsets.append(len(self._data))
[docs]
def read_many(self, stream, count):
assert (
count >= 0
), f"Calling {self.name}.read_many with negative count: {count} is not allowed"
if count == 0:
return 0
if self.with_header:
stream.skip_fNBytes()
stream.skip_fVersion()
for _ in range(count):
self.read(stream)
return count
[docs]
def read_until(self, stream, end_pos):
if stream.cursor == end_pos:
return 0
if self.with_header:
stream.skip_fNBytes()
stream.skip_fVersion()
count = 0
while stream.cursor < end_pos:
self.read(stream)
count += 1
return count
[docs]
def data(self):
data_array = np.asarray(self._data)
offsets_array = np.asarray(self.offsets)
return offsets_array, data_array
[docs]
class STLSeqReader(IReader):
def __init__(
self,
name: str,
with_header: bool,
objwise_or_memberwise: Literal["auto", "obj-wise", "member-wise"],
element_reader: IReader,
):
super().__init__(name)
self.with_header = with_header
self.objwise_or_memberwise = objwise_or_memberwise
self.element_reader = element_reader
self.offsets = array("q", [0])
[docs]
def check_objwise_memberwise(self, is_memberwise: bool):
if self.objwise_or_memberwise == "obj-wise" and is_memberwise:
raise ValueError(
f"STLSeqReader({self.name}) expected obj-wise reading but got member-wise"
)
if self.objwise_or_memberwise == "member-wise" and not is_memberwise:
raise ValueError(
f"STLSeqReader({self.name}) expected member-wise reading but got obj-wise"
)
[docs]
def read_element_version(self, stream: BinaryStream):
version = stream.read_fVersion()
checksum = None
if version == 0:
checksum = stream.read_uint32()
return version, checksum
[docs]
def read_body(self, stream: BinaryStream, is_memberwise: bool):
fSize = stream.read_uint32()
self.offsets.append(self.offsets[-1] + fSize)
debug_print(
f"STLSeqReader({self.name}): reading body, is_memberwise={is_memberwise}, fSize={fSize}\n"
)
debug_print(stream)
if is_memberwise:
self.element_reader.read_many_memberwise(stream, fSize)
else:
self.element_reader.read_many(stream, fSize)
[docs]
def read(self, stream):
stream.skip_fNBytes()
fVersion = stream.read_fVersion()
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
if is_memberwise:
self.read_element_version(stream)
self.read_body(stream, is_memberwise)
[docs]
def read_many(self, stream, count):
if count == 0:
return 0
elif count < 0:
assert (
self.with_header
), f"STLSeqReader({self.name}).read_many called with negative count expects with_header=True"
fNBytes = stream.read_fNBytes()
end_pos = stream.cursor + fNBytes
fVersion = stream.read_fVersion()
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
if is_memberwise:
self.read_element_version(stream)
cur_count = 0
while stream.cursor < end_pos:
self.read_body(stream, is_memberwise)
cur_count += 1
return cur_count
else:
is_memberwise = self.objwise_or_memberwise == "member-wise"
if self.with_header:
stream.skip_fNBytes()
fVersion = stream.read_fVersion()
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
if is_memberwise:
self.read_element_version(stream)
for _ in range(count):
self.read_body(stream, is_memberwise)
return count
[docs]
def read_until(self, stream, end_pos):
if stream.cursor == end_pos:
return 0
is_memberwise = self.objwise_or_memberwise == "member-wise"
if self.with_header:
stream.skip_fNBytes()
fVersion = stream.read_fVersion()
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
if is_memberwise:
self.read_element_version(stream)
count = 0
while stream.cursor < end_pos:
self.read_body(stream, is_memberwise)
count += 1
return count
[docs]
def data(self):
offsets_array = np.asarray(self.offsets)
element_data = self.element_reader.data()
return offsets_array, element_data
[docs]
class STLMapReader(IReader):
def __init__(
self,
name: str,
with_header: bool,
objwise_or_memberwise: Literal["auto", "obj-wise", "member-wise"],
key_reader: IReader,
value_reader: IReader,
):
super().__init__(name)
self.with_header = with_header
self.objwise_or_memberwise = objwise_or_memberwise
self.key_reader = key_reader
self.value_reader = value_reader
self.offsets = array("q", [0])
[docs]
def check_objwise_memberwise(self, is_memberwise: bool):
if self.objwise_or_memberwise == "obj-wise" and is_memberwise:
raise ValueError(
f"STLMapReader({self.name}) expected obj-wise reading but got member-wise"
)
if self.objwise_or_memberwise == "member-wise" and not is_memberwise:
raise ValueError(
f"STLMapReader({self.name}) expected member-wise reading but got obj-wise"
)
[docs]
def read_element_version(self, stream: BinaryStream):
version = stream.read_fVersion()
checksum = None
if version == 0:
checksum = stream.read_uint32()
return version, checksum
[docs]
def read_body(self, stream: BinaryStream, is_memberwise: bool):
fSize = stream.read_uint32()
self.offsets.append(self.offsets[-1] + fSize)
debug_print(
f"STLMapReader({self.name}): reading body, is_memberwise={is_memberwise}, fSize={fSize}\n"
)
debug_print(stream)
if is_memberwise:
self.key_reader.read_many(stream, fSize)
self.value_reader.read_many(stream, fSize)
else:
for _ in range(fSize):
self.key_reader.read(stream)
self.value_reader.read(stream)
[docs]
def read(self, stream):
stream.skip_fNBytes()
fVersion = stream.read_fVersion()
self.read_element_version(stream)
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
self.read_body(stream, is_memberwise)
[docs]
def read_many(self, stream, count):
if count == 0:
return 0
elif count < 0:
assert (
self.with_header
), f"STLMapReader({self.name}).read_many called with negative count expecting with_header=True"
fNBytes = stream.read_fNBytes()
end_pos = stream.cursor + fNBytes
fVersion = stream.read_fVersion()
self.read_element_version(stream)
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
cur_count = 0
while stream.cursor < end_pos:
self.read_body(stream, is_memberwise)
cur_count += 1
return cur_count
else:
is_memberwise = self.objwise_or_memberwise == "member-wise"
if self.with_header:
stream.skip_fNBytes()
fVersion = stream.read_fVersion()
self.read_element_version(stream)
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
for _ in range(count):
self.read_body(stream, is_memberwise)
return count
[docs]
def read_until(self, stream, end_pos):
if stream.cursor == end_pos:
return 0
is_memberwise = self.objwise_or_memberwise == "member-wise"
if self.with_header:
stream.skip_fNBytes()
fVersion = stream.read_fVersion()
self.read_element_version(stream)
is_memberwise = bool(fVersion & kStreamedMemberwise)
self.check_objwise_memberwise(is_memberwise)
count = 0
while stream.cursor < end_pos:
self.read_body(stream, is_memberwise)
count += 1
return count
[docs]
def read_many_memberwise(self, stream, count):
assert (
count >= 0
), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"
is_memberwise = True
self.check_objwise_memberwise(is_memberwise)
return self.read_many(stream, count)
[docs]
def data(self):
offsets_array = np.asarray(self.offsets)
key_data = self.key_reader.data()
value_data = self.value_reader.data()
return offsets_array, key_data, value_data
[docs]
class STLStringReader(IReader):
def __init__(self, name: str, with_header: bool):
super().__init__(name)
self.with_header = with_header
self._data = array("B")
self.offsets = array("q", [0])
[docs]
def read_body(self, stream: BinaryStream):
fSize = stream.read_uint8()
if fSize == 255:
fSize = stream.read_uint32()
self.offsets.append(self.offsets[-1] + fSize)
for _ in range(fSize):
self._data.append(stream.read_uint8())
[docs]
def read(self, stream):
if self.with_header:
stream.skip_fNBytes()
stream.skip_fVersion()
self.read_body(stream)
[docs]
def read_many(self, stream, count):
if count == 0:
return 0
elif count < 0:
assert (
self.with_header
), f"STLStringReader({self.name}).read_many called with negative count expecting with_header=True"
fNBytes = stream.read_fNBytes()
end_pos = stream.cursor + fNBytes
stream.skip_fVersion()
cur_count = 0
while stream.cursor < end_pos:
self.read_body(stream)
cur_count += 1
return cur_count
else:
if self.with_header:
stream.skip_fNBytes()
stream.skip_fVersion()
for _ in range(count):
self.read_body(stream)
return count
[docs]
def read_until(self, stream, end_pos):
if stream.cursor == end_pos:
return 0
if self.with_header:
stream.skip_fNBytes()
stream.skip_fVersion()
count = 0
while stream.cursor < end_pos:
self.read_body(stream)
count += 1
return count
[docs]
def data(self):
data_array = np.asarray(self._data)
offsets_array = np.asarray(self.offsets)
return offsets_array, data_array
[docs]
class TArrayReader(IReader):
def __init__(
self,
name: str,
dtype: Literal["int8", "int16", "int32", "int64", "float32", "float64"],
):
super().__init__(name)
self.dtype = dtype
self.typecode = DTYPE_TO_TYPECODE[dtype]
self._data = array(self.typecode)
self.offsets = array("q", [0])
self.buffer_reader = DTYPE_TO_READER[dtype]
[docs]
def read(self, stream):
fSize = stream.read_uint32()
self.offsets.append(self.offsets[-1] + fSize)
for _ in range(fSize):
self._data.append(self.buffer_reader(stream))
[docs]
def data(self):
offsets_array = np.asarray(self.offsets)
data_array = np.asarray(self._data)
return offsets_array, data_array
[docs]
class GroupReader(IReader):
def __init__(self, name: str, element_readers: list[IReader]):
super().__init__(name)
self.element_readers = element_readers
[docs]
def read(self, stream):
for reader in self.element_readers:
debug_print(f"GroupReader({self.name}) reading element {reader.name}:\n")
debug_print(stream)
reader.read(stream)
[docs]
def read_many_memberwise(self, stream, count):
assert (
count >= 0
), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"
for reader in self.element_readers:
debug_print(
f"GroupReader{self.name} reading many member-wise element {reader.name}:\n"
)
debug_print(stream)
reader.read_many(stream, count)
return count
[docs]
def data(self):
return [reader.data() for reader in self.element_readers]
[docs]
class AnyClassReader(IReader):
def __init__(self, name: str, element_readers: list[IReader]):
super().__init__(name)
self.element_readers = element_readers
[docs]
def read(self, stream: BinaryStream):
fNBytes = stream.read_fNBytes()
start_pos = stream.cursor
end_pos = start_pos + fNBytes
stream.skip_fVersion()
for reader in self.element_readers:
debug_print(f"AnyClassReader({self.name}) reading element {reader.name}:\n")
debug_print(stream)
reader.read(stream)
assert stream.cursor == end_pos, (
f"AnyClassReader({self.name}): Invalid read length! Expect {fNBytes} bytes, "
f"but read {stream.cursor - start_pos} bytes."
)
[docs]
def read_many_memberwise(self, stream, count):
assert (
count >= 0
), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"
for reader in self.element_readers:
debug_print(
f"AnyClassReader{self.name} reading many member-wise element {reader.name}:\n"
)
debug_print(stream)
reader.read_many(stream, count)
return count
[docs]
def data(self):
return [reader.data() for reader in self.element_readers]
[docs]
class AnyPointerReader(IReader):
"""
Reads objects stored via ROOT's read_object_any protocol.
This handles both:
- True pointers (kObjectP=64, kAnyP=69): full read_object_any with null/reference support
- Object headers (kNewClassTag + classname before object data)
Binary format (read_object_any):
1. bcnt (uint32) — if kByteCountMask set: versioned, then read tag (uint32)
otherwise: unversioned, tag = bcnt
2. tag:
- tag == 0: null pointer
- tag & kClassMask == 0 (nonzero): reference to previously-read object (skip bytes)
- tag == kNewClassTag: null-terminated classname + object data
- tag & kClassMask != 0: reference to known class, new object data
"""
def __init__(self, name: str, element_reader: IReader):
super().__init__(name)
self.element_reader = element_reader
self.object_counter = 0 # Counter for assigning object indexes to true pointers
self.object_indexes = array("q")
self.class_name = None
[docs]
def read(self, stream):
start_pos = stream.cursor
ref_begin = start_pos + stream.initial_cursor_position
fNBytes = stream.read_uint32()
if (fNBytes & kByteCountMask) == 0 or fNBytes == kNewClassTag:
fVersion = 0
ref_begin = 0
fTag = fNBytes
fNBytes = 0
else:
fVersion = 1
fTag = stream.read_uint32()
fNBytes &= ~kByteCountMask
end_pos = start_pos + 4 + fNBytes if fVersion > 0 else start_pos + 4
if fTag & kClassMask == 0:
if fTag == 0:
assert (
stream.cursor == end_pos
), f"AnyPointerReader({self.name}): Invalid read length! Expect to read 0 bytes, but read {stream.cursor - start_pos} bytes."
self.object_indexes.append(-1) # Use -1 to indicate null pointer
return
elif fTag == 1:
raise NotImplementedError("AnyPointerReader: kAnyP (tag=1) is not supported")
elif fTag not in stream.refs:
stream.cursor = end_pos # Skip unknown reference
return
else:
ref_idx = stream.refs[fTag].object_index
self.object_indexes.append(ref_idx)
assert (
stream.cursor == end_pos
), f"AnyPointerReader({self.name}): Invalid read length! Expect to read 0 bytes, but read {stream.cursor - start_pos} bytes."
return
elif fTag == kNewClassTag:
class_name = stream.read_null_terminated_string()
if self.class_name is None:
self.class_name = class_name
else:
assert self.class_name == class_name, (
f"AnyPointerReader({self.name}): Inconsistent class names for multiple objects! "
f"Expected {self.class_name}, but got {class_name}"
)
ref_key = ref_begin + kMapOffset if fVersion > 0 else len(stream.refs) + 1
stream.refs[ref_key] = _Reference(type="class", class_name=class_name)
self.element_reader.read(stream)
self.object_indexes.append(self.object_counter)
ref_key = ref_begin + kMapOffset if fVersion > 0 else len(stream.refs) + 1
stream.refs[ref_key] = _Reference(type="object", object_index=self.object_counter)
self.object_counter += 1
else: # reference class, new object
cls_ref_key = fTag & (~kClassMask)
if cls_ref_key in stream.refs:
class_name = stream.refs[cls_ref_key].class_name
assert class_name == self.class_name, (
f"AnyPointerReader({self.name}): Inconsistent class names for multiple objects! "
f"Expected {self.class_name}, but got {class_name}"
)
self.element_reader.read(stream)
self.object_indexes.append(self.object_counter)
obj_ref_key = ref_begin + kMapOffset if fVersion > 0 else len(stream.refs) + 1
stream.refs[obj_ref_key] = _Reference(
type="object", object_index=self.object_counter
)
self.object_counter += 1
assert (
stream.cursor == end_pos
), f"AnyPointerReader({self.name}): Invalid read length! Expect to read 0 bytes, but read {stream.cursor - start_pos} bytes."
return
[docs]
def data(self):
element_data = self.element_reader.data()
object_indexes_array = np.asarray(self.object_indexes)
return element_data, object_indexes_array
[docs]
class CStyleArrayReader(IReader):
def __init__(self, name: str, flat_size: int, element_reader: IReader):
super().__init__(name)
self.flat_size = flat_size
self.element_reader = element_reader
self.offsets = array("q", [0])
[docs]
def read(self, stream):
debug_print(
f"CStyleArrayReader({self.name}): reading C-style array of flat_size={self.flat_size}\n"
)
debug_print(stream)
if self.flat_size >= 0:
self.element_reader.read_many(stream, self.flat_size)
else:
entry_offsets = stream.offsets
cursor_pos = stream.cursor
end_offset_index = (entry_offsets > cursor_pos).nonzero()[0].min()
end_pos = entry_offsets[end_offset_index]
count = self.element_reader.read_until(stream, end_pos)
self.offsets.append(self.offsets[-1] + count)
debug_print(f"CStyleArrayReader({self.name}): read {count} elements")
[docs]
def read_many(self, stream, count):
assert (
self.flat_size >= 0
), f"Calling CStyleArrayReader({self.name}).read_many with negative flat_size is not allowed"
assert (
count >= 0
), f"Calling CStyleArrayReader({self.name}).read_many with negative count: {count} is not allowed"
for _ in range(count):
self.element_reader.read_many(stream, self.flat_size)
return count
[docs]
def read_until(self, stream, end_pos):
raise NotImplementedError("CStyleArrayReader.read_until is not supported")
[docs]
def data(self):
if self.flat_size >= 0:
return self.element_reader.data()
else:
offsets_array = np.asarray(self.offsets)
element_data = self.element_reader.data()
return offsets_array, element_data
[docs]
class EmptyReader(IReader):
[docs]
def read(self, stream):
pass
[docs]
def data(self):
return None
[docs]
def read_data(
data: NDArray[np.uint8],
offsets: NDArray[np.uint32],
cursor_offset: int,
reader: IReader,
):
stream = BinaryStream(data, offsets, cursor_offset)
for i_evt in range(stream.entries):
start_pos = stream.cursor
reader.read(stream)
end_pos = stream.cursor
assert end_pos == offsets[i_evt + 1], (
f"read_data: Invalid read length for {reader.name} at entry {i_evt}! Expect "
f"{stream.offsets[i_evt + 1]-stream.offsets[i_evt]} bytes, but read {end_pos - start_pos} bytes."
)
return reader.data()