from __future__ import annotations
import warnings
from typing import Any, Literal, Union
import awkward as ak
import awkward.contents
import awkward.forms
import awkward.index
import numpy as np
import uproot
from uproot_custom.utils import (
get_dims_from_branch,
get_map_key_val_typenames,
get_sequence_element_typename,
get_top_type_name,
)
import uproot_custom.readers.cpp
import uproot_custom.readers.python
import uproot_custom.readers._forth
try:
import uproot_custom.readers._numba
except ImportError:
pass
# Factory classes that `build_factory` iterates over (highest `priority()` first)
# when looking for a factory matching an item.
registered_factories: set[type["Factory"]] = set()
# Reader backend that `read_branch` dispatches on.
reader_backend: Literal["cpp", "python", "forth", "numba"] = "cpp"
def _objwise_or_memberwise_to_text(
objwise_or_memberwise: Literal[-1, 0, 1],
) -> Literal["auto", "obj-wise", "member-wise"]:
return {
-1: "auto",
0: "obj-wise",
1: "member-wise",
}[objwise_or_memberwise]
[docs]
def build_factory(
    cur_streamer_info: dict,
    all_streamer_info: dict,
    item_path: str = "",
    **kwargs,
) -> "Factory":
    """
    Generate factory with a given streamer information.

    Every class in `registered_factories` is asked, in descending
    `priority()` order, to build a factory for the item. The first
    non-`None` result is returned.

    Args:
        cur_streamer_info (dict): Streamer information of current item.
            Must contain `fName`; `fTypeName` is optional.
        all_streamer_info (dict): All streamer information.
        item_path (str): Path to the item. Unless `called_from_top=True`
            is passed in `kwargs`, `.{fName}` is appended to it.
        **kwargs: Forwarded unchanged to each `Factory.build_factory` call.

    Returns:
        An instance of `Factory`.

    Raises:
        ValueError: If no registered factory matches the item.
    """
    fName = cur_streamer_info["fName"]

    top_type_name = (
        get_top_type_name(cur_streamer_info["fTypeName"])
        if "fTypeName" in cur_streamer_info
        else None
    )

    if not kwargs.get("called_from_top", False):
        item_path = f"{item_path}.{fName}"

    for factory_class in sorted(
        registered_factories, key=lambda x: x.priority(), reverse=True
    ):
        factory_instance = factory_class.build_factory(
            top_type_name,
            cur_streamer_info,
            all_streamer_info,
            item_path,
            **kwargs,
        )
        if factory_instance is not None:
            return factory_instance

    # `fTypeName` is optional (see above), so use `.get` here: indexing it
    # would replace the intended ValueError with an unrelated KeyError.
    raise ValueError(
        f"Unknown type: {cur_streamer_info.get('fTypeName')} for {item_path}"
    )
[docs]
def read_branch(
    branch: uproot.TBranch,
    data: np.ndarray[np.uint8],
    offsets: np.ndarray,
    cur_streamer_info: dict,
    all_streamer_info: dict[str, list[dict]],
    item_path: str = "",
):
    """
    Read one branch with the reader backend selected by `reader_backend`.

    The factory tree is built with `build_factory`, the matching reader is
    built from it and run over `data`, and the result is converted with
    the factory's `make_awkward_content`.

    Args:
        branch: Branch being read. Passed to `build_factory` as `branch=`;
            the numba backend additionally receives `id(branch)`.
        data: Raw bytes of the branch.
        offsets: Offsets into `data`. When `None`, evenly spaced offsets
            are generated from `cur_streamer_info["fSize"]`.
        cur_streamer_info: Streamer information of the branch item.
        all_streamer_info: Streamer information of all types.
        item_path: Path to the item.

    Returns:
        Whatever `factory.make_awkward_content` returns for the data read.

    Raises:
        ValueError: If `reader_backend` is not a known backend name.
    """
    factory = build_factory(
        cur_streamer_info,
        all_streamer_info,
        item_path,
        called_from_top=True,
        branch=branch,
    )

    # No offsets supplied: derive evenly spaced boundaries from `fSize`.
    if offsets is None:
        entry_size = cur_streamer_info["fSize"]
        n_boundaries = data.size // entry_size + 1
        offsets = np.arange(n_boundaries, dtype=np.uint32) * entry_size

    backend = reader_backend

    if backend == "cpp":
        reader = factory.build_cpp_reader()
        raw_data = uproot_custom.readers.cpp.read_data(data, offsets, reader)

    elif backend == "python":
        reader = factory.build_python_reader()
        raw_data = uproot_custom.readers.python.read_data(data, offsets, reader)

    elif backend == "forth":
        warnings.warn(
            '"forth" reader is only for testing and benchmarking. It is not recommended for production use.',
            UserWarning,
        )
        holder = uproot_custom.readers._forth.BufferHolder()
        reader = factory.build_forth_reader(holder)
        raw_data = uproot_custom.readers._forth.read_data(data, offsets, reader)

    elif backend == "numba":
        warnings.warn(
            '"numba" reader is only for testing and benchmarking. It is not recommended for production use.',
            UserWarning,
        )
        compilation_ctx = uproot_custom.readers._numba.CompilationContext()
        reader = factory.build_numba_reader(compilation_ctx)
        raw_data = uproot_custom.readers._numba.read_data(
            data, offsets, reader, id(branch), compilation_ctx
        )

    else:
        raise ValueError(f"Unknown reader backend: {reader_backend}.")

    return factory.make_awkward_content(raw_data)
