Skip to content

Supporting Modules

ServerState

Encapsulates all mutable state for one MCP server instance.

Source code in src/napari_mcp/state.py
class ServerState:
    """Encapsulates all mutable state for one MCP server instance.

    Keeping state on an instance (instead of module globals) lets several
    server instances coexist and makes tests independent of each other.
    """

    def __init__(
        self,
        mode: StartupMode = StartupMode.STANDALONE,
        bridge_port: int | None = None,
    ):
        # Viewer: the napari viewer (or proxy) and a lock serializing
        # concurrent tool access to it.
        self.viewer: ViewerProtocol | None = None
        self.viewer_lock: asyncio.Lock = asyncio.Lock()

        # Mode: STANDALONE never probes the external bridge; AUTO_DETECT
        # proxies tool calls to an external viewer when one is reachable.
        self.mode: StartupMode = mode
        # NOTE: a falsy bridge_port (None or 0) falls back to the env var.
        self.bridge_port: int = bridge_port or int(
            os.environ.get("NAPARI_MCP_BRIDGE_PORT", "9999")
        )

        # Qt state (populated lazily once a GUI event loop is started).
        self.qt_app: Any | None = None
        self.qt_pump_task: asyncio.Task | None = None
        self.window_close_connected: bool = False
        self.gui_executor: Any | None = None

        # Server lifecycle: the loop to stop on shutdown, plus an
        # idempotence flag for request_shutdown().
        self._event_loop: asyncio.AbstractEventLoop | None = None
        self._shutdown_requested: bool = False

        # Execution namespace (persists across execute_code calls)
        self.exec_globals: dict[str, Any] = {}

        # Output storage: tool outputs keyed by a monotonically increasing
        # string ID, guarded by its own lock so it never contends with
        # viewer operations.
        self.output_storage: dict[str, dict[str, Any]] = {}
        self.output_storage_lock: asyncio.Lock = asyncio.Lock()
        self.next_output_id: int = 1
        try:
            self.max_output_items: int = int(
                os.environ.get("NAPARI_MCP_MAX_OUTPUT_ITEMS", "1000")
            )
        except Exception:
            # Malformed env var: fall back to the documented default.
            self.max_output_items = 1000

    def request_shutdown(self, delay: float = 1.0) -> None:
        """Request the MCP server to shut down.

        Safe to call from any thread (e.g. the Qt main thread when the
        viewer window is destroyed).

        Parameters
        ----------
        delay : float
            Seconds to wait before stopping the event loop.  A short delay
            allows any in-flight MCP responses (e.g. from ``close_viewer``)
            to be flushed before the loop halts.
        """
        # Idempotent: only the first call schedules a stop.
        if self._shutdown_requested:
            return
        self._shutdown_requested = True
        logger.info("Server shutdown requested (viewer closed)")

        loop = self._event_loop
        if loop is not None and not loop.is_closed():
            try:
                # call_soon_threadsafe is the only loop method safe to use
                # from another thread; the lambda then schedules the
                # delayed stop on the loop's own thread.
                loop.call_soon_threadsafe(lambda: loop.call_later(delay, loop.stop))
            except RuntimeError:
                # Loop shut down between the check and the call; ignore.
                pass

    def gui_execute(self, operation: Any) -> Any:
        """Run operation through GUI executor if set, else directly."""
        if self.gui_executor is not None:
            return self.gui_executor(operation)
        return operation()

    async def store_output(
        self,
        tool_name: str,
        stdout: str = "",
        stderr: str = "",
        result_repr: str | None = None,
        **metadata: Any,
    ) -> str:
        """Store tool output and return a unique ID.

        The entry records the tool name, a UTC timestamp, both captured
        streams, the optional result repr, and any extra metadata.
        """
        async with self.output_storage_lock:
            output_id = str(self.next_output_id)
            self.next_output_id += 1

            self.output_storage[output_id] = {
                "tool_name": tool_name,
                "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                "stdout": stdout,
                "stderr": stderr,
                "result_repr": result_repr,
                **metadata,
            }
            # Evict oldest items if exceeding capacity (IDs are numeric
            # strings, so sorting by int() yields insertion order).
            if (
                self.max_output_items > 0
                and len(self.output_storage) > self.max_output_items
            ):
                overflow = len(self.output_storage) - self.max_output_items
                for victim in sorted(self.output_storage.keys(), key=lambda k: int(k))[
                    :overflow
                ]:
                    self.output_storage.pop(victim, None)

            return output_id

    async def proxy_to_external(
        self, tool_name: str, params: dict[str, Any] | None = None
    ) -> Any | None:
        """Proxy a tool call to an external viewer if in AUTO_DETECT mode.

        Returns None immediately in STANDALONE mode (zero overhead).
        """
        if self.mode != StartupMode.AUTO_DETECT:
            return None

        try:
            # Imported lazily so STANDALONE servers never pay the cost.
            from fastmcp import Client

            client = Client(f"http://localhost:{self.bridge_port}/mcp")
            async with client:
                result = await client.call_tool(tool_name, params or {})
                if hasattr(result, "content"):
                    content = result.content
                    # NOTE(review): assumes content is non-empty; an empty
                    # list raises IndexError, swallowed by the except below.
                    if content[0].type == "text":
                        response = (
                            content[0].text
                            if hasattr(content[0], "text")
                            else str(content[0])
                        )
                        try:
                            return json.loads(response)
                        except json.JSONDecodeError:
                            return {
                                "status": "error",
                                "message": f"Invalid JSON response: {response}",
                            }
                    else:
                        # Non-text payloads are passed through untouched.
                        return content
                return {
                    "status": "error",
                    "message": "Invalid response format from external viewer",
                }
        except Exception:
            # Best-effort: any bridge failure means "no external viewer".
            return None

    async def detect_external_viewer(
        self,
    ) -> tuple[bool, dict[str, Any] | None]:
        """Detect if an external napari viewer is available via MCP bridge.

        Returns
        -------
        tuple of (found, info)
            found is True if an external bridge was detected, False otherwise.
            info is the session information dict when found, else None.
        """
        if self.mode != StartupMode.AUTO_DETECT:
            return False, None

        try:
            # Imported lazily so STANDALONE servers never pay the cost.
            from fastmcp import Client

            client = Client(f"http://localhost:{self.bridge_port}/mcp")
            async with client:
                result = await client.call_tool("session_information")
                if result and hasattr(result, "content"):
                    content = result.content
                    if isinstance(content, list) and len(content) > 0:
                        info = (
                            content[0].text
                            if hasattr(content[0], "text")
                            else str(content[0])
                        )
                        # The bridge may answer with a JSON string or a dict.
                        info_dict = json.loads(info) if isinstance(info, str) else info
                        if info_dict.get("session_type") == "napari_bridge_session":
                            return True, info_dict
                return False, None
        except Exception:
            # Connection refused / timeout / bad payload: "not found".
            return False, None

    async def external_session_information(self) -> dict[str, Any]:
        """Get session information from the external viewer.

        Unlike detect_external_viewer, this does not check the mode and
        lets connection errors propagate to the caller.
        """
        from fastmcp import Client

        test_client = Client(f"http://localhost:{self.bridge_port}/mcp")
        async with test_client:
            result = await test_client.call_tool("session_information")
            if hasattr(result, "content"):
                content = result.content
                if isinstance(content, list) and len(content) > 0:
                    info = (
                        content[0].text
                        if hasattr(content[0], "text")
                        else str(content[0])
                    )
                    # The bridge may answer with a JSON string or a dict.
                    info_dict = json.loads(info) if isinstance(info, str) else info
                    if info_dict.get("session_type") == "napari_bridge_session":
                        return {
                            "status": "ok",
                            "viewer_type": "external",
                            "title": info_dict.get("viewer", {}).get(
                                "title", "External Viewer"
                            ),
                            "layers": info_dict.get("viewer", {}).get(
                                "layer_names", []
                            ),
                            "port": info_dict.get("bridge_port", self.bridge_port),
                        }

        return {
            "status": "error",
            "message": "Failed to get session information from external viewer",
        }

