首页 / 国内AI日报 / DeepSeek API 流式调用与长上下文处理实战 1 次阅读
DeepSeek API 流式调用与长上下文处理实战 | 国内AI日报
国内AI日报 · 实战教程 中级

DeepSeek API 流式调用
与长上下文处理实战

从零掌握 DeepSeek API 的流式传输与 128K token 长上下文能力,构建响应式、高效的生产级 AI 应用

📅 2026年2月28日 ⏱ 预计阅读 25 分钟 🐍 Python 3.11+ 💡 中级难度

DeepSeek API 兼容 OpenAI 接口格式,支持最高 128K token 的超长上下文窗口与流式响应(Server-Sent Events)。本教程将带你从环境搭建到生产级错误处理,逐步掌握流式调用、长文档处理与上下文缓存优化三大核心能力,并附带完整可运行的代码示例。

一、核心概念与系统架构

DeepSeek API 系统架构图

在深入代码之前,理解 DeepSeek API 的架构设计有助于我们更好地利用其能力。DeepSeek V3.2 基于 Mixture-of-Experts(MoE)架构,提供两个核心模型:

  • deepseek-chat:DeepSeek-V3.2 的标准模式,适合通用对话、代码生成、内容创作、知识问答
  • deepseek-reasoner:DeepSeek-V3.2 的深度推理模式,内置 Chain-of-Thought 思维链,适合数学推理、复杂逻辑分析

流式调用原理(SSE)

流式调用(Streaming)使用 Server-Sent Events(SSE) 协议。服务端将生成的 token 分成多个数据块(chunk)逐步推送,而不是等全部内容生成完毕后一次性返回。这带来两个核心优势:

  • 用户看到第一个字的感知延迟从 5–30 秒降低到 < 1 秒(Time to First Token)
  • 可以随时中断生成(用户点击 Stop),避免浪费不需要的 token

128K 长上下文窗口的价值

128K token 约等于 10 万汉字(或 9.6 万英文单词),让以下场景得以实现:

  • 一次性输入整本技术手册或完整的源代码仓库进行问答
  • 多轮对话保留数百轮的历史记忆
  • 分析完整的财报、法律合同或学术论文
  • Few-shot 学习时提供大量示例,显著提升输出质量
ℹ️
DeepSeek API Base URL 为 https://api.deepseek.com(也可使用 https://api.deepseek.com/v1,两者等价),完全兼容 OpenAI SDK,只需替换 api_keybase_url 即可从 OpenAI 迁移。

二、环境搭建与配置

环境搭建流程图
1
注册并获取 API Key

访问 platform.deepseek.com,注册账号后在左侧菜单 "API Keys" 中创建密钥。妥善保存,不要提交到 Git 仓库,不要在前端代码中暴露。

2
创建项目并安装依赖

推荐使用 Python 3.11+ 和虚拟环境隔离依赖:

🐚 bash
python3 -m venv venv
source venv/bin/activate   # Windows: venv\Scripts\activate

pip install openai httpx python-dotenv tiktoken

各依赖作用:

  • openai:官方 SDK,DeepSeek 完全兼容其接口格式
  • httpx:异步 HTTP 客户端,用于高并发流式场景
  • python-dotenv:从 .env 文件安全加载环境变量
  • tiktoken:OpenAI 的 token 计数工具,可估算 DeepSeek token 用量
3
配置环境变量

创建 .env 文件,并确保加入 .gitignore

📄 .env
DEEPSEEK_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
📄 .gitignore
.env
venv/
__pycache__/
*.pyc
4
验证环境配置

运行以下脚本,确认 API Key 有效、网络连通:

📄 verify.py
import os
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

try:
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": "Hi, reply with OK"}],
        max_tokens=10,
        stream=False
    )
    print("✅ 连接成功:", response.choices[0].message.content)
except Exception as e:
    print("❌ 连接失败:", e)
⚠️
安全警告:绝对不要将 API Key 硬编码在源代码中,不要上传到 GitHub。生产环境请使用 AWS Secrets Manager、HashiCorp Vault 或云平台的密钥管理服务(如阿里云 KMS)。

三、基础调用:同步非流式请求

基础 API 调用流程

先从最简单的同步非流式调用入手,理解请求结构与响应格式,为后续流式调用打好基础。

📄 basic_call.py
import os
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

# 初始化客户端(全局复用,不要在每次请求时新建)
client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

