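Custom Providers

Extensions can register custom model providers with `pi.registerProvider()`. This enables:

- Proxies - route requests through a corporate proxy or API gateway
- Custom endpoints - use self-hosted or private model deployments
- OAuth/SSO - add authentication flows for enterprise providers
- Custom APIs - implement streaming for non-standard LLM APIs

See the following complete provider examples: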
```typescript
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";

export default function (pi: ExtensionAPI) {
  // Override baseUrl for an existing provider
  pi.registerProvider("anthropic", {
    baseUrl: "https://proxy.example.com"
  });

  // Register a new provider with models
  pi.registerProvider("my-provider", {
    name: "My Provider",
    baseUrl: "https://api.example.com",
    apiKey: "$MY_API_KEY",
    api: "openai-completions",
    models: [
      {
        id: "my-model",
        name: "My Model",
        reasoning: false,
        input: ["text", "image"],
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
        contextWindow: 128000,
        maxTokens: 4096
      }
    ]
  });
}
```

The extension factory can also be `async`. For dynamic model discovery, fetch and register models in the factory rather than in `session_start`. pi waits for the factory to finish before continuing startup, so the provider is available during interactive startup and for `pi --list-models`.
Override Existing Provider

The simplest use case: redirect an existing provider through a proxy.

```typescript
// All Anthropic requests now go through your proxy
pi.registerProvider("anthropic", {
  baseUrl: "https://proxy.example.com"
});

// Add custom headers to OpenAI requests
pi.registerProvider("openai", {
  headers: {
    "X-Custom-Header": "value"
  }
});

// Both baseUrl and headers
pi.registerProvider("google", {
  baseUrl: "https://ai-gateway.corp.com/google",
  headers: {
    "X-Corp-Auth": "$CORP_AUTH_TOKEN" // env var or literal
  }
});
```

When only `baseUrl` and/or `headers` are provided (no `models`), all existing models for that provider are preserved and use the new endpoint.
Register New Provider

To add a completely new provider, specify `models` along with the required configuration.

If the model list comes from a remote endpoint, use an async extension factory:
```typescript
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";

export default async function (pi: ExtensionAPI) {
  // Ask the local server which models it serves
  const response = await fetch("http://localhost:1234/v1/models");
  const payload = (await response.json()) as {
    data: Array<{
      id: string;
      name?: string;
      context_window?: number;
      max_tokens?: number;
    }>;
  };

  // Register one model entry per discovered model, with defaults for missing fields
  pi.registerProvider("local-openai", {
    baseUrl: "http://localhost:1234/v1",
    apiKey: "$LOCAL_OPENAI_API_KEY",
    api: "openai-completions",
    models: payload.data.map((model) => ({
      id: model.id,
      name: model.name ?? model.id,
      reasoning: false,
      input: ["text"],
      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
      contextWindow: model.context_window ?? 128000,
      maxTokens: model.max_tokens ?? 4096,
    })),
  });
}
```

This registers the fetched models before startup completes.
```typescript
pi.registerProvider("my-llm", {
  baseUrl: "https://api.my-llm.com/v1",
  apiKey: "$MY_LLM_API_KEY", // env var reference
  api: "openai-completions", // which streaming API to use
  models: [
    {
      id: "my-llm-large",
      name: "My LLM Large",
      reasoning: true, // supports extended thinking
      input: ["text", "image"],
      cost: {
        input: 3.0, // $/million tokens
        output: 15.0,
        cacheRead: 0.3,
        cacheWrite: 3.75
      },
      contextWindow: 200000,
      maxTokens: 16384
    }
  ]
});
```

When `models` is provided, it replaces all existing models for that provider.

`apiKey` and custom header values use the same config value syntax as `models.json`: a leading `!command` runs the command and uses its output as the entire value, `$ENV_VAR` and `${ENV_VAR}` interpolate environment variables, `$$` emits a literal `$`, and `$!` emits a literal `!`.
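To make the value syntax concrete, here is a minimal sketch of a resolver that follows the rules above. It is not pi's actual implementation: `resolveConfigValue`, the injected `env` map, and the injected `runCommand` callback are all hypothetical names, and resolving an unset variable to an empty string is an assumption.

```typescript
type Env = Record<string, string | undefined>;

// Hypothetical helper illustrating the documented syntax; pi's real resolver may differ.
function resolveConfigValue(
  raw: string,
  env: Env,
  runCommand: (command: string) => string,
): string {
  // A leading "!" means: run the rest as a command and use its output as the whole value.
  if (raw.startsWith("!")) return runCommand(raw.slice(1));

  // Otherwise scan left to right: the "$$" and "$!" escapes, then ${NAME}, then $NAME.
  return raw.replace(
    /\$\$|\$!|\$\{([A-Za-z_][A-Za-z0-9_]*)\}|\$([A-Za-z_][A-Za-z0-9_]*)/g,
    (match: string, braced: string | undefined, bare: string | undefined) => {
      if (match === "$$") return "$";
      if (match === "$!") return "!";
      // ASSUMPTION: an unset variable resolves to an empty string.
      return env[braced ?? bare ?? ""] ?? "";
    },
  );
}
```

In a Node-based extension you could pass `process.env` as `env` and a thin wrapper around `child_process.execSync` as `runCommand`; injecting both keeps the sketch free of side effects and easy to test.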
Unregister Provider

Use `pi.unregisterProvider(name)` to remove a provider previously registered with `pi.registerProvider(name, ...)`:

```typescript
// Register
pi.registerProvider("my-llm", {
  baseUrl: "https://api.my-llm.com/v1",
  apiKey: "$MY_LLM_API_KEY",
  api: "openai-completions",
  models: [
    {
      id: "my-llm-large",
      name: "My LLM Large",
      reasoning: true,
      input: ["text", "image"],
      cost: { input: 3.0, output: 15.0, cacheRead: 0.3, cacheWrite: 3.75 },
      contextWindow: 200000,
      maxTokens: 16384
    }
  ]
});

// Later, remove it
pi.unregisterProvider("my-llm");
```

Unregistering removes the provider's dynamic models, API key fallback, OAuth provider registration, and custom stream handler registration. Any overridden built-in models or provider behavior are restored.

Calls made after the initial extension loading phase take effect immediately, so no `/reload` is needed.
API Types

The `api` field determines which streaming implementation is used:

| API | Use for |
|---|---|
| `anthropic-messages` | Anthropic Claude API and compatible implementations |
| `openai-completions` | OpenAI Chat Completions API and compatible implementations |
| `openai-responses` | OpenAI Responses API |
| `azure-openai-responses` | Azure OpenAI Responses API |
| `openai-codex-responses` | OpenAI Codex Responses API |
| `mistral-conversations` | Mistral SDK Conversations/Chat streaming |
| `google-generative-ai` | Google Generative AI API |
| `google-vertex` | Google Vertex AI API |
| `bedrock-converse-stream` | Amazon Bedrock Converse API |
Most OpenAI-compatible providers work with `openai-completions`. Use the model-level `thinkingLevelMap` for model-specific thinking levels and `compat` for provider quirks:

```typescript
models: [{
  id: "custom-model",
  // ...
  reasoning: true,
  thinkingLevelMap: {
    // map pi levels to provider values; null hides unsupported levels
    minimal: null,
    low: null,
    medium: null,
    high: "default",
    xhigh: "max"
  },
  compat: {
    supportsDeveloperRole: false,   // use "system" instead of "developer"
    supportsReasoningEffort: true,
    maxTokensField: "max_tokens",   // instead of "max_completion_tokens"
    requiresToolResultName: true,   // tool results need name field
    thinkingFormat: "qwen",         // top-level enable_thinking: true
    cacheControlFormat: "anthropic" // Anthropic-style cache_control markers
  }
}]
```

For `thinkingFormat`, use `openrouter` for OpenRouter-style `reasoning: { effort }` controls. Use `together` for Together-style `reasoning: { enabled }` controls; combined with `supportsReasoningEffort`, it also sends `reasoning_effort`. For local Qwen-compatible servers that read `chat_template_kwargs.enable_thinking`, use `qwen-chat-template` instead.

Use `cacheControlFormat: "anthropic"` for OpenAI-compatible providers that expose Anthropic-style prompt caching (`cache_control`) on the system prompt, the last tool definition, and the last user/assistant text content.

For Anthropic-compatible providers using `api: "anthropic-messages"`, set `compat.forceAdaptiveThinking: true` on the model or provider when the upstream model requires adaptive thinking (`thinking.type: "adaptive"` plus `output_config.effort`). Built-in adaptive Claude models set this automatically. Set `compat.allowEmptySignature: true` only when the provider emits empty thinking signatures and expects `signature: ""` on replay.
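Migration note: Mistral moved from `openai-completions` to `mistral-conversations`. Use `mistral-conversations` for native Mistral models. If you intentionally route Mistral-compatible or custom endpoints through `openai-completions`, set `compat` flags explicitly as needed.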
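The `thinkingLevelMap` semantics from the example above (a string maps a pi level to a provider value, `null` hides the level) can be sketched as two small lookups. This is an illustration only: `selectableLevels` and `providerValue` are hypothetical helpers, and passing an unmapped level through unchanged is an assumption, not documented behavior.

```typescript
type ThinkingLevel = "off" | "minimal" | "low" | "medium" | "high" | "xhigh";
type ThinkingLevelMap = Partial<Record<ThinkingLevel, string | null>>;

const ALL_LEVELS: ThinkingLevel[] = ["off", "minimal", "low", "medium", "high", "xhigh"];

// Levels a UI could offer: everything that is not explicitly mapped to null.
function selectableLevels(map: ThinkingLevelMap = {}): ThinkingLevel[] {
  return ALL_LEVELS.filter((level) => map[level] !== null);
}

// Value to send upstream for a pi level, or null when the level is hidden.
function providerValue(map: ThinkingLevelMap, level: ThinkingLevel): string | null {
  const mapped = map[level];
  if (mapped === null) return null;
  // ASSUMPTION: a level missing from the map is passed through unchanged.
  return mapped ?? level;
}

// The map from the example above
const exampleMap: ThinkingLevelMap = {
  minimal: null,
  low: null,
  medium: null,
  high: "default",
  xhigh: "max",
};
```

With `exampleMap`, only `off`, `high`, and `xhigh` remain selectable, and `high` is sent upstream as `"default"`.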
Auth Header

If your provider expects `Authorization: Bearer <key>` but doesn't use a standard API, set `authHeader: true`:

```typescript
pi.registerProvider("custom-api", {
  baseUrl: "https://api.example.com",
  apiKey: "$MY_API_KEY",
  authHeader: true, // adds Authorization: Bearer header
  api: "openai-completions",
  models: [...]
});
```

OAuth Support

Add OAuth/SSO authentication that integrates with `/login`:
```typescript
import type { OAuthCredentials, OAuthLoginCallbacks } from "@earendil-works/pi-ai";

pi.registerProvider("corporate-ai", {
  baseUrl: "https://ai.corp.com/v1",
  api: "openai-responses",
  models: [...],
  oauth: {
    name: "Corporate AI (SSO)",

    async login(callbacks: OAuthLoginCallbacks): Promise<OAuthCredentials> {
      // Let the user pick a flow; undefined means the selector was dismissed
      const method = await callbacks.onSelect({
        message: "Select login method:",
        options: [
          { id: "browser", label: "Browser OAuth" },
          { id: "device", label: "Device code" }
        ]
      });
      if (!method) throw new Error("Login cancelled");

      let code: string;
      if (method === "device") {
        callbacks.onDeviceCode({
          userCode: "ABCD-1234",
          verificationUri: "https://sso.corp.com/device",
          intervalSeconds: 5,
          expiresInSeconds: 900
        });
        // Poll your SSO server (your implementation)
        code = await pollDeviceCodeUntilComplete();
      } else {
        callbacks.onAuth({ url: "https://sso.corp.com/authorize?..." });
        code = await callbacks.onPrompt({ message: "Enter SSO code:" });
      }

      // Exchange for tokens (your implementation)
      const tokens = await exchangeCodeForTokens(code);

      return {
        refresh: tokens.refreshToken,
        access: tokens.accessToken,
        expires: Date.now() + tokens.expiresIn * 1000
      };
    },

    async refreshToken(credentials: OAuthCredentials): Promise<OAuthCredentials> {
      // Refresh the access token (your implementation)
      const tokens = await refreshAccessToken(credentials.refresh);
      return {
        // Keep the old refresh token if the server does not rotate it
        refresh: tokens.refreshToken ?? credentials.refresh,
        access: tokens.accessToken,
        expires: Date.now() + tokens.expiresIn * 1000
      };
    },

    getApiKey(credentials: OAuthCredentials): string {
      return credentials.access;
    },

    // Optional: modify models based on user's subscription
    modifyModels(models, credentials) {
      const region = decodeRegionFromToken(credentials.access);
      return models.map(m => ({
        ...m,
        baseUrl: `https://${region}.ai.corp.com/v1`
      }));
    }
  }
});
```

After registration, users can authenticate with `/login corporate-ai`.
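Both `login` and `refreshToken` above turn a token response into credentials with an absolute expiry in milliseconds. A minimal sketch of that conversion as a shared, testable helper follows. The `TokenResponse` shape, `toCredentials`, `needsRefresh`, and the 60-second safety margin are assumptions for illustration; `OAuthCredentials` is redeclared locally only so the sketch is self-contained, and whether pi applies its own margin before calling `refreshToken` is not specified here.

```typescript
// Local copy of the credentials shape so the sketch stands alone.
interface OAuthCredentials {
  refresh: string;
  access: string;
  expires: number; // absolute timestamp in milliseconds
}

// ASSUMPTION: hypothetical shape of your SSO token endpoint's response.
interface TokenResponse {
  accessToken: string;
  refreshToken?: string;
  expiresIn: number; // lifetime in seconds
}

// Convert a token response into credentials, keeping the previous refresh
// token when the server does not rotate it.
function toCredentials(
  tokens: TokenResponse,
  previous?: OAuthCredentials,
  now: number = Date.now(),
): OAuthCredentials {
  const refresh = tokens.refreshToken ?? previous?.refresh;
  if (!refresh) throw new Error("Token response did not include a refresh token");
  return {
    refresh,
    access: tokens.accessToken,
    expires: now + tokens.expiresIn * 1000,
  };
}

// True once the access token is within skewMs of expiring.
function needsRefresh(
  credentials: OAuthCredentials,
  now: number = Date.now(),
  skewMs: number = 60_000,
): boolean {
  return now >= credentials.expires - skewMs;
}
```

Taking `now` as a parameter keeps the expiry arithmetic deterministic in tests; production code can rely on the `Date.now()` default.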
OAuthLoginCallbacks

The `callbacks` object provides three ways to authenticate, plus a selector for choosing between them:

```typescript
interface OAuthLoginCallbacks {
  // Open URL in browser (for OAuth redirects)
  onAuth(params: { url: string }): void;

  // Show device code (for device authorization flow)
  onDeviceCode(params: {
    userCode: string;
    verificationUri: string;
    intervalSeconds?: number;
    expiresInSeconds?: number;
  }): void;

  // Prompt user for input (for manual token entry)
  onPrompt(params: { message: string }): Promise<string>;

  // Show an interactive selector, e.g. to choose browser OAuth vs device code
  onSelect(params: {
    message: string;
    options: { id: string; label: string }[];
  }): Promise<string | undefined>;
}
```

OAuthCredentials

Credentials are persisted in `~/.pi/agent/auth.json`:

```typescript
interface OAuthCredentials {
  refresh: string;  // Refresh token (for refreshToken())
  access: string;   // Access token (returned by getApiKey())
  expires: number;  // Expiration timestamp in milliseconds
}
```

Custom Streaming API

For providers with non-standard APIs, implement `streamSimple`. Study the existing provider implementations before writing your own.
Reference implementations:

- `anthropic.ts` - Anthropic Messages API
- `mistral.ts` - Mistral Conversations API
- `openai-completions.ts` - OpenAI Chat Completions
- `openai-responses.ts` - OpenAI Responses API
- `google.ts` - Google Generative AI
- `amazon-bedrock.ts` - AWS Bedrock
All providers follow the same pattern:

```typescript
import {
  type AssistantMessage,
  type AssistantMessageEventStream,
  type Context,
  type Model,
  type SimpleStreamOptions,
  calculateCost,
  createAssistantMessageEventStream,
} from "@earendil-works/pi-ai";

function streamMyProvider(
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
): AssistantMessageEventStream {
  const stream = createAssistantMessageEventStream();

  // Run the request in the background; the stream is returned immediately
  (async () => {
    // Initialize output message
    const output: AssistantMessage = {
      role: "assistant",
      content: [],
      api: model.api,
      provider: model.provider,
      model: model.id,
      usage: {
        input: 0,
        output: 0,
        cacheRead: 0,
        cacheWrite: 0,
        totalTokens: 0,
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
      },
      stopReason: "stop",
      timestamp: Date.now(),
    };

    try {
      // Push start event
      stream.push({ type: "start", partial: output });

      // Make API request and process response...
      // Push content events as they arrive...

      // Push done event
      stream.push({
        type: "done",
        reason: output.stopReason as "stop" | "length" | "toolUse",
        message: output
      });
      stream.end();
    } catch (error) {
      // Distinguish user aborts from real failures
      output.stopReason = options?.signal?.aborted ? "aborted" : "error";
      output.errorMessage = error instanceof Error ? error.message : String(error);
      stream.push({ type: "error", reason: output.stopReason, error: output });
      stream.end();
    }
  })();

  return stream;
}
```

Push events via `stream.push()` in this order:
1. `{ type: "start", partial: output }` - stream started
2. Content events (repeatable; track `contentIndex` for each block):
   - `{ type: "text_start", contentIndex, partial }` - text block started
   - `{ type: "text_delta", contentIndex, delta, partial }` - text chunk
   - `{ type: "text_end", contentIndex, content, partial }` - text block ended
   - `{ type: "thinking_start", contentIndex, partial }` - thinking started
   - `{ type: "thinking_delta", contentIndex, delta, partial }` - thinking chunk
   - `{ type: "thinking_end", contentIndex, content, partial }` - thinking ended
   - `{ type: "toolcall_start", contentIndex, partial }` - tool call started
   - `{ type: "toolcall_delta", contentIndex, delta, partial }` - tool call JSON chunk
   - `{ type: "toolcall_end", contentIndex, toolCall, partial }` - tool call ended
3. `{ type: "done", reason, message }` or `{ type: "error", reason, error }` - stream ended

The `partial` field in each event contains the current `AssistantMessage` state. Update `output.content` as you receive data, then include `output` as the `partial`.
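When developing a custom stream, it can help to check the ordering rules above mechanically. The following is a minimal sketch of such a check, not part of pi: `validateEventOrder` and the reduced `StreamEvent` shape are hypothetical, and the checker only looks at `type` and `contentIndex`.

```typescript
// Reduced event shape for the check; real events carry more fields (partial, delta, ...).
type StreamEvent = { type: string; contentIndex?: number };

// Returns a list of ordering problems; an empty list means the sequence is well-formed.
function validateEventOrder(events: StreamEvent[]): string[] {
  const problems: string[] = [];
  const openBlocks = new Map<number, string>(); // contentIndex -> block kind

  events.forEach((event, position) => {
    if (event.type === "start") {
      if (position !== 0) problems.push(`start repeated at position ${position}`);
      return;
    }
    if (position === 0) problems.push("first event must be start");

    if (event.type === "done" || event.type === "error") {
      if (position !== events.length - 1) problems.push(`${event.type} must be the last event`);
      return;
    }

    const match = /^(text|thinking|toolcall)_(start|delta|end)$/.exec(event.type);
    if (!match || event.contentIndex === undefined) {
      problems.push(`unexpected event ${event.type} at position ${position}`);
      return;
    }
    const kind = match[1];
    const phase = match[2];
    const index = event.contentIndex;

    if (phase === "start") {
      if (openBlocks.has(index)) problems.push(`block ${index} started twice`);
      openBlocks.set(index, kind);
    } else if (openBlocks.get(index) !== kind) {
      // delta or end without a matching start of the same kind
      problems.push(`${event.type} for block ${index} without a matching ${kind}_start`);
    } else if (phase === "end") {
      openBlocks.delete(index);
    }
  });

  const last = events[events.length - 1];
  if (!last || (last.type !== "done" && last.type !== "error")) {
    problems.push("stream must end with done or error");
  }
  return problems;
}
```

One way to use it is to record every object passed to `stream.push()` in a test and assert that `validateEventOrder(recorded)` returns an empty array.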
Add content blocks to `output.content` as data arrives:

```typescript
// Text block
output.content.push({ type: "text", text: "" });
stream.push({ type: "text_start", contentIndex: output.content.length - 1, partial: output });

// As text arrives
const block = output.content[contentIndex];
if (block.type === "text") {
  block.text += delta;
  stream.push({ type: "text_delta", contentIndex, delta, partial: output });
}

// When block completes
stream.push({ type: "text_end", contentIndex, content: block.text, partial: output });
```

Tool calls require accumulating and parsing JSON:

```typescript
// Start tool call
output.content.push({
  type: "toolCall",
  id: toolCallId,
  name: toolName,
  arguments: {}
});
stream.push({ type: "toolcall_start", contentIndex: output.content.length - 1, partial: output });

// Accumulate JSON: declare partialJson once per tool call,
// then append each delta as it arrives
let partialJson = "";
partialJson += jsonDelta;
try {
  // block is the toolCall block at output.content[contentIndex]
  block.arguments = JSON.parse(partialJson);
} catch {
  // JSON is still incomplete; keep the last successfully parsed arguments
}
stream.push({ type: "toolcall_delta", contentIndex, delta: jsonDelta, partial: output });

// Complete
stream.push({
  type: "toolcall_end",
  contentIndex,
  toolCall: { type: "toolCall", id: toolCallId, name: toolName, arguments: block.arguments },
  partial: output
});
```

Update usage from the API response and calculate cost:

```typescript
output.usage.input = response.usage.input_tokens;
output.usage.output = response.usage.output_tokens;
output.usage.cacheRead = response.usage.cache_read_tokens ?? 0;
output.usage.cacheWrite = response.usage.cache_write_tokens ?? 0;
output.usage.totalTokens = output.usage.input + output.usage.output +
                           output.usage.cacheRead + output.usage.cacheWrite;
calculateCost(model, output.usage);
```

Context Overflow Errors
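When a request exceeds the model's context window, pi can recover automatically by compacting the conversation and retrying. This recovery only takes effect if pi recognizes the failure as an overflow.

Detection runs on the finalized assistant message:

- `stopReason === "error"`
- `errorMessage` matches one of pi's known overflow patterns (see `packages/ai/src/utils/overflow.ts`)

If your provider returns an overflow error message that pi doesn't recognize, normalize the error in the same extension that registers the provider. Use a `message_end` handler to rewrite the assistant message so that its `errorMessage` starts with a phrase pi recognizes. The generic fallback `context_length_exceeded` is the safest choice.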
```typescript
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";

const MY_PROVIDER_OVERFLOW_PATTERN = /your provider's overflow phrase/i;

export default function (pi: ExtensionAPI) {
  pi.registerProvider("my-provider", { /* ... */ });

  pi.on("message_end", (event, ctx) => {
    const message = event.message;
    // Only touch failed assistant messages from this provider
    if (message.role !== "assistant") return;
    if (message.stopReason !== "error") return;
    if (
      message.provider !== "my-provider" &&
      ctx.model?.provider !== "my-provider"
    ) return;

    const errorMessage = message.errorMessage ?? "";
    // Already normalized: keep the handler idempotent
    if (errorMessage.includes("context_length_exceeded")) return;
    if (!MY_PROVIDER_OVERFLOW_PATTERN.test(errorMessage)) return;

    // Prefix the message with a phrase pi recognizes as an overflow
    return {
      message: {
        ...message,
        errorMessage: `context_length_exceeded: ${errorMessage}`,
      },
    };
  });
}
```

`message_end` runs before pi tracks the assistant message for auto-compaction, so pi inspects the rewritten `errorMessage`. Once this is in place, pi will:

1. Detect the overflow from `errorMessage`.
2. Remove the failed assistant message from the live context.
3. Run compaction.
4. Retry the request once.
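Guard the rewrite carefully:

- Scope it to your provider (`message.provider` and `ctx.model?.provider`) so you don't touch unrelated errors from other providers.
- Match a provider-specific pattern, not pi's generic overflow patterns. Rewriting rate-limit or throttling errors (`rate limit`, `too many requests`) would wrongly trigger compaction instead of pi's normal backoff-and-retry path.
- Skip when `errorMessage` already contains `context_length_exceeded`, so the handler is idempotent.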
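The message-level guards above can be pulled into a pure function that is easy to unit test separately from the `message_end` handler. This is a sketch under assumptions: `normalizeOverflowError`, the example overflow phrase, and the explicit throttling check are hypothetical and should be adapted to what your provider actually returns. The provider-scoping guard stays in the handler, since it needs the message and context.

```typescript
const OVERFLOW_PREFIX = "context_length_exceeded";

// ASSUMPTION: example phrase only; replace with your provider's real overflow wording.
const MY_PROVIDER_OVERFLOW_PATTERN = /prompt exceeds the maximum context/i;

// Errors that must stay on the normal backoff-and-retry path.
const THROTTLING_PATTERN = /rate limit|too many requests/i;

// Returns the rewritten error message, or undefined when the message should be left alone.
function normalizeOverflowError(
  errorMessage: string,
  providerPattern: RegExp = MY_PROVIDER_OVERFLOW_PATTERN,
): string | undefined {
  // Idempotency guard: already normalized.
  if (errorMessage.includes(OVERFLOW_PREFIX)) return undefined;
  // Never turn throttling errors into overflow errors.
  if (THROTTLING_PATTERN.test(errorMessage)) return undefined;
  // Only rewrite messages that match the provider-specific phrase.
  if (!providerPattern.test(errorMessage)) return undefined;
  return `${OVERFLOW_PREFIX}: ${errorMessage}`;
}
```

Inside the handler, you would call `normalizeOverflowError(message.errorMessage ?? "")` after the provider checks and return the rewritten message only when the result is defined.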
Register your stream function:

```typescript
pi.registerProvider("my-provider", {
  baseUrl: "https://api.example.com",
  apiKey: "$MY_API_KEY",
  api: "my-custom-api",
  models: [...],
  streamSimple: streamMyProvider
});
```

Testing Your Implementation
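Test your provider against the same test suite used by the built-in providers. Copy and adapt these test files from `packages/ai/test/`:

| Test | Purpose |
|---|---|
| `stream.test.ts` | Basic streaming, text output |
| `tokens.test.ts` | Token counting and usage |
| `abort.test.ts` | AbortSignal handling |
| `empty.test.ts` | Empty/minimal responses |
| `context-overflow.test.ts` | Context window limits |
| `image-limits.test.ts` | Image input handling |
| `unicode-surrogate.test.ts` | Unicode edge cases |
| `tool-call-without-result.test.ts` | Tool call edge cases |
| `image-tool-result.test.ts` | Images in tool results |
| `total-tokens.test.ts` | Total token calculation |
| `cross-provider-handoff.test.ts` | Context handoff between providers |

Run the tests with your provider/model pair to verify compatibility.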
Config Reference

```typescript
interface ProviderConfig {
  /** Display name for the provider in UI such as /login. */
  name?: string;

  /** API endpoint URL. Required when defining models. */
  baseUrl?: string;

  /** API key literal, env interpolation ($ENV_VAR or ${ENV_VAR}), or !command. Required when defining models (unless oauth). */
  apiKey?: string;

  /** API type for streaming. Required at provider or model level when defining models. */
  api?: Api;

  /** Custom streaming implementation for non-standard APIs. */
  streamSimple?: (
    model: Model<Api>,
    context: Context,
    options?: SimpleStreamOptions
  ) => AssistantMessageEventStream;

  /** Custom headers to include in requests. Values use the same resolution syntax as apiKey. */
  headers?: Record<string, string>;

  /** If true, adds Authorization: Bearer header with the resolved API key. */
  authHeader?: boolean;

  /** Models to register. If provided, replaces all existing models for this provider. */
  models?: ProviderModelConfig[];

  /** OAuth provider for /login support. */
  oauth?: {
    name: string;
    login(callbacks: OAuthLoginCallbacks): Promise<OAuthCredentials>;
    refreshToken(credentials: OAuthCredentials): Promise<OAuthCredentials>;
    getApiKey(credentials: OAuthCredentials): string;
    modifyModels?(models: Model<Api>[], credentials: OAuthCredentials): Model<Api>[];
  };
}
```

Model Definition Reference
```typescript
interface ProviderModelConfig {
  /** Model ID (e.g., "claude-sonnet-4-20250514"). */
  id: string;

  /** Display name (e.g., "Claude 4 Sonnet"). */
  name: string;

  /** API type override for this specific model. */
  api?: Api;

  /** API endpoint URL override for this specific model. */
  baseUrl?: string;

  /** Whether the model supports extended thinking. */
  reasoning: boolean;

  /** Maps pi thinking levels to provider/model-specific values; null marks a level unsupported. */
  thinkingLevelMap?: Partial<Record<"off" | "minimal" | "low" | "medium" | "high" | "xhigh", string | null>>;

  /** Supported input types. */
  input: ("text" | "image")[];

  /** Cost per million tokens (for usage tracking). */
  cost: {
    input: number;
    output: number;
    cacheRead: number;
    cacheWrite: number;
  };

  /** Maximum context window size in tokens. */
  contextWindow: number;

  /** Maximum output tokens. */
  maxTokens: number;

  /** Custom headers for this specific model. */
  headers?: Record<string, string>;

  /** Compatibility settings for the selected API. */
  compat?: {
    // openai-completions
    supportsStore?: boolean;
    supportsDeveloperRole?: boolean;
    supportsReasoningEffort?: boolean;
    supportsUsageInStreaming?: boolean;
    maxTokensField?: "max_completion_tokens" | "max_tokens";
    requiresToolResultName?: boolean;
    requiresAssistantAfterToolResult?: boolean;
    requiresThinkingAsText?: boolean;
    requiresReasoningContentOnAssistantMessages?: boolean;
    thinkingFormat?: "openai" | "openrouter" | "deepseek" | "together" | "zai" | "qwen" | "qwen-chat-template";
    cacheControlFormat?: "anthropic";

    // anthropic-messages
    supportsEagerToolInputStreaming?: boolean;
    supportsLongCacheRetention?: boolean;
    sendSessionAffinityHeaders?: boolean;
    supportsCacheControlOnTools?: boolean;
    forceAdaptiveThinking?: boolean;
    allowEmptySignature?: boolean;
  };
}
```

`openrouter` sends `reasoning: { effort }`. `deepseek` sends `thinking: { type: "enabled" | "disabled" }`, plus `reasoning_effort` when enabled. `together` sends `reasoning: { enabled }`, plus `reasoning_effort` when `supportsReasoningEffort` is enabled. `qwen` is for DashScope-style top-level `enable_thinking`. For local Qwen-compatible servers that read `chat_template_kwargs.enable_thinking`, use `qwen-chat-template`.

`cacheControlFormat: "anthropic"` applies Anthropic-style `cache_control` markers to the system prompt, the last tool definition, and the last user/assistant text content.
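To visualize how the `thinkingFormat` values described above differ, here is a rough sketch that builds the extra request fields for each one. It is an illustration, not pi's implementation: `thinkingParams` and `ThinkingRequest` are hypothetical, the handling of a missing `effort` is an assumption, and `openai` and `zai` are left out because their payloads are not described in this section.

```typescript
// Only the formats whose payloads are described above.
type DescribedFormat = "openrouter" | "deepseek" | "together" | "qwen" | "qwen-chat-template";

interface ThinkingRequest {
  enabled: boolean;
  effort?: string; // provider-specific effort value, e.g. from thinkingLevelMap
  supportsReasoningEffort?: boolean;
}

// Builds the extra request-body fields for one thinkingFormat.
function thinkingParams(format: DescribedFormat, request: ThinkingRequest): Record<string, unknown> {
  const { enabled, effort, supportsReasoningEffort } = request;
  switch (format) {
    case "openrouter":
      // ASSUMPTION: nothing is sent when no effort is set.
      return effort ? { reasoning: { effort } } : {};
    case "deepseek":
      return {
        thinking: { type: enabled ? "enabled" : "disabled" },
        ...(enabled && effort ? { reasoning_effort: effort } : {}),
      };
    case "together":
      return {
        reasoning: { enabled },
        ...(supportsReasoningEffort && effort ? { reasoning_effort: effort } : {}),
      };
    case "qwen":
      return { enable_thinking: enabled };
    case "qwen-chat-template":
      return { chat_template_kwargs: { enable_thinking: enabled } };
  }
}
```

For example, `thinkingParams("qwen-chat-template", { enabled: true })` yields `{ chat_template_kwargs: { enable_thinking: true } }`, while `qwen` places `enable_thinking` at the top level of the request body.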