diff --git a/gourmand-exceptions.toml b/gourmand-exceptions.toml index 054c6b4..ecbcb6c 100644 --- a/gourmand-exceptions.toml +++ b/gourmand-exceptions.toml @@ -51,3 +51,10 @@ justification = "Response shape normalization handling dict vs list API response check = "implicit_state_machine" path = "src/mcp_workboard_crunchtools/client.py" justification = "Error handling dispatches on HTTP status codes — standard pattern, not a state machine" + +# Single-use helper — _extract_columns is called from two locations +[[exceptions]] +check = "single_use_helpers" +path = "src/mcp_workboard_crunchtools/tools/workstreams.py" +justification = "_extract_columns is called from two locations: _format_workstream (line 153) and get_workstream_columns (line 262). Gourmand reports one callsite, but grep confirms two distinct callers." +classification = "gourmand_bug" diff --git a/src/mcp_workboard_crunchtools/server.py b/src/mcp_workboard_crunchtools/server.py index 0dd64e7..bafc84f 100644 --- a/src/mcp_workboard_crunchtools/server.py +++ b/src/mcp_workboard_crunchtools/server.py @@ -397,6 +397,30 @@ async def workboard_get_workstream_activities_tool( return await get_workstream_activities(ws_id=ws_id) +@mcp.tool() +async def workboard_get_workstream_columns_tool( + ws_id: int, +) -> dict[str, Any]: + """Get the custom Kanban columns defined on a workstream. + + Returns the list of custom columns (e.g. "Homework", "Interlocks", + "Active") configured on the workstream's Kanban board. Use the + column_id values with create/update activity tools to place cards + in specific columns. + + Workstreams without custom columns return an empty list. + + Args: + ws_id: Workstream ID (positive integer) + + Returns: + Workstream ID, name, and list of columns with column_id and column_name + """ + from .tools import get_workstream_columns + + return await get_workstream_columns(ws_id=ws_id) + + @mcp.tool() async def workboard_get_team_workstreams_tool( team_id: int, diff --git a/src/mcp_workboard_crunchtools/tools/__init__.py b/src/mcp_workboard_crunchtools/tools/__init__.py index b998e4e..9869611 100644 --- a/src/mcp_workboard_crunchtools/tools/__init__.py +++ b/src/mcp_workboard_crunchtools/tools/__init__.py @@ -24,6 +24,7 @@ create_workstream, get_team_workstreams, get_workstream_activities, + get_workstream_columns, get_workstreams, update_workstream, ) @@ -48,6 +49,7 @@ "create_objective", "get_workstreams", "get_workstream_activities", + "get_workstream_columns", "get_team_workstreams", "create_workstream", "update_workstream", diff --git a/src/mcp_workboard_crunchtools/tools/workstreams.py b/src/mcp_workboard_crunchtools/tools/workstreams.py index f5a192d..41bdf09 100644 --- a/src/mcp_workboard_crunchtools/tools/workstreams.py +++ b/src/mcp_workboard_crunchtools/tools/workstreams.py @@ -95,6 +95,21 @@ def _format_activity_item(ai: dict[str, Any]) -> dict[str, Any]: return formatted +def _extract_columns(activities: list[Any]) -> list[dict[str, str]]: + """Extract deduplicated column catalog from a list of raw action items.""" + columns: dict[str, str] = {} + for ai in activities: + if not isinstance(ai, dict): + continue + col = ai.get("ai_column") + if isinstance(col, dict) and col.get("id"): + columns[col["id"]] = col.get("name", "") + return [ + {"column_id": cid, "column_name": cname} + for cid, cname in columns.items() + ] + + def _format_workstream(ws: dict[str, Any]) -> dict[str, Any]: """Format a raw workstream object for MCP output.""" formatted: dict[str, Any] = { @@ -135,6 +150,10 @@ def _format_workstream(ws: dict[str, Any]) -> dict[str, Any]: if count is not None: formatted["action_item_count"] = count + extracted = _extract_columns(activities) + if extracted: + formatted["columns"] = extracted + return formatted @@ -206,6 +225,44 @@ async def get_workstream_activities( return {"workstream": activity_body} +async def get_workstream_columns( + ws_id: int, +) -> dict[str, Any]: + """Get the custom Kanban columns defined on a workstream. + + Fetches all action items and extracts the unique column definitions. + Workstreams without custom columns return an empty list. + + Args: + ws_id: Workstream ID (positive integer). + + Returns: + Dictionary with ws_id, name, and columns list. + """ + ws_id = validate_workstream_id(ws_id) + client = get_client() + + response = await client.get(f"/workstream/{ws_id}/activity") + activity_body = response.get("data", {}) + + ws_data = activity_body.get("workstream") + if not isinstance(ws_data, dict): + ws_data = {} + + activities: list[Any] = [] + activity_data = ws_data.get("ws_activity") + if isinstance(activity_data, dict): + raw = activity_data.get("activity", []) + if isinstance(raw, list): + activities = raw + + return { + "ws_id": ws_data.get("ws_id", str(ws_id)), + "name": ws_data.get("ws_name", ""), + "columns": _extract_columns(activities), + } + + async def get_team_workstreams( team_id: int, ) -> dict[str, Any]: diff --git a/tests/test_tools.py b/tests/test_tools.py index 8dca162..a9e32a8 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -21,10 +21,10 @@ def test_server_has_tools(self) -> None: assert mcp is not None def test_tool_count(self) -> None: - """Server should have exactly 22 tools registered.""" + """Server should have exactly 23 tools registered.""" from mcp_workboard_crunchtools.tools import __all__ - assert len(__all__) == 22 + assert len(__all__) == 23 def test_imports(self) -> None: """All tool functions should be importable.""" @@ -748,6 +748,202 @@ async def test_get_workstream_activities(self) -> None: assert ws["action_items"][0]["column_id"] == "15" assert ws["action_items"][0]["column_name"] == "Interlocks" assert len(ws["action_items"][0]["comments"]) == 1 + assert "columns" in ws + assert ws["columns"] == [{"column_id": "15", "column_name": "Interlocks"}] + + @pytest.mark.asyncio + async def test_get_workstream_activities_multiple_columns(self) -> None: + """get_workstream_activities should deduplicate columns from action items.""" + from mcp_workboard_crunchtools.tools import get_workstream_activities + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "200", + "ws_name": "Roadmap", + "ws_objective": "", + "ws_owner": "1", + "ws_lead": "2", + "ws_status": "active", + "ws_type": "team", + "ws_pace": "steady", + "ws_health": "good", + "ws_priority": "p1", + "ws_progress": "0", + "ws_start_date": "2026-01-01", + "ws_target_date": "2026-06-30", + "ws_completion_date": None, + "ws_team_id": "10", + "ws_team_name": "Eng", + "ws_activity": { + "activity": [ + { + "ai_id": "601", + "ai_description": "Task A", + "ai_state": "doing", + "ai_column": {"id": "50", "name": "Planned"}, + }, + { + "ai_id": "602", + "ai_description": "Task B", + "ai_state": "doing", + "ai_column": {"id": "51", "name": "In Progress"}, + }, + { + "ai_id": "603", + "ai_description": "Task C", + "ai_state": "doing", + "ai_column": {"id": "50", "name": "Planned"}, + }, + ], + "activity_count": 3, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_activities(ws_id=200) + + ws = result["workstream"] + assert len(ws["action_items"]) == 3 + assert "columns" in ws + assert len(ws["columns"]) == 2 + column_ids = {c["column_id"] for c in ws["columns"]} + assert column_ids == {"50", "51"} + + @pytest.mark.asyncio + async def test_get_workstream_activities_no_columns(self) -> None: + """Workstreams without custom columns should not include a columns key.""" + from mcp_workboard_crunchtools.tools import get_workstream_activities + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "300", + "ws_name": "Plain Board", + "ws_objective": "", + "ws_owner": "1", + "ws_lead": "2", + "ws_status": "active", + "ws_type": "team", + "ws_pace": "steady", + "ws_health": "good", + "ws_priority": "p2", + "ws_progress": "0", + "ws_start_date": "2026-01-01", + "ws_target_date": "2026-06-30", + "ws_completion_date": None, + "ws_team_id": "10", + "ws_team_name": "Eng", + "ws_activity": { + "activity": [ + { + "ai_id": "701", + "ai_description": "No column task", + "ai_state": "next", + }, + ], + "activity_count": 1, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_activities(ws_id=300) + + ws = result["workstream"] + assert len(ws["action_items"]) == 1 + assert "columns" not in ws + + @pytest.mark.asyncio + async def test_get_workstream_columns(self) -> None: + """get_workstream_columns should return deduplicated column list.""" + from mcp_workboard_crunchtools.tools import get_workstream_columns + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "400", + "ws_name": "Kanban Board", + "ws_activity": { + "activity": [ + { + "ai_id": "801", + "ai_description": "Card A", + "ai_state": "doing", + "ai_column": {"id": "60", "name": "Planned"}, + }, + { + "ai_id": "802", + "ai_description": "Card B", + "ai_state": "doing", + "ai_column": {"id": "61", "name": "In Progress"}, + }, + { + "ai_id": "803", + "ai_description": "Card C", + "ai_state": "doing", + "ai_column": {"id": "60", "name": "Planned"}, + }, + ], + "activity_count": 3, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_columns(ws_id=400) + + assert result["ws_id"] == "400" + assert result["name"] == "Kanban Board" + assert len(result["columns"]) == 2 + column_ids = {c["column_id"] for c in result["columns"]} + assert column_ids == {"60", "61"} + + @pytest.mark.asyncio + async def test_get_workstream_columns_none(self) -> None: + """get_workstream_columns with no custom columns returns empty list.""" + from mcp_workboard_crunchtools.tools import get_workstream_columns + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "500", + "ws_name": "Plain Board", + "ws_activity": { + "activity": [ + { + "ai_id": "901", + "ai_description": "Task", + "ai_state": "next", + }, + ], + "activity_count": 1, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_columns(ws_id=500) + + assert result["ws_id"] == "500" + assert result["columns"] == [] @pytest.mark.asyncio async def test_get_team_workstreams(self) -> None: