Skip to content

feat: add Google Health ingestion and storage#175

Merged
endo-ly merged 4 commits into
mainfrom
feat/google-health-ingestion
Jun 13, 2026
Merged

feat: add Google Health ingestion and storage#175
endo-ly merged 4 commits into
mainfrom
feat/google-health-ingestion

Conversation

@endo-ly

@endo-ly endo-ly commented Jun 12, 2026

Copy link
Copy Markdown
Owner

概要

Google Health / Fitbit Air連携のPhase 2として、期間指定のデータ取得、正規化、Raw JSON・Parquet保存、同期状態管理を実装しました。

変更内容

  • initial_backfillrangedata_type_rangeのrun作成APIを追加
  • 対象28 data typeの取得方式、pagination、期間分割を実装
  • Daily / Sample / Interval / Sessionの4形式へ正規化
  • data typeごとのRaw JSONをR2へ保存
  • 取得ごとにevents/.../{uuid}.parquetを保存
  • 後続compact stepでcompacted/.../data.parquetの対象期間を置換
  • successno_datafailed、件数、期間、エラーをSQLiteへ記録
  • 一部data type失敗時のpartial_failedをworkflow runへ反映
  • データソース資料とPipelines資料を更新

保存フロー

  1. 取得runごとの正規化結果をevents/google_health/.../{uuid}.parquetへ追加
  2. 成功またはno_dataのdata typeだけをcompact対象にする
  3. 対象期間を反映したcompacted/events/google_health/.../data.parquetを再生成
  4. events側のUUID Parquetは削除しない

影響

  • Google Health接続済み環境から手動backfill・期間再取得が可能になります。
  • Scheduler登録とDuckDB viewはPhase 3の対象です。

検証

  • uv run ruff check(変更Pythonファイル)
  • git diff --check
  • uv run pytest egograph/pipelines/tests -q
    • 468 passed, 2 skipped
  • coderabbit review --agent -t uncommitted
    • 指摘2件を修正済み

Summary by CodeRabbit

  • 新機能
    • Google Health の取り込み用APIを追加(初期バックフィル/期間指定/データタイプ指定をサポート)。手動実行でingestとcompactワークフローをトリガー可能に。
  • 改善
    • 実行ステータスに「一部失敗(partial_failed)」を追加し、結果要約とエラーメッセージ保存を強化。
    • タイムゾーン設定の検証を追加。
  • ドキュメント
    • セットアップ、API一覧、運用手順、保存/再取得の動作を大幅更新。

@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown

Walkthrough

API経由で Google Health ingest run を作成し、Extractor→Normalizer→Writer→Repository 経由で events を保存。続けて compact で monthly-partitioned compacted Parquet を範囲置換するワークフロー、DB移行、API・クライアント・ユニット/統合テスト、ドキュメントを追加。

Changes

Google Health データ取り込みワークフロー

