Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/architecture/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ class WorkflowDefinition:
| `spotify_ingest_workflow` | 6回/日 (0,4,8,12,16,22 JST) | ingest → compact |
| `github_ingest_workflow` | 1回/日 (00:00 JST) | ingest → compact |
| `google_activity_ingest_workflow` | 1回/日 (23:00 JST) | ingest |
| `google_health_ingest_workflow` | API手動実行 | Raw JSON / events保存 → compacted範囲置換 |
| `local_mirror_sync_workflow` | 6時間ごと | sync |
| `browser_history_compact_workflow` | イベント駆動 | compact |
| `browser_history_compact_maintenance_workflow` | 6時間ごと | compact maintenance |
Expand Down Expand Up @@ -485,6 +486,7 @@ uv run python -m pipelines.main serve
| GET | `/v1/sources/google-health/connection` | Google Health 接続状態 |
| DELETE | `/v1/sources/google-health/connection` | Google Health 接続削除 |
| POST | `/v1/sources/google-health/connection/smoke-test` | Google Health 疎通確認 |
| POST | `/v1/sources/google-health/runs` | Google Health backfill・期間指定run作成 |

---

Expand Down
257 changes: 202 additions & 55 deletions docs/data-sources/google-health.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions egograph/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ Pipelines Service はポート `8001`(デフォルト)で HTTP API を提供
| `POST` | `/v1/workflows/{workflow_id}/runs` | ワークフロー手動実行 |
| `POST` | `/v1/workflows/{workflow_id}/enable` | ワークフロー有効化 |
| `POST` | `/v1/workflows/{workflow_id}/disable` | ワークフロー無効化 |
| `POST` | `/v1/sources/google-health/runs` | Google Health初回backfill・期間指定・data type指定run作成 |
| `GET` | `/v1/runs` | 全 run 一覧 |
| `GET` | `/v1/runs/{run_id}` | run 詳細 |
| `GET` | `/v1/runs/{run_id}/steps/{step_id}/log` | step ログ全文 |
Expand Down
108 changes: 103 additions & 5 deletions egograph/pipelines/api/google_health.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,96 @@
"""Google Health OAuth and connection API."""
"""Google Health OAuthconnection、ingest API"""

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import ValidationError
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo

from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field, ValidationError, model_validator

from pipelines.api.dependencies import get_service, verify_api_key
from pipelines.service import PipelineService
from pipelines.sources.google_health.client import GoogleHealthAPIError
from pipelines.sources.google_health.data_types import SMOKE_TEST_DATA_TYPES
from pipelines.sources.google_health.data_types import (
DATA_TYPE_BY_NAME,
SMOKE_TEST_DATA_TYPES,
)
from pipelines.sources.google_health.models import GoogleHealthRunMode

router = APIRouter(
prefix="/v1/sources/google-health",
tags=["sources", "google_health"],
)


class GoogleHealthRunRequest(BaseModel):
"""Google Health取り込みrun作成リクエスト。"""

mode: GoogleHealthRunMode
date_from: date | None = Field(None, alias="from")
date_to: date | None = Field(None, alias="to")
data_types: list[str] = Field(default_factory=list)

@model_validator(mode="after")
def validate_range(self) -> "GoogleHealthRunRequest":
unknown = set(self.data_types) - DATA_TYPE_BY_NAME.keys()
if unknown:
raise ValueError(
f"invalid_data_types: unsupported values: {', '.join(sorted(unknown))}"
)
if self.mode is GoogleHealthRunMode.INITIAL_BACKFILL:
if self.date_from is not None or self.date_to is not None:
raise ValueError(
"invalid_range: initial_backfill does not accept from/to"
)
if self.data_types:
raise ValueError(
"invalid_data_types: initial_backfill targets all data types"
)
return self
if self.date_from is None or self.date_to is None:
raise ValueError("invalid_range: from and to are required")
if self.date_from >= self.date_to:
raise ValueError("invalid_range: from must be earlier than to")
if self.mode is GoogleHealthRunMode.DATA_TYPE_RANGE and not self.data_types:
raise ValueError("invalid_data_types: data_type_range requires data_types")
if self.mode is GoogleHealthRunMode.RANGE and self.data_types:
raise ValueError("invalid_data_types: range targets all data types")
return self

