GraphArrayView(
graph: BaseGraph,
attr_key: str,
*,
offset: int | ndarray = 0,
shape: tuple[int, ...] | None = None,
chunk_shape: tuple[int, ...] | int | None = None,
buffer_cache_size: int | None = None,
dtype: dtype | None = None,
)
Bases: BaseReadOnlyArray
flowchart TD
tracksdata.array.GraphArrayView[GraphArrayView]
tracksdata.array._base_array.BaseReadOnlyArray[BaseReadOnlyArray]
tracksdata.array._base_array.BaseReadOnlyArray --> tracksdata.array.GraphArrayView
click tracksdata.array.GraphArrayView href "" "tracksdata.array.GraphArrayView"
click tracksdata.array._base_array.BaseReadOnlyArray href "" "tracksdata.array._base_array.BaseReadOnlyArray"
Class used to view the content of a graph as an array.
The resulting view behaves as a read-only numpy array,
displaying arbitrary attributes inside their respective instance mask.
The content is lazily loaded from the original data source, as
is done with a zarr.Array.
Parameters:
-
graph
(BaseGraph)
–
The graph to view as an array.
-
attr_key
(str)
–
The attribute key to view as an array.
-
offset
(int | ndarray, default:
0
)
–
The offset to apply to the array.
-
shape
(tuple[int, ...] | None, default:
None
)
–
The shape of the array. If None, the shape is inferred from the graph metadata shape key.
-
chunk_shape
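(tuple[int, ...] | int | None, default:
None
)
–
The chunk shape for the array. If None, the default chunk size is used. A single int is applied to every dimension after the first (time) dimension; a tuple shorter than that is left-padded with 1s.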
-
buffer_cache_size
(int | None, default:
None
)
–
The maximum number of buffers to keep in the cache for the array.
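If None, the default buffer cache size is used.
-
dtype
(dtype | None, default:
None
)
–
The dtype of the array. If None, the dtype is inferred from the graph's attribute values.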
Methods:
-
__array__
–
Convert the GraphArrayView to a numpy array.
-
__getitem__
–
Return a sliced view of the GraphArrayView.
-
__len__
–
Returns the length of the first dimension of the array.
-
reindex
–
Reindex the GraphArrayView.
Attributes:
-
dtype
(dtype)
–
Returns the dtype of the array.
-
ndim
(int)
–
Returns the number of dimensions of the array.
-
shape
(tuple[int, ...])
–
Returns the shape of the array.
-
size
(int)
–
Returns the total number of elements in the array.
Source code in src/tracksdata/array/_graph_array.py
| def __init__(
self,
graph: BaseGraph,
attr_key: str,
*,
offset: int | np.ndarray = 0,
shape: tuple[int, ...] | None = None,
chunk_shape: tuple[int, ...] | int | None = None,
buffer_cache_size: int | None = None,
dtype: np.dtype | None = None,
):
# Build a read-only array view over one node attribute of `graph`.
# Raises ValueError when `attr_key` is not among the graph's node attribute
# keys, or when the attribute's polars dtype cannot be converted to a numpy
# dtype (reported as "must be scalar").
# Fail fast on unknown attributes and list the keys that are available.
if attr_key not in graph.node_attr_keys:
raise ValueError(f"Attribute key '{attr_key}' not found in graph. Expected '{graph.node_attr_keys}'")
# Stored for later use: the attribute key and offset are read again in _fill_array.
self.graph = graph
self._attr_key = attr_key
self._offset = offset
# No explicit dtype was given: derive one from the attribute column.
if dtype is None:
# Infer the dtype from the graph's attribute
# TODO improve performance
df = graph.node_attrs(attr_keys=[self._attr_key])
# An empty table has no values to infer from, so use the configured default dtype.
if df.is_empty():
dtype = get_options().gav_default_dtype
else:
try:
# allow_sequence=False: presumably rejects sequence-valued columns — confirm in
# polars_dtype_to_numpy_dtype; a ValueError is re-raised below with the key name.
dtype = polars_dtype_to_numpy_dtype(df[self._attr_key].dtype, allow_sequence=False)
except ValueError as e:
raise ValueError(f"Attribute values for key '{self._attr_key}' must be scalar.") from e
# napari support for bool is limited
# Boolean dtypes are therefore replaced by uint8.
if np.issubdtype(dtype, bool):
dtype = np.uint8
self._dtype = dtype
# Full (unsliced) shape of the array, as returned by _validate_shape.
self.original_shape = _validate_shape(shape, graph, "GraphArrayView")
# Any falsy chunk_shape (e.g. None) falls back to the configured default.
chunk_shape = chunk_shape or get_options().gav_chunk_shape
# A single int is repeated for every dimension after the first one.
if isinstance(chunk_shape, int):
chunk_shape = (chunk_shape,) * (len(self.original_shape) - 1)
# A too-short chunk shape is left-padded with 1s to cover every dimension after the first.
elif len(chunk_shape) < len(self.original_shape) - 1:
chunk_shape = (1,) * (len(self.original_shape) - 1 - len(chunk_shape)) + tuple(chunk_shape)
self.chunk_shape = chunk_shape
# Any falsy buffer_cache_size (None, but also 0) falls back to the configured default.
self.buffer_cache_size = buffer_cache_size or get_options().gav_buffer_cache_size
# Start from full-range slices, i.e. a view of the whole array.
self._indices = tuple(slice(0, s) for s in self.original_shape)
# The cache covers the dimensions after the first; self._fill_array is handed over as its compute callback.
self._cache = NDChunkCache(
compute_func=self._fill_array,
shape=self.shape[1:],
chunk_shape=self.chunk_shape,
buffer_cache_size=self.buffer_cache_size,
dtype=self.dtype,
)
# Spatial filter over the frame (T) and bounding-box attributes; it is indexed in _fill_array.
self._spatial_filter = self.graph.bbox_spatial_filter(
frame_attr_key=DEFAULT_ATTR_KEYS.T,
bbox_attr_key=DEFAULT_ATTR_KEYS.BBOX,
)
|
dtype
property
Returns the dtype of the array.
ndim
property
Returns the number of dimensions of the array.
shape
property
Returns the shape of the array.
size
property
Returns the total number of elements in the array.
__array__
__array__(
dtype: dtype | None = None, copy: bool | None = None
) -> np.ndarray
Convert the GraphArrayView to a numpy array.
Parameters:
-
dtype
(dtype, default:
None
)
–
The desired dtype of the output array. If None, the dtype of the GraphArrayView is used.
-
copy
(bool, default:
None
)
–
This parameter is ignored, as the GraphArrayView is read-only.
Returns:
-
ndarray
–
In-memory numpy array of the GraphArrayView at the current indices.
Source code in src/tracksdata/array/_graph_array.py
| def __array__(
self,
dtype: np.dtype | None = None,
copy: bool | None = None,
) -> np.ndarray:
"""Convert the GraphArrayView to a numpy array.
Parameters
----------
dtype : np.dtype, optional
The desired dtype of the output array. If None, the dtype of the GraphArrayView is used.
copy : bool, optional
This parameter is ignored, as the GraphArrayView is read-only.
Returns
-------
np.ndarray
In memory numpy array of the GraphArrayView of the current indices.
"""
if sum(isinstance(i, Sequence) for i in self._indices) > 1:
raise NotImplementedError("Multiple sequences in indices are not supported for __array__.")
time = self._indices[0]
volume_slicing = self._indices[1:]
if np.isscalar(time):
try:
time = time.item() # convert from numpy.int to int
except AttributeError:
pass
result = self._cache.get(
time=time,
volume_slicing=volume_slicing,
).astype(dtype or self.dtype)
return np.array(result) if np.isscalar(result) else result
else:
if isinstance(time, slice):
time = range(self.original_shape[0])[time]
return np.stack(
[
self._cache.get(
time=t,
volume_slicing=volume_slicing,
)
for t in time
]
).astype(dtype or self.dtype)
|
__getitem__
__getitem__(index: ArrayIndex) -> GraphArrayView
Return a sliced view of the GraphArrayView.
Parameters:
-
index
(ArrayIndex)
–
The indices to slice the array.
Returns:
-
GraphArrayView
–
A new GraphArrayView object with updated indices.
Source code in src/tracksdata/array/_graph_array.py
def __getitem__(self, index: ArrayIndex) -> "GraphArrayView":
    """Return a sliced view of the GraphArrayView.

    Dimensions of the current view that were already reduced to a scalar
    index are skipped; the given indices are applied, in order, to the
    remaining dimensions. Remaining dimensions without a matching index
    are kept whole.

    Parameters
    ----------
    index : ArrayIndex
        The indices to slice the array.

    Returns
    -------
    GraphArrayView
        A new GraphArrayView object with updated indices.

    Raises
    ------
    ValueError
        If any of the indices is None.
    """
    if not isinstance(index, tuple):
        index = (index,)

    # Identity test on purpose: `None in index` compares with `==`, which is
    # elementwise for numpy arrays and raises for multi-element array indices.
    if any(i is None for i in index):
        raise ValueError("None is not allowed for GraphArrayView indexing.")

    normalized_index = []
    jj = 0  # position of the next not-yet-consumed entry of `index`
    for oi in self._indices:
        if np.isscalar(oi):
            # Dimension already collapsed to a scalar: nothing new to apply.
            normalized_index.append(None)
        elif jj < len(index):
            normalized_index.append(index[jj])
            jj += 1
        else:
            # No index left for this dimension: keep it whole.
            normalized_index.append(slice(None))

    return self.reindex(normalized_index)
|
__len__
Returns the length of the first dimension of the array.
Source code in src/tracksdata/array/_base_array.py
def __len__(self) -> int:
    """Return the size of the array's leading dimension."""
    dims = self.shape
    return dims[0]
|
_fill_array
_fill_array(
time: int,
volume_slicing: Sequence[slice],
buffer: ndarray,
) -> np.ndarray
Fill the buffer with data from the graph at a specific time.
Parameters:
-
time
(int)
–
The time point to retrieve data for.
-
volume_slicing
(Sequence[slice])
–
The volume slicing information (currently not fully utilized).
-
buffer
(ndarray)
–
The buffer to fill with data.
Returns:
Source code in src/tracksdata/array/_graph_array.py
| def _fill_array(self, time: int, volume_slicing: Sequence[slice], buffer: np.ndarray) -> np.ndarray:
"""Fill the buffer with data from the graph at a specific time.
Parameters
----------
time : int
The time point to retrieve data for.
volume_slicing : Sequence[slice]
The volume slicing information (currently not fully utilized).
buffer : np.ndarray
The buffer to fill with data.
Returns
-------
np.ndarray
The filled buffer.
"""
subgraph = self._spatial_filter[(slice(time, time), *volume_slicing)]
df = subgraph.node_attrs(
attr_keys=[self._attr_key, DEFAULT_ATTR_KEYS.MASK],
)
for mask, value in zip(df[DEFAULT_ATTR_KEYS.MASK], df[self._attr_key], strict=True):
mask: Mask
mask.paint_buffer(buffer, value, offset=self._offset)
|
reindex
reindex(slicing: Sequence[ArrayIndex]) -> GraphArrayView
Reindex the GraphArrayView.
Returns a shallow copy of the GraphArrayView with the new indices.
Parameters:
-
slicing
(tuple[ArrayIndex, ...])
–
The new indices to apply to the GraphArrayView.
Returns:
-
GraphArrayView
–
A new GraphArrayView object with updated indices.
Source code in src/tracksdata/array/_graph_array.py
def reindex(
    self,
    slicing: Sequence[ArrayIndex],
) -> "GraphArrayView":
    """
    Reindex the GraphArrayView.

    The view itself is left untouched; a shallow copy carrying the combined
    indices is returned instead.

    Parameters
    ----------
    slicing : tuple[ArrayIndex, ...]
        The new indices to apply to the GraphArrayView.

    Returns
    -------
    GraphArrayView
        A new GraphArrayView object with updated indices.
    """
    clone = copy(self)
    combined = []
    # Pair each current index with its new counterpart; zip stops at the shorter input.
    for current, update in zip(self._indices, slicing, strict=False):
        combined.append(chain_indices(current, update))
    clone._indices = tuple(combined)
    return clone
|