From 454cd9cfab0bce2a8cc18eabed17c7991cabc11e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 19:46:48 +0000 Subject: [PATCH 1/2] Initial plan From d2db6acf03aad211fe83cd8e8e1a42d1144ca28b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 19:58:13 +0000 Subject: [PATCH 2/2] Fix coverage threshold, improve cache worker configurability, use logging module, and harden SSH in benchmark scripts Agent-Logs-Url: https://github.com/prestodb/pbench/sessions/c90caf7f-fdb0-47b9-ae4d-30eac8b06fb4 Co-authored-by: ethanyzhang <6643318+ethanyzhang@users.noreply.github.com> --- .githooks/pre-commit | 4 +- benchmarks/scripts/cache_cleaning_workers.py | 64 +++++++++++++++----- benchmarks/scripts/mysql_utils.py | 11 +++- benchmarks/scripts/system_utils.py | 9 +-- 4 files changed, 64 insertions(+), 24 deletions(-) diff --git a/.githooks/pre-commit b/.githooks/pre-commit index 37f3c1dc..e82d14d8 100755 --- a/.githooks/pre-commit +++ b/.githooks/pre-commit @@ -39,8 +39,8 @@ fi echo "Checking coverage..." COVERAGE=$(go tool cover -func=coverage.out 2>/dev/null | grep total | awk '{print $3}' | sed 's/%//') if [ -n "$COVERAGE" ]; then - COVERAGE_INT=$(awk -v c="$COVERAGE" 'BEGIN { printf "%d", c * 100 }') - if [ "$COVERAGE_INT" -lt 5000 ]; then + COVERAGE_INT=$(awk -v c="$COVERAGE" 'BEGIN { printf "%d", c }') + if [ "$COVERAGE_INT" -lt 50 ]; then echo "Coverage ${COVERAGE}% is below 50% threshold" FAILED=1 else diff --git a/benchmarks/scripts/cache_cleaning_workers.py b/benchmarks/scripts/cache_cleaning_workers.py index 6587ceb2..668d77a0 100644 --- a/benchmarks/scripts/cache_cleaning_workers.py +++ b/benchmarks/scripts/cache_cleaning_workers.py @@ -3,6 +3,7 @@ from system_utils import execute_ssh_command import json import argparse +import sys def get_workers_public_ips(data): data = json.loads(data) @@ -13,8 +14,14 @@ def get_workers_public_ips(data): worker_ips.append(value) return worker_ips -def cleanup_worker_disk_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port): - ssd_cache_clean_command = f"docker exec $(docker ps -q --filter 'name=^presto_workers') curl -sS -X GET http://localhost:{worker_http_port}/v1/operation/server/clearCache?type=ssd" +def build_cache_clean_command(worker_http_port, cache_type, container_name_pattern): + return ( + f"docker exec $(docker ps -q --filter 'name={container_name_pattern}') " + f"curl -sS -X GET http://localhost:{worker_http_port}/v1/operation/server/clearCache?type={cache_type}" + ) + +def cleanup_worker_disk_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port, container_name_pattern): + ssd_cache_clean_command = build_cache_clean_command(worker_http_port, "ssd", container_name_pattern) for worker_ip in worker_public_ips: execute_ssh_command(worker_ip, login_user, ssh_key_path, ssd_cache_clean_command) print("cleanup_worker_disk_cache is successful!") @@ -26,8 +33,8 @@ def cleanup_worker_os_cache(worker_public_ips, login_user, ssh_key_path): execute_ssh_command(worker_ip, login_user, ssh_key_path, command) print("cleanup_worker_os_cache is successful!") -def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port): - memory_cache_clean_command = f"docker exec $(docker ps -q --filter 'name=^presto_workers') curl -sS -X GET http://localhost:{worker_http_port}/v1/operation/server/clearCache?type=memory" +def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port, container_name_pattern): + memory_cache_clean_command = build_cache_clean_command(worker_http_port, "memory", container_name_pattern) for worker_ip in worker_public_ips: execute_ssh_command(worker_ip, login_user, ssh_key_path, memory_cache_clean_command) print("cleanup_worker_memory_cache is successful!") @@ -38,6 +45,12 @@ def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, wor parser.add_argument('--mysql', required=True, help='Mysql database details') parser.add_argument('--clustername', required=True, help='Presto cluster name') parser.add_argument('--sshkey', required=True, help='SSH key to connect to Presto Vms') + parser.add_argument('--sshuser', default='centos', help='SSH login user for Presto VMs') + parser.add_argument('--worker-http-port', default='8080', help='Presto worker HTTP port (default: 8080)') + parser.add_argument('--container-name-pattern', default='^presto_workers', help='Docker container name filter pattern') + parser.add_argument('--cleanup-disk-cache', action='store_true', help='Cleanup worker disk cache only when specified') + parser.add_argument('--cleanup-os-cache', action='store_true', help='Cleanup worker OS cache only when specified') + parser.add_argument('--cleanup-memory-cache', action='store_true', help='Cleanup worker memory cache only when specified') args = parser.parse_args() @@ -59,27 +72,48 @@ def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, wor select_query = "SELECT detail_json FROM presto_clusters WHERE cluster_name = %s" results = execute_mysql_query(connection, select_query, cluster_name) - worker_public_ips = [] - if results: - worker_public_ips = get_workers_public_ips(results[0][0]) + if not results: + print(f"No cluster details found for cluster '{cluster_name}'. Exiting without cleanup.") + if connection.is_connected(): + connection.close() + sys.exit(1) + + worker_public_ips = get_workers_public_ips(results[0][0]) + if not worker_public_ips: + print(f"No worker public IPs found for cluster '{cluster_name}'. Exiting without cleanup.") + if connection.is_connected(): + connection.close() + sys.exit(1) print("worker count = ", len(worker_public_ips)) print("======= worker_public_ips ======") print(worker_public_ips) - is_worker_disk_cache_cleanup_enabled = True - is_worker_os_cache_cleanup_enabled = True - is_worker_memory_cache_cleanup_enabled = True - - worker_http_port = "8080" + selected_any_cleanup = ( + args.cleanup_disk_cache or + args.cleanup_os_cache or + args.cleanup_memory_cache + ) + if selected_any_cleanup: + is_worker_disk_cache_cleanup_enabled = args.cleanup_disk_cache + is_worker_os_cache_cleanup_enabled = args.cleanup_os_cache + is_worker_memory_cache_cleanup_enabled = args.cleanup_memory_cache + else: + is_worker_disk_cache_cleanup_enabled = True + is_worker_os_cache_cleanup_enabled = True + is_worker_memory_cache_cleanup_enabled = True + + worker_http_port = args.worker_http_port + ssh_login_user = args.sshuser + container_name_pattern = args.container_name_pattern if is_worker_disk_cache_cleanup_enabled: - cleanup_worker_disk_cache(worker_public_ips, "centos", args.sshkey, worker_http_port) + cleanup_worker_disk_cache(worker_public_ips, ssh_login_user, args.sshkey, worker_http_port, container_name_pattern) if is_worker_memory_cache_cleanup_enabled: - cleanup_worker_memory_cache(worker_public_ips, "centos", args.sshkey, worker_http_port) + cleanup_worker_memory_cache(worker_public_ips, ssh_login_user, args.sshkey, worker_http_port, container_name_pattern) if is_worker_os_cache_cleanup_enabled: - cleanup_worker_os_cache(worker_public_ips, "centos", args.sshkey) + cleanup_worker_os_cache(worker_public_ips, ssh_login_user, args.sshkey) if connection.is_connected(): connection.close() diff --git a/benchmarks/scripts/mysql_utils.py b/benchmarks/scripts/mysql_utils.py index 37cf13a0..7ff27bcb 100644 --- a/benchmarks/scripts/mysql_utils.py +++ b/benchmarks/scripts/mysql_utils.py @@ -1,5 +1,9 @@ import mysql.connector from mysql.connector import Error +import logging + +logger = logging.getLogger(__name__) + def create_connection(host_name, user_name, user_password, db_name): connection = None @@ -10,12 +14,13 @@ def create_connection(host_name, user_name, user_password, db_name): passwd=user_password, database=db_name ) - print("Connection to Benchmark Database is successful") + logger.info("Connection to Benchmark Database is successful") except Error as e: - print(f"The error '{e}' occurred") + logger.error("The error '%s' occurred", e) return connection + def execute_mysql_query(connection, query, cluster_name): cursor = None try: @@ -24,7 +29,7 @@ def execute_mysql_query(connection, query, cluster_name): result = cursor.fetchall() return result except Error as e: - print(f"The error '{e}' occurred") + logger.error("The error '%s' occurred", e) return [] finally: if cursor is not None: diff --git a/benchmarks/scripts/system_utils.py b/benchmarks/scripts/system_utils.py index 3cf23d61..7f0b7a0f 100644 --- a/benchmarks/scripts/system_utils.py +++ b/benchmarks/scripts/system_utils.py @@ -1,14 +1,15 @@ import paramiko import sys -def execute_ssh_command(worker_ip, login_user, ssh_key_path, command): +def execute_ssh_command(worker_ip, login_user, ssh_key_path, command, timeout=30): ssh = None try: ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - private_key = paramiko.Ed25519Key(filename=ssh_key_path) + ssh.load_system_host_keys() + ssh.set_missing_host_key_policy(paramiko.RejectPolicy()) + private_key = paramiko.PKey.from_path(ssh_key_path) - ssh.connect(hostname=worker_ip, username=login_user, pkey=private_key, timeout=30) + ssh.connect(hostname=worker_ip, username=login_user, pkey=private_key, timeout=timeout) stdin, stdout, stderr = ssh.exec_command(command) stdout_output = stdout.read().decode()