Source code for uproot_custom.readers.python

from __future__ import annotations

import os
import shutil
import struct
import textwrap
from array import array
from dataclasses import dataclass
from typing import Any, Callable, Literal, Optional

import numpy as np
from numpy.typing import NDArray

kNewClassTag = 0xFFFFFFFF
kByteCountMask = 0x40000000
kClassMask = 0x80000000
kIsReferenced = 1 << 4
kStreamedMemberwise = 1 << 14
kMapOffset = 2


def debug_print(*args, **kwargs):
    """Accept any arguments and do nothing; the default (silent) debug hook."""
    return None
# When UPROOT_DEBUG is present in the environment, debug output goes to the real ``print``.
if os.environ.get("UPROOT_DEBUG") is not None:
    debug_print = print


@dataclass
class _Reference:
    """Record stored in ``BinaryStream.refs`` describing a class or an object."""

    # Which of the two optional fields below is meaningful.
    type: Literal["object", "class"]
    # Set for "class" records.
    class_name: Optional[str] = None
    # Set for "object" records.
    object_index: Optional[int] = None
class BinaryStream:
    """
    Sequential reader over a byte buffer of big-endian encoded values.

    Every ``read_*`` method decodes one value at ``self.cursor`` and moves the
    cursor past it; every ``skip_*`` method only moves the cursor.

    Arguments:
        data: The raw bytes. ``bytes``, ``bytearray`` and ``memoryview`` inputs
            are wrapped in a ``uint8`` array; anything else is stored as given.
        offsets: Entry boundaries; ``len(offsets) - 1`` is reported as ``entries``.
        initial_cursor_position: Stored unchanged for callers that need positions
            relative to an outer origin. The cursor itself always starts at 0.
        repr_nbytes: Maximum number of upcoming bytes shown by ``repr``.
    """

    def __init__(
        self,
        data: NDArray[np.uint8],
        offsets: NDArray[np.uint32],
        initial_cursor_position: int,
        repr_nbytes: int = 50,
    ):
        # Accept any common bytes-like container, not only ``bytes``.
        if isinstance(data, (bytes, bytearray, memoryview)):
            data = np.frombuffer(data, dtype=np.uint8)

        self.data = data
        self.offsets = offsets
        self.cursor = 0
        self.repr_nbytes = repr_nbytes

        # Store the initial cursor position for reference (e.g., for calculating relative positions)
        self.initial_cursor_position = initial_cursor_position

        # Reference table filled and queried by readers that resolve tags.
        self.refs: dict[int, _Reference] = {}

    @property
    def entries(self):
        """Number of entries described by ``offsets`` (one less than its length)."""
        return len(self.offsets) - 1

    @property
    def remaining_data(self):
        """View of the bytes from the cursor to the end of the buffer."""
        return self.data[self.cursor :]

    def _unpack(self, fmt: str, size: int):
        """Decode one ``struct`` value of ``size`` bytes at the cursor and advance past it."""
        value = struct.unpack_from(fmt, self.data, self.cursor)[0]
        self.cursor += size
        return value

    def read_uint8(self) -> int:
        """Read an unsigned 8-bit integer."""
        return self._unpack(">B", 1)

    def read_uint16(self) -> int:
        """Read a big-endian unsigned 16-bit integer."""
        return self._unpack(">H", 2)

    def read_uint32(self) -> int:
        """Read a big-endian unsigned 32-bit integer."""
        return self._unpack(">I", 4)

    def read_uint64(self) -> int:
        """Read a big-endian unsigned 64-bit integer."""
        return self._unpack(">Q", 8)

    def read_int8(self) -> int:
        """Read a signed 8-bit integer."""
        return self._unpack(">b", 1)

    def read_int16(self) -> int:
        """Read a big-endian signed 16-bit integer."""
        return self._unpack(">h", 2)

    def read_int32(self) -> int:
        """Read a big-endian signed 32-bit integer."""
        return self._unpack(">i", 4)

    def read_int64(self) -> int:
        """Read a big-endian signed 64-bit integer."""
        return self._unpack(">q", 8)

    def read_float(self) -> float:
        """Read a big-endian 32-bit float."""
        return self._unpack(">f", 4)

    def read_double(self) -> float:
        """Read a big-endian 64-bit float."""
        return self._unpack(">d", 8)

    def read_bool(self) -> bool:
        """Read one byte and return its truthiness."""
        return bool(self.read_uint8())

    def read_fNBytes(self) -> int:
        """
        Read a 4-byte count word and return it with the ``kByteCountMask`` flag cleared.

        Raises ``AssertionError`` if the flag is not set in the word.
        """
        byte_count = self.read_uint32()
        assert byte_count & kByteCountMask, f"Invalid byte count: {byte_count}"
        return byte_count & (~kByteCountMask)

    def read_fVersion(self):
        """Read a version number stored as a signed 16-bit integer."""
        return self.read_int16()

    def read_null_terminated_string(self):
        """Read bytes up to the next zero byte, decode them, and step over the terminator."""
        start = self.cursor
        while self.data[self.cursor] != 0:
            self.cursor += 1
        res = self.data[start : self.cursor].tobytes().decode()
        self.cursor += 1  # Skip the null terminator
        return res

    def read_obj_header(self):
        """
        Read a byte count and a 4-byte tag.

        Returns the class name that follows when the tag is ``kNewClassTag``,
        otherwise an empty string.
        """
        self.read_fNBytes()
        fTag = self.read_uint32()
        if fTag == kNewClassTag:
            return self.read_null_terminated_string()
        return ""

    def read_TString(self):
        """
        Read a length-prefixed string.

        The length is one byte; the value 255 means the real length follows as
        an unsigned 32-bit integer.
        """
        length = self.read_uint8()
        if length == 255:
            length = self.read_uint32()
        start = self.cursor
        self.cursor += length
        return self.data[start : self.cursor].tobytes().decode()

    def skip(self, n: int):
        """Advance the cursor by ``n`` bytes."""
        self.cursor += n

    def skip_fNBytes(self):
        """Consume a byte-count word (still validated by ``read_fNBytes``)."""
        self.read_fNBytes()

    def skip_fVersion(self):
        """Skip the 2-byte version number."""
        self.skip(2)

    def skip_null_terminated_string(self):
        """Advance past the next zero byte."""
        while self.data[self.cursor] != 0:
            self.cursor += 1
        self.cursor += 1  # Skip the null terminator

    def skip_obj_header(self):
        """Skip a byte count, a tag, and the class name if the tag is ``kNewClassTag``."""
        self.skip_fNBytes()
        fTag = self.read_uint32()
        if fTag == kNewClassTag:
            self.skip_null_terminated_string()

    def skip_TObject(self):
        """Skip version, fUniqueID and fBits, plus 2 more bytes when ``kIsReferenced`` is set."""
        self.skip_fVersion()
        self.skip(4)  # fUniqueID
        fBits = self.read_uint32()
        if fBits & kIsReferenced:
            self.skip(2)  # pidf

    def __repr__(self):
        data_view = self.data[self.cursor : self.cursor + self.repr_nbytes]
        res = ", ".join(f"{i:3d}" for i in data_view)
        if len(data_view) < len(self.data[self.cursor :]):
            # More bytes follow than are shown.
            res = f"{res}, ..." if res else "..."

        width = 76
        try:
            width, _ = shutil.get_terminal_size()
            width = max(40, width - 4)
        except Exception:
            # Ignore errors if terminal size cannot be determined; use default width
            pass

        wrapper = textwrap.TextWrapper(
            width=width,
            initial_indent="[ ",
            subsequent_indent="  ",
            break_long_words=False,
            break_on_hyphens=False,
            drop_whitespace=False,
            replace_whitespace=False,
        )
        return "BinaryStream:\n" + wrapper.fill(res) + "]"


