DATAGEN 架构分析
date
Oct 29, 2025
slug
DATAGEN
status
Published
tags
Agent
Website
summary
DATAGEN 架构分析
type
Post
网址
DATAGENstarpig1129 • Updated Oct 28, 2025
DATAGEN
starpig1129 • Updated Oct 28, 2025
系统概览
DATAGEN 是一个基于 LangChain 与 LangGraph 的多智能体数据分析平台。它通过“状态—节点—路由—工作流图”的组合来编排多个专职代理(Hypothesis、Process、Visualization、Search、Coder、Report、QualityReview、NoteTaker、Refiner)完成从假设生成、数据分析、可视化、检索、报告撰写到质量审查与最终精炼的人机协同闭环。

- 入口:
DATAGEN/main.py调用src/system.py的MultiAgentSystem.run。
- 编排:
src/core/workflow.py使用 LangGraph 构建StateGraph,定义节点与条件边。
- 状态:
src/core/state.py以TypedDict描述完整对话与任务上下文,消息序列用add_messages汇聚。
- 节点:
src/core/node.py实现各代理节点处理、人类交互节点以及精炼节点逻辑。
- 路由:
src/core/router.py根据状态字段决定下一跳节点(假设、流程分支、质量回流与记要聚合)。
- 模型:
src/core/language_models.py+src/llm/*通过 ProviderFactory 与config/agent_models.yaml选择并实例化不同模型提供者。
- 工具:
src/tools/*提供文件读写、网页抓取、目录列举等能力供不同代理调用。
承载层(System)
src/system.py::MultiAgentSystem- 环境初始化:加载
.env,设置OPENAI_API_KEY、GOOGLE_API_KEY、LANGCHAIN_API_KEY等;确保WORKING_DIRECTORY存在;开启 LangChain Tracing。 - 组件装配:创建
LanguageModelManager与WorkflowManager(传入工作目录)。 - 运行入口:
run(user_input)构造初始State并以graph.stream(..., stream_mode="values")运行,逐步打印消息。
- 初始状态字段(简化):
messages: 包含首个HumanMessage(user_input)。- 其他上下文字段:
hypothesis、process_decision、process、visualization_state、searcher_state、code_state、report_section、quality_review、needs_revision、last_sender。
配置与模型提供者
src/config.py- 环境变量:
WORKING_DIRECTORY、OPENAI_API_KEY、GOOGLE_API_KEY、FIRECRAWL_API_KEY、CHROMEDRIVER_PATH、VENV_PATH等。 AgentModelsConfig从config/agent_models.yaml加载每个代理的provider与model_config,通过全局AGENT_MODELS提供查询。
config/agent_models.yaml- 针对每个代理配置模型提供者与参数。例如默认多数代理使用
provider: google,model: gemini-2.5-...。 - 可按代理粒度灵活切换提供者与温度、其他模型参数。
src/llm/factory.py::ProviderFactory- 支持
openai、anthropic、google、ollama、azure、groq等提供者,返回其get_model_class()。 - 示例:
src/llm/google.py返回ChatGoogleGenerativeAI;src/llm/azure.py返回AzureChatOpenAI。
src/core/language_models.py::LanguageModelManager- 基于
AGENT_MODELS获取代理的提供者与模型参数,实例化具体模型类。
代理层(Agents)
- 抽象基类:
src/agents/base.py::BaseAgent - 负责:统一的代理创建流程,包含模型获取、工具列表与系统提示(role prompt)组装。
- 系统提示:若子类
_get_system_prompt()前缀为SYSTEM_PROMPT:,则直接使用;否则拼装通用提示并附带团队成员、工具名。 - 代理执行体:通过
langchain.agents.create_agent(model, tools, system_prompt, response_format)创建;invoke(state)转发调用。 - 结构化输出:子类可提供
response_format(如pydantic模型),使节点能解析结构化结果。
- 代理工厂:
src/agents/factory.py::AgentFactory - 名称到类的映射:
visualization_agent、code_agent、searcher_agent、report_agent、quality_review_agent、refiner_agent、hypothesis_agent、process_agent、note_agent。 WorkflowManager.create_agents()使用工厂为各节点创建实例并缓存。
- 典型代理职责与要点:
- HypothesisAgent:生成/优化研究假设。
- ProcessAgent:总控与调度,输出
ProcessRouteSchema(next决策与task指令)。 - VisualizationAgent:生成与解释数据可视化。
- CodeAgent:编写可复现的数据分析代码。
- SearchAgent:检索资料(含 Wikipedia 工具、网页抓取)。
- ReportAgent:撰写/整合研究报告片段。
- QualityReviewAgent:质量审查,给出
feedback与needs_revision。 - NoteAgent:记录并重构全局
State(NoteState),包含消息裁剪与合并逻辑。 - RefinerAgent:对成品报告进行最终精炼(可读入
.md与.png作为材料)。
工具层(Tools)
src/tools/FileEdit.py- 文件与数据操作:
create_document、read_document、edit_document、collect_data(读 CSV/数据帧),确保WORKING_DIRECTORY存在。 - 典型用途:Report/Code/Refiner 等代理产出与读取中间成果。
src/tools/internet.py- 抓取与搜索:
google_search、scrape_webpages,可用WebBaseLoader、FireCrawlLoader或 Selenium(需CHROMEDRIVER_PATH)。
src/tools/basetool.py- 目录与通用工具:如
list_directory等供代理了解工作目录状态。
状态层(State)
src/core/state.py- 类型:
TypedDict,消息序列使用add_messages聚合器。 - 字段:
hypothesis、process、process_decision、visualization_state、searcher_state、code_state、report_section、quality_review、needs_revision、sender等。
节点与路由(Node & Router)
src/core/node.pyagent_node(state, agent, name): 调用代理并将结果以AIMessage写入messages,并根据代理类型更新特定状态字段:hypothesis_agent→hypothesisprocess_agent→process、process_decision(结构化)visualization_agent→visualization_statesearcher_agent→searcher_statereport_agent→report_sectionquality_review_agent→quality_review、needs_revision(结构化)human_choice_node(state): 交互式选择继续流程或重生假设(命令行输入)。note_agent_node(state, agent, name): 裁剪超长消息(头尾保留),调用记要代理返回NoteState,合并到整体State,并重设sender='note_agent'。human_review_node(state): 展示进展并询问是否继续分析或结束(更新needs_revision与追加HumanMessage)。refiner_node(state, agent, name): 读取WORKING_DIRECTORY下.md与.png构造材料消息,交给精炼代理处理后写回messages。
src/core/router.pyhypothesis_router(state): 若process == "Continue the research process"走Process;否则走Hypothesis(用于人类选择后回到流程或重新生成假设)。process_router(state): 基于process_decision在Coder/Search/Visualization/Report四者间路由;若FINISH→Refiner;非法值默认回Process。QualityReview_router(state): 若needs_revision为真,则根据前一个消息的name回到具体代理(如visualization_agent→Visualization);否则走NoteTaker。
工作流图(Workflow)
src/core/workflow.py中节点与边:- 节点:
Hypothesis、Process、Visualization、Search、Coder、Report、QualityReview、NoteTaker、HumanChoice、HumanReview、Refiner。 - 边与条件:
START → Hypothesis → HumanChoiceHumanChoice条件:Hypothesis或ProcessProcess条件:Coder | Search | Visualization | Report | Process | RefinerVisualization/Search/Coder/Report → QualityReviewQualityReview条件:Visualization | Search | Coder | Report | NoteTakerNoteTaker → ProcessRefiner → HumanReviewHumanReview条件:Process(若needs_revision=True)或END- 编译:使用
MemorySaver持久化检查点;self.graph = self.workflow.compile()。
- ASCII 流程示意:
数据流与控制流
- 消息流:各代理在
agent_node产生AIMessage并追加到messages;人类交互节点产生HumanMessage注入请求或选择。
- 状态流:代理按职责更新相应字段;
NoteAgent汇总与重构全局状态;路由器根据状态字段决定下一节点。
- 终止条件:
ProcessAgent输出next="FINISH"进入Refiner,HumanReview若无需继续(needs_revision=False)则走END。
扩展与自定义
- 新增代理:
- 新建子类实现
_get_system_prompt()与_get_tools();必要时定义response_format。 - 在
AgentFactory映射中加入代理名称;在WorkflowManager.setup_workflow()中添加节点与边;在router.py中补充路由逻辑。
- 切换/微调模型:
- 编辑
config/agent_models.yaml,为目标代理选择provider与model_config(如temperature、model)。
- 工具扩展:
- 在
src/tools/添加新工具并在对应代理的_get_tools()返回;确保工具具备清晰的name与输入/输出约定。
健壮性与限制
- 人类交互:
human_choice_node与human_review_node需要命令行输入,自动化运行时需替换或注入默认策略。
- 状态增长:长对话可能导致消息膨胀,
note_agent_node采取头尾保留与中段裁剪以控制 token。
- 错误处理:
agent_node与其他节点异常时会记录日志并将错误写入messages;system.py对缺失 API Key 进行提示。
- 模型依赖:默认配置使用 Google 提供者,需
GOOGLE_API_KEY;其他提供者需相应凭据。
- 递归限制:
run()使用recursion_limit=3000,注意在复杂循环下的性能与成本。
运行示例
- 在
DATAGEN/main.py中示例:
- 启动:在项目根目录执行
python DATAGEN/main.py(确保虚拟环境与 API Key 就绪)。
总结
DATAGEN 通过 LangGraph 的显式状态机与多代理协作,将数据分析流程拆解为可编排、可回路修正的模块化步骤。其清晰的配置—提供者—代理—工具—工作流分层使得扩展与替换变得简单,同时保留了人类在关键节点上的掌控力与质量闭环能力。