首页 / AI Agent开发 / Agent 记忆系统架构实战:从向量数据库到知识图谱的混合记忆方案设计与实现 1 次阅读
Agent 记忆系统架构实战:从向量数据库到知识图谱的混合记忆方案设计与实现
AI Agent 开发

Agent 记忆系统架构实战:从向量数据库到知识图谱的混合记忆方案设计与实现

单一向量检索已无法满足复杂 Agent 需求。本教程带你从零构建混合记忆架构:向量存储模糊语义、图谱保留因果链条、混合路由实现精准召回。

📅 2026 年 3 月 3 日 ⏱ 阅读约 18 分钟 💻 实战代码

为什么你的 Agent 总是"失忆"?

2026 年的 Agent 系统普遍存在三大记忆缺陷:

  • 碎片化存储 — 对话历史、工具调用、用户偏好分散在不同位置
  • 关系丢失 — 向量检索只能找到"相似"内容,无法追溯因果链条
  • 长期记忆失效 — 超过 100K tokens 的上下文让模型迷失在噪声中

本教程将构建一个混合记忆架构:向量数据库处理模糊语义检索,知识图谱保留实体关系与因果链条,混合路由器根据查询类型自动选择检索策略。

环境准备与工具栈

🐍
Python 3.11+
主编程语言,Async 支持
🗃️
Qdrant / Chroma
向量数据库,存储 Embedding
🕸️
Neo4j
知识图谱,存储关系网络
🔄
LangChain Memory
记忆管理抽象层
📊
text-embedding-3-large
OpenAI 高维 Embedding 模型

💡 核心思想:向量检索擅长"找相似",图谱遍历擅长"找关系"。混合架构让 Agent 同时拥有模糊匹配能力与逻辑推理能力。

步骤 1:设计混合记忆存储模型

01

定义记忆数据类型与存储策略

记忆系统需要处理三类数据:短期对话(Session)、长期事实(Facts)、关系网络(Relations)。每类数据对应不同的存储策略:

混合记忆架构图:向量库存储对话与事实,图谱存储实体关系,混合路由器统一调度
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum

class MemoryType(str, Enum):
    SHORT_TERM = "short_term"      # 会话级记忆(向量存储)
    LONG_TERM = "long_term"        # 持久化事实(向量 + 图谱)
    RELATION = "relation"          # 实体关系(仅图谱)

class Memory(BaseModel):
    id: str
    type: MemoryType
    content: str
    embedding: Optional[List[float]] = None
    entities: List[str] = Field(default_factory=list)  # 提取的实体
    timestamp: float
    ttl_hours: Optional[int] = None  # None = 永久

⚠️ 注意:向量存储适合全文检索,但无法表达"A 是 B 的上级"这类关系。必须在写入时同步提取实体并构建图谱关系。

步骤 2:搭建向量数据库层

02

初始化 Qdrant 客户端与 Collection

使用 Qdrant 作为向量存储,支持 HNSW 索引与 Payload 过滤:

Qdrant 向量数据库架构图:Collection 分区、Payload 过滤、HNSW 索引流程
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue
)

# 初始化客户端
client = QdrantClient(host="localhost", port=6333)

# 创建 Collection(1536 维 = text-embedding-3-large)
COLLECTION_NAME = "agent_memory"
client.create_collection(
    collection_name=COLLECTION_NAME,
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)

# 创建 Payload 索引(用于按类型过滤)
client.create_payload_index(
    collection_name=COLLECTION_NAME,
    field_name="memory_type",
    field_schema="keyword"
)

写入记忆时需同时存储向量与 Payload 元数据:

from openai import OpenAI

openai_client = OpenAI()

def embed(text: str) -> List[float]:
    """生成 1536 维 Embedding"""
    response = openai_client.embeddings.create(
        model="text-embedding-3-large",
        input=text
    )
    return response.data[0].embedding

def store_memory(memory: Memory):
    # 生成向量
    if memory.embedding is None:
        memory.embedding = embed(memory.content)

    # 写入 Qdrant
    client.upsert(
        collection_name=COLLECTION_NAME,
        points=[
            PointStruct(
                id=hash(memory.id) % (2**63),
                vector=memory.embedding,
                payload=memory.model_dump()
            )
        ]
    )

步骤 3:构建知识图谱层

03

Neo4j 图谱设计与实体关系写入

使用 Neo4j 存储实体及其关系。核心设计:

  • 节点(Node):Entity(带 name、type 属性)
  • 关系(Relationship):RELATED_TO、PART_OF、CAUSES、USES 等
Neo4j 知识图谱示例:用户 - 使用 → MCP 服务器 - 部署于 → Cloudflare Workers
from neo4j import GraphDatabase

