用 Embedding 构建新闻相似度聚类与去重系统
每天各大平台推送的新闻条数动辄数千,其中大量是同一事件的重复报道。 传统基于关键词的去重方法容易漏掉改了标题的重复文章,而基于 Embedding 语义向量 的方案能真正理解文本含义——即便措辞完全不同,语义相近的新闻依然会被识别为重复。
本教程将带你从零搭建一套完整的新闻聚类与去重管道:用 OpenAI Embedding API 将文本转化为向量,用 KMeans 实现语义聚类,再结合余弦相似度阈值完成精准去重,最后用 t-SNE 可视化验证效果。 全程代码均可直接运行,无需机器学习背景。
你将学到什么
- Embedding 的核心原理与向量相似度计算
- 如何批量调用 OpenAI API 生成新闻 Embedding
- 用 KMeans + Silhouette Score 自动确定最优聚类数
- 基于余弦相似度的簇内语义去重策略
- 用 FAISS 实现大规模向量检索加速
- t-SNE 可视化聚类效果验证
一、系统架构与核心原理
整体数据流
整个系统分四个阶段:
- 数据预处理:清洗原始新闻文本,去除 HTML 标签、无效字符
- Embedding 生成:调用 OpenAI API,将每条新闻转为 1536 维向量
- 语义聚类:用 KMeans 将相似新闻归入同一簇,自动发现主题
- 去重输出:在每个簇内计算两两余弦相似度,超过阈值则保留一条
高维向量空间中,欧氏距离会受向量模长影响,而余弦相似度只关注方向(角度),对文本长度不敏感,更能反映语义接近程度。
Embedding 直觉理解:把每篇文章想象成宇宙中的一颗星——语义越相近,星星越靠拢。 聚类就是把星系划分出来,去重就是把同一位置重叠的星合并为一颗。
二、环境准备与依赖安装
安装核心依赖
推荐使用 Python 3.10+。所有依赖均可通过 pip 一次性安装:
# 创建虚拟环境(推荐)
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# 安装依赖
pip install openai==1.12.0 \
scikit-learn==1.4.0 \
numpy==1.26.3 \
pandas==2.2.0 \
faiss-cpu==1.7.4 \
matplotlib==3.8.2 \
datasketch==1.6.4 \
python-dotenv==1.0.0 \
tqdm==4.66.1
创建 .env 文件存储 API Key(不要把密钥写进代码):
# .env
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
text-embedding-3-small 约 $0.02/百万 token,1000 条新闻(每条 200 字)大约消耗 $0.08,成本极低。但首次运行建议先用 20 条测试。
三、数据预处理
清洗与准备新闻文本
原始新闻数据往往包含 HTML 标签、多余空白、特殊符号。 这一步先做统一清洗,再截取合理长度(Embedding 模型有 token 上限)。
import re
import json
import pandas as pd
from typing import List
def clean_news_text(text: str) -> str:
"""
清洗新闻文本:
1. 去除 HTML 标签
2. 折叠多余空白
3. 截断到 800 字(避免超过 Token 限制)
"""
# 去除 HTML 标签
text = re.sub(r'<[^>]+>', '', text)
# 去除多余换行和空格
text = re.sub(r'\s+', ' ', text).strip()
# 截断到 800 字(约 1200 token,安全范围内)
return text[:800]
def load_news_data(filepath: str) -> pd.DataFrame:
"""
从 JSONL 文件加载新闻数据
每行格式:{"title": "...", "content": "...", "source": "...", "pub_time": "..."}
"""
records = []
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
record = json.loads(line.strip())
# 标题 + 正文前 400 字,作为 Embedding 输入
combined = record['title'] + '。' + record.get('content', '')[:400]
record['clean_text'] = clean_news_text(combined)
records.append(record)
return pd.DataFrame(records)
# 示例:生成测试数据(实际替换为真实新闻加载)
sample_news = [
{"title": "工信部发布新能源汽车补贴政策", "content": "今日工信部正式发布2026年新能源汽车推广应用财政补贴方案..."},
{"title": "新能源车补贴新政策出炉 覆盖范围更广", "content": "工业和信息化部昨日公布最新新能源汽车补贴细则..."},
{"title": "春节假期全国旅游收入创历史新高", "content": "文旅部数据显示,2026年春节黄金周全国旅游接待人次..."},
{"title": "春节旅游市场迎来开门红 各地景区爆满", "content": "春节长假期间,全国各大旅游景区迎来客流高峰..."},
{"title": "DeepSeek R2模型正式发布 性能超越GPT-5", "content": "深度求索今日在北京发布最新大语言模型 R2..."},
]
with open('news_sample.jsonl', 'w', encoding='utf-8') as f:
for item in sample_news:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
df = load_news_data('news_sample.jsonl')
print(f"加载 {len(df)} 条新闻,示例文本长度:{df['clean_text'].str.len().describe()}")
四、批量生成 Embedding
调用 OpenAI API 生成语义向量
批量请求可显著减少 API 调用次数(最多 2048 条/次)。 同时加入缓存机制,避免重复计费。
import os
import time
import numpy as np
from openai import OpenAI
from tqdm import tqdm
from dotenv import load_dotenv
load_dotenv()
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
EMBED_MODEL = "text-embedding-3-small" # 1536 维,性价比最高
BATCH_SIZE = 100 # 每批请求条数(API 上限 2048)
def get_embeddings_batch(texts: List[str]) -> np.ndarray:
"""
批量获取 Embedding,自动重试限速错误
返回 shape (N, 1536) 的 numpy 数组
"""
all_embeddings = []
for i in tqdm(range(0, len(texts), BATCH_SIZE), desc="生成 Embedding"):
batch = texts[i:i + BATCH_SIZE]
# 重试逻辑(应对 429 限速)
for attempt in range(3):
try:
response = client.embeddings.create(
input=batch,
model=EMBED_MODEL
)
# API 返回顺序与输入一致
batch_embeddings = [item.embedding for item in response.data]
all_embeddings.extend(batch_embeddings)
break
except Exception as e:
if attempt == 2:
raise e
wait = 2 ** attempt # 指数退避:1s, 2s
print(f"请求失败,{wait}s 后重试:{e}")
time.sleep(wait)
return np.array(all_embeddings, dtype=np.float32)
def save_embeddings(embeddings: np.ndarray, filepath: str):
"""保存 Embedding 到本地,避免重复调用 API"""
np.save(filepath, embeddings)
print(f"Embedding 已保存到 {filepath},shape: {embeddings.shape}")
def load_embeddings(filepath: str) -> np.ndarray:
"""加载已缓存的 Embedding"""
return np.load(filepath)
# 执行批量 Embedding 生成
EMBED_CACHE = 'embeddings_cache.npy'
if os.path.exists(EMBED_CACHE):
print("发现缓存,直接加载...")
embeddings = load_embeddings(EMBED_CACHE)
else:
texts = df['clean_text'].tolist()
embeddings = get_embeddings_batch(texts)
save_embeddings(embeddings, EMBED_CACHE)
print(f"Embedding 矩阵:{embeddings.shape}") # (N, 1536)
text-embedding-3-small 返回的向量已经是单位向量(模长=1),因此余弦相似度等价于点积,无需额外归一化。如果换用其他模型,建议先 normalize 再计算。
五、KMeans 语义聚类
自动确定最优簇数并执行聚类
KMeans 需要预先指定簇数 k。我们用 Silhouette Score 扫描候选范围, 自动选取分离度最好的 k 值。
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['font.family'] = ['PingFang SC', 'Microsoft YaHei', 'sans-serif']
def find_optimal_clusters(
X: np.ndarray,
k_min: int = 3,
k_max: int = 20,
step: int = 1
) -> tuple[int, list, list]:
"""
用 Silhouette Score 确定最优聚类数
返回 (最优k, k列表, 分数列表)
"""
k_range = list(range(k_min, min(k_max + 1, len(X)), step))
scores = []
print(f"扫描聚类数范围 [{k_min}, {min(k_max, len(X)-1)}]...")
for k in tqdm(k_range, desc="评估聚类数"):
# 数据量大时用 MiniBatchKMeans 加速
if len(X) > 5000:
model = MiniBatchKMeans(n_clusters=k, random_state=42, n_init=3)
else:
model = KMeans(n_clusters=k, random_state=42, n_init=10)
labels = model.fit_predict(X)
score = silhouette_score(X, labels, sample_size=min(2000, len(X)))
scores.append(score)
optimal_k = k_range[np.argmax(scores)]
print(f"最优聚类数:{optimal_k}(Silhouette Score = {max(scores):.4f})")
return optimal_k, k_range, scores
def run_clustering(X: np.ndarray, n_clusters: int) -> tuple[KMeans, np.ndarray]:
"""执行最终聚类"""
model = KMeans(n_clusters=n_clusters, random_state=42, n_init=20)
labels = model.fit_predict(X)
return model, labels
# 对于小数据集(<100条),直接指定 k;大数据集自动扫描
if len(df) < 20:
optimal_k = min(5, len(df) // 2)
k_range, scores = [optimal_k], [1.0]
else:
k_max = min(30, len(df) // 3)
optimal_k, k_range, scores = find_optimal_clusters(
embeddings, k_min=2, k_max=k_max
)
kmeans_model, cluster_labels = run_clustering(embeddings, optimal_k)
df['cluster'] = cluster_labels
# 可视化 Silhouette Score 曲线
plt.figure(figsize=(10, 4))
plt.plot(k_range, scores, 'o-', color='#475569', linewidth=2)
plt.axvline(optimal_k, color='#f97316', linestyle='--', label=f'最优 k={optimal_k}')
plt.xlabel('聚类数 (k)')
plt.ylabel('Silhouette Score')
plt.title('聚类数评估曲线')
plt.legend()
plt.tight_layout()
plt.savefig('silhouette_curve.png', dpi=150)
plt.show()
# 打印每个簇的代表性标题
print("\n=== 各簇代表新闻 ===")
for cid in range(optimal_k):
cluster_df = df[df['cluster'] == cid]
print(f"\n簇 {cid}({len(cluster_df)} 条):")
for title in cluster_df['title'].head(3).tolist():
print(f" • {title}")
MiniBatchKMeans 可将聚类时间从分钟级压缩到秒级,且精度损失可忽略不计。
六、语义去重
簇内余弦相似度去重
聚类后,同一簇内的新闻语义相近。在每个簇内,计算两两余弦相似度, 若相似度超过阈值(默认 0.92),保留其中一条,丢弃另一条。
from sklearn.metrics.pairwise import cosine_similarity
def deduplicate_within_cluster(
cluster_indices: list,
embeddings: np.ndarray,
threshold: float = 0.92
) -> list:
"""
在一个簇内执行去重
返回保留条目的索引列表(去除重复后)
threshold 说明:
- 0.95+:非常严格,只去除几乎完全相同的文章
- 0.92:推荐值,能识别改写型重复
- 0.85:宽松,可能误删相关但不重复的新闻
"""
if len(cluster_indices) <= 1:
return cluster_indices
# 取出该簇的向量子集
cluster_vecs = embeddings[cluster_indices]
# 计算两两余弦相似度矩阵
sim_matrix = cosine_similarity(cluster_vecs)
kept = []
removed = set()
for i in range(len(cluster_indices)):
if i in removed:
continue
kept.append(cluster_indices[i])
# 将与 i 相似度超过阈值的其他条目标记为删除
for j in range(i + 1, len(cluster_indices)):
if j not in removed and sim_matrix[i][j] >= threshold:
removed.add(j)
return kept
def deduplicate_all_clusters(
df: pd.DataFrame,
embeddings: np.ndarray,
threshold: float = 0.92
) -> pd.DataFrame:
"""
对所有簇执行去重,返回去重后的 DataFrame
"""
kept_indices = []
total_removed = 0
for cid in df['cluster'].unique():
cluster_idx = df[df['cluster'] == cid].index.tolist()
kept = deduplicate_within_cluster(cluster_idx, embeddings, threshold)
kept_indices.extend(kept)
removed_count = len(cluster_idx) - len(kept)
if removed_count > 0:
print(f"簇 {cid}:{len(cluster_idx)} 条 → 保留 {len(kept)} 条(去重 {removed_count} 条)")
total_removed += removed_count
print(f"\n总计:{len(df)} 条 → {len(kept_indices)} 条(去重率 {total_removed/len(df)*100:.1f}%)")
return df.loc[sorted(kept_indices)].reset_index(drop=True)
# 执行去重
df_deduped = deduplicate_all_clusters(df, embeddings, threshold=0.92)
print(df_deduped[['title', 'cluster']].to_string())
七、FAISS 加速大规模检索
用 FAISS 替换暴力两两计算
上一步的 cosine_similarity 是 O(N²) 复杂度,万条数据就会很慢。
FAISS(Facebook AI Similarity Search)可将查询复杂度降至近似 O(N·logN)。
import faiss
def build_faiss_index(embeddings: np.ndarray) -> faiss.IndexFlatIP:
"""
构建 FAISS 内积索引(对已归一化向量,内积=余弦相似度)
"""
dim = embeddings.shape[1]
# IndexFlatIP:精确内积搜索,无近似误差
index = faiss.IndexFlatIP(dim)
# 归一化(OpenAI 向量已归一化,其他模型需执行此步)
vecs = embeddings.copy()
faiss.normalize_L2(vecs)
index.add(vecs)
print(f"FAISS 索引构建完成,向量数:{index.ntotal}")
return index
def find_duplicates_faiss(
embeddings: np.ndarray,
index: faiss.IndexFlatIP,
threshold: float = 0.92,
top_k: int = 10
) -> set:
"""
用 FAISS 批量找出所有重复对,返回应删除的索引集合
top_k: 每条新闻查询最相似的 top_k 条
"""
vecs = embeddings.copy()
faiss.normalize_L2(vecs)
# 批量查询:每条新闻找 top_k 最相似邻居
similarities, indices = index.search(vecs, top_k + 1) # +1 因为包含自身
to_remove = set()
for i in range(len(vecs)):
if i in to_remove:
continue
for rank in range(1, top_k + 1): # 跳过第0个(自身)
j = indices[i][rank]
sim = similarities[i][rank]
if j != -1 and j > i and sim >= threshold and j not in to_remove:
to_remove.add(j) # 保留 i,删除 j
return to_remove
# 构建索引并执行去重
faiss_index = build_faiss_index(embeddings)
remove_set = find_duplicates_faiss(embeddings, faiss_index, threshold=0.92)
df_fast_deduped = df.drop(index=list(remove_set)).reset_index(drop=True)
print(f"FAISS 去重:{len(df)} → {len(df_fast_deduped)} 条(去除 {len(remove_set)} 条重复)")
IndexFlatIP 精确但内存大(适合百万级以内);百万以上可改用 IndexIVFFlat(分桶近似)或 IndexHNSWFlat(图索引),在精度损失不到 1% 的前提下速度提升 10-100 倍。
八、t-SNE 可视化验证
降维可视化聚类效果
用 t-SNE 将 1536 维向量压缩到 2 维平面,直观检验聚类是否合理—— 同色点(同簇)应该聚拢,不同色点应该分散。
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import numpy as np
def visualize_clusters(
embeddings: np.ndarray,
labels: np.ndarray,
titles: list,
output_path: str = 'cluster_visualization.png'
):
"""
t-SNE 降维可视化聚类结果
"""
print("正在执行 t-SNE 降维(可能需要 1-2 分钟)...")
# t-SNE 降到 2 维
tsne = TSNE(
n_components=2,
perplexity=min(30, len(embeddings) - 1),
random_state=42,
n_iter=1000
)
coords_2d = tsne.fit_transform(embeddings)
# 绘图
n_clusters = len(set(labels))
colors = cm.tab20(np.linspace(0, 1, n_clusters))
fig, ax = plt.subplots(figsize=(12, 8))
for cid in range(n_clusters):
mask = labels == cid
ax.scatter(
coords_2d[mask, 0], coords_2d[mask, 1],
c=[colors[cid]], label=f'簇 {cid}({mask.sum()}条)',
s=60, alpha=0.8, edgecolors='white', linewidth=0.5
)
# 标注前几条新闻标题
for i, title in enumerate(titles[:20]):
ax.annotate(
title[:15] + '...' if len(title) > 15 else title,
(coords_2d[i, 0], coords_2d[i, 1]),
fontsize=7, alpha=0.7,
xytext=(3, 3), textcoords='offset points'
)
ax.set_title(f't-SNE 新闻语义聚类可视化({n_clusters} 个簇,{len(embeddings)} 条)', fontsize=14)
ax.legend(bbox_to_anchor=(1.02, 1), loc='upper left', fontsize=8)
plt.tight_layout()
plt.savefig(output_path, dpi=150, bbox_inches='tight')
plt.show()
print(f"可视化图已保存到 {output_path}")
# 执行可视化
visualize_clusters(
embeddings=embeddings,
labels=cluster_labels,
titles=df['title'].tolist()
)
整合为完整 Pipeline
将上述所有步骤组合成一个可复用的函数:
def news_clustering_dedup_pipeline(
input_file: str,
output_file: str,
threshold: float = 0.92,
use_faiss: bool = True
):
"""
完整的新闻聚类去重 Pipeline
"""
# 1. 加载数据
df = load_news_data(input_file)
print(f"加载 {len(df)} 条新闻")
# 2. 生成 Embedding(带缓存)
cache_file = input_file.replace('.jsonl', '_embeddings.npy')
if os.path.exists(cache_file):
embeddings = load_embeddings(cache_file)
else:
embeddings = get_embeddings_batch(df['clean_text'].tolist())
save_embeddings(embeddings, cache_file)
# 3. 聚类
k_max = min(30, len(df) // 3)
optimal_k, _, _ = find_optimal_clusters(embeddings, k_min=2, k_max=k_max)
_, cluster_labels = run_clustering(embeddings, optimal_k)
df['cluster'] = cluster_labels
# 4. 去重
if use_faiss:
faiss_index = build_faiss_index(embeddings)
remove_set = find_duplicates_faiss(embeddings, faiss_index, threshold)
df_result = df.drop(index=list(remove_set)).reset_index(drop=True)
else:
df_result = deduplicate_all_clusters(df, embeddings, threshold)
# 5. 输出
df_result.to_json(output_file, orient='records', force_ascii=False, indent=2)
print(f"结果已写入 {output_file}({len(df_result)} 条,去重率 {(1-len(df_result)/len(df))*100:.1f}%)")
# 6. 可视化
visualize_clusters(embeddings, cluster_labels, df['title'].tolist())
return df_result
# 运行 pipeline
result_df = news_clustering_dedup_pipeline(
input_file='news_sample.jsonl',
output_file='news_deduped.json',
threshold=0.92
)
常见问题解答
| 问题 | 解决方案 |
|---|---|
| API 调用返回 429 限速错误 | 降低 BATCH_SIZE(改为 20-50),或在批次间 time.sleep(0.5);申请更高 API 配额 |
| 聚类结果全部在同一个簇 | 检查文本是否过于相似或过短;尝试只用标题或只用正文;确认数据多样性足够 |
| 去重阈值难以确定 | 从 0.92 开始,随机抽取 30 对被判定为重复的新闻人工校验;用 F1 分数量化调优 |
| t-SNE 运行很慢 | 数据量大时先用 PCA 降至 50 维再做 t-SNE;或改用更快的 UMAP(pip install umap-learn) |
| 想用国产 Embedding 模型 | 可替换为智谱 BGE-M3、百川 Embedding 等;API 调用接口类似,替换 model 参数即可 |
| 新增新闻如何增量处理 | 只对新增新闻生成 Embedding,再用 FAISS 与历史库查询相似度,超阈值则跳过入库 |
进阶技巧与最佳实践
- 多模型融合:分别用标题向量和正文向量,最终相似度取加权平均(标题权重 0.6,正文 0.4),能同时捕获主题和细节层面的重复。
- 时间窗口限制:只在 24 小时内的新闻间做去重,避免跨周期的同主题文章被误删(如每年一度的春节消费报道)。
-
增量更新:将历史 Embedding 持久化到 FAISS 索引文件(
faiss.write_index),新新闻入库前先查询,实现实时去重。 - HDBSCAN 替代 KMeans:KMeans 需要预设 k 值;HDBSCAN 是密度聚类,能自动发现簇数和异常点,更适合新闻主题数量动态变化的场景。
- MinHash + LSH 预筛选:在语义去重前先做基于字符 n-gram 的 MinHash 预筛,过滤掉明显不同的新闻对,减少 Embedding API 调用量。
本文技术栈总结
完整代码已覆盖从数据预处理到可视化验证的全流程,可在 5 分钟内跑通小样本测试,扩展到万级新闻时只需换用 MiniBatchKMeans + FAISS IVF 索引即可。