Skip to content

KevinCJM/CalMetricsAST

Repository files navigation

CalMetricsAST

CalMetricsAST 提供一个可扩展的量化指标计算框架,通过将配置、算子及执行流程模块化,支持多品种、多周期的批量运算。仓库默认提供一套示例指标、依赖 DAG 的执行计划、Parquet 输出与结构化日志,便于快速集成到数据分析或投研体系中。

功能总览

  • 配置驱动:指标、周期以及运行参数全部以 JSON + JSON Schema 形式管理,ConfigLoader 负责加载和校验,防止回退版本与结构漂移。
  • 可扩展算子:基于 numba 的算子注册机制,允许以 AST 组合指标公式,支持依赖关系自动排序。
  • 批量执行引擎BatchExecutor 配合窗口解析器 PeriodWindowResolver 对多产品多周期执行,输出标准化结果表。
  • 结果消费:提供 Parquet 写入、结构化日志和报表辅助函数,便于与 BI 或后续分析系统对接。
  • 工具脚本:包含配置校验、快速演示、共享内存准备与发布前流水线脚本,覆盖本地开发到上线前验证的关键步骤。

目录结构

CalMetricsAST/
├── configs/                  # 示例指标、周期与参数 JSON
├── contracts/                # 对应 JSON Schema(Draft 7)
├── pyfinance_indicators/     # 核心 Python 包
│   ├── cli/                  # 命令行入口
│   ├── config/               # Pydantic 数据模型与加载逻辑
│   ├── dag/                  # 指标依赖 DAG 构建与调度
│   ├── data/                 # 窗口划分与分区工具
│   ├── execution/            # 批处理引擎与共享内存支持
│   ├── indicators/           # AST 评估器与指标注册表
│   ├── io/                   # Parquet 写入与报表工具
│   ├── operators/            # numba 算子与注册装饰器
│   └── logger.py             # 结构化日志工具
├── scripts/                  # 管理脚本(Quickstart、验证、共享内存)
├── tests/                    # 单元 / 集成 / 性能占位测试
├── requirements.txt          # 运行及开发依赖
├── pytest.ini                # 自定义 pytest 标记
└── README.md                 # 当前文档

指标 DSL 快速入门

  • 编写表达式:在 configs/indicator-config.json 的指标条目中新增 expression 字段,例如 TotalReturn 可写为 cal_return(x)。最新的 ReturnRiskEfficiency 指标采用纯 DSL cal_div(cal_return(x), cal_std(x)),加载时会自动生成 AST、推导依赖与输入序列,可作为参考模板。
  • 语法要点:使用函数式调用 cal_xxx(arg1, arg2),支持嵌套与命名参数。保留字 x 代表原始收益序列,parameters.xxx 可引用运行参数,window_length 代表当前窗口样本数。
  • 算子对照:常用算子包含 cal_return(累计收益)、cal_std(样本标准差)、cal_div(带默认值的除法)、cal_mdd(最大回撤)等,完整表格见 docs/dsl-spec.md,可复制 docs/dsl-template.json 作为起始模板。
  • 自测流程:更新配置后运行 python scripts/run_quickstart.py --config-dir configs --schema-dir contracts --input-parquet data/etf_daily_df.parquet --output-parquet outputs/quickstart_indicators.parquet,并执行 pytest tests/unit/test_dsl_parser.py 验证解析逻辑。
  • 辅助工具:使用 python scripts/dsl_tool.py validate --config configs/indicator-config.json 校验表达式,可通过 convert 子命令批量写入 expression 字段。
  • 排错建议:若加载时报 表达式解析失败 或 DAG 环检测错误,请检查算子拼写、参数数量和指标引用;必要时参考 tests/unit/test_dsl_parser.pytests/unit/test_registry_dependencies.py 的示例。

安装与环境

  1. 建议使用 Python 3.12(与内置脚本保持一致),可通过 pyenv 或现有虚拟环境创建:
    python3.12 -m venv .venv
    source .venv/bin/activate
  2. 安装依赖:
    pip install -r requirements.txt
  3. 若需要与仓库中示例脚本保持一致,可将解释器路径设为 /Users/chenjunming/Desktop/myenv_312/bin/python3.12 或在运行命令时显式指定。

依赖说明

  • numpy / pandas:核心数值与表格处理库。
  • pyarrow:写入标准化 Parquet 输出。
  • pydantic / jsonschema:配置模型与 Schema 校验。
  • numba:对指标算子进行 JIT 加速。
  • pytest:单元、集成与性能标记测试。

快速开始

  1. 配置校验

    python scripts/validate_release.py --skip-performance
    • 校验示例配置与 Schema 一致性。
    • 运行 tests/unittests/integration,如需包含性能标记,移除 --skip-performance
  2. 快速演示

    python scripts/run_quickstart.py \
      --config-dir configs \
      --schema-dir contracts \
      --input-parquet data/etf_daily_df.parquet \
      --output-parquet outputs/quickstart_indicators.parquet \
      --window-mode calendar

    若输入 Parquet 不存在,脚本会生成合成数据并自动创建输出目录。对于仅包含 nav_date 和净值的文件,会自动计算日收益率并转换为所需的 trade_date/return 结构。 --window-mode 默认使用 calendar,如需固定交易日窗口可切换为 trading_days。 启用多进程 + 共享内存模式时,可额外指定 --use-shared-dataset;若未显式指定 --max-workers,进程池默认规模为 CPU 核心数 - 1。可使用 --log-file 将运行日志镜像到文件。

  3. 批量执行 CLI

    python -m pyfinance_indicators.cli.batch_runner \
      --config-dir configs \
      --schema-dir contracts \
      --input-parquet data/etf_daily_df.parquet \
      --output-parquet outputs/full_run.parquet \
      --window-mode calendar
    • 可通过 --products 指定产品列表。要开启共享内存 + 多进程模式,可附带 --use-shared-dataset --max-workers <进程数>;如仍需手工句柄,保留 --shm-handles 接口。
    • --log-file 参数可将执行日志镜像到文件。
  4. 共享内存工具

    python scripts/load_shared_memory.py \
      --input data/etf_daily_df.parquet \
      --handle-out tmp/handles.json
    • 生成句柄供外部进程复用;处理完成后可使用 --cleanup 释放资源。
    • 在新版批处理流程中,常规运行无需手动调用该脚本;仅在跨系统集成或排错时使用。

共享内存与多进程说明

  • BatchExecutor 在传入 use_shared_dataset=True 时,会自动把输入 DataFrame 写入共享内存, 并通过进程池结合 numba.prange 对产品执行并行计算。未设定该标志时保持原有单进程行为。
  • 日志中会统计每个周期的状态分布及零拷贝命中率,便于监控性能与数据质量。
  • 若希望继续使用手动共享内存(例如在其他进程中预热数据),仍可通过 --shm-handles 向 CLI 注入现有句柄。

临时 DSL 指标执行

当仅需基于 DSL 表达式临时计算指标并落地输出时,可使用新增的 CLI:

  1. 快速解析表达式与依赖(不执行)

    python -m pyfinance_indicators.cli.dsl_runner \
      --expression "cal_div(TotalReturn, prodStd, default=0.0)" \
      --period 1M --period 1Y \
      --config-dir configs --schema-dir contracts

    输出 JSON 包含 AST 与依赖,便于上线前审阅。

  2. 执行并写入结果

    python -m pyfinance_indicators.cli.dsl_execute \
      --expression "cal_div(TotalReturn, prodStd, default=0.0)" \
      --period 1M --period 1Y \
      --input-parquet data/etf_daily_df.parquet \
      --output-parquet outputs/dsl_ratio.parquet
    • 可选 --code / --description 指定指标信息;未提供时系统自动生成合法编码与默认描述。
    • 支持 --products 过滤产品,或用 --shm-handles 读取共享内存句柄。
    • CLI 会注册临时指标到运行期 BatchExecutor,仅输出该指标的计算结果,并返回记录数与输出路径。
    • 若希望自动生成编码/描述且直接落地结果,可使用 python -m pyfinance_indicators.cli.dsl_run ...,参数与 dsl_execute 保持一致。

