Skip to content

Complete API Reference

Auto-generated documentation for all napari MCP server functions.

Documentation Structure

This page shows both the MCP tool interface (what you call from AI assistants) and the implementation (NapariMCPTools class methods). Most functions use NumPy-style docstrings with detailed parameter and return information; a few simple wrappers carry only a one-line summary or a short bullet list.

MCP Tool Interface (server.py wrappers)

These are the functions exposed as MCP tools:

Napari MCP Server.

Exposes a set of MCP tools to control a running napari Viewer: layer control, viewer control (zoom, camera, dims), and a screenshot tool returning a PNG image as base64.

Functions

set_gui_executor

set_gui_executor(executor: Any | None) -> None

Configure an executor that runs a callable on the GUI/main thread.

If None, operations execute directly in the current thread.

Source code in src/napari_mcp/server.py
def set_gui_executor(executor: Any | None) -> None:
    """Configure an executor that runs a callable on the GUI/main thread.

    If None, operations execute directly in the current thread.

    Parameters
    ----------
    executor : Any or None
        Object stored in the module-level ``_GUI_EXECUTOR``; pass None to
        clear a previously configured executor.
    """
    global _GUI_EXECUTOR
    # Only rebinds the module-level slot; the executor is never invoked here.
    _GUI_EXECUTOR = executor

detect_viewers async

detect_viewers() -> dict[str, Any]

Detect available viewers (local and external).

Returns:

Type Description
dict

Dictionary with information about available viewers

Source code in src/napari_mcp/server.py
async def detect_viewers() -> dict[str, Any]:
    """
    Report which viewers (local and external) are available.

    MCP-facing wrapper around ``NapariMCPTools.detect_viewers``.

    Returns
    -------
    dict
        Dictionary with information about available viewers.
    """
    detected = await NapariMCPTools.detect_viewers()
    return detected

init_viewer async

init_viewer(title: str | None = None, width: int | str | None = None, height: int | str | None = None, port: int | str | None = None) -> dict[str, Any]

Create or return the napari viewer (local or external).

Parameters:

Name Type Description Default
title str

Optional window title (only for local viewer).

None
width int

Optional initial canvas width (only for local viewer).

None
height int

Optional initial canvas height (only for local viewer).

None
port int

If provided, attempt to connect to an external napari-mcp bridge on this port (default is taken from NAPARI_MCP_BRIDGE_PORT or 9999).

None

Returns:

Type Description
dict

Dictionary containing status, viewer type, and layer info.

Source code in src/napari_mcp/server.py
async def init_viewer(
    title: str | None = None,
    width: int | str | None = None,
    height: int | str | None = None,
    port: int | str | None = None,
) -> dict[str, Any]:
    """
    Obtain the napari viewer, either a local one or an external one.

    MCP-facing wrapper that forwards every argument unchanged, by keyword,
    to ``NapariMCPTools.init_viewer``.

    Parameters
    ----------
    title : str, optional
        Window title (only used for a local viewer).
    width : int or str, optional
        Initial canvas width (only used for a local viewer).
    height : int or str, optional
        Initial canvas height (only used for a local viewer).
    port : int or str, optional
        If provided, attempt to connect to an external napari-mcp bridge on
        this port (default is taken from NAPARI_MCP_BRIDGE_PORT or 9999).

    Returns
    -------
    dict
        Dictionary containing status, viewer type, and layer info.
    """
    viewer_options = {
        "title": title,
        "width": width,
        "height": height,
        "port": port,
    }
    return await NapariMCPTools.init_viewer(**viewer_options)

close_viewer async

close_viewer() -> dict[str, Any]

Close the viewer window and clear all layers.

Returns:

Type Description
dict

Dictionary with status: 'closed' if viewer existed, 'no_viewer' if none.

Source code in src/napari_mcp/server.py
async def close_viewer() -> dict[str, Any]:
    """
    Close the viewer window and clear all layers.

    MCP-facing wrapper around ``NapariMCPTools.close_viewer``.

    Returns
    -------
    dict
        Dictionary whose status is 'closed' if a viewer existed and
        'no_viewer' if there was none.
    """
    outcome = await NapariMCPTools.close_viewer()
    return outcome

session_information async

session_information() -> dict[str, Any]

Get comprehensive information about the current napari session.

Returns:

Type Description
dict

Comprehensive session information including viewer state, system info, and environment details.

Source code in src/napari_mcp/server.py
async def session_information() -> dict[str, Any]:
    """
    Collect comprehensive information about the current napari session.

    MCP-facing wrapper around ``NapariMCPTools.session_information``.

    Returns
    -------
    dict
        Session information including viewer state, system info, and
        environment details.
    """
    session_report = await NapariMCPTools.session_information()
    return session_report

list_layers async

list_layers() -> list[dict[str, Any]]

Return a list of layers with key properties.

Source code in src/napari_mcp/server.py
async def list_layers() -> list[dict[str, Any]]:
    """
    Return a list of layers with key properties.

    MCP-facing wrapper around ``NapariMCPTools.list_layers``.

    Returns
    -------
    list of dict
        One dictionary per layer, as produced by
        ``NapariMCPTools.list_layers``.
    """
    layer_summaries = await NapariMCPTools.list_layers()
    return layer_summaries

add_image async

add_image(path: str, name: str | None = None, colormap: str | None = None, blending: str | None = None, channel_axis: int | str | None = None) -> dict[str, Any]

Add an image layer from a file path.

Parameters:

Name Type Description Default
path str

Path to an image readable by imageio (e.g., PNG, TIFF, OME-TIFF).

required
name str

Layer name. If None, uses filename.

None
colormap str

Napari colormap name (e.g., 'gray', 'magma').

None
blending str

Blending mode (e.g., 'translucent').

None
channel_axis int

If provided, interpret that axis as channels.

None

Returns:

Type Description
dict

Dictionary containing status, layer name, and image shape.

Source code in src/napari_mcp/server.py
async def add_image(
    path: str,
    name: str | None = None,
    colormap: str | None = None,
    blending: str | None = None,
    channel_axis: int | str | None = None,
) -> dict[str, Any]:
    """
    Add an image layer loaded from a file path.

    MCP-facing wrapper that forwards every argument unchanged, by keyword,
    to ``NapariMCPTools.add_image``.

    Parameters
    ----------
    path : str
        Path to an image readable by imageio (e.g., PNG, TIFF, OME-TIFF).
    name : str, optional
        Layer name. If None, uses filename.
    colormap : str, optional
        Napari colormap name (e.g., 'gray', 'magma').
    blending : str, optional
        Blending mode (e.g., 'translucent').
    channel_axis : int or str, optional
        If provided, interpret that axis as channels.

    Returns
    -------
    dict
        Dictionary containing status, layer name, and image shape.
    """
    image_options = {
        "path": path,
        "name": name,
        "colormap": colormap,
        "blending": blending,
        "channel_axis": channel_axis,
    }
    return await NapariMCPTools.add_image(**image_options)

add_labels async

add_labels(path: str, name: str | None = None) -> dict[str, Any]

Add a labels layer from a file path (e.g., PNG/TIFF with integer labels).

Source code in src/napari_mcp/server.py
async def add_labels(path: str, name: str | None = None) -> dict[str, Any]:
    """
    Add a labels layer from a file path (e.g., PNG/TIFF with integer labels).

    MCP-facing wrapper around ``NapariMCPTools.add_labels``.

    Parameters
    ----------
    path : str
        Path of the file holding the label data.
    name : str, optional
        Layer name, forwarded unchanged.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.add_labels``.
    """
    labels_result = await NapariMCPTools.add_labels(path=path, name=name)
    return labels_result

add_points async

add_points(points: list[list[float]], name: str | None = None, size: float | str = 10.0) -> dict[str, Any]

Add a points layer.

  • points: List of [y, x] or [z, y, x] coordinates
  • name: Optional layer name
  • size: Point size in pixels
Source code in src/napari_mcp/server.py
async def add_points(
    points: list[list[float]], name: str | None = None, size: float | str = 10.0
) -> dict[str, Any]:
    """
    Add a points layer.

    MCP-facing wrapper around ``NapariMCPTools.add_points``.

    Parameters
    ----------
    points : list of list of float
        List of [y, x] or [z, y, x] coordinates.
    name : str, optional
        Optional layer name.
    size : float or str, default=10.0
        Point size in pixels.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.add_points``.
    """
    points_options = {"points": points, "name": name, "size": size}
    return await NapariMCPTools.add_points(**points_options)

remove_layer async

remove_layer(name: str) -> dict[str, Any]

Remove a layer by name.

Source code in src/napari_mcp/server.py
async def remove_layer(name: str) -> dict[str, Any]:
    """
    Remove a layer by name.

    MCP-facing wrapper around ``NapariMCPTools.remove_layer``.

    Parameters
    ----------
    name : str
        Name of the layer to remove.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.remove_layer``.
    """
    removal = await NapariMCPTools.remove_layer(name)
    return removal

set_layer_properties async

set_layer_properties(name: str, visible: bool | None = None, opacity: float | None = None, colormap: str | None = None, blending: str | None = None, contrast_limits: list[float] | None = None, gamma: float | str | None = None, new_name: str | None = None) -> dict[str, Any]

Set common properties on a layer by name.

Source code in src/napari_mcp/server.py
async def set_layer_properties(
    name: str,
    visible: bool | None = None,
    opacity: float | None = None,
    colormap: str | None = None,
    blending: str | None = None,
    contrast_limits: list[float] | None = None,
    gamma: float | str | None = None,
    new_name: str | None = None,
) -> dict[str, Any]:
    """
    Set common properties on a layer identified by name.

    MCP-facing wrapper that forwards every argument unchanged, by keyword,
    to ``NapariMCPTools.set_layer_properties``.

    Parameters
    ----------
    name : str
        Name of the layer to modify.
    visible, opacity, colormap, blending, contrast_limits, gamma, new_name
        Optional property values, each defaulting to None.
        NOTE(review): presumably None leaves the property untouched;
        confirm in ``NapariMCPTools.set_layer_properties``.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.set_layer_properties``.
    """
    property_updates = {
        "name": name,
        "visible": visible,
        "opacity": opacity,
        "colormap": colormap,
        "blending": blending,
        "contrast_limits": contrast_limits,
        "gamma": gamma,
        "new_name": new_name,
    }
    return await NapariMCPTools.set_layer_properties(**property_updates)

reorder_layer async

reorder_layer(name: str, index: int | str | None = None, before: str | None = None, after: str | None = None) -> dict[str, Any]

Reorder a layer by name.

Provide exactly one of:

  • index: absolute target index
  • before: move before this layer name
  • after: move after this layer name

Source code in src/napari_mcp/server.py
async def reorder_layer(
    name: str,
    index: int | str | None = None,
    before: str | None = None,
    after: str | None = None,
) -> dict[str, Any]:
    """
    Reorder a layer by name.

    MCP-facing wrapper around ``NapariMCPTools.reorder_layer``. Provide
    exactly one of ``index``, ``before`` or ``after``.

    Parameters
    ----------
    name : str
        Name of the layer to move.
    index : int or str, optional
        Absolute target index.
    before : str, optional
        Move before this layer name.
    after : str, optional
        Move after this layer name.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.reorder_layer``.
    """
    placement = {"name": name, "index": index, "before": before, "after": after}
    return await NapariMCPTools.reorder_layer(**placement)

set_active_layer async

set_active_layer(name: str) -> dict[str, Any]

Set the selected/active layer by name.

Source code in src/napari_mcp/server.py
async def set_active_layer(name: str) -> dict[str, Any]:
    """
    Set the selected/active layer by name.

    MCP-facing wrapper around ``NapariMCPTools.set_active_layer``.

    Parameters
    ----------
    name : str
        Name of the layer to make active.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.set_active_layer``.
    """
    selection = await NapariMCPTools.set_active_layer(name)
    return selection

reset_view async

reset_view() -> dict[str, Any]

Reset the camera view to fit data.

Source code in src/napari_mcp/server.py
async def reset_view() -> dict[str, Any]:
    """
    Reset the camera view to fit data.

    MCP-facing wrapper around ``NapariMCPTools.reset_view``.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.reset_view``.
    """
    view_state = await NapariMCPTools.reset_view()
    return view_state

set_camera async

set_camera(center: list[float] | None = None, zoom: float | str | None = None, angle: float | str | None = None) -> dict[str, Any]

Set camera properties: center, zoom, and/or angle.

Source code in src/napari_mcp/server.py
async def set_camera(
    center: list[float] | None = None,
    zoom: float | str | None = None,
    angle: float | str | None = None,
) -> dict[str, Any]:
    """
    Set camera properties: center, zoom, and/or angle.

    MCP-facing wrapper that forwards every argument unchanged, by keyword,
    to ``NapariMCPTools.set_camera``.

    Parameters
    ----------
    center : list of float, optional
        Camera center.
    zoom : float or str, optional
        Camera zoom.
    angle : float or str, optional
        Camera angle.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.set_camera``.
    """
    camera_settings = {"center": center, "zoom": zoom, "angle": angle}
    return await NapariMCPTools.set_camera(**camera_settings)

set_ndisplay async

set_ndisplay(ndisplay: int | str) -> dict[str, Any]

Set number of displayed dimensions (2 or 3).

Source code in src/napari_mcp/server.py
async def set_ndisplay(ndisplay: int | str) -> dict[str, Any]:
    """
    Set number of displayed dimensions (2 or 3).

    MCP-facing wrapper around ``NapariMCPTools.set_ndisplay``.

    Parameters
    ----------
    ndisplay : int or str
        Number of displayed dimensions (2 or 3).

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.set_ndisplay``.
    """
    display_state = await NapariMCPTools.set_ndisplay(ndisplay)
    return display_state

set_dims_current_step async

set_dims_current_step(axis: int | str, value: int | str) -> dict[str, Any]

Set the current step (slider position) for a specific axis.

Source code in src/napari_mcp/server.py
async def set_dims_current_step(axis: int | str, value: int | str) -> dict[str, Any]:
    """
    Set the current step (slider position) for a specific axis.

    MCP-facing wrapper around ``NapariMCPTools.set_dims_current_step``.

    Parameters
    ----------
    axis : int or str
        Axis whose current step is set.
    value : int or str
        Step (slider position) to set on that axis.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.set_dims_current_step``.
    """
    step_state = await NapariMCPTools.set_dims_current_step(axis, value)
    return step_state

set_grid async

set_grid(enabled: bool | str = True) -> dict[str, Any]

Enable or disable grid view.

Source code in src/napari_mcp/server.py
async def set_grid(enabled: bool | str = True) -> dict[str, Any]:
    """
    Enable or disable grid view.

    MCP-facing wrapper around ``NapariMCPTools.set_grid``.

    Parameters
    ----------
    enabled : bool or str, default=True
        Whether grid view should be enabled.

    Returns
    -------
    dict
        Result reported by ``NapariMCPTools.set_grid``.
    """
    grid_state = await NapariMCPTools.set_grid(enabled)
    return grid_state