def chat(user_message: str, system_prompt: str = "你是一名专业助手。") -> str:
    """单轮对话,返回完整回复文本"""
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user",   "content": user_message}
        ],
        temperature=0.7,   # 0=确定性高,1=创意性高
        max_tokens=2048,   # 限制输出长度,防止意外高费用
        stream=False
    )
    return response.choices[0].message.content

# 基本使用
result = chat("用两句话解释什么是 Transformer 架构")
print(result)

# ---- 查看详细 token 用量 ----
response = client.chat.completions.create(
    model="deepseek-chat",
    messages=[{"role": "user", "content": "Hello"}],
    stream=False
)

usage = response.usage
print(f"\n📊 Token 用量:")
print(f"  输入 tokens:  {usage.prompt_tokens:,}")
print(f"  输出 tokens:  {usage.completion_tokens:,}")
print(f"  总计 tokens:  {usage.total_tokens:,}")
print(f"  结束原因:     {response.choices[0].finish_reason}")
# finish_reason: stop=正常结束, length=达到 max_tokens, content_filter=内容过滤

多轮对话基础结构

DeepSeek API 的多轮对话通过在 messages 数组中追加历史记录来实现:

📄 multi_turn.py
def multi_turn_demo():
    messages = [
        {"role": "system", "content": "你是一名资深 Python 工程师。"}
    ]

    conversations = [
        "什么是 Python 的 GIL?",
        "它对多线程性能有什么影响?",
        "如何绕过 GIL 实现真正的并行?"
    ]

    for user_input in conversations:
        # 添加用户消息
        messages.append({"role": "user", "content": user_input})

        # 调用 API
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=messages,
            temperature=0.6,
            stream=False
        )

        assistant_reply = response.choices[0].message.content

        # 将 AI 回复也加入历史(关键!保持上下文连贯)
        messages.append({"role": "assistant", "content": assistant_reply})

        print(f"Q: {user_input}")
        print(f"A: {assistant_reply[:100]}...\n")

multi_turn_demo()
💡
参数建议:对话/问答 temperature=0.6–0.7;代码生成 temperature=0.1–0.3(减少幻觉);创意写作 temperature=0.8–1.0。始终设置 max_tokens 上限,避免失控的长输出。

四、流式调用实战(三种方式)

流式调用数据流架构图

流式调用是构建生产级 AI 应用的关键。以下提供三种实现方式,覆盖从简单脚本到高并发 Web 服务的不同场景。

方式一:OpenAI SDK(推荐首选)

📄 stream_sdk.py
import os
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

def stream_chat(user_message: str, system_prompt: str = "你是一名资深工程师。") -> str:
    """流式输出,实时打印内容,返回完整文本"""
    full_response = []

    # 使用 with 语句确保流被正确关闭
    with client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user",   "content": user_message}
        ],
        temperature=0.7,
        stream=True
    ) as stream:
        for chunk in stream:
            delta = chunk.choices[0].delta

            # 内容增量
            if delta.content:
                print(delta.content, end="", flush=True)
                full_response.append(delta.content)

            # 检查结束原因
            finish_reason = chunk.choices[0].finish_reason
            if finish_reason == "length":
                print("\n⚠️ [输出被 max_tokens 截断]")
            elif finish_reason == "content_filter":
                print("\n⚠️ [内容被过滤]")

    print()  # 换行
    return "".join(full_response)

# 使用示例
result = stream_chat("写一个 Python 装饰器,统计函数执行时间,支持异步函数")
print(f"\n📝 总字符数: {len(result)}")

方式二:requests 库(原始 SSE 控制)

当需要更底层的控制(自定义代理、请求头、精细超时)时,直接处理 SSE 流:

📄 stream_raw.py
import os
import json
import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY  = os.getenv("DEEPSEEK_API_KEY")
BASE_URL = "https://api.deepseek.com/v1/chat/completions"

def stream_chat_raw(messages: list, temperature: float = 0.7) -> str:
    """使用 requests 直接处理 SSE 协议"""
    payload = {
        "model": "deepseek-chat",
        "messages": messages,
        "temperature": temperature,
        "stream": True
    }
    headers = {
        "Authorization":  f"Bearer {API_KEY}",
        "Content-Type":   "application/json",
        "Accept":         "text/event-stream"
    }

    full_text = []

    with requests.post(
        BASE_URL,
        headers=headers,
        json=payload,
        stream=True,
        timeout=60          # 连接+读取超时 60 秒
    ) as response:
        response.raise_for_status()  # 非 2xx 抛出异常

        for line in response.iter_lines(decode_unicode=True):
            if not line:
                continue

            # SSE 格式:每行以 "data: " 开头
            if not line.startswith("data: "):
                continue

            data = line[6:]             # 去掉 "data: " 前缀

            if data == "[DONE]":        # 流结束标记
                break

            try:
                obj     = json.loads(data)
                content = obj["choices"][0]["delta"].get("content", "")
                if content:
                    print(content, end="", flush=True)
                    full_text.append(content)
            except (json.JSONDecodeError, KeyError):
                pass                    # 跳过格式异常的行

    print()
    return "".join(full_text)