[docs]
class Factory:
    """
    Base class of reader factories. A reader factory is in charge of
    generating the reader configuration tree, building and combining the
    backend readers, and reconstructing the raw arrays returned by a
    reader into structured awkward contents.
    """

    @classmethod
    def priority(cls) -> int:
        """
        Return the call priority of this factory. Factories with higher
        priority will be called first.
        """
        return 10

    @classmethod
    def build_factory(
        cls,
        top_type_name: str,
        cur_streamer_info: dict,
        all_streamer_info: dict,
        item_path: str,
        **kwargs,
    ) -> Union[None, Factory]:
        """
        Return an instance of this factory when current item matches this factory,
        otherwise return `None`.

        Args:
            top_type_name (str): Name of the top-level class of current item.
                For example, `vector<int>` -> `vector`.
            cur_streamer_info (dict): Streamer information of current item.
            all_streamer_info (dict): Dictionary storing streamer information
                of all types. The key is the classname, the value holds the
                streamer information of that class.
            item_path (str): Indicating which item is being matched. One can
                use this variable to apply specific behavior.
            **kwargs: Extra keyword arguments forwarded by the module-level
                `build_factory`.

        Returns:
            A factory instance for the current item, or `None` when this
            factory does not handle it. The base implementation never
            matches.
        """
        return None

    def __init__(self, name: str):
        """Store the item name this factory was built for."""
        self.name = name

    def build_cpp_reader(self) -> uproot_custom.readers.cpp.IReader:
        """
        Build concrete C++ reader.

        Returns:
            An instance of `uproot_custom.readers.cpp.IReader`.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("build_cpp_reader not implemented.")

    def build_python_reader(self) -> uproot_custom.readers.python.IReader:
        """
        Build concrete Python reader.

        Returns:
            An instance of `uproot_custom.readers.python.IReader`.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("build_python_reader not implemented.")

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ) -> uproot_custom.readers._forth.IReader:
        """
        Build concrete Forth reader.

        Args:
            buffer_holder: An instance of `BufferHolder` to register buffers.

        Returns:
            An instance of `uproot_custom.readers._forth.IReader`.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("build_forth_reader not implemented.")

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """
        Build concrete Numba reader.

        Args:
            ctx: An instance of `CompilationContext` to register buffers and
                store other compilation information.

        Returns:
            A reader from `uproot_custom.readers._numba`.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("build_numba_reader not implemented.")

    def make_awkward_content(
        self,
        raw_data: Any,
    ) -> awkward.contents.Content:
        """
        Reconstruct awkward contents with raw data returned from the reader.

        Args:
            raw_data: Data returned from the reader.

        Returns:
            awkward.contents.Content: Awkward content to build corresponding array.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        # The message used to name a method ("reconstruct_array") that does
        # not exist on this class; report the method that was actually called.
        raise NotImplementedError("make_awkward_content not implemented.")
[docs]
class PrimitiveFactory(Factory):
    """
    This factory reads primitive (fixed-size numeric and boolean) values.
    """

    # C++ / cstdint / ROOT type name -> numpy dtype name.
    typename2dtype = {
        # builtin
        "bool": "bool",
        "char": "int8",
        "short": "int16",
        "int": "int32",
        "long": "int64",
        "long long": "int64",
        "signed char": "int8",
        "signed short": "int16",
        "signed int": "int32",
        "signed long": "int64",
        "signed long long": "int64",
        "unsigned char": "uint8",
        "unsigned short": "uint16",
        "unsigned int": "uint32",
        "unsigned long": "uint64",
        "unsigned long long": "uint64",
        "float": "float32",
        "double": "float64",
        # cstdint
        "int8_t": "int8",
        "int16_t": "int16",
        "int32_t": "int32",
        "int64_t": "int64",
        "uint8_t": "uint8",
        "uint16_t": "uint16",
        "uint32_t": "uint32",
        "uint64_t": "uint64",
        # ROOT types
        "Bool_t": "bool",
        "Char_t": "int8",
        "Short_t": "int16",
        "Int_t": "int32",
        "Long_t": "int64",
        "Long64_t": "int64",  # typedef of `long long`
        "UChar_t": "uint8",
        "UShort_t": "uint16",
        "UInt_t": "uint32",
        "ULong_t": "uint64",
        "ULong64_t": "uint64",  # typedef of `unsigned long long`
        "Float_t": "float32",
        "Double_t": "float64",
    }

    # Streamer `fType` code -> numpy dtype name.
    ftype2dtype = {
        1: "int8",
        2: "int16",
        3: "int32",
        4: "int64",
        5: "float32",
        8: "float64",
        11: "uint8",
        12: "uint16",
        13: "uint32",
        14: "uint64",
        16: "int64",  # long long
        17: "uint64",  # unsigned long long
        18: "bool",
    }

    # numpy dtype name -> C++ reader class. "bool" is read as uint8 and
    # converted to bool in `make_awkward_content`.
    cpp_reader_map = {
        "bool": uproot_custom.readers.cpp.UInt8Reader,
        "int8": uproot_custom.readers.cpp.Int8Reader,
        "int16": uproot_custom.readers.cpp.Int16Reader,
        "int32": uproot_custom.readers.cpp.Int32Reader,
        "int64": uproot_custom.readers.cpp.Int64Reader,
        "uint8": uproot_custom.readers.cpp.UInt8Reader,
        "uint16": uproot_custom.readers.cpp.UInt16Reader,
        "uint32": uproot_custom.readers.cpp.UInt32Reader,
        "uint64": uproot_custom.readers.cpp.UInt64Reader,
        "float32": uproot_custom.readers.cpp.FloatReader,
        "float64": uproot_custom.readers.cpp.DoubleReader,
    }

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Return an instance when the item is a primitive type.

        The dtype is looked up from `fType` first and, when that gives no
        match, from `top_type_name`. Returns `None` if neither matches.
        """
        dtype = cls.ftype2dtype.get(cur_streamer_info.get("fType", -1), None)
        if dtype is None:
            # Match typename when fType is not available. This is for handling
            # types like vector<int>
            dtype = cls.typename2dtype.get(top_type_name, None)

        if dtype is None:
            return None

        return cls(name=cur_streamer_info["fName"], dtype=dtype)

    def __init__(self, name: str, dtype: str):
        """
        Args:
            name: Name of the item.
            dtype: numpy dtype name; a key of `cpp_reader_map`.
        """
        # Go through the base initializer like the other factories do.
        super().__init__(name)
        self.dtype = dtype

    def build_cpp_reader(self):
        """Build the C++ reader registered for `self.dtype`."""
        return self.cpp_reader_map[self.dtype](self.name)

    def build_python_reader(self):
        """Build the Python `PrimitiveReader` for `self.dtype`."""
        return uproot_custom.readers.python.PrimitiveReader(self.name, self.dtype)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `PrimitiveReader` for `self.dtype`."""
        return uproot_custom.readers._forth.PrimitiveReader(
            self.name, self.dtype, buffer_holder
        )

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `PrimitiveReader` for `self.dtype`."""
        return uproot_custom.readers._numba.PrimitiveReader(self.name, ctx, self.dtype)

    def make_awkward_content(self, raw_data: np.ndarray):
        """
        Wrap the raw array in a `NumpyArray`, converting it to `np.bool_`
        first when the dtype is "bool".
        """
        if self.dtype == "bool":
            raw_data = raw_data.astype(np.bool_)
        return ak.contents.NumpyArray(raw_data)
