首页 / 国内大新闻日报 / 用Embedding构建新闻相似度聚类与去重系统 1 次阅读
用Embedding构建新闻相似度聚类与去重系统
国内大新闻日报 中级难度

用 Embedding 构建新闻相似度聚类与去重系统

2026年2月28日 预计阅读 20 分钟 Python · OpenAI · Scikit-learn · FAISS 完整可运行代码

每天各大平台推送的新闻条数动辄数千,其中大量是同一事件的重复报道。 传统基于关键词的去重方法容易漏掉改了标题的重复文章,而基于 Embedding 语义向量 的方案能真正理解文本含义——即便措辞完全不同,语义相近的新闻依然会被识别为重复。

本教程将带你从零搭建一套完整的新闻聚类与去重管道:用 OpenAI Embedding API 将文本转化为向量,用 KMeans 实现语义聚类,再结合余弦相似度阈值完成精准去重,最后用 t-SNE 可视化验证效果。 全程代码均可直接运行,无需机器学习背景。

你将学到什么

  • Embedding 的核心原理与向量相似度计算
  • 如何批量调用 OpenAI API 生成新闻 Embedding
  • 用 KMeans + Silhouette Score 自动确定最优聚类数
  • 基于余弦相似度的簇内语义去重策略
  • 用 FAISS 实现大规模向量检索加速
  • t-SNE 可视化聚类效果验证

一、系统架构与核心原理

系统架构概览
1

整体数据流

整个系统分四个阶段:

  1. 数据预处理:清洗原始新闻文本,去除 HTML 标签、无效字符
  2. Embedding 生成:调用 OpenAI API,将每条新闻转为 1536 维向量
  3. 语义聚类:用 KMeans 将相似新闻归入同一簇,自动发现主题
  4. 去重输出:在每个簇内计算两两余弦相似度,超过阈值则保留一条
为什么用余弦相似度而不是欧氏距离?
高维向量空间中,欧氏距离会受向量模长影响,而余弦相似度只关注方向(角度),对文本长度不敏感,更能反映语义接近程度。

Embedding 直觉理解:把每篇文章想象成宇宙中的一颗星——语义越相近,星星越靠拢。 聚类就是把星系划分出来,去重就是把同一位置重叠的星合并为一颗。

二、环境准备与依赖安装

环境配置流程
2

安装核心依赖

推荐使用 Python 3.10+。所有依赖均可通过 pip 一次性安装:

# 创建虚拟环境(推荐)
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 安装依赖
pip install openai==1.12.0 \
            scikit-learn==1.4.0 \
            numpy==1.26.3 \
            pandas==2.2.0 \
            faiss-cpu==1.7.4 \
            matplotlib==3.8.2 \
            datasketch==1.6.4 \
            python-dotenv==1.0.0 \
            tqdm==4.66.1

创建 .env 文件存储 API Key(不要把密钥写进代码):

# .env
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
API 费用提醒text-embedding-3-small 约 $0.02/百万 token,1000 条新闻(每条 200 字)大约消耗 $0.08,成本极低。但首次运行建议先用 20 条测试。

三、数据预处理

数据预处理流程
3

清洗与准备新闻文本

原始新闻数据往往包含 HTML 标签、多余空白、特殊符号。 这一步先做统一清洗,再截取合理长度(Embedding 模型有 token 上限)。

import re
import json
import pandas as pd
from typing import List

def clean_news_text(text: str) -> str:
    """
    清洗新闻文本:
    1. 去除 HTML 标签
    2. 折叠多余空白
    3. 截断到 800 字(避免超过 Token 限制)
    """
    # 去除 HTML 标签
    text = re.sub(r'<[^>]+>', '', text)
    # 去除多余换行和空格
    text = re.sub(r'\s+', ' ', text).strip()
    # 截断到 800 字(约 1200 token,安全范围内)
    return text[:800]