# 多轮对话示例
messages = [
    {"role": "system", "content": "你是一名架构师,回答简洁有深度。"},
    {"role": "user",   "content": "请解释 CAP 定理,举一个实际系统的例子"}
]
stream_chat_raw(messages)

方式三:AsyncOpenAI(高并发 Web 服务)

在 FastAPI / aiohttp 等异步 Web 框架中,使用异步客户端可以同时处理大量并发请求:

📄 stream_async.py
import os
import asyncio
from openai import AsyncOpenAI
from dotenv import load_dotenv

load_dotenv()

async_client = AsyncOpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

async def async_stream_chat(user_message: str, session_id: str = "") -> str:
    """异步流式调用,适合高并发场景"""
    full_response = []
    prefix = f"[{session_id}] " if session_id else ""

    async with async_client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": user_message}],
        temperature=0.7,
        stream=True
    ) as stream:
        async for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                print(f"{prefix}{delta.content}", end="", flush=True)
                full_response.append(delta.content)

    print()
    return "".join(full_response)

async def handle_concurrent_users():
    """同时处理三个用户请求(真正并发,非顺序)"""
    tasks = [
        async_stream_chat("什么是微服务架构的核心原则?",    "user-1"),
        async_stream_chat("Kubernetes Pod 和 Node 的关系?", "user-2"),
        async_stream_chat("Python asyncio 的 Event Loop?",  "user-3"),
    ]
    results = await asyncio.gather(*tasks)
    return results

if __name__ == "__main__":
    asyncio.run(handle_concurrent_users())
💡
三种方式对比:
  • SDK(OpenAI):代码最简洁,适合 80% 的场景,推荐优先使用
  • 原始 requests:控制最精细,适合需要定制代理/请求头的场景
  • AsyncOpenAI:吞吐量最高(5–10×),适合 Web 服务中的高并发处理

五、长上下文处理实战

长上下文处理要点信息图

128K token 的上下文窗口为长文档处理开辟了新可能。以下是两个典型的生产级场景实现。

场景一:大型文档问答系统

📄 long_doc_qa.py
import os
import tiktoken
from openai import OpenAI
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

def estimate_tokens(text: str) -> int:
    """估算 token 数(DeepSeek 与 GPT-4 编码相近)"""
    try:
        enc = tiktoken.encoding_for_model("gpt-4")
        return len(enc.encode(text))
    except Exception:
        # 降级:中文约 1.5字/token,英文约 4字/token
        return max(len(text) // 2, len(text.encode("utf-8")) // 4)

def analyze_document(doc_path: str, question: str) -> str:
    """分析文档并回答问题(支持大文档)"""
    doc_content = Path(doc_path).read_text(encoding="utf-8")

    doc_tokens = estimate_tokens(doc_content)
    print(f"📄 文档大小: {len(doc_content):,} 字符 / 约 {doc_tokens:,} tokens")

    # 128K 上下文,预留 4096 给输出,留 1000 给 system prompt 和问题
    MAX_DOC_TOKENS = 122000
    if doc_tokens > MAX_DOC_TOKENS:
        print(f"⚠️  文档超出限制,截取前 {MAX_DOC_TOKENS:,} tokens")
        # 按字符比例截断(粗略估算)
        ratio = MAX_DOC_TOKENS / doc_tokens
        doc_content = doc_content[:int(len(doc_content) * ratio)]

    print(f"🔍 正在分析文档,回答:{question}")

    # 流式输出答案
    answer_parts = []
    with client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {
                "role": "system",
                "content": (
                    "你是专业的文档分析专家。"
                    "基于用户提供的文档内容,准确、简洁地回答问题。"
                    "如果文档中没有相关信息,明确说明而非猜测。"
                    "回答时引用文档中的具体内容支持你的观点。"
                )
            },
            {
                "role": "user",
                "content": f"## 文档内容\n\n{doc_content}\n\n## 问题\n\n{question}"
            }
        ],
        temperature=0.3,    # 文档问答用低温度,减少幻觉
        max_tokens=4096,
        stream=True
    ) as stream:
        for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                print(content, end="", flush=True)
                answer_parts.append(content)

    print()
    return "".join(answer_parts)

