from __future__ import annotations
import os
import shutil
import struct
import textwrap
from array import array
from typing import Any, Callable, Literal
import numpy as np
from numpy.typing import NDArray
# Bit patterns used while decoding streamed object headers.
# All 32 bits set; not referenced anywhere else in this section of the file.
kNewClassTag = 0xFFFF_FFFF
# Flag bit that every fNBytes word must carry; it is stripped to get the length.
kByteCountMask = 1 << 30
# fBits flag: when set, a 2-byte "pidf" field follows the TObject header.
kIsReferenced = 0x10
# fVersion flag: when set, the container payload is stored member-wise.
kStreamedMemberwise = 0x4000
[docs]
def debug_print(*args, **kwargs):
    """Accept anything and do nothing; stand-in for ``print`` when debugging is off."""
    return None


# If the UPROOT_DEBUG environment variable exists (any value, even empty),
# the no-op above is replaced by the builtin ``print`` at import time.
if os.environ.get("UPROOT_DEBUG") is not None:
    debug_print = print
[docs]
class BinaryBuffer:
    """Cursor-based reader of big-endian values from a flat byte buffer.

    Parameters
    ----------
    data
        The raw bytes to decode (anything exposing the buffer protocol,
        typically a ``uint8`` NumPy array).
    offsets
        Entry boundaries into ``data``; ``len(offsets) - 1`` entries.
    repr_nbytes
        Maximum number of upcoming bytes shown by ``repr()``.

    Every ``read_*`` method decodes at ``self.cursor`` and then advances the
    cursor past what it consumed; ``skip_*`` methods only advance.
    """

    def __init__(
        self,
        data: NDArray[np.uint8],
        offsets: NDArray[np.uint32],
        repr_nbytes: int = 50,
    ):
        self.data = data
        self.offsets = offsets
        self.cursor = 0
        self.repr_nbytes = repr_nbytes

    @property
    def entries(self):
        """Number of entries described by ``offsets``."""
        return len(self.offsets) - 1

    @property
    def remaining_data(self):
        """Slice of ``data`` from the cursor to the end."""
        return self.data[self.cursor :]

    def _unpack(self, fmt: str, nbytes: int):
        """Decode one ``struct`` value at the cursor, then advance ``nbytes``."""
        # The cursor only moves after a successful unpack, so a failed read
        # (e.g. past the end of the buffer) leaves the position untouched.
        value = struct.unpack_from(fmt, self.data, self.cursor)[0]
        self.cursor += nbytes
        return value

    def read_uint8(self) -> int:
        """Read an unsigned 8-bit integer."""
        return self._unpack(">B", 1)

    def read_uint16(self) -> int:
        """Read a big-endian unsigned 16-bit integer."""
        return self._unpack(">H", 2)

    def read_uint32(self) -> int:
        """Read a big-endian unsigned 32-bit integer."""
        return self._unpack(">I", 4)

    def read_uint64(self) -> int:
        """Read a big-endian unsigned 64-bit integer."""
        return self._unpack(">Q", 8)

    def read_int8(self) -> int:
        """Read a signed 8-bit integer."""
        return self._unpack(">b", 1)

    def read_int16(self) -> int:
        """Read a big-endian signed 16-bit integer."""
        return self._unpack(">h", 2)

    def read_int32(self) -> int:
        """Read a big-endian signed 32-bit integer."""
        return self._unpack(">i", 4)

    def read_int64(self) -> int:
        """Read a big-endian signed 64-bit integer."""
        return self._unpack(">q", 8)

    def read_float(self) -> float:
        """Read a big-endian 32-bit float."""
        return self._unpack(">f", 4)

    def read_double(self) -> float:
        """Read a big-endian 64-bit float."""
        return self._unpack(">d", 8)

    def read_bool(self) -> bool:
        """Read one byte and return its truthiness."""
        return bool(self.read_uint8())

    def read_fNBytes(self) -> int:
        """Read a 32-bit byte-count word and return it without its flag bit.

        Raises ``AssertionError`` if the ``kByteCountMask`` bit is not set.
        """
        byte_count = self.read_uint32()
        assert byte_count & kByteCountMask, f"Invalid byte count: {byte_count}"
        return byte_count & (~kByteCountMask)

    def read_fVersion(self):
        """Read the 16-bit signed version field."""
        return self.read_int16()

    def read_null_terminated_string(self):
        """Read bytes up to a zero byte, decode them, and consume the terminator.

        The cursor ends just past the terminator, matching
        ``skip_null_terminated_string``.
        """
        start = self.cursor
        while self.data[self.cursor] != 0:
            self.cursor += 1
        end = self.cursor
        self.cursor += 1  # step over the null terminator
        # ``bytes(...)`` works for ndarray, bytes and memoryview slices alike;
        # an ndarray slice has no ``.decode`` of its own.
        return bytes(self.data[start:end]).decode()

    def read_TString(self):
        """Read a length-prefixed string.

        The length is one byte; the value 255 means the real length follows
        as a 32-bit unsigned integer.
        """
        length = self.read_uint8()
        if length == 255:
            length = self.read_uint32()
        start = self.cursor
        self.cursor += length
        return bytes(self.data[start : self.cursor]).decode()

    def skip(self, n: int):
        """Advance the cursor by ``n`` bytes."""
        self.cursor += n

    def skip_fNBytes(self):
        """Consume a byte-count word (still validated) and discard it."""
        self.read_fNBytes()

    def skip_fVersion(self):
        """Skip the 2-byte version field."""
        self.skip(2)

    def skip_null_terminated_string(self):
        """Advance past the next zero byte."""
        while self.data[self.cursor] != 0:
            self.cursor += 1
        self.cursor += 1  # Skip the null terminator

    def skip_TObject(self):
        """Skip a TObject header: version, fUniqueID, fBits and optional pidf."""
        self.skip_fVersion()
        self.skip(4)  # fUniqueID
        fBits = self.read_uint32()
        if fBits & kIsReferenced:
            self.skip(2)  # pidf

    def __repr__(self):
        """Show up to ``repr_nbytes`` upcoming bytes, wrapped to the terminal."""
        window = self.data[self.cursor : self.cursor + self.repr_nbytes]
        body = ", ".join(f"{int(byte):3d}" for byte in window)
        if len(window) < len(self.data[self.cursor :]):
            # More bytes follow than are displayed.
            body = f"{body}, ..." if body else "..."
        if not body:
            # ``fill("")`` would drop the opening bracket entirely.
            return "BinaryBuffer:\n[ ]"

        width = 76
        try:
            width, _ = shutil.get_terminal_size()
            width = max(40, width - 4)
        except Exception:
            # Best effort only: fall back to the default width.
            pass

        wrapper = textwrap.TextWrapper(
            width=width,
            initial_indent="[ ",
            subsequent_indent="  ",
            break_long_words=False,
            break_on_hyphens=False,
            drop_whitespace=False,
            replace_whitespace=False,
        )
        return "BinaryBuffer:\n" + wrapper.fill(body) + "]"
