diff --git a/docs/architecture/pipelines.md b/docs/architecture/pipelines.md index e6c9910e..b2fc2147 100644 --- a/docs/architecture/pipelines.md +++ b/docs/architecture/pipelines.md @@ -316,6 +316,7 @@ class WorkflowDefinition: | `spotify_ingest_workflow` | 6回/日 (0,4,8,12,16,22 JST) | ingest → compact | | `github_ingest_workflow` | 1回/日 (00:00 JST) | ingest → compact | | `google_activity_ingest_workflow` | 1回/日 (23:00 JST) | ingest | +| `google_health_ingest_workflow` | API手動実行 | Raw JSON / events保存 → compacted範囲置換 | | `local_mirror_sync_workflow` | 6時間ごと | sync | | `browser_history_compact_workflow` | イベント駆動 | compact | | `browser_history_compact_maintenance_workflow` | 6時間ごと | compact maintenance | @@ -485,6 +486,7 @@ uv run python -m pipelines.main serve | GET | `/v1/sources/google-health/connection` | Google Health 接続状態 | | DELETE | `/v1/sources/google-health/connection` | Google Health 接続削除 | | POST | `/v1/sources/google-health/connection/smoke-test` | Google Health 疎通確認 | +| POST | `/v1/sources/google-health/runs` | Google Health backfill・期間指定run作成 | --- diff --git a/docs/data-sources/google-health.md b/docs/data-sources/google-health.md index 46b9ef0b..523a722e 100644 --- a/docs/data-sources/google-health.md +++ b/docs/data-sources/google-health.md @@ -49,7 +49,11 @@ APIレスポンス原本をRaw JSONとして保持し、日次指標、サンプ ↓ [Normalizer: Daily / Sample / Interval / Session] ↓ -[Storage: R2へParquet保存] +[Storage: R2 eventsへ{uuid}.parquet保存] + ↓ +[Compact: 対象期間を統合・置換] + ↓ +[Storage: R2 compactedへdata.parquet保存] ↓ [DuckDB: 日次・時系列分析] ``` @@ -71,6 +75,7 @@ APIレスポンス原本をRaw JSONとして保持し、日次指標、サンプ | **Daily rollup endpoint** | `POST /users/me/dataTypes/{dataType}/dataPoints:dailyRollUp` | | **認証方式** | Google OAuth 2.0 Web Server flow | | **必要なスコープ** | Activity and Fitness / Health Metrics and Measurements / Sleep のread-only scope | +| **期間の日付解釈** | `TIMEZONE`のローカル日付。物理時刻APIにはUTCへ変換して送信 | ### 3.2 取得対象data type @@ -135,11 +140,12 @@ Google Health APIの`DataPoint`はdata typeごとの値をunionとして保持 | フィールド名 | 型 | 必須 | 説明 | 例 | |---|---|---|---|---| -| `instantTime` | datetime | Conditional | サンプルの測定時刻 | `"2026-06-10T00:15:00Z"` | -| `interval.startTime` | datetime | Conditional | 区間・セッション開始時刻 | `"2026-06-10T00:00:00Z"` | -| `interval.endTime` | datetime | Conditional | 区間・セッション終了時刻 | `"2026-06-10T00:05:00Z"` | -| `civilTimeInterval.startTime` | string | Conditional | 日次集計のローカル開始時刻 | `"2026-06-10T00:00:00"` | -| `civilTimeInterval.endTime` | string | Conditional | 日次集計のローカル終了時刻 | `"2026-06-11T00:00:00"` | +| `{dataType}.sampleTime.physicalTime` | datetime | Conditional | サンプルの測定時刻。欠落時は`{dataType}.instantTime`を使用 | `"2026-06-10T00:15:00Z"` | +| `{dataType}.interval.startTime` | datetime | Conditional | 区間・セッション開始時刻 | `"2026-06-10T00:00:00Z"` | +| `{dataType}.interval.endTime` | datetime | Conditional | 区間・セッション終了時刻 | `"2026-06-10T00:05:00Z"` | +| `{dataType}.date` | date | Conditional | Daily recordの日付 | `"2026-06-10"` | +| `civilStartTime` | object | Conditional | daily rollupのローカル開始日時 | `{"date":{"year":2026,"month":6,"day":10}}` | +| `civilEndTime` | object | Conditional | daily rollupのローカル終了日時 | `{"date":{"year":2026,"month":6,"day":11}}` | #### data type固有値 @@ -161,7 +167,8 @@ Google Health APIの`DataPoint`はdata typeごとの値をunionとして保持 | 列名 | 型 | 説明 | 変換元 | |---|---|---|---| | `connection_id` | VARCHAR | 接続識別子 | SQLite connection | -| `date` | DATE | 指標のローカル日付 | `civilTimeInterval` | +| `data_type` | VARCHAR | Google Health data type | API path | +| `date` | DATE | 指標のローカル日付 | `{dataType}.date` / `civilStartTime` | | `metric_name` | VARCHAR | 正規化した指標名 | data type | | `value` | DOUBLE | 数値 | data type固有値 | | `unit` | VARCHAR | 単位 | data type定義 | @@ -175,7 +182,7 @@ Google Health APIの`DataPoint`はdata typeごとの値をunionとして保持 |---|---|---|---| | `connection_id` | VARCHAR | 接続識別子 | SQLite connection | | `data_type` | VARCHAR | Google Health data type | API path | -| `measured_at_utc` | TIMESTAMP | 測定時刻 | `instantTime` | +| `measured_at_utc` | TIMESTAMP | 測定時刻 | `{dataType}.sampleTime.physicalTime`(欠落時は`{dataType}.instantTime`) | | `value` | DOUBLE | 測定値 | data type固有値 | | `unit` | VARCHAR | 単位 | data type定義 | | `device_family` | VARCHAR | `fitbit_air`または`unknown` | `dataOrigin` | @@ -214,9 +221,10 @@ Google Health APIの`DataPoint`はdata typeごとの値をunionとして保持 ### 4.5 パーティション - **パーティションキー**: `year`, `month` -- **基準日**: dailyは`date`、sampleは`measured_at_utc`、interval/sessionは`started_at_utc` -- **再取得**: 対象期間のpartitionをoverwriteする -- **理由**: 期間指定クエリのpartition pruningと再取得時の重複防止 +- **基準日**: dailyはローカル`date`、sampleは`measured_at_utc`、interval/sessionは`started_at_utc` +- **再取得**: compacted内の対象期間を置換する +- **時刻partition**: sample / interval / sessionの`year` / `month`はUTC基準 +- **理由**: 保存時刻をUTCへ統一し、期間指定クエリのpartition pruningと再取得時の重複を防ぐ --- @@ -232,14 +240,16 @@ s3://egograph/ │ └── from={from}/ │ └── to={to}/ │ └── run_id={run_id}.json - └── events/google_health/ - ├── daily_metrics/ - ├── samples/ - ├── intervals/ - └── sessions/ + ├── events/google_health/ + │ └── {daily_metrics|samples|intervals|sessions}/ + │ └── year=YYYY/ + │ └── month=MM/ + │ └── {uuid}.parquet + └── compacted/events/google_health/ + └── {daily_metrics|samples|intervals|sessions}/ └── year=YYYY/ └── month=MM/ - └── {uuid}.parquet + └── data.parquet ``` 同期cursor、connection、OAuth tokenはR2ではなくPipelines ServiceのSQLiteへ保存する。 @@ -251,6 +261,7 @@ s3://egograph/ - **Sample**: `s3://egograph/events/google_health/samples/year=2026/month=06/{uuid}.parquet` - **Interval**: `s3://egograph/events/google_health/intervals/year=2026/month=06/{uuid}.parquet` - **Session**: `s3://egograph/events/google_health/sessions/year=2026/month=06/{uuid}.parquet` +- **Compacted**: `s3://egograph/compacted/events/google_health/samples/year=2026/month=06/data.parquet` --- @@ -272,7 +283,8 @@ s3://egograph/ | 対象 | 保存先 | 理由 | |---|---|---| | APIレスポンス原本 | R2 Raw JSON | normalizer修正後の再処理と監査に使用する | -| 正規化データ | R2 Parquet | DuckDBの列指向分析とpartition pruningに適する | +| 取得単位の正規化データ | R2 events Parquet | runごとの取得結果を保持する | +| 分析用の正規化データ | R2 compacted Parquet | DuckDBの列指向分析とpartition pruningに適する | | connection / token / cursor | SQLite | Pipelines Serviceの状態として一貫して更新する | ### 7.2 正規化粒度 @@ -282,14 +294,39 @@ data typeごとに個別テーブルを増やさず、Daily / Sample / Interval ### 7.3 再取得 -同一期間を再取得した場合は対象partitionをoverwriteする。 +同一期間を再取得した場合は、選択したdata typeの対象期間だけを月partition内で置換する。 +同じ月に保存されている他のdata typeと対象期間外の行は保持する。 +`compact_range`は`connection_id`と`data_type`が一致し、`date_from <= row_date < date_to`となる行だけを置換する。 +sessionの`data_type`が`sleep`の場合、対象日には`started_at_utc`ではなく`ended_at_utc`を使用する。 +取得ごとにeventsへ`{uuid}.parquet`を追加し、compact処理で対象期間を反映したcompactedの`data.parquet`を再生成する。 +events側の`{run_id}.parquet`は削除せず、削除対象になるのは置換後に空となったcompactedファイルだけである。 Google Health側の遅延同期や後日補完を取り込みながら、重複レコードを残さない。 +APIの`from` / `to`は`TIMEZONE`のローカル日付として扱う。 +`reconcile`とphysical `rollUp`には、各日付のローカル0時をUTCへ変換して送信する。 +`dailyRollUp`にはcivil dateをそのまま送信する。 +取得した絶対時刻と`ingested_at_utc`はUTCで保存し、睡眠などから生成する日次行の日付は`TIMEZONE`で判定する。 + +### 7.4 同期状態 + +`google_health_sync_cursors`へdata type単位で次を保存する。 +同期結果は`save_sync_result`がupsertし、compact処理では`last_run_id`を使って対象runの結果だけを読み込む。 + +| 列 | 内容 | +|---|---| +| `status` | `success` / `no_data` / `failed` | +| `range_start` | 取得範囲の開始日 | +| `range_end` | 取得範囲の終了日(この日を含まない) | +| `last_run_id` | 最後に処理したworkflow run ID | +| `record_count` | 正規化後の保存件数 | +| `last_error_message` | 失敗理由 | +| `updated_at` | 更新時刻 | + --- -## 8. 実装時の考慮事項 +## 11. 実装時の考慮事項 -### 8.1 エッジケース +### 11.1 エッジケース - 値が返らないdata typeは失敗ではなく`no_data`として扱う - 一部data typeが失敗しても、他data typeの取得と保存を継続する @@ -297,7 +334,7 @@ Google Health側の遅延同期や後日補完を取り込みながら、重複 - 取得期間上限が異なるdata typeはリクエスト期間を分割する - Fitbitアプリとの同期遅延を考慮して直近期間を再取得する -### 8.2 セキュリティ +### 11.2 制約・制限 - access tokenとrefresh tokenはFernetで暗号化してSQLiteへ保存する - token、authorization code、Raw JSON本文をログへ出さない @@ -305,26 +342,27 @@ Google Health側の遅延同期や後日補完を取り込みながら、重複 - callbackのquery stringはaccess logで伏せる - 健康データを含むため、Raw JSONとParquetをHigh sensitivityとして扱う -### 8.3 障害処理 +### 11.3 将来拡張 -- access tokenは期限切れ前、またはAPIの401応答時にrefreshする -- refresh tokenが拒否された場合はconnectionを`revoked`にする -- 429、5xx、network errorは指数backoffで再試行する -- API errorはauthentication、rate limit、server、network、clientに分類する +- Schedulerによるsame-day / daily / weekly repair +- DuckDB viewによる日次サマリ参照 +- 失敗runの運用向け再実行導線 --- -## 9. サンプルデータ +## 12. サンプルデータ -### 9.1 入力データ例 +### 12.1 入力データ例 ```json { "dataPoints": [ { "name": "users/me/dataTypes/heart-rate/dataPoints/example", - "instantTime": "2026-06-10T00:15:00Z", "heartRate": { + "sampleTime": { + "physicalTime": "2026-06-10T00:15:00Z" + }, "beatsPerMinute": 72 } } @@ -332,7 +370,7 @@ Google Health側の遅延同期や後日補完を取り込みながら、重複 } ``` -### 9.2 Parquet行例 +### 12.2 Parquet行例 ```json { @@ -349,11 +387,32 @@ Google Health側の遅延同期や後日補完を取り込みながら、重複 --- -## 10. セットアップ・運用手順 +## 13. 次のステップ + +### 実装状況 + +- [x] OAuth接続、token暗号化保存、token refresh +- [x] 対象data typeの取得とpagination +- [x] Raw JSON保存 +- [x] Daily / Sample / Interval / Session正規化 +- [x] events保存とcompactedの月partition範囲置換 +- [x] data type単位の同期状態保存 +- [x] backfill / range / data type指定run API +- [x] 単体テスト・統合テスト +- [ ] DuckDBマウント + +### 未実装機能 + +- [ ] Schedulerによる定期取得とrepair +- [ ] `google_health_daily_summary` view + +--- + +## 14. セットアップ・運用手順 -### 10.1 Google Cloud +### 14.1 Google Cloud -#### 10.1.1 プロジェクトを選択する +#### 14.1.1 プロジェクトを選択する 1. [Google Cloud Console](https://console.cloud.google.com/)を開き、Google Accountでログインする。 2. 画面上部のプロジェクト名をクリックする。 @@ -368,7 +427,7 @@ Google Health側の遅延同期や後日補完を取り込みながら、重複 以降の操作前に、画面上部で選択中のプロジェクトがEgoGraph用になっていることを確認する。 -#### 10.1.2 Google Health APIを有効化する +#### 14.1.2 Google Health APIを有効化する 1. [Google Health APIのAPIライブラリ](https://console.cloud.google.com/apis/library/health.googleapis.com)を開く。 2. ページ上部の選択中プロジェクトがEgoGraph用であることを確認する。 @@ -377,7 +436,7 @@ Google Health側の遅延同期や後日補完を取り込みながら、重複 ページが見つからない場合は、[APIライブラリ](https://console.cloud.google.com/apis/library)を開き、検索欄へ`Google Health API`と入力して同名のAPIを選択する。 -#### 10.1.3 OAuth consent screenとAudienceを設定する +#### 14.1.3 OAuth consent screenとAudienceを設定する 1. [Google Auth Platform Overview](https://console.cloud.google.com/auth/overview)を開く。 2. 未設定の場合は`Get started`をクリックする。 @@ -404,7 +463,7 @@ OAuth認証時は、Test userへ追加したメインGoogle Accountでログイ 継続運用へ移行する際は`In production`へ変更し、OAuth app verificationを完了する。 100ユーザーを超える利用や一般公開では、Google Healthの第三者セキュリティレビューも必要になる。 -#### 10.1.4 OAuth scopeを追加する +#### 14.1.4 OAuth scopeを追加する 1. [Data Access](https://console.cloud.google.com/auth/scopes)を開く。 2. `Add or remove scopes`をクリックする。 @@ -420,7 +479,7 @@ https://www.googleapis.com/auth/googlehealth.sleep.readonly 5. `Update`をクリックする。 6. Data Access画面へ戻ったら`Save`をクリックする。 -#### 10.1.5 OAuth clientを作成する +#### 14.1.5 OAuth clientを作成する 1. [Clients](https://console.cloud.google.com/auth/clients)を開く。 2. `Create client`をクリックする。 @@ -437,10 +496,11 @@ https://www.googleapis.com/auth/googlehealth.sleep.readonly 5. 表示されたClient IDとClient secretを安全な場所へ控える。 6. Client IDとClient secretをGit、Issue、PR、チャットへ貼らない。 -### 10.2 環境変数 +### 14.2 環境変数 | 変数 | 必須 | 内容 | |---|---|---| +| `TIMEZONE` | Yes | 取得日付と日次判定に使うIANAタイムゾーン名(例: `Asia/Tokyo`) | | `GOOGLE_HEALTH_CLIENT_ID` | Yes | OAuth client ID | | `GOOGLE_HEALTH_CLIENT_SECRET` | Yes | OAuth client secret | | `GOOGLE_HEALTH_REDIRECT_URI` | Yes | Google Cloud登録済みcallback URI | @@ -459,18 +519,19 @@ keyを変更すると既存tokenは復号できないため、tokenとは別のs `egograph/pipelines/.env`へ次を設定する。 ```dotenv +TIMEZONE=Asia/Tokyo PIPELINES_API_KEY=<十分に長いランダム文字列> -GOOGLE_HEALTH_CLIENT_ID=<10.1.5で取得したClient ID> -GOOGLE_HEALTH_CLIENT_SECRET=<10.1.5で取得したClient secret> +GOOGLE_HEALTH_CLIENT_ID=<14.1.5で取得したClient ID> +GOOGLE_HEALTH_CLIENT_SECRET=<14.1.5で取得したClient secret> GOOGLE_HEALTH_REDIRECT_URI=https:///v1/sources/google-health/auth/callback GOOGLE_HEALTH_TOKEN_ENCRYPTION_KEY=<生成したFernet key> ``` -### 10.3 接続 +### 14.3 接続 -10.2まで完了したら、次の手順を上から順に実行する。 +14.2まで完了したら、次の手順を上から順に実行する。 -#### 10.3.1 Pipelines Serviceを起動する +#### 14.3.1 Pipelines Serviceを起動する リポジトリルートで次を実行する。 @@ -480,7 +541,7 @@ uv run python -m pipelines.main serve `Uvicorn running`と表示されたら、接続作業が完了するまでこのターミナルを開いたままにする。 -#### 10.3.2 OAuth callbackの受信方法を選択する +#### 14.3.2 OAuth callbackの受信方法を選択する 次の3方式から1つを選択する。 どの方式でも、Google CloudのAuthorized redirect URIと`GOOGLE_HEALTH_REDIRECT_URI`を完全に一致させる。 @@ -563,11 +624,11 @@ ssh -N \ ``` サーバー側のPipelines Serviceが`8001`以外で待ち受ける場合は、コマンド末尾のポートを合わせる。 -認可URLは10.3.3の手順でサーバー上から取得し、手元PCのブラウザで開く。 +認可URLは14.3.3の手順でサーバー上から取得し、手元PCのブラウザで開く。 callbackは手元PCの`127.0.0.1:18001`からSSHトンネルを通ってサーバーへ届く。 認証完了後はSSHコマンドを`Ctrl+C`で停止する。 -#### 10.3.3 認可URLを取得する +#### 14.3.3 認可URLを取得する 別のターミナルを開き、リポジトリルートで次を実行する。 @@ -579,32 +640,33 @@ set +a curl -s \ -H "X-API-Key: ${PIPELINES_API_KEY}" \ "http://${PIPELINES_HOST:-127.0.0.1}:${PIPELINES_PORT:-8001}/v1/sources/google-health/auth/start" \ - | jq -r '.authorization_url' + | uv run python -c \ + 'import json, sys; print(json.load(sys.stdin)["authorization_url"])' ``` 表示された`https://accounts.google.com/...`で始まるURLをブラウザで開く。 -#### 10.3.4 Google Accountで認証する +#### 14.3.4 Google Accountで認証する 1. Test userへ追加した、Google Fitbit Airの健康データを持つメインGoogle Accountを選択する。 2. EgoGraphが要求する3つのread-only権限を確認する。 3. 権限を許可する。 4. callbackの結果として、connection IDと`active`がブラウザへ表示されることを確認する。 -#### 10.3.5 接続状態を確認する +#### 14.3.5 接続状態を確認する -10.3.3で使用したターミナルで次を実行する。 +14.3.3で使用したターミナルで次を実行する。 ```bash curl -s \ -H "X-API-Key: ${PIPELINES_API_KEY}" \ "http://${PIPELINES_HOST:-127.0.0.1}:${PIPELINES_PORT:-8001}/v1/sources/google-health/connection" \ - | jq + | uv run python -m json.tool ``` `connected`が`true`、`status`が`active`であれば接続完了。 -### 10.4 疎通確認 +### 14.4 疎通確認 `steps`と`sleep`を各1件まで取得する。 @@ -614,7 +676,92 @@ curl -X POST \ "http://${PIPELINES_HOST:-127.0.0.1}:${PIPELINES_PORT:-8001}/v1/sources/google-health/connection/smoke-test" ``` -### 10.5 接続削除 +### 14.5 データ取得 + +すべての期間指定は`from`を含み、`to`を含まない。 +たとえば`from=2026-06-01`, `to=2026-06-03`は6月1日と6月2日を取得する。 + +#### 14.5.1 初回90日分を取得する + +```bash +curl -s -X POST \ + -H "X-API-Key: ${PIPELINES_API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{"mode":"initial_backfill"}' \ + "http://${PIPELINES_HOST:-127.0.0.1}:${PIPELINES_PORT:-8001}/v1/sources/google-health/runs" \ + | uv run python -m json.tool +``` + +#### 14.5.2 指定期間を全data type再取得する + +```bash +curl -s -X POST \ + -H "X-API-Key: ${PIPELINES_API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{"mode":"range","from":"2026-06-01","to":"2026-06-03"}' \ + "http://${PIPELINES_HOST:-127.0.0.1}:${PIPELINES_PORT:-8001}/v1/sources/google-health/runs" \ + | uv run python -m json.tool +``` + +#### 14.5.3 指定data typeだけ再取得する + +```bash +curl -s -X POST \ + -H "X-API-Key: ${PIPELINES_API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "mode":"data_type_range", + "from":"2026-06-01", + "to":"2026-06-03", + "data_types":["steps","sleep","heart-rate"] + }' \ + "http://${PIPELINES_HOST:-127.0.0.1}:${PIPELINES_PORT:-8001}/v1/sources/google-health/runs" \ + | uv run python -m json.tool +``` + +レスポンスの`run_id`を使って実行結果を確認する。 + +```bash +RUN_ID=<レスポンスのrun_id> + +curl -s \ + -H "X-API-Key: ${PIPELINES_API_KEY}" \ + "http://${PIPELINES_HOST:-127.0.0.1}:${PIPELINES_PORT:-8001}/v1/runs/${RUN_ID}" \ + | uv run python -m json.tool +``` + +runの`status`は`succeeded`、`partial_failed`、`failed`のいずれかになる。 +`result_summary.data_types`でdata typeごとの`success`、`no_data`、`failed`、件数、エラーを確認する。 + +#### 14.5.4 SQLiteの同期状態を確認する + +```bash +uv run python <<'PY' +import sqlite3 +from pipelines.config import PipelinesConfig + +db_path = PipelinesConfig().database_path +conn = sqlite3.connect(db_path) +conn.row_factory = sqlite3.Row + +for row in conn.execute(""" + SELECT + data_type, + status, + range_start, + range_end, + last_run_id, + record_count, + last_error_message, + updated_at + FROM google_health_sync_cursors + ORDER BY data_type +"""): + print(dict(row)) +PY +``` + +### 14.6 接続削除 ```bash curl -X DELETE \ @@ -624,7 +771,7 @@ curl -X DELETE \ connectionを削除すると、SQLite内のOAuth tokenとsync cursorも削除される。 -### 10.6 参照 +### 14.7 参照 - [Google Health API: Set up Google Cloud and OAuth](https://developers.google.com/health/setup) - [Google OAuth 2.0 for Web Server Applications](https://developers.google.com/identity/protocols/oauth2/web-server) diff --git a/egograph/pipelines/README.md b/egograph/pipelines/README.md index 4c9d30ed..56e1b86e 100644 --- a/egograph/pipelines/README.md +++ b/egograph/pipelines/README.md @@ -126,6 +126,7 @@ Pipelines Service はポート `8001`(デフォルト)で HTTP API を提供 | `POST` | `/v1/workflows/{workflow_id}/runs` | ワークフロー手動実行 | | `POST` | `/v1/workflows/{workflow_id}/enable` | ワークフロー有効化 | | `POST` | `/v1/workflows/{workflow_id}/disable` | ワークフロー無効化 | +| `POST` | `/v1/sources/google-health/runs` | Google Health初回backfill・期間指定・data type指定run作成 | | `GET` | `/v1/runs` | 全 run 一覧 | | `GET` | `/v1/runs/{run_id}` | run 詳細 | | `GET` | `/v1/runs/{run_id}/steps/{step_id}/log` | step ログ全文 | diff --git a/egograph/pipelines/api/google_health.py b/egograph/pipelines/api/google_health.py index 4c64d0da..9507bd43 100644 --- a/egograph/pipelines/api/google_health.py +++ b/egograph/pipelines/api/google_health.py @@ -1,12 +1,19 @@ -"""Google Health OAuth and connection API.""" +"""Google Health OAuth、connection、ingest API。""" -from fastapi import APIRouter, Depends, HTTPException, Query -from pydantic import ValidationError +from datetime import date, datetime, timedelta +from zoneinfo import ZoneInfo + +from fastapi import APIRouter, Body, Depends, HTTPException, Query +from pydantic import BaseModel, Field, ValidationError, model_validator from pipelines.api.dependencies import get_service, verify_api_key from pipelines.service import PipelineService from pipelines.sources.google_health.client import GoogleHealthAPIError -from pipelines.sources.google_health.data_types import SMOKE_TEST_DATA_TYPES +from pipelines.sources.google_health.data_types import ( + DATA_TYPE_BY_NAME, + SMOKE_TEST_DATA_TYPES, +) +from pipelines.sources.google_health.models import GoogleHealthRunMode router = APIRouter( prefix="/v1/sources/google-health", @@ -14,12 +21,76 @@ ) +class GoogleHealthRunRequest(BaseModel): + """Google Health取り込みrun作成リクエスト。""" + + mode: GoogleHealthRunMode + date_from: date | None = Field(None, alias="from") + date_to: date | None = Field(None, alias="to") + data_types: list[str] = Field(default_factory=list) + + @model_validator(mode="after") + def validate_range(self) -> "GoogleHealthRunRequest": + unknown = set(self.data_types) - DATA_TYPE_BY_NAME.keys() + if unknown: + raise ValueError( + f"invalid_data_types: unsupported values: {', '.join(sorted(unknown))}" + ) + if self.mode is GoogleHealthRunMode.INITIAL_BACKFILL: + if self.date_from is not None or self.date_to is not None: + raise ValueError( + "invalid_range: initial_backfill does not accept from/to" + ) + if self.data_types: + raise ValueError( + "invalid_data_types: initial_backfill targets all data types" + ) + return self + if self.date_from is None or self.date_to is None: + raise ValueError("invalid_range: from and to are required") + if self.date_from >= self.date_to: + raise ValueError("invalid_range: from must be earlier than to") + if self.mode is GoogleHealthRunMode.DATA_TYPE_RANGE and not self.data_types: + raise ValueError("invalid_data_types: data_type_range requires data_types") + if self.mode is GoogleHealthRunMode.RANGE and self.data_types: + raise ValueError("invalid_data_types: range targets all data types") + return self + + def to_run_input( + self, + *, + timezone: ZoneInfo, + now: datetime | None = None, + ) -> dict: + """実行時に解決済みのclosed-open期間へ変換する。""" + if self.mode is GoogleHealthRunMode.INITIAL_BACKFILL: + current = now or datetime.now(tz=timezone) + date_to = current.astimezone(timezone).date() + timedelta(days=1) + date_from = date_to - timedelta(days=90) + else: + date_from = self.date_from + date_to = self.date_to + if date_from is None or date_to is None: # pragma: no cover + raise ValueError("invalid_range: unresolved range") + return { + "mode": self.mode.value, + "from": date_from.isoformat(), + "to": date_to.isoformat(), + "data_types": self.data_types, + } + + def _format_invalid_detail(exc: Exception) -> str: if isinstance(exc, ValidationError): details = [] for error in exc.errors(): field = ".".join(str(part) for part in error["loc"]) or "request" - details.append(f"invalid_{field}: {error['msg']}") + reason = str(error["msg"]) + marker = "invalid_" + if marker in reason and ":" in reason: + details.append(reason[reason.index(marker) :]) + else: + details.append(f"invalid_{field}: {reason}") return "; ".join(details) message = str(getattr(exc, "message", None) or exc).strip() @@ -141,3 +212,30 @@ def smoke_test_connection( detail=_format_invalid_detail(exc), ) from exc return {"status": "ok", "data_types": results} + + +@router.post("/runs", status_code=201) +def create_ingest_run( + request_body: dict = Body(...), + _: None = Depends(verify_api_key), + service: PipelineService = Depends(get_service), +) -> dict: + """Google Health取り込みrunを作成する。""" + try: + request = GoogleHealthRunRequest.model_validate(request_body) + except ValidationError as exc: + raise HTTPException( + status_code=400, + detail=_format_invalid_detail(exc), + ) from exc + connection = service.google_health_repository.get_connection() + if connection is None or connection.status.value != "active": + raise HTTPException( + status_code=409, + detail="invalid_google_health_connection: active connection not found", + ) + _require_client(service) + run = service.trigger_google_health_ingest( + request.to_run_input(timezone=ZoneInfo(service.config.timezone)) + ) + return run.__dict__ diff --git a/egograph/pipelines/config.py b/egograph/pipelines/config.py index 7b59a53d..fcc5311f 100644 --- a/egograph/pipelines/config.py +++ b/egograph/pipelines/config.py @@ -2,9 +2,10 @@ import os from pathlib import Path +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from egograph_paths import PIPELINES_LOGS_DIR, PIPELINES_STATE_DB_PATH -from pydantic import Field, SecretStr +from pydantic import Field, SecretStr, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict USE_ENV_FILE = os.getenv("USE_ENV_FILE", "true").lower() in ("true", "1", "yes") @@ -27,7 +28,7 @@ class PipelinesConfig(BaseSettings): host: str = "127.0.0.1" port: int = 8001 api_key: SecretStr | None = None - timezone: str = "UTC" + timezone: str = Field("UTC", validation_alias="TIMEZONE") dispatcher_poll_seconds: float = 1.0 max_concurrent_runs: int = 4 lock_lease_seconds: int = 300 @@ -51,6 +52,16 @@ class PipelinesConfig(BaseSettings): validation_alias="GOOGLE_HEALTH_TOKEN_ENCRYPTION_KEY", ) + @field_validator("timezone") + @classmethod + def validate_timezone(cls, value: str) -> str: + """IANAタイムゾーン名を検証する。""" + try: + ZoneInfo(value) + except ZoneInfoNotFoundError as exc: + raise ValueError(f"invalid timezone: {value}") from exc + return value + @property def google_health_is_configured(self) -> bool: """Google Health OAuth 設定がすべて揃っているか返す。""" diff --git a/egograph/pipelines/domain/workflow.py b/egograph/pipelines/domain/workflow.py index ffe4c3f9..0217671b 100644 --- a/egograph/pipelines/domain/workflow.py +++ b/egograph/pipelines/domain/workflow.py @@ -43,6 +43,7 @@ class WorkflowRunStatus(StrEnum): QUEUED = "queued" RUNNING = "running" SUCCEEDED = "succeeded" + PARTIAL_FAILED = "partial_failed" FAILED = "failed" CANCELED = "canceled" diff --git a/egograph/pipelines/infrastructure/db/schema.py b/egograph/pipelines/infrastructure/db/schema.py index 6b4c97a4..44a5c2c9 100644 --- a/egograph/pipelines/infrastructure/db/schema.py +++ b/egograph/pipelines/infrastructure/db/schema.py @@ -128,4 +128,41 @@ def initialize_schema(conn: sqlite3.Connection) -> None: ); """ ) + _migrate_google_health_sync_cursors(conn) conn.commit() + + +def _migrate_google_health_sync_cursors(conn: sqlite3.Connection) -> None: + """既存のGoogle Health sync cursorへPhase 2列を追加する。""" + columns = { + row[1] + for row in conn.execute( + "PRAGMA table_info(google_health_sync_cursors)" + ).fetchall() + } + additions = { + "status": ( + "ALTER TABLE google_health_sync_cursors " + "ADD COLUMN status TEXT NOT NULL DEFAULT 'success'" + ), + "range_start": ( + "ALTER TABLE google_health_sync_cursors ADD COLUMN range_start TEXT" + ), + "range_end": ( + "ALTER TABLE google_health_sync_cursors ADD COLUMN range_end TEXT" + ), + "last_run_id": ( + "ALTER TABLE google_health_sync_cursors ADD COLUMN last_run_id TEXT" + ), + "record_count": ( + "ALTER TABLE google_health_sync_cursors " + "ADD COLUMN record_count INTEGER NOT NULL DEFAULT 0" + ), + "last_error_message": ( + "ALTER TABLE google_health_sync_cursors " + "ADD COLUMN last_error_message TEXT" + ), + } + for name, statement in additions.items(): + if name not in columns: + conn.execute(statement) diff --git a/egograph/pipelines/infrastructure/dispatching/run_dispatcher.py b/egograph/pipelines/infrastructure/dispatching/run_dispatcher.py index 3f2c99bf..387ec56f 100644 --- a/egograph/pipelines/infrastructure/dispatching/run_dispatcher.py +++ b/egograph/pipelines/infrastructure/dispatching/run_dispatcher.py @@ -277,11 +277,29 @@ def _execute_run( exc=last_exc, ) return + final_status = _status_from_summary(last_summary) + error_message = ( + _error_from_summary(last_summary) + if final_status + in { + WorkflowRunStatus.PARTIAL_FAILED, + WorkflowRunStatus.FAILED, + } + else None + ) self._run_repository.update_run_result( run_id=run.run_id, - status=WorkflowRunStatus.SUCCEEDED, + status=final_status, + error_message=error_message, result_summary=last_summary, ) + if final_status == WorkflowRunStatus.FAILED: + self._notify_failure( + workflow=workflow, + run=run, + error_message=error_message or "workflow reported failed status", + exc=None, + ) except Exception as exc: logger.exception( "run execution crashed unexpectedly: run_id=%s", @@ -544,3 +562,37 @@ def _notify_failure( ), ) self._notification_service.notify(event, exc=exc) + + +def _status_from_summary( + summary: dict | None, +) -> WorkflowRunStatus: + if summary is None: + return WorkflowRunStatus.SUCCEEDED + raw_status = summary.get("status", WorkflowRunStatus.SUCCEEDED.value) + normalized_status = str(raw_status).strip().lower() + terminal_statuses = { + status.value: status + for status in ( + WorkflowRunStatus.SUCCEEDED, + WorkflowRunStatus.PARTIAL_FAILED, + WorkflowRunStatus.FAILED, + ) + } + status = terminal_statuses.get(normalized_status) + if status is not None: + return status + logger.warning( + "workflow returned non-terminal or unknown summary status: status=%r", + raw_status, + ) + return WorkflowRunStatus.SUCCEEDED + + +def _error_from_summary(summary: dict | None) -> str: + if summary is None: + return "workflow reported failed status" + errors = summary.get("errors") + if isinstance(errors, list) and errors: + return "; ".join(str(error) for error in errors) + return "workflow reported failed status" diff --git a/egograph/pipelines/service.py b/egograph/pipelines/service.py index c1ecb632..a2730f40 100644 --- a/egograph/pipelines/service.py +++ b/egograph/pipelines/service.py @@ -5,6 +5,7 @@ import threading from dataclasses import dataclass from typing import cast +from zoneinfo import ZoneInfo from pydantic import SecretStr @@ -102,6 +103,7 @@ def create(cls, config: PipelinesConfig | None = None) -> "PipelineService": token_cipher, client_id=client_id.get_secret_value(), client_secret=client_secret.get_secret_value(), + timezone=ZoneInfo(config.timezone), ) service = cls( config=config, @@ -184,6 +186,21 @@ def trigger_workflow( requested_by=requested_by, ) + def trigger_google_health_ingest( + self, + request: dict, + *, + requested_by: str = "api", + ) -> WorkflowRun: + """Google Health取り込みrunを入力付きでqueueへ積む。""" + return self.run_repository.enqueue_run( + workflow_id="google_health_ingest_workflow", + trigger_type=TriggerType.MANUAL, + queued_reason=QueuedReason.MANUAL_REQUEST, + requested_by=requested_by, + result_summary={"request": request}, + ) + def set_workflow_enabled(self, workflow_id: str, enabled: bool) -> dict: """workflow の有効/無効フラグを更新し scheduler を再同期する。""" workflow = self.workflow_repository.set_workflow_enabled(workflow_id, enabled) diff --git a/egograph/pipelines/sources/google_health/client.py b/egograph/pipelines/sources/google_health/client.py index 857f3a92..9b474af2 100644 --- a/egograph/pipelines/sources/google_health/client.py +++ b/egograph/pipelines/sources/google_health/client.py @@ -5,14 +5,16 @@ import logging import time from collections.abc import Callable -from datetime import UTC, datetime, timedelta +from datetime import UTC, date, datetime, timedelta from typing import Any +from zoneinfo import ZoneInfo import requests from requests import Response from pipelines.sources.google_health.models import ConnectionStatus, OAuthToken from pipelines.sources.google_health.repository import GoogleHealthRepository +from pipelines.sources.google_health.timezone import local_date_start_rfc3339 from pipelines.sources.google_health.token_cipher import TokenCipher logger = logging.getLogger(__name__) @@ -58,6 +60,7 @@ def __init__( session: requests.Session | None = None, sleep: Callable[[float], None] = time.sleep, max_attempts: int = 3, + timezone: ZoneInfo | None = None, ) -> None: self._repository = repository self._token_cipher = token_cipher @@ -66,6 +69,7 @@ def __init__( self._session = session or requests.Session() self._sleep = sleep self._max_attempts = max_attempts + self._timezone = timezone or ZoneInfo("UTC") def list_data_points( self, @@ -85,6 +89,96 @@ def list_data_points( params={"pageSize": page_size}, ) + def reconcile_data_points( + self, + connection_id: str, + data_type: str, + *, + filter_expression: str, + page_size: int, + page_token: str | None = None, + ) -> dict[str, Any]: + """指定期間のreconciled data pointを取得する。""" + token = self._get_valid_token(connection_id) + url = f"{API_BASE_URL}/users/me/dataTypes/{data_type}/dataPoints:reconcile" + params: dict[str, Any] = { + "filter": filter_expression, + "pageSize": page_size, + "dataSourceFamily": "users/me/dataSourceFamilies/google-wearables", + } + if page_token: + params["pageToken"] = page_token + return self._request_json( + "GET", + url, + connection_id=connection_id, + token=token, + params=params, + ) + + def daily_rollup( + self, + connection_id: str, + data_type: str, + *, + date_from: date, + date_to: date, + page_token: str | None = None, + ) -> dict[str, Any]: + """指定civil date範囲の日次rollupを取得する。""" + token = self._get_valid_token(connection_id) + url = f"{API_BASE_URL}/users/me/dataTypes/{data_type}/dataPoints:dailyRollUp" + body: dict[str, Any] = { + "range": { + "start": _civil_midnight(date_from), + "end": _civil_midnight(date_to), + }, + "windowSizeDays": 1, + "pageSize": 10_000, + "dataSourceFamily": "users/me/dataSourceFamilies/google-wearables", + } + if page_token: + body["pageToken"] = page_token + return self._request_json( + "POST", + url, + connection_id=connection_id, + token=token, + json=body, + ) + + def rollup( + self, + connection_id: str, + data_type: str, + *, + date_from: date, + date_to: date, + window_size_seconds: int, + page_token: str | None = None, + ) -> dict[str, Any]: + """指定物理時間範囲を固定windowでrollupする。""" + token = self._get_valid_token(connection_id) + url = f"{API_BASE_URL}/users/me/dataTypes/{data_type}/dataPoints:rollUp" + body: dict[str, Any] = { + "range": { + "startTime": local_date_start_rfc3339(date_from, self._timezone), + "endTime": local_date_start_rfc3339(date_to, self._timezone), + }, + "windowSize": f"{window_size_seconds}s", + "pageSize": 10_000, + "dataSourceFamily": "users/me/dataSourceFamilies/google-wearables", + } + if page_token: + body["pageToken"] = page_token + return self._request_json( + "POST", + url, + connection_id=connection_id, + token=token, + json=body, + ) + def _get_valid_token(self, connection_id: str) -> OAuthToken: encrypted = self._repository.get_encrypted_token(connection_id) connection = self._repository.get_connection() @@ -195,6 +289,7 @@ def _request_json( connection_id: str, token: OAuthToken, params: dict[str, Any] | None = None, + json: dict[str, Any] | None = None, ) -> dict[str, Any]: last_error: GoogleHealthAPIError | None = None refreshed_after_unauthorized = False @@ -209,6 +304,7 @@ def _request_json( "Accept": "application/json", }, params=params, + json=json, timeout=30, ) except requests.RequestException as exc: @@ -256,3 +352,15 @@ def _request_json( if last_error is None: # pragma: no cover raise GoogleHealthAPIError("google_health_request_failed") raise last_error + + +def _civil_midnight(value: date) -> dict[str, Any]: + """Google Health CivilDateTimeの日付境界を組み立てる。""" + return { + "date": { + "year": value.year, + "month": value.month, + "day": value.day, + }, + "time": {}, + } diff --git a/egograph/pipelines/sources/google_health/data_types.py b/egograph/pipelines/sources/google_health/data_types.py index 65f990cb..dcc058f3 100644 --- a/egograph/pipelines/sources/google_health/data_types.py +++ b/egograph/pipelines/sources/google_health/data_types.py @@ -1,4 +1,4 @@ -"""Fitbit Air から取得する Google Health data type registry.""" +"""Fitbit Air から取得する Google Health data type registry。""" from dataclasses import dataclass from enum import StrEnum @@ -12,57 +12,237 @@ class DataCategory(StrEnum): SLEEP = "sleep" +class RecordKind(StrEnum): + """正規化後のrecord種別。""" + + DAILY = "daily" + SAMPLE = "sample" + INTERVAL = "interval" + SESSION = "session" + + +class FetchStrategy(StrEnum): + """data point取得方式。""" + + RECONCILE = "reconcile" + ROLLUP = "rollup" + DAILY_ROLLUP = "daily_rollup" + + @dataclass(frozen=True) class GoogleHealthDataType: - """取得対象 data type の定義。""" + """取得対象data typeの定義。""" name: str category: DataCategory - supports_list: bool = True + record_kind: RecordKind + unit: str + fetch_strategy: FetchStrategy = FetchStrategy.RECONCILE + include_interval_rollup: bool = False + include_daily_rollup: bool = False smoke_test: bool = False + @property + def filter_name(self) -> str: + """filter式で使用するsnake_case名を返す。""" + return self.name.replace("-", "_") + + @property + def payload_name(self) -> str: + """API payloadで使用するlowerCamelCase名を返す。""" + parts = self.name.split("-") + return parts[0] + "".join(part.title() for part in parts[1:]) + DATA_TYPES = ( - GoogleHealthDataType("steps", DataCategory.ACTIVITY, smoke_test=True), - GoogleHealthDataType("distance", DataCategory.ACTIVITY), - GoogleHealthDataType("total-calories", DataCategory.ACTIVITY), - GoogleHealthDataType("active-energy-burned", DataCategory.ACTIVITY), - GoogleHealthDataType("active-minutes", DataCategory.ACTIVITY), - GoogleHealthDataType("active-zone-minutes", DataCategory.ACTIVITY), - GoogleHealthDataType("activity-level", DataCategory.ACTIVITY), - GoogleHealthDataType("sedentary-period", DataCategory.ACTIVITY), - GoogleHealthDataType("calories-in-heart-rate-zone", DataCategory.ACTIVITY), - GoogleHealthDataType("time-in-heart-rate-zone", DataCategory.ACTIVITY), - GoogleHealthDataType("exercise", DataCategory.ACTIVITY), - GoogleHealthDataType("floors", DataCategory.ACTIVITY), - GoogleHealthDataType("altitude", DataCategory.ACTIVITY), - GoogleHealthDataType("swim-lengths-data", DataCategory.ACTIVITY), - GoogleHealthDataType("daily-vo2-max", DataCategory.ACTIVITY), - GoogleHealthDataType("vo2-max", DataCategory.ACTIVITY), - GoogleHealthDataType("run-vo2-max", DataCategory.ACTIVITY), - GoogleHealthDataType("heart-rate", DataCategory.HEALTH_METRICS), - GoogleHealthDataType("daily-resting-heart-rate", DataCategory.HEALTH_METRICS), - GoogleHealthDataType("heart-rate-variability", DataCategory.HEALTH_METRICS), + GoogleHealthDataType( + "steps", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "count", + include_daily_rollup=True, + smoke_test=True, + ), + GoogleHealthDataType( + "distance", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "millimeter", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "total-calories", + DataCategory.ACTIVITY, + RecordKind.DAILY, + "kilocalorie", + fetch_strategy=FetchStrategy.DAILY_ROLLUP, + ), + GoogleHealthDataType( + "active-energy-burned", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "kilocalorie", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "active-minutes", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "minute", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "active-zone-minutes", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "minute", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "activity-level", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "second", + fetch_strategy=FetchStrategy.ROLLUP, + include_interval_rollup=True, + ), + GoogleHealthDataType( + "sedentary-period", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "second", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "calories-in-heart-rate-zone", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "kilocalorie", + fetch_strategy=FetchStrategy.DAILY_ROLLUP, + include_interval_rollup=True, + ), + GoogleHealthDataType( + "time-in-heart-rate-zone", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "second", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "exercise", + DataCategory.ACTIVITY, + RecordKind.SESSION, + "second", + ), + GoogleHealthDataType( + "floors", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "count", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "altitude", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "millimeter", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "swim-lengths-data", + DataCategory.ACTIVITY, + RecordKind.INTERVAL, + "count", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "daily-vo2-max", + DataCategory.ACTIVITY, + RecordKind.DAILY, + "milliliter_per_kilogram_per_minute", + ), + GoogleHealthDataType( + "vo2-max", + DataCategory.ACTIVITY, + RecordKind.SAMPLE, + "milliliter_per_kilogram_per_minute", + ), + GoogleHealthDataType( + "run-vo2-max", + DataCategory.ACTIVITY, + RecordKind.SAMPLE, + "milliliter_per_kilogram_per_minute", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "heart-rate", + DataCategory.HEALTH_METRICS, + RecordKind.SAMPLE, + "beats_per_minute", + include_daily_rollup=True, + ), + GoogleHealthDataType( + "daily-resting-heart-rate", + DataCategory.HEALTH_METRICS, + RecordKind.DAILY, + "beats_per_minute", + ), + GoogleHealthDataType( + "heart-rate-variability", + DataCategory.HEALTH_METRICS, + RecordKind.SAMPLE, + "millisecond", + ), GoogleHealthDataType( "daily-heart-rate-variability", DataCategory.HEALTH_METRICS, + RecordKind.DAILY, + "millisecond", + ), + GoogleHealthDataType( + "daily-heart-rate-zones", + DataCategory.HEALTH_METRICS, + RecordKind.DAILY, + "beats_per_minute", + ), + GoogleHealthDataType( + "oxygen-saturation", + DataCategory.HEALTH_METRICS, + RecordKind.SAMPLE, + "percent", ), - GoogleHealthDataType("daily-heart-rate-zones", DataCategory.HEALTH_METRICS), - GoogleHealthDataType("oxygen-saturation", DataCategory.HEALTH_METRICS), GoogleHealthDataType( "daily-oxygen-saturation", DataCategory.HEALTH_METRICS, + RecordKind.DAILY, + "percent", ), GoogleHealthDataType( "respiratory-rate-sleep-summary", DataCategory.HEALTH_METRICS, + RecordKind.SAMPLE, + "breaths_per_minute", + ), + GoogleHealthDataType( + "daily-respiratory-rate", + DataCategory.HEALTH_METRICS, + RecordKind.DAILY, + "breaths_per_minute", ), - GoogleHealthDataType("daily-respiratory-rate", DataCategory.HEALTH_METRICS), GoogleHealthDataType( "daily-sleep-temperature-derivations", DataCategory.HEALTH_METRICS, + RecordKind.DAILY, + "celsius", + ), + GoogleHealthDataType( + "sleep", + DataCategory.SLEEP, + RecordKind.SESSION, + "second", + smoke_test=True, ), - GoogleHealthDataType("sleep", DataCategory.SLEEP, smoke_test=True), ) +DATA_TYPE_BY_NAME = {item.name: item for item in DATA_TYPES} SMOKE_TEST_DATA_TYPES = tuple(item.name for item in DATA_TYPES if item.smoke_test) diff --git a/egograph/pipelines/sources/google_health/extractor.py b/egograph/pipelines/sources/google_health/extractor.py new file mode 100644 index 00000000..6b178508 --- /dev/null +++ b/egograph/pipelines/sources/google_health/extractor.py @@ -0,0 +1,245 @@ +"""Google Health data type単位の取得処理。""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import date, timedelta +from typing import Any +from zoneinfo import ZoneInfo + +from pipelines.sources.google_health.client import GoogleHealthAPIClient +from pipelines.sources.google_health.data_types import ( + FetchStrategy, + GoogleHealthDataType, + RecordKind, +) +from pipelines.sources.google_health.timezone import local_date_start_rfc3339 + +DEFAULT_PAGE_SIZE = 10_000 +SESSION_PAGE_SIZE = 25 +INTERVAL_ROLLUP_WINDOW_SECONDS = 300 +SHORT_ROLLUP_DATA_TYPES = { + "active-minutes", + "calories-in-heart-rate-zone", + "heart-rate", + "total-calories", +} + + +@dataclass(frozen=True) +class ExtractedGoogleHealthData: + """1 data type分のAPIレスポンス原本。""" + + payload: dict[str, Any] + record_count: int + + +class GoogleHealthExtractor: + """期間指定、pagination、daily rollup分割を扱う。""" + + def __init__( + self, + client: GoogleHealthAPIClient, + *, + timezone: ZoneInfo | None = None, + ) -> None: + self._client = client + self._timezone = timezone or ZoneInfo("UTC") + + def extract( + self, + *, + connection_id: str, + data_type: GoogleHealthDataType, + date_from: date, + date_to: date, + ) -> ExtractedGoogleHealthData: + """指定data typeの期間内データをすべて取得する。""" + if date_from >= date_to: + raise ValueError("date_from must be earlier than date_to") + reconcile_responses: list[dict[str, Any]] = [] + rollup_responses: list[dict[str, Any]] = [] + daily_rollup_responses: list[dict[str, Any]] = [] + + if data_type.fetch_strategy is FetchStrategy.RECONCILE: + reconcile_responses = self._fetch_reconciled_pages( + connection_id=connection_id, + data_type=data_type, + date_from=date_from, + date_to=date_to, + ) + if data_type.include_interval_rollup: + rollup_responses = self._fetch_interval_rollups( + connection_id=connection_id, + data_type=data_type, + date_from=date_from, + date_to=date_to, + ) + if ( + data_type.fetch_strategy is FetchStrategy.DAILY_ROLLUP + or data_type.include_daily_rollup + ): + daily_rollup_responses = self._fetch_daily_rollups( + connection_id=connection_id, + data_type=data_type, + date_from=date_from, + date_to=date_to, + ) + + record_count = ( + sum(len(response.get("dataPoints", [])) for response in reconcile_responses) + + sum( + len(response.get("rollupDataPoints", [])) + for response in rollup_responses + ) + + sum( + len(response.get("rollupDataPoints", [])) + for response in daily_rollup_responses + ) + ) + return ExtractedGoogleHealthData( + payload={ + "dataType": data_type.name, + "range": { + "from": date_from.isoformat(), + "to": date_to.isoformat(), + }, + "reconcileResponses": reconcile_responses, + "rollupResponses": rollup_responses, + "dailyRollupResponses": daily_rollup_responses, + }, + record_count=record_count, + ) + + def _fetch_reconciled_pages( + self, + *, + connection_id: str, + data_type: GoogleHealthDataType, + date_from: date, + date_to: date, + ) -> list[dict[str, Any]]: + responses: list[dict[str, Any]] = [] + page_token: str | None = None + seen_page_tokens: set[str] = set() + while True: + response = self._client.reconcile_data_points( + connection_id, + data_type.name, + filter_expression=_build_filter( + data_type, + date_from, + date_to, + timezone=self._timezone, + ), + page_size=( + SESSION_PAGE_SIZE + if data_type.record_kind is RecordKind.SESSION + else DEFAULT_PAGE_SIZE + ), + page_token=page_token, + ) + responses.append(response) + next_page_token = response.get("nextPageToken") + if not isinstance(next_page_token, str) or not next_page_token: + return responses + if next_page_token in seen_page_tokens: + raise RuntimeError("google_health_repeated_page_token") + seen_page_tokens.add(next_page_token) + page_token = next_page_token + + def _fetch_daily_rollups( + self, + *, + connection_id: str, + data_type: GoogleHealthDataType, + date_from: date, + date_to: date, + ) -> list[dict[str, Any]]: + responses: list[dict[str, Any]] = [] + max_days = 14 if data_type.name in SHORT_ROLLUP_DATA_TYPES else 90 + chunk_start = date_from + while chunk_start < date_to: + chunk_end = min(chunk_start + timedelta(days=max_days), date_to) + page_token: str | None = None + seen_page_tokens: set[str] = set() + while True: + response = self._client.daily_rollup( + connection_id, + data_type.name, + date_from=chunk_start, + date_to=chunk_end, + page_token=page_token, + ) + responses.append(response) + next_page_token = response.get("nextPageToken") + if not isinstance(next_page_token, str) or not next_page_token: + break + if next_page_token in seen_page_tokens: + raise RuntimeError("google_health_repeated_page_token") + seen_page_tokens.add(next_page_token) + page_token = next_page_token + chunk_start = chunk_end + return responses + + def _fetch_interval_rollups( + self, + *, + connection_id: str, + data_type: GoogleHealthDataType, + date_from: date, + date_to: date, + ) -> list[dict[str, Any]]: + responses: list[dict[str, Any]] = [] + max_days = 14 if data_type.name in SHORT_ROLLUP_DATA_TYPES else 90 + chunk_start = date_from + while chunk_start < date_to: + chunk_end = min(chunk_start + timedelta(days=max_days), date_to) + page_token: str | None = None + seen_page_tokens: set[str] = set() + while True: + response = self._client.rollup( + connection_id, + data_type.name, + date_from=chunk_start, + date_to=chunk_end, + window_size_seconds=INTERVAL_ROLLUP_WINDOW_SECONDS, + page_token=page_token, + ) + responses.append(response) + next_page_token = response.get("nextPageToken") + if not isinstance(next_page_token, str) or not next_page_token: + break + if next_page_token in seen_page_tokens: + raise RuntimeError("google_health_repeated_page_token") + seen_page_tokens.add(next_page_token) + page_token = next_page_token + chunk_start = chunk_end + return responses + + +def _build_filter( + data_type: GoogleHealthDataType, + date_from: date, + date_to: date, + *, + timezone: ZoneInfo | None = None, +) -> str: + """record種別に対応するGoogle Health filter式を返す。""" + if data_type.record_kind is RecordKind.DAILY: + field = f"{data_type.filter_name}.date" + start = date_from.isoformat() + end = date_to.isoformat() + elif data_type.record_kind is RecordKind.SAMPLE: + field = f"{data_type.filter_name}.sample_time.physical_time" + start = local_date_start_rfc3339(date_from, timezone or ZoneInfo("UTC")) + end = local_date_start_rfc3339(date_to, timezone or ZoneInfo("UTC")) + elif data_type.record_kind is RecordKind.SESSION and data_type.name == "sleep": + field = "sleep.interval.end_time" + start = local_date_start_rfc3339(date_from, timezone or ZoneInfo("UTC")) + end = local_date_start_rfc3339(date_to, timezone or ZoneInfo("UTC")) + else: + field = f"{data_type.filter_name}.interval.start_time" + start = local_date_start_rfc3339(date_from, timezone or ZoneInfo("UTC")) + end = local_date_start_rfc3339(date_to, timezone or ZoneInfo("UTC")) + return f'{field} >= "{start}" AND {field} < "{end}"' diff --git a/egograph/pipelines/sources/google_health/models.py b/egograph/pipelines/sources/google_health/models.py index 4bf29ef7..7b195265 100644 --- a/egograph/pipelines/sources/google_health/models.py +++ b/egograph/pipelines/sources/google_health/models.py @@ -1,7 +1,7 @@ -"""Google Health 接続のドメインモデル。""" +"""Google Healthのドメインモデル。""" from dataclasses import dataclass -from datetime import datetime +from datetime import date, datetime from enum import StrEnum @@ -47,3 +47,45 @@ class EncryptedOAuthToken: expires_at: datetime token_type: str updated_at: datetime + + +class GoogleHealthRunMode(StrEnum): + """Google Health取り込みの実行モード。""" + + INITIAL_BACKFILL = "initial_backfill" + RANGE = "range" + DATA_TYPE_RANGE = "data_type_range" + + +class SyncStatus(StrEnum): + """data type単位の同期結果。""" + + SUCCESS = "success" + NO_DATA = "no_data" + FAILED = "failed" + + +@dataclass(frozen=True) +class GoogleHealthIngestRequest: + """Google Health取り込みrunの入力。""" + + mode: GoogleHealthRunMode + date_from: date + date_to: date + data_types: tuple[str, ...] + + +@dataclass(frozen=True) +class GoogleHealthSyncCursor: + """data type単位の最終同期結果。""" + + connection_id: str + data_type: str + cursor: str | None + status: SyncStatus + range_start: date | None + range_end: date | None + last_run_id: str | None + record_count: int + last_error_message: str | None + updated_at: datetime diff --git a/egograph/pipelines/sources/google_health/normalizer.py b/egograph/pipelines/sources/google_health/normalizer.py new file mode 100644 index 00000000..be79b615 --- /dev/null +++ b/egograph/pipelines/sources/google_health/normalizer.py @@ -0,0 +1,416 @@ +"""Google Health APIレスポンスの正規化。""" + +from __future__ import annotations + +import re +from collections import defaultdict +from datetime import UTC, date, datetime +from typing import Any +from zoneinfo import ZoneInfo + +from pipelines.sources.google_health.data_types import ( + GoogleHealthDataType, + RecordKind, +) +from pipelines.sources.google_health.timezone import local_date + +_DURATION_PATTERN = re.compile(r"^-?\d+(?:\.\d+)?s$") +_IGNORED_NUMERIC_PATH_PARTS = { + "year", + "month", + "day", + "hours", + "minutes", + "seconds", + "nanos", +} +_ACTIVITY_LEVEL_VALUES = { + "ACTIVITY_LEVEL_TYPE_UNSPECIFIED": 0.0, + "SEDENTARY": 1.0, + "LIGHT": 2.0, + "MODERATE": 3.0, + "VIGOROUS": 4.0, +} + + +def normalize_google_health_payload( + *, + connection_id: str, + data_type: GoogleHealthDataType, + payload: dict[str, Any], + raw_ref: str, + ingested_at: datetime | None = None, + timezone: ZoneInfo | None = None, +) -> dict[str, list[dict[str, Any]]]: + """APIレスポンス原本を4種類のParquet行へ変換する。""" + ingested_at = ingested_at or datetime.now(tz=UTC) + timezone = timezone or ZoneInfo("UTC") + result: dict[str, list[dict[str, Any]]] = { + "daily_metrics": [], + "samples": [], + "intervals": [], + "sessions": [], + } + + for response in payload.get("reconcileResponses") or []: + for point in response.get("dataPoints", []): + _normalize_data_point( + result=result, + connection_id=connection_id, + data_type=data_type, + point=point, + raw_ref=raw_ref, + ingested_at=ingested_at, + timezone=timezone, + ) + + for response in payload.get("rollupResponses") or []: + for point in response.get("rollupDataPoints", []): + _normalize_data_point( + result=result, + connection_id=connection_id, + data_type=data_type, + point=point, + raw_ref=raw_ref, + ingested_at=ingested_at, + timezone=timezone, + ) + + for response in payload.get("dailyRollupResponses") or []: + for point in response.get("rollupDataPoints", []): + _append_daily_metrics( + result["daily_metrics"], + connection_id=connection_id, + data_type=data_type, + point=point, + raw_ref=raw_ref, + ingested_at=ingested_at, + timezone=timezone, + ) + return result + + +def _normalize_data_point( + *, + result: dict[str, list[dict[str, Any]]], + connection_id: str, + data_type: GoogleHealthDataType, + point: dict[str, Any], + raw_ref: str, + ingested_at: datetime, + timezone: ZoneInfo, +) -> None: + payload = _payload_for(point, data_type) + device_family = _device_family(point) + common = { + "connection_id": connection_id, + "data_type": data_type.name, + "device_family": device_family, + "raw_ref": raw_ref, + "ingested_at_utc": ingested_at, + } + + if data_type.record_kind is RecordKind.DAILY: + _append_daily_metrics( + result["daily_metrics"], + connection_id=connection_id, + data_type=data_type, + point=point, + raw_ref=raw_ref, + ingested_at=ingested_at, + timezone=timezone, + ) + return + + if data_type.record_kind is RecordKind.SAMPLE: + measured_at = _parse_datetime( + _nested(payload, "sampleTime", "physicalTime") or point.get("instantTime") + ) + value = _first_numeric_value(payload) + if measured_at is not None and value is not None: + result["samples"].append( + { + **common, + "measured_at_utc": measured_at, + "value": value, + "unit": data_type.unit, + } + ) + if data_type.name == "respiratory-rate-sleep-summary": + result["daily_metrics"].append( + { + "connection_id": connection_id, + "data_type": data_type.name, + "date": local_date(measured_at, timezone), + "metric_name": "respiratory_rate_sleep_summary", + "value": value, + "unit": data_type.unit, + "device_family": device_family, + "raw_ref": raw_ref, + "ingested_at_utc": ingested_at, + } + ) + return + + started_at, ended_at = _interval_times(payload, point) + if started_at is None or ended_at is None: + return + + if data_type.record_kind is RecordKind.INTERVAL: + value = _interval_value( + payload, + data_type=data_type, + started_at=started_at, + ended_at=ended_at, + ) + if value is not None: + result["intervals"].append( + { + **common, + "started_at_utc": started_at, + "ended_at_utc": ended_at, + "value": value, + "unit": data_type.unit, + } + ) + return + + duration_seconds = int((ended_at - started_at).total_seconds()) + result["sessions"].append( + { + **common, + "session_id": str(point.get("dataPointName") or point.get("name") or ""), + "started_at_utc": started_at, + "ended_at_utc": ended_at, + "duration_seconds": duration_seconds, + "session_type": _session_type(payload, data_type), + } + ) + result["daily_metrics"].append( + { + "connection_id": connection_id, + "data_type": data_type.name, + "date": local_date( + ended_at if data_type.name == "sleep" else started_at, + timezone, + ), + "metric_name": ( + "sleep_duration" if data_type.name == "sleep" else "exercise_duration" + ), + "value": float(duration_seconds), + "unit": "second", + "device_family": device_family, + "raw_ref": raw_ref, + "ingested_at_utc": ingested_at, + } + ) + + +def _append_daily_metrics( + rows: list[dict[str, Any]], + *, + connection_id: str, + data_type: GoogleHealthDataType, + point: dict[str, Any], + raw_ref: str, + ingested_at: datetime, + timezone: ZoneInfo, +) -> None: + payload = _payload_for(point, data_type) + metric_date = _daily_date(point, payload) + if metric_date is None: + return + values = list(_numeric_leaves(payload)) + if not values: + return + device_family = _device_family(point) + base_name = data_type.name.replace("-", "_") + for path, value in values: + suffix = "_".join(_snake_case(part) for part in path) + metric_name = base_name if len(values) == 1 else f"{base_name}_{suffix}" + rows.append( + { + "connection_id": connection_id, + "data_type": data_type.name, + "date": metric_date, + "metric_name": metric_name, + "value": value, + "unit": data_type.unit, + "device_family": device_family, + "raw_ref": raw_ref, + "ingested_at_utc": ingested_at, + } + ) + + +def aggregate_daily_metrics( + rows: list[dict[str, Any]], +) -> list[dict[str, Any]]: + """同一data type・日付・metricの行を合算して日次1行へまとめる。""" + grouped: dict[tuple[str, str, date, str], list[dict[str, Any]]] = defaultdict(list) + for row in rows: + grouped[ + ( + row["connection_id"], + row["data_type"], + row["date"], + row["metric_name"], + ) + ].append(row) + + aggregated: list[dict[str, Any]] = [] + for group in grouped.values(): + latest = max(group, key=lambda item: item["ingested_at_utc"]) + values = [float(item["value"]) for item in group] + value = ( + sum(values) / len(values) + if latest["data_type"] == "respiratory-rate-sleep-summary" + else sum(values) + ) + aggregated.append( + { + **latest, + "value": value, + } + ) + return aggregated + + +def _payload_for( + point: dict[str, Any], + data_type: GoogleHealthDataType, +) -> dict[str, Any]: + value = point.get(data_type.payload_name) + return value if isinstance(value, dict) else point + + +def _daily_date(point: dict[str, Any], payload: dict[str, Any]) -> date | None: + raw_date = payload.get("date") + if isinstance(raw_date, str): + try: + return date.fromisoformat(raw_date) + except ValueError: + return None + if isinstance(raw_date, dict): + return _civil_date(raw_date) + civil_start = point.get("civilStartTime") or point.get("civilTimeInterval", {}).get( + "startTime" + ) + if isinstance(civil_start, str): + try: + return date.fromisoformat(civil_start[:10]) + except ValueError: + return None + if isinstance(civil_start, dict): + return _civil_date(civil_start.get("date", civil_start)) + return None + + +def _civil_date(value: dict[str, Any]) -> date | None: + try: + return date(int(value["year"]), int(value["month"]), int(value["day"])) + except (KeyError, TypeError, ValueError): + return None + + +def _interval_times( + payload: dict[str, Any], + point: dict[str, Any], +) -> tuple[datetime | None, datetime | None]: + interval = payload.get("interval") or point.get("interval") or {} + if not isinstance(interval, dict): + interval = {} + return ( + _parse_datetime(interval.get("startTime") or point.get("startTime")), + _parse_datetime(interval.get("endTime") or point.get("endTime")), + ) + + +def _parse_datetime(value: Any) -> datetime | None: + if not isinstance(value, str): + return None + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + return None + return parsed.astimezone(UTC) + + +def _first_numeric_value(payload: dict[str, Any]) -> float | None: + return next((value for _, value in _numeric_leaves(payload)), None) + + +def _interval_value( + payload: dict[str, Any], + *, + data_type: GoogleHealthDataType, + started_at: datetime, + ended_at: datetime, +) -> float | None: + value = _first_numeric_value(payload) + if value is not None: + return value + if data_type.name in {"sedentary-period", "time-in-heart-rate-zone"}: + return (ended_at - started_at).total_seconds() + if data_type.name == "activity-level": + raw_level = payload.get("activityLevelType") or payload.get("activityLevel") + if isinstance(raw_level, str): + return _ACTIVITY_LEVEL_VALUES.get(raw_level.upper()) + return None + + +def _numeric_leaves( + value: Any, + path: tuple[str, ...] = (), +): + if isinstance(value, bool): + return + if isinstance(value, (int, float)): + if not path or path[-1] not in _IGNORED_NUMERIC_PATH_PARTS: + yield path, float(value) + return + if isinstance(value, str): + try: + parsed = ( + float(value[:-1]) if _DURATION_PATTERN.match(value) else float(value) + ) + except ValueError: + return + if not path or path[-1] not in _IGNORED_NUMERIC_PATH_PARTS: + yield path, parsed + return + if isinstance(value, dict): + for key, item in value.items(): + if key in {"interval", "sampleTime", "date"}: + continue + yield from _numeric_leaves(item, (*path, key)) + elif isinstance(value, list): + for index, item in enumerate(value): + yield from _numeric_leaves(item, (*path, str(index))) + + +def _nested(value: dict[str, Any], *path: str) -> Any: + current: Any = value + for key in path: + if not isinstance(current, dict): + return None + current = current.get(key) + return current + + +def _device_family(point: dict[str, Any]) -> str: + data_source = point.get("dataSource") or point.get("dataOrigin") or {} + text = str(data_source).lower() + return "fitbit_air" if "fitbit" in text or "google-wearables" in text else "unknown" + + +def _session_type( + payload: dict[str, Any], + data_type: GoogleHealthDataType, +) -> str: + value = payload.get("type") or payload.get("exerciseType") + return str(value).lower() if value else data_type.name + + +def _snake_case(value: str) -> str: + return re.sub(r"(? EncryptedOAuthToken: ) +def _map_sync_cursor(row: sqlite3.Row) -> GoogleHealthSyncCursor: + return GoogleHealthSyncCursor( + connection_id=row["connection_id"], + data_type=row["data_type"], + cursor=row["cursor"], + status=SyncStatus(row["status"]), + range_start=date.fromisoformat(row["range_start"]) + if row["range_start"] + else None, + range_end=date.fromisoformat(row["range_end"]) if row["range_end"] else None, + last_run_id=row["last_run_id"], + record_count=row["record_count"], + last_error_message=row["last_error_message"], + updated_at=_required_datetime(row["updated_at"], "updated_at"), + ) + + class GoogleHealthRepository(SQLiteRepository): """Google Health connection、token、OAuth state を永続化する。""" @@ -264,6 +283,99 @@ def update_connection_status( raise RuntimeError("google_health_connection_fetch_failed") return connection + def save_sync_result( + self, + *, + connection_id: str, + data_type: str, + status: SyncStatus, + range_start: date, + range_end: date, + run_id: str, + record_count: int = 0, + cursor: str | None = None, + error_message: str | None = None, + ) -> GoogleHealthSyncCursor: + """data type単位の最終同期結果を保存する。""" + with self._mutex, self._conn: + self._conn.execute( + """ + INSERT INTO google_health_sync_cursors ( + connection_id, + data_type, + cursor, + status, + range_start, + range_end, + last_run_id, + record_count, + last_error_message, + updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(connection_id, data_type) DO UPDATE SET + cursor = excluded.cursor, + status = excluded.status, + range_start = excluded.range_start, + range_end = excluded.range_end, + last_run_id = excluded.last_run_id, + record_count = excluded.record_count, + last_error_message = excluded.last_error_message, + updated_at = excluded.updated_at + """, + ( + connection_id, + data_type, + cursor, + status.value, + range_start.isoformat(), + range_end.isoformat(), + run_id, + record_count, + error_message, + dt_to_text(utc_now()), + ), + ) + result = self.get_sync_cursor(connection_id, data_type) + if result is None: # pragma: no cover + raise RuntimeError("google_health_sync_cursor_fetch_failed") + return result + + def get_sync_cursor( + self, + connection_id: str, + data_type: str, + ) -> GoogleHealthSyncCursor | None: + """data typeの最終同期結果を取得する。""" + with self._mutex: + row = self._conn.execute( + """ + SELECT * + FROM google_health_sync_cursors + WHERE connection_id = ? AND data_type = ? + """, + (connection_id, data_type), + ).fetchone() + return _map_sync_cursor(row) if row else None + + def list_sync_results_for_run( + self, + connection_id: str, + run_id: str, + ) -> list[GoogleHealthSyncCursor]: + """指定runで更新されたdata type別同期結果を返す。""" + with self._mutex: + rows = self._conn.execute( + """ + SELECT * + FROM google_health_sync_cursors + WHERE connection_id = ? AND last_run_id = ? + ORDER BY data_type + """, + (connection_id, run_id), + ).fetchall() + return [_map_sync_cursor(row) for row in rows] + def delete_connection(self, connection_id: str) -> bool: """connection と関連 token/cursor を削除する。""" with self._mutex, self._conn: diff --git a/egograph/pipelines/sources/google_health/timezone.py b/egograph/pipelines/sources/google_health/timezone.py new file mode 100644 index 00000000..13aa5ace --- /dev/null +++ b/egograph/pipelines/sources/google_health/timezone.py @@ -0,0 +1,19 @@ +"""Google Health取得で使用するタイムゾーン変換。""" + +from datetime import UTC, date, datetime, time +from zoneinfo import ZoneInfo + + +def local_date_start_utc(value: date, timezone: ZoneInfo) -> datetime: + """ローカル日付の開始時刻をUTCへ変換する。""" + return datetime.combine(value, time.min, tzinfo=timezone).astimezone(UTC) + + +def local_date_start_rfc3339(value: date, timezone: ZoneInfo) -> str: + """ローカル日付の開始時刻をUTCのRFC 3339文字列で返す。""" + return local_date_start_utc(value, timezone).isoformat().replace("+00:00", "Z") + + +def local_date(value: datetime, timezone: ZoneInfo) -> date: + """絶対時刻を設定タイムゾーンの日付へ変換する。""" + return value.astimezone(timezone).date() diff --git a/egograph/pipelines/sources/google_health/workflow.py b/egograph/pipelines/sources/google_health/workflow.py new file mode 100644 index 00000000..364b5e7c --- /dev/null +++ b/egograph/pipelines/sources/google_health/workflow.py @@ -0,0 +1,395 @@ +"""Google Health ingestion workflow。""" + +from __future__ import annotations + +import logging +import sqlite3 +from dataclasses import dataclass +from datetime import date +from typing import cast +from zoneinfo import ZoneInfo + +from pydantic import SecretStr + +from pipelines.config import PipelinesConfig +from pipelines.domain.workflow import WorkflowRun +from pipelines.infrastructure.db.connection import connect +from pipelines.infrastructure.db.schema import initialize_schema +from pipelines.sources.common.settings import PipelinesSettings +from pipelines.sources.google_health.client import GoogleHealthAPIClient +from pipelines.sources.google_health.data_types import DATA_TYPE_BY_NAME, DATA_TYPES +from pipelines.sources.google_health.extractor import GoogleHealthExtractor +from pipelines.sources.google_health.models import ( + ConnectionStatus, + GoogleHealthIngestRequest, + GoogleHealthRunMode, + GoogleHealthSyncCursor, + SyncStatus, +) +from pipelines.sources.google_health.normalizer import ( + aggregate_daily_metrics, + normalize_google_health_payload, +) +from pipelines.sources.google_health.repository import GoogleHealthRepository +from pipelines.sources.google_health.token_cipher import TokenCipher +from pipelines.sources.google_health.writer import GoogleHealthWriter + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class GoogleHealthWorkflowDependencies: + """workflow実行に必要なadapter群。""" + + repository: GoogleHealthRepository + extractor: GoogleHealthExtractor + writer: GoogleHealthWriter + timezone: ZoneInfo = ZoneInfo("UTC") + db_connection: sqlite3.Connection | None = None + + +def run_google_health_ingest(run: WorkflowRun) -> dict[str, object]: + """Google Healthの対象data typeを取得しRaw/eventsへ保存する。""" + dependencies = _build_dependencies() + try: + return _execute_google_health_ingest(run, dependencies) + finally: + if dependencies.db_connection is not None: + dependencies.db_connection.close() + + +def _execute_google_health_ingest( + run: WorkflowRun, + dependencies: GoogleHealthWorkflowDependencies, +) -> dict[str, object]: + """解決済みadapterを使ってGoogle Health取り込みを実行する。""" + request = _parse_request(run) + connection = dependencies.repository.get_connection() + if connection is None or connection.status is not ConnectionStatus.ACTIVE: + raise RuntimeError("google_health_active_connection_not_found") + + records: dict[str, list[dict]] = { + "daily_metrics": [], + "samples": [], + "intervals": [], + "sessions": [], + } + results: list[dict[str, object]] = [] + completed_data_types: list[str] = [] + + for data_type_name in request.data_types: + data_type = DATA_TYPE_BY_NAME[data_type_name] + try: + extracted = dependencies.extractor.extract( + connection_id=connection.connection_id, + data_type=data_type, + date_from=request.date_from, + date_to=request.date_to, + ) + raw_ref = dependencies.writer.save_raw( + connection_id=connection.connection_id, + data_type=data_type.name, + date_from=request.date_from, + date_to=request.date_to, + run_id=run.run_id, + payload=extracted.payload, + ) + normalized = normalize_google_health_payload( + connection_id=connection.connection_id, + data_type=data_type, + payload=extracted.payload, + raw_ref=raw_ref, + timezone=dependencies.timezone, + ) + normalized_count = sum(len(rows) for rows in normalized.values()) + if extracted.record_count > 0 and normalized_count == 0: + raise ValueError("google_health_normalization_produced_no_records") + for dataset, rows in normalized.items(): + records[dataset].extend(rows) + status = ( + SyncStatus.SUCCESS if extracted.record_count > 0 else SyncStatus.NO_DATA + ) + completed_data_types.append(data_type.name) + results.append( + { + "data_type": data_type.name, + "status": status.value, + "record_count": normalized_count, + "raw_ref": raw_ref, + } + ) + except Exception as exc: + logger.exception( + "Google Health data type ingest failed: data_type=%s", + data_type.name, + ) + results.append( + { + "data_type": data_type.name, + "status": SyncStatus.FAILED.value, + "record_count": 0, + "error": _short_error(exc), + } + ) + + records["daily_metrics"] = aggregate_daily_metrics(records["daily_metrics"]) + saved_keys: list[str] = [] + if completed_data_types: + try: + saved_keys = dependencies.writer.save_events( + run_id=run.run_id, + records=records, + ) + except Exception as exc: + logger.exception("Google Health events保存に失敗しました") + error = _short_error(exc) + for result in results: + if result["data_type"] in completed_data_types: + result.update( + status=SyncStatus.FAILED.value, + record_count=0, + error=error, + ) + + for result in results: + status = SyncStatus(str(result["status"])) + dependencies.repository.save_sync_result( + connection_id=connection.connection_id, + data_type=str(result["data_type"]), + status=status, + range_start=request.date_from, + range_end=request.date_to, + run_id=run.run_id, + record_count=int(result["record_count"]), + error_message=str(result["error"]) if "error" in result else None, + ) + + return { + "provider": "google_health", + "operation": "ingest", + "status": _run_status(results), + "request": _request_summary(request), + "data_types": results, + "saved_keys": saved_keys, + "record_count": sum(int(result["record_count"]) for result in results), + "errors": _result_errors(results), + } + + +def run_google_health_compact(run: WorkflowRun) -> dict[str, object]: + """今回runのeventsを対象期間のcompacted Parquetへ反映する。""" + dependencies = _build_dependencies() + try: + return _execute_google_health_compact(run, dependencies) + finally: + if dependencies.db_connection is not None: + dependencies.db_connection.close() + + +def _execute_google_health_compact( + run: WorkflowRun, + dependencies: GoogleHealthWorkflowDependencies, +) -> dict[str, object]: + """同期結果を基に成功したdata typeだけをcompactする。""" + request = _parse_request(run) + connection = dependencies.repository.get_connection() + if connection is None or connection.status is not ConnectionStatus.ACTIVE: + raise RuntimeError("google_health_active_connection_not_found") + + cursors = dependencies.repository.list_sync_results_for_run( + connection.connection_id, + run.run_id, + ) + results = _cursor_results(request, cursors) + completed_data_types = tuple( + str(result["data_type"]) + for result in results + if result["status"] in {SyncStatus.SUCCESS.value, SyncStatus.NO_DATA.value} + ) + compacted_keys: list[str] = [] + if completed_data_types: + try: + compacted_keys = dependencies.writer.compact_range( + connection_id=connection.connection_id, + selected_data_types=completed_data_types, + date_from=request.date_from, + date_to=request.date_to, + run_id=run.run_id, + ) + except Exception as exc: + logger.exception("Google Health compactionに失敗しました") + error = _short_error(exc) + for result in results: + if result["data_type"] not in completed_data_types: + continue + result.update( + status=SyncStatus.FAILED.value, + error=error, + ) + dependencies.repository.save_sync_result( + connection_id=connection.connection_id, + data_type=str(result["data_type"]), + status=SyncStatus.FAILED, + range_start=request.date_from, + range_end=request.date_to, + run_id=run.run_id, + record_count=int(result["record_count"]), + error_message=error, + ) + + return { + "provider": "google_health", + "operation": "compact", + "status": _run_status(results), + "request": _request_summary(request), + "data_types": results, + "compacted_keys": compacted_keys, + "record_count": sum(int(result["record_count"]) for result in results), + "errors": _result_errors(results), + } + + +def _cursor_results( + request: GoogleHealthIngestRequest, + cursors: list[GoogleHealthSyncCursor], +) -> list[dict[str, object]]: + cursors_by_type = {cursor.data_type: cursor for cursor in cursors} + results: list[dict[str, object]] = [] + for data_type in request.data_types: + cursor = cursors_by_type.get(data_type) + if cursor is None: + results.append( + { + "data_type": data_type, + "status": SyncStatus.FAILED.value, + "record_count": 0, + "error": "sync_result_not_found", + } + ) + continue + result: dict[str, object] = { + "data_type": cursor.data_type, + "status": cursor.status.value, + "record_count": cursor.record_count, + } + if cursor.last_error_message: + result["error"] = cursor.last_error_message + results.append(result) + return results + + +def _request_summary(request: GoogleHealthIngestRequest) -> dict[str, object]: + return { + "mode": request.mode.value, + "from": request.date_from.isoformat(), + "to": request.date_to.isoformat(), + "data_types": list(request.data_types), + } + + +def _run_status(results: list[dict[str, object]]) -> str: + failed_count = sum( + result["status"] == SyncStatus.FAILED.value for result in results + ) + if failed_count == len(results): + return "failed" + if failed_count: + return "partial_failed" + return "succeeded" + + +def _result_errors(results: list[dict[str, object]]) -> list[str]: + return [ + f"{result['data_type']}: {result['error']}" + for result in results + if "error" in result + ] + + +def _parse_request(run: WorkflowRun) -> GoogleHealthIngestRequest: + summary = run.result_summary or {} + raw = summary.get("request") + if not isinstance(raw, dict): + raise ValueError("invalid_request: Google Health run input is required") + try: + mode = GoogleHealthRunMode(str(raw["mode"])) + date_from = date.fromisoformat(str(raw["from"])) + date_to = date.fromisoformat(str(raw["to"])) + except (KeyError, TypeError, ValueError) as exc: + raise ValueError("invalid_request: invalid mode or range") from exc + if date_from >= date_to: + raise ValueError("invalid_range: from must be earlier than to") + + requested_types = raw.get("data_types") + if requested_types: + if not isinstance(requested_types, list): + raise ValueError("invalid_data_types: list is required") + unknown = set(requested_types) - DATA_TYPE_BY_NAME.keys() + if unknown: + raise ValueError( + f"invalid_data_types: unsupported values: {', '.join(sorted(unknown))}" + ) + data_types = tuple(dict.fromkeys(str(item) for item in requested_types)) + else: + data_types = tuple(item.name for item in DATA_TYPES) + return GoogleHealthIngestRequest( + mode=mode, + date_from=date_from, + date_to=date_to, + data_types=data_types, + ) + + +def _build_dependencies() -> GoogleHealthWorkflowDependencies: + config = PipelinesConfig() + if not config.google_health_is_configured: + raise ValueError("google_health_oauth_configuration_required") + conn = connect(config.database_path) + try: + initialize_schema(conn) + repository = GoogleHealthRepository(conn) + encryption_key = cast(SecretStr, config.google_health_token_encryption_key) + client_id = cast(SecretStr, config.google_health_client_id) + client_secret = cast(SecretStr, config.google_health_client_secret) + cipher = TokenCipher(encryption_key.get_secret_value()) + client = GoogleHealthAPIClient( + repository, + cipher, + client_id=client_id.get_secret_value(), + client_secret=client_secret.get_secret_value(), + timezone=ZoneInfo(config.timezone), + ) + + source_config = PipelinesSettings.load() + if source_config.duckdb is None or source_config.duckdb.r2 is None: + raise ValueError("R2 configuration is required for google health pipeline") + r2 = source_config.duckdb.r2 + return GoogleHealthWorkflowDependencies( + repository=repository, + extractor=GoogleHealthExtractor( + client, + timezone=ZoneInfo(config.timezone), + ), + writer=GoogleHealthWriter( + endpoint_url=r2.endpoint_url, + access_key_id=r2.access_key_id, + secret_access_key=r2.secret_access_key.get_secret_value(), + bucket_name=r2.bucket_name, + raw_path=r2.raw_path, + events_path=r2.events_path, + timezone=ZoneInfo(config.timezone), + ), + timezone=ZoneInfo(config.timezone), + db_connection=conn, + ) + except Exception: + conn.close() + raise + + +def _short_error(exc: Exception) -> str: + message = next( + (line.strip() for line in str(exc).splitlines() if line.strip()), + None, + ) + return message or exc.__class__.__name__ diff --git a/egograph/pipelines/sources/google_health/writer.py b/egograph/pipelines/sources/google_health/writer.py new file mode 100644 index 00000000..3d7f8131 --- /dev/null +++ b/egograph/pipelines/sources/google_health/writer.py @@ -0,0 +1,292 @@ +"""Google Health Raw JSON、events、compacted Parquetの保存。""" + +from __future__ import annotations + +import json +from datetime import date, datetime, timedelta +from io import BytesIO +from typing import Any +from zoneinfo import ZoneInfo + +import boto3 +import pandas as pd +from botocore.exceptions import ClientError + +from pipelines.sources.common.compaction import ( + COMPACTED_ROOT, + dataframe_to_parquet_bytes, +) +from pipelines.sources.google_health.timezone import ( + local_date, + local_date_start_utc, +) + +DATASET_DATE_COLUMNS = { + "daily_metrics": "date", + "samples": "measured_at_utc", + "intervals": "started_at_utc", + "sessions": "started_at_utc", +} + + +class GoogleHealthWriter: + """Google HealthのR2保存と対象期間compactionを担う。""" + + def __init__( + self, + *, + endpoint_url: str, + access_key_id: str, + secret_access_key: str, + bucket_name: str, + raw_path: str = "raw/", + events_path: str = "events/", + compacted_path: str = COMPACTED_ROOT, + timezone: ZoneInfo | None = None, + ) -> None: + self.bucket_name = bucket_name + self.raw_path = _normalize_path(raw_path) + self.events_path = _normalize_path(events_path) + self.compacted_path = _normalize_path(compacted_path) + self.timezone = timezone or ZoneInfo("UTC") + self.s3 = boto3.client( + "s3", + endpoint_url=endpoint_url, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name="auto", + ) + + def save_raw( + self, + *, + connection_id: str, + data_type: str, + date_from: date, + date_to: date, + run_id: str, + payload: dict[str, Any], + ) -> str: + """data type単位のAPIレスポンス原本を保存する。""" + key = ( + f"{self.raw_path}google_health/" + f"connection_id={connection_id}/data_type={data_type}/" + f"from={date_from.isoformat()}/to={date_to.isoformat()}/" + f"run_id={run_id}.json" + ) + self.s3.put_object( + Bucket=self.bucket_name, + Key=key, + Body=json.dumps( + payload, + ensure_ascii=False, + separators=(",", ":"), + ).encode(), + ContentType="application/json", + ) + return key + + def save_events( + self, + *, + run_id: str, + records: dict[str, list[dict[str, Any]]], + ) -> list[str]: + """今回runの正規化行をUUID名のevents Parquetへ保存する。""" + saved_keys: list[str] = [] + for dataset, date_column in DATASET_DATE_COLUMNS.items(): + rows_by_month: dict[tuple[int, int], list[dict[str, Any]]] = {} + for row in records.get(dataset, []): + month = _row_month(row, date_column) + rows_by_month.setdefault(month, []).append(row) + for (year, month), rows in sorted(rows_by_month.items()): + key = ( + f"{self.events_path}google_health/{dataset}/" + f"year={year}/month={month:02d}/{run_id}.parquet" + ) + self.s3.put_object( + Bucket=self.bucket_name, + Key=key, + Body=_parquet_bytes(rows), + ContentType="application/octet-stream", + ) + saved_keys.append(key) + return saved_keys + + def compact_range( + self, + *, + connection_id: str, + selected_data_types: tuple[str, ...], + date_from: date, + date_to: date, + run_id: str, + ) -> list[str]: + """既存compactedの対象範囲を今回runのeventsで置換する。""" + compacted_keys: list[str] = [] + for dataset, date_column in DATASET_DATE_COLUMNS.items(): + months = set( + _target_months( + dataset, + date_from=date_from, + date_to=date_to, + timezone=self.timezone, + ) + ) + if dataset == "sessions" and "sleep" in selected_data_types: + sleep_start = local_date_start_utc( + date_from - timedelta(days=1), + self.timezone, + ) + months.add((sleep_start.year, sleep_start.month)) + for year, month in sorted(months): + compacted_key = self._compacted_key(dataset, year, month) + existing = self._load_parquet(compacted_key) + retained = _retain_outside_target( + existing, + connection_id=connection_id, + selected_data_types=selected_data_types, + date_column=date_column, + date_from=date_from, + date_to=date_to, + timezone=self.timezone, + ) + event_key = self._event_key(dataset, year, month, run_id) + current = self._load_parquet(event_key) + merged = [*retained, *current] + if not merged: + self._delete_if_exists(compacted_key) + continue + self.s3.put_object( + Bucket=self.bucket_name, + Key=compacted_key, + Body=_parquet_bytes(merged), + ContentType="application/octet-stream", + ) + compacted_keys.append(compacted_key) + return compacted_keys + + def _event_key( + self, + dataset: str, + year: int, + month: int, + run_id: str, + ) -> str: + return ( + f"{self.events_path}google_health/{dataset}/" + f"year={year}/month={month:02d}/{run_id}.parquet" + ) + + def _compacted_key(self, dataset: str, year: int, month: int) -> str: + return ( + f"{self.compacted_path}events/google_health/{dataset}/" + f"year={year}/month={month:02d}/data.parquet" + ) + + def _load_parquet(self, key: str) -> list[dict[str, Any]]: + try: + response = self.s3.get_object(Bucket=self.bucket_name, Key=key) + except ClientError as exc: + if exc.response["Error"]["Code"] in {"NoSuchKey", "404"}: + return [] + raise + return pd.read_parquet(BytesIO(response["Body"].read())).to_dict( + orient="records" + ) + + def _delete_if_exists(self, key: str) -> None: + try: + self.s3.delete_object(Bucket=self.bucket_name, Key=key) + except ClientError as exc: + if exc.response["Error"]["Code"] not in {"NoSuchKey", "404"}: + raise + + +def _retain_outside_target( + rows: list[dict[str, Any]], + *, + connection_id: str, + selected_data_types: tuple[str, ...], + date_column: str, + date_from: date, + date_to: date, + timezone: ZoneInfo, +) -> list[dict[str, Any]]: + retained = [] + selected = set(selected_data_types) + for row in rows: + target_column = ( + "ended_at_utc" + if date_column == "started_at_utc" + and row.get("data_type") == "sleep" + and "ended_at_utc" in row + else date_column + ) + row_date = _target_date(row[target_column], date_column, timezone) + is_target = ( + row.get("connection_id") == connection_id + and row.get("data_type") in selected + and date_from <= row_date < date_to + ) + if not is_target: + retained.append(row) + return retained + + +def _row_month(row: dict[str, Any], date_column: str) -> tuple[int, int]: + value = _as_date(row[date_column]) + return value.year, value.month + + +def _as_date(value: Any) -> date: + if isinstance(value, datetime): + return value.date() + if isinstance(value, date): + return value + return pd.Timestamp(value).date() + + +def _target_date( + value: Any, + date_column: str, + timezone: ZoneInfo, +) -> date: + if date_column == "date": + return _as_date(value) + timestamp = pd.Timestamp(value) + if timestamp.tzinfo is None: + timestamp = timestamp.tz_localize("UTC") + return local_date(timestamp.to_pydatetime(), timezone) + + +def _target_months( + dataset: str, + *, + date_from: date, + date_to: date, + timezone: ZoneInfo, +): + if dataset == "daily_metrics": + current = date(date_from.year, date_from.month, 1) + limit = date_to + else: + start_utc = local_date_start_utc(date_from, timezone) + end_utc = local_date_start_utc(date_to, timezone) + current = date(start_utc.year, start_utc.month, 1) + limit = end_utc.date() + timedelta(days=1) + while current < limit: + yield current.year, current.month + current = ( + date(current.year + 1, 1, 1) + if current.month == 12 + else date(current.year, current.month + 1, 1) + ) + + +def _normalize_path(path: str) -> str: + return path.rstrip("/") + "/" + + +def _parquet_bytes(rows: list[dict[str, Any]]) -> bytes: + return dataframe_to_parquet_bytes(pd.DataFrame(rows)) diff --git a/egograph/pipelines/tests/integration/google_health/__init__.py b/egograph/pipelines/tests/integration/google_health/__init__.py new file mode 100644 index 00000000..c7a6eb45 --- /dev/null +++ b/egograph/pipelines/tests/integration/google_health/__init__.py @@ -0,0 +1 @@ +"""Google Health統合テスト。""" diff --git a/egograph/pipelines/tests/integration/google_health/test_pipeline.py b/egograph/pipelines/tests/integration/google_health/test_pipeline.py new file mode 100644 index 00000000..d583cd16 --- /dev/null +++ b/egograph/pipelines/tests/integration/google_health/test_pipeline.py @@ -0,0 +1,245 @@ +"""Google Health取り込みの統合テスト。""" + +from datetime import UTC, datetime, timedelta +from io import BytesIO +from unittest.mock import patch + +import pandas as pd +from botocore.exceptions import ClientError + +from pipelines.domain.workflow import ( + QueuedReason, + TriggerType, + WorkflowRun, + WorkflowRunStatus, +) +from pipelines.infrastructure.db.connection import connect +from pipelines.infrastructure.db.schema import initialize_schema +from pipelines.sources.google_health.extractor import ExtractedGoogleHealthData +from pipelines.sources.google_health.models import OAuthToken, SyncStatus +from pipelines.sources.google_health.repository import GoogleHealthRepository +from pipelines.sources.google_health.workflow import ( + GoogleHealthWorkflowDependencies, + run_google_health_compact, + run_google_health_ingest, +) +from pipelines.sources.google_health.writer import GoogleHealthWriter + + +class MemoryS3: + """統合テスト用in-memory S3。""" + + def __init__(self): + self.objects = {} + + def put_object(self, *, Bucket, Key, Body, ContentType): # noqa: N803 + self.objects[Key] = Body + + def get_object(self, *, Bucket, Key): # noqa: N803 + if Key not in self.objects: + raise ClientError({"Error": {"Code": "NoSuchKey"}}, "GetObject") + return {"Body": BytesIO(self.objects[Key])} + + def delete_object(self, *, Bucket, Key): # noqa: N803 + self.objects.pop(Key, None) + + def get_paginator(self, name): + assert name == "list_objects_v2" + objects = self.objects + + class Paginator: + def paginate(self, *, Bucket, Prefix): # noqa: N803 + yield { + "Contents": [ + {"Key": key} for key in objects if key.startswith(Prefix) + ] + } + + return Paginator() + + +class FixtureExtractor: + """3種類のrecordを返すfixture extractor。""" + + def extract(self, *, data_type, **kwargs): + payloads = { + "steps": { + "reconcileResponses": [ + { + "dataPoints": [ + { + "steps": { + "interval": { + "startTime": "2026-06-01T00:00:00Z", + "endTime": "2026-06-01T00:05:00Z", + }, + "count": 120, + } + } + ] + } + ], + "dailyRollupResponses": [ + { + "rollupDataPoints": [ + { + "civilStartTime": { + "date": { + "year": 2026, + "month": 6, + "day": 1, + } + }, + "steps": {"countSum": "1000"}, + } + ] + } + ], + }, + "heart-rate": { + "reconcileResponses": [ + { + "dataPoints": [ + { + "heartRate": { + "sampleTime": { + "physicalTime": "2026-06-01T01:00:00Z" + }, + "beatsPerMinute": 72, + } + } + ] + } + ], + "dailyRollupResponses": [], + }, + "sleep": { + "reconcileResponses": [ + { + "dataPoints": [ + { + "dataPointName": "sleep-1", + "sleep": { + "interval": { + "startTime": "2026-05-31T23:00:00Z", + "endTime": "2026-06-01T07:00:00Z", + }, + "type": "SLEEP", + }, + } + ] + } + ], + "dailyRollupResponses": [], + }, + } + payload = payloads[data_type.name] + count = sum( + len(response.get("dataPoints", [])) + for response in payload["reconcileResponses"] + ) + sum( + len(response.get("rollupDataPoints", [])) + for response in payload["dailyRollupResponses"] + ) + return ExtractedGoogleHealthData(payload=payload, record_count=count) + + +def _run(): + return WorkflowRun( + run_id="f62ef091-9372-4e14-b129-55729525bd78", + workflow_id="google_health_ingest_workflow", + trigger_type=TriggerType.MANUAL, + queued_reason=QueuedReason.MANUAL_REQUEST, + status=WorkflowRunStatus.RUNNING, + scheduled_at=None, + queued_at=datetime(2026, 6, 4, tzinfo=UTC), + started_at=datetime(2026, 6, 4, tzinfo=UTC), + finished_at=None, + last_error_message=None, + requested_by="api", + parent_run_id=None, + result_summary={ + "request": { + "mode": "data_type_range", + "from": "2026-06-01", + "to": "2026-06-03", + "data_types": ["steps", "heart-rate", "sleep"], + } + }, + ) + + +def test_ingest_saves_raw_all_parquet_kinds_and_sync_state( + tmp_path, + monkeypatch, +): + """1 runでRaw、4 Parquet、data type別sync結果を保存する。""" + # Arrange + conn = connect(tmp_path / "state.sqlite3") + initialize_schema(conn) + repository = GoogleHealthRepository(conn) + token = OAuthToken( + access_token="access-token", + refresh_token="refresh-token", + expires_at=datetime.now(tz=UTC) + timedelta(hours=1), + token_type="Bearer", + scopes=("scope",), + ) + repository.save_connection( + token=token, + access_token_encrypted=b"encrypted-access", + refresh_token_encrypted=b"encrypted-refresh", + ) + memory_s3 = MemoryS3() + with patch( + "pipelines.sources.google_health.writer.boto3.client", + return_value=memory_s3, + ): + writer = GoogleHealthWriter( + endpoint_url="https://r2.example.test", + access_key_id="key", + secret_access_key="secret", + bucket_name="bucket", + ) + monkeypatch.setattr( + "pipelines.sources.google_health.workflow._build_dependencies", + lambda: GoogleHealthWorkflowDependencies( + repository=repository, + extractor=FixtureExtractor(), + writer=writer, + ), + ) + + # Act + ingest_result = run_google_health_ingest(_run()) + compact_result = run_google_health_compact(_run()) + + # Assert + assert ingest_result["status"] == "succeeded" + assert compact_result["status"] == "succeeded" + raw_keys = [key for key in memory_s3.objects if key.startswith("raw/")] + assert len(raw_keys) == 3 + for dataset in ("daily_metrics", "samples", "intervals"): + event_key = ( + f"events/google_health/{dataset}/year=2026/month=06/" + "f62ef091-9372-4e14-b129-55729525bd78.parquet" + ) + compacted_key = ( + f"compacted/events/google_health/{dataset}/year=2026/month=06/data.parquet" + ) + assert event_key in memory_s3.objects + assert compacted_key in memory_s3.objects + assert not pd.read_parquet(BytesIO(memory_s3.objects[compacted_key])).empty + session_prefix = "events/google_health/sessions/year=2026/month=05/" + session_keys = [key for key in memory_s3.objects if key.startswith(session_prefix)] + assert len(session_keys) == 1 + assert not pd.read_parquet(BytesIO(memory_s3.objects[session_keys[0]])).empty + compacted_session_key = ( + "compacted/events/google_health/sessions/year=2026/month=05/data.parquet" + ) + assert compacted_session_key in memory_s3.objects + for data_type in ("steps", "heart-rate", "sleep"): + cursor = repository.get_sync_cursor("google-health-primary", data_type) + assert cursor is not None + assert cursor.status is SyncStatus.SUCCESS + assert cursor.record_count > 0 diff --git a/egograph/pipelines/tests/support/dummy_steps.py b/egograph/pipelines/tests/support/dummy_steps.py index dc1c9096..9f851ccf 100644 --- a/egograph/pipelines/tests/support/dummy_steps.py +++ b/egograph/pipelines/tests/support/dummy_steps.py @@ -32,3 +32,11 @@ def sleep_briefly() -> dict: """並列実行テスト用に短時間 sleep する。""" time.sleep(0.5) return {"slept": True} + + +def partial_failure() -> dict: + """部分失敗summaryを返すテストstep。""" + return { + "status": "partial_failed", + "errors": ["sleep: unavailable"], + } diff --git a/egograph/pipelines/tests/unit/google_health/test_api.py b/egograph/pipelines/tests/unit/google_health/test_api.py index 2c49c950..6653f726 100644 --- a/egograph/pipelines/tests/unit/google_health/test_api.py +++ b/egograph/pipelines/tests/unit/google_health/test_api.py @@ -1,15 +1,22 @@ """Google Health connection API のテスト。""" import logging +import os +from datetime import date, datetime, timedelta +from unittest.mock import patch from urllib.parse import parse_qs, urlparse +from zoneinfo import ZoneInfo +import pytest from cryptography.fernet import Fernet from fastapi.testclient import TestClient +from pipelines.api.google_health import GoogleHealthRunRequest from pipelines.app import create_app from pipelines.config import PipelinesConfig from pipelines.infrastructure.logging_filters import OAuthCallbackAccessLogFilter from pipelines.sources.google_health.auth import DEFAULT_SCOPES from pipelines.sources.google_health.client import GoogleHealthRateLimitError +from pipelines.sources.google_health.models import ConnectionStatus from pydantic import SecretStr @@ -164,6 +171,35 @@ def test_google_health_configuration_rejects_blank_values(tmp_path): assert config.google_health_is_configured is False +def test_pipelines_config_reads_shared_timezone(tmp_path): + """PipelinesもBackendと同じTIMEZONEを読み込む。""" + # Arrange + with patch.dict( + os.environ, + { + "TIMEZONE": "Asia/Tokyo", + "PIPELINES_DATABASE_PATH": str(tmp_path / "state.sqlite3"), + }, + clear=True, + ): + # Act + config = PipelinesConfig(_env_file=None) + + # Assert + assert config.timezone == "Asia/Tokyo" + + +def test_pipelines_config_rejects_invalid_timezone(tmp_path): + """存在しないタイムゾーン名を起動設定として受理しない。""" + # Arrange / Act / Assert + with patch.dict(os.environ, {"TIMEZONE": "Mars/Olympus"}, clear=True): + with pytest.raises(ValueError, match="invalid timezone"): + PipelinesConfig( + database_path=tmp_path / "state.sqlite3", + _env_file=None, + ) + + def test_oauth_callback_access_log_redacts_query_string(): """OAuth callback の code/state は access log で伏せる。""" # Arrange @@ -190,3 +226,146 @@ def test_oauth_callback_access_log_redacts_query_string(): assert "secret" not in record.getMessage() assert "state=state" not in record.getMessage() assert "[REDACTED]" in record.getMessage() + + +def test_create_range_ingest_run_preserves_request_context(tmp_path): + """range run作成APIはclosed-open期間をrunへ保存する。""" + # Arrange + app = create_app(_config(tmp_path)) + service = app.state.service + service.google_health_repository.get_connection = lambda: type( + "Connection", + (), + {"status": ConnectionStatus.ACTIVE}, + )() + + # Act + with TestClient(app) as client: + response = client.post( + "/v1/sources/google-health/runs", + headers={"X-API-Key": "api-key"}, + json={ + "mode": "range", + "from": "2026-06-01", + "to": "2026-06-03", + }, + ) + + # Assert + assert response.status_code == 201 + run = service.run_repository.get_run(response.json()["run_id"]) + assert run.workflow_id == "google_health_ingest_workflow" + assert run.result_summary == { + "request": { + "mode": "range", + "from": "2026-06-01", + "to": "2026-06-03", + "data_types": [], + } + } + + +def test_create_initial_backfill_resolves_ninety_day_range(tmp_path): + """initial_backfillは実行時点までの90日をrunへ保存する。""" + # Arrange + app = create_app(_config(tmp_path)) + service = app.state.service + service.google_health_repository.get_connection = lambda: type( + "Connection", + (), + {"status": ConnectionStatus.ACTIVE}, + )() + + # Act + with TestClient(app) as client: + response = client.post( + "/v1/sources/google-health/runs", + headers={"X-API-Key": "api-key"}, + json={"mode": "initial_backfill"}, + ) + + # Assert + assert response.status_code == 201 + run = service.run_repository.get_run(response.json()["run_id"]) + request = run.result_summary["request"] + assert request["mode"] == "initial_backfill" + assert date.fromisoformat(request["to"]) - date.fromisoformat( + request["from"] + ) == timedelta(days=90) + assert request["data_types"] == [] + + +def test_initial_backfill_uses_configured_local_date(): + """initial_backfillの今日を設定タイムゾーンで判定する。""" + # Arrange + request = GoogleHealthRunRequest(mode="initial_backfill") + now = datetime(2026, 6, 1, 15, 30, tzinfo=ZoneInfo("UTC")) + + # Act + result = request.to_run_input( + timezone=ZoneInfo("Asia/Tokyo"), + now=now, + ) + + # Assert + assert result["to"] == "2026-06-03" + assert result["from"] == "2026-03-05" + + +def test_create_data_type_range_preserves_selected_types(tmp_path): + """data_type_rangeは指定data typeと期間をrunへ保存する。""" + # Arrange + app = create_app(_config(tmp_path)) + service = app.state.service + service.google_health_repository.get_connection = lambda: type( + "Connection", + (), + {"status": ConnectionStatus.ACTIVE}, + )() + + # Act + with TestClient(app) as client: + response = client.post( + "/v1/sources/google-health/runs", + headers={"X-API-Key": "api-key"}, + json={ + "mode": "data_type_range", + "from": "2026-06-01", + "to": "2026-06-03", + "data_types": ["steps", "sleep"], + }, + ) + + # Assert + assert response.status_code == 201 + run = service.run_repository.get_run(response.json()["run_id"]) + assert run.result_summary == { + "request": { + "mode": "data_type_range", + "from": "2026-06-01", + "to": "2026-06-03", + "data_types": ["steps", "sleep"], + } + } + + +def test_create_data_type_range_requires_data_types(tmp_path): + """data_type_rangeは対象data typeなしでは受理しない。""" + # Arrange + app = create_app(_config(tmp_path)) + + # Act + with TestClient(app) as client: + response = client.post( + "/v1/sources/google-health/runs", + headers={"X-API-Key": "api-key"}, + json={ + "mode": "data_type_range", + "from": "2026-06-01", + "to": "2026-06-03", + }, + ) + + # Assert + assert response.status_code == 400 + assert response.json()["detail"].startswith("invalid_data_types:") diff --git a/egograph/pipelines/tests/unit/google_health/test_client.py b/egograph/pipelines/tests/unit/google_health/test_client.py index 8952097a..413c762f 100644 --- a/egograph/pipelines/tests/unit/google_health/test_client.py +++ b/egograph/pipelines/tests/unit/google_health/test_client.py @@ -1,6 +1,7 @@ """Google Health API client のテスト。""" -from datetime import UTC, datetime, timedelta +from datetime import UTC, date, datetime, timedelta +from zoneinfo import ZoneInfo import pytest import requests @@ -85,6 +86,108 @@ def test_list_data_points_uses_bearer_token(tmp_path): ) +def test_reconcile_data_points_sends_filter_and_page_token(tmp_path): + """reconcileは期間filterとpagination tokenを送信する。""" + # Arrange + repository, cipher, connection = _repository_with_token(tmp_path) + session = FakeSession([FakeResponse({"dataPoints": []})]) + client = GoogleHealthAPIClient(repository, cipher, session=session) + + # Act + client.reconcile_data_points( + connection.connection_id, + "steps", + filter_expression='steps.interval.start_time >= "2026-06-01T00:00:00Z"', + page_size=10000, + page_token="next-token", + ) + + # Assert + params = session.calls[0][2]["params"] + assert params["filter"] == 'steps.interval.start_time >= "2026-06-01T00:00:00Z"' + assert params["pageToken"] == "next-token" + assert params["pageSize"] == 10000 + assert params["dataSourceFamily"].endswith("/google-wearables") + + +def test_daily_rollup_sends_closed_open_civil_range(tmp_path): + """daily rollupはcivil dateのclosed-open範囲を送信する。""" + # Arrange + repository, cipher, connection = _repository_with_token(tmp_path) + session = FakeSession([FakeResponse({"rollupDataPoints": []})]) + client = GoogleHealthAPIClient(repository, cipher, session=session) + + # Act + client.daily_rollup( + connection.connection_id, + "steps", + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 3), + ) + + # Assert + body = session.calls[0][2]["json"] + assert body["range"]["start"]["date"] == { + "year": 2026, + "month": 6, + "day": 1, + } + assert body["range"]["end"]["date"]["day"] == 3 + + +def test_rollup_sends_physical_range_and_window(tmp_path): + """physical rollupはRFC3339範囲とwindow sizeを送信する。""" + # Arrange + repository, cipher, connection = _repository_with_token(tmp_path) + session = FakeSession([FakeResponse({"rollupDataPoints": []})]) + client = GoogleHealthAPIClient(repository, cipher, session=session) + + # Act + client.rollup( + connection.connection_id, + "calories-in-heart-rate-zone", + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 3), + window_size_seconds=300, + ) + + # Assert + body = session.calls[0][2]["json"] + assert body["range"] == { + "startTime": "2026-06-01T00:00:00Z", + "endTime": "2026-06-03T00:00:00Z", + } + assert body["windowSize"] == "300s" + + +def test_rollup_uses_configured_timezone_boundary(tmp_path): + """physical rollupは設定TZのローカル日付境界をUTCで送信する。""" + # Arrange + repository, cipher, connection = _repository_with_token(tmp_path) + session = FakeSession([FakeResponse({"rollupDataPoints": []})]) + client = GoogleHealthAPIClient( + repository, + cipher, + session=session, + timezone=ZoneInfo("Asia/Tokyo"), + ) + + # Act + client.rollup( + connection.connection_id, + "heart-rate", + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 2), + window_size_seconds=300, + ) + + # Assert + assert session.calls[0][2]["json"]["range"] == { + "startTime": "2026-05-31T15:00:00Z", + "endTime": "2026-06-01T15:00:00Z", + } + + def test_expired_access_token_is_refreshed_before_request(tmp_path): """期限切れ access token は API 呼び出し前に refresh する。""" # Arrange diff --git a/egograph/pipelines/tests/unit/google_health/test_data_types.py b/egograph/pipelines/tests/unit/google_health/test_data_types.py new file mode 100644 index 00000000..c77298b5 --- /dev/null +++ b/egograph/pipelines/tests/unit/google_health/test_data_types.py @@ -0,0 +1,47 @@ +"""Google Health data type registryのテスト。""" + +from pipelines.sources.google_health.data_types import DATA_TYPE_BY_NAME, RecordKind + + +def test_registry_matches_fitbit_air_target_data_types(): + """計画で定義した28 data typeだけを取得対象にする。""" + # Arrange + expected = { + "steps", + "distance", + "total-calories", + "active-energy-burned", + "active-minutes", + "active-zone-minutes", + "activity-level", + "sedentary-period", + "calories-in-heart-rate-zone", + "time-in-heart-rate-zone", + "exercise", + "floors", + "altitude", + "swim-lengths-data", + "daily-vo2-max", + "vo2-max", + "run-vo2-max", + "heart-rate", + "daily-resting-heart-rate", + "heart-rate-variability", + "daily-heart-rate-variability", + "daily-heart-rate-zones", + "oxygen-saturation", + "daily-oxygen-saturation", + "respiratory-rate-sleep-summary", + "daily-respiratory-rate", + "daily-sleep-temperature-derivations", + "sleep", + } + + # Act & Assert + assert set(DATA_TYPE_BY_NAME) == expected + assert DATA_TYPE_BY_NAME["steps"].record_kind is RecordKind.INTERVAL + assert DATA_TYPE_BY_NAME["heart-rate"].record_kind is RecordKind.SAMPLE + assert DATA_TYPE_BY_NAME["sleep"].record_kind is RecordKind.SESSION + assert DATA_TYPE_BY_NAME["daily-oxygen-saturation"].record_kind is ( + RecordKind.DAILY + ) diff --git a/egograph/pipelines/tests/unit/google_health/test_extractor.py b/egograph/pipelines/tests/unit/google_health/test_extractor.py new file mode 100644 index 00000000..235c58db --- /dev/null +++ b/egograph/pipelines/tests/unit/google_health/test_extractor.py @@ -0,0 +1,176 @@ +"""Google Health extractorのテスト。""" + +from datetime import date +from zoneinfo import ZoneInfo + +import pytest +from pipelines.sources.google_health.data_types import DATA_TYPE_BY_NAME +from pipelines.sources.google_health.extractor import ( + GoogleHealthExtractor, + _build_filter, +) + + +class FakeClient: + """Extractorが使用するclientの記録用fake。""" + + def __init__(self): + self.reconcile_calls = [] + self.interval_rollup_calls = [] + self.rollup_calls = [] + + def reconcile_data_points(self, *args, **kwargs): + self.reconcile_calls.append((args, kwargs)) + if len(self.reconcile_calls) == 1: + return {"dataPoints": [{"steps": {}}], "nextPageToken": "next"} + return {"dataPoints": [{"steps": {}}]} + + def daily_rollup(self, *args, **kwargs): + self.rollup_calls.append((args, kwargs)) + return {"rollupDataPoints": [{"steps": {"countSum": "1"}}]} + + def rollup(self, *args, **kwargs): + self.interval_rollup_calls.append((args, kwargs)) + return { + "rollupDataPoints": [{"caloriesInHeartRateZone": {"kilocaloriesSum": "1"}}] + } + + +def test_extract_follows_pagination_and_collects_daily_rollup(): + """reconcileの全pageとdaily rollupを原本へ保持する。""" + # Arrange + client = FakeClient() + extractor = GoogleHealthExtractor(client) + + # Act + result = extractor.extract( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME["steps"], + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 3), + ) + + # Assert + assert len(client.reconcile_calls) == 2 + assert client.reconcile_calls[1][1]["page_token"] == "next" + assert ( + "steps.interval.start_time" + in (client.reconcile_calls[0][1]["filter_expression"]) + ) + assert len(result.payload["reconcileResponses"]) == 2 + assert len(result.payload["dailyRollupResponses"]) == 1 + assert result.record_count == 3 + + +def test_short_daily_rollup_is_split_into_fourteen_day_ranges(): + """制限対象data typeのdaily rollupは14日以下へ分割する。""" + # Arrange + client = FakeClient() + extractor = GoogleHealthExtractor(client) + + # Act + extractor.extract( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME["calories-in-heart-rate-zone"], + date_from=date(2026, 1, 1), + date_to=date(2026, 2, 1), + ) + + # Assert + assert len(client.reconcile_calls) == 0 + assert len(client.interval_rollup_calls) == 3 + expected_ranges = [ + (date(2026, 1, 1), date(2026, 1, 15)), + (date(2026, 1, 15), date(2026, 1, 29)), + (date(2026, 1, 29), date(2026, 2, 1)), + ] + assert [ + (call[1]["date_from"], call[1]["date_to"]) + for call in client.interval_rollup_calls + ] == expected_ranges + assert [ + (call[1]["date_from"], call[1]["date_to"]) for call in client.rollup_calls + ] == expected_ranges + + +def test_daily_filter_uses_proto_field_name(): + """daily reconcile filterはsnake_caseのproto field名を使う。""" + # Act + result = _build_filter( + DATA_TYPE_BY_NAME["daily-resting-heart-rate"], + date(2026, 6, 1), + date(2026, 6, 3), + ) + + # Assert + assert result == ( + 'daily_resting_heart_rate.date >= "2026-06-01" AND ' + 'daily_resting_heart_rate.date < "2026-06-03"' + ) + + +def test_physical_filter_uses_configured_timezone_boundary(): + """物理時刻filterは設定TZのローカル日付境界をUTCへ変換する。""" + # Act + result = _build_filter( + DATA_TYPE_BY_NAME["heart-rate"], + date(2026, 6, 1), + date(2026, 6, 2), + timezone=ZoneInfo("Asia/Tokyo"), + ) + + # Assert + assert result == ( + 'heart_rate.sample_time.physical_time >= "2026-05-31T15:00:00Z" AND ' + 'heart_rate.sample_time.physical_time < "2026-06-01T15:00:00Z"' + ) + + +def test_extract_rejects_empty_or_reversed_range(): + """空または逆転した期間をAPIへ送信しない。""" + # Arrange + extractor = GoogleHealthExtractor(FakeClient()) + + # Act & Assert + with pytest.raises(ValueError, match="date_from must be earlier"): + extractor.extract( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME["steps"], + date_from=date(2026, 6, 3), + date_to=date(2026, 6, 3), + ) + with pytest.raises(ValueError, match="date_from must be earlier"): + extractor.extract( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME["steps"], + date_from=date(2026, 6, 4), + date_to=date(2026, 6, 3), + ) + + +@pytest.mark.parametrize( + "data_type_name", + ["steps", "total-calories", "calories-in-heart-rate-zone"], +) +def test_extract_rejects_repeated_page_token(data_type_name): + """同じpage tokenが再返却された場合は無限取得を防止する。""" + + class RepeatingTokenClient(FakeClient): + def reconcile_data_points(self, *args, **kwargs): + return {"dataPoints": [], "nextPageToken": "repeated"} + + def daily_rollup(self, *args, **kwargs): + return {"rollupDataPoints": [], "nextPageToken": "repeated"} + + def rollup(self, *args, **kwargs): + return {"rollupDataPoints": [], "nextPageToken": "repeated"} + + extractor = GoogleHealthExtractor(RepeatingTokenClient()) + + with pytest.raises(RuntimeError, match="google_health_repeated_page_token"): + extractor.extract( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME[data_type_name], + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 2), + ) diff --git a/egograph/pipelines/tests/unit/google_health/test_normalizer.py b/egograph/pipelines/tests/unit/google_health/test_normalizer.py new file mode 100644 index 00000000..98849a20 --- /dev/null +++ b/egograph/pipelines/tests/unit/google_health/test_normalizer.py @@ -0,0 +1,240 @@ +"""Google Health normalizerのテスト。""" + +from datetime import UTC, datetime +from zoneinfo import ZoneInfo + +from pipelines.sources.google_health.data_types import DATA_TYPE_BY_NAME +from pipelines.sources.google_health.normalizer import ( + aggregate_daily_metrics, + normalize_google_health_payload, +) + + +def _normalize(data_type, point, *, rollup=False): + return normalize_google_health_payload( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME[data_type], + payload={ + "reconcileResponses": [] if rollup else [{"dataPoints": [point]}], + "dailyRollupResponses": ([{"rollupDataPoints": [point]}] if rollup else []), + }, + raw_ref="raw/google_health/example.json", + ingested_at=datetime(2026, 6, 4, tzinfo=UTC), + ) + + +def test_normalizes_sample_interval_session_and_daily_records(): + """4種類のParquet schemaへ必要な列を変換する。""" + # Arrange & Act + sample = _normalize( + "heart-rate", + { + "heartRate": { + "sampleTime": {"physicalTime": "2026-06-01T01:00:00Z"}, + "beatsPerMinute": 72, + }, + "dataSource": {"device": {"displayName": "Fitbit Air"}}, + }, + ) + interval = _normalize( + "steps", + { + "steps": { + "interval": { + "startTime": "2026-06-01T00:00:00Z", + "endTime": "2026-06-01T00:05:00Z", + }, + "count": 120, + } + }, + ) + session = _normalize( + "sleep", + { + "dataPointName": "sleep-1", + "sleep": { + "interval": { + "startTime": "2026-05-31T23:00:00Z", + "endTime": "2026-06-01T07:00:00Z", + }, + "type": "SLEEP", + }, + }, + ) + daily = _normalize( + "steps", + { + "civilStartTime": {"date": {"year": 2026, "month": 6, "day": 1}}, + "steps": {"countSum": "1000"}, + }, + rollup=True, + ) + + # Assert + assert sample["samples"][0]["value"] == 72 + assert sample["samples"][0]["device_family"] == "fitbit_air" + assert interval["intervals"][0]["value"] == 120 + assert session["sessions"][0]["duration_seconds"] == 8 * 60 * 60 + assert session["daily_metrics"][0]["metric_name"] == "sleep_duration" + assert daily["daily_metrics"][0]["value"] == 1000 + assert daily["daily_metrics"][0]["raw_ref"].startswith("raw/") + + +def test_aggregate_daily_metrics_sums_multiple_sessions(): + """同日の複数sessionから作った日次値を1行へ集約する。""" + # Arrange + first = _normalize( + "exercise", + { + "exercise": { + "interval": { + "startTime": "2026-06-01T01:00:00Z", + "endTime": "2026-06-01T01:30:00Z", + } + } + }, + )["daily_metrics"][0] + second = {**first, "value": 900.0} + + # Act + result = aggregate_daily_metrics([first, second]) + + # Assert + assert len(result) == 1 + assert result[0]["value"] == 2700 + + +def test_derived_daily_date_uses_configured_timezone_and_keeps_utc_timestamp(): + """派生日次は設定TZの日付、時刻列はUTCで保存する。""" + # Arrange + point = { + "dataPointName": "sleep-1", + "sleep": { + "interval": { + "startTime": "2026-05-31T15:30:00Z", + "endTime": "2026-05-31T23:00:00Z", + }, + "type": "SLEEP", + }, + } + + # Act + result = normalize_google_health_payload( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME["sleep"], + payload={"reconcileResponses": [{"dataPoints": [point]}]}, + raw_ref="raw/example.json", + timezone=ZoneInfo("Asia/Tokyo"), + ) + + # Assert + assert result["daily_metrics"][0]["date"].isoformat() == "2026-06-01" + assert result["sessions"][0]["ended_at_utc"] == datetime( + 2026, + 5, + 31, + 23, + tzinfo=UTC, + ) + + +def test_normalizes_physical_rollup_and_averages_respiratory_daily(): + """physical rollupをintervalへ、呼吸数sampleを日次平均へ変換する。""" + # Arrange + interval = normalize_google_health_payload( + connection_id="connection-1", + data_type=DATA_TYPE_BY_NAME["calories-in-heart-rate-zone"], + payload={ + "reconcileResponses": [], + "rollupResponses": [ + { + "rollupDataPoints": [ + { + "startTime": "2026-06-01T00:00:00Z", + "endTime": "2026-06-01T00:05:00Z", + "caloriesInHeartRateZone": {"kilocaloriesSum": "2.5"}, + } + ] + } + ], + "dailyRollupResponses": [], + }, + raw_ref="raw/example.json", + ) + respiratory = [ + { + **_normalize( + "respiratory-rate-sleep-summary", + { + "respiratoryRateSleepSummary": { + "sampleTime": {"physicalTime": f"2026-06-01T0{hour}:00:00Z"}, + "breathsPerMinute": value, + } + }, + )["daily_metrics"][0] + } + for hour, value in ((1, 12), (2, 16)) + ] + + # Act + daily = aggregate_daily_metrics(respiratory) + + # Assert + assert interval["intervals"][0]["value"] == 2.5 + assert daily[0]["value"] == 14 + + +def test_normalizes_categorical_and_duration_only_intervals(): + """数値を持たないintervalをlevelまたは継続秒へ変換する。""" + # Arrange & Act + activity_level = _normalize( + "activity-level", + { + "activityLevel": { + "interval": { + "startTime": "2026-06-01T00:00:00Z", + "endTime": "2026-06-01T00:05:00Z", + }, + "activityLevelType": "MODERATE", + } + }, + ) + sedentary = _normalize( + "sedentary-period", + { + "sedentaryPeriod": { + "interval": { + "startTime": "2026-06-01T00:00:00Z", + "endTime": "2026-06-01T00:10:00Z", + } + } + }, + ) + + # Assert + assert activity_level["intervals"][0]["value"] == 3 + assert sedentary["intervals"][0]["value"] == 600 + + +def test_skips_records_with_invalid_date_or_datetime(): + """不正なAPI日時を含む行だけをスキップする。""" + invalid_sample = _normalize( + "heart-rate", + { + "heartRate": { + "sampleTime": {"physicalTime": "invalid"}, + "beatsPerMinute": 72, + } + }, + ) + invalid_daily = _normalize( + "steps", + { + "civilStartTime": "invalid", + "steps": {"countSum": "1000"}, + }, + rollup=True, + ) + + assert invalid_sample["samples"] == [] + assert invalid_daily["daily_metrics"] == [] diff --git a/egograph/pipelines/tests/unit/google_health/test_repository.py b/egograph/pipelines/tests/unit/google_health/test_repository.py index 2dfe24d1..eb775e78 100644 --- a/egograph/pipelines/tests/unit/google_health/test_repository.py +++ b/egograph/pipelines/tests/unit/google_health/test_repository.py @@ -1,11 +1,15 @@ """Google Health connection repository のテスト。""" -from datetime import UTC, datetime, timedelta +from datetime import UTC, date, datetime, timedelta import pytest from pipelines.infrastructure.db.connection import connect from pipelines.infrastructure.db.schema import initialize_schema -from pipelines.sources.google_health.models import ConnectionStatus, OAuthToken +from pipelines.sources.google_health.models import ( + ConnectionStatus, + OAuthToken, + SyncStatus, +) from pipelines.sources.google_health.repository import GoogleHealthRepository @@ -119,3 +123,53 @@ def test_update_status_preserves_supported_connection_state(repository): # Assert assert updated.status is ConnectionStatus.REVOKED assert updated.last_error_message == "refresh token revoked" + + +def test_save_sync_result_upserts_data_type_state(repository): + """data type単位の同期結果をSQLiteへupsertできる。""" + # Arrange + token = OAuthToken( + access_token="access-token", + refresh_token="refresh-token", + expires_at=datetime.now(tz=UTC) + timedelta(hours=1), + token_type="Bearer", + scopes=("scope-a",), + ) + connection = repository.save_connection( + token=token, + access_token_encrypted=b"encrypted-access", + refresh_token_encrypted=b"encrypted-refresh", + ) + + # Act + repository.save_sync_result( + connection_id=connection.connection_id, + data_type="steps", + status=SyncStatus.FAILED, + range_start=date(2026, 6, 1), + range_end=date(2026, 6, 3), + run_id="run-1", + error_message="temporary failure", + ) + updated = repository.save_sync_result( + connection_id=connection.connection_id, + data_type="steps", + status=SyncStatus.SUCCESS, + range_start=date(2026, 6, 1), + range_end=date(2026, 6, 3), + run_id="run-2", + record_count=42, + ) + run_results = repository.list_sync_results_for_run( + connection.connection_id, + "run-2", + ) + + # Assert + assert updated.status is SyncStatus.SUCCESS + assert updated.range_start == date(2026, 6, 1) + assert updated.range_end == date(2026, 6, 3) + assert updated.last_run_id == "run-2" + assert updated.record_count == 42 + assert updated.last_error_message is None + assert run_results == [updated] diff --git a/egograph/pipelines/tests/unit/google_health/test_workflow.py b/egograph/pipelines/tests/unit/google_health/test_workflow.py new file mode 100644 index 00000000..eb82674c --- /dev/null +++ b/egograph/pipelines/tests/unit/google_health/test_workflow.py @@ -0,0 +1,289 @@ +"""Google Health ingestion workflowのテスト。""" + +from datetime import UTC, date, datetime + +from pipelines.domain.workflow import ( + QueuedReason, + TriggerType, + WorkflowRun, + WorkflowRunStatus, +) +from pipelines.sources.google_health.extractor import ExtractedGoogleHealthData +from pipelines.sources.google_health.models import ( + ConnectionStatus, + GoogleHealthConnection, + GoogleHealthSyncCursor, + SyncStatus, +) +from pipelines.sources.google_health.workflow import ( + GoogleHealthWorkflowDependencies, + _short_error, + run_google_health_compact, + run_google_health_ingest, +) + + +def _run(data_types): + return WorkflowRun( + run_id="run-1", + workflow_id="google_health_ingest_workflow", + trigger_type=TriggerType.MANUAL, + queued_reason=QueuedReason.MANUAL_REQUEST, + status=WorkflowRunStatus.RUNNING, + scheduled_at=None, + queued_at=datetime(2026, 6, 4, tzinfo=UTC), + started_at=datetime(2026, 6, 4, tzinfo=UTC), + finished_at=None, + last_error_message=None, + requested_by="api", + parent_run_id=None, + result_summary={ + "request": { + "mode": "data_type_range", + "from": "2026-06-01", + "to": "2026-06-03", + "data_types": data_types, + } + }, + ) + + +class FakeRepository: + def __init__(self): + self.sync_results = [] + + def get_connection(self): + return GoogleHealthConnection( + connection_id="google-health-primary", + status=ConnectionStatus.ACTIVE, + scopes=(), + created_at=datetime(2026, 6, 1, tzinfo=UTC), + updated_at=datetime(2026, 6, 1, tzinfo=UTC), + last_error_message=None, + ) + + def save_sync_result(self, **kwargs): + self.sync_results.append(kwargs) + + def list_sync_results_for_run(self, connection_id, run_id): + return [ + GoogleHealthSyncCursor( + connection_id=item["connection_id"], + data_type=item["data_type"], + cursor=item.get("cursor"), + status=item["status"], + range_start=item["range_start"], + range_end=item["range_end"], + last_run_id=item["run_id"], + record_count=item["record_count"], + last_error_message=item["error_message"], + updated_at=datetime(2026, 6, 4, tzinfo=UTC), + ) + for item in self.sync_results + if item["connection_id"] == connection_id and item["run_id"] == run_id + ] + + +class FakeExtractor: + def extract(self, *, data_type, **kwargs): + if data_type.name == "sleep": + raise RuntimeError("sleep unavailable") + return ExtractedGoogleHealthData( + payload={ + "reconcileResponses": [ + { + "dataPoints": [ + { + "steps": { + "interval": { + "startTime": "2026-06-01T00:00:00Z", + "endTime": "2026-06-01T00:05:00Z", + }, + "count": 120, + } + } + ] + } + ], + "dailyRollupResponses": [], + }, + record_count=1, + ) + + +class FakeWriter: + def __init__(self): + self.event_calls = [] + self.compact_calls = [] + + def save_raw(self, **kwargs): + return f"raw/{kwargs['data_type']}.json" + + def save_events(self, **kwargs): + self.event_calls.append(kwargs) + return ["events/google_health/intervals/year=2026/month=06/run-1.parquet"] + + def compact_range(self, **kwargs): + self.compact_calls.append(kwargs) + return [ + "compacted/events/google_health/intervals/year=2026/month=06/data.parquet" + ] + + +def _dependencies(repository, writer, extractor=None): + return GoogleHealthWorkflowDependencies( + repository=repository, + extractor=extractor or FakeExtractor(), + writer=writer, + ) + + +def test_partial_failure_saves_successful_events_and_sync_results(monkeypatch): + """一部失敗時も成功data typeのeventsと各sync結果を保存する。""" + # Arrange + repository = FakeRepository() + writer = FakeWriter() + monkeypatch.setattr( + "pipelines.sources.google_health.workflow._build_dependencies", + lambda: _dependencies(repository, writer), + ) + + # Act + result = run_google_health_ingest(_run(["steps", "sleep"])) + + # Assert + assert result["status"] == "partial_failed" + assert result["record_count"] == 1 + assert len(writer.event_calls) == 1 + assert [item["status"] for item in repository.sync_results] == [ + SyncStatus.SUCCESS, + SyncStatus.FAILED, + ] + assert [item["record_count"] for item in repository.sync_results] == [1, 0] + + +def test_no_data_is_successful_and_writes_no_event_file(monkeypatch): + """データ0件はno_dataとして正常終了しeventsを作らない。""" + # Arrange + repository = FakeRepository() + writer = FakeWriter() + + class EmptyExtractor: + def extract(self, **kwargs): + return ExtractedGoogleHealthData( + payload={ + "reconcileResponses": [{"dataPoints": []}], + "dailyRollupResponses": [], + }, + record_count=0, + ) + + monkeypatch.setattr( + "pipelines.sources.google_health.workflow._build_dependencies", + lambda: _dependencies(repository, writer, EmptyExtractor()), + ) + + # Act + result = run_google_health_ingest(_run(["steps"])) + + # Assert + assert result["status"] == "succeeded" + assert result["data_types"][0]["status"] == "no_data" + assert writer.event_calls[0]["records"]["intervals"] == [] + + +def test_save_failure_reports_zero_saved_records(monkeypatch): + """events保存失敗時はメモリ上の正規化件数を返さない。""" + repository = FakeRepository() + + class FailingWriter(FakeWriter): + def save_events(self, **kwargs): + raise RuntimeError("save failed") + + monkeypatch.setattr( + "pipelines.sources.google_health.workflow._build_dependencies", + lambda: _dependencies(repository, FailingWriter()), + ) + + result = run_google_health_ingest(_run(["steps"])) + + assert result["status"] == "failed" + assert result["record_count"] == 0 + + +def test_compact_uses_only_successful_and_no_data_types(monkeypatch): + """失敗data typeを除外して成功・no_dataの範囲だけ置換する。""" + # Arrange + repository = FakeRepository() + writer = FakeWriter() + for data_type, status, count, error in ( + ("steps", SyncStatus.SUCCESS, 1, None), + ("heart-rate", SyncStatus.NO_DATA, 0, None), + ("sleep", SyncStatus.FAILED, 0, "sleep unavailable"), + ): + repository.save_sync_result( + connection_id="google-health-primary", + data_type=data_type, + status=status, + range_start=date(2026, 6, 1), + range_end=date(2026, 6, 3), + run_id="run-1", + record_count=count, + error_message=error, + ) + monkeypatch.setattr( + "pipelines.sources.google_health.workflow._build_dependencies", + lambda: _dependencies(repository, writer), + ) + + # Act + result = run_google_health_compact(_run(["steps", "heart-rate", "sleep"])) + + # Assert + assert result["status"] == "partial_failed" + assert writer.compact_calls[0]["selected_data_types"] == ( + "steps", + "heart-rate", + ) + assert result["compacted_keys"] == [ + "compacted/events/google_health/intervals/year=2026/month=06/data.parquet" + ] + + +def test_unrecognized_non_empty_payload_is_failed(monkeypatch): + """API件数があるのに正規化0件なら成功扱いにしない。""" + # Arrange + repository = FakeRepository() + writer = FakeWriter() + + class UnrecognizedExtractor: + def extract(self, **kwargs): + return ExtractedGoogleHealthData( + payload={ + "reconcileResponses": [{"dataPoints": [{"steps": {}}]}], + "dailyRollupResponses": [], + }, + record_count=1, + ) + + monkeypatch.setattr( + "pipelines.sources.google_health.workflow._build_dependencies", + lambda: _dependencies(repository, writer, UnrecognizedExtractor()), + ) + + # Act + result = run_google_health_ingest(_run(["steps"])) + + # Assert + assert result["status"] == "failed" + assert result["data_types"][0]["status"] == "failed" + assert writer.event_calls == [] + + +def test_short_error_falls_back_to_exception_class_for_empty_message(): + """例外メッセージが空でもclass名を返す。""" + # Act + result = _short_error(RuntimeError()) + + # Assert + assert result == "RuntimeError" diff --git a/egograph/pipelines/tests/unit/google_health/test_writer.py b/egograph/pipelines/tests/unit/google_health/test_writer.py new file mode 100644 index 00000000..21db5d0f --- /dev/null +++ b/egograph/pipelines/tests/unit/google_health/test_writer.py @@ -0,0 +1,277 @@ +"""Google Health writerのテスト。""" + +from datetime import UTC, date, datetime +from io import BytesIO +from unittest.mock import patch +from zoneinfo import ZoneInfo + +import pandas as pd +from botocore.exceptions import ClientError +from pipelines.sources.google_health.writer import GoogleHealthWriter + + +class MemoryS3: + """events保存とcompactionを検証するin-memory S3。""" + + def __init__(self): + self.objects = {} + + def put_object(self, *, Bucket, Key, Body, ContentType): # noqa: N803 + self.objects[Key] = Body + + def get_object(self, *, Bucket, Key): # noqa: N803 + if Key not in self.objects: + raise ClientError({"Error": {"Code": "NoSuchKey"}}, "GetObject") + return {"Body": BytesIO(self.objects[Key])} + + def delete_object(self, *, Bucket, Key): # noqa: N803 + self.objects.pop(Key, None) + + +def _writer(memory_s3, *, timezone=None): + with patch( + "pipelines.sources.google_health.writer.boto3.client", + return_value=memory_s3, + ): + return GoogleHealthWriter( + endpoint_url="https://r2.example.test", + access_key_id="key", + secret_access_key="secret", + bucket_name="bucket", + timezone=timezone, + ) + + +def _put_parquet(memory_s3, key, rows): + buffer = BytesIO() + pd.DataFrame(rows).to_parquet(buffer, index=False) + memory_s3.objects[key] = buffer.getvalue() + + +def test_raw_key_contains_required_lineage_fields(): + """Raw保存先にconnection、data type、期間、run IDを含める。""" + # Arrange + memory_s3 = MemoryS3() + writer = _writer(memory_s3) + + # Act + key = writer.save_raw( + connection_id="google-health-primary", + data_type="steps", + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 3), + run_id="run-1", + payload={"dataPoints": []}, + ) + + # Assert + assert key == ( + "raw/google_health/connection_id=google-health-primary/" + "data_type=steps/from=2026-06-01/to=2026-06-03/run_id=run-1.json" + ) + + +def test_save_events_adds_run_file_without_deleting_existing_files(): + """取得ごとのUUID Parquetをeventsへ追加し既存ファイルを残す。""" + # Arrange + memory_s3 = MemoryS3() + writer = _writer(memory_s3) + old_key = "events/google_health/samples/year=2026/month=06/old.parquet" + memory_s3.objects[old_key] = b"existing" + + # Act + saved_keys = writer.save_events( + run_id="run-1", + records={ + "samples": [ + { + "connection_id": "google-health-primary", + "data_type": "heart-rate", + "measured_at_utc": datetime(2026, 6, 1, tzinfo=UTC), + "value": 75.0, + } + ] + }, + ) + + # Assert + assert saved_keys == [ + "events/google_health/samples/year=2026/month=06/run-1.parquet" + ] + assert old_key in memory_s3.objects + + +def test_compact_range_replaces_only_selected_data_type(): + """compacted内の対象期間だけを今回runのeventsで置換する。""" + # Arrange + memory_s3 = MemoryS3() + writer = _writer(memory_s3) + compacted_key = ( + "compacted/events/google_health/samples/year=2026/month=06/data.parquet" + ) + event_key = "events/google_health/samples/year=2026/month=06/run-1.parquet" + _put_parquet( + memory_s3, + compacted_key, + [ + { + "connection_id": "google-health-primary", + "data_type": "heart-rate", + "measured_at_utc": datetime(2026, 6, 1, tzinfo=UTC), + "value": 70.0, + }, + { + "connection_id": "google-health-primary", + "data_type": "oxygen-saturation", + "measured_at_utc": datetime(2026, 6, 1, tzinfo=UTC), + "value": 95.0, + }, + ], + ) + _put_parquet( + memory_s3, + event_key, + [ + { + "connection_id": "google-health-primary", + "data_type": "heart-rate", + "measured_at_utc": datetime(2026, 6, 1, 2, tzinfo=UTC), + "value": 75.0, + } + ], + ) + + # Act + saved_keys = writer.compact_range( + connection_id="google-health-primary", + selected_data_types=("heart-rate",), + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 2), + run_id="run-1", + ) + + # Assert + assert saved_keys == [compacted_key] + rows = pd.read_parquet(BytesIO(memory_s3.objects[compacted_key])).to_dict( + orient="records" + ) + assert {(row["data_type"], row["value"]) for row in rows} == { + ("heart-rate", 75.0), + ("oxygen-saturation", 95.0), + } + assert event_key in memory_s3.objects + + +def test_compact_range_removes_no_data_range_without_deleting_events(): + """no_dataの対象範囲をcompactedから除きeventsファイルは保持する。""" + # Arrange + memory_s3 = MemoryS3() + writer = _writer(memory_s3) + compacted_key = ( + "compacted/events/google_health/samples/year=2026/month=06/data.parquet" + ) + unrelated_event_key = ( + "events/google_health/samples/year=2026/month=06/previous-run.parquet" + ) + _put_parquet( + memory_s3, + compacted_key, + [ + { + "connection_id": "google-health-primary", + "data_type": "heart-rate", + "measured_at_utc": datetime(2026, 6, 1, tzinfo=UTC), + "value": 70.0, + } + ], + ) + memory_s3.objects[unrelated_event_key] = b"existing" + + # Act + writer.compact_range( + connection_id="google-health-primary", + selected_data_types=("heart-rate",), + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 2), + run_id="run-with-no-data", + ) + + # Assert + assert compacted_key not in memory_s3.objects + assert unrelated_event_key in memory_s3.objects + + +def test_compact_range_reads_utc_month_crossed_by_local_date(): + """JST日付が跨ぐ前月UTC partitionもcompact対象にする。""" + # Arrange + memory_s3 = MemoryS3() + writer = _writer(memory_s3, timezone=ZoneInfo("Asia/Tokyo")) + event_key = "events/google_health/samples/year=2026/month=05/run-1.parquet" + compacted_key = ( + "compacted/events/google_health/samples/year=2026/month=05/data.parquet" + ) + _put_parquet( + memory_s3, + event_key, + [ + { + "connection_id": "google-health-primary", + "data_type": "heart-rate", + "measured_at_utc": datetime(2026, 5, 31, 15, 30, tzinfo=UTC), + "value": 75.0, + } + ], + ) + + # Act + writer.compact_range( + connection_id="google-health-primary", + selected_data_types=("heart-rate",), + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 2), + run_id="run-1", + ) + + # Assert + assert compacted_key in memory_s3.objects + rows = pd.read_parquet(BytesIO(memory_s3.objects[compacted_key])) + assert rows.iloc[0]["value"] == 75.0 + assert event_key in memory_s3.objects + + +def test_sleep_uses_end_date_for_range_and_start_date_for_partition(): + """月跨ぎsleepは終了日で置換し開始月partitionへ保存する。""" + # Arrange + memory_s3 = MemoryS3() + writer = _writer(memory_s3) + event_key = "events/google_health/sessions/year=2026/month=05/run-1.parquet" + compacted_key = ( + "compacted/events/google_health/sessions/year=2026/month=05/data.parquet" + ) + _put_parquet( + memory_s3, + event_key, + [ + { + "connection_id": "google-health-primary", + "data_type": "sleep", + "started_at_utc": datetime(2026, 5, 31, 23, tzinfo=UTC), + "ended_at_utc": datetime(2026, 6, 1, 7, tzinfo=UTC), + "duration_seconds": 28800, + } + ], + ) + + # Act + writer.compact_range( + connection_id="google-health-primary", + selected_data_types=("sleep",), + date_from=date(2026, 6, 1), + date_to=date(2026, 6, 2), + run_id="run-1", + ) + + # Assert + rows = pd.read_parquet(BytesIO(memory_s3.objects[compacted_key])) + assert rows.iloc[0]["duration_seconds"] == 28800 + assert event_key in memory_s3.objects diff --git a/egograph/pipelines/tests/unit/test_dispatcher.py b/egograph/pipelines/tests/unit/test_dispatcher.py index 275859ce..f2c8ca5f 100644 --- a/egograph/pipelines/tests/unit/test_dispatcher.py +++ b/egograph/pipelines/tests/unit/test_dispatcher.py @@ -5,6 +5,7 @@ from datetime import datetime, timezone from unittest.mock import Mock, patch +import pytest from pipelines.domain.errors import AuthenticationError from pipelines.domain.workflow import ( QueuedReason, @@ -22,7 +23,10 @@ from pipelines.infrastructure.db.step_run_repository import StepRunRepository from pipelines.infrastructure.db.workflow_repository import WorkflowRepository from pipelines.infrastructure.dispatching.lock_manager import WorkflowLockManager -from pipelines.infrastructure.dispatching.run_dispatcher import RunDispatcher +from pipelines.infrastructure.dispatching.run_dispatcher import ( + RunDispatcher, + _status_from_summary, +) from pipelines.infrastructure.execution.inprocess_executor import InProcessStepExecutor from pipelines.infrastructure.execution.log_store import LocalLogStore from pipelines.infrastructure.execution.subprocess_executor import ( @@ -105,6 +109,56 @@ def test_dispatch_once_succeeds_and_writes_step_log(tmp_path): assert "dummy step succeeded" in LocalLogStore.read_log(steps[0].log_path) +def test_dispatch_once_persists_partial_failed_summary_status(tmp_path): + """step summaryのpartial_failedをrun statusへ反映する。""" + # Arrange + workflows = { + "partial_workflow": WorkflowDefinition( + workflow_id="partial_workflow", + name="Partial workflow", + description="Partial workflow for tests", + steps=( + StepDefinition( + step_id="partial", + step_name="Partial", + executor_type=StepExecutorType.INPROCESS, + callable_ref=( + "pipelines.tests.support.dummy_steps:partial_failure" + ), + ), + ), + ) + } + run_repository, _, dispatcher, _ = _build_dispatcher(tmp_path, workflows) + run = run_repository.enqueue_run( + workflow_id="partial_workflow", + trigger_type=TriggerType.MANUAL, + queued_reason=QueuedReason.MANUAL_REQUEST, + ) + + # Act + dispatcher.dispatch_once() + updated_run = run_repository.get_run(run.run_id) + + # Assert + assert updated_run.status is WorkflowRunStatus.PARTIAL_FAILED + assert updated_run.last_error_message == "sleep: unavailable" + assert updated_run.result_summary == { + "status": "partial_failed", + "errors": ["sleep: unavailable"], + } + + +@pytest.mark.parametrize("summary_status", ["unexpected", "running", "queued"]) +def test_invalid_summary_status_logs_warning_and_succeeds(caplog, summary_status): + """未知または非終端statusは警告を残してsucceededへ寄せる。""" + with caplog.at_level(logging.WARNING): + status = _status_from_summary({"status": summary_status}) + + assert status is WorkflowRunStatus.SUCCEEDED + assert "non-terminal or unknown summary status" in caplog.text + + def test_dispatch_once_skips_remaining_steps_after_failure(tmp_path): """前段 step 失敗時は後続 step を skipped にする。""" # Arrange diff --git a/egograph/pipelines/workflows/registry.py b/egograph/pipelines/workflows/registry.py index f3bab8f7..bc37a816 100644 --- a/egograph/pipelines/workflows/registry.py +++ b/egograph/pipelines/workflows/registry.py @@ -141,6 +141,29 @@ def get_workflows() -> dict[str, WorkflowDefinition]: timeout_seconds=3600, misfire_policy=MisfirePolicy.SKIP_MISFIRE, ), + WorkflowDefinition( + workflow_id="google_health_ingest_workflow", + name="Google Health ingest workflow", + description="Collect Google Health data and store Raw JSON and Parquet", + steps=( + _inprocess_step( + "run_google_health_ingest", + "Run Google Health ingest", + "pipelines.sources.google_health.workflow:run_google_health_ingest", + timeout_seconds=7200, + ), + _inprocess_step( + "run_google_health_compact", + "Run Google Health compact", + "pipelines.sources.google_health.workflow:run_google_health_compact", + timeout_seconds=1800, + ), + ), + triggers=(), + concurrency_key="google_health_ingest_workflow", + timeout_seconds=9000, + misfire_policy=MisfirePolicy.SKIP_MISFIRE, + ), WorkflowDefinition( workflow_id="local_mirror_sync_workflow", name="Local compacted parquet mirror sync",