# Second name bound to the same class.
BinaryBuffer = BinaryStream
class IReader:
    """
    Base class for readers that consume a ``BinaryStream`` and accumulate results.

    Subclasses override ``read`` and ``data``; the bulk helpers below are
    expressed in terms of ``read``.
    """

    def __init__(self, name: str):
        self.name = name

    def read(self, stream: BinaryStream) -> None:
        """Consume one item from ``stream``. Must be overridden."""
        raise NotImplementedError

    def read_many(self, stream: BinaryStream, count: int) -> int:
        """Call ``read`` ``count`` times and return ``count`` unchanged."""
        for _ in range(count):
            self.read(stream)
        return count

    def read_until(self, stream: BinaryStream, end_pos: int) -> int:
        """Call ``read`` while the cursor is before ``end_pos``; return the number of calls."""
        n_read = 0
        while stream.cursor < end_pos:
            self.read(stream)
            n_read += 1
        return n_read

    def read_many_memberwise(self, stream: BinaryStream, count: int) -> int:
        """Member-wise bulk read; not available unless a subclass provides it."""
        owner = self.__class__.__name__
        raise NotImplementedError(
            f"{owner}({self.name}).read_many_memberwise is not implemented"
        )

    def data(self) -> Any:
        """Return whatever has been accumulated. Must be overridden."""
        raise NotImplementedError
# dtype name -> ``array`` typecode of the buffer that accumulates values of that dtype.
# "bool" shares the unsigned-byte typecode "B" with "uint8".
DTYPE_TO_TYPECODE = {
    "uint8": "B",
    "uint16": "H",
    "uint32": "I",
    "uint64": "Q",
    "int8": "b",
    "int16": "h",
    "int32": "i",
    "int64": "q",
    "float32": "f",
    "float64": "d",
    "bool": "B",
}

