Skip to content

tracksdata.functional

Functional utilities for graph operations.

Functions:

join_node_attrs_to_edges

join_node_attrs_to_edges(node_attrs: DataFrame, edge_attrs: DataFrame, node_id_key: str = DEFAULT_ATTR_KEYS.NODE_ID, source_key: str = DEFAULT_ATTR_KEYS.EDGE_SOURCE, target_key: str = DEFAULT_ATTR_KEYS.EDGE_TARGET, source_prefix: str = 'source_', target_prefix: str = 'target_', how: JoinStrategy = 'left') -> pl.DataFrame

Add node attributes to edge attributes by joining on the node ID.

Parameters:

  • node_attrs

    (DataFrame) –

    Node attributes.

  • edge_attrs

    (DataFrame) –

    Edge attributes.

  • node_id_key

    (str, default: NODE_ID ) –

    The key of the node ID column.

  • source_key

    (str, default: EDGE_SOURCE ) –

    The key of the source column.

  • target_key

    (str, default: EDGE_TARGET ) –

    The key of the target column.

  • source_prefix

    (str, default: 'source_' ) –

    The prefix of the source column.

  • target_prefix

    (str, default: 'target_' ) –

    The prefix of the target column.

  • how

    (JoinStrategy, default: 'left' ) –

    The join type, where the "left" dataframe is the edge attributes and the "right" dataframe is the node attributes.

Returns:

  • DataFrame

    Edge attributes with node attributes added.

Examples:

node_attrs = pl.DataFrame({"node_id": [1, 2, 3], "a": [1, 2, 3], "b": [4, 5, 6]})
edge_attrs = pl.DataFrame({"source": [1, 2], "target": [2, 3]})
node_attr_to_edges(node_attrs, edge_attrs)
# shape: (2, 5)
# ┌──────────┬──────────┬──────────┬──────────┬────────────────┬────────────────┐
# │ source_a ┆ source_b ┆ target_a ┆ target_b ┆ source_node_id ┆ target_node_id │
# │ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---            ┆ ---            │
# │ i64      ┆ i64      ┆ i64      ┆ i64      ┆ i64            ┆ i64            │
# ╞══════════╪══════════╪══════════╪══════════╪════════════════╪════════════════╡
# │ 1        ┆ 4        ┆ 2        ┆ 5        ┆ 1              ┆ 2              │
# │ 2        ┆ 5        ┆ 3        ┆ 6        ┆ 2              ┆ 3              │
# └──────────┴──────────┴──────────┴──────────┴────────────────┴────────────────┘
Source code in src/tracksdata/functional/_edges.py
def join_node_attrs_to_edges(
    node_attrs: pl.DataFrame,
    edge_attrs: pl.DataFrame,
    node_id_key: str = DEFAULT_ATTR_KEYS.NODE_ID,
    source_key: str = DEFAULT_ATTR_KEYS.EDGE_SOURCE,
    target_key: str = DEFAULT_ATTR_KEYS.EDGE_TARGET,
    source_prefix: str = "source_",
    target_prefix: str = "target_",
    how: JoinStrategy = "left",
) -> pl.DataFrame:
    """
    Add node attributes to edge attributes by joining on the node ID.

    Parameters
    ----------
    node_attrs : pl.DataFrame
        Node attributes.
    edge_attrs : pl.DataFrame
        Edge attributes.
    node_id_key : str, optional
        The key of the node ID column.
    source_key : str, optional
        The key of the source column.
    target_key : str, optional
        The key of the target column.
    source_prefix : str, optional
        The prefix of the source column.
    target_prefix : str, optional
        The prefix of the target column.
    how : JoinStrategy, optional
        The join type, where the "left" dataframe is the edge attributes and
        the "right" dataframe is the node attributes.

    Returns
    -------
    pl.DataFrame
        Edge attributes with node attributes added.

    Examples
    --------
    ```python
    node_attrs = pl.DataFrame({"node_id": [1, 2, 3], "a": [1, 2, 3], "b": [4, 5, 6]})
    edge_attrs = pl.DataFrame({"source": [1, 2], "target": [2, 3]})
    node_attr_to_edges(node_attrs, edge_attrs)
    # shape: (2, 5)
    # ┌──────────┬──────────┬──────────┬──────────┬────────────────┬────────────────┐
    # │ source_a ┆ source_b ┆ target_a ┆ target_b ┆ source_node_id ┆ target_node_id │
    # │ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---            ┆ ---            │
    # │ i64      ┆ i64      ┆ i64      ┆ i64      ┆ i64            ┆ i64            │
    # ╞══════════╪══════════╪══════════╪══════════╪════════════════╪════════════════╡
    # │ 1        ┆ 4        ┆ 2        ┆ 5        ┆ 1              ┆ 2              │
    # │ 2        ┆ 5        ┆ 3        ┆ 6        ┆ 2              ┆ 3              │
    # └──────────┴──────────┴──────────┴──────────┴────────────────┴────────────────┘
    ```
    """
    node_attr_keys = node_attrs.columns
    node_attr_keys.remove(node_id_key)

    source_mapping = dict(zip(node_attr_keys, [f"{source_prefix}{c}" for c in node_attr_keys], strict=False))
    target_mapping = dict(zip(node_attr_keys, [f"{target_prefix}{c}" for c in node_attr_keys], strict=False))

    edge_attrs = edge_attrs.join(
        node_attrs.select(node_id_key, *node_attr_keys).rename(source_mapping),
        left_on=source_key,
        right_on=node_id_key,
        how=how,
    ).join(
        node_attrs.select(node_id_key, *node_attr_keys).rename(target_mapping),
        left_on=target_key,
        right_on=node_id_key,
        how=how,
    )

    return edge_attrs