[docs]
class IReader:
    """Base interface for objects that decode values from a ``BinaryBuffer``.

    Subclasses implement ``read`` (consume one item at the buffer cursor and
    store it internally) and ``data`` (return everything collected so far).
    """

    def __init__(self, name: str):
        self.name = name

    def read(self, buffer: BinaryBuffer) -> None:
        """Consume one item from ``buffer``; must be overridden."""
        raise NotImplementedError

    def read_many(self, buffer: BinaryBuffer, count: int) -> int:
        """Call ``read`` ``count`` times and return ``count``.

        A non-positive ``count`` performs no reads and is returned unchanged.
        """
        for _ in range(count):
            self.read(buffer)
        return count

    def read_until(self, buffer: BinaryBuffer, end_pos: int) -> int:
        """Call ``read`` until the cursor reaches ``end_pos``.

        Returns the number of items read.

        Raises
        ------
        RuntimeError
            If a ``read`` call does not advance the cursor. Without this
            guard a zero-width reader would loop forever.
        """
        count = 0
        while buffer.cursor < end_pos:
            before = buffer.cursor
            self.read(buffer)
            if buffer.cursor <= before:
                raise RuntimeError(
                    f"{self.__class__.__name__}({self.name}).read_until made no progress "
                    f"at cursor {before}; cannot reach end position {end_pos}"
                )
            count += 1
        return count

    def read_many_memberwise(self, buffer: BinaryBuffer, count: int) -> int:
        """Member-wise bulk read; not supported unless a subclass overrides it."""
        raise NotImplementedError(
            f"{self.__class__.__name__}({self.name}).read_many_memberwise is not implemented"
        )

    def data(self) -> Any:
        """Return the collected values; must be overridden."""
        raise NotImplementedError
