首页 / 全球金融日报 / 用LLM构建实时股票筛选系统:多智能体架构实战指南 2 次阅读
用LLM构建实时股票筛选系统:多智能体架构实战指南
全球金融日报 实战教程
中级难度 · 实战教程

用LLM构建实时股票筛选系统
多智能体架构实战指南

📅 2026年2月28日 ⏱ 预计阅读 25 分钟 🐍 Python 3.11+ 🤖 LangGraph · FastAPI

大语言模型正在重塑金融量化分析的范式。传统股票筛选依赖固定规则和硬编码指标,而LLM驱动的筛选系统能够理解复杂的市场语义、动态生成筛选条件,并结合新闻情绪与基本面数据进行多维度综合判断。本教程将从零构建一套完整的生产级股票筛选系统,涵盖实时数据采集、多智能体协作、FastAPI后端与持续决策引擎。

ℹ️
免责声明:本教程仅用于技术学习和研究目的。系统输出不构成任何投资建议,投资有风险,入市需谨慎。

第一步:系统架构概览

系统架构概览

整个系统分为四层,各层职责清晰,通过异步消息队列解耦:

Layer 1
数据层
  • yfinance 实时行情
  • RSS 新闻流
  • Alpha Vantage 财报
  • Redis 缓存
Layer 2
智能体层
  • 基本面分析师
  • 情绪分析师
  • 估值分析师
  • 首席决策者
Layer 3
执行层
  • FastAPI 服务
  • Celery 任务队列
  • SQLite 结果库
  • WebSocket 推送
Layer 4
展示层
  • React 18 前端
  • 实时数据看板
  • 筛选结果排序
  • 交互式报告

核心数据流如下:定时任务每5分钟拉取一批股票的实时行情和新闻,推入 Redis 队列;Celery 工作进程消费队列,将数据分发给对应的 LLM 智能体进行分析;智能体协作完成后,首席决策者汇总评分并写入 SQLite;FastAPI 通过 WebSocket 将结果实时推送到前端看板。

架构选型原则:各组件均可水平扩展。LLM 供应商通过统一接口抽象,可随时切换 OpenAI、DeepSeek 或 Groq,无需修改业务逻辑。

第二步:环境准备与依赖安装

环境准备流程
1

创建项目结构

建立清晰的目录结构,分离数据层、智能体层与 API 层:

mkdir stock-screener && cd stock-screener
mkdir -p {agents,api,data,tasks,frontend/src}
touch .env requirements.txt docker-compose.yml
2

安装 Python 依赖

建议使用 Python 3.11+,核心依赖包括数据获取、LangGraph 以及 FastAPI:

# requirements.txt
# 数据层
yfinance==0.2.51
akshare==1.14.40        # A股数据支持
feedparser==6.0.11      # RSS新闻
httpx==0.28.1

# LLM与智能体
langchain==0.3.20
langgraph==0.2.73
langchain-openai==0.2.14
langchain-community==0.3.19

# 后端
fastapi==0.115.8
uvicorn[standard]==0.34.0
celery[redis]==5.4.0
redis==5.2.1
sqlalchemy==2.0.38

# 工具
python-dotenv==1.0.1
pydantic==2.10.6
pandas==2.2.3
pip install -r requirements.txt
3

配置环境变量

# .env
OPENAI_API_KEY=sk-xxx          # 或 DeepSeek/Groq 兼容密钥
OPENAI_BASE_URL=https://api.deepseek.com/v1
LLM_MODEL=deepseek-chat

REDIS_URL=redis://localhost:6379/0
DATABASE_URL=sqlite:///./screener.db
ALPHA_VANTAGE_KEY=your_key_here

# 每批筛选的股票池
STOCK_POOL=AAPL,MSFT,NVDA,GOOGL,META,TSLA,AMZN,BRK-B
4

启动 Redis(Docker 方式)

docker run -d --name redis-screener \
  -p 6379:6379 \
  redis:7-alpine \
  redis-server --maxmemory 512mb --maxmemory-policy allkeys-lru

第三步:数据采集管道实现

数据采集管道架构

数据质量是整个系统的基石。我们需要同时采集三类数据:实时价格、公司基本面、以及市场情绪新闻。

实时行情采集器

# data/collector.py
import yfinance as yf
import feedparser
import redis
import json
from datetime import datetime
from typing import Optional
import os

r = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))

