Skip to content

Commit 7c9a6a2

Browse files
committed
feat: Add workflow orchestration system
- Implement WorkflowEngine for chaining SME skills/tools - Create StepRegistry mapping 20+ handlers (harvesting, analysis, AI, security) - Add REST API endpoints for workflow management - Document in skills/workflow-engine.md
1 parent 0d700c3 commit 7c9a6a2

4 files changed

Lines changed: 994 additions & 0 deletions

File tree

skills/workflow-engine.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
---
2+
Domain: SME_INTEGRATION
3+
Version: 1.0.0
4+
Complexity: Advanced
5+
Type: Tool
6+
Category: Orchestration
7+
name: workflow-engine
8+
Source: Semantic Memory Engine (SME)
9+
Source_File: src/orchestration/workflow_engine.py
10+
---
11+
12+
## Purpose
13+
14+
Enables chaining SME skills/tools into executable workflows with state management, error handling, and parallel execution.
15+
16+
## Description
17+
18+
The Workflow Engine provides orchestration capabilities for SME. It allows defining multi-step workflows that execute in sequence or parallel, with dependency management, retry logic, and persistent execution tracking.
19+
20+
## Workflow
21+
22+
1. **Define**: Create workflow with steps and dependencies
23+
2. **Register**: Register step handlers (skills/tools)
24+
3. **Execute**: Run workflow with input data
25+
4. **Track**: Monitor execution, handle errors, retries
26+
5. **Complete**: Store results in database
27+
28+
## Examples
29+
30+
### Example 1: Research Workflow
31+
**Input**: Topic: "AI ethics"
32+
**Output**: Comprehensive report with sources
33+
**Steps**: harvest_url → sentiment → stylometry → summarize
34+
35+
### Example 2: OSINT Investigation
36+
**Input**: Username: "@target"
37+
**Output**: Bot detection + profile analysis
38+
**Steps**: osint_scan → bot_detection → trust_score
39+
40+
### Example 3: Parallel Analysis
41+
**Input**: List of URLs
42+
**Output**: Multi-source analysis results
43+
**Steps**: Multiple harvest_url steps in parallel
44+
45+
## Implementation Notes
46+
47+
- **Location**: `D:/SME/src/orchestration/workflow_engine.py`
48+
- **Database**: SQLite (storage.db_path)
49+
- **API Endpoints**:
50+
- GET /workflows - List workflows
51+
- POST /workflows - Create workflow
52+
- POST /workflows/{id}/execute - Run workflow
53+
- GET /workflows/steps - List available steps
54+
- **Step Registry**: Maps step names to actual handlers in `step_registry.py`

src/api/router.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,3 +300,121 @@ async def ingest_from_url(request: IngestRequest):
300300
return result
301301
except Exception as e:
302302
raise HTTPException(status_code=500, detail=str(e))
303+
304+
305+
# =============================================================================
306+
# Workflow API
307+
# =============================================================================
308+
309+
310+
@router.get("/workflows")
311+
async def list_workflows():
312+
"""List all available workflows."""
313+
try:
314+
from src.orchestration.workflow_engine import get_engine
315+
316+
engine = get_engine()
317+
workflows = engine.list_workflows()
318+
return {"workflows": workflows}
319+
except Exception as e:
320+
raise HTTPException(status_code=500, detail=str(e))
321+
322+
323+
@router.post("/workflows")
324+
async def create_workflow(request: dict):
325+
"""Create a new workflow definition."""
326+
try:
327+
from src.orchestration.workflow_engine import get_engine
328+
from src.orchestration.step_registry import get_step_registry
329+
330+
engine = get_engine()
331+
registry = get_step_registry()
332+
333+
for step_def in request.get("steps", []):
334+
handler_name = step_def.get("handler")
335+
if handler_name:
336+
handler = registry.get_handler(handler_name)
337+
engine.register_step(handler_name, handler)
338+
339+
workflow = engine.create_workflow(
340+
name=request["name"],
341+
description=request.get("description", ""),
342+
steps=request.get("steps", []),
343+
parallel=request.get("parallel", False),
344+
)
345+
346+
return {
347+
"workflow_id": workflow.workflow_id,
348+
"name": workflow.name,
349+
"status": "created",
350+
}
351+
except Exception as e:
352+
raise HTTPException(status_code=500, detail=str(e))
353+
354+
355+
@router.get("/workflows/{workflow_id}")
356+
async def get_workflow(workflow_id: str):
357+
"""Get workflow definition."""
358+
try:
359+
from src.orchestration.workflow_engine import get_engine
360+
361+
engine = get_engine()
362+
workflow = engine.get_workflow(workflow_id)
363+
if not workflow:
364+
raise HTTPException(status_code=404, detail="Workflow not found")
365+
return workflow
366+
except HTTPException:
367+
raise
368+
except Exception as e:
369+
raise HTTPException(status_code=500, detail=str(e))
370+
371+
372+
@router.post("/workflows/{workflow_id}/execute")
373+
async def execute_workflow(workflow_id: str, input_data: dict):
374+
"""Execute a workflow with input data."""
375+
try:
376+
from src.orchestration.workflow_engine import get_engine
377+
from src.orchestration.step_registry import get_step_registry
378+
379+
engine = get_engine()
380+
registry = get_step_registry()
381+
382+
workflow_def = engine.get_workflow(workflow_id)
383+
if not workflow_def:
384+
raise HTTPException(status_code=404, detail="Workflow not found")
385+
386+
for step_def in workflow_def.get("definition", {}).get("steps", []):
387+
handler_name = step_def.get("handler")
388+
if handler_name:
389+
handler = registry.get_handler(handler_name)
390+
if handler:
391+
engine.register_step(handler_name, handler)
392+
393+
workflow = engine.create_workflow(
394+
name=workflow_def["name"],
395+
description=workflow_def.get("description", ""),
396+
steps=workflow_def.get("definition", {}).get("steps", []),
397+
parallel=workflow_def.get("definition", {}).get("parallel", False),
398+
)
399+
400+
import asyncio
401+
402+
result = await engine.execute(workflow, input_data)
403+
return result
404+
except HTTPException:
405+
raise
406+
except Exception as e:
407+
raise HTTPException(status_code=500, detail=str(e))
408+
409+
410+
@router.get("/workflows/steps")
411+
async def list_workflow_steps():
412+
"""List all available workflow steps."""
413+
try:
414+
from src.orchestration.step_registry import get_step_registry
415+
416+
registry = get_step_registry()
417+
steps = registry.list_steps()
418+
return {"steps": steps}
419+
except Exception as e:
420+
raise HTTPException(status_code=500, detail=str(e))

0 commit comments

Comments
 (0)