# Top-level type names of the STL containers (and `string`).
stl_typenames = {
"vector",
"array",
"string",
"list",
"set",
"multiset",
"unordered_set",
"unordered_multiset",
"map",
"multimap",
"unordered_map",
"unordered_multimap",
}
[docs]
class STLSeqFactory(Factory):
    """
    Factory for sequence-like STL containers.
    """

    target_types = [
        "vector",
        "array",
        "list",
        "set",
        "multiset",
        "unordered_set",
        "unordered_multiset",
    ]

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Match when `top_type_name` is one of `cls.target_types`.

        A factory for the element type is built recursively. If that
        element is itself an STL sequence, map or string, its header is
        switched off.
        """
        if top_type_name not in cls.target_types:
            return None

        name = cur_streamer_info["fName"]
        type_name = cur_streamer_info["fTypeName"]
        element_info = {
            "fName": name,
            "fTypeName": get_sequence_element_typename(type_name),
        }
        element_factory = build_factory(element_info, all_streamer_info, item_path)

        # Nested STL elements are built without their own header.
        nested_stl_kinds = (STLSeqFactory, STLMapFactory, STLStringFactory)
        if isinstance(element_factory, nested_stl_kinds):
            element_factory.with_header = False

        return cls(
            name=name,
            with_header=True,
            objwise_or_memberwise=-1,
            element_factory=element_factory,
        )

    def __init__(
        self,
        name: str,
        with_header: bool,
        objwise_or_memberwise: Literal[-1, 0, 1],
        element_factory: Factory,
    ):
        """
        Args:
            name: Name of the item.
            with_header: Flag passed on to the reader.
            objwise_or_memberwise: `-1` auto, `0` obj-wise, `1` member-wise.
            element_factory: Factory of the contained element type.
        """
        self.name = name
        self.with_header = with_header
        self.objwise_or_memberwise = objwise_or_memberwise
        self.element_factory = element_factory

    def build_cpp_reader(self):
        """Build the C++ `STLSeqReader` around the element's C++ reader."""
        inner = self.element_factory.build_cpp_reader()
        return uproot_custom.readers.cpp.STLSeqReader(
            self.name,
            self.with_header,
            self.objwise_or_memberwise,
            inner,
        )

    def build_python_reader(self):
        """Build the Python `STLSeqReader`; the mode flag is passed as text."""
        mode = _objwise_or_memberwise_to_text(self.objwise_or_memberwise)
        inner = self.element_factory.build_python_reader()
        return uproot_custom.readers.python.STLSeqReader(
            self.name,
            self.with_header,
            mode,
            inner,
        )

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `STLSeqReader`; the mode flag is passed as text."""
        mode = _objwise_or_memberwise_to_text(self.objwise_or_memberwise)
        inner = self.element_factory.build_forth_reader(buffer_holder)
        return uproot_custom.readers._forth.STLSeqReader(
            self.name,
            self.with_header,
            mode,
            inner,
            buffer_holder,
        )

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `STLSeqReader`; the mode flag is passed as text."""
        mode = _objwise_or_memberwise_to_text(self.objwise_or_memberwise)
        inner = self.element_factory.build_numba_reader(ctx)
        return uproot_custom.readers._numba.STLSeqReader(
            self.name,
            ctx,
            self.with_header,
            mode,
            inner,
        )

    def make_awkward_content(self, raw_data):
        """
        Build a `ListOffsetArray` from `(offsets, element_raw_data)`, with
        the element data converted by the element factory.
        """
        list_offsets, inner_raw = raw_data
        inner_content = self.element_factory.make_awkward_content(inner_raw)
        index = ak.index.Index64(list_offsets)
        return ak.contents.ListOffsetArray(index, inner_content)