# dtype name -> ``BinaryStream`` method (looked up on the class, so it is called
# with the stream as its only argument) that reads one value of that dtype.
DTYPE_TO_READER: dict[str, Callable[[BinaryStream], int]] = {
    "uint8": BinaryStream.read_uint8,
    "uint16": BinaryStream.read_uint16,
    "uint32": BinaryStream.read_uint32,
    "uint64": BinaryStream.read_uint64,
    "int8": BinaryStream.read_int8,
    "int16": BinaryStream.read_int16,
    "int32": BinaryStream.read_int32,
    "int64": BinaryStream.read_int64,
    "float32": BinaryStream.read_float,
    "float64": BinaryStream.read_double,
    "bool": BinaryStream.read_bool,
}
class PrimitiveReader(IReader):
    """Reads one fixed-width primitive per ``read`` call and collects the values."""

    def __init__(
        self,
        name: str,
        dtype: Literal[
            "bool",
            "uint8",
            "uint16",
            "uint32",
            "uint64",
            "int8",
            "int16",
            "int32",
            "int64",
            "float32",
            "float64",
        ],
    ):
        super().__init__(name)
        self.dtype = dtype

        # Both tables are keyed by the dtype name; an unknown name raises KeyError here.
        typecode = DTYPE_TO_TYPECODE[dtype]
        self.typecode = typecode
        self._data = array(typecode)
        self.buffer_reader = DTYPE_TO_READER[dtype]

    def read(self, stream):
        """Decode one value from ``stream`` and store it."""
        value = self.buffer_reader(stream)
        self._data.append(value)

    def data(self):
        """Return the collected values as a NumPy array of ``self.dtype``."""
        return np.asarray(self._data, dtype=self.dtype)
class TObjectReader(IReader):
    """
    Consumes a TObject record: version, fUniqueID, fBits and, when the
    ``kIsReferenced`` bit is set, a 2-byte pidf.

    With ``keep_data=False`` the record is only stepped over.
    """

    def __init__(self, name: str, keep_data: bool = False):
        super().__init__(name)
        self.keep_data = keep_data
        self.unique_id = array("i")
        self.bits = array("I")
        self.pidf = array("H")
        # Running length of ``pidf`` after each record, starting from 0.
        self.pidf_offsets = array("q", [0])

    def read(self, stream):
        """Consume one record, storing its fields only when ``keep_data`` is set."""
        stream.skip_fVersion()
        unique_id = stream.read_int32()
        bits = stream.read_uint32()
        has_pidf = bool(bits & kIsReferenced)

        if not self.keep_data:
            if has_pidf:
                stream.skip(2)
            return

        if has_pidf:
            self.pidf.append(stream.read_uint16())
        self.unique_id.append(unique_id)
        self.bits.append(bits)
        self.pidf_offsets.append(len(self.pidf))

    def data(self):
        """Return ``(unique_id, bits, pidf, pidf_offsets)`` arrays, or ``None`` if nothing was kept."""
        if not self.keep_data:
            return None

        buffers = (self.unique_id, self.bits, self.pidf, self.pidf_offsets)
        return tuple(np.asarray(buf) for buf in buffers)
class TStringReader(IReader):
    """
    Collects length-prefixed strings as one flat byte buffer plus offsets.

    ``with_header`` controls whether the bulk readers first step over a byte
    count and a version.
    """

    def __init__(self, name: str, with_header: bool):
        super().__init__(name)
        self.with_header = with_header
        self._data = array("B")
        self.offsets = array("q", [0])

    def _skip_header(self, stream):
        """Step over the byte count and version preceding a batch of strings."""
        stream.skip_fNBytes()
        stream.skip_fVersion()

    def read(self, stream):
        """Read one string: a 1-byte length (255 means a uint32 length follows), then the bytes."""
        n_chars = stream.read_uint8()
        if n_chars == 255:
            n_chars = stream.read_uint32()

        self._data.extend(stream.read_uint8() for _ in range(n_chars))
        self.offsets.append(len(self._data))

    def read_many(self, stream, count):
        """Read ``count`` strings, after an optional header when ``count`` is positive."""
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many with negative count: {count} is not allowed"

        if count == 0:
            return 0

        if self.with_header:
            self._skip_header(stream)

        for _ in range(count):
            self.read(stream)
        return count

    def read_until(self, stream, end_pos):
        """Read strings until the cursor reaches ``end_pos``; return how many were read."""
        if stream.cursor == end_pos:
            return 0

        if self.with_header:
            self._skip_header(stream)

        n_read = 0
        while stream.cursor < end_pos:
            self.read(stream)
            n_read += 1
        return n_read

    def data(self):
        """Return ``(offsets, bytes)`` as NumPy arrays."""
        return np.asarray(self.offsets), np.asarray(self._data)
