import inspect
from mcp_ski_resort.core import (
stream_agent_sse, iter_normalized_agent_events, run_agent,
collect_agent_events, create_thread, AgentChat,
)
# Sync/async parity checks: every async_* helper must accept all parameters of
# its sync counterpart, and may add at most one extra parameter named "client".
sync_async_pairs = [
    (stream_agent_sse, async_stream_agent_sse),
    (iter_normalized_agent_events, async_iter_normalized_agent_events),
    (run_agent, async_run_agent),
    (collect_agent_events, async_collect_agent_events),
    (create_thread, async_create_thread),
]


def _param_names(fn):
    """Return the set of parameter names in the signature of *fn*."""
    return set(inspect.signature(fn).parameters)


for sync_fn, async_fn in sync_async_pairs:
    sync_params = _param_names(sync_fn)
    async_params = _param_names(async_fn)

    # Parameters that exist only on the async variant; only "client" is allowed.
    extra = async_params - sync_params
    assert extra.issubset({"client"}), (
        f"{async_fn.__name__} has unexpected extra params: {extra - {'client'}}"
    )

    # Parameters of the sync variant that the async variant fails to accept.
    missing = sync_params - async_params
    assert not missing, f"{async_fn.__name__} missing params: {missing}"

    print(f" {sync_fn.__name__} <-> {async_fn.__name__}: OK")

# AsyncAgentChat must expose these attributes (checked with hasattr only).
for method in ("ask", "reset", "last"):
    assert hasattr(AsyncAgentChat, method), f"AsyncAgentChat missing {method}"
print(" AsyncAgentChat API parity: OK")

# The thread id given at construction shows up in repr() and must still be
# set after reset() has been called.
ac = AsyncAgentChat("test", thread_id="t-1")
assert "thread=t-1" in repr(ac)
ac.reset()
assert ac.thread_id == "t-1"
print(" AsyncAgentChat reset keeps thread_id: OK")
print("\nAll async parity tests passed")

Async Transport
httpx.
Async Streaming
These functions mirror the sync API in core.py but use httpx.AsyncClient for non-blocking I/O. All functions accept an optional client parameter for connection reuse (e.g., from a FastAPI lifespan).
async_stream_agent_sse
def async_stream_agent_sse(
agent_name:str, question:str, history:list[dict] | None=None,
session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AsyncIterator:
POST to the Cortex Agent :run endpoint and yield parsed SSE events (async).
Each yielded dict has {"event": str, "data": dict}. Malformed JSON in SSE data lines yields a parse_error event instead of being silently dropped.
async_create_thread
async def async_create_thread(
session:mcp_ski_resort.core.SnowflakeSession | None=None, origin_application:str='mcp_ski_resort',
client:httpx.AsyncClient | None=None
)->str:
Create a Cortex conversation thread (async). Returns the thread_id string.
Raises KeyError if the response contains neither thread_id nor id.
AsyncAgentChat
def AsyncAgentChat(
agent_name:str, session:mcp_ski_resort.core.SnowflakeSession | None=None, verbose:bool=True,
reporter:collections.abc.Callable[[str, dict], None] | None=None, history:list[dict] | None=None,
thread_id:str | None=None, client:httpx.AsyncClient | None=None
):
Async stateful conversation wrapper for Cortex Agents.
Operates in two modes:
- Local-history mode (default): conversation history is sent with each request.
- Thread mode (`thread_id` provided): the server-side thread owns continuity. Local history is still recorded for inspection.
Lower-level control is available via async_run_agent() and async_stream_agent_sse().
async_run_agent
async def async_run_agent(
agent_name:str, question:str, history:list[dict] | None=None, verbose:bool=True,
reporter:collections.abc.Callable[[str, dict], None] | None=None,
session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AgentResult:
Call a Cortex Agent with async streaming SSE and accumulate the result.
Args:
- agent_name: Name of the Cortex Agent to call.
- question: User question text.
- history: Optional conversation history.
- verbose: If True and no reporter is given, prints progress to stdout.
- reporter: Optional callback (event_type, event_data) -> None.
- session: Optional SnowflakeSession; uses the default if None.
- thread_id: Optional Cortex thread identifier for server-side continuity.
- parent_message_id: Optional parent message for threading.
- client: Optional httpx.AsyncClient for connection reuse.
async_collect_agent_events
async def async_collect_agent_events(
agent_name:str, question:str, history:list[dict] | None=None,
session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AgentResult:
Pure async collector: stream and accumulate into AgentResult.
async_iter_normalized_agent_events
def async_iter_normalized_agent_events(
agent_name:str, question:str, history:list[dict] | None=None,
session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AsyncIterator:
Stream and normalize Cortex Agent events in one step (async).
Yields normalized {"event": str, "data": dict} dicts. This is the recommended entry point for building async SSE proxies.