import os
import mcp_ski_resort.core as _core_mod
# Remove the optional Snowflake settings from the environment so that
# SnowflakeSession.from_env() has to fall back to its built-in defaults.
# The previous values (None when a variable was unset) are remembered so the
# environment can be put back exactly as it was in the `finally` block.
_saved = {k: os.environ.pop(k, None) for k in [
    "SNOWFLAKE_DATABASE", "SNOWFLAKE_AGENT_SCHEMA",
    "SNOWFLAKE_MCP_SCHEMA", "SNOWFLAKE_MCP_SERVER",
]}
# Presumably the cached default session; cleared so nothing created earlier
# leaks into this test.
_core_mod._session = None
try:
    # Case 1: none of the optional variables are set -> defaults apply.
    s = SnowflakeSession.from_env()
    assert s.database == "AM_SKI_RESORT", f"expected AM_SKI_RESORT, got {s.database}"
    assert s.agent_schema == "AGENTS"
    assert s.mcp_schema == "MCP_SERVERS"
    assert s.mcp_server_name == "ski_resort_mcp"
    print("defaults OK")

    # Case 2: explicitly set variables override the defaults, while the
    # variables that stay unset keep their default values.
    os.environ["SNOWFLAKE_DATABASE"] = "CUSTOM_DB"
    os.environ["SNOWFLAKE_MCP_SERVER"] = "custom_mcp"
    s2 = SnowflakeSession.from_env()
    assert s2.database == "CUSTOM_DB"
    assert s2.mcp_server_name == "custom_mcp"
    assert s2.agent_schema == "AGENTS"
    print("env overrides OK")

    # Case 3: an empty string is treated like "unset" and falls back to the default.
    os.environ["SNOWFLAKE_DATABASE"] = ""
    os.environ["SNOWFLAKE_AGENT_SCHEMA"] = ""
    s3 = SnowflakeSession.from_env()
    assert s3.database == "AM_SKI_RESORT", f"empty string should fall back, got {s3.database}"
    assert s3.agent_schema == "AGENTS"
    print("empty-string fallback OK")
finally:
    # Restore the caller's environment even when an assertion above failed:
    # re-set variables that had a value, and remove the ones this test created.
    for k, v in _saved.items():
        if v is not None:
            os.environ[k] = v
        else:
            os.environ.pop(k, None)