def to_run_input(
self,
*,
timezone: ZoneInfo,
now: datetime | None = None,
) -> dict:
"""実行時に解決済みのclosed-open期間へ変換する。"""
if self.mode is GoogleHealthRunMode.INITIAL_BACKFILL:
current = now or datetime.now(tz=timezone)
date_to = current.astimezone(timezone).date() + timedelta(days=1)
date_from = date_to - timedelta(days=90)
else:
date_from = self.date_from
date_to = self.date_to
if date_from is None or date_to is None: # pragma: no cover
raise ValueError("invalid_range: unresolved range")
return {
"mode": self.mode.value,
"from": date_from.isoformat(),
"to": date_to.isoformat(),
"data_types": self.data_types,
}


def _format_invalid_detail(exc: Exception) -> str:
if isinstance(exc, ValidationError):
details = []
for error in exc.errors():
field = ".".join(str(part) for part in error["loc"]) or "request"
details.append(f"invalid_{field}: {error['msg']}")
reason = str(error["msg"])
marker = "invalid_"
if marker in reason and ":" in reason:
details.append(reason[reason.index(marker) :])
else:
details.append(f"invalid_{field}: {reason}")
return "; ".join(details)

message = str(getattr(exc, "message", None) or exc).strip()
Expand Down Expand Up @@ -141,3 +212,30 @@ def smoke_test_connection(
detail=_format_invalid_detail(exc),
) from exc
return {"status": "ok", "data_types": results}


@router.post("/runs", status_code=201)
def create_ingest_run(
request_body: dict = Body(...),
_: None = Depends(verify_api_key),
service: PipelineService = Depends(get_service),
) -> dict:
"""Google Health取り込みrunを作成する。"""
try:
request = GoogleHealthRunRequest.model_validate(request_body)
except ValidationError as exc:
raise HTTPException(
status_code=400,
detail=_format_invalid_detail(exc),
) from exc
connection = service.google_health_repository.get_connection()
if connection is None or connection.status.value != "active":
raise HTTPException(
status_code=409,
detail="invalid_google_health_connection: active connection not found",
)
_require_client(service)
run = service.trigger_google_health_ingest(
request.to_run_input(timezone=ZoneInfo(service.config.timezone))
)
return run.__dict__
15 changes: 13 additions & 2 deletions egograph/pipelines/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import os
from pathlib import Path
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

from egograph_paths import PIPELINES_LOGS_DIR, PIPELINES_STATE_DB_PATH
from pydantic import Field, SecretStr
from pydantic import Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

USE_ENV_FILE = os.getenv("USE_ENV_FILE", "true").lower() in ("true", "1", "yes")
Expand All @@ -27,7 +28,7 @@ class PipelinesConfig(BaseSettings):
host: str = "127.0.0.1"
port: int = 8001
api_key: SecretStr | None = None
timezone: str = "UTC"
timezone: str = Field("UTC", validation_alias="TIMEZONE")
dispatcher_poll_seconds: float = 1.0
max_concurrent_runs: int = 4
lock_lease_seconds: int = 300
Expand All @@ -51,6 +52,16 @@ class PipelinesConfig(BaseSettings):
validation_alias="GOOGLE_HEALTH_TOKEN_ENCRYPTION_KEY",
)

@field_validator("timezone")
@classmethod
def validate_timezone(cls, value: str) -> str:
"""IANAタイムゾーン名を検証する。"""
try:
ZoneInfo(value)
except ZoneInfoNotFoundError as exc:
raise ValueError(f"invalid timezone: {value}") from exc
return value

@property
def google_health_is_configured(self) -> bool:
"""Google Health OAuth 設定がすべて揃っているか返す。"""
Expand Down
1 change: 1 addition & 0 deletions egograph/pipelines/domain/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class WorkflowRunStatus(StrEnum):
QUEUED = "queued"
RUNNING = "running"
SUCCEEDED = "succeeded"
PARTIAL_FAILED = "partial_failed"
FAILED = "failed"
CANCELED = "canceled"