class STLSeqReader(IReader):
    """
    Reads size-prefixed sequences whose elements are handled by ``element_reader``.

    ``objwise_or_memberwise`` pins the expected layout ("obj-wise" or
    "member-wise"); any other value (e.g. "auto") accepts whatever the stream says.
    """

    def __init__(
        self,
        name: str,
        with_header: bool,
        objwise_or_memberwise: Literal["auto", "obj-wise", "member-wise"],
        element_reader: IReader,
    ):
        super().__init__(name)
        self.with_header = with_header
        self.objwise_or_memberwise = objwise_or_memberwise
        self.element_reader = element_reader
        self.offsets = array("q", [0])

    def check_objwise_memberwise(self, is_memberwise: bool):
        """Raise ``ValueError`` when the stream layout contradicts the pinned one."""
        expected = self.objwise_or_memberwise
        actual = "member-wise" if is_memberwise else "obj-wise"
        if expected in ("obj-wise", "member-wise") and expected != actual:
            raise ValueError(
                f"STLSeqReader({self.name}) expected {expected} reading but got {actual}"
            )

    def read_element_version(self, stream: BinaryStream):
        """Read a version and, only when it is 0, a uint32 checksum; return both."""
        version = stream.read_fVersion()
        checksum = stream.read_uint32() if version == 0 else None
        return version, checksum

    def _read_version_header(self, stream: BinaryStream) -> bool:
        """Read the version word, validate the layout, and return the member-wise flag."""
        fVersion = stream.read_fVersion()
        memberwise = bool(fVersion & kStreamedMemberwise)
        self.check_objwise_memberwise(memberwise)
        if memberwise:
            self.read_element_version(stream)
        return memberwise

    def read_body(self, stream: BinaryStream, is_memberwise: bool):
        """Read one sequence: a uint32 element count followed by that many elements."""
        n_elements = stream.read_uint32()
        self.offsets.append(self.offsets[-1] + n_elements)

        debug_print(
            f"STLSeqReader({self.name}): reading body, is_memberwise={is_memberwise}, fSize={n_elements}\n"
        )
        debug_print(stream)

        bulk_read = (
            self.element_reader.read_many_memberwise
            if is_memberwise
            else self.element_reader.read_many
        )
        bulk_read(stream, n_elements)

    def read(self, stream):
        """Read one sequence preceded by its byte count and version."""
        stream.skip_fNBytes()
        memberwise = self._read_version_header(stream)
        self.read_body(stream, memberwise)

    def read_many(self, stream, count):
        """
        Read several sequences.

        A negative ``count`` means "as many as the header's byte count covers"
        and requires ``with_header``. Returns the number of sequences read.
        """
        if count == 0:
            return 0

        if count < 0:
            assert (
                self.with_header
            ), f"STLSeqReader({self.name}).read_many called with negative count expects with_header=True"

            fNBytes = stream.read_fNBytes()
            end_pos = stream.cursor + fNBytes
            memberwise = self._read_version_header(stream)

            n_read = 0
            while stream.cursor < end_pos:
                self.read_body(stream, memberwise)
                n_read += 1
            return n_read

        if self.with_header:
            stream.skip_fNBytes()
            memberwise = self._read_version_header(stream)
        else:
            memberwise = self.objwise_or_memberwise == "member-wise"

        for _ in range(count):
            self.read_body(stream, memberwise)
        return count

    def read_until(self, stream, end_pos):
        """Read sequences until the cursor reaches ``end_pos``; return how many were read."""
        if stream.cursor == end_pos:
            return 0

        if self.with_header:
            stream.skip_fNBytes()
            memberwise = self._read_version_header(stream)
        else:
            memberwise = self.objwise_or_memberwise == "member-wise"

        n_read = 0
        while stream.cursor < end_pos:
            self.read_body(stream, memberwise)
            n_read += 1
        return n_read

    def data(self):
        """Return ``(offsets, element_data)``."""
        offsets_array = np.asarray(self.offsets)
        return offsets_array, self.element_reader.data()
