关注

Python/Node.js/Go 三语言接入 AI API 网关的最佳实践

一个 API Key,调遍 Claude、GPT、Gemini、DeepSeek——本文手把手演示 Python、Node.js、Go 三种语言的完整接入方案。


背景:为什么用 API 网关?

直接接入各家大模型的痛点很明显:

  • 每家 SDK 不同,切模型要改代码
  • API Key 分散管理,账单难以统一
  • 不同提供商的限流策略、错误码各异,重试逻辑要写多遍

TheRouter 是一个 AI 模型路由网关,完全兼容 OpenAI API 格式,只需把 base_url 换成 https://api.therouter.ai/v1,就能用同一套代码调用市面上所有主流模型。模型 ID 采用 brand/model-name 格式,例如:

  • anthropic/claude-opus-4-5
  • openai/gpt-4.1
  • google/gemini-2.5-pro
  • deepseek/deepseek-r1

下面直接上代码。


Python 接入

环境准备

pip install openai python-dotenv

在项目根目录创建 .env

THEROUTER_API_KEY=sk-your-key-here

基本对话

import os
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(
    api_key=os.environ["THEROUTER_API_KEY"],
    base_url="https://api.therouter.ai/v1",
)

response = client.chat.completions.create(
    model="anthropic/claude-opus-4-5",
    messages=[
        {"role": "user", "content": "用 Python 写一个快速排序,加注释"}
    ],
    max_tokens=1024,
)

print(response.choices[0].message.content)

只改了两行:api_key 换成你的 TheRouter Key,base_url 指向网关。其余代码和官方 OpenAI SDK 完全一致。

流式输出

import os
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(
    api_key=os.environ["THEROUTER_API_KEY"],
    base_url="https://api.therouter.ai/v1",
)

with client.chat.completions.stream(
    model="openai/gpt-4.1",
    messages=[{"role": "user", "content": "解释一下 Python GIL 是什么"}],
    max_tokens=512,
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()  # 换行

错误处理与重试

import os
import time
from openai import OpenAI, RateLimitError, APITimeoutError, APIConnectionError
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(
    api_key=os.environ["THEROUTER_API_KEY"],
    base_url="https://api.therouter.ai/v1",
    timeout=60.0,          # 请求超时 60s
    max_retries=3,         # openai SDK 内置重试(针对 5xx 和网络错误)
)

def chat_with_retry(model: str, prompt: str, max_attempts: int = 3) -> str:
    """带退避重试的对话函数"""
    for attempt in range(max_attempts):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=1024,
            )
            return response.choices[0].message.content
        except RateLimitError:
            wait = 2 ** attempt  # 指数退避:1s, 2s, 4s
            print(f"触发限流,{wait}s 后重试...")
            time.sleep(wait)
        except APITimeoutError:
            print(f"请求超时(第 {attempt + 1} 次)")
            if attempt == max_attempts - 1:
                raise
        except APIConnectionError as e:
            print(f"连接错误: {e}")
            raise

    raise RuntimeError("超出最大重试次数")


result = chat_with_retry("deepseek/deepseek-r1", "1+1=?")
print(result)

最佳实践总结(Python):

  1. python-dotenv 管理 API Key,绝不硬编码在代码里
  2. 设置 timeout=60.0,避免长时间挂起
  3. max_retries=3 交给 SDK 处理网络抖动,业务层再加退避逻辑应对限流
  4. 捕获具体异常类型,而不是裸 except Exception

Node.js 接入

环境准备

npm install openai dotenv

.env 文件同上。

基本对话(ESM)

// chat.mjs
import OpenAI from "openai";
import "dotenv/config";

const client = new OpenAI({
  apiKey: process.env.THEROUTER_API_KEY,
  baseURL: "https://api.therouter.ai/v1",
});

async function main() {
  const response = await client.chat.completions.create({
    model: "anthropic/claude-opus-4-5",
    messages: [{ role: "user", content: "写一个 Node.js 文件读取的示例" }],
    max_tokens: 512,
  });

  console.log(response.choices[0].message.content);
}

