Skip to content

数据流

本文档描述 HeurAMS 中数据在组件间的流动和处理过程.

数据流总览

mermaid
graph TB
    subgraph "用户交互"
        UI[用户界面]
        Input[用户输入]
        Output[界面输出]
    end
    
    subgraph "业务处理"
        SVC[服务层]
        KRN[内核层]
        PRV[提供者层]
    end
    
    subgraph "数据存储"
        CFG[配置文件]
        NUC[Nucleon 内容]
        ELE[Electron 状态]
        ORB[Orbital 策略]
        CCH[缓存文件]
    end
    
    subgraph "外部服务"
        TTS[微软 TTS]
        LLM[大语言模型]
        AUD[系统音频]
    end
    
    UI --> SVC
    SVC --> KRN
    KRN --> PRV
    PRV --> TTS
    PRV --> LLM
    PRV --> AUD
    
    SVC --> CFG
    KRN --> NUC
    KRN --> ELE
    KRN --> ORB
    KRN --> CCH
    
    TTS --> CCH
    LLM --> CCH

核心数据流

1. 应用启动数据流

用户启动 → 解析参数 → 加载配置 → 初始化上下文 → 启动服务 → 显示界面
    ↓           ↓           ↓            ↓           ↓          ↓
命令行     默认配置     用户配置     ContextVar   提供者     Textual
           ↓           ↓           ↓           ↓          ↓
        config.toml config/      config_var  audio/     screens/
                                              tts/       widgets/

详细步骤

  1. 命令行解析

    • 读取 sys.argv
    • 解析选项和参数
    • 设置初始工作目录
  2. 配置加载

    • 检查 config/config.toml(用户配置)
    • 如果不存在, 使用 src/heurams/default/config/config.toml(默认配置)
    • 合并配置, 用户配置优先
    • 验证配置有效性
  3. 上下文初始化

    • 创建 config_var 上下文变量
    • 设置当前配置
    • 初始化日志系统
  4. 服务启动

    • 根据配置加载提供者
    • 初始化音频、TTS、LLM 服务
    • 建立服务到提供者的映射
  5. 界面显示

    • 创建 Textual 应用实例
    • 加载屏幕和组件
    • 应用 CSS 样式
    • 进入主事件循环

2. 复习流程数据流

获取到期内容 → 排序优先级 → 显示谜题 → 用户回答 → 评估反馈 → 更新状态
     ↓             ↓           ↓          ↓          ↓           ↓
扫描Electron   队列算法    选择谜题    界面输入    谜题评估   算法更新
     ↓             ↓           ↓          ↓          ↓           ↓
next_date≤今天  逾期优先   Orbital配置  用户交互   正确/错误   SM-2/FSRS

详细步骤

  1. 获取到期内容

    python
    # 扫描所有 Electron 文件
    for electron_path in electron_dir.glob("**/*.json"):
        electron = load_electron(electron_path)
        if electron.next_date <= current_daystamp:
            due_atoms.append(create_atom(electron.ident))
  2. 排序优先级

    • 严重逾期:超过7天以上
    • 一般逾期:1-7天
    • 今日到期:刚好到期
    • 新内容:首次学习(如果每日限额未满)
  3. 显示谜题

    • 根据 Orbital 配置选择谜题类型
    • 使用谜题生成器从 Nucleon 创建题目
    • 在界面显示问题, 等待用户尝试回忆
    • 用户请求显示答案
  4. 用户评估

    • 用户自我评估记忆质量 (0-5)
    • 系统将评估转换为算法 quality 值
    • 记录评估时间和上下文信息
  5. 更新算法状态

    python
    # SM-2 算法更新
    algodata = electron.data["SM-2"]
    SM2Algorithm.revisor(algodata, quality, is_new_activation)
    electron.save()

3. 内容创建数据流

用户输入 → 应用模板 → 解析宏 → 验证数据 → 创建文件 → 初始化状态
   ↓         ↓         ↓         ↓         ↓           ↓
界面表单   模板引擎   宏处理器   数据验证   Nucleon    Electron
                    ↓         ↓         ↓           ↓
                {{date}}    类型检查   .toml文件   .json文件
                {{random}}  必填字段   存储内容   初始状态