class STLMapReader(IReader):
    """
    Reads size-prefixed maps; keys and values go to ``key_reader`` and ``value_reader``.

    ``objwise_or_memberwise`` pins the expected layout ("obj-wise" or
    "member-wise"); any other value (e.g. "auto") accepts whatever the stream says.
    """

    def __init__(
        self,
        name: str,
        with_header: bool,
        objwise_or_memberwise: Literal["auto", "obj-wise", "member-wise"],
        key_reader: IReader,
        value_reader: IReader,
    ):
        super().__init__(name)
        self.with_header = with_header
        self.objwise_or_memberwise = objwise_or_memberwise
        self.key_reader = key_reader
        self.value_reader = value_reader
        self.offsets = array("q", [0])

    def check_objwise_memberwise(self, is_memberwise: bool):
        """Raise ``ValueError`` when the stream layout contradicts the pinned one."""
        expected = self.objwise_or_memberwise
        actual = "member-wise" if is_memberwise else "obj-wise"
        if expected in ("obj-wise", "member-wise") and expected != actual:
            raise ValueError(
                f"STLMapReader({self.name}) expected {expected} reading but got {actual}"
            )

    def read_element_version(self, stream: BinaryStream):
        """Read a version and, only when it is 0, a uint32 checksum; return both."""
        version = stream.read_fVersion()
        checksum = stream.read_uint32() if version == 0 else None
        return version, checksum

    def _read_version_header(self, stream: BinaryStream) -> bool:
        """
        Read the version word and the element version, then validate the layout.

        The element version is consumed regardless of the layout, and before
        the layout check runs.
        """
        fVersion = stream.read_fVersion()
        self.read_element_version(stream)
        memberwise = bool(fVersion & kStreamedMemberwise)
        self.check_objwise_memberwise(memberwise)
        return memberwise

    def read_body(self, stream: BinaryStream, is_memberwise: bool):
        """
        Read one map: a uint32 pair count followed by the pairs.

        Member-wise layout stores all keys and then all values; otherwise keys
        and values alternate.
        """
        n_pairs = stream.read_uint32()
        self.offsets.append(self.offsets[-1] + n_pairs)

        debug_print(
            f"STLMapReader({self.name}): reading body, is_memberwise={is_memberwise}, fSize={n_pairs}\n"
        )
        debug_print(stream)

        if not is_memberwise:
            for _ in range(n_pairs):
                self.key_reader.read(stream)
                self.value_reader.read(stream)
            return

        self.key_reader.read_many(stream, n_pairs)
        self.value_reader.read_many(stream, n_pairs)

    def read(self, stream):
        """Read one map preceded by its byte count and version words."""
        stream.skip_fNBytes()
        memberwise = self._read_version_header(stream)
        self.read_body(stream, memberwise)

    def read_many(self, stream, count):
        """
        Read several maps.

        A negative ``count`` means "as many as the header's byte count covers"
        and requires ``with_header``. Returns the number of maps read.
        """
        if count == 0:
            return 0

        if count < 0:
            assert (
                self.with_header
            ), f"STLMapReader({self.name}).read_many called with negative count expecting with_header=True"

            fNBytes = stream.read_fNBytes()
            end_pos = stream.cursor + fNBytes
            memberwise = self._read_version_header(stream)

            n_read = 0
            while stream.cursor < end_pos:
                self.read_body(stream, memberwise)
                n_read += 1
            return n_read

        if self.with_header:
            stream.skip_fNBytes()
            memberwise = self._read_version_header(stream)
        else:
            memberwise = self.objwise_or_memberwise == "member-wise"

        for _ in range(count):
            self.read_body(stream, memberwise)
        return count

    def read_until(self, stream, end_pos):
        """Read maps until the cursor reaches ``end_pos``; return how many were read."""
        if stream.cursor == end_pos:
            return 0

        if self.with_header:
            stream.skip_fNBytes()
            memberwise = self._read_version_header(stream)
        else:
            memberwise = self.objwise_or_memberwise == "member-wise"

        n_read = 0
        while stream.cursor < end_pos:
            self.read_body(stream, memberwise)
            n_read += 1
        return n_read

    def read_many_memberwise(self, stream, count):
        """Validate that member-wise reading is allowed, then defer to ``read_many``."""
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"

        self.check_objwise_memberwise(True)
        return self.read_many(stream, count)

    def data(self):
        """Return ``(offsets, key_data, value_data)``."""
        offsets_array = np.asarray(self.offsets)
        return offsets_array, self.key_reader.data(), self.value_reader.data()
class STLStringReader(IReader):
    """
    Collects length-prefixed strings as one flat byte buffer plus offsets.

    With ``with_header`` set, a byte count and a version precede the string
    (single read) or the batch (bulk reads).
    """

    def __init__(self, name: str, with_header: bool):
        super().__init__(name)
        self.with_header = with_header
        self._data = array("B")
        self.offsets = array("q", [0])

    def read_body(self, stream: BinaryStream):
        """Read one string: a 1-byte length (255 means a uint32 length follows), then the bytes."""
        n_chars = stream.read_uint8()
        if n_chars == 255:
            n_chars = stream.read_uint32()

        # The offset is recorded before the characters are consumed.
        self.offsets.append(self.offsets[-1] + n_chars)
        self._data.extend(stream.read_uint8() for _ in range(n_chars))

    def read(self, stream):
        """Read one string, stepping over its header first when ``with_header`` is set."""
        if self.with_header:
            stream.skip_fNBytes()
            stream.skip_fVersion()
        self.read_body(stream)

    def read_many(self, stream, count):
        """
        Read several strings.

        A negative ``count`` means "as many as the header's byte count covers"
        and requires ``with_header``. Returns the number of strings read.
        """
        if count == 0:
            return 0

        if count < 0:
            assert (
                self.with_header
            ), f"STLStringReader({self.name}).read_many called with negative count expecting with_header=True"

            fNBytes = stream.read_fNBytes()
            end_pos = stream.cursor + fNBytes
            stream.skip_fVersion()

            n_read = 0
            while stream.cursor < end_pos:
                self.read_body(stream)
                n_read += 1
            return n_read

        if self.with_header:
            stream.skip_fNBytes()
            stream.skip_fVersion()

        for _ in range(count):
            self.read_body(stream)
        return count

    def read_until(self, stream, end_pos):
        """Read strings until the cursor reaches ``end_pos``; return how many were read."""
        if stream.cursor == end_pos:
            return 0

        if self.with_header:
            stream.skip_fNBytes()
            stream.skip_fVersion()

        n_read = 0
        while stream.cursor < end_pos:
            self.read_body(stream)
            n_read += 1
        return n_read

    def data(self):
        """Return ``(offsets, bytes)`` as NumPy arrays."""
        return np.asarray(self.offsets), np.asarray(self._data)
