Claude Code 源码解析
返回目录
核心引擎04

查询循环

AsyncGenerator 驱动的心脏

核心洞察

AsyncGenerator 让流式事件、工具调用、错误恢复自然串联为一个无限循环

学习

概念讲解与核心设计分析

查询循环概述

查询循环是 Claude Code 的核心执行引擎——一个精密的 AsyncGenerator 状态机,编排着从 API 调用到工具执行的完整生命周期。每次迭代包含 10 个精心排列的步骤,处理上下文管理、错误恢复和停止条件判断。

AsyncGenerator 模式

query() 是查询循环的外层包装函数,它返回一个 AsyncGenerator,yield 出三种事件类型:

  • StreamEvent — 流式 token 事件,用于实时渲染
  • RequestStartEvent — API 请求开始事件,用于 UI 加载状态
  • Message — 完整的助理消息或工具结果消息

query() 内部调用 queryLoop(),后者是真正的状态机实现。

queryLoop() 内部状态

queryLoop() 维护一个 State 对象,包含以下关键字段:

  • messages — 当前对话的完整消息历史
  • toolUseContext — 工具执行上下文,包含当前工具调用的元数据
  • autoCompactTracking — 自动压缩追踪,记录 token 增长趋势以决定何时触发压缩
  • maxOutputTokensRecoveryCount — 最大输出 token 恢复计数器,上限为 3 次。当 API 返回 max_output_tokens 停止原因时,系统最多重试 3 次
  • turnCount — 当前轮次计数,用于限制最大对话轮数
  • transition — 状态转换标志,控制循环的继续/终止
  • stopHookActive — 停止钩子激活标志,当 post-sampling 钩子返回停止信号时设为 true

10 步迭代管道

每次循环迭代包含以下 10 个步骤,顺序经过精心设计:

  1. Skill 发现预取 — 异步预取可能用到的 Skill 定义,与后续步骤并行
  2. 工具结果预算applyToolResultBudget() 对过长的工具输出进行截断,防止 token 爆炸
  3. Snip 压缩 — 对历史消息中的大文本块进行"剪切",用摘要替换全文
  4. Micro-compact — 微压缩,移除消息中的冗余空白和格式
  5. 上下文折叠 — 将老旧的对话轮次折叠为摘要,减少 token 消耗
  6. 自动压缩 — 当 token 使用量接近上下文窗口限制时,触发激进的上下文压缩
  7. 系统提示词组装 — 重新组装系统提示词(可能包含新发现的 Skill 等动态内容)
  8. API 流式调用 — 向 Claude API 发送请求并处理流式响应
  9. 工具执行 — 解析 Claude 返回的工具调用,执行工具并收集结果
  10. Post-sampling 钩子 — 执行用户定义的后采样钩子(如代码审查、安全检查)

错误恢复机制

查询循环内置了多种错误恢复策略:

  • max_output_tokens 恢复(最多 3 次)— 当 Claude 的输出被截断时,系统会自动发送"继续"消息,让 Claude 从断点继续。maxOutputTokensRecoveryCount 限制为 3 次,防止无限循环
  • 响应式压缩 — 当 API 返回 prompt_too_long 错误时,立即触发上下文压缩并重试
  • prompt_too_long 处理 — 如果压缩后仍然超长,系统会尝试更激进的压缩策略(移除中间轮次,仅保留首尾)

"思考规则"(Rules of Thinking)

当 Claude 返回 thinking blocks(扩展思考块)时,查询循环会应用特殊的处理规则:

  • thinking blocks 不计入 messages 历史(避免 token 浪费)
  • thinking 内容会被记录到 transcript 但不会发送回 API
  • 连续的 thinking blocks 会被合并以减少消息碎片

停止条件

查询循环在以下条件下终止:

  • 最大轮数turnCount 达到配置的 maxTurns
  • Token 预算 — 累计消耗超过配置的 budget 限制
  • 停止钩子 — post-sampling 钩子返回 { stop: true }
  • 用户取消 — AbortController 被触发
  • Claude 主动结束 — 返回 end_turn 停止原因且无工具调用

架构

模块关系与设计决策

查询循环架构

状态机概览

State 字段类型初始值更新时机
messagesMessage[]传入的历史每轮 API 响应后
toolUseContextToolUseContext{}工具执行前后
autoCompactTrackingCompactTracker{ tokenHistory: [] }每轮 API 调用后
maxOutputTokensRecoveryCountnumber0max_output_tokens 恢复时 +1
turnCountnumber0每轮 +1
transition'continue' | 'stop''continue'停止条件检查后
stopHookActivebooleanfalsepost-sampling 钩子返回后

10 步管道流程图

每次迭代的 10 个步骤形成一个管道:

  1. prefetchSkills() → 异步预取(不阻塞后续步骤)
  2. applyToolResultBudget() → 截断过长的工具输出
  3. snipCompact() → 剪切大文本块
  4. microCompact() → 移除冗余空白
  5. contextCollapse() → 折叠老旧轮次
  6. autoCompact() → 基于 token 增长趋势的自动压缩
  7. assembleSystemPrompt() → 组装系统提示词
  8. streamApiCall() → API 流式调用
  9. executeTools() → 工具执行
  10. runPostSamplingHooks() → 后采样钩子