# 使用示例(需要提供实际文件)
# answer = analyze_document("annual_report_2025.txt", "总结2025年的主要财务指标和增长亮点")

场景二:长对话记忆管理器

在长对话中,历史消息会快速消耗上下文窗口。需要实现智能的历史管理策略:

📄 conversation_manager.py
import os
from openai import OpenAI
from dotenv import load_dotenv
from collections import deque

load_dotenv()

client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

class ConversationManager:
    """智能多轮对话管理器,自动控制上下文长度"""

    def __init__(
        self,
        system_prompt: str,
        max_history_turns: int = 20,    # 最多保留 N 轮对话
        model: str = "deepseek-chat"
    ):
        self.system_prompt  = system_prompt
        self.model          = model
        # deque 自动丢弃最旧的消息(每轮 = 1条user + 1条assistant)
        self.history        = deque(maxlen=max_history_turns * 2)

    def chat(self, user_message: str) -> str:
        """发送消息,流式输出,并维护对话历史"""
        self.history.append({"role": "user", "content": user_message})

        messages = [{"role": "system", "content": self.system_prompt}] \
                 + list(self.history)

        response_parts = []
        with client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.7,
            stream=True
        ) as stream:
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    print(content, end="", flush=True)
                    response_parts.append(content)

        print()
        assistant_message = "".join(response_parts)
        self.history.append({"role": "assistant", "content": assistant_message})
        return assistant_message

    def clear(self):
        """清空对话历史"""
        self.history.clear()
        print("🗑️  对话历史已清空")

    @property
    def turn_count(self) -> int:
        return len(self.history) // 2

# 交互式对话
manager = ConversationManager(
    system_prompt="你是一名资深系统架构师,专注分布式系统和云原生技术。",
    max_history_turns=15
)

print("💬 开始对话(输入 'quit' 退出,'clear' 清空历史)\n")
while True:
    user_input = input(f"[第{manager.turn_count + 1}轮] You: ").strip()
    if not user_input:
        continue
    if user_input.lower() == "quit":
        break
    if user_input.lower() == "clear":
        manager.clear()
    else:
        manager.chat(user_input)

六、上下文缓存优化(大幅降低成本)

有缓存与无缓存成本对比图

DeepSeek API 默认启用磁盘 KV 缓存。当多次请求共享相同的前缀(如相同的 system prompt 或文档内容),重复部分仅计 10% 的输入费用(缓存命中价)。对长上下文应用,这意味着超过 80% 的成本节省。

缓存触发规则

  • 64 tokens 为最小缓存单元,少于 64 tokens 不缓存
  • 只缓存前缀部分:两次请求共同的开头才算命中
  • 缓存自动管理,无需修改代码,默认对所有用户启用
  • 缓存通常数小时到数天内有效,无活动后自动清除
📄 cache_demo.py
import os
import time
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)

# 大型固定 System Prompt(会被缓存复用)
# 实际场景中,这里可以放大型文档、Few-shot 示例、详细指令等
LARGE_SYSTEM_PROMPT = """你是一名资深 Python 架构师,精通以下领域:
1. 分布式系统设计(微服务、事件驱动架构、CQRS)
2. 高性能 Python(asyncio、多进程、NumPy/Cython 加速)
3. 数据库优化(PostgreSQL 查询优化、Redis 缓存策略、Elasticsearch)
4. 云原生技术(Kubernetes、Docker、Terraform、ArgoCD)
5. 安全最佳实践(OWASP Top 10、零信任架构、OAuth 2.0/OIDC)
6. 可观测性(Prometheus、Grafana、分布式链路追踪 OpenTelemetry)

在回答时请:
- 分析问题的核心约束和权衡
- 提供可运行的代码示例
- 明确指出潜在陷阱和最佳实践
- 从性能、安全、可维护性三个维度给出建议""" * 4   # 扩充至超过 64 tokens