class TArrayReader(IReader):
    """Reads arrays stored as a uint32 length followed by that many values of ``dtype``."""

    def __init__(
        self,
        name: str,
        dtype: Literal["int8", "int16", "int32", "int64", "float32", "float64"],
    ):
        super().__init__(name)
        self.dtype = dtype

        # Both tables are keyed by the dtype name; an unknown name raises KeyError here.
        typecode = DTYPE_TO_TYPECODE[dtype]
        self.typecode = typecode
        self._data = array(typecode)
        self.offsets = array("q", [0])
        self.buffer_reader = DTYPE_TO_READER[dtype]

    def read(self, stream):
        """Read one array and append its values to the flat buffer."""
        n_values = stream.read_uint32()
        # The offset is recorded before the values are consumed.
        self.offsets.append(self.offsets[-1] + n_values)
        self._data.extend(self.buffer_reader(stream) for _ in range(n_values))

    def data(self):
        """Return ``(offsets, values)`` as NumPy arrays."""
        return np.asarray(self.offsets), np.asarray(self._data)
class GroupReader(IReader):
    """Runs a fixed list of readers one after another on the same stream."""

    def __init__(self, name: str, element_readers: list[IReader]):
        super().__init__(name)
        self.element_readers = element_readers

    def read(self, stream):
        """Let every child reader consume one item, in list order."""
        for child in self.element_readers:
            debug_print(f"GroupReader({self.name}) reading element {child.name}:\n")
            debug_print(stream)
            child.read(stream)

    def read_many_memberwise(self, stream, count):
        """Let every child reader consume ``count`` items in turn; return ``count``."""
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"

        for child in self.element_readers:
            debug_print(
                f"GroupReader{self.name} reading many member-wise element {child.name}:\n"
            )
            debug_print(stream)
            child.read_many(stream, count)
        return count

    def data(self):
        """Return the children's data as a list, in list order."""
        return [child.data() for child in self.element_readers]
class AnyClassReader(IReader):
    """
    Reads an object framed by a byte count and a version, delegating its
    members to a fixed list of readers.
    """

    def __init__(self, name: str, element_readers: list[IReader]):
        super().__init__(name)
        self.element_readers = element_readers

    def read(self, stream: BinaryStream):
        """Read one framed object and verify that exactly the announced bytes were consumed."""
        fNBytes = stream.read_fNBytes()
        start_pos = stream.cursor
        end_pos = start_pos + fNBytes
        stream.skip_fVersion()

        for member in self.element_readers:
            debug_print(f"AnyClassReader({self.name}) reading element {member.name}:\n")
            debug_print(stream)
            member.read(stream)

        assert stream.cursor == end_pos, (
            f"AnyClassReader({self.name}): Invalid read length! Expect {fNBytes} bytes, "
            f"but read {stream.cursor - start_pos} bytes."
        )

    def read_many_memberwise(self, stream, count):
        """Let every member reader consume ``count`` items in turn; return ``count``."""
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"

        for member in self.element_readers:
            debug_print(
                f"AnyClassReader{self.name} reading many member-wise element {member.name}:\n"
            )
            debug_print(stream)
            member.read_many(stream, count)
        return count

    def data(self):
        """Return the members' data as a list, in list order."""
        return [member.data() for member in self.element_readers]