def load_news_data(filepath: str) -> pd.DataFrame:
    """
    从 JSONL 文件加载新闻数据
    每行格式:{"title": "...", "content": "...", "source": "...", "pub_time": "..."}
    """
    records = []
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            record = json.loads(line.strip())
            # 标题 + 正文前 400 字,作为 Embedding 输入
            combined = record['title'] + '。' + record.get('content', '')[:400]
            record['clean_text'] = clean_news_text(combined)
            records.append(record)
    return pd.DataFrame(records)

# 示例:生成测试数据(实际替换为真实新闻加载)
sample_news = [
    {"title": "工信部发布新能源汽车补贴政策", "content": "今日工信部正式发布2026年新能源汽车推广应用财政补贴方案..."},
    {"title": "新能源车补贴新政策出炉 覆盖范围更广", "content": "工业和信息化部昨日公布最新新能源汽车补贴细则..."},
    {"title": "春节假期全国旅游收入创历史新高", "content": "文旅部数据显示,2026年春节黄金周全国旅游接待人次..."},
    {"title": "春节旅游市场迎来开门红 各地景区爆满", "content": "春节长假期间,全国各大旅游景区迎来客流高峰..."},
    {"title": "DeepSeek R2模型正式发布 性能超越GPT-5", "content": "深度求索今日在北京发布最新大语言模型 R2..."},
]

with open('news_sample.jsonl', 'w', encoding='utf-8') as f:
    for item in sample_news:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')

df = load_news_data('news_sample.jsonl')
print(f"加载 {len(df)} 条新闻,示例文本长度:{df['clean_text'].str.len().describe()}")
最佳实践:将标题和正文前 400 字拼接作为 Embedding 输入,效果优于只用标题(标题有时过于简短)或只用正文(含大量冗余信息)。

四、批量生成 Embedding

Embedding生成策略
4

调用 OpenAI API 生成语义向量

批量请求可显著减少 API 调用次数(最多 2048 条/次)。 同时加入缓存机制,避免重复计费。

import os
import time
import numpy as np
from openai import OpenAI
from tqdm import tqdm
from dotenv import load_dotenv

load_dotenv()
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

EMBED_MODEL = "text-embedding-3-small"  # 1536 维,性价比最高
BATCH_SIZE = 100  # 每批请求条数(API 上限 2048)

def get_embeddings_batch(texts: List[str]) -> np.ndarray:
    """
    批量获取 Embedding,自动重试限速错误
    返回 shape (N, 1536) 的 numpy 数组
    """
    all_embeddings = []

    for i in tqdm(range(0, len(texts), BATCH_SIZE), desc="生成 Embedding"):
        batch = texts[i:i + BATCH_SIZE]

        # 重试逻辑(应对 429 限速)
        for attempt in range(3):
            try:
                response = client.embeddings.create(
                    input=batch,
                    model=EMBED_MODEL
                )
                # API 返回顺序与输入一致
                batch_embeddings = [item.embedding for item in response.data]
                all_embeddings.extend(batch_embeddings)
                break
            except Exception as e:
                if attempt == 2:
                    raise e
                wait = 2 ** attempt  # 指数退避:1s, 2s
                print(f"请求失败,{wait}s 后重试:{e}")
                time.sleep(wait)

    return np.array(all_embeddings, dtype=np.float32)

def save_embeddings(embeddings: np.ndarray, filepath: str):
    """保存 Embedding 到本地,避免重复调用 API"""
    np.save(filepath, embeddings)
    print(f"Embedding 已保存到 {filepath},shape: {embeddings.shape}")

def load_embeddings(filepath: str) -> np.ndarray:
    """加载已缓存的 Embedding"""
    return np.load(filepath)

# 执行批量 Embedding 生成
EMBED_CACHE = 'embeddings_cache.npy'

if os.path.exists(EMBED_CACHE):
    print("发现缓存,直接加载...")
    embeddings = load_embeddings(EMBED_CACHE)