Expand Down
37 changes: 37 additions & 0 deletions egograph/pipelines/infrastructure/db/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,41 @@ def initialize_schema(conn: sqlite3.Connection) -> None:
);
"""
)
_migrate_google_health_sync_cursors(conn)
conn.commit()


def _migrate_google_health_sync_cursors(conn: sqlite3.Connection) -> None:
"""既存のGoogle Health sync cursorへPhase 2列を追加する。"""
columns = {
row[1]
for row in conn.execute(
"PRAGMA table_info(google_health_sync_cursors)"
).fetchall()
}
additions = {
"status": (
"ALTER TABLE google_health_sync_cursors "
"ADD COLUMN status TEXT NOT NULL DEFAULT 'success'"
),
"range_start": (
"ALTER TABLE google_health_sync_cursors ADD COLUMN range_start TEXT"
),
"range_end": (
"ALTER TABLE google_health_sync_cursors ADD COLUMN range_end TEXT"
),
"last_run_id": (
"ALTER TABLE google_health_sync_cursors ADD COLUMN last_run_id TEXT"
),
"record_count": (
"ALTER TABLE google_health_sync_cursors "
"ADD COLUMN record_count INTEGER NOT NULL DEFAULT 0"
),
"last_error_message": (
"ALTER TABLE google_health_sync_cursors "
"ADD COLUMN last_error_message TEXT"
),
}
for name, statement in additions.items():
if name not in columns:
conn.execute(statement)
54 changes: 53 additions & 1 deletion egograph/pipelines/infrastructure/dispatching/run_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,29 @@ def _execute_run(
exc=last_exc,
)
return
final_status = _status_from_summary(last_summary)
error_message = (
_error_from_summary(last_summary)
if final_status
in {
WorkflowRunStatus.PARTIAL_FAILED,
WorkflowRunStatus.FAILED,
}
else None
)
self._run_repository.update_run_result(
run_id=run.run_id,
status=WorkflowRunStatus.SUCCEEDED,
status=final_status,
error_message=error_message,
result_summary=last_summary,
)
if final_status == WorkflowRunStatus.FAILED:
self._notify_failure(
workflow=workflow,
run=run,
error_message=error_message or "workflow reported failed status",
exc=None,
)
except Exception as exc:
logger.exception(
"run execution crashed unexpectedly: run_id=%s",
Expand Down Expand Up @@ -544,3 +562,37 @@ def _notify_failure(
),
)
self._notification_service.notify(event, exc=exc)


def _status_from_summary(
summary: dict | None,
) -> WorkflowRunStatus:
if summary is None:
return WorkflowRunStatus.SUCCEEDED
raw_status = summary.get("status", WorkflowRunStatus.SUCCEEDED.value)
normalized_status = str(raw_status).strip().lower()
terminal_statuses = {
status.value: status
for status in (
WorkflowRunStatus.SUCCEEDED,
WorkflowRunStatus.PARTIAL_FAILED,
WorkflowRunStatus.FAILED,
)
}
status = terminal_statuses.get(normalized_status)
if status is not None:
return status
logger.warning(
"workflow returned non-terminal or unknown summary status: status=%r",
raw_status,
)
return WorkflowRunStatus.SUCCEEDED


def _error_from_summary(summary: dict | None) -> str:
if summary is None:
return "workflow reported failed status"
errors = summary.get("errors")
if isinstance(errors, list) and errors:
return "; ".join(str(error) for error in errors)
return "workflow reported failed status"
17 changes: 17 additions & 0 deletions egograph/pipelines/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import threading
from dataclasses import dataclass
from typing import cast
from zoneinfo import ZoneInfo

from pydantic import SecretStr

Expand Down Expand Up @@ -102,6 +103,7 @@ def create(cls, config: PipelinesConfig | None = None) -> "PipelineService":
token_cipher,
client_id=client_id.get_secret_value(),
client_secret=client_secret.get_secret_value(),
timezone=ZoneInfo(config.timezone),
)
service = cls(
config=config,
Expand Down Expand Up @@ -184,6 +186,21 @@ def trigger_workflow(
requested_by=requested_by,
)

def trigger_google_health_ingest(
self,
request: dict,
*,
requested_by: str = "api",
) -> WorkflowRun:
"""Google Health取り込みrunを入力付きでqueueへ積む。"""
return self.run_repository.enqueue_run(
workflow_id="google_health_ingest_workflow",
trigger_type=TriggerType.MANUAL,
queued_reason=QueuedReason.MANUAL_REQUEST,
requested_by=requested_by,
result_summary={"request": request},
)

def set_workflow_enabled(self, workflow_id: str, enabled: bool) -> dict:
"""workflow の有効/無効フラグを更新し scheduler を再同期する。"""
workflow = self.workflow_repository.set_workflow_enabled(workflow_id, enabled)
Expand Down
Loading