[docs]
class STLMapFactory(Factory):
    """
    Factory for mapping-like STL containers.
    """

    target_types = ["map", "unordered_map", "multimap", "unordered_multimap"]

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Match when `top_type_name` is one of `cls.target_types`.

        Key and value factories are built recursively; they are named
        "key" and "val".
        """
        if top_type_name not in cls.target_types:
            return None

        type_name = cur_streamer_info["fTypeName"]
        key_type_name, val_type_name = get_map_key_val_typenames(type_name)
        name = cur_streamer_info["fName"]

        key_factory = build_factory(
            {"fName": "key", "fTypeName": key_type_name},
            all_streamer_info,
            item_path,
        )
        val_factory = build_factory(
            {"fName": "val", "fTypeName": val_type_name},
            all_streamer_info,
            item_path,
        )

        return cls(
            name=name,
            with_header=True,
            objwise_or_memberwise=-1,
            key_factory=key_factory,
            val_factory=val_factory,
        )

    def __init__(
        self,
        name: str,
        with_header: bool,
        objwise_or_memberwise: Literal[-1, 0, 1],
        key_factory: Factory,
        val_factory: Factory,
    ):
        """
        Args:
            name: Name of the item.
            with_header: Flag passed on to the reader.
            objwise_or_memberwise: `-1` auto, `0` obj-wise, `1` member-wise.
            key_factory: Factory of the key type.
            val_factory: Factory of the value type.
        """
        self.name = name
        self.with_header = with_header
        self.objwise_or_memberwise = objwise_or_memberwise
        self.key_factory = key_factory
        self.val_factory = val_factory

    def _drop_sub_headers_if_objwise(self) -> None:
        """Switch off key/value headers when the map is obj-wise (flag `0`)."""
        if self.objwise_or_memberwise == 0:
            for sub_factory in (self.key_factory, self.val_factory):
                sub_factory.with_header = False

    def build_cpp_reader(self):
        """Build the C++ `STLMapReader` around the key and value readers."""
        self._drop_sub_headers_if_objwise()
        key_reader = self.key_factory.build_cpp_reader()
        val_reader = self.val_factory.build_cpp_reader()
        return uproot_custom.readers.cpp.STLMapReader(
            self.name,
            self.with_header,
            self.objwise_or_memberwise,
            key_reader,
            val_reader,
        )

    def build_python_reader(self):
        """Build the Python `STLMapReader`; the mode flag is passed as text."""
        self._drop_sub_headers_if_objwise()
        mode = _objwise_or_memberwise_to_text(self.objwise_or_memberwise)
        key_reader = self.key_factory.build_python_reader()
        val_reader = self.val_factory.build_python_reader()
        return uproot_custom.readers.python.STLMapReader(
            self.name,
            self.with_header,
            mode,
            key_reader,
            val_reader,
        )

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `STLMapReader`; the mode flag is passed as text."""
        self._drop_sub_headers_if_objwise()
        mode = _objwise_or_memberwise_to_text(self.objwise_or_memberwise)
        key_reader = self.key_factory.build_forth_reader(buffer_holder)
        val_reader = self.val_factory.build_forth_reader(buffer_holder)
        return uproot_custom.readers._forth.STLMapReader(
            self.name,
            self.with_header,
            mode,
            key_reader,
            val_reader,
            buffer_holder,
        )

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `STLMapReader`; the mode flag is passed as text."""
        self._drop_sub_headers_if_objwise()
        mode = _objwise_or_memberwise_to_text(self.objwise_or_memberwise)
        key_reader = self.key_factory.build_numba_reader(ctx)
        val_reader = self.val_factory.build_numba_reader(ctx)
        return uproot_custom.readers._numba.STLMapReader(
            self.name,
            ctx,
            self.with_header,
            mode,
            key_reader,
            val_reader,
        )

    def make_awkward_content(self, raw_data):
        """
        Build a `ListOffsetArray` of records from
        `(offsets, key_raw_data, val_raw_data)`. The record fields are
        named after the key and value factories.
        """
        list_offsets, keys_raw, vals_raw = raw_data
        keys = self.key_factory.make_awkward_content(keys_raw)
        vals = self.val_factory.make_awkward_content(vals_raw)
        index = ak.index.Index64(list_offsets)
        pairs = ak.contents.RecordArray(
            [keys, vals],
            [self.key_factory.name, self.val_factory.name],
        )
        return ak.contents.ListOffsetArray(index, pairs)
[docs]
class STLStringFactory(Factory):
    """
    Factory for std::string.
    """

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """Match when `top_type_name` is "string"; built with a header."""
        if top_type_name == "string":
            return cls(
                name=cur_streamer_info["fName"],
                with_header=True,
            )
        return None

    def __init__(self, name: str, with_header: bool):
        """
        Args:
            name: Name of the item.
            with_header: Flag passed on to the reader.
        """
        self.name = name
        self.with_header = with_header

    def build_cpp_reader(self):
        """Build the C++ `STLStringReader`."""
        reader_cls = uproot_custom.readers.cpp.STLStringReader
        return reader_cls(self.name, self.with_header)

    def build_python_reader(self):
        """Build the Python `STLStringReader`."""
        reader_cls = uproot_custom.readers.python.STLStringReader
        return reader_cls(self.name, self.with_header)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `STLStringReader`."""
        reader_cls = uproot_custom.readers._forth.STLStringReader
        return reader_cls(self.name, self.with_header, buffer_holder)

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `STLStringReader`."""
        reader_cls = uproot_custom.readers._numba.STLStringReader
        return reader_cls(self.name, ctx, self.with_header)

    def make_awkward_content(self, raw_data):
        """
        Build a string-typed `ListOffsetArray` over char-typed data from
        `(offsets, data)`.
        """
        str_offsets, chars = raw_data
        index = awkward.index.Index64(str_offsets)
        char_content = awkward.contents.NumpyArray(
            chars, parameters={"__array__": "char"}
        )
        return awkward.contents.ListOffsetArray(
            index,
            char_content,
            parameters={"__array__": "string"},
        )
[docs]
class TArrayFactory(Factory):
    """
    Factory for TArray objects.

    TArray includes TArrayC, TArrayS, TArrayI, TArrayL, TArrayL64, TArrayF, and TArrayD.
    Corresponding dtype is int8, int16, int32, int64, int64, float32, and float64 respectively.
    """

    typename2dtype = {
        "TArrayC": "int8",
        "TArrayS": "int16",
        "TArrayI": "int32",
        "TArrayL": "int64",
        "TArrayL64": "int64",
        "TArrayF": "float32",
        "TArrayD": "float64",
    }

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Match when `top_type_name` is a key of `cls.typename2dtype`.
        """
        dtype = cls.typename2dtype.get(top_type_name)
        if dtype is None:
            return None
        return cls(name=cur_streamer_info["fName"], dtype=dtype)

    def __init__(self, name: str, dtype: str):
        """
        Args:
            name: Name of the item.
            dtype: numpy dtype name of the array elements.
        """
        super().__init__(name)
        self.dtype = dtype

    def build_cpp_reader(self):
        """Build the C++ TArray reader selected by `self.dtype`."""
        readers_by_dtype = {
            "int8": uproot_custom.readers.cpp.TArrayCReader,
            "int16": uproot_custom.readers.cpp.TArraySReader,
            "int32": uproot_custom.readers.cpp.TArrayIReader,
            "int64": uproot_custom.readers.cpp.TArrayLReader,
            "float32": uproot_custom.readers.cpp.TArrayFReader,
            "float64": uproot_custom.readers.cpp.TArrayDReader,
        }
        reader_cls = readers_by_dtype[self.dtype]
        return reader_cls(self.name)

    def build_python_reader(self):
        """Build the Python `TArrayReader` for `self.dtype`."""
        return uproot_custom.readers.python.TArrayReader(self.name, self.dtype)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `TArrayReader` for `self.dtype`."""
        reader_cls = uproot_custom.readers._forth.TArrayReader
        return reader_cls(self.name, self.dtype, buffer_holder)

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `TArrayReader` for `self.dtype`."""
        reader_cls = uproot_custom.readers._numba.TArrayReader
        return reader_cls(self.name, ctx, self.dtype)

    def make_awkward_content(self, raw_data):
        """Build a `ListOffsetArray` over a `NumpyArray` from `(offsets, data)`."""
        arr_offsets, values = raw_data
        index = awkward.index.Index64(arr_offsets)
        flat = awkward.contents.NumpyArray(values)
        return awkward.contents.ListOffsetArray(index, flat)