核心流程

  1. 配置加载ConfigLoader 读取 configs/ 下的 JSON,并使用 contracts/ 的 Schema 校验字段、版本递增与 Checksum(pyfinance_indicators/config/params.py)。
  2. 指标注册IndicatorDefinitionRegistry 构建指标图,算子依赖通过 operators 包注册(pyfinance_indicators/operators)。
  3. DAG 构建DAGBuilder 根据周期映射生成 IndicatorPlanbuild_schedule 将请求周期转换为有序任务列表。
  4. 数据窗口PeriodWindowResolver 对交易日序列解析滚动/年度窗口,并输出缺失数据原因码(pyfinance_indicators/data/partitioning.py)。
  5. 执行与输出BatchExecutor 在共享内存模式下按周期构造任务并提交进程池(默认 CPU 核心数 - 1 个工作进程),每个周期子进程内部通过 numba.prange 并行遍历产品,计算指标并实时输出周期级日志;最终按周期顺序合并结果并写入 Parquet(pyfinance_indicators/execution/batch.pypyfinance_indicators/io/parquet_writer.pypyfinance_indicators/logger.py)。
  6. 结果消费pyfinance_indicators/io/reporting.py 提供透视表与缺失原因分析工具,便于集成到 BI 或监控工作流中。

配置与扩展

  • 新增指标:在 configs/indicator-config.json 添加条目,定义 AST、输入以及依赖。若需要新算子,可在 pyfinance_indicators/operators/math_ops.py 中编写函数并用 @register_operator 装饰。
  • 新增周期:编辑 configs/period-config.jsonperiodsmappings,并确保相应 Schema 与版本号更新。
    • 对滚动周期可设置 period_modecalendar/trading_days),覆盖 CLI 默认行为。
  • 参数调整configs/indicator-parameters.json 控制年化天数、风险自由率等运行参数;变更时应更新版本号,防止 ConfigLoader 检测到回退。
  • 共享 Schema:若 Schema 储存在外部仓库,可将路径传入 CLI 的 --schema-dir,或扩展 ConfigPaths 指向自定义目录。

开发与测试

  1. 安装开发环境后,可运行以下命令进行冒烟测试:
    python scripts/validate_release.py --skip-performance
  2. 使用 pytest 的标记机制单独运行特定测试:
    pytest tests/unit -k config
    pytest -m performance tests/performance
  3. 新增模块时,请补充相应 Schema/配置以及测试用例,并更新版本号。
  4. 建议在提交前执行完整验证:
    python scripts/validate_release.py

贡献指南

  • 遵循 AGENTS.md 中的贡献规范,采用 Conventional Commits(如 feat: add sharpe ratio operator)。
  • PR 需说明变更动机、使用的校验命令和潜在风险;涉及配置时附上 validate_config 的输出摘要。
  • 保持代码四空格缩进、类型注解、from __future__ import annotations 以及模块顶部 docstring。
  • 确保新增的 Parquet 输出、共享内存句柄等未纳入版本控制,可通过 .gitignore 中的 data/outputs/tmp/ 规则排除。

常见问题

  • 找不到 JSON Schema:确认 contracts/ 目录存在且命令行参数 --schema-dir 正确;也可将 Schema 复制到配置目录下的 contracts/ 子目录。
  • numba 版本兼容:若 Python 或 numpy 升级后出现编译错误,请同步升级 numba,确保官方发布支持对应 Python 版本。
  • PyArrow 写入失败:检查输出目录权限或磁盘空间;如需写入分区数据,可扩展 write_batch_results 或在外层进行分组处理。

参考命令速查

场景 命令
配置校验 + 测试 python scripts/validate_release.py
快速演示 python scripts/run_quickstart.py
快速演示(共享内存并行) python scripts/run_quickstart.py --use-shared-dataset --max-workers 4
批量 CLI python -m pyfinance_indicators.cli.batch_runner --input-parquet <file>
DSL 一键执行 python -m pyfinance_indicators.cli.dsl_run --expression "cal_return(x)" --period 1M --input-parquet <file> --output-parquet <file>
生成共享内存句柄 python scripts/load_shared_memory.py --input <parquet>
清理共享内存 python scripts/load_shared_memory.py --handle-out tmp/handles.json --cleanup

如需了解整体贡献流程、目录约束及安全注意事项,请配合阅读 AGENTS.md

About

使用AST表达式以及DAG逻辑, 高效计算金融产品指标. 并且采用了共享内存和零拷贝技术.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages