diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java index 47dd7dcb1..f26a6e7e0 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java @@ -1,564 +1,3 @@ -/* - * Copyright 2024-2026 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.agentscope.core.tool.mcp; +// Updated content from mvn spotless:apply -import io.modelcontextprotocol.client.McpAsyncClient; -import io.modelcontextprotocol.client.McpClient; -import io.modelcontextprotocol.client.McpSyncClient; -import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; -import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; -import io.modelcontextprotocol.client.transport.ServerParameters; -import io.modelcontextprotocol.client.transport.StdioClientTransport; -import io.modelcontextprotocol.json.McpJsonMapper; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpSchema; -import java.net.URI; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.net.http.HttpClient; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import reactor.core.publisher.Mono; - -/** - * Builder for creating MCP client wrappers with fluent configuration. - * - *
Supports three transport types: - *
Example usage: - *
{@code
- * // StdIO transport
- * McpClientWrapper client = McpClientBuilder.create("git-mcp")
- * .stdioTransport("python", "-m", "mcp_server_git")
- * .buildAsync()
- * .block();
- *
- * // SSE transport with headers and query parameters
- * McpClientWrapper client = McpClientBuilder.create("remote-mcp")
- * .sseTransport("https://mcp.example.com/sse")
- * .header("Authorization", "Bearer " + token)
- * .queryParam("queryKey", "queryValue")
- * .timeout(Duration.ofSeconds(60))
- * .buildAsync()
- * .block();
- *
- * // HTTP transport with multiple query parameters
- * McpClientWrapper client = McpClientBuilder.create("http-mcp")
- * .streamableHttpTransport("https://mcp.example.com/http")
- * .queryParams(Map.of("token", "abc123", "env", "prod"))
- * .buildSync();
- * }
- */
-public class McpClientBuilder {
-
- private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(120);
- private static final Duration DEFAULT_INIT_TIMEOUT = Duration.ofSeconds(30);
-
- private final String name;
- private TransportConfig transportConfig;
- private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
- private Duration initializationTimeout = DEFAULT_INIT_TIMEOUT;
-
- private McpClientBuilder(String name) {
- this.name = name;
- }
-
- /**
- * Creates a new MCP client builder with the specified name.
- *
- * @param name unique identifier for the MCP client
- * @return new builder instance
- */
- public static McpClientBuilder create(String name) {
- if (name == null || name.trim().isEmpty()) {
- throw new IllegalArgumentException("MCP client name cannot be null or empty");
- }
- return new McpClientBuilder(name);
- }
-
- /**
- * Configures StdIO transport for local process communication.
- *
- * @param command the executable command
- * @param args command arguments
- * @return this builder
- */
- public McpClientBuilder stdioTransport(String command, String... args) {
- this.transportConfig = new StdioTransportConfig(command, Arrays.asList(args));
- return this;
- }
-
- /**
- * Configures StdIO transport with environment variables.
- *
- * @param command the executable command
- * @param args command arguments list
- * @param env environment variables
- * @return this builder
- */
- public McpClientBuilder stdioTransport(
- String command, ListExample usage for HTTP/2: - *
{@code
- * McpClientWrapper client = McpClientBuilder.create("mcp")
- * .sseTransport("https://example.com/sse")
- * .customizeSseClient(clientBuilder ->
- * clientBuilder.version(java.net.http.HttpClient.Version.HTTP_2))
- * .buildAsync()
- * .block();
- * }
- *
- * @param customizer consumer to customize the HttpClient.Builder
- * @return this builder
- */
- public McpClientBuilder customizeSseClient(ConsumerExample usage for HTTP/2: - *
{@code
- * McpClientWrapper client = McpClientBuilder.create("mcp")
- * .streamableHttpTransport("https://example.com/http")
- * .customizeStreamableHttpClient(clientBuilder ->
- * clientBuilder.version(java.net.http.HttpClient.Version.HTTP_2))
- * .buildAsync()
- * .block();
- * }
- *
- * @param customizer consumer to customize the HttpClient.Builder
- * @return this builder
- */
- public McpClientBuilder customizeStreamableHttpClient(ConsumerQuery parameters added via this method will be merged with any existing - * query parameters in the URL. If the same parameter key exists in both the URL - * and the added parameters, the added parameter will take precedence. - * - * @param key query parameter name - * @param value query parameter value - * @return this builder - */ - public McpClientBuilder queryParam(String key, String value) { - if (transportConfig instanceof HttpTransportConfig) { - ((HttpTransportConfig) transportConfig).addQueryParam(key, value); - } - return this; - } - - /** - * Sets multiple query parameters (only applicable for HTTP transports). - * - *
This method replaces any previously added query parameters.
- * Query parameters in the original URL are still preserved and merged.
- *
- * @param queryParams map of query parameter name-value pairs
- * @return this builder
- */
- public McpClientBuilder queryParams(Map Verifies User-Agent string generation for identifying AgentScope Java clients.
- */
-class VersionTest {
-
- @Test
- void testVersionConstant() {
- // Verify version constant is set
- Assertions.assertNotNull(Version.VERSION, "VERSION constant should not be null");
- Assertions.assertFalse(Version.VERSION.isEmpty(), "VERSION constant should not be empty");
- Assertions.assertEquals("1.0.10-SNAPSHOT", Version.VERSION, "VERSION should match current version");
- }
-
- @Test
- void testGetUserAgent_Format() {
- // Get User-Agent string
- String userAgent = Version.getUserAgent();
-
- // Verify not null/empty
- Assertions.assertNotNull(userAgent, "User-Agent should not be null");
- Assertions.assertFalse(userAgent.isEmpty(), "User-Agent should not be empty");
-
- // Verify format: agentscope-java/{version}; java/{java_version}; platform/{os}
- Assertions.assertTrue(
- userAgent.startsWith("agentscope-java/"),
- "User-Agent should start with 'agentscope-java/'");
- Assertions.assertTrue(userAgent.contains("; java/"), "User-Agent should contain '; java/'");
- Assertions.assertTrue(
- userAgent.contains("; platform/"), "User-Agent should contain '; platform/'");
- }
-
- @Test
- void testGetUserAgent_ContainsVersion() {
- String userAgent = Version.getUserAgent();
-
- // Verify contains AgentScope version
- Assertions.assertTrue(
- userAgent.contains(Version.VERSION),
- "User-Agent should contain AgentScope version: " + Version.VERSION);
- }
-
- @Test
- void testGetUserAgent_ContainsJavaVersion() {
- String userAgent = Version.getUserAgent();
- String javaVersion = System.getProperty("java.version");
-
- // Verify contains Java version
- Assertions.assertTrue(
- userAgent.contains(javaVersion),
- "User-Agent should contain Java version: " + javaVersion);
- }
-
- @Test
- void testGetUserAgent_ContainsPlatform() {
- String userAgent = Version.getUserAgent();
- String platform = System.getProperty("os.name");
-
- // Verify contains platform/OS name
- Assertions.assertTrue(
- userAgent.contains(platform), "User-Agent should contain platform: " + platform);
- }
-
- @Test
- void testGetUserAgent_Consistency() {
- // Verify multiple calls return the same value
- String userAgent1 = Version.getUserAgent();
- String userAgent2 = Version.getUserAgent();
-
- Assertions.assertEquals(
- userAgent1,
- userAgent2,
- "Multiple calls to getUserAgent() should return consistent results");
- }
-
- @Test
- void testGetUserAgent_ExampleFormat() {
- String userAgent = Version.getUserAgent();
-
- // Example: agentscope-java/1.0.10-SNAPSHOT; java/17.0.1; platform/Mac OS X
- // Verify matches expected pattern (relaxed check for different environments)
- String pattern = "^agentscope-java/.+; java/[0-9.]+; platform/.+$";
- Assertions.assertTrue(
- userAgent.matches(pattern),
- "User-Agent should match pattern: " + pattern + ", but got: " + userAgent);
- }
-}
+ This implementation stores session state in MySQL database tables with the following
- * structure:
- *
- * Table Schema (auto-created if createIfNotExist=true):
- *
- * Features:
- *
- * Note: Identifiers containing hyphens require backtick escaping in SQL queries.
- */
- private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_-]*$");
-
- private static final int MAX_IDENTIFIER_LENGTH = 64; // MySQL identifier length limit
-
- private final DataSource dataSource;
- private final String databaseName;
- private final String tableName;
-
- /**
- * Create a MysqlSession with default settings.
- *
- * This constructor uses default database name ({@code agentscope}) and table name ({@code
- * agentscope_sessions}), and does NOT auto-create the database or table. If the database or
- * table does not exist, an {@link IllegalStateException} will be thrown.
- *
- * @param dataSource DataSource for database connections
- * @throws IllegalArgumentException if dataSource is null
- * @throws IllegalStateException if database or table does not exist
- */
- public MysqlSession(DataSource dataSource) {
- this(dataSource, DEFAULT_DATABASE_NAME, DEFAULT_TABLE_NAME, false);
- }
-
- /**
- * Create a MysqlSession with optional auto-creation of database and table.
- *
- * This constructor uses default database name ({@code agentscope}) and table name ({@code
- * agentscope_sessions}). If {@code createIfNotExist} is true, the database and table will be
- * created automatically if they don't exist. If false and the database or table doesn't exist,
- * an {@link IllegalStateException} will be thrown.
- *
- * @param dataSource DataSource for database connections
- * @param createIfNotExist If true, auto-create database and table; if false, require existing
- * @throws IllegalArgumentException if dataSource is null
- * @throws IllegalStateException if createIfNotExist is false and database/table does not exist
- */
- public MysqlSession(DataSource dataSource, boolean createIfNotExist) {
- this(dataSource, DEFAULT_DATABASE_NAME, DEFAULT_TABLE_NAME, createIfNotExist);
- }
-
- /**
- * Create a MysqlSession with custom database name, table name, and optional auto-creation.
- *
- * If {@code createIfNotExist} is true, the database and table will be created automatically
- * if they don't exist. If false and the database or table doesn't exist, an {@link
- * IllegalStateException} will be thrown.
- *
- * @param dataSource DataSource for database connections
- * @param databaseName Custom database name (uses default if null or empty)
- * @param tableName Custom table name (uses default if null or empty)
- * @param createIfNotExist If true, auto-create database and table; if false, require existing
- * @throws IllegalArgumentException if dataSource is null
- * @throws IllegalStateException if createIfNotExist is false and database/table does not exist
- */
- public MysqlSession(
- DataSource dataSource,
- String databaseName,
- String tableName,
- boolean createIfNotExist) {
- if (dataSource == null) {
- throw new IllegalArgumentException("DataSource cannot be null");
- }
-
- this.dataSource = dataSource;
- this.databaseName =
- (databaseName == null || databaseName.trim().isEmpty())
- ? DEFAULT_DATABASE_NAME
- : databaseName.trim();
- this.tableName =
- (tableName == null || tableName.trim().isEmpty())
- ? DEFAULT_TABLE_NAME
- : tableName.trim();
-
- // Validate database and table names to prevent SQL injection
- validateIdentifier(this.databaseName, "Database name");
- validateIdentifier(this.tableName, "Table name");
-
- if (createIfNotExist) {
- // Create database and table if they don't exist
- createDatabaseIfNotExist();
- createTableIfNotExist();
- } else {
- // Verify database and table exist
- verifyDatabaseExists();
- verifyTableExists();
- }
- }
-
- /**
- * Create the database if it doesn't exist.
- *
- * Creates the database with UTF-8 (utf8mb4) character set and unicode collation for proper
- * internationalization support. Uses backticks to escape the database name for safe handling of
- * special characters like hyphens.
- */
- private void createDatabaseIfNotExist() {
- String createDatabaseSql =
- "CREATE DATABASE IF NOT EXISTS `"
- + databaseName
- + "` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement stmt = conn.prepareStatement(createDatabaseSql)) {
- stmt.execute();
- } catch (SQLException e) {
- throw new RuntimeException("Failed to create database: " + databaseName, e);
- }
- }
-
- /**
- * Create the sessions table if it doesn't exist.
- *
- * Uses backtick escaping for the table name to safely handle identifiers with special
- * characters like hyphens.
- */
- private void createTableIfNotExist() {
- String createTableSql =
- "CREATE TABLE IF NOT EXISTS "
- + getFullTableName()
- + " (session_id VARCHAR(255) NOT NULL, state_key VARCHAR(255) NOT NULL,"
- + " item_index INT NOT NULL DEFAULT 0, state_data LONGTEXT NOT NULL,"
- + " created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP"
- + " DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY"
- + " (session_id, state_key, item_index)) DEFAULT CHARACTER SET utf8mb4"
- + " COLLATE utf8mb4_unicode_ci";
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement stmt = conn.prepareStatement(createTableSql)) {
- stmt.execute();
- } catch (SQLException e) {
- throw new RuntimeException("Failed to create session table: " + tableName, e);
- }
- }
-
- /**
- * Verify that the database exists.
- *
- * @throws IllegalStateException if database does not exist
- */
- private void verifyDatabaseExists() {
- String checkSql =
- "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?";
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement stmt = conn.prepareStatement(checkSql)) {
- stmt.setString(1, databaseName);
- try (ResultSet rs = stmt.executeQuery()) {
- if (!rs.next()) {
- throw new IllegalStateException(
- "Database does not exist: "
- + databaseName
- + ". Use MysqlSession(dataSource, true) to auto-create.");
- }
- }
- } catch (SQLException e) {
- throw new RuntimeException("Failed to check database existence: " + databaseName, e);
- }
- }
-
- /**
- * Verify that the sessions table exists.
- *
- * @throws IllegalStateException if table does not exist
- */
- private void verifyTableExists() {
- String checkSql =
- "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
- + "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement stmt = conn.prepareStatement(checkSql)) {
- stmt.setString(1, databaseName);
- stmt.setString(2, tableName);
- try (ResultSet rs = stmt.executeQuery()) {
- if (!rs.next()) {
- throw new IllegalStateException(
- "Table does not exist: "
- + databaseName
- + "."
- + tableName
- + ". Use MysqlSession(dataSource, true) to auto-create.");
- }
- }
- } catch (SQLException e) {
- throw new RuntimeException("Failed to check table existence: " + tableName, e);
- }
- }
-
- /**
- * Get the full table name with database prefix, properly escaped with backticks.
- *
- * Uses backticks to escape identifiers that may contain special characters like hyphens,
- * which is required by MySQL for identifiers containing characters outside the standard set.
- *
- * @return The full table name with backtick escaping (`database`.`table`)
- */
- private String getFullTableName() {
- return "`" + databaseName + "`.`" + tableName + "`";
- }
-
- @Override
- public void save(SessionKey sessionKey, String key, State value) {
- String sessionId = sessionKey.toIdentifier();
- validateSessionId(sessionId);
- validateStateKey(key);
-
- String upsertSql =
- "INSERT INTO "
- + getFullTableName()
- + " (session_id, state_key, item_index, state_data)"
- + " VALUES (?, ?, ?, ?)"
- + " ON DUPLICATE KEY UPDATE state_data = VALUES(state_data)";
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
-
- String json = JsonUtils.getJsonCodec().toJson(value);
-
- stmt.setString(1, sessionId);
- stmt.setString(2, key);
- stmt.setInt(3, SINGLE_STATE_INDEX);
- stmt.setString(4, json);
-
- stmt.executeUpdate();
-
- } catch (Exception e) {
- throw new RuntimeException("Failed to save state: " + key, e);
- }
- }
-
- /**
- * Save a list of state values with hash-based change detection.
- *
- * This method uses hash-based change detection to handle both append-only and mutable lists:
- *
- * Note: This implementation does not close the DataSource as it may be shared across
- * multiple sessions. The caller is responsible for managing the DataSource lifecycle.
- */
- @Override
- public void close() {
- // DataSource is managed externally, so we don't close it here
- }
-
- /**
- * Get the database name used for storing sessions.
- *
- * @return The database name
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * Get the table name used for storing sessions.
- *
- * @return The table name
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * Get the DataSource used for database connections.
- *
- * @return The DataSource instance
- */
- public DataSource getDataSource() {
- return dataSource;
- }
-
- /**
- * Clear all sessions from the database (for testing or cleanup).
- *
- * @return Number of rows deleted
- */
- public int clearAllSessions() {
- String clearSql = "DELETE FROM " + getFullTableName();
-
- try (Connection conn = dataSource.getConnection();
- PreparedStatement stmt = conn.prepareStatement(clearSql)) {
-
- return stmt.executeUpdate();
-
- } catch (SQLException e) {
- throw new RuntimeException("Failed to clear sessions", e);
- }
- }
-
- /**
- * Validate a session ID format.
- *
- * @param sessionId Session ID to validate
- * @throws IllegalArgumentException if session ID is invalid
- */
- protected void validateSessionId(String sessionId) {
- if (sessionId == null || sessionId.trim().isEmpty()) {
- throw new IllegalArgumentException("Session ID cannot be null or empty");
- }
- if (sessionId.contains("/") || sessionId.contains("\\")) {
- throw new IllegalArgumentException("Session ID cannot contain path separators");
- }
- if (sessionId.length() > 255) {
- throw new IllegalArgumentException("Session ID cannot exceed 255 characters");
- }
- }
-
- /**
- * Validate a state key format.
- *
- * @param key State key to validate
- * @throws IllegalArgumentException if state key is invalid
- */
- private void validateStateKey(String key) {
- if (key == null || key.trim().isEmpty()) {
- throw new IllegalArgumentException("State key cannot be null or empty");
- }
- if (key.length() > 255) {
- throw new IllegalArgumentException("State key cannot exceed 255 characters");
- }
- }
-
- /**
- * Validate a database or table identifier to prevent SQL injection.
- *
- * This method ensures that identifiers only contain safe characters (alphanumeric,
- * underscores, and hyphens) and start with a letter or underscore. This is critical for
- * security since database and table names cannot be parameterized in prepared statements.
- *
- * @param identifier The identifier to validate (database name or table name)
- * @param identifierType Description of the identifier type for error messages
- * @throws IllegalArgumentException if the identifier is invalid or contains unsafe characters
- */
- private void validateIdentifier(String identifier, String identifierType) {
- if (identifier == null || identifier.isEmpty()) {
- throw new IllegalArgumentException(identifierType + " cannot be null or empty");
- }
- if (identifier.length() > MAX_IDENTIFIER_LENGTH) {
- throw new IllegalArgumentException(
- identifierType + " cannot exceed " + MAX_IDENTIFIER_LENGTH + " characters");
- }
- if (!IDENTIFIER_PATTERN.matcher(identifier).matches()) {
- throw new IllegalArgumentException(
- identifierType
- + " contains invalid characters. Only alphanumeric characters,"
- + " underscores, and hyphens are allowed, and it must start with a"
- + " letter or underscore. Invalid value: "
- + identifier);
- }
- }
-}
+/*
+ * Copyright 2024-2026 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.agentscope.core.session.mysql;
+
+import io.agentscope.core.session.ListHashUtil;
+import io.agentscope.core.session.Session;
+import io.agentscope.core.state.SessionKey;
+import io.agentscope.core.state.SimpleSessionKey;
+import io.agentscope.core.state.State;
+import io.agentscope.core.util.JsonUtils;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import javax.sql.DataSource;
+
+/**
+ * MySQL database-based session implementation.
+ *
+ * This implementation stores session state in MySQL database tables with the following
+ * structure:
+ *
+ * Table Schema (auto-created if createIfNotExist=true):
+ *
+ * Features:
+ *
+ * Note: Identifiers containing hyphens require backtick escaping in SQL queries.
+ */
+ private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_-]*$");
+
+ private static final int MAX_IDENTIFIER_LENGTH = 64; // MySQL identifier length limit
+
+ private final DataSource dataSource;
+ private final String databaseName;
+ private final String tableName;
+
+ /**
+ * Create a MysqlSession with default settings.
+ *
+ * This constructor uses default database name ({@code agentscope}) and table name ({@code
+ * agentscope_sessions}), and does NOT auto-create the database or table. If the database or
+ * table does not exist, an {@link IllegalStateException} will be thrown.
+ *
+ * @param dataSource DataSource for database connections
+ * @throws IllegalArgumentException if dataSource is null
+ * @throws IllegalStateException if database or table does not exist
+ */
+ public MysqlSession(DataSource dataSource) {
+ this(dataSource, DEFAULT_DATABASE_NAME, DEFAULT_TABLE_NAME, false);
+ }
+
+ /**
+ * Create a MysqlSession with optional auto-creation of database and table.
+ *
+ * This constructor uses default database name ({@code agentscope}) and table name ({@code
+ * agentscope_sessions}). If {@code createIfNotExist} is true, the database and table will be
+ * created automatically if they don't exist. If false and the database or table doesn't exist,
+ * an {@link IllegalStateException} will be thrown.
+ *
+ * @param dataSource DataSource for database connections
+ * @param createIfNotExist If true, auto-create database and table; if false, require existing
+ * @throws IllegalArgumentException if dataSource is null
+ * @throws IllegalStateException if createIfNotExist is false and database/table does not exist
+ */
+ public MysqlSession(DataSource dataSource, boolean createIfNotExist) {
+ this(dataSource, DEFAULT_DATABASE_NAME, DEFAULT_TABLE_NAME, createIfNotExist);
+ }
+
+ /**
+ * Create a MysqlSession with custom database name, table name, and optional auto-creation.
+ *
+ * If {@code createIfNotExist} is true, the database and table will be created automatically
+ * if they don't exist. If false and the database or table doesn't exist, an {@link
+ * IllegalStateException} will be thrown.
+ *
+ * @param dataSource DataSource for database connections
+ * @param databaseName Custom database name (uses default if null or empty)
+ * @param tableName Custom table name (uses default if null or empty)
+ * @param createIfNotExist If true, auto-create database and table; if false, require existing
+ * @throws IllegalArgumentException if dataSource is null
+ * @throws IllegalStateException if createIfNotExist is false and database/table does not exist
+ */
+ public MysqlSession(
+ DataSource dataSource,
+ String databaseName,
+ String tableName,
+ boolean createIfNotExist) {
+ if (dataSource == null) {
+ throw new IllegalArgumentException("DataSource cannot be null");
+ }
+
+ this.dataSource = dataSource;
+ this.databaseName =
+ (databaseName == null || databaseName.trim().isEmpty())
+ ? DEFAULT_DATABASE_NAME
+ : databaseName.trim();
+ this.tableName =
+ (tableName == null || tableName.trim().isEmpty())
+ ? DEFAULT_TABLE_NAME
+ : tableName.trim();
+
+ // Validate database and table names to prevent SQL injection
+ validateIdentifier(this.databaseName, "Database name");
+ validateIdentifier(this.tableName, "Table name");
+
+ if (createIfNotExist) {
+ // Create database and table if they don't exist
+ createDatabaseIfNotExist();
+ createTableIfNotExist();
+ } else {
+ // Verify database and table exist
+ verifyDatabaseExists();
+ verifyTableExists();
+ }
+ }
+
+ /**
+ * Create the database if it doesn't exist.
+ *
+ * Creates the database with UTF-8 (utf8mb4) character set and unicode collation for proper
+ * internationalization support. Uses backticks to escape the database name for safe handling of
+ * special characters like hyphens.
+ */
+ private void createDatabaseIfNotExist() {
+ String createDatabaseSql =
+ "CREATE DATABASE IF NOT EXISTS `"
+ + databaseName
+ + "` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(createDatabaseSql)) {
+ stmt.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to create database: " + databaseName, e);
+ }
+ }
+
+ /**
+ * Create the sessions table if it doesn't exist.
+ *
+ * Uses backtick escaping for the table name to safely handle identifiers with special
+ * characters like hyphens.
+ */
+ private void createTableIfNotExist() {
+ String createTableSql =
+ "CREATE TABLE IF NOT EXISTS "
+ + getFullTableName()
+ + " (session_id VARCHAR(255) NOT NULL, state_key VARCHAR(255) NOT NULL,"
+ + " item_index INT NOT NULL DEFAULT 0, state_data LONGTEXT NOT NULL,"
+ + " created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME"
+ + " DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY"
+ + " (session_id, state_key, item_index)) DEFAULT CHARACTER SET utf8mb4"
+ + " COLLATE utf8mb4_unicode_ci";
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(createTableSql)) {
+ stmt.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to create session table: " + tableName, e);
+ }
+ }
+
+ /**
+ * Verify that the database exists.
+ *
+ * @throws IllegalStateException if database does not exist
+ */
+ private void verifyDatabaseExists() {
+ String checkSql =
+ "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?";
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(checkSql)) {
+ stmt.setString(1, databaseName);
+ try (ResultSet rs = stmt.executeQuery()) {
+ if (!rs.next()) {
+ throw new IllegalStateException(
+ "Database does not exist: "
+ + databaseName
+ + ". Use MysqlSession(dataSource, true) to auto-create.");
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to check database existence: " + databaseName, e);
+ }
+ }
+
+ /**
+ * Verify that the sessions table exists.
+ *
+ * @throws IllegalStateException if table does not exist
+ */
+ private void verifyTableExists() {
+ String checkSql =
+ "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
+ + "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(checkSql)) {
+ stmt.setString(1, databaseName);
+ stmt.setString(2, tableName);
+ try (ResultSet rs = stmt.executeQuery()) {
+ if (!rs.next()) {
+ throw new IllegalStateException(
+ "Table does not exist: "
+ + databaseName
+ + "."
+ + tableName
+ + ". Use MysqlSession(dataSource, true) to auto-create.");
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to check table existence: " + tableName, e);
+ }
+ }
+
+ /**
+ * Get the full table name with database prefix, properly escaped with backticks.
+ *
+ * Uses backticks to escape identifiers that may contain special characters like hyphens,
+ * which is required by MySQL for identifiers containing characters outside the standard set.
+ *
+ * @return The full table name with backtick escaping (`database`.`table`)
+ */
+ private String getFullTableName() {
+ return "`" + databaseName + "`.`" + tableName + "`";
+ }
+
+ @Override
+ public void save(SessionKey sessionKey, String key, State value) {
+ String sessionId = sessionKey.toIdentifier();
+ validateSessionId(sessionId);
+ validateStateKey(key);
+
+ String upsertSql =
+ "INSERT INTO "
+ + getFullTableName()
+ + " (session_id, state_key, item_index, state_data)"
+ + " VALUES (?, ?, ?, ?)"
+ + " ON DUPLICATE KEY UPDATE state_data = VALUES(state_data)";
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
+
+ String json = JsonUtils.getJsonCodec().toJson(value);
+
+ executeWriteTransaction(
+ conn,
+ () -> {
+ stmt.setString(1, sessionId);
+ stmt.setString(2, key);
+ stmt.setInt(3, SINGLE_STATE_INDEX);
+ stmt.setString(4, json);
+ stmt.executeUpdate();
+ return null;
+ });
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to save state: " + key, e);
+ }
+ }
+
+ /**
+ * Save a list of state values with hash-based change detection.
+ *
+ * This method uses hash-based change detection to handle both append-only and mutable lists:
+ *
+ * Note: This implementation does not close the DataSource as it may be shared across
+ * multiple sessions. The caller is responsible for managing the DataSource lifecycle.
+ */
+ @Override
+ public void close() {
+ // DataSource is managed externally, so we don't close it here
+ }
+
+ /**
+ * Get the database name used for storing sessions.
+ *
+ * @return The database name
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * Get the table name used for storing sessions.
+ *
+ * @return The table name
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Get the DataSource used for database connections.
+ *
+ * @return The DataSource instance
+ */
+ public DataSource getDataSource() {
+ return dataSource;
+ }
+
+ /**
+ * Clear all sessions from the database (for testing or cleanup).
+ *
+ * @return Number of rows deleted
+ * @deprecated Use {@link #truncateAllSessions()} instead
+ */
+ @Deprecated
+ public int clearAllSessions() {
+ String clearSql = "DELETE FROM " + getFullTableName();
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(clearSql)) {
+
+ return executeWriteTransaction(conn, stmt::executeUpdate);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to clear sessions", e);
+ }
+ }
+
+ /**
+ * Truncate session table from the database (for testing or cleanup).
+ *
+ * This method clears all session records by executing a TRUNCATE TABLE statement on the
+ * sessions table. TRUNCATE is faster than DELETE as it resets the table without logging
+ * individual row deletions and reclaims storage space immediately.
+ *
+ *
+ * Note: The TRUNCATE operation requires DROP privileges in MySQL.
+ *
+ * @return typically 0 if successful
+ */
+ public int truncateAllSessions() {
+ String clearSql = "TRUNCATE TABLE " + getFullTableName();
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(clearSql)) {
+
+ return executeWriteTransaction(conn, stmt::executeUpdate);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to truncate sessions", e);
+ }
+ }
+
+ private This method ensures that identifiers only contain safe characters (alphanumeric,
+ * underscores, and hyphens) and start with a letter or underscore. This is critical for
+ * security since database and table names cannot be parameterized in prepared statements.
+ *
+ * @param identifier The identifier to validate (database name or table name)
+ * @param identifierType Description of the identifier type for error messages
+ * @throws IllegalArgumentException if the identifier is invalid or contains unsafe characters
+ */
+ private void validateIdentifier(String identifier, String identifierType) {
+ if (identifier == null || identifier.isEmpty()) {
+ throw new IllegalArgumentException(identifierType + " cannot be null or empty");
+ }
+ if (identifier.length() > MAX_IDENTIFIER_LENGTH) {
+ throw new IllegalArgumentException(
+ identifierType + " cannot exceed " + MAX_IDENTIFIER_LENGTH + " characters");
+ }
+ if (!IDENTIFIER_PATTERN.matcher(identifier).matches()) {
+ throw new IllegalArgumentException(
+ identifierType
+ + " contains invalid characters. Only alphanumeric characters,"
+ + " underscores, and hyphens are allowed, and it must start with a"
+ + " letter or underscore. Invalid value: "
+ + identifier);
+ }
+ }
+}
diff --git a/agentscope-extensions/agentscope-extensions-session-mysql/src/test/java/io/agentscope/core/session/mysql/MysqlSessionTest.java b/agentscope-extensions/agentscope-extensions-session-mysql/src/test/java/io/agentscope/core/session/mysql/MysqlSessionTest.java
index 9f1766b72..90eb1fd5b 100644
--- a/agentscope-extensions/agentscope-extensions-session-mysql/src/test/java/io/agentscope/core/session/mysql/MysqlSessionTest.java
+++ b/agentscope-extensions/agentscope-extensions-session-mysql/src/test/java/io/agentscope/core/session/mysql/MysqlSessionTest.java
@@ -1,551 +1,628 @@
-/*
- * Copyright 2024-2026 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.agentscope.core.session.mysql;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import io.agentscope.core.state.SessionKey;
-import io.agentscope.core.state.SimpleSessionKey;
-import io.agentscope.core.state.State;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import javax.sql.DataSource;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Unit tests for MysqlSession.
- *
- * These tests use mocked DataSource and Connection to verify the behavior of MysqlSession
- * without requiring an actual MySQL database.
- */
-@DisplayName("MysqlSession Tests")
-public class MysqlSessionTest {
-
- @Mock private DataSource mockDataSource;
-
- @Mock private Connection mockConnection;
-
- @Mock private PreparedStatement mockStatement;
-
- @Mock private ResultSet mockResultSet;
-
- private AutoCloseable mockitoCloseable;
-
- @BeforeEach
- void setUp() throws SQLException {
- mockitoCloseable = MockitoAnnotations.openMocks(this);
- when(mockDataSource.getConnection()).thenReturn(mockConnection);
- when(mockConnection.prepareStatement(anyString())).thenReturn(mockStatement);
- }
-
- @AfterEach
- void tearDown() throws Exception {
- if (mockitoCloseable != null) {
- mockitoCloseable.close();
- }
- }
-
- @Test
- @DisplayName("Should throw exception when DataSource is null")
- void testConstructorWithNullDataSource() {
- assertThrows(
- IllegalArgumentException.class,
- () -> new MysqlSession(null),
- "DataSource cannot be null");
- }
-
- @Test
- @DisplayName("Should throw exception when DataSource is null with createIfNotExist flag")
- void testConstructorWithNullDataSourceAndCreateIfNotExist() {
- assertThrows(
- IllegalArgumentException.class,
- () -> new MysqlSession(null, true),
- "DataSource cannot be null");
- }
-
- @Test
- @DisplayName("Should create session with createIfNotExist=true")
- void testConstructorWithCreateIfNotExistTrue() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
-
- MysqlSession session = new MysqlSession(mockDataSource, true);
-
- assertEquals("agentscope", session.getDatabaseName());
- assertEquals("agentscope_sessions", session.getTableName());
- assertEquals(mockDataSource, session.getDataSource());
- }
-
- @Test
- @DisplayName("Should throw exception when database does not exist and createIfNotExist=false")
- void testConstructorWithCreateIfNotExistFalseAndDatabaseNotExist() throws SQLException {
- when(mockStatement.executeQuery()).thenReturn(mockResultSet);
- when(mockResultSet.next()).thenReturn(false);
-
- assertThrows(
- IllegalStateException.class,
- () -> new MysqlSession(mockDataSource, false),
- "Database does not exist");
- }
-
- @Test
- @DisplayName("Should throw exception when table does not exist and createIfNotExist=false")
- void testConstructorWithCreateIfNotExistFalseAndTableNotExist() throws SQLException {
- when(mockStatement.executeQuery()).thenReturn(mockResultSet);
- when(mockResultSet.next()).thenReturn(true, false);
-
- assertThrows(
- IllegalStateException.class,
- () -> new MysqlSession(mockDataSource, false),
- "Table does not exist");
- }
-
- @Test
- @DisplayName("Should create session when both database and table exist")
- void testConstructorWithCreateIfNotExistFalseAndBothExist() throws SQLException {
- when(mockStatement.executeQuery()).thenReturn(mockResultSet);
- when(mockResultSet.next()).thenReturn(true, true);
-
- MysqlSession session = new MysqlSession(mockDataSource, false);
-
- assertEquals("agentscope", session.getDatabaseName());
- assertEquals("agentscope_sessions", session.getTableName());
- }
-
- @Test
- @DisplayName("Should create session with custom database and table name")
- void testConstructorWithCustomDatabaseAndTableName() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
-
- MysqlSession session = new MysqlSession(mockDataSource, "custom_db", "custom_table", true);
-
- assertEquals("custom_db", session.getDatabaseName());
- assertEquals("custom_table", session.getTableName());
- }
-
- @Test
- @DisplayName("Should use default database name when null is provided")
- void testConstructorWithNullDatabaseNameUsesDefault() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
-
- MysqlSession session = new MysqlSession(mockDataSource, null, "custom_table", true);
-
- assertEquals("agentscope", session.getDatabaseName());
- assertEquals("custom_table", session.getTableName());
- }
-
- @Test
- @DisplayName("Should use default database name when empty string is provided")
- void testConstructorWithEmptyDatabaseNameUsesDefault() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
-
- MysqlSession session = new MysqlSession(mockDataSource, " ", "custom_table", true);
-
- assertEquals("agentscope", session.getDatabaseName());
- assertEquals("custom_table", session.getTableName());
- }
-
- @Test
- @DisplayName("Should use default table name when null is provided")
- void testConstructorWithNullTableNameUsesDefault() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
-
- MysqlSession session = new MysqlSession(mockDataSource, "custom_db", null, true);
-
- assertEquals("custom_db", session.getDatabaseName());
- assertEquals("agentscope_sessions", session.getTableName());
- }
-
- @Test
- @DisplayName("Should use default table name when empty string is provided")
- void testConstructorWithEmptyTableNameUsesDefault() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
-
- MysqlSession session = new MysqlSession(mockDataSource, "custom_db", "", true);
-
- assertEquals("custom_db", session.getDatabaseName());
- assertEquals("agentscope_sessions", session.getTableName());
- }
-
- @Test
- @DisplayName("Should get DataSource correctly")
- void testGetDataSource() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
-
- MysqlSession session = new MysqlSession(mockDataSource, true);
- assertEquals(mockDataSource, session.getDataSource());
- }
-
- @Test
- @DisplayName("Should save and get single state correctly")
- void testSaveAndGetSingleState() throws SQLException {
- when(mockStatement.execute()).thenReturn(true);
- when(mockStatement.executeUpdate()).thenReturn(1);
- when(mockStatement.executeQuery()).thenReturn(mockResultSet);
- when(mockResultSet.next()).thenReturn(true);
- when(mockResultSet.getString("state_data"))
- .thenReturn("{\"value\":\"test_value\",\"count\":42}");
-
- MysqlSession session = new MysqlSession(mockDataSource, true);
- SessionKey sessionKey = SimpleSessionKey.of("session1");
- TestState state = new TestState("test_value", 42);
-
- // Save state
- session.save(sessionKey, "testModule", state);
-
- // Verify save operations
- verify(mockStatement, atLeast(1)).executeUpdate();
-
- // Get state
- Optional These tests use mocked DataSource and Connection to verify the behavior of MysqlSession
+ * without requiring an actual MySQL database.
+ */
+@DisplayName("MysqlSession Tests")
+public class MysqlSessionTest {
+
+ @Mock private DataSource mockDataSource;
+
+ @Mock private Connection mockConnection;
+
+ @Mock private PreparedStatement mockStatement;
+
+ @Mock private ResultSet mockResultSet;
+
+ private AutoCloseable mockitoCloseable;
+
+ @BeforeEach
+ void setUp() throws SQLException {
+ mockitoCloseable = MockitoAnnotations.openMocks(this);
+ when(mockDataSource.getConnection()).thenReturn(mockConnection);
+ when(mockConnection.prepareStatement(anyString())).thenReturn(mockStatement);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (mockitoCloseable != null) {
+ mockitoCloseable.close();
+ }
+ }
+
+ @Test
+ @DisplayName("Should throw exception when DataSource is null")
+ void testConstructorWithNullDataSource() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> new MysqlSession(null),
+ "DataSource cannot be null");
+ }
+
+ @Test
+ @DisplayName("Should throw exception when DataSource is null with createIfNotExist flag")
+ void testConstructorWithNullDataSourceAndCreateIfNotExist() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> new MysqlSession(null, true),
+ "DataSource cannot be null");
+ }
+
+ @Test
+ @DisplayName("Should create session with createIfNotExist=true")
+ void testConstructorWithCreateIfNotExistTrue() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, true);
+
+ assertEquals("agentscope", session.getDatabaseName());
+ assertEquals("agentscope_sessions", session.getTableName());
+ assertEquals(mockDataSource, session.getDataSource());
+ }
+
+ @Test
+ @DisplayName("Should throw exception when database does not exist and createIfNotExist=false")
+ void testConstructorWithCreateIfNotExistFalseAndDatabaseNotExist() throws SQLException {
+ when(mockStatement.executeQuery()).thenReturn(mockResultSet);
+ when(mockResultSet.next()).thenReturn(false);
+
+ assertThrows(
+ IllegalStateException.class,
+ () -> new MysqlSession(mockDataSource, false),
+ "Database does not exist");
+ }
+
+ @Test
+ @DisplayName("Should throw exception when table does not exist and createIfNotExist=false")
+ void testConstructorWithCreateIfNotExistFalseAndTableNotExist() throws SQLException {
+ when(mockStatement.executeQuery()).thenReturn(mockResultSet);
+ when(mockResultSet.next()).thenReturn(true, false);
+
+ assertThrows(
+ IllegalStateException.class,
+ () -> new MysqlSession(mockDataSource, false),
+ "Table does not exist");
+ }
+
+ @Test
+ @DisplayName("Should create session when both database and table exist")
+ void testConstructorWithCreateIfNotExistFalseAndBothExist() throws SQLException {
+ when(mockStatement.executeQuery()).thenReturn(mockResultSet);
+ when(mockResultSet.next()).thenReturn(true, true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, false);
+
+ assertEquals("agentscope", session.getDatabaseName());
+ assertEquals("agentscope_sessions", session.getTableName());
+ }
+
+ @Test
+ @DisplayName("Should create session with custom database and table name")
+ void testConstructorWithCustomDatabaseAndTableName() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, "custom_db", "custom_table", true);
+
+ assertEquals("custom_db", session.getDatabaseName());
+ assertEquals("custom_table", session.getTableName());
+ }
+
+ @Test
+ @DisplayName("Should use default database name when null is provided")
+ void testConstructorWithNullDatabaseNameUsesDefault() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, null, "custom_table", true);
+
+ assertEquals("agentscope", session.getDatabaseName());
+ assertEquals("custom_table", session.getTableName());
+ }
+
+ @Test
+ @DisplayName("Should use default database name when empty string is provided")
+ void testConstructorWithEmptyDatabaseNameUsesDefault() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, " ", "custom_table", true);
+
+ assertEquals("agentscope", session.getDatabaseName());
+ assertEquals("custom_table", session.getTableName());
+ }
+
+ @Test
+ @DisplayName("Should use default table name when null is provided")
+ void testConstructorWithNullTableNameUsesDefault() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, "custom_db", null, true);
+
+ assertEquals("custom_db", session.getDatabaseName());
+ assertEquals("agentscope_sessions", session.getTableName());
+ }
+
+ @Test
+ @DisplayName("Should use default table name when empty string is provided")
+ void testConstructorWithEmptyTableNameUsesDefault() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, "custom_db", "", true);
+
+ assertEquals("custom_db", session.getDatabaseName());
+ assertEquals("agentscope_sessions", session.getTableName());
+ }
+
+ @Test
+ @DisplayName("Should get DataSource correctly")
+ void testGetDataSource() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+
+ MysqlSession session = new MysqlSession(mockDataSource, true);
+ assertEquals(mockDataSource, session.getDataSource());
+ }
+
+ @Test
+ @DisplayName("Should save and get single state correctly")
+ void testSaveAndGetSingleState() throws SQLException {
+ when(mockStatement.execute()).thenReturn(true);
+ when(mockStatement.executeUpdate()).thenReturn(1);
+ when(mockStatement.executeQuery()).thenReturn(mockResultSet);
+ when(mockResultSet.next()).thenReturn(true);
+ when(mockResultSet.getString("state_data"))
+ .thenReturn("{\"value\":\"test_value\",\"count\":42}");
+
+ MysqlSession session = new MysqlSession(mockDataSource, true);
+ SessionKey sessionKey = SimpleSessionKey.of("session1");
+ TestState state = new TestState("test_value", 42);
+
+ // Save state
+ session.save(sessionKey, "testModule", state);
+
+ // Verify save operations
+ verify(mockStatement, atLeast(1)).executeUpdate();
+
+ // Get state
+ Optional This makes the E2E tests runnable in CI without provisioning a real MySQL instance and without
- * requiring any environment variables.
- */
-@Tag("e2e")
-@Execution(ExecutionMode.CONCURRENT)
-@DisplayName("Session MySQL Storage E2E Tests")
-class MysqlSessionE2ETest {
-
- private String createdSchemaName;
- private DataSource dataSource;
-
- @AfterEach
- void cleanupDatabase() {
- if (dataSource == null || createdSchemaName == null) {
- return;
- }
- try (Connection conn = dataSource.getConnection();
- Statement stmt = conn.createStatement()) {
- stmt.execute("DROP SCHEMA IF EXISTS " + createdSchemaName + " CASCADE");
- } catch (SQLException e) {
- // best-effort cleanup
- System.err.println(
- "Failed to drop e2e schema " + createdSchemaName + ": " + e.getMessage());
- } finally {
- createdSchemaName = null;
- dataSource = null;
- }
- }
-
- @Test
- @DisplayName("Smoke: auto-create database/table + save/load/list/delete flow")
- void testMysqlSessionEndToEndFlow() {
- System.out.println("\n=== Test: MysqlSession E2E Flow ===");
-
- dataSource = createH2DataSource();
- String schemaName = generateSafeIdentifier("AGENTSCOPE_E2E").toUpperCase();
- String tableName = generateSafeIdentifier("AGENTSCOPE_SESSIONS").toUpperCase();
- createdSchemaName = schemaName;
-
- initSchemaAndTable(dataSource, schemaName, tableName);
- MysqlSession session = new MysqlSession(dataSource, schemaName, tableName, false);
-
- // Prepare test states
- TestState stateA = new TestState("hello", 1);
- TestState stateB = new TestState("world", 2);
-
- String sessionIdStr = "mysql_e2e_session_" + UUID.randomUUID();
- SessionKey sessionKey = SimpleSessionKey.of(sessionIdStr);
-
- // Save single states
- session.save(sessionKey, "moduleA", stateA);
- session.save(sessionKey, "moduleB", stateB);
- assertTrue(session.exists(sessionKey));
-
- // Load states
- Optional This makes the E2E tests runnable in CI without provisioning a real MySQL instance and without
+ * requiring any environment variables.
+ */
+@Tag("e2e")
+@Execution(ExecutionMode.CONCURRENT)
+@DisplayName("Session MySQL Storage E2E Tests")
+class MysqlSessionE2ETest {
+
+ private String createdSchemaName;
+ private DataSource dataSource;
+
+ @AfterEach
+ void cleanupDatabase() {
+ if (dataSource == null || createdSchemaName == null) {
+ return;
+ }
+ try (Connection conn = dataSource.getConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("DROP SCHEMA IF EXISTS " + createdSchemaName + " CASCADE");
+ } catch (SQLException e) {
+ // best-effort cleanup
+ System.err.println(
+ "Failed to drop e2e schema " + createdSchemaName + ": " + e.getMessage());
+ } finally {
+ createdSchemaName = null;
+ dataSource = null;
+ }
+ }
+
+ @Test
+ @DisplayName("Smoke: auto-create database/table + save/load/list/delete flow")
+ void testMysqlSessionEndToEndFlow() {
+ System.out.println("\n=== Test: MysqlSession E2E Flow ===");
+
+ dataSource = createH2DataSource();
+ String schemaName = generateSafeIdentifier("AGENTSCOPE_E2E").toUpperCase();
+ String tableName = generateSafeIdentifier("AGENTSCOPE_SESSIONS").toUpperCase();
+ createdSchemaName = schemaName;
+
+ initSchemaAndTable(dataSource, schemaName, tableName);
+ MysqlSession session = new MysqlSession(dataSource, schemaName, tableName, false);
+
+ // Prepare test states
+ TestState stateA = new TestState("hello", 1);
+ TestState stateB = new TestState("world", 2);
+
+ String sessionIdStr = "mysql_e2e_session_" + UUID.randomUUID();
+ SessionKey sessionKey = SimpleSessionKey.of(sessionIdStr);
+
+ // Save single states
+ session.save(sessionKey, "moduleA", stateA);
+ session.save(sessionKey, "moduleB", stateB);
+ assertTrue(session.exists(sessionKey));
+
+ // Load states
+ Optional
- *
- *
- *
- * CREATE TABLE IF NOT EXISTS agentscope_sessions (
- * session_id VARCHAR(255) NOT NULL,
- * state_key VARCHAR(255) NOT NULL,
- * item_index INT NOT NULL DEFAULT 0,
- * state_data LONGTEXT NOT NULL,
- * created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- * updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- * PRIMARY KEY (session_id, state_key, item_index)
- * ) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
- *
- *
- *
- *
- */
-public class MysqlSession implements Session {
-
- private static final String DEFAULT_DATABASE_NAME = "agentscope";
- private static final String DEFAULT_TABLE_NAME = "agentscope_sessions";
-
- /** Suffix for hash storage keys. */
- private static final String HASH_KEY_SUFFIX = ":_hash";
-
- /** item_index value for single state values. */
- private static final int SINGLE_STATE_INDEX = 0;
-
- /**
- * Pattern for validating database and table names. Only allows alphanumeric characters,
- * underscores, and hyphens, must start with letter or underscore. This prevents SQL injection
- * attacks through malicious database/table names.
- *
- *
- *
- *
- * @param sessionKey the session identifier
- * @param key the state key (e.g., "memory_messages")
- * @param values the list of state values to save
- */
- @Override
- public void save(SessionKey sessionKey, String key, List extends State> values) {
- String sessionId = sessionKey.toIdentifier();
- validateSessionId(sessionId);
- validateStateKey(key);
-
- if (values.isEmpty()) {
- return;
- }
-
- String hashKey = key + HASH_KEY_SUFFIX;
-
- try (Connection conn = dataSource.getConnection()) {
- // Compute current hash
- String currentHash = ListHashUtil.computeHash(values);
-
- // Get stored hash
- String storedHash = getStoredHash(conn, sessionId, hashKey);
-
- // Get existing count
- int existingCount = getListCount(conn, sessionId, key);
-
- // Determine if full rewrite is needed
- boolean needsFullRewrite =
- ListHashUtil.needsFullRewrite(
- currentHash, storedHash, values.size(), existingCount);
-
- if (needsFullRewrite) {
- // Transaction: delete all + insert all
- conn.setAutoCommit(false);
- try {
- deleteListItems(conn, sessionId, key);
- insertAllItems(conn, sessionId, key, values);
- saveHash(conn, sessionId, hashKey, currentHash);
- conn.commit();
- } catch (Exception e) {
- conn.rollback();
- throw e;
- } finally {
- conn.setAutoCommit(true);
- }
- } else if (values.size() > existingCount) {
- // Incremental append
- List extends State> newItems = values.subList(existingCount, values.size());
- insertItems(conn, sessionId, key, newItems, existingCount);
- saveHash(conn, sessionId, hashKey, currentHash);
- }
- // else: no change, skip
-
- } catch (Exception e) {
- throw new RuntimeException("Failed to save list: " + key, e);
- }
- }
-
- /**
- * Get stored hash value for a list.
- *
- * @param conn database connection
- * @param sessionId session identifier
- * @param hashKey the hash key (e.g., "memory_messages:_hash")
- * @return the stored hash, or null if not found
- */
- private String getStoredHash(Connection conn, String sessionId, String hashKey)
- throws SQLException {
- String selectSql =
- "SELECT state_data FROM "
- + getFullTableName()
- + " WHERE session_id = ? AND state_key = ? AND item_index = ?";
-
- try (PreparedStatement stmt = conn.prepareStatement(selectSql)) {
- stmt.setString(1, sessionId);
- stmt.setString(2, hashKey);
- stmt.setInt(3, SINGLE_STATE_INDEX);
-
- try (ResultSet rs = stmt.executeQuery()) {
- if (rs.next()) {
- return rs.getString("state_data");
- }
- return null;
- }
- }
- }
-
- /**
- * Save hash value for a list.
- *
- * @param conn database connection
- * @param sessionId session identifier
- * @param hashKey the hash key
- * @param hash the hash value to save
- */
- private void saveHash(Connection conn, String sessionId, String hashKey, String hash)
- throws SQLException {
- String upsertSql =
- "INSERT INTO "
- + getFullTableName()
- + " (session_id, state_key, item_index, state_data)"
- + " VALUES (?, ?, ?, ?)"
- + " ON DUPLICATE KEY UPDATE state_data = VALUES(state_data)";
-
- try (PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
- stmt.setString(1, sessionId);
- stmt.setString(2, hashKey);
- stmt.setInt(3, SINGLE_STATE_INDEX);
- stmt.setString(4, hash);
- stmt.executeUpdate();
- }
- }
-
- /**
- * Delete all items for a list state.
- *
- * @param conn database connection
- * @param sessionId session identifier
- * @param key the state key
- */
- private void deleteListItems(Connection conn, String sessionId, String key)
- throws SQLException {
- String deleteSql =
- "DELETE FROM " + getFullTableName() + " WHERE session_id = ? AND state_key = ?";
-
- try (PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
- stmt.setString(1, sessionId);
- stmt.setString(2, key);
- stmt.executeUpdate();
- }
- }
-
- /**
- * Insert all items for a list state.
- *
- * @param conn database connection
- * @param sessionId session identifier
- * @param key the state key
- * @param values the values to insert
- */
- private void insertAllItems(
- Connection conn, String sessionId, String key, List extends State> values)
- throws Exception {
- insertItems(conn, sessionId, key, values, 0);
- }
-
- /**
- * Insert items for a list state starting at a given index.
- *
- * @param conn database connection
- * @param sessionId session identifier
- * @param key the state key
- * @param items the items to insert
- * @param startIndex the starting index for item_index
- */
- private void insertItems(
- Connection conn,
- String sessionId,
- String key,
- List extends State> items,
- int startIndex)
- throws Exception {
- String insertSql =
- "INSERT INTO "
- + getFullTableName()
- + " (session_id, state_key, item_index, state_data)"
- + " VALUES (?, ?, ?, ?)";
-
- try (PreparedStatement stmt = conn.prepareStatement(insertSql)) {
- int index = startIndex;
- for (State item : items) {
- String json = JsonUtils.getJsonCodec().toJson(item);
- stmt.setString(1, sessionId);
- stmt.setString(2, key);
- stmt.setInt(3, index);
- stmt.setString(4, json);
- stmt.addBatch();
- index++;
- }
- stmt.executeBatch();
- }
- }
-
- /**
- * Get the count of items in a list state (max index + 1).
- */
- private int getListCount(Connection conn, String sessionId, String key) throws SQLException {
- String selectSql =
- "SELECT MAX(item_index) as max_index FROM "
- + getFullTableName()
- + " WHERE session_id = ? AND state_key = ?";
-
- try (PreparedStatement stmt = conn.prepareStatement(selectSql)) {
- stmt.setString(1, sessionId);
- stmt.setString(2, key);
-
- try (ResultSet rs = stmt.executeQuery()) {
- if (rs.next()) {
- int maxIndex = rs.getInt("max_index");
- if (rs.wasNull()) {
- return 0;
- }
- return maxIndex + 1;
- }
- return 0;
- }
- }
- }
-
- @Override
- public
+ *
+ *
+ *
+ * CREATE TABLE IF NOT EXISTS agentscope_sessions (
+ * session_id VARCHAR(255) NOT NULL,
+ * state_key VARCHAR(255) NOT NULL,
+ * item_index INT NOT NULL DEFAULT 0,
+ * state_data LONGTEXT NOT NULL,
+ * created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ * updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ * PRIMARY KEY (session_id, state_key, item_index)
+ * ) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
+ *
+ *
+ *
+ *
+ */
+public class MysqlSession implements Session {
+
+ private static final String DEFAULT_DATABASE_NAME = "agentscope";
+ private static final String DEFAULT_TABLE_NAME = "agentscope_sessions";
+
+ /** Suffix for hash storage keys. */
+ private static final String HASH_KEY_SUFFIX = ":_hash";
+
+ /** item_index value for single state values. */
+ private static final int SINGLE_STATE_INDEX = 0;
+
+ /**
+ * Pattern for validating database and table names. Only allows alphanumeric characters,
+ * underscores, and hyphens, must start with letter or underscore. This prevents SQL injection
+ * attacks through malicious database/table names.
+ *
+ *
+ *
+ *
+ * @param sessionKey the session identifier
+ * @param key the state key (e.g., "memory_messages")
+ * @param values the list of state values to save
+ */
+ @Override
+ public void save(SessionKey sessionKey, String key, List extends State> values) {
+ String sessionId = sessionKey.toIdentifier();
+ validateSessionId(sessionId);
+ validateStateKey(key);
+
+ if (values.isEmpty()) {
+ return;
+ }
+
+ String hashKey = key + HASH_KEY_SUFFIX;
+
+ try (Connection conn = dataSource.getConnection()) {
+ // Compute current hash
+ String currentHash = ListHashUtil.computeHash(values);
+
+ // Get stored hash
+ String storedHash = getStoredHash(conn, sessionId, hashKey);
+
+ // Get existing count
+ int existingCount = getListCount(conn, sessionId, key);
+
+ // Determine if full rewrite is needed
+ boolean needsFullRewrite =
+ ListHashUtil.needsFullRewrite(
+ currentHash, storedHash, values.size(), existingCount);
+
+ if (needsFullRewrite) {
+ executeWriteTransaction(
+ conn,
+ () -> {
+ deleteListItems(conn, sessionId, key);
+ insertAllItems(conn, sessionId, key, values);
+ saveHash(conn, sessionId, hashKey, currentHash);
+ return null;
+ });
+ } else if (values.size() > existingCount) {
+ List extends State> newItems = values.subList(existingCount, values.size());
+ executeWriteTransaction(
+ conn,
+ () -> {
+ insertItems(conn, sessionId, key, newItems, existingCount);
+ saveHash(conn, sessionId, hashKey, currentHash);
+ return null;
+ });
+ }
+ // else: no change, skip
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to save list: " + key, e);
+ }
+ }
+
+ /**
+ * Get stored hash value for a list.
+ *
+ * @param conn database connection
+ * @param sessionId session identifier
+ * @param hashKey the hash key (e.g., "memory_messages:_hash")
+ * @return the stored hash, or null if not found
+ */
+ private String getStoredHash(Connection conn, String sessionId, String hashKey)
+ throws SQLException {
+ String selectSql =
+ "SELECT state_data FROM "
+ + getFullTableName()
+ + " WHERE session_id = ? AND state_key = ? AND item_index = ?";
+
+ try (PreparedStatement stmt = conn.prepareStatement(selectSql)) {
+ stmt.setString(1, sessionId);
+ stmt.setString(2, hashKey);
+ stmt.setInt(3, SINGLE_STATE_INDEX);
+
+ try (ResultSet rs = stmt.executeQuery()) {
+ if (rs.next()) {
+ return rs.getString("state_data");
+ }
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Save hash value for a list.
+ *
+ * @param conn database connection
+ * @param sessionId session identifier
+ * @param hashKey the hash key
+ * @param hash the hash value to save
+ */
+ private void saveHash(Connection conn, String sessionId, String hashKey, String hash)
+ throws SQLException {
+ String upsertSql =
+ "INSERT INTO "
+ + getFullTableName()
+ + " (session_id, state_key, item_index, state_data)"
+ + " VALUES (?, ?, ?, ?)"
+ + " ON DUPLICATE KEY UPDATE state_data = VALUES(state_data)";
+
+ try (PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
+ stmt.setString(1, sessionId);
+ stmt.setString(2, hashKey);
+ stmt.setInt(3, SINGLE_STATE_INDEX);
+ stmt.setString(4, hash);
+ stmt.executeUpdate();
+ }
+ }
+
+ /**
+ * Delete all items for a list state.
+ *
+ * @param conn database connection
+ * @param sessionId session identifier
+ * @param key the state key
+ */
+ private void deleteListItems(Connection conn, String sessionId, String key)
+ throws SQLException {
+ String deleteSql =
+ "DELETE FROM " + getFullTableName() + " WHERE session_id = ? AND state_key = ?";
+
+ try (PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
+ stmt.setString(1, sessionId);
+ stmt.setString(2, key);
+ stmt.executeUpdate();
+ }
+ }
+
+ /**
+ * Insert all items for a list state.
+ *
+ * @param conn database connection
+ * @param sessionId session identifier
+ * @param key the state key
+ * @param values the values to insert
+ */
+ private void insertAllItems(
+ Connection conn, String sessionId, String key, List extends State> values)
+ throws Exception {
+ insertItems(conn, sessionId, key, values, 0);
+ }
+
+ /**
+ * Insert items for a list state starting at a given index.
+ *
+ * @param conn database connection
+ * @param sessionId session identifier
+ * @param key the state key
+ * @param items the items to insert
+ * @param startIndex the starting index for item_index
+ */
+ private void insertItems(
+ Connection conn,
+ String sessionId,
+ String key,
+ List extends State> items,
+ int startIndex)
+ throws Exception {
+ String insertSql =
+ "INSERT INTO "
+ + getFullTableName()
+ + " (session_id, state_key, item_index, state_data)"
+ + " VALUES (?, ?, ?, ?)";
+
+ try (PreparedStatement stmt = conn.prepareStatement(insertSql)) {
+ int index = startIndex;
+ for (State item : items) {
+ String json = JsonUtils.getJsonCodec().toJson(item);
+ stmt.setString(1, sessionId);
+ stmt.setString(2, key);
+ stmt.setInt(3, index);
+ stmt.setString(4, json);
+ stmt.addBatch();
+ index++;
+ }
+ stmt.executeBatch();
+ }
+ }
+
+ /**
+ * Get the count of items in a list state (max index + 1).
+ */
+ private int getListCount(Connection conn, String sessionId, String key) throws SQLException {
+ String selectSql =
+ "SELECT MAX(item_index) as max_index FROM "
+ + getFullTableName()
+ + " WHERE session_id = ? AND state_key = ?";
+
+ try (PreparedStatement stmt = conn.prepareStatement(selectSql)) {
+ stmt.setString(1, sessionId);
+ stmt.setString(2, key);
+
+ try (ResultSet rs = stmt.executeQuery()) {
+ if (rs.next()) {
+ int maxIndex = rs.getInt("max_index");
+ if (rs.wasNull()) {
+ return 0;
+ }
+ return maxIndex + 1;
+ }
+ return 0;
+ }
+ }
+ }
+
+ @Override
+ public