
A deep dive into the async generator-based agent harness that powers Kopilot — routing, tool execution, streaming, and human-in-the-loop approval.
Kopilot is the AI assistant inside Auxx.ai. It can search your tickets, draft replies, tag conversations, look up Shopify orders — basically anything a support agent would do, but triggered by natural language.
The interesting part isn't the prompts. It's the execution harness: the engine that takes a user message, figures out what to do with it, runs the right agents in the right order, calls tools, streams everything back in real time, and pauses when it needs your approval before doing something irreversible.
This post walks through how that harness works. Real code, real decisions, real trade-offs.
When a user sends a message, here's what happens:
User message
  → SSE endpoint (auth + feature gate)
    → AgentEngine
      → Context Manager (compress old messages if over token budget)
      → Supervisor Agent (classify intent → pick a route)
      → Route Pipeline (ordered sequence of agents)
        → Planner (multi-step only: break into plan steps)
        → Executor (agentic tool loop, up to 10 iterations)
        → Responder (synthesize results into rich blocks)
    → SSE events → Frontend store → UI
    → Persist: messages + domainState → database (JSONB)
The first key decision: the engine is domain-agnostic. Kopilot is one "domain config" plugged into a generic agent framework. The same engine powers our builder agent too. A domain config defines agents, routes, and initial state — the framework handles orchestration.
interface AgentDomainConfig<TDomainState> {
  type: AgentSessionType
  agents: Record<string, AgentDefinition<TDomainState>>
  routes: Route[]
  supervisorAgent: string
  createInitialState(context): TDomainState
  defaultModel: string
  defaultProvider: string
}
Kopilot provides its own config with four agents and five routes. A different product could plug in completely different agents with different tools and the engine wouldn't care.
Every message gets classified before anything runs. The supervisor agent looks at the user's message, the conversation history, and the available tools, then returns its classification as JSON constrained by a strict schema, not as free text:
{
  route: 'search' | 'action' | 'multi-step' | 'simple' | 'conversational',
  reasoning: string
}
Each route defines which agents run in sequence:
| Route | Pipeline | When |
|---|---|---|
| simple | supervisor → responder | Quick answers, no tools needed |
| search | supervisor → executor → responder | Finding data (records, threads) |
| multi-step | supervisor → planner → executor → responder | Complex requests requiring a plan |
| action | supervisor → executor → responder | Single action (tag, assign, send reply) |
| conversational | supervisor → responder | Follow-ups, clarifications |
Why this matters: you don't want to spin up a planner for "how many open tickets do I have?" Routes keep things fast and cheap. A simple question skips the executor entirely and goes straight to the responder. A complex request like "find all unresolved tickets about shipping delays and draft a bulk reply" goes through the full planner → executor → responder pipeline.
routes: [
  { name: 'simple', agents: ['supervisor', 'responder'] },
  { name: 'search', agents: ['supervisor', 'executor', 'responder'] },
  { name: 'multi-step', agents: ['supervisor', 'planner', 'executor', 'responder'] },
  { name: 'action', agents: ['supervisor', 'executor', 'responder'] },
  { name: 'conversational', agents: ['supervisor', 'responder'] },
]
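To make the link between the supervisor's classification and the routes table concrete, here is a minimal sketch of validating the model's JSON and looking up the matching pipeline. It is not the actual Kopilot code: `parseClassification`, `pipelineFor`, and the simplified `Route` and `Classification` types are names assumed for illustration.

```typescript
// Hypothetical, simplified types; the real Route type may carry more fields.
type RouteName = 'search' | 'action' | 'multi-step' | 'simple' | 'conversational'

interface Route {
  name: RouteName
  agents: string[]
}

interface Classification {
  route: RouteName
  reasoning: string
}

const routes: Route[] = [
  { name: 'simple', agents: ['supervisor', 'responder'] },
  { name: 'search', agents: ['supervisor', 'executor', 'responder'] },
  { name: 'multi-step', agents: ['supervisor', 'planner', 'executor', 'responder'] },
  { name: 'action', agents: ['supervisor', 'executor', 'responder'] },
  { name: 'conversational', agents: ['supervisor', 'responder'] },
]

const ROUTE_NAMES = new Set<string>(routes.map((r) => r.name))

// Parse the supervisor's raw output and reject anything that is off-schema.
function parseClassification(raw: string): Classification | null {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    return null
  }
  if (typeof parsed !== 'object' || parsed === null) return null
  const { route, reasoning } = parsed as Record<string, unknown>
  if (typeof route !== 'string' || !ROUTE_NAMES.has(route)) return null
  if (typeof reasoning !== 'string') return null
  return { route: route as RouteName, reasoning }
}

// Look up the ordered agent list for a validated classification.
function pipelineFor(classification: Classification): string[] {
  const route = routes.find((r) => r.name === classification.route)
  return route ? route.agents : []
}
```

Returning `null` for an unknown route leaves the fallback policy to the caller; what the real engine does in that case is not covered here.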
This is the core of the whole system. The agentQueryLoop function runs a single agent to completion. It's an async generator that yields events as things happen.
Here's the simplified flow:
Loop (up to maxIterations):
  1. buildMessages(state) → construct the prompt
  2. callModel(messages, tools) → stream the LLM response
  3. Collect content + tool calls
  4. No tool calls?
     → Check minToolCalls threshold → nudge and retry if not met
     → processResult() → update state → break
  5. Tool calls?
     → Check requiresApproval → pause if needed
     → Execute tools
     → Append results to state
     → Continue loop
The actual implementation is a while loop with an iteration counter:
export async function* agentQueryLoop(
  agent: AgentDefinition,
  state: AgentState,
  config: AgentEngineConfig
): AsyncGenerator<AgentEvent, AgentState> {
  const maxIterations = agent.maxIterations ?? 10
  let currentState = state
  let iteration = 0
  let totalToolCallCount = 0
  // Simplified: setup of deps, callParams, assistantMessage, and toolResultMessages is elided
  yield { type: 'agent-started', agent: agent.name }
  while (iteration < maxIterations) {
    if (config.signal?.aborted) break
    iteration++
    // Build the prompt from current state
    const messages = await agent.buildMessages(currentState, deps)
    // Stream the LLM response
    let content = ''
    let toolCalls: ToolCall[] = []
    for await (const event of config.callModel(callParams)) {
      switch (event.type) {
        case 'text-delta':
          yield { type: 'llm-stream', agent: agent.name, delta: event.delta }
          break
        case 'done':
          content = event.content
          toolCalls = event.toolCalls
          break
      }
    }
    // No tool calls: agent is done
    if (toolCalls.length === 0) {
      currentState = await agent.processResult(content, toolCalls, currentState, deps)
      break
    }
    // Track tool usage for the minToolCalls check
    totalToolCallCount += toolCalls.length
    // Execute tool calls and append results
    const toolResults = await executeToolCalls(toolCalls, agent.tools, ...)
    for (const event of toolResults.events) yield event
    currentState = {
      ...currentState,
      messages: [...currentState.messages, assistantMessage, ...toolResultMessages],
    }
    // Let the agent process intermediate results
    currentState = await agent.processResult(content, toolCalls, currentState, deps)
  }
  yield { type: 'agent-completed', agent: agent.name }
  return currentState
}
A few things worth calling out:
The maxIterations cap is a safety valve. Default is 10. Without it, a confused model could loop forever calling tools that don't help. The total iteration count across all agents in a pipeline is also capped at 50.
The minToolCalls nudge handles a specific failure mode: the executor is supposed to call tools, but sometimes the LLM returns text instead of a tool call. When that happens, we inject a synthetic message telling it to use its tools and retry:
if (totalToolCallCount < minToolCalls && iteration < maxIterations) {
  currentState = {
    ...currentState,
    messages: [
      ...currentState.messages,
      { role: 'assistant', content },
      {
        role: 'user',
        content: 'You must use tools to complete this task. Do not write the result as text — call the appropriate tool now.',
        metadata: { synthetic: true },
      },
    ],
  }
  continue
}
The return type matters. The function signature is AsyncGenerator<AgentEvent, AgentState> — it yields events and returns the final state. The engine captures this via manual iteration:
private async *runAgentAndUpdateState(agent, config) {
  const gen = agentQueryLoop(agent, this.state, config)
  while (true) {
    const { value, done } = await gen.next()
    if (done) {
      if (value) this.state = value as AgentState
      break
    }
    yield value as AgentEvent
  }
}
This is one of the subtler parts. In JavaScript, for await...of discards the return value of a generator. We need that return value (it's the updated state), so the engine manually calls .next() until done is true.
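The difference is easy to reproduce in isolation. The sketch below uses a toy generator (`countdown`, a name made up for this example) that yields two events and returns a number standing in for the final state.

```typescript
// A toy async generator: yields two events, then returns a final value.
async function* countdown(): AsyncGenerator<string, number> {
  yield 'started'
  yield 'working'
  return 42
}

// for await...of only sees the yielded values; the return value is dropped.
async function collectWithForAwait(): Promise<string[]> {
  const seen: string[] = []
  for await (const event of countdown()) seen.push(event)
  return seen
}

// Manual iteration keeps both the yielded events and the final return value.
async function collectManually(): Promise<{ events: string[]; result: number }> {
  const gen = countdown()
  const events: string[] = []
  while (true) {
    const step = await gen.next()
    if (step.done) return { events, result: step.value }
    events.push(step.value)
  }
}
```

Inside another generator, `yield*` is a second way to get at the value: the expression `yield* countdown()` forwards every event and evaluates to the delegate's return value.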
The entire pipeline is built on async generators. The engine is a generator. Each agent is a generator. The LLM adapter is a generator. They compose naturally:
// Engine yields pipeline events
async *submitMessage(userMessage): AsyncGenerator<AgentEvent> {
  yield* this.runPipeline(config)
}
// Pipeline yields agent events
private async *runPipeline(config): AsyncGenerator<AgentEvent> {
  yield* this.runAgentAndUpdateState(supervisor, config)
  yield* this.executeRoute(route, config)
}
// Route yields events from each agent in sequence
private async *executeRoute(route, config): AsyncGenerator<AgentEvent> {
  for (const agentName of route.agents) {
    yield* this.runAgentAndUpdateState(agent, config)
    if (this.state.waitingForApproval) break
  }
}
Why generators instead of callbacks or event emitters?
yield* delegates to a sub-generator seamlessly. The engine doesn't need to know the internal structure of each agent.
An AbortController signal threads through the whole pipeline. When the user clicks stop, the engine calls interrupt(), the signal fires, and every layer checks it on the next iteration.
The SSE endpoint on the server side just iterates the engine and writes each event:
for await (const event of engine.submitMessage(message, context)) {
  writer.write(encoder.encode(`data: ${JSON.stringify(event)}\n\n`))
  if (event.type === 'done') break
}
The frontend Zustand store processes these events as they arrive — updating thinking steps, streaming text, showing tool progress, and rendering the final response.
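The abort behavior can be seen in miniature with two nested generators sharing one signal. This is a sketch under assumptions, not the engine's code: `innerLoop`, `outerPipeline`, and `DemoEvent` are hypothetical names, and the consumer aborting itself stands in for the user clicking stop.

```typescript
type DemoEvent = { type: 'tick'; n: number } | { type: 'done' }

// Inner layer: checks the signal at the top of every iteration.
async function* innerLoop(signal: AbortSignal, max: number): AsyncGenerator<DemoEvent> {
  for (let n = 1; n <= max; n++) {
    if (signal.aborted) return
    yield { type: 'tick', n }
  }
}

// Outer layer: delegates with yield*, then emits a final 'done' event.
async function* outerPipeline(signal: AbortSignal): AsyncGenerator<DemoEvent> {
  yield* innerLoop(signal, 10)
  yield { type: 'done' }
}

// Consume the pipeline and abort after the third tick.
async function runUntilThirdTick(): Promise<DemoEvent[]> {
  const controller = new AbortController()
  const received: DemoEvent[] = []
  for await (const event of outerPipeline(controller.signal)) {
    received.push(event)
    if (event.type === 'tick' && event.n === 3) controller.abort()
  }
  return received
}
```

Because each layer only checks the signal between steps, cancellation is cooperative: work already in flight finishes, and the loop stops before starting the next iteration.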
An AI assistant that can send emails needs guardrails. Some tools are marked requiresApproval:
{
  name: 'send_reply',
  description: 'Send a reply to a support ticket',
  parameters: { ... },
  execute: async (args, deps) => { ... },
  requiresApproval: true
}
When the executor hits one of these during the query loop, the pipeline pauses:
const approvalTool = findApprovalTool(toolCalls, agent.tools)
if (approvalTool) {
  yield {
    type: 'approval-required',
    agent: agent.name,
    tool: approvalTool.function.name,
    toolCallId: approvalTool.id,
    args: approvalArgs,
  }
  currentState = {
    ...currentState,
    waitingForApproval: true,
    pendingToolCall: {
      toolCallId: approvalTool.id,
      toolName: approvalTool.function.name,
      agentName: agent.name,
      args: approvalArgs,
    },
  }
  break
}
The state gets persisted with the pending tool call. The frontend renders an approval card showing what the agent wants to do. The user can approve, reject, or — this is the nice part — amend the input before approving. Want to change the reply draft before it sends? Edit it right in the approval card.
When the user responds, the engine's resume() method picks up where it left off:
async *resume(opts: ResumeOptions): AsyncGenerator<AgentEvent> {
  // Simplified: lookups of pending (the persisted pending tool call), route, and tool are elided
  if (opts.action === 'reject') {
    // Replace the pending result with a rejection
    // Run remaining agents (responder) for an acknowledgement
    yield* this.runRemainingAgents(route, pending.agentName)
    return
  }
  // Merge any input amendments
  const finalArgs = opts.inputAmendment
    ? { ...pending.args, ...opts.inputAmendment }
    : pending.args
  // Execute the tool directly, without re-running the LLM
  const result = await tool.execute(finalArgs, deps)
  // result replaces the placeholder tool message in the history (elided)
  // Continue the pipeline (run responder)
  yield* this.runRemainingAgents(route, pending.agentName)
}
On approval, the tool executes immediately with no LLM re-call. The engine then runs the remaining agents in the route (usually just the responder) so the user gets a summary of what happened.
One detail that took some debugging: when a tool requires approval, we still append the assistant's tool-call message and a placeholder tool result to the state before pausing. This keeps the message history valid. On resume, we replace the placeholder with the real result (or a rejection marker). If we didn't do this, the message history would be malformed — an assistant message with tool calls but no corresponding tool results.
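The placeholder technique can be sketched as two small pure functions. Everything here is illustrative: the `Message` shape, the placeholder text, and the helper names `appendPendingToolCall` and `resolvePendingToolCall` are assumptions, not the engine's actual API.

```typescript
// Hypothetical message shapes, loosely modeled on OpenAI-style chat messages.
interface ToolCallRef {
  id: string
  name: string
  args: Record<string, unknown>
}

type Message =
  | { role: 'system' | 'user'; content: string }
  | { role: 'assistant'; content: string; toolCalls?: ToolCallRef[] }
  | { role: 'tool'; toolCallId: string; content: string }

// Assumed placeholder text; any marker works as long as it is replaced on resume.
const PENDING_PLACEHOLDER = '[awaiting user approval]'

// Before pausing: record the tool call and a placeholder result so the history stays valid.
function appendPendingToolCall(messages: Message[], content: string, call: ToolCallRef): Message[] {
  return [
    ...messages,
    { role: 'assistant', content, toolCalls: [call] },
    { role: 'tool', toolCallId: call.id, content: PENDING_PLACEHOLDER },
  ]
}

// On resume: swap the placeholder for the real result or a rejection marker.
function resolvePendingToolCall(messages: Message[], toolCallId: string, outcome: string): Message[] {
  return messages.map((m) =>
    m.role === 'tool' && m.toolCallId === toolCallId ? { ...m, content: outcome } : m
  )
}
```

Both helpers return new arrays rather than mutating in place, which matches the spread-based state updates used in the query loop.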
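Long conversations blow up context windows. The context manager handles this with a simple strategy: once the history exceeds the token budget, it summarizes the older middle of the conversation and rebuilds the message list as [system, summary, recent]:
// Split: system | middle (to summarize) | recent (to keep)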
const systemMessages = messages[0]?.role === 'system' ? [messages[0]] : []
const middleMessages = messages.slice(startIdx, recentStartIdx)
const recentMessages = messages.slice(recentStartIdx)
const summary = await summarizeMessages(middleMessages, config)
return [...systemMessages, summaryMessage, ...recentMessages]
Token estimation uses a rough 4 chars/token heuristic. It's not precise, but it doesn't need to be — the goal is staying under budget, not counting exact tokens.
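As a rough illustration of the heuristic, here is a minimal sketch. The function names and the idea of counting only `content` are assumptions; the real estimator may also account for roles, tool-call arguments, or per-message overhead.

```typescript
// Roughly 4 characters per token, as described in the text.
const CHARS_PER_TOKEN = 4

// Estimate tokens from total content length, rounding up.
function estimateTokens(messages: { content: string }[]): number {
  const chars = messages.reduce((sum, m) => sum + m.content.length, 0)
  return Math.ceil(chars / CHARS_PER_TOKEN)
}

// True when the estimate exceeds the given budget.
function isOverBudget(messages: { content: string }[], tokenBudget: number): boolean {
  return estimateTokens(messages) > tokenBudget
}
```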
One edge case worth mentioning: the manager never splits in the middle of a tool call chain. OpenAI's API requires every tool result message to follow the assistant message that made the tool call. If the split point lands on a tool message, it walks back until it hits a safe boundary.
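The walk-back can be sketched in a few lines. This is an assumed implementation for illustration only: `safeSplitIndex` and the `Msg` shape are hypothetical, and the real manager may use different boundary rules.

```typescript
interface Msg {
  role: 'system' | 'user' | 'assistant' | 'tool'
  content: string
}

// Move the split point backwards while the "recent" slice would start with a
// tool result. The walk stops on the assistant message that issued the calls,
// so that message and all of its tool results stay together.
function safeSplitIndex(messages: Msg[], desiredIdx: number): number {
  let idx = Math.min(Math.max(desiredIdx, 0), messages.length)
  while (idx > 0 && messages[idx]?.role === 'tool') idx--
  return idx
}
```

With a history of `system, user, assistant, tool, tool, assistant`, a desired split at index 4 moves back to index 2, so the assistant message and both tool results land in the recent slice together.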
There's also a step that strips stale reasoning_content from older messages. Models like DeepSeek emit chain-of-thought reasoning that's useful for the current turn but just wastes tokens on subsequent turns. We keep only the most recent assistant's reasoning and drop the rest.
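A minimal sketch of that stripping step follows. It assumes reasoning lives in an optional `reasoning_content` field on each message, as the text implies; the `ChatMsg` type and the function name are made up for the example.

```typescript
interface ChatMsg {
  role: string
  content: string
  reasoning_content?: string
}

// Keep reasoning only on the most recent assistant message; drop it elsewhere.
function stripStaleReasoning(messages: ChatMsg[]): ChatMsg[] {
  // Find the index of the last assistant message (-1 if there is none).
  let lastAssistant = -1
  for (let i = messages.length - 1; i >= 0; i--) {
    if (messages[i]?.role === 'assistant') {
      lastAssistant = i
      break
    }
  }
  return messages.map((m, i) => {
    if (i === lastAssistant || m.reasoning_content === undefined) return m
    // Copy before deleting so the input array is left untouched.
    const copy: ChatMsg = { ...m }
    delete copy.reasoning_content
    return copy
  })
}
```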
The AgentState object is the single source of truth during pipeline execution:
interface AgentState<TDomainState> {
  messages: SessionMessage[]
  domainState: TDomainState
  currentRoute?: string
  waitingForApproval?: boolean
  pendingToolCall?: PendingToolCall
}
After the pipeline completes (or pauses for approval), the full state gets serialized and stored as JSONB in a single AiAgentSession row. Messages, domain state, pending approvals — all of it.
We chose JSONB over normalized rows because conversations are always read and written as a unit. You never query "find all messages where content contains X" in production. You load a session, run the pipeline, and save the updated session. JSONB makes that a single read and a single write. It also means changes to the message format don't require database migrations, although the code that loads a session then has to tolerate older message shapes.
A few things we'd reconsider if starting over:
Token counting. The 4 chars/token heuristic works but it's imprecise. A proper tokenizer (like tiktoken) would give us tighter context budget management. We went with the heuristic because it's fast and dependency-free, and being off by 10-20% hasn't caused real issues — but it's the kind of tech debt that'll matter more as we push context limits.
Evaluation framework. We test agents manually. An automated eval suite — golden conversations with expected tool calls and responses — would catch regressions faster. This is the biggest gap.
Parallel tool execution. Right now tool calls within a single iteration run sequentially. Most of the time there's only one or two, but for multi-tool iterations, parallel execution would cut latency. The code is structured to support it (tool calls are independent), we just haven't wired it up yet.
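One possible shape for this, sketched with `Promise.all`. This is not wired into the harness; the `ToolCall`, `ToolResult`, and `ToolFn` types and the function name are simplified assumptions, and approval-gated tools would still need to be checked before anything runs.

```typescript
interface ToolCall {
  id: string
  name: string
  args: Record<string, unknown>
}

interface ToolResult {
  toolCallId: string
  ok: boolean
  output: string
}

type ToolFn = (args: Record<string, unknown>) => Promise<string>

// Run all tool calls concurrently. Promise.all preserves input order, so
// results line up with the original calls. Failures are captured per call
// rather than rejecting the whole batch.
async function executeToolCallsInParallel(
  calls: ToolCall[],
  tools: Record<string, ToolFn>
): Promise<ToolResult[]> {
  return Promise.all(
    calls.map(async (call): Promise<ToolResult> => {
      const tool = tools[call.name]
      if (!tool) {
        return { toolCallId: call.id, ok: false, output: `Unknown tool: ${call.name}` }
      }
      try {
        return { toolCallId: call.id, ok: true, output: await tool(call.args) }
      } catch (err) {
        const message = err instanceof Error ? err.message : String(err)
        return { toolCallId: call.id, ok: false, output: message }
      }
    })
  )
}
```

Catching errors inside each mapped call matters here: a bare `Promise.all` rejects as soon as one promise fails, which would discard the results of the tools that succeeded.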
The agent harness is roughly 400 lines of TypeScript across three files: the engine, the query loop, and the context manager. It's not a framework you install — it's a pattern. Async generators for streaming, a supervisor for routing, a loop with safety valves for tool execution, and a pause/resume mechanism for human oversight.
If you want to dig into the code, the entry points are:
packages/lib/src/ai/agent-framework/engine.ts: the pipeline orchestrator
packages/lib/src/ai/agent-framework/query-loop.ts: the inner agent loop
packages/lib/src/ai/agent-framework/types.ts: core types
packages/lib/src/ai/kopilot/domain-config.ts: Kopilot's route and agent wiring
Auxx.ai is open source. PRs welcome.