def fetch_stock_snapshot(ticker: str) -> dict:
    """获取单只股票的完整快照数据"""
    stock = yf.Ticker(ticker)
    info = stock.info
    hist = stock.history(period="5d", interval="1d")

    # 计算技术指标
    closes = hist["Close"].tolist()
    sma5 = sum(closes[-5:]) / min(5, len(closes)) if closes else None

    return {
        "ticker": ticker,
        "timestamp": datetime.now().isoformat(),
        # 价格数据
        "current_price": info.get("currentPrice"),
        "day_high": info.get("dayHigh"),
        "day_low": info.get("dayLow"),
        "volume": info.get("volume"),
        "avg_volume": info.get("averageVolume"),
        "sma_5d": round(sma5, 2) if sma5 else None,
        # 基本面
        "pe_ratio": info.get("trailingPE"),
        "forward_pe": info.get("forwardPE"),
        "revenue_growth": info.get("revenueGrowth"),
        "profit_margin": info.get("profitMargins"),
        "roe": info.get("returnOnEquity"),
        "debt_to_equity": info.get("debtToEquity"),
        "market_cap": info.get("marketCap"),
        "sector": info.get("sector", "Unknown"),
        "company_name": info.get("longName", ticker),
    }

def fetch_news_sentiment(ticker: str, limit: int = 5) -> list[dict]:
    """从 Yahoo Finance RSS 获取最新新闻标题"""
    url = f"https://feeds.finance.yahoo.com/rss/2.0/headline?s={ticker}®ion=US&lang=en-US"
    feed = feedparser.parse(url)
    news = []
    for entry in feed.entries[:limit]:
        news.append({
            "title": entry.title,
            "summary": getattr(entry, "summary", "")[:200],
            "published": getattr(entry, "published", ""),
        })
    return news

def push_to_queue(ticker: str) -> None:
    """采集数据并推入 Redis 任务队列"""
    snapshot = fetch_stock_snapshot(ticker)
    snapshot["news"] = fetch_news_sentiment(ticker)
    r.lpush("analysis_queue", json.dumps(snapshot))
    r.expire("analysis_queue", 3600)  # 1小时过期
    print(f"[{ticker}] 数据已推入队列")

if __name__ == "__main__":
    tickers = os.getenv("STOCK_POOL", "").split(",")
    for t in tickers:
        push_to_queue(t.strip())
⚠️
API 限速注意:yfinance 免费版有请求频率限制。生产环境中建议为每个 ticker 请求间加 0.5s 延迟,或升级使用付费数据源(如 Polygon.io、Alpha Vantage Premium)。

第四步:LLM 智能代理设计

LLM智能代理设计

每个智能代理专注于单一分析维度,通过结构化输出保证下游可程序化消费。我们使用 LangChain 的结构化输出能力,强制 LLM 返回 Pydantic 模型定义的 JSON 格式。

# agents/base.py
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os

def get_llm(temperature: float = 0.1) -> ChatOpenAI:
    """获取统一配置的 LLM 客户端,支持 OpenAI 兼容协议"""
    return ChatOpenAI(
        model=os.getenv("LLM_MODEL", "deepseek-chat"),
        api_key=os.getenv("OPENAI_API_KEY"),
        base_url=os.getenv("OPENAI_BASE_URL"),
        temperature=temperature,
        max_retries=3,
        timeout=30,
    )

# ── 基本面分析师 ──
class FundamentalSignal(BaseModel):
    score: float = Field(ge=0, le=10, description="基本面综合评分 0-10")
    pe_assessment: str = Field(description="PE估值评价")
    growth_signal: str = Field(description="成长性信号: STRONG/MODERATE/WEAK")
    key_risks: list[str] = Field(description="主要风险点列表")
    summary: str = Field(description="50字以内的基本面总结")

FUNDAMENTAL_PROMPT = ChatPromptTemplate.from_template("""
你是一位资深基本面分析师。请根据以下财务数据,对该股票进行基本面评估。

股票: {ticker}
当前价格: ${current_price}
市盈率(PE): {pe_ratio}
预期PE: {forward_pe}
营收增长: {revenue_growth}
利润率: {profit_margin}
ROE: {roe}
负债权益比: {debt_to_equity}
市值: ${market_cap}
行业: {sector}

请严格按照 JSON 格式输出评估结果,评分标准:
- 9-10分: 优质成长股,各项指标突出
- 7-8分: 良好,部分指标优秀
- 5-6分: 一般,需持续观察
- 3-4分: 较弱,存在明显风险
- 0-2分: 差,建议回避
""")