class GraphMemory:
    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def add_entity(self, name: str, type: str):
        """添加实体节点"""
        with self.driver.session() as session:
            session.run(
                "MERGE (e:Entity {name: $name}) SET e.type = $type",
                name=name, type=type
            )

    def add_relation(self, from_entity: str, to_entity: str, rel_type: str):
        """添加实体关系"""
        with self.driver.session() as session:
            session.run(
                f"""
                MATCH (a:Entity {{name: $from}})
                MATCH (b:Entity {{name: $to}})
                MERGE (a)-[:{rel_type}]->(b)
                """,
                from=from_entity, to=to_entity
            )

从对话中提取实体与关系的简单实现(实际项目可用 LLM 提取):

import re

def extract_entities(text: str) -> List[str]:
    """简单规则提取:大写字母开头的词、技术术语"""
    # 简化示例:提取引号内的内容
    return re.findall(r'"([^"]+)"', text)

def build_graph_from_memory(memory: Memory, graph: GraphMemory):
    """将记忆中的实体写入图谱"""
    entities = extract_entities(memory.content)

    # 添加实体
    for entity in entities:
        graph.add_entity(entity, "concept")

    # 简单关系:如果两个实体在同一段记忆中出现,建立弱相关关系
    for i, e1 in enumerate(entities):
        for e2 in entities[i+1:]:
            graph.add_relation(e1, e2, "RELATED_TO")

💡 进阶技巧:使用 LLM 提取结构化实体关系,如:"MCP 服务器部署在 Cloudflare Workers 上"MCP 服务器 -[DEPLOYED_ON]→ Cloudflare Workers

步骤 4:实现混合路由器

04

查询分类与双路检索策略

混合路由器的核心逻辑:根据查询类型决定检索策略。

混合路由器流程图:用户查询 → 意图分类 → 向量检索/图谱遍历/双路合并
from typing import Literal

class QueryType(str, Enum):
    FACTUAL = "factual"         # 事实型:图谱优先
    SEMANTIC = "semantic"       # 语义型:向量优先
    RELATIONAL = "relational"   # 关系型:仅图谱

def classify_query(query: str) -> QueryType:
    """简单规则分类(生产环境用 LLM 分类器)"""
    query_lower = query.lower()

    # 关系型关键词
    if any(w in query_lower for w in ["关系", "连接", "属于", "依赖"]):
        return QueryType.RELATIONAL

    # 事实型关键词
    if any(w in query_lower for w in ["什么是", "定义", "原理", "架构"]):
        return QueryType.FACTUAL

    return QueryType.SEMANTIC

class HybridRouter:
    def __init__(self, vector_client: QdrantClient, graph: GraphMemory):
        self.vector_client = vector_client
        self.graph = graph

    def search(self, query: str, top_k: int = 5) -> List[dict]:
        query_type = classify_query(query)

        if query_type == QueryType.RELATIONAL:
            return self.graph_search(query)
        elif query_type == QueryType.FACTUAL:
            # 双路检索:向量 + 图谱
            vector_results = self.vector_search(query, top_k // 2)
            graph_results = self.graph_search(query)
            return self.merge_results(vector_results, graph_results)
        else:
            return self.vector_search(query, top_k)

向量检索实现:

def vector_search(self, query: str, top_k: int) -> List[dict]:
    query_embedding = embed(query)

    results = self.vector_client.search(
        collection_name=COLLECTION_NAME,
        query_vector=query_embedding,
        limit=top_k
    )

    return [
        {"source": "vector", "score": r.score, "content": r.payload["content"]}
        for r in results
    ]

图谱检索实现(遍历相关实体):

def graph_search(self, query: str) -> List[dict]:
    """从图谱中检索相关实体与关系"""
    # 简单实现:提取查询中的实体,查找其关联
    entities = extract_entities(query)

    if not entities:
        return []

    with self.graph.driver.session() as session:
        cypher = """
        MATCH (e:Entity)-[r]-(related:Entity)
        WHERE e.name IN $entities
        RETURN e.name, type(r) as rel_type, related.name as related_name
        LIMIT 20
        """
        result = session.run(cypher, entities=entities)

        return [
            {
                "source": "graph",
                "content": f"{record['e.name']} -[{record['rel_type']}]-> {record['related_name']}"
            }
            for record in result
        ]

步骤 5:实现记忆压缩与 TTL 管理

05

长期记忆压缩与过期清理

短期记忆会随时间膨胀,需要定期压缩并转移到长期存储:

记忆压缩流程:会话记忆 → 提取关键事实 → 压缩摘要 → 长期存储
from openai import OpenAI

class MemoryCompressor:
    def __init__(self):
        self.llm = OpenAI()

    def compress_session(self, memories: List[Memory]) -> Memory:
        """压缩多段会话记忆为摘要"""
        content = "\n".join(m.content for m in memories)

        response = self.llm.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "将以下对话压缩为 3-5 个关键事实,每句一行。"},
                {"role": "user", "content: content}
            ]
        )

        return Memory(
            id=f"summary_{memories[0].id}",
            type=MemoryType.LONG_TERM,
            content=response.choices[0].message.content,
            timestamp=time.time(),
            ttl_hours=None  # 永久保存
        )

    def cleanup_expired(self):
        """清理过期记忆"""
        cutoff = time.time() - (24 * 3600)  # 24 小时前

        expired = client.scroll(
            collection_name=COLLECTION_NAME,
            scroll_filter=Filter(
                must=[FieldCondition(key="ttl_hours", match=MatchValue(value=24))]
            )
        )

        for point in expired:
            if point.payload["timestamp"] < cutoff:
                client.delete(
                    collection_name=COLLECTION_NAME,
                    points_selector=[point.id]
                )

