Skip to content

Utilities

Advanced utility functions for screenshots, timelapse capture, code execution, package management, and output retrieval.

screenshot

MCP Tool Interface

Take a screenshot of the napari canvas and return as base64.

Parameters:

Name Type Description Default
canvas_only bool

If True, only capture the canvas area.

True

Returns:

Type Description
ImageContent

The screenshot image as an mcp.types.ImageContent object.

Source code in src/napari_mcp/server.py
async def screenshot(canvas_only: bool | str = True) -> ImageContent:
    """
    Capture the napari viewer and return the picture as image content.

    This is the MCP tool entry point; the work itself is delegated to
    ``NapariMCPTools.screenshot``.

    Parameters
    ----------
    canvas_only : bool, default=True
        If True, only capture the canvas area.

    Returns
    -------
    ImageContent
        The screenshot image as an mcp.types.ImageContent object.
    """
    image = await NapariMCPTools.screenshot(canvas_only)
    return image

Implementation

Take a screenshot of the napari canvas and return as base64.

Parameters:

Name Type Description Default
canvas_only bool

If True, only capture the canvas area.

True

Returns:

Type Description
ImageContent

The screenshot image as an mcp.types.ImageContent object.

Source code in src/napari_mcp/server.py
@staticmethod
async def screenshot(canvas_only: bool | str = True) -> ImageContent:
    """
    Take a screenshot of the napari viewer and return it as PNG image content.

    The request is first offered to an external viewer through
    ``_proxy_to_external``; only when that returns ``None`` is the
    screenshot taken from the local viewer.

    Parameters
    ----------
    canvas_only : bool or str, default=True
        If True, only capture the canvas area. Textual flags are accepted:
        "false", "0", "no", "off" and the empty string (case-insensitive)
        mean False; any other string means True.

    Returns
    -------
    ImageContent
        The screenshot image as an mcp.types.ImageContent object.
    """
    # Try to proxy to external viewer first (payload is forwarded untouched)
    result = await _proxy_to_external("screenshot", {"canvas_only": canvas_only})
    if result is not None:
        return result

    # A non-empty string such as "false" is truthy, so passing it straight to
    # the viewer would silently behave like True. Interpret it explicitly.
    if isinstance(canvas_only, str):
        canvas_only = canvas_only.strip().lower() not in {
            "",
            "false",
            "0",
            "no",
            "off",
        }

    # Local execution
    async with _viewer_lock:

        def _shot():
            v = _ensure_viewer()
            # Let pending GUI events run before grabbing the frame
            _process_events(3)
            arr = v.screenshot(canvas_only=canvas_only)
            if not isinstance(arr, np.ndarray):
                arr = np.asarray(arr)
            # PNG encoding below expects 8-bit samples
            if arr.dtype != np.uint8:
                arr = arr.astype(np.uint8, copy=False)
            img = Image.fromarray(arr)
            buf = BytesIO()
            img.save(buf, format="PNG")
            enc = buf.getvalue()
            return fastmcp.utilities.types.Image(
                data=enc, format="png"
            ).to_image_content()

        return _gui_execute(_shot)

timelapse_screenshot

MCP Tool Interface

Capture a series of screenshots while sweeping a dims axis.

Parameters:

Name Type Description Default
axis int

Dims axis index to sweep (e.g., temporal axis).

required
slice_range str

Python-like slice string over step indices, e.g. "1:5", ":6", "::2". Defaults follow Python semantics with start=0, stop=nsteps, step=1.

required
canvas_only bool

If True, only capture the canvas area.

True
interpolate_to_fit bool

If True, interpolate the images to fit the total size cap of 1309246 bytes.

True

Returns:

Type Description
list[ImageContent]

List of screenshots as mcp.types.ImageContent objects.

Source code in src/napari_mcp/server.py
async def timelapse_screenshot(
    axis: int | str,
    slice_range: str,
    canvas_only: bool | str = True,
    interpolate_to_fit: bool = True,
) -> list[ImageContent]:
    """
    Sweep one dims axis and capture a screenshot at each selected step.

    This is the MCP tool entry point; the work itself is delegated to
    ``NapariMCPTools.timelapse_screenshot``.

    Parameters
    ----------
    axis : int
        Dims axis index to sweep (e.g., temporal axis).
    slice_range : str
        Python-like slice string over step indices, e.g. "1:5", ":6", "::2".
        Defaults follow Python semantics with start=0, stop=nsteps, step=1.
    canvas_only : bool, default=True
        If True, only capture the canvas area.
    interpolate_to_fit : bool, default=True
        If True, interpolate the images to fit the total size cap of
        1309246 bytes.

    Returns
    -------
    list[ImageContent]
        List of screenshots as mcp.types.ImageContent objects.
    """
    frames = await NapariMCPTools.timelapse_screenshot(
        axis=axis,
        slice_range=slice_range,
        canvas_only=canvas_only,
        interpolate_to_fit=interpolate_to_fit,
    )
    return frames

Implementation

Capture a series of screenshots while sweeping a dims axis.

Parameters:

Name Type Description Default
axis int

Dims axis index to sweep (e.g., temporal axis).

required
slice_range str

Python-like slice string over step indices, e.g. "1:5", ":6", "::2". Defaults follow Python semantics with start=0, stop=nsteps, step=1.

required
canvas_only bool

If True, only capture the canvas area.

True
interpolate_to_fit bool

If True, interpolate the images to fit the total size cap of 1309246 bytes.

True

Returns:

Type Description
list[ImageContent]

List of screenshots as mcp.types.ImageContent objects.