# dtype name -> ``array.array`` typecode used to accumulate values of that dtype.
# "bool" shares the unsigned-byte code "B".
DTYPE_TO_TYPECODE = dict(
    uint8="B",
    uint16="H",
    uint32="I",
    uint64="Q",
    int8="b",
    int16="h",
    int32="i",
    int64="q",
    float32="f",
    float64="d",
    bool="B",
)
# dtype name -> unbound ``BinaryBuffer`` method that decodes one value of that
# dtype. The float readers return ``float`` and the bool reader returns
# ``bool``, so the value type is not just ``int``.
DTYPE_TO_READER: dict[str, Callable[[BinaryBuffer], int | float | bool]] = {
    "uint8": BinaryBuffer.read_uint8,
    "uint16": BinaryBuffer.read_uint16,
    "uint32": BinaryBuffer.read_uint32,
    "uint64": BinaryBuffer.read_uint64,
    "int8": BinaryBuffer.read_int8,
    "int16": BinaryBuffer.read_int16,
    "int32": BinaryBuffer.read_int32,
    "int64": BinaryBuffer.read_int64,
    "float32": BinaryBuffer.read_float,
    "float64": BinaryBuffer.read_double,
    "bool": BinaryBuffer.read_bool,
}
[docs]
class PrimitiveReader(IReader):
    """Reader that collects one fixed-width scalar per ``read`` call.

    ``dtype`` selects both the buffer decoding method (``DTYPE_TO_READER``)
    and the typecode of the backing ``array`` (``DTYPE_TO_TYPECODE``).
    """

    def __init__(
        self,
        name: str,
        dtype: Literal[
            "bool",
            "uint8",
            "uint16",
            "uint32",
            "uint64",
            "int8",
            "int16",
            "int32",
            "int64",
            "float32",
            "float64",
        ],
    ):
        super().__init__(name)
        self.dtype = dtype
        self.typecode = DTYPE_TO_TYPECODE[dtype]
        self.buffer_reader = DTYPE_TO_READER[dtype]
        self._data = array(self.typecode)

    def read(self, buffer):
        """Decode one value from ``buffer`` and store it."""
        value = self.buffer_reader(buffer)
        self._data.append(value)

    def data(self):
        """Return all stored values as a NumPy array of ``self.dtype``."""
        return np.asarray(self._data, dtype=self.dtype)
[docs]
class TObjectReader(IReader):
    """Reader for a TObject header.

    Each ``read`` consumes a 2-byte version (skipped), a signed 32-bit
    fUniqueID, an unsigned 32-bit fBits, and a 2-byte pidf when fBits has
    the ``kIsReferenced`` bit set. Values are only stored when ``keep_data``
    is true.
    """

    def __init__(self, name: str, keep_data: bool = False):
        super().__init__(name)
        self.keep_data = keep_data
        self.unique_id = array("i")
        self.bits = array("I")
        self.pidf = array("H")
        # pidf is optional per object, so it is stored with its own offsets.
        self.pidf_offsets = array("q", [0])

    def read(self, buffer):
        """Consume one TObject header, recording it if ``keep_data`` is set."""
        buffer.skip_fVersion()
        unique_id = buffer.read_int32()
        bit_flags = buffer.read_uint32()
        has_pidf = bool(bit_flags & kIsReferenced)

        if not self.keep_data:
            if has_pidf:
                buffer.skip(2)
            return

        if has_pidf:
            self.pidf.append(buffer.read_uint16())
        self.unique_id.append(unique_id)
        self.bits.append(bit_flags)
        self.pidf_offsets.append(len(self.pidf))

    def data(self):
        """Return ``(unique_id, bits, pidf, pidf_offsets)`` arrays, or ``None``.

        ``None`` is returned when the reader was created with
        ``keep_data=False``.
        """
        if not self.keep_data:
            return None
        columns = (self.unique_id, self.bits, self.pidf, self.pidf_offsets)
        return tuple(np.asarray(column) for column in columns)
