From 6e86b02dfa8717d925c556494167d3f403370b33 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Wed, 18 Mar 2026 16:49:04 +0000
Subject: [PATCH 01/11] Initial plan
From 41588a58374690c872b5900cef6ad39bb16c0036 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Wed, 18 Mar 2026 16:57:56 +0000
Subject: [PATCH 02/11] Add update_column_colors tool for setting
text/background colors on RDL column cells
Co-authored-by: urjeetpatel <1077440+urjeetpatel@users.noreply.github.com>
---
rdl_mcp/columns.py | 90 ++++++++++++++++++++++++
rdl_mcp/server.py | 24 +++++++
tests/test_rdl_mcp_server.py | 128 +++++++++++++++++++++++++++++++++++
3 files changed, 242 insertions(+)
diff --git a/rdl_mcp/columns.py b/rdl_mcp/columns.py
index 664b960..f24b11c 100644
--- a/rdl_mcp/columns.py
+++ b/rdl_mcp/columns.py
@@ -140,6 +140,96 @@ def remove_column(filepath: str, column_index: int,
return {'success': True, 'message': f'Removed column at index {column_index}'}
+def update_column_colors(filepath: str, column_index: int,
+ text_color: Optional[str] = None,
+ background_color: Optional[str] = None,
+ header_text_color: Optional[str] = None,
+ header_background_color: Optional[str] = None) -> Dict[str, Any]:
+ """Update text and background colors for a column's data and/or header cells.
+
+ Args:
+ filepath: Path to the RDL file
+ column_index: Zero-based index of the column to update
+ text_color: Text color for data cells (e.g., 'Red', '#FF0000')
+ background_color: Background color for data cells
+ header_text_color: Text color for header cell
+ header_background_color: Background color for header cell
+ """
+ colors = (text_color, background_color, header_text_color, header_background_color)
+ if all(c is None for c in colors):
+ return {'success': False, 'message': 'At least one color parameter must be provided'}
+
+ tree = parse_rdl_tree(filepath)
+ root = tree.getroot()
+ ns = get_namespace(root)
+
+ tablix = root.find(f'.//{ns}Tablix')
+ if tablix is None:
+ return {'success': False, 'message': 'No Tablix found in report'}
+
+ tablix_rows = tablix.findall(f'.//{ns}TablixBody/{ns}TablixRows/{ns}TablixRow')
+
+ updated_rows = []
+
+ for row in tablix_rows:
+ cells = row.findall(f'{ns}TablixCells/{ns}TablixCell')
+ row_type = _detect_row_type(cells, ns)
+
+ if row_type not in ('header', 'data'):
+ continue
+
+ if row_type == 'data':
+ t_color = text_color
+ bg_color = background_color
+ else:
+ t_color = header_text_color
+ bg_color = header_background_color
+
+ if t_color is None and bg_color is None:
+ continue
+
+ if column_index >= len(cells):
+ return {'success': False, 'message': f'Column index {column_index} out of range'}
+
+ cell = cells[column_index]
+ textbox = cell.find(f'.//{ns}Textbox')
+ if textbox is None:
+ continue
+
+ # Find or create Style element as a direct child of Textbox
+ style = textbox.find(f'{ns}Style')
+ if style is None:
+ style = ET.Element(f'{ns}Style')
+ textbox.insert(0, style)
+
+ if t_color is not None:
+ color_elem = style.find(f'{ns}Color')
+ if color_elem is None:
+ color_elem = ET.SubElement(style, f'{ns}Color')
+ color_elem.text = t_color
+
+ if bg_color is not None:
+ bg_elem = style.find(f'{ns}BackgroundColor')
+ if bg_elem is None:
+ bg_elem = ET.SubElement(style, f'{ns}BackgroundColor')
+ bg_elem.text = bg_color
+
+ updated_rows.append(row_type)
+
+ if not updated_rows:
+ return {'success': False, 'message': 'No matching rows found to update'}
+
+ write_xml(tree, filepath)
+
+ parts = []
+ if text_color is not None or background_color is not None:
+ parts.append('data row colors updated')
+ if header_text_color is not None or header_background_color is not None:
+ parts.append('header row colors updated')
+
+ return {'success': True, 'message': f'Column {column_index} colors updated: {", ".join(parts)}'}
+
+
def update_column_format(filepath: str, column_index: int, format_string: str) -> Dict[str, Any]:
"""Update the format string for a column."""
tree = parse_rdl_tree(filepath)
diff --git a/rdl_mcp/server.py b/rdl_mcp/server.py
index 5bc0225..54b5459 100644
--- a/rdl_mcp/server.py
+++ b/rdl_mcp/server.py
@@ -58,6 +58,10 @@ def register_tools(self):
'function': self.update_column_format,
'description': 'Update the format string for a column'
},
+ 'update_column_colors': {
+ 'function': self.update_column_colors,
+ 'description': 'Update text and background colors for a column\'s data and/or header cells'
+ },
'add_column': {
'function': self.add_column,
'description': 'Add a new column to the report table'
@@ -240,6 +244,18 @@ def _get_tool_schema(self, tool_name: str) -> Dict[str, Any]:
},
'required': ['filepath', 'column_index', 'format_string']
},
+ 'update_column_colors': {
+ 'type': 'object',
+ 'properties': {
+ 'filepath': {'type': 'string'},
+ 'column_index': {'type': 'integer'},
+ 'text_color': {'type': 'string', 'description': 'Text color for data cells (e.g., "Red", "#FF0000")'},
+ 'background_color': {'type': 'string', 'description': 'Background color for data cells'},
+ 'header_text_color': {'type': 'string', 'description': 'Text color for header cell'},
+ 'header_background_color': {'type': 'string', 'description': 'Background color for header cell'}
+ },
+ 'required': ['filepath', 'column_index']
+ },
'add_column': {
'type': 'object',
'properties': {
@@ -345,6 +361,14 @@ def update_column_width(self, filepath: str, column_index: int, new_width: str)
def update_column_format(self, filepath: str, column_index: int, format_string: str) -> Dict[str, Any]:
return columns.update_column_format(filepath, column_index, format_string)
+ def update_column_colors(self, filepath: str, column_index: int,
+ text_color: Optional[str] = None,
+ background_color: Optional[str] = None,
+ header_text_color: Optional[str] = None,
+ header_background_color: Optional[str] = None) -> Dict[str, Any]:
+ return columns.update_column_colors(filepath, column_index, text_color, background_color,
+ header_text_color, header_background_color)
+
def add_column(self, filepath: str, column_index: int, header_text: str,
field_binding: str, width: str = "1in",
format_string: Optional[str] = None,
diff --git a/tests/test_rdl_mcp_server.py b/tests/test_rdl_mcp_server.py
index b9cae03..d56f80c 100644
--- a/tests/test_rdl_mcp_server.py
+++ b/tests/test_rdl_mcp_server.py
@@ -567,6 +567,134 @@ def test_remove_column_without_auto_adjust(self, server, temp_report):
assert page_width.text == original_page_width
+class TestUpdateColumnColors:
+ """Tests for update_column_colors operation."""
+
+ RDL_NS = '{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}'
+
+ def _parse_report(self, filepath: str):
+ """Parse the RDL file and return (root, ns) for assertions."""
+ import xml.etree.ElementTree as ET
+ root = ET.parse(filepath).getroot()
+ return root, self.RDL_NS
+
+ def test_update_data_text_color(self, server, temp_report):
+ result = server.update_column_colors(temp_report, 0, text_color='Red')
+
+ assert result['success'] == True
+
+ root, ns = self._parse_report(temp_report)
+ textbox = root.find(f'.//{ns}Textbox[@Name="DataID"]')
+ assert textbox is not None
+ style = textbox.find(f'{ns}Style')
+ assert style is not None
+ color_elem = style.find(f'{ns}Color')
+ assert color_elem is not None
+ assert color_elem.text == 'Red'
+
+ def test_update_data_background_color(self, server, temp_report):
+ result = server.update_column_colors(temp_report, 1, background_color='LightBlue')
+
+ assert result['success'] == True
+
+ root, ns = self._parse_report(temp_report)
+ textbox = root.find(f'.//{ns}Textbox[@Name="DataName"]')
+ assert textbox is not None
+ style = textbox.find(f'{ns}Style')
+ assert style is not None
+ bg_elem = style.find(f'{ns}BackgroundColor')
+ assert bg_elem is not None
+ assert bg_elem.text == 'LightBlue'
+
+ def test_update_header_text_color(self, server, temp_report):
+ result = server.update_column_colors(temp_report, 2, header_text_color='White')
+
+ assert result['success'] == True
+
+ root, ns = self._parse_report(temp_report)
+ textbox = root.find(f'.//{ns}Textbox[@Name="HeaderAmount"]')
+ assert textbox is not None
+ style = textbox.find(f'{ns}Style')
+ assert style is not None
+ color_elem = style.find(f'{ns}Color')
+ assert color_elem is not None
+ assert color_elem.text == 'White'
+
+ def test_update_header_background_color(self, server, temp_report):
+ result = server.update_column_colors(temp_report, 0, header_background_color='DarkBlue')
+
+ assert result['success'] == True
+
+ root, ns = self._parse_report(temp_report)
+ textbox = root.find(f'.//{ns}Textbox[@Name="HeaderID"]')
+ assert textbox is not None
+ style = textbox.find(f'{ns}Style')
+ assert style is not None
+ bg_elem = style.find(f'{ns}BackgroundColor')
+ assert bg_elem is not None
+ assert bg_elem.text == 'DarkBlue'
+
+ def test_update_all_colors(self, server, temp_report):
+ result = server.update_column_colors(
+ temp_report, 1,
+ text_color='Black',
+ background_color='White',
+ header_text_color='White',
+ header_background_color='Navy'
+ )
+
+ assert result['success'] == True
+
+ root, ns = self._parse_report(temp_report)
+
+ # Check data cell
+ data_textbox = root.find(f'.//{ns}Textbox[@Name="DataName"]')
+ assert data_textbox is not None
+ data_style = data_textbox.find(f'{ns}Style')
+ assert data_style.find(f'{ns}Color').text == 'Black'
+ assert data_style.find(f'{ns}BackgroundColor').text == 'White'
+
+ # Check header cell
+ header_textbox = root.find(f'.//{ns}Textbox[@Name="HeaderName"]')
+ assert header_textbox is not None
+ header_style = header_textbox.find(f'{ns}Style')
+ assert header_style.find(f'{ns}Color').text == 'White'
+ assert header_style.find(f'{ns}BackgroundColor').text == 'Navy'
+
+ def test_no_colors_provided_returns_error(self, server, temp_report):
+ result = server.update_column_colors(temp_report, 0)
+
+ assert result['success'] == False
+ assert 'At least one color parameter must be provided' in result['message']
+
+ def test_invalid_column_index_returns_error(self, server, temp_report):
+ result = server.update_column_colors(temp_report, 99, text_color='Red')
+
+ assert result['success'] == False
+ assert 'out of range' in result['message']
+
+ def test_update_colors_preserves_existing_format(self, server, temp_report):
+ # First set a format on the column
+ server.update_column_format(temp_report, 2, 'C2')
+ # Then set a color
+ result = server.update_column_colors(temp_report, 2, background_color='Yellow')
+
+ assert result['success'] == True
+
+ root, ns = self._parse_report(temp_report)
+ textbox = root.find(f'.//{ns}Textbox[@Name="DataAmount"]')
+ assert textbox is not None
+ # BackgroundColor is on the Textbox-level Style
+ style = textbox.find(f'{ns}Style')
+ assert style is not None
+ assert style.find(f'{ns}BackgroundColor').text == 'Yellow'
+ # Format is on the TextRun-level Style - verify it's still there
+ text_run = textbox.find(f'.//{ns}TextRun')
+ tr_style = text_run.find(f'{ns}Style')
+ assert tr_style is not None
+ assert tr_style.find(f'{ns}Format').text == 'C2'
+
+
class TestDatasetOperations:
"""Tests for dataset field operations."""
From d71105f463102495fd7acc0ec83880a97b2940f1 Mon Sep 17 00:00:00 2001
From: Urjeet Patel <1077440+urjeetpatel@users.noreply.github.com>
Date: Wed, 18 Mar 2026 12:03:53 -0500
Subject: [PATCH 03/11] Add update_column_colors to README
Add update_column_colors to modify reports section
---
README.md | 1 +
1 file changed, 1 insertion(+)
diff --git a/README.md b/README.md
index 648590c..c0005d2 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ Edit SSRS reports using AI assistants instead of wrestling with 2000+ lines of X
- `update_column_header` / `update_column_width` - Change columns
- `add_column` / `remove_column` - Add or remove columns
- `update_column_format` - Change number/date formatting
+- `update_column_colors` - Change the colors for the column header and the data rows
- `update_stored_procedure` - Swap stored procedures
- `add_dataset_field` / `remove_dataset_field` - Manage dataset fields
- `add_parameter` / `update_parameter` - Manage parameters
From 19e875b63b6b0697d51fb0fa4d3d98124ab01a60 Mon Sep 17 00:00:00 2001
From: Urjeet Patel <1077440+urjeetpatel@users.noreply.github.com>
Date: Wed, 18 Mar 2026 17:20:25 +0000
Subject: [PATCH 04/11] build: update pyproject.toml and setup.py for packaging
- Added find_packages() to setup.py to ensure all packages are
included in the distribution.
---
pyproject.toml | 9 +++++----
setup.py | 3 ++-
2 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
index fd5c08e..df46986 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,10 +8,8 @@ version = "0.1.0"
description = "Edit SSRS reports using AI - MCP server for RDL files"
readme = "README.md"
requires-python = ">=3.8"
-license = {text = "MIT"}
-authors = [
- {name = "Beth Maloney"}
-]
+license = { text = "MIT" }
+authors = [{ name = "Beth Maloney" }]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
@@ -29,5 +27,8 @@ Homepage = "https://github.com/bethmaloney/rdl-mcp"
Repository = "https://github.com/bethmaloney/rdl-mcp"
Issues = "https://github.com/bethmaloney/rdl-mcp/issues"
+[tool.setuptools.packages.find]
+where = ["."]
+
[project.scripts]
rdl-mcp = "rdl_mcp_server:main"
diff --git a/setup.py b/setup.py
index 0fbd050..30e9fff 100644
--- a/setup.py
+++ b/setup.py
@@ -1,4 +1,4 @@
-from setuptools import setup
+from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
@@ -13,6 +13,7 @@
long_description_content_type="text/markdown",
url="https://github.com/bethmaloney/rdl-mcp",
py_modules=["rdl_mcp_server"],
+ packages=find_packages(),
python_requires=">=3.8",
classifiers=[
"Development Status :: 3 - Alpha",
From f6acf71e7cb7d87e3db5879de9c89e1fd8ecf88c Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 23 Mar 2026 22:30:06 +0000
Subject: [PATCH 05/11] Initial plan
From 9c163402e1961d92a506d0e9404d65999de988a2 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 23 Mar 2026 22:37:32 +0000
Subject: [PATCH 06/11] Move to fastmcp instead of hand-rolled MCP server
protocol
Co-authored-by: urjeetpatel <1077440+urjeetpatel@users.noreply.github.com>
Agent-Logs-Url: https://github.com/urjeetpatel/rdl-mcp/sessions/9d75ca8b-56e7-4be7-9736-b91d7444d5f5
---
CLAUDE.md | 21 +-
README.md | 18 +-
pyproject.toml | 5 +-
rdl_mcp/__init__.py | 3 +-
rdl_mcp/server.py | 460 +++++++++--------------------------
rdl_mcp_server.py | 69 +-----
tests/test_rdl_mcp_server.py | 41 +++-
7 files changed, 173 insertions(+), 444 deletions(-)
diff --git a/CLAUDE.md b/CLAUDE.md
index 201b952..dc562cb 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -32,8 +32,8 @@ uvx rdl-mcp
The codebase follows a modular design with the main MCP server delegating to specialized modules:
-- **`rdl_mcp_server.py`** - Entry point with logging setup, imports `MCPServer` from package
-- **`rdl_mcp/server.py`** - Core `MCPServer` class handling JSON-RPC protocol, tool registration, and request routing
+- **`rdl_mcp_server.py`** - Entry point; calls `mcp.run()` to start the fastmcp server
+- **`rdl_mcp/server.py`** - fastmcp `mcp` instance with `@mcp.tool` decorators for all tools; also exports `MCPServer` (a thin wrapper used by tests)
- **`rdl_mcp/reader.py`** - Read-only operations: `describe_rdl_report`, `get_rdl_datasets`, `get_rdl_parameters`, `get_rdl_columns`
- **`rdl_mcp/columns.py`** - Column modifications: add/remove/update header/width/format
- **`rdl_mcp/datasets.py`** - Dataset operations: add/remove fields, update stored procedures
@@ -43,28 +43,19 @@ The codebase follows a modular design with the main MCP server delegating to spe
## Key Constraints
-- **Python standard library only** - No external dependencies beyond pytest for testing
-- **Python 3.8+ compatibility** required
+- **fastmcp** - MCP transport is provided by the `fastmcp` library (declared in `pyproject.toml`)
+- **Python 3.10+ compatibility** required
- **Tablix controls only** - Currently supports table-based reports, not Matrix or Chart controls
- **RDL 2016 namespace** - Uses `http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition`
## MCP Protocol
-The server uses JSON-RPC 2.0 over stdin/stdout. Key methods:
-- `initialize` - Returns server capabilities
-- `tools/list` - Returns available tools with JSON schemas
-- `tools/call` - Executes a tool with arguments
+The server uses fastmcp for the MCP transport (stdio by default). Tool registration is done via `@mcp.tool` decorators in `rdl_mcp/server.py`. fastmcp automatically handles protocol negotiation, tool listing, and tool dispatch.
## Testing
-Tests use pytest with fixtures that create temporary RDL files. The `_create_sample_report()` function in tests generates valid minimal RDL documents for testing.
+Tests use pytest with fixtures that create temporary RDL files. The `_create_sample_report()` function in tests generates valid minimal RDL documents for testing. `TestFastMCPServer` verifies that all tools are correctly registered with the fastmcp instance.
**Requirements:**
- Write tests before implementing any changes
- All tests must pass after making changes (`python3 -m pytest tests/ -v`)
-
-## Logging
-
-Configure via environment variables:
-- `RDL_MCP_LOG_LEVEL`: DEBUG, INFO, WARNING, ERROR
-- `RDL_MCP_LOG_FILE`: Path to log file
diff --git a/README.md b/README.md
index c0005d2..0a3248b 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ mcp-name: io.github.bethmaloney/rdl-mcp
[](https://pypi.org/project/rdl-mcp/)
[](https://opensource.org/licenses/MIT)
-[](https://www.python.org/downloads/)
+[](https://www.python.org/downloads/)
[](https://modelcontextprotocol.io)
Edit SSRS reports using AI assistants instead of wrestling with 2000+ lines of XML. This [Model Context Protocol (MCP)](https://modelcontextprotocol.io) server gives Claude, Copilot, and other AI tools simple commands to read and modify RDL files.
@@ -31,12 +31,12 @@ Edit SSRS reports using AI assistants instead of wrestling with 2000+ lines of X
- AI sees clean JSON instead of verbose XML namespaces
- One-line commands instead of error-prone string manipulation
- Automatic validation catches errors before they break reports
-- No dependencies - just Python 3.8+ standard library
+- Built on [fastmcp](https://github.com/jlowin/fastmcp) for a robust, standards-compliant MCP transport
## Installation
**Requirements:**
-- Python 3.8 or higher
+- Python 3.10 or higher
- [uv](https://docs.astral.sh/uv/) (Python package manager and tool runner)
**Installing uv:**
@@ -91,14 +91,6 @@ Add to VSCode settings (`.vscode/mcp.json` in your workspace or user settings):
**After installation:** Restart your AI assistant and try: `"Describe the structure of my report.rdl file"`
-
-Optional: Enable debug logging
-
-Set environment variables:
-- `RDL_MCP_LOG_LEVEL`: `DEBUG`, `INFO`, `WARNING`, or `ERROR`
-- `RDL_MCP_LOG_FILE`: Path to log file
-
-
## Usage
Just ask your AI assistant in natural language:
@@ -182,7 +174,7 @@ All tools return `{success: bool, message?: string, error?: string}` or structur
**Server not appearing?**
- Check absolute path in config is correct
-- Verify Python 3.8+: `python3 --version`
+- Verify Python 3.10+: `python3 --version`
- Restart your MCP client
**Permission errors?**
@@ -238,7 +230,7 @@ PRs welcome! Priority areas:
- Better column detection for complex layouts
- More editing operations (reordering, grouping, etc.)
-Requirements: Python standard library only
+Requirements: Python 3.10+, fastmcp
1. Fork repo
2. Create feature branch
diff --git a/pyproject.toml b/pyproject.toml
index df46986..48e6738 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,16 +7,15 @@ name = "rdl-mcp"
version = "0.1.0"
description = "Edit SSRS reports using AI - MCP server for RDL files"
readme = "README.md"
-requires-python = ">=3.8"
+requires-python = ">=3.10"
license = { text = "MIT" }
+dependencies = ["fastmcp>=2.0"]
authors = [{ name = "Beth Maloney" }]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.8",
- "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
diff --git a/rdl_mcp/__init__.py b/rdl_mcp/__init__.py
index 619b6de..d2e9f27 100644
--- a/rdl_mcp/__init__.py
+++ b/rdl_mcp/__init__.py
@@ -1,6 +1,6 @@
"""RDL MCP Server - Model Context Protocol server for RDL file operations."""
-from .server import MCPServer, run_server
+from .server import MCPServer, mcp, run_server
from .validation import validate_rdl, extract_field_references, extract_field_references_with_context
from .reader import describe_rdl_report, get_rdl_datasets, get_rdl_parameters, get_rdl_columns
from .columns import add_column, remove_column, update_column_format, update_column_header, update_column_width
@@ -11,6 +11,7 @@
__all__ = [
'MCPServer',
+ 'mcp',
'run_server',
'validate_rdl',
'extract_field_references',
diff --git a/rdl_mcp/server.py b/rdl_mcp/server.py
index 54b5459..7ed09e8 100644
--- a/rdl_mcp/server.py
+++ b/rdl_mcp/server.py
@@ -1,10 +1,10 @@
"""MCP Server for RDL file operations."""
-import json
-import sys
import logging
from typing import Dict, Any, Optional
+from fastmcp import FastMCP
+
from . import reader
from . import columns
from . import datasets
@@ -13,328 +13,120 @@
logger = logging.getLogger(__name__)
+mcp = FastMCP("rdl-mcp-server")
+
+
+@mcp.tool
+def describe_rdl_report(filepath: str) -> Dict[str, Any]:
+ """Get a high-level summary of the RDL report structure."""
+ return reader.describe_rdl_report(filepath)
+
+
+@mcp.tool
+def get_rdl_datasets(filepath: str, field_limit: int = 0,
+ field_pattern: Optional[str] = None) -> Dict[str, Any]:
+ """Get all datasets with their fields, queries, and stored procedures."""
+ return reader.get_rdl_datasets(filepath, field_limit, field_pattern)
+
+
+@mcp.tool
+def get_rdl_parameters(filepath: str) -> Dict[str, Any]:
+ """Get all report parameters with their types and values."""
+ return reader.get_rdl_parameters(filepath)
+
+
+@mcp.tool
+def get_rdl_columns(filepath: str) -> Dict[str, Any]:
+ """Get table columns with headers, widths, and bindings."""
+ return reader.get_rdl_columns(filepath)
+
+
+@mcp.tool
+def validate_rdl(filepath: str) -> Dict[str, Any]:
+ """Validate RDL XML structure and field references."""
+ return validation.validate_rdl(filepath)
+
+
+@mcp.tool
+def update_column_header(filepath: str, old_header: str, new_header: str) -> Dict[str, Any]:
+ """Update a column header text."""
+ return columns.update_column_header(filepath, old_header, new_header)
+
+
+@mcp.tool
+def update_column_width(filepath: str, column_index: int, new_width: str) -> Dict[str, Any]:
+ """Update a column width."""
+ return columns.update_column_width(filepath, column_index, new_width)
+
+
+@mcp.tool
+def update_column_format(filepath: str, column_index: int, format_string: str) -> Dict[str, Any]:
+ """Update the format string for a column."""
+ return columns.update_column_format(filepath, column_index, format_string)
+
+
+@mcp.tool
+def update_column_colors(filepath: str, column_index: int,
+ text_color: Optional[str] = None,
+ background_color: Optional[str] = None,
+ header_text_color: Optional[str] = None,
+ header_background_color: Optional[str] = None) -> Dict[str, Any]:
+ """Update text and background colors for a column's data and/or header cells."""
+ return columns.update_column_colors(filepath, column_index, text_color, background_color,
+ header_text_color, header_background_color)
+
+
+@mcp.tool
+def add_column(filepath: str, column_index: int, header_text: str,
+ field_binding: str, width: str = "1in",
+ format_string: Optional[str] = None,
+ footer_expression: Optional[str] = None) -> Dict[str, Any]:
+ """Add a new column to the report table."""
+ return columns.add_column(filepath, column_index, header_text, field_binding,
+ width, format_string, footer_expression)
+
+
+@mcp.tool
+def remove_column(filepath: str, column_index: int,
+ auto_adjust_page_width: bool = True) -> Dict[str, Any]:
+ """Remove a column from the report table."""
+ return columns.remove_column(filepath, column_index, auto_adjust_page_width)
+
+
+@mcp.tool
+def update_stored_procedure(filepath: str, dataset_name: str, new_sproc: str) -> Dict[str, Any]:
+ """Update the stored procedure for a dataset."""
+ return datasets.update_stored_procedure(filepath, dataset_name, new_sproc)
+
+
+@mcp.tool
+def add_dataset_field(filepath: str, dataset_name: str, field_name: str,
+ data_field: str, type_name: str) -> Dict[str, Any]:
+ """Add a new field to a dataset."""
+ return datasets.add_dataset_field(filepath, dataset_name, field_name, data_field, type_name)
+
+
+@mcp.tool
+def remove_dataset_field(filepath: str, dataset_name: str, field_name: str) -> Dict[str, Any]:
+ """Remove a field from a dataset."""
+ return datasets.remove_dataset_field(filepath, dataset_name, field_name)
+
+
+@mcp.tool
+def add_parameter(filepath: str, name: str, data_type: str, prompt: str) -> Dict[str, Any]:
+ """Add a new report parameter."""
+ return parameters.add_parameter(filepath, name, data_type, prompt)
+
+
+@mcp.tool
+def update_parameter(filepath: str, name: str, prompt: Optional[str] = None,
+ default_value: Optional[str] = None) -> Dict[str, Any]:
+ """Update an existing report parameter."""
+ return parameters.update_parameter(filepath, name, prompt, default_value)
+
class MCPServer:
- """MCP Server that handles RDL file operations."""
-
- def __init__(self):
- logger.info("Initializing RDL MCP Server")
- self.tools = {}
- self.register_tools()
- logger.info(f"Registered {len(self.tools)} tools")
-
- def register_tools(self):
- """Register all available tools."""
- self.tools = {
- 'describe_rdl_report': {
- 'function': self.describe_rdl_report,
- 'description': 'Get a high-level summary of the RDL report structure'
- },
- 'get_rdl_datasets': {
- 'function': self.get_rdl_datasets,
- 'description': 'Get all datasets with their fields, queries, and stored procedures'
- },
- 'get_rdl_parameters': {
- 'function': self.get_rdl_parameters,
- 'description': 'Get all report parameters with their types and values'
- },
- 'get_rdl_columns': {
- 'function': self.get_rdl_columns,
- 'description': 'Get table columns with headers, widths, and bindings'
- },
- 'validate_rdl': {
- 'function': self.validate_rdl,
- 'description': 'Validate RDL XML structure and field references'
- },
- 'update_column_header': {
- 'function': self.update_column_header,
- 'description': 'Update a column header text'
- },
- 'update_column_width': {
- 'function': self.update_column_width,
- 'description': 'Update a column width'
- },
- 'update_column_format': {
- 'function': self.update_column_format,
- 'description': 'Update the format string for a column'
- },
- 'update_column_colors': {
- 'function': self.update_column_colors,
- 'description': 'Update text and background colors for a column\'s data and/or header cells'
- },
- 'add_column': {
- 'function': self.add_column,
- 'description': 'Add a new column to the report table'
- },
- 'remove_column': {
- 'function': self.remove_column,
- 'description': 'Remove a column from the report table'
- },
- 'update_stored_procedure': {
- 'function': self.update_stored_procedure,
- 'description': 'Update the stored procedure for a dataset'
- },
- 'add_dataset_field': {
- 'function': self.add_dataset_field,
- 'description': 'Add a new field to a dataset'
- },
- 'remove_dataset_field': {
- 'function': self.remove_dataset_field,
- 'description': 'Remove a field from a dataset'
- },
- 'add_parameter': {
- 'function': self.add_parameter,
- 'description': 'Add a new report parameter'
- },
- 'update_parameter': {
- 'function': self.update_parameter,
- 'description': 'Update an existing report parameter'
- }
- }
-
- def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
- """Handle an MCP request."""
- method = request.get('method', '')
- params = request.get('params', {})
- request_id = request.get('id')
-
- logger.debug(f"Handling request: {method}")
-
- if method == 'initialize':
- return {
- 'jsonrpc': '2.0',
- 'id': request_id,
- 'result': {
- 'protocolVersion': '2024-11-05',
- 'serverInfo': {
- 'name': 'rdl-mcp-server',
- 'version': '1.0.0'
- },
- 'capabilities': {
- 'tools': {}
- }
- }
- }
-
- elif method == 'tools/list':
- tools_list = []
- for name, info in self.tools.items():
- tools_list.append({
- 'name': name,
- 'description': info['description'],
- 'inputSchema': self._get_tool_schema(name)
- })
- return {
- 'jsonrpc': '2.0',
- 'id': request_id,
- 'result': {'tools': tools_list}
- }
-
- elif method == 'tools/call':
- tool_name = params.get('name')
- tool_args = params.get('arguments', {})
-
- if tool_name not in self.tools:
- return {
- 'jsonrpc': '2.0',
- 'id': request_id,
- 'error': {
- 'code': -32601,
- 'message': f'Unknown tool: {tool_name}'
- }
- }
-
- try:
- result = self.tools[tool_name]['function'](**tool_args)
- return {
- 'jsonrpc': '2.0',
- 'id': request_id,
- 'result': {
- 'content': [{'type': 'text', 'text': json.dumps(result, indent=2)}]
- }
- }
- except Exception as e:
- logger.error(f"Error executing tool {tool_name}: {e}")
- return {
- 'jsonrpc': '2.0',
- 'id': request_id,
- 'error': {
- 'code': -32000,
- 'message': str(e)
- }
- }
-
- elif method == 'notifications/initialized':
- return None
-
- else:
- return {
- 'jsonrpc': '2.0',
- 'id': request_id,
- 'error': {
- 'code': -32601,
- 'message': f'Method not found: {method}'
- }
- }
-
- def _get_tool_schema(self, tool_name: str) -> Dict[str, Any]:
- """Get the JSON schema for a tool's parameters."""
- schemas = {
- 'describe_rdl_report': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string', 'description': 'Path to the RDL file'}
- },
- 'required': ['filepath']
- },
- 'get_rdl_datasets': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string', 'description': 'Path to the RDL file'},
- 'field_limit': {'type': 'integer', 'description': 'Number of fields to return (0=none, -1=all)', 'default': 0},
- 'field_pattern': {'type': 'string', 'description': 'Regex pattern to filter fields'}
- },
- 'required': ['filepath']
- },
- 'get_rdl_parameters': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string', 'description': 'Path to the RDL file'}
- },
- 'required': ['filepath']
- },
- 'get_rdl_columns': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string', 'description': 'Path to the RDL file'}
- },
- 'required': ['filepath']
- },
- 'validate_rdl': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string', 'description': 'Path to the RDL file'}
- },
- 'required': ['filepath']
- },
- 'update_column_header': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'old_header': {'type': 'string'},
- 'new_header': {'type': 'string'}
- },
- 'required': ['filepath', 'old_header', 'new_header']
- },
- 'update_column_width': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'column_index': {'type': 'integer'},
- 'new_width': {'type': 'string'}
- },
- 'required': ['filepath', 'column_index', 'new_width']
- },
- 'update_column_format': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'column_index': {'type': 'integer'},
- 'format_string': {'type': 'string'}
- },
- 'required': ['filepath', 'column_index', 'format_string']
- },
- 'update_column_colors': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'column_index': {'type': 'integer'},
- 'text_color': {'type': 'string', 'description': 'Text color for data cells (e.g., "Red", "#FF0000")'},
- 'background_color': {'type': 'string', 'description': 'Background color for data cells'},
- 'header_text_color': {'type': 'string', 'description': 'Text color for header cell'},
- 'header_background_color': {'type': 'string', 'description': 'Background color for header cell'}
- },
- 'required': ['filepath', 'column_index']
- },
- 'add_column': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'column_index': {'type': 'integer'},
- 'header_text': {'type': 'string'},
- 'field_binding': {'type': 'string'},
- 'width': {'type': 'string', 'default': '1in'},
- 'format_string': {'type': 'string'},
- 'footer_expression': {'type': 'string'}
- },
- 'required': ['filepath', 'column_index', 'header_text', 'field_binding']
- },
- 'remove_column': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'column_index': {'type': 'integer'},
- 'auto_adjust_page_width': {
- 'type': 'boolean',
- 'default': True,
- 'description': 'If true, shrink page width to fit remaining columns plus margins'
- }
- },
- 'required': ['filepath', 'column_index']
- },
- 'update_stored_procedure': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'dataset_name': {'type': 'string'},
- 'new_sproc': {'type': 'string'}
- },
- 'required': ['filepath', 'dataset_name', 'new_sproc']
- },
- 'add_dataset_field': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'dataset_name': {'type': 'string'},
- 'field_name': {'type': 'string'},
- 'data_field': {'type': 'string'},
- 'type_name': {'type': 'string'}
- },
- 'required': ['filepath', 'dataset_name', 'field_name', 'data_field', 'type_name']
- },
- 'remove_dataset_field': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'dataset_name': {'type': 'string'},
- 'field_name': {'type': 'string'}
- },
- 'required': ['filepath', 'dataset_name', 'field_name']
- },
- 'add_parameter': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'name': {'type': 'string'},
- 'data_type': {'type': 'string'},
- 'prompt': {'type': 'string'}
- },
- 'required': ['filepath', 'name', 'data_type', 'prompt']
- },
- 'update_parameter': {
- 'type': 'object',
- 'properties': {
- 'filepath': {'type': 'string'},
- 'name': {'type': 'string'},
- 'prompt': {'type': 'string'},
- 'default_value': {'type': 'string'}
- },
- 'required': ['filepath', 'name']
- }
- }
- return schemas.get(tool_name, {'type': 'object', 'properties': {}})
-
- # Delegate methods to modules
+ """Thin wrapper around RDL operations, kept for backward compatibility."""
def describe_rdl_report(self, filepath: str) -> Dict[str, Any]:
return reader.describe_rdl_report(filepath)
@@ -397,7 +189,6 @@ def update_parameter(self, filepath: str, name: str, prompt: Optional[str] = Non
default_value: Optional[str] = None) -> Dict[str, Any]:
return parameters.update_parameter(filepath, name, prompt, default_value)
- # Expression parsing methods (exposed for testing)
def _extract_field_references_with_context(self, expression: str, default_dataset: str) -> Dict[str, Any]:
return validation.extract_field_references_with_context(expression, default_dataset)
@@ -407,24 +198,5 @@ def _extract_field_references(self, expression: str):
def run_server():
"""Run the MCP server."""
- server = MCPServer()
-
- for line in sys.stdin:
- line = line.strip()
- if not line:
- continue
-
- try:
- request = json.loads(line)
- response = server.handle_request(request)
- if response:
- print(json.dumps(response))
- sys.stdout.flush()
- except json.JSONDecodeError as e:
- logger.error(f"Invalid JSON: {e}")
- print(json.dumps({
- 'jsonrpc': '2.0',
- 'id': None,
- 'error': {'code': -32700, 'message': 'Parse error'}
- }))
- sys.stdout.flush()
+ mcp.run()
+
diff --git a/rdl_mcp_server.py b/rdl_mcp_server.py
index 9627cbf..bd127f4 100755
--- a/rdl_mcp_server.py
+++ b/rdl_mcp_server.py
@@ -2,79 +2,16 @@
"""
MCP Server for RDL (Report Definition Language) Report Editing
Provides high-level tools for reading and modifying SSRS/RDL reports
-
-This is a backward-compatible entry point that imports from the rdl_mcp package.
"""
-import json
-import logging
-import os
-import sys
-
-# Configure logging
-def setup_logging():
- """Configure logging for the RDL MCP Server"""
- log_level = os.environ.get('RDL_MCP_LOG_LEVEL', 'INFO').upper()
- log_file = os.environ.get('RDL_MCP_LOG_FILE')
-
- logger = logging.getLogger('rdl_mcp_server')
- logger.setLevel(getattr(logging, log_level, logging.INFO))
-
- formatter = logging.Formatter(
- '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S'
- )
-
- # Console handler (stderr to avoid interfering with MCP protocol on stdout)
- console_handler = logging.StreamHandler(sys.stderr)
- console_handler.setLevel(logging.WARNING)
- console_handler.setFormatter(formatter)
- logger.addHandler(console_handler)
-
- # File handler if log file specified
- if log_file:
- try:
- file_handler = logging.FileHandler(log_file)
- file_handler.setLevel(logging.DEBUG)
- file_handler.setFormatter(formatter)
- logger.addHandler(file_handler)
- logger.info(f"Logging to file: {log_file}")
- except Exception as e:
- logger.error(f"Failed to set up file logging: {e}")
-
- return logger
-
-# Initialize logger
-logger = setup_logging()
-
-# Import from package
-from rdl_mcp import MCPServer
+from rdl_mcp.server import MCPServer, mcp
def main():
"""Main entry point for the MCP server."""
- server = MCPServer()
-
- for line in sys.stdin:
- line = line.strip()
- if not line:
- continue
-
- try:
- request = json.loads(line)
- response = server.handle_request(request)
- if response:
- print(json.dumps(response))
- sys.stdout.flush()
- except json.JSONDecodeError as e:
- logger.error(f"Invalid JSON: {e}")
- print(json.dumps({
- 'jsonrpc': '2.0',
- 'id': None,
- 'error': {'code': -32700, 'message': 'Parse error'}
- }))
- sys.stdout.flush()
+ mcp.run()
if __name__ == '__main__':
main()
+
diff --git a/tests/test_rdl_mcp_server.py b/tests/test_rdl_mcp_server.py
index d56f80c..34abd7a 100644
--- a/tests/test_rdl_mcp_server.py
+++ b/tests/test_rdl_mcp_server.py
@@ -13,7 +13,7 @@
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from rdl_mcp_server import MCPServer
+from rdl_mcp.server import MCPServer, mcp
# Path to test fixtures
@@ -806,5 +806,42 @@ def test_update_nonexistent_parameter(self, server, temp_report):
assert 'not found' in result['message']
-if __name__ == '__main__':
+class TestFastMCPServer:
+ """Tests verifying fastmcp tool registration."""
+
+ EXPECTED_TOOLS = {
+ 'describe_rdl_report',
+ 'get_rdl_datasets',
+ 'get_rdl_parameters',
+ 'get_rdl_columns',
+ 'validate_rdl',
+ 'update_column_header',
+ 'update_column_width',
+ 'update_column_format',
+ 'update_column_colors',
+ 'add_column',
+ 'remove_column',
+ 'update_stored_procedure',
+ 'add_dataset_field',
+ 'remove_dataset_field',
+ 'add_parameter',
+ 'update_parameter',
+ }
+
+ def _get_tools(self):
+ import asyncio
+ return asyncio.run(mcp.list_tools())
+
+ def test_all_tools_registered(self):
+ registered = {t.name for t in self._get_tools()}
+ assert self.EXPECTED_TOOLS == registered
+
+ def test_tool_has_description(self):
+ tools = {t.name: t for t in self._get_tools()}
+ assert tools['describe_rdl_report'].description
+
+ def test_tool_count(self):
+ assert len(self._get_tools()) == len(self.EXPECTED_TOOLS)
+
+
pytest.main([__file__, '-v'])
From fd4f7eb1eb7eb2c0492efff3831d8c966133dbe4 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 24 Mar 2026 13:50:10 +0000
Subject: [PATCH 07/11] Initial plan
From da7fb53f10d18bc7d48397c26925a7190fb6547c Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 24 Mar 2026 14:03:14 +0000
Subject: [PATCH 08/11] Fix critical security vulnerabilities: XXE, path
traversal, info disclosure, ReDoS
- Replace unsafe xml.etree.ElementTree.parse() with defusedxml (CWE-611, CWE-776)
- Add filepath validation to prevent path traversal attacks (CWE-22)
- Sanitize error messages to prevent information disclosure (CWE-209)
- Add regex pattern length limit to prevent ReDoS attacks (CWE-1333)
- Add defusedxml>=0.7.1 as project dependency
- Add 12 security tests covering XXE, path traversal, info disclosure, ReDoS
Co-authored-by: urjeetpatel <1077440+urjeetpatel@users.noreply.github.com>
Agent-Logs-Url: https://github.com/urjeetpatel/rdl-mcp/sessions/447b3529-a1b4-4b3f-a600-4df5fe62f786
---
pyproject.toml | 3 +
rdl_mcp/reader.py | 11 ++-
rdl_mcp/server.py | 12 ++-
rdl_mcp/validation.py | 6 +-
rdl_mcp/xml_utils.py | 48 ++++++++-
setup.py | 3 +
tests/test_rdl_mcp_server.py | 187 +++++++++++++++++++++++++++++++++++
7 files changed, 259 insertions(+), 11 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
index df46986..1a332d1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,6 +9,9 @@ description = "Edit SSRS reports using AI - MCP server for RDL files"
readme = "README.md"
requires-python = ">=3.8"
license = { text = "MIT" }
+dependencies = [
+ "defusedxml>=0.7.1",
+]
authors = [{ name = "Beth Maloney" }]
classifiers = [
"Development Status :: 3 - Alpha",
diff --git a/rdl_mcp/reader.py b/rdl_mcp/reader.py
index 8cb35b4..cf38955 100644
--- a/rdl_mcp/reader.py
+++ b/rdl_mcp/reader.py
@@ -122,10 +122,15 @@ def get_rdl_datasets(filepath: str, field_limit: int = 0, field_pattern: Optiona
# Apply field pattern filter
if field_pattern:
try:
- pattern_re = re.compile(field_pattern, re.IGNORECASE)
- filtered_fields = [f for f in all_fields if pattern_re.search(f['name'])]
+ # Limit pattern length to prevent ReDoS (CWE-1333)
+ if len(field_pattern) > 200:
+ logger.warning(f"field_pattern too long ({len(field_pattern)} chars), ignoring")
+ filtered_fields = all_fields
+ else:
+ pattern_re = re.compile(field_pattern, re.IGNORECASE)
+ filtered_fields = [f for f in all_fields if pattern_re.search(f['name'])]
except re.error as e:
- logger.warning(f"Invalid field_pattern regex: {field_pattern}, error: {e}")
+ logger.warning(f"Invalid field_pattern regex, error: {e}")
filtered_fields = all_fields
else:
filtered_fields = all_fields
diff --git a/rdl_mcp/server.py b/rdl_mcp/server.py
index 54b5459..8de349f 100644
--- a/rdl_mcp/server.py
+++ b/rdl_mcp/server.py
@@ -155,12 +155,22 @@ def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
}
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
+ error_msg = str(e)
+ # Sanitize error messages to avoid leaking internal paths (CWE-209)
+ if isinstance(e, FileNotFoundError):
+ error_msg = "File not found"
+ elif isinstance(e, ValueError):
+ error_msg = str(e)
+ elif isinstance(e, PermissionError):
+ error_msg = "Permission denied"
+ else:
+ error_msg = "An internal error occurred"
return {
'jsonrpc': '2.0',
'id': request_id,
'error': {
'code': -32000,
- 'message': str(e)
+ 'message': error_msg
}
}
diff --git a/rdl_mcp/validation.py b/rdl_mcp/validation.py
index 0c6c0f8..29aec0c 100644
--- a/rdl_mcp/validation.py
+++ b/rdl_mcp/validation.py
@@ -1,11 +1,12 @@
"""RDL validation logic and expression parsing."""
import xml.etree.ElementTree as ET
+import defusedxml.ElementTree as SafeET
import re
import logging
from typing import Dict, List, Any, Optional
-from .xml_utils import get_namespace, find_parent
+from .xml_utils import get_namespace, find_parent, validate_filepath
logger = logging.getLogger(__name__)
@@ -121,7 +122,8 @@ def extract_field_references(expression: str) -> List[str]:
def validate_rdl(filepath: str) -> Dict[str, Any]:
"""Validate RDL XML structure and field references in expressions."""
try:
- tree = ET.parse(filepath)
+ resolved = validate_filepath(filepath)
+ tree = SafeET.parse(resolved)
root = tree.getroot()
ns = get_namespace(root)
diff --git a/rdl_mcp/xml_utils.py b/rdl_mcp/xml_utils.py
index c3d2713..b21f005 100644
--- a/rdl_mcp/xml_utils.py
+++ b/rdl_mcp/xml_utils.py
@@ -1,6 +1,8 @@
"""XML utility functions for RDL file handling."""
import xml.etree.ElementTree as ET
+import defusedxml.ElementTree as SafeET
+import os
import re
import logging
from typing import Optional
@@ -8,6 +10,32 @@
logger = logging.getLogger(__name__)
+def validate_filepath(filepath: str) -> str:
+ """Validate and resolve a filepath for safe access.
+
+ Ensures the path points to a .rdl file, resolves symlinks,
+ and prevents path traversal attacks (CWE-22).
+
+ Returns:
+ The resolved absolute filepath.
+
+ Raises:
+ ValueError: If the filepath is invalid or unsafe.
+ """
+ if not filepath or not isinstance(filepath, str):
+ raise ValueError("Filepath must be a non-empty string")
+
+ resolved = os.path.realpath(filepath)
+
+ if not resolved.lower().endswith('.rdl'):
+ raise ValueError("Only .rdl files are supported")
+
+ if not os.path.isfile(resolved):
+ raise FileNotFoundError(f"File not found: {filepath}")
+
+ return resolved
+
+
def get_namespace(root: ET.Element) -> str:
"""Extract the namespace from the root element."""
match = re.match(r'\{(.+?)\}', root.tag)
@@ -40,15 +68,25 @@ def register_namespaces(filepath: str):
def parse_rdl(filepath: str) -> ET.Element:
- """Parse an RDL file and return the root element."""
- tree = ET.parse(filepath)
+ """Parse an RDL file and return the root element.
+
+ Uses defusedxml to prevent XXE and entity expansion attacks (CWE-611, CWE-776).
+ Validates the filepath to prevent path traversal (CWE-22).
+ """
+ resolved = validate_filepath(filepath)
+ tree = SafeET.parse(resolved)
return tree.getroot()
def parse_rdl_tree(filepath: str) -> ET.ElementTree:
- """Parse an RDL file and return the ElementTree (for modifications)."""
- register_namespaces(filepath)
- return ET.parse(filepath)
+ """Parse an RDL file and return the ElementTree (for modifications).
+
+ Uses defusedxml to prevent XXE and entity expansion attacks (CWE-611, CWE-776).
+ Validates the filepath to prevent path traversal (CWE-22).
+ """
+ resolved = validate_filepath(filepath)
+ register_namespaces(resolved)
+ return SafeET.parse(resolved)
def indent_xml(elem: ET.Element, level: int = 0):
diff --git a/setup.py b/setup.py
index 30e9fff..5b4dcfe 100644
--- a/setup.py
+++ b/setup.py
@@ -15,6 +15,9 @@
py_modules=["rdl_mcp_server"],
packages=find_packages(),
python_requires=">=3.8",
+ install_requires=[
+ "defusedxml>=0.7.1",
+ ],
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
diff --git a/tests/test_rdl_mcp_server.py b/tests/test_rdl_mcp_server.py
index d56f80c..e64cb88 100644
--- a/tests/test_rdl_mcp_server.py
+++ b/tests/test_rdl_mcp_server.py
@@ -806,5 +806,192 @@ def test_update_nonexistent_parameter(self, server, temp_report):
assert 'not found' in result['message']
+class TestSecurityXXE:
+ """Tests for XML External Entity (XXE) prevention (CWE-611, CWE-776)."""
+
+ def test_xxe_external_entity_blocked(self, server):
+ """Verify that XML with external entity declarations is rejected."""
+ xxe_content = '''
+
+]>
+
+
+
+
+ test
+
+
+
+
+'''
+
+ with tempfile.NamedTemporaryFile(suffix='.rdl', delete=False, mode='w') as f:
+ f.write(xxe_content)
+ temp_path = f.name
+
+ try:
+ result = server.validate_rdl(temp_path)
+ # defusedxml should block parsing; validation returns error
+ assert result['valid'] == False
+ finally:
+ os.unlink(temp_path)
+
+ def test_xxe_entity_expansion_bomb_blocked(self, server):
+ """Verify that entity expansion (billion laughs) attacks are rejected."""
+ bomb_content = '''
+
+
+
+]>
+
+
+
+
+ test
+
+
+
+
+'''
+
+ with tempfile.NamedTemporaryFile(suffix='.rdl', delete=False, mode='w') as f:
+ f.write(bomb_content)
+ temp_path = f.name
+
+ try:
+ result = server.validate_rdl(temp_path)
+ assert result['valid'] == False
+ finally:
+ os.unlink(temp_path)
+
+ def test_xxe_parameter_injection_blocked(self, server):
+ """Verify XXE is blocked via external DTD reference."""
+ xxe_content = '''
+
+
+
+'''
+
+ with tempfile.NamedTemporaryFile(suffix='.rdl', delete=False, mode='w') as f:
+ f.write(xxe_content)
+ temp_path = f.name
+
+ try:
+ result = server.validate_rdl(temp_path)
+ assert result['valid'] == False
+ finally:
+ os.unlink(temp_path)
+
+
+class TestSecurityPathTraversal:
+ """Tests for path traversal prevention (CWE-22)."""
+
+ def test_non_rdl_extension_rejected(self, server):
+ """Verify that files without .rdl extension are rejected."""
+ with tempfile.NamedTemporaryFile(suffix='.xml', delete=False, mode='w') as f:
+ f.write('')
+ temp_path = f.name
+
+ try:
+ result = server.describe_rdl_report(temp_path)
+ # Should not succeed with non-rdl file
+ assert False, "Expected an error for non-.rdl file"
+ except ValueError as e:
+ assert "Only .rdl files are supported" in str(e)
+ finally:
+ os.unlink(temp_path)
+
+ def test_sensitive_file_read_blocked(self, server):
+ """Verify that reading non-.rdl system files is blocked."""
+ try:
+ result = server.describe_rdl_report('/etc/passwd')
+ assert False, "Expected an error for non-.rdl file"
+ except ValueError as e:
+ assert "Only .rdl files are supported" in str(e)
+
+ def test_symlink_resolved(self, server, temp_report):
+ """Verify that symlinks are resolved (preventing symlink-based traversal)."""
+ import rdl_mcp.xml_utils as xml_utils
+
+ # Create a symlink to the .rdl file - this should work fine
+ symlink_path = temp_report + '.link.rdl'
+ try:
+ os.symlink(temp_report, symlink_path)
+ resolved = xml_utils.validate_filepath(symlink_path)
+ # Should resolve to the real path
+ assert resolved == os.path.realpath(temp_report)
+ finally:
+ if os.path.exists(symlink_path):
+ os.unlink(symlink_path)
+
+ def test_empty_filepath_rejected(self, server):
+ """Verify that empty filepath is rejected."""
+ try:
+ server.describe_rdl_report('')
+ assert False, "Expected an error for empty filepath"
+ except ValueError as e:
+ assert "non-empty string" in str(e)
+
+ def test_none_filepath_rejected(self, server):
+ """Verify that None filepath is rejected."""
+ try:
+ server.describe_rdl_report(None)
+ assert False, "Expected an error for None filepath"
+ except (ValueError, TypeError):
+ pass # Either error type is acceptable
+
+
+class TestSecurityInfoDisclosure:
+ """Tests for information disclosure prevention (CWE-209)."""
+
+ def test_error_does_not_leak_internal_paths(self, server):
+ """Verify that error responses don't expose internal file paths."""
+ request = {
+ 'method': 'tools/call',
+ 'id': 1,
+ 'params': {
+ 'name': 'describe_rdl_report',
+ 'arguments': {'filepath': '/some/secret/internal/path/report.rdl'}
+ }
+ }
+ response = server.handle_request(request)
+ error_msg = response.get('error', {}).get('message', '')
+ assert '/some/secret/internal/path' not in error_msg
+
+ def test_generic_error_for_unexpected_exceptions(self, server):
+ """Verify that unexpected errors return generic messages."""
+ request = {
+ 'method': 'tools/call',
+ 'id': 1,
+ 'params': {
+ 'name': 'describe_rdl_report',
+ 'arguments': {'filepath': '/etc/passwd'}
+ }
+ }
+ response = server.handle_request(request)
+ error_msg = response.get('error', {}).get('message', '')
+ # Should not contain system paths or stack traces
+ assert 'Traceback' not in error_msg
+
+
+class TestSecurityReDoS:
+ """Tests for Regular Expression Denial of Service prevention (CWE-1333)."""
+
+ def test_long_regex_pattern_rejected(self, server, temp_report):
+ """Verify that excessively long regex patterns are handled safely."""
+ # Create a very long regex pattern
+ long_pattern = 'a' * 201
+ result = server.get_rdl_datasets(temp_report, field_limit=-1, field_pattern=long_pattern)
+ # Should return all fields unfiltered rather than crash
+ assert 'datasets' in result
+
+ def test_invalid_regex_handled_gracefully(self, server, temp_report):
+ """Verify that invalid regex patterns don't crash the server."""
+ result = server.get_rdl_datasets(temp_report, field_limit=-1, field_pattern='[invalid')
+ assert 'datasets' in result
+
+
if __name__ == '__main__':
pytest.main([__file__, '-v'])
From f9c779fbbdcf5a0827be97ca51d362c37effe752 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Tue, 24 Mar 2026 14:04:51 +0000
Subject: [PATCH 09/11] Address code review: remove info from error messages
and logs, use pytest.fail()
Co-authored-by: urjeetpatel <1077440+urjeetpatel@users.noreply.github.com>
Agent-Logs-Url: https://github.com/urjeetpatel/rdl-mcp/sessions/447b3529-a1b4-4b3f-a600-4df5fe62f786
---
rdl_mcp/reader.py | 2 +-
rdl_mcp/xml_utils.py | 2 +-
tests/test_rdl_mcp_server.py | 8 ++++----
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/rdl_mcp/reader.py b/rdl_mcp/reader.py
index cf38955..ddf0cd6 100644
--- a/rdl_mcp/reader.py
+++ b/rdl_mcp/reader.py
@@ -124,7 +124,7 @@ def get_rdl_datasets(filepath: str, field_limit: int = 0, field_pattern: Optiona
try:
# Limit pattern length to prevent ReDoS (CWE-1333)
if len(field_pattern) > 200:
- logger.warning(f"field_pattern too long ({len(field_pattern)} chars), ignoring")
+ logger.warning("field_pattern too long, ignoring")
filtered_fields = all_fields
else:
pattern_re = re.compile(field_pattern, re.IGNORECASE)
diff --git a/rdl_mcp/xml_utils.py b/rdl_mcp/xml_utils.py
index b21f005..cb1c67b 100644
--- a/rdl_mcp/xml_utils.py
+++ b/rdl_mcp/xml_utils.py
@@ -31,7 +31,7 @@ def validate_filepath(filepath: str) -> str:
raise ValueError("Only .rdl files are supported")
if not os.path.isfile(resolved):
- raise FileNotFoundError(f"File not found: {filepath}")
+ raise FileNotFoundError("File not found")
return resolved
diff --git a/tests/test_rdl_mcp_server.py b/tests/test_rdl_mcp_server.py
index e64cb88..9d7514d 100644
--- a/tests/test_rdl_mcp_server.py
+++ b/tests/test_rdl_mcp_server.py
@@ -897,7 +897,7 @@ def test_non_rdl_extension_rejected(self, server):
try:
result = server.describe_rdl_report(temp_path)
# Should not succeed with non-rdl file
- assert False, "Expected an error for non-.rdl file"
+ pytest.fail("Expected an error for non-.rdl file")
except ValueError as e:
assert "Only .rdl files are supported" in str(e)
finally:
@@ -907,7 +907,7 @@ def test_sensitive_file_read_blocked(self, server):
"""Verify that reading non-.rdl system files is blocked."""
try:
result = server.describe_rdl_report('/etc/passwd')
- assert False, "Expected an error for non-.rdl file"
+ pytest.fail("Expected an error for non-.rdl file")
except ValueError as e:
assert "Only .rdl files are supported" in str(e)
@@ -930,7 +930,7 @@ def test_empty_filepath_rejected(self, server):
"""Verify that empty filepath is rejected."""
try:
server.describe_rdl_report('')
- assert False, "Expected an error for empty filepath"
+ pytest.fail("Expected an error for empty filepath")
except ValueError as e:
assert "non-empty string" in str(e)
@@ -938,7 +938,7 @@ def test_none_filepath_rejected(self, server):
"""Verify that None filepath is rejected."""
try:
server.describe_rdl_report(None)
- assert False, "Expected an error for None filepath"
+ pytest.fail("Expected an error for None filepath")
except (ValueError, TypeError):
pass # Either error type is acceptable
From 98fcbd23a1daf1b9251b5653b2e3ec1a7ee55542 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Wed, 8 Apr 2026 16:58:03 +0000
Subject: [PATCH 10/11] Initial plan
From b298f2dd957127475d30c55a66c390232db57853 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Wed, 8 Apr 2026 17:07:26 +0000
Subject: [PATCH 11/11] feat: RDL MCP hardening - safe batch editing,
concurrency control, layout/style APIs
Agent-Logs-Url: https://github.com/urjeetpatel/rdl-mcp/sessions/f7c4a2df-3782-4a3a-9fd9-759a26d14bec
Co-authored-by: urjeetpatel <1077440+urjeetpatel@users.noreply.github.com>
---
pyproject.toml | 2 +-
rdl_mcp/batch.py | 408 +++++++++++++++++++++++++++++++++++
rdl_mcp/server.py | 98 ++++++++-
rdl_mcp/styles.py | 190 ++++++++++++++++
rdl_mcp/xml_utils.py | 127 +++++++++--
tests/test_rdl_mcp_server.py | 310 ++++++++++++++++++++++++++
6 files changed, 1120 insertions(+), 15 deletions(-)
create mode 100644 rdl_mcp/batch.py
create mode 100644 rdl_mcp/styles.py
diff --git a/pyproject.toml b/pyproject.toml
index 0ee4c0c..3f33362 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,7 @@ requires-python = ">=3.10"
license = { text = "MIT" }
dependencies = [
"defusedxml>=0.7.1",
- "fastmcp>=2.0",
+ "fastmcp>=3.2",
]
authors = [{ name = "Beth Maloney" }]
classifiers = [
diff --git a/rdl_mcp/batch.py b/rdl_mcp/batch.py
new file mode 100644
index 0000000..067e6f2
--- /dev/null
+++ b/rdl_mcp/batch.py
@@ -0,0 +1,408 @@
+"""RDL batch update operations - apply multiple column changes transactionally."""
+
+import copy
+import logging
+import uuid
+import xml.etree.ElementTree as ET
+from typing import Any, Dict, List, Optional
+
+from .reader import _detect_row_type
+from .xml_utils import get_namespace, parse_rdl_tree, write_xml
+from .columns import _parse_dimension, _update_tablix_width
+
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+def _get_or_create_textbox_style(textbox: ET.Element, ns: str) -> ET.Element:
+ """Return the direct-child Style element of *textbox*, creating if absent."""
+ style = textbox.find(f"{ns}Style")
+ if style is None:
+ style = ET.Element(f"{ns}Style")
+ textbox.insert(0, style)
+ return style
+
+
+def _set_style_child(style: ET.Element, ns: str, tag: str, value: str):
+ """Set (or create) a child element *tag* under *style* to *value*."""
+ elem = style.find(tag)
+ if elem is None:
+ elem = ET.SubElement(style, tag)
+ elem.text = value
+
+
+def _get_textbox_color_props(textbox: ET.Element, ns: str) -> Dict[str, Optional[str]]:
+ """Return current color + alignment props from a Textbox's Style."""
+ style = textbox.find(f"{ns}Style")
+ if style is None:
+ return {}
+ result: Dict[str, Optional[str]] = {}
+ for tag, key in (
+ (f"{ns}Color", "text_color"),
+ (f"{ns}BackgroundColor", "background_color"),
+ (f"{ns}TextAlign", "alignment"),
+ ):
+ elem = style.find(tag)
+ if elem is not None and elem.text:
+ result[key] = elem.text
+ return result
+
+
+def _get_textrun_format(textbox: ET.Element, ns: str) -> Optional[str]:
+ fmt_elem = textbox.find(f".//{ns}TextRun/{ns}Style/{ns}Format")
+ return fmt_elem.text if fmt_elem is not None and fmt_elem.text else None
+
+
+def _get_column_width(tablix: ET.Element, ns: str, col_idx: int) -> Optional[str]:
+ cols = tablix.findall(f".//{ns}TablixBody/{ns}TablixColumns/{ns}TablixColumn")
+ if col_idx < len(cols):
+ w = cols[col_idx].find(f"{ns}Width")
+ return w.text if w is not None else None
+ return None
+
+
+def _get_column_header(tablix: ET.Element, ns: str, col_idx: int) -> Optional[str]:
+ for row in tablix.findall(f".//{ns}TablixBody/{ns}TablixRows/{ns}TablixRow"):
+ cells = row.findall(f"{ns}TablixCells/{ns}TablixCell")
+ if _detect_row_type(cells, ns) == "header":
+ if col_idx < len(cells):
+ v = cells[col_idx].find(f".//{ns}Value")
+ return v.text if v is not None else None
+ return None
+
+
+def _snapshot_column(tablix: ET.Element, ns: str, col_idx: int) -> Dict[str, Any]:
+ """Capture current state of a column for diff generation."""
+ snap: Dict[str, Any] = {
+ "width": _get_column_width(tablix, ns, col_idx),
+ "header": _get_column_header(tablix, ns, col_idx),
+ }
+ for row in tablix.findall(f".//{ns}TablixBody/{ns}TablixRows/{ns}TablixRow"):
+ cells = row.findall(f"{ns}TablixCells/{ns}TablixCell")
+ rt = _detect_row_type(cells, ns)
+ if rt not in ("header", "data"):
+ continue
+ if col_idx >= len(cells):
+ continue
+ tb = cells[col_idx].find(f".//{ns}Textbox")
+ if tb is None:
+ continue
+ prefix = "header_" if rt == "header" else ""
+ for prop, val in _get_textbox_color_props(tb, ns).items():
+ snap[f"{prefix}{prop}"] = val
+ if rt == "data":
+ fmt = _get_textrun_format(tb, ns)
+ if fmt:
+ snap["format"] = fmt
+ return snap
+
+
+def _apply_single_update(tablix: ET.Element, ns: str, update: Dict[str, Any]) -> List[str]:
+ """Apply one column-update dict to *tablix* in-memory.
+
+ Returns a list of human-readable error strings (empty means success).
+ """
+ errors: List[str] = []
+ col_idx: int = update["column_index"]
+
+ # --- width ---
+ if "width" in update and update["width"] is not None:
+ cols = tablix.findall(f".//{ns}TablixBody/{ns}TablixColumns/{ns}TablixColumn")
+ if col_idx < 0 or col_idx >= len(cols):
+ errors.append(f"Column index {col_idx} out of range for width update")
+ else:
+ w = cols[col_idx].find(f"{ns}Width")
+ if w is None:
+ w = ET.SubElement(cols[col_idx], f"{ns}Width")
+ w.text = update["width"]
+
+ # --- header + cell colors/alignment per row type ---
+ for row in tablix.findall(f".//{ns}TablixBody/{ns}TablixRows/{ns}TablixRow"):
+ cells = row.findall(f"{ns}TablixCells/{ns}TablixCell")
+ rt = _detect_row_type(cells, ns)
+ if rt not in ("header", "data"):
+ continue
+ if col_idx >= len(cells):
+ errors.append(f"Column index {col_idx} out of range for row type '{rt}'")
+ continue
+ tb = cells[col_idx].find(f".//{ns}Textbox")
+ if tb is None:
+ continue
+
+ if rt == "header":
+ # update header text
+ if "header" in update and update["header"] is not None:
+ v = tb.find(f".//{ns}Value")
+ if v is not None:
+ v.text = update["header"]
+ # header colors
+ style = _get_or_create_textbox_style(tb, ns)
+ if "header_text_color" in update and update["header_text_color"] is not None:
+ _set_style_child(style, ns, f"{ns}Color", update["header_text_color"])
+ if "header_background_color" in update and update["header_background_color"] is not None:
+ _set_style_child(style, ns, f"{ns}BackgroundColor", update["header_background_color"])
+ if "header_alignment" in update and update["header_alignment"] is not None:
+ _set_style_child(style, ns, f"{ns}TextAlign", update["header_alignment"])
+
+ elif rt == "data":
+ style = _get_or_create_textbox_style(tb, ns)
+ if "text_color" in update and update["text_color"] is not None:
+ _set_style_child(style, ns, f"{ns}Color", update["text_color"])
+ if "background_color" in update and update["background_color"] is not None:
+ _set_style_child(style, ns, f"{ns}BackgroundColor", update["background_color"])
+ if "alignment" in update and update["alignment"] is not None:
+ _set_style_child(style, ns, f"{ns}TextAlign", update["alignment"])
+ # format_string goes in TextRun/Style/Format
+ if "format_string" in update and update["format_string"] is not None:
+ text_run = tb.find(f".//{ns}TextRun")
+ if text_run is None:
+ errors.append(f"No TextRun found in column {col_idx} data cell")
+ continue
+ tr_style = text_run.find(f"{ns}Style")
+ if tr_style is None:
+ tr_style = ET.SubElement(text_run, f"{ns}Style")
+ fmt_elem = tr_style.find(f"{ns}Format")
+ if fmt_elem is None:
+ fmt_elem = ET.SubElement(tr_style, f"{ns}Format")
+ fmt_elem.text = update["format_string"]
+
+ return errors
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+
+def update_table_batch(
+ filepath: str,
+ updates: List[Dict[str, Any]],
+ backup: bool = False,
+) -> Dict[str, Any]:
+ """Apply multiple column updates transactionally in a single write.
+
+ All updates are applied in-memory first. If any update produces an error
+ the entire operation is aborted (all-or-none) and the file is not written.
+ On success the file is written atomically (temp-validate-replace).
+
+ Args:
+ filepath: Path to the RDL file.
+ updates: List of column-update dicts. Each dict must contain
+ ``column_index`` (int) plus any of:
+ ``width``, ``header``, ``format_string``,
+ ``text_color``, ``background_color``,
+ ``header_text_color``, ``header_background_color``,
+ ``alignment`` (data cell), ``header_alignment`` (header cell).
+ backup: When True, save a timestamped ``.bak`` copy before writing.
+
+ Returns:
+ Mutation-response envelope with keys:
+ ``success``, ``mutation_id``, ``diff``, ``integrity_checks``,
+ ``backup_path``, ``message``.
+ """
+ mutation_id = str(uuid.uuid4())
+
+ if not updates:
+ return {
+ "success": False,
+ "mutation_id": mutation_id,
+ "message": "No updates provided",
+ "diff": [],
+ }
+
+ tree = parse_rdl_tree(filepath)
+ root = tree.getroot()
+ ns = get_namespace(root)
+
+ tablix = root.find(f".//{ns}Tablix")
+ if tablix is None:
+ return {
+ "success": False,
+ "mutation_id": mutation_id,
+ "message": "No Tablix found in report",
+ "error_code": "INVALID_RDL_SCHEMA",
+ "diff": [],
+ }
+
+ # Capture before-state for diff
+ col_indices = list({u["column_index"] for u in updates if "column_index" in u})
+ before_state = {idx: _snapshot_column(tablix, ns, idx) for idx in col_indices}
+
+ # Validate all updates before touching the tree (all-or-none)
+ cols_count = len(tablix.findall(f".//{ns}TablixBody/{ns}TablixColumns/{ns}TablixColumn"))
+ pre_errors: List[str] = []
+ for u in updates:
+ if "column_index" not in u:
+ pre_errors.append("Update missing required 'column_index' field")
+ continue
+ idx = u["column_index"]
+ if not isinstance(idx, int) or idx < 0 or idx >= cols_count:
+ pre_errors.append(
+ f"column_index {idx} is out of range (report has {cols_count} columns)"
+ )
+
+ if pre_errors:
+ return {
+ "success": False,
+ "mutation_id": mutation_id,
+ "message": "; ".join(pre_errors),
+ "error_code": "COLUMN_NOT_FOUND",
+ "diff": [],
+ }
+
+ # Apply all updates in-memory
+ all_errors: List[str] = []
+ for u in updates:
+ errs = _apply_single_update(tablix, ns, u)
+ all_errors.extend(errs)
+
+ if all_errors:
+ # Abort – do NOT write; return fresh (unmodified) tree to GC
+ return {
+ "success": False,
+ "mutation_id": mutation_id,
+ "message": "; ".join(all_errors),
+ "error_code": "INVALID_RDL_SCHEMA",
+ "diff": [],
+ }
+
+ # Recalculate Tablix width after width changes
+ _update_tablix_width(tablix, ns)
+
+ # Atomic write with optional backup
+ try:
+ backup_path = write_xml(tree, filepath, backup=backup)
+ except OSError as exc:
+ return {
+ "success": False,
+ "mutation_id": mutation_id,
+ "message": f"Write failed: {exc}",
+ "error_code": "WRITE_FAILURE",
+ "diff": [],
+ }
+
+ # Capture after-state for diff
+ after_tree = parse_rdl_tree(filepath)
+ after_root = after_tree.getroot()
+ after_tablix = after_root.find(f".//{ns}Tablix")
+ diff = []
+ for idx in sorted(col_indices):
+ after_snap = _snapshot_column(after_tablix, ns, idx) if after_tablix is not None else {}
+ before_snap = before_state[idx]
+ changes: Dict[str, Any] = {}
+ all_keys = set(before_snap) | set(after_snap)
+ for key in all_keys:
+ bv = before_snap.get(key)
+ av = after_snap.get(key)
+ if bv != av:
+ changes[key] = {"before": bv, "after": av}
+ if changes:
+ diff.append({"column_index": idx, "changes": changes})
+
+ return {
+ "success": True,
+ "mutation_id": mutation_id,
+ "message": f"Applied {len(updates)} update(s) across {len(col_indices)} column(s)",
+ "diff": diff,
+ "integrity_checks": {"xml_valid": True},
+ "backup_path": backup_path,
+ }
+
+
+def preview_table_batch(
+ filepath: str,
+ updates: List[Dict[str, Any]],
+) -> Dict[str, Any]:
+ """Preview what *update_table_batch* would do without writing any changes.
+
+ Returns the same diff structure as ``update_table_batch`` but with
+ ``dry_run: true`` and no file modification.
+ """
+ mutation_id = str(uuid.uuid4())
+
+ if not updates:
+ return {
+ "dry_run": True,
+ "mutation_id": mutation_id,
+ "message": "No updates provided",
+ "diff": [],
+ }
+
+ tree = parse_rdl_tree(filepath)
+ root = tree.getroot()
+ ns = get_namespace(root)
+
+ tablix = root.find(f".//{ns}Tablix")
+ if tablix is None:
+ return {
+ "dry_run": True,
+ "mutation_id": mutation_id,
+ "message": "No Tablix found in report",
+ "error_code": "INVALID_RDL_SCHEMA",
+ "diff": [],
+ }
+
+ cols_count = len(tablix.findall(f".//{ns}TablixBody/{ns}TablixColumns/{ns}TablixColumn"))
+ col_indices = list({u["column_index"] for u in updates if "column_index" in u})
+
+ # Validate indices
+ pre_errors: List[str] = []
+ for u in updates:
+ if "column_index" not in u:
+ pre_errors.append("Update missing required 'column_index' field")
+ continue
+ idx = u["column_index"]
+ if not isinstance(idx, int) or idx < 0 or idx >= cols_count:
+ pre_errors.append(
+ f"column_index {idx} is out of range (report has {cols_count} columns)"
+ )
+ if pre_errors:
+ return {
+ "dry_run": True,
+ "mutation_id": mutation_id,
+ "message": "; ".join(pre_errors),
+ "error_code": "COLUMN_NOT_FOUND",
+ "diff": [],
+ }
+
+ # Deep-copy the tablix so we can apply changes without touching the real tree
+ tablix_copy = copy.deepcopy(tablix)
+ before_state = {idx: _snapshot_column(tablix_copy, ns, idx) for idx in col_indices}
+
+ all_errors: List[str] = []
+ for u in updates:
+ errs = _apply_single_update(tablix_copy, ns, u)
+ all_errors.extend(errs)
+
+ if all_errors:
+ return {
+ "dry_run": True,
+ "mutation_id": mutation_id,
+ "message": "; ".join(all_errors),
+ "error_code": "INVALID_RDL_SCHEMA",
+ "diff": [],
+ }
+
+ # Build diff from the copied tree
+ diff = []
+ for idx in sorted(col_indices):
+ after_snap = _snapshot_column(tablix_copy, ns, idx)
+ before_snap = before_state[idx]
+ changes: Dict[str, Any] = {}
+ all_keys = set(before_snap) | set(after_snap)
+ for key in all_keys:
+ bv = before_snap.get(key)
+ av = after_snap.get(key)
+ if bv != av:
+ changes[key] = {"before": bv, "after": av}
+ if changes:
+ diff.append({"column_index": idx, "changes": changes})
+
+ return {
+ "dry_run": True,
+ "mutation_id": mutation_id,
+ "message": f"Preview: {len(updates)} update(s) across {len(col_indices)} column(s) — file NOT modified",
+ "diff": diff,
+ }
diff --git a/rdl_mcp/server.py b/rdl_mcp/server.py
index 7ed09e8..03a9320 100644
--- a/rdl_mcp/server.py
+++ b/rdl_mcp/server.py
@@ -1,14 +1,16 @@
"""MCP Server for RDL file operations."""
import logging
-from typing import Dict, Any, Optional
+from typing import Any, Dict, List, Optional
from fastmcp import FastMCP
-from . import reader
+from . import batch
from . import columns
from . import datasets
from . import parameters
+from . import reader
+from . import styles
from . import validation
logger = logging.getLogger(__name__)
@@ -16,6 +18,10 @@
mcp = FastMCP("rdl-mcp-server")
+# ---------------------------------------------------------------------------
+# Existing read tools
+# ---------------------------------------------------------------------------
+
@mcp.tool
def describe_rdl_report(filepath: str) -> Dict[str, Any]:
"""Get a high-level summary of the RDL report structure."""
@@ -47,6 +53,10 @@ def validate_rdl(filepath: str) -> Dict[str, Any]:
return validation.validate_rdl(filepath)
+# ---------------------------------------------------------------------------
+# Existing column mutation tools
+# ---------------------------------------------------------------------------
+
@mcp.tool
def update_column_header(filepath: str, old_header: str, new_header: str) -> Dict[str, Any]:
"""Update a column header text."""
@@ -93,6 +103,10 @@ def remove_column(filepath: str, column_index: int,
return columns.remove_column(filepath, column_index, auto_adjust_page_width)
+# ---------------------------------------------------------------------------
+# Existing dataset / parameter tools
+# ---------------------------------------------------------------------------
+
@mcp.tool
def update_stored_procedure(filepath: str, dataset_name: str, new_sproc: str) -> Dict[str, Any]:
"""Update the stored procedure for a dataset."""
@@ -125,6 +139,67 @@ def update_parameter(filepath: str, name: str, prompt: Optional[str] = None,
return parameters.update_parameter(filepath, name, prompt, default_value)
+# ---------------------------------------------------------------------------
+# New batch + style + layout tools
+# ---------------------------------------------------------------------------
+
+@mcp.tool
+def update_table_batch(
+ filepath: str,
+ updates: List[Dict[str, Any]],
+ backup: bool = False,
+) -> Dict[str, Any]:
+ """Apply multiple column updates (width/header/colors/format/alignment) in one atomic write.
+
+ All changes are applied in-memory first; if any update fails the file is
+ not written (all-or-none). Returns a mutation-response envelope including
+ a machine-readable before/after diff.
+
+ Each element of *updates* is a dict with ``column_index`` (int, required)
+ and any of: ``width``, ``header``, ``format_string``, ``text_color``,
+ ``background_color``, ``header_text_color``, ``header_background_color``,
+ ``alignment``, ``header_alignment``.
+
+ Set ``backup=True`` to save a timestamped ``.bak`` file before writing.
+ """
+ return batch.update_table_batch(filepath, updates, backup=backup)
+
+
+@mcp.tool
+def preview_table_batch(
+ filepath: str,
+ updates: List[Dict[str, Any]],
+) -> Dict[str, Any]:
+ """Preview what update_table_batch would do without modifying the file.
+
+ Returns the same diff structure as update_table_batch with dry_run=True.
+ The file is never written.
+ """
+ return batch.preview_table_batch(filepath, updates)
+
+
+@mcp.tool
+def get_column_styles(filepath: str) -> Dict[str, Any]:
+ """Get effective style properties for every column (colors, format, alignment)."""
+ return styles.get_column_styles(filepath)
+
+
+@mcp.tool
+def get_table_styles(filepath: str) -> Dict[str, Any]:
+ """Get table-level style properties (border, background, font defaults)."""
+ return styles.get_table_styles(filepath)
+
+
+@mcp.tool
+def get_layout_diagnostics(filepath: str) -> Dict[str, Any]:
+ """Get layout diagnostics: page size, margins, tablix width, and overflow/fit status."""
+ return styles.get_layout_diagnostics(filepath)
+
+
+# ---------------------------------------------------------------------------
+# MCPServer wrapper (backward-compatible)
+# ---------------------------------------------------------------------------
+
class MCPServer:
"""Thin wrapper around RDL operations, kept for backward compatibility."""
@@ -189,6 +264,24 @@ def update_parameter(self, filepath: str, name: str, prompt: Optional[str] = Non
default_value: Optional[str] = None) -> Dict[str, Any]:
return parameters.update_parameter(filepath, name, prompt, default_value)
+ # --- new tools ---
+
+ def update_table_batch(self, filepath: str, updates: List[Dict[str, Any]],
+ backup: bool = False) -> Dict[str, Any]:
+ return batch.update_table_batch(filepath, updates, backup=backup)
+
+ def preview_table_batch(self, filepath: str, updates: List[Dict[str, Any]]) -> Dict[str, Any]:
+ return batch.preview_table_batch(filepath, updates)
+
+ def get_column_styles(self, filepath: str) -> Dict[str, Any]:
+ return styles.get_column_styles(filepath)
+
+ def get_table_styles(self, filepath: str) -> Dict[str, Any]:
+ return styles.get_table_styles(filepath)
+
+ def get_layout_diagnostics(self, filepath: str) -> Dict[str, Any]:
+ return styles.get_layout_diagnostics(filepath)
+
def _extract_field_references_with_context(self, expression: str, default_dataset: str) -> Dict[str, Any]:
return validation.extract_field_references_with_context(expression, default_dataset)
@@ -200,3 +293,4 @@ def run_server():
"""Run the MCP server."""
mcp.run()
+
diff --git a/rdl_mcp/styles.py b/rdl_mcp/styles.py
new file mode 100644
index 0000000..6720057
--- /dev/null
+++ b/rdl_mcp/styles.py
@@ -0,0 +1,190 @@
+"""RDL style and layout read operations."""
+
+import logging
+from typing import Any, Dict, List, Optional
+
+from .reader import _detect_row_type
+from .xml_utils import get_namespace, parse_rdl
+from .columns import _parse_dimension
+
+logger = logging.getLogger(__name__)
+
+
+def _get_textbox_style(textbox, ns: str) -> Dict[str, Optional[str]]:
+ """Extract color/alignment properties from a Textbox's Style element."""
+ style = textbox.find(f"{ns}Style")
+ if style is None:
+ return {}
+ result: Dict[str, Optional[str]] = {}
+ for tag, key in (
+ (f"{ns}Color", "text_color"),
+ (f"{ns}BackgroundColor", "background_color"),
+ (f"{ns}TextAlign", "alignment"),
+ (f"{ns}FontWeight", "font_weight"),
+ (f"{ns}FontSize", "font_size"),
+ (f"{ns}FontFamily", "font_family"),
+ ):
+ elem = style.find(tag)
+ if elem is not None and elem.text:
+ result[key] = elem.text
+ return result
+
+
+def _get_textrun_format(textbox, ns: str) -> Optional[str]:
+ """Return the Format string from the first TextRun in *textbox*, if any."""
+ fmt = textbox.find(f".//{ns}TextRun/{ns}Style/{ns}Format")
+ return fmt.text if fmt is not None and fmt.text else None
+
+
+def get_column_styles(filepath: str) -> Dict[str, Any]:
+ """Return effective style properties for every column in the Tablix.
+
+ For each column the response includes header and data cell properties:
+ text_color, background_color, alignment, format, font_weight,
+ font_size, font_family.
+ """
+ root = parse_rdl(filepath)
+ ns = get_namespace(root)
+
+ tablix = root.find(f".//{ns}Tablix")
+ if tablix is None:
+ return {"columns": [], "error": "No Tablix found"}
+
+ # widths
+ tablix_cols = tablix.findall(f".//{ns}TablixColumns/{ns}TablixColumn")
+ widths: List[str] = []
+ for col in tablix_cols:
+ w = col.find(f"{ns}Width")
+ widths.append(w.text if w is not None else "")
+
+ # rows
+ tablix_rows = tablix.findall(f".//{ns}TablixBody/{ns}TablixRows/{ns}TablixRow")
+ header_cells: List = []
+ data_cells: List = []
+
+ for row in tablix_rows:
+ cells = row.findall(f"{ns}TablixCells/{ns}TablixCell")
+ rt = _detect_row_type(cells, ns)
+ if rt == "header" and not header_cells:
+ header_cells = cells
+ elif rt == "data" and not data_cells:
+ data_cells = cells
+
+ num_cols = max(len(widths), len(header_cells), len(data_cells))
+ columns = []
+ for idx in range(num_cols):
+ col_info: Dict[str, Any] = {
+ "index": idx,
+ "width": widths[idx] if idx < len(widths) else "",
+ "header": {},
+ "data": {},
+ }
+
+ if idx < len(header_cells):
+ tb = header_cells[idx].find(f".//{ns}Textbox")
+ if tb is not None:
+ col_info["header"] = _get_textbox_style(tb, ns)
+
+ if idx < len(data_cells):
+ tb = data_cells[idx].find(f".//{ns}Textbox")
+ if tb is not None:
+ col_info["data"] = _get_textbox_style(tb, ns)
+ fmt = _get_textrun_format(tb, ns)
+ if fmt:
+ col_info["data"]["format"] = fmt
+
+ columns.append(col_info)
+
+ return {"columns": columns}
+
+
+def get_table_styles(filepath: str) -> Dict[str, Any]:
+ """Return table-level style properties (border, background, font defaults).
+
+ Reads the Style element directly on the Tablix element, if present.
+ """
+ root = parse_rdl(filepath)
+ ns = get_namespace(root)
+
+ tablix = root.find(f".//{ns}Tablix")
+ if tablix is None:
+ return {"table_styles": {}, "error": "No Tablix found"}
+
+ tablix_name = tablix.get("Name", "")
+ style = tablix.find(f"{ns}Style")
+ table_style: Dict[str, Any] = {}
+
+ if style is not None:
+ for tag, key in (
+ (f"{ns}Color", "text_color"),
+ (f"{ns}BackgroundColor", "background_color"),
+ (f"{ns}FontWeight", "font_weight"),
+ (f"{ns}FontSize", "font_size"),
+ (f"{ns}FontFamily", "font_family"),
+ (f"{ns}Border", "border"),
+ ):
+ elem = style.find(tag)
+ if elem is not None and elem.text:
+ table_style[key] = elem.text
+
+ return {"table_name": tablix_name, "table_styles": table_style}
+
+
+def _page_dim(page, tag: str, ns: str) -> float:
+ """Helper: parse a page dimension element, returning 0.0 if absent."""
+ elem = page.find(tag) if page is not None else None
+ return _parse_dimension(elem.text) if (elem is not None and elem.text) else 0.0
+
+
+def get_layout_diagnostics(filepath: str) -> Dict[str, Any]:
+ """Return layout diagnostics for the report.
+
+ Includes:
+ - page_width, page_height
+ - left_margin, right_margin, top_margin, bottom_margin
+ - printable_width (page_width - left_margin - right_margin)
+ - tablix_width (sum of all column widths)
+ - tablix_fits (tablix_width <= printable_width)
+ - overflow_inches (max(0, tablix_width - printable_width))
+ - column_count
+ """
+ root = parse_rdl(filepath)
+ ns = get_namespace(root)
+
+ page = root.find(f".//{ns}Page")
+ page_width_in = _page_dim(page, f"{ns}PageWidth", ns)
+ page_height_in = _page_dim(page, f"{ns}PageHeight", ns)
+ left_margin_in = _page_dim(page, f"{ns}LeftMargin", ns)
+ right_margin_in = _page_dim(page, f"{ns}RightMargin", ns)
+ top_margin_in = _page_dim(page, f"{ns}TopMargin", ns)
+ bottom_margin_in = _page_dim(page, f"{ns}BottomMargin", ns)
+
+ tablix = root.find(f".//{ns}Tablix")
+ tablix_width_in = 0.0
+ column_count = 0
+
+ if tablix is not None:
+ tablix_cols = tablix.findall(f".//{ns}TablixColumns/{ns}TablixColumn")
+ column_count = len(tablix_cols)
+ for col in tablix_cols:
+ w = col.find(f"{ns}Width")
+ if w is not None and w.text:
+ tablix_width_in += _parse_dimension(w.text)
+
+ printable_width_in = page_width_in - left_margin_in - right_margin_in
+ tablix_fits = tablix_width_in <= printable_width_in + 1e-6
+ overflow_in = max(0.0, tablix_width_in - printable_width_in)
+
+ return {
+ "page_width": f"{page_width_in:.4f}in",
+ "page_height": f"{page_height_in:.4f}in",
+ "left_margin": f"{left_margin_in:.4f}in",
+ "right_margin": f"{right_margin_in:.4f}in",
+ "top_margin": f"{top_margin_in:.4f}in",
+ "bottom_margin": f"{bottom_margin_in:.4f}in",
+ "printable_width": f"{printable_width_in:.4f}in",
+ "tablix_width": f"{tablix_width_in:.4f}in",
+ "tablix_fits": tablix_fits,
+ "overflow_inches": round(overflow_in, 4),
+ "column_count": column_count,
+ }
diff --git a/rdl_mcp/xml_utils.py b/rdl_mcp/xml_utils.py
index cb1c67b..d6ecb02 100644
--- a/rdl_mcp/xml_utils.py
+++ b/rdl_mcp/xml_utils.py
@@ -5,11 +5,63 @@
import os
import re
import logging
-from typing import Optional
+import shutil
+import tempfile
+import threading
+import datetime
+from typing import Dict, Optional
logger = logging.getLogger(__name__)
+class ErrorCode:
+ """Structured error category constants for actionable diagnostics."""
+
+ EMPTY_FILE = "EMPTY_FILE"
+ MALFORMED_XML = "MALFORMED_XML"
+ INVALID_RDL_SCHEMA = "INVALID_RDL_SCHEMA"
+ LOCK_CONFLICT = "LOCK_CONFLICT"
+ WRITE_FAILURE = "WRITE_FAILURE"
+ FILE_NOT_FOUND = "FILE_NOT_FOUND"
+ INVALID_FILEPATH = "INVALID_FILEPATH"
+ COLUMN_NOT_FOUND = "COLUMN_NOT_FOUND"
+ DATASET_NOT_FOUND = "DATASET_NOT_FOUND"
+ PARAMETER_NOT_FOUND = "PARAMETER_NOT_FOUND"
+ INVALID_ARGUMENT = "INVALID_ARGUMENT"
+
+
+class FileLockManager:
+ """Singleton per-file threading-lock manager.
+
+ Guarantees that concurrent mutation calls for the *same resolved path*
+ are serialised, preventing write-over-write file corruption.
+ """
+
+ _singleton_lock: threading.Lock = threading.Lock()
+ _instance: Optional["FileLockManager"] = None
+ _file_locks: Dict[str, threading.Lock]
+
+ def __init__(self) -> None:
+ self._file_locks = {}
+ self._registry_lock = threading.Lock()
+
+ @classmethod
+ def get_instance(cls) -> "FileLockManager":
+ if cls._instance is None:
+ with cls._singleton_lock:
+ if cls._instance is None:
+ cls._instance = cls()
+ return cls._instance
+
+ def get_lock(self, filepath: str) -> threading.Lock:
+ """Return (creating if necessary) the lock for *filepath*."""
+ key = os.path.realpath(filepath) if os.path.exists(filepath) else os.path.abspath(filepath)
+ with self._registry_lock:
+ if key not in self._file_locks:
+ self._file_locks[key] = threading.Lock()
+ return self._file_locks[key]
+
+
def validate_filepath(filepath: str) -> str:
"""Validate and resolve a filepath for safe access.
@@ -106,21 +158,72 @@ def indent_xml(elem: ET.Element, level: int = 0):
elem.tail = indent
-def write_xml(tree: ET.ElementTree, filepath: str):
- """Write an ElementTree to file with proper formatting."""
+def write_xml(tree: ET.ElementTree, filepath: str,
+ backup: bool = False) -> Optional[str]:
+ """Write an ElementTree to file atomically with proper formatting.
+
+ Steps:
+ 1. Acquire per-file threading lock (prevents concurrent write corruption).
+ 2. Optionally create a timestamped backup of the current file.
+ 3. Write formatted XML to a sibling temp file.
+ 4. Validate the temp file is parseable XML.
+ 5. Atomically replace the original (``os.replace``).
+ 6. Release the lock (even on failure; original file is never touched).
+
+ Args:
+ tree: The ElementTree to write.
+ filepath: Destination path (must already have been validated).
+ backup: When True, save a timestamped ``.bak`` copy before replacing.
+
+ Returns:
+ The backup path when *backup* is True and a backup was created,
+ otherwise ``None``.
+
+ Raises:
+ OSError: If the temp write or atomic replace fails.
+ """
root = tree.getroot()
indent_xml(root)
- # Write with XML declaration
- with open(filepath, 'wb') as f:
- tree.write(f, encoding='utf-8', xml_declaration=True)
+ file_lock = FileLockManager.get_instance().get_lock(filepath)
+ backup_path: Optional[str] = None
- # Fix the XML declaration to use double quotes (SSRS preference)
- with open(filepath, 'r') as f:
- content = f.read()
- content = content.replace("'", '"', 2) # Fix XML declaration quotes
- with open(filepath, 'w') as f:
- f.write(content)
+ with file_lock:
+ # --- optional backup ---
+ if backup and os.path.isfile(filepath):
+ timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
+ backup_path = f"{filepath}.{timestamp}.bak"
+ shutil.copy2(filepath, backup_path)
+
+ # --- write to temp file ---
+ dir_name = os.path.dirname(os.path.abspath(filepath))
+ tmp_fd, tmp_path = tempfile.mkstemp(suffix=".tmp", dir=dir_name)
+ try:
+ with os.fdopen(tmp_fd, "wb") as tmp_f:
+ tree.write(tmp_f, encoding="utf-8", xml_declaration=True)
+
+ # Fix XML declaration quotes (SSRS preference: double quotes)
+ with open(tmp_path, "r", encoding="utf-8") as f:
+ content = f.read()
+ content = content.replace("'", '"', 2)
+ with open(tmp_path, "w", encoding="utf-8") as f:
+ f.write(content)
+
+ # --- validate the temp file before replacing the original ---
+ SafeET.parse(tmp_path)
+
+ # --- atomic replace ---
+ os.replace(tmp_path, filepath)
+
+ except Exception:
+ # Rollback: remove temp file; original is untouched
+ try:
+ os.unlink(tmp_path)
+ except OSError:
+ pass
+ raise
+
+ return backup_path
def find_parent(root: ET.Element, target: ET.Element) -> Optional[ET.Element]:
diff --git a/tests/test_rdl_mcp_server.py b/tests/test_rdl_mcp_server.py
index 0a72312..83497a1 100644
--- a/tests/test_rdl_mcp_server.py
+++ b/tests/test_rdl_mcp_server.py
@@ -980,6 +980,12 @@ class TestFastMCPServer:
'remove_dataset_field',
'add_parameter',
'update_parameter',
+ # new tools
+ 'update_table_batch',
+ 'preview_table_batch',
+ 'get_column_styles',
+ 'get_table_styles',
+ 'get_layout_diagnostics',
}
def _get_tools(self):
@@ -998,5 +1004,309 @@ def test_tool_count(self):
assert len(self._get_tools()) == len(self.EXPECTED_TOOLS)
+# ---------------------------------------------------------------------------
+# New feature tests
+# ---------------------------------------------------------------------------
+
+class TestUpdateTableBatch:
+ """Tests for the transactional update_table_batch operation."""
+
+ def test_batch_updates_width_and_header(self, server, temp_report):
+ updates = [
+ {'column_index': 0, 'width': '2in', 'header': 'Identifier'},
+ {'column_index': 1, 'width': '3in'},
+ ]
+ result = server.update_table_batch(temp_report, updates)
+
+ assert result['success'] is True
+ assert 'mutation_id' in result
+ assert 'diff' in result
+
+ cols = server.get_rdl_columns(temp_report)['columns']
+ assert cols[0]['width'] == '2in'
+ assert cols[0]['header'] == 'Identifier'
+ assert cols[1]['width'] == '3in'
+
+ def test_batch_updates_format_and_colors(self, server, temp_report):
+ updates = [
+ {
+ 'column_index': 2,
+ 'format_string': 'C2',
+ 'text_color': 'Red',
+ 'background_color': 'LightYellow',
+ 'header_text_color': 'White',
+ 'header_background_color': 'DarkBlue',
+ }
+ ]
+ result = server.update_table_batch(temp_report, updates)
+
+ assert result['success'] is True
+
+ import xml.etree.ElementTree as ET
+ ns = '{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}'
+ root = ET.parse(temp_report).getroot()
+ data_tb = root.find(f'.//{ns}Textbox[@Name="DataAmount"]')
+ assert data_tb is not None
+ assert data_tb.find(f'{ns}Style/{ns}Color').text == 'Red'
+ assert data_tb.find(f'{ns}Style/{ns}BackgroundColor').text == 'LightYellow'
+ tr_fmt = data_tb.find(f'.//{ns}TextRun/{ns}Style/{ns}Format')
+ assert tr_fmt is not None and tr_fmt.text == 'C2'
+
+ header_tb = root.find(f'.//{ns}Textbox[@Name="HeaderAmount"]')
+ assert header_tb.find(f'{ns}Style/{ns}Color').text == 'White'
+ assert header_tb.find(f'{ns}Style/{ns}BackgroundColor').text == 'DarkBlue'
+
+ def test_batch_returns_diff(self, server, temp_report):
+ updates = [{'column_index': 0, 'width': '1.5in'}]
+ result = server.update_table_batch(temp_report, updates)
+
+ assert result['success'] is True
+ diff = result['diff']
+ assert len(diff) == 1
+ assert diff[0]['column_index'] == 0
+ assert 'width' in diff[0]['changes']
+ assert diff[0]['changes']['width']['after'] == '1.5in'
+
+ def test_batch_all_or_none_on_bad_index(self, server, temp_report):
+ # One valid + one invalid column_index – should fail, original untouched
+ original_cols = server.get_rdl_columns(temp_report)['columns']
+ updates = [
+ {'column_index': 0, 'width': '5in'}, # valid
+ {'column_index': 99, 'width': '1in'}, # invalid
+ ]
+ result = server.update_table_batch(temp_report, updates)
+
+ assert result['success'] is False
+ # File must remain unmodified
+ after_cols = server.get_rdl_columns(temp_report)['columns']
+ assert after_cols[0]['width'] == original_cols[0]['width']
+
+ def test_batch_empty_updates_returns_error(self, server, temp_report):
+ result = server.update_table_batch(temp_report, [])
+ assert result['success'] is False
+
+ def test_batch_returns_integrity_checks(self, server, temp_report):
+ updates = [{'column_index': 0, 'header': 'New Header'}]
+ result = server.update_table_batch(temp_report, updates)
+ assert result['success'] is True
+ assert result.get('integrity_checks', {}).get('xml_valid') is True
+
+ def test_batch_backup_creates_bak_file(self, server, temp_report):
+ updates = [{'column_index': 0, 'width': '1.2in'}]
+ result = server.update_table_batch(temp_report, updates, backup=True)
+
+ assert result['success'] is True
+ backup_path = result.get('backup_path')
+ assert backup_path is not None
+ assert os.path.isfile(backup_path)
+ # Cleanup
+ os.unlink(backup_path)
+
+ def test_batch_no_backup_by_default(self, server, temp_report):
+ updates = [{'column_index': 0, 'width': '1.2in'}]
+ result = server.update_table_batch(temp_report, updates, backup=False)
+
+ assert result['success'] is True
+ assert result.get('backup_path') is None
+
+ def test_batch_alignment_update(self, server, temp_report):
+ updates = [{'column_index': 2, 'alignment': 'Right'}]
+ result = server.update_table_batch(temp_report, updates)
+
+ assert result['success'] is True
+ import xml.etree.ElementTree as ET
+ ns = '{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}'
+ root = ET.parse(temp_report).getroot()
+ data_tb = root.find(f'.//{ns}Textbox[@Name="DataAmount"]')
+ assert data_tb.find(f'{ns}Style/{ns}TextAlign').text == 'Right'
+
+
+class TestPreviewTableBatch:
+ """Tests for the dry-run preview_table_batch operation."""
+
+ def test_preview_does_not_modify_file(self, server, temp_report):
+ with open(temp_report, 'rb') as f:
+ original_bytes = f.read()
+
+ updates = [{'column_index': 0, 'width': '5in', 'header': 'ChangedHeader'}]
+ result = server.preview_table_batch(temp_report, updates)
+
+ assert result.get('dry_run') is True
+ with open(temp_report, 'rb') as f:
+ after_bytes = f.read()
+ assert original_bytes == after_bytes
+
+ def test_preview_returns_diff(self, server, temp_report):
+ updates = [{'column_index': 0, 'width': '3in'}]
+ result = server.preview_table_batch(temp_report, updates)
+
+ assert result.get('dry_run') is True
+ diff = result['diff']
+ assert len(diff) == 1
+ assert diff[0]['changes']['width']['after'] == '3in'
+
+ def test_preview_invalid_index_returns_error(self, server, temp_report):
+ updates = [{'column_index': 99, 'width': '1in'}]
+ result = server.preview_table_batch(temp_report, updates)
+
+ assert result.get('dry_run') is True
+ assert 'error_code' in result
+
+ def test_preview_mutation_id_present(self, server, temp_report):
+ updates = [{'column_index': 0, 'header': 'X'}]
+ result = server.preview_table_batch(temp_report, updates)
+ assert 'mutation_id' in result
+
+
+class TestGetColumnStyles:
+ """Tests for get_column_styles read API."""
+
+ def test_returns_columns_list(self, server, temp_report):
+ result = server.get_column_styles(temp_report)
+ assert 'columns' in result
+ assert len(result['columns']) == 3
+
+ def test_columns_have_index_and_width(self, server, temp_report):
+ result = server.get_column_styles(temp_report)
+ col = result['columns'][0]
+ assert col['index'] == 0
+ assert col['width'] != ''
+
+ def test_reflects_updated_colors(self, server, temp_report):
+ # Apply a color then verify it shows in get_column_styles
+ server.update_column_colors(temp_report, 0, text_color='Navy')
+
+ result = server.get_column_styles(temp_report)
+ data_style = result['columns'][0]['data']
+ assert data_style.get('text_color') == 'Navy'
+
+ def test_reflects_format(self, server, temp_report):
+ server.update_column_format(temp_report, 2, 'P2')
+ result = server.get_column_styles(temp_report)
+ assert result['columns'][2]['data'].get('format') == 'P2'
+
+
+class TestGetTableStyles:
+ """Tests for get_table_styles read API."""
+
+ def test_returns_table_name(self, server, temp_report):
+ result = server.get_table_styles(temp_report)
+ assert 'table_name' in result
+ assert result['table_name'] == 'MainTable'
+
+ def test_returns_table_styles_dict(self, server, temp_report):
+ result = server.get_table_styles(temp_report)
+ assert 'table_styles' in result
+ assert isinstance(result['table_styles'], dict)
+
+
+class TestGetLayoutDiagnostics:
+ """Tests for get_layout_diagnostics read API."""
+
+ def test_returns_page_dimensions(self, server, temp_report):
+ result = server.get_layout_diagnostics(temp_report)
+ assert 'page_width' in result
+ assert result['page_width'] == '8.5000in'
+ assert result['page_height'] == '11.0000in'
+
+ def test_returns_margins(self, server, temp_report):
+ result = server.get_layout_diagnostics(temp_report)
+ assert result['left_margin'] == '0.5000in'
+ assert result['right_margin'] == '0.5000in'
+
+ def test_returns_printable_width(self, server, temp_report):
+ result = server.get_layout_diagnostics(temp_report)
+ # 8.5 - 0.5 - 0.5 = 7.5
+ assert result['printable_width'] == '7.5000in'
+
+ def test_returns_tablix_width(self, server, temp_report):
+ result = server.get_layout_diagnostics(temp_report)
+ # 1in + 2in + 1.5in = 4.5in
+ assert result['tablix_width'] == '4.5000in'
+
+ def test_tablix_fits_flag(self, server, temp_report):
+ result = server.get_layout_diagnostics(temp_report)
+ assert result['tablix_fits'] is True
+ assert result['overflow_inches'] == 0.0
+
+ def test_overflow_detected_when_columns_exceed_page(self, server, temp_report):
+ # Widen columns to force overflow
+ server.update_column_width(temp_report, 0, '4in')
+ server.update_column_width(temp_report, 1, '4in')
+ server.update_column_width(temp_report, 2, '4in')
+
+ result = server.get_layout_diagnostics(temp_report)
+ assert result['tablix_fits'] is False
+ assert result['overflow_inches'] > 0
+
+ def test_returns_column_count(self, server, temp_report):
+ result = server.get_layout_diagnostics(temp_report)
+ assert result['column_count'] == 3
+
+
+class TestAtomicWriteAndConcurrency:
+ """Tests for atomic write safety and per-file concurrency protection."""
+
+ def test_file_not_corrupted_after_write(self, server, temp_report):
+ """Written file must be parseable XML (no half-writes or empty file)."""
+ import xml.etree.ElementTree as ET
+ server.update_column_width(temp_report, 0, '1.5in')
+
+ # File must be non-empty valid XML
+ assert os.path.getsize(temp_report) > 0
+ tree = ET.parse(temp_report)
+ assert tree.getroot() is not None
+
+ def test_concurrent_writes_do_not_corrupt(self, server, temp_report):
+ """Multiple threads writing to the same file should not corrupt it."""
+ import threading
+ import xml.etree.ElementTree as ET
+
+ errors = []
+
+ def write_width(idx, width):
+ try:
+ server.update_column_width(temp_report, idx % 3, width)
+ except Exception as exc:
+ errors.append(exc)
+
+ threads = [
+ threading.Thread(target=write_width, args=(i, f'{(i % 3) + 1}in'))
+ for i in range(10)
+ ]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ assert errors == [], f"Thread errors: {errors}"
+ # File must still be valid XML after concurrent writes
+ assert os.path.getsize(temp_report) > 0
+ tree = ET.parse(temp_report)
+ assert tree.getroot() is not None
+
+ def test_backup_file_contains_original_content(self, server, temp_report):
+ """Backup must contain the file content as it was before the write."""
+ with open(temp_report, 'r') as f:
+ original_content = f.read()
+
+ new_header = 'UNIQUE_BATCH_HEADER_XYZ'
+ updates = [{'column_index': 0, 'header': new_header}]
+ result = server.update_table_batch(temp_report, updates, backup=True)
+
+ assert result['success'] is True
+ backup_path = result['backup_path']
+ try:
+ with open(backup_path, 'r') as f:
+ backup_content = f.read()
+ # Backup was taken before the write, so new header must NOT be in it
+ assert new_header not in backup_content
+ assert original_content == backup_content
+ finally:
+ if os.path.isfile(backup_path):
+ os.unlink(backup_path)
+
+
+
if __name__ == '__main__':
pytest.main([__file__, '-v'])