_core_mod._session = None
Core — Authentication & Response Parsing
Configuration
Credentials are loaded lazily via get_config(). A RuntimeError is raised listing all missing variables at once, rather than failing one-by-one at import time.
get_config
def get_config(
)->dict:
Load and validate Snowflake configuration from environment variables.
Returns a dict with keys: account, user, role, private_key_path, database, agent_schema, mcp_schema, mcp_server_name. Required vars raise RuntimeError if missing; optional vars have defaults.
JWT Authentication
Snowflake’s REST APIs (MCP, Cortex Agent) authenticate via RSA key-pair JWTs. The JWTGenerator class handles private key loading, public key fingerprint calculation, and token caching with automatic refresh.
JWTGenerator
def JWTGenerator(
account:str, user:str, private_key_path:str
):
Generate Snowflake-compatible JWTs using RSA key-pair authentication.
Tokens are cached and automatically refreshed when within 60 seconds of expiry. The public key fingerprint is computed once from the private key on first use.
reset_default_session
def reset_default_session(
)->None:
Clear the cached default session, forcing re-creation on next access.
default_session
def default_session(
)->SnowflakeSession:
Return the default session, creating it lazily from env vars on first call.
The session is cached for the lifetime of the kernel/process. Changing environment variables after the first call will not reconfigure the existing session. Call reset_default_session() to force re-creation.
SnowflakeSession
def SnowflakeSession(
account:str, user:str, role:str, private_key_path:str, database:str='AM_SKI_RESORT', agent_schema:str='AGENTS',
mcp_schema:str='MCP_SERVERS', mcp_server_name:str='ski_resort_mcp'
):
Holds Snowflake connection config, JWT generator, and header factory.
The primary entry point for all authenticated Snowflake REST API calls. Create one per account/user combination; reuse it across requests.
get_headers
def get_headers(
accept:str='application/json'
)->dict:
Return HTTP headers using the default session.
Let’s verify the JWT generation works:
# Smoke check: obtain the default session, ask its JWT generator for a token,
# and print the connection identity the session resolved.
session = default_session()
# Fetch the token before printing, so a failure here surfaces before any output.
token = session.jwt_gen.get_token()
print(f"Account: {session.account}")
print(f"Host: {session.host}")
print(f"User: {session.user}")
print(f"JWT: {token[:40]}...OK")
Response Parsing
Snowflake returns result sets in a specific format where column names live in resultSetMetaData.rowType[].name (not the often-empty columns array). These helpers handle all MCP response formats: Analyst, Agent, Search, SQL, and GENERIC (UDF).
result_set_to_dataframe
def result_set_to_dataframe(
rs:dict
)->DataFrame:
Convert a Snowflake result_set dict to a pandas DataFrame.
Checks resultSetMetaData.rowType first for column names (the reliable source), then falls back to the columns array, then dict keys, then positional columns.
Here’s a quick test with the rowType path (the common case for Cortex Agent responses):
# Minimal result set exercising the rowType path: column names are supplied
# only through resultSetMetaData.rowType, and the "columns" array is empty.
sample_rs = {
# One data row; every cell is given as a string here.
"data": [["2024-2025", "15234", "8721"]],
"resultSetMetaData": {
"rowType": [
{"name": "SKI_SEASON", "type": "TEXT"},
{"name": "TOTAL_VISITS", "type": "NUMBER"},
{"name": "UNIQUE_VISITORS", "type": "NUMBER"},
]
},
# Deliberately empty so the names must come from rowType.
"columns": []
}
df = result_set_to_dataframe(sample_rs)
# Column names follow rowType order, and the single data row yields one DataFrame row.
assert list(df.columns) == ["SKI_SEASON", "TOTAL_VISITS", "UNIQUE_VISITORS"]
assert len(df) == 1
df
try_parse_json
def try_parse_json(
text:str
):
Attempt to parse a string as JSON, stripping markdown fences if present.
Returns the parsed object, or None on failure.
extract_text
def extract_text(
response:dict
)->str:
Extract concatenated text from an MCP JSON-RPC response.
Only items with type: text are included. Non-text content types (e.g. image, resource) are silently skipped.
extract_content
def extract_content(
response:dict
)->list:
Extract the content array from an MCP JSON-RPC response.
parse_search_response
def parse_search_response(
response:dict
)->DataFrame:
Parse a Cortex Search MCP response into a DataFrame.
parse_agent_response
def parse_agent_response(
response:dict
)->dict:
Parse a Cortex Agent MCP response into answer, SQL queries, and result_sets.
parse_analyst_response
def parse_analyst_response(
response:dict
)->dict:
Parse a Cortex Analyst MCP response into interpretations, SQL statements, and result_sets.
Returns lists to handle multi-statement responses correctly.
Agent SSE Streaming
The Cortex Agent REST API uses Server-Sent Events (SSE) for streaming responses. These helpers parse the SSE stream and normalize the various event types into a clean interface.
build_agent_messages
def build_agent_messages(
question:str, history:list[dict] | None=None
)->list:
Build the messages payload for the Cortex Agent :run endpoint.
build_agent_run_payload
def build_agent_run_payload(
question:str, history:list[dict] | None=None, thread_id:str | None=None, parent_message_id:str | None=None
)->dict:
Build the complete JSON payload for the Cortex Agent :run endpoint.
Two modes:
- Local-history mode (default): history is formatted into the messages array. Conversation continuity is managed by the caller.
- Thread mode (thread_id provided): the server-side thread owns conversation continuity. history is not sent upstream – pass it to AgentChat for local transcript/inspection only. parent_message_id defaults to "0" for the first message in a thread.
When both history and thread_id are provided, thread mode wins for the upstream payload. The caller is responsible for keeping local history separately if desired.
stream_agent_sse
def stream_agent_sse(
agent_name:str, question:str, history:list[dict] | None=None, session:__main__.SnowflakeSession | None=None,
thread_id:str | None=None, parent_message_id:str | None=None
)->Generator:
POST to the Cortex Agent :run endpoint and yield parsed SSE events.
Each yielded dict has {"event": str, "data": dict}. Malformed JSON in SSE data lines yields a parse_error event instead of being silently dropped.
normalize_event
def normalize_event(
raw_event:str, data:dict, seen_tool_result:bool=False
)->list:
Translate raw Cortex Agent SSE events into clean normalized events.
Returns a list of normalized {event, data} dicts. Event types: text, thinking, thinking_complete, status, tool, sql, table, chart, annotation, error, metadata, done, parse_error.
Note on seen_tool_result: Cortex Agents emit response.text.delta for both thinking text (before tool execution) and final answer text (after tool results come back). The seen_tool_result flag disambiguates: before any tool_result event, text deltas are classified as thinking; after, they become text. This is based on observed agent protocol behavior and may need updating if the SSE event contract changes.
AgentChat
def AgentChat(
agent_name:str, session:__main__.SnowflakeSession | None=None, verbose:bool=True, reporter:Optional=None,
history:list[dict] | None=None, thread_id:str | None=None
):
Stateful conversation wrapper for Cortex Agents.
Operates in two modes:
- Local-history mode (default): conversation history is sent with each request. The caller owns continuity.
- Thread mode (thread_id provided): the server-side thread owns continuity. Local history is still recorded for inspection, debugging, and transcript export, but is not sent upstream. parent_message_id is tracked automatically from metadata events.
Lower-level control is available via run_agent() and stream_agent_sse().
create_thread
def create_thread(
session:__main__.SnowflakeSession | None=None, origin_application:str='mcp_ski_resort'
)->str:
Create a Cortex conversation thread. Returns the thread_id string.
Only the thread identifier is returned. The full server response (which may include additional metadata) is not preserved.
Raises KeyError if the response contains neither thread_id nor id.
run_agent
def run_agent(
agent_name:str, question:str, history:list[dict] | None=None, verbose:bool=True, reporter:Optional=None,
session:__main__.SnowflakeSession | None=None, thread_id:str | None=None, parent_message_id:str | None=None
)->AgentResult:
Call a Cortex Agent with streaming SSE and accumulate the full result.
Args:
- agent_name: Name of the Cortex Agent to call.
- question: User question text.
- history: Optional conversation history.
- verbose: If True and no reporter given, prints progress to stdout.
- reporter: Optional callback (event_type, event_data) -> None for custom output. Callbacks fire after each event is applied to the AgentResult. For table events, event_data is enriched: {"result_set": ..., "rows": N, "cols": N}. For all other events, event_data is passed through from normalize_event().
- session: Optional SnowflakeSession; uses default if None.
- thread_id: Optional Cortex thread identifier for server-side continuity.
- parent_message_id: Optional parent message for threading. Defaults to "0" when thread_id is provided.
collect_agent_events
def collect_agent_events(
agent_name:str, question:str, history:list[dict] | None=None, session:__main__.SnowflakeSession | None=None,
thread_id:str | None=None, parent_message_id:str | None=None
)->AgentResult:
Pure collector: stream agent events and accumulate into AgentResult.
No printing or side effects. Use this when you want programmatic access only.
iter_normalized_agent_events
def iter_normalized_agent_events(
agent_name:str, question:str, history:list[dict] | None=None, session:__main__.SnowflakeSession | None=None,
thread_id:str | None=None, parent_message_id:str | None=None
)->Generator:
Stream and normalize Cortex Agent events in one step.
Yields normalized {"event": str, "data": dict} dicts. Callers do not need to track seen_tool_result or call normalize_event() themselves. This is the recommended entry point for building custom SSE proxies.
AgentResult
def AgentResult(
answer:str='', thinking:list=<factory>, sql_queries:list=<factory>, result_sets:list=<factory>,
dataframes:list=<factory>, chart_specs:list=<factory>, tools_used:list=<factory>, statuses:list=<factory>,
errors:list=<factory>, raw_events:list=<factory>, duration_seconds:float=0.0, thread_metadata:dict=<factory>
)->None:
Accumulated result from a Cortex Agent streaming call.
# ── Test _apply_normalized_event accumulation ──
# `r` is the AgentResult being filled in; `ct` is the value threaded through
# successive _apply_normalized_event calls (passed in, then rebound to the
# return value). The assertions below show it holds the in-progress thinking text.
r = AgentResult()
ct = ""
# 1. thinking -> thinking_complete flushes accumulated text
ct = _apply_normalized_event(r, "thinking", {"text": "Analyzing "}, ct)
ct = _apply_normalized_event(r, "thinking", {"text": "the data..."}, ct)
# Fragments are concatenated in arrival order.
assert ct == "Analyzing the data..."
ct = _apply_normalized_event(r, "thinking_complete", {"text": ""}, ct)
# The buffered text moves into r.thinking as one entry and the buffer is emptied.
assert r.thinking == ["Analyzing the data..."]
assert ct == ""
# 2. text event (post-tool-result) accumulates into answer
ct = _apply_normalized_event(r, "text", {"text": "The answer is 42."}, ct)
assert r.answer == "The answer is 42."
# 3. table event appends result_set and dataframe with correct shape
table_data = {
"data": [["A", 1], ["B", 2], ["C", 3]],
"resultSetMetaData": {
"rowType": [{"name": "NAME"}, {"name": "VALUE"}]
},
}
ct = _apply_normalized_event(r, "table", table_data, ct)
# One table event produces exactly one raw result set and one DataFrame (3 rows x 2 columns).
assert len(r.result_sets) == 1
assert len(r.dataframes) == 1
assert r.dataframes[0].shape == (3, 2)
assert list(r.dataframes[0].columns) == ["NAME", "VALUE"]
# 4. parse_error lands in errors list
ct = _apply_normalized_event(r, "parse_error", {"error": "bad json", "raw_event": "response.chart"}, ct)
# Only the "error" message is recorded; "raw_event" does not appear in r.errors.
assert r.errors == ["bad json"]
# 4b. metadata event stores in thread_metadata
ct = _apply_normalized_event(r, "metadata", {"message_id": "99"}, ct)
assert r.thread_metadata == {"message_id": "99"}
print("All _apply_normalized_event tests passed")
# ── Test event semantics: normalize_event + _apply + _finalize ──
# 5. Tool-less direct answer: all text.delta -> thinking, finalize promotes to answer
r2 = AgentResult()
ct2 = ""
# Feed three raw text deltas with seen_tool_result=False, i.e. no tool result
# has been observed, and apply every normalized event they produce.
for text in ["Hello, ", "how ", "can I help?"]:
    evts = normalize_event("response.text.delta", {"text": text}, seen_tool_result=False)
    for e in evts:
        ct2 = _apply_normalized_event(r2, e["event"], e["data"], ct2)
