核心引擎第 04 章
查询循环
AsyncGenerator 驱动的心脏
核心洞察
AsyncGenerator 让流式事件、工具调用、错误恢复自然串联为一个无限循环
学习
概念讲解与核心设计分析
查询循环概述
查询循环是 Claude Code 的核心执行引擎——一个精密的 AsyncGenerator 状态机,编排着从 API 调用到工具执行的完整生命周期。每次迭代包含 10 个精心排列的步骤,处理上下文管理、错误恢复和停止条件判断。
AsyncGenerator 模式
query() 是查询循环的外层包装函数,它返回一个 AsyncGenerator,yield 出三种事件类型:
- StreamEvent — 流式 token 事件,用于实时渲染
- RequestStartEvent — API 请求开始事件,用于 UI 加载状态
- Message — 完整的助理消息或工具结果消息
query() 内部调用 queryLoop(),后者是真正的状态机实现。
queryLoop() 内部状态
queryLoop() 维护一个 State 对象,包含以下关键字段:
- messages — 当前对话的完整消息历史
- toolUseContext — 工具执行上下文,包含当前工具调用的元数据
- autoCompactTracking — 自动压缩追踪,记录 token 增长趋势以决定何时触发压缩
- maxOutputTokensRecoveryCount — 最大输出 token 恢复计数器,上限为 3 次。当 API 返回
max_output_tokens停止原因时,系统最多重试 3 次 - turnCount — 当前轮次计数,用于限制最大对话轮数
- transition — 状态转换标志,控制循环的继续/终止
- stopHookActive — 停止钩子激活标志,当 post-sampling 钩子返回停止信号时设为 true
10 步迭代管道
每次循环迭代包含以下 10 个步骤,顺序经过精心设计:
- Skill 发现预取 — 异步预取可能用到的 Skill 定义,与后续步骤并行
- 工具结果预算 —
applyToolResultBudget()对过长的工具输出进行截断,防止 token 爆炸 - Snip 压缩 — 对历史消息中的大文本块进行"剪切",用摘要替换全文
- Micro-compact — 微压缩,移除消息中的冗余空白和格式
- 上下文折叠 — 将老旧的对话轮次折叠为摘要,减少 token 消耗
- 自动压缩 — 当 token 使用量接近上下文窗口限制时,触发激进的上下文压缩
- 系统提示词组装 — 重新组装系统提示词(可能包含新发现的 Skill 等动态内容)
- API 流式调用 — 向 Claude API 发送请求并处理流式响应
- 工具执行 — 解析 Claude 返回的工具调用,执行工具并收集结果
- Post-sampling 钩子 — 执行用户定义的后采样钩子(如代码审查、安全检查)
错误恢复机制
查询循环内置了多种错误恢复策略:
- max_output_tokens 恢复(最多 3 次)— 当 Claude 的输出被截断时,系统会自动发送"继续"消息,让 Claude 从断点继续。
maxOutputTokensRecoveryCount限制为 3 次,防止无限循环 - 响应式压缩 — 当 API 返回
prompt_too_long错误时,立即触发上下文压缩并重试 - prompt_too_long 处理 — 如果压缩后仍然超长,系统会尝试更激进的压缩策略(移除中间轮次,仅保留首尾)
"思考规则"(Rules of Thinking)
当 Claude 返回 thinking blocks(扩展思考块)时,查询循环会应用特殊的处理规则:
- thinking blocks 不计入
messages历史(避免 token 浪费) - thinking 内容会被记录到 transcript 但不会发送回 API
- 连续的 thinking blocks 会被合并以减少消息碎片
停止条件
查询循环在以下条件下终止:
- 最大轮数 —
turnCount达到配置的maxTurns - Token 预算 — 累计消耗超过配置的
budget限制 - 停止钩子 — post-sampling 钩子返回
{ stop: true } - 用户取消 — AbortController 被触发
- Claude 主动结束 — 返回
end_turn停止原因且无工具调用
架构
模块关系与设计决策
查询循环架构
状态机概览
| State 字段 | 类型 | 初始值 | 更新时机 |
|---|---|---|---|
| messages | Message[] | 传入的历史 | 每轮 API 响应后 |
| toolUseContext | ToolUseContext | {} | 工具执行前后 |
| autoCompactTracking | CompactTracker | { tokenHistory: [] } | 每轮 API 调用后 |
| maxOutputTokensRecoveryCount | number | 0 | max_output_tokens 恢复时 +1 |
| turnCount | number | 0 | 每轮 +1 |
| transition | 'continue' | 'stop' | 'continue' | 停止条件检查后 |
| stopHookActive | boolean | false | post-sampling 钩子返回后 |
10 步管道流程图
每次迭代的 10 个步骤形成一个管道:
prefetchSkills()→ 异步预取(不阻塞后续步骤)applyToolResultBudget()→ 截断过长的工具输出snipCompact()→ 剪切大文本块microCompact()→ 移除冗余空白contextCollapse()→ 折叠老旧轮次autoCompact()→ 基于 token 增长趋势的自动压缩assembleSystemPrompt()→ 组装系统提示词streamApiCall()→ API 流式调用executeTools()→ 工具执行runPostSamplingHooks()→ 后采样钩子
错误恢复流程
- max_output_tokens:API 响应 → 检查 stop_reason → recoveryCount < 3 → 发送 "continue" → 重试
- prompt_too_long:API 错误 → 触发 reactiveCompact() → 重试 → 仍然失败 → aggressiveCompact() → 重试
- 网络错误:由 API 客户端层处理(见第 5 章),查询循环只处理语义级错误
设计决策
为什么 10 步管道的顺序如此重要?
Skill 预取放在最前面是因为它是异步的,可以与后续步骤并行。上下文压缩步骤(2-6)必须在 API 调用(8)之前执行,否则可能因 token 超限而失败。工具执行(9)必须在 API 响应(8)之后,因为需要解析 Claude 的工具调用指令。这个顺序是经过多次迭代优化的结果。
源码
共 3 个关键代码示例
01
query() 外层包装与 queryLoop() 状态机TypeScript
// query.ts — 简化版
export async function* query(
opts: QueryOptions
): AsyncGenerator<StreamEvent | RequestStartEvent | Message> {
// 外层 query() 是一个薄包装
yield* queryLoop({
messages: opts.messages,
systemPrompt: opts.systemPrompt,
model: opts.model,
tools: opts.tools,
canUseTool: opts.canUseTool,
abortSignal: opts.abortSignal,
usage: opts.usage,
})
}
// queryLoop 内部状态类型
interface State {
messages: Message[]
toolUseContext: ToolUseContext
autoCompactTracking: {
tokenHistory: number[]
lastCompactAt: number
}
maxOutputTokensRecoveryCount: number // 上限 3
turnCount: number
transition: 'continue' | 'stop'
stopHookActive: boolean
}
async function* queryLoop(
opts: QueryLoopOptions
): AsyncGenerator<StreamEvent | RequestStartEvent | Message> {
// 初始化状态
const state: State = {
messages: opts.messages,
toolUseContext: {},
autoCompactTracking: { tokenHistory: [], lastCompactAt: 0 },
maxOutputTokensRecoveryCount: 0,
turnCount: 0,
transition: 'continue',
stopHookActive: false,
}
// 主循环
while (state.transition === 'continue') {
state.turnCount++
// 检查停止条件
if (state.turnCount > opts.maxTurns) {
state.transition = 'stop'
break
}
// 执行 10 步管道(见下一个示例)
yield* executeIterationPipeline(state, opts)
}
}02
10 步迭代管道TypeScript
// query/pipeline.ts — 简化版
async function* executeIterationPipeline(
state: State,
opts: QueryLoopOptions
): AsyncGenerator<StreamEvent | RequestStartEvent | Message> {
// === 步骤 1: Skill 发现预取(异步,不阻塞) ===
const skillPrefetch = prefetchSkillDiscovery(state.messages)
// === 步骤 2: 工具结果预算 ===
applyToolResultBudget(state.messages, {
maxToolResultTokens: 50_000, // 单个工具输出上限 50K tokens
})
// === 步骤 3: Snip 压缩 ===
snipCompact(state.messages) // 将大文本块替换为摘要
// === 步骤 4: Micro-compact ===
microCompact(state.messages) // 移除冗余空白和格式
// === 步骤 5: 上下文折叠 ===
contextCollapse(state.messages, {
preserveRecentTurns: 4, // 保留最近 4 轮
})
// === 步骤 6: 自动压缩(基于 token 增长趋势) ===
const compacted = await autoCompact(state.messages, state.autoCompactTracking)
if (compacted) {
state.autoCompactTracking.lastCompactAt = state.turnCount
}
// === 步骤 7: 系统提示词组装 ===
await skillPrefetch // 等待 Skill 预取完成
const systemPrompt = assembleSystemPrompt(opts, state)
// === 步骤 8: API 流式调用 ===
yield { type: 'requestStart' } as RequestStartEvent
const response = await streamApiCall({
model: opts.model,
system: systemPrompt,
messages: state.messages,
tools: opts.tools,
abortSignal: opts.abortSignal,
})
// yield 流式事件
for await (const event of response.stream) {
yield event as StreamEvent
}
const assistantMessage = response.finalMessage
yield assistantMessage
// === 步骤 9: 工具执行 ===
const toolCalls = extractToolCalls(assistantMessage)
if (toolCalls.length > 0) {
const toolResults = await executeTools(toolCalls, opts.canUseTool)
state.messages.push(...toolResults)
for (const result of toolResults) yield result
} else {
// 无工具调用 → Claude 主动结束
state.transition = 'stop'
}
// === 步骤 10: Post-sampling 钩子 ===
const hookResult = await runPostSamplingHooks(assistantMessage)
if (hookResult?.stop) {
state.stopHookActive = true
state.transition = 'stop'
}
// 错误恢复:max_output_tokens
if (response.stopReason === 'max_output_tokens') {
if (state.maxOutputTokensRecoveryCount < 3) {
state.maxOutputTokensRecoveryCount++
state.messages.push({ role: 'user', content: 'Continue.' })
// transition 保持 'continue',进入下一轮
} else {
state.transition = 'stop' // 超过 3 次重试上限
}
}
// 更新 token 追踪
state.autoCompactTracking.tokenHistory.push(
response.usage.inputTokens
)
}03
错误恢复策略TypeScript
// query/errorRecovery.ts — 简化版
// max_output_tokens 恢复
function handleMaxOutputTokens(
state: State,
response: ApiResponse
): void {
if (response.stopReason !== 'max_output_tokens') return
if (state.maxOutputTokensRecoveryCount < 3) {
// 还有重试机会
state.maxOutputTokensRecoveryCount++
state.messages.push({
role: 'user',
content: 'Your response was cut off. Please continue from where you stopped.',
})
// state.transition 保持 'continue'
} else {
// 已重试 3 次,放弃
state.transition = 'stop'
}
}
// prompt_too_long 响应式压缩
async function handlePromptTooLong(
state: State,
error: ApiError
): Promise<boolean> {
// 第一次尝试:温和压缩
const gentleResult = await reactiveCompact(state.messages, {
strategy: 'preserve-recent',
targetReduction: 0.3, // 减少 30% token
})
if (gentleResult.success) return true // 重试
// 第二次尝试:激进压缩
const aggressiveResult = await aggressiveCompact(state.messages, {
strategy: 'keep-first-last', // 仅保留首尾消息
targetReduction: 0.6, // 减少 60% token
})
return aggressiveResult.success // 成功则重试,否则放弃
}
// 停止条件检查
function checkStopConditions(state: State, opts: QueryLoopOptions): void {
// 最大轮数
if (state.turnCount >= opts.maxTurns) {
state.transition = 'stop'
return
}
// Token 预算
if (opts.usage.totalTokens >= opts.budget) {
state.transition = 'stop'
return
}
// 停止钩子
if (state.stopHookActive) {
state.transition = 'stop'
return
}
}互动
步进式流程演示
互动演示
查询循环互动解析
第 1 步:理解 10 步管道
想象你正在和 Claude 进行一个长对话(已经 50 轮),然后发送了新消息。此时 10 步管道是如何保护系统不崩溃的?
- 步骤 2-6(压缩步骤)会逐层削减 token 消耗:先截断过长的工具输出,再压缩旧消息,最后在必要时折叠历史轮次
- 步骤 6(自动压缩)会检查 token 增长趋势——如果最近几轮 token 增长速度过快,会提前触发压缩,而不是等到 API 报错
第 2 步:max_output_tokens 的 3 次重试
当 Claude 写一段很长的代码,输出被截断时:
- 第 1 次:系统发送"继续",Claude 从断点继续
- 第 2 次:如果又被截断,再次发送"继续"
- 第 3 次:最后一次机会
- 第 4 次截断:系统放弃,将已有内容返回给用户
为什么限制 3 次?因为无限重试可能导致费用失控,而且 3 次恢复后输出已经足够长(约 16K * 4 = 64K tokens),几乎覆盖所有实际场景。
第 3 步:压缩策略的层次
查询循环的 5 种压缩策略形成一个渐进的"防线":
- 工具结果预算 — 最温和,只截断单个工具的过长输出
- Snip 压缩 — 将大文本块替换为摘要标记
- Micro-compact — 移除冗余空白,最小化改变
- 上下文折叠 — 折叠旧轮次,保留最近 4 轮
- 自动压缩 — 最激进,可能重写整个对话历史的摘要
第 4 步:停止条件的安全网
查询循环有 5 种停止条件,形成多层安全网:
- 用户取消(Ctrl+C)— 最即时的停止方式
- Claude 主动结束 — 正常情况下的停止
- 最大轮数 — 防止无限循环
- Token 预算 — 防止费用失控
- 停止钩子 — 外部系统的停止信号(如 CI 超时)
关键设计洞察
- 渐进式压缩:从温和到激进的 5 层压缩策略,确保在最小改变的前提下保持 token 可控
- 有限重试:max_output_tokens 的 3 次重试上限,平衡了完整性和成本
- Skill 预取并行:步骤 1 的异步预取与步骤 2-6 并行,避免阻塞管道
- 状态机清晰:State 类型明确定义了所有可变状态,transition 字段统一控制流程转换
相关源文件
query.tsquery/