Core — Authentication & Response Parsing

Shared utilities for Snowflake MCP Server and Cortex Agent interactions

Configuration

Credentials are loaded lazily via get_config(). If any required variables are missing, a single RuntimeError lists all of them at once, instead of failing one at a time at import time.


get_config


def get_config()->dict:

Load and validate Snowflake configuration from environment variables.

Returns a dict with keys: account, user, role, private_key_path, database, agent_schema, mcp_schema, mcp_server_name. Required vars raise RuntimeError if missing; optional vars have defaults.
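A minimal sketch of the collect-then-raise validation described above. The required variable names (SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, SNOWFLAKE_ROLE, SNOWFLAKE_PRIVATE_KEY_PATH) are hypothetical, since this page does not list them. The optional names and defaults come from the SnowflakeSession test further down. `load_config_sketch` is an illustrative name, not the module's function.

```python
import os
from typing import Mapping, Optional

# Hypothetical names for the required variables (not confirmed by this page).
REQUIRED_VARS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "role": "SNOWFLAKE_ROLE",
    "private_key_path": "SNOWFLAKE_PRIVATE_KEY_PATH",
}
# Optional names and defaults match the SnowflakeSession test on this page.
OPTIONAL_VARS = {
    "database": ("SNOWFLAKE_DATABASE", "AM_SKI_RESORT"),
    "agent_schema": ("SNOWFLAKE_AGENT_SCHEMA", "AGENTS"),
    "mcp_schema": ("SNOWFLAKE_MCP_SCHEMA", "MCP_SERVERS"),
    "mcp_server_name": ("SNOWFLAKE_MCP_SERVER", "ski_resort_mcp"),
}


def load_config_sketch(env: Optional[Mapping[str, str]] = None) -> dict:
    """Validate all required variables up front, then apply optional defaults."""
    env = os.environ if env is None else env
    # Collect every missing (or empty) required variable before raising once.
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    config = {key: env[var] for key, var in REQUIRED_VARS.items()}
    for key, (var, default) in OPTIONAL_VARS.items():
        # `or` also treats an empty string as unset, mirroring the fallback test.
        config[key] = env.get(var) or default
    return config
```

Reporting every gap in one error means a user fixes their environment in one pass instead of rerunning after each failure.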

JWT Authentication

Snowflake’s REST APIs (MCP, Cortex Agent) authenticate via RSA key-pair JWTs. The JWTGenerator class handles private key loading, public key fingerprint calculation, and token caching with automatic refresh.


JWTGenerator


def JWTGenerator(
    account:str, user:str, private_key_path:str
):

Generate Snowflake-compatible JWTs using RSA key-pair authentication.

Tokens are cached and automatically refreshed when within 60 seconds of expiry. The public key fingerprint is computed once from the private key on first use.
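The generator's internals are not shown on this page. The sketch below covers the pieces Snowflake's key-pair authentication documentation describes:

  • a `SHA256:<base64>` fingerprint of the DER-encoded public key
  • `iss`/`sub` claims built from the upper-cased account and user
  • a cache that re-mints once the token is within 60 seconds of expiry

Loading the key and RS256 signing (for example with `cryptography`'s `public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)` and PyJWT's `jwt.encode(..., algorithm="RS256")`) are left out. `mint` is a hypothetical callback standing in for that step. The 59-minute lifetime is an assumption chosen to stay under Snowflake's one-hour limit.

```python
import base64
import hashlib
import time
from typing import Callable, Optional, Tuple


def public_key_fingerprint(public_key_der: bytes) -> str:
    """Return 'SHA256:<base64 digest>' for a DER-encoded SubjectPublicKeyInfo."""
    digest = hashlib.sha256(public_key_der).digest()
    return "SHA256:" + base64.b64encode(digest).decode("ascii")


def build_claims(account: str, user: str, fingerprint: str, now: int,
                 lifetime: int = 59 * 60) -> dict:
    """Build issuer/subject/issued-at/expiry claims for a key-pair JWT."""
    # Assumption: drop any region suffix from the account locator, then upper-case.
    qualified_user = f"{account.split('.')[0].upper()}.{user.upper()}"
    return {
        "iss": f"{qualified_user}.{fingerprint}",
        "sub": qualified_user,
        "iat": now,
        "exp": now + lifetime,
    }


class CachedToken:
    """Reuse a token until it is within `margin` seconds of its expiry."""

    def __init__(self, mint: Callable[[int], Tuple[str, int]], margin: int = 60,
                 clock: Callable[[], float] = time.time):
        self._mint = mint        # hypothetical: mint(now) -> (signed_token, exp)
        self._margin = margin
        self._clock = clock      # injectable so refresh timing can be tested
        self._token: Optional[str] = None
        self._exp = 0

    def get(self) -> str:
        now = int(self._clock())
        if self._token is None or now >= self._exp - self._margin:
            self._token, self._exp = self._mint(now)
        return self._token
```

Refreshing a little early avoids sending a token that expires while the request is in flight.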


reset_default_session


def reset_default_session()->None:

Clear the cached default session, forcing re-creation on next access.


default_session


def default_session()->SnowflakeSession:

Return the default session, creating it lazily from env vars on first call.

The session is cached for the lifetime of the kernel/process. Changing environment variables after the first call will not reconfigure the existing session. Call reset_default_session() to force re-creation.
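The caching behaviour can be pictured as a module-level singleton. This is a sketch under assumptions. The test below resets `core._session` directly, which suggests a module global like this one, but `factory` and both function names here are hypothetical.

```python
from typing import Callable, Optional

_session: Optional[object] = None  # module-level cache, analogous to core._session


def default_session_sketch(factory: Callable[[], object]) -> object:
    """Create the session on first access and return the same object afterwards."""
    global _session
    if _session is None:
        _session = factory()  # env vars are read only at this moment
    return _session


def reset_default_session_sketch() -> None:
    """Drop the cached session so the next access rebuilds it."""
    global _session
    _session = None
```

This is why editing environment variables mid-session has no effect until the cache is cleared.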


SnowflakeSession


def SnowflakeSession(
    account:str, user:str, role:str, private_key_path:str, database:str='AM_SKI_RESORT', agent_schema:str='AGENTS',
    mcp_schema:str='MCP_SERVERS', mcp_server_name:str='ski_resort_mcp'
):

Holds Snowflake connection config, JWT generator, and header factory.

The primary entry point for all authenticated Snowflake REST API calls. Create one per account/user combination; reuse it across requests.

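# Check SnowflakeSession.from_env(): defaults, env overrides, and empty-string fallback.
# The optional env vars are cleared first and restored in the finally block.
import os
import mcp_ski_resort.core as _core_mod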

_saved = {k: os.environ.pop(k, None) for k in [
    "SNOWFLAKE_DATABASE", "SNOWFLAKE_AGENT_SCHEMA",
    "SNOWFLAKE_MCP_SCHEMA", "SNOWFLAKE_MCP_SERVER",
]}
_core_mod._session = None
try:
    s = SnowflakeSession.from_env()
    assert s.database == "AM_SKI_RESORT", f"expected AM_SKI_RESORT, got {s.database}"
    assert s.agent_schema == "AGENTS"
    assert s.mcp_schema == "MCP_SERVERS"
    assert s.mcp_server_name == "ski_resort_mcp"
    print("defaults OK")

    os.environ["SNOWFLAKE_DATABASE"] = "CUSTOM_DB"
    os.environ["SNOWFLAKE_MCP_SERVER"] = "custom_mcp"
    s2 = SnowflakeSession.from_env()
    assert s2.database == "CUSTOM_DB"
    assert s2.mcp_server_name == "custom_mcp"
    assert s2.agent_schema == "AGENTS"
    print("env overrides OK")

    os.environ["SNOWFLAKE_DATABASE"] = ""
    os.environ["SNOWFLAKE_AGENT_SCHEMA"] = ""
    s3 = SnowflakeSession.from_env()
    assert s3.database == "AM_SKI_RESORT", f"empty string should fall back, got {s3.database}"
    assert s3.agent_schema == "AGENTS"
    print("empty-string fallback OK")
finally:
    for k, v in _saved.items():
        if v is not None:
            os.environ[k] = v
        else:
            os.environ.pop(k, None)
    _core_mod._session = None

get_headers


def get_headers(
    accept:str='application/json'
)->dict:

Return HTTP headers using the default session.
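get_headers() presumably delegates to the session's header factory. As an assumption-labeled sketch, these are the headers Snowflake documents for REST calls authenticated with a key-pair JWT. Whether this library sends exactly this set is not shown here, and `keypair_jwt_headers` is a hypothetical name.

```python
def keypair_jwt_headers(token: str, accept: str = "application/json") -> dict:
    """Headers for a Snowflake REST call authenticated with a key-pair JWT."""
    return {
        "Authorization": f"Bearer {token}",
        # Tells Snowflake the bearer token is a key-pair JWT (not OAuth).
        "X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
        "Content-Type": "application/json",
        "Accept": accept,
    }
```

A streaming caller would typically pass accept="text/event-stream" instead of the JSON default.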

Let’s verify the JWT generation works:

session = default_session()
token = session.jwt_gen.get_token()
print(f"Account:  {session.account}")
print(f"Host:     {session.host}")
print(f"User:     {session.user}")
print(f"JWT:      {token[:40]}...OK")

Response Parsing

Snowflake result sets carry column names in resultSetMetaData.rowType[].name; the columns array is often empty and should not be relied on. These helpers handle all MCP response formats: Analyst, Agent, Search, SQL, and GENERIC (UDF).


result_set_to_dataframe


def result_set_to_dataframe(
    rs:dict
)->DataFrame:

Convert a Snowflake result_set dict to a pandas DataFrame.

Checks resultSetMetaData.rowType first for column names (the reliable source), then falls back to the columns array, then dict keys, then positional columns.

Here’s a quick test with the rowType path (the common case for Cortex Agent responses):

sample_rs = {
    "data": [["2024-2025", "15234", "8721"]],
    "resultSetMetaData": {
        "rowType": [
            {"name": "SKI_SEASON", "type": "TEXT"},
            {"name": "TOTAL_VISITS", "type": "NUMBER"},
            {"name": "UNIQUE_VISITORS", "type": "NUMBER"},
        ]
    },
    "columns": []
}
df = result_set_to_dataframe(sample_rs)
assert list(df.columns) == ["SKI_SEASON", "TOTAL_VISITS", "UNIQUE_VISITORS"]
assert len(df) == 1
df
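The column-name fallback chain described for result_set_to_dataframe can be sketched without pandas. `resolve_column_names` is a hypothetical helper that returns only the labels. The integer positional fallback is an assumption modelled on pandas' default column index.

```python
def resolve_column_names(rs: dict) -> list:
    """Pick column labels for a Snowflake result_set in the documented order."""
    data = rs.get("data") or []
    row_type = (rs.get("resultSetMetaData") or {}).get("rowType") or []
    if row_type:                                  # 1. rowType: the reliable source
        return [col["name"] for col in row_type]
    if rs.get("columns"):                         # 2. the (often empty) columns array
        return list(rs["columns"])
    if data and isinstance(data[0], dict):        # 3. keys of dict-shaped rows
        return list(data[0].keys())
    width = len(data[0]) if data else 0           # 4. positional labels 0..n-1
    return list(range(width))
```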

try_parse_json


def try_parse_json(
    text:str
):

Attempt to parse a string as JSON, stripping markdown fences if present.

Returns the parsed object, or None on failure.
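A stdlib sketch of the same idea, assuming fences are three backticks with an optional language tag. `try_parse_json_sketch` is hypothetical, not the module's code.

```python
import json
import re

TICKS = "`" * 3  # a markdown code fence
_FENCED = re.compile(r"^\s*" + TICKS + r"[\w-]*\s*\n(.*?)\n?\s*" + TICKS + r"\s*$", re.DOTALL)


def try_parse_json_sketch(text: str):
    """Parse JSON, unwrapping a surrounding markdown fence; None on failure."""
    match = _FENCED.match(text)
    if match:
        text = match.group(1)  # keep only the fenced body
    try:
        return json.loads(text)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        return None
```

One consequence of the None-on-failure contract: a valid JSON `null` also comes back as None, so callers cannot tell a parsed null from a parse failure.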


extract_text


def extract_text(
    response:dict
)->str:

Extract concatenated text from an MCP JSON-RPC response.

Only items with type: text are included. Non-text content types (e.g. image, resource) are silently skipped.


extract_content


def extract_content(
    response:dict
)->list:

Extract the content array from an MCP JSON-RPC response.
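Both helpers read the `result.content` array that MCP `tools/call` responses carry, where each item has a `type` such as `"text"`. In this sketch, text items are joined with no separator; that is an assumption, because the real separator is not documented here. The `*_sketch` names are hypothetical.

```python
def extract_content_sketch(response: dict) -> list:
    """Return result.content from an MCP JSON-RPC response, or [] if absent."""
    return (response.get("result") or {}).get("content") or []


def extract_text_sketch(response: dict) -> str:
    """Concatenate the text of every content item whose type is 'text'."""
    return "".join(
        item.get("text", "")
        for item in extract_content_sketch(response)
        if item.get("type") == "text"  # image/resource items are skipped
    )
```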


parse_search_response


def parse_search_response(
    response:dict
)->DataFrame:

Parse a Cortex Search MCP response into a DataFrame.


parse_agent_response


def parse_agent_response(
    response:dict
)->dict:

Parse a Cortex Agent MCP response into answer, SQL queries, and result_sets.


parse_analyst_response


def parse_analyst_response(
    response:dict
)->dict:

Parse a Cortex Analyst MCP response into interpretations, SQL statements, and result_sets.

Returns lists to handle multi-statement responses correctly.

Agent SSE Streaming

The Cortex Agent REST API uses Server-Sent Events (SSE) for streaming responses. These helpers parse the SSE stream and normalize the various event types into a clean interface.


build_agent_messages


def build_agent_messages(
    question:str, history:list[dict] | None=None
)->list:

Build the messages payload for the Cortex Agent :run endpoint.


build_agent_run_payload


def build_agent_run_payload(
    question:str, history:list[dict] | None=None, thread_id:str | None=None, parent_message_id:str | None=None
)->dict:

Build the complete JSON payload for the Cortex Agent :run endpoint.

Two modes:

  • Local-history mode (default): history is formatted into the messages array. Conversation continuity is managed by the caller.
  • Thread mode (thread_id provided): the server-side thread owns conversation continuity. history is not sent upstream – pass it to AgentChat for local transcript/inspection only. parent_message_id defaults to "0" for the first message in a thread.

When both history and thread_id are provided, thread mode wins for the upstream payload. The caller is responsible for keeping local history separately if desired.
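The two modes reduce to a small branch. This sketch mirrors the behaviour that the build_agent_messages and build_agent_run_payload tests below check. The `{"type": "text", "text": ...}` content-block shape and the `sketch_*` names are assumptions.

```python
from typing import Optional


def sketch_messages(question: str, history: Optional[list] = None) -> list:
    """Turn {'role', 'content'} history plus the new question into :run messages."""
    turns = list(history or []) + [{"role": "user", "content": question}]
    return [{"role": t["role"], "content": [{"type": "text", "text": t["content"]}]}
            for t in turns]


def sketch_run_payload(question: str, history: Optional[list] = None,
                       thread_id: Optional[str] = None,
                       parent_message_id: Optional[str] = None) -> dict:
    if thread_id is not None:
        # Thread mode: the server holds earlier turns, so only the new question goes up.
        return {
            "thread_id": thread_id,
            "parent_message_id": parent_message_id if parent_message_id is not None else "0",
            "messages": sketch_messages(question),
        }
    # Local-history mode: the caller's transcript travels with every request.
    return {"messages": sketch_messages(question, history)}
```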


stream_agent_sse


def stream_agent_sse(
    agent_name:str, question:str, history:list[dict] | None=None, session:SnowflakeSession | None=None,
    thread_id:str | None=None, parent_message_id:str | None=None
)->Generator:

POST to the Cortex Agent :run endpoint and yield parsed SSE events.

Each yielded dict has {"event": str, "data": dict}. Malformed JSON in SSE data lines yields a parse_error event instead of being silently dropped.
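Server-Sent Events frame each event as `event:` and `data:` lines terminated by a blank line. Below is a minimal stdlib parser over already-decoded lines, assuming each event's data is a single JSON document. `parse_sse_lines` is hypothetical. The parse_error payload shape is borrowed from the `_apply_normalized_event` test below, not from stream_agent_sse itself.

```python
import json
from typing import Iterable, Iterator


def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Group SSE lines into events and JSON-decode each event's data."""
    event_name, data_lines = "message", []  # "message" is the SSE default type

    def dispatch() -> dict:
        payload = "\n".join(data_lines)  # multi-line data is joined with newlines
        try:
            return {"event": event_name, "data": json.loads(payload)}
        except json.JSONDecodeError as exc:
            # Surface malformed data instead of dropping it (assumed payload shape).
            return {"event": "parse_error",
                    "data": {"error": str(exc), "raw_event": event_name}}

    for line in lines:
        line = line.rstrip("\r\n")
        if not line:                       # a blank line ends the current event
            if data_lines:
                yield dispatch()
            event_name, data_lines = "message", []
            continue
        if line.startswith(":"):           # comment / keep-alive line
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):          # strip one leading space, per the SSE format
            value = value[1:]
        if field == "event":
            event_name = value
        elif field == "data":
            data_lines.append(value)
    # As in the SSE spec, an unterminated final event is discarded.
```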


normalize_event


def normalize_event(
    raw_event:str, data:dict, seen_tool_result:bool=False
)->list:

Translate raw Cortex Agent SSE events into clean normalized events.

Returns a list of normalized {event, data} dicts. Event types: text, thinking, thinking_complete, status, tool, sql, table, chart, annotation, error, metadata, done, parse_error.

Note on seen_tool_result: Cortex Agents emit response.text.delta for both thinking text (before tool execution) and final answer text (after tool results come back). The seen_tool_result flag disambiguates: before any tool_result event, text deltas are classified as thinking; after, they become text. This is based on observed agent protocol behavior and may need updating if the SSE event contract changes.


AgentChat


def AgentChat(
    agent_name:str, session:SnowflakeSession | None=None, verbose:bool=True, reporter:Optional=None,
    history:list[dict] | None=None, thread_id:str | None=None
):

Stateful conversation wrapper for Cortex Agents.

Operates in two modes:

  • Local-history mode (default): conversation history is sent with each request. The caller owns continuity.
  • Thread mode (thread_id provided): the server-side thread owns continuity. Local history is still recorded for inspection, debugging, and transcript export, but is not sent upstream. parent_message_id is tracked automatically from metadata events.

Lower-level control is available via run_agent() and stream_agent_sse().
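The thread-mode bookkeeping amounts to one pointer that advances after each turn. Here is a hypothetical, simplified stand-in, not AgentChat's code. It assumes an injected `run` callable that returns the answer text and the turn's metadata dict.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional, Tuple


@dataclass
class ThreadChatSketch:
    # Assumed: run(question, thread_id=..., parent_message_id=...) -> (answer, metadata)
    run: Callable[..., Tuple[str, dict]]
    thread_id: str
    history: list = field(default_factory=list)
    parent_message_id: Optional[str] = None

    def ask(self, question: str) -> str:
        # None on the first turn; the payload builder would send "0" in that case.
        answer, meta = self.run(question, thread_id=self.thread_id,
                                parent_message_id=self.parent_message_id)
        # Advance the parent pointer only when the server reported a message_id.
        self.parent_message_id = meta.get("message_id", self.parent_message_id)
        # The local transcript is for inspection; it is not sent upstream.
        self.history += [{"role": "user", "content": question},
                         {"role": "assistant", "content": answer}]
        return answer

    def reset(self) -> None:
        """Clear the transcript and parent pointer but keep the thread."""
        self.history.clear()
        self.parent_message_id = None
```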


create_thread


def create_thread(
    session:SnowflakeSession | None=None, origin_application:str='mcp_ski_resort'
)->str:

Create a Cortex conversation thread. Returns the thread_id string.

Only the thread identifier is returned. The full server response (which may include additional metadata) is not preserved.

Raises KeyError if the response contains neither thread_id nor id.
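The ID-extraction step can be sketched as follows. Preferring `thread_id` over `id` and coercing the value to `str` are assumptions, and `thread_id_from_response` is a hypothetical name.

```python
def thread_id_from_response(body: dict) -> str:
    """Pick the thread identifier out of a create-thread response body."""
    for key in ("thread_id", "id"):
        if key in body:
            return str(body[key])  # assumption: numeric IDs are normalized to str
    raise KeyError("response contains neither 'thread_id' nor 'id'")
```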


run_agent


def run_agent(
    agent_name:str, question:str, history:list[dict] | None=None, verbose:bool=True, reporter:Optional=None,
    session:SnowflakeSession | None=None, thread_id:str | None=None, parent_message_id:str | None=None
)->AgentResult:

Call a Cortex Agent with streaming SSE and accumulate the full result.

Args:

  • agent_name: Name of the Cortex Agent to call.
  • question: User question text.
  • history: Optional conversation history.
  • verbose: If True and no reporter is given, prints progress to stdout.
  • reporter: Optional callback (event_type, event_data) -> None for custom output. Callbacks fire after each event is applied to the AgentResult. For table events, event_data is enriched: {"result_set": ..., "rows": N, "cols": N}. For all other events, event_data is passed through from normalize_event().
  • session: Optional SnowflakeSession; the default session is used if None.
  • thread_id: Optional Cortex thread identifier for server-side continuity.
  • parent_message_id: Optional parent message for threading. Defaults to "0" when thread_id is provided.
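A reporter is any callable that takes (event_type, event_data). This sketch tallies event types and records table shapes from the enriched table payload. `make_counting_reporter` and the agent name in the comment are hypothetical.

```python
from collections import Counter


def make_counting_reporter():
    """Build a reporter that tallies event types and records table shapes."""
    counts = Counter()
    shapes = []

    def reporter(event_type, event_data):
        counts[event_type] += 1
        if event_type == "table":
            # Table events carry the enriched {"result_set", "rows", "cols"} payload.
            shapes.append((event_data["rows"], event_data["cols"]))

    return reporter, counts, shapes


reporter, counts, shapes = make_counting_reporter()
# With a live agent it would be passed in, e.g.:
# result = run_agent("my_agent", "How many visits last season?", reporter=reporter)
```

Because a reporter is supplied, run_agent's own stdout printing is skipped even with verbose=True.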


collect_agent_events


def collect_agent_events(
    agent_name:str, question:str, history:list[dict] | None=None, session:SnowflakeSession | None=None,
    thread_id:str | None=None, parent_message_id:str | None=None
)->AgentResult:

Pure collector: stream agent events and accumulate into AgentResult.

No printing or side effects. Use this when you want programmatic access only.


iter_normalized_agent_events


def iter_normalized_agent_events(
    agent_name:str, question:str, history:list[dict] | None=None, session:SnowflakeSession | None=None,
    thread_id:str | None=None, parent_message_id:str | None=None
)->Generator:

Stream and normalize Cortex Agent events in one step.

Yields normalized {"event": str, "data": dict} dicts. Callers do not need to track seen_tool_result or call normalize_event() themselves. This is the recommended entry point for building custom SSE proxies.
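This wrapper's main job is carrying the seen_tool_result flag across the stream. The sketch assumes the flag flips before the tool_result event itself is normalized, which is the order the tests below use. `toy_normalize` is a hypothetical stand-in for normalize_event that handles only text deltas.

```python
from typing import Callable, Iterable, Iterator


def iter_normalized_sketch(raw_events: Iterable[dict],
                           normalize: Callable[..., list]) -> Iterator[dict]:
    """Normalize raw SSE events while tracking whether a tool result has arrived."""
    seen_tool_result = False
    for raw in raw_events:
        if raw["event"] == "response.tool_result":
            seen_tool_result = True  # later text deltas now count as answer text
        yield from normalize(raw["event"], raw["data"], seen_tool_result=seen_tool_result)


def toy_normalize(raw_event, data, seen_tool_result=False):
    """Hypothetical normalizer: classifies text deltas, ignores everything else."""
    if raw_event == "response.text.delta":
        kind = "text" if seen_tool_result else "thinking"
        return [{"event": kind, "data": {"text": data["text"]}}]
    return []
```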


AgentResult


def AgentResult(
    answer:str='', thinking:list=<factory>, sql_queries:list=<factory>, result_sets:list=<factory>,
    dataframes:list=<factory>, chart_specs:list=<factory>, tools_used:list=<factory>, statuses:list=<factory>,
    errors:list=<factory>, raw_events:list=<factory>, duration_seconds:float=0.0, thread_metadata:dict=<factory>
)->None:

Accumulated result from a Cortex Agent streaming call.

# ── Test _apply_normalized_event accumulation ──
r = AgentResult()
ct = ""

# 1. thinking -> thinking_complete flushes accumulated text
ct = _apply_normalized_event(r, "thinking", {"text": "Analyzing "}, ct)
ct = _apply_normalized_event(r, "thinking", {"text": "the data..."}, ct)
assert ct == "Analyzing the data..."
ct = _apply_normalized_event(r, "thinking_complete", {"text": ""}, ct)
assert r.thinking == ["Analyzing the data..."]
assert ct == ""

# 2. text event (post-tool-result) accumulates into answer
ct = _apply_normalized_event(r, "text", {"text": "The answer is 42."}, ct)
assert r.answer == "The answer is 42."

# 3. table event appends result_set and dataframe with correct shape
table_data = {
    "data": [["A", 1], ["B", 2], ["C", 3]],
    "resultSetMetaData": {
        "rowType": [{"name": "NAME"}, {"name": "VALUE"}]
    },
}
ct = _apply_normalized_event(r, "table", table_data, ct)
assert len(r.result_sets) == 1
assert len(r.dataframes) == 1
assert r.dataframes[0].shape == (3, 2)
assert list(r.dataframes[0].columns) == ["NAME", "VALUE"]

# 4. parse_error lands in errors list
ct = _apply_normalized_event(r, "parse_error", {"error": "bad json", "raw_event": "response.chart"}, ct)
assert r.errors == ["bad json"]

# 4b. metadata event stores in thread_metadata
ct = _apply_normalized_event(r, "metadata", {"message_id": "99"}, ct)
assert r.thread_metadata == {"message_id": "99"}

print("All _apply_normalized_event tests passed")

# ── Test event semantics: normalize_event + _apply + _finalize ──

# 5. Tool-less direct answer: all text.delta -> thinking, finalize promotes to answer
r2 = AgentResult()
ct2 = ""
for text in ["Hello, ", "how ", "can I help?"]:
    evts = normalize_event("response.text.delta", {"text": text}, seen_tool_result=False)
    for e in evts:
        ct2 = _apply_normalized_event(r2, e["event"], e["data"], ct2)
assert r2.answer == "", "before finalize, answer should be empty for tool-less flow"
assert ct2 == "Hello, how can I help?"
_finalize_agent_result(r2, ct2)
assert r2.answer == "Hello, how can I help?", f"finalize should promote thinking->answer, got {r2.answer!r}"
assert r2.thinking == ["Hello, how can I help?"]
print("tool-less direct answer OK")

# 6. Tool-using answer: text before tool -> thinking, text after -> answer
r3 = AgentResult()
ct3 = ""
seen = False
for e in normalize_event("response.text.delta", {"text": "Let me check..."}, seen_tool_result=False):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
for e in normalize_event("response.tool_use", {"name": "query_db"}, seen_tool_result=False):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
seen = True
tool_result_data = {"content": [{"json": {"sql": "SELECT 1", "result_set": {"data": [[1]], "columns": ["val"]}}}]}
for e in normalize_event("response.tool_result", tool_result_data, seen_tool_result=seen):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
for e in normalize_event("response.text.delta", {"text": "The result is 1."}, seen_tool_result=True):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
_finalize_agent_result(r3, ct3)
assert r3.answer == "The result is 1.", f"post-tool text should be answer, got {r3.answer!r}"
assert "Let me check..." in r3.thinking[0]
assert len(r3.sql_queries) == 1
assert len(r3.result_sets) == 1
assert r3.tools_used == ["query db"]
print("tool-using answer OK")

# 7. Error run: finalize should NOT promote thinking to answer
r4 = AgentResult()
ct4 = ""
for e in normalize_event("response.text.delta", {"text": "Starting..."}, seen_tool_result=False):
    ct4 = _apply_normalized_event(r4, e["event"], e["data"], ct4)
for e in normalize_event("response.error", {"message": "something broke"}, seen_tool_result=False):
    ct4 = _apply_normalized_event(r4, e["event"], e["data"], ct4)
_finalize_agent_result(r4, ct4)
assert r4.answer == "", "error run should not promote thinking to answer"
assert r4.errors == ["something broke"]
print("error run OK")

# 8. build_agent_messages constructs correct payload
msgs = build_agent_messages("hello", [{"role": "user", "content": "prior"}, {"role": "assistant", "content": "yes"}])
assert len(msgs) == 3
assert msgs[0]["role"] == "user"
assert msgs[0]["content"][0]["text"] == "prior"
assert msgs[2]["content"][0]["text"] == "hello"
print("build_agent_messages OK")

# ── build_agent_run_payload mode tests ──

# 9. Local-history mode
p1 = build_agent_run_payload("hello", [{"role": "user", "content": "prior"}])
assert "thread_id" not in p1
assert len(p1["messages"]) == 2

# 10. Thread mode — no history sent upstream
p2 = build_agent_run_payload("hello", thread_id="t-123")
assert p2["thread_id"] == "t-123"
assert p2["parent_message_id"] == "0"
assert len(p2["messages"]) == 1

# 11. Thread mode with explicit parent
p3 = build_agent_run_payload("hello", thread_id="t-123", parent_message_id="msg-5")
assert p3["parent_message_id"] == "msg-5"

# 12. history + thread_id — thread mode wins, history ignored upstream
p4 = build_agent_run_payload("hello", [{"role": "user", "content": "prior"}], thread_id="t-123")
assert len(p4["messages"]) == 1
assert p4["thread_id"] == "t-123"
print("build_agent_run_payload OK")

# ── Metadata event tests ──

# 13. normalize_event coerces message_id to string
evts = normalize_event("metadata", {"metadata": {"message_id": 12345}})
assert evts[0]["data"]["message_id"] == "12345"
assert evts[0]["event"] == "metadata"

# 14. normalize_event passes through string message_id
evts2 = normalize_event("metadata", {"metadata": {"message_id": "abc"}})
assert evts2[0]["data"]["message_id"] == "abc"

# 15. _apply stores metadata in thread_metadata
rm = AgentResult()
_apply_normalized_event(rm, "metadata", {"message_id": "42", "role": "assistant"}, "")
assert rm.thread_metadata == {"message_id": "42", "role": "assistant"}

# 16. Multiple metadata events merge
_apply_normalized_event(rm, "metadata", {"thread_id": "t-1"}, "")
assert rm.thread_metadata == {"message_id": "42", "role": "assistant", "thread_id": "t-1"}
print("metadata tests OK")

# ── AgentChat thread-mode tests (synthetic, no network) ──

import itertools
import mcp_ski_resort.core as _cm

# A counter (not hash()) keeps fake message IDs unique and deterministic across runs.
_fake_msg_ids = itertools.count(1)

def _fake_stream(agent_name, question, history=None, session=None,
                 thread_id=None, parent_message_id=None):
    """Fake stream_agent_sse that yields synthetic raw SSE events."""
    yield {"event": "response.text.delta", "data": {"text": f"Answer to: {question}"}}
    yield {"event": "metadata", "data": {"metadata": {"message_id": f"msg-{next(_fake_msg_ids)}"}}}
    yield {"event": "done", "data": {}}

_real_stream = stream_agent_sse
try:
    # Patch both the notebook namespace and the module so AgentChat sees the fake stream.
    globals()["stream_agent_sse"] = _fake_stream
    _cm.stream_agent_sse = _fake_stream

    chat = AgentChat("test_agent", verbose=False, thread_id="t-abc")

    # 17. repr shows thread mode
    assert "thread=t-abc" in repr(chat)

    # 18. First turn — parent starts as None
    r1 = chat.ask("question one")
    assert r1.answer == "Answer to: question one"
    assert r1.thread_metadata.get("message_id") is not None
    first_msg_id = r1.thread_metadata["message_id"]
    assert chat._parent_message_id == first_msg_id

    # 19. Local transcript grows
    assert len(chat.history) == 2
    assert chat.history[0] == {"role": "user", "content": "question one"}
    assert chat.history[1]["role"] == "assistant"

    # 20. Second turn — parent_message_id advances
    r2 = chat.ask("question two")
    second_msg_id = r2.thread_metadata["message_id"]
    assert chat._parent_message_id == second_msg_id
    assert first_msg_id != second_msg_id
    assert len(chat.history) == 4
    assert len(chat.results) == 2

    # 21. reset() clears transcript but keeps thread_id
    chat.reset()
    assert chat.thread_id == "t-abc"
    assert chat._parent_message_id is None
    assert len(chat.history) == 0
    assert len(chat.results) == 0

    # 22. last property
    assert chat.last is None
    r3 = chat.ask("after reset")
    assert chat.last is r3

    print("AgentChat thread-mode tests OK")
finally:
    globals()["stream_agent_sse"] = _real_stream
    _cm.stream_agent_sse = _real_stream

print("\nAll tests passed")
All _apply_normalized_event tests passed
tool-less direct answer OK
tool-using answer OK
error run OK
build_agent_messages OK
build_agent_run_payload OK
metadata tests OK
AgentChat thread-mode tests OK

All tests passed