DATAGEN 架构分析

date
Oct 29, 2025
slug
DATAGEN
status
Published
tags
Agent
Website
summary
DATAGEN 架构分析
type
Post
网址

DATAGEN
starpig1129Updated Oct 28, 2025

系统概览

DATAGEN 是一个基于 LangChain 与 LangGraph 的多智能体数据分析平台。它通过“状态—节点—路由—工作流图”的组合来编排多个专职代理(Hypothesis、Process、Visualization、Search、Coder、Report、QualityReview、NoteTaker、Refiner)完成从假设生成、数据分析、可视化、检索、报告撰写到质量审查与最终精炼的人机协同闭环。
notion image
  • 入口:DATAGEN/main.py 调用 src/system.pyMultiAgentSystem.run
  • 编排:src/core/workflow.py 使用 LangGraph 构建 StateGraph,定义节点与条件边。
  • 状态:src/core/state.pyTypedDict 描述完整对话与任务上下文,消息序列用 add_messages 汇聚。
  • 节点:src/core/node.py 实现各代理节点处理、人类交互节点以及精炼节点逻辑。
  • 路由:src/core/router.py 根据状态字段决定下一跳节点(假设、流程分支、质量回流与记要聚合)。
  • 模型:src/core/language_models.py + src/llm/* 通过 ProviderFactory 与 config/agent_models.yaml 选择并实例化不同模型提供者。
  • 工具:src/tools/* 提供文件读写、网页抓取、目录列举等能力供不同代理调用。

承载层(System)

  • src/system.py::MultiAgentSystem
    • 环境初始化:加载 .env,设置 OPENAI_API_KEYGOOGLE_API_KEYLANGCHAIN_API_KEY 等;确保 WORKING_DIRECTORY 存在;开启 LangChain Tracing。
    • 组件装配:创建 LanguageModelManagerWorkflowManager(传入工作目录)。
    • 运行入口:run(user_input) 构造初始 State 并以 graph.stream(..., stream_mode="values") 运行,逐步打印消息。
  • 初始状态字段(简化):
    • messages: 包含首个 HumanMessage(user_input)
    • 其他上下文字段:hypothesisprocess_decisionprocessvisualization_statesearcher_statecode_statereport_sectionquality_reviewneeds_revisionlast_sender

配置与模型提供者

  • src/config.py
    • 环境变量:WORKING_DIRECTORYOPENAI_API_KEYGOOGLE_API_KEYFIRECRAWL_API_KEYCHROMEDRIVER_PATHVENV_PATH 等。
    • AgentModelsConfigconfig/agent_models.yaml 加载每个代理的 providermodel_config,通过全局 AGENT_MODELS 提供查询。
  • config/agent_models.yaml
    • 针对每个代理配置模型提供者与参数。例如默认多数代理使用 provider: googlemodel: gemini-2.5-...
    • 可按代理粒度灵活切换提供者与温度、其他模型参数。
  • src/llm/factory.py::ProviderFactory
    • 支持 openaianthropicgoogleollamaazuregroq 等提供者,返回其 get_model_class()
    • 示例:src/llm/google.py 返回 ChatGoogleGenerativeAIsrc/llm/azure.py 返回 AzureChatOpenAI
  • src/core/language_models.py::LanguageModelManager
    • 基于 AGENT_MODELS 获取代理的提供者与模型参数,实例化具体模型类。

代理层(Agents)

  • 抽象基类:src/agents/base.py::BaseAgent
    • 负责:统一的代理创建流程,包含模型获取、工具列表与系统提示(role prompt)组装。
    • 系统提示:若子类 _get_system_prompt() 前缀为 SYSTEM_PROMPT:,则直接使用;否则拼装通用提示并附带团队成员、工具名。
    • 代理执行体:通过 langchain.agents.create_agent(model, tools, system_prompt, response_format) 创建;invoke(state) 转发调用。
    • 结构化输出:子类可提供 response_format(如 pydantic 模型),使节点能解析结构化结果。
  • 代理工厂:src/agents/factory.py::AgentFactory
    • 名称到类的映射:visualization_agentcode_agentsearcher_agentreport_agentquality_review_agentrefiner_agenthypothesis_agentprocess_agentnote_agent
    • WorkflowManager.create_agents() 使用工厂为各节点创建实例并缓存。
  • 典型代理职责与要点:
    • HypothesisAgent:生成/优化研究假设。
    • ProcessAgent:总控与调度,输出 ProcessRouteSchemanext 决策与 task 指令)。
    • VisualizationAgent:生成与解释数据可视化。
    • CodeAgent:编写可复现的数据分析代码。
    • SearchAgent:检索资料(含 Wikipedia 工具、网页抓取)。
    • ReportAgent:撰写/整合研究报告片段。
    • QualityReviewAgent:质量审查,给出 feedbackneeds_revision
    • NoteAgent:记录并重构全局 StateNoteState),包含消息裁剪与合并逻辑。
    • RefinerAgent:对成品报告进行最终精炼(可读入 .md.png 作为材料)。

工具层(Tools)

  • src/tools/FileEdit.py
    • 文件与数据操作:create_documentread_documentedit_documentcollect_data(读 CSV/数据帧),确保 WORKING_DIRECTORY 存在。
    • 典型用途:Report/Code/Refiner 等代理产出与读取中间成果。
  • src/tools/internet.py
    • 抓取与搜索:google_searchscrape_webpages,可用 WebBaseLoaderFireCrawlLoader 或 Selenium(需 CHROMEDRIVER_PATH)。
  • src/tools/basetool.py
    • 目录与通用工具:如 list_directory 等供代理了解工作目录状态。

状态层(State)

  • src/core/state.py
    • 类型:TypedDict,消息序列使用 add_messages 聚合器。
    • 字段:hypothesisprocessprocess_decisionvisualization_statesearcher_statecode_statereport_sectionquality_reviewneeds_revisionsender 等。

节点与路由(Node & Router)

  • src/core/node.py
    • agent_node(state, agent, name): 调用代理并将结果以 AIMessage 写入 messages,并根据代理类型更新特定状态字段:
      • hypothesis_agenthypothesis
      • process_agentprocessprocess_decision(结构化)
      • visualization_agentvisualization_state
      • searcher_agentsearcher_state
      • report_agentreport_section
      • quality_review_agentquality_reviewneeds_revision(结构化)
    • human_choice_node(state): 交互式选择继续流程或重生假设(命令行输入)。
    • note_agent_node(state, agent, name): 裁剪超长消息(头尾保留),调用记要代理返回 NoteState,合并到整体 State,并重设 sender='note_agent'
    • human_review_node(state): 展示进展并询问是否继续分析或结束(更新 needs_revision 与追加 HumanMessage)。
    • refiner_node(state, agent, name): 读取 WORKING_DIRECTORY.md.png 构造材料消息,交给精炼代理处理后写回 messages
  • src/core/router.py
    • hypothesis_router(state): 若 process == "Continue the research process"Process;否则走 Hypothesis(用于人类选择后回到流程或重新生成假设)。
    • process_router(state): 基于 process_decisionCoder/Search/Visualization/Report 四者间路由;若 FINISHRefiner;非法值默认回 Process
    • QualityReview_router(state): 若 needs_revision 为真,则根据前一个消息的 name 回到具体代理(如 visualization_agentVisualization);否则走 NoteTaker

工作流图(Workflow)

  • src/core/workflow.py 中节点与边:
    • 节点:HypothesisProcessVisualizationSearchCoderReportQualityReviewNoteTakerHumanChoiceHumanReviewRefiner
    • 边与条件:
      • START → Hypothesis → HumanChoice
      • HumanChoice 条件:HypothesisProcess
      • Process 条件:Coder | Search | Visualization | Report | Process | Refiner
      • Visualization/Search/Coder/Report → QualityReview
      • QualityReview 条件:Visualization | Search | Coder | Report | NoteTaker
      • NoteTaker → Process
      • Refiner → HumanReview
      • HumanReview 条件:Process(若 needs_revision=True)或 END
    • 编译:使用 MemorySaver 持久化检查点;self.graph = self.workflow.compile()
  • ASCII 流程示意:

数据流与控制流

  • 消息流:各代理在 agent_node 产生 AIMessage 并追加到 messages;人类交互节点产生 HumanMessage 注入请求或选择。
  • 状态流:代理按职责更新相应字段;NoteAgent 汇总与重构全局状态;路由器根据状态字段决定下一节点。
  • 终止条件:ProcessAgent 输出 next="FINISH" 进入 RefinerHumanReview 若无需继续(needs_revision=False)则走 END

扩展与自定义

  • 新增代理:
    • 新建子类实现 _get_system_prompt()_get_tools();必要时定义 response_format
    • AgentFactory 映射中加入代理名称;在 WorkflowManager.setup_workflow() 中添加节点与边;在 router.py 中补充路由逻辑。
  • 切换/微调模型:
    • 编辑 config/agent_models.yaml,为目标代理选择 providermodel_config(如 temperaturemodel)。
  • 工具扩展:
    • src/tools/ 添加新工具并在对应代理的 _get_tools() 返回;确保工具具备清晰的 name 与输入/输出约定。

健壮性与限制

  • 人类交互:human_choice_nodehuman_review_node 需要命令行输入,自动化运行时需替换或注入默认策略。
  • 状态增长:长对话可能导致消息膨胀,note_agent_node 采取头尾保留与中段裁剪以控制 token。
  • 错误处理:agent_node 与其他节点异常时会记录日志并将错误写入 messagessystem.py 对缺失 API Key 进行提示。
  • 模型依赖:默认配置使用 Google 提供者,需 GOOGLE_API_KEY;其他提供者需相应凭据。
  • 递归限制:run() 使用 recursion_limit=3000,注意在复杂循环下的性能与成本。

运行示例

  • DATAGEN/main.py 中示例:
  • 启动:在项目根目录执行 python DATAGEN/main.py(确保虚拟环境与 API Key 就绪)。

总结

DATAGEN 通过 LangGraph 的显式状态机与多代理协作,将数据分析流程拆解为可编排、可回路修正的模块化步骤。其清晰的配置—提供者—代理—工具—工作流分层使得扩展与替换变得简单,同时保留了人类在关键节点上的掌控力与质量闭环能力。

© chen_yan 2024 - 2026