PCVR 数据管道¶
PCVR 数据管道负责把 parquet 数据变成训练 batch,并在需要时叠加 cache、增强、shuffle 和 observed schema 统计。数据管道不是一个单独命令,而是训练 workflow 通过 PCVRTrainConfig.data_pipeline 注入到 get_pcvr_data() 的运行时能力。
训练路径现在优先使用 step-indexed map-style Dataset:PyTorch DataLoader 负责 worker、prefetch、pin memory 和 sampler 调度;仓库侧只负责把一个 step index 映射为一次有放回采样、cache 读取和 batch 组装。验证和推理仍使用顺序 scan 语义,不启用随机增强。
什么时候看这页¶
- 你要解释 Baseline 和 Baseline+ 为什么数据侧行为不同。
- 你想给新实验加数据增强或 cache。
- 你在排查吞吐、内存或线上数据分布问题。
本地和线上数据路径¶
本地 PCVR 训练、评估和推理默认使用 Hugging Face demo 数据;调试、benchmark 或复现线上样本时,也可以显式传 --dataset-path 指向本地 parquet 文件或目录。显式数据路径不会被写入实验包,提交前仍应避免把临时本地路径固化到默认配置或文档主流程里。
线上 bundle 由平台环境变量注入真实数据:
- 训练 / 评估:
TRAIN_DATA_PATH - 推理:
EVAL_DATA_PATH - schema:
TAAC_SCHEMA_PATH
维护类实验例外,例如 Online Dataset EDA 本身就是为了读取指定数据集。
当前组件¶
数据管道配置位于 PCVRTrainConfig.data_pipeline,类型定义在 src/taac2026/domain/config.py。
| 组件 | 作用 | 常见用途 |
|---|---|---|
PCVRDataCacheConfig |
缓存已转换的基础 batch | 加速重复访问,减少数据转换开销 |
PCVRSequenceCropConfig |
为序列生成尾窗或随机窗口 | 控制有效序列长度,做轻量增强 |
PCVRFeatureMaskConfig |
随机置零稀疏特征和序列事件 | 提升鲁棒性 |
PCVRDomainDropoutConfig |
按行丢弃整个序列域 | 模拟缺失域,减少单域依赖 |
cache 支持这些模式:
none:关闭。lru:最近最少使用淘汰。fifo:先入先出淘汰。lfu:最少使用频次淘汰。rr:随机淘汰。opt:按已知访问轨迹做 OPT 淘汰;不满足条件时会退回安全策略。
配置对象的当前字段:
PCVRDataPipelineConfig(
cache=PCVRDataCacheConfig(mode="none", max_batches=0),
transforms=(),
seed=None,
strict_time_filter=True,
)
transform 配置字段:
PCVRSequenceCropConfig(
enabled=True,
views_per_row=1,
seq_window_mode="tail", # tail / random_tail / rolling
seq_window_min_len=1,
)
PCVRFeatureMaskConfig(
enabled=True,
probability=0.0,
)
PCVRDomainDropoutConfig(
enabled=True,
probability=0.0,
)
PCVRDataPipelineConfig.enabled 只表示是否存在启用的 transform;cache 是否启用由 cache.enabled 单独判断。
执行顺序¶
训练数据从 parquet 到模型输入大致经历这些阶段:
get_pcvr_data()解析 parquet 文件或目录,读取 schema。- 训练端的
PCVRStepDataset将 step index 映射为有放回采样的 parquet batch key。 - base batch cache 尝试命中;命中后返回 clone,避免增强污染缓存。
- transform 依次作用在 batch 上;如果多视图增强产生超过一个 optimizer batch 的行数,step dataset 会重采样回单个训练 batch。
- PyTorch
DataLoader继续负责 worker / prefetch / pin memory。 - training workflow 把 batch 转成
ModelInput。
train_steps_per_sweep=0 时,step-random 训练 loader 的一次逻辑 sweep 使用训练数据推导出的 batch 数;如果显式设置 --train-steps-per-sweep,则 PyTorch sampler 会使用这个固定长度的 step 窗口。max_steps>0 时,训练器仍按总 optimizer step 截断。
验证和推理不应该启用随机增强。训练增强通过实验包默认配置进入,不应该在评估入口临时拼出来。
Transform 语义¶
PCVRSequenceCropTransform 会对每个序列域裁剪窗口:
tail:保留尾部窗口。random_tail:随机选择窗口长度,但窗口仍贴近尾部。rolling:随机选择窗口起点。views_per_row > 1时会复制行,形成多视图训练样本。
PCVRFeatureMaskTransform 会随机置零:
user_int_featsitem_int_feats- 各序列域里的事件 token
- 对序列事件 mask 后会 compact 序列,更新长度和 time bucket。
PCVRDomainDropoutTransform 会按行丢弃整个序列域:
- sequence 置零。
- 对应 length 置零。
- time bucket 如果存在也置零。
所有随机 transform 都使用传入的 torch.Generator,seed 来自 pipeline 配置和数据读取上下文。
Cache 语义¶
cache 存的是增强前的基础 batch。
lru、fifo、lfu、rr 和 opt 都使用项目内 native C++ policy index。Python 层负责 batch 对象或 shared tensor 槽位存储,C++ 层负责 key/slot 索引、访问计数和驱逐选择。
opt 模式需要已知访问 trace。trace 满足预期时按下一次使用距离淘汰;没有 trace 时按 lru 安全行为运行。多 worker 场景下所有 cache 策略都使用 shared-memory cache,具体实现见:
PCVRMemoryBatchCachePCVRSharedBatchCache
cache 返回 batch clone,因此后续 transform 不会修改缓存里的基础样本。
Baseline 和 Baseline+¶
Baseline 关闭数据增强和 cache:
Baseline+ 默认打开更激进的组合:
PCVRDataPipelineConfig(
cache=PCVRDataCacheConfig(mode="opt", max_batches=512),
transforms=(
PCVRSequenceCropConfig(views_per_row=2, seq_window_mode="random_tail", seq_window_min_len=8),
PCVRFeatureMaskConfig(probability=0.03),
PCVRDomainDropoutConfig(probability=0.03),
),
seed=42,
)
做消融时不要只比较实验名;先确认模型、数据增强、cache 和 backend 哪些同时变了。
改配置的位置¶
普通实验只需要改自己的入口文件:
共享实现位于:
- 配置对象:
src/taac2026/domain/config.py - batch transform:
src/taac2026/infrastructure/data/transforms.py - cache:
src/taac2026/infrastructure/data/cache.py - 训练 step dataset:
src/taac2026/infrastructure/data/step_dataset.py - scan 数据读取和 batch 转换:
src/taac2026/infrastructure/data/dataset.py - batch 类型:
src/taac2026/infrastructure/data/batches.py
如果只是调整某个实验的增强策略,改实验包的 TRAIN_DEFAULTS。如果要新增一种 transform,再改 domain/config.py 和 infrastructure/data/transforms.py,并补测试。
吞吐压测¶
如果你改了数据管道,先用 benchmark 做一个小口径对比:
uv run taac-benchmark-pcvr-data-pipeline \
--dataset-path data/sample_1000_raw/demo_1000.parquet \
--schema-path docs/archive/files/schema/sample_1000_raw.schema.json \
--preset augment
preset 可选 none、cache、augment。正式性能结论应使用更大的合成数据,并在当前代码和当前环境下重新生成;推荐把 JSON 输出放到 outputs/benchmarks/,同时记录 commit、硬件、CUDA / PyTorch 版本和完整命令。
最小复核¶
uv run pytest tests/unit/infrastructure/data/test_augmentation.py -q
uv run pytest tests/unit/infrastructure/data/test_split.py -q
uv run pytest tests/contract/experiments/test_runtime_contract_matrix.py -q
如果改了 cache 或 dataloader 行为,再跑:
uv run pytest tests/unit/application/training/test_cli.py -q
uv run pytest tests/unit/application/experiments/test_pcvr_runtime.py -q