class AnyPointerReader(IReader):
    """
    Reads objects stored via ROOT's read_object_any protocol.

    This handles both:
    - True pointers (kObjectP=64, kAnyP=69): full read_object_any with null/reference support
    - Object headers (kNewClassTag + classname before object data)

    Binary format (read_object_any):
    1. bcnt (uint32) — if kByteCountMask set: versioned, then read tag (uint32)
       otherwise: unversioned, tag = bcnt
    2. tag:
       - tag == 0: null pointer
       - tag & kClassMask == 0 (nonzero): reference to previously-read object (skip bytes)
       - tag == kNewClassTag: null-terminated classname + object data
       - tag & kClassMask != 0: reference to known class, new object data

    Reference keys (versioned streams): an object is registered under the
    position of its byte count plus ``kMapOffset``; a class is registered under
    the position of its tag (4 bytes later) plus ``kMapOffset``. Positions
    include ``stream.initial_cursor_position``.

    ``data()`` returns the element reader's data together with one index per
    pointer read: the object's index, or -1 for a null pointer.
    """

    def __init__(self, name: str, element_reader: IReader):
        super().__init__(name)
        self.element_reader = element_reader
        self.object_counter = 0  # Counter for assigning object indexes to true pointers
        self.object_indexes = array("q")
        self.class_name = None

    def _assert_consumed(self, stream, start_pos: int, end_pos: int):
        """Assert that the cursor sits exactly at ``end_pos``."""
        assert stream.cursor == end_pos, (
            f"AnyPointerReader({self.name}): Invalid read length! "
            f"Expect {end_pos - start_pos} bytes, but read {stream.cursor - start_pos} bytes."
        )

    def _register_object(self, stream, fVersion: int, obj_ref_begin: int):
        """Record the object just read and register it in ``stream.refs``."""
        self.object_indexes.append(self.object_counter)
        obj_ref_key = obj_ref_begin + kMapOffset if fVersion > 0 else len(stream.refs) + 1
        stream.refs[obj_ref_key] = _Reference(type="object", object_index=self.object_counter)
        self.object_counter += 1

    def read(self, stream):
        """
        Read one pointer: null, reference to an earlier object, or a new object.

        Raises ``NotImplementedError`` for tag 1 and ``AssertionError`` on a
        class-name mismatch or when a versioned record's length does not match
        its byte count.
        """
        start_pos = stream.cursor

        fNBytes = stream.read_uint32()
        if (fNBytes & kByteCountMask) == 0 or fNBytes == kNewClassTag:
            # Unversioned: the word just read is the tag itself.
            fVersion = 0
            obj_ref_begin = 0
            cls_ref_begin = 0
            fTag = fNBytes
            fNBytes = 0
        else:
            fVersion = 1
            obj_ref_begin = start_pos + stream.initial_cursor_position
            # The class is keyed by where its tag starts, not by where the byte count starts.
            cls_ref_begin = stream.cursor + stream.initial_cursor_position
            fTag = stream.read_uint32()
            fNBytes &= ~kByteCountMask

        # The byte count does not include its own 4 bytes.
        end_pos = start_pos + 4 + fNBytes

        if fTag & kClassMask == 0:
            if fTag == 0:
                self._assert_consumed(stream, start_pos, end_pos)
                self.object_indexes.append(-1)  # Use -1 to indicate null pointer
                return

            if fTag == 1:
                raise NotImplementedError("AnyPointerReader: kAnyP (tag=1) is not supported")

            if fTag not in stream.refs:
                # NOTE(review): no index is appended on this path, so this pointer
                # has no entry in object_indexes — confirm that this is intended.
                stream.cursor = end_pos  # Skip unknown reference
                return

            ref_idx = stream.refs[fTag].object_index
            self.object_indexes.append(ref_idx)
            self._assert_consumed(stream, start_pos, end_pos)
            return

        if fTag == kNewClassTag:
            class_name = stream.read_null_terminated_string()
            if self.class_name is None:
                self.class_name = class_name
            else:
                assert self.class_name == class_name, (
                    f"AnyPointerReader({self.name}): Inconsistent class names for multiple objects! "
                    f"Expected {self.class_name}, but got {class_name}"
                )

            cls_ref_key = cls_ref_begin + kMapOffset if fVersion > 0 else len(stream.refs) + 1
            stream.refs[cls_ref_key] = _Reference(type="class", class_name=class_name)

            self.element_reader.read(stream)
            self._register_object(stream, fVersion, obj_ref_begin)
        else:
            # reference class, new object
            cls_ref_key = fTag & (~kClassMask)
            class_ref = stream.refs.get(cls_ref_key)
            if class_ref is not None:
                class_name = class_ref.class_name
                if self.class_name is None and class_name is not None:
                    # The class was introduced elsewhere on this stream; adopt its name.
                    self.class_name = class_name
                assert class_name == self.class_name, (
                    f"AnyPointerReader({self.name}): Inconsistent class names for multiple objects! "
                    f"Expected {self.class_name}, but got {class_name}"
                )

            self.element_reader.read(stream)
            self._register_object(stream, fVersion, obj_ref_begin)

        # An unversioned record carries no byte count, so there is no length to verify.
        if fVersion > 0:
            self._assert_consumed(stream, start_pos, end_pos)
        return

    def data(self):
        """Return ``(element_data, object_indexes)``."""
        element_data = self.element_reader.data()
        object_indexes_array = np.asarray(self.object_indexes)
        return element_data, object_indexes_array