def analyze_fundamentals(data: dict) -> FundamentalSignal:
    llm = get_llm()
    chain = FUNDAMENTAL_PROMPT | llm.with_structured_output(FundamentalSignal)
    return chain.invoke(data)
# agents/sentiment.py — 情绪分析师
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from agents.base import get_llm

class SentimentSignal(BaseModel):
    overall_sentiment: str = Field(description="BULLISH/NEUTRAL/BEARISH")
    score: float = Field(ge=-1, le=1, description="情绪得分 -1到1")
    key_catalysts: list[str] = Field(description="主要催化剂(利多/利空)")
    news_highlights: list[str] = Field(description="重要新闻摘要")

SENTIMENT_PROMPT = ChatPromptTemplate.from_template("""
你是一位专业的市场情绪分析师。分析以下新闻标题对 {ticker} 股价的潜在影响。

公司: {company_name}
最新新闻:
{news_text}

请分析这些新闻的整体情绪倾向和主要催化剂。对于每条新闻,
判断其是利多、利空还是中性,并给出综合情绪评分。
""")

def analyze_sentiment(data: dict) -> SentimentSignal:
    news_text = "\n".join(
        [f"- {n['title']}" for n in data.get("news", [])]
    ) or "暂无最新新闻"
    llm = get_llm(temperature=0.0)
    chain = SENTIMENT_PROMPT | llm.with_structured_output(SentimentSignal)
    return chain.invoke({**data, "news_text": news_text})

第五步:多智能体协作架构(LangGraph)

多智能体协作架构

LangGraph 允许我们将多个智能体组织成有向状态图(StateGraph),支持并行执行、条件分支和状态共享。三位分析师并行工作,最后由首席决策者汇总。

# agents/graph.py
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing import TypedDict, Annotated, Optional
from pydantic import BaseModel, Field
from agents.base import analyze_fundamentals, get_llm
from agents.sentiment import analyze_sentiment

# ── 共享状态定义 ──
class ScreenerState(TypedDict):
    stock_data: dict               # 原始股票数据
    fundamental: Optional[dict]   # 基本面分析结果
    sentiment: Optional[dict]     # 情绪分析结果
    valuation: Optional[dict]     # 估值分析结果
    final_score: Optional[float]  # 综合评分
    recommendation: Optional[str] # BUY/HOLD/AVOID
    rationale: Optional[str]      # 决策理由

# ── 估值分析节点 ──
class ValuationSignal(BaseModel):
    intrinsic_value_estimate: float
    margin_of_safety: float = Field(description="安全边际百分比")
    valuation_method: str = Field(description="使用的估值方法")
    score: float = Field(ge=0, le=10)

def run_fundamental_node(state: ScreenerState) -> ScreenerState:
    result = analyze_fundamentals(state["stock_data"])
    return {**state, "fundamental": result.model_dump()}

def run_sentiment_node(state: ScreenerState) -> ScreenerState:
    result = analyze_sentiment(state["stock_data"])
    return {**state, "sentiment": result.model_dump()}

def run_valuation_node(state: ScreenerState) -> ScreenerState:
    from langchain_core.prompts import ChatPromptTemplate
    prompt = ChatPromptTemplate.from_template("""
    对 {ticker}(当前价 ${current_price},PE {pe_ratio},ROE {roe})
    进行简单估值分析,计算内在价值并判断安全边际。
    """)
    llm = get_llm()
    chain = prompt | llm.with_structured_output(ValuationSignal)
    result = chain.invoke(state["stock_data"])
    return {**state, "valuation": result.model_dump()}

def chief_decision_node(state: ScreenerState) -> ScreenerState:
    """首席分析师:汇总三路信号,生成最终评级"""
    from langchain_core.prompts import ChatPromptTemplate
    class FinalDecision(BaseModel):
        final_score: float = Field(ge=0, le=10)
        recommendation: str = Field(description="BUY/HOLD/AVOID")
        rationale: str = Field(description="100字以内的决策理由")

    prompt = ChatPromptTemplate.from_template("""
    你是首席分析师,请综合以下三位分析师的报告,给出最终评级。

    股票: {ticker}
    基本面报告: {fundamental}
    情绪报告: {sentiment}
    估值报告: {valuation}

    BUY: 综合评分>=7,各维度均衡
    HOLD: 综合评分5-7,存在不确定性
    AVOID: 综合评分<5,或存在重大风险
    """)
    llm = get_llm(temperature=0.0)
    chain = prompt | llm.with_structured_output(FinalDecision)
    result = chain.invoke({
        "ticker": state["stock_data"]["ticker"],
        "fundamental": str(state["fundamental"]),
        "sentiment": str(state["sentiment"]),
        "valuation": str(state["valuation"]),
    })
    return {**state, "final_score": result.final_score,
            "recommendation": result.recommendation,
            "rationale": result.rationale}