else:
    texts = df['clean_text'].tolist()
    embeddings = get_embeddings_batch(texts)
    save_embeddings(embeddings, EMBED_CACHE)

print(f"Embedding 矩阵:{embeddings.shape}")  # (N, 1536)
向量归一化:OpenAI text-embedding-3-small 返回的向量已经是单位向量(模长=1),因此余弦相似度等价于点积,无需额外归一化。如果换用其他模型,建议先 normalize 再计算。

五、KMeans 语义聚类

聚类算法对比
5

自动确定最优簇数并执行聚类

KMeans 需要预先指定簇数 k。我们用 Silhouette Score 扫描候选范围, 自动选取分离度最好的 k 值。

from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['font.family'] = ['PingFang SC', 'Microsoft YaHei', 'sans-serif']

def find_optimal_clusters(
    X: np.ndarray,
    k_min: int = 3,
    k_max: int = 20,
    step: int = 1
) -> tuple[int, list, list]:
    """
    用 Silhouette Score 确定最优聚类数
    返回 (最优k, k列表, 分数列表)
    """
    k_range = list(range(k_min, min(k_max + 1, len(X)), step))
    scores = []

    print(f"扫描聚类数范围 [{k_min}, {min(k_max, len(X)-1)}]...")
    for k in tqdm(k_range, desc="评估聚类数"):
        # 数据量大时用 MiniBatchKMeans 加速
        if len(X) > 5000:
            model = MiniBatchKMeans(n_clusters=k, random_state=42, n_init=3)
        else:
            model = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = model.fit_predict(X)
        score = silhouette_score(X, labels, sample_size=min(2000, len(X)))
        scores.append(score)

    optimal_k = k_range[np.argmax(scores)]
    print(f"最优聚类数:{optimal_k}(Silhouette Score = {max(scores):.4f})")
    return optimal_k, k_range, scores

def run_clustering(X: np.ndarray, n_clusters: int) -> tuple[KMeans, np.ndarray]:
    """执行最终聚类"""
    model = KMeans(n_clusters=n_clusters, random_state=42, n_init=20)
    labels = model.fit_predict(X)
    return model, labels

