feat(dagster): add workflow orchestration integration #2652
Conversation
|
@greptile review |
Greptile Summary
This PR adds a complete Dagster workflow orchestration integration with five read-only tools (
Confidence Score: 5/5
All five tools are read-only, properly scoped, and the integration handles auth, timeouts, and pagination errors gracefully without risk to the Dagster instance.
The integration is well-structured: the paginated run-log fetcher correctly handles mid-pagination errors without discarding accumulated failures, all required selector coordinates are enforced without defaults, the httpx client lifecycle is managed via context managers throughout, and the 1500-line test suite covers all edge cases including multi-page pagination, concurrent step failures, and RunFailureEvent vs ExecutionStepFailureEvent semantics. No functional gaps were found beyond those already addressed in prior review threads.
No files require special attention.
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Agent
participant Tool as Dagster Tool Layer
participant Integration as app/integrations/dagster.py
participant Client as DagsterClient
participant API as Dagster GraphQL API
Agent->>Tool: get_dagster_run_logs(run_id)
Tool->>Integration: get_run_logs(config, run_id)
loop "Pagination up to MAX_RUN_LOG_PAGES=100"
Integration->>Client: "get_run_logs(run_id, limit=250, cursor)"
Client->>API: POST /graphql (GetRunLogs)
API-->>Client: "EventConnection { events, cursor, hasMore }"
Client-->>Integration: "{data: {logsForRun: ...}}"
Integration->>Integration: partition into failure_events / non_failure_events deque
alt "hasMore=false or error"
Integration-->>Integration: break loop
else "hasMore=true and cursor present"
Integration->>Integration: advance cursor
end
end
Integration->>Integration: _extract_step_failures(failure_events) → summary
Integration-->>Tool: "{data: {logsForRun: aggregated}, summary: {failure_count, failures, truncated}}"
Tool-->>Agent: enriched run log response
Agent->>Tool: list_dagster_runs(job_name, status)
Tool->>Integration: list_runs(config, status, job_name)
Integration->>Client: list_runs(limit, statuses, pipelineName)
Client->>API: POST /graphql (ListRuns)
API-->>Client: "runsOrError { Runs | InvalidPipelineRunsFilterError | PythonError }"
Client-->>Integration: "{data: {runsOrError: ...}}"
Integration->>Integration: _compute_run_durations adds duration_seconds per run
Integration-->>Tool: enriched runs response
Tool-->>Agent: run list with durations
Reviews (10): Last reviewed commit: "fix(dagster): signal partial fetch when ..." |
7ba0b74 to f9c032d
|
@greptile review |
874850c to d627982
|
@greptile review |
|
looks good to me |
|
@Davidson3556 thank you |
|
Need to see the entire flow from opensre onboard: the integration should work here -> all the env vars need to be pasted here, then all of it here https://www.opensre.com/docs/integrations-overview#local-integrations |
|
@muddlebee thanks for the review, just want to be sure I don't miss anything. Since the CLI setup already works end-to-end, what's missing is just the docs surfaces, right? |
|
Docs and demo both. |
|
alright will do. thanks. |
Adds Dagster integration with four tools (list_runs, run_logs, assets, sensor_ticks) backed by the GraphQL API. Works against OSS Dagster and Dagster+ Cloud. Wires config, verification, env-var fallback, setup handler, docs, and tests. Fixes Tracer-Cloud#2571
Closes two integration gaps:
- `list_dagster_schedule_ticks` mirrors the sensor tool shape
- `list_dagster_runs` rows now include `duration_seconds` (null for in-flight runs), so the agent no longer derives it.
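A minimal sketch of how a per-run `duration_seconds` field could be derived, for illustration only. It assumes each run row carries `startTime` and `endTime` as epoch seconds (either may be null); the helper names `compute_duration_seconds` and `add_durations` are hypothetical and are not the PR's `_compute_run_durations`.

```python
from typing import Any, Optional


def compute_duration_seconds(run: dict[str, Any]) -> Optional[float]:
    """Return endTime - startTime in seconds, or None for an in-flight run."""
    start = run.get("startTime")  # assumed field name, epoch seconds
    end = run.get("endTime")      # assumed field name, epoch seconds
    if start is None or end is None:
        # The run has not started or has not finished yet.
        return None
    return round(float(end) - float(start), 3)


def add_durations(runs: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return copies of the run rows with duration_seconds attached."""
    return [{**run, "duration_seconds": compute_duration_seconds(run)} for run in runs]
```

Computing the value once in the integration layer means the agent reads a ready-made number instead of subtracting timestamps itself.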
d627982 to 965894e
|
@greptile review |
- get_run_logs now paginates until hasMore=false, always retaining ExecutionStepFailureEvent and RunFailureEvent across pages.
- Non-failure events are held in a sliding window of the most recent MAX_NON_FAILURE_RUN_LOG_EVENTS=1500 (older events evicted) so the kept context stays adjacent to the typically later-in-stream failures while bounding LLM context.
- A MAX_RUN_LOG_PAGES=100 safety net bounds HTTP latency for outsized runs.
- summary.truncated signals when the window overflowed or the page cap fired so the agent can qualify the finding as partial.
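The pagination behaviour described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: `fetch_page` is a hypothetical callable that takes a cursor and returns a dict with `events`, `cursor`, and `hasMore`, and each event is assumed to carry a `__typename` field.

```python
from collections import deque
from typing import Any, Callable, Optional

FAILURE_TYPES = {"ExecutionStepFailureEvent", "RunFailureEvent"}
MAX_NON_FAILURE_RUN_LOG_EVENTS = 1500
MAX_RUN_LOG_PAGES = 100

# Hypothetical page fetcher: cursor in, {"events", "cursor", "hasMore"} out.
FetchPage = Callable[[Optional[str]], dict[str, Any]]


def collect_run_logs(
    fetch_page: FetchPage,
    max_non_failures: int = MAX_NON_FAILURE_RUN_LOG_EVENTS,
    max_pages: int = MAX_RUN_LOG_PAGES,
) -> dict[str, Any]:
    failures: list[dict[str, Any]] = []  # never evicted
    window: deque = deque(maxlen=max_non_failures)  # keeps most recent only
    evicted = False
    cursor: Optional[str] = None
    has_more = True
    pages = 0

    while has_more and pages < max_pages:
        page = fetch_page(cursor)
        pages += 1
        for event in page["events"]:
            if event.get("__typename") in FAILURE_TYPES:
                failures.append(event)
            else:
                if len(window) == window.maxlen:
                    evicted = True  # the oldest non-failure event is dropped
                window.append(event)
        cursor = page.get("cursor")
        has_more = bool(page.get("hasMore")) and cursor is not None

    # has_more still being True here means the page cap stopped the loop.
    return {
        "failures": failures,
        "non_failures": list(window),
        "truncated": evicted or has_more,
    }
```

A `deque` with `maxlen` gives the sliding window for free: appending to a full deque silently discards the oldest entry, so only the eviction flag needs explicit tracking.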
|
@greptile review |
Sort the aggregated list by timestamp before returning. Concatenating the non-failure and failure event partitions is not enough when events from each interleave chronologically: a downstream skip event (kept in non_failure_events) would appear BEFORE the upstream failure that caused it (kept in failure_events), inverting the causal chain for the LLM.
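A small sketch of the merge step, assuming each event carries a `timestamp` field holding a numeric string (for example epoch milliseconds); the function name `merge_chronologically` and the sample event shapes are hypothetical.

```python
from typing import Any


def merge_chronologically(
    failure_events: list[dict[str, Any]],
    non_failure_events: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Recombine the two partitions into one list ordered by event time."""
    combined = [*non_failure_events, *failure_events]
    # Cast to float: comparing the raw strings would order "1000" before "200".
    return sorted(combined, key=lambda event: float(event["timestamp"]))


# Example: a downstream skip that happened after the upstream failure.
skips = [{"__typename": "ExecutionStepSkippedEvent", "timestamp": "300"}]
fails = [{"__typename": "ExecutionStepFailureEvent", "timestamp": "200"}]
ordered = merge_chronologically(fails, skips)
```

Because `sorted` is stable, events that share a timestamp keep their relative order from the concatenated list.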
6852640 to ae6e0cf
|
@greptile review |
mid-pagination errors preserve collected failures and set summary.fetch_error so callers know the fetch was partial.
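One way to picture this behaviour, as a sketch rather than the PR's implementation: stop paginating at the first failed page fetch, keep whatever failures were already collected, and record the error in the summary. `fetch_page` is a hypothetical callable, and the broad `except Exception` is a simplification for the example.

```python
from typing import Any, Callable, Optional

FAILURE_TYPES = {"ExecutionStepFailureEvent", "RunFailureEvent"}


def collect_failures(
    fetch_page: Callable[[Optional[str]], dict[str, Any]],
    max_pages: int = 100,
) -> dict[str, Any]:
    failures: list[dict[str, Any]] = []
    fetch_error: Optional[str] = None
    cursor: Optional[str] = None

    for _ in range(max_pages):
        try:
            page = fetch_page(cursor)
        except Exception as exc:  # sketch only; real code would catch narrower errors
            # Keep what was collected so far and flag the fetch as partial.
            fetch_error = str(exc)
            break
        failures.extend(
            event for event in page["events"]
            if event.get("__typename") in FAILURE_TYPES
        )
        cursor = page.get("cursor")
        if not page.get("hasMore") or cursor is None:
            break

    return {
        "failures": failures,
        "summary": {"failure_count": len(failures), "fetch_error": fetch_error},
    }
```

A caller can then treat a non-null `fetch_error` as a signal that the failure list may be incomplete.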
40a782d to 81e7bc9
|
@greptile review |
|
@greptile review |
|
Hello @muddlebee, here is what I added since the last review:
- setup demo + docs modifications
- get_run_logs pagination and tests:
I considered exposing Let me know if you'd rather see it exposed, or bigger than 1500 or have another idea. same for |
Fixes #2571
Summary
Adds Dagster integration covering:
- Five tools (list_dagster_runs, get_dagster_run_logs, list_dagster_assets, list_dagster_sensor_ticks, list_dagster_schedule_ticks) backed by the Dagster GraphQL API
- Works against OSS Dagster (dagster dev, self-hosted webserver) and Dagster+ Cloud
Wiring
flowchart TD
A([Dagster-source alert]) --> R[/"_ALERT_SOURCE_TO_TOOL_SOURCES<br/>(agent/investigation.py + agent/prompt.py)"/]
R --> T1["list_dagster_runs"]
R --> T2["get_dagster_run_logs"]
R --> T3["list_dagster_assets"]
R --> T4["list_dagster_sensor_ticks"]
R --> T5["list_dagster_schedule_ticks"]
subgraph helpers ["app/integrations/dagster.py"]
H1["list_runs(status, job_name)<br/>+ _compute_run_durations"]
H2["get_run_logs<br/>paginated; non-failures bounded, failures preserved<br/>+ _extract_step_failures (summary block)"]
H3["list_assets_with_materialization"]
H4["list_sensor_ticks<br/>(repo, location, sensor)"]
H5["list_schedule_ticks<br/>(repo, location, schedule)"]
end
T1 --> H1
T2 --> H2
T3 --> H3
T4 --> H4
T5 --> H5
H1 --> C
H2 --> C
H3 --> C
H4 --> C
H5 --> C
C["DagsterClient<br/>app/services/dagster/client.py"]
C -->|"POST /graphql"| API[("Dagster GraphQL<br/>OSS or Cloud")]
Notes
Demo
A supply_chain_pipeline Dagster job with 4 ops in a partial-success shape: fetch_inventory, fetch_sales_history, and calculate_reorder_points all succeed, then generate_purchase_orders fails with a vendor-portal HTTP 503 deep in the DAG, blocking the final notification step.
Setup
dagster_setup.mov
Investigation (model: deepseek-v4-pro):
dagster_investigation.mov