class ObjectHeaderReader(IReader):
    """
    Deprecated: reads a byte count, a tag and an optional class name, then
    delegates the object body to ``element_reader``.

    Constructing an instance emits a ``DeprecationWarning``.
    """

    def __init__(self, name: str, element_reader: IReader):
        import warnings

        warnings.warn(
            "ObjectHeaderReader is deprecated and will be removed in a future version. "
            "Use BinaryStream.read_obj_header() directly in your reader instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the code constructing the reader
        )
        super().__init__(name)
        self.element_reader = element_reader

    def read(self, stream):
        """
        Read one header and the object that follows it.

        Raises ``AssertionError`` when the bytes consumed after the byte count
        differ from the byte count.
        """
        fNBytes = stream.read_fNBytes()
        start_pos = stream.cursor
        end_pos = stream.cursor + fNBytes

        # The tag must be read unsigned: kNewClassTag is 0xFFFFFFFF, which a
        # signed 32-bit read would return as -1 and never match.
        fTag = stream.read_uint32()
        if fTag == kNewClassTag:
            stream.skip_null_terminated_string()

        self.element_reader.read(stream)

        assert stream.cursor == end_pos, (
            f"ObjectHeaderReader({self.name}): Invalid read length! Expect {fNBytes} bytes, "
            f"but read {stream.cursor - start_pos} bytes."
        )

    def data(self):
        """Return the element reader's data unchanged."""
        return self.element_reader.data()
class CStyleArrayReader(IReader):
    """
    Reads C-style arrays of elements.

    A non-negative ``flat_size`` is the fixed number of elements per array. A
    negative ``flat_size`` means the array runs to the end of the current
    entry, and per-array offsets are recorded.
    """

    def __init__(self, name: str, flat_size: int, element_reader: IReader):
        super().__init__(name)
        self.flat_size = flat_size
        self.element_reader = element_reader
        self.offsets = array("q", [0])

    def read(self, stream):
        """Read one array, either of fixed size or up to the end of the current entry."""
        debug_print(
            f"CStyleArrayReader({self.name}): reading C-style array of flat_size={self.flat_size}\n"
        )
        debug_print(stream)

        if self.flat_size >= 0:
            self.element_reader.read_many(stream, self.flat_size)
            return

        # The entry ends at the first offset lying beyond the cursor.
        beyond_cursor = (stream.offsets > stream.cursor).nonzero()[0]
        end_pos = stream.offsets[beyond_cursor.min()]

        count = self.element_reader.read_until(stream, end_pos)
        self.offsets.append(self.offsets[-1] + count)
        debug_print(f"CStyleArrayReader({self.name}): read {count} elements")

    def read_many(self, stream, count):
        """Read ``count`` fixed-size arrays; only valid for a non-negative ``flat_size``."""
        assert (
            self.flat_size >= 0
        ), f"Calling CStyleArrayReader({self.name}).read_many with negative flat_size is not allowed"
        assert (
            count >= 0
        ), f"Calling CStyleArrayReader({self.name}).read_many with negative count: {count} is not allowed"

        for _ in range(count):
            self.element_reader.read_many(stream, self.flat_size)
        return count

    def read_until(self, stream, end_pos):
        """Not supported for C-style arrays."""
        raise NotImplementedError("CStyleArrayReader.read_until is not supported")

    def data(self):
        """Return the element data alone for fixed sizes, else ``(offsets, element_data)``."""
        if self.flat_size >= 0:
            return self.element_reader.data()

        offsets_array = np.asarray(self.offsets)
        return offsets_array, self.element_reader.data()
class EmptyReader(IReader):
    """Reader that consumes nothing and produces no data."""

    def read(self, stream):
        """Leave ``stream`` untouched."""
        return None

    def data(self):
        """There is never any data to return."""
        return None
def read_data(
    data: NDArray[np.uint8],
    offsets: NDArray[np.uint32],
    cursor_offset: int,
    reader: IReader,
):
    """
    Run ``reader`` once per entry over ``data`` and return ``reader.data()``.

    After each entry the cursor must sit exactly on the next entry boundary in
    ``offsets``; otherwise an ``AssertionError`` is raised.
    """
    stream = BinaryStream(data, offsets, cursor_offset)

    for i_evt in range(stream.entries):
        entry_begin = stream.cursor
        reader.read(stream)
        entry_end = stream.cursor

        assert entry_end == offsets[i_evt + 1], (
            f"read_data: Invalid read length for {reader.name} at entry {i_evt}! Expect "
            f"{stream.offsets[i_evt + 1]-stream.offsets[i_evt]} bytes, but read {entry_end - entry_begin} bytes."
        )

    return reader.data()