[docs]
class TStringFactory(Factory):
    """
    Factory for TString.
    """

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """Match when `top_type_name` is "TString"; built without a header."""
        if top_type_name == "TString":
            return cls(
                name=cur_streamer_info["fName"],
                with_header=False,
            )
        return None

    def __init__(self, name: str, with_header: bool):
        """
        Args:
            name: Name of the item.
            with_header: Flag passed on to the reader.
        """
        super().__init__(name)
        self.with_header = with_header

    def build_cpp_reader(self):
        """Build the C++ `TStringReader`."""
        reader_cls = uproot_custom.readers.cpp.TStringReader
        return reader_cls(self.name, self.with_header)

    def build_python_reader(self):
        """Build the Python `TStringReader`."""
        reader_cls = uproot_custom.readers.python.TStringReader
        return reader_cls(self.name, self.with_header)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `TStringReader`."""
        reader_cls = uproot_custom.readers._forth.TStringReader
        return reader_cls(self.name, self.with_header, buffer_holder)

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `TStringReader`."""
        reader_cls = uproot_custom.readers._numba.TStringReader
        return reader_cls(self.name, ctx, self.with_header)

    def make_awkward_content(self, raw_data):
        """
        Build a string-typed `ListOffsetArray` over char-typed data from
        `(offsets, data)`.
        """
        str_offsets, chars = raw_data
        index = awkward.index.Index64(str_offsets)
        char_content = awkward.contents.NumpyArray(
            chars, parameters={"__array__": "char"}
        )
        return awkward.contents.ListOffsetArray(
            index,
            char_content,
            parameters={"__array__": "string"},
        )