详细步骤

  1. 用户输入

    • 通过界面表单收集内容数据
    • 包括问题、答案、标签、元数据等
    • 可选应用模板
  2. 模板处理

    • 加载选定的模板文件
    • 应用模板字段映射
    • 填充默认值和格式
  3. 宏解析

    • 扫描内容中的 占位符
    • 调用宏处理器替换为实际值
    • 支持嵌套和条件宏
  4. 数据验证

    • 检查必填字段
    • 验证数据类型
    • 确保唯一标识符
  5. 文件创建

    • 生成唯一的文件标识符
    • 创建 Nucleon TOML 文件
    • 创建初始 Electron JSON 文件
    • 关联到默认或指定的 Orbital

4. TTS 语音生成数据流

文本输入 → 检查缓存 → 调用TTS → 接收音频 → 保存缓存 → 播放音频
   ↓          ↓         ↓         ↓         ↓          ↓
Nucleon内容  哈希键   提供者API   字节流   本地文件   音频设备
   ↓          ↓         ↓         ↓         ↓          ↓
问题/答案   MD5(text+voice)  微软Edge    MP3/WAV   cache/tts/   playsound

详细步骤

  1. 文本准备

    python
    # 从 Nucleon 提取 TTS 文本
    tts_text = nucleon.get("tts_text", nucleon["content"])
  2. 缓存检查

    python
    cache_key = hashlib.md5(f"{tts_text}:{voice}:{rate}".encode()).hexdigest()
    cache_path = cache_dir / f"{cache_key}.mp3"
    if cache_path.exists():
        return cache_path.read_bytes()
  3. TTS 调用

    python
    # 通过提供者调用外部服务
    audio_data = tts_provider.synthesize(tts_text, voice)
  4. 缓存保存

    python
    cache_path.write_bytes(audio_data)
    update_cache_index(cache_key, tts_text)
  5. 音频播放

    python
    audio_provider.play(audio_data)

5. 数据同步流程(计划中)

本地修改 → 检测变化 → 序列化数据 → 网络传输 → 远程存储 → 冲突解决
   ↓          ↓           ↓           ↓           ↓           ↓
文件写入   inotify/轮询    JSON/TOML   HTTP/WebDAV  云存储    合并算法
   ↓          ↓           ↓           ↓           ↓           ↓
save()    修改时间    增量更新      加密传输   Nextcloud   三向合并

数据格式转换

TOML ↔ 内存对象

python
# 加载 TOML
with open(path, "r") as f:
    data = toml.load(f)
nucleon = Nucleon(data, path)

# 保存 TOML
with open(path, "w") as f:
    toml.dump(nucleon.data, f)

JSON ↔ 内存对象

python
# 加载 JSON
with open(path, "r") as f:
    data = json.load(f)
electron = Electron(data, path)

# 保存 JSON
with open(path, "w") as f:
    json.dump(electron.data, f, indent=2)

评估 ↔ 算法 quality

python
# 用户评估到算法 quality 的映射
quality_map = {
    0: 0,  # 完全忘记
    1: 1,  # 困难
    2: 2,  # 一般
    3: 3,  # 容易
    4: 4,  # 非常容易
    5: 5,  # 简单
}
quality = quality_map[user_feedback]

数据验证流程

输入验证

python
def validate_nucleon(data: dict) -> List[str]:
    """验证 Nucleon 数据有效性"""
    errors = []
    
    # 必填字段检查
    if "id" not in data:
        errors.append("缺少 id 字段")
    if "question" not in data:
        errors.append("缺少 question 字段")
    
    # 数据类型检查
    if not isinstance(data.get("tags", []), list):
        errors.append("tags 必须是列表")
    
    # 业务规则检查
    if len(data.get("question", "")) > 1000:
        errors.append("问题过长(最大1000字符)")
    
    return errors

一致性检查

python
def check_data_consistency():
    """检查数据一致性"""
    # Nucleon 和 Electron 匹配
    nucleon_ids = {n.id for n in all_nucleons}
    electron_ids = {e.ident for e in all_electrons}
    
    # 报告不匹配
    missing_electrons = nucleon_ids - electron_ids
    orphaned_electrons = electron_ids - nucleon_ids
    
    # 文件系统状态检查
    for nucleon in all_nucleons:
        if not nucleon.path.exists():
            log_warning(f"Nucleon 文件丢失: {nucleon.path}")

错误处理数据流