rx_digraph_to_napari_dict

rx_digraph_to_napari_dict(tracklet_graph: PyDiGraph) -> dict[int, list[int]]

Convert a tracklet graph to a napari-ready dictionary. The input is a (child -> parent) graph (forward in time) and it is converted to a (parent -> child) dictionary (backward in time).

Parameters

tracklet_graph : rx.PyDiGraph The tracklet graph to convert.

Returns

dict[int, list[int]] A dictionary of parent -> child relationships.

Source code in src/tracksdata/functional/_napari.py
def rx_digraph_to_napari_dict(
    tracklet_graph: rx.PyDiGraph,
) -> dict[int, list[int]]:
    """
    Convert a tracklet graph to a napari-ready dictionary.
    The input is a (child -> parent) graph (forward in time) and it is converted
    to a (parent -> child) dictionary (backward in time).

    Parameters
    ----------
    tracklet_graph : rx.PyDiGraph
        The tracklet graph to convert.

    Returns
    -------
    dict[int, list[int]]
        A dictionary of parent -> child relationships.
    """
    dict_graph = {}
    for parent, child in tracklet_graph.edges():
        dict_graph.setdefault(child, []).append(parent)
    return dict_graph

to_napari_format

to_napari_format(graph: BaseGraph, shape: tuple[int, ...], solution_key: str | None, output_track_id_key: str, mask_key: None) -> tuple[pl.DataFrame, dict[int, int]]
to_napari_format(graph: BaseGraph, shape: tuple[int, ...], solution_key: str | None, output_track_id_key: str, mask_key: str) -> tuple[pl.DataFrame, dict[int, int], GraphArrayView]
to_napari_format(graph: BaseGraph, shape: tuple[int, ...], solution_key: str | None = DEFAULT_ATTR_KEYS.SOLUTION, output_track_id_key: str = DEFAULT_ATTR_KEYS.TRACK_ID, mask_key: str | None = None, chunk_shape: tuple[int] | None = None, buffer_cache_size: int | None = None) -> tuple[pl.DataFrame, dict[int, int], GraphArrayView] | tuple[pl.DataFrame, dict[int, int]]

Convert the subgraph of solution nodes to a napari-ready format.

This includes: - a tracks layer with the solution tracks - a graph with the parent-child relationships for the solution tracks - a labels layer with the solution nodes if mask_key is provided.

IMPORTANT: This function will reset the track ids if they already exist.