[docs]
class TObjectFactory(Factory):
    """
    Factory for the TObject base of an object.

    When the factory keeps no data, `make_awkward_content` returns an
    `EmptyArray`. Data is kept only for item paths listed in
    `keep_data_itempaths`.
    """

    # Item paths whose TObject data should be kept.
    keep_data_itempaths: set[str] = set()

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Match when `top_type_name` is "BASE" and `fType` equals 66.

        The factory is built with:

        - `name`: `fName` of the item.
        - `keep_data`: Whether `item_path` is in `cls.keep_data_itempaths`.
        """
        if top_type_name != "BASE" or cur_streamer_info["fType"] != 66:
            return None

        keep = item_path in cls.keep_data_itempaths
        return cls(name=cur_streamer_info["fName"], keep_data=keep)

    def __init__(self, name: str, keep_data: bool):
        """
        Args:
            name: Name of the item.
            keep_data: Whether the TObject data is kept.
        """
        super().__init__(name)
        self.keep_data = keep_data

    def build_cpp_reader(self):
        """Build the C++ `TObjectReader`."""
        reader_cls = uproot_custom.readers.cpp.TObjectReader
        return reader_cls(self.name, self.keep_data)

    def build_python_reader(self):
        """Build the Python `TObjectReader`."""
        reader_cls = uproot_custom.readers.python.TObjectReader
        return reader_cls(self.name, self.keep_data)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `TObjectReader`."""
        reader_cls = uproot_custom.readers._forth.TObjectReader
        return reader_cls(self.name, self.keep_data, buffer_holder)

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `TObjectReader`."""
        reader_cls = uproot_custom.readers._numba.TObjectReader
        return reader_cls(self.name, ctx, self.keep_data)

    def make_awkward_content(self, raw_data):
        """
        Return an `EmptyArray` when no data is kept. Otherwise build a
        record with fields `fUniqueID`, `fBits` and `pidf` from
        `(unique_ids, bits, pidf, pidf_offsets)`.
        """
        if not self.keep_data:
            return awkward.contents.EmptyArray()

        unique_ids, bits, pidf, pidf_offsets = raw_data

        unique_id_content = awkward.contents.NumpyArray(unique_ids)
        bits_content = awkward.contents.NumpyArray(bits)
        pidf_content = awkward.contents.ListOffsetArray(
            awkward.index.Index64(pidf_offsets),
            awkward.contents.NumpyArray(pidf),
        )
        return awkward.contents.RecordArray(
            [unique_id_content, bits_content, pidf_content],
            ["fUniqueID", "fBits", "pidf"],
        )
[docs]
class CStyleArrayFactory(Factory):
    """
    This class reads a C-style array from a binary parser.
    """

    @classmethod
    def priority(cls):
        """Return a priority above the default so this factory is tried first."""
        return 20  # This reader should be called first

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Match when the type name ends with `[]` or `fArrayDim` is non-zero.

        When called from the top (`called_from_top=True` in `kwargs`), a
        jagged branch is treated as if its type name ended with `[]`.

        Returns:
            A `CStyleArrayFactory` wrapping the element factory, or `None`
            when the item is not an array.
        """
        fTypeName = cur_streamer_info.get("fTypeName", "")

        if kwargs.get("called_from_top", False):
            branch = kwargs["branch"]
            # Only the jaggedness is needed here; the dimensions are unused.
            _, is_jagged = get_dims_from_branch(branch)
            if is_jagged and not fTypeName.endswith("[]"):
                fTypeName += "[]"

        if not fTypeName.endswith("[]") and cur_streamer_info.get("fArrayDim", 0) == 0:
            return None

        fName = cur_streamer_info["fName"]
        fArrayDim = cur_streamer_info.get("fArrayDim", None)
        fMaxIndex = cur_streamer_info.get("fMaxIndex", None)

        if fTypeName.endswith("[]"):
            # Variable-length array: the size is not known up front.
            flat_size = -1
        else:
            assert fArrayDim is not None, f"fArrayDim cannot be None for {item_path}."
            assert fMaxIndex is not None, f"fMaxIndex cannot be None for {item_path}."
            # Hand the readers a plain Python int rather than a NumPy scalar.
            flat_size = int(np.prod(fMaxIndex[:fArrayDim]))

        # Describe a single element: same info, no array dimension, no `[]`.
        element_streamer_info = cur_streamer_info.copy()
        element_streamer_info["fArrayDim"] = 0
        while fTypeName.endswith("[]"):
            fTypeName = fTypeName[:-2]
        element_streamer_info["fTypeName"] = fTypeName

        element_factory = build_factory(
            element_streamer_info,
            all_streamer_info,
            item_path=item_path,
        )

        # When TString is stored in C-style or std array, it has a "fNByte+fVersion" header.
        if isinstance(element_factory, TStringFactory) and fArrayDim != 0:
            element_factory.with_header = True

        assert flat_size != 0, "flatten_size cannot be 0."

        # When stored in std::array
        # [1] There is no header for vector and map.
        # [2] Map is object-wise serialized.
        # By so far, we use fType==82 to identify std::array.
        if (
            isinstance(
                element_factory,
                (
                    STLSeqFactory,
                    STLMapFactory,
                    STLStringFactory,
                ),
            )
            and cur_streamer_info.get("fType", -1) == 82
        ):
            element_factory.with_header = False
            element_factory.objwise_or_memberwise = 0  # -1: auto, 0: obj-wise, 1: member-wise

        return cls(
            name=fName,
            element_factory=element_factory,
            flat_size=flat_size,
            fMaxIndex=fMaxIndex,
            fArrayDim=fArrayDim,
        )

    def __init__(
        self,
        name: str,
        element_factory: Factory,
        flat_size: int,
        fMaxIndex: np.ndarray,
        fArrayDim: int,
    ):
        """
        Args:
            name: Name of the item.
            element_factory: Factory of a single array element.
            flat_size: Total number of elements, or a negative value for a
                variable-length array.
            fMaxIndex: Per-dimension sizes (indexed up to `fArrayDim`), or
                `None`.
            fArrayDim: Number of dimensions, or `None`.
        """
        super().__init__(name)
        self.element_factory = element_factory
        self.flat_size = flat_size
        self.fMaxIndex = fMaxIndex
        self.fArrayDim = fArrayDim

    def build_cpp_reader(self):
        """Build the C++ `CStyleArrayReader` around the element's C++ reader."""
        element_reader = self.element_factory.build_cpp_reader()
        return uproot_custom.readers.cpp.CStyleArrayReader(
            self.name,
            self.flat_size,
            element_reader,
        )

    def build_python_reader(self):
        """Build the Python `CStyleArrayReader` around the element's reader."""
        element_reader = self.element_factory.build_python_reader()
        return uproot_custom.readers.python.CStyleArrayReader(
            self.name,
            self.flat_size,
            element_reader,
        )

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `CStyleArrayReader` around the element's reader."""
        element_reader = self.element_factory.build_forth_reader(buffer_holder)
        return uproot_custom.readers._forth.CStyleArrayReader(
            self.name,
            self.flat_size,
            element_reader,
            buffer_holder,
        )

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `CStyleArrayReader` around the element's reader."""
        element_reader = self.element_factory.build_numba_reader(ctx)
        return uproot_custom.readers._numba.CStyleArrayReader(
            self.name,
            ctx,
            self.flat_size,
            element_reader,
        )

    def make_awkward_content(self, raw_data):
        """
        Rebuild the array content.

        For a variable-length array (`flat_size < 0`), `raw_data` is
        `(offsets, element_raw_data)` and a `ListOffsetArray` is returned.
        Otherwise `raw_data` is the element data itself. When the
        dimensions are known, the element content is wrapped in one
        `RegularArray` per dimension.
        """
        if self.flat_size < 0:
            element_raw_data = raw_data[1]
        else:
            element_raw_data = raw_data

        element_content = self.element_factory.make_awkward_content(element_raw_data)

        if self.fArrayDim is not None and self.fMaxIndex is not None:
            shape = [self.fMaxIndex[i] for i in range(self.fArrayDim)]
            # Wrap innermost dimension first.
            for s in shape[::-1]:
                element_content = awkward.contents.RegularArray(element_content, int(s))
        else:
            shape = ()

        if self.flat_size < 0:
            offsets = raw_data[0]
            # The offsets count flat elements; rescale them to count regular
            # blocks. Floor division keeps them integral for `Index64`
            # instead of going through floats.
            for s in shape:
                offsets = offsets // s
            return ak.contents.ListOffsetArray(
                ak.index.Index64(offsets),
                element_content,
            )
        else:
            return element_content