Source code in src/napari_mcp/server.py
@staticmethod
async def timelapse_screenshot(
    axis: int | str,
    slice_range: str,
    canvas_only: bool | str = True,
    interpolate_to_fit: bool = True,
) -> list[ImageContent]:
    """
    Capture a series of screenshots while sweeping a dims axis.

    The request is first offered to an external viewer through
    ``_proxy_to_external``; only when that returns ``None`` is the local
    viewer stepped through the selected indices, one PNG per step.

    Parameters
    ----------
    axis : int
        Dims axis index to sweep (e.g., temporal axis).
    slice_range : str
        Python-like slice string over step indices, e.g. "1:5", ":6", "::2",
        "::-1", or a single index such as "3" or "-1". Slice forms follow
        Python slice semantics (omitted fields, negative values and
        out-of-range bounds behave as they do for ``list`` slicing).
    canvas_only : bool or str, default=True
        If True, only capture the canvas area. Textual flags are accepted:
        "false", "0", "no", "off" and the empty string (case-insensitive)
        mean False; any other string means True.
    interpolate_to_fit : bool, default=True
        If True, downscale the images so the estimated total fits the size
        cap of 1309246 bytes (measured as base64 length). Frames that would
        still push the total over the cap are dropped from the end.

    Returns
    -------
    list[ImageContent]
        List of screenshots as mcp.types.ImageContent objects.

    Raises
    ------
    ValueError
        Raised by the capture routine when ``slice_range`` is malformed, has
        a zero step, or names a single index outside the axis.
    RuntimeError
        Raised by the capture routine when the number of steps along
        ``axis`` cannot be determined.
    """
    max_total_base64_bytes = 1309246 if interpolate_to_fit else None

    # Try to proxy to external viewer first (payload is forwarded untouched)
    result = await _proxy_to_external(
        "timelapse_screenshot",
        {
            "axis": axis,
            "slice_range": slice_range,
            "canvas_only": canvas_only,
            "interpolate_to_fit": interpolate_to_fit,
        },
    )
    if result is not None:
        return result  # type: ignore[return-value]

    # A non-empty string such as "false" is truthy, so passing it straight to
    # the viewer would silently behave like True. Interpret it explicitly.
    if isinstance(canvas_only, str):
        canvas_only = canvas_only.strip().lower() not in {
            "",
            "false",
            "0",
            "no",
            "off",
        }

    def _parse_slice(spec: str, length: int) -> list[int]:
        """Expand a slice string (or single index) into concrete indices."""
        s = (spec or "").strip()

        # Single integer
        if s and ":" not in s:
            try:
                idx = int(s)
            except ValueError as err:
                raise ValueError(f"Invalid slice range: {spec!r}") from err
            if idx < 0:
                idx += length
            if not (0 <= idx < length):
                raise ValueError(
                    f"Index out of bounds for axis with {length} steps: {idx}"
                )
            return [idx]

        # Slice form start:stop:step
        parts = s.split(":")
        if len(parts) > 3:
            raise ValueError(f"Invalid slice range: {spec!r}")

        fields: list[int | None] = []
        for part in (parts + [""] * 3)[:3]:
            text = part.strip()
            if not text:
                fields.append(None)
                continue
            try:
                fields.append(int(text))
            except ValueError as err:
                raise ValueError(f"Invalid slice range: {spec!r}") from err
        start, stop, step = fields

        # Compare against 0 explicitly: `step or 1` would hide a zero step.
        if step == 0:
            raise ValueError("slice step cannot be 0")

        # slice.indices applies Python's own defaulting, negative-index and
        # clamping rules, including reversed sweeps such as "::-1".
        return list(range(*slice(start, stop, step).indices(length)))

    # Local execution
    async with _viewer_lock:

        def _run_series():
            v = _ensure_viewer()

            # Determine number of steps along axis
            try:
                nsteps_tuple = getattr(v.dims, "nsteps", None)
                if nsteps_tuple is None:
                    # Without dims.nsteps, fall through to the layer-based guess
                    raise AttributeError
                total = int(nsteps_tuple[int(axis)])
            except Exception:
                # Best effort via bounds from layers; may be approximate
                try:
                    total = max(
                        int(getattr(lyr.data, "shape", [1])[(int(axis))])
                        if int(axis) < getattr(lyr.data, "ndim", 0)
                        else 1
                        for lyr in v.layers
                    )
                except Exception:
                    total = 0

            if total <= 0:
                raise RuntimeError(
                    "Unable to determine number of steps for the given axis"
                )

            indices = _parse_slice(slice_range, total)
            if not indices:
                return []

            def _grab():
                """Screenshot the viewer as a uint8 array."""
                arr = v.screenshot(canvas_only=canvas_only)
                if not isinstance(arr, np.ndarray):
                    arr = np.asarray(arr)
                if arr.dtype != np.uint8:
                    arr = arr.astype(np.uint8, copy=False)
                return arr

            def _png_bytes(img) -> bytes:
                """Encode a PIL image as PNG bytes."""
                buf = BytesIO()
                img.save(buf, format="PNG")
                return buf.getvalue()

            # Take a sample at the first index to estimate size
            v.dims.set_current_step(int(axis), int(indices[0]))
            _process_events(2)
            sample_png = _png_bytes(Image.fromarray(_grab()))
            # base64 expands every 3 bytes to 4 characters
            sample_b64_len = ((len(sample_png) + 2) // 3) * 4

            # Pick a common downsample factor when the estimated total
            # exceeds the cap; sqrt because size scales with width * height.
            downsample_factor = 1.0
            if (
                max_total_base64_bytes is not None
                and sample_b64_len * len(indices) > max_total_base64_bytes
            ):
                est_factor = math.sqrt(
                    max_total_base64_bytes
                    / float(max(1, sample_b64_len * len(indices)))
                )
                # Never shrink below 5% of the original size
                downsample_factor = max(0.05, min(1.0, est_factor))

            images: list[ImageContent] = []  # type: ignore
            total_b64_len = 0
            for idx in indices:
                # Move slider
                v.dims.set_current_step(int(axis), int(idx))
                _process_events(2)

                # Capture
                img = Image.fromarray(_grab())
                if downsample_factor < 1.0:
                    new_w = max(1, int(img.width * downsample_factor))
                    new_h = max(1, int(img.height * downsample_factor))
                    if new_w != img.width or new_h != img.height:
                        img = img.resize((new_w, new_h), resample=Image.BILINEAR)
                enc = _png_bytes(img)
                b64_len = ((len(enc) + 2) // 3) * 4
                # The estimate is only a guess: stop once the real total
                # would exceed the cap.
                if (
                    max_total_base64_bytes is not None
                    and total_b64_len + b64_len > max_total_base64_bytes
                ):
                    break
                total_b64_len += b64_len
                images.append(
                    fastmcp.utilities.types.Image(
                        data=enc, format="png"
                    ).to_image_content()
                )

            return images

        return _gui_execute(_run_series)

execute_code

MCP Tool Interface

Execute arbitrary Python code in the server's interpreter.

Similar to napari's console. The execution namespace persists across calls and includes 'viewer', 'napari', and 'np'.

Parameters:

Name Type Description Default
code str

Python code string. The value of the last expression (if any) is returned as 'result_repr'.

required
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30

Returns:

Type Description
dict

Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr', and 'output_id' for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
async def execute_code(code: str, line_limit: int | str = 30) -> dict[str, Any]:
    """
    Run arbitrary Python code inside the server's interpreter.

    Works like napari's console: the execution namespace persists across
    calls and includes 'viewer', 'napari', and 'np'. This is the MCP tool
    entry point; the work itself is delegated to
    ``NapariMCPTools.execute_code``.

    Parameters
    ----------
    code : str
        Python code string. The value of the last expression (if any)
        is returned as 'result_repr'.
    line_limit : int, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens.

    Returns
    -------
    dict
        Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr',
        and 'output_id' for retrieving full output if truncated.
    """
    outcome = await NapariMCPTools.execute_code(code=code, line_limit=line_limit)
    return outcome

Implementation

Execute arbitrary Python code in the server's interpreter.

Similar to napari's console. The execution namespace persists across calls and includes 'viewer', 'napari', and 'np'.

Parameters:

Name Type Description Default
code str

Python code string. The value of the last expression (if any) is returned as 'result_repr'.

required
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30

Returns:

Type Description
dict

Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr', and 'output_id' for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
@staticmethod
async def execute_code(code: str, line_limit: int | str = 30) -> dict[str, Any]:
    """
    Execute arbitrary Python code in the server's interpreter.

    Similar to napari's console. The execution namespace persists across calls
    and includes 'viewer', 'napari', and 'np'. The request is first offered
    to an external viewer through ``_proxy_to_external``; only when that
    returns ``None`` is the code run locally.

    Parameters
    ----------
    code : str
        Python code string. The value of the last expression (if any)
        is returned as 'result_repr'.
    line_limit : int or str, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens. Numeric
        strings such as "-1" are accepted; a value that cannot be converted
        to an integer falls back to 30.

    Returns
    -------
    dict
        Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr',
        and 'output_id' for retrieving full output if truncated. When the
        code raises, 'status' is "error" and the traceback is appended to
        'stderr'.
    """
    # Try to proxy to external viewer first
    # NOTE(review): line_limit is not forwarded here, so a proxied call uses
    # whatever limit the external side applies - confirm this is intended.
    result = await _proxy_to_external("execute_code", {"code": code})
    if result is not None:
        return result

    # Normalise once up front: a string "-1" would otherwise fail the
    # `== -1` test below, and a bad value would raise only after the user
    # code had already run.
    try:
        limit = int(line_limit)
    except (TypeError, ValueError, OverflowError):
        limit = 30

    def _attach_output(
        response: dict[str, Any],
        stdout_text: str,
        stderr_text: str,
        exc: Exception | None = None,
    ) -> dict[str, Any]:
        """Add (possibly truncated) stdout/stderr to *response*."""
        if limit == -1:
            response["warning"] = (
                "Unlimited output requested. This may consume a large number "
                "of tokens. Consider using read_output for large outputs."
            )
            response["stdout"] = stdout_text
            response["stderr"] = stderr_text
            return response

        stdout_truncated, stdout_was_truncated = _truncate_output(stdout_text, limit)
        stderr_truncated, stderr_was_truncated = _truncate_output(stderr_text, limit)

        response["stdout"] = stdout_truncated
        if exc is not None:
            # Ensure exception summary is present even when truncated
            error_summary = f"{type(exc).__name__}: {exc}"
            if error_summary not in stderr_truncated:
                if stderr_truncated and not stderr_truncated.endswith("\n"):
                    stderr_truncated += "\n"
                stderr_truncated += error_summary + "\n"
        response["stderr"] = stderr_truncated

        if stdout_was_truncated or stderr_was_truncated:
            stored_id = response["output_id"]
            response["truncated"] = True
            response["message"] = (
                f"Output truncated to {limit} lines. "
                f"Use read_output('{stored_id}') to retrieve full output."
            )
        return response

    # Local execution
    async with _viewer_lock:
        v = _ensure_viewer()
        _exec_globals.setdefault("__builtins__", __builtins__)  # type: ignore[assignment]
        _exec_globals["viewer"] = v
        napari_mod = napari
        if napari_mod is not None:
            _exec_globals.setdefault("napari", napari_mod)
        _exec_globals.setdefault("np", np)

        stdout_buf = StringIO()
        stderr_buf = StringIO()
        result_repr: str | None = None
        try:
            # Capture stdout/stderr during execution
            with (
                contextlib.redirect_stdout(stdout_buf),
                contextlib.redirect_stderr(stderr_buf),
            ):
                # Try to evaluate last expression if present
                parsed = ast.parse(code, mode="exec")
                if parsed.body and isinstance(parsed.body[-1], ast.Expr):
                    # Execute all but last, then eval last expression to
                    # capture a result
                    if len(parsed.body) > 1:
                        exec_ast = ast.Module(
                            body=parsed.body[:-1], type_ignores=[]
                        )
                        exec(
                            compile(exec_ast, "<mcp-exec>", "exec"),
                            _exec_globals,
                            _exec_globals,
                        )
                    last_expr = ast.Expression(body=parsed.body[-1].value)
                    value = eval(
                        compile(last_expr, "<mcp-eval>", "eval"),
                        _exec_globals,
                        _exec_globals,
                    )
                    result_repr = repr(value)
                else:
                    # Pure statements
                    exec(
                        compile(parsed, "<mcp-exec>", "exec"),
                        _exec_globals,
                        _exec_globals,
                    )
            _process_events(2)

            # Get full output
            stdout_full = stdout_buf.getvalue()
            stderr_full = stderr_buf.getvalue()

            # Store full output and get ID
            output_id = await _store_output(
                tool_name="execute_code",
                stdout=stdout_full,
                stderr=stderr_full,
                result_repr=result_repr,
                code=code,
            )

            response: dict[str, Any] = {
                "status": "ok",
                "output_id": output_id,
                **({"result_repr": result_repr} if result_repr is not None else {}),
            }
            return _attach_output(response, stdout_full, stderr_full)
        except Exception as e:
            _process_events(1)
            tb = traceback.format_exc()

            # Get full output including traceback
            stdout_full = stdout_buf.getvalue()
            stderr_full = stderr_buf.getvalue() + tb

            # Store full output and get ID
            output_id = await _store_output(
                tool_name="execute_code",
                stdout=stdout_full,
                stderr=stderr_full,
                code=code,
                error=True,
            )

            response = {
                "status": "error",
                "output_id": output_id,
            }
            return _attach_output(response, stdout_full, stderr_full, exc=e)

install_packages

MCP Tool Interface

Install Python packages using pip.

Install packages into the currently running server environment.

Parameters:

Name Type Description Default
packages list of str

List of package specifiers (e.g., "scikit-image", "torch==2.3.1").

required
upgrade bool

If True, pass --upgrade flag.

False
no_deps bool

If True, pass --no-deps flag.

False
index_url str

Custom index URL.

None
extra_index_url str

Extra index URL.

None
pre bool

Allow pre-releases (--pre flag).

False
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30
timeout int

Timeout for pip install in seconds.

240

Returns:

Type Description
dict

Dictionary including status, returncode, stdout, stderr, command, and output_id for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
async def install_packages(
    packages: list[str],
    upgrade: bool | None = False,
    no_deps: bool | None = False,
    index_url: str | None = None,
    extra_index_url: str | None = None,
    pre: bool | None = False,
    line_limit: int | str = 30,
    timeout: int = 240,
) -> dict[str, Any]:
    """
    Install Python packages with pip into the running server environment.

    This is the MCP tool entry point; the work itself is delegated to
    ``NapariMCPTools.install_packages``.

    Parameters
    ----------
    packages : list of str
        List of package specifiers (e.g., "scikit-image", "torch==2.3.1").
    upgrade : bool, optional
        If True, pass --upgrade flag.
    no_deps : bool, optional
        If True, pass --no-deps flag.
    index_url : str, optional
        Custom index URL.
    extra_index_url : str, optional
        Extra index URL.
    pre : bool, optional
        Allow pre-releases (--pre flag).
    line_limit : int, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens.
    timeout : int, default=240
        Timeout for pip install in seconds.

    Returns
    -------
    dict
        Dictionary including status, returncode, stdout, stderr, command,
        and output_id for retrieving full output if truncated.
    """
    outcome = await NapariMCPTools.install_packages(
        packages=packages,
        upgrade=upgrade,
        no_deps=no_deps,
        index_url=index_url,
        extra_index_url=extra_index_url,
        pre=pre,
        line_limit=line_limit,
        timeout=timeout,
    )
    return outcome

Implementation

Install Python packages using pip.

Install packages into the currently running server environment.

Parameters:

Name Type Description Default
packages list of str

List of package specifiers (e.g., "scikit-image", "torch==2.3.1").

required
upgrade bool

If True, pass --upgrade flag.

False
no_deps bool

If True, pass --no-deps flag.

False
index_url str

Custom index URL.

None
extra_index_url str

Extra index URL.

None
pre bool

Allow pre-releases (--pre flag).

False
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30
timeout int

Timeout for pip install in seconds.

240

Returns:

Type Description
dict

Dictionary including status, returncode, stdout, stderr, command, and output_id for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
@staticmethod
async def install_packages(
    packages: list[str],
    upgrade: bool | None = False,
    no_deps: bool | None = False,
    index_url: str | None = None,
    extra_index_url: str | None = None,
    pre: bool | None = False,
    line_limit: int | str = 30,
    timeout: int = 240,
) -> dict[str, Any]:
    """
    Install Python packages using pip.

    Install packages into the currently running server environment by
    running ``<sys.executable> -m pip install ...`` as a subprocess. The
    request is first offered to an external viewer through
    ``_proxy_to_external``; only when that returns ``None`` is pip run
    locally.

    Parameters
    ----------
    packages : list of str
        List of package specifiers (e.g., "scikit-image", "torch==2.3.1").
    upgrade : bool, optional
        If True, pass --upgrade flag.
    no_deps : bool, optional
        If True, pass --no-deps flag.
    index_url : str, optional
        Custom index URL.
    extra_index_url : str, optional
        Extra index URL.
    pre : bool, optional
        Allow pre-releases (--pre flag).
    line_limit : int or str, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens. Numeric
        strings such as "-1" are accepted; a value that cannot be converted
        to an integer falls back to 30.
    timeout : int, default=240
        Timeout for pip install in seconds.

    Returns
    -------
    dict
        Dictionary including status, returncode, stdout, stderr, command,
        and output_id for retrieving full output if truncated. On timeout
        the status is "error", returncode is -1 and stderr carries the
        timeout message. If ``packages`` is empty or not a list, only
        'status' and 'message' are returned.
    """
    # Try to proxy to external viewer first
    result = await _proxy_to_external(
        "install_packages",
        {
            "packages": packages,
            "upgrade": upgrade,
            "no_deps": no_deps,
            "index_url": index_url,
            "extra_index_url": extra_index_url,
            "pre": pre,
            "line_limit": line_limit,
            "timeout": timeout,
        },
    )
    if result is not None:
        return result

    if not packages or not isinstance(packages, list):
        return {
            "status": "error",
            "message": "Parameter 'packages' must be a non-empty list of package names",
        }

    # Normalise once: a string "-1" would otherwise fail the `== -1` test
    # below and miss the unlimited-output path.
    try:
        limit = int(line_limit)
    except (TypeError, ValueError, OverflowError):
        limit = 30

    cmd: list[str] = [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--no-input",
        "--disable-pip-version-check",
    ]
    if upgrade:
        cmd.append("--upgrade")
    if no_deps:
        cmd.append("--no-deps")
    if pre:
        cmd.append("--pre")
    if index_url:
        cmd.extend(["--index-url", index_url])
    if extra_index_url:
        cmd.extend(["--extra-index-url", extra_index_url])
    cmd.extend(packages)

    # Run pip as a subprocess without blocking the event loop
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    timed_out = False
    try:
        stdout_b, stderr_b = await asyncio.wait_for(
            proc.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError:
        timed_out = True
        with contextlib.suppress(ProcessLookupError):
            proc.kill()
        # Reap the killed child so it is not left behind unwaited. The wait
        # is bounded in case something else still holds the pipes open.
        with contextlib.suppress(asyncio.TimeoutError):
            await asyncio.wait_for(proc.wait(), timeout=5)
        stdout_b, stderr_b = b"", f"pip install timed out after {timeout}s".encode()
    stdout = stdout_b.decode(errors="replace")
    stderr = stderr_b.decode(errors="replace")

    # A timed-out run has no meaningful exit code; keep reporting it as
    # "none" (-1 in the response) regardless of the signal used to kill it.
    returncode = None if timed_out else proc.returncode
    status = "ok" if returncode == 0 else "error"
    command_str = " ".join(shlex.quote(part) for part in cmd)

    # Store full output and get ID
    output_id = await _store_output(
        tool_name="install_packages",
        stdout=stdout,
        stderr=stderr,
        packages=packages,
        command=command_str,
        returncode=returncode,
    )

    # Prepare response with line limiting
    response: dict[str, Any] = {
        "status": status,
        "returncode": returncode if returncode is not None else -1,
        "command": command_str,
        "output_id": output_id,
    }

    # Add warning for unlimited output
    if limit == -1:
        response["warning"] = (
            "Unlimited output requested. This may consume a large number "
            "of tokens. Consider using read_output for large outputs."
        )
        response["stdout"] = stdout
        response["stderr"] = stderr
    else:
        # Truncate stdout and stderr
        stdout_truncated, stdout_was_truncated = _truncate_output(stdout, limit)
        stderr_truncated, stderr_was_truncated = _truncate_output(stderr, limit)

        response["stdout"] = stdout_truncated
        response["stderr"] = stderr_truncated

        if stdout_was_truncated or stderr_was_truncated:
            response["truncated"] = True
            response["message"] = (
                f"Output truncated to {limit} lines. "
                f"Use read_output('{output_id}') to retrieve full output."
            )

    return response

read_output

MCP Tool Interface

Read stored tool output with optional line range.

Parameters:

Name Type Description Default
output_id str

Unique ID of the stored output.

required
start int

Starting line number (0-indexed).

0
end int

Ending line number (exclusive). If -1, read to end.

-1

Returns:

Type Description
dict

Dictionary containing the requested output lines and metadata.

Source code in src/napari_mcp/server.py
async def read_output(
    output_id: str, start: int | str = 0, end: int | str = -1
) -> dict[str, Any]:
    """
    Fetch previously stored tool output, optionally limited to a line range.

    This is the MCP tool entry point; the work itself is delegated to
    ``NapariMCPTools.read_output``.

    Parameters
    ----------
    output_id : str
        Unique ID of the stored output.
    start : int, default=0
        Starting line number (0-indexed).
    end : int, default=-1
        Ending line number (exclusive). If -1, read to end.

    Returns
    -------
    dict
        Dictionary containing the requested output lines and metadata.
    """
    outcome = await NapariMCPTools.read_output(
        output_id=output_id, start=start, end=end
    )
    return outcome

Implementation

Read stored tool output with optional line range.

Parameters:

Name Type Description Default
output_id str

Unique ID of the stored output.

required
start int

Starting line number (0-indexed).

0
end int

Ending line number (exclusive). If -1, read to end.

-1

Returns:

Type Description
dict

Dictionary containing the requested output lines and metadata.

Source code in src/napari_mcp/server.py
@staticmethod
async def read_output(
    output_id: str, start: int | str = 0, end: int | str = -1
) -> dict[str, Any]:
    """
    Return lines from a stored tool output, optionally limited to a range.

    The stored stdout and stderr are concatenated (stdout first, with a
    newline inserted between them when neither side supplies one) and the
    result is addressed by line.

    Parameters
    ----------
    output_id : str
        Unique ID of the stored output.
    start : int, default=0
        Starting line number (0-indexed). Values that cannot be converted to
        an integer are treated as 0; negative values are clamped to 0.
    end : int, default=-1
        Ending line number (exclusive). If -1, read to end. Values that
        cannot be converted to an integer are treated as -1.

    Returns
    -------
    dict
        On success: 'status', 'output_id', 'tool_name', 'timestamp', 'lines',
        'line_range', 'total_lines' and 'result_repr'. If the ID is unknown:
        'status' set to "error" and a 'message'.
    """

    def _as_int(value, fallback: int) -> int:
        # Range arguments may arrive as strings; anything unusable maps to
        # the fallback instead of failing the request.
        try:
            return int(value)
        except Exception:
            return fallback

    async with _output_storage_lock:
        if output_id not in _output_storage:
            return {
                "status": "error",
                "message": f"Output ID '{output_id}' not found",
            }

        record = _output_storage[output_id]

        # Stitch stdout and stderr together for line-based access
        out_text = record.get("stdout") or ""
        err_text = record.get("stderr") or ""
        pieces = [out_text]
        if err_text:
            needs_separator = out_text and not (
                out_text.endswith("\n") or err_text.startswith("\n")
            )
            if needs_separator:
                pieces.append("\n")
            pieces.append(err_text)
        combined = "".join(pieces)

        all_lines = combined.splitlines(keepends=True)
        line_count = len(all_lines)

        # Normalize and clamp the requested window
        first = max(0, _as_int(start, 0))
        last = _as_int(end, -1)
        if last == -1:
            last = line_count
        else:
            last = min(line_count, last)

        chosen = all_lines[first:last] if first < line_count else []

        return {
            "status": "ok",
            "output_id": output_id,
            "tool_name": record["tool_name"],
            "timestamp": record["timestamp"],
            "lines": chosen,
            # `last` is already capped at line_count above
            "line_range": {"start": first, "end": last},
            "total_lines": line_count,
            "result_repr": record.get("result_repr"),
        }