From 97e177d447a37b07269ec73b70b9662acb34caa8 Mon Sep 17 00:00:00 2001 From: Gunnar Hellekson Date: Tue, 14 Apr 2026 16:28:05 -0500 Subject: [PATCH 1/3] feat: add workstream column discovery tool and column metadata The WorkBoard API exposes custom Kanban column placement via ai_column on action items, but there was no way to discover which columns a workstream has. This adds: - get_workstream_columns tool: dedicated tool that returns the deduplicated column catalog (column_id + column_name) for a workstream, without returning all action item details - Column metadata on get_workstream_activities: the activities response now includes a top-level "columns" list when custom columns are present - 6 new tests covering multi-column dedup, no-columns, and the dedicated columns tool Closes #18 (column discovery complement to the column placement support added in v0.8.0). --- src/mcp_workboard_crunchtools/server.py | 24 +++ .../tools/__init__.py | 2 + .../tools/workstreams.py | 57 +++++ tests/test_tools.py | 200 +++++++++++++++++- 4 files changed, 281 insertions(+), 2 deletions(-) diff --git a/src/mcp_workboard_crunchtools/server.py b/src/mcp_workboard_crunchtools/server.py index 0dd64e7..bafc84f 100644 --- a/src/mcp_workboard_crunchtools/server.py +++ b/src/mcp_workboard_crunchtools/server.py @@ -397,6 +397,30 @@ async def workboard_get_workstream_activities_tool( return await get_workstream_activities(ws_id=ws_id) +@mcp.tool() +async def workboard_get_workstream_columns_tool( + ws_id: int, +) -> dict[str, Any]: + """Get the custom Kanban columns defined on a workstream. + + Returns the list of custom columns (e.g. "Homework", "Interlocks", + "Active") configured on the workstream's Kanban board. Use the + column_id values with create/update activity tools to place cards + in specific columns. + + Workstreams without custom columns return an empty list. + + Args: + ws_id: Workstream ID (positive integer) + + Returns: + Workstream ID, name, and list of columns with column_id and column_name + """ + from .tools import get_workstream_columns + + return await get_workstream_columns(ws_id=ws_id) + + @mcp.tool() async def workboard_get_team_workstreams_tool( team_id: int, diff --git a/src/mcp_workboard_crunchtools/tools/__init__.py b/src/mcp_workboard_crunchtools/tools/__init__.py index b998e4e..9869611 100644 --- a/src/mcp_workboard_crunchtools/tools/__init__.py +++ b/src/mcp_workboard_crunchtools/tools/__init__.py @@ -24,6 +24,7 @@ create_workstream, get_team_workstreams, get_workstream_activities, + get_workstream_columns, get_workstreams, update_workstream, ) @@ -48,6 +49,7 @@ "create_objective", "get_workstreams", "get_workstream_activities", + "get_workstream_columns", "get_team_workstreams", "create_workstream", "update_workstream", diff --git a/src/mcp_workboard_crunchtools/tools/workstreams.py b/src/mcp_workboard_crunchtools/tools/workstreams.py index f5a192d..7d8de46 100644 --- a/src/mcp_workboard_crunchtools/tools/workstreams.py +++ b/src/mcp_workboard_crunchtools/tools/workstreams.py @@ -135,6 +135,19 @@ def _format_workstream(ws: dict[str, Any]) -> dict[str, Any]: if count is not None: formatted["action_item_count"] = count + columns: dict[str, str] = {} + for ai in activities: + if not isinstance(ai, dict): + continue + col = ai.get("ai_column") + if isinstance(col, dict) and col.get("id"): + columns[col["id"]] = col.get("name", "") + if columns: + formatted["columns"] = [ + {"column_id": cid, "column_name": cname} + for cid, cname in columns.items() + ] + return formatted @@ -206,6 +219,50 @@ async def get_workstream_activities( return {"workstream": activity_body} +async def get_workstream_columns( + ws_id: int, +) -> dict[str, Any]: + """Get the custom Kanban columns defined on a workstream. + + Fetches all action items and extracts the unique column definitions. + Workstreams without custom columns return an empty list. + + Args: + ws_id: Workstream ID (positive integer). + + Returns: + Dictionary with ws_id, name, and columns list. + """ + ws_id = validate_workstream_id(ws_id) + client = get_client() + + response = await client.get(f"/workstream/{ws_id}/activity") + activity_body = response.get("data", {}) + + ws_data = activity_body.get("workstream", {}) + columns: dict[str, str] = {} + + activity_data = ws_data.get("ws_activity", {}) + if isinstance(activity_data, dict): + activities = activity_data.get("activity", []) + if isinstance(activities, list): + for ai in activities: + if not isinstance(ai, dict): + continue + col = ai.get("ai_column") + if isinstance(col, dict) and col.get("id"): + columns[col["id"]] = col.get("name", "") + + return { + "ws_id": ws_data.get("ws_id", str(ws_id)), + "name": ws_data.get("ws_name", ""), + "columns": [ + {"column_id": cid, "column_name": cname} + for cid, cname in columns.items() + ], + } + + async def get_team_workstreams( team_id: int, ) -> dict[str, Any]: diff --git a/tests/test_tools.py b/tests/test_tools.py index 8dca162..a9e32a8 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -21,10 +21,10 @@ def test_server_has_tools(self) -> None: assert mcp is not None def test_tool_count(self) -> None: - """Server should have exactly 22 tools registered.""" + """Server should have exactly 23 tools registered.""" from mcp_workboard_crunchtools.tools import __all__ - assert len(__all__) == 22 + assert len(__all__) == 23 def test_imports(self) -> None: """All tool functions should be importable.""" @@ -748,6 +748,202 @@ async def test_get_workstream_activities(self) -> None: assert ws["action_items"][0]["column_id"] == "15" assert ws["action_items"][0]["column_name"] == "Interlocks" assert len(ws["action_items"][0]["comments"]) == 1 + assert "columns" in ws + assert ws["columns"] == [{"column_id": "15", "column_name": "Interlocks"}] + + @pytest.mark.asyncio + async def test_get_workstream_activities_multiple_columns(self) -> None: + """get_workstream_activities should deduplicate columns from action items.""" + from mcp_workboard_crunchtools.tools import get_workstream_activities + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "200", + "ws_name": "Roadmap", + "ws_objective": "", + "ws_owner": "1", + "ws_lead": "2", + "ws_status": "active", + "ws_type": "team", + "ws_pace": "steady", + "ws_health": "good", + "ws_priority": "p1", + "ws_progress": "0", + "ws_start_date": "2026-01-01", + "ws_target_date": "2026-06-30", + "ws_completion_date": None, + "ws_team_id": "10", + "ws_team_name": "Eng", + "ws_activity": { + "activity": [ + { + "ai_id": "601", + "ai_description": "Task A", + "ai_state": "doing", + "ai_column": {"id": "50", "name": "Planned"}, + }, + { + "ai_id": "602", + "ai_description": "Task B", + "ai_state": "doing", + "ai_column": {"id": "51", "name": "In Progress"}, + }, + { + "ai_id": "603", + "ai_description": "Task C", + "ai_state": "doing", + "ai_column": {"id": "50", "name": "Planned"}, + }, + ], + "activity_count": 3, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_activities(ws_id=200) + + ws = result["workstream"] + assert len(ws["action_items"]) == 3 + assert "columns" in ws + assert len(ws["columns"]) == 2 + column_ids = {c["column_id"] for c in ws["columns"]} + assert column_ids == {"50", "51"} + + @pytest.mark.asyncio + async def test_get_workstream_activities_no_columns(self) -> None: + """Workstreams without custom columns should not include a columns key.""" + from mcp_workboard_crunchtools.tools import get_workstream_activities + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "300", + "ws_name": "Plain Board", + "ws_objective": "", + "ws_owner": "1", + "ws_lead": "2", + "ws_status": "active", + "ws_type": "team", + "ws_pace": "steady", + "ws_health": "good", + "ws_priority": "p2", + "ws_progress": "0", + "ws_start_date": "2026-01-01", + "ws_target_date": "2026-06-30", + "ws_completion_date": None, + "ws_team_id": "10", + "ws_team_name": "Eng", + "ws_activity": { + "activity": [ + { + "ai_id": "701", + "ai_description": "No column task", + "ai_state": "next", + }, + ], + "activity_count": 1, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_activities(ws_id=300) + + ws = result["workstream"] + assert len(ws["action_items"]) == 1 + assert "columns" not in ws + + @pytest.mark.asyncio + async def test_get_workstream_columns(self) -> None: + """get_workstream_columns should return deduplicated column list.""" + from mcp_workboard_crunchtools.tools import get_workstream_columns + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "400", + "ws_name": "Kanban Board", + "ws_activity": { + "activity": [ + { + "ai_id": "801", + "ai_description": "Card A", + "ai_state": "doing", + "ai_column": {"id": "60", "name": "Planned"}, + }, + { + "ai_id": "802", + "ai_description": "Card B", + "ai_state": "doing", + "ai_column": {"id": "61", "name": "In Progress"}, + }, + { + "ai_id": "803", + "ai_description": "Card C", + "ai_state": "doing", + "ai_column": {"id": "60", "name": "Planned"}, + }, + ], + "activity_count": 3, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_columns(ws_id=400) + + assert result["ws_id"] == "400" + assert result["name"] == "Kanban Board" + assert len(result["columns"]) == 2 + column_ids = {c["column_id"] for c in result["columns"]} + assert column_ids == {"60", "61"} + + @pytest.mark.asyncio + async def test_get_workstream_columns_none(self) -> None: + """get_workstream_columns with no custom columns returns empty list.""" + from mcp_workboard_crunchtools.tools import get_workstream_columns + + resp = _mock_response( + json_data={ + "data": { + "totalCount": 1, + "workstream": { + "ws_id": "500", + "ws_name": "Plain Board", + "ws_activity": { + "activity": [ + { + "ai_id": "901", + "ai_description": "Task", + "ai_state": "next", + }, + ], + "activity_count": 1, + }, + }, + }, + } + ) + + with _patch_client(resp): + result = await get_workstream_columns(ws_id=500) + + assert result["ws_id"] == "500" + assert result["columns"] == [] @pytest.mark.asyncio async def test_get_team_workstreams(self) -> None: From d4a288ea72e3dcde80d8cee1f99188c8574d6cac Mon Sep 17 00:00:00 2001 From: Gunnar Hellekson Date: Tue, 14 Apr 2026 20:25:22 -0500 Subject: [PATCH 2/3] refactor: extract _extract_columns helper, fix null safety Address Gemini code review feedback: - Extract duplicated column extraction logic into shared _extract_columns() helper used by both _format_workstream and get_workstream_columns - Guard against "workstream": null in API response to prevent AttributeError when calling .get() on NoneType --- .../tools/workstreams.py | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/mcp_workboard_crunchtools/tools/workstreams.py b/src/mcp_workboard_crunchtools/tools/workstreams.py index 7d8de46..41bdf09 100644 --- a/src/mcp_workboard_crunchtools/tools/workstreams.py +++ b/src/mcp_workboard_crunchtools/tools/workstreams.py @@ -95,6 +95,21 @@ def _format_activity_item(ai: dict[str, Any]) -> dict[str, Any]: return formatted +def _extract_columns(activities: list[Any]) -> list[dict[str, str]]: + """Extract deduplicated column catalog from a list of raw action items.""" + columns: dict[str, str] = {} + for ai in activities: + if not isinstance(ai, dict): + continue + col = ai.get("ai_column") + if isinstance(col, dict) and col.get("id"): + columns[col["id"]] = col.get("name", "") + return [ + {"column_id": cid, "column_name": cname} + for cid, cname in columns.items() + ] + + def _format_workstream(ws: dict[str, Any]) -> dict[str, Any]: """Format a raw workstream object for MCP output.""" formatted: dict[str, Any] = { @@ -135,18 +150,9 @@ def _format_workstream(ws: dict[str, Any]) -> dict[str, Any]: if count is not None: formatted["action_item_count"] = count - columns: dict[str, str] = {} - for ai in activities: - if not isinstance(ai, dict): - continue - col = ai.get("ai_column") - if isinstance(col, dict) and col.get("id"): - columns[col["id"]] = col.get("name", "") - if columns: - formatted["columns"] = [ - {"column_id": cid, "column_name": cname} - for cid, cname in columns.items() - ] + extracted = _extract_columns(activities) + if extracted: + formatted["columns"] = extracted return formatted @@ -239,27 +245,21 @@ async def get_workstream_columns( response = await client.get(f"/workstream/{ws_id}/activity") activity_body = response.get("data", {}) - ws_data = activity_body.get("workstream", {}) - columns: dict[str, str] = {} + ws_data = activity_body.get("workstream") + if not isinstance(ws_data, dict): + ws_data = {} - activity_data = ws_data.get("ws_activity", {}) + activities: list[Any] = [] + activity_data = ws_data.get("ws_activity") if isinstance(activity_data, dict): - activities = activity_data.get("activity", []) - if isinstance(activities, list): - for ai in activities: - if not isinstance(ai, dict): - continue - col = ai.get("ai_column") - if isinstance(col, dict) and col.get("id"): - columns[col["id"]] = col.get("name", "") + raw = activity_data.get("activity", []) + if isinstance(raw, list): + activities = raw return { "ws_id": ws_data.get("ws_id", str(ws_id)), "name": ws_data.get("ws_name", ""), - "columns": [ - {"column_id": cid, "column_name": cname} - for cid, cname in columns.items() - ], + "columns": _extract_columns(activities), } From ed946ea05aee126135c37c35bdc8f2f1d228009a Mon Sep 17 00:00:00 2001 From: Gunnar Hellekson Date: Tue, 14 Apr 2026 20:29:19 -0500 Subject: [PATCH 3/3] chore: add gourmand exception for _extract_columns false positive Gourmand reports _extract_columns as single-use, but it is called from two distinct functions: _format_workstream and get_workstream_columns. Filed as gourmand_bug classification. --- gourmand-exceptions.toml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/gourmand-exceptions.toml b/gourmand-exceptions.toml index 054c6b4..ecbcb6c 100644 --- a/gourmand-exceptions.toml +++ b/gourmand-exceptions.toml @@ -51,3 +51,10 @@ justification = "Response shape normalization handling dict vs list API response check = "implicit_state_machine" path = "src/mcp_workboard_crunchtools/client.py" justification = "Error handling dispatches on HTTP status codes — standard pattern, not a state machine" + +# Single-use helper — _extract_columns is called from two locations +[[exceptions]] +check = "single_use_helpers" +path = "src/mcp_workboard_crunchtools/tools/workstreams.py" +justification = "_extract_columns is called from two locations: _format_workstream (line 153) and get_workstream_columns (line 262). Gourmand reports one callsite, but grep confirms two distinct callers." +classification = "gourmand_bug"