错误恢复流程

  • max_output_tokens:API 响应 → 检查 stop_reason → recoveryCount < 3 → 发送 "continue" → 重试
  • prompt_too_long:API 错误 → 触发 reactiveCompact() → 重试 → 仍然失败 → aggressiveCompact() → 重试
  • 网络错误:由 API 客户端层处理(见第 5 章),查询循环只处理语义级错误

设计决策

为什么 10 步管道的顺序如此重要?

Skill 预取放在最前面是因为它是异步的,可以与后续步骤并行。上下文压缩步骤(2-6)必须在 API 调用(8)之前执行,否则可能因 token 超限而失败。工具执行(9)必须在 API 响应(8)之后,因为需要解析 Claude 的工具调用指令。这个顺序是经过多次迭代优化的结果。

源码

3 个关键代码示例

01
query() 外层包装与 queryLoop() 状态机
TypeScript
// query.ts — 简化版
export async function* query(
  opts: QueryOptions
): AsyncGenerator<StreamEvent | RequestStartEvent | Message> {
  // 外层 query() 是一个薄包装
  yield* queryLoop({
    messages: opts.messages,
    systemPrompt: opts.systemPrompt,
    model: opts.model,
    tools: opts.tools,
    canUseTool: opts.canUseTool,
    abortSignal: opts.abortSignal,
    usage: opts.usage,
  })
}

// queryLoop 内部状态类型
interface State {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: {
    tokenHistory: number[]
    lastCompactAt: number
  }
  maxOutputTokensRecoveryCount: number  // 上限 3
  turnCount: number
  transition: 'continue' | 'stop'
  stopHookActive: boolean
}

async function* queryLoop(
  opts: QueryLoopOptions
): AsyncGenerator<StreamEvent | RequestStartEvent | Message> {
  // 初始化状态
  const state: State = {
    messages: opts.messages,
    toolUseContext: {},
    autoCompactTracking: { tokenHistory: [], lastCompactAt: 0 },
    maxOutputTokensRecoveryCount: 0,
    turnCount: 0,
    transition: 'continue',
    stopHookActive: false,
  }

  // 主循环
  while (state.transition === 'continue') {
    state.turnCount++

    // 检查停止条件
    if (state.turnCount > opts.maxTurns) {
      state.transition = 'stop'
      break
    }

    // 执行 10 步管道(见下一个示例)
    yield* executeIterationPipeline(state, opts)
  }
}
02
10 步迭代管道
TypeScript
// query/pipeline.ts — 简化版
async function* executeIterationPipeline(
  state: State,
  opts: QueryLoopOptions
): AsyncGenerator<StreamEvent | RequestStartEvent | Message> {

  // === 步骤 1: Skill 发现预取(异步,不阻塞) ===
  const skillPrefetch = prefetchSkillDiscovery(state.messages)

  // === 步骤 2: 工具结果预算 ===
  applyToolResultBudget(state.messages, {
    maxToolResultTokens: 50_000,  // 单个工具输出上限 50K tokens
  })

  // === 步骤 3: Snip 压缩 ===
  snipCompact(state.messages)  // 将大文本块替换为摘要

  // === 步骤 4: Micro-compact ===
  microCompact(state.messages)  // 移除冗余空白和格式

  // === 步骤 5: 上下文折叠 ===
  contextCollapse(state.messages, {
    preserveRecentTurns: 4,  // 保留最近 4 轮
  })

  // === 步骤 6: 自动压缩(基于 token 增长趋势) ===
  const compacted = await autoCompact(state.messages, state.autoCompactTracking)
  if (compacted) {
    state.autoCompactTracking.lastCompactAt = state.turnCount
  }

  // === 步骤 7: 系统提示词组装 ===
  await skillPrefetch  // 等待 Skill 预取完成
  const systemPrompt = assembleSystemPrompt(opts, state)

  // === 步骤 8: API 流式调用 ===
  yield { type: 'requestStart' } as RequestStartEvent
  const response = await streamApiCall({
    model: opts.model,
    system: systemPrompt,
    messages: state.messages,
    tools: opts.tools,
    abortSignal: opts.abortSignal,
  })

  // yield 流式事件
  for await (const event of response.stream) {
    yield event as StreamEvent
  }
  const assistantMessage = response.finalMessage
  yield assistantMessage

  // === 步骤 9: 工具执行 ===
  const toolCalls = extractToolCalls(assistantMessage)
  if (toolCalls.length > 0) {
    const toolResults = await executeTools(toolCalls, opts.canUseTool)
    state.messages.push(...toolResults)
    for (const result of toolResults) yield result
  } else {
    // 无工具调用 → Claude 主动结束
    state.transition = 'stop'
  }

  // === 步骤 10: Post-sampling 钩子 ===
  const hookResult = await runPostSamplingHooks(assistantMessage)
  if (hookResult?.stop) {
    state.stopHookActive = true
    state.transition = 'stop'
  }

  // 错误恢复:max_output_tokens
  if (response.stopReason === 'max_output_tokens') {
    if (state.maxOutputTokensRecoveryCount < 3) {
      state.maxOutputTokensRecoveryCount++
      state.messages.push({ role: 'user', content: 'Continue.' })
      // transition 保持 'continue',进入下一轮
    } else {
      state.transition = 'stop'  // 超过 3 次重试上限
    }
  }

  // 更新 token 追踪
  state.autoCompactTracking.tokenHistory.push(
    response.usage.inputTokens
  )
}
03
错误恢复策略
TypeScript
// query/errorRecovery.ts — 简化版