[docs]
class TStringReader(IReader):
    """Reader for length-prefixed strings, stored as flat bytes plus offsets.

    ``with_header`` controls whether ``read_many`` / ``read_until`` first skip
    a byte-count word and a version field. ``read`` itself never expects a
    header.
    """

    def __init__(self, name: str, with_header: bool):
        super().__init__(name)
        self.with_header = with_header
        self._data = array("B")
        self.offsets = array("q", [0])

    def _skip_header_if_any(self, buffer):
        """Skip the byte-count and version fields when ``with_header`` is set."""
        if self.with_header:
            buffer.skip_fNBytes()
            buffer.skip_fVersion()

    def read(self, buffer):
        """Read one string: 1-byte length (255 -> 32-bit length follows), then bytes."""
        n_chars = buffer.read_uint8()
        if n_chars == 255:
            n_chars = buffer.read_uint32()
        self._data.extend(buffer.read_uint8() for _ in range(n_chars))
        self.offsets.append(len(self._data))

    def read_many(self, buffer, count):
        """Read ``count`` strings after the optional header; returns ``count``.

        Nothing is consumed (not even the header) when ``count`` is 0.
        A negative ``count`` fails the assertion.
        """
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many with negative count: {count} is not allowed"
        if count == 0:
            return 0
        self._skip_header_if_any(buffer)
        for _ in range(count):
            self.read(buffer)
        return count

    def read_until(self, buffer, end_pos):
        """Read strings until the cursor reaches ``end_pos``; returns how many.

        Nothing is consumed when the cursor is already at ``end_pos``.
        """
        if buffer.cursor == end_pos:
            return 0
        self._skip_header_if_any(buffer)
        n_read = 0
        while buffer.cursor < end_pos:
            self.read(buffer)
            n_read += 1
        return n_read

    def data(self):
        """Return ``(offsets, bytes)`` as NumPy arrays."""
        raw_bytes = np.asarray(self._data)
        boundaries = np.asarray(self.offsets)
        return boundaries, raw_bytes
[docs]
class STLSeqReader(IReader):
    """Reader for a sequence container: a 32-bit size followed by elements.

    Parameters
    ----------
    with_header
        Whether ``read_many`` / ``read_until`` expect a byte-count + version
        header before the bodies. ``read`` always consumes one.
    objwise_or_memberwise
        Layout the caller expects. ``"auto"`` accepts whatever the header
        says; the other two values make a mismatch raise ``ValueError``.
    element_reader
        Reader that decodes the elements of each sequence.
    """

    def __init__(
        self,
        name: str,
        with_header: bool,
        objwise_or_memberwise: Literal["auto", "obj-wise", "member-wise"],
        element_reader: IReader,
    ):
        super().__init__(name)
        self.with_header = with_header
        self.objwise_or_memberwise = objwise_or_memberwise
        self.element_reader = element_reader
        self.offsets = array("q", [0])

    def check_objwise_memberwise(self, is_memberwise: bool):
        """Raise ``ValueError`` if the observed layout contradicts the expected one."""
        expected = self.objwise_or_memberwise
        if is_memberwise and expected == "obj-wise":
            raise ValueError(
                f"STLSeqReader({self.name}) expected obj-wise reading but got member-wise"
            )
        if not is_memberwise and expected == "member-wise":
            raise ValueError(
                f"STLSeqReader({self.name}) expected member-wise reading but got obj-wise"
            )

    def _read_version_header(self, buffer: BinaryBuffer) -> bool:
        """Read the version field, validate the layout, and report member-wise-ness.

        For member-wise data two further bytes are skipped after the version.
        """
        version = buffer.read_fVersion()
        memberwise = bool(version & kStreamedMemberwise)
        self.check_objwise_memberwise(memberwise)
        if memberwise:
            buffer.skip(2)
        return memberwise

    def read_body(self, buffer: BinaryBuffer, is_memberwise: bool):
        """Read one sequence: its 32-bit size, then that many elements."""
        n_elements = buffer.read_uint32()
        self.offsets.append(self.offsets[-1] + n_elements)
        debug_print(
            f"STLSeqReader({self.name}): reading body, is_memberwise={is_memberwise}, fSize={n_elements}\n"
        )
        debug_print(buffer)
        bulk_read = (
            self.element_reader.read_many_memberwise
            if is_memberwise
            else self.element_reader.read_many
        )
        bulk_read(buffer, n_elements)

    def read(self, buffer):
        """Read one sequence preceded by a byte-count + version header."""
        buffer.skip_fNBytes()
        memberwise = self._read_version_header(buffer)
        self.read_body(buffer, memberwise)

    def read_many(self, buffer, count):
        """Read several sequences that share one optional header.

        * ``count == 0``: nothing is consumed; returns 0.
        * ``count < 0``: requires ``with_header``; bodies are read until the
          header's byte count is exhausted, and the number read is returned.
        * ``count > 0``: reads exactly ``count`` bodies and returns ``count``.
        """
        if count == 0:
            return 0

        if count < 0:
            assert (
                self.with_header
            ), f"STLSeqReader({self.name}).read_many called with negative count expects with_header=True"
            payload_nbytes = buffer.read_fNBytes()
            stop = buffer.cursor + payload_nbytes
            memberwise = self._read_version_header(buffer)
            n_read = 0
            while buffer.cursor < stop:
                self.read_body(buffer, memberwise)
                n_read += 1
            return n_read

        # Without a header the configured expectation is the only layout hint.
        memberwise = self.objwise_or_memberwise == "member-wise"
        if self.with_header:
            buffer.skip_fNBytes()
            memberwise = self._read_version_header(buffer)
        for _ in range(count):
            self.read_body(buffer, memberwise)
        return count

    def read_until(self, buffer, end_pos):
        """Read bodies until the cursor reaches ``end_pos``; returns how many.

        Nothing is consumed when the cursor is already at ``end_pos``.
        """
        if buffer.cursor == end_pos:
            return 0
        memberwise = self.objwise_or_memberwise == "member-wise"
        if self.with_header:
            buffer.skip_fNBytes()
            memberwise = self._read_version_header(buffer)
        n_read = 0
        while buffer.cursor < end_pos:
            self.read_body(buffer, memberwise)
            n_read += 1
        return n_read

    def data(self):
        """Return ``(offsets, element_data)``."""
        boundaries = np.asarray(self.offsets)
        return boundaries, self.element_reader.data()
