Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ fi
echo "Checking coverage..."
COVERAGE=$(go tool cover -func=coverage.out 2>/dev/null | grep total | awk '{print $3}' | sed 's/%//')
if [ -n "$COVERAGE" ]; then
COVERAGE_INT=$(awk -v c="$COVERAGE" 'BEGIN { printf "%d", c * 100 }')
if [ "$COVERAGE_INT" -lt 5000 ]; then
COVERAGE_INT=$(awk -v c="$COVERAGE" 'BEGIN { printf "%d", c }')
if [ "$COVERAGE_INT" -lt 50 ]; then
echo "Coverage ${COVERAGE}% is below 50% threshold"
FAILED=1
else
Expand Down
64 changes: 49 additions & 15 deletions benchmarks/scripts/cache_cleaning_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from system_utils import execute_ssh_command
import json
import argparse
import sys

def get_workers_public_ips(data):
data = json.loads(data)
Expand All @@ -13,8 +14,14 @@ def get_workers_public_ips(data):
worker_ips.append(value)
return worker_ips

def cleanup_worker_disk_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port):
ssd_cache_clean_command = f"docker exec $(docker ps -q --filter 'name=^presto_workers') curl -sS -X GET http://localhost:{worker_http_port}/v1/operation/server/clearCache?type=ssd"
def build_cache_clean_command(worker_http_port, cache_type, container_name_pattern):
    """Build the shell command that clears one cache on a Presto worker.

    The command runs `curl` inside the worker's Docker container (selected by
    `docker ps --filter name=...`) against the worker's local clearCache
    endpoint.

    Args:
        worker_http_port: HTTP port the Presto worker listens on (e.g. "8080").
        cache_type: Cache to clear, passed as the `type` query parameter
            (e.g. "ssd" or "memory").
        container_name_pattern: Docker name-filter pattern that selects the
            worker container (e.g. "^presto_workers").

    Returns:
        The full shell command string to execute over SSH on the worker host.
    """
    # Single-quote the URL: unquoted, '?' is a shell glob character and the
    # shell could expand it against files in the working directory.
    return (
        f"docker exec $(docker ps -q --filter 'name={container_name_pattern}') "
        f"curl -sS -X GET 'http://localhost:{worker_http_port}/v1/operation/server/clearCache?type={cache_type}'"
    )

def cleanup_worker_disk_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port, container_name_pattern):
    """Clear the SSD (disk) cache on every Presto worker via SSH.

    Builds the clearCache?type=ssd command once and runs it on each worker
    host in `worker_public_ips` using the given SSH credentials.
    """
    clear_cmd = build_cache_clean_command(worker_http_port, "ssd", container_name_pattern)
    for host in worker_public_ips:
        execute_ssh_command(host, login_user, ssh_key_path, clear_cmd)
    print("cleanup_worker_disk_cache is successful!")
Expand All @@ -26,8 +33,8 @@ def cleanup_worker_os_cache(worker_public_ips, login_user, ssh_key_path):
execute_ssh_command(worker_ip, login_user, ssh_key_path, command)
print("cleanup_worker_os_cache is successful!")

def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port):
memory_cache_clean_command = f"docker exec $(docker ps -q --filter 'name=^presto_workers') curl -sS -X GET http://localhost:{worker_http_port}/v1/operation/server/clearCache?type=memory"
def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port, container_name_pattern):
    """Clear the in-memory cache on every Presto worker via SSH.

    Builds the clearCache?type=memory command once and runs it on each worker
    host in `worker_public_ips` using the given SSH credentials.
    """
    clear_cmd = build_cache_clean_command(worker_http_port, "memory", container_name_pattern)
    for host in worker_public_ips:
        execute_ssh_command(host, login_user, ssh_key_path, clear_cmd)
    print("cleanup_worker_memory_cache is successful!")
Expand All @@ -38,6 +45,12 @@ def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, wor
parser.add_argument('--mysql', required=True, help='Mysql database details')
parser.add_argument('--clustername', required=True, help='Presto cluster name')
parser.add_argument('--sshkey', required=True, help='SSH key to connect to Presto Vms')
parser.add_argument('--sshuser', default='centos', help='SSH login user for Presto VMs')
parser.add_argument('--worker-http-port', default='8080', help='Presto worker HTTP port (default: 8080)')
parser.add_argument('--container-name-pattern', default='^presto_workers', help='Docker container name filter pattern')
parser.add_argument('--cleanup-disk-cache', action='store_true', help='Cleanup worker disk cache only when specified')
parser.add_argument('--cleanup-os-cache', action='store_true', help='Cleanup worker OS cache only when specified')
parser.add_argument('--cleanup-memory-cache', action='store_true', help='Cleanup worker memory cache only when specified')

