# Async Transport



## Async Streaming

These functions mirror the sync API in `core.py` but use
`httpx.AsyncClient` for non-blocking I/O. All functions accept an
optional `client` parameter for connection reuse (e.g., from a FastAPI
lifespan).
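An optional `client` parameter usually implies that the caller owns the lifetime of a client it passes in. The sketch below shows one common way to honor that, assuming a function closes only the clients it created itself. `FakeAsyncClient` and `borrowed_or_owned` are hypothetical stand-ins for illustration, not part of this module; the real `httpx.AsyncClient` also exposes `aclose()`.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeAsyncClient:
    """Hypothetical stand-in for httpx.AsyncClient; it only tracks closure."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


@asynccontextmanager
async def borrowed_or_owned(client=None):
    """Yield the caller's client, or a temporary one that is closed on exit."""
    owns_client = client is None
    active = FakeAsyncClient() if owns_client else client
    try:
        yield active
    finally:
        if owns_client:  # never close a client the caller passed in
            await active.aclose()


async def demo():
    shared = FakeAsyncClient()  # e.g. created once at application startup
    async with borrowed_or_owned(shared) as borrowed:
        pass
    async with borrowed_or_owned() as temporary:
        pass
    return borrowed.closed, temporary.closed


shared_closed, temp_closed = asyncio.run(demo())
```

With this pattern a long-lived client created during application startup stays open across calls, while ad hoc calls without a client clean up after themselves.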

------------------------------------------------------------------------

### async_stream_agent_sse

``` python

def async_stream_agent_sse(
    agent_name:str, question:str, history:list[dict] | None=None,
    session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
    parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AsyncIterator:

```

*POST to the Cortex Agent :run endpoint and yield parsed SSE events
(async).*

Each yielded dict has the shape `{"event": str, "data": dict}`. A data
line containing malformed JSON yields a `parse_error` event instead of
being silently dropped.
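The parsing behavior described above can be sketched with a small synchronous parser. This is an illustration under stated assumptions, not the module's implementation: `parse_sse_lines` is a hypothetical helper, the `text` event name is invented for the sample, and the exact contents of the `parse_error` payload (`raw`, `error`) are assumptions.

```python
import json


def parse_sse_lines(lines):
    """Yield {"event": str, "data": dict} for each blank-line-terminated frame."""
    event, data_parts = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_parts.append(line[len("data:"):].strip())
        elif line == "" and data_parts:
            payload = "\n".join(data_parts)
            try:
                parsed = {"event": event, "data": json.loads(payload)}
            except json.JSONDecodeError as exc:
                # Surface the failure as an event instead of dropping the frame.
                parsed = {
                    "event": "parse_error",
                    "data": {"raw": payload, "error": str(exc)},
                }
            yield parsed
            event, data_parts = "message", []


sample = [
    "event: text\n", 'data: {"text": "hi"}\n', "\n",
    "event: text\n", "data: {oops\n", "\n",
]
events = list(parse_sse_lines(sample))
```

Here the first frame parses normally and the second, whose data line is not valid JSON, comes back as a `parse_error` event that a consumer can log or forward.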

------------------------------------------------------------------------

### async_create_thread

``` python

async def async_create_thread(
    session:mcp_ski_resort.core.SnowflakeSession | None=None, origin_application:str='mcp_ski_resort',
    client:httpx.AsyncClient | None=None
)->str:

```

*Create a Cortex conversation thread (async). Returns the `thread_id`
string.*

Raises `KeyError` if the response contains neither `thread_id` nor `id`.
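The lookup described above can be sketched as follows. `extract_thread_id` is a hypothetical helper, and both the precedence of `thread_id` over `id` and the conversion to `str` are assumptions made for illustration.

```python
def extract_thread_id(payload: dict) -> str:
    """Return the thread identifier from a response payload as a string."""
    for key in ("thread_id", "id"):  # assumed precedence: thread_id first
        if key in payload:
            return str(payload[key])
    raise KeyError("response contains neither 'thread_id' nor 'id'")


from_thread_key = extract_thread_id({"thread_id": 42})
from_id_key = extract_thread_id({"id": "abc"})
```

Callers that create threads in bulk may want to catch `KeyError` around the call so that one unexpected response does not abort the whole batch.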

------------------------------------------------------------------------

### AsyncAgentChat

``` python

def AsyncAgentChat(
    agent_name:str, session:mcp_ski_resort.core.SnowflakeSession | None=None, verbose:bool=True,
    reporter:collections.abc.Callable[[str, dict], None] | None=None, history:list[dict] | None=None,
    thread_id:str | None=None, client:httpx.AsyncClient | None=None
):

```

*Async stateful conversation wrapper for Cortex Agents.*

Operates in two modes:

- **Local-history mode** (default): conversation history is sent with
  each request.
- **Thread mode** (`thread_id` provided): the server-side thread owns
  continuity. Local history is still recorded for inspection.

Lower-level control is available via `async_run_agent()` and
`async_stream_agent_sse()`.
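The difference between the two modes can be illustrated with a toy model. `ToyChat` and its request fields are hypothetical and do not reflect the real request format; the sketch assumes that thread mode omits history from the request while still recording it locally.

```python
class ToyChat:
    """Toy model of the two conversation modes (not the real class)."""

    def __init__(self, thread_id=None):
        self.thread_id = thread_id
        self.history = []

    def build_request(self, question):
        """Return the hypothetical fields a request would carry in each mode."""
        if self.thread_id is not None:
            # Thread mode: the server-side thread supplies prior context.
            request = {"question": question, "thread_id": self.thread_id}
        else:
            # Local-history mode: prior turns travel with every request.
            request = {"question": question, "history": list(self.history)}
        # History is recorded locally in both modes for later inspection.
        self.history.append({"role": "user", "content": question})
        return request


local = ToyChat()
local.build_request("Which lifts are open?")
second_local = local.build_request("And tomorrow?")

threaded = ToyChat(thread_id="t-1")
first_threaded = threaded.build_request("Which lifts are open?")
```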

------------------------------------------------------------------------

### async_run_agent

``` python

async def async_run_agent(
    agent_name:str, question:str, history:list[dict] | None=None, verbose:bool=True,
    reporter:collections.abc.Callable[[str, dict], None] | None=None,
    session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
    parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AgentResult:

```

*Call a Cortex Agent with async streaming SSE and accumulate the
result.*

Args:

- `agent_name`: Name of the Cortex Agent to call.
- `question`: User question text.
- `history`: Optional conversation history.
- `verbose`: If True and no reporter is given, prints progress to
  stdout.
- `reporter`: Optional callback `(event_type, event_data) -> None`.
- `session`: Optional SnowflakeSession; uses the default if None.
- `thread_id`: Optional Cortex thread identifier for server-side
  continuity.
- `parent_message_id`: Optional parent message for threading.
- `client`: Optional `httpx.AsyncClient` for connection reuse.
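A reporter is any callable with the `(event_type, event_data) -> None` shape. The sketch below builds one that records what it receives, which can be handy in tests. `make_recording_reporter` is a hypothetical helper, and the `status` and `text` event names are invented for the demonstration.

```python
def make_recording_reporter():
    """Return a reporter callback plus the list it appends to."""
    seen = []

    def reporter(event_type: str, event_data: dict) -> None:
        seen.append((event_type, event_data))

    return reporter, seen


reporter, seen = make_recording_reporter()

# Simulate the calls a streaming run might make (event names are assumptions).
reporter("status", {"message": "planning"})
reporter("text", {"text": "hello"})
```

Such a callback would be passed as `reporter=reporter` to `async_run_agent`, in place of the default stdout progress output.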

------------------------------------------------------------------------

### async_collect_agent_events

``` python

async def async_collect_agent_events(
    agent_name:str, question:str, history:list[dict] | None=None,
    session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
    parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AgentResult:

```

*Pure async collector: stream events and accumulate them into an `AgentResult`.*
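The collect pattern itself, consuming an async iterator and folding events into one result, can be sketched without any network access. Everything here is a stand-in: `fake_events`, the `text` and `done` event names, and the plain dict result are assumptions, and the real `AgentResult` fields are not shown in this document.

```python
import asyncio


async def fake_events():
    """Hypothetical stand-in for an async stream of agent events."""
    for event in (
        {"event": "text", "data": {"text": "Powder "}},
        {"event": "text", "data": {"text": "day"}},
        {"event": "done", "data": {}},
    ):
        yield event


async def collect(stream):
    """Accumulate streamed events into a single result, with no printing."""
    result = {"text": "", "events": []}
    async for event in stream:
        result["events"].append(event["event"])
        if event["event"] == "text":
            result["text"] += event["data"].get("text", "")
    return result


collected = asyncio.run(collect(fake_events()))
```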

------------------------------------------------------------------------

### async_iter_normalized_agent_events

``` python

def async_iter_normalized_agent_events(
    agent_name:str, question:str, history:list[dict] | None=None,
    session:mcp_ski_resort.core.SnowflakeSession | None=None, thread_id:str | None=None,
    parent_message_id:str | None=None, client:httpx.AsyncClient | None=None
)->AsyncIterator:

```

*Stream and normalize Cortex Agent events in one step (async).*

Yields normalized `{"event": str, "data": dict}` dicts. This is the
recommended entry point for building async SSE proxies.
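An SSE proxy built on such a stream mostly needs to re-serialize each normalized dict into the standard `event:` / `data:` wire format. A minimal sketch, assuming a hypothetical `fake_normalized_events` generator in place of the real stream and invented event names:

```python
import asyncio
import json


def format_sse(event: dict) -> str:
    """Serialize one {"event", "data"} dict as a single SSE frame."""
    return f"event: {event['event']}\ndata: {json.dumps(event['data'])}\n\n"


async def proxy(events):
    """Re-emit each upstream event as an SSE-formatted string."""
    async for event in events:
        yield format_sse(event)


async def fake_normalized_events():
    """Hypothetical stand-in for the normalized event stream."""
    yield {"event": "text", "data": {"text": "hi"}}
    yield {"event": "done", "data": {}}


async def demo():
    return [frame async for frame in proxy(fake_normalized_events())]


frames = asyncio.run(demo())
```

In a web framework, an async generator like `proxy` would typically be handed to a streaming response with the `text/event-stream` media type.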

``` python
import inspect
from mcp_ski_resort.core import (
    stream_agent_sse, iter_normalized_agent_events, run_agent,
    collect_agent_events, create_thread, AgentChat,
)

# The async_* functions and AsyncAgentChat are assumed to be in scope already
# (for example, defined earlier in this module).

# Each sync function paired with its async counterpart.
sync_async_pairs = [
    (stream_agent_sse, async_stream_agent_sse),
    (iter_normalized_agent_events, async_iter_normalized_agent_events),
    (run_agent, async_run_agent),
    (collect_agent_events, async_collect_agent_events),
    (create_thread, async_create_thread),
]

# Every async signature must keep all sync parameters and may add only `client`.
for sync_fn, async_fn in sync_async_pairs:
    sync_params = set(inspect.signature(sync_fn).parameters.keys())
    async_params = set(inspect.signature(async_fn).parameters.keys())
    extra = async_params - sync_params
    assert extra <= {"client"}, f"{async_fn.__name__} has unexpected extra params: {extra - {'client'}}"
    missing = sync_params - async_params
    assert not missing, f"{async_fn.__name__} missing params: {missing}"
    print(f"  {sync_fn.__name__} <-> {async_fn.__name__}: OK")

# AsyncAgentChat must expose the same public methods as the sync chat wrapper.
for method in ["ask", "reset", "last"]:
    assert hasattr(AsyncAgentChat, method), f"AsyncAgentChat missing {method}"
print("  AsyncAgentChat API parity: OK")

# reset() clears conversation state but must keep the server-side thread id.
ac = AsyncAgentChat("test", thread_id="t-1")
assert "thread=t-1" in repr(ac)
ac.reset()
assert ac.thread_id == "t-1"
print("  AsyncAgentChat reset keeps thread_id: OK")

print("\nAll async parity tests passed")
```