// max_output_tokens 恢复
function handleMaxOutputTokens(
  state: State,
  response: ApiResponse
): void {
  if (response.stopReason !== 'max_output_tokens') return

  if (state.maxOutputTokensRecoveryCount < 3) {
    // 还有重试机会
    state.maxOutputTokensRecoveryCount++
    state.messages.push({
      role: 'user',
      content: 'Your response was cut off. Please continue from where you stopped.',
    })
    // state.transition 保持 'continue'
  } else {
    // 已重试 3 次,放弃
    state.transition = 'stop'
  }
}

// prompt_too_long 响应式压缩
async function handlePromptTooLong(
  state: State,
  error: ApiError
): Promise<boolean> {
  // 第一次尝试:温和压缩
  const gentleResult = await reactiveCompact(state.messages, {
    strategy: 'preserve-recent',
    targetReduction: 0.3,  // 减少 30% token
  })

  if (gentleResult.success) return true  // 重试

  // 第二次尝试:激进压缩
  const aggressiveResult = await aggressiveCompact(state.messages, {
    strategy: 'keep-first-last',  // 仅保留首尾消息
    targetReduction: 0.6,  // 减少 60% token
  })

  return aggressiveResult.success  // 成功则重试,否则放弃
}

// 停止条件检查
function checkStopConditions(state: State, opts: QueryLoopOptions): void {
  // 最大轮数
  if (state.turnCount >= opts.maxTurns) {
    state.transition = 'stop'
    return
  }

  // Token 预算
  if (opts.usage.totalTokens >= opts.budget) {
    state.transition = 'stop'
    return
  }

  // 停止钩子
  if (state.stopHookActive) {
    state.transition = 'stop'
    return
  }
}

互动

步进式流程演示

互动演示

查询循环互动解析

第 1 步:理解 10 步管道

想象你正在和 Claude 进行一个长对话(已经 50 轮),然后发送了新消息。此时 10 步管道是如何保护系统不崩溃的?

  • 步骤 2-6(压缩步骤)会逐层削减 token 消耗:先截断过长的工具输出,再压缩旧消息,最后在必要时折叠历史轮次
  • 步骤 6(自动压缩)会检查 token 增长趋势——如果最近几轮 token 增长速度过快,会提前触发压缩,而不是等到 API 报错

第 2 步:max_output_tokens 的 3 次重试

当 Claude 写一段很长的代码,输出被截断时:

  1. 第 1 次:系统发送"继续",Claude 从断点继续
  2. 第 2 次:如果又被截断,再次发送"继续"
  3. 第 3 次:最后一次机会
  4. 第 4 次截断:系统放弃,将已有内容返回给用户

为什么限制 3 次?因为无限重试可能导致费用失控,而且 3 次恢复后输出已经足够长(约 16K * 4 = 64K tokens),几乎覆盖所有实际场景。

第 3 步:压缩策略的层次

查询循环的 5 种压缩策略形成一个渐进的"防线":

  1. 工具结果预算 — 最温和,只截断单个工具的过长输出
  2. Snip 压缩 — 将大文本块替换为摘要标记
  3. Micro-compact — 移除冗余空白,最小化改变
  4. 上下文折叠 — 折叠旧轮次,保留最近 4 轮
  5. 自动压缩 — 最激进,可能重写整个对话历史的摘要

第 4 步:停止条件的安全网

查询循环有 5 种停止条件,形成多层安全网:

  • 用户取消(Ctrl+C)— 最即时的停止方式
  • Claude 主动结束 — 正常情况下的停止
  • 最大轮数 — 防止无限循环
  • Token 预算 — 防止费用失控
  • 停止钩子 — 外部系统的停止信号(如 CI 超时)

关键设计洞察

  • 渐进式压缩:从温和到激进的 5 层压缩策略,确保在最小改变的前提下保持 token 可控
  • 有限重试:max_output_tokens 的 3 次重试上限,平衡了完整性和成本
  • Skill 预取并行:步骤 1 的异步预取与步骤 2-6 并行,避免阻塞管道
  • 状态机清晰:State 类型明确定义了所有可变状态,transition 字段统一控制流程转换

相关源文件

query.tsquery/