步骤 6:集成到 Agent 运行时

06

LangChain Memory 集成示例

将混合记忆系统集成为 LangChain 的 BaseMemory 接口:

LangChain 集成架构:Agent → HybridMemory → 向量库 + 图谱
from langchain.memory import BaseMemory
from typing import Dict, List

class HybridMemory(BaseMemory):
    router: HybridRouter
    compressor: MemoryCompressor
    session_memories: List[Memory] = []

    class Config:
        arbitrary_types_allowed = True

    def save_context(self, inputs: Dict, outputs: Dict):
        """保存对话上下文"""
        user_input = inputs.get("input", "")
        agent_output = outputs.get("output", "")

        # 写入短期记忆
        self.session_memories.extend([
            Memory(
                id=f"session_{len(self.session_memories)}",
                type=MemoryType.SHORT_TERM,
                content=user_input,
                timestamp=time.time(),
                ttl_hours=24
            ),
            Memory(
                id=f"session_{len(self.session_memories)+1}",
                type=MemoryType.SHORT_TERM,
                content=agent_output,
                timestamp=time.time(),
                ttl_hours=24
            )
        ])

        # 每 10 轮对话压缩一次
        if len(self.session_memories) >= 20:
            compressed = self.compressor.compress_session(self.session_memories)
            store_memory(compressed)
            self.session_memories = []

    def load_memory_variables(self, inputs: Dict) -> Dict:
        """加载相关记忆"""
        query = inputs.get("input", "")
        results = self.router.search(query, top_k=5)

        return {
            "history": "\n".join(r["content"] for r in results)
        }

使用示例:

from langchain.agents import AgentExecutor, create_openai_tools_agent

# 初始化混合记忆
hybrid_memory = HybridMemory(
    router=HybridRouter(client, graph),
    compressor=MemoryCompressor()
)

# 创建 Agent
agent = create_openai_tools_agent(llm, tools, prompt)
executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=hybrid_memory,  # 注入记忆
    verbose=True
)

# 运行
response = executor.invoke({"input": "我之前部署的 MCP 服务器在哪里?"})
print(response["output"])

常见问题 FAQ

向量数据库和图谱的存储成本对比如何?
Qdrant 存储 1536 维向量每条约 6KB(含 Payload),Neo4j 节点 + 关系约 1KB。对于 10 万条记忆,混合架构总存储约 500MB–800MB,成本可接受。
如何避免图谱中产生过多冗余关系?
设置关系阈值:同一对实体之间的相同关系只保留一条。定期运行图谱清理任务,合并弱相关关系(如 A-RELATED_TO-B 且 B-RELATED_TO-C 且时间相近 → A-RELATED_TO-C)。
生产环境中如何扩展这套架构?
向量库使用 Qdrant Cloud 或自建集群(分片 + 副本);图谱使用 Neo4j Aura 或搭建因果集群;路由器添加缓存层(Redis)加速高频查询。
是否可以用其他向量数据库替代 Qdrant?
可以。Chroma(轻量)、Weaviate(自带分类器)、Pinecone(托管服务)都支持类似 API。只需修改 store_memory 和 vector_search 的实现。

总结

  • 向量存储处理模糊语义检索,图谱存储处理关系推理
  • 混合路由器根据查询类型自动选择检索策略
  • 记忆压缩防止短期记忆无限膨胀,TTL 管理自动清理过期数据
  • LangChain 集成让现有 Agent 无缝获得混合记忆能力
AI Agent 记忆系统 向量数据库 知识图谱 RAG Neo4j Qdrant
选择栏目
今日简报 播客电台 实战教程 AI挣钱计划 关于我
栏目
全球AI日报国内AI日报全球金融日报国内金融日报全球大新闻日报国内大新闻日报Claude Code 玩法日报OpenClaw 动态日报GitHub 热门项目日报AI工具实战AI应用开发编程实战工作流自动化AI原理图解AI Agent开发AI变现案例库AI工具创收AI内容变现AI接单提效变现前沿研究
我的收藏
播客版
0:00
--:--