screenshot async

screenshot(canvas_only: bool | str = True) -> ImageContent

Take a screenshot of the napari canvas and return as base64.

Parameters:

Name Type Description Default
canvas_only bool

If True, only capture the canvas area.

True

Returns:

Type Description
ImageContent

The screenshot image as an mcp.types.ImageContent object.

Source code in src/napari_mcp/server.py
async def screenshot(canvas_only: bool | str = True) -> ImageContent:
    """
    Capture the napari canvas and return the image as base64.

    MCP-facing wrapper around ``NapariMCPTools.screenshot``.

    Parameters
    ----------
    canvas_only : bool or str, default=True
        If True, only capture the canvas area.

    Returns
    -------
    ImageContent
        The screenshot image as an mcp.types.ImageContent object.
    """
    captured = await NapariMCPTools.screenshot(canvas_only)
    return captured

timelapse_screenshot async

timelapse_screenshot(axis: int | str, slice_range: str, canvas_only: bool | str = True, interpolate_to_fit: bool = True) -> list[ImageContent]

Capture a series of screenshots while sweeping a dims axis.

Parameters:

Name Type Description Default
axis int

Dims axis index to sweep (e.g., temporal axis).

required
slice_range str

Python-like slice string over step indices, e.g. "1:5", ":6", "::2". Defaults follow Python semantics with start=0, stop=nsteps, step=1.

required
canvas_only bool

If True, only capture the canvas area.

True
interpolate_to_fit bool

If True, interpolate the images to fit the total size cap of 1309246 bytes.

True

Returns:

Type Description
list[ImageContent]

List of screenshots as mcp.types.ImageContent objects.

Source code in src/napari_mcp/server.py
async def timelapse_screenshot(
    axis: int | str,
    slice_range: str,
    canvas_only: bool | str = True,
    interpolate_to_fit: bool = True,
) -> list[ImageContent]:
    """
    Capture a series of screenshots while sweeping a dims axis.

    MCP-facing wrapper that forwards every argument unchanged, by keyword,
    to ``NapariMCPTools.timelapse_screenshot``.

    Parameters
    ----------
    axis : int or str
        Dims axis index to sweep (e.g., temporal axis).
    slice_range : str
        Python-like slice string over step indices, e.g. "1:5", ":6", "::2".
        Defaults follow Python semantics with start=0, stop=nsteps, step=1.
    canvas_only : bool or str, default=True
        If True, only capture the canvas area.
    interpolate_to_fit : bool, default=True
        If True, interpolate the images to fit the total size cap of 1309246 bytes.

    Returns
    -------
    list[ImageContent]
        List of screenshots as mcp.types.ImageContent objects.
    """
    # The wrapper's own default (True) is always passed on explicitly, so it,
    # not any default on the implementation, is what callers get.
    return await NapariMCPTools.timelapse_screenshot(
        axis=axis,
        slice_range=slice_range,
        canvas_only=canvas_only,
        interpolate_to_fit=interpolate_to_fit,
    )

execute_code async

execute_code(code: str, line_limit: int | str = 30) -> dict[str, Any]

Execute arbitrary Python code in the server's interpreter.

Similar to napari's console. The execution namespace persists across calls and includes 'viewer', 'napari', and 'np'.

Parameters:

Name Type Description Default
code str

Python code string. The value of the last expression (if any) is returned as 'result_repr'.

required
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30

Returns:

Type Description
dict

Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr', and 'output_id' for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
async def execute_code(code: str, line_limit: int | str = 30) -> dict[str, Any]:
    """
    Execute arbitrary Python code in the server's interpreter.

    Behaves like napari's console: the execution namespace persists across
    calls and includes 'viewer', 'napari', and 'np'. MCP-facing wrapper
    around ``NapariMCPTools.execute_code``.

    Parameters
    ----------
    code : str
        Python code string. The value of the last expression (if any)
        is returned as 'result_repr'.
    line_limit : int or str, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens.

    Returns
    -------
    dict
        Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr',
        and 'output_id' for retrieving full output if truncated.
    """
    execution = await NapariMCPTools.execute_code(code=code, line_limit=line_limit)
    return execution

install_packages async

install_packages(packages: list[str], upgrade: bool | None = False, no_deps: bool | None = False, index_url: str | None = None, extra_index_url: str | None = None, pre: bool | None = False, line_limit: int | str = 30, timeout: int = 240) -> dict[str, Any]

Install Python packages using pip.

Install packages into the currently running server environment.

Parameters:

Name Type Description Default
packages list of str

List of package specifiers (e.g., "scikit-image", "torch==2.3.1").

required
upgrade bool

If True, pass --upgrade flag.

False
no_deps bool

If True, pass --no-deps flag.

False
index_url str

Custom index URL.

None
extra_index_url str

Extra index URL.

None
pre bool

Allow pre-releases (--pre flag).

False
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30
timeout int

Timeout for pip install in seconds.

240

Returns:

Type Description
dict

Dictionary including status, returncode, stdout, stderr, command, and output_id for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
async def install_packages(
    packages: list[str],
    upgrade: bool | None = False,
    no_deps: bool | None = False,
    index_url: str | None = None,
    extra_index_url: str | None = None,
    pre: bool | None = False,
    line_limit: int | str = 30,
    timeout: int = 240,
) -> dict[str, Any]:
    """
    Install Python packages with pip into the running server environment.

    MCP-facing wrapper that forwards every argument unchanged, by keyword,
    to ``NapariMCPTools.install_packages``.

    Parameters
    ----------
    packages : list of str
        List of package specifiers (e.g., "scikit-image", "torch==2.3.1").
    upgrade : bool, optional
        If True, pass --upgrade flag.
    no_deps : bool, optional
        If True, pass --no-deps flag.
    index_url : str, optional
        Custom index URL.
    extra_index_url : str, optional
        Extra index URL.
    pre : bool, optional
        Allow pre-releases (--pre flag).
    line_limit : int or str, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens.
    timeout : int, default=240
        Timeout for pip install in seconds.

    Returns
    -------
    dict
        Dictionary including status, returncode, stdout, stderr, command,
        and output_id for retrieving full output if truncated.
    """
    pip_request = {
        "packages": packages,
        "upgrade": upgrade,
        "no_deps": no_deps,
        "index_url": index_url,
        "extra_index_url": extra_index_url,
        "pre": pre,
        "line_limit": line_limit,
        "timeout": timeout,
    }
    return await NapariMCPTools.install_packages(**pip_request)

read_output async

read_output(output_id: str, start: int | str = 0, end: int | str = -1) -> dict[str, Any]

Read stored tool output with optional line range.

Parameters:

Name Type Description Default
output_id str

Unique ID of the stored output.

required
start int

Starting line number (0-indexed).

0
end int

Ending line number (exclusive). If -1, read to end.

-1

Returns:

Type Description
dict

Dictionary containing the requested output lines and metadata.

Source code in src/napari_mcp/server.py
async def read_output(
    output_id: str, start: int | str = 0, end: int | str = -1
) -> dict[str, Any]:
    """
    Read stored tool output, optionally restricted to a line range.

    MCP-facing wrapper that forwards every argument unchanged, by keyword,
    to ``NapariMCPTools.read_output``.

    Parameters
    ----------
    output_id : str
        Unique ID of the stored output.
    start : int or str, default=0
        Starting line number (0-indexed).
    end : int or str, default=-1
        Ending line number (exclusive). If -1, read to end.

    Returns
    -------
    dict
        Dictionary containing the requested output lines and metadata.
    """
    line_window = {"output_id": output_id, "start": start, "end": end}
    return await NapariMCPTools.read_output(**line_window)

run

run() -> None

Run the MCP server.

Source code in src/napari_mcp/server.py
@app.command()
def run() -> None:
    """Run the MCP server.

    Registered as a command on ``app``; the body only calls ``server.run()``.
    """
    server.run()

install

install() -> None

Install napari-mcp in various AI clients.

NOTE: This command is deprecated. Use 'napari-mcp-install' instead.

Source code in src/napari_mcp/server.py
@app.command()
def install() -> None:
    """
    Print a deprecation notice for the old install command and exit.

    NOTE: This command is deprecated. Use 'napari-mcp-install' instead.

    Raises
    ------
    typer.Exit
        Always raised with exit code 1 once the notice has been printed.
    """
    from rich.console import Console

    emit = Console().print

    # Only the banner carries an explicit style argument.
    emit(
        "\n[bold yellow]⚠️  Deprecated Command[/bold yellow]\n",
        style="yellow",
    )

    follow_up_lines = (
        "The 'napari-mcp install' command has been replaced by 'napari-mcp-install'.",
        "\n[bold green]To install napari-mcp:[/bold green]",
        "  • Run: [bold cyan]napari-mcp-install --help[/bold cyan]",
        "  • Or: [bold cyan]napari-mcp-install claude-desktop[/bold cyan] (for example)\n",
        "[yellow]Please use 'napari-mcp-install' instead.[/yellow]\n",
    )
    for line in follow_up_lines:
        emit(line)

    raise typer.Exit(1)

main

main() -> None

Entry point that defaults to running the server.

Source code in src/napari_mcp/server.py
def main() -> None:
    """
    Entry point that defaults to running the server.

    When the process was started with anything other than a bare program
    name, control is handed to the ``app`` command-line interface instead.
    """
    if len(sys.argv) != 1:
        # Arguments present (or an unusual argv): let the CLI app dispatch.
        app()
        return

    # No arguments provided: run the server directly.
    server.run()

Implementation (NapariMCPTools class)

The actual implementation behind the MCP tools:

Implementation of Napari MCP tools (exactly matching server.py behavior).