Functions

request_shutdown

request_shutdown(delay: float = 1.0) -> None

Request the MCP server to shut down.

Safe to call from any thread (e.g. the Qt main thread when the viewer window is destroyed).

Parameters:

Name Type Description Default
delay float

Seconds to wait before stopping the event loop. A short delay allows any in-flight MCP responses (e.g. from close_viewer) to be flushed before the loop halts.

1.0
Source code in src/napari_mcp/state.py
def request_shutdown(self, delay: float = 1.0) -> None:
    """Request the MCP server to shut down.

    Thread-safe: may be invoked from any thread (e.g. the Qt main thread
    after the viewer window is destroyed).

    Parameters
    ----------
    delay : float
        Seconds to wait before stopping the event loop.  The short grace
        period lets any in-flight MCP responses (e.g. from
        ``close_viewer``) be flushed before the loop halts.
    """
    # Idempotent: only the first call does anything.
    if self._shutdown_requested:
        return
    self._shutdown_requested = True
    logger.info("Server shutdown requested (viewer closed)")

    target = self._event_loop
    if target is None or target.is_closed():
        return

    def _schedule_stop() -> None:
        # Runs on the loop's own thread; defer the actual stop by `delay`.
        target.call_later(delay, target.stop)

    try:
        target.call_soon_threadsafe(_schedule_stop)
    except RuntimeError:
        # Loop was torn down between the check and the call; nothing to do.
        pass

