Custom Providers

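Extensions can register custom model providers via pi.registerProvider(). This enables:

  • Proxies - route requests through a corporate proxy or API gateway
  • Custom endpoints - use self-hosted or private model deployments
  • OAuth/SSO - add authentication flows for enterprise providers
  • Custom APIs - implement streaming for non-standard LLM APIs

See the complete provider examples for full implementations. The basic registration calls look like this: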

import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";

export default function (pi: ExtensionAPI) {
  // Override baseUrl for existing provider
  pi.registerProvider("anthropic", {
    baseUrl: "https://proxy.example.com"
  });

  // Register new provider with models
  pi.registerProvider("my-provider", {
    name: "My Provider",
    baseUrl: "https://api.example.com",
    apiKey: "$MY_API_KEY",
    api: "openai-completions",
    models: [
      {
        id: "my-model",
        name: "My Model",
        reasoning: false,
        input: ["text", "image"],
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
        contextWindow: 128000,
        maxTokens: 4096
      }
    ]
  });
}

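Extension factories can also be async. For dynamic model discovery, fetch and register models in the factory rather than in session_start. pi waits for the factory to finish before continuing startup, so the provider is available during interactive startup and for pi --list-models.

The simplest use case: redirect an existing provider through a proxy.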

// All Anthropic requests now go through your proxy
pi.registerProvider("anthropic", {
  baseUrl: "https://proxy.example.com"
});

// Add custom headers to OpenAI requests
pi.registerProvider("openai", {
  headers: {
    "X-Custom-Header": "value"
  }
});

// Both baseUrl and headers
pi.registerProvider("google", {
  baseUrl: "https://ai-gateway.corp.com/google",
  headers: {
    "X-Corp-Auth": "$CORP_AUTH_TOKEN" // env var or literal
  }
});

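When only baseUrl and/or headers are provided (no models), all existing models for that provider are kept and use the new endpoint.

To add an entirely new provider, specify models along with the required configuration.

If the model list comes from a remote endpoint, use an async extension factory: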

import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";

export default async function (pi: ExtensionAPI) {
  const response = await fetch("http://localhost:1234/v1/models");
  const payload = (await response.json()) as {
    data: Array<{
      id: string;
      name?: string;
      context_window?: number;
      max_tokens?: number;
    }>;
  };

  pi.registerProvider("local-openai", {
    baseUrl: "http://localhost:1234/v1",
    apiKey: "$LOCAL_OPENAI_API_KEY",
    api: "openai-completions",
    models: payload.data.map((model) => ({
      id: model.id,
      name: model.name ?? model.id,
      reasoning: false,
      input: ["text"],
      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
      contextWindow: model.context_window ?? 128000,
      maxTokens: model.max_tokens ?? 4096,
    })),
  });
}

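This registers the fetched models before startup completes.

A provider with a static model list is registered the same way: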

pi.registerProvider("my-llm", {
  baseUrl: "https://api.my-llm.com/v1",
  apiKey: "$MY_LLM_API_KEY", // env var reference
  api: "openai-completions", // which streaming API to use
  models: [
    {
      id: "my-llm-large",
      name: "My LLM Large",
      reasoning: true, // supports extended thinking
      input: ["text", "image"],
      cost: {
        input: 3.0, // $/million tokens
        output: 15.0,
        cacheRead: 0.3,
        cacheWrite: 3.75
      },
      contextWindow: 200000,
      maxTokens: 16384
    }
  ]
});

When models is provided, it replaces all existing models for that provider.
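The difference between an override-only registration and one that supplies models can be pictured with a small model of the behavior. This is a minimal sketch and not pi's implementation: ModelEntry, ProviderOverride, and applyOverride are hypothetical names, and the way headers are merged is an assumption made for illustration.

```typescript
// Hypothetical types for illustration only; pi's internal registry may differ.
interface ModelEntry {
  id: string;
  baseUrl: string;
  headers: Record<string, string>;
}

interface ProviderOverride {
  baseUrl?: string;
  headers?: Record<string, string>;
  models?: ModelEntry[];
}

// Assumption: supplying `models` replaces the whole list; otherwise the
// existing models are kept and only their endpoint and headers change.
function applyOverride(existing: ModelEntry[], override: ProviderOverride): ModelEntry[] {
  if (override.models) {
    return override.models.map((model) => ({ ...model }));
  }
  return existing.map((model) => ({
    ...model,
    baseUrl: override.baseUrl ?? model.baseUrl,
    headers: { ...model.headers, ...override.headers },
  }));
}

const builtIn: ModelEntry[] = [
  { id: "model-a", baseUrl: "https://api.example.com", headers: {} },
];

// Override only: model-a survives and points at the proxy.
const proxied = applyOverride(builtIn, { baseUrl: "https://proxy.example.com" });

// With models: model-a is gone and only model-b remains.
const replaced = applyOverride(builtIn, {
  models: [{ id: "model-b", baseUrl: "https://api.my-llm.com/v1", headers: {} }],
});
```

The practical consequence is that adding a single model to a built-in provider means listing every model you still want, because the supplied list is not merged with the existing one.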

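apiKey and custom header values use the same config value syntax as models.json: a leading !command runs the command to obtain the entire value, $ENV_VAR and ${ENV_VAR} interpolate environment variables, $$ emits a literal $, and $! emits a literal !.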
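To make the value syntax concrete, here is a simplified resolver that follows the rules as described above. It is a sketch under stated assumptions, not pi's resolver: resolveConfigValue is a hypothetical name, the command runner is injected rather than executed for real, an unset variable is assumed to resolve to an empty string, and command output is assumed to be trimmed.

```typescript
// Hypothetical helper, NOT pi's implementation. It only illustrates the
// interpolation rules described in the text.
function resolveConfigValue(
  value: string,
  env: Record<string, string | undefined>,
  runCommand: (command: string) => string,
): string {
  // A leading "!" means the entire value comes from a command's output.
  if (value.startsWith("!")) {
    return runCommand(value.slice(1)).trim();
  }
  // Single pass, so the output of "$$" and "$!" is never re-interpreted.
  return value.replace(
    /\$\$|\$!|\$\{([A-Za-z_][A-Za-z0-9_]*)\}|\$([A-Za-z_][A-Za-z0-9_]*)/g,
    (match: string, braced?: string, bare?: string) => {
      if (match === "$$") return "$";
      if (match === "$!") return "!";
      // Assumption: an unset variable becomes an empty string.
      return env[braced ?? bare ?? ""] ?? "";
    },
  );
}

// Example inputs with a fake environment and a fake command runner.
const exampleEnv = { TOKEN: "abc", REGION: "eu" };
const fakeRun = (command: string) => (command === "echo hi" ? "hi\n" : "");

const bearer = resolveConfigValue("Bearer $TOKEN", exampleEnv, fakeRun);
const host = resolveConfigValue("${REGION}.example.com", exampleEnv, fakeRun);
const literalDollar = resolveConfigValue("$$5", exampleEnv, fakeRun);
const literalBang = resolveConfigValue("$!important", exampleEnv, fakeRun);
const fromCommand = resolveConfigValue("!echo hi", exampleEnv, fakeRun);
```

The $! escape matters mostly at the start of a value, where a bare ! would otherwise be read as a command.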

Use pi.unregisterProvider(name) to remove a provider previously registered with pi.registerProvider(name, ...):

// Register
pi.registerProvider("my-llm", {
  baseUrl: "https://api.my-llm.com/v1",
  apiKey: "$MY_LLM_API_KEY",
  api: "openai-completions",
  models: [
    {
      id: "my-llm-large",
      name: "My LLM Large",
      reasoning: true,
      input: ["text", "image"],
      cost: { input: 3.0, output: 15.0, cacheRead: 0.3, cacheWrite: 3.75 },
      contextWindow: 200000,
      maxTokens: 16384
    }
  ]
});

// Later, remove it
pi.unregisterProvider("my-llm");

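Unregistering removes the provider's dynamic models, API key fallback, OAuth provider registration, and custom stream handler registration. Any built-in models or provider behavior that were overridden are restored.

Calls made after the initial extension loading phase take effect immediately, so no /reload is needed.

The api field determines which streaming implementation is used: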

API                       Use for
anthropic-messages        Anthropic Claude API and compatible implementations
openai-completions        OpenAI Chat Completions API and compatible implementations
openai-responses          OpenAI Responses API
azure-openai-responses    Azure OpenAI Responses API
openai-codex-responses    OpenAI Codex Responses API
mistral-conversations     Mistral SDK Conversations/Chat streaming
google-generative-ai      Google Generative AI API
google-vertex             Google Vertex AI API
bedrock-converse-stream   Amazon Bedrock Converse API

Most OpenAI-compatible providers work with openai-completions. Use the model-level thinkingLevelMap for model-specific thinking levels, and compat for provider quirks:

models: [{
  id: "custom-model",
  // ...
  reasoning: true,
  thinkingLevelMap: { // map pi levels to provider values; null hides unsupported levels
    minimal: null,
    low: null,
    medium: null,
    high: "default",
    xhigh: "max"
  },
  compat: {
    supportsDeveloperRole: false, // use "system" instead of "developer"
    supportsReasoningEffort: true,
    maxTokensField: "max_tokens", // instead of "max_completion_tokens"
    requiresToolResultName: true, // tool results need name field
    thinkingFormat: "qwen", // top-level enable_thinking: true
    cacheControlFormat: "anthropic" // Anthropic-style cache_control markers
  }
}]
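One way to read a thinkingLevelMap like the one above is as a lookup that either yields a provider-specific value or hides the level. The sketch below is illustrative only: mapThinkingLevel and supportedLevels are hypothetical helpers, and passing an unmapped level through unchanged is an assumption rather than documented behavior.

```typescript
type ThinkingLevel = "off" | "minimal" | "low" | "medium" | "high" | "xhigh";
type ThinkingLevelMap = Partial<Record<ThinkingLevel, string | null>>;

// Hypothetical helper: returns the provider-specific value for a pi level,
// or undefined when the map marks the level as unsupported (null).
// Assumption: a level missing from the map is passed through unchanged.
function mapThinkingLevel(level: ThinkingLevel, map: ThinkingLevelMap): string | undefined {
  const mapped = map[level];
  if (mapped === null) return undefined;
  return mapped ?? level;
}

// Levels that remain selectable under this sketch.
function supportedLevels(map: ThinkingLevelMap): ThinkingLevel[] {
  const all: ThinkingLevel[] = ["off", "minimal", "low", "medium", "high", "xhigh"];
  return all.filter((level) => map[level] !== null);
}

// Same map as in the model configuration above.
const exampleMap: ThinkingLevelMap = {
  minimal: null,
  low: null,
  medium: null,
  high: "default",
  xhigh: "max",
};

const highValue = mapThinkingLevel("high", exampleMap);
const lowValue = mapThinkingLevel("low", exampleMap);
const visible = supportedLevels(exampleMap);
```

With this map, only off, high, and xhigh would remain selectable, and high would be sent to the provider as "default".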

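Set thinkingFormat to openrouter for OpenRouter-style reasoning: { effort } controls. Set it to together for Together-style reasoning: { enabled } controls; with supportsReasoningEffort, it also sends reasoning_effort. For local Qwen-compatible servers that read chat_template_kwargs.enable_thinking, use qwen-chat-template instead. For OpenAI-compatible providers that expose Anthropic-style prompt caching (cache_control) on the system prompt, the last tool definition, and the last user/assistant text content, use cacheControlFormat: "anthropic".

For Anthropic-compatible providers using api: "anthropic-messages", set compat.forceAdaptiveThinking: true on the model or provider if the upstream model requires adaptive thinking (thinking.type: "adaptive" plus output_config.effort). Built-in adaptive Claude models set this automatically. Set compat.allowEmptySignature: true only if the provider emits empty thinking signatures and expects signature: "" on replay.

Migration note: Mistral has moved from openai-completions to mistral-conversations. Use mistral-conversations for native Mistral models. If you intentionally route a Mistral-compatible or custom endpoint through openai-completions, set compat flags explicitly as needed.

If a provider expects Authorization: Bearer <key> but does not use a standard API, set authHeader: true: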

pi.registerProvider("custom-api", {
  baseUrl: "https://api.example.com",
  apiKey: "$MY_API_KEY",
  authHeader: true, // adds Authorization: Bearer header
  api: "openai-completions",
  models: [...]
});

Add OAuth/SSO authentication that integrates with /login:

import type { OAuthCredentials, OAuthLoginCallbacks } from "@earendil-works/pi-ai";

// pollDeviceCodeUntilComplete, exchangeCodeForTokens, refreshAccessToken, and
// decodeRegionFromToken are placeholders for your own implementation.
pi.registerProvider("corporate-ai", {
  baseUrl: "https://ai.corp.com/v1",
  api: "openai-responses",
  models: [...],
  oauth: {
    name: "Corporate AI (SSO)",

    async login(callbacks: OAuthLoginCallbacks): Promise<OAuthCredentials> {
      const method = await callbacks.onSelect({
        message: "Select login method:",
        options: [
          { id: "browser", label: "Browser OAuth" },
          { id: "device", label: "Device code" }
        ]
      });
      if (!method) throw new Error("Login cancelled");

      let code: string;
      if (method === "device") {
        callbacks.onDeviceCode({
          userCode: "ABCD-1234",
          verificationUri: "https://sso.corp.com/device",
          intervalSeconds: 5,
          expiresInSeconds: 900
        });
        code = await pollDeviceCodeUntilComplete();
      } else {
        callbacks.onAuth({ url: "https://sso.corp.com/authorize?..." });
        code = await callbacks.onPrompt({ message: "Enter SSO code:" });
      }

      // Exchange for tokens (your implementation)
      const tokens = await exchangeCodeForTokens(code);
      return {
        refresh: tokens.refreshToken,
        access: tokens.accessToken,
        expires: Date.now() + tokens.expiresIn * 1000
      };
    },

    async refreshToken(credentials: OAuthCredentials): Promise<OAuthCredentials> {
      const tokens = await refreshAccessToken(credentials.refresh);
      return {
        refresh: tokens.refreshToken ?? credentials.refresh,
        access: tokens.accessToken,
        expires: Date.now() + tokens.expiresIn * 1000
      };
    },

    getApiKey(credentials: OAuthCredentials): string {
      return credentials.access;
    },

    // Optional: modify models based on user's subscription
    modifyModels(models, credentials) {
      const region = decodeRegionFromToken(credentials.access);
      return models.map(m => ({
        ...m,
        baseUrl: `https://${region}.ai.corp.com/v1`
      }));
    }
  }
});

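Once registered, users can authenticate with /login corporate-ai.

The callbacks object provides three ways to authenticate, plus a selector for choosing between them: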

interface OAuthLoginCallbacks {
  // Open URL in browser (for OAuth redirects)
  onAuth(params: { url: string }): void;

  // Show device code (for device authorization flow)
  onDeviceCode(params: {
    userCode: string;
    verificationUri: string;
    intervalSeconds?: number;
    expiresInSeconds?: number;
  }): void;

  // Prompt user for input (for manual token entry)
  onPrompt(params: { message: string }): Promise<string>;

  // Show an interactive selector, e.g. to choose browser OAuth vs device code
  onSelect(params: {
    message: string;
    options: { id: string; label: string }[];
  }): Promise<string | undefined>;
}

Credentials are persisted in ~/.pi/agent/auth.json:

interface OAuthCredentials {
  refresh: string; // Refresh token (for refreshToken())
  access: string;  // Access token (returned by getApiKey())
  expires: number; // Expiration timestamp in milliseconds
}
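Because expires is an absolute timestamp in milliseconds while token endpoints usually report a lifetime in seconds, the conversion is an easy place to slip. The following sketch shows one way to build and check credentials. TokenResponse, toCredentials, and needsRefresh are hypothetical names, the token response shape mirrors the example above rather than any particular SSO server, and the 60-second safety margin is an arbitrary assumption.

```typescript
// Local copy of the shape shown above so the sketch is self-contained.
interface OAuthCredentials {
  refresh: string;
  access: string;
  expires: number; // expiration timestamp in milliseconds
}

// Hypothetical token response; your SSO server's field names may differ.
interface TokenResponse {
  accessToken: string;
  refreshToken?: string;
  expiresIn: number; // lifetime in seconds
}

// Converts a token response into credentials, keeping the previous refresh
// token when the server does not rotate it.
function toCredentials(
  tokens: TokenResponse,
  now: number,
  previous?: OAuthCredentials,
): OAuthCredentials {
  const refresh = tokens.refreshToken ?? previous?.refresh;
  if (refresh === undefined) throw new Error("No refresh token available");
  return { refresh, access: tokens.accessToken, expires: now + tokens.expiresIn * 1000 };
}

// True when the access token has expired or will expire within `skewMs`.
function needsRefresh(credentials: OAuthCredentials, now: number, skewMs = 60_000): boolean {
  return now >= credentials.expires - skewMs;
}

// Usage with a fixed clock so the arithmetic is easy to follow.
const issuedAt = 1_000;
const first = toCredentials({ accessToken: "a1", refreshToken: "r1", expiresIn: 3600 }, issuedAt);
const renewed = toCredentials({ accessToken: "a2", expiresIn: 3600 }, issuedAt + 10, first);
```

Passing the clock in as a parameter instead of calling Date.now() inside the helpers keeps them easy to test.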

For providers with non-standard APIs, implement streamSimple. Before writing your own, study the existing provider implementations as references.

All providers follow the same pattern:

import {
  type AssistantMessage,
  type AssistantMessageEventStream,
  type Context,
  type Model,
  type SimpleStreamOptions,
  calculateCost,
  createAssistantMessageEventStream,
} from "@earendil-works/pi-ai";

function streamMyProvider(
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
): AssistantMessageEventStream {
  const stream = createAssistantMessageEventStream();

  (async () => {
    // Initialize output message
    const output: AssistantMessage = {
      role: "assistant",
      content: [],
      api: model.api,
      provider: model.provider,
      model: model.id,
      usage: {
        input: 0,
        output: 0,
        cacheRead: 0,
        cacheWrite: 0,
        totalTokens: 0,
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
      },
      stopReason: "stop",
      timestamp: Date.now(),
    };

    try {
      // Push start event
      stream.push({ type: "start", partial: output });

      // Make API request and process response...
      // Push content events as they arrive...

      // Push done event
      stream.push({
        type: "done",
        reason: output.stopReason as "stop" | "length" | "toolUse",
        message: output
      });
      stream.end();
    } catch (error) {
      output.stopReason = options?.signal?.aborted ? "aborted" : "error";
      output.errorMessage = error instanceof Error ? error.message : String(error);
      stream.push({ type: "error", reason: output.stopReason, error: output });
      stream.end();
    }
  })();

  return stream;
}

Push events via stream.push() in this order:

  1. { type: "start", partial: output } - the stream has started

  2. Content events (repeatable; track contentIndex for each block):

    • { type: "text_start", contentIndex, partial } - a text block starts
    • { type: "text_delta", contentIndex, delta, partial } - a text chunk
    • { type: "text_end", contentIndex, content, partial } - a text block ends
    • { type: "thinking_start", contentIndex, partial } - thinking starts
    • { type: "thinking_delta", contentIndex, delta, partial } - a thinking chunk
    • { type: "thinking_end", contentIndex, content, partial } - thinking ends
    • { type: "toolcall_start", contentIndex, partial } - a tool call starts
    • { type: "toolcall_delta", contentIndex, delta, partial } - a tool call JSON chunk
    • { type: "toolcall_end", contentIndex, toolCall, partial } - a tool call ends
  3. { type: "done", reason, message } or { type: "error", reason, error } - the stream has ended

The partial field of each event holds the current AssistantMessage state. Update output.content as data arrives, then pass output as partial.
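When developing a custom stream, it can help to record the pushed events and check them against the ordering above. The checker below is a minimal sketch for that purpose. checkEventOrder and StreamEventLike are hypothetical and not part of pi, and tolerating unclosed blocks when the stream ends with an error is an assumption.

```typescript
// Minimal stand-in for the event shapes listed above; only `type` and
// `contentIndex` are modelled here.
interface StreamEventLike {
  type: string;
  contentIndex?: number;
}

// Hypothetical checker: returns null when the sequence is well-formed,
// otherwise a short description of the first problem found.
function checkEventOrder(events: StreamEventLike[]): string | null {
  if (events.length === 0 || events[0].type !== "start") return "first event must be start";
  const last = events[events.length - 1];
  if (last.type !== "done" && last.type !== "error") return "last event must be done or error";

  const open = new Map<number, string>(); // contentIndex -> block kind
  for (const event of events.slice(1, -1)) {
    const match = /^(text|thinking|toolcall)_(start|delta|end)$/.exec(event.type);
    if (!match || event.contentIndex === undefined) return `unexpected event ${event.type}`;
    const [, kind, phase] = match;
    const index = event.contentIndex;
    if (phase === "start") {
      if (open.has(index)) return `block ${index} started twice`;
      open.set(index, kind);
    } else if (open.get(index) !== kind) {
      return `${event.type} without matching start for block ${index}`;
    } else if (phase === "end") {
      open.delete(index);
    }
  }
  // Assumption: a stream that ends in an error may leave blocks unclosed.
  return open.size === 0 || last.type === "error" ? null : "unclosed content block";
}

const wellFormed: StreamEventLike[] = [
  { type: "start" },
  { type: "text_start", contentIndex: 0 },
  { type: "text_delta", contentIndex: 0 },
  { type: "text_end", contentIndex: 0 },
  { type: "done" },
];
const orphanDelta: StreamEventLike[] = [
  { type: "start" },
  { type: "text_delta", contentIndex: 0 },
  { type: "done" },
];
```

A check like this fits in a unit test around the stream function, where events can be collected into an array before being inspected.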

Add content blocks to output.content as data arrives:

// Text block
output.content.push({ type: "text", text: "" });
stream.push({ type: "text_start", contentIndex: output.content.length - 1, partial: output });

// As text arrives
const block = output.content[contentIndex];
if (block.type === "text") {
  block.text += delta;
  stream.push({ type: "text_delta", contentIndex, delta, partial: output });
}

// When block completes
stream.push({ type: "text_end", contentIndex, content: block.text, partial: output });

Tool calls require accumulating JSON and parsing it:

// Start tool call
output.content.push({
  type: "toolCall",
  id: toolCallId,
  name: toolName,
  arguments: {}
});
stream.push({ type: "toolcall_start", contentIndex: output.content.length - 1, partial: output });

// Accumulate JSON
let partialJson = "";
partialJson += jsonDelta;
try {
  block.arguments = JSON.parse(partialJson);
} catch {
  // Incomplete JSON mid-stream; keep the previous arguments
}
stream.push({ type: "toolcall_delta", contentIndex, delta: jsonDelta, partial: output });

// Complete
stream.push({
  type: "toolcall_end",
  contentIndex,
  toolCall: { type: "toolCall", id: toolCallId, name: toolName, arguments: block.arguments },
  partial: output
});
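The accumulate-and-parse step can be isolated into a small buffer so that each tool call keeps its own partial JSON. This is one possible arrangement, not the pattern pi's built-in providers necessarily use, and ToolCallArgumentBuffer is a hypothetical name.

```typescript
// Hypothetical helper that buffers streamed JSON fragments for one tool call.
class ToolCallArgumentBuffer {
  private partialJson = "";
  private lastParsed: Record<string, unknown> = {};

  // Appends a fragment and returns the most recent successfully parsed arguments.
  append(jsonDelta: string): Record<string, unknown> {
    this.partialJson += jsonDelta;
    try {
      const parsed: unknown = JSON.parse(this.partialJson);
      // Only plain objects are accepted as tool arguments in this sketch.
      if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
        this.lastParsed = parsed as Record<string, unknown>;
      }
    } catch {
      // Incomplete JSON is expected mid-stream; keep the previous value.
    }
    return this.lastParsed;
  }
}

// Usage: fragments arrive in arbitrary pieces.
const buffer = new ToolCallArgumentBuffer();
const afterFirst = buffer.append('{"path":');
const afterSecond = buffer.append('"notes.txt"}');
```

Keeping one buffer per contentIndex avoids mixing fragments when a provider interleaves several tool calls in one response.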

Update usage from the API response and calculate cost:

output.usage.input = response.usage.input_tokens;
output.usage.output = response.usage.output_tokens;
output.usage.cacheRead = response.usage.cache_read_tokens ?? 0;
output.usage.cacheWrite = response.usage.cache_write_tokens ?? 0;
output.usage.totalTokens =
  output.usage.input + output.usage.output +
  output.usage.cacheRead + output.usage.cacheWrite;
calculateCost(model, output.usage);
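Since model costs are given in dollars per million tokens, the arithmetic behind a cost figure is a straightforward scaling. The sketch below shows that arithmetic only. estimateCost and TokenCounts are hypothetical stand-ins, and pi's calculateCost may differ in detail.

```typescript
interface TokenCounts {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
}

// Hypothetical stand-in for illustration. `rates` are dollars per million
// tokens, matching the `cost` field of a model definition.
function estimateCost(tokens: TokenCounts, rates: TokenCounts): TokenCounts & { total: number } {
  const perMillion = (count: number, rate: number) => (count * rate) / 1_000_000;
  const input = perMillion(tokens.input, rates.input);
  const output = perMillion(tokens.output, rates.output);
  const cacheRead = perMillion(tokens.cacheRead, rates.cacheRead);
  const cacheWrite = perMillion(tokens.cacheWrite, rates.cacheWrite);
  return { input, output, cacheRead, cacheWrite, total: input + output + cacheRead + cacheWrite };
}

// Rates taken from the my-llm-large example earlier on this page.
const rates: TokenCounts = { input: 3.0, output: 15.0, cacheRead: 0.3, cacheWrite: 3.75 };
const usage: TokenCounts = { input: 1_000_000, output: 200_000, cacheRead: 0, cacheWrite: 0 };
const cost = estimateCost(usage, rates);
```

Here one million input tokens at 3.0 and two hundred thousand output tokens at 15.0 each contribute 3 dollars, for a total of 6.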

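When a request exceeds the model's context window, pi can recover automatically by compacting the conversation and retrying. This recovery only applies when pi recognizes the failure as an overflow.

Detection runs on the finalized assistant message.

If the provider returns an overflow error message that pi does not recognize, normalize the error in the same extension that registers the provider. Use a message_end handler to rewrite the assistant message so that its errorMessage starts with a phrase pi recognizes. The generic fallback context_length_exceeded is the safest choice.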

import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";

const MY_PROVIDER_OVERFLOW_PATTERN = /your provider's overflow phrase/i;

export default function (pi: ExtensionAPI) {
  pi.registerProvider("my-provider", { /* ... */ });

  pi.on("message_end", (event, ctx) => {
    const message = event.message;
    if (message.role !== "assistant") return;
    if (message.stopReason !== "error") return;
    // Only touch errors that belong to this provider
    if (
      message.provider !== "my-provider" &&
      ctx.model?.provider !== "my-provider"
    )
      return;

    const errorMessage = message.errorMessage ?? "";
    // Already normalized: keep the handler idempotent
    if (errorMessage.includes("context_length_exceeded")) return;
    if (!MY_PROVIDER_OVERFLOW_PATTERN.test(errorMessage)) return;

    return {
      message: {
        ...message,
        errorMessage: `context_length_exceeded: ${errorMessage}`,
      },
    };
  });
}

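message_end runs before pi tracks the assistant message for auto-compaction, so pi inspects the rewritten errorMessage. Once this is in place, pi will:

  1. Detect the overflow from errorMessage.
  2. Remove the failed assistant message from the live context.
  3. Run compaction.
  4. Retry the request once.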

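Guard the rewrite carefully:

  • Scope it to your provider (message.provider or ctx.model?.provider) so it does not touch unrelated errors from other providers.
  • Match a provider-specific pattern, not pi's generic overflow patterns. Rewriting rate-limit or throttling errors (rate limit, too many requests) would wrongly trigger compaction instead of pi's normal backoff-and-retry path.
  • Skip when errorMessage already contains context_length_exceeded, so the handler is idempotent.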
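The three guards are easier to test when the decision is pulled out of the event handler into a pure function. The sketch below shows one way to do that. normalizeOverflowError and FailedMessage are hypothetical names, the overflow pattern is a made-up example, and for brevity the helper checks only message.provider rather than also consulting ctx.model.

```typescript
// Minimal stand-in for the fields the handler inspects.
interface FailedMessage {
  role: string;
  provider?: string;
  stopReason?: string;
  errorMessage?: string;
}

const OVERFLOW_PREFIX = "context_length_exceeded";

// Hypothetical pure helper: returns the rewritten errorMessage, or undefined
// when the message should be left untouched. Pass a pattern without the `g`
// flag, since RegExp.prototype.test is stateful for global patterns.
function normalizeOverflowError(
  message: FailedMessage,
  providerId: string,
  overflowPattern: RegExp,
): string | undefined {
  if (message.role !== "assistant" || message.stopReason !== "error") return undefined;
  if (message.provider !== providerId) return undefined; // other providers are left alone
  const errorMessage = message.errorMessage ?? "";
  if (errorMessage.includes(OVERFLOW_PREFIX)) return undefined; // already normalized
  if (!overflowPattern.test(errorMessage)) return undefined; // e.g. rate limits
  return `${OVERFLOW_PREFIX}: ${errorMessage}`;
}

// Made-up provider phrase, used only for this example.
const examplePattern = /prompt is too long/i;
const overflow: FailedMessage = {
  role: "assistant",
  provider: "my-provider",
  stopReason: "error",
  errorMessage: "Prompt is too long for this model",
};
const rewritten = normalizeOverflowError(overflow, "my-provider", examplePattern);
```

Inside the message_end handler, a defined return value would become the new errorMessage, and undefined would mean returning nothing so the message passes through unchanged.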

Register your stream function:

pi.registerProvider("my-provider", {
  baseUrl: "https://api.example.com",
  apiKey: "$MY_API_KEY",
  api: "my-custom-api",
  models: [...],
  streamSimple: streamMyProvider
});

Test your provider against the same test suite that the built-in providers use. Copy and adapt these test files from packages/ai/test/:

Test                               Purpose
stream.test.ts                     Basic streaming, text output
tokens.test.ts                     Token counting and usage
abort.test.ts                      AbortSignal handling
empty.test.ts                      Empty/minimal responses
context-overflow.test.ts           Context window limits
image-limits.test.ts               Image input handling
unicode-surrogate.test.ts          Unicode edge cases
tool-call-without-result.test.ts   Tool call edge cases
image-tool-result.test.ts          Images in tool results
total-tokens.test.ts               Total token calculation
cross-provider-handoff.test.ts     Context handoff between providers

Run the tests with your provider/model pair to verify compatibility.

interface ProviderConfig {
  /** Display name for the provider in UI such as /login. */
  name?: string;

  /** API endpoint URL. Required when defining models. */
  baseUrl?: string;

  /** API key literal, env interpolation ($ENV_VAR or ${ENV_VAR}), or !command. Required when defining models (unless oauth). */
  apiKey?: string;

  /** API type for streaming. Required at provider or model level when defining models. */
  api?: Api;

  /** Custom streaming implementation for non-standard APIs. */
  streamSimple?: (
    model: Model<Api>,
    context: Context,
    options?: SimpleStreamOptions
  ) => AssistantMessageEventStream;

  /** Custom headers to include in requests. Values use the same resolution syntax as apiKey. */
  headers?: Record<string, string>;

  /** If true, adds Authorization: Bearer header with the resolved API key. */
  authHeader?: boolean;

  /** Models to register. If provided, replaces all existing models for this provider. */
  models?: ProviderModelConfig[];

  /** OAuth provider for /login support. */
  oauth?: {
    name: string;
    login(callbacks: OAuthLoginCallbacks): Promise<OAuthCredentials>;
    refreshToken(credentials: OAuthCredentials): Promise<OAuthCredentials>;
    getApiKey(credentials: OAuthCredentials): string;
    modifyModels?(models: Model<Api>[], credentials: OAuthCredentials): Model<Api>[];
  };
}

interface ProviderModelConfig {
  /** Model ID (e.g., "claude-sonnet-4-20250514"). */
  id: string;

  /** Display name (e.g., "Claude 4 Sonnet"). */
  name: string;

  /** API type override for this specific model. */
  api?: Api;

  /** API endpoint URL override for this specific model. */
  baseUrl?: string;

  /** Whether the model supports extended thinking. */
  reasoning: boolean;

  /** Maps pi thinking levels to provider/model-specific values; null marks a level unsupported. */
  thinkingLevelMap?: Partial<Record<"off" | "minimal" | "low" | "medium" | "high" | "xhigh", string | null>>;

  /** Supported input types. */
  input: ("text" | "image")[];

  /** Cost per million tokens (for usage tracking). */
  cost: {
    input: number;
    output: number;
    cacheRead: number;
    cacheWrite: number;
  };

  /** Maximum context window size in tokens. */
  contextWindow: number;

  /** Maximum output tokens. */
  maxTokens: number;

  /** Custom headers for this specific model. */
  headers?: Record<string, string>;

  /** Compatibility settings for the selected API. */
  compat?: {
    // openai-completions
    supportsStore?: boolean;
    supportsDeveloperRole?: boolean;
    supportsReasoningEffort?: boolean;
    supportsUsageInStreaming?: boolean;
    maxTokensField?: "max_completion_tokens" | "max_tokens";
    requiresToolResultName?: boolean;
    requiresAssistantAfterToolResult?: boolean;
    requiresThinkingAsText?: boolean;
    requiresReasoningContentOnAssistantMessages?: boolean;
    thinkingFormat?: "openai" | "openrouter" | "deepseek" | "together" | "zai" | "qwen" | "qwen-chat-template";
    cacheControlFormat?: "anthropic";

    // anthropic-messages
    supportsEagerToolInputStreaming?: boolean;
    supportsLongCacheRetention?: boolean;
    sendSessionAffinityHeaders?: boolean;
    supportsCacheControlOnTools?: boolean;
    forceAdaptiveThinking?: boolean;
    allowEmptySignature?: boolean;
  };
}

thinkingFormat values: openrouter sends reasoning: { effort }. deepseek sends thinking: { type: "enabled" | "disabled" } and, when enabled, also reasoning_effort. together sends reasoning: { enabled } and, when supportsReasoningEffort is enabled, also reasoning_effort. qwen is for DashScope-style top-level enable_thinking. For local Qwen-compatible servers that read chat_template_kwargs.enable_thinking, use qwen-chat-template. cacheControlFormat: "anthropic" applies Anthropic-style cache_control markers to the system prompt, the last tool definition, and the last user/assistant text content.