Skip to content

academy_extensions.mcp

MCP Server interface to Academy Exchange.

AppContext

AppContext(
    exchange_client: ExchangeClient[Any],
    agents: set[AgentId[Any]] | None = None,
)

Application context with typed dependencies.

Parameters:

  • exchange_client (ExchangeClient[Any]) –

    The client used to communicate on the exchange.

Source code in academy_extensions/mcp.py
def __init__(
    self,
    exchange_client: ExchangeClient[Any],
    agents: set[AgentId[Any]] | None = None,
):
    self.exchange_client = exchange_client
    self.agents = agents if agents else set()

format_name

format_name(agent: AgentId[Any], action: str) -> str

Format an agent action into a tool name.

Source code in academy_extensions/mcp.py
def format_name(agent: AgentId[Any], action: str) -> str:
    """Format an agent action into a tool name."""
    return f'{agent.uid}_{action}'

wrap_agent async

wrap_agent(
    server: FastMCP, agent: Handle[Any]
) -> dict[str, str]

Wrap tool from agent for use by server.

Source code in academy_extensions/mcp.py
async def wrap_agent(server: FastMCP, agent: Handle[Any]) -> dict[str, str]:
    """Wrap tool from agent for use by server."""
    logger.debug(f'Starting wrap agent for {agent.agent_id}')
    agent_info = await agent.agent_describe()
    logger.debug(f'Got description for {agent.agent_id}')
    tools: dict[str, str] = {}
    for action, description in agent_info.actions.items():
        name = format_name(agent.agent_id, action)

        async def invoke(
            args: tuple[Any, ...],
            kwargs: dict[str, Any],
            name_: str = name,
            action_: str = action,
        ) -> Any:
            try:
                return await agent.action(action_, *args, **kwargs)
            except MailboxTerminatedError as e:
                server.remove_tool(name_)
                raise e

        desc = (
            f'This tool executes an action on agent {agent.agent_id}\n'
            f'Documentation: {description.doc}\n'
            f'Type Signature: {description.type_signature}\n'
            'Note: Arguments must be passed as `args`: a tuple of positional'
            'arguments and `kwargs`: a dictionary of key-word arguments.'
        )
        server.add_tool(
            invoke,
            name=name,
            title=action,
            description=desc,
        )
        tools[name] = desc
        logger.info(f'Added tool: {name}')

    return tools

update_tools async

update_tools(
    server: FastMCP,
    existing: set[AgentId[Any]],
    client: ExchangeClient[Any],
    base_class: type[Agent] = Agent,
    allow_subclasses: bool = True,
) -> dict[AgentId[Any], Task[Any]]

Update tools by discovering agents on the exchange.

Source code in academy_extensions/mcp.py
async def update_tools(
    server: FastMCP,
    existing: set[AgentId[Any]],
    client: ExchangeClient[Any],
    base_class: type[Agent] = Agent,
    allow_subclasses: bool = True,
) -> dict[AgentId[Any], asyncio.Task[Any]]:
    """Update tools by discovering agents on the exchange."""
    update_futures: dict[AgentId[Any], asyncio.Task[Any]] = {}
    agent_ids = await client.discover(
        base_class,
        allow_subclasses=allow_subclasses,
    )
    new_agents = set(agent_ids) - existing
    for agent_id in new_agents:
        logger.info(f'Adding agent {agent_id}')
        agent = Handle(agent_id)
        # Create task in case agent is not online
        update_futures[agent_id] = asyncio.create_task(
            wrap_agent(server, agent),
        )
        existing.add(agent_id)

    return update_futures

refresh_loop async

refresh_loop(
    server: FastMCP,
    context: AppContext,
    interval_s: int = 300,
) -> None

Regularly update the discovered tools.

Source code in academy_extensions/mcp.py
async def refresh_loop(
    server: FastMCP,
    context: AppContext,
    interval_s: int = 300,
) -> None:
    """Regularly update the discovered tools."""
    tasks: list[asyncio.Task[Any]] = []
    try:
        while True:
            updates = await update_tools(
                server,
                context.agents,
                context.exchange_client,
            )
            tasks.extend(updates.values())
            await asyncio.sleep(interval_s)
    except asyncio.CancelledError as e:
        for task in tasks:
            task.cancel()

        raise e

