GraphArrayView(
graph: BaseGraph,
attr_key: str,
*,
offset: int | ndarray = 0,
shape: tuple[int, ...] | None = None,
chunk_shape: tuple[int, ...] | int | None = None,
buffer_cache_size: int | None = None,
dtype: dtype | None = None,
)
Bases: BaseReadOnlyArray
flowchart TD
tracksdata.array.GraphArrayView[GraphArrayView]
tracksdata.array._base_array.BaseReadOnlyArray[BaseReadOnlyArray]
tracksdata.array._base_array.BaseReadOnlyArray --> tracksdata.array.GraphArrayView
click tracksdata.array.GraphArrayView href "" "tracksdata.array.GraphArrayView"
click tracksdata.array._base_array.BaseReadOnlyArray href "" "tracksdata.array._base_array.BaseReadOnlyArray"
Class used to view the content of a graph as an array.
The resulting graph behaves as a read-only numpy array,
displaying arbitrary attributes inside their respective instance mask.
The content is lazy loaded from the original data source as
it's done with a zarr.Array
Parameters:
-
graph
(BaseGraph)
–
The graph to view as an array.
-
attr_key
(str)
–
The attribute key to view as an array.
-
offset
(int | ndarray, default:
0
)
–
The offset to apply to the array.
-
shape
(tuple[int, ...] | None, default:
None
)
–
The shape of the array. If None, the shape is inferred from the graph metadata shape key.
-
chunk_shape
(tuple[int] | None, default:
None
)
–
The chunk shape for the array. If None, the default chunk size is used.
-
buffer_cache_size
(int, default:
None
)
–
The maximum number of buffers to keep in the cache for the array.
If None, the default buffer cache size is used.
Methods:
-
__array__
–
Convert the GraphArrayView to a numpy array.
-
__getitem__
–
Return a sliced view of the GraphArrayView.
-
__len__
–
Returns the length of the first dimension of the array.
-
reindex
–
Reindex the GraphArrayView.
Attributes:
-
dtype
(dtype)
–
Returns the dtype of the array.
-
ndim
(int)
–
Returns the number of dimensions of the array.
-
shape
(tuple[int, ...])
–
Returns the shape of the array.
-
size
(int)
–
Returns the total number of elements in the array.
Source code in src/tracksdata/array/_graph_array.py
| def __init__(
self,
graph: BaseGraph,
attr_key: str,
*,
offset: int | np.ndarray = 0,
shape: tuple[int, ...] | None = None,
chunk_shape: tuple[int, ...] | int | None = None,
buffer_cache_size: int | None = None,
dtype: np.dtype | None = None,
):
if attr_key not in graph.node_attr_keys(return_ids=True):
raise ValueError(f"Attribute key '{attr_key}' not found in graph. Expected '{graph.node_attr_keys()}'")
self.graph = graph
self._attr_key = attr_key
self._offset = offset
if dtype is None:
# Infer the dtype from the graph's attribute
# TODO improve performance
df = graph.node_attrs(attr_keys=[self._attr_key])
if df.is_empty():
dtype = get_options().gav_default_dtype
else:
try:
dtype = polars_dtype_to_numpy_dtype(df[self._attr_key].dtype, allow_sequence=False)
except ValueError as e:
raise ValueError(f"Attribute values for key '{self._attr_key}' must be scalar.") from e
# napari support for bool is limited
if np.issubdtype(dtype, bool):
dtype = np.uint8
self._dtype = dtype
self.original_shape = _validate_shape(shape, graph, "GraphArrayView")
chunk_shape = chunk_shape or get_options().gav_chunk_shape
if isinstance(chunk_shape, int):
chunk_shape = (chunk_shape,) * (len(self.original_shape) - 1)
elif len(chunk_shape) < len(self.original_shape) - 1:
chunk_shape = (1,) * (len(self.original_shape) - 1 - len(chunk_shape)) + tuple(chunk_shape)
self.chunk_shape = chunk_shape
self.buffer_cache_size = buffer_cache_size or get_options().gav_buffer_cache_size
self._indices = tuple(slice(0, s) for s in self.original_shape)
self._cache = NDChunkCache(
compute_func=self._fill_array,
shape=self.shape[1:],
chunk_shape=self.chunk_shape,
buffer_cache_size=self.buffer_cache_size,
dtype=self.dtype,
)
self._spatial_filter = self.graph.bbox_spatial_filter(
frame_attr_key=DEFAULT_ATTR_KEYS.T,
bbox_attr_key=DEFAULT_ATTR_KEYS.BBOX,
)
self.graph.node_added.connect(self._on_node_added)
self.graph.node_removed.connect(self._on_node_removed)
self.graph.node_updated.connect(self._on_node_updated)
|
dtype
property
Returns the dtype of the array.
ndim
property
Returns the number of dimensions of the array.
shape
property
Returns the shape of the array.
size
property
Returns the total number of elements in the array.
__array__
__array__(
dtype: dtype | None = None, copy: bool | None = None
) -> np.ndarray
Convert the GraphArrayView to a numpy array.
Parameters:
-
dtype
(dtype, default:
None
)
–
The desired dtype of the output array. If None, the dtype of the GraphArrayView is used.
-
copy
(bool, default:
None
)
–
This parameter is ignored, as the GraphArrayView is read-only.
Returns:
-
ndarray
–
In memory numpy array of the GraphArrayView of the current indices.
Source code in src/tracksdata/array/_graph_array.py
| def __array__(
self,
dtype: np.dtype | None = None,
copy: bool | None = None,
) -> np.ndarray:
"""Convert the GraphArrayView to a numpy array.
Parameters
----------
dtype : np.dtype, optional
The desired dtype of the output array. If None, the dtype of the GraphArrayView is used.
copy : bool, optional
This parameter is ignored, as the GraphArrayView is read-only.
Returns
-------
np.ndarray
In memory numpy array of the GraphArrayView of the current indices.
"""
if sum(isinstance(i, Sequence) for i in self._indices) > 1:
raise NotImplementedError("Multiple sequences in indices are not supported for __array__.")
time = self._indices[0]
volume_slicing = self._indices[1:]
if np.isscalar(time):
try:
time = time.item() # convert from numpy.int to int
except AttributeError:
pass
result = self._cache.get(
time=time,
volume_slicing=volume_slicing,
).astype(dtype or self.dtype)
return np.array(result) if np.isscalar(result) else result
else:
if isinstance(time, slice):
time = range(self.original_shape[0])[time]
return np.stack(
[
self._cache.get(
time=t,
volume_slicing=volume_slicing,
)
for t in time
]
).astype(dtype or self.dtype)
|
__getitem__
__getitem__(index: ArrayIndex) -> GraphArrayView
Return a sliced view of the GraphArrayView.
Parameters:
-
index
(ArrayIndex)
–
The indices to slice the array.
Returns:
Source code in src/tracksdata/array/_graph_array.py
| def __getitem__(self, index: ArrayIndex) -> "GraphArrayView":
"""Return a sliced view of the GraphArrayView.
Parameters
----------
index : ArrayIndex
The indices to slice the array.
Returns
-------
GraphArrayView
A new GraphArrayView object with updated indices.
"""
normalized_index = []
if not isinstance(index, tuple):
index = (index,)
if None in index:
raise ValueError("None is not allowed for GraphArrayView indexing.")
jj = 0
for oi in self._indices:
if np.isscalar(oi):
normalized_index.append(None)
else:
if len(index) <= jj:
normalized_index.append(slice(None))
else:
normalized_index.append(index[jj])
jj += 1
return self.reindex(normalized_index)
|
__len__
Returns the length of the first dimension of the array.
Source code in src/tracksdata/array/_base_array.py
| def __len__(self) -> int:
"""Returns the length of the first dimension of the array."""
return self.shape[0]
|
_bbox_to_slices
_bbox_to_slices(bbox: Any) -> tuple[slice, ...] | None
Convert a bbox to clipped spatial slices in array coordinates.
Returns None when the bbox does not overlap the current array volume.
Source code in src/tracksdata/array/_graph_array.py
| def _bbox_to_slices(self, bbox: Any) -> tuple[slice, ...] | None:
"""
Convert a bbox to clipped spatial slices in array coordinates.
Returns `None` when the bbox does not overlap the current array volume.
"""
bbox = np.asarray(bbox, dtype=np.int64).reshape(-1)
ndim = len(self.original_shape) - 1
if len(bbox) != 2 * ndim:
raise ValueError(f"`bbox` must have length {2 * ndim}, got {len(bbox)}")
offset = self._offset_as_array(ndim)
start = bbox[:ndim] + offset
stop = bbox[ndim:] + offset
shape = np.asarray(self.original_shape[1:], dtype=np.int64)
start = np.clip(start, 0, shape)
stop = np.clip(stop, 0, shape)
if np.any(stop <= start):
return None
return tuple(slice(int(s), int(e)) for s, e in zip(start, stop, strict=True))
|
_fill_array
_fill_array(
time: int,
volume_slicing: Sequence[slice],
buffer: ndarray,
) -> np.ndarray
Fill the buffer with data from the graph at a specific time.
Parameters:
-
time
(int)
–
The time point to retrieve data for.
-
volume_slicing
(Sequence[slice])
–
The volume slicing information (currently not fully utilized).
-
buffer
(ndarray)
–
The buffer to fill with data.
Returns:
Source code in src/tracksdata/array/_graph_array.py
| def _fill_array(self, time: int, volume_slicing: Sequence[slice], buffer: np.ndarray) -> np.ndarray:
"""Fill the buffer with data from the graph at a specific time.
Parameters
----------
time : int
The time point to retrieve data for.
volume_slicing : Sequence[slice]
The volume slicing information (currently not fully utilized).
buffer : np.ndarray
The buffer to fill with data.
Returns
-------
np.ndarray
The filled buffer.
"""
subgraph = self._spatial_filter[(slice(time, time), *volume_slicing)]
df = subgraph.node_attrs(
attr_keys=[self._attr_key, DEFAULT_ATTR_KEYS.MASK],
)
for mask, value in zip(df[DEFAULT_ATTR_KEYS.MASK], df[self._attr_key], strict=True):
mask: Mask
mask.paint_buffer(buffer, value, offset=self._offset)
|
_invalidate_bbox
_invalidate_bbox(
time_values: Sequence[Any],
bboxes: Sequence[ndarray | None],
) -> None
Invalidate the cache regions covered by the given times and bboxes.
time_values and bboxes are parallel sequences; each (time, bbox)
pair is clipped to the array volume and the matching cache region is dropped.
A bbox that lies outside the array volume invalidates nothing.
A GraphArrayView requires every node to carry a bbox attribute, so a
None bbox is a programming error and raises ValueError.
Source code in src/tracksdata/array/_graph_array.py
| def _invalidate_bbox(self, time_values: Sequence[Any], bboxes: Sequence[np.ndarray | None]) -> None:
"""
Invalidate the cache regions covered by the given times and bboxes.
``time_values`` and ``bboxes`` are parallel sequences; each ``(time, bbox)``
pair is clipped to the array volume and the matching cache region is dropped.
A bbox that lies outside the array volume invalidates nothing.
A ``GraphArrayView`` requires every node to carry a ``bbox`` attribute, so a
``None`` bbox is a programming error and raises ``ValueError``.
"""
if hasattr(time_values, "to_list"):
time_values = time_values.to_list()
for time_value, bbox in zip(time_values, bboxes, strict=True):
try:
time = int(time_value)
except (TypeError, ValueError) as e:
raise ValueError(
f"Time attribute value must be a scalar integer, got {time_value!r} of type {type(time_value)}"
) from e
if not (0 <= time < self.original_shape[0]):
continue
if bbox is None:
raise ValueError(
f"Node at time {time} is missing a '{DEFAULT_ATTR_KEYS.BBOX}' attribute. "
"A GraphArrayView requires every node to have a bbox."
)
slices = self._bbox_to_slices(bbox)
if slices is not None:
self._cache.invalidate(time=time, volume_slicing=slices)
|
_mask_changed
staticmethod
_mask_changed(old_attr: dict, new_attr: dict) -> bool
Whether the painted output changed while the bbox stayed in place.
The rendered region depends on the displayed attribute value and the mask
pixels, so a mask swap with an unchanged bbox still requires invalidation.
Source code in src/tracksdata/array/_graph_array.py
| @staticmethod
def _mask_changed(old_attr: dict, new_attr: dict) -> bool:
"""
Whether the painted output changed while the bbox stayed in place.
The rendered region depends on the displayed attribute value and the mask
pixels, so a mask swap with an unchanged bbox still requires invalidation.
"""
old_mask = old_attr.get(DEFAULT_ATTR_KEYS.MASK)
new_mask = new_attr.get(DEFAULT_ATTR_KEYS.MASK)
if old_mask is None and new_mask is None:
return False
elif old_mask is None or new_mask is None:
return True
return old_mask != new_mask
|
_offset_as_array
_offset_as_array(ndim: int) -> np.ndarray
Normalize offset to a vector for each spatial axis.
Source code in src/tracksdata/array/_graph_array.py
| def _offset_as_array(self, ndim: int) -> np.ndarray:
"""Normalize `offset` to a vector for each spatial axis."""
if np.isscalar(self._offset):
return np.full(ndim, int(self._offset), dtype=np.int64)
offset = np.asarray(self._offset, dtype=np.int64).reshape(-1)
if len(offset) != ndim:
raise ValueError(f"`offset` must have length {ndim}, got {len(offset)}")
return offset
|
reindex
reindex(slicing: Sequence[ArrayIndex]) -> GraphArrayView
Reindex the GraphArrayView.
Returns a shallow copy of the GraphArrayView with the new indices.
Parameters:
-
slicing
(tuple[ArrayIndex, ...])
–
The new indices to apply to the GraphArrayView.
Returns:
Source code in src/tracksdata/array/_graph_array.py
| def reindex(
self,
slicing: Sequence[ArrayIndex],
) -> "GraphArrayView":
"""
Reindex the GraphArrayView.
Returns a shallow copy of the GraphArrayView with the new indices.
Parameters
----------
slicing : tuple[ArrayIndex, ...]
The new indices to apply to the GraphArrayView.
Returns
-------
GraphArrayView
A new GraphArrayView object with updated indices.
"""
obj = copy(self)
obj._indices = tuple(chain_indices(i1, i2) for i1, i2 in zip(self._indices, slicing, strict=False))
return obj
|