Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gourmand-exceptions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,10 @@ justification = "Response shape normalization handling dict vs list API response
check = "implicit_state_machine"
path = "src/mcp_workboard_crunchtools/client.py"
justification = "Error handling dispatches on HTTP status codes — standard pattern, not a state machine"

# Single-use helper — _extract_columns is called from two locations
[[exceptions]]
check = "single_use_helpers"
path = "src/mcp_workboard_crunchtools/tools/workstreams.py"
justification = "_extract_columns is called from two locations: _format_workstream (line 153) and get_workstream_columns (line 262). Gourmand reports one callsite, but grep confirms two distinct callers."
classification = "gourmand_bug"
24 changes: 24 additions & 0 deletions src/mcp_workboard_crunchtools/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,30 @@ async def workboard_get_workstream_activities_tool(
return await get_workstream_activities(ws_id=ws_id)


@mcp.tool()
async def workboard_get_workstream_columns_tool(
ws_id: int,
) -> dict[str, Any]:
"""Get the custom Kanban columns defined on a workstream.

Returns the list of custom columns (e.g. "Homework", "Interlocks",
"Active") configured on the workstream's Kanban board. Use the
column_id values with create/update activity tools to place cards
in specific columns.

Workstreams without custom columns return an empty list.

Args:
ws_id: Workstream ID (positive integer)

Returns:
Workstream ID, name, and list of columns with column_id and column_name
"""
from .tools import get_workstream_columns

return await get_workstream_columns(ws_id=ws_id)


@mcp.tool()
async def workboard_get_team_workstreams_tool(
team_id: int,
Expand Down
2 changes: 2 additions & 0 deletions src/mcp_workboard_crunchtools/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
create_workstream,
get_team_workstreams,
get_workstream_activities,
get_workstream_columns,
get_workstreams,
update_workstream,
)
Expand All @@ -48,6 +49,7 @@
"create_objective",
"get_workstreams",
"get_workstream_activities",
"get_workstream_columns",
"get_team_workstreams",
"create_workstream",
"update_workstream",
Expand Down
57 changes: 57 additions & 0 deletions src/mcp_workboard_crunchtools/tools/workstreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ def _format_activity_item(ai: dict[str, Any]) -> dict[str, Any]:
return formatted


def _extract_columns(activities: list[Any]) -> list[dict[str, str]]:
"""Extract deduplicated column catalog from a list of raw action items."""
columns: dict[str, str] = {}
for ai in activities:
if not isinstance(ai, dict):
continue
col = ai.get("ai_column")
if isinstance(col, dict) and col.get("id"):
columns[col["id"]] = col.get("name", "")
return [
{"column_id": cid, "column_name": cname}
for cid, cname in columns.items()
]


def _format_workstream(ws: dict[str, Any]) -> dict[str, Any]:
"""Format a raw workstream object for MCP output."""
formatted: dict[str, Any] = {
Expand Down Expand Up @@ -135,6 +150,10 @@ def _format_workstream(ws: dict[str, Any]) -> dict[str, Any]:
if count is not None:
formatted["action_item_count"] = count

extracted = _extract_columns(activities)
if extracted:
formatted["columns"] = extracted

return formatted


Expand Down Expand Up @@ -206,6 +225,44 @@ async def get_workstream_activities(
return {"workstream": activity_body}


async def get_workstream_columns(
ws_id: int,
) -> dict[str, Any]:
"""Get the custom Kanban columns defined on a workstream.

Fetches all action items and extracts the unique column definitions.
Workstreams without custom columns return an empty list.

Args:
ws_id: Workstream ID (positive integer).

Returns:
Dictionary with ws_id, name, and columns list.
"""
ws_id = validate_workstream_id(ws_id)
client = get_client()

response = await client.get(f"/workstream/{ws_id}/activity")
activity_body = response.get("data", {})

ws_data = activity_body.get("workstream")
if not isinstance(ws_data, dict):
ws_data = {}

activities: list[Any] = []
activity_data = ws_data.get("ws_activity")
if isinstance(activity_data, dict):
raw = activity_data.get("activity", [])
if isinstance(raw, list):
activities = raw

return {
"ws_id": ws_data.get("ws_id", str(ws_id)),
"name": ws_data.get("ws_name", ""),
"columns": _extract_columns(activities),
}


async def get_team_workstreams(
team_id: int,
) -> dict[str, Any]:
Expand Down
200 changes: 198 additions & 2 deletions tests/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ def test_server_has_tools(self) -> None:
assert mcp is not None

def test_tool_count(self) -> None:
"""Server should have exactly 22 tools registered."""
"""Server should have exactly 23 tools registered."""
from mcp_workboard_crunchtools.tools import __all__

assert len(__all__) == 22
assert len(__all__) == 23

def test_imports(self) -> None:
"""All tool functions should be importable."""
Expand Down Expand Up @@ -748,6 +748,202 @@ async def test_get_workstream_activities(self) -> None:
assert ws["action_items"][0]["column_id"] == "15"
assert ws["action_items"][0]["column_name"] == "Interlocks"
assert len(ws["action_items"][0]["comments"]) == 1
assert "columns" in ws
assert ws["columns"] == [{"column_id": "15", "column_name": "Interlocks"}]

@pytest.mark.asyncio
async def test_get_workstream_activities_multiple_columns(self) -> None:
"""get_workstream_activities should deduplicate columns from action items."""
from mcp_workboard_crunchtools.tools import get_workstream_activities

resp = _mock_response(
json_data={
"data": {
"totalCount": 1,
"workstream": {
"ws_id": "200",
"ws_name": "Roadmap",
"ws_objective": "",
"ws_owner": "1",
"ws_lead": "2",
"ws_status": "active",
"ws_type": "team",
"ws_pace": "steady",
"ws_health": "good",
"ws_priority": "p1",
"ws_progress": "0",
"ws_start_date": "2026-01-01",
"ws_target_date": "2026-06-30",
"ws_completion_date": None,
"ws_team_id": "10",
"ws_team_name": "Eng",
"ws_activity": {
"activity": [
{
"ai_id": "601",
"ai_description": "Task A",
"ai_state": "doing",
"ai_column": {"id": "50", "name": "Planned"},
},
{
"ai_id": "602",
"ai_description": "Task B",
"ai_state": "doing",
"ai_column": {"id": "51", "name": "In Progress"},
},
{
"ai_id": "603",
"ai_description": "Task C",
"ai_state": "doing",
"ai_column": {"id": "50", "name": "Planned"},
},
],
"activity_count": 3,
},
},
},
}
)

with _patch_client(resp):
result = await get_workstream_activities(ws_id=200)

ws = result["workstream"]
assert len(ws["action_items"]) == 3
assert "columns" in ws
assert len(ws["columns"]) == 2
column_ids = {c["column_id"] for c in ws["columns"]}
assert column_ids == {"50", "51"}

@pytest.mark.asyncio
async def test_get_workstream_activities_no_columns(self) -> None:
"""Workstreams without custom columns should not include a columns key."""
from mcp_workboard_crunchtools.tools import get_workstream_activities

resp = _mock_response(
json_data={
"data": {
"totalCount": 1,
"workstream": {
"ws_id": "300",
"ws_name": "Plain Board",
"ws_objective": "",
"ws_owner": "1",
"ws_lead": "2",
"ws_status": "active",
"ws_type": "team",
"ws_pace": "steady",
"ws_health": "good",
"ws_priority": "p2",
"ws_progress": "0",
"ws_start_date": "2026-01-01",
"ws_target_date": "2026-06-30",
"ws_completion_date": None,
"ws_team_id": "10",
"ws_team_name": "Eng",
"ws_activity": {
"activity": [
{
"ai_id": "701",
"ai_description": "No column task",
"ai_state": "next",
},
],
"activity_count": 1,
},
},
},
}
)

with _patch_client(resp):
result = await get_workstream_activities(ws_id=300)

ws = result["workstream"]
assert len(ws["action_items"]) == 1
assert "columns" not in ws

@pytest.mark.asyncio
async def test_get_workstream_columns(self) -> None:
"""get_workstream_columns should return deduplicated column list."""
from mcp_workboard_crunchtools.tools import get_workstream_columns

resp = _mock_response(
json_data={
"data": {
"totalCount": 1,
"workstream": {
"ws_id": "400",
"ws_name": "Kanban Board",
"ws_activity": {
"activity": [
{
"ai_id": "801",
"ai_description": "Card A",
"ai_state": "doing",
"ai_column": {"id": "60", "name": "Planned"},
},
{
"ai_id": "802",
"ai_description": "Card B",
"ai_state": "doing",
"ai_column": {"id": "61", "name": "In Progress"},
},
{
"ai_id": "803",
"ai_description": "Card C",
"ai_state": "doing",
"ai_column": {"id": "60", "name": "Planned"},
},
],
"activity_count": 3,
},
},
},
}
)

with _patch_client(resp):
result = await get_workstream_columns(ws_id=400)

assert result["ws_id"] == "400"
assert result["name"] == "Kanban Board"
assert len(result["columns"]) == 2
column_ids = {c["column_id"] for c in result["columns"]}
assert column_ids == {"60", "61"}

@pytest.mark.asyncio
async def test_get_workstream_columns_none(self) -> None:
"""get_workstream_columns with no custom columns returns empty list."""
from mcp_workboard_crunchtools.tools import get_workstream_columns

resp = _mock_response(
json_data={
"data": {
"totalCount": 1,
"workstream": {
"ws_id": "500",
"ws_name": "Plain Board",
"ws_activity": {
"activity": [
{
"ai_id": "901",
"ai_description": "Task",
"ai_state": "next",
},
],
"activity_count": 1,
},
},
},
}
)

with _patch_client(resp):
result = await get_workstream_columns(ws_id=500)

assert result["ws_id"] == "500"
assert result["columns"] == []

@pytest.mark.asyncio
async def test_get_team_workstreams(self) -> None:
Expand Down
Loading