app_lifespan async

app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]

Initialize exchange client for lifespan of server.

Source code in academy_extensions/mcp.py
@asynccontextmanager
async def app_lifespan(
    server: FastMCP,
) -> AsyncIterator[AppContext]:
    """Initialize exchange client for lifespan of server."""
    if 'ACADEMY_MCP_EXCHANGE_ADDRESS' in os.environ:
        url = os.environ['ACADEMY_MCP_EXCHANGE_ADDRESS']
        auth = 'globus' if 'ACADEMY_MCP_EXCHANGE_AUTH' in os.environ else None
        logger.info(f'Connection to exchange at {url}')
        exchange_factory = HttpExchangeFactory(
            url,
            auth_method=auth,  # type: ignore
        )
    else:  # pragma: no cover
        logger.info(
            'Connection to exchange at https://exchange.academy-agents.org',
        )
        exchange_factory = HttpExchangeFactory(
            'https://exchange.academy-agents.org',
            auth_method='globus',
        )

    async with await exchange_factory.create_user_client() as client:
        logger.info('Connected to exchange')
        context = AppContext(exchange_client=client)
        refresh_task = asyncio.create_task(refresh_loop(server, context))
        yield context
        logger.info('Context exiting!')
        refresh_task.cancel()

add_agent async

add_agent(
    ctx: Context[ServerSession, AppContext], agent_uid: UUID
) -> dict[str, str]

Add agent to MCP server based on ID.

Parameters:

  • ctx (Context[ServerSession, AppContext]) –

    FastMCP context (provided)

  • agent_uid (UUID) –

    uuid of the agent to add to MCP server.

Returns:

  • dict[str, str]

    A dictionary of newely added actions and their docs.

Source code in academy_extensions/mcp.py
@mcp.tool()
async def add_agent(
    ctx: Context[ServerSession, AppContext],
    agent_uid: uuid.UUID,
) -> dict[str, str]:
    """Add agent to MCP server based on ID.

    Args:
        ctx: FastMCP context (provided)
        agent_uid: uuid of the agent to add to MCP server.

    Returns:
        A dictionary of newely added actions and their docs.
    """
    aid: AgentId[Any] = AgentId(uid=agent_uid)  # type: ignore[call-arg]
    agent: Handle[Any] = Handle(aid)
    tools = await wrap_agent(mcp, agent)
    ctx.request_context.lifespan_context.agents.add(aid)
    return tools

discover async

discover(
    ctx: Context[ServerSession, AppContext],
    agent: str,
    module: str,
    allow_subclasses: bool = True,
) -> tuple[UUID, ...]

Search for agents of type Agent on the exchange.

To search for all agents, use agent_type="Agent", module="academy.Agent".

Parameters:

  • ctx (Context[ServerSession, AppContext]) –

    FastMCP context (provided)

  • agent (str) –

    The type of the agent to return.

  • module (str) –

    The module where the agent was implemented.

  • allow_subclasses (bool, default: True ) –

    Return agents implementing subclasses of the agent.

Returns:

  • tuple[UUID, ...]

    Tuple of agent uids implementing the agent.

Source code in academy_extensions/mcp.py
@mcp.tool()
async def discover(
    ctx: Context[ServerSession, AppContext],
    agent: str,
    module: str,
    allow_subclasses: bool = True,
) -> tuple[uuid.UUID, ...]:
    """Search for agents of type Agent on the exchange.

    To search for all agents, use agent_type="Agent",
    module="academy.Agent".

    Args:
        ctx: FastMCP context (provided)
        agent: The type of the agent to return.
        module: The module where the agent was implemented.
        allow_subclasses: Return agents implementing subclasses of the
            agent.

    Returns:
        Tuple of agent uids implementing the agent.
    """
    exchange = ctx.request_context.lifespan_context.exchange_client

    fake_agent = type(agent, (Agent,), {'__module__': module})
    assert (
        f'{module}.{agent}' == f'{fake_agent.__module__}.{fake_agent.__name__}'
    )
    agent_ids = await exchange.discover(
        fake_agent,
        allow_subclasses=allow_subclasses,
    )
    return tuple(agent_id.uid for agent_id in agent_ids)