gui_execute

gui_execute(operation: Any) -> Any

Run operation through GUI executor if set, else directly.

Source code in src/napari_mcp/state.py
def gui_execute(self, operation: Any) -> Any:
    """Invoke *operation*, routing through the GUI executor when one is set.

    Without an executor the callable is simply run on the current thread.
    """
    executor = self.gui_executor
    if executor is None:
        return operation()
    return executor(operation)

store_output async

store_output(tool_name: str, stdout: str = '', stderr: str = '', result_repr: str | None = None, **metadata: Any) -> str

Store tool output and return a unique ID.

Source code in src/napari_mcp/state.py
async def store_output(
    self,
    tool_name: str,
    stdout: str = "",
    stderr: str = "",
    result_repr: str | None = None,
    **metadata: Any,
) -> str:
    """Store tool output and return a unique ID."""
    async with self.output_storage_lock:
        output_id = str(self.next_output_id)
        self.next_output_id += 1

        self.output_storage[output_id] = {
            "tool_name": tool_name,
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "stdout": stdout,
            "stderr": stderr,
            "result_repr": result_repr,
            **metadata,
        }
        # Evict oldest items if exceeding capacity
        if (
            self.max_output_items > 0
            and len(self.output_storage) > self.max_output_items
        ):
            overflow = len(self.output_storage) - self.max_output_items
            for victim in sorted(self.output_storage.keys(), key=lambda k: int(k))[
                :overflow
            ]:
                self.output_storage.pop(victim, None)

        return output_id

proxy_to_external async

proxy_to_external(tool_name: str, params: dict[str, Any] | None = None) -> Any | None

Proxy a tool call to an external viewer if in AUTO_DETECT mode.

Returns None immediately in STANDALONE mode (zero overhead).