# ── 构建图并编译 ──
def build_screener_graph():
    graph = StateGraph(ScreenerState)
    graph.add_node("fundamental", run_fundamental_node)
    graph.add_node("sentiment", run_sentiment_node)
    graph.add_node("valuation", run_valuation_node)
    graph.add_node("chief", chief_decision_node)

    # 并行启动三位分析师
    graph.set_entry_point("fundamental")
    graph.add_edge("fundamental", "sentiment")
    graph.add_edge("sentiment", "valuation")
    graph.add_edge("valuation", "chief")
    graph.add_edge("chief", END)
    return graph.compile()

screener_app = build_screener_graph()
💡
并行优化:LangGraph 支持使用 graph.add_node 配合 Send API 实现真正的并行分析。当前教程采用串行简化版本,生产中可将三位分析师改为 parallel 分支以减少约 60% 的等待时间。

第六步:筛选策略与执行引擎

筛选策略对比

系统内置多种经典筛选策略,LLM 会根据市场环境动态选择最合适的策略组合:

策略 核心条件 适用市场 LLM增强点
CANSLIM EPS增长>25%,相对强度≥80 牛市 自动识别季报超预期
Minervini 价格>150/200日均线,均线多头排列 趋势市 新闻催化剂验证
价值投资 PE<15,PB<1.5,ROE>15% 熊市/震荡 行业景气度判断
成长策略 营收增长>20%,毛利率>40% 成长市 赛道前景评估
# tasks/celery_tasks.py
from celery import Celery
from sqlalchemy import create_engine, Column, Float, String, DateTime, Text
from sqlalchemy.orm import declarative_base, Session
from datetime import datetime
import json, os
from agents.graph import screener_app

app = Celery(
    "screener",
    broker=os.getenv("REDIS_URL"),
    backend=os.getenv("REDIS_URL"),
)

Base = declarative_base()

class ScreeningResult(Base):
    __tablename__ = "results"
    id = Column(String, primary_key=True)
    ticker = Column(String, index=True)
    run_at = Column(DateTime, default=datetime.utcnow)
    final_score = Column(Float)
    recommendation = Column(String)
    rationale = Column(Text)
    raw_data = Column(Text)  # JSON

engine = create_engine(os.getenv("DATABASE_URL", "sqlite:///./screener.db"))
Base.metadata.create_all(engine)

@app.task(bind=True, max_retries=2, default_retry_delay=10)
def analyze_stock(self, stock_json: str):
    """Celery 任务:分析单只股票并存储结果"""
    try:
        stock_data = json.loads(stock_json)
        ticker = stock_data["ticker"]
        print(f"[{ticker}] 开始多智能体分析...")

        # 调用 LangGraph 状态机
        result = screener_app.invoke({
            "stock_data": stock_data,
            "fundamental": None,
            "sentiment": None,
            "valuation": None,
            "final_score": None,
            "recommendation": None,
            "rationale": None,
        })

        # 持久化到数据库
        with Session(engine) as session:
            record = ScreeningResult(
                id=f"{ticker}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
                ticker=ticker,
                final_score=result["final_score"],
                recommendation=result["recommendation"],
                rationale=result["rationale"],
                raw_data=json.dumps(result, ensure_ascii=False),
            )
            session.add(record)
            session.commit()
        print(f"[{ticker}] 分析完成: {result['recommendation']} ({result['final_score']:.1f}分)")
        return {"ticker": ticker, "score": result["final_score"], "rec": result["recommendation"]}

    except Exception as e:
        raise self.retry(exc=e)

第七步:FastAPI 服务与系统集成

系统集成与部署流程

最后将所有组件串联为可运行的完整系统,提供 REST API 和定时调度器:

# api/main.py
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from tasks.celery_tasks import analyze_stock, engine, ScreeningResult
from data.collector import push_to_queue
import redis, json, os
from apscheduler.schedulers.background import BackgroundScheduler

api = FastAPI(title="LLM Stock Screener API", version="1.0.0")
api.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