# Until finalization the text is only buffered; nothing has reached `answer`.
assert r2.answer == "", "before finalize, answer should be empty for tool-less flow"
assert ct2 == "Hello, how can I help?"
_finalize_agent_result(r2, ct2)
# Finalization records the buffered text as thinking and also promotes it to the answer.
assert r2.answer == "Hello, how can I help?", f"finalize should promote thinking->answer, got {r2.answer!r}"
assert r2.thinking == ["Hello, how can I help?"]
print("tool-less direct answer OK")
# 6. Tool-using answer: text before tool -> thinking, text after -> answer
r3 = AgentResult()
ct3 = ""
# Mirrors the flag a streaming caller would maintain: False until a tool has run.
seen = False
for e in normalize_event("response.text.delta", {"text": "Let me check..."}, seen_tool_result=False):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
for e in normalize_event("response.tool_use", {"name": "query_db"}, seen_tool_result=False):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
# From here on, text deltas are passed with seen_tool_result=True.
seen = True
tool_result_data = {"content": [{"json": {"sql": "SELECT 1", "result_set": {"data": [[1]], "columns": ["val"]}}}]}
for e in normalize_event("response.tool_result", tool_result_data, seen_tool_result=seen):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
for e in normalize_event("response.text.delta", {"text": "The result is 1."}, seen_tool_result=True):
    ct3 = _apply_normalized_event(r3, e["event"], e["data"], ct3)
_finalize_agent_result(r3, ct3)
# Pre-tool text stays in thinking; only post-tool text becomes the answer.
assert r3.answer == "The result is 1.", f"post-tool text should be answer, got {r3.answer!r}"
assert "Let me check..." in r3.thinking[0]
# The tool result contributed one SQL statement and one result set.
assert len(r3.sql_queries) == 1
assert len(r3.result_sets) == 1
# The recorded tool name is "query db" (space), not the raw "query_db".
assert r3.tools_used == ["query db"]
print("tool-using answer OK")
# 7. Error run: finalize should NOT promote thinking to answer
r4 = AgentResult()
ct4 = ""
# Some pre-tool text arrives first...
for e in normalize_event("response.text.delta", {"text": "Starting..."}, seen_tool_result=False):
    ct4 = _apply_normalized_event(r4, e["event"], e["data"], ct4)
# ...then the run reports an error before any tool result or answer text.
for e in normalize_event("response.error", {"message": "something broke"}, seen_tool_result=False):
    ct4 = _apply_normalized_event(r4, e["event"], e["data"], ct4)
_finalize_agent_result(r4, ct4)
# With an error recorded, the buffered text must not be presented as the answer.
assert r4.answer == "", "error run should not promote thinking to answer"
assert r4.errors == ["something broke"]
print("error run OK")
# 8. build_agent_messages constructs correct payload
# History entries are given with plain-string "content"; the built messages
# expose the text at content[0]["text"].
msgs = build_agent_messages("hello", [{"role": "user", "content": "prior"}, {"role": "assistant", "content": "yes"}])
# Two history turns plus the new question.
assert len(msgs) == 3
assert msgs[0]["role"] == "user"
assert msgs[0]["content"][0]["text"] == "prior"
# The new question is appended after the history.
assert msgs[2]["content"][0]["text"] == "hello"
print("build_agent_messages OK")
# ── build_agent_run_payload mode tests ──
# 9. Local-history mode
p1 = build_agent_run_payload("hello", [{"role": "user", "content": "prior"}])
# Without a thread_id the payload carries no thread key, and history is included
# (one prior turn + the question).
assert "thread_id" not in p1
assert len(p1["messages"]) == 2
# 10. Thread mode — no history sent upstream
p2 = build_agent_run_payload("hello", thread_id="t-123")
assert p2["thread_id"] == "t-123"
# parent_message_id falls back to "0" when not supplied.
assert p2["parent_message_id"] == "0"
assert len(p2["messages"]) == 1
# 11. Thread mode with explicit parent
p3 = build_agent_run_payload("hello", thread_id="t-123", parent_message_id="msg-5")
assert p3["parent_message_id"] == "msg-5"
# 12. history + thread_id — thread mode wins, history ignored upstream
p4 = build_agent_run_payload("hello", [{"role": "user", "content": "prior"}], thread_id="t-123")
# Only the new question is sent even though history was passed.
assert len(p4["messages"]) == 1
assert p4["thread_id"] == "t-123"
print("build_agent_run_payload OK")
# ── Metadata event tests ──
# 13. normalize_event coerces message_id to string
# The raw payload nests the fields under a "metadata" key; the normalized
# event exposes message_id directly on its data dict.
evts = normalize_event("metadata", {"metadata": {"message_id": 12345}})
assert evts[0]["data"]["message_id"] == "12345"
assert evts[0]["event"] == "metadata"
# 14. normalize_event passes through string message_id
evts2 = normalize_event("metadata", {"metadata": {"message_id": "abc"}})
assert evts2[0]["data"]["message_id"] == "abc"
# 15. _apply stores metadata in thread_metadata
rm = AgentResult()
_apply_normalized_event(rm, "metadata", {"message_id": "42", "role": "assistant"}, "")
assert rm.thread_metadata == {"message_id": "42", "role": "assistant"}
# 16. Multiple metadata events merge
# A later event adds its keys; keys from the earlier event are kept.
_apply_normalized_event(rm, "metadata", {"thread_id": "t-1"}, "")
assert rm.thread_metadata == {"message_id": "42", "role": "assistant", "thread_id": "t-1"}
print("metadata tests OK")
# ── AgentChat thread-mode tests (synthetic, no network) ──
def _fake_stream(agent_name, question, history=None, session=None,
                 thread_id=None, parent_message_id=None):
    """Fake stream_agent_sse that yields synthetic raw SSE events.

    Accepts the same parameters as stream_agent_sse but performs no I/O.
    Yields one text delta echoing the question, one metadata event carrying a
    message id derived from the question text, and a final done event.
    """
    yield {"event": "response.text.delta", "data": {"text": f"Answer to: {question}"}}
    # The id is built from the question text itself rather than hash(question):
    # str hashes are randomized per process, so `hash(...) % 1000` could
    # occasionally give two different questions the same id and make the
    # "ids differ between turns" assertion below fail intermittently.
    message_id = f"msg-{question.replace(' ', '-')}"
    yield {"event": "metadata", "data": {"metadata": {"message_id": message_id}}}
    yield {"event": "done", "data": {}}


# Imported before the try block so that `_cm` is always bound when the
# `finally` clause runs.
import mcp_ski_resort.core as _cm

# Remember both patch targets separately so each is restored to its own original.
_real_stream = stream_agent_sse
_real_cm_stream = _cm.stream_agent_sse
try:
    # Patch both the notebook-level name and the packaged module attribute, so
    # whichever one AgentChat resolves ends up calling the fake.
    globals()["stream_agent_sse"] = _fake_stream
    _cm.stream_agent_sse = _fake_stream
    chat = AgentChat("test_agent", verbose=False, thread_id="t-abc")
    # 17. repr shows thread mode
    assert "thread=t-abc" in repr(chat)
    # 18. First turn — parent starts as None
    r1 = chat.ask("question one")
    assert r1.answer == "Answer to: question one"
    assert r1.thread_metadata.get("message_id") is not None
    first_msg_id = r1.thread_metadata["message_id"]
    # The chat adopts the returned message id as the parent for the next turn.
    assert chat._parent_message_id == first_msg_id
    # 19. Local transcript grows
    assert len(chat.history) == 2
    assert chat.history[0] == {"role": "user", "content": "question one"}
    assert chat.history[1]["role"] == "assistant"
    # 20. Second turn — parent_message_id advances
    r2 = chat.ask("question two")
    second_msg_id = r2.thread_metadata["message_id"]
    assert chat._parent_message_id == second_msg_id
    assert first_msg_id != second_msg_id
    assert len(chat.history) == 4
    assert len(chat.results) == 2
    # 21. reset() clears transcript but keeps thread_id
    chat.reset()
    assert chat.thread_id == "t-abc"
    assert chat._parent_message_id is None
    assert len(chat.history) == 0
    assert len(chat.results) == 0
    # 22. last property
    assert chat.last is None
    r3 = chat.ask("after reset")
    assert chat.last is r3
    print("AgentChat thread-mode tests OK")
finally:
    # Always undo the monkeypatching, even when an assertion failed.
    globals()["stream_agent_sse"] = _real_stream
    _cm.stream_agent_sse = _real_cm_stream
print("\nAll tests passed")
All _apply_normalized_event tests passed
tool-less direct answer OK
tool-using answer OK
error run OK
build_agent_messages OK
build_agent_run_payload OK
metadata tests OK
AgentChat thread-mode tests OK
All tests passed