Source code in src/napari_mcp/state.py
async def proxy_to_external(
    self, tool_name: str, params: dict[str, Any] | None = None
) -> Any | None:
    """Proxy a tool call to an external viewer if in AUTO_DETECT mode.

    Returns None immediately in STANDALONE mode (zero overhead).

    Parameters
    ----------
    tool_name : str
        Name of the MCP tool to invoke on the external bridge.
    params : dict or None
        Tool arguments; an empty dict is sent when None.

    Returns
    -------
    Any or None
        The decoded JSON response (or raw content for non-text payloads),
        or None when not in AUTO_DETECT mode or when the call fails.
    """
    if self.mode != StartupMode.AUTO_DETECT:
        return None

    try:
        # Imported lazily so STANDALONE servers never pay the import cost.
        from fastmcp import Client

        client = Client(f"http://localhost:{self.bridge_port}/mcp")
        async with client:
            result = await client.call_tool(tool_name, params or {})
            if hasattr(result, "content"):
                content = result.content
                # NOTE(review): assumes content is non-empty when present;
                # an empty list raises IndexError, swallowed by the broad
                # except below (which then returns None).
                if content[0].type == "text":
                    response = (
                        content[0].text
                        if hasattr(content[0], "text")
                        else str(content[0])
                    )
                    try:
                        return json.loads(response)
                    except json.JSONDecodeError:
                        return {
                            "status": "error",
                            "message": f"Invalid JSON response: {response}",
                        }
                else:
                    # Non-text payloads are passed through untouched.
                    return content
            return {
                "status": "error",
                "message": "Invalid response format from external viewer",
            }
    except Exception:
        # Best-effort: any failure to reach the bridge means "no external
        # viewer"; callers fall back to local behavior.
        return None

detect_external_viewer async

detect_external_viewer() -> tuple[bool, dict[str, Any] | None]

Detect if an external napari viewer is available via MCP bridge.

Returns:

Type Description
tuple of (found, info)

found is True if an external bridge was detected, False otherwise. info is the session information dict when found, else None.

Source code in src/napari_mcp/state.py
async def detect_external_viewer(
    self,
) -> tuple[bool, dict[str, Any] | None]:
    """Detect if an external napari viewer is available via MCP bridge.

    Probes ``http://localhost:<bridge_port>/mcp`` by calling the
    ``session_information`` tool; a response identifying itself as a
    ``napari_bridge_session`` counts as a detection.

    Returns
    -------
    tuple of (found, info)
        found is True if an external bridge was detected, False otherwise.
        info is the session information dict when found, else None.
    """
    if self.mode != StartupMode.AUTO_DETECT:
        return False, None

    try:
        # Imported lazily so STANDALONE servers never pay the import cost.
        from fastmcp import Client

        client = Client(f"http://localhost:{self.bridge_port}/mcp")
        async with client:
            result = await client.call_tool("session_information")
            if result and hasattr(result, "content"):
                content = result.content
                if isinstance(content, list) and len(content) > 0:
                    info = (
                        content[0].text
                        if hasattr(content[0], "text")
                        else str(content[0])
                    )
                    # The bridge may answer with a JSON string or a dict.
                    info_dict = json.loads(info) if isinstance(info, str) else info
                    if info_dict.get("session_type") == "napari_bridge_session":
                        return True, info_dict
            return False, None
    except Exception:
        # Connection refused / timeout / bad payload all mean "not found".
        return False, None

external_session_information async

external_session_information() -> dict[str, Any]

Get session information from the external viewer.

Source code in src/napari_mcp/state.py
async def external_session_information(self) -> dict[str, Any]:
    """Get session information from the external viewer.

    Unlike ``detect_external_viewer``, this does not check the startup
    mode and has no try/except, so connection errors propagate to the
    caller.

    Returns
    -------
    dict[str, Any]
        ``{"status": "ok", ...}`` with the external viewer's title,
        layer names and port on success; an error dict when the response
        does not identify a bridge session.
    """
    from fastmcp import Client

    test_client = Client(f"http://localhost:{self.bridge_port}/mcp")
    async with test_client:
        result = await test_client.call_tool("session_information")
        if hasattr(result, "content"):
            content = result.content
            if isinstance(content, list) and len(content) > 0:
                info = (
                    content[0].text
                    if hasattr(content[0], "text")
                    else str(content[0])
                )
                # The bridge may answer with a JSON string or a dict.
                info_dict = json.loads(info) if isinstance(info, str) else info
                if info_dict.get("session_type") == "napari_bridge_session":
                    return {
                        "status": "ok",
                        "viewer_type": "external",
                        "title": info_dict.get("viewer", {}).get(
                            "title", "External Viewer"
                        ),
                        "layers": info_dict.get("viewer", {}).get(
                            "layer_names", []
                        ),
                        "port": info_dict.get("bridge_port", self.bridge_port),
                    }

    return {
        "status": "error",
        "message": "Failed to get session information from external viewer",
    }