def chat_with_cache_stats(question: str) -> dict:
    """发送请求并追踪缓存命中情况"""
    t0 = time.time()

    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": LARGE_SYSTEM_PROMPT},
            {"role": "user",   "content": question}
        ],
        temperature=0.5,
        stream=False
    )

    elapsed = time.time() - t0
    usage   = response.usage

    # 从 usage 中提取缓存命中 tokens(DeepSeek 专有字段)
    cache_hit   = getattr(usage, "prompt_cache_hit_tokens",  0)
    cache_miss  = getattr(usage, "prompt_cache_miss_tokens", usage.prompt_tokens)
    hit_rate    = cache_hit / usage.prompt_tokens if usage.prompt_tokens > 0 else 0

    print(f"\n⏱  耗时:      {elapsed:.2f}s")
    print(f"📊 输入/输出: {usage.prompt_tokens} / {usage.completion_tokens} tokens")
    print(f"🎯 缓存命中:  {cache_hit:,} tokens ({hit_rate:.1%})")
    print(f"💰 成本节省:  约 {hit_rate * 0.9:.1%}(命中部分仅计 10%)")

    return {
        "answer":    response.choices[0].message.content,
        "hit_rate":  hit_rate,
        "elapsed":   elapsed,
    }

# 第一次请求(建立缓存,冷启动)
print("=== 第一次请求(建立缓存)===")
r1 = chat_with_cache_stats("如何设计一个高可用的分布式锁?")

# 第二次请求(同一 system prompt 前缀,命中缓存)
print("\n=== 第二次请求(预期命中缓存)===")
r2 = chat_with_cache_stats("Python asyncio 的 Event Loop 原理是什么?")

最大化缓存命中的消息结构

📄 cache_best_practice.py
# ✅ 正确:固定内容在前,变化内容在后
messages = [
    # 1. System prompt(固定,每次请求相同)→ 会被缓存
    {"role": "system", "content": LARGE_SYSTEM_PROMPT},

    # 2. Few-shot 示例(固定)→ 会被缓存
    {"role": "user",      "content": "示例问题 1"},
    {"role": "assistant", "content": "示例答案 1"},
    {"role": "user",      "content": "示例问题 2"},
    {"role": "assistant", "content": "示例答案 2"},

    # 3. 实际用户问题(每次变化)→ 仅这部分不命中缓存
    {"role": "user", "content": user_question}  # ← 唯一变化的部分
]

# ❌ 错误:每次在前面插入时间戳或随机内容,破坏前缀匹配
messages_wrong = [
    {"role": "system", "content": f"当前时间: {time.time()}\n{LARGE_SYSTEM_PROMPT}"},
    # ↑ 时间戳每次变化,导致整个 system prompt 缓存失效!
]
ℹ️
缓存最佳实践:
  • 将固定内容(指令、文档、示例)始终放在 messages 列表的开头
  • 变化的用户输入放在最后
  • 不要在 system prompt 前面插入动态内容(时间戳、随机 ID 等)
  • system prompt 越长、越固定,缓存效益越显著

七、错误处理与生产级最佳实践

错误处理与最佳实践信息图

生产环境中,完善的错误处理和重试机制是服务稳定性的核心保障。

📄 production_client.py
import os
import time
import logging
from openai import OpenAI, APIError, APIConnectionError, RateLimitError
from dotenv import load_dotenv

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class DeepSeekClient:
    """生产级 DeepSeek 客户端:含重试、限流处理、超时控制"""

    def __init__(self):
        self.client = OpenAI(
            api_key=os.getenv("DEEPSEEK_API_KEY"),
            base_url="https://api.deepseek.com",
            timeout=60.0,       # 整体超时 60 秒
            max_retries=0       # 禁用 SDK 内置重试,使用自定义逻辑
        )
        self.model = "deepseek-chat"

    def chat(
        self,
        messages: list,
        stream: bool = True,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        max_retries: int = 3
    ) -> str:
        """带指数退避重试的聊天接口"""
        last_exception = None

        for attempt in range(max_retries):
            try:
                return self._do_chat(messages, stream, temperature, max_tokens)

            except RateLimitError as e:
                # 429:触发限流,指数退避后重试
                wait = 5 * (2 ** attempt)     # 5s → 10s → 20s
                logger.warning(f"限流 (第{attempt+1}次),等待 {wait}s... | {e}")
                time.sleep(wait)
                last_exception = e

            except APIConnectionError as e:
                # 网络连接错误(DNS 失败、连接超时等)
                wait = 2 ** attempt           # 1s → 2s → 4s
                logger.warning(f"网络错误 (第{attempt+1}次),等待 {wait}s... | {e}")
                time.sleep(wait)
                last_exception = e

            except APIError as e:
                if e.status_code in (500, 502, 503, 504):
                    # 服务端临时错误,可以重试
                    wait = 2 * (2 ** attempt)
                    logger.warning(f"服务器错误 {e.status_code} (第{attempt+1}次),等待 {wait}s...")
                    time.sleep(wait)
                    last_exception = e
                elif e.status_code == 401:
                    # 认证错误:不重试,立即失败
                    logger.error("❌ API Key 无效或已过期,请检查 DEEPSEEK_API_KEY")
                    raise
                elif e.status_code == 400:
                    # 请求格式错误:不重试,立即失败
                    logger.error(f"❌ 请求格式错误 (400): {e}")
                    raise
                else:
                    logger.error(f"❌ 未知 API 错误 ({e.status_code}): {e}")
                    raise

        raise RuntimeError(
            f"请求在 {max_retries} 次重试后仍然失败: {last_exception}"
        )

    def _do_chat(self, messages, stream, temperature, max_tokens) -> str:
        """实际发起 API 请求"""
        if stream:
            parts = []
            with self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=True
            ) as response_stream:
                for chunk in response_stream:
                    if chunk.choices[0].delta.content:
                        c = chunk.choices[0].delta.content
                        print(c, end="", flush=True)
                        parts.append(c)
            print()
            return "".join(parts)
        else:
            resp = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=False
            )
            return resp.choices[0].message.content