[docs]
class STLMapReader(IReader):
    """Reader for a map container: a 32-bit size followed by key/value pairs.

    Parameters
    ----------
    with_header
        Whether ``read_many`` / ``read_until`` expect a byte-count + version
        header (plus six skipped bytes) before the bodies. ``read`` always
        consumes one.
    objwise_or_memberwise
        Layout the caller expects. ``"auto"`` accepts whatever the header
        says; the other two values make a mismatch raise ``ValueError``.
    key_reader, value_reader
        Readers that decode the keys and the values respectively.
    """

    def __init__(
        self,
        name: str,
        with_header: bool,
        objwise_or_memberwise: Literal["auto", "obj-wise", "member-wise"],
        key_reader: IReader,
        value_reader: IReader,
    ):
        super().__init__(name)
        self.with_header = with_header
        self.objwise_or_memberwise = objwise_or_memberwise
        self.key_reader = key_reader
        self.value_reader = value_reader
        self.offsets = array("q", [0])

    def check_objwise_memberwise(self, is_memberwise: bool):
        """Raise ``ValueError`` if the observed layout contradicts the expected one."""
        expected = self.objwise_or_memberwise
        if is_memberwise and expected == "obj-wise":
            raise ValueError(
                f"STLMapReader({self.name}) expected obj-wise reading but got member-wise"
            )
        if not is_memberwise and expected == "member-wise":
            raise ValueError(
                f"STLMapReader({self.name}) expected member-wise reading but got obj-wise"
            )

    def _read_version_header(self, buffer: BinaryBuffer) -> bool:
        """Read the version, skip six bytes, validate and report the layout."""
        version = buffer.read_fVersion()
        buffer.skip(6)
        memberwise = bool(version & kStreamedMemberwise)
        self.check_objwise_memberwise(memberwise)
        return memberwise

    def read_body(self, buffer: BinaryBuffer, is_memberwise: bool):
        """Read one map: its 32-bit size, then the keys and values.

        Member-wise data stores all keys followed by all values; otherwise
        keys and values alternate pair by pair.
        """
        n_pairs = buffer.read_uint32()
        self.offsets.append(self.offsets[-1] + n_pairs)
        debug_print(
            f"STLMapReader({self.name}): reading body, is_memberwise={is_memberwise}, fSize={n_pairs}\n"
        )
        debug_print(buffer)
        if not is_memberwise:
            for _ in range(n_pairs):
                self.key_reader.read(buffer)
                self.value_reader.read(buffer)
            return
        self.key_reader.read_many(buffer, n_pairs)
        self.value_reader.read_many(buffer, n_pairs)

    def read(self, buffer):
        """Read one map preceded by a byte-count + version header."""
        buffer.skip_fNBytes()
        memberwise = self._read_version_header(buffer)
        self.read_body(buffer, memberwise)

    def read_many(self, buffer, count):
        """Read several maps that share one optional header.

        * ``count == 0``: nothing is consumed; returns 0.
        * ``count < 0``: requires ``with_header``; bodies are read until the
          header's byte count is exhausted, and the number read is returned.
        * ``count > 0``: reads exactly ``count`` bodies and returns ``count``.
        """
        if count == 0:
            return 0

        if count < 0:
            assert (
                self.with_header
            ), f"STLMapReader({self.name}).read_many called with negative count expecting with_header=True"
            payload_nbytes = buffer.read_fNBytes()
            stop = buffer.cursor + payload_nbytes
            memberwise = self._read_version_header(buffer)
            n_read = 0
            while buffer.cursor < stop:
                self.read_body(buffer, memberwise)
                n_read += 1
            return n_read

        # Without a header the configured expectation is the only layout hint.
        memberwise = self.objwise_or_memberwise == "member-wise"
        if self.with_header:
            buffer.skip_fNBytes()
            memberwise = self._read_version_header(buffer)
        for _ in range(count):
            self.read_body(buffer, memberwise)
        return count

    def read_until(self, buffer, end_pos):
        """Read bodies until the cursor reaches ``end_pos``; returns how many.

        Nothing is consumed when the cursor is already at ``end_pos``.
        """
        if buffer.cursor == end_pos:
            return 0
        memberwise = self.objwise_or_memberwise == "member-wise"
        if self.with_header:
            buffer.skip_fNBytes()
            memberwise = self._read_version_header(buffer)
        n_read = 0
        while buffer.cursor < end_pos:
            self.read_body(buffer, memberwise)
            n_read += 1
        return n_read

    def read_many_memberwise(self, buffer, count):
        """Validate that member-wise reading is allowed, then defer to ``read_many``.

        A negative ``count`` fails the assertion.
        """
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"
        self.check_objwise_memberwise(True)
        return self.read_many(buffer, count)

    def data(self):
        """Return ``(offsets, key_data, value_data)``."""
        boundaries = np.asarray(self.offsets)
        keys = self.key_reader.data()
        values = self.value_reader.data()
        return boundaries, keys, values