main().catch(console.error);

流式输出

// stream.mjs
import OpenAI from "openai";
import "dotenv/config";

const client = new OpenAI({
  apiKey: process.env.THEROUTER_API_KEY,
  baseURL: "https://api.therouter.ai/v1",
});

async function streamChat() {
  const stream = client.chat.completions.stream({
    model: "openai/gpt-4.1",
    messages: [{ role: "user", content: "解释 JavaScript 事件循环" }],
    max_tokens: 512,
  });

  stream.on("content", (delta) => {
    process.stdout.write(delta);
  });

  const finalMessage = await stream.finalMessage();
  console.log("\n\n[完成] 总 token 消耗:", finalMessage.usage?.total_tokens);
}

streamChat().catch(console.error);

错误处理

// error-handling.mjs
import OpenAI, { APIError } from "openai";
import "dotenv/config";

const client = new OpenAI({
  apiKey: process.env.THEROUTER_API_KEY,
  baseURL: "https://api.therouter.ai/v1",
  timeout: 60_000,   // 60 秒
  maxRetries: 3,
});

async function safeChat(model, prompt) {
  try {
    const response = await client.chat.completions.create({
      model,
      messages: [{ role: "user", content: prompt }],
      max_tokens: 1024,
    });
    return response.choices[0].message.content;
  } catch (err) {
    if (err instanceof APIError) {
      console.error(`API 错误 ${err.status}: ${err.message}`);
      if (err.status === 429) {
        console.error("触发限流,请稍后重试");
      } else if (err.status >= 500) {
        console.error("服务端错误,可重试");
      }
    }
    throw err;
  }
}

const result = await safeChat("google/gemini-2.5-pro", "什么是 Promise?");
console.log(result);

最佳实践总结(Node.js):

  1. dotenv/config 在入口处一次性加载环境变量
  2. timeoutmaxRetries 在构造 client 时统一配置
  3. 捕获 APIError 并根据 status 码做差异化处理
  4. 生产环境建议使用 TypeScript,可以得到完整的类型提示

Go 接入

Go 生态里 go-openai 是最常用的 OpenAI 兼容库。

环境准备

go get github.com/sashabaranov/go-openai

.env 文件同上,或直接用系统环境变量:

export THEROUTER_API_KEY=sk-your-key-here

基本对话

// main.go
package main

import (
	"context"
	"fmt"
	"os"

	openai "github.com/sashabaranov/go-openai"
)

func newClient() *openai.Client {
	config := openai.DefaultConfig(os.Getenv("THEROUTER_API_KEY"))
	config.BaseURL = "https://api.therouter.ai/v1"
	return openai.NewClientWithConfig(config)
}

func main() {
	client := newClient()

	resp, err := client.CreateChatCompletion(
		context.Background(),
		openai.ChatCompletionRequest{
			Model: "anthropic/claude-opus-4-5",
			Messages: []openai.ChatCompletionMessage{
				{
					Role:    openai.ChatMessageRoleUser,
					Content: "用 Go 实现一个并发安全的计数器",
				},
			},
			MaxTokens: 1024,
		},
	)
	if err != nil {
		fmt.Fprintf(os.Stderr, "请求失败: %v\n", err)
		os.Exit(1)
	}

	fmt.Println(resp.Choices[0].Message.Content)
}

流式输出

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"

	openai "github.com/sashabaranov/go-openai"
)

func streamChat(client *openai.Client, model, prompt string) error {
	stream, err := client.CreateChatCompletionStream(
		context.Background(),
		openai.ChatCompletionRequest{
			Model: model,
			Messages: []openai.ChatCompletionMessage{
				{Role: openai.ChatMessageRoleUser, Content: prompt},
			},
			MaxTokens: 512,
			Stream:    true,
		},
	)
	if err != nil {
		return fmt.Errorf("创建流失败: %w", err)
	}
	defer stream.Close()

	for {
		response, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			fmt.Println() // 换行
			return nil
		}
		if err != nil {
			return fmt.Errorf("流读取错误: %w", err)
		}
		if len(response.Choices) > 0 {
			fmt.Print(response.Choices[0].Delta.Content)
		}
	}
}

