diff --git a/README.MD b/README.MD index 80001c55..51c2ad84 100644 --- a/README.MD +++ b/README.MD @@ -130,6 +130,7 @@ The following table provides an overview of the current servers implemented in g | Google Docs | OAuth 2.0 | [✅ Seamless with Gumloop auth](https://www.gumloop.com/mcp/gdocs) | ⚠️ Requires GCP project & OAuth setup | [GDocs Docs](/src/servers/gdocs/README.md) | | Google Drive | OAuth 2.0 | [✅ Seamless with Gumloop auth](https://www.gumloop.com/mcp/gdrive) | ⚠️ Requires GCP project & OAuth setup | [GDrive Docs](/src/servers/gdrive/README.md) | | Google Calendar | OAuth 2.0 | [✅ Seamless with Gumloop auth](https://www.gumloop.com/mcp/gcalendar) | ⚠️ Requires GCP project & OAuth setup | [GCalendar Docs](/src/servers/gcalendar/README.md) | +| Google Forms | OAuth 2.0 | ⚠️ Coming soon | ⚠️ Requires GCP project & OAuth setup | [GForms Docs](/src/servers/gforms/README.md) | | Google Maps | API Key | ⚠️ Coming soon | ⚠️ Requires GCP project & API Key | [GMaps Docs](/src/servers/gmaps/README.md) | | Google Meet | OAuth 2.0 | [✅ Seamless with Gumloop auth](https://www.gumloop.com/mcp/gmeet) | ⚠️ Requires GCP project & OAuth setup | [GMeet Docs](/src/servers/gmeet/README.md) | | YouTube | OAuth 2.0 | [✅ Seamless with Gumloop auth](https://www.gumloop.com/mcp/youtube) | ⚠️ Requires GCP project & OAuth setup | [YouTube Docs](/src/servers/youtube/README.md) | diff --git a/src/servers/gforms/README.md b/src/servers/gforms/README.md new file mode 100644 index 00000000..f051f387 --- /dev/null +++ b/src/servers/gforms/README.md @@ -0,0 +1,113 @@ +# Google Forms Server + +guMCP server implementation for interacting with the **Google Forms API** to manage Google Forms and their responses. + +--- + +### 🚀 Prerequisites + +- Python 3.11+ +- A **Google Cloud project** with the following APIs enabled: + - Google Forms API + - Google Drive API + +--- + +### 🔐 Google Cloud Project Setup (First-time Setup) + +1. **Log in to the [Google Cloud Console](https://console.cloud.google.com/)** +2. Create a new project or select an existing one +3. Enable the required APIs: + - Google Forms API + - Google Drive API +4. Navigate to **APIs & Services** → **Credentials** +5. Click **Create Credentials** → **OAuth client ID** +6. Configure the OAuth consent screen if not already done +7. Select **Web application** as the application type +8. Click **Create** +9. Download the OAuth client configuration JSON file + +--- + +### 📄 Local OAuth Credentials + +Place the downloaded OAuth configuration JSON file at: + +``` +local_auth/oauth_configs/gforms/oauth.json +``` + +The file should contain your client ID and client secret. + +--- + +### 🔓 Authenticate with Google Forms + +Run the following command to initiate the OAuth login: + +```bash +python src/servers/gforms/main.py auth +``` + +This will open your browser and prompt you to log in to your Google account. After successful authentication, the access credentials will be saved locally to: + +``` +local_auth/credentials/gforms/local_credentials.json +``` + +--- + +### 🛠 Features + +This server exposes tools for the following operations: + +#### 📋 Form Management +- `list_forms` – List all forms in your Google Drive +- `create_form` – Create a new form with title, description, and visibility settings +- `get_form` – Retrieve detailed information about a specific form +- `update_form` – Modify form details (title, description, visibility) +- `move_form_to_trash` – Move a form to trash +- `search_forms` – Search for forms by name + +#### ❓ Question Management +- `add_question` – Add a question to an existing form (supports text, paragraph, multiple choice, and checkbox types) +- `delete_item` – Delete a question from an existing form + +#### 📊 Response Management +- `list_responses` – Get all responses for a specific form +- `get_response` – Retrieve detailed information about a specific response + +--- + +### ▶️ Running the Server and Client + +#### 1. Start the Server + +```bash +./start_sse_dev_server.sh +``` + +Make sure you've already authenticated using the `auth` command. + +#### 2. Run the Client + +```bash +python RemoteMCPTestClient.py --endpoint http://localhost:8000/gforms/local +``` + +--- + +### 📌 Notes on Google Forms API Usage + +- Ensure your OAuth app has the necessary API scopes enabled +- When creating forms, you can specify whether they should be public or private +- Question types supported include: text, paragraph, multiple choice, and checkbox +- Make sure your `.env` file contains the appropriate API keys if using external LLM services + +--- + +### 📚 Resources + +- [Google Forms API Documentation](https://developers.google.com/forms/api) +- [Google Drive API Documentation](https://developers.google.com/drive/api) +- [OAuth 2.0 in Google APIs](https://developers.google.com/identity/protocols/oauth2) diff --git a/src/servers/gforms/assets/icon.png b/src/servers/gforms/assets/icon.png new file mode 100644 index 00000000..d9ca6ac0 Binary files /dev/null and b/src/servers/gforms/assets/icon.png differ diff --git a/src/servers/gforms/config.yaml b/src/servers/gforms/config.yaml new file mode 100644 index 00000000..62bfbbc9 --- /dev/null +++ b/src/servers/gforms/config.yaml @@ -0,0 +1,25 @@ +name: "Google Forms guMCP Server" +icon: "assets/icon.png" +description: "Interact with Google Forms using the Google Forms API" +documentation_path: "README.md" +tools: + - name: "list_forms" + description: "List all forms." + - name: "create_form" + description: "Creates a new form." + - name: "get_form" + description: "Retrieves an existing form by its ID." + - name: "update_form" + description: "Updates an existing form by its ID." + - name: "move_form_to_trash" + description: "Removes a form and moves it to trash." + - name: "list_responses" + description: "Retrieves a list of responses." + - name: "get_response" + description: "Retrieves the details of a response by its ID." + - name: "search_forms" + description: "Retrieves a list of forms by name." + - name: "add_question" + description: "Add a question to an existing Google Form." + - name: "delete_item" + description: "Deletes an item (question) from an existing Google Form." diff --git a/src/servers/gforms/main.py b/src/servers/gforms/main.py new file mode 100644 index 00000000..2aad3fce --- /dev/null +++ b/src/servers/gforms/main.py @@ -0,0 +1,983 @@ +import os +import sys +import logging +import json +from pathlib import Path + +# Add both project root and src directory to Python path +project_root = os.path.abspath( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) +) +sys.path.insert(0, project_root) +sys.path.insert(0, os.path.join(project_root, "src")) + +import mcp.types as types +from typing import Optional, Iterable +from mcp.types import Resource +from pydantic import AnyUrl + +from mcp.server import NotificationOptions, Server +from mcp.server.models import InitializationOptions + +from google.oauth2.credentials import Credentials +from googleapiclient.discovery import build +from mcp.server.lowlevel.helper_types import ReadResourceContents + + +from src.utils.google.util import authenticate_and_save_credentials +from src.auth.factory import create_auth_client + +SERVICE_NAME = Path(__file__).parent.name +SCOPES = [ + "https://www.googleapis.com/auth/forms", + "https://www.googleapis.com/auth/forms.body", + "https://www.googleapis.com/auth/forms.responses.readonly", + "https://www.googleapis.com/auth/drive", +] + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("gforms-server") + + +async def get_credentials(user_id, api_key=None): + """Get stored or active credentials for Google Forms API.""" + auth_client = create_auth_client(api_key=api_key) + credentials_data = auth_client.get_user_credentials(SERVICE_NAME, user_id) + + if not credentials_data: + raise ValueError( + f"Credentials not found for user {user_id}. Run with 'auth' first." + ) + + token = credentials_data.get("token") + if token: + return Credentials.from_authorized_user_info(credentials_data) + access_token = credentials_data.get("access_token") + if access_token: + return Credentials(token=access_token) + + raise ValueError(f"Valid token not found for user {user_id}") + + +async def create_forms_service(user_id, api_key=None): + """Create an authorized Google Forms API service.""" + credentials = await get_credentials(user_id, api_key=api_key) + return build("forms", "v1", credentials=credentials) + + +async def create_drive_service(user_id, api_key=None): + """Create an authorized Google Drive API service.""" + credentials = await get_credentials(user_id, api_key=api_key) + return build("drive", "v3", credentials=credentials) + + +def create_server(user_id, api_key=None): + server = Server("gforms-server") + server.user_id = user_id + server.api_key = api_key + + @server.list_resources() + async def handle_list_resources( + cursor: Optional[str] = None, + ) -> list[Resource]: + """List Google Forms resources (forms)""" + logger.info( + f"Listing resources for user: {server.user_id} with cursor: {cursor}" + ) + + drive_service = await create_drive_service(server.user_id, server.api_key) + try: + resources = [] + + # List all forms + results = ( + drive_service.files() + .list( + q="mimeType='application/vnd.google-apps.form'", + fields="files(id, name)", + ) + .execute() + ) + + for form in results.get("files", []): + resources.append( + Resource( + uri=f"gforms://form/{form['id']}", + mimeType="application/json", + name=f"Form: {form['name']}", + description="Google Form", + ) + ) + + return resources + + except Exception as e: + logger.error(f"Error listing Google Forms resources: {e}") + return [] + + @server.read_resource() + async def handle_read_resource(uri: AnyUrl) -> Iterable[ReadResourceContents]: + """Read a resource from Google Forms by URI""" + logger.info(f"Reading resource: {uri} for user: {server.user_id}") + + forms_service = await create_forms_service(server.user_id, server.api_key) + try: + uri_str = str(uri) + + if uri_str.startswith("gforms://form/"): + # Handle form resource + form_id = uri_str.replace("gforms://form/", "") + form_data = forms_service.forms().get(formId=form_id).execute() + return [ + ReadResourceContents( + content=json.dumps(form_data, indent=2), + mime_type="application/json", + ) + ] + + raise ValueError(f"Unsupported resource URI: {uri_str}") + + except Exception as e: + logger.error(f"Error reading Google Forms resource: {e}") + return [ + ReadResourceContents( + content=json.dumps({"error": str(e)}), + mime_type="application/json", + ) + ] + + @server.list_tools() + async def handle_list_tools() -> list[types.Tool]: + """Register all supported tools for Google Forms.""" + return [ + types.Tool( + name="list_forms", + description="List all forms.", + inputSchema={ + "type": "object", + "properties": {}, + "required": [], + }, + outputSchema={ + "type": "object", + "properties": { + "files": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "name": {"type": "string"}, + }, + }, + } + }, + "description": "List of Google Forms with their IDs and names", + "examples": [ + '{"files": [{"id": "1hkvi6cSnDrHx7V", "name": "test_form_aea1"}]}' + ], + }, + ), + types.Tool( + name="create_form", + description="Creates a new form.", + inputSchema={ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "Title of the form", + }, + "description": { + "type": "string", + "description": "Optional description for the form", + }, + "is_public": { + "type": "boolean", + "description": "Whether to make the form public (default: false)", + "default": False, + }, + }, + "required": ["title"], + }, + outputSchema={ + "type": "object", + "properties": { + "form_id": {"type": "string"}, + "response_url": {"type": "string"}, + "edit_url": {"type": "string"}, + "title": {"type": "string"}, + }, + "description": "Details of the created form including URLs", + "examples": [ + '{"form_id": "YT4KTB4UlZWM", "response_url": "https://docs.google.com/forms/d/e/dhshgoasghad/viewform", "edit_url": "https://docs.google.com/forms/d/YT4KTB4UlZWM/edit", "title": "test_form_8eb8"}' + ], + }, + ), + types.Tool( + name="get_form", + description="Retrieves an existing form by its ID.", + inputSchema={ + "type": "object", + "properties": { + "form_id": { + "type": "string", + "description": "ID of the form to retrieve", + } + }, + "required": ["form_id"], + }, + outputSchema={ + "type": "object", + "properties": { + "formId": {"type": "string"}, + "info": { + "type": "object", + "properties": { + "title": {"type": "string"}, + "documentTitle": {"type": "string"}, + }, + }, + "settings": { + "type": "object", + "properties": { + "quizSettings": {"type": "object"}, + "emailCollectionType": {"type": "string"}, + }, + }, + "revisionId": {"type": "string"}, + "responderUri": {"type": "string"}, + "publishSettings": { + "type": "object", + "properties": { + "publishState": { + "type": "object", + "properties": { + "isPublished": {"type": "boolean"}, + "isAcceptingResponses": {"type": "boolean"}, + }, + } + }, + }, + }, + "description": "Complete form details including settings and publish state", + "examples": [ + '{"formId": "Q4Ph8GrQZPWlYvWNtVMI8LSKObpw", "info": {"title": "test_form_aea1", "documentTitle": "test_form_aea1"}, "settings": {"quizSettings": {}, "emailCollectionType": "DO_NOT_COLLECT"}, "revisionId": "00000004", "responderUri": "https://docs.google.com/forms/d/e/dshgsdbg/viewform", "publishSettings": {"publishState": {"isPublished": true, "isAcceptingResponses": true}}}' + ], + }, + ), + types.Tool( + name="update_form", + description="Updates an existing form by its ID.", + inputSchema={ + "type": "object", + "properties": { + "form_id": { + "type": "string", + "description": "ID of the form to update", + }, + "description": { + "type": "string", + "description": "New description for the form", + }, + "is_public": { + "type": "boolean", + "description": "Whether to make the form public (default: false)", + "default": False, + }, + }, + "required": ["form_id"], + }, + outputSchema={ + "type": "object", + "properties": { + "form_id": {"type": "string"}, + "response_url": {"type": "string"}, + "edit_url": {"type": "string"}, + "result": { + "type": "object", + "properties": { + "formId": {"type": "string"}, + "info": { + "type": "object", + "properties": { + "title": {"type": "string"}, + "description": {"type": "string"}, + "documentTitle": {"type": "string"}, + }, + }, + "settings": { + "type": "object", + "properties": { + "quizSettings": {"type": "object"}, + "emailCollectionType": {"type": "string"}, + }, + }, + "revisionId": {"type": "string"}, + "responderUri": {"type": "string"}, + "publishSettings": { + "type": "object", + "properties": { + "publishState": { + "type": "object", + "properties": { + "isPublished": {"type": "boolean"}, + "isAcceptingResponses": { + "type": "boolean" + }, + }, + } + }, + }, + }, + }, + }, + "description": "Updated form details including URLs and complete form information", + "examples": [ + '{"form_id": "1hkvi6cSnDrHx7V", "response_url": "https://docs.google.com/forms/d/e/shfdsogsdog/viewform", "edit_url": "https://docs.google.com/forms/d/1hkvi6cSnDrHx7V/edit", "result": {"formId": "1hkvi6cSnDrHx7V", "info": {"title": "test_form_aea1", "description": "Updated description for test form", "documentTitle": "test_form_aea1"}, "settings": {"quizSettings": {}, "emailCollectionType": "DO_NOT_COLLECT"}, "revisionId": "00000006", "responderUri": "https://docs.google.com/forms/d/e/abjsflbf/viewform", "publishSettings": {"publishState": {"isPublished": true, "isAcceptingResponses": true}}}}' + ], + }, + ), + types.Tool( + name="move_form_to_trash", + description="Removes a form and moves it to trash.", + inputSchema={ + "type": "object", + "properties": { + "form_id": { + "type": "string", + "description": "ID of the form to move to trash", + } + }, + "required": ["form_id"], + }, + outputSchema={ + "type": "string", + "description": "ID of the form that was moved to trash", + "examples": ['"1hkvi6cSnDrHx7V-Q4Ph8GrQZPWlYvWNtVMI8LSKObpw"'], + }, + ), + types.Tool( + name="get_response", + description="Retrieves the details of a response by its ID.", + inputSchema={ + "type": "object", + "properties": { + "form_id": { + "type": "string", + "description": "ID of the form", + }, + "response_id": { + "type": "string", + "description": "ID of the response to retrieve", + }, + }, + "required": ["form_id", "response_id"], + }, + outputSchema={ + "type": "object", + "properties": { + "formId": {"type": "string"}, + "responseId": {"type": "string"}, + "createTime": {"type": "string"}, + "lastSubmittedTime": {"type": "string"}, + "answers": { + "type": "object", + "additionalProperties": { + "type": "object", + "properties": { + "questionId": {"type": "string"}, + "textAnswers": { + "type": "object", + "properties": { + "answers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "value": {"type": "string"} + }, + }, + } + }, + }, + }, + }, + }, + }, + "description": "Response details including answers and timestamps", + "examples": [ + '{"formId": "L6vO7Mho-_yWqNWPhCU", "responseId": "Slv9FN6UMf9TFk4", "createTime": "2025-04-30T20:59:39.526Z", "lastSubmittedTime": "2025-04-30T20:59:39.526237Z", "answers": {"40a835f6": {"questionId": "40a835f6", "textAnswers": {"answers": [{"value": "HI"}]}}}}' + ], + }, + ), + types.Tool( + name="list_responses", + description="Retrieves a list of responses.", + inputSchema={ + "type": "object", + "properties": { + "form_id": { + "type": "string", + "description": "ID of the form", + }, + "page_size": { + "type": "integer", + "description": "Number of responses to return (max 100)", + }, + "page_token": { + "type": "string", + "description": "Token for pagination", + }, + }, + "required": ["form_id"], + }, + outputSchema={ + "type": "object", + "properties": { + "responses": { + "type": "array", + "items": { + "type": "object", + "properties": { + "responseId": {"type": "string"}, + "createTime": {"type": "string"}, + "lastSubmittedTime": {"type": "string"}, + "answers": { + "type": "object", + "additionalProperties": { + "type": "object", + "properties": { + "questionId": {"type": "string"}, + "textAnswers": { + "type": "object", + "properties": { + "answers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "value": { + "type": "string" + } + }, + }, + } + }, + }, + }, + }, + }, + }, + }, + } + }, + "description": "List of form responses with their answers", + "examples": [ + '{"responses": [{"responseId": "Slv9FN6UMf9TFk4", "createTime": "2025-04-30T20:59:39.526Z", "lastSubmittedTime": "2025-04-30T20:59:39.526237Z", "answers": {"40a835f6": {"questionId": "40a835f6", "textAnswers": {"answers": [{"value": "HI"}]}}}}]}' + ], + }, + ), + types.Tool( + name="search_forms", + description="Retrieves a list of forms by name.", + inputSchema={ + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query to filter forms", + }, + }, + "required": ["query"], + }, + outputSchema={ + "type": "object", + "properties": { + "files": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "name": {"type": "string"}, + }, + }, + } + }, + "description": "List of matching forms with their IDs and names", + "examples": [ + '{"files": [{"id": "YT4KTB4UlZWM", "name": "test_form_8eb8"}]}' + ], + }, + ), + types.Tool( + name="add_question", + description="Add a question to an existing Google Form.", + inputSchema={ + "type": "object", + "properties": { + "form_id": { + "type": "string", + "description": "ID of the form to add the question to", + }, + "question_type": { + "type": "string", + "description": "Type of question (text, paragraph, multiple_choice, checkbox)", + "enum": [ + "text", + "paragraph", + "multiple_choice", + "checkbox", + ], + }, + "title": { + "type": "string", + "description": "Question title/text", + }, + "options": { + "type": "array", + "description": "List of options for multiple choice/checkbox questions", + "items": {"type": "string"}, + }, + "required": { + "type": "boolean", + "description": "Whether the question is required", + "default": False, + }, + }, + "required": ["form_id", "question_type", "title"], + }, + outputSchema={ + "type": "object", + "properties": { + "replies": { + "type": "array", + "items": { + "type": "object", + "properties": { + "createItem": { + "type": "object", + "properties": { + "itemId": {"type": "string"}, + "questionId": { + "type": "array", + "items": {"type": "string"}, + }, + }, + } + }, + }, + }, + "writeControl": { + "type": "object", + "properties": {"requiredRevisionId": {"type": "string"}}, + }, + }, + "description": "Result of adding the question including item and question IDs", + "examples": [ + '{"replies": [{"createItem": {"itemId": "2fb38c9f", "questionId": ["56a918aa"]}}], "writeControl": {"requiredRevisionId": "00000006"}}' + ], + }, + ), + types.Tool( + name="delete_item", + description="Deletes an item (question) from an existing Google Form.", + inputSchema={ + "type": "object", + "properties": { + "form_id": { + "type": "string", + "description": "ID of the form to delete the question from", + }, + "item_id": { + "type": "string", + "description": "ID of the question item to delete", + }, + }, + "required": ["form_id", "item_id"], + }, + outputSchema={ + "type": "object", + "properties": { + "replies": {"type": "array", "items": {"type": "object"}}, + "writeControl": { + "type": "object", + "properties": {"requiredRevisionId": {"type": "string"}}, + }, + }, + "description": "Result of deleting the question item", + "examples": [ + '{"replies": [{}], "writeControl": {"requiredRevisionId": "00000007"}}' + ], + }, + ), + ] + + @server.call_tool() + async def handle_call_tool(name: str, arguments: dict | None): + logger.info( + f"User {server.user_id} calling tool: {name} with arguments: {arguments}" + ) + forms_service = await create_forms_service(server.user_id, server.api_key) + + if arguments is None: + arguments = {} + + try: + if name == "list_forms": + drive_service = await create_drive_service( + server.user_id, server.api_key + ) + results = ( + drive_service.files() + .list( + q="mimeType='application/vnd.google-apps.form'", + fields="files(id, name)", + ) + .execute() + ) + files = results.get("files", []) + return [ + types.TextContent(type="text", text=json.dumps(file, indent=2)) + for file in files + ] + elif name == "create_form": + # Create basic form + form_body = { + "info": { + "title": arguments["title"], + "documentTitle": arguments["title"], + } + } + + # Create form + form = forms_service.forms().create(body=form_body).execute() + form_id = form["formId"] + + # If description is provided, update the form + if "description" in arguments and arguments["description"]: + update_body = { + "requests": [ + { + "updateFormInfo": { + "info": {"description": arguments["description"]}, + "updateMask": "description", + } + } + ] + } + forms_service.forms().batchUpdate( + formId=form_id, body=update_body + ).execute() + + # Update form settings to make it public and collectable + settings_body = { + "requests": [ + { + "updateSettings": { + "settings": {"quizSettings": {"isQuiz": False}}, + "updateMask": "quizSettings.isQuiz", + } + } + ] + } + forms_service.forms().batchUpdate( + formId=form_id, body=settings_body + ).execute() + + # Make the form public via Drive API if is_public is True (default) + if arguments.get("is_public", False): + drive_service = await create_drive_service( + server.user_id, server.api_key + ) + + # Set public permission + permission = { + "type": "anyone", + "role": "reader", + "allowFileDiscovery": True, + } + drive_service.permissions().create( + fileId=form_id, + body=permission, + fields="id", + sendNotificationEmail=False, + ).execute() + + # Get the form URLs + edit_url = f"https://docs.google.com/forms/d/{form_id}/edit" + response_url = form.get( + "responderUri", + f"https://docs.google.com/forms/d/e/{form_id}/viewform", + ) + + result = { + "form_id": form_id, + "response_url": response_url, + "edit_url": edit_url, + "title": arguments["title"], + } + return [ + types.TextContent(type="text", text=json.dumps(result, indent=2)) + ] + elif name == "get_form": + result = ( + forms_service.forms().get(formId=arguments["form_id"]).execute() + ) + return [ + types.TextContent(type="text", text=json.dumps(result, indent=2)) + ] + elif name == "update_form": + form_id = arguments["form_id"] + + if "description" in arguments and arguments["description"]: + update_body = { + "requests": [ + { + "updateFormInfo": { + "info": {"description": arguments["description"]}, + "updateMask": "description", + } + } + ] + } + forms_service.forms().batchUpdate( + formId=form_id, body=update_body + ).execute() + + # Update form settings to make it public and collectable + settings_body = { + "requests": [ + { + "updateSettings": { + "settings": {"quizSettings": {"isQuiz": False}}, + "updateMask": "quizSettings.isQuiz", + } + } + ] + } + forms_service.forms().batchUpdate( + formId=form_id, body=settings_body + ).execute() + + if "is_public" in arguments: + drive_service = await create_drive_service( + server.user_id, server.api_key + ) + + # Set public permission + permission = { + "type": "anyone", + "role": "reader", + "allowFileDiscovery": True, + } + drive_service.permissions().create( + fileId=form_id, + body=permission, + fields="id", + sendNotificationEmail=False, + ).execute() + + edit_url = f"https://docs.google.com/forms/d/{form_id}/edit" + form = forms_service.forms().get(formId=form_id).execute() + response_url = form.get( + "responderUri", + f"https://docs.google.com/forms/d/e/{form_id}/viewform", + ) + + result = { + "form_id": form_id, + "response_url": response_url, + "edit_url": edit_url, + "result": form, + } + return [ + types.TextContent(type="text", text=json.dumps(result, indent=2)) + ] + elif name == "move_form_to_trash": + form_id = arguments["form_id"] + drive_service = await create_drive_service( + server.user_id, server.api_key + ) + result = ( + drive_service.files() + .update(fileId=form_id, body={"trashed": True}) + .execute() + ) + return [ + types.TextContent( + type="text", + text=json.dumps(result.get("id", form_id), indent=2), + ) + ] + elif name == "get_response": + result = ( + forms_service.forms() + .responses() + .get( + formId=arguments["form_id"], responseId=arguments["response_id"] + ) + .execute() + ) + return [ + types.TextContent(type="text", text=json.dumps(result, indent=2)) + ] + elif name == "list_responses": + params = {"formId": arguments["form_id"]} + if "page_size" in arguments: + params["pageSize"] = min(arguments["page_size"], 100) + if "page_token" in arguments: + params["pageToken"] = arguments["page_token"] + + result = forms_service.forms().responses().list(**params).execute() + responses = result.get("responses", []) + return [ + types.TextContent(type="text", text=json.dumps(response, indent=2)) + for response in responses + ] + elif name == "search_forms": + drive_service = await create_drive_service( + server.user_id, server.api_key + ) + query = f"mimeType='application/vnd.google-apps.form' and name contains '{arguments['query']}'" + result = ( + drive_service.files() + .list(q=query, fields="files(id, name)") + .execute() + ) + files = result.get("files", []) + return [ + types.TextContent(type="text", text=json.dumps(file, indent=2)) + for file in files + ] + elif name == "add_question": + form_id = arguments["form_id"] + question_type = arguments["question_type"] + title = arguments["title"] + options = arguments.get("options", []) + required = arguments.get("required", False) + + # Get the current form + form = forms_service.forms().get(formId=form_id).execute() + + # Determine the item ID for the new question + item_id = len(form.get("items", [])) + + # Create base request + request = { + "requests": [ + { + "createItem": { + "item": { + "title": title, + "questionItem": { + "question": {"required": required} + }, + }, + "location": {"index": item_id}, + } + } + ] + } + + # Set up question type specific configuration + if question_type == "text": + request["requests"][0]["createItem"]["item"]["questionItem"][ + "question" + ]["textQuestion"] = {} + + elif question_type == "paragraph": + request["requests"][0]["createItem"]["item"]["questionItem"][ + "question" + ]["textQuestion"] = {"paragraph": True} + + elif question_type == "multiple_choice" and options: + choices = [{"value": option} for option in options] + request["requests"][0]["createItem"]["item"]["questionItem"][ + "question" + ]["choiceQuestion"] = { + "type": "RADIO", + "options": choices, + "shuffle": False, + } + + elif question_type == "checkbox" and options: + choices = [{"value": option} for option in options] + request["requests"][0]["createItem"]["item"]["questionItem"][ + "question" + ]["choiceQuestion"] = { + "type": "CHECKBOX", + "options": choices, + "shuffle": False, + } + + # Execute the request + result = ( + forms_service.forms() + .batchUpdate(formId=form_id, body=request) + .execute() + ) + return [ + types.TextContent(type="text", text=json.dumps(result, indent=2)) + ] + elif name == "delete_item": + form_id = arguments["form_id"] + item_id = arguments["item_id"] + + form = forms_service.forms().get(formId=form_id).execute() + + item_index = None + for i, item in enumerate(form.get("items", [])): + if item.get("itemId") == item_id: + item_index = i + break + if item_index is None: + raise ValueError(f"Item with ID {item_id} not found in the form.") + + request_body = { + "requests": [{"deleteItem": {"location": {"index": item_index}}}] + } + + result = ( + forms_service.forms() + .batchUpdate(formId=form_id, body=request_body) + .execute() + ) + return [ + types.TextContent(type="text", text=json.dumps(result, indent=2)) + ] + else: + raise ValueError(f"Unknown tool: {name}") + + except Exception as e: + logger.error(f"Error calling Google Forms API: {e}") + return [types.TextContent(type="text", text=str(e))] + + return server + + +server = create_server + + +def get_initialization_options(server_instance: Server) -> InitializationOptions: + return InitializationOptions( + server_name="gforms-server", + server_version="1.0.0", + capabilities=server_instance.get_capabilities( + notification_options=NotificationOptions(), + experimental_capabilities={}, + ), + ) + + +if __name__ == "__main__": + if sys.argv[1].lower() == "auth": + user_id = "local" + authenticate_and_save_credentials(user_id, SERVICE_NAME, SCOPES) + else: + print("Usage:") + print(" python main.py auth - Run authentication flow for a user") diff --git a/tests/README.md b/tests/README.md index 6bf1e611..50b2a32f 100644 --- a/tests/README.md +++ b/tests/README.md @@ -8,24 +8,44 @@ Each server should have a test file (`tests.py`) in its directory that implement ### Test Components -- **RESOURCE_TESTS**: Tests for resource operations (e.g., list_resources, read_resource) -- **TOOL_TESTS**: Tests for server tools (e.g., create_document, read_document) +- **Resources Test**: Tests for resource operations using `run_resources_test` +- **Tool Tests**: Tests for server tools using `run_tool_test` - **Shared Context**: A dictionary that persists between tests to maintain state -## Test Configuration Format +## Testing Resources -Both TOOL_TESTS and RESOURCE_TESTS use the same configuration format: +Resources can be tested using the simplified `run_resources_test` helper: + +```python +from tests.utils.test_tools import run_resources_test + +@pytest.mark.asyncio +async def test_resources(client, context): + response = await run_resources_test(client) + context["first_resource_uri"] = response.resources[0].uri + return response +``` + +This helper: +- Checks for a valid list_resources response +- Skips if no resources are found +- Validates the first resource and its read_resource response +- Stores the first resource URI in context for use in tool tests + +## Tool Test Configuration Format + +Tool tests use the following configuration format: ```python { - "name": "tool_or_resource_operation_name", + "name": "tool_name", "args_template": "with param1={value1} param2={value2}", # Optional "args": "with static_param=value", # Optional alternative to args_template "expected_keywords": ["keyword1", "keyword2"], # Must appear in response "regex_extractors": { # Optional, extracts values from response "value_name": r'"?field_name"?[:\s]+"?([^"]+)"?', }, - "description": "Describes what this test does", + "description": "Describes what this tool does", "depends_on": ["value_name"], # Optional, dependencies from context "setup": lambda context: {"key": "value"}, # Optional setup function "skip": False # Optional, skip this test if True @@ -37,28 +57,17 @@ Both TOOL_TESTS and RESOURCE_TESTS use the same configuration format: - Tests can depend on values from previous tests via the `depends_on` list - Values are extracted using regex patterns in `regex_extractors` - The shared context dictionary persists between tests +- The first resource URI is available as `first_resource_uri` in context ## Example ```python -RESOURCE_TESTS = [ - { - "name": "list_resources", - "expected_keywords": ["resources"], - "regex_extractors": { - "resource_uri": r'"?uri"?[:\s]+"?(server://type/[^"]+)"?', - }, - "description": "list resources and extract a URI", - }, - { - "name": "read_resource", - "args_template": 'with uri="{resource_uri}"', - "expected_keywords": ["contents"], - "description": "read a resource's details", - "depends_on": ["resource_uri"], - }, -] +# Import required components +import pytest +import random +from tests.utils.test_tools import get_test_id, run_tool_test, run_resources_test +# Define tool tests TOOL_TESTS = [ { "name": "create_document", @@ -78,6 +87,26 @@ TOOL_TESTS = [ "depends_on": ["created_file_id"], }, ] + +# Shared context dictionary +SHARED_CONTEXT = {} + +@pytest.fixture(scope="module") +def context(): + return SHARED_CONTEXT + +# Test resources +@pytest.mark.asyncio +async def test_resources(client, context): + response = await run_resources_test(client) + context["first_resource_uri"] = response.resources[0].uri + return response + +# Test tools +@pytest.mark.parametrize("test_config", TOOL_TESTS, ids=get_test_id) +@pytest.mark.asyncio +async def test_tool(client, context, test_config): + return await run_tool_test(client, context, test_config) ``` ## Running Tests diff --git a/tests/servers/excel/tests.py b/tests/servers/excel/tests.py index a671f999..47041390 100644 --- a/tests/servers/excel/tests.py +++ b/tests/servers/excel/tests.py @@ -2,7 +2,7 @@ import re import random import string -from tests.utils.test_tools import get_test_id, run_tool_test +from tests.utils.test_tools import get_test_id, run_tool_test, run_resources_test TOOL_TESTS = [ @@ -165,51 +165,12 @@ def context(): return SHARED_CONTEXT -@pytest.mark.parametrize("test_config", TOOL_TESTS, ids=get_test_id) @pytest.mark.asyncio -async def test_excel_tool(client, context, test_config): - return await run_tool_test(client, context, test_config) +async def test_resources(client): + return await run_resources_test(client) +@pytest.mark.parametrize("test_config", TOOL_TESTS, ids=get_test_id) @pytest.mark.asyncio -async def test_read_resource(client): - """Test reading an Excel workbook resource""" - # First list resources to get a valid Excel file - response = await client.list_resources() - assert ( - response and hasattr(response, "resources") and len(response.resources) - ), f"Invalid list resources response: {response}" - - # Find the first Excel file resource - excel_resource = next( - (r for r in response.resources if str(r.uri).startswith("excel://file/")), - None, - ) - - # Skip test if no Excel resources found - if not excel_resource: - pytest.skip("No Excel resources found to test read_resource functionality") - return - - # Read Excel file details - response = await client.read_resource(excel_resource.uri) - - # Verify response - assert response.contents, "Response should contain Excel workbook data" - assert response.contents[0].mimeType == "application/json", "Expected JSON response" - - # Parse the JSON content - import json - - content_text = response.contents[0].text - content_data = json.loads(content_text) - - # Verify basic workbook data - assert "id" in content_data, "Response should include workbook ID" - assert "name" in content_data, "Response should include workbook name" - assert "worksheets" in content_data, "Response should include worksheets data" - - print("Excel workbook data read:") - print(f" - Workbook name: {content_data.get('name')}") - print(f" - Worksheets count: {len(content_data.get('worksheets', []))}") - print("✅ Successfully read Excel workbook data") +async def test_excel_tool(client, context, test_config): + return await run_tool_test(client, context, test_config) diff --git a/tests/servers/gforms/tests.py b/tests/servers/gforms/tests.py new file mode 100644 index 00000000..5a14dd96 --- /dev/null +++ b/tests/servers/gforms/tests.py @@ -0,0 +1,128 @@ +import uuid +import pytest +from tests.utils.test_tools import get_test_id, run_tool_test, run_resources_test + +# Generate a unique form name for testing +form_name = f"test_form_{str(uuid.uuid4())[:4]}" + + +TOOL_TESTS = [ + { + "name": "list_forms", + "args_template": "", + "expected_keywords": ["files"], + "regex_extractors": { + "form_id": r'"?id"?[:\s]+"?([^"]+)"?', + "form_name": r'"?name"?[:\s]+"?([^"]+)"?', + }, + "description": "list all Google Forms", + }, + { + "name": "create_form", + "args_template": f"title={form_name} description=Test form created by guMCP is_public=false", + "expected_keywords": ["form_id", "response_url", "edit_url"], + "regex_extractors": { + "created_form_id": r'"?form_id"?[:\s]+"?([^"]+)"?', + "response_url": r'"?response_url"?[:\s]+"?([^"]+)"?', + "edit_url": r'"?edit_url"?[:\s]+"?([^"]+)"?', + }, + "description": f"create a new Google Form with title={form_name}", + }, + { + "name": "get_form", + "args_template": "with id={created_form_id}", + "expected_keywords": ["formId", "info"], + "regex_extractors": { + "form_id": r'"?formId"?[:\s]+"?([^"]+)"?', + "form_title": r'"?title"?[:\s]+"?([^"]+)"?', + }, + "description": "get details of a specific Google Form", + }, + { + "name": "update_form", + "args_template": "with id={created_form_id} description=Updated description for test form is_public=true", + "expected_keywords": ["form_id", "result"], + "regex_extractors": { + "updated_form_id": r'"?form_id"?[:\s]+"?([^"]+)"?', + "form_description": r'"?description"?[:\s]+"?([^"]+)"?', + }, + "description": "update an existing Google Form", + }, + { + "name": "search_forms", + "args_template": f"query={form_name}", + "expected_keywords": ["files"], + "regex_extractors": { + "found_form_id": r'"?id"?[:\s]+"?([^"]+)"?', + "found_form_name": r'"?name"?[:\s]+"?([^"]+)"?', + }, + "description": f"search for Google Forms with name containing {form_name}", + }, + { + "name": "add_question", + "args_template": "with form_id={created_form_id} question_type=text title=Test Question required=true", + "expected_keywords": ["replies"], + "regex_extractors": { + "question_id": r'"?itemId"?[:\s]+"?([^"]+)"?', + }, + "description": "add a text question to a Google Form", + }, + { + "name": "delete_item", + "args_template": "with form_id={created_form_id} item_id={question_id}", + "expected_keywords": ["replies"], + "regex_extractors": { + "success": r'"?replies"?[:\s]+"?([^"]+)"?', + }, + "description": "delete a question from a Google Form", + }, + { + "name": "list_responses", + "args_template": "with form_id={created_form_id} page_size=10", + "expected_keywords": ["responses"], + "regex_extractors": { + "response_count": r'"?responses"?[:\s]+"?([^"]+)"?', + "response_id": r'"?responseId"?[:\s]+"?([^"]+)"?', + }, + "description": "list responses for a Google Form", + }, + { + "name": "get_response", + "args_template": "with form_id={created_form_id} response_id={response_id}", + "expected_keywords": ["responseId", "answers"], + "regex_extractors": { + "response_id": r'"?responseId"?[:\s]+"?([^"]+)"?', + }, + "description": "get details of a form response", + }, + { + "name": "move_form_to_trash", + "args_template": "with id={created_form_id}", + "expected_keywords": ["id"], + "regex_extractors": { + "trashed_form_id": r'"?id"?[:\s]+"?([^"]+)"?', + }, + "description": "move a Google Form to trash", + }, +] + +# Shared context dictionary at module level +SHARED_CONTEXT = {} + + +@pytest.fixture(scope="module") +def context(): + return SHARED_CONTEXT + + +@pytest.mark.asyncio +async def test_resources(client, context): + response = await run_resources_test(client) + context["first_resource_uri"] = response.resources[0].uri + return response + + +@pytest.mark.parametrize("test_config", TOOL_TESTS, ids=get_test_id) +@pytest.mark.asyncio +async def test_gforms_tool(client, context, test_config): + return await run_tool_test(client, context, test_config) diff --git a/tests/servers/word/tests.py b/tests/servers/word/tests.py index 226ebd46..73512269 100644 --- a/tests/servers/word/tests.py +++ b/tests/servers/word/tests.py @@ -1,31 +1,8 @@ import pytest import random -from tests.utils.test_tools import get_test_id, run_tool_test +from tests.utils.test_tools import get_test_id, run_tool_test, run_resources_test -RESOURCE_TESTS = [ - { - "name": "list_resources", - "expected_keywords": ["resources"], - "regex_extractors": { - "resource_uri": r'"?uri"?[:\s]+"?(word://file/[^"]+)"?', - "resource_name": r'"?name"?[:\s]+"?([^"]+)"?', - }, - "description": "list Word document resources and extract a resource URI", - }, - { - "name": "read_resource", - "args_template": 'with uri="{resource_uri}"', - "expected_keywords": ["contents"], - "regex_extractors": { - "document_id": r'"?id"?[:\s]+"?([^"]+)"?', - "document_name": r'"?name"?[:\s]+"?([^"]+)"?', - }, - "description": "read a Word document resource and extract document details", - "depends_on": ["resource_uri"], - }, -] - TOOL_TESTS = [ { "name": "list_documents", @@ -93,12 +70,6 @@ def context(): return SHARED_CONTEXT -@pytest.mark.parametrize("test_config", RESOURCE_TESTS, ids=get_test_id) -@pytest.mark.asyncio -async def test_word_resource(client, context, test_config): - return await run_tool_test(client, context, test_config) - - @pytest.mark.parametrize("test_config", TOOL_TESTS, ids=get_test_id) @pytest.mark.asyncio async def test_word_tool(client, context, test_config): @@ -106,34 +77,7 @@ async def test_word_tool(client, context, test_config): @pytest.mark.asyncio -async def test_read_resource(client): - """Test reading a Word document resource""" - response = await client.list_resources() - - if not (response and hasattr(response, "resources") and len(response.resources)): - pytest.skip("No Word resources found to test read_resource functionality") - return - - word_resource = next( - (r for r in response.resources if str(r.uri).startswith("word://file/")), - None, - ) - - if not word_resource: - pytest.skip("No Word resources found to test read_resource functionality") - return - - response = await client.read_resource(word_resource.uri) - assert response.contents, "Response should contain Word document data" - assert response.contents[0].mimeType == "application/json", "Expected JSON response" - - import json - - content_data = json.loads(response.contents[0].text) - - if "error" in content_data: - pytest.fail(f"Error reading document: {content_data.get('error')}") - - assert "id" in content_data, "Response should include document ID" - assert "name" in content_data, "Response should include document name" - assert "webUrl" in content_data, "Response should include webUrl" +async def test_resources(client, context): + response = await run_resources_test(client) + context["first_resource_uri"] = response.resources[0].uri + return response diff --git a/tests/utils/test_tools.py b/tests/utils/test_tools.py index 3bf657d3..8087b168 100644 --- a/tests/utils/test_tools.py +++ b/tests/utils/test_tools.py @@ -7,20 +7,8 @@ def get_test_id(test_config): return f"{test_config['name']}_{hash(test_config['description']) % 1000}" -def validate_resource_uri(uri): - """ - Validates that a resource URI follows the expected format: {server_id}://{resource_type}/{resource_id} - Returns a tuple of (is_valid, components) where components is (server_id, resource_type, resource_id) - """ - pattern = r"^([a-zA-Z0-9_-]+)://([a-zA-Z0-9_-]+)/(.+)$" - match = re.match(pattern, uri) - if not match: - return False, None - return True, match.groups() - - @pytest.mark.asyncio -async def run_tool_test(client, context, test_config): +async def run_tool_test(client, context: dict, test_config: dict) -> dict: """ Common test function for running tool tests across different servers. @@ -28,6 +16,9 @@ async def run_tool_test(client, context, test_config): client: The client fixture context: Module-scoped context dictionary to store test values test_config: Configuration for the specific test to run + + Returns: + Updated context dictionary with test results """ if test_config.get("skip", False): pytest.skip(f"Test {test_config['name']} marked to skip") @@ -111,17 +102,34 @@ async def run_tool_test(client, context, test_config): if match and len(match.groups()) > 0: context[key] = match.group(1).strip() - should_validate = test_config.get("validate_resource_uri", False) or ( - tool_name in ["list_resources", "read_resource"] and "resource_uri" in context - ) + return context - if should_validate and "resource_uri" in context: - is_valid, components = validate_resource_uri(context["resource_uri"]) - if is_valid: - context["resource_server"] = components[0] - context["resource_type"] = components[1] - context["resource_id"] = components[2] - else: - pytest.fail(f"Invalid resource URI format: {context['resource_uri']}") - return context +@pytest.mark.asyncio +async def run_resources_test(client): + """ + Generic test function for list_resources and read_resource handlers. + """ + # List resources + response = await client.list_resources() + assert ( + response + and hasattr(response, "resources") + and isinstance(response.resources, list) + ), f"Invalid list_resources response: {response}" + if not response.resources: + pytest.skip("No resources found") + + # Test only the first resource + resource = response.resources[0] + assert ( + isinstance(resource.name, str) and resource.name + ), f"Invalid resource name for URI {resource.uri}" + + contents = await client.read_resource(resource.uri) + assert hasattr(contents, "contents") and isinstance( + contents.contents, list + ), f"Invalid read_resource response for {resource.uri}" + assert contents.contents, f"No content returned for {resource.uri}" + + return response