[docs]
class STLStringReader(IReader):
    """Reader for length-prefixed strings with an optional per-call header.

    When ``with_header`` is true, ``read``, ``read_many`` and ``read_until``
    each skip one byte-count word and one version field before the string
    bodies.
    """

    def __init__(self, name: str, with_header: bool):
        super().__init__(name)
        self.with_header = with_header
        self._data = array("B")
        self.offsets = array("q", [0])

    def _skip_header_if_any(self, buffer):
        """Skip the byte-count and version fields when ``with_header`` is set."""
        if self.with_header:
            buffer.skip_fNBytes()
            buffer.skip_fVersion()

    def read_body(self, buffer: BinaryBuffer):
        """Read one string: 1-byte length (255 -> 32-bit length follows), then bytes."""
        n_chars = buffer.read_uint8()
        if n_chars == 255:
            n_chars = buffer.read_uint32()
        self.offsets.append(self.offsets[-1] + n_chars)
        self._data.extend(buffer.read_uint8() for _ in range(n_chars))

    def read(self, buffer):
        """Read one string, preceded by a header when ``with_header`` is set."""
        self._skip_header_if_any(buffer)
        self.read_body(buffer)

    def read_many(self, buffer, count):
        """Read several strings that share one optional header.

        * ``count == 0``: nothing is consumed; returns 0.
        * ``count < 0``: requires ``with_header``; bodies are read until the
          header's byte count is exhausted, and the number read is returned.
        * ``count > 0``: reads exactly ``count`` bodies and returns ``count``.
        """
        if count == 0:
            return 0

        if count < 0:
            assert (
                self.with_header
            ), f"STLStringReader({self.name}).read_many called with negative count expecting with_header=True"
            payload_nbytes = buffer.read_fNBytes()
            stop = buffer.cursor + payload_nbytes
            buffer.skip_fVersion()
            n_read = 0
            while buffer.cursor < stop:
                self.read_body(buffer)
                n_read += 1
            return n_read

        self._skip_header_if_any(buffer)
        for _ in range(count):
            self.read_body(buffer)
        return count

    def read_until(self, buffer, end_pos):
        """Read bodies until the cursor reaches ``end_pos``; returns how many.

        Nothing is consumed when the cursor is already at ``end_pos``.
        """
        if buffer.cursor == end_pos:
            return 0
        self._skip_header_if_any(buffer)
        n_read = 0
        while buffer.cursor < end_pos:
            self.read_body(buffer)
            n_read += 1
        return n_read

    def data(self):
        """Return ``(offsets, bytes)`` as NumPy arrays."""
        raw_bytes = np.asarray(self._data)
        boundaries = np.asarray(self.offsets)
        return boundaries, raw_bytes