func main() {
	client := newClient()
	if err := streamChat(client, "openai/gpt-4.1", "解释 Go 的 goroutine 调度"); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

错误处理与超时

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"time"

	openai "github.com/sashabaranov/go-openai"
)

func newClientWithTimeout(timeout time.Duration) *openai.Client {
	config := openai.DefaultConfig(os.Getenv("THEROUTER_API_KEY"))
	config.BaseURL = "https://api.therouter.ai/v1"
	config.HTTPClient = &http.Client{Timeout: timeout}
	return openai.NewClientWithConfig(config)
}

func chatWithRetry(client *openai.Client, model, prompt string, maxAttempts int) (string, error) {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			wait := time.Duration(1<<attempt) * time.Second // 指数退避
			fmt.Printf("第 %d 次重试,等待 %s...\n", attempt, wait)
			time.Sleep(wait)
		}

		ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
		resp, err := client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
			Model:     model,
			Messages:  []openai.ChatCompletionMessage{{Role: openai.ChatMessageRoleUser, Content: prompt}},
			MaxTokens: 1024,
		})
		cancel()

		if err == nil {
			return resp.Choices[0].Message.Content, nil
		}

		lastErr = err
		// 429 限流 和 5xx 服务端错误才重试
		apiErr, ok := err.(*openai.APIError)
		if ok && (apiErr.HTTPStatusCode == 429 || apiErr.HTTPStatusCode >= 500) {
			continue
		}
		// 其他错误(如 401 鉴权失败)直接返回
		return "", err
	}
	return "", fmt.Errorf("超出最大重试次数: %w", lastErr)
}

func main() {
	client := newClientWithTimeout(90 * time.Second)
	result, err := chatWithRetry(client, "deepseek/deepseek-r1", "写一个归并排序", 3)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(result)
}

最佳实践总结(Go):

  1. 通过 config.HTTPClient 自定义底层 HTTP 客户端,统一设置超时
  2. context.WithTimeout 给每次请求加超时上下文,防止 goroutine 泄漏
  3. 类型断言 *openai.APIError 拿到 HTTP 状态码,区分可重试和不可重试错误
  4. API Key 从环境变量读取,不要写进代码或配置文件里提交 git

环境变量管理最佳实践

无论哪种语言,API Key 管理遵循以下原则:

场景推荐方案
本地开发.env 文件 + .gitignore 忽略
CI/CDGitHub Actions Secrets / GitLab CI Variables
容器化部署K8s Secret 挂载为环境变量
云函数平台提供的密钥管理服务(AWS SSM / 阿里云 KMS)

永远不要:

  • 把 API Key 硬编码在源码里
  • 把包含 Key 的 .env 文件提交到 git
  • 在日志里打印完整的 API Key(可以只打印前 8 位用于调试)

超时与重试参数参考

参数推荐值说明
连接超时10s建立 TCP 连接的最长等待时间
请求超时60-120s流式场景建议 120s,非流式 60s
最大重试次数3 次仅对 429/5xx 重试
重试间隔指数退避,初始 1s避免雪崩效应

小结

三种语言的接入方式高度一致,核心改动只有两处:

  1. base_url 指向 https://api.therouter.ai/v1
  2. api_key 使用你的 TheRouter API Key

其余代码——包括流式输出、错误处理、重试逻辑——都可以直接复用你已有的 OpenAI SDK 代码。模型 ID 换成 brand/model-name 格式,就能随时切换到任意模型,不需要修改任何业务逻辑。

获取 API Key:therouter.ai

转载自CSDN-专业IT技术社区

原文链接:https://blog.csdn.net/cmzznet/article/details/159562969

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--