[docs]
class GroupFactory(Factory):
    """
    Factory that groups different factories together. Use it to read a
    specific data layout of your own.
    """

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Never matches any item. To use this factory, instantiate it
        directly.
        """
        return None

    def __init__(self, name: str, sub_factories: list[Factory]):
        """
        Args:
            name: Name of the group.
            sub_factories: Factories of the grouped items, in order.
        """
        super().__init__(name)
        self.sub_factories = sub_factories

    def build_cpp_reader(self):
        """Build a C++ `GroupReader` over the sub-factories' C++ readers."""
        members = []
        for sub_factory in self.sub_factories:
            members.append(sub_factory.build_cpp_reader())
        return uproot_custom.readers.cpp.GroupReader(self.name, members)

    def build_python_reader(self):
        """Build a Python `GroupReader` over the sub-factories' readers."""
        members = []
        for sub_factory in self.sub_factories:
            members.append(sub_factory.build_python_reader())
        return uproot_custom.readers.python.GroupReader(self.name, members)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build a Forth `GroupReader` over the sub-factories' readers."""
        members = []
        for sub_factory in self.sub_factories:
            members.append(sub_factory.build_forth_reader(buffer_holder))
        return uproot_custom.readers._forth.GroupReader(self.name, members, buffer_holder)

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build a Numba `GroupReader` over the sub-factories' readers."""
        members = []
        for sub_factory in self.sub_factories:
            members.append(sub_factory.build_numba_reader(ctx))
        return uproot_custom.readers._numba.GroupReader(self.name, ctx, members)

    def make_awkward_content(self, raw_data):
        """
        Build a `RecordArray` with one field per sub-factory.

        Sub-factories whose content is an `EmptyArray` are left out. If
        none remain, an `EmptyArray` is returned.
        """
        built = [
            (sub_factory, sub_factory.make_awkward_content(chunk))
            for sub_factory, chunk in zip(self.sub_factories, raw_data)
        ]
        kept = [
            (sub_factory, content)
            for sub_factory, content in built
            if not isinstance(content, awkward.contents.EmptyArray)
        ]

        if not kept:
            return awkward.contents.EmptyArray()

        fields = [sub_factory.name for sub_factory, _ in kept]
        contents = [content for _, content in kept]
        return awkward.contents.RecordArray(contents, fields)