[docs]
class TArrayReader(IReader):
    """Reader for arrays stored as a 32-bit size followed by that many scalars.

    ``dtype`` selects the scalar decoding method and the typecode of the
    backing ``array``.
    """

    def __init__(
        self,
        name: str,
        dtype: Literal["int8", "int16", "int32", "int64", "float32", "float64"],
    ):
        super().__init__(name)
        self.dtype = dtype
        self.typecode = DTYPE_TO_TYPECODE[dtype]
        self.buffer_reader = DTYPE_TO_READER[dtype]
        self._data = array(self.typecode)
        self.offsets = array("q", [0])

    def read(self, buffer):
        """Read one array: its size, then each element in turn."""
        n_items = buffer.read_uint32()
        self.offsets.append(self.offsets[-1] + n_items)
        decode_one = self.buffer_reader
        self._data.extend(decode_one(buffer) for _ in range(n_items))

    def data(self):
        """Return ``(offsets, values)`` as NumPy arrays."""
        boundaries = np.asarray(self.offsets)
        values = np.asarray(self._data)
        return boundaries, values
[docs]
class GroupReader(IReader):
    """Reader that applies a fixed list of sub-readers one after another.

    No header of its own is consumed; the sub-readers handle all bytes.
    """

    def __init__(self, name: str, element_readers: list[IReader]):
        super().__init__(name)
        self.element_readers = element_readers

    def read(self, buffer):
        """Let every sub-reader read one item, in list order."""
        for member in self.element_readers:
            debug_print(f"GroupReader({self.name}) reading element {member.name}:\n")
            debug_print(buffer)
            member.read(buffer)

    def read_many_memberwise(self, buffer, count):
        """Let every sub-reader read ``count`` items in turn; returns ``count``.

        A negative ``count`` fails the assertion.
        """
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"
        for member in self.element_readers:
            debug_print(
                f"GroupReader{self.name} reading many member-wise element {member.name}:\n"
            )
            debug_print(buffer)
            member.read_many(buffer, count)
        return count

    def data(self):
        """Return a list with each sub-reader's data, in list order."""
        return [member.data() for member in self.element_readers]
[docs]
class AnyClassReader(IReader):
    """Reader for an object framed by a byte-count + version header.

    After the header, each sub-reader reads one item. ``read`` asserts that
    exactly the announced number of bytes was consumed.
    """

    def __init__(self, name: str, element_readers: list[IReader]):
        super().__init__(name)
        self.element_readers = element_readers

    def read(self, buffer: BinaryBuffer):
        """Read one framed object and verify its length."""
        fNBytes = buffer.read_fNBytes()
        start_pos = buffer.cursor
        buffer.skip_fVersion()
        for member in self.element_readers:
            debug_print(f"AnyClassReader({self.name}) reading element {member.name}:\n")
            debug_print(buffer)
            member.read(buffer)
        # The byte count covers everything after the count word itself.
        expected_end = start_pos + fNBytes
        assert buffer.cursor == expected_end, (
            f"AnyClassReader({self.name}): Invalid read length! Expect {fNBytes} bytes, "
            f"but read {buffer.cursor - start_pos} bytes."
        )

    def read_many_memberwise(self, buffer, count):
        """Let every sub-reader read ``count`` items in turn; returns ``count``.

        No header is consumed here. A negative ``count`` fails the assertion.
        """
        assert (
            count >= 0
        ), f"Calling {self.name}.read_many_memberwise with negative count: {count} is not allowed"
        for member in self.element_readers:
            debug_print(
                f"AnyClassReader{self.name} reading many member-wise element {member.name}:\n"
            )
            debug_print(buffer)
            member.read_many(buffer, count)
        return count

    def data(self):
        """Return a list with each sub-reader's data, in list order."""
        return [member.data() for member in self.element_readers]