Source code in src/napari_mcp/server.py
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
class NapariMCPTools:
    """Implementation of Napari MCP tools (exactly matching server.py behavior)."""

    @staticmethod
    async def detect_viewers() -> dict[str, Any]:
        """
        Report which napari viewers can currently be used.

        Returns
        -------
        dict
            ``{"status": "ok", "viewers": {...}}`` where ``viewers`` holds a
            ``"local"`` entry (singleton details, or ``"not_initialized"``) and
            an ``"external"`` entry (bridge details, or ``available: False``).
        """
        # Probe the external bridge first.
        bridge_client, bridge_info = await _detect_external_viewer()
        if bridge_client and bridge_info is not None:
            external_entry: dict[str, Any] = {
                "available": True,
                "type": "napari_bridge",
                "port": bridge_info.get("bridge_port", _external_port),
                "viewer_info": bridge_info.get("viewer", {}),
            }
        else:
            external_entry = {"available": False}

        # The local viewer is reported as available even when it does not exist
        # yet, because it can be created on demand.
        local_viewer = _viewer
        if local_viewer is None:
            local_entry: dict[str, Any] = {
                "available": True,  # Can be created
                "type": "not_initialized",
            }
        else:
            local_entry = {
                "available": True,
                "type": "singleton",
                "title": local_viewer.title,
                "n_layers": len(local_viewer.layers),
            }

        return {
            "status": "ok",
            "viewers": {"local": local_entry, "external": external_entry},
        }

    @staticmethod
    async def init_viewer(
        title: str | None = None,
        width: int | str | None = None,
        height: int | str | None = None,
        port: int | str | None = None,
    ) -> dict[str, Any]:
        """
        Create or return the napari viewer (local or external).

        An external bridge is tried first; if contacting it raises, the local
        viewer is created (or reused), shown, and the Qt event pump is started.

        Parameters
        ----------
        title : str, optional
            Optional window title (only for local viewer).
        width : int, optional
            Optional initial canvas width (only for local viewer).
        height : int, optional
            Optional initial canvas height (only for local viewer).
        port : int, optional
            If provided, attempt to connect to an external napari-mcp bridge on
            this port (default is taken from NAPARI_MCP_BRIDGE_PORT or 9999).
            A value that cannot be converted to ``int`` is logged and ignored,
            leaving the previously configured port in place.

        Returns
        -------
        dict
            Dictionary containing status, viewer type, and layer info.
        """
        global _external_port, _qt_pump_task

        # Allow overriding the external port per-call
        if port is not None:
            try:
                _external_port = int(port)
            except (TypeError, ValueError, OverflowError):
                # Keep the current port; just report the unusable override.
                logger.error(f"Invalid port: {port}")

        async with _viewer_lock:
            # Try external viewer first; fall back to local
            try:
                return await NapariMCPTools._external_session_information(
                    _external_port
                )
            except Exception:
                # No external viewer; continue to local viewer
                pass

            # Use local viewer
            v = _ensure_viewer()
            if title:
                v.title = title
            if width or height:
                # A missing dimension falls back to the current canvas size.
                w = (
                    int(width)
                    if width is not None
                    else v.window.qt_viewer.canvas.size().width()
                )
                h = (
                    int(height)
                    if height is not None
                    else v.window.qt_viewer.canvas.size().height()
                )
                v.window.qt_viewer.canvas.native.resize(w, h)

            # The GUI pump is always started for the local viewer.
            app = _ensure_qt_app()
            with contextlib.suppress(Exception):
                # Closing the window must not terminate the Qt application.
                app.setQuitOnLastWindowClosed(False)
            _connect_window_destroyed_signal(v)

            # Best-effort to show window without forcing focus (safer for tests/headless)
            with contextlib.suppress(Exception):
                qt_win = v.window._qt_window  # type: ignore[attr-defined]
                qt_win.show()

            if _qt_pump_task is None or _qt_pump_task.done():
                loop = asyncio.get_running_loop()
                _qt_pump_task = loop.create_task(_qt_event_pump())

            _process_events()
            return {
                "status": "ok",
                "viewer_type": "local",
                "title": v.title,
                "layers": [lyr.name for lyr in v.layers],
            }

    @staticmethod
    async def _external_session_information(_external_port: int) -> dict[str, Any]:
        """
        Ask the bridge on ``_external_port`` for its session information.

        Returns
        -------
        dict
            An ``"ok"`` summary (viewer type, title, layer names, port) when the
            reply identifies itself as a ``napari_bridge_session``; otherwise an
            ``"error"`` dict. Connection failures propagate as exceptions.
        """
        import json

        failure = {
            "status": "error",
            "message": "Failed to get session information from external viewer",
        }

        bridge = Client(f"http://localhost:{_external_port}/mcp")
        async with bridge:
            response = await bridge.call_tool("session_information")

            payload = getattr(response, "content", None)
            if not isinstance(payload, list) or len(payload) == 0:
                return failure

            # Only the first content item is inspected.
            first = payload[0]
            raw = first.text if hasattr(first, "text") else str(first)
            session = json.loads(raw) if isinstance(raw, str) else raw

            if session.get("session_type") != "napari_bridge_session":
                return failure

            return {
                "status": "ok",
                "viewer_type": "external",
                "title": session.get("viewer", {}).get("title", "External Viewer"),
                "layers": session.get("viewer", {}).get("layer_names", []),
                "port": session.get("bridge_port", _external_port),
            }

    @staticmethod
    async def close_viewer() -> dict[str, Any]:
        """
        Close the local viewer, if any, and stop its Qt event pump.

        Returns
        -------
        dict
            ``{"status": "closed"}`` when a viewer existed,
            ``{"status": "no_viewer"}`` otherwise.
        """
        global _viewer, _qt_pump_task

        async with _viewer_lock:
            if _viewer is None:
                return {"status": "no_viewer"}

            _viewer.close()
            _viewer = None

            # Stop the GUI pump and wait for the cancellation to complete.
            pump = _qt_pump_task
            if pump is not None and not pump.done():
                pump.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await pump
            _qt_pump_task = None

            _process_events()
            return {"status": "closed"}

    @staticmethod
    async def session_information() -> dict[str, Any]:
        """
        Describe the current napari session.

        The external bridge is queried first; if that raises, the local viewer
        (when one exists) is described instead.

        Returns
        -------
        dict
            For a local viewer: ``status``, ``session_type``, ``timestamp`` and
            the ``viewer``, ``system``, ``session`` and ``layers`` sections.
            When no local viewer exists, ``viewer`` is ``None`` and a
            ``message`` explains how to create one.
        """
        import os
        import platform

        def _describe_layer(lyr: Any) -> dict[str, Any]:
            # Properties every layer is expected to expose (with defaults).
            detail: dict[str, Any] = {
                "name": lyr.name,
                "type": lyr.__class__.__name__,
                "visible": _parse_bool(getattr(lyr, "visible", True)),
                "opacity": float(getattr(lyr, "opacity", 1.0)),
                "blending": getattr(lyr, "blending", None),
            }
            if hasattr(lyr, "data") and hasattr(lyr.data, "shape"):
                detail["data_shape"] = list(lyr.data.shape)
            else:
                detail["data_shape"] = None
            if hasattr(lyr, "data") and hasattr(lyr.data, "dtype"):
                detail["data_dtype"] = str(lyr.data.dtype)
            else:
                detail["data_dtype"] = None
            detail["layer_id"] = id(lyr)

            # Properties that only some layer types carry.
            if hasattr(lyr, "colormap"):
                detail["colormap"] = getattr(
                    lyr.colormap, "name", str(lyr.colormap)
                )
            if hasattr(lyr, "contrast_limits"):
                # Best effort: skip limits that cannot be read as two floats.
                with contextlib.suppress(Exception):
                    limits = lyr.contrast_limits
                    detail["contrast_limits"] = [
                        float(limits[0]),
                        float(limits[1]),
                    ]
            if hasattr(lyr, "gamma"):
                detail["gamma"] = float(getattr(lyr, "gamma", 1.0))
            return detail

        async with _viewer_lock:
            # Any failure talking to the bridge means "use the local viewer".
            with contextlib.suppress(Exception):
                return await NapariMCPTools._external_session_information(
                    _external_port
                )

            v = _viewer
            if v is None:
                return {
                    "status": "ok",
                    "session_type": "napari_mcp_standalone_session",
                    "timestamp": str(np.datetime64("now")),
                    "viewer": None,
                    "message": "No viewer currently initialized. Call init_viewer() first.",
                }

            if hasattr(v.dims, "current_step"):
                step_by_axis = dict(enumerate(v.dims.current_step))
            else:
                step_by_axis = {}

            viewer_section = {
                "title": v.title,
                "viewer_id": id(v),
                "n_layers": len(v.layers),
                "layer_names": [lyr.name for lyr in v.layers],
                "selected_layers": [lyr.name for lyr in v.layers.selection],
                "current_step": step_by_axis,
                "ndisplay": v.dims.ndisplay,
                "camera_center": list(v.camera.center),
                "camera_zoom": float(v.camera.zoom),
                "camera_angles": list(v.camera.angles) if v.camera.angles else [],
                "grid_enabled": v.grid.enabled,
            }

            system_section = {
                "python_version": sys.version,
                "platform": platform.platform(),
                "napari_version": getattr(napari, "__version__", "unknown"),
                "process_id": os.getpid(),
                "working_directory": os.getcwd(),
            }

            pump_alive = _qt_pump_task is not None and not _qt_pump_task.done()
            session_section = {
                "server_type": "napari_mcp_standalone",
                "viewer_instance": f"<napari.Viewer at {hex(id(v))}>",
                "gui_pump_running": pump_alive,
                "execution_namespace_vars": list(_exec_globals.keys()),
                "qt_app_available": _qt_app is not None,
            }

            layer_section = [_describe_layer(lyr) for lyr in v.layers]

            return {
                "status": "ok",
                "session_type": "napari_mcp_standalone_session",
                "timestamp": str(np.datetime64("now")),
                "viewer": viewer_section,
                "system": system_section,
                "session": session_section,
                "layers": layer_section,
            }

    @staticmethod
    async def list_layers() -> list[dict[str, Any]]:
        """
        Return one summary dict per layer.

        The call is proxied to an external viewer when one answers; otherwise
        the local viewer's layers are summarised (name, type, visibility,
        opacity, blending and, where present, colormap and contrast limits).
        """
        remote = await _proxy_to_external("list_layers")
        if remote is not None:
            # Normalise the proxied reply to a plain list.
            if isinstance(remote, list):
                return remote
            if isinstance(remote, dict) and "content" in remote:
                inner = remote["content"]
                if isinstance(inner, list):
                    return inner
            return []

        def _summarise(lyr: Any) -> dict[str, Any]:
            summary: dict[str, Any] = {
                "name": lyr.name,
                "type": lyr.__class__.__name__,
                "visible": _parse_bool(getattr(lyr, "visible", True)),
                "opacity": float(getattr(lyr, "opacity", 1.0)),
                "blending": getattr(lyr, "blending", None),
            }
            if getattr(lyr, "colormap", None) is not None:
                # Prefer the colormap's name; fall back to its string form.
                summary["colormap"] = getattr(lyr.colormap, "name", None) or str(
                    lyr.colormap
                )
            if getattr(lyr, "contrast_limits", None) is not None:
                # Best effort: omit limits that cannot be read as two floats.
                with contextlib.suppress(Exception):
                    limits = list(lyr.contrast_limits)
                    summary["contrast_limits"] = [
                        float(limits[0]),
                        float(limits[1]),
                    ]
            return summary

        def _collect() -> list[dict[str, Any]]:
            viewer = _ensure_viewer()
            return [_summarise(lyr) for lyr in viewer.layers]

        # Local execution
        async with _viewer_lock:
            return _gui_execute(_collect)

    @staticmethod
    async def add_image(
        path: str,
        name: str | None = None,
        colormap: str | None = None,
        blending: str | None = None,
        channel_axis: int | str | None = None,
    ) -> dict[str, Any]:
        """
        Add an image layer from a file path.

        The call is proxied to an external viewer when one answers; otherwise
        the file is read with imageio and added to the local viewer.

        Parameters
        ----------
        path : str
            Path to an image readable by imageio (e.g., PNG, TIFF, OME-TIFF).
        name : str, optional
            Layer name. If None, uses filename.
        colormap : str, optional
            Napari colormap name (e.g., 'gray', 'magma').
        blending : str, optional
            Blending mode (e.g., 'translucent').
        channel_axis : int, optional
            If provided, interpret that axis as channels. Integer-like strings
            are accepted and converted.

        Returns
        -------
        dict
            Dictionary containing status, layer name, and image shape. When
            ``channel_axis`` makes napari create several layers, ``name`` is
            the list of their names.
        """
        # Normalise once so the proxied and the local path see the same value
        # (MCP clients may send the axis as a string).
        axis = int(channel_axis) if channel_axis is not None else None

        # Try to proxy to external viewer first
        params: dict[str, Any] = {"path": path}
        if name:
            params["name"] = name
        if colormap:
            params["colormap"] = colormap
        if blending:
            params["blending"] = blending
        if axis is not None:
            params["channel_axis"] = axis

        result = await _proxy_to_external("add_image", params)
        if result is not None:
            return result

        # Local execution
        import imageio.v3 as iio

        async with _viewer_lock:
            data = iio.imread(path)

            def _add():
                v = _ensure_viewer()
                created = v.add_image(
                    data,
                    name=name,
                    colormap=colormap,
                    blending=blending,
                    channel_axis=axis,
                )
                _process_events()
                # With channel_axis set, napari splits the data and returns a
                # list of layers instead of a single layer.
                if isinstance(created, (list, tuple)):
                    layer_name: Any = [lyr.name for lyr in created]
                else:
                    layer_name = created.name
                return {
                    "status": "ok",
                    "name": layer_name,
                    "shape": list(np.shape(data)),
                }

            return _gui_execute(_add)

    @staticmethod
    async def add_labels(path: str, name: str | None = None) -> dict[str, Any]:
        """
        Add a labels layer read from an image file.

        Parameters
        ----------
        path : str
            File to read (e.g., PNG/TIFF with integer labels). ``~`` is
            expanded and the path is resolved before reading.
        name : str, optional
            Layer name passed to napari.

        Returns
        -------
        dict
            ``status``, layer ``name`` and data ``shape`` on success; an
            ``"error"`` dict with a message if anything raises.
        """
        import imageio.v3 as iio
        from pathlib import Path

        def _load_and_add() -> dict[str, Any]:
            viewer = _ensure_viewer()
            resolved = Path(path).expanduser().resolve(strict=False)
            pixels = iio.imread(str(resolved))
            created = viewer.add_labels(pixels, name=name)
            _process_events()
            return {
                "status": "ok",
                "name": created.name,
                "shape": list(np.shape(pixels)),
            }

        async with _viewer_lock:
            try:
                return _gui_execute(_load_and_add)
            except Exception as e:
                # Failures are reported to the caller rather than raised.
                return {
                    "status": "error",
                    "message": f"Failed to add labels from '{path}': {e}",
                }

    @staticmethod
    async def add_points(
        points: list[list[float]], name: str | None = None, size: float | str = 10.0
    ) -> dict[str, Any]:
        """
        Add a points layer.

        Parameters
        ----------
        points : list of list of float
            List of [y, x] or [z, y, x] coordinates.
        name : str, optional
            Optional layer name.
        size : float, default=10.0
            Point size in pixels; converted with ``float``.

        Returns
        -------
        dict
            ``status``, the layer ``name`` and ``n_points`` (number of rows).
        """

        def _create() -> dict[str, Any]:
            viewer = _ensure_viewer()
            coords = np.asarray(points, dtype=float)
            created = viewer.add_points(coords, name=name, size=float(size))
            _process_events()
            return {
                "status": "ok",
                "name": created.name,
                "n_points": int(coords.shape[0]),
            }

        async with _viewer_lock:
            return _gui_execute(_create)

    @staticmethod
    async def remove_layer(name: str) -> dict[str, Any]:
        """
        Remove the layer called ``name`` from the local viewer.

        Returns
        -------
        dict
            ``status`` is ``"removed"`` or ``"not_found"``; ``name`` echoes
            the requested layer name.
        """

        def _drop() -> dict[str, Any]:
            viewer = _ensure_viewer()
            if name not in viewer.layers:
                return {"status": "not_found", "name": name}
            viewer.layers.remove(name)
            _process_events()
            return {"status": "removed", "name": name}

        async with _viewer_lock:
            return _gui_execute(_drop)

    # Removed: rename_layer (use set_layer_properties with new_name instead)

    @staticmethod
    async def set_layer_properties(
        name: str,
        visible: bool | None = None,
        opacity: float | None = None,
        colormap: str | None = None,
        blending: str | None = None,
        contrast_limits: list[float] | None = None,
        gamma: float | str | None = None,
        new_name: str | None = None,
    ) -> dict[str, Any]:
        """
        Set common properties on a layer by name.

        Arguments left as ``None`` are not touched, and a property is only set
        when the layer has that attribute. ``new_name`` renames the layer.

        Returns
        -------
        dict
            ``{"status": "ok", "name": <current name>}``, or
            ``{"status": "not_found", "name": name}`` for an unknown layer.
        """

        def _apply() -> dict[str, Any]:
            viewer = _ensure_viewer()
            if name not in viewer.layers:
                return {"status": "not_found", "name": name}
            target = viewer.layers[name]

            def _assign(attr: str, value: Any, convert: Any = None) -> None:
                # Skip unset arguments and attributes this layer type lacks.
                if value is not None and hasattr(target, attr):
                    setattr(target, attr, convert(value) if convert else value)

            _assign("visible", visible, _parse_bool)
            _assign("opacity", opacity, float)
            _assign("colormap", colormap)
            _assign("blending", blending)
            if contrast_limits is not None and hasattr(target, "contrast_limits"):
                # Best effort: unusable limits are ignored.
                with contextlib.suppress(Exception):
                    target.contrast_limits = [
                        float(contrast_limits[0]),
                        float(contrast_limits[1]),
                    ]
            _assign("gamma", gamma, float)
            if new_name is not None:
                target.name = new_name

            _process_events()
            return {"status": "ok", "name": target.name}

        async with _viewer_lock:
            return _gui_execute(_apply)

    @staticmethod
    async def reorder_layer(
        name: str,
        index: int | str | None = None,
        before: str | None = None,
        after: str | None = None,
    ) -> dict[str, Any]:
        """
        Reorder a layer by name.

        Provide exactly one of ``index``, ``before`` or ``after``.

        Parameters
        ----------
        name : str
            Layer to move.
        index : int, optional
            Absolute target index; clamped to the valid range. After the move
            the layer sits at exactly this index.
        before : str, optional
            Move directly before this layer name.
        after : str, optional
            Move directly after this layer name.

        Returns
        -------
        dict
            ``status``, ``name`` and the layer's resulting ``index``;
            ``"not_found"`` for an unknown layer name, or ``"error"`` when the
            number of targets given is not exactly one.
        """
        async with _viewer_lock:

            def _reorder():
                v = _ensure_viewer()
                if name not in v.layers:
                    return {"status": "not_found", "name": name}
                if sum(x is not None for x in (index, before, after)) != 1:
                    return {
                        "status": "error",
                        "message": "Provide exactly one of index, before, or after",
                    }
                cur = v.layers.index(name)
                if index is not None:
                    wanted = max(0, min(int(index), len(v.layers) - 1))
                    # move() interprets the destination in pre-move coordinates
                    # ("insert before dest"), so a forward move has to aim one
                    # slot past the wanted final position.
                    target = wanted + 1 if wanted > cur else wanted
                elif before is not None:
                    if before not in v.layers:
                        return {"status": "not_found", "name": before}
                    target = v.layers.index(before)
                else:
                    if after not in v.layers:
                        return {"status": "not_found", "name": after}
                    target = v.layers.index(after) + 1
                if target != cur:
                    v.layers.move(cur, target)
                _process_events()
                return {"status": "ok", "name": name, "index": v.layers.index(name)}

            return _gui_execute(_reorder)

    @staticmethod
    async def set_active_layer(name: str) -> dict[str, Any]:
        """
        Make ``name`` the only selected layer.

        Returns
        -------
        dict
            ``{"status": "ok", "active": name}``, or
            ``{"status": "not_found", "name": name}`` for an unknown layer.
        """

        def _select() -> dict[str, Any]:
            viewer = _ensure_viewer()
            if name not in viewer.layers:
                return {"status": "not_found", "name": name}
            # Replacing the selection set deselects every other layer.
            viewer.layers.selection = {viewer.layers[name]}
            _process_events()
            return {"status": "ok", "active": name}

        async with _viewer_lock:
            return _gui_execute(_select)

    @staticmethod
    async def reset_view() -> dict[str, Any]:
        """
        Call the viewer's ``reset_view`` and report success.

        Returns
        -------
        dict
            ``{"status": "ok"}``.
        """

        def _do_reset() -> dict[str, Any]:
            _ensure_viewer().reset_view()
            _process_events()
            return {"status": "ok"}

        async with _viewer_lock:
            return _gui_execute(_do_reset)

    # Removed: set_zoom (use set_camera with zoom instead)

    @staticmethod
    async def set_camera(
        center: list[float] | None = None,
        zoom: float | str | None = None,
        angle: float | str | None = None,
    ) -> dict[str, Any]:
        """
        Set camera properties: center, zoom, and/or angle.

        Arguments left as ``None`` are not changed.

        Returns
        -------
        dict
            ``status`` plus the camera's resulting ``center`` and ``zoom``.
        """

        def _apply() -> dict[str, Any]:
            cam = _ensure_viewer().camera
            if center is not None:
                cam.center = [float(c) for c in center]
            if zoom is not None:
                cam.zoom = float(zoom)
            if angle is not None:
                # NOTE(review): a 1-tuple is assigned here; napari documents
                # camera.angles as three Euler angles — confirm this is accepted.
                cam.angles = (float(angle),)
            _process_events()
            return {
                "status": "ok",
                "center": [float(c) for c in cam.center],
                "zoom": float(cam.zoom),
            }

        async with _viewer_lock:
            return _gui_execute(_apply)

    @staticmethod
    async def set_ndisplay(ndisplay: int | str) -> dict[str, Any]:
        """
        Set number of displayed dimensions (2 or 3).

        Returns
        -------
        dict
            ``status`` and the viewer's resulting ``ndisplay``.
        """

        def _apply() -> dict[str, Any]:
            dims = _ensure_viewer().dims
            dims.ndisplay = int(ndisplay)
            _process_events()
            # Report the value read back from the viewer, not the request.
            return {"status": "ok", "ndisplay": int(dims.ndisplay)}

        async with _viewer_lock:
            return _gui_execute(_apply)

    @staticmethod
    async def set_dims_current_step(
        axis: int | str, value: int | str
    ) -> dict[str, Any]:
        """
        Set the current step (slider position) for a specific axis.

        Returns
        -------
        dict
            ``status`` plus the requested ``axis`` and ``value`` as integers.
        """

        def _apply() -> dict[str, Any]:
            viewer = _ensure_viewer()
            axis_index, step = int(axis), int(value)
            viewer.dims.set_current_step(axis_index, step)
            _process_events()
            return {"status": "ok", "axis": axis_index, "value": step}

        async with _viewer_lock:
            return _gui_execute(_apply)

    @staticmethod
    async def set_grid(enabled: bool | str = True) -> dict[str, Any]:
        """
        Enable or disable grid view.

        Returns
        -------
        dict
            ``status`` and the viewer's resulting ``grid`` state.
        """

        def _apply() -> dict[str, Any]:
            grid = _ensure_viewer().grid
            grid.enabled = _parse_bool(enabled)
            _process_events()
            # Report the state read back from the viewer, not the request.
            return {"status": "ok", "grid": _parse_bool(grid.enabled)}

        async with _viewer_lock:
            return _gui_execute(_apply)

    @staticmethod
    async def screenshot(canvas_only: bool | str = True) -> ImageContent:
        """
        Take a screenshot of the napari canvas and return it as a PNG image.

        The call is proxied to an external viewer when one answers; otherwise
        the local viewer is captured.

        Parameters
        ----------
        canvas_only : bool, default=True
            If True, only capture the canvas area. String values are
            interpreted with ``_parse_bool`` for the local capture.

        Returns
        -------
        ImageContent
            The screenshot image as an mcp.types.ImageContent object.
        """
        # Try to proxy to external viewer first
        result = await _proxy_to_external("screenshot", {"canvas_only": canvas_only})
        if result is not None:
            return result

        # Local execution
        async with _viewer_lock:

            def _shot():
                v = _ensure_viewer()
                # Let pending GUI events run so the capture is up to date.
                _process_events(3)
                # Parse the flag: a raw string such as "false" would otherwise
                # be truthy and silently capture the canvas only.
                arr = v.screenshot(canvas_only=_parse_bool(canvas_only))
                if not isinstance(arr, np.ndarray):
                    arr = np.asarray(arr)
                if arr.dtype != np.uint8:
                    arr = arr.astype(np.uint8, copy=False)
                img = Image.fromarray(arr)
                buf = BytesIO()
                img.save(buf, format="PNG")
                enc = buf.getvalue()
                return fastmcp.utilities.types.Image(
                    data=enc, format="png"
                ).to_image_content()

            return _gui_execute(_shot)

    @staticmethod
    async def timelapse_screenshot(
        axis: int | str,
        slice_range: str,
        canvas_only: bool | str = True,
        interpolate_to_fit: bool = True,
    ) -> list[ImageContent]:
        """
        Capture a series of screenshots while sweeping a dims axis.

        Parameters
        ----------
        axis : int
            Dims axis index to sweep (e.g., temporal axis).
        slice_range : str
            Python-like slice string over step indices, e.g. "1:5", ":6", "::2".
            Defaults follow Python semantics with start=0, stop=nsteps, step=1.
        canvas_only : bool, default=True
            If True, only capture the canvas area.
        interpolate_to_fit : bool, default=False
            If True, interpolate the images to fit the total size cap of 1309246 bytes.

        Returns
        -------
        list[ImageContent]
            List of screenshots as mcp.types.ImageContent objects.
        """
        max_total_base64_bytes = 1309246 if interpolate_to_fit else None

        # Try to proxy to external viewer first
        result = await _proxy_to_external(
            "timelapse_screenshot",
            {
                "axis": axis,
                "slice_range": slice_range,
                "canvas_only": canvas_only,
                "interpolate_to_fit": interpolate_to_fit,
            },
        )
        if result is not None:
            return result  # type: ignore[return-value]

        def _parse_slice(spec: str, length: int) -> list[int]:
            # Normalize
            s = (spec or "").strip()
            # Single integer
            if s and ":" not in s:
                try:
                    idx = int(s)
                except Exception as err:
                    raise ValueError(f"Invalid slice range: {spec!r}") from err
                if idx < 0:
                    idx += length
                if not (0 <= idx < length):
                    raise ValueError(
                        f"Index out of bounds for axis with {length} steps: {idx}"
                    )
                return [idx]

            # Slice form start:stop:step
            parts = s.split(":")
            if len(parts) > 3:
                raise ValueError(f"Invalid slice range: {spec!r}")
            start_s, stop_s, step_s = (parts + [""] * 3)[:3]

            def _to_int_or_none(val: str) -> int | None:
                v = val.strip()
                if v == "":
                    return None
                return int(v)

            start = _to_int_or_none(start_s)
            stop = _to_int_or_none(stop_s)
            step = _to_int_or_none(step_s) or 1
            if step == 0:
                raise ValueError("slice step cannot be 0")

            # Handle negatives like Python
            if start is None:
                start = 0 if step > 0 else length - 1
            if stop is None:
                stop = length if step > 0 else -1
            if start < 0:
                start += length
            if stop < 0:
                stop += length

            # Clamp to valid iteration range similar to range() behavior
            rng = range(start, stop, step)
            indices = [i for i in rng if 0 <= i < length]
            return indices

        # Local execution
        async with _viewer_lock:

            def _run_series():
                v = _ensure_viewer()

                # Determine number of steps along axis
                try:
                    nsteps_tuple = getattr(v.dims, "nsteps", None)
                    if nsteps_tuple is None:
                        # Fallback: infer from current_step length and a conservative stop
                        # We cannot reliably infer total steps without dims.nsteps; require it
                        raise AttributeError
                    total = int(nsteps_tuple[int(axis)])
                except Exception:
                    # Best effort via bounds from layers; may be approximate
                    try:
                        total = max(
                            int(getattr(lyr.data, "shape", [1])[(int(axis))])
                            if int(axis) < getattr(lyr.data, "ndim", 0)
                            else 1
                            for lyr in v.layers
                        )
                    except Exception:
                        total = 0

                if total <= 0:
                    raise RuntimeError(
                        "Unable to determine number of steps for the given axis"
                    )

                indices = _parse_slice(slice_range, total)
                if not indices:
                    return []

                # Take a sample at the first index to estimate size
                v.dims.set_current_step(int(axis), int(indices[0]))
                _process_events(2)
                sample_arr = v.screenshot(canvas_only=canvas_only)
                if not isinstance(sample_arr, np.ndarray):
                    sample_arr = np.asarray(sample_arr)
                if sample_arr.dtype != np.uint8:
                    sample_arr = sample_arr.astype(np.uint8, copy=False)
                sample_img = Image.fromarray(sample_arr)
                sbuf = BytesIO()
                sample_img.save(sbuf, format="PNG")
                sample_png = sbuf.getvalue()
                sample_b64_len = ((len(sample_png) + 2) // 3) * 4

                # Ask user whether to downsample if estimated total exceeds cap
                downsample_factor = 1.0
                if (
                    max_total_base64_bytes is not None
                    and sample_b64_len * len(indices) > max_total_base64_bytes
                ):
                    est_factor = math.sqrt(
                        max_total_base64_bytes
                        / float(max(1, sample_b64_len * len(indices)))
                    )
                    downsample_factor = max(0.05, min(1.0, est_factor))

                images: list[ImageContent] = []  # type: ignore
                total_b64_len = 0
                for idx in indices:
                    # Move slider
                    v.dims.set_current_step(int(axis), int(idx))
                    _process_events(2)

                    # Capture
                    arr = v.screenshot(canvas_only=canvas_only)
                    if not isinstance(arr, np.ndarray):
                        arr = np.asarray(arr)
                    if arr.dtype != np.uint8:
                        arr = arr.astype(np.uint8, copy=False)

                    img = Image.fromarray(arr)
                    if downsample_factor < 1.0:
                        new_w = max(1, int(img.width * downsample_factor))
                        new_h = max(1, int(img.height * downsample_factor))
                        if new_w != img.width or new_h != img.height:
                            img = img.resize((new_w, new_h), resample=Image.BILINEAR)
                    buf = BytesIO()
                    img.save(buf, format="PNG")
                    enc = buf.getvalue()
                    b64_len = ((len(enc) + 2) // 3) * 4
                    if (
                        max_total_base64_bytes is not None
                        and total_b64_len + b64_len > max_total_base64_bytes
                    ):
                        break
                    total_b64_len += b64_len
                    images.append(
                        fastmcp.utilities.types.Image(
                            data=enc, format="png"
                        ).to_image_content()
                    )

                return images

            return _gui_execute(_run_series)

    @staticmethod
    async def execute_code(code: str, line_limit: int | str = 30) -> dict[str, Any]:
        """
        Execute arbitrary Python code in the server's interpreter.

        Similar to napari's console. The execution namespace persists across calls
        and includes 'viewer', 'napari', and 'np'.

        Parameters
        ----------
        code : str
            Python code string. The value of the last expression (if any)
            is returned as 'result_repr'.
        line_limit : int or str, default=30
            Maximum number of output lines to return. Use -1 for unlimited output.
            Numeric strings (e.g. "-1") are accepted; a value that cannot be
            converted to an integer falls back to 30.
            Warning: Using -1 may consume a large number of tokens.

        Returns
        -------
        dict
            Dictionary with 'status' ('ok' or 'error'), 'output_id', 'stdout'
            and 'stderr', plus optional 'result_repr', 'warning', 'truncated'
            and 'message'. When an external viewer handles the call, its
            response is returned unchanged.
        """
        # Try to proxy to external viewer first (only the code is forwarded)
        result = await _proxy_to_external("execute_code", {"code": code})
        if result is not None:
            return result

        # Coerce once: the signature accepts strings, and a raw "-1" would
        # otherwise fail the equality test that selects unlimited output.
        try:
            limit = int(line_limit)
        except (TypeError, ValueError):
            limit = 30

        def _attach_output(
            response: dict[str, Any],
            output_id: str,
            stdout_text: str,
            stderr_text: str,
            error_summary: str | None = None,
        ) -> dict[str, Any]:
            """Add stdout/stderr to ``response``, applying the line limit."""
            if limit == -1:
                response["warning"] = (
                    "Unlimited output requested. This may consume a large number "
                    "of tokens. Consider using read_output for large outputs."
                )
                response["stdout"] = stdout_text
                response["stderr"] = stderr_text
                return response

            shown_out, out_cut = _truncate_output(stdout_text, limit)
            shown_err, err_cut = _truncate_output(stderr_text, limit)

            # Keep the exception summary visible even when the traceback
            # was cut off by truncation.
            if error_summary is not None and error_summary not in shown_err:
                if shown_err and not shown_err.endswith("\n"):
                    shown_err += "\n"
                shown_err += error_summary + "\n"

            response["stdout"] = shown_out
            response["stderr"] = shown_err

            if out_cut or err_cut:
                response["truncated"] = True
                response["message"] = (
                    f"Output truncated to {limit} lines. "
                    f"Use read_output('{output_id}') to retrieve full output."
                )
            return response

        # Local execution
        async with _viewer_lock:
            v = _ensure_viewer()
            _exec_globals.setdefault("__builtins__", __builtins__)  # type: ignore[assignment]
            _exec_globals["viewer"] = v
            napari_mod = napari
            if napari_mod is not None:
                _exec_globals.setdefault("napari", napari_mod)
            _exec_globals.setdefault("np", np)

            stdout_buf = StringIO()
            stderr_buf = StringIO()
            result_repr: str | None = None
            try:
                # Capture stdout/stderr during execution
                with (
                    contextlib.redirect_stdout(stdout_buf),
                    contextlib.redirect_stderr(stderr_buf),
                ):
                    # SECURITY: this runs caller-supplied code with the full
                    # privileges of the server process; that is the purpose of
                    # the tool, so it must only be exposed to trusted clients.
                    parsed = ast.parse(code, mode="exec")
                    if parsed.body and isinstance(parsed.body[-1], ast.Expr):
                        # Execute all but last, then eval last expression to
                        # capture a result
                        if len(parsed.body) > 1:
                            exec_ast = ast.Module(
                                body=parsed.body[:-1], type_ignores=[]
                            )
                            exec(
                                compile(exec_ast, "<mcp-exec>", "exec"),
                                _exec_globals,
                                _exec_globals,
                            )
                        last_expr = ast.Expression(body=parsed.body[-1].value)
                        value = eval(
                            compile(last_expr, "<mcp-eval>", "eval"),
                            _exec_globals,
                            _exec_globals,
                        )
                        result_repr = repr(value)
                    else:
                        # Pure statements
                        exec(
                            compile(parsed, "<mcp-exec>", "exec"),
                            _exec_globals,
                            _exec_globals,
                        )
                _process_events(2)

                # Get full output
                stdout_full = stdout_buf.getvalue()
                stderr_full = stderr_buf.getvalue()

                # Store full output and get ID
                output_id = await _store_output(
                    tool_name="execute_code",
                    stdout=stdout_full,
                    stderr=stderr_full,
                    result_repr=result_repr,
                    code=code,
                )

                response: dict[str, Any] = {
                    "status": "ok",
                    "output_id": output_id,
                    **({"result_repr": result_repr} if result_repr is not None else {}),
                }
                return _attach_output(response, output_id, stdout_full, stderr_full)
            except Exception as e:
                _process_events(1)
                tb = traceback.format_exc()

                # Get full output including traceback
                stdout_full = stdout_buf.getvalue()
                stderr_full = stderr_buf.getvalue() + tb

                # Store full output and get ID
                output_id = await _store_output(
                    tool_name="execute_code",
                    stdout=stdout_full,
                    stderr=stderr_full,
                    code=code,
                    error=True,
                )

                error_response: dict[str, Any] = {
                    "status": "error",
                    "output_id": output_id,
                }
                return _attach_output(
                    error_response,
                    output_id,
                    stdout_full,
                    stderr_full,
                    error_summary=f"{type(e).__name__}: {e}",
                )

    @staticmethod
    async def install_packages(
        packages: list[str],
        upgrade: bool | None = False,
        no_deps: bool | None = False,
        index_url: str | None = None,
        extra_index_url: str | None = None,
        pre: bool | None = False,
        line_limit: int | str = 30,
        timeout: int = 240,
    ) -> dict[str, Any]:
        """
        Install Python packages using pip.

        Install packages into the currently running server environment
        (``sys.executable -m pip install ...``).

        Parameters
        ----------
        packages : list of str
            List of package specifiers (e.g., "scikit-image", "torch==2.3.1").
        upgrade : bool, optional
            If True, pass --upgrade flag.
        no_deps : bool, optional
            If True, pass --no-deps flag.
        index_url : str, optional
            Custom index URL.
        extra_index_url : str, optional
            Extra index URL.
        pre : bool, optional
            Allow pre-releases (--pre flag).
        line_limit : int or str, default=30
            Maximum number of output lines to return. Use -1 for unlimited output.
            Numeric strings (e.g. "-1") are accepted; a value that cannot be
            converted to an integer falls back to 30.
            Warning: Using -1 may consume a large number of tokens.
        timeout : int, default=240
            Timeout for pip install in seconds. On timeout the pip process is
            killed and the call reports an error.

        Returns
        -------
        dict
            Dictionary including status, returncode (-1 if the process has no
            return code), stdout, stderr, command, and output_id for retrieving
            full output if truncated. If 'packages' is empty or not a list, a
            dict with status 'error' and a 'message' is returned instead.
        """
        # Try to proxy to external viewer first
        result = await _proxy_to_external(
            "install_packages",
            {
                "packages": packages,
                "upgrade": upgrade,
                "no_deps": no_deps,
                "index_url": index_url,
                "extra_index_url": extra_index_url,
                "pre": pre,
                "line_limit": line_limit,
                "timeout": timeout,
            },
        )
        if result is not None:
            return result

        if not packages or not isinstance(packages, list):
            return {
                "status": "error",
                "message": "Parameter 'packages' must be a non-empty list of package names",
            }

        # Coerce once: the signature accepts strings, and a raw "-1" would
        # otherwise fail the equality test that selects unlimited output.
        try:
            limit = int(line_limit)
        except (TypeError, ValueError):
            limit = 30

        cmd: list[str] = [
            sys.executable,
            "-m",
            "pip",
            "install",
            "--no-input",
            "--disable-pip-version-check",
        ]
        if upgrade:
            cmd.append("--upgrade")
        if no_deps:
            cmd.append("--no-deps")
        if pre:
            cmd.append("--pre")
        if index_url:
            cmd.extend(["--index-url", index_url])
        if extra_index_url:
            cmd.extend(["--extra-index-url", extra_index_url])
        cmd.extend(packages)

        # Run pip as a subprocess without blocking the event loop
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout_b, stderr_b = await asyncio.wait_for(
                proc.communicate(), timeout=timeout
            )
        except asyncio.TimeoutError:
            with contextlib.suppress(ProcessLookupError):
                proc.kill()
            # Reap the killed child so it does not linger as a zombie and so
            # proc.returncode is populated before it is reported below.
            await proc.wait()
            stdout_b, stderr_b = b"", f"pip install timed out after {timeout}s".encode()
        stdout = stdout_b.decode(errors="replace")
        stderr = stderr_b.decode(errors="replace")

        status = "ok" if proc.returncode == 0 else "error"
        # Quoted form is for display/logging only; the process was started
        # from the argument list, never through a shell.
        command_str = " ".join(shlex.quote(part) for part in cmd)

        # Store full output and get ID
        output_id = await _store_output(
            tool_name="install_packages",
            stdout=stdout,
            stderr=stderr,
            packages=packages,
            command=command_str,
            returncode=proc.returncode,
        )

        # Prepare response with line limiting
        response: dict[str, Any] = {
            "status": status,
            "returncode": proc.returncode if proc.returncode is not None else -1,
            "command": command_str,
            "output_id": output_id,
        }

        # Add warning for unlimited output
        if limit == -1:
            response["warning"] = (
                "Unlimited output requested. This may consume a large number "
                "of tokens. Consider using read_output for large outputs."
            )
            response["stdout"] = stdout
            response["stderr"] = stderr
        else:
            # Truncate stdout and stderr
            stdout_truncated, stdout_was_truncated = _truncate_output(stdout, limit)
            stderr_truncated, stderr_was_truncated = _truncate_output(stderr, limit)

            response["stdout"] = stdout_truncated
            response["stderr"] = stderr_truncated

            if stdout_was_truncated or stderr_was_truncated:
                response["truncated"] = True
                response["message"] = (
                    f"Output truncated to {limit} lines. "
                    f"Use read_output('{output_id}') to retrieve full output."
                )

        return response

    @staticmethod
    async def read_output(
        output_id: str, start: int | str = 0, end: int | str = -1
    ) -> dict[str, Any]:
        """
        Return a range of lines from a previously stored tool output.

        The stored stdout and stderr are joined (stdout first, with a newline
        inserted between them when neither side provides one) and then sliced
        by line.

        Parameters
        ----------
        output_id : str
            Unique ID of the stored output.
        start : int, default=0
            Starting line number (0-indexed). Values that cannot be converted
            to an integer are treated as 0; negative values are clamped to 0.
        end : int, default=-1
            Ending line number (exclusive). If -1, read to end. Values that
            cannot be converted to an integer are treated as -1.

        Returns
        -------
        dict
            On success: 'status', 'output_id', 'tool_name', 'timestamp',
            'lines', 'line_range', 'total_lines' and 'result_repr'.
            If the ID is unknown: 'status' set to 'error' and a 'message'.
        """

        def _to_int(value: Any, fallback: int) -> int:
            # Lenient conversion: any failure yields the fallback.
            try:
                return int(value)
            except Exception:
                return fallback

        async with _output_storage_lock:
            try:
                record = _output_storage[output_id]
            except KeyError:
                return {
                    "status": "error",
                    "message": f"Output ID '{output_id}' not found",
                }

            # Merge stdout and stderr into one text for line-based access
            pieces: list[str] = []
            out_text = record.get("stdout")
            if out_text:
                pieces.append(out_text)
            err_text = record.get("stderr")
            if err_text:
                needs_separator = (
                    bool(pieces)
                    and not out_text.endswith("\n")
                    and not err_text.startswith("\n")
                )
                if needs_separator:
                    pieces.append("\n")
                pieces.append(err_text)
            combined = "".join(pieces)

            # Normalize and clamp the requested range
            first = max(0, _to_int(start, 0))
            last = _to_int(end, -1)

            all_lines = combined.splitlines(keepends=True)
            line_count = len(all_lines)

            if last == -1:
                last = line_count
            else:
                last = min(line_count, last)

            if first < line_count:
                chunk = all_lines[first:last]
            else:
                chunk = []

            return {
                "status": "ok",
                "output_id": output_id,
                "tool_name": record["tool_name"],
                "timestamp": record["timestamp"],
                "lines": chunk,
                "line_range": {"start": first, "end": last},
                "total_lines": line_count,
                "result_repr": record.get("result_repr"),
            }

Functions

detect_viewers async staticmethod

detect_viewers() -> dict[str, Any]

Detect available viewers (local and external).

Returns:

Type Description
dict

Dictionary with information about available viewers

Source code in src/napari_mcp/server.py
@staticmethod
async def detect_viewers() -> dict[str, Any]:
    """
    Report which viewers (external bridge and local singleton) are available.

    Returns
    -------
    dict
        ``{"status": "ok", "viewers": {"local": ..., "external": ...}}`` where
        each entry carries an 'available' flag plus details when present.
    """
    global _viewer

    # Probe the external viewer first
    client, info = await _detect_external_viewer()
    if client and info is not None:
        external_entry: dict[str, Any] = {
            "available": True,
            "type": "napari_bridge",
            "port": info.get("bridge_port", _external_port),
            "viewer_info": info.get("viewer", {}),
        }
    else:
        external_entry = {"available": False}

    # Then describe the local viewer
    if _viewer is None:
        local_entry: dict[str, Any] = {
            "available": True,  # Can be created
            "type": "not_initialized",
        }
    else:
        local_entry = {
            "available": True,
            "type": "singleton",
            "title": _viewer.title,
            "n_layers": len(_viewer.layers),
        }

    return {
        "status": "ok",
        "viewers": {"local": local_entry, "external": external_entry},
    }

init_viewer async staticmethod

init_viewer(title: str | None = None, width: int | str | None = None, height: int | str | None = None, port: int | str | None = None) -> dict[str, Any]

Create or return the napari viewer (local or external).

Parameters:

Name Type Description Default
title str

Optional window title (only for local viewer).

None
width int

Optional initial canvas width (only for local viewer).

None
height int

Optional initial canvas height (only for local viewer).

None
port int

If provided, attempt to connect to an external napari-mcp bridge on this port (default is taken from NAPARI_MCP_BRIDGE_PORT or 9999).

None

Returns:

Type Description
dict

Dictionary containing status, viewer type, and layer info.

Source code in src/napari_mcp/server.py
@staticmethod
async def init_viewer(
    title: str | None = None,
    width: int | str | None = None,
    height: int | str | None = None,
    port: int | str | None = None,
) -> dict[str, Any]:
    """
    Create or return the napari viewer (local or external).

    An external viewer is tried first; if that attempt raises, the local
    viewer is created or reused, shown, and the Qt event pump task is started
    if it is not already running.

    Parameters
    ----------
    title : str, optional
        Optional window title (only for local viewer).
    width : int, optional
        Optional initial canvas width (only for local viewer).
    height : int, optional
        Optional initial canvas height (only for local viewer).
    port : int, optional
        If provided, attempt to connect to an external napari-mcp bridge on
        this port (default is taken from NAPARI_MCP_BRIDGE_PORT or 9999).
        A value that cannot be converted to an integer is logged and ignored,
        leaving the previously configured port in place.

    Returns
    -------
    dict
        Dictionary containing status, viewer type, and layer info.
    """
    global _external_port, _qt_pump_task

    # Allow overriding the external port per-call
    if port is not None:
        try:
            _external_port = int(port)
        except (TypeError, ValueError, OverflowError):
            # Keep the current port; report the rejected value itself.
            logger.error(f"Invalid port: {port}")

    async with _viewer_lock:
        # Try external viewer first; fall back to local
        try:
            return await NapariMCPTools._external_session_information(
                _external_port
            )
        except Exception:
            # No external viewer; continue to local viewer
            pass

        # Use local viewer
        v = _ensure_viewer()
        if title:
            v.title = title
        if width or height:
            # Only the dimension(s) supplied are changed; the other keeps the
            # current canvas size.
            w = (
                int(width)
                if width is not None
                else v.window.qt_viewer.canvas.size().width()
            )
            h = (
                int(height)
                if height is not None
                else v.window.qt_viewer.canvas.size().height()
            )
            v.window.qt_viewer.canvas.native.resize(w, h)
        # Always ensure GUI pump is running for local viewer (backwards-incompatible change)
        app = _ensure_qt_app()
        with contextlib.suppress(Exception):
            app.setQuitOnLastWindowClosed(False)
        _connect_window_destroyed_signal(v)

        # Best-effort to show window without forcing focus (safer for tests/headless)
        with contextlib.suppress(Exception):
            qt_win = v.window._qt_window  # type: ignore[attr-defined]
            qt_win.show()

        if _qt_pump_task is None or _qt_pump_task.done():
            loop = asyncio.get_running_loop()
            _qt_pump_task = loop.create_task(_qt_event_pump())

        _process_events()
        return {
            "status": "ok",
            "viewer_type": "local",
            "title": v.title,
            "layers": [lyr.name for lyr in v.layers],
        }

close_viewer async staticmethod

close_viewer() -> dict[str, Any]

Close the viewer window and clear all layers.

Returns:

Type Description
dict

Dictionary with status: 'closed' if viewer existed, 'no_viewer' if none.

Source code in src/napari_mcp/server.py
@staticmethod
async def close_viewer() -> dict[str, Any]:
    """
    Close the local viewer, if any, and stop the Qt event pump task.

    Returns
    -------
    dict
        ``{"status": "closed"}`` when a viewer existed and was closed,
        ``{"status": "no_viewer"}`` otherwise.
    """
    global _viewer, _qt_pump_task

    async with _viewer_lock:
        if _viewer is None:
            return {"status": "no_viewer"}

        _viewer.close()
        _viewer = None

        # Stop GUI pump when closing viewer
        pump = _qt_pump_task
        if pump is not None and not pump.done():
            pump.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await pump
        _qt_pump_task = None

        _process_events()
        return {"status": "closed"}

session_information async staticmethod

session_information() -> dict[str, Any]

Get comprehensive information about the current napari session.

Returns:

Type Description
dict

Comprehensive session information including viewer state, system info, and environment details.

Source code in src/napari_mcp/server.py
@staticmethod
async def session_information() -> dict[str, Any]:
    """
    Collect a snapshot of the current napari session.

    An external viewer is asked first; if that attempt raises, the local
    viewer (when one exists) is described instead.

    Returns
    -------
    dict
        For a local viewer: 'status', 'session_type', 'timestamp', 'viewer',
        'system', 'session' and 'layers'. When no local viewer exists,
        'viewer' is None and a 'message' explains how to create one.
    """
    import os
    import platform

    global _viewer, _qt_pump_task, _exec_globals

    def _describe_layer(layer: Any) -> dict[str, Any]:
        # Summarise one layer; optional attributes are included only if present.
        has_data = hasattr(layer, "data")
        if has_data and hasattr(layer.data, "shape"):
            shape = list(layer.data.shape)
        else:
            shape = None
        if has_data and hasattr(layer.data, "dtype"):
            dtype = str(layer.data.dtype)
        else:
            dtype = None

        detail: dict[str, Any] = {
            "name": layer.name,
            "type": layer.__class__.__name__,
            "visible": _parse_bool(getattr(layer, "visible", True)),
            "opacity": float(getattr(layer, "opacity", 1.0)),
            "blending": getattr(layer, "blending", None),
            "data_shape": shape,
            "data_dtype": dtype,
            "layer_id": id(layer),
        }

        # Layer-specific extras
        if hasattr(layer, "colormap"):
            detail["colormap"] = getattr(
                layer.colormap, "name", str(layer.colormap)
            )
        if hasattr(layer, "contrast_limits"):
            try:
                limits = layer.contrast_limits
                detail["contrast_limits"] = [float(limits[0]), float(limits[1])]
            except Exception:
                pass
        if hasattr(layer, "gamma"):
            detail["gamma"] = float(getattr(layer, "gamma", 1.0))
        return detail

    async with _viewer_lock:
        try:
            return await NapariMCPTools._external_session_information(
                _external_port
            )
        except Exception:
            # No external viewer; continue to local viewer
            pass

        v = _viewer
        if v is None:
            return {
                "status": "ok",
                "session_type": "napari_mcp_standalone_session",
                "timestamp": str(np.datetime64("now")),
                "viewer": None,
                "message": "No viewer currently initialized. Call init_viewer() first.",
            }

        # Viewer state
        if hasattr(v.dims, "current_step"):
            step_by_axis = {
                axis: step for axis, step in enumerate(v.dims.current_step)
            }
        else:
            step_by_axis = {}
        viewer_info = {
            "title": v.title,
            "viewer_id": id(v),
            "n_layers": len(v.layers),
            "layer_names": [layer.name for layer in v.layers],
            "selected_layers": [layer.name for layer in v.layers.selection],
            "current_step": step_by_axis,
            "ndisplay": v.dims.ndisplay,
            "camera_center": list(v.camera.center),
            "camera_zoom": float(v.camera.zoom),
            "camera_angles": list(v.camera.angles) if v.camera.angles else [],
            "grid_enabled": v.grid.enabled,
        }

        # Interpreter / host details
        system_info = {
            "python_version": sys.version,
            "platform": platform.platform(),
            "napari_version": getattr(napari, "__version__", "unknown"),
            "process_id": os.getpid(),
            "working_directory": os.getcwd(),
        }

        # Server-side session status
        pump_alive = _qt_pump_task is not None and not _qt_pump_task.done()
        session_info = {
            "server_type": "napari_mcp_standalone",
            "viewer_instance": f"<napari.Viewer at {hex(id(v))}>",
            "gui_pump_running": pump_alive,
            "execution_namespace_vars": list(_exec_globals.keys()),
            "qt_app_available": _qt_app is not None,
        }

        # Per-layer details
        layer_details = [_describe_layer(layer) for layer in v.layers]

        return {
            "status": "ok",
            "session_type": "napari_mcp_standalone_session",
            "timestamp": str(np.datetime64("now")),
            "viewer": viewer_info,
            "system": system_info,
            "session": session_info,
            "layers": layer_details,
        }

list_layers async staticmethod

list_layers() -> list[dict[str, Any]]

Return a list of layers with key properties.

Source code in src/napari_mcp/server.py
@staticmethod
async def list_layers() -> list[dict[str, Any]]:
    """
    Return one summary dict per layer.

    An external viewer's answer is used when available: a list is returned
    as-is, a dict contributes its 'content' list, and anything else yields
    an empty list. Otherwise the local viewer's layers are summarised with
    name, type, visibility, opacity, blending and, when present, colormap
    and contrast limits.
    """
    # Try to proxy to external viewer first
    proxied = await _proxy_to_external("list_layers")
    if proxied is not None:
        if isinstance(proxied, list):
            return proxied
        if isinstance(proxied, dict):
            content = proxied.get("content")
            if isinstance(content, list):
                return content
        return []

    def _summarize(lyr: Any) -> dict[str, Any]:
        summary: dict[str, Any] = {
            "name": lyr.name,
            "type": lyr.__class__.__name__,
            "visible": _parse_bool(getattr(lyr, "visible", True)),
            "opacity": float(getattr(lyr, "opacity", 1.0)),
            "blending": getattr(lyr, "blending", None),
        }
        if getattr(lyr, "colormap", None) is not None:
            cmap_name = getattr(lyr.colormap, "name", None)
            summary["colormap"] = cmap_name if cmap_name else str(lyr.colormap)
        if getattr(lyr, "contrast_limits", None) is not None:
            try:
                limits = list(lyr.contrast_limits)
                summary["contrast_limits"] = [float(limits[0]), float(limits[1])]
            except Exception:
                pass
        return summary

    def _collect() -> list[dict[str, Any]]:
        viewer = _ensure_viewer()
        return [_summarize(lyr) for lyr in viewer.layers]

    # Local execution
    async with _viewer_lock:
        return _gui_execute(_collect)

add_image async staticmethod

add_image(path: str, name: str | None = None, colormap: str | None = None, blending: str | None = None, channel_axis: int | str | None = None) -> dict[str, Any]

Add an image layer from a file path.

Parameters:

Name Type Description Default
path str

Path to an image readable by imageio (e.g., PNG, TIFF, OME-TIFF).

required
name str

Layer name. If None, uses filename.

None
colormap str

Napari colormap name (e.g., 'gray', 'magma').

None
blending str

Blending mode (e.g., 'translucent').

None
channel_axis int

If provided, interpret that axis as channels.

None

Returns:

Type Description
dict

Dictionary containing status, layer name, and image shape.

Source code in src/napari_mcp/server.py
@staticmethod
async def add_image(
    path: str,
    name: str | None = None,
    colormap: str | None = None,
    blending: str | None = None,
    channel_axis: int | str | None = None,
) -> dict[str, Any]:
    """
    Add an image layer from a file path.

    Parameters
    ----------
    path : str
        Path to an image readable by imageio (e.g., PNG, TIFF, OME-TIFF).
    name : str, optional
        Layer name. If None, uses filename.
    colormap : str, optional
        Napari colormap name (e.g., 'gray', 'magma').
    blending : str, optional
        Blending mode (e.g., 'translucent').
    channel_axis : int or str, optional
        If provided, interpret that axis as channels. Numeric strings are
        converted to int for both the external and the local viewer.

    Returns
    -------
    dict
        Dictionary containing status, layer name, and image shape. When
        ``channel_axis`` splits the image into several layers, 'name' is the
        first layer's name and 'names' lists all of them.

    Raises
    ------
    ValueError
        If ``channel_axis`` is given but cannot be converted to an integer.
    """
    # Convert once so the local path receives the same integer axis that is
    # sent to the external viewer (the raw value may be a string).
    axis = int(channel_axis) if channel_axis is not None else None

    # Try to proxy to external viewer first
    params: dict[str, Any] = {"path": path}
    if name:
        params["name"] = name
    if colormap:
        params["colormap"] = colormap
    if blending:
        params["blending"] = blending
    if axis is not None:
        params["channel_axis"] = axis

    result = await _proxy_to_external("add_image", params)
    if result is not None:
        return result

    # Local execution
    import imageio.v3 as iio

    async with _viewer_lock:
        data = iio.imread(path)

        def _add():
            v = _ensure_viewer()
            added = v.add_image(
                data,
                name=name,
                colormap=colormap,
                blending=blending,
                channel_axis=axis,
            )
            _process_events()
            # With channel_axis set, napari returns a list of layers rather
            # than a single layer, so normalise before reading names.
            layers = list(added) if isinstance(added, (list, tuple)) else [added]
            names = [lyr.name for lyr in layers]
            response: dict[str, Any] = {
                "status": "ok",
                "name": names[0] if names else None,
                "shape": list(np.shape(data)),
            }
            if len(names) > 1:
                response["names"] = names
            return response

        return _gui_execute(_add)

add_labels async staticmethod

add_labels(path: str, name: str | None = None) -> dict[str, Any]

Add a labels layer from a file path (e.g., PNG/TIFF with integer labels).

Source code in src/napari_mcp/server.py
@staticmethod
async def add_labels(path: str, name: str | None = None) -> dict[str, Any]:
    """
    Add a labels layer from a file path (e.g., PNG/TIFF with integer labels).

    Parameters
    ----------
    path : str
        Location of the label image; ``~`` is expanded and the path is
        resolved before it is handed to imageio.
    name : str, optional
        Layer name, passed through to the viewer unchanged.

    Returns
    -------
    dict
        On success: status ``"ok"``, the layer name and the data shape.
        On any exception: status ``"error"`` and a message naming ``path``.
    """
    import imageio.v3 as iio

    async with _viewer_lock:
        try:
            from pathlib import Path

            def _load_and_add():
                viewer = _ensure_viewer()
                resolved = Path(path).expanduser().resolve(strict=False)
                label_data = iio.imread(str(resolved))
                labels_layer = viewer.add_labels(label_data, name=name)
                _process_events()
                return dict(
                    status="ok",
                    name=labels_layer.name,
                    shape=list(np.shape(label_data)),
                )

            return _gui_execute(_load_and_add)
        except Exception as exc:
            # Report the failure as data instead of propagating it.
            failure = f"Failed to add labels from '{path}': {exc}"
            return {"status": "error", "message": failure}

add_points async staticmethod

add_points(points: list[list[float]], name: str | None = None, size: float | str = 10.0) -> dict[str, Any]

Add a points layer.

  • points: List of [y, x] or [z, y, x] coordinates
  • name: Optional layer name
  • size: Point size in pixels
Source code in src/napari_mcp/server.py
@staticmethod
async def add_points(
    points: list[list[float]], name: str | None = None, size: float | str = 10.0
) -> dict[str, Any]:
    """
    Add a points layer.

    Parameters
    ----------
    points : list of list of float
        List of [y, x] or [z, y, x] coordinates.
    name : str, optional
        Optional layer name.
    size : float or str, default=10.0
        Point size in pixels; converted with ``float()``.

    Returns
    -------
    dict
        Status, the created layer's name, and the number of points
        (first dimension of the coordinate array).
    """

    def _create_layer():
        viewer = _ensure_viewer()
        coords = np.asarray(points, dtype=float)
        pts_layer = viewer.add_points(coords, name=name, size=float(size))
        _process_events()
        return dict(
            status="ok",
            name=pts_layer.name,
            n_points=int(coords.shape[0]),
        )

    async with _viewer_lock:
        return _gui_execute(_create_layer)

remove_layer async staticmethod

remove_layer(name: str) -> dict[str, Any]

Remove a layer by name.

Source code in src/napari_mcp/server.py
@staticmethod
async def remove_layer(name: str) -> dict[str, Any]:
    """
    Remove a layer by name.

    Returns
    -------
    dict
        ``{"status": "removed", "name": name}`` when the layer existed,
        otherwise ``{"status": "not_found", "name": name}``.
    """

    def _drop():
        viewer = _ensure_viewer()
        # Guard clause: nothing to do when the layer is unknown.
        if name not in viewer.layers:
            return {"status": "not_found", "name": name}
        viewer.layers.remove(name)
        _process_events()
        return {"status": "removed", "name": name}

    async with _viewer_lock:
        return _gui_execute(_drop)

set_layer_properties async staticmethod

set_layer_properties(name: str, visible: bool | None = None, opacity: float | None = None, colormap: str | None = None, blending: str | None = None, contrast_limits: list[float] | None = None, gamma: float | str | None = None, new_name: str | None = None) -> dict[str, Any]

Set common properties on a layer by name.

Source code in src/napari_mcp/server.py
@staticmethod
async def set_layer_properties(
    name: str,
    visible: bool | None = None,
    opacity: float | None = None,
    colormap: str | None = None,
    blending: str | None = None,
    contrast_limits: list[float] | None = None,
    gamma: float | str | None = None,
    new_name: str | None = None,
) -> dict[str, Any]:
    """
    Set common properties on a layer by name.

    Every argument left as ``None`` is skipped, and so is any property the
    target layer does not expose. Failures while applying
    ``contrast_limits`` are suppressed.

    Returns
    -------
    dict
        ``{"status": "not_found", "name": name}`` for an unknown layer,
        otherwise status ``"ok"`` with the layer's (possibly new) name.
    """

    def _apply_all():
        viewer = _ensure_viewer()
        if name not in viewer.layers:
            return {"status": "not_found", "name": name}
        target = viewer.layers[name]

        def _assign(attr, raw, convert=None):
            # Only touch attributes that were requested and that exist.
            if raw is None or not hasattr(target, attr):
                return
            setattr(target, attr, raw if convert is None else convert(raw))

        _assign("visible", visible, _parse_bool)
        _assign("opacity", opacity, float)
        _assign("colormap", colormap)
        _assign("blending", blending)

        if contrast_limits is not None and hasattr(target, "contrast_limits"):
            with contextlib.suppress(Exception):
                low = float(contrast_limits[0])
                high = float(contrast_limits[1])
                target.contrast_limits = [low, high]

        _assign("gamma", gamma, float)

        if new_name is not None:
            target.name = new_name

        _process_events()
        return {"status": "ok", "name": target.name}

    async with _viewer_lock:
        return _gui_execute(_apply_all)

reorder_layer async staticmethod

reorder_layer(name: str, index: int | str | None = None, before: str | None = None, after: str | None = None) -> dict[str, Any]

Reorder a layer by name.

Provide exactly one of:

  • index: absolute target index
  • before: move before this layer name
  • after: move after this layer name

Source code in src/napari_mcp/server.py
@staticmethod
async def reorder_layer(
    name: str,
    index: int | str | None = None,
    before: str | None = None,
    after: str | None = None,
) -> dict[str, Any]:
    """
    Reorder a layer by name.

    Provide exactly one of:
    - index: absolute target index (clamped to the valid range)
    - before: move before this layer name
    - after: move after this layer name

    Returns
    -------
    dict
        ``{"status": "not_found", "name": ...}`` when ``name`` or the
        reference layer does not exist, ``{"status": "error", ...}`` when
        the number of supplied targets is not exactly one, otherwise status
        ``"ok"`` with the layer name and the index it ends up at.
    """
    async with _viewer_lock:

        def _reorder():
            v = _ensure_viewer()
            if name not in v.layers:
                return {"status": "not_found", "name": name}
            if sum(x is not None for x in (index, before, after)) != 1:
                return {
                    "status": "error",
                    "message": "Provide exactly one of index, before, or after",
                }
            cur = v.layers.index(name)
            if index is not None:
                target = max(0, min(int(index), len(v.layers) - 1))
                # The layer list's ``move`` takes an "insert before"
                # destination expressed in pre-move positions. To end up AT
                # ``target`` when moving towards the end of the list, the
                # destination has to be one past it; otherwise the layer
                # lands one slot short (and a move to ``cur + 1`` does
                # nothing at all).
                dest = target + 1 if target > cur else target
            elif before is not None:
                if before not in v.layers:
                    return {"status": "not_found", "name": before}
                dest = v.layers.index(before)
            else:
                if after not in v.layers:
                    return {"status": "not_found", "name": after}
                dest = v.layers.index(after) + 1
            if dest != cur:
                v.layers.move(cur, dest)
            _process_events()
            return {"status": "ok", "name": name, "index": v.layers.index(name)}

        return _gui_execute(_reorder)

set_active_layer async staticmethod

set_active_layer(name: str) -> dict[str, Any]

Set the selected/active layer by name.

Source code in src/napari_mcp/server.py
@staticmethod
async def set_active_layer(name: str) -> dict[str, Any]:
    """
    Set the selected/active layer by name.

    Returns
    -------
    dict
        ``{"status": "not_found", "name": name}`` for an unknown layer,
        otherwise ``{"status": "ok", "active": name}``.
    """

    def _select_only():
        viewer = _ensure_viewer()
        layers = viewer.layers
        if name in layers:
            # Replace the whole selection with just this one layer.
            layers.selection = {layers[name]}
            _process_events()
            return {"status": "ok", "active": name}
        return {"status": "not_found", "name": name}

    async with _viewer_lock:
        return _gui_execute(_select_only)

reset_view async staticmethod

reset_view() -> dict[str, Any]

Reset the camera view to fit data.

Source code in src/napari_mcp/server.py
@staticmethod
async def reset_view() -> dict[str, Any]:
    """
    Reset the camera view to fit data.

    Returns
    -------
    dict
        Always ``{"status": "ok"}`` once the viewer's ``reset_view`` has run.
    """

    def _fit_to_data():
        viewer = _ensure_viewer()
        viewer.reset_view()
        _process_events()
        return dict(status="ok")

    async with _viewer_lock:
        return _gui_execute(_fit_to_data)

set_camera async staticmethod

set_camera(center: list[float] | None = None, zoom: float | str | None = None, angle: float | str | None = None) -> dict[str, Any]

Set camera properties: center, zoom, and/or angle.

Source code in src/napari_mcp/server.py
@staticmethod
async def set_camera(
    center: list[float] | None = None,
    zoom: float | str | None = None,
    angle: float | str | None = None,
) -> dict[str, Any]:
    """
    Set camera properties: center, zoom, and/or angle.

    Parameters
    ----------
    center : list of float, optional
        New camera center; each component is converted with ``float()``.
    zoom : float or str, optional
        New zoom factor.
    angle : float or str, optional
        New value for the first component of ``camera.angles``; the
        remaining two components keep their current values.

    Returns
    -------
    dict
        Status plus the camera's center and zoom after the update.
    """
    async with _viewer_lock:

        def _set_cam():
            v = _ensure_viewer()
            if center is not None:
                v.camera.center = list(map(float, center))
            if zoom is not None:
                v.camera.zoom = float(zoom)
            if angle is not None:
                # napari documents camera.angles as a three-component value,
                # so a 1-tuple is not a valid assignment. Replace only the
                # first component and carry the other two over.
                try:
                    current = [float(a) for a in v.camera.angles]
                except (AttributeError, TypeError, ValueError):
                    # Current angles unreadable: fall back to zeros.
                    current = []
                rest = (current[1:3] + [0.0, 0.0])[:2]
                v.camera.angles = (float(angle), rest[0], rest[1])
            _process_events()
            return {
                "status": "ok",
                "center": list(map(float, v.camera.center)),
                "zoom": float(v.camera.zoom),
            }

        return _gui_execute(_set_cam)

set_ndisplay async staticmethod

set_ndisplay(ndisplay: int | str) -> dict[str, Any]

Set number of displayed dimensions (2 or 3).

Source code in src/napari_mcp/server.py
@staticmethod
async def set_ndisplay(ndisplay: int | str) -> dict[str, Any]:
    """
    Set number of displayed dimensions (2 or 3).

    Parameters
    ----------
    ndisplay : int or str
        Requested value; converted with ``int()`` before assignment.

    Returns
    -------
    dict
        Status plus the value read back from ``viewer.dims.ndisplay``.
    """

    def _apply_ndisplay():
        viewer = _ensure_viewer()
        dims = viewer.dims
        dims.ndisplay = int(ndisplay)
        _process_events()
        # Report what the viewer actually holds, not what was requested.
        return dict(status="ok", ndisplay=int(viewer.dims.ndisplay))

    async with _viewer_lock:
        return _gui_execute(_apply_ndisplay)

set_dims_current_step async staticmethod

set_dims_current_step(axis: int | str, value: int | str) -> dict[str, Any]

Set the current step (slider position) for a specific axis.

Source code in src/napari_mcp/server.py
@staticmethod
async def set_dims_current_step(
    axis: int | str, value: int | str
) -> dict[str, Any]:
    """
    Set the current step (slider position) for a specific axis.

    Parameters
    ----------
    axis : int or str
        Dims axis index; converted with ``int()``.
    value : int or str
        Step to move to; converted with ``int()``.

    Returns
    -------
    dict
        Status plus the integer axis and value that were requested.
    """

    def _move_slider():
        viewer = _ensure_viewer()
        axis_idx, step = int(axis), int(value)
        viewer.dims.set_current_step(axis_idx, step)
        _process_events()
        return dict(status="ok", axis=axis_idx, value=step)

    async with _viewer_lock:
        return _gui_execute(_move_slider)

set_grid async staticmethod

set_grid(enabled: bool | str = True) -> dict[str, Any]

Enable or disable grid view.

Source code in src/napari_mcp/server.py
@staticmethod
async def set_grid(enabled: bool | str = True) -> dict[str, Any]:
    """
    Enable or disable grid view.

    Parameters
    ----------
    enabled : bool or str, default=True
        Desired grid state; interpreted by ``_parse_bool``.

    Returns
    -------
    dict
        Status plus the grid state read back from the viewer.
    """

    def _toggle_grid():
        viewer = _ensure_viewer()
        grid = viewer.grid
        grid.enabled = _parse_bool(enabled)
        _process_events()
        return dict(status="ok", grid=_parse_bool(viewer.grid.enabled))

    async with _viewer_lock:
        return _gui_execute(_toggle_grid)

screenshot async staticmethod

screenshot(canvas_only: bool | str = True) -> ImageContent

Take a screenshot of the napari canvas and return as base64.

Parameters:

Name Type Description Default
canvas_only bool

If True, only capture the canvas area.

True

Returns:

Type Description
ImageContent

The screenshot image as an mcp.types.ImageContent object.

Source code in src/napari_mcp/server.py
@staticmethod
async def screenshot(canvas_only: bool | str = True) -> ImageContent:
    """
    Take a screenshot of the napari viewer and return it as PNG image content.

    The request is first offered to an external viewer; the local viewer is
    used only when that returns ``None``.

    Parameters
    ----------
    canvas_only : bool or str, default=True
        If True, only capture the canvas area. String values are
        interpreted with ``_parse_bool`` for the local capture.

    Returns
    -------
    ImageContent
        The screenshot image as an mcp.types.ImageContent object.
    """
    # Try to proxy to external viewer first
    result = await _proxy_to_external("screenshot", {"canvas_only": canvas_only})
    if result is not None:
        return result

    # Local execution
    async with _viewer_lock:

        def _shot():
            v = _ensure_viewer()
            _process_events(3)
            # A raw string such as "false" is truthy, so parse it the same
            # way the other tools parse their boolean-or-string flags.
            arr = v.screenshot(canvas_only=_parse_bool(canvas_only))
            if not isinstance(arr, np.ndarray):
                arr = np.asarray(arr)
            if arr.dtype != np.uint8:
                arr = arr.astype(np.uint8, copy=False)
            img = Image.fromarray(arr)
            buf = BytesIO()
            img.save(buf, format="PNG")
            enc = buf.getvalue()
            return fastmcp.utilities.types.Image(
                data=enc, format="png"
            ).to_image_content()

        return _gui_execute(_shot)

timelapse_screenshot async staticmethod

timelapse_screenshot(axis: int | str, slice_range: str, canvas_only: bool | str = True, interpolate_to_fit: bool = True) -> list[ImageContent]

Capture a series of screenshots while sweeping a dims axis.

Parameters:

Name Type Description Default
axis int

Dims axis index to sweep (e.g., temporal axis).

required
slice_range str

Python-like slice string over step indices, e.g. "1:5", ":6", "::2". Defaults follow Python semantics with start=0, stop=nsteps, step=1.

required
canvas_only bool

If True, only capture the canvas area.

True
interpolate_to_fit bool

If True, interpolate the images to fit the total size cap of 1309246 bytes.

True

Returns:

Type Description
list[ImageContent]

List of screenshots as mcp.types.ImageContent objects.

Source code in src/napari_mcp/server.py
@staticmethod
async def timelapse_screenshot(
    axis: int | str,
    slice_range: str,
    canvas_only: bool | str = True,
    interpolate_to_fit: bool = True,
) -> list[ImageContent]:
    """
    Capture a series of screenshots while sweeping a dims axis.

    The request is first offered to an external viewer; the local viewer is
    used only when that returns ``None``.

    Parameters
    ----------
    axis : int
        Dims axis index to sweep (e.g., temporal axis).
    slice_range : str
        Python-like slice string over step indices, e.g. "1:5", ":6", "::2",
        "::-1", or a single index such as "3" or "-1". Omitted parts and
        negative values follow Python slice semantics.
    canvas_only : bool, default=True
        If True, only capture the canvas area. String values are
        interpreted with ``_parse_bool`` for the local capture.
    interpolate_to_fit : bool, default=True
        If True, downscale the images so the estimated total base64 size
        stays within a cap of 1309246 bytes; capture stops early if the cap
        would still be exceeded.

    Returns
    -------
    list[ImageContent]
        List of screenshots as mcp.types.ImageContent objects.

    Raises
    ------
    ValueError
        If ``slice_range`` is malformed, has a zero step, or names a single
        index outside the axis.
    RuntimeError
        If the number of steps along ``axis`` cannot be determined.
    """
    max_total_base64_bytes = 1309246 if interpolate_to_fit else None

    # Try to proxy to external viewer first
    result = await _proxy_to_external(
        "timelapse_screenshot",
        {
            "axis": axis,
            "slice_range": slice_range,
            "canvas_only": canvas_only,
            "interpolate_to_fit": interpolate_to_fit,
        },
    )
    if result is not None:
        return result  # type: ignore[return-value]

    def _parse_slice(spec: str, length: int) -> list[int]:
        """Expand a slice string into concrete step indices for ``length``."""
        text = (spec or "").strip()

        # Single integer
        if text and ":" not in text:
            try:
                idx = int(text)
            except ValueError as err:
                raise ValueError(f"Invalid slice range: {spec!r}") from err
            if idx < 0:
                idx += length
            if not (0 <= idx < length):
                raise ValueError(
                    f"Index out of bounds for axis with {length} steps: {idx}"
                )
            return [idx]

        # Slice form start:stop:step
        parts = text.split(":")
        if len(parts) > 3:
            raise ValueError(f"Invalid slice range: {spec!r}")
        try:
            start, stop, step = (
                int(part) if part.strip() else None
                for part in (parts + [""] * 3)[:3]
            )
        except ValueError as err:
            raise ValueError(f"Invalid slice range: {spec!r}") from err
        # Test the parsed value itself: an explicit 0 must be rejected, not
        # silently replaced by the default step.
        if step == 0:
            raise ValueError("slice step cannot be 0")

        # slice.indices applies Python's own defaulting, negative-index and
        # clamping rules, including reversed sweeps with an omitted stop.
        return list(range(*slice(start, stop, step).indices(length)))

    # Local execution
    async with _viewer_lock:

        def _run_series():
            v = _ensure_viewer()

            # Determine number of steps along axis
            try:
                nsteps_tuple = getattr(v.dims, "nsteps", None)
                if nsteps_tuple is None:
                    # Without dims.nsteps fall through to the layer-based
                    # estimate below.
                    raise AttributeError
                total = int(nsteps_tuple[int(axis)])
            except Exception:
                # Best effort via bounds from layers; may be approximate
                try:
                    total = max(
                        int(getattr(lyr.data, "shape", [1])[(int(axis))])
                        if int(axis) < getattr(lyr.data, "ndim", 0)
                        else 1
                        for lyr in v.layers
                    )
                except Exception:
                    total = 0

            if total <= 0:
                raise RuntimeError(
                    "Unable to determine number of steps for the given axis"
                )

            indices = _parse_slice(slice_range, total)
            if not indices:
                return []

            axis_idx = int(axis)
            # A raw string such as "false" is truthy, so parse the flag once.
            only_canvas = _parse_bool(canvas_only)

            def _capture(step_index):
                """Move the slider to ``step_index`` and grab a PIL image."""
                v.dims.set_current_step(axis_idx, int(step_index))
                _process_events(2)
                arr = v.screenshot(canvas_only=only_canvas)
                if not isinstance(arr, np.ndarray):
                    arr = np.asarray(arr)
                if arr.dtype != np.uint8:
                    arr = arr.astype(np.uint8, copy=False)
                return Image.fromarray(arr)

            def _encode_png(img):
                """Return the PNG bytes of ``img``."""
                buf = BytesIO()
                img.save(buf, format="PNG")
                return buf.getvalue()

            def _b64_len(raw):
                """Length of ``raw`` once base64-encoded (with padding)."""
                return ((len(raw) + 2) // 3) * 4

            # Take a sample at the first index to estimate size
            sample_b64_len = _b64_len(_encode_png(_capture(indices[0])))

            # Pick a uniform downscale factor when the estimated total
            # exceeds the cap. The square root is used because the factor
            # is applied to both width and height.
            downsample_factor = 1.0
            estimated_total = sample_b64_len * len(indices)
            if (
                max_total_base64_bytes is not None
                and estimated_total > max_total_base64_bytes
            ):
                est_factor = math.sqrt(
                    max_total_base64_bytes / float(max(1, estimated_total))
                )
                downsample_factor = max(0.05, min(1.0, est_factor))

            images: list[ImageContent] = []  # type: ignore
            total_b64_len = 0
            for idx in indices:
                img = _capture(idx)
                if downsample_factor < 1.0:
                    new_w = max(1, int(img.width * downsample_factor))
                    new_h = max(1, int(img.height * downsample_factor))
                    if new_w != img.width or new_h != img.height:
                        img = img.resize((new_w, new_h), resample=Image.BILINEAR)
                enc = _encode_png(img)
                b64_len = _b64_len(enc)
                # Stop before the running total would cross the cap; frames
                # captured so far are still returned.
                if (
                    max_total_base64_bytes is not None
                    and total_b64_len + b64_len > max_total_base64_bytes
                ):
                    break
                total_b64_len += b64_len
                images.append(
                    fastmcp.utilities.types.Image(
                        data=enc, format="png"
                    ).to_image_content()
                )

            return images

        return _gui_execute(_run_series)

execute_code async staticmethod

execute_code(code: str, line_limit: int | str = 30) -> dict[str, Any]

Execute arbitrary Python code in the server's interpreter.

Similar to napari's console. The execution namespace persists across calls and includes 'viewer', 'napari', and 'np'.

Parameters:

Name Type Description Default
code str

Python code string. The value of the last expression (if any) is returned as 'result_repr'.

required
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30

Returns:

Type Description
dict

Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr', and 'output_id' for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
@staticmethod
async def execute_code(code: str, line_limit: int | str = 30) -> dict[str, Any]:
    """
    Execute arbitrary Python code in the server's interpreter.

    Similar to napari's console. The execution namespace persists across calls
    and includes 'viewer', 'napari', and 'np'. The request is first offered
    to an external viewer; local execution happens only when that returns
    ``None``.

    Parameters
    ----------
    code : str
        Python code string. The value of the last expression (if any)
        is returned as 'result_repr'.
    line_limit : int or str, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens. String
        values are converted with ``int()``.

    Returns
    -------
    dict
        Dictionary with 'status', optional 'result_repr', 'stdout', 'stderr',
        and 'output_id' for retrieving full output if truncated.

    Raises
    ------
    ValueError
        If ``line_limit`` cannot be converted to an integer.
    """
    # SECURITY NOTE: this tool runs caller-supplied code with exec/eval by
    # design; it must only be exposed to trusted callers.

    # Try to proxy to external viewer first
    # NOTE(review): only ``code`` is forwarded here, not ``line_limit`` -
    # confirm whether the external endpoint accepts it before adding it.
    result = await _proxy_to_external("execute_code", {"code": code})
    if result is not None:
        return result

    # Convert once so "-1" passed as a string is recognised as "unlimited"
    # instead of being handed to the truncation helper.
    limit = int(line_limit)

    def _attach_streams(
        response: dict[str, Any],
        stdout_full: str,
        stderr_full: str,
        error_summary: str | None = None,
    ) -> dict[str, Any]:
        """Add stdout/stderr (truncated to ``limit`` lines) to ``response``."""
        if limit == -1:
            response["warning"] = (
                "Unlimited output requested. This may consume a large number "
                "of tokens. Consider using read_output for large outputs."
            )
            response["stdout"] = stdout_full
            response["stderr"] = stderr_full
            return response

        stdout_truncated, stdout_was_truncated = _truncate_output(
            stdout_full, limit
        )
        stderr_truncated, stderr_was_truncated = _truncate_output(
            stderr_full, limit
        )
        response["stdout"] = stdout_truncated
        # Ensure exception summary is present even when truncated
        if error_summary is not None and error_summary not in stderr_truncated:
            if stderr_truncated and not stderr_truncated.endswith("\n"):
                stderr_truncated += "\n"
            stderr_truncated += error_summary + "\n"
        response["stderr"] = stderr_truncated

        if stdout_was_truncated or stderr_was_truncated:
            response["truncated"] = True
            response["message"] = (
                f"Output truncated to {limit} lines. "
                f"Use read_output('{response['output_id']}') to retrieve full output."
            )
        return response

    # Local execution
    async with _viewer_lock:
        v = _ensure_viewer()
        _exec_globals.setdefault("__builtins__", __builtins__)  # type: ignore[assignment]
        _exec_globals["viewer"] = v
        napari_mod = napari
        if napari_mod is not None:
            _exec_globals.setdefault("napari", napari_mod)
        _exec_globals.setdefault("np", np)

        stdout_buf = StringIO()
        stderr_buf = StringIO()
        result_repr: str | None = None
        try:
            # Capture stdout/stderr during execution
            with (
                contextlib.redirect_stdout(stdout_buf),
                contextlib.redirect_stderr(stderr_buf),
            ):
                parsed = ast.parse(code, mode="exec")
                if parsed.body and isinstance(parsed.body[-1], ast.Expr):
                    # Execute all but last, then eval last expression to
                    # capture a result
                    if len(parsed.body) > 1:
                        exec_ast = ast.Module(
                            body=parsed.body[:-1], type_ignores=[]
                        )
                        exec(
                            compile(exec_ast, "<mcp-exec>", "exec"),
                            _exec_globals,
                            _exec_globals,
                        )
                    last_expr = ast.Expression(body=parsed.body[-1].value)
                    value = eval(
                        compile(last_expr, "<mcp-eval>", "eval"),
                        _exec_globals,
                        _exec_globals,
                    )
                    result_repr = repr(value)
                else:
                    # Pure statements
                    exec(
                        compile(parsed, "<mcp-exec>", "exec"),
                        _exec_globals,
                        _exec_globals,
                    )
            _process_events(2)

            stdout_full = stdout_buf.getvalue()
            stderr_full = stderr_buf.getvalue()

            # Store full output and get ID
            output_id = await _store_output(
                tool_name="execute_code",
                stdout=stdout_full,
                stderr=stderr_full,
                result_repr=result_repr,
                code=code,
            )

            response: dict[str, Any] = {
                "status": "ok",
                "output_id": output_id,
                **({"result_repr": result_repr} if result_repr is not None else {}),
            }
            return _attach_streams(response, stdout_full, stderr_full)
        except Exception as e:
            _process_events(1)
            tb = traceback.format_exc()

            # Full output including traceback
            stdout_full = stdout_buf.getvalue()
            stderr_full = stderr_buf.getvalue() + tb

            # Store full output and get ID
            output_id = await _store_output(
                tool_name="execute_code",
                stdout=stdout_full,
                stderr=stderr_full,
                code=code,
                error=True,
            )

            response = {
                "status": "error",
                "output_id": output_id,
            }
            return _attach_streams(
                response,
                stdout_full,
                stderr_full,
                error_summary=f"{type(e).__name__}: {e}",
            )

install_packages async staticmethod

install_packages(packages: list[str], upgrade: bool | None = False, no_deps: bool | None = False, index_url: str | None = None, extra_index_url: str | None = None, pre: bool | None = False, line_limit: int | str = 30, timeout: int = 240) -> dict[str, Any]

Install Python packages using pip.

Install packages into the currently running server environment.

Parameters:

Name Type Description Default
packages list of str

List of package specifiers (e.g., "scikit-image", "torch==2.3.1").

required
upgrade bool

If True, pass --upgrade flag.

False
no_deps bool

If True, pass --no-deps flag.

False
index_url str

Custom index URL.

None
extra_index_url str

Extra index URL.

None
pre bool

Allow pre-releases (--pre flag).

False
line_limit int

Maximum number of output lines to return. Use -1 for unlimited output. Warning: Using -1 may consume a large number of tokens.

30
timeout int

Timeout for pip install in seconds.

240

Returns:

Type Description
dict

Dictionary including status, returncode, stdout, stderr, command, and output_id for retrieving full output if truncated.

Source code in src/napari_mcp/server.py
@staticmethod
async def install_packages(
    packages: list[str],
    upgrade: bool | None = False,
    no_deps: bool | None = False,
    index_url: str | None = None,
    extra_index_url: str | None = None,
    pre: bool | None = False,
    line_limit: int | str = 30,
    timeout: int = 240,
) -> dict[str, Any]:
    """
    Install Python packages using pip.

    Install packages into the currently running server environment by
    running ``sys.executable -m pip install ...`` as a subprocess. The call
    is first offered to an external viewer via ``_proxy_to_external``; the
    local installation only runs when that returns ``None``.

    Parameters
    ----------
    packages : list of str
        List of package specifiers (e.g., "scikit-image", "torch==2.3.1").
    upgrade : bool, optional
        If True, pass --upgrade flag.
    no_deps : bool, optional
        If True, pass --no-deps flag.
    index_url : str, optional
        Custom index URL.
    extra_index_url : str, optional
        Extra index URL.
    pre : bool, optional
        Allow pre-releases (--pre flag).
    line_limit : int or str, default=30
        Maximum number of output lines to return. Use -1 for unlimited output.
        Warning: Using -1 may consume a large number of tokens.
        String values are converted to int; values that cannot be converted
        fall back to the default of 30.
    timeout : int, default=240
        Timeout for pip install in seconds. On timeout the pip process is
        killed and the call is reported with status "error".

    Returns
    -------
    dict
        Dictionary including status, returncode, stdout, stderr, command,
        and output_id for retrieving full output if truncated. When
        ``packages`` is empty or not a list, only ``status`` ("error") and
        ``message`` are returned and pip is not run.
    """
    # Try to proxy to external viewer first
    result = await _proxy_to_external(
        "install_packages",
        {
            "packages": packages,
            "upgrade": upgrade,
            "no_deps": no_deps,
            "index_url": index_url,
            "extra_index_url": extra_index_url,
            "pre": pre,
            "line_limit": line_limit,
            "timeout": timeout,
        },
    )
    if result is not None:
        return result

    if not packages or not isinstance(packages, list):
        return {
            "status": "error",
            "message": "Parameter 'packages' must be a non-empty list of package names",
        }

    # Coerce line_limit once, before pip runs: a string "-1" must select the
    # unlimited branch below, and an unparsable value must not raise only
    # after the environment has already been modified.
    try:
        line_limit = int(line_limit)
    except (TypeError, ValueError):
        line_limit = 30

    cmd: list[str] = [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--no-input",
        "--disable-pip-version-check",
    ]
    if upgrade:
        cmd.append("--upgrade")
    if no_deps:
        cmd.append("--no-deps")
    if pre:
        cmd.append("--pre")
    if index_url:
        cmd.extend(["--index-url", index_url])
    if extra_index_url:
        cmd.extend(["--extra-index-url", extra_index_url])
    # NOTE(review): entries starting with "-" are passed through and would be
    # parsed by pip as options; confirm callers are trusted.
    cmd.extend(packages)

    # Run pip as a subprocess without blocking the event loop
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    timed_out = False
    try:
        stdout_b, stderr_b = await asyncio.wait_for(
            proc.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError:
        timed_out = True
        with contextlib.suppress(ProcessLookupError):
            proc.kill()
        # Reap the killed process so it is not left behind and so that
        # proc.returncode is populated. The wait itself is bounded so a
        # stuck child cannot hang the tool call.
        with contextlib.suppress(asyncio.TimeoutError):
            await asyncio.wait_for(proc.wait(), timeout=5)
        stdout_b, stderr_b = b"", f"pip install timed out after {timeout}s".encode()
    stdout = stdout_b.decode(errors="replace")
    stderr = stderr_b.decode(errors="replace")

    # A timeout is always an error, even if the process happened to exit
    # with code 0 in the window between the timeout and the kill.
    status = "ok" if proc.returncode == 0 and not timed_out else "error"
    command_str = " ".join(shlex.quote(part) for part in cmd)

    # Store full output and get ID
    output_id = await _store_output(
        tool_name="install_packages",
        stdout=stdout,
        stderr=stderr,
        packages=packages,
        command=command_str,
        returncode=proc.returncode,
    )

    # Prepare response with line limiting
    response: dict[str, Any] = {
        "status": status,
        "returncode": proc.returncode if proc.returncode is not None else -1,
        "command": command_str,
        "output_id": output_id,
    }

    # Add warning for unlimited output
    if line_limit == -1:
        response["warning"] = (
            "Unlimited output requested. This may consume a large number "
            "of tokens. Consider using read_output for large outputs."
        )
        response["stdout"] = stdout
        response["stderr"] = stderr
    else:
        # Truncate stdout and stderr
        stdout_truncated, stdout_was_truncated = _truncate_output(
            stdout, line_limit
        )
        stderr_truncated, stderr_was_truncated = _truncate_output(
            stderr, line_limit
        )

        response["stdout"] = stdout_truncated
        response["stderr"] = stderr_truncated

        if stdout_was_truncated or stderr_was_truncated:
            response["truncated"] = True
            response["message"] = (
                f"Output truncated to {line_limit} lines. "
                f"Use read_output('{output_id}') to retrieve full output."
            )

    return response

read_output async staticmethod

read_output(output_id: str, start: int | str = 0, end: int | str = -1) -> dict[str, Any]

Read stored tool output with optional line range.

Parameters:

Name Type Description Default
output_id str

Unique ID of the stored output.

required
start int

Starting line number (0-indexed).

0
end int

Ending line number (exclusive). If -1, read to end.

-1

Returns:

Type Description
dict

Dictionary containing the requested output lines and metadata.

Source code in src/napari_mcp/server.py
@staticmethod
async def read_output(
    output_id: str, start: int | str = 0, end: int | str = -1
) -> dict[str, Any]:
    """
    Return a range of lines from a previously stored tool output.

    The stored stdout and stderr are concatenated (stdout first, with a
    newline inserted between them when neither side supplies one) and the
    result is split into lines, keeping line endings.

    Parameters
    ----------
    output_id : str
        Unique ID of the stored output.
    start : int or str, default=0
        Index of the first line to return (0-indexed). Values that cannot be
        converted to an integer fall back to 0; negative values are raised
        to 0.
    end : int or str, default=-1
        Index one past the last line to return. -1 means "read to the end".
        Values that cannot be converted to an integer fall back to -1;
        values beyond the last line are clamped to the total line count.

    Returns
    -------
    dict
        On success, a dictionary with ``status`` ("ok"), ``output_id``,
        ``tool_name``, ``timestamp``, ``lines``, ``line_range``,
        ``total_lines`` and ``result_repr``. If the ID is unknown, a
        dictionary with ``status`` ("error") and ``message``.
    """

    def _to_int(value: Any, fallback: int) -> int:
        # Any conversion failure silently selects the fallback.
        try:
            return int(value)
        except Exception:
            return fallback

    async with _output_storage_lock:
        if output_id in _output_storage:
            record = _output_storage[output_id]
        else:
            return {
                "status": "error",
                "message": f"Output ID '{output_id}' not found",
            }

        # Build one text stream: stdout, an optional separating newline,
        # then stderr.
        out_text = record.get("stdout") or ""
        err_text = record.get("stderr")
        pieces = [out_text]
        if err_text:
            already_separated = out_text.endswith("\n") or err_text.startswith("\n")
            if out_text and not already_separated:
                pieces.append("\n")
            pieces.append(err_text)
        all_lines = "".join(pieces).splitlines(keepends=True)
        line_count = len(all_lines)

        # Normalize the requested window.
        first = max(0, _to_int(start, 0))
        last = _to_int(end, -1)
        stop = line_count if last == -1 else min(line_count, last)

        chosen = all_lines[first:stop] if first < line_count else []

        return {
            "status": "ok",
            "output_id": output_id,
            "tool_name": record["tool_name"],
            "timestamp": record["timestamp"],
            "lines": chosen,
            "line_range": {"start": first, "end": stop},
            "total_lines": line_count,
            "result_repr": record.get("result_repr"),
        }