# 使用示例
deepseek = DeepSeekClient()

try:
    result = deepseek.chat(
        messages=[{"role": "user", "content": "用 Python 写一个并发安全的计数器"}],
        stream=True
    )
except RuntimeError as e:
    logger.error(f"请求最终失败: {e}")
    # 根据业务逻辑:降级处理 / 返回缓存内容 / 告知用户稍后重试
💡
生产监控建议:记录每次请求的延迟(P50/P95/P99)、token 用量、错误率和缓存命中率。服务状态实时查看:status.deepseek.com。建议配置 Prometheus + Grafana 或接入云监控告警,当错误率超过阈值时及时通知。

常见问题解答

问题现象 可能原因 解决方案
收到 401 Unauthorized API Key 无效或环境变量未正确加载 打印 os.getenv("DEEPSEEK_API_KEY") 检查,或在平台重新生成 Key
收到 429 Too Many Requests 超出 RPM(每分钟请求数)或 TPM(每分钟 token 数)限制 实现指数退避重试;有需要可申请提升配额
流式输出中途断开 网络超时、防火墙中断 SSE 连接 增大 timeout;检查代理/防火墙;捕获 ConnectionError 后重连
输出被截断,不完整 max_tokens 设置过小,finish_reason == "length" 增大 max_tokens;或检测到 length 后自动续写
缓存没有命中 前缀少于 64 tokens;或前缀中含动态内容(时间戳等) 确保固定前缀足够长且内容稳定;检查 messages 顺序
context_length_exceeded 错误 输入超过 128K token 限制 tiktoken 预估 token 数,超限时截断、分块或摘要后再发送

本文要点总结

  • 兼容性:DeepSeek API 完全兼容 OpenAI SDK,替换 base_urlapi_key 即可从 GPT-4 迁移
  • 流式调用:优先使用 OpenAI SDK 的 stream=True;高并发场景使用 AsyncOpenAI 可提升 5–10× 吞吐量
  • 长上下文:128K token 窗口支持 10 万字文档;用 ConversationManager + deque 控制对话历史长度
  • 缓存优化:固定前缀自动命中磁盘缓存,重复内容仅计 10% 费用,可节省高达 80%+ 的长上下文成本
  • 错误处理:区分 401(不重试)/ 429(指数退避)/ 5xx(退避重试)三类错误,保障服务稳定性
  • 安全:API Key 存入环境变量,绝不硬编码;生产环境接入密钥管理服务
DeepSeek API 流式调用 长上下文 Python OpenAI SDK SSE KV Cache AsyncOpenAI 错误处理
选择栏目
今日简报 播客电台 实战教程 AI挣钱计划 关于我
栏目
全球AI日报国内AI日报全球金融日报国内金融日报全球大新闻日报国内大新闻日报Claude Code 玩法日报OpenClaw 动态日报GitHub 热门项目日报AI工具实战AI应用开发编程实战工作流自动化AI原理图解AI Agent开发AI变现案例库AI工具创收AI内容变现AI接单提效变现前沿研究
我的收藏