[docs]
class BaseObjectFactory(GroupFactory):
    """
    Factory for the base-object part of an object. The base object has
    fNBytes(uint32), fVersion(uint16) at the beginning.
    """

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cls_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Match when `top_type_name` is "BASE" and `fType` equals 0.

        One sub-factory is built for each streamer listed for the base
        class name in `all_streamer_info`.
        """
        if top_type_name != "BASE" or cls_streamer_info["fType"] != 0:
            return None

        base_name = cls_streamer_info["fName"]
        member_streamers: list[dict] = all_streamer_info[base_name]

        sub_factories = []
        for member in member_streamers:
            sub_factories.append(build_factory(member, all_streamer_info, item_path))

        return cls(name=base_name, sub_factories=sub_factories)

    def build_cpp_reader(self):
        """Build a C++ `GroupReader` over the sub-factories' C++ readers."""
        members = []
        for sub_factory in self.sub_factories:
            members.append(sub_factory.build_cpp_reader())
        return uproot_custom.readers.cpp.GroupReader(self.name, members)

    def build_python_reader(self):
        """Build a Python `GroupReader` over the sub-factories' readers."""
        members = []
        for sub_factory in self.sub_factories:
            members.append(sub_factory.build_python_reader())
        return uproot_custom.readers.python.GroupReader(self.name, members)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build a Forth `GroupReader` over the sub-factories' readers."""
        members = []
        for sub_factory in self.sub_factories:
            members.append(sub_factory.build_forth_reader(buffer_holder))
        return uproot_custom.readers._forth.GroupReader(self.name, members, buffer_holder)
[docs]
class AnyClassFactory(GroupFactory):
    """
    This class tries to read any class object that is not handled by other factories.
    """

    @classmethod
    def priority(cls):
        """Return the lowest priority so this factory is tried last."""
        return 0  # This reader should be called last

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Match any type that has an entry in `all_streamer_info`.

        One sub-factory is built for each streamer listed for
        `top_type_name`.

        Returns:
            An `AnyClassFactory`, or `None` when `top_type_name` has no
            entry in `all_streamer_info`.
        """
        # Decline unknown types so the module-level `build_factory` can raise
        # its "Unknown type" ValueError rather than a bare KeyError from here.
        if top_type_name not in all_streamer_info:
            return None

        sub_streamers: list = all_streamer_info[top_type_name]
        sub_factories = [build_factory(s, all_streamer_info, item_path) for s in sub_streamers]
        return cls(name=top_type_name, sub_factories=sub_factories)

    def build_cpp_reader(self):
        """Build a C++ `AnyClassReader` over the sub-factories' C++ readers."""
        sub_readers = [s.build_cpp_reader() for s in self.sub_factories]
        return uproot_custom.readers.cpp.AnyClassReader(self.name, sub_readers)

    def build_python_reader(self):
        """Build a Python `AnyClassReader` over the sub-factories' readers."""
        sub_readers = [s.build_python_reader() for s in self.sub_factories]
        return uproot_custom.readers.python.AnyClassReader(self.name, sub_readers)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build a Forth `AnyClassReader` over the sub-factories' readers."""
        sub_readers = [s.build_forth_reader(buffer_holder) for s in self.sub_factories]
        return uproot_custom.readers._forth.AnyClassReader(
            self.name, sub_readers, buffer_holder
        )

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build a Numba `AnyClassReader` over the sub-factories' readers."""
        sub_readers = [s.build_numba_reader(ctx) for s in self.sub_factories]
        return uproot_custom.readers._numba.AnyClassReader(self.name, ctx, sub_readers)
[docs]
class EmptyFactory(Factory):
    """
    Placeholder factory that does nothing.
    """

    @classmethod
    def build_factory(
        cls,
        top_type_name,
        cur_streamer_info,
        all_streamer_info,
        item_path,
        **kwargs,
    ):
        """
        Never matches any item. To use this factory, instantiate it
        directly.
        """
        return None

    def build_cpp_reader(self):
        """Build the C++ `EmptyReader`."""
        reader_cls = uproot_custom.readers.cpp.EmptyReader
        return reader_cls(self.name)

    def build_python_reader(self):
        """Build the Python `EmptyReader`."""
        reader_cls = uproot_custom.readers.python.EmptyReader
        return reader_cls(self.name)

    def build_forth_reader(
        self,
        buffer_holder: uproot_custom.readers._forth.BufferHolder,
    ):
        """Build the Forth `EmptyReader`."""
        reader_cls = uproot_custom.readers._forth.EmptyReader
        return reader_cls(self.name, buffer_holder)

    def build_numba_reader(
        self,
        ctx: uproot_custom.readers._numba.CompilationContext,
    ):
        """Build the Numba `EmptyReader`."""
        reader_cls = uproot_custom.readers._numba.EmptyReader
        return reader_cls(self.name, ctx)

    def make_awkward_content(self, raw_data):
        """Ignore `raw_data` and return an `EmptyArray`."""
        return awkward.contents.EmptyArray()
# Register the built-in factories so that `build_factory` can dispatch to them.
# NOTE(review): `ObjectHeaderFactory` is neither defined nor imported in the
# visible part of this file — confirm it exists before this statement runs.
registered_factories |= {
PrimitiveFactory,
STLSeqFactory,
STLMapFactory,
STLStringFactory,
TArrayFactory,
TStringFactory,
TObjectFactory,
CStyleArrayFactory,
GroupFactory,
BaseObjectFactory,
AnyClassFactory,
ObjectHeaderFactory,
EmptyFactory,
}