r = redis.from_url(os.getenv("REDIS_URL"))
STOCK_POOL = os.getenv("STOCK_POOL", "").split(",")

def scheduled_screening():
    """每5分钟采集数据并触发分析"""
    for ticker in STOCK_POOL:
        push_to_queue(ticker.strip())

    # 消费队列,触发 Celery 任务
    while True:
        item = r.rpop("analysis_queue")
        if not item:
            break
        analyze_stock.delay(item.decode())

# 定时调度器
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_screening, "interval", minutes=5)

@api.on_event("startup")
async def startup():
    scheduler.start()
    print("调度器已启动,每5分钟自动运行一次筛选")

@api.get("/results")
def get_results(min_score: float = 0.0, recommendation: str = None):
    """获取筛选结果,支持按评分和推荐类型过滤"""
    with Session(engine) as session:
        q = session.query(ScreeningResult).filter(
            ScreeningResult.final_score >= min_score
        )
        if recommendation:
            q = q.filter(ScreeningResult.recommendation == recommendation.upper())
        results = q.order_by(ScreeningResult.final_score.desc()).limit(50).all()
    return [{
        "ticker": r.ticker,
        "score": r.final_score,
        "recommendation": r.recommendation,
        "rationale": r.rationale,
        "run_at": r.run_at.isoformat(),
    } for r in results]

@api.post("/trigger")
def manual_trigger(background_tasks: BackgroundTasks):
    """手动触发一次全量筛选"""
    background_tasks.add_task(scheduled_screening)
    return {"status": "triggered", "pool_size": len(STOCK_POOL)}

启动全部服务

# 终端1:启动 FastAPI 主服务
uvicorn api.main:api --reload --port 8000

# 终端2:启动 Celery 工作进程
celery -A tasks.celery_tasks.app worker --loglevel=info --concurrency=4

# 终端3:验证 API
curl http://localhost:8000/trigger -X POST
curl "http://localhost:8000/results?min_score=7&recommendation=BUY"
验证系统运行:触发一次手动筛选后约 30-60 秒(取决于 LLM 响应速度),访问 /results 接口应能看到评分结果。若返回空数组,检查 Celery 日志中是否有错误信息。

常见问题

LLM 分析结果不稳定,同一只股票每次评分差异较大怎么办?
设置 temperature=0.0 降低随机性;使用 Few-Shot 提示词,在 Prompt 中提供2-3个参考案例;对高置信度决策设置阈值(如评分<3或>8才执行操作)。
yfinance 频繁返回空数据如何处理?
yfinance 依赖 Yahoo Finance 非官方接口,可能因 IP 被限速。建议:添加随机延迟(0.5-2秒);使用代理轮换;切换至 Alpha Vantage 或 Polygon.io 等商业 API 作为备用数据源。
如何支持 A 股数据?
将 yfinance 替换为 akshare 库:akshare.stock_zh_a_spot_em() 获取沪深全市场实时行情,支持所有 A 股代码(如 600519.SH)。
如何降低 LLM API 调用成本?
缓存相同 ticker 在同一天内的分析结果(Redis TTL=3600s);对基本面变化不大的股票降低分析频率;使用 DeepSeek-Chat 等性价比高的模型(成本约为 GPT-4o 的 1/30)。

本教程关键收获

  • 基于 FastAPI + Celery + Redis 构建可扩展的异步处理架构
  • 使用 LangGraph 的 StateGraph 编排多智能体协作流程
  • 通过 Pydantic 结构化输出确保 LLM 返回可程序化处理的 JSON
  • yfinance + feedparser 构建实时多源数据采集管道
  • 三层智能体分工(基本面/情绪/估值)+ 首席汇总的多角色架构
  • APScheduler 实现定时自动运行,全程无人干预
⚠️
生产环境建议:在部署到实盘辅助场景前,务必进行充分的历史数据回测,设置明确的止损策略,并对 LLM 输出进行人工审核。本系统适合作为投研辅助工具,不建议全自动执行交易指令。
选择栏目
今日简报 播客电台 实战教程 AI挣钱计划 关于我
栏目
全球AI日报国内AI日报全球金融日报国内金融日报全球大新闻日报国内大新闻日报Claude Code 玩法日报OpenClaw 动态日报GitHub 热门项目日报AI工具实战AI应用开发编程实战工作流自动化AI原理图解AI Agent开发AI变现案例库AI工具创收AI内容变现AI接单提效变现前沿研究
我的收藏