数据流
本文档描述 HeurAMS 中数据在组件间的流动和处理过程.
数据流总览
mermaid
graph TB
subgraph "用户交互"
UI[用户界面]
Input[用户输入]
Output[界面输出]
end
subgraph "业务处理"
SVC[服务层]
KRN[内核层]
PRV[提供者层]
end
subgraph "数据存储"
CFG[配置文件]
NUC[Nucleon 内容]
ELE[Electron 状态]
ORB[Orbital 策略]
CCH[缓存文件]
end
subgraph "外部服务"
TTS[微软 TTS]
LLM[大语言模型]
AUD[系统音频]
end
UI --> SVC
SVC --> KRN
KRN --> PRV
PRV --> TTS
PRV --> LLM
PRV --> AUD
SVC --> CFG
KRN --> NUC
KRN --> ELE
KRN --> ORB
KRN --> CCH
TTS --> CCH
LLM --> CCH核心数据流
1. 应用启动数据流
用户启动 → 解析参数 → 加载配置 → 初始化上下文 → 启动服务 → 显示界面
↓ ↓ ↓ ↓ ↓ ↓
命令行 默认配置 用户配置 ContextVar 提供者 Textual
↓ ↓ ↓ ↓ ↓
config.toml config/ config_var audio/ screens/
tts/ widgets/详细步骤:
命令行解析
- 读取
sys.argv - 解析选项和参数
- 设置初始工作目录
- 读取
配置加载
- 检查
config/config.toml(用户配置) - 如果不存在, 使用
src/heurams/default/config/config.toml(默认配置) - 合并配置, 用户配置优先
- 验证配置有效性
- 检查
上下文初始化
- 创建
config_var上下文变量 - 设置当前配置
- 初始化日志系统
- 创建
服务启动
- 根据配置加载提供者
- 初始化音频、TTS、LLM 服务
- 建立服务到提供者的映射
界面显示
- 创建 Textual 应用实例
- 加载屏幕和组件
- 应用 CSS 样式
- 进入主事件循环
2. 复习流程数据流
获取到期内容 → 排序优先级 → 显示谜题 → 用户回答 → 评估反馈 → 更新状态
↓ ↓ ↓ ↓ ↓ ↓
扫描Electron 队列算法 选择谜题 界面输入 谜题评估 算法更新
↓ ↓ ↓ ↓ ↓ ↓
next_date≤今天 逾期优先 Orbital配置 用户交互 正确/错误 SM-2/FSRS详细步骤:
获取到期内容
python# 扫描所有 Electron 文件 for electron_path in electron_dir.glob("**/*.json"): electron = load_electron(electron_path) if electron.next_date <= current_daystamp: due_atoms.append(create_atom(electron.ident))排序优先级
- 严重逾期:超过7天以上
- 一般逾期:1-7天
- 今日到期:刚好到期
- 新内容:首次学习(如果每日限额未满)
显示谜题
- 根据 Orbital 配置选择谜题类型
- 使用谜题生成器从 Nucleon 创建题目
- 在界面显示问题, 等待用户尝试回忆
- 用户请求显示答案
用户评估
- 用户自我评估记忆质量 (0-5)
- 系统将评估转换为算法 quality 值
- 记录评估时间和上下文信息
更新算法状态
python# SM-2 算法更新 algodata = electron.data["SM-2"] SM2Algorithm.revisor(algodata, quality, is_new_activation) electron.save()
3. 内容创建数据流
用户输入 → 应用模板 → 解析宏 → 验证数据 → 创建文件 → 初始化状态
↓ ↓ ↓ ↓ ↓ ↓
界面表单 模板引擎 宏处理器 数据验证 Nucleon Electron
↓ ↓ ↓ ↓
{{date}} 类型检查 .toml文件 .json文件
{{random}} 必填字段 存储内容 初始状态详细步骤:
用户输入
- 通过界面表单收集内容数据
- 包括问题、答案、标签、元数据等
- 可选应用模板
模板处理
- 加载选定的模板文件
- 应用模板字段映射
- 填充默认值和格式
宏解析
- 扫描内容中的
占位符 - 调用宏处理器替换为实际值
- 支持嵌套和条件宏
- 扫描内容中的
数据验证
- 检查必填字段
- 验证数据类型
- 确保唯一标识符
文件创建
- 生成唯一的文件标识符
- 创建 Nucleon TOML 文件
- 创建初始 Electron JSON 文件
- 关联到默认或指定的 Orbital
4. TTS 语音生成数据流
文本输入 → 检查缓存 → 调用TTS → 接收音频 → 保存缓存 → 播放音频
↓ ↓ ↓ ↓ ↓ ↓
Nucleon内容 哈希键 提供者API 字节流 本地文件 音频设备
↓ ↓ ↓ ↓ ↓ ↓
问题/答案 MD5(text+voice) 微软Edge MP3/WAV cache/tts/ playsound详细步骤:
文本准备
python# 从 Nucleon 提取 TTS 文本 tts_text = nucleon.get("tts_text", nucleon["content"])缓存检查
pythoncache_key = hashlib.md5(f"{tts_text}:{voice}:{rate}".encode()).hexdigest() cache_path = cache_dir / f"{cache_key}.mp3" if cache_path.exists(): return cache_path.read_bytes()TTS 调用
python# 通过提供者调用外部服务 audio_data = tts_provider.synthesize(tts_text, voice)缓存保存
pythoncache_path.write_bytes(audio_data) update_cache_index(cache_key, tts_text)音频播放
pythonaudio_provider.play(audio_data)
5. 数据同步流程(计划中)
本地修改 → 检测变化 → 序列化数据 → 网络传输 → 远程存储 → 冲突解决
↓ ↓ ↓ ↓ ↓ ↓
文件写入 inotify/轮询 JSON/TOML HTTP/WebDAV 云存储 合并算法
↓ ↓ ↓ ↓ ↓ ↓
save() 修改时间 增量更新 加密传输 Nextcloud 三向合并数据格式转换
TOML ↔ 内存对象
python
# 加载 TOML
with open(path, "r") as f:
data = toml.load(f)
nucleon = Nucleon(data, path)
# 保存 TOML
with open(path, "w") as f:
toml.dump(nucleon.data, f)JSON ↔ 内存对象
python
# 加载 JSON
with open(path, "r") as f:
data = json.load(f)
electron = Electron(data, path)
# 保存 JSON
with open(path, "w") as f:
json.dump(electron.data, f, indent=2)评估 ↔ 算法 quality
python
# 用户评估到算法 quality 的映射
quality_map = {
0: 0, # 完全忘记
1: 1, # 困难
2: 2, # 一般
3: 3, # 容易
4: 4, # 非常容易
5: 5, # 简单
}
quality = quality_map[user_feedback]数据验证流程
输入验证
python
def validate_nucleon(data: dict) -> List[str]:
"""验证 Nucleon 数据有效性"""
errors = []
# 必填字段检查
if "id" not in data:
errors.append("缺少 id 字段")
if "question" not in data:
errors.append("缺少 question 字段")
# 数据类型检查
if not isinstance(data.get("tags", []), list):
errors.append("tags 必须是列表")
# 业务规则检查
if len(data.get("question", "")) > 1000:
errors.append("问题过长(最大1000字符)")
return errors一致性检查
python
def check_data_consistency():
"""检查数据一致性"""
# Nucleon 和 Electron 匹配
nucleon_ids = {n.id for n in all_nucleons}
electron_ids = {e.ident for e in all_electrons}
# 报告不匹配
missing_electrons = nucleon_ids - electron_ids
orphaned_electrons = electron_ids - nucleon_ids
# 文件系统状态检查
for nucleon in all_nucleons:
if not nucleon.path.exists():
log_warning(f"Nucleon 文件丢失: {nucleon.path}")错误处理数据流
异常传播
操作失败 → 捕获异常 → 记录日志 → 用户反馈 → 恢复尝试 → 最终处理
↓ ↓ ↓ ↓ ↓ ↓
文件IO try/except 日志文件 错误消息 重试/回退 终止/继续
网络错误 异常类型 错误级别 友好提示 备用方案 事务状态错误恢复策略
文件操作错误
pythontry: data = load_file(path) except (IOError, PermissionError) as e: logger.error(f"文件加载失败: {path}, 错误: {e}") # 尝试备用文件 backup_path = find_backup(path) if backup_path.exists(): data = load_file(backup_path) else: # 使用默认值 data = create_default_data()网络请求错误
pythonretry_count = 0 while retry_count < MAX_RETRIES: try: response = make_request(url) return process_response(response) except NetworkError as e: retry_count += 1 logger.warning(f"网络请求失败, 重试 {retry_count}/{MAX_RETRIES}") time.sleep(2 ** retry_count) # 指数退避 # 所有重试失败, 使用离线模式 return get_cached_data() or raise OfflineModeError()
缓存数据流
缓存策略
数据请求 → 一级缓存 → 二级缓存 → 数据源 → 更新缓存 → 返回数据
↓ ↓ ↓ ↓ ↓ ↓
读取操作 内存缓存 磁盘缓存 文件/网络 写入缓存 应用程序
LRU策略 文件系统 原始数据 失效时间 处理结果缓存实现
python
class TTSCache:
def __init__(self, max_memory_items=100, max_disk_items=1000):
self.memory_cache = LRUCache(max_memory_items)
self.disk_dir = Path("data/cache/tts")
self.disk_dir.mkdir(exist_ok=True)
def get(self, key: str) -> Optional[bytes]:
# 1. 检查内存缓存
if key in self.memory_cache:
return self.memory_cache[key]
# 2. 检查磁盘缓存
disk_path = self.disk_dir / f"{key}.mp3"
if disk_path.exists():
data = disk_path.read_bytes()
# 更新内存缓存
self.memory_cache[key] = data
return data
# 3. 缓存未命中
return None
def set(self, key: str, data: bytes):
# 更新内存缓存
self.memory_cache[key] = data
# 更新磁盘缓存
disk_path = self.disk_dir / f"{key}.mp3"
disk_path.write_bytes(data)
# 清理过期缓存
self.cleanup()性能监控数据流
指标收集
操作执行 → 时间测量 → 计数统计 → 聚合数据 → 存储指标 → 分析报告
↓ ↓ ↓ ↓ ↓ ↓
函数调用 time.time() 计数器 分组聚合 日志文件 可视化
API请求 上下文管理器 成功/失败 时间窗口 数据库 性能图表监控实现
python
@contextmanager
def measure_time(operation: str):
"""测量操作执行时间"""
start_time = time.time()
try:
yield
finally:
elapsed = time.time() - start_time
metrics.record_timing(operation, elapsed)
# 使用示例
with measure_time("nucleon_load"):
nucleon = load_nucleon(path)数据备份流程
自动备份
定时触发 → 创建快照 → 压缩数据 → 验证完整性 → 存储备份 → 清理旧备份
↓ ↓ ↓ ↓ ↓ ↓
cron任务 时间戳目录 tar/zip 校验和 本地/远程 保留策略
完整复制 加密压缩 SHA256 云存储 时间窗口备份脚本示例
bash
#!/bin/bash
# backup.sh
BACKUP_DIR="/backup/heurams/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$BACKUP_DIR"
# 复制数据
cp -r data "$BACKUP_DIR/"
cp config/config.toml "$BACKUP_DIR/"
# 创建校验和
find "$BACKUP_DIR" -type f -exec sha256sum {} \; > "$BACKUP_DIR/checksums.txt"
# 压缩备份
tar -czf "$BACKUP_DIR.tar.gz" "$BACKUP_DIR"
# 清理临时目录
rm -rf "$BACKUP_DIR"
# 清理旧备份(保留最近30天)
find /backup/heurams -name "*.tar.gz" -mtime +30 -delete