[docs]
class CStyleArrayReader(IReader):
    """Reader for a C-style array of elements.

    With ``flat_size >= 0`` every ``read`` consumes exactly ``flat_size``
    elements. With a negative ``flat_size`` the array runs to the end of the
    current entry (the first buffer offset beyond the cursor), and the
    element count per ``read`` is recorded in ``offsets``.
    """

    def __init__(self, name: str, flat_size: int, element_reader: IReader):
        super().__init__(name)
        self.flat_size = flat_size
        self.element_reader = element_reader
        self.offsets = array("q", [0])

    def read(self, buffer):
        """Read one array, either fixed-size or up to the end of the entry."""
        debug_print(
            f"CStyleArrayReader({self.name}): reading C-style array of flat_size={self.flat_size}\n"
        )
        debug_print(buffer)

        if self.flat_size >= 0:
            self.element_reader.read_many(buffer, self.flat_size)
            return

        # Variable length: stop at the first entry boundary past the cursor.
        boundaries = buffer.offsets
        beyond_cursor = (boundaries > buffer.cursor).nonzero()[0]
        end_pos = boundaries[beyond_cursor.min()]
        count = self.element_reader.read_until(buffer, end_pos)
        self.offsets.append(self.offsets[-1] + count)
        debug_print(f"CStyleArrayReader({self.name}): read {count} elements")

    def read_many(self, buffer, count):
        """Read ``count`` fixed-size arrays; returns ``count``.

        Both ``flat_size`` and ``count`` must be non-negative, otherwise the
        assertions fail.
        """
        assert (
            self.flat_size >= 0
        ), f"Calling CStyleArrayReader({self.name}).read_many with negative flat_size is not allowed"
        assert (
            count >= 0
        ), f"Calling CStyleArrayReader({self.name}).read_many with negative count: {count} is not allowed"
        n_per_array = self.flat_size
        for _ in range(count):
            self.element_reader.read_many(buffer, n_per_array)
        return count

    def read_until(self, buffer, end_pos):
        """Not supported for C-style arrays; always raises."""
        raise NotImplementedError("CStyleArrayReader.read_until is not supported")

    def data(self):
        """Return the element data, with offsets in front for variable-length arrays."""
        element_data = self.element_reader.data()
        if self.flat_size >= 0:
            return element_data
        return np.asarray(self.offsets), element_data
[docs]
class EmptyReader(IReader):
    """Placeholder reader: consumes no bytes and yields no data."""

    def read(self, buffer):
        """Leave ``buffer`` untouched."""
        return None

    def data(self):
        """There is never any data; always ``None``."""
        return None
[docs]
def read_data(data: NDArray[np.uint8], offsets: NDArray[np.uint32], reader: IReader):
    """Decode every entry of ``data`` with ``reader`` and return ``reader.data()``.

    ``reader.read`` is called once per entry. After each call the cursor must
    sit exactly on the next entry boundary ``offsets[i + 1]``; otherwise an
    ``AssertionError`` reports the expected and actual byte counts.
    """
    buffer = BinaryBuffer(data, offsets)
    n_entries = buffer.entries
    for i_evt in range(n_entries):
        began_at = buffer.cursor
        reader.read(buffer)
        stopped_at = buffer.cursor
        expected_stop = offsets[i_evt + 1]
        assert stopped_at == expected_stop, (
            f"read_data: Invalid read length for {reader.name} at entry {i_evt}! Expect "
            f"{buffer.offsets[i_evt + 1]-buffer.offsets[i_evt]} bytes, but read {stopped_at - began_at} bytes."
        )
    return reader.data()