# Core — Authentication & Response Parsing



## Configuration

Credentials are loaded lazily via `get_config()` rather than at import
time. If any required variables are missing, a single `RuntimeError`
lists all of them at once instead of failing on them one by one.

------------------------------------------------------------------------

### get_config

``` python

def get_config()->dict:

```

*Load and validate Snowflake configuration from environment variables.*

Returns a dict with the keys `account`, `user`, `role`,
`private_key_path`, `database`, `agent_schema`, `mcp_schema`, and
`mcp_server_name`. Missing required variables raise a `RuntimeError`;
optional variables fall back to defaults.
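
A minimal sketch of the lazy, collect-then-raise pattern follows. The optional variable names and their defaults come from the `SnowflakeSession` signature and the `from_env()` test further down this page; the required variable names (`SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, `SNOWFLAKE_ROLE`, `SNOWFLAKE_PRIVATE_KEY_PATH`) and the helper `load_config_sketch` are hypothetical, not the module's actual code.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# Hypothetical names for the required variables; the module's real names may differ.
REQUIRED = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "role": "SNOWFLAKE_ROLE",
    "private_key_path": "SNOWFLAKE_PRIVATE_KEY_PATH",
}
# Optional variables and defaults, taken from the SnowflakeSession signature
# and the from_env() test on this page.
OPTIONAL = {
    "database": ("SNOWFLAKE_DATABASE", "AM_SKI_RESORT"),
    "agent_schema": ("SNOWFLAKE_AGENT_SCHEMA", "AGENTS"),
    "mcp_schema": ("SNOWFLAKE_MCP_SCHEMA", "MCP_SERVERS"),
    "mcp_server_name": ("SNOWFLAKE_MCP_SERVER", "ski_resort_mcp"),
}


def load_config_sketch(env: Mapping[str, str] | None = None) -> dict:
    """Read settings on demand and report every missing required variable at once."""
    env = os.environ if env is None else env
    # Treat unset and empty variables alike (an assumption for required vars).
    missing = [var for var in REQUIRED.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    config = {key: env[var] for key, var in REQUIRED.items()}
    for key, (var, default) in OPTIONAL.items():
        # `or` makes an empty string fall back to the default as well.
        config[key] = env.get(var) or default
    return config
```

Accepting an explicit mapping keeps the function testable without mutating `os.environ`.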

## JWT Authentication

Snowflake’s REST APIs (MCP, Cortex Agent) authenticate via RSA key-pair
JWTs. The `JWTGenerator` class handles private key loading, public key
fingerprint calculation, and token caching with automatic refresh.

------------------------------------------------------------------------

### JWTGenerator

``` python

def JWTGenerator(
    account:str, user:str, private_key_path:str
):

```

*Generate Snowflake-compatible JWTs using RSA key-pair authentication.*

Tokens are cached and automatically refreshed once they come within 60
seconds of expiry. The public key fingerprint is computed once, from the
private key, on first use.
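
The sketch below illustrates these pieces with the standard library only. The claim layout (`iss` = `ACCOUNT.USER.SHA256:<fingerprint>`, `sub` = `ACCOUNT.USER`, plus `iat` and `exp`) follows Snowflake's key-pair authentication documentation. Signing is left to an injected callable, for example PyJWT's `jwt.encode(claims, private_key, algorithm="RS256")`, and the DER bytes could come from the `cryptography` package via `public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)`. `CachedJWT`, `build_claims`, and `public_key_fingerprint` are hypothetical names; the one-hour lifetime is an assumption, and the real `JWTGenerator` may normalize account identifiers differently (for example, for locators that include a region).

```python
from __future__ import annotations

import base64
import hashlib
import time
from typing import Callable


def public_key_fingerprint(public_key_der: bytes) -> str:
    """SHA-256 fingerprint of a DER-encoded SubjectPublicKeyInfo public key."""
    digest = hashlib.sha256(public_key_der).digest()
    return "SHA256:" + base64.b64encode(digest).decode("ascii")


def build_claims(account: str, user: str, fingerprint: str,
                 now: float, lifetime: int = 3600) -> dict:
    """Claims for a Snowflake key-pair JWT; account and user are upper-cased."""
    qualified = f"{account.upper()}.{user.upper()}"
    issued = int(now)
    return {
        "iss": f"{qualified}.{fingerprint}",
        "sub": qualified,
        "iat": issued,
        "exp": issued + lifetime,
    }


class CachedJWT:
    """Reuse a signed token until it is within `refresh_margin` seconds of expiry."""

    def __init__(self, sign: Callable[[dict], str], account: str, user: str,
                 fingerprint: str, lifetime: int = 3600, refresh_margin: int = 60,
                 clock: Callable[[], float] = time.time):
        self._sign = sign      # e.g. an RS256 signer wrapping the private key
        self._clock = clock    # injectable so tests can control time
        self._account, self._user, self._fingerprint = account, user, fingerprint
        self._lifetime, self._margin = lifetime, refresh_margin
        self._token: str | None = None
        self._expires_at = 0

    def get_token(self) -> str:
        now = self._clock()
        # Mint a new token on first use or once inside the refresh window.
        if self._token is None or now >= self._expires_at - self._margin:
            claims = build_claims(self._account, self._user, self._fingerprint,
                                  now, self._lifetime)
            self._token = self._sign(claims)
            self._expires_at = claims["exp"]
        return self._token
```

Injecting both the signer and the clock keeps the caching rule testable without real keys or waiting for expiry.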

------------------------------------------------------------------------

### reset_default_session

``` python

def reset_default_session()->None:

```

*Clear the cached default session, forcing re-creation on next access.*

------------------------------------------------------------------------

### default_session

``` python

def default_session()->SnowflakeSession:

```

*Return the default session, creating it lazily from env vars on first
call.*

The session is cached for the lifetime of the kernel/process. Changing
environment variables after the first call will not reconfigure the
existing session. Call `reset_default_session()` to force re-creation.
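
The lazy-singleton behavior can be sketched with a module-level cache, which is consistent with the `_core_mod._session = None` reset used in the `SnowflakeSession` test further down. `_create_session` is a hypothetical stand-in for building a real session from environment variables; here it returns a fresh sentinel object.

```python
from __future__ import annotations

_session: object | None = None  # module-level cache, one per process


def _create_session() -> object:
    # Hypothetical stand-in for constructing a SnowflakeSession from env vars.
    return object()


def default_session_sketch() -> object:
    """Create the session on first call and return the cached one afterwards."""
    global _session
    if _session is None:
        _session = _create_session()
    return _session


def reset_default_session_sketch() -> None:
    """Drop the cached session so the next call re-reads the environment."""
    global _session
    _session = None
```

This simple version is not guarded by a lock; if several threads may call it concurrently, wrapping the check-and-create step in a `threading.Lock` would avoid creating two sessions.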

------------------------------------------------------------------------

### SnowflakeSession

``` python

def SnowflakeSession(
    account:str, user:str, role:str, private_key_path:str, database:str='AM_SKI_RESORT', agent_schema:str='AGENTS',
    mcp_schema:str='MCP_SERVERS', mcp_server_name:str='ski_resort_mcp'
):

```

*Holds Snowflake connection config, JWT generator, and header factory.*

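The primary entry point for all authenticated Snowflake REST API calls.
Create one per account/user combination and reuse it across requests.

The check below confirms that `SnowflakeSession.from_env()` applies the
default database, schema, and MCP server names, honors environment
overrides, and treats empty-string variables as unset: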

``` python
import os
import mcp_ski_resort.core as _core_mod

_saved = {k: os.environ.pop(k, None) for k in [
    "SNOWFLAKE_DATABASE", "SNOWFLAKE_AGENT_SCHEMA",
    "SNOWFLAKE_MCP_SCHEMA", "SNOWFLAKE_MCP_SERVER",
]}
_core_mod._session = None
try:
    s = SnowflakeSession.from_env()
    assert s.database == "AM_SKI_RESORT", f"expected AM_SKI_RESORT, got {s.database}"
    assert s.agent_schema == "AGENTS"
    assert s.mcp_schema == "MCP_SERVERS"
    assert s.mcp_server_name == "ski_resort_mcp"
    print("defaults OK")

    os.environ["SNOWFLAKE_DATABASE"] = "CUSTOM_DB"
    os.environ["SNOWFLAKE_MCP_SERVER"] = "custom_mcp"
    s2 = SnowflakeSession.from_env()
    assert s2.database == "CUSTOM_DB"
    assert s2.mcp_server_name == "custom_mcp"
    assert s2.agent_schema == "AGENTS"
    print("env overrides OK")

    os.environ["SNOWFLAKE_DATABASE"] = ""
    os.environ["SNOWFLAKE_AGENT_SCHEMA"] = ""
    s3 = SnowflakeSession.from_env()
    assert s3.database == "AM_SKI_RESORT", f"empty string should fall back, got {s3.database}"
    assert s3.agent_schema == "AGENTS"
    print("empty-string fallback OK")
finally:
    for k, v in _saved.items():
        if v is not None:
            os.environ[k] = v
        else:
            os.environ.pop(k, None)
    _core_mod._session = None
```

------------------------------------------------------------------------

### get_headers

``` python

def get_headers(
    accept:str='application/json'
)->dict:

```

*Return HTTP headers using the default session.*

Let’s verify that JWT generation works:

``` python
session = default_session()
token = session.jwt_gen.get_token()
print(f"Account:  {session.account}")
print(f"Host:     {session.host}")
print(f"User:     {session.user}")
print(f"JWT:      {token[:40]}...OK")
```
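
Under key-pair authentication, Snowflake's REST API expects the JWT as a bearer token together with a header naming the token type. A minimal sketch of the kind of dict `get_headers()` plausibly returns, assuming `accept` is forwarded as the `Accept` header (a streaming agent call would presumably pass `text/event-stream`); `build_headers` is a hypothetical name, not the module's API.

```python
def build_headers(token: str, accept: str = "application/json") -> dict:
    """Headers for a Snowflake REST call authenticated with a key-pair JWT."""
    return {
        "Authorization": f"Bearer {token}",
        # Tells Snowflake how to interpret the bearer token.
        "X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
        "Content-Type": "application/json",
        "Accept": accept,
    }
```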

## Response Parsing

Snowflake returns result sets in a format where column names live in
`resultSetMetaData.rowType[].name` rather than in the often-empty
`columns` array. These helpers handle all MCP response formats:
Analyst, Agent, Search, SQL, and GENERIC (UDF).

------------------------------------------------------------------------

### result_set_to_dataframe

``` python

def result_set_to_dataframe(
    rs:dict
)->DataFrame:

```

*Convert a Snowflake result_set dict to a pandas DataFrame.*

Checks `resultSetMetaData.rowType` first for column names (the reliable
source), then falls back to the `columns` array, then dict keys, then
positional columns.

Here’s a quick test with the `rowType` path (the common case for Cortex
Agent responses):

``` python
sample_rs = {
    "data": [["2024-2025", "15234", "8721"]],
    "resultSetMetaData": {
        "rowType": [
            {"name": "SKI_SEASON", "type": "TEXT"},
            {"name": "TOTAL_VISITS", "type": "NUMBER"},
            {"name": "UNIQUE_VISITORS", "type": "NUMBER"},
        ]
    },
    "columns": []
}
df = result_set_to_dataframe(sample_rs)
assert list(df.columns) == ["SKI_SEASON", "TOTAL_VISITS", "UNIQUE_VISITORS"]
assert len(df) == 1
df
```
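
The column-name fallback chain can be isolated from pandas. The sketch below resolves names in the documented order (`rowType`, then `columns`, then dict keys, then positions); the resulting list could then be passed as `columns=` to `pandas.DataFrame`. `resolve_columns` and the `col_<i>` positional names are hypothetical; the real helper may use plain integer positions instead.

```python
def resolve_columns(rs: dict) -> list:
    """Pick column names for a Snowflake result_set, most reliable source first."""
    row_type = (rs.get("resultSetMetaData") or {}).get("rowType") or []
    if row_type:
        return [col["name"] for col in row_type]
    if rs.get("columns"):
        return list(rs["columns"])
    data = rs.get("data") or []
    if data and isinstance(data[0], dict):
        return list(data[0].keys())  # rows are already keyed by name
    width = len(data[0]) if data else 0
    return [f"col_{i}" for i in range(width)]  # hypothetical positional names
```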

------------------------------------------------------------------------

### try_parse_json

``` python

def try_parse_json(
    text:str
):

```

*Attempt to parse a string as JSON, stripping markdown fences if
present.*

Returns the parsed object, or None on failure.
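
A minimal sketch of fence-tolerant parsing, assuming the fence is a plain triple-backtick block with an optional alphabetic language tag such as `json`. `try_parse_json_sketch` is a hypothetical name; the module's implementation may handle other fence variants.

```python
import json

FENCE = "`" * 3  # a markdown code fence, built this way to keep the docs fence intact


def try_parse_json_sketch(text: str):
    """Parse JSON, tolerating a surrounding markdown fence; return None on failure."""
    if not isinstance(text, str):
        return None
    body = text.strip()
    if body.startswith(FENCE) and body.endswith(FENCE) and len(body) >= 2 * len(FENCE):
        body = body[len(FENCE):-len(FENCE)]
        # Drop an optional language tag such as "json" on the opening fence line.
        first_line, sep, rest = body.partition("\n")
        if sep and first_line.strip().isalpha():
            body = rest
    try:
        return json.loads(body)
    except (json.JSONDecodeError, TypeError):
        return None
```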

------------------------------------------------------------------------

### extract_text

``` python

def extract_text(
    response:dict
)->str:

```

*Extract concatenated text from an MCP JSON-RPC response.*

Only items with `"type": "text"` are included. Non-text content types
(e.g. image, resource) are silently skipped.

------------------------------------------------------------------------

### extract_content

``` python

def extract_content(
    response:dict
)->list:

```

*Extract the content array from an MCP JSON-RPC response.*
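
In an MCP `tools/call` reply, the content array sits under `result.content`, and each item carries a `type`. The sketch below shows both extractors under that assumption; the helper names are hypothetical, and joining text items with a newline is an assumed separator that may differ from the module's choice.

```python
def extract_content_sketch(response: dict) -> list:
    """Return result.content from an MCP JSON-RPC response, or [] if absent."""
    result = response.get("result") or {}
    return list(result.get("content") or [])


def extract_text_sketch(response: dict, sep: str = "\n") -> str:
    """Concatenate only the text items; images, resources, etc. are skipped."""
    parts = [item.get("text", "") for item in extract_content_sketch(response)
             if item.get("type") == "text"]
    return sep.join(parts)
```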

------------------------------------------------------------------------

### parse_search_response

``` python

def parse_search_response(
    response:dict
)->DataFrame:

```

*Parse a Cortex Search MCP response into a DataFrame.*

------------------------------------------------------------------------

### parse_agent_response

``` python

def parse_agent_response(
    response:dict
)->dict:

```

*Parse a Cortex Agent MCP response into answer, SQL queries, and
result_sets.*

------------------------------------------------------------------------

### parse_analyst_response

``` python

def parse_analyst_response(
    response:dict
)->dict:

```

*Parse a Cortex Analyst MCP response into interpretations, SQL
statements, and result_sets.*

Returns lists to handle multi-statement responses correctly.

## Agent SSE Streaming

The Cortex Agent REST API uses Server-Sent Events (SSE) for streaming
responses. These helpers parse the SSE stream and normalize the various
event types into a clean interface.

------------------------------------------------------------------------

### build_agent_messages

``` python

def build_agent_messages(
    question:str, history:list[dict] | None=None
)->list:

```

*Build the messages payload for the Cortex Agent `:run` endpoint.*
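
Test case 8 further down shows the shape: plain-string history turns become messages whose `content` is a list of text items, and the new question is appended as the final user message. A minimal sketch; the `"type": "text"` key is an assumption about the content items, and the function name is hypothetical.

```python
from __future__ import annotations


def _text_message(role: str, text: str) -> dict:
    # Assumed content shape: a list holding a single text item.
    return {"role": role, "content": [{"type": "text", "text": text}]}


def build_agent_messages_sketch(question: str, history: list[dict] | None = None) -> list:
    """Convert plain {role, content} history turns and append the new user question."""
    messages = [_text_message(turn["role"], turn["content"]) for turn in (history or [])]
    messages.append(_text_message("user", question))
    return messages
```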

------------------------------------------------------------------------

### build_agent_run_payload

``` python

def build_agent_run_payload(
    question:str, history:list[dict] | None=None, thread_id:str | None=None, parent_message_id:str | None=None
)->dict:

```

*Build the complete JSON payload for the Cortex Agent `:run` endpoint.*

Two modes:

- **Local-history mode** (default): `history` is formatted into the
  messages array. Conversation continuity is managed by the caller.
- **Thread mode** (`thread_id` provided): the server-side thread owns
  conversation continuity. `history` is **not** sent upstream – pass it
  to `AgentChat` for local transcript/inspection only.
  `parent_message_id` defaults to `"0"` for the first message in a
  thread.

When both `history` and `thread_id` are provided, thread mode wins for
the upstream payload. The caller is responsible for keeping local
history separately if desired.
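
The mode switch can be sketched as a single branch on `thread_id`, mirroring test cases 9 to 12 further down. The function name is hypothetical, and the real payload may carry additional fields not shown here.

```python
from __future__ import annotations


def _text_message(role: str, text: str) -> dict:
    return {"role": role, "content": [{"type": "text", "text": text}]}


def build_run_payload_sketch(question: str, history: list[dict] | None = None,
                             thread_id: str | None = None,
                             parent_message_id: str | None = None) -> dict:
    if thread_id is not None:
        # Thread mode: the server-side thread owns continuity, so local
        # history is deliberately not sent; only the new question goes up.
        return {
            "thread_id": thread_id,
            "parent_message_id": parent_message_id if parent_message_id is not None else "0",
            "messages": [_text_message("user", question)],
        }
    # Local-history mode: replay the caller's history, then the new question.
    turns = [_text_message(t["role"], t["content"]) for t in (history or [])]
    return {"messages": turns + [_text_message("user", question)]}
```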

------------------------------------------------------------------------

### stream_agent_sse

``` python

def stream_agent_sse(
    agent_name:str, question:str, history:list[dict] | None=None, session:SnowflakeSession | None=None,
    thread_id:str | None=None, parent_message_id:str | None=None
)->Generator:

```

*POST to the Cortex Agent `:run` endpoint and yield parsed SSE events.*

Each yielded dict has `{"event": str, "data": dict}`. Malformed JSON in
SSE data lines yields a `parse_error` event instead of being silently
dropped.
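
The line-level SSE handling can be sketched independently of HTTP: `event:` and `data:` fields accumulate until a blank line dispatches the event, and undecodable JSON becomes a `parse_error`. The input lines could come from, for example, `requests.Response.iter_lines(decode_unicode=True)` on a `stream=True` request. `parse_sse_lines` is a hypothetical name, and the `parse_error` payload keys mirror the `_apply_normalized_event` test further down.

```python
import json
from typing import Iterable, Iterator


def _to_event(event: str, payload: str) -> dict:
    try:
        return {"event": event, "data": json.loads(payload)}
    except json.JSONDecodeError as exc:
        # Surface malformed JSON instead of silently dropping it.
        return {"event": "parse_error", "data": {"error": str(exc), "raw_event": event}}


def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Group SSE lines into events; a blank line ends each event."""
    event, data_lines = "message", []  # "message" is the SSE default event type
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            if data_lines:
                yield _to_event(event, "\n".join(data_lines))
            event, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # SSE strips one leading space after the colon
        if field == "event":
            event = value
        elif field == "data":
            data_lines.append(value)
    # Per the SSE spec, an event not terminated by a blank line is discarded.
```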

------------------------------------------------------------------------

### normalize_event

``` python

def normalize_event(
    raw_event:str, data:dict, seen_tool_result:bool=False
)->list:

```

*Translate raw Cortex Agent SSE events into clean normalized events.*

Returns a list of normalized `{event, data}` dicts. Event types: `text`,
`thinking`, `thinking_complete`, `status`, `tool`, `sql`, `table`,
`chart`, `annotation`, `error`, `metadata`, `done`, `parse_error`.

Note on `seen_tool_result`: Cortex Agents emit `response.text.delta` for
both thinking text (before tool execution) and final answer text (after
tool results come back). The `seen_tool_result` flag disambiguates:
before any `tool_result` event, text deltas are classified as
`thinking`; afterwards, they become `text`. In a tool-less run every
delta is therefore classified as `thinking`; when the result is
finalized, that text is promoted to the answer unless the run reported
an error. This is based on observed agent protocol behavior and may need
updating if the SSE event contract changes.
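
A simplified sketch of the classification rule and the finalize-time promotion, modeled on test cases 5 to 7 further down. It only looks at text deltas and tool results and ignores every other event type; all three function names are hypothetical.

```python
def classify_text_delta(data: dict, seen_tool_result: bool) -> dict:
    """Before any tool result, a text delta is thinking; afterwards it is answer text."""
    kind = "text" if seen_tool_result else "thinking"
    return {"event": kind, "data": {"text": data.get("text", "")}}


def label_deltas(raw_events: list) -> list:
    """Track the flag across a stream of (raw_event, data) pairs."""
    seen_tool_result = False
    labeled = []
    for raw_event, data in raw_events:
        if raw_event == "response.tool_result":
            seen_tool_result = True
        elif raw_event == "response.text.delta":
            labeled.append(classify_text_delta(data, seen_tool_result))
    return labeled


def final_answer(labeled: list, errors: list) -> str:
    """Prefer post-tool text; otherwise promote thinking text unless the run errored."""
    answer = "".join(e["data"]["text"] for e in labeled if e["event"] == "text")
    if not answer and not errors:
        answer = "".join(e["data"]["text"] for e in labeled if e["event"] == "thinking")
    return answer
```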

------------------------------------------------------------------------

### AgentChat

``` python

def AgentChat(
    agent_name:str, session:SnowflakeSession | None=None, verbose:bool=True, reporter:Optional=None,
    history:list[dict] | None=None, thread_id:str | None=None
):

```

*Stateful conversation wrapper for Cortex Agents.*

Operates in two modes:

- **Local-history mode** (default): conversation history is sent with
  each request. The caller owns continuity.
- **Thread mode** (`thread_id` provided): the server-side thread owns
  continuity. Local history is still recorded for inspection, debugging,
  and transcript export, but is **not** sent upstream.
  `parent_message_id` is tracked automatically from `metadata` events.

Lower-level control is available via `run_agent()` and
`stream_agent_sse()`.
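
The thread-mode bookkeeping can be sketched without any network calls, following the synthetic `AgentChat` tests further down: each turn appends to the local transcript, `parent_message_id` advances from the returned metadata, and `reset()` clears the transcript while keeping the thread. `ThreadTranscript` and its `send` callable are hypothetical; `send` receives `None` as the parent on the first turn, leaving the `"0"` default to the payload builder.

```python
from __future__ import annotations

from typing import Callable


class ThreadTranscript:
    """Hypothetical sketch of AgentChat's thread-mode bookkeeping (no network)."""

    def __init__(self, thread_id: str,
                 send: Callable[[str, str, str | None], tuple[str, dict]]):
        self.thread_id = thread_id
        self._send = send  # (thread_id, question, parent) -> (answer, metadata)
        self._parent_message_id: str | None = None
        self.history: list[dict] = []

    def ask(self, question: str) -> str:
        answer, metadata = self._send(self.thread_id, question, self._parent_message_id)
        # Recorded locally for inspection only; never sent upstream in thread mode.
        self.history += [{"role": "user", "content": question},
                         {"role": "assistant", "content": answer}]
        # Advance the parent pointer from the latest metadata event.
        if metadata.get("message_id") is not None:
            self._parent_message_id = str(metadata["message_id"])
        return answer

    def reset(self) -> None:
        """Clear the transcript and parent pointer but keep the thread."""
        self.history.clear()
        self._parent_message_id = None
```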

------------------------------------------------------------------------

### create_thread

``` python

def create_thread(
    session:SnowflakeSession | None=None, origin_application:str='mcp_ski_resort'
)->str:

```

*Create a Cortex conversation thread. Returns the `thread_id` string.*

Only the thread identifier is returned. The full server response (which
may include additional metadata) is not preserved.

Raises `KeyError` if the response contains neither `thread_id` nor `id`.
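
The identifier fallback, including the `KeyError`, fits in a few lines. `thread_id_from_response` is a hypothetical helper operating on an already-decoded JSON body; coercing the value to `str` is an assumption based on the documented return type.

```python
def thread_id_from_response(body: dict) -> str:
    """Pick the thread identifier out of a create-thread response body."""
    if "thread_id" in body:
        return str(body["thread_id"])
    return str(body["id"])  # raises KeyError when neither key is present
```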

------------------------------------------------------------------------

### run_agent

``` python

def run_agent(
    agent_name:str, question:str, history:list[dict] | None=None, verbose:bool=True, reporter:Optional=None,
    session:SnowflakeSession | None=None, thread_id:str | None=None, parent_message_id:str | None=None
)->AgentResult:

```

*Call a Cortex Agent with streaming SSE and accumulate the full result.*

Args:

- `agent_name`: Name of the Cortex Agent to call.
- `question`: User question text.
- `history`: Optional conversation history.
- `verbose`: If True and no reporter is given, prints progress to
  stdout.
- `reporter`: Optional callback `(event_type, event_data) -> None` for
  custom output. Callbacks fire after each event is applied to the
  `AgentResult`. For `table` events, `event_data` is enriched:
  `{"result_set": ..., "rows": N, "cols": N}`. For all other events,
  `event_data` is passed through as produced by `normalize_event()`.
- `session`: Optional `SnowflakeSession`; the default session is used
  if None.
- `thread_id`: Optional Cortex thread identifier for server-side
  continuity.
- `parent_message_id`: Optional parent message for threading. Defaults
  to `"0"` when `thread_id` is provided.
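
As an illustration of the reporter contract, here is a hypothetical compact reporter. It relies only on the documented `rows`/`cols` enrichment for `table` events and prints other selected events as-is, since their payload keys are not specified here.

```python
def compact_reporter(event_type: str, event_data: dict) -> None:
    """Print one line per interesting event; streaming text deltas are ignored."""
    if event_type == "table":
        # Documented enrichment: table events carry row and column counts.
        print(f"[table] {event_data['rows']} rows x {event_data['cols']} cols")
    elif event_type in ("tool", "sql", "status", "error"):
        print(f"[{event_type}] {event_data}")
```

It would be passed as, for example, `run_agent("my_agent", "How many visits last season?", reporter=compact_reporter)`, where the agent name is a placeholder; because a reporter is supplied, the built-in `verbose` printing is not used.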

------------------------------------------------------------------------

### collect_agent_events

``` python

def collect_agent_events(
    agent_name:str, question:str, history:list[dict] | None=None, session:SnowflakeSession | None=None,
    thread_id:str | None=None, parent_message_id:str | None=None
)->AgentResult:

```

*Pure collector: stream agent events and accumulate into AgentResult.*

No printing or side effects. Use this when you want programmatic access
only.

------------------------------------------------------------------------

### iter_normalized_agent_events

``` python

def iter_normalized_agent_events(
    agent_name:str, question:str, history:list[dict] | None=None, session:SnowflakeSession | None=None,
    thread_id:str | None=None, parent_message_id:str | None=None
)->Generator:

```

*Stream and normalize Cortex Agent events in one step.*

Yields normalized `{"event": str, "data": dict}` dicts. Callers do not
need to track `seen_tool_result` or call `normalize_event()` themselves.
This is the recommended entry point for building custom SSE proxies.
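
For a proxy, the normalized dicts only need to be re-serialized as SSE frames. A minimal sketch; `to_sse` is a hypothetical helper, and compact `json.dumps` output never contains a newline, so each event fits on a single `data:` line.

```python
import json
from typing import Iterable, Iterator


def to_sse(events: Iterable[dict]) -> Iterator[str]:
    """Re-serialize normalized {"event", "data"} dicts as SSE frames."""
    for evt in events:
        payload = json.dumps(evt["data"], separators=(",", ":"))
        # A blank line (the trailing "\n\n") terminates each SSE event.
        yield f"event: {evt['event']}\ndata: {payload}\n\n"
```

With a web framework such as Starlette or FastAPI, the generator could be wrapped as `StreamingResponse(to_sse(iter_normalized_agent_events(...)), media_type="text/event-stream")`.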

------------------------------------------------------------------------

### AgentResult

``` python

def AgentResult(
    answer:str='', thinking:list=<factory>, sql_queries:list=<factory>, result_sets:list=<factory>,
    dataframes:list=<factory>, chart_specs:list=<factory>, tools_used:list=<factory>, statuses:list=<factory>,
    errors:list=<factory>, raw_events:list=<factory>, duration_seconds:float=0.0, thread_metadata:dict=<factory>
)->None:

```

*Accumulated result from a Cortex Agent streaming call.*

``` python
# ── Test _apply_normalized_event accumulation ──
r = AgentResult()
ct = ""

# 1. thinking -> thinking_complete flushes accumulated text
ct = _apply_normalized_event(r, "thinking", {"text": "Analyzing "}, ct)
ct = _apply_normalized_event(r, "thinking", {"text": "the data..."}, ct)
assert ct == "Analyzing the data..."
ct = _apply_normalized_event(r, "thinking_complete", {"text": ""}, ct)
assert r.thinking == ["Analyzing the data..."]
assert ct == ""

# 2. text event (post-tool-result) accumulates into answer
ct = _apply_normalized_event(r, "text", {"text": "The answer is 42."}, ct)
assert r.answer == "The answer is 42."

# 3. table event appends result_set and dataframe with correct shape
table_data = {
    "data": [["A", 1], ["B", 2], ["C", 3]],
    "resultSetMetaData": {
        "rowType": [{"name": "NAME"}, {"name": "VALUE"}]
    },
}
ct = _apply_normalized_event(r, "table", table_data, ct)
assert len(r.result_sets) == 1
assert len(r.dataframes) == 1
assert r.dataframes[0].shape == (3, 2)
assert list(r.dataframes[0].columns) == ["NAME", "VALUE"]

# 4. parse_error lands in errors list
ct = _apply_normalized_event(r, "parse_error", {"error": "bad json", "raw_event": "response.chart"}, ct)
assert r.errors == ["bad json"]

# 4b. metadata event stores in thread_metadata
ct = _apply_normalized_event(r, "metadata", {"message_id": "99"}, ct)
assert r.thread_metadata == {"message_id": "99"}

print("All _apply_normalized_event tests passed")

# ── Test event semantics: normalize_event + _apply + _finalize ──

# 5. Tool-less direct answer: all text.delta -> thinking, finalize promotes to answer
r2 = AgentResult()
ct2 = ""
for text in ["Hello, ", "how ", "can I help?"]:
    evts = normalize_event("response.text.delta", {"text": text}, seen_tool_result=False)
    for e in evts:
        ct2 = _apply_normalized_event(r2, e["event"], e["data"], ct2)
assert r2.answer == "", "before finalize, answer should be empty for tool-less flow"
assert ct2 == "Hello, how can I help?"
_finalize_agent_result(r2, ct2)
assert r2.answer == "Hello, how can I help?", f"finalize should promote thinking->answer, got {r2.answer!r}"
assert r2.thinking == ["Hello, how can I help?"]
print("tool-less direct answer OK")

# 6. Tool-using answer: text before tool -> thinking, text after -> answer
r3 = AgentResult()
ct3 = ""
seen = False
for e in normalize_event("response.text.delta", {"text": "Let me check..."}, seen_tool_result=False):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
for e in normalize_event("response.tool_use", {"name": "query_db"}, seen_tool_result=False):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
seen = True
tool_result_data = {"content": [{"json": {"sql": "SELECT 1", "result_set": {"data": [[1]], "columns": ["val"]}}}]}
for e in normalize_event("response.tool_result", tool_result_data, seen_tool_result=seen):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
for e in normalize_event("response.text.delta", {"text": "The result is 1."}, seen_tool_result=True):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
_finalize_agent_result(r3, ct3)
assert r3.answer == "The result is 1.", f"post-tool text should be answer, got {r3.answer!r}"
assert "Let me check..." in r3.thinking[0]
assert len(r3.sql_queries) == 1
assert len(r3.result_sets) == 1
assert r3.tools_used == ["query db"]
print("tool-using answer OK")

# 7. Error run: finalize should NOT promote thinking to answer
r4 = AgentResult()
ct4 = ""
for e in normalize_event("response.text.delta", {"text": "Starting..."}, seen_tool_result=False):
    ct4 = _apply_normalized_event(r4, e["event"], e["data"], ct4)
for e in normalize_event("response.error", {"message": "something broke"}, seen_tool_result=False):
    ct4 = _apply_normalized_event(r4, e["event"], e["data"], ct4)
_finalize_agent_result(r4, ct4)
assert r4.answer == "", "error run should not promote thinking to answer"
assert r4.errors == ["something broke"]
print("error run OK")

# 8. build_agent_messages constructs correct payload
msgs = build_agent_messages("hello", [{"role": "user", "content": "prior"}, {"role": "assistant", "content": "yes"}])
assert len(msgs) == 3
assert msgs[0]["role"] == "user"
assert msgs[0]["content"][0]["text"] == "prior"
assert msgs[2]["content"][0]["text"] == "hello"
print("build_agent_messages OK")

# ── build_agent_run_payload mode tests ──

# 9. Local-history mode
p1 = build_agent_run_payload("hello", [{"role": "user", "content": "prior"}])
assert "thread_id" not in p1
assert len(p1["messages"]) == 2

# 10. Thread mode — no history sent upstream
p2 = build_agent_run_payload("hello", thread_id="t-123")
assert p2["thread_id"] == "t-123"
assert p2["parent_message_id"] == "0"
assert len(p2["messages"]) == 1

# 11. Thread mode with explicit parent
p3 = build_agent_run_payload("hello", thread_id="t-123", parent_message_id="msg-5")
assert p3["parent_message_id"] == "msg-5"

# 12. history + thread_id — thread mode wins, history ignored upstream
p4 = build_agent_run_payload("hello", [{"role": "user", "content": "prior"}], thread_id="t-123")
assert len(p4["messages"]) == 1
assert p4["thread_id"] == "t-123"
print("build_agent_run_payload OK")

# ── Metadata event tests ──

# 13. normalize_event coerces message_id to string
evts = normalize_event("metadata", {"metadata": {"message_id": 12345}})
assert evts[0]["data"]["message_id"] == "12345"
assert evts[0]["event"] == "metadata"

# 14. normalize_event passes through string message_id
evts2 = normalize_event("metadata", {"metadata": {"message_id": "abc"}})
assert evts2[0]["data"]["message_id"] == "abc"

# 15. _apply stores metadata in thread_metadata
rm = AgentResult()
_apply_normalized_event(rm, "metadata", {"message_id": "42", "role": "assistant"}, "")
assert rm.thread_metadata == {"message_id": "42", "role": "assistant"}

# 16. Multiple metadata events merge
_apply_normalized_event(rm, "metadata", {"thread_id": "t-1"}, "")
assert rm.thread_metadata == {"message_id": "42", "role": "assistant", "thread_id": "t-1"}
print("metadata tests OK")

# ── AgentChat thread-mode tests (synthetic, no network) ──

def _fake_stream(agent_name, question, history=None, session=None,
                 thread_id=None, parent_message_id=None):
    """Fake stream_agent_sse that yields synthetic raw SSE events."""
    yield {"event": "response.text.delta", "data": {"text": f"Answer to: {question}"}}
    yield {"event": "metadata", "data": {"metadata": {"message_id": f"msg-{abs(hash(question)) % 1000}"}}}
    yield {"event": "done", "data": {}}

import mcp_ski_resort.core as _cm

_real_stream = stream_agent_sse
try:
    globals()["stream_agent_sse"] = _fake_stream
    _cm.stream_agent_sse = _fake_stream

    chat = AgentChat("test_agent", verbose=False, thread_id="t-abc")

    # 17. repr shows thread mode
    assert "thread=t-abc" in repr(chat)

    # 18. First turn: parent_message_id advances to the returned message_id
    r1 = chat.ask("question one")
    assert r1.answer == "Answer to: question one"
    assert r1.thread_metadata.get("message_id") is not None
    first_msg_id = r1.thread_metadata["message_id"]
    assert chat._parent_message_id == first_msg_id

    # 19. Local transcript grows
    assert len(chat.history) == 2
    assert chat.history[0] == {"role": "user", "content": "question one"}
    assert chat.history[1]["role"] == "assistant"

    # 20. Second turn — parent_message_id advances
    r2 = chat.ask("question two")
    second_msg_id = r2.thread_metadata["message_id"]
    assert chat._parent_message_id == second_msg_id
    assert first_msg_id != second_msg_id
    assert len(chat.history) == 4
    assert len(chat.results) == 2

    # 21. reset() clears transcript but keeps thread_id
    chat.reset()
    assert chat.thread_id == "t-abc"
    assert chat._parent_message_id is None
    assert len(chat.history) == 0
    assert len(chat.results) == 0

    # 22. last property
    assert chat.last is None
    r3 = chat.ask("after reset")
    assert chat.last is r3

    print("AgentChat thread-mode tests OK")
finally:
    globals()["stream_agent_sse"] = _real_stream
    _cm.stream_agent_sse = _real_stream

print("\nAll tests passed")
```

    All _apply_normalized_event tests passed
    tool-less direct answer OK
    tool-using answer OK
    error run OK
    build_agent_messages OK
    build_agent_run_payload OK
    metadata tests OK
    AgentChat thread-mode tests OK

    All tests passed