# 对于小数据集(<100条),直接指定 k;大数据集自动扫描
if len(df) < 20:
    optimal_k = min(5, len(df) // 2)
    k_range, scores = [optimal_k], [1.0]
else:
    k_max = min(30, len(df) // 3)
    optimal_k, k_range, scores = find_optimal_clusters(
        embeddings, k_min=2, k_max=k_max
    )

kmeans_model, cluster_labels = run_clustering(embeddings, optimal_k)
df['cluster'] = cluster_labels

# 可视化 Silhouette Score 曲线
plt.figure(figsize=(10, 4))
plt.plot(k_range, scores, 'o-', color='#475569', linewidth=2)
plt.axvline(optimal_k, color='#f97316', linestyle='--', label=f'最优 k={optimal_k}')
plt.xlabel('聚类数 (k)')
plt.ylabel('Silhouette Score')
plt.title('聚类数评估曲线')
plt.legend()
plt.tight_layout()
plt.savefig('silhouette_curve.png', dpi=150)
plt.show()

# 打印每个簇的代表性标题
print("\n=== 各簇代表新闻 ===")
for cid in range(optimal_k):
    cluster_df = df[df['cluster'] == cid]
    print(f"\n簇 {cid}({len(cluster_df)} 条):")
    for title in cluster_df['title'].head(3).tolist():
        print(f"  • {title}")
性能优化:当新闻条数超过 5 万时,切换 MiniBatchKMeans 可将聚类时间从分钟级压缩到秒级,且精度损失可忽略不计。

六、语义去重

去重流程与策略
6

簇内余弦相似度去重

聚类后,同一簇内的新闻语义相近。在每个簇内,计算两两余弦相似度, 若相似度超过阈值(默认 0.92),保留其中一条,丢弃另一条。

from sklearn.metrics.pairwise import cosine_similarity

def deduplicate_within_cluster(
    cluster_indices: list,
    embeddings: np.ndarray,
    threshold: float = 0.92
) -> list:
    """
    在一个簇内执行去重
    返回保留条目的索引列表(去除重复后)

    threshold 说明:
    - 0.95+:非常严格,只去除几乎完全相同的文章
    - 0.92:推荐值,能识别改写型重复
    - 0.85:宽松,可能误删相关但不重复的新闻
    """
    if len(cluster_indices) <= 1:
        return cluster_indices

    # 取出该簇的向量子集
    cluster_vecs = embeddings[cluster_indices]
    # 计算两两余弦相似度矩阵
    sim_matrix = cosine_similarity(cluster_vecs)

    kept = []
    removed = set()

    for i in range(len(cluster_indices)):
        if i in removed:
            continue
        kept.append(cluster_indices[i])
        # 将与 i 相似度超过阈值的其他条目标记为删除
        for j in range(i + 1, len(cluster_indices)):
            if j not in removed and sim_matrix[i][j] >= threshold:
                removed.add(j)

    return kept

def deduplicate_all_clusters(
    df: pd.DataFrame,
    embeddings: np.ndarray,
    threshold: float = 0.92
) -> pd.DataFrame:
    """
    对所有簇执行去重,返回去重后的 DataFrame
    """
    kept_indices = []
    total_removed = 0

    for cid in df['cluster'].unique():
        cluster_idx = df[df['cluster'] == cid].index.tolist()
        kept = deduplicate_within_cluster(cluster_idx, embeddings, threshold)
        kept_indices.extend(kept)
        removed_count = len(cluster_idx) - len(kept)
        if removed_count > 0:
            print(f"簇 {cid}:{len(cluster_idx)} 条 → 保留 {len(kept)} 条(去重 {removed_count} 条)")
        total_removed += removed_count

    print(f"\n总计:{len(df)} 条 → {len(kept_indices)} 条(去重率 {total_removed/len(df)*100:.1f}%)")
    return df.loc[sorted(kept_indices)].reset_index(drop=True)

# 执行去重
df_deduped = deduplicate_all_clusters(df, embeddings, threshold=0.92)
print(df_deduped[['title', 'cluster']].to_string())
阈值调参建议:0.92 是中文新闻的经验值。上线前建议随机抽取 50 对被标记为"重复"的新闻对,人工评估精确率。若误删率高,可适当提高阈值;若遗漏重复多,可降低阈值。

七、FAISS 加速大规模检索

FAISS向量检索架构
7

用 FAISS 替换暴力两两计算

上一步的 cosine_similarity 是 O(N²) 复杂度,万条数据就会很慢。 FAISS(Facebook AI Similarity Search)可将查询复杂度降至近似 O(N·logN)。

import faiss

def build_faiss_index(embeddings: np.ndarray) -> faiss.IndexFlatIP:
    """
    构建 FAISS 内积索引(对已归一化向量,内积=余弦相似度)
    """
    dim = embeddings.shape[1]
    # IndexFlatIP:精确内积搜索,无近似误差
    index = faiss.IndexFlatIP(dim)

    # 归一化(OpenAI 向量已归一化,其他模型需执行此步)
    vecs = embeddings.copy()
    faiss.normalize_L2(vecs)
    index.add(vecs)

    print(f"FAISS 索引构建完成,向量数:{index.ntotal}")
    return index

def find_duplicates_faiss(
    embeddings: np.ndarray,
    index: faiss.IndexFlatIP,
    threshold: float = 0.92,
    top_k: int = 10
) -> set:
    """
    用 FAISS 批量找出所有重复对,返回应删除的索引集合
    top_k: 每条新闻查询最相似的 top_k 条
    """
    vecs = embeddings.copy()
    faiss.normalize_L2(vecs)

    # 批量查询:每条新闻找 top_k 最相似邻居
    similarities, indices = index.search(vecs, top_k + 1)  # +1 因为包含自身

    to_remove = set()
    for i in range(len(vecs)):
        if i in to_remove:
            continue
        for rank in range(1, top_k + 1):  # 跳过第0个(自身)
            j = indices[i][rank]
            sim = similarities[i][rank]
            if j != -1 and j > i and sim >= threshold and j not in to_remove:
                to_remove.add(j)  # 保留 i,删除 j

    return to_remove

# 构建索引并执行去重
faiss_index = build_faiss_index(embeddings)
remove_set = find_duplicates_faiss(embeddings, faiss_index, threshold=0.92)

df_fast_deduped = df.drop(index=list(remove_set)).reset_index(drop=True)
print(f"FAISS 去重:{len(df)} → {len(df_fast_deduped)} 条(去除 {len(remove_set)} 条重复)")
FAISS 索引选型IndexFlatIP 精确但内存大(适合百万级以内);百万以上可改用 IndexIVFFlat(分桶近似)或 IndexHNSWFlat(图索引),在精度损失不到 1% 的前提下速度提升 10-100 倍。

八、t-SNE 可视化验证

可视化与系统评估
8

降维可视化聚类效果

用 t-SNE 将 1536 维向量压缩到 2 维平面,直观检验聚类是否合理—— 同色点(同簇)应该聚拢,不同色点应该分散。

from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import numpy as np

def visualize_clusters(
    embeddings: np.ndarray,
    labels: np.ndarray,
    titles: list,
    output_path: str = 'cluster_visualization.png'
):
    """
    t-SNE 降维可视化聚类结果
    """
    print("正在执行 t-SNE 降维(可能需要 1-2 分钟)...")

    # t-SNE 降到 2 维
    tsne = TSNE(
        n_components=2,
        perplexity=min(30, len(embeddings) - 1),
        random_state=42,
        n_iter=1000
    )
    coords_2d = tsne.fit_transform(embeddings)

    # 绘图
    n_clusters = len(set(labels))
    colors = cm.tab20(np.linspace(0, 1, n_clusters))

    fig, ax = plt.subplots(figsize=(12, 8))
    for cid in range(n_clusters):
        mask = labels == cid
        ax.scatter(
            coords_2d[mask, 0], coords_2d[mask, 1],
            c=[colors[cid]], label=f'簇 {cid}({mask.sum()}条)',
            s=60, alpha=0.8, edgecolors='white', linewidth=0.5
        )

    # 标注前几条新闻标题
    for i, title in enumerate(titles[:20]):
        ax.annotate(
            title[:15] + '...' if len(title) > 15 else title,
            (coords_2d[i, 0], coords_2d[i, 1]),
            fontsize=7, alpha=0.7,
            xytext=(3, 3), textcoords='offset points'
        )

    ax.set_title(f't-SNE 新闻语义聚类可视化({n_clusters} 个簇,{len(embeddings)} 条)', fontsize=14)
    ax.legend(bbox_to_anchor=(1.02, 1), loc='upper left', fontsize=8)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.show()
    print(f"可视化图已保存到 {output_path}")

# 执行可视化
visualize_clusters(
    embeddings=embeddings,
    labels=cluster_labels,
    titles=df['title'].tolist()
)

整合为完整 Pipeline

将上述所有步骤组合成一个可复用的函数:

def news_clustering_dedup_pipeline(
    input_file: str,
    output_file: str,
    threshold: float = 0.92,
    use_faiss: bool = True
):
    """
    完整的新闻聚类去重 Pipeline
    """
    # 1. 加载数据
    df = load_news_data(input_file)
    print(f"加载 {len(df)} 条新闻")

    # 2. 生成 Embedding(带缓存)
    cache_file = input_file.replace('.jsonl', '_embeddings.npy')
    if os.path.exists(cache_file):
        embeddings = load_embeddings(cache_file)
    else:
        embeddings = get_embeddings_batch(df['clean_text'].tolist())
        save_embeddings(embeddings, cache_file)

    # 3. 聚类
    k_max = min(30, len(df) // 3)
    optimal_k, _, _ = find_optimal_clusters(embeddings, k_min=2, k_max=k_max)
    _, cluster_labels = run_clustering(embeddings, optimal_k)
    df['cluster'] = cluster_labels

    # 4. 去重
    if use_faiss:
        faiss_index = build_faiss_index(embeddings)
        remove_set = find_duplicates_faiss(embeddings, faiss_index, threshold)
        df_result = df.drop(index=list(remove_set)).reset_index(drop=True)
    else:
        df_result = deduplicate_all_clusters(df, embeddings, threshold)

    # 5. 输出
    df_result.to_json(output_file, orient='records', force_ascii=False, indent=2)
    print(f"结果已写入 {output_file}({len(df_result)} 条,去重率 {(1-len(df_result)/len(df))*100:.1f}%)")

    # 6. 可视化
    visualize_clusters(embeddings, cluster_labels, df['title'].tolist())

    return df_result

# 运行 pipeline
result_df = news_clustering_dedup_pipeline(
    input_file='news_sample.jsonl',
    output_file='news_deduped.json',
    threshold=0.92
)

常见问题解答

问题 解决方案
API 调用返回 429 限速错误 降低 BATCH_SIZE(改为 20-50),或在批次间 time.sleep(0.5);申请更高 API 配额
聚类结果全部在同一个簇 检查文本是否过于相似或过短;尝试只用标题或只用正文;确认数据多样性足够
去重阈值难以确定 从 0.92 开始,随机抽取 30 对被判定为重复的新闻人工校验;用 F1 分数量化调优
t-SNE 运行很慢 数据量大时先用 PCA 降至 50 维再做 t-SNE;或改用更快的 UMAP(pip install umap-learn
想用国产 Embedding 模型 可替换为智谱 BGE-M3、百川 Embedding 等;API 调用接口类似,替换 model 参数即可
新增新闻如何增量处理 只对新增新闻生成 Embedding,再用 FAISS 与历史库查询相似度,超阈值则跳过入库

进阶技巧与最佳实践

  • 多模型融合:分别用标题向量和正文向量,最终相似度取加权平均(标题权重 0.6,正文 0.4),能同时捕获主题和细节层面的重复。
  • 时间窗口限制:只在 24 小时内的新闻间做去重,避免跨周期的同主题文章被误删(如每年一度的春节消费报道)。
  • 增量更新:将历史 Embedding 持久化到 FAISS 索引文件(faiss.write_index),新新闻入库前先查询,实现实时去重。
  • HDBSCAN 替代 KMeans:KMeans 需要预设 k 值;HDBSCAN 是密度聚类,能自动发现簇数和异常点,更适合新闻主题数量动态变化的场景。
  • MinHash + LSH 预筛选:在语义去重前先做基于字符 n-gram 的 MinHash 预筛,过滤掉明显不同的新闻对,减少 Embedding API 调用量。

本文技术栈总结

Python 3.10+ OpenAI text-embedding-3-small KMeans / MiniBatchKMeans Silhouette Score cosine_similarity FAISS IndexFlatIP t-SNE HDBSCAN(进阶)

完整代码已覆盖从数据预处理到可视化验证的全流程,可在 5 分钟内跑通小样本测试,扩展到万级新闻时只需换用 MiniBatchKMeans + FAISS IVF 索引即可。

选择栏目
今日简报 播客电台 实战教程 AI挣钱计划 关于我
栏目
全球AI日报国内AI日报全球金融日报国内金融日报全球大新闻日报国内大新闻日报Claude Code 玩法日报OpenClaw 动态日报GitHub 热门项目日报AI工具实战AI应用开发编程实战工作流自动化AI原理图解AI Agent开发AI变现案例库AI工具创收AI内容变现AI接单提效变现前沿研究
我的收藏