Output Utilities

Output storage and truncation utilities.

Functions

truncate_output

truncate_output(output: str, line_limit: int) -> tuple[str, bool]

Truncate output to specified line limit.

Parameters:

Name Type Description Default
output str

The output text to truncate.

required
line_limit int

Maximum number of lines to return. If -1, return all lines.

required

Returns:

Type Description
tuple[str, bool]

Tuple of (truncated_output, was_truncated).

Source code in src/napari_mcp/output.py
def truncate_output(output: str, line_limit: int) -> tuple[str, bool]:
    """Limit *output* to at most *line_limit* lines.

    Parameters
    ----------
    output : str
        The text to (possibly) truncate.
    line_limit : int
        Maximum number of lines to keep; ``-1`` means keep everything.
        Values that cannot be converted to int fall back to 30.

    Returns
    -------
    tuple[str, bool]
        The (possibly shortened) text and whether truncation occurred.
    """
    try:
        limit = int(line_limit)
    except Exception:
        limit = 30
    # Anything below -1 is treated as "unlimited".
    limit = max(limit, -1)
    if limit == -1:
        return output, False

    all_lines = output.splitlines(keepends=True)
    if len(all_lines) <= limit:
        return output, False
    return "".join(all_lines[:limit]), True

Shared Helpers

Shared helpers used by both server.py and bridge_server.py.

These functions extract logic that was previously duplicated between the standalone server and the bridge server.

Functions

parse_bool

parse_bool(value: bool | str | None, default: bool = False) -> bool

Parse a boolean value from various input types.

Handles bool, str ("true"/"false"/"1"/"0"/etc.), and None.

