Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions ncsa_build/ClassHelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
from typing import List, Dict, Tuple
from datetime import datetime

class PkgObj:
    """A single XALT package record parsed from a pkg.delta JSON log file."""

    def __init__(self, data, path):
        """
        Args:
            data: Parsed JSON dict for one package record.
            path: Filesystem path of the source log file.
        """
        self.path = path
        self.xalt_run_uuid = data.get('xalt_run_uuid')
        self.pkg_version = data.get('pkg_version')
        self.pkg_name = data.get('pkg_name')
        self.pkg_path = data.get('pkg_path')

    def writeToDB(self, conn):
        """
        Write new package data to the database using provided MariaDB connection.
        Uses xalt_run_uuid directly as run_id and 'python' as program.

        Args:
            conn: MariaDB connection object

        Returns:
            pkg_id: The ID of the inserted package record

        Raises:
            Exception: wrapping the underlying database error; the
                transaction is rolled back first.
        """
        cursor = conn.cursor()
        try:
            cursor.execute("""
                INSERT INTO xalt_pkg
                (run_id, program, pkg_name, pkg_version, pkg_path)
                VALUES (%s, 'python', %s, %s, %s)
            """, (
                int(self.xalt_run_uuid),  # Ensure run_id is an integer since it's int(11)
                self.pkg_name[:64] if self.pkg_name else None,  # varchar(64); guard None like the other fields (was unguarded)
                self.pkg_version[:32] if self.pkg_version else None,  # Respect varchar(32) limit
                self.pkg_path[:1024] if self.pkg_path else None  # Respect varchar(1024) limit
            ))

            pkg_id = cursor.lastrowid
            conn.commit()

            return pkg_id

        except Exception as e:
            conn.rollback()
            # Chain the original exception so the root cause is preserved.
            raise Exception(f"Error writing package to database: {str(e)}") from e
        finally:
            cursor.close()