异常传播

操作失败 → 捕获异常 → 记录日志 → 用户反馈 → 恢复尝试 → 最终处理
   ↓          ↓          ↓          ↓          ↓          ↓
文件IO    try/except   日志文件   错误消息   重试/回退  终止/继续
网络错误 异常类型      错误级别  友好提示   备用方案  事务状态

错误恢复策略

  1. 文件操作错误

    python
    try:
        data = load_file(path)
    except (IOError, PermissionError) as e:
        logger.error(f"文件加载失败: {path}, 错误: {e}")
        # 尝试备用文件
        backup_path = find_backup(path)
        if backup_path.exists():
            data = load_file(backup_path)
        else:
            # 使用默认值
            data = create_default_data()
  2. 网络请求错误

    python
    retry_count = 0
    while retry_count < MAX_RETRIES:
        try:
            response = make_request(url)
            return process_response(response)
        except NetworkError as e:
            retry_count += 1
            logger.warning(f"网络请求失败, 重试 {retry_count}/{MAX_RETRIES}")
            time.sleep(2 ** retry_count)  # 指数退避
    # 所有重试失败, 使用离线模式
    return get_cached_data() or raise OfflineModeError()

缓存数据流

缓存策略

数据请求 → 一级缓存 → 二级缓存 → 数据源 → 更新缓存 → 返回数据
   ↓          ↓          ↓         ↓         ↓          ↓
读取操作   内存缓存   磁盘缓存   文件/网络  写入缓存   应用程序
            LRU策略   文件系统   原始数据   失效时间   处理结果

缓存实现

python
class TTSCache:
    def __init__(self, max_memory_items=100, max_disk_items=1000):
        self.memory_cache = LRUCache(max_memory_items)
        self.disk_dir = Path("data/cache/tts")
        self.disk_dir.mkdir(exist_ok=True)
    
    def get(self, key: str) -> Optional[bytes]:
        # 1. 检查内存缓存
        if key in self.memory_cache:
            return self.memory_cache[key]
        
        # 2. 检查磁盘缓存
        disk_path = self.disk_dir / f"{key}.mp3"
        if disk_path.exists():
            data = disk_path.read_bytes()
            # 更新内存缓存
            self.memory_cache[key] = data
            return data
        
        # 3. 缓存未命中
        return None
    
    def set(self, key: str, data: bytes):
        # 更新内存缓存
        self.memory_cache[key] = data
        
        # 更新磁盘缓存
        disk_path = self.disk_dir / f"{key}.mp3"
        disk_path.write_bytes(data)
        
        # 清理过期缓存
        self.cleanup()

性能监控数据流

指标收集

操作执行 → 时间测量 → 计数统计 → 聚合数据 → 存储指标 → 分析报告
   ↓          ↓          ↓          ↓          ↓          ↓
函数调用   time.time()  计数器     分组聚合   日志文件   可视化
API请求   上下文管理器 成功/失败  时间窗口   数据库    性能图表

监控实现

python
@contextmanager
def measure_time(operation: str):
    """测量操作执行时间"""
    start_time = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - start_time
        metrics.record_timing(operation, elapsed)

# 使用示例
with measure_time("nucleon_load"):
    nucleon = load_nucleon(path)

数据备份流程

自动备份

定时触发 → 创建快照 → 压缩数据 → 验证完整性 → 存储备份 → 清理旧备份
   ↓          ↓          ↓          ↓           ↓          ↓
cron任务   时间戳目录   tar/zip    校验和     本地/远程   保留策略
          完整复制     加密压缩   SHA256    云存储      时间窗口

备份脚本示例

bash
#!/bin/bash
# backup.sh
BACKUP_DIR="/backup/heurams/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$BACKUP_DIR"

# 复制数据
cp -r data "$BACKUP_DIR/"
cp config/config.toml "$BACKUP_DIR/"

# 创建校验和
find "$BACKUP_DIR" -type f -exec sha256sum {} \; > "$BACKUP_DIR/checksums.txt"

# 压缩备份
tar -czf "$BACKUP_DIR.tar.gz" "$BACKUP_DIR"

# 清理临时目录
rm -rf "$BACKUP_DIR"

# 清理旧备份(保留最近30天)
find /backup/heurams -name "*.tar.gz" -mtime +30 -delete

相关文档