Source code in src/napari_mcp/_helpers.py
def parse_bool(value: bool | str | None, default: bool = False) -> bool:
    """Parse a boolean value from various input types.

    Handles bool, str ("true"/"false"/"1"/"0"/etc.), and None.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.lower() in ("true", "1", "yes", "on")
    return bool(value)

resolve_layer_type

resolve_layer_type(layer_type: str) -> str | None

Resolve a layer type string to its canonical form.

Returns None if the type is not recognized.

Source code in src/napari_mcp/_helpers.py
def resolve_layer_type(layer_type: str) -> str | None:
    """Map a user-supplied layer-type string to its canonical name.

    The lookup is case-insensitive and ignores surrounding whitespace;
    unrecognized types yield ``None``.
    """
    normalized = layer_type.strip().lower()
    return LAYER_TYPE_ALIASES.get(normalized)

build_layer_detail

build_layer_detail(layer: Any) -> dict[str, Any]

Build a detail dict for a single napari layer.

Used by session_information in both standalone and bridge modes.

Source code in src/napari_mcp/_helpers.py
def build_layer_detail(layer: Any) -> dict[str, Any]:
    """Summarize one napari layer as a plain dict.

    Used by ``session_information`` in both standalone and bridge modes.
    Optional attributes are only included when the layer exposes them.
    """
    summary: dict[str, Any] = {
        "name": layer.name,
        "type": type(layer).__name__,
        "visible": bool(getattr(layer, "visible", True)),
        "opacity": float(getattr(layer, "opacity", 1.0)),
    }
    if hasattr(layer, "data"):
        data = layer.data
        if hasattr(data, "shape"):
            summary["data_shape"] = list(data.shape)
        if hasattr(data, "dtype"):
            summary["data_dtype"] = str(data.dtype)
    if hasattr(layer, "colormap"):
        cmap = layer.colormap
        summary["colormap"] = getattr(cmap, "name", str(cmap))
    if hasattr(layer, "blending"):
        summary["blending"] = getattr(layer, "blending", None)
    if hasattr(layer, "contrast_limits"):
        try:
            limits = layer.contrast_limits
            summary["contrast_limits"] = [float(limits[0]), float(limits[1])]
        except Exception:
            # Malformed limits are simply omitted from the summary.
            pass
    if hasattr(layer, "gamma"):
        summary["gamma"] = float(getattr(layer, "gamma", 1.0))
    return summary

create_layer_on_viewer

create_layer_on_viewer(viewer: Any, resolved_data: Any, lt: str, *, name: str | None = None, colormap: str | None = None, blending: str | None = None, channel_axis: int | str | None = None, size: float | str | None = None, shape_type: str | None = None, edge_color: str | None = None, face_color: str | None = None, edge_width: float | str | None = None) -> dict[str, Any]

Add a layer to a napari viewer and return a result dict.

This is the shared core used by add_layer in both server.py (standalone) and bridge_server.py (Qt thread). The caller is responsible for calling process_events and holding locks.

Parameters:

Name Type Description Default
viewer Viewer

The napari viewer instance.

required
resolved_data Any

The data to add (numpy array, list, tuple, etc.).

required
lt str

Canonical layer type (one of: image, labels, points, shapes, vectors, tracks, surface).

required
Source code in src/napari_mcp/_helpers.py
def create_layer_on_viewer(
    viewer: Any,
    resolved_data: Any,
    lt: str,
    *,
    name: str | None = None,
    colormap: str | None = None,
    blending: str | None = None,
    channel_axis: int | str | None = None,
    size: float | str | None = None,
    shape_type: str | None = None,
    edge_color: str | None = None,
    face_color: str | None = None,
    edge_width: float | str | None = None,
) -> dict[str, Any]:
    """Add a layer to a napari viewer and return a result dict.

    This is the shared core used by ``add_layer`` in both server.py
    (standalone) and bridge_server.py (Qt thread). The caller is responsible
    for calling ``process_events`` and holding locks.

    Parameters
    ----------
    viewer : napari.Viewer
        The napari viewer instance.
    resolved_data : Any
        The data to add (numpy array, list, tuple, etc.).
    lt : str
        Canonical layer type (one of: image, labels, points, shapes,
        vectors, tracks, surface).

    Returns
    -------
    dict[str, Any]
        ``{"status": "ok", ...}`` with per-type details on success, or
        ``{"status": "error", "message": ...}`` on validation failure.
    """
    if lt == "image":
        arr = np.asarray(resolved_data)
        # Reject inputs napari cannot render as an image.
        if arr.size == 0:
            return {
                "status": "error",
                "message": "Cannot add image layer: data is empty.",
            }
        if np.issubdtype(arr.dtype, np.complexfloating):
            return {
                "status": "error",
                "message": (
                    f"Cannot add image layer: complex dtype ({arr.dtype}) "
                    "not supported. Convert to real first (e.g., np.abs(data))."
                ),
            }
        # Only forward optional kwargs that were actually supplied.
        kwargs: dict[str, Any] = {"name": name}
        if colormap is not None:
            kwargs["colormap"] = colormap
        if blending is not None:
            kwargs["blending"] = blending
        if channel_axis is not None:
            kwargs["channel_axis"] = int(channel_axis)
        layer = viewer.add_image(arr, **kwargs)
        # napari returns a list of layers when channel_axis is used
        if isinstance(layer, list):
            names = [lyr.name for lyr in layer]
            return {
                "status": "ok",
                "name": names,
                "shape": list(np.shape(arr)),
                "n_channels": len(layer),
            }
        return {"status": "ok", "name": layer.name, "shape": list(np.shape(arr))}

    elif lt == "labels":
        arr = np.asarray(resolved_data)
        if arr.size == 0:
            return {
                "status": "error",
                "message": "Cannot add labels layer: data is empty.",
            }
        layer = viewer.add_labels(arr, name=name)
        return {"status": "ok", "name": layer.name, "shape": list(np.shape(arr))}

    elif lt == "points":
        # Points coordinates must be float; rows are individual points.
        arr = np.asarray(resolved_data, dtype=float)
        if arr.size == 0:
            return {
                "status": "error",
                "message": "Cannot add points layer: data is empty.",
            }
        # NOTE(review): a falsy size (0) falls back to the 10.0 default.
        layer = viewer.add_points(arr, name=name, size=float(size or 10.0))
        return {"status": "ok", "name": layer.name, "n_points": int(arr.shape[0])}

    elif lt == "shapes":
        kwargs = {"name": name, "shape_type": shape_type or "rectangle"}
        if edge_color is not None:
            kwargs["edge_color"] = edge_color
        if face_color is not None:
            kwargs["face_color"] = face_color
        if edge_width is not None:
            kwargs["edge_width"] = float(edge_width)
        layer = viewer.add_shapes(resolved_data, **kwargs)
        return {"status": "ok", "name": layer.name, "nshapes": int(layer.nshapes)}

    elif lt == "vectors":
        arr = np.asarray(resolved_data, dtype=float)
        kwargs = {"name": name}
        if edge_color is not None:
            kwargs["edge_color"] = edge_color
        if edge_width is not None:
            kwargs["edge_width"] = float(edge_width)
        layer = viewer.add_vectors(arr, **kwargs)
        return {"status": "ok", "name": layer.name, "n_vectors": int(arr.shape[0])}

    elif lt == "tracks":
        arr = np.asarray(resolved_data, dtype=float)
        layer = viewer.add_tracks(arr, name=name)
        # Column 0 holds the track ID; unique IDs count the tracks.
        return {
            "status": "ok",
            "name": layer.name,
            "n_tracks": int(len(np.unique(arr[:, 0]))),
        }

    elif lt == "surface":
        # resolved_data is indexed as (vertices, faces) below;
        # assumes a napari surface tuple — TODO confirm against callers.
        layer = viewer.add_surface(resolved_data, name=name)
        verts = np.asarray(resolved_data[0])
        faces = np.asarray(resolved_data[1])
        return {
            "status": "ok",
            "name": layer.name,
            "n_vertices": int(verts.shape[0]),
            "n_faces": int(faces.shape[0]),
        }

    else:
        return {
            "status": "error",
            "message": f"Unknown layer type '{lt}'.",
        }

run_code

run_code(code: str, exec_globals: dict[str, Any], *, source_label: str = '<mcp-exec>') -> tuple[str, str, str | None, Exception | None]

Execute Python code with stdout/stderr capture.

This is the shared core used by both execute_code in server.py (standalone) and bridge_server.py (Qt thread).

Parameters:

Name Type Description Default
code str

Python code string. The last expression's value is captured.

required
exec_globals dict

The execution namespace (both globals and locals).

required
source_label str

Label for compile() filename, e.g. "<mcp-exec>" or "<bridge-exec>".

'<mcp-exec>'

Returns:

Type Description
tuple of (stdout, stderr, result_repr, error)
  • stdout: captured stdout output
  • stderr: captured stderr output (includes traceback on error)
  • result_repr: repr() of the last expression, or None
  • error: the exception if one occurred, or None
Source code in src/napari_mcp/_helpers.py
def run_code(
    code: str,
    exec_globals: dict[str, Any],
    *,
    source_label: str = "<mcp-exec>",
) -> tuple[str, str, str | None, Exception | None]:
    """Execute Python code with stdout/stderr capture.

    This is the shared core used by both ``execute_code`` in server.py
    (standalone) and bridge_server.py (Qt thread).

    Parameters
    ----------
    code : str
        Python code string. The last expression's value is captured.
    exec_globals : dict
        The execution namespace (both globals and locals).
    source_label : str
        Label for compile() filename, e.g. ``"<mcp-exec>"`` or ``"<bridge-exec>"``.

    Returns
    -------
    tuple of (stdout, stderr, result_repr, error)
        - stdout: captured stdout output
        - stderr: captured stderr output (includes traceback on error)
        - result_repr: repr() of the last expression, or None
        - error: the exception if one occurred, or None
    """
    stdout_buf = StringIO()
    stderr_buf = StringIO()
    result_repr: str | None = None
    error: Exception | None = None

    try:
        with (
            contextlib.redirect_stdout(stdout_buf),
            contextlib.redirect_stderr(stderr_buf),
        ):
            parsed = ast.parse(code, mode="exec")
            if parsed.body and isinstance(parsed.body[-1], ast.Expr):
                if len(parsed.body) > 1:
                    exec_ast = ast.Module(body=parsed.body[:-1], type_ignores=[])
                    exec(
                        compile(exec_ast, source_label, "exec"),
                        exec_globals,
                        exec_globals,
                    )
                last_expr = ast.Expression(body=parsed.body[-1].value)
                value = eval(
                    compile(last_expr, source_label.replace("-exec", "-eval"), "eval"),
                    exec_globals,
                    exec_globals,
                )
                result_repr = repr(value)
            else:
                exec(
                    compile(parsed, source_label, "exec"),
                    exec_globals,
                    exec_globals,
                )
    except Exception as e:
        tb = traceback.format_exc()
        error = e
        # Append traceback to stderr
        stderr_buf.write(tb)

    return (
        stdout_buf.getvalue(),
        stderr_buf.getvalue(),
        result_repr,
        error,
    )

build_truncated_response

build_truncated_response(*, status: str, output_id: str, stdout_full: str, stderr_full: str, result_repr: str | None, line_limit: int | str, error: Exception | None = None) -> dict[str, Any]

Build a response dict with optional output truncation.

This is the shared pattern used by execute_code in both server.py and bridge_server.py, and also by install_packages.

Parameters:

Name Type Description Default
status str

"ok" or "error".

required
output_id str

The stored output ID.

required
stdout_full str

Full stdout/stderr content.

required
stderr_full str

Full stdout/stderr content.

required
result_repr str or None

The repr of the last expression result.

required
line_limit int or str

Maximum lines (-1 for unlimited). Strings are converted to int; invalid values fall back to 30.

required
error Exception or None

The exception, if status == "error".

None

Returns:

Type Description
dict[str, Any]

The response dict ready to return from a tool.

Source code in src/napari_mcp/_helpers.py
def build_truncated_response(
    *,
    status: str,
    output_id: str,
    stdout_full: str,
    stderr_full: str,
    result_repr: str | None,
    line_limit: int | str,
    error: Exception | None = None,
) -> dict[str, Any]:
    """Build a response dict with optional output truncation.

    This is the shared pattern used by ``execute_code`` in both server.py
    and bridge_server.py, and also by ``install_packages``.

    Parameters
    ----------
    status : str
        "ok" or "error".
    output_id : str
        The stored output ID.
    stdout_full, stderr_full : str
        Full stdout/stderr content.
    result_repr : str or None
        The repr of the last expression result.
    line_limit : int or str
        Maximum lines (-1 for unlimited). Strings are converted to int;
        invalid values fall back to 30.
    error : Exception or None
        The exception, if status == "error".

    Returns
    -------
    dict[str, Any]
        The response dict ready to return from a tool.
    """
    response: dict[str, Any] = {
        "status": status,
        "output_id": output_id,
    }
    if result_repr is not None:
        response["result_repr"] = result_repr

    try:
        line_limit = int(line_limit)
    except (ValueError, TypeError):
        line_limit = 30

    if line_limit == -1:
        response["warning"] = (
            "Unlimited output requested. This may consume a large number "
            "of tokens. Consider using read_output for large outputs."
        )
        response["stdout"] = stdout_full
        response["stderr"] = stderr_full
    else:
        limit = line_limit
        stdout_truncated, stdout_was_truncated = truncate_output(stdout_full, limit)
        stderr_truncated, stderr_was_truncated = truncate_output(stderr_full, limit)
        response["stdout"] = stdout_truncated

        # For errors, inject a summary line if not already visible
        if status == "error" and error is not None:
            error_summary = f"{type(error).__name__}: {error}"
            if error_summary not in stderr_truncated:
                if stderr_truncated and not stderr_truncated.endswith("\n"):
                    stderr_truncated += "\n"
                stderr_truncated += error_summary + "\n"

        response["stderr"] = stderr_truncated
        if stdout_was_truncated or stderr_was_truncated:
            response["truncated"] = True
            response["message"] = (
                f"Output truncated to {limit} lines. "
                f"Use read_output('{output_id}') to retrieve full output."
            )

    return response