args = parser.parse_args()

Expand All @@ -59,27 +72,48 @@ def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, wor
select_query = "SELECT detail_json FROM presto_clusters WHERE cluster_name = %s"
results = execute_mysql_query(connection, select_query, cluster_name)

worker_public_ips = []
if results:
worker_public_ips = get_workers_public_ips(results[0][0])
if not results:
print(f"No cluster details found for cluster '{cluster_name}'. Exiting without cleanup.")
if connection.is_connected():
connection.close()
sys.exit(1)

worker_public_ips = get_workers_public_ips(results[0][0])
if not worker_public_ips:
print(f"No worker public IPs found for cluster '{cluster_name}'. Exiting without cleanup.")
if connection.is_connected():
connection.close()
sys.exit(1)

print("worker count = ", len(worker_public_ips))
print("======= worker_public_ips ======")
print(worker_public_ips)

is_worker_disk_cache_cleanup_enabled = True
is_worker_os_cache_cleanup_enabled = True
is_worker_memory_cache_cleanup_enabled = True

worker_http_port = "8080"
selected_any_cleanup = (
args.cleanup_disk_cache or
args.cleanup_os_cache or
args.cleanup_memory_cache
)
if selected_any_cleanup:
is_worker_disk_cache_cleanup_enabled = args.cleanup_disk_cache
is_worker_os_cache_cleanup_enabled = args.cleanup_os_cache
is_worker_memory_cache_cleanup_enabled = args.cleanup_memory_cache
else:
is_worker_disk_cache_cleanup_enabled = True
is_worker_os_cache_cleanup_enabled = True
is_worker_memory_cache_cleanup_enabled = True

worker_http_port = args.worker_http_port
ssh_login_user = args.sshuser
container_name_pattern = args.container_name_pattern
if is_worker_disk_cache_cleanup_enabled:
cleanup_worker_disk_cache(worker_public_ips, "centos", args.sshkey, worker_http_port)
cleanup_worker_disk_cache(worker_public_ips, ssh_login_user, args.sshkey, worker_http_port, container_name_pattern)

if is_worker_memory_cache_cleanup_enabled:
cleanup_worker_memory_cache(worker_public_ips, "centos", args.sshkey, worker_http_port)
cleanup_worker_memory_cache(worker_public_ips, ssh_login_user, args.sshkey, worker_http_port, container_name_pattern)

if is_worker_os_cache_cleanup_enabled:
cleanup_worker_os_cache(worker_public_ips, "centos", args.sshkey)
cleanup_worker_os_cache(worker_public_ips, ssh_login_user, args.sshkey)

if connection.is_connected():
connection.close()
Expand Down
11 changes: 8 additions & 3 deletions benchmarks/scripts/mysql_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import mysql.connector
from mysql.connector import Error
import logging

logger = logging.getLogger(__name__)


def create_connection(host_name, user_name, user_password, db_name):
connection = None
Expand All @@ -10,12 +14,13 @@ def create_connection(host_name, user_name, user_password, db_name):
passwd=user_password,
database=db_name
)
print("Connection to Benchmark Database is successful")
logger.info("Connection to Benchmark Database is successful")
except Error as e:
print(f"The error '{e}' occurred")
logger.error("The error '%s' occurred", e)

return connection


def execute_mysql_query(connection, query, cluster_name):
cursor = None
try:
Expand All @@ -24,7 +29,7 @@ def execute_mysql_query(connection, query, cluster_name):
result = cursor.fetchall()
return result
except Error as e:
print(f"The error '{e}' occurred")
logger.error("The error '%s' occurred", e)
return []
finally:
if cursor is not None:
Expand Down
9 changes: 5 additions & 4 deletions benchmarks/scripts/system_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import paramiko
import sys

def execute_ssh_command(worker_ip, login_user, ssh_key_path, command):
def execute_ssh_command(worker_ip, login_user, ssh_key_path, command, timeout=30):
ssh = None
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
private_key = paramiko.Ed25519Key(filename=ssh_key_path)
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
private_key = paramiko.PKey.from_path(ssh_key_path)

ssh.connect(hostname=worker_ip, username=login_user, pkey=private_key, timeout=30)
ssh.connect(hostname=worker_ip, username=login_user, pkey=private_key, timeout=timeout)

stdin, stdout, stderr = ssh.exec_command(command)
stdout_output = stdout.read().decode()
Expand Down
Loading