中级难度 · 实战教程
用LLM构建实时股票筛选系统
多智能体架构实战指南
大语言模型正在重塑金融量化分析的范式。传统股票筛选依赖固定规则和硬编码指标,而LLM驱动的筛选系统能够理解复杂的市场语义、动态生成筛选条件,并结合新闻情绪与基本面数据进行多维度综合判断。本教程将从零构建一套完整的生产级股票筛选系统,涵盖实时数据采集、多智能体协作、FastAPI后端与持续决策引擎。
ℹ️
免责声明:本教程仅用于技术学习和研究目的。系统输出不构成任何投资建议,投资有风险,入市需谨慎。
第一步:系统架构概览
整个系统分为四层,各层职责清晰,通过异步消息队列解耦:
Layer 1
数据层
- yfinance 实时行情
- RSS 新闻流
- Alpha Vantage 财报
- Redis 缓存
Layer 2
智能体层
- 基本面分析师
- 情绪分析师
- 估值分析师
- 首席决策者
Layer 3
执行层
- FastAPI 服务
- Celery 任务队列
- SQLite 结果库
- WebSocket 推送
Layer 4
展示层
- React 18 前端
- 实时数据看板
- 筛选结果排序
- 交互式报告
核心数据流如下:定时任务每5分钟拉取一批股票的实时行情和新闻,推入 Redis 队列;Celery 工作进程消费队列,将数据分发给对应的 LLM 智能体进行分析;智能体协作完成后,首席决策者汇总评分并写入 SQLite;FastAPI 通过 WebSocket 将结果实时推送到前端看板。
✅
架构选型原则:各组件均可水平扩展。LLM 供应商通过统一接口抽象,可随时切换 OpenAI、DeepSeek 或 Groq,无需修改业务逻辑。
第二步:环境准备与依赖安装
1
创建项目结构
建立清晰的目录结构,分离数据层、智能体层与 API 层:
mkdir stock-screener && cd stock-screener
mkdir -p {agents,api,data,tasks,frontend/src}
touch .env requirements.txt docker-compose.yml
2
安装 Python 依赖
建议使用 Python 3.11+,核心依赖包括数据获取、LangGraph 以及 FastAPI:
# requirements.txt
# 数据层
yfinance==0.2.51
akshare==1.14.40 # A股数据支持
feedparser==6.0.11 # RSS新闻
httpx==0.28.1
# LLM与智能体
langchain==0.3.20
langgraph==0.2.73
langchain-openai==0.2.14
langchain-community==0.3.19
# 后端
fastapi==0.115.8
uvicorn[standard]==0.34.0
celery[redis]==5.4.0
redis==5.2.1
sqlalchemy==2.0.38
# 工具
python-dotenv==1.0.1
pydantic==2.10.6
pandas==2.2.3
pip install -r requirements.txt
3
配置环境变量
# .env
OPENAI_API_KEY=sk-xxx # 或 DeepSeek/Groq 兼容密钥
OPENAI_BASE_URL=https://api.deepseek.com/v1
LLM_MODEL=deepseek-chat
REDIS_URL=redis://localhost:6379/0
DATABASE_URL=sqlite:///./screener.db
ALPHA_VANTAGE_KEY=your_key_here
# 每批筛选的股票池
STOCK_POOL=AAPL,MSFT,NVDA,GOOGL,META,TSLA,AMZN,BRK-B
4
启动 Redis(Docker 方式)
docker run -d --name redis-screener \
-p 6379:6379 \
redis:7-alpine \
redis-server --maxmemory 512mb --maxmemory-policy allkeys-lru
第三步:数据采集管道实现
数据质量是整个系统的基石。我们需要同时采集三类数据:实时价格、公司基本面、以及市场情绪新闻。
实时行情采集器
# data/collector.py
import yfinance as yf
import feedparser
import redis
import json
from datetime import datetime
from typing import Optional
import os
r = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))
def fetch_stock_snapshot(ticker: str) -> dict:
"""获取单只股票的完整快照数据"""
stock = yf.Ticker(ticker)
info = stock.info
hist = stock.history(period="5d", interval="1d")
# 计算技术指标
closes = hist["Close"].tolist()
sma5 = sum(closes[-5:]) / min(5, len(closes)) if closes else None
return {
"ticker": ticker,
"timestamp": datetime.now().isoformat(),
# 价格数据
"current_price": info.get("currentPrice"),
"day_high": info.get("dayHigh"),
"day_low": info.get("dayLow"),
"volume": info.get("volume"),
"avg_volume": info.get("averageVolume"),
"sma_5d": round(sma5, 2) if sma5 else None,
# 基本面
"pe_ratio": info.get("trailingPE"),
"forward_pe": info.get("forwardPE"),
"revenue_growth": info.get("revenueGrowth"),
"profit_margin": info.get("profitMargins"),
"roe": info.get("returnOnEquity"),
"debt_to_equity": info.get("debtToEquity"),
"market_cap": info.get("marketCap"),
"sector": info.get("sector", "Unknown"),
"company_name": info.get("longName", ticker),
}
def fetch_news_sentiment(ticker: str, limit: int = 5) -> list[dict]:
"""从 Yahoo Finance RSS 获取最新新闻标题"""
url = f"https://feeds.finance.yahoo.com/rss/2.0/headline?s={ticker}®ion=US&lang=en-US"
feed = feedparser.parse(url)
news = []
for entry in feed.entries[:limit]:
news.append({
"title": entry.title,
"summary": getattr(entry, "summary", "")[:200],
"published": getattr(entry, "published", ""),
})
return news
def push_to_queue(ticker: str) -> None:
"""采集数据并推入 Redis 任务队列"""
snapshot = fetch_stock_snapshot(ticker)
snapshot["news"] = fetch_news_sentiment(ticker)
r.lpush("analysis_queue", json.dumps(snapshot))
r.expire("analysis_queue", 3600) # 1小时过期
print(f"[{ticker}] 数据已推入队列")
if __name__ == "__main__":
tickers = os.getenv("STOCK_POOL", "").split(",")
for t in tickers:
push_to_queue(t.strip())
⚠️
API 限速注意:yfinance 免费版有请求频率限制。生产环境中建议为每个 ticker 请求间加 0.5s 延迟,或升级使用付费数据源(如 Polygon.io、Alpha Vantage Premium)。
第四步:LLM 智能代理设计
每个智能代理专注于单一分析维度,通过结构化输出保证下游可程序化消费。我们使用 LangChain 的结构化输出能力,强制 LLM 返回 Pydantic 模型定义的 JSON 格式。
# agents/base.py
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os
def get_llm(temperature: float = 0.1) -> ChatOpenAI:
"""获取统一配置的 LLM 客户端,支持 OpenAI 兼容协议"""
return ChatOpenAI(
model=os.getenv("LLM_MODEL", "deepseek-chat"),
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=temperature,
max_retries=3,
timeout=30,
)
# ── 基本面分析师 ──
class FundamentalSignal(BaseModel):
score: float = Field(ge=0, le=10, description="基本面综合评分 0-10")
pe_assessment: str = Field(description="PE估值评价")
growth_signal: str = Field(description="成长性信号: STRONG/MODERATE/WEAK")
key_risks: list[str] = Field(description="主要风险点列表")
summary: str = Field(description="50字以内的基本面总结")
FUNDAMENTAL_PROMPT = ChatPromptTemplate.from_template("""
你是一位资深基本面分析师。请根据以下财务数据,对该股票进行基本面评估。
股票: {ticker}
当前价格: ${current_price}
市盈率(PE): {pe_ratio}
预期PE: {forward_pe}
营收增长: {revenue_growth}
利润率: {profit_margin}
ROE: {roe}
负债权益比: {debt_to_equity}
市值: ${market_cap}
行业: {sector}
请严格按照 JSON 格式输出评估结果,评分标准:
- 9-10分: 优质成长股,各项指标突出
- 7-8分: 良好,部分指标优秀
- 5-6分: 一般,需持续观察
- 3-4分: 较弱,存在明显风险
- 0-2分: 差,建议回避
""")
def analyze_fundamentals(data: dict) -> FundamentalSignal:
llm = get_llm()
chain = FUNDAMENTAL_PROMPT | llm.with_structured_output(FundamentalSignal)
return chain.invoke(data)
# agents/sentiment.py — 情绪分析师
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from agents.base import get_llm
class SentimentSignal(BaseModel):
overall_sentiment: str = Field(description="BULLISH/NEUTRAL/BEARISH")
score: float = Field(ge=-1, le=1, description="情绪得分 -1到1")
key_catalysts: list[str] = Field(description="主要催化剂(利多/利空)")
news_highlights: list[str] = Field(description="重要新闻摘要")
SENTIMENT_PROMPT = ChatPromptTemplate.from_template("""
你是一位专业的市场情绪分析师。分析以下新闻标题对 {ticker} 股价的潜在影响。
公司: {company_name}
最新新闻:
{news_text}
请分析这些新闻的整体情绪倾向和主要催化剂。对于每条新闻,
判断其是利多、利空还是中性,并给出综合情绪评分。
""")
def analyze_sentiment(data: dict) -> SentimentSignal:
news_text = "\n".join(
[f"- {n['title']}" for n in data.get("news", [])]
) or "暂无最新新闻"
llm = get_llm(temperature=0.0)
chain = SENTIMENT_PROMPT | llm.with_structured_output(SentimentSignal)
return chain.invoke({**data, "news_text": news_text})
第五步:多智能体协作架构(LangGraph)
LangGraph 允许我们将多个智能体组织成有向状态图(StateGraph),支持并行执行、条件分支和状态共享。三位分析师并行工作,最后由首席决策者汇总。
# agents/graph.py
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing import TypedDict, Annotated, Optional
from pydantic import BaseModel, Field
from agents.base import analyze_fundamentals, get_llm
from agents.sentiment import analyze_sentiment
# ── 共享状态定义 ──
class ScreenerState(TypedDict):
stock_data: dict # 原始股票数据
fundamental: Optional[dict] # 基本面分析结果
sentiment: Optional[dict] # 情绪分析结果
valuation: Optional[dict] # 估值分析结果
final_score: Optional[float] # 综合评分
recommendation: Optional[str] # BUY/HOLD/AVOID
rationale: Optional[str] # 决策理由
# ── 估值分析节点 ──
class ValuationSignal(BaseModel):
intrinsic_value_estimate: float
margin_of_safety: float = Field(description="安全边际百分比")
valuation_method: str = Field(description="使用的估值方法")
score: float = Field(ge=0, le=10)
def run_fundamental_node(state: ScreenerState) -> ScreenerState:
result = analyze_fundamentals(state["stock_data"])
return {**state, "fundamental": result.model_dump()}
def run_sentiment_node(state: ScreenerState) -> ScreenerState:
result = analyze_sentiment(state["stock_data"])
return {**state, "sentiment": result.model_dump()}
def run_valuation_node(state: ScreenerState) -> ScreenerState:
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("""
对 {ticker}(当前价 ${current_price},PE {pe_ratio},ROE {roe})
进行简单估值分析,计算内在价值并判断安全边际。
""")
llm = get_llm()
chain = prompt | llm.with_structured_output(ValuationSignal)
result = chain.invoke(state["stock_data"])
return {**state, "valuation": result.model_dump()}
def chief_decision_node(state: ScreenerState) -> ScreenerState:
"""首席分析师:汇总三路信号,生成最终评级"""
from langchain_core.prompts import ChatPromptTemplate
class FinalDecision(BaseModel):
final_score: float = Field(ge=0, le=10)
recommendation: str = Field(description="BUY/HOLD/AVOID")
rationale: str = Field(description="100字以内的决策理由")
prompt = ChatPromptTemplate.from_template("""
你是首席分析师,请综合以下三位分析师的报告,给出最终评级。
股票: {ticker}
基本面报告: {fundamental}
情绪报告: {sentiment}
估值报告: {valuation}
BUY: 综合评分>=7,各维度均衡
HOLD: 综合评分5-7,存在不确定性
AVOID: 综合评分<5,或存在重大风险
""")
llm = get_llm(temperature=0.0)
chain = prompt | llm.with_structured_output(FinalDecision)
result = chain.invoke({
"ticker": state["stock_data"]["ticker"],
"fundamental": str(state["fundamental"]),
"sentiment": str(state["sentiment"]),
"valuation": str(state["valuation"]),
})
return {**state, "final_score": result.final_score,
"recommendation": result.recommendation,
"rationale": result.rationale}
# ── 构建图并编译 ──
def build_screener_graph():
graph = StateGraph(ScreenerState)
graph.add_node("fundamental", run_fundamental_node)
graph.add_node("sentiment", run_sentiment_node)
graph.add_node("valuation", run_valuation_node)
graph.add_node("chief", chief_decision_node)
# 并行启动三位分析师
graph.set_entry_point("fundamental")
graph.add_edge("fundamental", "sentiment")
graph.add_edge("sentiment", "valuation")
graph.add_edge("valuation", "chief")
graph.add_edge("chief", END)
return graph.compile()
screener_app = build_screener_graph()
💡
并行优化:LangGraph 支持使用
graph.add_node 配合 Send API 实现真正的并行分析。当前教程采用串行简化版本,生产中可将三位分析师改为 parallel 分支以减少约 60% 的等待时间。第六步:筛选策略与执行引擎
系统内置多种经典筛选策略,LLM 会根据市场环境动态选择最合适的策略组合:
| 策略 | 核心条件 | 适用市场 | LLM增强点 |
|---|---|---|---|
| CANSLIM | EPS增长>25%,相对强度≥80 | 牛市 | 自动识别季报超预期 |
| Minervini | 价格>150/200日均线,均线多头排列 | 趋势市 | 新闻催化剂验证 |
| 价值投资 | PE<15,PB<1.5,ROE>15% | 熊市/震荡 | 行业景气度判断 |
| 成长策略 | 营收增长>20%,毛利率>40% | 成长市 | 赛道前景评估 |
# tasks/celery_tasks.py
from celery import Celery
from sqlalchemy import create_engine, Column, Float, String, DateTime, Text
from sqlalchemy.orm import declarative_base, Session
from datetime import datetime
import json, os
from agents.graph import screener_app
app = Celery(
"screener",
broker=os.getenv("REDIS_URL"),
backend=os.getenv("REDIS_URL"),
)
Base = declarative_base()
class ScreeningResult(Base):
__tablename__ = "results"
id = Column(String, primary_key=True)
ticker = Column(String, index=True)
run_at = Column(DateTime, default=datetime.utcnow)
final_score = Column(Float)
recommendation = Column(String)
rationale = Column(Text)
raw_data = Column(Text) # JSON
engine = create_engine(os.getenv("DATABASE_URL", "sqlite:///./screener.db"))
Base.metadata.create_all(engine)
@app.task(bind=True, max_retries=2, default_retry_delay=10)
def analyze_stock(self, stock_json: str):
"""Celery 任务:分析单只股票并存储结果"""
try:
stock_data = json.loads(stock_json)
ticker = stock_data["ticker"]
print(f"[{ticker}] 开始多智能体分析...")
# 调用 LangGraph 状态机
result = screener_app.invoke({
"stock_data": stock_data,
"fundamental": None,
"sentiment": None,
"valuation": None,
"final_score": None,
"recommendation": None,
"rationale": None,
})
# 持久化到数据库
with Session(engine) as session:
record = ScreeningResult(
id=f"{ticker}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
ticker=ticker,
final_score=result["final_score"],
recommendation=result["recommendation"],
rationale=result["rationale"],
raw_data=json.dumps(result, ensure_ascii=False),
)
session.add(record)
session.commit()
print(f"[{ticker}] 分析完成: {result['recommendation']} ({result['final_score']:.1f}分)")
return {"ticker": ticker, "score": result["final_score"], "rec": result["recommendation"]}
except Exception as e:
raise self.retry(exc=e)
第七步:FastAPI 服务与系统集成
最后将所有组件串联为可运行的完整系统,提供 REST API 和定时调度器:
# api/main.py
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from tasks.celery_tasks import analyze_stock, engine, ScreeningResult
from data.collector import push_to_queue
import redis, json, os
from apscheduler.schedulers.background import BackgroundScheduler
api = FastAPI(title="LLM Stock Screener API", version="1.0.0")
api.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
r = redis.from_url(os.getenv("REDIS_URL"))
STOCK_POOL = os.getenv("STOCK_POOL", "").split(",")
def scheduled_screening():
"""每5分钟采集数据并触发分析"""
for ticker in STOCK_POOL:
push_to_queue(ticker.strip())
# 消费队列,触发 Celery 任务
while True:
item = r.rpop("analysis_queue")
if not item:
break
analyze_stock.delay(item.decode())
# 定时调度器
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_screening, "interval", minutes=5)
@api.on_event("startup")
async def startup():
scheduler.start()
print("调度器已启动,每5分钟自动运行一次筛选")
@api.get("/results")
def get_results(min_score: float = 0.0, recommendation: str = None):
"""获取筛选结果,支持按评分和推荐类型过滤"""
with Session(engine) as session:
q = session.query(ScreeningResult).filter(
ScreeningResult.final_score >= min_score
)
if recommendation:
q = q.filter(ScreeningResult.recommendation == recommendation.upper())
results = q.order_by(ScreeningResult.final_score.desc()).limit(50).all()
return [{
"ticker": r.ticker,
"score": r.final_score,
"recommendation": r.recommendation,
"rationale": r.rationale,
"run_at": r.run_at.isoformat(),
} for r in results]
@api.post("/trigger")
def manual_trigger(background_tasks: BackgroundTasks):
"""手动触发一次全量筛选"""
background_tasks.add_task(scheduled_screening)
return {"status": "triggered", "pool_size": len(STOCK_POOL)}
启动全部服务
# 终端1:启动 FastAPI 主服务
uvicorn api.main:api --reload --port 8000
# 终端2:启动 Celery 工作进程
celery -A tasks.celery_tasks.app worker --loglevel=info --concurrency=4
# 终端3:验证 API
curl http://localhost:8000/trigger -X POST
curl "http://localhost:8000/results?min_score=7&recommendation=BUY"
✅
验证系统运行:触发一次手动筛选后约 30-60 秒(取决于 LLM 响应速度),访问
/results 接口应能看到评分结果。若返回空数组,检查 Celery 日志中是否有错误信息。常见问题
LLM 分析结果不稳定,同一只股票每次评分差异较大怎么办?
设置
temperature=0.0 降低随机性;使用 Few-Shot 提示词,在 Prompt 中提供2-3个参考案例;对高置信度决策设置阈值(如评分<3或>8才执行操作)。yfinance 频繁返回空数据如何处理?
yfinance 依赖 Yahoo Finance 非官方接口,可能因 IP 被限速。建议:添加随机延迟(0.5-2秒);使用代理轮换;切换至 Alpha Vantage 或 Polygon.io 等商业 API 作为备用数据源。
如何支持 A 股数据?
将 yfinance 替换为 akshare 库:
akshare.stock_zh_a_spot_em() 获取沪深全市场实时行情,支持所有 A 股代码(如 600519.SH)。如何降低 LLM API 调用成本?
缓存相同 ticker 在同一天内的分析结果(Redis TTL=3600s);对基本面变化不大的股票降低分析频率;使用 DeepSeek-Chat 等性价比高的模型(成本约为 GPT-4o 的 1/30)。
本教程关键收获
- 基于 FastAPI + Celery + Redis 构建可扩展的异步处理架构
- 使用 LangGraph 的 StateGraph 编排多智能体协作流程
- 通过 Pydantic 结构化输出确保 LLM 返回可程序化处理的 JSON
- yfinance + feedparser 构建实时多源数据采集管道
- 三层智能体分工(基本面/情绪/估值)+ 首席汇总的多角色架构
- APScheduler 实现定时自动运行,全程无人干预
⚠️
生产环境建议:在部署到实盘辅助场景前,务必进行充分的历史数据回测,设置明确的止损策略,并对 LLM 输出进行人工审核。本系统适合作为投研辅助工具,不建议全自动执行交易指令。