Parameters:

  • graph

    (BaseGraph) –

    The graph to convert.

  • shape

    (tuple[int, ...]) –

    The shape of the labels layer.

  • solution_key

    (str, default: SOLUTION ) –

    The key of the solution attribute. If None, the graph is not filtered by the solution attribute.

  • output_track_id_key

    (str, default: TRACK_ID ) –

    The key of the output track id attribute.

  • mask_key

    (str | None, default: None ) –

    The key of the mask attribute.

  • chunk_shape

    (tuple[int] | None, default: None ) –

    The chunk shape for the labels layer. If None, the default chunk size is used.

  • buffer_cache_size

    (int, default: None ) –

    The maximum number of buffers to keep in the cache for the labels layer. If None, the default buffer cache size is used.

Examples:

labels = ...
graph = ...
tracks_data, dict_graph, array_view = to_napari_format(graph, labels.shape, mask_key="mask")

Returns:

  • tuple[DataFrame, dict[int, int], GraphArrayView] | tuple[DataFrame, dict[int, int]]
    • tracks_data: The tracks data as a polars DataFrame.
    • dict_graph: A dictionary of parent -> child relationships.
    • array_view: The array view of the solution graph if mask_key is provided.
Source code in src/tracksdata/functional/_napari.py
def to_napari_format(
    graph: BaseGraph,
    shape: tuple[int, ...],
    solution_key: str | None = DEFAULT_ATTR_KEYS.SOLUTION,
    output_track_id_key: str = DEFAULT_ATTR_KEYS.TRACK_ID,
    mask_key: str | None = None,
    chunk_shape: tuple[int] | None = None,
    buffer_cache_size: int | None = None,
) -> (
    tuple[
        pl.DataFrame,
        dict[int, int],
        "GraphArrayView",
    ]
    | tuple[
        pl.DataFrame,
        dict[int, int],
    ]
):
    """
    Convert the subgraph of solution nodes to a napari-ready format.

    This includes:
    - a tracks layer with the solution tracks
    - a graph with the parent-child relationships for the solution tracks
    - a labels layer with the solution nodes if `mask_key` is provided.

    IMPORTANT: This function will reset the track ids if they already exist.

    Parameters
    ----------
    graph : BaseGraph
        The graph to convert.
    shape : tuple[int, ...]
        The shape of the labels layer.
    solution_key : str, optional
        The key of the solution attribute. If None, the graph is not filtered by the solution attribute.
    output_track_id_key : str, optional
        The key of the output track id attribute.
    mask_key : str | None, optional
        The key of the mask attribute.
    chunk_shape : tuple[int] | None, optional
        The chunk shape for the labels layer. If None, the default chunk size is used.
    buffer_cache_size : int, optional
        The maximum number of buffers to keep in the cache for the labels layer.
        If None, the default buffer cache size is used.

    Examples
    --------

    ```python
    labels = ...
    graph = ...
    tracks_data, dict_graph, array_view = to_napari_format(graph, labels.shape, mask_key="mask")
    ```

    Returns
    -------
    tuple[pl.DataFrame, dict[int, int], GraphArrayView] | tuple[pl.DataFrame, dict[int, int]]
        - tracks_data: The tracks data as a polars DataFrame.
        - dict_graph: A dictionary of parent -> child relationships.
        - array_view: The array view of the solution graph if `mask_key` is provided.
    """
    if solution_key is not None:
        solution_graph = graph.filter(
            NodeAttr(solution_key) == True,
            EdgeAttr(solution_key) == True,
        ).subgraph()

    else:
        solution_graph = graph

    tracks_graph = solution_graph.assign_track_ids(output_track_id_key)
    dict_graph = {tracks_graph[child]: tracks_graph[parent] for parent, child in tracks_graph.edge_list()}

    spatial_cols = ["z", "y", "x"][-len(shape) + 1 :]

    tracks_data = solution_graph.node_attrs(
        attr_keys=[output_track_id_key, DEFAULT_ATTR_KEYS.T, *spatial_cols],
    )

    # sorting columns
    tracks_data = tracks_data.select([output_track_id_key, DEFAULT_ATTR_KEYS.T, *spatial_cols])

    if mask_key is not None:
        from tracksdata.array._graph_array import GraphArrayView

        array_view = GraphArrayView(
            solution_graph,
            shape,
            attr_key=output_track_id_key,
            chunk_shape=chunk_shape,
            buffer_cache_size=buffer_cache_size,
        )

        return tracks_data, dict_graph, array_view

    return tracks_data, dict_graph