class LinkObj:
    """A single XALT link record parsed from a link.delta JSON log file."""

    def __init__(self, json_data: dict, path):
        """
        Args:
            json_data: Parsed JSON dict; must contain "resultT", "crc",
                "linkA", "function", and "link_line".
            path: Filesystem path of the source log file.
        """
        self.path = path
        resultT = json_data["resultT"]  # Direct access since the field is guaranteed to exist
        self.xalt_run_uuid = resultT["uuid"]
        self.crc = json_data["crc"]

        # Extract resultT fields
        self.link_program = resultT["link_program"]
        self.link_path = resultT["link_path"]
        self.build_user = resultT["build_user"]
        self.build_epoch = resultT["build_epoch"]
        self.exec_path = resultT["exec_path"]
        self.hash_id = resultT["hash_id"]
        self.wd = resultT["wd"]
        self.build_syshost = resultT["build_syshost"]

        # Extract linkA, function, and link_line fields
        self.linkA = json_data["linkA"]
        self.function = json_data["function"]
        self.link_line = json_data["link_line"]

    def __repr__(self):
        return (f"LinkObj(xalt_run_uuid={self.xalt_run_uuid}, crc={self.crc}, link_program={self.link_program}, "
                f"link_path={self.link_path}, build_user={self.build_user}, build_epoch={self.build_epoch}, "
                f"exec_path={self.exec_path}, hash_id={self.hash_id}, wd={self.wd}, build_syshost={self.build_syshost}, "
                f"linkA={self.linkA}, function={self.function}, link_line={self.link_line})")

    def writeToDB(self, conn):
        """
        Write link data to the database using provided MariaDB connection.
        Uses xalt_run_uuid directly as uuid and respects the column constraints.

        Args:
            conn: MariaDB connection object

        Returns:
            link_id: The ID of the inserted link record

        Raises:
            Exception: wrapping the underlying database error; the
                transaction is rolled back first.
        """
        cursor = conn.cursor()
        try:
            cursor.execute("""
                INSERT INTO links
                (hash_id, date, link_program, link_path, link_module_name,
                link_line, cwd, build_user, build_syshost, build_epoch,
                exec_path, uuid)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                self.hash_id[:40],  # Ensure hash_id is 40 chars (char(40))
                # Bug fix: the date value was commented out, leaving 11 values
                # for 12 placeholders — every INSERT would fail.
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                self.link_program[:64],  # Respect varchar(64) limit for link_program
                self.link_path[:1024],  # Respect varchar(1024) limit for link_path
                # NOTE(review): linkA comes straight from JSON — presumably a
                # string for link_module_name; a list would fail to bind. Confirm.
                self.linkA[:64] if self.linkA else None,
                self.link_line,  # link_line is a blob, can be passed as-is
                self.wd[:1024] if self.wd else None,  # Respect varchar(1024) limit for cwd
                self.build_user[:64],  # Respect varchar(64) limit for build_user
                self.build_syshost[:64],  # Respect varchar(64) limit for build_syshost
                self.build_epoch,  # build_epoch is a double
                self.exec_path[:1024],  # Respect varchar(1024) limit for exec_path
                self.xalt_run_uuid  # uuid for the link record
            ))

            link_id = cursor.lastrowid  # Get the ID of the inserted link record
            conn.commit()  # Commit the transaction

            return link_id

        except Exception as e:
            conn.rollback()  # Rollback in case of error
            # Chain the original exception so the root cause is preserved.
            raise Exception(f"Error writing link to database: {str(e)}") from e

        finally:
            # Bug fix: a bare `return` here silently swallowed any raised
            # exception and never released the cursor.
            cursor.close()





class RunObj:
    """A single XALT run record parsed from a run.delta JSON log file."""

    def __init__(self, json_data: dict, path):
        """
        Args:
            json_data: Parsed JSON dict for one run record; all fields are
                optional and default to empty/neutral values.
            path: Filesystem path of the source log file.
        """
        self.path = path
        self.crc = json_data.get("crc", "")
        self.cmdlineA = json_data.get("cmdlineA", [])
        self.hash_id = json_data.get("hash_id", "")

        # libA is a list of strings
        self.libA = json_data.get("libA", [])

        # ptA is a list of dicts, each with cmd_name, cmd_path, pid, and cmdlineA
        self.ptA = [
            {
                "cmd_name": pt.get("cmd_name", ""),
                "cmd_path": pt.get("cmd_path", ""),
                "pid": pt.get("pid", 0),
                "cmdlineA": pt.get("cmdlineA", [])
            }
            for pt in json_data.get("ptA", [])
        ]

        # envT is a dictionary of unlimited key-value pairs (environment variables)
        self.envT = json_data.get("envT", {})

        # userT holds user/job environment info. Hoist the nested dict once
        # instead of re-fetching json_data.get("userT", {}) for every key.
        userT = json_data.get("userT", {})
        user_keys = ("syshost", "run_uuid", "exec_path", "exec_type", "cwd",
                     "currentEpoch", "start_date", "user", "execModify",
                     "scheduler", "account", "job_id", "queue", "submit_host")
        self.userT = {k: userT.get(k, "") for k in user_keys}

        # userDT holds runtime and task-related data; per-key numeric defaults.
        userDT = json_data.get("userDT", {})
        userDT_defaults = {
            "start_time": 0.0, "end_time": 0.0, "run_time": 0.0,
            "probability": 1.0, "num_tasks": 1.0, "num_gpus": 0.0,
            "exec_epoch": 0.0, "num_threads": 1.0, "num_cores": 1.0,
            "num_nodes": 1.0,
        }
        self.userDT = {k: userDT.get(k, d) for k, d in userDT_defaults.items()}

        # XALT_measureT: measurement values for different steps in the process
        self.XALT_measureT = json_data.get("XALT_measureT", {})

        # XALT_qaT: QA-specific information
        self.XALT_qaT = json_data.get("XALT_qaT", {})
        self.xalt_run_uuid = self.userT['run_uuid']

    def __repr__(self):
        return (
            f"RunObj(crc={self.crc}, cmdlineA={self.cmdlineA}, hash_id={self.hash_id}, "
            f"libA={self.libA}, ptA={self.ptA}, envT={self.envT}, userT={self.userT}, "
            f"userDT={self.userDT}, XALT_measureT={self.XALT_measureT}, XALT_qaT={self.XALT_qaT})"
        )
141 changes: 141 additions & 0 deletions ncsa_build/orm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import sys
import os
import re
from pathlib import Path
import mariadb
import json
from ClassHelper import PkgObj, LinkObj, RunObj

def connectDB(username, password, hostname, dbname, port=3306):
    """
    Connect to the MariaDB database using provided credentials.

    Args:
        username: Database user name.
        password: Database password.
        hostname: Database server host.
        dbname: Database/schema name.
        port: TCP port of the server (default 3306, the MariaDB standard).

    Returns:
        An open mariadb connection object.

    Exits the process with status 1 if the connection cannot be established.
    """
    try:
        conn = mariadb.connect(
            user=username,
            password=password,
            host=hostname,
            port=port,  # connector requires an integer port
            database=dbname
        )
        print("MariaDB connection established.")
        return conn
    except mariadb.Error as e:
        print(f"Error connecting to MariaDB: {e}")
        sys.exit(1)

def createLogList():
    """
    Build the list of XALT JSON log files to process.

    Reads the XALT_FILE_PREFIX environment variable as the log root and
    recursively collects every '*delta*.json' file beneath it.
    Exits with status 1 when the variable is unset.
    """
    log_root = os.getenv("XALT_FILE_PREFIX")
    if log_root is None:
        print("Error: XALT_FILE_PREFIX environment variable not set.")
        sys.exit(1)
    return list(Path(log_root).rglob('*delta*.json'))

def splitLogs(file_paths):
    """
    Partition log file paths into run, link, and package lists by filename.

    Args:
        file_paths: Iterable of paths (str or Path) to XALT JSON log files.

    Returns:
        Tuple (run_files, link_files, pkg_files) of lists; filenames that
        match none of the prefixes are reported on stdout and dropped.
    """
    run_files = []
    link_files = []
    pkg_files = []

    for file_path in file_paths:
        file_name = Path(file_path).name

        # Plain prefix tests — the original compiled ^run\.delta-style
        # regexes for what are literal prefix matches.
        if file_name.startswith("run.delta"):
            run_files.append(file_path)
        elif file_name.startswith("link.delta"):
            link_files.append(file_path)
        elif file_name.startswith("pkg.delta"):
            pkg_files.append(file_path)
        else:
            print(f"No match: {file_name}")

    return run_files, link_files, pkg_files



def ingestPkgRecords(pkg_paths):
    """
    Parse pkg.delta JSON files into PkgObj records grouped by run UUID.

    Args:
        pkg_paths: Iterable of paths to pkg.delta JSON files.

    Returns:
        Dict mapping xalt_run_uuid -> list[PkgObj], bundling all packages
        that belong to the same run record.
    """
    pkg_dict = {}

    for pkg in pkg_paths:
        with open(pkg, 'r') as file:
            data = json.load(file)
        # Bug fix: PkgObj.__init__ requires (data, path); the path argument
        # was previously omitted, raising TypeError on every record.
        obj = PkgObj(data, pkg)
        pkg_dict.setdefault(obj.xalt_run_uuid, []).append(obj)

    return pkg_dict


def ingestLinkRecords(link_paths):
    """
    Parse link.delta JSON files into LinkObj records grouped by run UUID.

    Args:
        link_paths: Iterable of paths to link.delta JSON files.

    Returns:
        Dict mapping xalt_run_uuid -> list[LinkObj], bundling all links
        that belong to the same run record.
    """
    link_dict = {}

    for link in link_paths:
        with open(link, 'r') as file:
            data = json.load(file)
        # Bug fix: LinkObj.__init__ requires (json_data, path); the path
        # argument was previously omitted, raising TypeError on every record.
        # Also dropped the leftover debug print of each path.
        obj = LinkObj(data, link)
        link_dict.setdefault(obj.xalt_run_uuid, []).append(obj)

    return link_dict

def ingestRunRecords(run_paths):
    """
    Parse run.delta JSON files into RunObj records grouped by run UUID.

    Args:
        run_paths: Iterable of paths to run.delta JSON files.

    Returns:
        Dict mapping xalt_run_uuid -> list[RunObj], bundling all runs
        that share the same run UUID.
    """
    run_dict = {}

    for run in run_paths:
        with open(run, 'r') as file:
            data = json.load(file)
        # Bug fix: RunObj.__init__ requires (json_data, path); the path
        # argument was previously omitted, raising TypeError on every record.
        obj = RunObj(data, run)
        run_dict.setdefault(obj.xalt_run_uuid, []).append(obj)

    return run_dict


def main():
    """Entry point: discover XALT logs, split them by type, and ingest each group."""
    # Guard clause: all four connection arguments are required.
    if len(sys.argv) < 5:
        print("Usage: script.py <username> <password> <hostname> <dbname>")
        sys.exit(1)

    username, password, hostname, dbname = sys.argv[1:5]

    # Discover the log files to process.
    logfiles = createLogList()
    print(f"Found {len(logfiles)} log files to process.")

    run_paths, link_paths, pkg_paths = splitLogs(logfiles)

    # Database connection is currently disabled:
    # conn = connectDB(username, password, hostname, dbname)

    link_dict = ingestLinkRecords(link_paths)
    pkg_dict = ingestPkgRecords(pkg_paths)
    run_dict = ingestRunRecords(run_paths)

    # Now we can index into records using the run UUID;
    # the general ingestion workflow can proceed from here.


if __name__ == "__main__":
    main()
Loading