Layer / File(s) Summary
ドメインモデル & DB スキーマ
egograph/pipelines/sources/google_health/models.py, egograph/pipelines/domain/workflow.py, egograph/pipelines/infrastructure/db/schema.py
GoogleHealthRunMode/SyncStatus/GoogleHealthIngestRequest/GoogleHealthSyncCursor を追加。WorkflowRunStatus.PARTIAL_FAILED 追加。PipelinesConfig.timezone 検証と sqlite の google_health_sync_cursors 列追加マイグレーションを実施。
API & サービス & ワークフロー登録
egograph/pipelines/api/google_health.py, egograph/pipelines/service.py, egograph/pipelines/workflows/registry.py, egograph/pipelines/README.md, docs/architecture/pipelines.md
POST /v1/sources/google-health/runsGoogleHealthRunRequest を追加。create_ingest_run エンドポイントと PipelineService.trigger_google_health_ingest を実装。ワークフロー google_health_ingest_workflow を登録し README/アーキ文書を更新。
Client, data types & timezone utils
egograph/pipelines/sources/google_health/client.py, egograph/pipelines/sources/google_health/data_types.py, egograph/pipelines/sources/google_health/timezone.py
Google Health API 用の reconcile_data_points/daily_rollup/rollup を追加、クライアントへ timezone 注入。RecordKind/FetchStrategy 導入と GoogleHealthDataType 再設計、DATA_TYPE_BY_NAME を追加。タイムゾーン補助関数を追加。
Extractor
egograph/pipelines/sources/google_health/extractor.py, egograph/pipelines/tests/unit/google_health/test_extractor.py, egograph/pipelines/tests/unit/google_health/test_client.py
GoogleHealthExtractor を実装(ページング・チャンク分割・フィルタ生成)。クライアント呼び出し内容と抽出の単体テストを追加。
Normalizer
egograph/pipelines/sources/google_health/normalizer.py, egograph/pipelines/tests/unit/google_health/test_normalizer.py
APIレスポンスを samples/intervals/sessions/daily_metrics の4種行に正規化、daily 集約ロジックを実装。多数の正規化ユニットテストを追加。
Writer: raw/events/compacted
egograph/pipelines/sources/google_health/writer.py, egograph/pipelines/tests/unit/google_health/test_writer.py
R2(S3互換)へ raw JSON、events(run_id.parquet)、月単位 partition の compacted data.parquet を保存・置換する GoogleHealthWriter を追加。compaction の対象外保持・UTC/月算出・sleep 特殊処理を実装しユニットテスト追加。
Repository persistence
egograph/pipelines/sources/google_health/repository.py, egograph/pipelines/tests/unit/google_health/test_repository.py
同期カーソルの upsert/get/list を追加し、DB row ↔ domain オブジェクトのマッピングと関連テストを実装。
Workflow orchestration & tests
egograph/pipelines/sources/google_health/workflow.py, egograph/pipelines/tests/unit/google_health/*, egograph/pipelines/tests/integration/google_health/*, egograph/pipelines/tests/support/dummy_steps.py
run_google_health_ingest / run_google_health_compact と依存組立、リクエスト解析、結果要約、エラーハンドリングを実装。単体/統合テスト群を追加(部分失敗・no_data・compaction 挙動等)。
Dispatcher status handling & tests
egograph/pipelines/infrastructure/dispatching/run_dispatcher.py, egograph/pipelines/tests/unit/test_dispatcher.py
RunDispatcher の終了ステータス決定を last_summary に基づく SUCCEEDED/PARTIAL_FAILED/FAILED に変更。エラーメッセージ抽出と通知条件を更新しテストを追加。
Docs
docs/data-sources/google-health.md, docs/architecture/pipelines.md, egograph/pipelines/README.md
データフロー、R2 ディレクトリ構造、再取得方針、取得モード/手順、OAuth 設定、API エンドポイント説明を更新・追加。

Sequence Diagram

sequenceDiagram
  participant Client as API Client
  participant API as Pipelines API
  participant Service as PipelineService
  participant Workflow as Workflow Engine
  participant Extractor as GoogleHealthExtractor
  participant Normalizer as Normalizer
  participant Writer as GoogleHealthWriter
  participant Repo as GoogleHealthRepository

  Client->>API: POST /v1/sources/google-health/runs
  API->>Service: validate & trigger_google_health_ingest(request)
  Service->>Workflow: enqueue google_health_ingest_workflow
  Workflow->>Extractor: extract(data_type, date_from, date_to)
  Extractor->>API: call reconcile/rollup endpoints
  Extractor->>Normalizer: payloads
  Normalizer->>Writer: normalized rows
  Writer->>Repo: save_events / save_raw / compact_range
  Repo-->>Workflow: sync cursor results
  Workflow->>Writer: compact_range(selected_data_types, date_from, date_to)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 分

Possibly related PRs

  • endo-ly/egograph#172: Dispatcher の失敗処理/通知ロジックを変更する点で重なりあり。
  • endo-ly/egograph#174: Google Health 接続基盤(OAuth/トークン/クライアント)実装と関連。

ぽかぽか CodeRabbit よ、うさぎ跳ねる。
API 呼んでデータ拾って、パーケットに詰める。
テストがピョンと合格して、スキーマも整う。
部分失敗も認める、賢いワークフロー。
おつかれ自分、にんまり🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.60% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed PRタイトル「feat: add Google Health ingestion and storage」はこのPRの主要な変更内容(Google Health データの取り込み・保存機能追加)を明確に要約している。
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/google-health-ingestion

@endo-ly endo-ly marked this pull request as ready for review June 12, 2026 18:16

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Nitpick comments (4)
egograph/pipelines/README.md (1)

129-129: 💤 Low value

エンドポイント説明文の詳細さを docs/architecture/pipelines.md と比較

README.md line 129 の説明「Google Health取り込みrun作成」は正確だが、docs/architecture/pipelines.md line 489 の説明「Google Health backfill・期間指定run作成」と比べると、「backfill・期間指定」といった mode 情報が missing している。

README.md はユーザーが first touch する可能性が高いドキュメントなので、他のエンドポイント説明と並べたとき情報量が consistent であるほど good。説明を少し詳細にすることを検討してもよい。

Optional な改善案:

- | `POST` | `/v1/sources/google-health/runs` | Google Health取り込みrun作成 |
+ | `POST` | `/v1/sources/google-health/runs` | Google Health backfill・期間指定run作成 |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/README.md` at line 129, Update the README.md entry for the
endpoint `POST /v1/sources/google-health/runs` to match the detail level in
docs/architecture/pipelines.md by adding mode information (e.g., "backfill" and
"period-specified run" capability); locate the table row containing the endpoint
string `POST /v1/sources/google-health/runs` and replace the brief description
"Google Health取り込みrun作成" with a more explicit description such as "Google Health
backfill・期間指定 run 作成(backfill / period-specified)" or equivalent phrasing to
clearly indicate supported modes.
docs/data-sources/google-health.md (1)

293-314: 同期状態・再取得ロジックの実装、ドキュメントと一致してるよ(CodeRabbit)

  • writer.pycompact_range は、月partitionごとに既存 compacted .../data.parquet を読み、_retain_outside_targetconnection_iddata_type が対象&date_from <= row_date < date_to の行だけを置換して、他data type/期間外の行は保持したまま retained + current を再生成してる
  • events.../{run_id}.parquetsave_events で追加するだけで、削除は compacted_key 側の _delete_if_exists のみ(events側は消さない)
  • google_health_sync_cursorsrepository.pysave_sync_resultINSERT ... ON CONFLICT DO UPDATE で data type単位に status/range_start/range_end/last_run_id/record_count/last_error_message/updated_at を保存・更新し、compact側は list_sync_results_for_runlast_run_id=run.run_id を参照してる
  • 7.3の補足として、sessions かつ data_type=="sleep" の場合はターゲット判定に ended_at_utc を使う特例があるので一言あると安心
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/data-sources/google-health.md` around lines 293 - 314, Docs currently
describe re-fetch and sync-cursor behavior but lack a note about the special
session/sleep target and the code-level mappings; update the Google Health doc
to explicitly state (1) compact_range in writer.py replaces only rows matching
connection_id/data_type and date_from <= row_date < date_to via
_retain_outside_target while preserving other data types in the same month, (2)
events saved by save_events (…/{run_id}.parquet) are never deleted (only
compact-side _delete_if_exists removes compacted_key files), (3) sync cursors
are persisted by repository.py::save_sync_result (INSERT … ON CONFLICT DO
UPDATE) and read by list_sync_results_for_run using last_run_id, and (4) add the
clarifying special-case note that for sessions with data_type == "sleep" the
target selection uses ended_at_utc instead of row_date.
egograph/pipelines/tests/unit/google_health/test_client.py (1)

104-109: ⚡ Quick win

filter_expression の検証も入れておこう。

テスト名は filter 送信をうたってるのに、params["filter"] の確認がない。ここを明示すると退行を拾いやすくなる。

差分イメージ
     # Assert
     params = session.calls[0][2]["params"]
+    assert params["filter"] == 'steps.interval.start_time >= "2026-06-01T00:00:00Z"'
     assert params["pageToken"] == "next-token"
     assert params["pageSize"] == 10000
     assert params["dataSourceFamily"].endswith("/google-wearables")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/tests/unit/google_health/test_client.py` around lines 104
- 109, テストが「filter を送信する」ことを謳っているのに params["filter"]
の検証が抜けているので、session.calls[0][2]["params"] から取得した params に対して期待する filter
値(テストで設定したフィルタ文字列)を assert するアサーションを追加して退行を検出できるようにしてください(参照: params,
session.calls を使っている箇所)。
egograph/pipelines/tests/unit/google_health/test_extractor.py (1)

111-123: ⚡ Quick win

テスト名どおり reversed ケースも足そう。

今は date_from == date_to だけ確認してる。date_from > date_to も同じテストで押さえると意図がブレない。

差分イメージ
 def test_extract_rejects_empty_or_reversed_range():
@@
     with pytest.raises(ValueError, match="date_from must be earlier"):
         extractor.extract(
             connection_id="connection-1",
             data_type=DATA_TYPE_BY_NAME["steps"],
             date_from=date(2026, 6, 3),
             date_to=date(2026, 6, 3),
         )
+    with pytest.raises(ValueError, match="date_from must be earlier"):
+        extractor.extract(
+            connection_id="connection-1",
+            data_type=DATA_TYPE_BY_NAME["steps"],
+            date_from=date(2026, 6, 4),
+            date_to=date(2026, 6, 3),
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/tests/unit/google_health/test_extractor.py` around lines
111 - 123, The test test_extract_rejects_empty_or_reversed_range only asserts
the equal-dates case; add the reversed-range case so it also covers date_from >
date_to by invoking GoogleHealthExtractor.extract with date_from later than
date_to and asserting it raises the same ValueError (match "date_from must be
earlier"); you can either add a second with pytest.raises block calling
extractor.extract(...) with date_from=date(2026,6,4) and date_to=date(2026,6,3)
or parametrize the test to include both scenarios, referencing the existing test
name and GoogleHealthExtractor.extract to locate where to change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/data-sources/google-health.md`:
- Around line 142-147: Update the docs entry for
google_health_samples.measured_at_utc to match the implementation in
egograph/pipelines/sources/google_health/normalizer.py: state that the source is
`{dataType}.sampleTime.physicalTime` with a fallback to `{dataType}.instantTime`
(i.e., "sampleTime.physicalTime (if missing, instantTime)"), and make this
wording consistent with the `{dataType}.sampleTime.physicalTime` rows so both
locations reference the same precedence.
- Line 351: The sample JSON uses the field "dataPointName" which conflicts with
the API spec and surrounding docs that expect the DataPoint identifier field
"name"; update the sample to use "name" (e.g.,
"users/me/dataTypes/heart-rate/dataPoints/example") or, if you intentionally
want a different input shape, add a clear note describing the alternate
convention and why "dataPointName" is used to keep the document consistent with
the Google Health API v4 DataPoint identifier format; ensure references in the
doc that mention the identifier (e.g., examples and explanatory text) are
aligned with the chosen field.

In `@egograph/pipelines/infrastructure/db/schema.py`:
- Around line 151-155: The loop building dynamic SQL with an f-string in the
block using additions, columns and conn.execute (altering
google_health_sync_cursors) must be changed to use fixed SQL statements per
column instead of string interpolation; for each name in additions that is
missing from columns, call conn.execute with a constant query like "ALTER TABLE
google_health_sync_cursors ADD COLUMN {col} {type}" constructed by selecting
between a small set of allowed column names/types (or mapping additions to
explicit SQL templates) and pass any variable parts as parameters/validated
values rather than via f-strings so you remove direct interpolation and satisfy
the placeholder-based execute pattern.

In `@egograph/pipelines/sources/google_health/extractor.py`:
- Around line 116-131: The pagination loop calling
self._client.reconcile_data_points can infinite-loop if the API keeps returning
the same nextPageToken; add a duplicate-token guard: track the last seen
page_token (and a set of seen tokens) inside the while True in the function
using reconcile_data_points and raise an exception (or break) if the token
repeats or is unchanged between iterations. Apply the same guard to the
analogous loops in _fetch_daily_rollups and _fetch_interval_rollups; reference
the page_token variable and nextPageToken response field, and ensure you still
append responses and respect SESSION_PAGE_SIZE/DEFAULT_PAGE_SIZE behavior.

In `@egograph/pipelines/sources/google_health/normalizer.py`:
- Around line 274-285: _daily_date and _parse_datetime currently pass external
strings directly to date.fromisoformat()/datetime.fromisoformat() which can
raise ValueError and crash normalize_google_health_payload, causing an entire
data type to be marked FAILED; update _daily_date (function name: _daily_date)
and _parse_datetime (function name: _parse_datetime) to catch ValueError around
the fromisoformat calls and return None on parse failure, so callers like
_append_daily_metrics and _normalize_data_point will skip None dates rather than
letting the exception bubble up.

In `@egograph/pipelines/sources/google_health/workflow.py`:
- Around line 151-162: ループ内で extractor の返却値に含まれる cursor を
GoogleHealthRepository.save_sync_result に渡していないため GoogleHealthSyncCursor
が永続化されていません。workflow.py のループ(for result in results:)で result から cursor を取り出し(例:
result.get("cursor") または str(result["cursor"])
を必要に応じて使用)、dependencies.repository.save_sync_result の呼び出しに cursor=...
引数を追加して渡すように修正してください(参照シンボル: GoogleHealthSyncCursor,
GoogleHealthRepository.save_sync_result)。変更後は cursor が None
の場合でも明示的に渡されるようにして、sync-state の再開点が保存されることを確認してください。
- Around line 140-175: The top-level record_count should reflect per-data-type
result["record_count"] (which is set to 0 on save failure) instead of summing
the in-memory records; update the return value to compute record_count by
summing int(result["record_count"]) over results (use the existing results list
and result["record_count"]) or, alternatively, clear entries in records for
failed data_types when you set result.status to FAILED; modify the code around
the return block that currently sums len(dataset_records) for dataset_records in
records.values() to use the results-based count so failures correctly produce 0
saved records.

In `@egograph/pipelines/tests/integration/google_health/__init__.py`:
- Line 1: The module docstring in __init__.py is in English and should be
changed to Japanese to match project guidelines; update the top-level module
docstring in egograph.pipelines.tests.integration.google_health.__init__.py to a
concise Japanese string (e.g., "Google Health 統合テスト。" or similar) using the
project's preferred full-width punctuation and style so it aligns with other
Python files' docstrings.

In `@egograph/pipelines/tests/unit/google_health/test_workflow.py`:
- Around line 68-83: FakeRepository.list_sync_results_for_run currently returns
all self.sync_results and overwrites last_run_id with the call argument, which
masks bugs in filtering; change list_sync_results_for_run to filter
self.sync_results by the incoming run_id and connection_id (e.g., require
item["last_run_id"] == run_id and item["connection_id"] == connection_id or
similar keys used in your test fixtures) and preserve each item's actual
last_run_id when constructing GoogleHealthSyncCursor instead of blindly using
the run_id parameter so the fake repo accurately reproduces run-scoped results.

---

Nitpick comments:
In `@docs/data-sources/google-health.md`:
- Around line 293-314: Docs currently describe re-fetch and sync-cursor behavior
but lack a note about the special session/sleep target and the code-level
mappings; update the Google Health doc to explicitly state (1) compact_range in
writer.py replaces only rows matching connection_id/data_type and date_from <=
row_date < date_to via _retain_outside_target while preserving other data types
in the same month, (2) events saved by save_events (…/{run_id}.parquet) are
never deleted (only compact-side _delete_if_exists removes compacted_key files),
(3) sync cursors are persisted by repository.py::save_sync_result (INSERT … ON
CONFLICT DO UPDATE) and read by list_sync_results_for_run using last_run_id, and
(4) add the clarifying special-case note that for sessions with data_type ==
"sleep" the target selection uses ended_at_utc instead of row_date.

In `@egograph/pipelines/README.md`:
- Line 129: Update the README.md entry for the endpoint `POST
/v1/sources/google-health/runs` to match the detail level in
docs/architecture/pipelines.md by adding mode information (e.g., "backfill" and
"period-specified run" capability); locate the table row containing the endpoint
string `POST /v1/sources/google-health/runs` and replace the brief description
"Google Health取り込みrun作成" with a more explicit description such as "Google Health
backfill・期間指定 run 作成(backfill / period-specified)" or equivalent phrasing to
clearly indicate supported modes.

In `@egograph/pipelines/tests/unit/google_health/test_client.py`:
- Around line 104-109: テストが「filter を送信する」ことを謳っているのに params["filter"]
の検証が抜けているので、session.calls[0][2]["params"] から取得した params に対して期待する filter
値(テストで設定したフィルタ文字列)を assert するアサーションを追加して退行を検出できるようにしてください(参照: params,
session.calls を使っている箇所)。

In `@egograph/pipelines/tests/unit/google_health/test_extractor.py`:
- Around line 111-123: The test test_extract_rejects_empty_or_reversed_range
only asserts the equal-dates case; add the reversed-range case so it also covers
date_from > date_to by invoking GoogleHealthExtractor.extract with date_from
later than date_to and asserting it raises the same ValueError (match "date_from
must be earlier"); you can either add a second with pytest.raises block calling
extractor.extract(...) with date_from=date(2026,6,4) and date_to=date(2026,6,3)
or parametrize the test to include both scenarios, referencing the existing test
name and GoogleHealthExtractor.extract to locate where to change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 218339f6-8af2-463e-9f47-9bd36db66c6c

📥 Commits

Reviewing files that changed from the base of the PR and between f2caeac and 35581b4.

📒 Files selected for processing (29)
  • docs/architecture/pipelines.md
  • docs/data-sources/google-health.md
  • egograph/pipelines/README.md
  • egograph/pipelines/api/google_health.py
  • egograph/pipelines/domain/workflow.py
  • egograph/pipelines/infrastructure/db/schema.py
  • egograph/pipelines/infrastructure/dispatching/run_dispatcher.py
  • egograph/pipelines/service.py
  • egograph/pipelines/sources/google_health/client.py
  • egograph/pipelines/sources/google_health/data_types.py
  • egograph/pipelines/sources/google_health/extractor.py
  • egograph/pipelines/sources/google_health/models.py
  • egograph/pipelines/sources/google_health/normalizer.py
  • egograph/pipelines/sources/google_health/repository.py
  • egograph/pipelines/sources/google_health/workflow.py
  • egograph/pipelines/sources/google_health/writer.py
  • egograph/pipelines/tests/integration/google_health/__init__.py
  • egograph/pipelines/tests/integration/google_health/test_pipeline.py
  • egograph/pipelines/tests/support/dummy_steps.py
  • egograph/pipelines/tests/unit/google_health/test_api.py
  • egograph/pipelines/tests/unit/google_health/test_client.py
  • egograph/pipelines/tests/unit/google_health/test_data_types.py
  • egograph/pipelines/tests/unit/google_health/test_extractor.py
  • egograph/pipelines/tests/unit/google_health/test_normalizer.py
  • egograph/pipelines/tests/unit/google_health/test_repository.py
  • egograph/pipelines/tests/unit/google_health/test_workflow.py
  • egograph/pipelines/tests/unit/google_health/test_writer.py
  • egograph/pipelines/tests/unit/test_dispatcher.py
  • egograph/pipelines/workflows/registry.py

Comment thread docs/data-sources/google-health.md Outdated
Comment thread docs/data-sources/google-health.md Outdated
Comment thread egograph/pipelines/infrastructure/db/schema.py Outdated
Comment thread egograph/pipelines/sources/google_health/extractor.py
Comment thread egograph/pipelines/sources/google_health/normalizer.py
Comment thread egograph/pipelines/sources/google_health/workflow.py
Comment on lines +151 to +162
for result in results:
status = SyncStatus(str(result["status"]))
dependencies.repository.save_sync_result(
connection_id=connection.connection_id,
data_type=str(result["data_type"]),
status=status,
range_start=request.date_from,
range_end=request.date_to,
run_id=run.run_id,
record_count=int(result["record_count"]),
error_message=str(result["error"]) if "error" in result else None,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

cursor が永続化経路に乗ってない

GoogleHealthSyncCursorGoogleHealthRepository.save_sync_result(..., cursor=...) まで足してるのに、この呼び出しでは cursor を一度も渡してない。今の実装だと google_health_sync_cursors.cursor は全 run で NULL のままで、PR で追加した sync-state の再開点が残らない。extractor の返却値からここまで cursor を通す配線が必要。

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/sources/google_health/workflow.py` around lines 151 - 162,
ループ内で extractor の返却値に含まれる cursor を GoogleHealthRepository.save_sync_result
に渡していないため GoogleHealthSyncCursor が永続化されていません。workflow.py のループ(for result in
results:)で result から cursor を取り出し(例: result.get("cursor") または
str(result["cursor"]) を必要に応じて使用)、dependencies.repository.save_sync_result の呼び出しに
cursor=... 引数を追加して渡すように修正してください(参照シンボル: GoogleHealthSyncCursor,
GoogleHealthRepository.save_sync_result)。変更後は cursor が None
の場合でも明示的に渡されるようにして、sync-state の再開点が保存されることを確認してください。

Comment thread egograph/pipelines/tests/integration/google_health/__init__.py Outdated
Comment thread egograph/pipelines/tests/unit/google_health/test_workflow.py

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (2)
egograph/pipelines/sources/google_health/workflow.py (2)

113-119: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

cursor が sync-state 永続化まで流れてない

Line 113 の result 生成でも Line 156 の save_sync_result() 呼び出しでも cursor を保持してない。これだと GoogleHealthSyncCursor.cursor がずっと埋まらず、今回足した再開点管理が機能しない。extractor で得た cursor を result か別変数で持ち回して、そのまま repository 保存まで配線したい。GoogleHealthSyncCursorcursor を持つ契約とも噛み合ってない。

Also applies to: 154-165

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/sources/google_health/workflow.py` around lines 113 - 119,
The results being appended in the block building the sync result (the dict
created where "data_type", "status", "record_count", "raw_ref" are set) do not
include the extractor cursor, so the cursor never reaches save_sync_result or
the GoogleHealthSyncCursor.cursor field; include the cursor produced by the
extractor (e.g., add "cursor": cursor or store extractor.cursor in a local
variable) when constructing each result entry, and ensure save_sync_result(...)
is called with that cursor (or with a results object that includes cursor) so
that the repository persists GoogleHealthSyncCursor.cursor; update any code
paths around the results.append(...) site and the save_sync_result(...) call to
forward the same cursor value rather than dropping it.

146-151: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

save_events() 失敗後もトップレベル record_count だけ成功件数のまま

Line 146 で各 data type の record_count を 0 に落としてるのに、返り値は records から再集計してる。これだと run summary だけ「保存できた件数」が残って、失敗監視と再実行判断がずれる。

💡 最小修正案
         "saved_keys": saved_keys,
-        "record_count": sum(
-            len(dataset_records) for dataset_records in records.values()
-        ),
+        "record_count": sum(int(result["record_count"]) for result in results),
         "errors": _result_errors(results),

Also applies to: 174-176

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/sources/google_health/workflow.py` around lines 146 - 151,
When save_events() fails and you mark each result's record_count to 0 (e.g., in
the loop that updates result.update(status=SyncStatus.FAILED.value,
record_count=0, error=...)), the top-level aggregation still uses the original
"records" tally and leaves the run summary showing successful counts; change the
final aggregation logic that builds the return/summary to recompute totals from
the updated per-data-type entries (sum(result["record_count"]) and count
statuses from result["status"]) instead of using the pre-existing "records"
variable so the top-level record_count and summary reflect the per-result
changes (apply the same change at the other analogous site where results are
adjusted).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@egograph/pipelines/sources/google_health/workflow.py`:
- Around line 113-119: The results being appended in the block building the sync
result (the dict created where "data_type", "status", "record_count", "raw_ref"
are set) do not include the extractor cursor, so the cursor never reaches
save_sync_result or the GoogleHealthSyncCursor.cursor field; include the cursor
produced by the extractor (e.g., add "cursor": cursor or store extractor.cursor
in a local variable) when constructing each result entry, and ensure
save_sync_result(...) is called with that cursor (or with a results object that
includes cursor) so that the repository persists GoogleHealthSyncCursor.cursor;
update any code paths around the results.append(...) site and the
save_sync_result(...) call to forward the same cursor value rather than dropping
it.
- Around line 146-151: When save_events() fails and you mark each result's
record_count to 0 (e.g., in the loop that updates
result.update(status=SyncStatus.FAILED.value, record_count=0, error=...)), the
top-level aggregation still uses the original "records" tally and leaves the run
summary showing successful counts; change the final aggregation logic that
builds the return/summary to recompute totals from the updated per-data-type
entries (sum(result["record_count"]) and count statuses from result["status"])
instead of using the pre-existing "records" variable so the top-level
record_count and summary reflect the per-result changes (apply the same change
at the other analogous site where results are adjusted).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8d4ae61e-2f86-41cc-981d-8a48f285cb34

📥 Commits

Reviewing files that changed from the base of the PR and between 35581b4 and 53dac89.

📒 Files selected for processing (15)
  • docs/data-sources/google-health.md
  • egograph/pipelines/api/google_health.py
  • egograph/pipelines/config.py
  • egograph/pipelines/service.py
  • egograph/pipelines/sources/google_health/client.py
  • egograph/pipelines/sources/google_health/extractor.py
  • egograph/pipelines/sources/google_health/normalizer.py
  • egograph/pipelines/sources/google_health/timezone.py
  • egograph/pipelines/sources/google_health/workflow.py
  • egograph/pipelines/sources/google_health/writer.py
  • egograph/pipelines/tests/unit/google_health/test_api.py
  • egograph/pipelines/tests/unit/google_health/test_client.py
  • egograph/pipelines/tests/unit/google_health/test_extractor.py
  • egograph/pipelines/tests/unit/google_health/test_normalizer.py
  • egograph/pipelines/tests/unit/google_health/test_writer.py
🚧 Files skipped from review as they are similar to previous changes (10)
  • egograph/pipelines/service.py
  • egograph/pipelines/tests/unit/google_health/test_client.py
  • egograph/pipelines/tests/unit/google_health/test_writer.py
  • egograph/pipelines/tests/unit/google_health/test_extractor.py
  • egograph/pipelines/tests/unit/google_health/test_normalizer.py
  • egograph/pipelines/sources/google_health/extractor.py
  • egograph/pipelines/sources/google_health/normalizer.py
  • egograph/pipelines/sources/google_health/writer.py
  • docs/data-sources/google-health.md
  • egograph/pipelines/api/google_health.py

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
egograph/pipelines/tests/unit/google_health/test_workflow.py (1)

68-84: ⚡ Quick win

フィルタリング修正は good だけど、Line 79 も .get() にしとくと一貫性が出るよ。

Line 83 でちゃんと connection_idrun_id でフィルタリングするようになって、過去レビューで指摘された問題が解決されてる!ただ、Line 73 は item.get("cursor") を使ってるのに、Line 79 は item["error_message"] を直接アクセスしてるんだよね。

workflow コードでは error_message は常に渡されるから動くけど、テストコードの一貫性と将来の保守性を考えると、Line 79 も .get("error_message") にしといた方が安全だよ。

♻️ 一貫性を高める提案
                 last_run_id=item["run_id"],
                 record_count=item["record_count"],
-                last_error_message=item["error_message"],
+                last_error_message=item.get("error_message"),
                 updated_at=datetime(2026, 6, 4, tzinfo=UTC),
             )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/tests/unit/google_health/test_workflow.py` around lines 68
- 84, The test helper list_sync_results_for_run currently constructs
GoogleHealthSyncCursor instances and accesses item["error_message"] directly;
change that to use item.get("error_message") for consistency with
item.get("cursor") and to make the test more robust—update the construction in
list_sync_results_for_run so the last_error_message field is assigned
item.get("error_message") instead of item["error_message"].
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@egograph/pipelines/tests/unit/google_health/test_workflow.py`:
- Around line 195-212: The test function
test_save_failure_reports_zero_saved_records is missing AAA comment markers; add
"# Arrange" before repository and FailingWriter setup and the
monkeypatch.setattr block, add "# Act" immediately before calling
run_google_health_ingest(_run(["steps"])), and add "# Assert" immediately before
the two assert lines so the test follows the Arrange/Act/Assert pattern used by
other tests.

---

Nitpick comments:
In `@egograph/pipelines/tests/unit/google_health/test_workflow.py`:
- Around line 68-84: The test helper list_sync_results_for_run currently
constructs GoogleHealthSyncCursor instances and accesses item["error_message"]
directly; change that to use item.get("error_message") for consistency with
item.get("cursor") and to make the test more robust—update the construction in
list_sync_results_for_run so the last_error_message field is assigned
item.get("error_message") instead of item["error_message"].
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6d10da2e-51b3-4698-9c3f-2946641e4f8d

📥 Commits

Reviewing files that changed from the base of the PR and between 53dac89 and 9abc587.

📒 Files selected for processing (11)
  • docs/data-sources/google-health.md
  • egograph/pipelines/README.md
  • egograph/pipelines/infrastructure/db/schema.py
  • egograph/pipelines/sources/google_health/extractor.py
  • egograph/pipelines/sources/google_health/normalizer.py
  • egograph/pipelines/sources/google_health/workflow.py
  • egograph/pipelines/tests/integration/google_health/__init__.py
  • egograph/pipelines/tests/unit/google_health/test_client.py
  • egograph/pipelines/tests/unit/google_health/test_extractor.py
  • egograph/pipelines/tests/unit/google_health/test_normalizer.py
  • egograph/pipelines/tests/unit/google_health/test_workflow.py
✅ Files skipped from review due to trivial changes (2)
  • egograph/pipelines/README.md
  • egograph/pipelines/tests/integration/google_health/init.py
🚧 Files skipped from review as they are similar to previous changes (7)
  • egograph/pipelines/tests/unit/google_health/test_client.py
  • egograph/pipelines/tests/unit/google_health/test_extractor.py
  • egograph/pipelines/tests/unit/google_health/test_normalizer.py
  • egograph/pipelines/infrastructure/db/schema.py
  • docs/data-sources/google-health.md
  • egograph/pipelines/sources/google_health/workflow.py
  • egograph/pipelines/sources/google_health/normalizer.py

Comment on lines +195 to +212
def test_save_failure_reports_zero_saved_records(monkeypatch):
"""events保存失敗時はメモリ上の正規化件数を返さない。"""
repository = FakeRepository()

class FailingWriter(FakeWriter):
def save_events(self, **kwargs):
raise RuntimeError("save failed")

monkeypatch.setattr(
"pipelines.sources.google_health.workflow._build_dependencies",
lambda: _dependencies(repository, FailingWriter()),
)

result = run_google_health_ingest(_run(["steps"]))

assert result["status"] == "failed"
assert result["record_count"] == 0

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

AAA pattern のコメントマーカーを追加してね。

このテスト、構造的には AAA pattern に従ってるんだけど、# Arrange, # Act, # Assert のコメントマーカーが抜けてるよ。コーディングガイドラインで「Test code must follow the AAA pattern」って指定されてるし、同じファイルの他のテスト(lines 141-163, 165-193, 214-251, 253-281)は全部コメントマーカーを付けてるから、一貫性を保つために追加しよう。

🐛 AAA コメントを追加する提案
 def test_save_failure_reports_zero_saved_records(monkeypatch):
     """events保存失敗時はメモリ上の正規化件数を返さない。"""
+    # Arrange
     repository = FakeRepository()
 
     class FailingWriter(FakeWriter):
         def save_events(self, **kwargs):
             raise RuntimeError("save failed")
 
     monkeypatch.setattr(
         "pipelines.sources.google_health.workflow._build_dependencies",
         lambda: _dependencies(repository, FailingWriter()),
     )
 
+    # Act
     result = run_google_health_ingest(_run(["steps"]))
 
+    # Assert
     assert result["status"] == "failed"
     assert result["record_count"] == 0

As per coding guidelines, test code must follow the AAA pattern (Arrange, Act, Assert).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@egograph/pipelines/tests/unit/google_health/test_workflow.py` around lines
195 - 212, The test function test_save_failure_reports_zero_saved_records is
missing AAA comment markers; add "# Arrange" before repository and FailingWriter
setup and the monkeypatch.setattr block, add "# Act" immediately before calling
run_google_health_ingest(_run(["steps"])), and add "# Assert" immediately before
the two assert lines so the test follows the Arrange/Act/Assert pattern used by
other tests.

Source: Coding guidelines

@endo-ly endo-ly merged commit 308d5f8 into main Jun 13, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant