Building Agents and Sessions in Genkit Python
The problem with stateless LLM calls in production: every request starts cold. You have to re-send the entire conversation history on every turn. For simple chatbots this is fine. For agents that use tools, track progress across multiple user interactions, or need to recover from failures, you need something more structured.
Here’s what that looks like in Genkit Python:
# Define an agent with persistent memory
weather_agent = ai.define_agent(
    name='weatherAgent',
    model='googleai/gemini-2.0-flash',
    system='You are a helpful weather assistant. Remember context across our conversation.',
    tools=[get_weather],
    store=InMemorySessionStore(),  # <- persistent session storage
)
# Start a session and have a multi-turn conversation
conn = await weather_agent.stream_bidi()
await conn.send_text("What's the weather in Chicago?")
async for chunk in conn.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='', flush=True)
await conn.send_text("How about tomorrow?")  # agent remembers the context
async for chunk in conn.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='', flush=True)
await conn.close()
output = await conn.output()
The agent remembers “Chicago” on the second turn without you re-sending it. Sessions handle the history. Let’s build up to this from first principles.
The Multi-Turn Problem
Without sessions, a multi-turn conversation looks like this:
# Pattern A: Manual history management
messages = []
response = await ai.generate(
    prompt='What is the capital of France?',
    messages=messages,
)
messages = response.messages  # save conversation history
response = await ai.generate(
    prompt='What language do they speak there?',
    messages=messages,  # re-send history on every turn
)
messages = response.messages
print(response.text) # "They speak French in Paris."
This works and is appropriate for simple chatbots. You save response.messages after each turn and pass it back on the next call. The model knows “there” means Paris because the history is in messages.
The limitation: you own the history list. If the process restarts, the history is gone. If two concurrent requests arrive, you have to handle thread safety yourself. And there’s no way to resume an interrupted tool call.
Sessions solve all of this.
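To make the concurrency point concrete, here is a minimal sketch that uses only the standard library. `fake_generate` is a hypothetical stand-in for a model call, not a Genkit API, and the message dicts are illustrative. It shows the per-conversation lock you would have to maintain yourself under Pattern A so that two overlapping turns do not overwrite each other's history.

```python
import asyncio
from collections import defaultdict

# Hypothetical stand-in for a model call; not part of Genkit.
async def fake_generate(prompt: str, messages: list[dict]) -> list[dict]:
    await asyncio.sleep(0)  # yield control, as a real network call would
    return messages + [
        {"role": "user", "content": prompt},
        {"role": "model", "content": f"echo: {prompt}"},
    ]

histories: dict[str, list[dict]] = defaultdict(list)
locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def handle_turn(conversation_id: str, prompt: str) -> None:
    # Serialize turns per conversation so one turn cannot clobber another.
    async with locks[conversation_id]:
        history = histories[conversation_id]
        histories[conversation_id] = await fake_generate(prompt, history)

async def main() -> list[dict]:
    # Two requests for the same conversation arrive at the same time.
    await asyncio.gather(
        handle_turn("c1", "first"),
        handle_turn("c1", "second"),
    )
    return histories["c1"]

result = asyncio.run(main())
```

Without the lock, both turns would read the same empty history and the later write would discard the earlier turn. The history also still lives in process memory, so a restart loses it.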
The Simple Session: run_with_session
For flows that need to read and write persistent state across calls, you can bind a session to a coroutine:
from genkit.agent import (
    InMemorySessionStore,
    Session,
    get_current_session,
    run_with_session,
)
# Create a session store (in-memory for dev; replace with a DB-backed store in prod)
store = InMemorySessionStore()
# Create a session
session = Session(store=store)
# Run code with the session bound to async context
async def my_flow():
    # Access the session anywhere in this coroutine or its children
    sess = get_current_session()  # returns the bound Session
    messages = await sess.get_messages()
    # ... do work ...
    # new_message is a placeholder for whatever message your flow produced
    await sess.add_messages(new_message)
# Run this from inside an async function; a bare top-level await is not valid in a script
await run_with_session(session, my_flow())
get_current_session() works like a context variable—it’s available anywhere inside run_with_session’s coroutine tree.
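As a rough illustration of the context-variable behavior described above, the sketch below uses Python's standard `contextvars` module. The names `run_bound` and `get_bound_session` are hypothetical and the session is a plain dict; this is not Genkit's implementation, only the general mechanism such an API could rely on.

```python
import asyncio
import contextvars

# Hypothetical names for illustration; Genkit's internals may differ.
_current_session = contextvars.ContextVar("current_session")

def get_bound_session() -> dict:
    return _current_session.get()

async def run_bound(session: dict, coro):
    # Bind the session for the duration of the awaited coroutine.
    token = _current_session.set(session)
    try:
        return await coro
    finally:
        _current_session.reset(token)

async def child() -> str:
    # No session argument is passed; it is read from the context.
    return get_bound_session()["id"]

async def flow() -> str:
    # Tasks copy the current context, so children see the same session.
    return await asyncio.create_task(child())

async def main() -> tuple:
    # Two flows run concurrently, each with its own bound session.
    a, b = await asyncio.gather(
        run_bound({"id": "session-a"}, flow()),
        run_bound({"id": "session-b"}, flow()),
    )
    return a, b

seen = asyncio.run(main())
```

Each concurrent flow sees only the session bound to it, which is why no session argument has to be threaded through every helper.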
The Session class is thread-safe and supports:
- get_messages() / add_messages() / set_messages(): conversation history
- get_custom() / update_custom(): typed application state
- get_artifacts() / add_artifacts(): named binary/text artifacts
Agents: the Full Sessions API
For production multi-turn agents, ai.define_agent() wires sessions, tool loops, and streaming together:
from genkit import Genkit
from genkit.plugins.google_genai import GoogleAI
from genkit.agent import InMemorySessionStore
from pydantic import BaseModel
ai = Genkit(
    plugins=[GoogleAI()],
    model='googleai/gemini-2.0-flash',
)
class WeatherInput(BaseModel):
    city: str
@ai.tool()
async def get_weather(input: WeatherInput) -> str:
    """Return current weather for a city."""
    # In production, call a real weather API
    return f'Sunny and 72°F in {input.city}'
# Create a persistent session store
weather_store = InMemorySessionStore()
# Define the agent
weather_agent = ai.define_agent(
    name='weatherAgent',
    model='googleai/gemini-2.0-flash',
    system='You are a weather assistant. Use get_weather for all weather queries.',
    tools=[get_weather],
    store=weather_store,  # sessions persist here
)
define_agent returns an Agent object with one main method: stream_bidi().
The AgentConnection API
stream_bidi() starts a session and returns an AgentConnection:
# Start a new session
conn = await weather_agent.stream_bidi()
# Send a user message
await conn.send_text("What's the weather in Chicago?")
# Stream the response
async for chunk in conn.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='', flush=True)
print()  # newline after streaming
# Continue the conversation; the agent remembers Chicago
await conn.send_text("Is it warmer in Miami right now?")
async for chunk in conn.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='', flush=True)
print()
# Close and get the final output
await conn.close()
output = await conn.output()
The AgentConnection interface:
- Method: send_text(text) · Purpose: Send a user message
- Method: send(AgentInput(…)) · Purpose: Send a raw AgentInput (for resuming interrupts)
- Method: receive() · Purpose: Async iterator of AgentStreamChunk objects
- Method: close() · Purpose: Signal no more inputs
- Method: output() · Purpose: Await final AgentOutput
- Method: detach() · Purpose: Detach and let the server finish in background
Tool Use Inside Agents
Tools work inside agents exactly as they do in generate()—you register them with @ai.tool() and pass them to define_agent. The agent handles the tool loop automatically: model calls the tool, tool returns a result, model continues.
class StockInput(BaseModel):
    ticker: str
class SearchInput(BaseModel):
    query: str
@ai.tool()
async def get_stock_price(input: StockInput) -> str:
    """Get current stock price for a ticker."""
    # Stub: in production, call a real API
    prices = {'AAPL': 195.42, 'GOOG': 172.81, 'MSFT': 421.00}
    ticker = input.ticker.upper()
    if ticker not in prices:
        # Avoid formatting a missing price as "$unknown"
        return f'{ticker}: price unavailable'
    return f'{ticker}: ${prices[ticker]:.2f}'
@ai.tool()
async def search_news(input: SearchInput) -> str:
    """Search recent news for a query."""
    return f'Top result for "{input.query}": Markets steady as tech sector leads gains.'
portfolio_store = InMemorySessionStore()
portfolio_agent = ai.define_agent(
    name='portfolioAgent',
    model='googleai/gemini-2.0-flash',
    system="""You are a portfolio assistant.
Use get_stock_price and search_news to answer questions.
Track which tickers the user asks about across the conversation.""",
    tools=[get_stock_price, search_news],
    store=portfolio_store,
)
Usage:
conn = await portfolio_agent.stream_bidi()
await conn.send_text("What's Apple's stock price?")
async for chunk in conn.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='')
print()
# Agent remembers Apple
await conn.send_text("Any news about it?")
async for chunk in conn.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='')
print()
await conn.close()
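The tool loop that the agent runs for you (model requests a tool, the tool returns a result, the model continues) can be pictured with a small standalone sketch. Everything here is hypothetical: `scripted_model` is a fake that stands in for the LLM, and the message dicts are not Genkit's message types. It only illustrates the control flow.

```python
from typing import Callable

# Hypothetical tool and registry; not Genkit APIs.
def get_stock_price(ticker: str) -> str:
    prices = {"AAPL": 195.42}
    return f"{ticker}: ${prices[ticker]}"

TOOLS: dict[str, Callable[[str], str]] = {"get_stock_price": get_stock_price}

def scripted_model(messages: list[dict]) -> dict:
    """Fake model: request a tool first, then answer from the tool result."""
    last = messages[-1]
    if last["role"] == "tool":
        return {"role": "model", "text": f"The price is {last['text']}."}
    return {
        "role": "model",
        "tool_request": {"name": "get_stock_price", "arg": "AAPL"},
    }

def run_tool_loop(messages: list[dict], max_steps: int = 5) -> list[dict]:
    for _ in range(max_steps):
        reply = scripted_model(messages)
        messages = messages + [reply]
        request = reply.get("tool_request")
        if request is None:
            return messages  # plain text reply: the turn is finished
        # Execute the requested tool and feed the result back to the model.
        result = TOOLS[request["name"]](request["arg"])
        messages = messages + [{"role": "tool", "text": result}]
    raise RuntimeError("tool loop did not finish within max_steps")

transcript = run_tool_loop(
    [{"role": "user", "text": "What's Apple's stock price?"}]
)
```

The `max_steps` cap is a design choice in this sketch: it keeps a model that never stops requesting tools from looping forever.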
Session Persistence: Resuming Across Requests
The real power of sessions is resuming a conversation after the process restarts—or across HTTP requests in a web server.
Every session gets a UUID. Pass session_id to AgentInit to resume an existing session:
from genkit._core._typing import AgentInit
import uuid
# --- First request (e.g. HTTP POST /chat/start) ---
session_id = str(uuid.uuid4())
conn = await weather_agent.stream_bidi(
    init=AgentInit(session_id=session_id)
)
await conn.send_text("What's the weather in Seattle?")
async for chunk in conn.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='')
await conn.close()
await conn.output()
# --- Second request (e.g. HTTP POST /chat/continue) ---
# Process may have restarted; session_id is all you need
conn2 = await weather_agent.stream_bidi(
    init=AgentInit(session_id=session_id)  # resume same session
)
await conn2.send_text("How about Portland?")  # agent still knows context
async for chunk in conn2.receive():
    if chunk.model_chunk and chunk.model_chunk.text:
        print(chunk.model_chunk.text, end='')
await conn2.close()
For this to survive process restarts, weather_store needs to be a persistent backend—not InMemorySessionStore. Implement the SessionStore protocol:
from genkit.agent import SessionStore
from genkit._core._typing import SessionSnapshot
from collections.abc import Callable
class RedisSessionStore:
    """Production session store backed by Redis (expects an async Redis client)."""
    def __init__(self, redis_client):
        self.redis = redis_client
    async def get_snapshot(
        self,
        *,
        snapshot_id: str | None = None,
        session_id: str | None = None,
    ) -> SessionSnapshot | None:
        # Assumes snapshots are saved under the same ID used for lookup here.
        # If session IDs and snapshot IDs differ in your setup, keep a separate
        # session_id -> snapshot_id index.
        key = f'genkit:snapshot:{snapshot_id or session_id}'
        data = await self.redis.get(key)
        if data is None:
            return None
        return SessionSnapshot.model_validate_json(data)
    async def save_snapshot(
        self,
        snapshot_id: str | None,
        fn: Callable[[SessionSnapshot | None], SessionSnapshot | None],
    ) -> SessionSnapshot | None:
        # Read-modify-write: fn receives the existing snapshot and returns the new one.
        # This GET followed by SET is not atomic; with concurrent writers, guard it
        # with a Redis transaction (WATCH/MULTI) or a lock.
        key = f'genkit:snapshot:{snapshot_id}'
        existing_data = await self.redis.get(key)
        existing = SessionSnapshot.model_validate_json(existing_data) if existing_data else None
        new_snapshot = fn(existing)
        if new_snapshot is None:
            return None
        await self.redis.set(key, new_snapshot.model_dump_json())
        return new_snapshot
The SessionStore protocol has exactly two required methods. save_snapshot’s fn receives the existing snapshot (or None for a new session) and returns the snapshot to persist. Implement this with any database.
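To show the same two-method shape against a different backend, here is a minimal sketch backed by SQLite from the standard library. It is an illustration under stated assumptions, not a drop-in Genkit store: snapshots are plain dicts standing in for SessionSnapshot, a single ID is used as both session and snapshot key, and the blocking sqlite3 calls are acceptable only for a demo.

```python
import asyncio
import json
import sqlite3
from typing import Callable, Optional

Snapshot = dict  # stand-in for SessionSnapshot in this sketch

class SqliteSnapshotStore:
    def __init__(self, path: str = ":memory:") -> None:
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS snapshots "
            "(id TEXT PRIMARY KEY, body TEXT NOT NULL)"
        )
        self._lock = asyncio.Lock()  # serializes read-modify-write

    def _read(self, key: str) -> Optional[Snapshot]:
        row = self.db.execute(
            "SELECT body FROM snapshots WHERE id = ?", (key,)
        ).fetchone()
        return json.loads(row[0]) if row else None

    async def get_snapshot(
        self, *, snapshot_id: Optional[str] = None, session_id: Optional[str] = None
    ) -> Optional[Snapshot]:
        key = snapshot_id or session_id
        return self._read(key) if key else None

    async def save_snapshot(
        self, snapshot_id: str, fn: Callable[[Optional[Snapshot]], Optional[Snapshot]]
    ) -> Optional[Snapshot]:
        async with self._lock:
            new_snapshot = fn(self._read(snapshot_id))
            if new_snapshot is None:
                return None  # fn declined to write anything
            self.db.execute(
                "INSERT OR REPLACE INTO snapshots (id, body) VALUES (?, ?)",
                (snapshot_id, json.dumps(new_snapshot)),
            )
            self.db.commit()
            return new_snapshot

def append_message(text: str) -> Callable[[Optional[Snapshot]], Snapshot]:
    def fn(existing: Optional[Snapshot]) -> Snapshot:
        messages = (existing or {"messages": []})["messages"]
        return {"messages": messages + [text]}
    return fn

async def main() -> Optional[Snapshot]:
    store = SqliteSnapshotStore()  # pass a file path to survive restarts
    await store.save_snapshot("s1", append_message("hello"))
    await store.save_snapshot("s1", append_message("world"))
    return await store.get_snapshot(session_id="s1")

loaded = asyncio.run(main())
```

Passing a file path instead of `:memory:` is what would let the data outlive the process; the in-memory database is used here only so the example leaves no files behind.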
Complete Working Example: Multi-Turn Assistant with Tools and Session State
This is a full CLI chatbot that uses tools and keeps conversation history in a session across turns:
# src/assistant.py
import asyncio
import uuid
from pydantic import BaseModel
from genkit import Genkit
from genkit.plugins.google_genai import GoogleAI
from genkit.agent import InMemorySessionStore
from genkit._core._typing import AgentInit
ai = Genkit(
    plugins=[GoogleAI()],
    model='googleai/gemini-2.0-flash',
)
# ── Tools ───────────────────────────────────────────────────────────────────
class WeatherInput(BaseModel):
    city: str
class CalcInput(BaseModel):
    expression: str
@ai.tool()
async def get_weather(input: WeatherInput) -> str:
    """Get current weather for a city."""
    data = {
        'chicago': 'Partly cloudy, 68°F, humidity 55%',
        'seattle': 'Overcast and rainy, 54°F',
        'miami': 'Sunny, 87°F, humidity 78%',
    }
    return data.get(input.city.lower(), f'Weather data unavailable for {input.city}')
@ai.tool()
async def calculate(input: CalcInput) -> str:
    """Evaluate a simple math expression."""
    try:
        # In production: use a safe evaluator
        result = eval(input.expression, {'__builtins__': {}}, {})  # noqa: S307
        return str(result)
    except Exception as e:
        return f'Error: {e}'
# ── Agent ────────────────────────────────────────────────────────────────────
session_store = InMemorySessionStore()
assistant = ai.define_agent(
    name='personalAssistant',
    model='googleai/gemini-2.0-flash',
    system="""You are a helpful personal assistant.
You have access to weather and calculator tools.
Remember context across our conversation.
Be concise: 1-2 sentences per response unless asked for more.""",
    tools=[get_weather, calculate],
    store=session_store,
)
# ── CLI ───────────────────────────────────────────────────────────────────────
async def chat():
    session_id = str(uuid.uuid4())
    print(f'Session: {session_id}')
    print('Type "quit" to exit, "new" to start a fresh session.\n')
    conn = await assistant.stream_bidi(init=AgentInit(session_id=session_id))
    while True:
        try:
            user_input = input('You: ').strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not user_input:
            continue
        if user_input.lower() == 'quit':
            break
        if user_input.lower() == 'new':
            await conn.close()
            await conn.output()
            session_id = str(uuid.uuid4())
            conn = await assistant.stream_bidi(init=AgentInit(session_id=session_id))
            print(f'New session: {session_id}')
            continue
        await conn.send_text(user_input)
        print('Assistant: ', end='', flush=True)
        async for chunk in conn.receive():
            if chunk.model_chunk and chunk.model_chunk.text:
                print(chunk.model_chunk.text, end='', flush=True)
        print()  # newline
    await conn.close()
    await conn.output()
    print('Goodbye.')
if __name__ == '__main__':
    ai.run_main(chat())
Sample conversation:
Session: f3a2c8e1-...
Type "quit" to exit, "new" to start a fresh session.
You: What's the weather in Chicago?
Assistant: It's partly cloudy and 68°F in Chicago with 55% humidity.
You: Is that warmer than Seattle?
Assistant: Yes — Chicago is 14°F warmer than Seattle right now, which is overcast and rainy at 54°F.
You: What's 68 - 54?
Assistant: That's 14.
You: quit
Goodbye.
The agent resolves “that” and “warmer than Seattle” correctly because the conversation history is in the session.
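The calculate tool in the example uses eval and notes that production code should use a safe evaluator. One possible approach, sketched here with the standard `ast` module, is to parse the expression and allow only numeric literals and basic arithmetic. The `safe_eval` name is hypothetical, and exponentiation is left out on purpose because huge powers can consume unbounded CPU and memory.

```python
import ast
import operator

# Whitelisted operators; anything else is rejected.
_BINARY = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY = {ast.UAdd: operator.pos, ast.USub: operator.neg}

def safe_eval(expression: str):
    """Evaluate +, -, *, / over int and float literals only."""
    def walk(node: ast.AST):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
            return _BINARY[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY:
            return _UNARY[type(node.op)](walk(node.operand))
        # Names, calls, attributes, strings, etc. all end up here.
        raise ValueError(f"unsupported expression: {expression!r}")

    return walk(ast.parse(expression, mode="eval"))

difference = safe_eval("68 - 54")
```

Inside the tool, the eval call could be swapped for `safe_eval(input.expression)`, with the existing except branch still turning a ValueError, SyntaxError, or ZeroDivisionError into an error string.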
Session State Beyond Messages
Sessions aren’t just conversation history. You can store arbitrary typed state:
from typing import TypedDict
class AssistantState(TypedDict):
    cities_asked: list[str]
    calculation_count: int
# Inside a custom agent turn handler:
async def handle_turn(inp):
    # Assumes a session is bound to the current context, as with run_with_session above
    sess = get_current_session()
    await sess.update_custom(lambda s: {
        'cities_asked': (s or {}).get('cities_asked', []) + ['Chicago'],
        'calculation_count': (s or {}).get('calculation_count', 0),
    })
    current = await sess.get_custom()
    print(f"Cities asked so far: {current['cities_asked']}")
update_custom takes a function that receives the current state and returns the new state—atomically, under the session lock.
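The updater-function pattern can be illustrated without Genkit. The `StateHolder` class below is a hypothetical stand-in for the session's custom state, and the `asyncio.sleep(0)` stands in for awaiting a store read or write. It sketches why the read, the function call, and the write need to happen under one lock.

```python
import asyncio
from typing import Callable, Optional

class StateHolder:
    """Hypothetical stand-in for a session's custom state."""

    def __init__(self) -> None:
        self._state: Optional[dict] = None
        self._lock = asyncio.Lock()

    async def update(self, fn: Callable[[Optional[dict]], dict]) -> dict:
        # Read, transform, and write under one lock so updates never interleave.
        async with self._lock:
            current = self._state
            await asyncio.sleep(0)  # stands in for awaiting store I/O
            self._state = fn(current)
            return self._state

    async def get(self) -> Optional[dict]:
        async with self._lock:
            return self._state

def add_city(city: str) -> Callable[[Optional[dict]], dict]:
    def fn(state: Optional[dict]) -> dict:
        state = state or {}
        return {
            "cities_asked": state.get("cities_asked", []) + [city],
            "calculation_count": state.get("calculation_count", 0),
        }
    return fn

async def main() -> Optional[dict]:
    holder = StateHolder()
    # Three updates race; the lock makes each one see the previous result.
    await asyncio.gather(
        *(holder.update(add_city(c)) for c in ["Chicago", "Seattle", "Miami"])
    )
    return await holder.get()

final_state = asyncio.run(main())
```

If callers instead read the state, modified it, and wrote it back in separate steps, concurrent updates could each start from the same stale value and one city would be lost.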
Key Architecture Points
InMemorySessionStore is for development only. State is lost on process exit. Use a database-backed store in production.
The agent handles the tool loop. When the model returns tool requests, define_agent automatically executes the tools and continues the conversation. You don’t write a while response.interrupts: loop for standard tool use.
stream_bidi() opens one connection per turn sequence. For a web server, open a connection at the start of an HTTP request, drive the full turn (including tool calls), and close when done.
Sessions are append-only by design. History grows; Genkit creates snapshot checkpoints. This is intentional—it enables reliable resumption and audit trails.
What’s Next
Article 3 covers what happens when generate() fails in production: retry middleware, fallback models, and how to gate tool execution behind human approval.