From ce52b324ef9ea1707c1c96f0f96a5735e6f842f0 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Fri, 21 Mar 2025 15:26:21 -0400 Subject: [PATCH 01/22] Fixed the change by making the retries attribute private (and also the assigned attribute), improving logging surrounding retries and giving up --- jobQueue.py | 4 +-- tangoObjects.py | 59 +++++++++++++++++++++++++++++++-------------- vmms/localDocker.py | 2 +- worker.py | 14 ++++++++++- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/jobQueue.py b/jobQueue.py index a583b3f1..1b0612e8 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -281,9 +281,9 @@ def unassignJob(self, jobId): # Increment the number of retires if job.retries is None: - job.retries = 0 + job.resetRetries() else: - job.retries += 1 + job.incrementRetries() Config.job_retries += 1 self.log.info("unassignJob|Unassigning job %s" % str(job.id)) diff --git a/tangoObjects.py b/tangoObjects.py index 03c33c58..83da888f 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -102,8 +102,8 @@ def __init__( disableNetwork=None, stopBefore="", ): - self.assigned = False - self.retries = 0 + self._assigned = False + self._retries = 0 self.vm = vm if input is None: @@ -126,10 +126,31 @@ def __init__( def __repr__(self): self.syncRemote() return f"ID: {self.id} - Name: {self.name}" + + # Getters for private variables + @property + def assigned(self): + self.syncRemote() # Is it necessary to sync here? + return self._assigned + + @property + def retries(self): + self.syncRemote() + return self._retries def makeAssigned(self): self.syncRemote() - self.assigned = True + self._assigned = True + self.updateRemote() + + def resetRetries(self): + self.syncRemote() + self._retries = 0 + self.updateRemote() + + def incrementRetries(self): + self.syncRemote() + self._retries += 1 self.updateRemote() def makeVM(self, vm): @@ -139,12 +160,12 @@ def makeVM(self, vm): def makeUnassigned(self): self.syncRemote() - self.assigned = False + self._assigned = False self.updateRemote() def isNotAssigned(self): self.syncRemote() - return not self.assigned + return not self._assigned def appendTrace(self, trace_str): self.syncRemote() @@ -161,13 +182,27 @@ def setId(self, new_id): self._remoteLocation = dict_hash + ":" + str(new_id) self.updateRemote() + # Private method + def __updateSelf(self, other_job): + self._assigned = other_job._assigned + self._retries = other_job._retries + self.vm = other_job.vm + self.input = other_job.input + self.outputFile = other_job.outputFile + self.name = other_job.name + self.notifyURL = other_job.notifyURL + self.timeout = other_job.timeout + self.trace = other_job.trace + self.maxOutputFileSize = other_job.maxOutputFileSize + + def syncRemote(self): if Config.USE_REDIS and self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] dictionary = TangoDictionary(dict_hash) temp_job = dictionary.get(key) - self.updateSelf(temp_job) + self.__updateSelf(temp_job) def updateRemote(self): if Config.USE_REDIS and self._remoteLocation is not None: @@ -176,18 +211,6 @@ def updateRemote(self): dictionary = TangoDictionary(dict_hash) dictionary.set(key, self) - def updateSelf(self, other_job): - self.assigned = other_job.assigned - self.retries = other_job.retries - self.vm = other_job.vm - self.input = other_job.input - self.outputFile = other_job.outputFile - self.name = other_job.name - self.notifyURL = other_job.notifyURL - self.timeout = other_job.timeout - self.trace = other_job.trace - self.maxOutputFileSize = other_job.maxOutputFileSize - def TangoIntValue(object_name, obj): if Config.USE_REDIS: diff --git a/vmms/localDocker.py b/vmms/localDocker.py index 45dda03d..1a1e1dd0 100644 --- a/vmms/localDocker.py +++ b/vmms/localDocker.py @@ -123,7 +123,7 @@ def waitVM(self, vm, max_secs): """waitVM - Nothing to do for waitVM""" return - def copyIn(self, vm, inputFiles): + def copyIn(self, vm, inputFiles, job_id=None): """copyIn - Create a directory to be mounted as a volume for the docker containers. Copy input files to this directory. """ diff --git a/worker.py b/worker.py index 0c6b6614..f06e1f10 100644 --- a/worker.py +++ b/worker.py @@ -77,6 +77,11 @@ def rescheduleJob(self, hdrfile, ret, err): # Try a few times before giving up if self.job.retries < Config.JOB_RETRIES: + self.log.error("Retrying job %s:%d, retries: %d" % (self.job.name, self.job.id, self.job.retries)) + self.job.appendTrace( + "%s|Retrying job %s:%d, retries: %d" + % (datetime.now().ctime(), self.job.name, self.job.id, self.job.retries) + ) try: os.remove(hdrfile) except OSError: @@ -86,7 +91,12 @@ def rescheduleJob(self, hdrfile, ret, err): # Here is where we give up else: - self.jobQueue.makeDead(self.job.id, err) + self.log.error("Giving up on job %s:%d" % (self.job.name, self.job.id)) + self.job.appendTrace( + "%s|Giving up on job %s:%d" + % (datetime.now().ctime(), self.job.name, self.job.id) + ) + self.jobQueue.makeDead(self.job.id, err) # Note: this reports the error that caused the last call to rescheduleJob to fail self.appendMsg( hdrfile, @@ -149,6 +159,8 @@ def notifyServer(self, job): % (job.notifyURL, response.content) ) fh.close() + else: + self.log.info("No callback URL for job %s:%d" % (self.job.name, self.job.id)) except Exception as e: self.log.debug("Error in notifyServer: %s" % str(e)) From 28db9e5ef0a13f9ef22a6d7896c4ddb2404f8845 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 23 Mar 2025 13:43:50 -0400 Subject: [PATCH 02/22] Made all of tangoJob's attributes read only --- tango.py | 2 +- tangoObjects.py | 119 ++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 96 insertions(+), 25 deletions(-) diff --git a/tango.py b/tango.py index 1a03ed0d..e487481a 100755 --- a/tango.py +++ b/tango.py @@ -399,7 +399,7 @@ def __validateJob(self, job, vmms): "validateJob: Setting job.timeout to" " default config value: %d secs", Config.RUNJOB_TIMEOUT, ) - job.timeout = Config.RUNJOB_TIMEOUT + job.setTimeout(Config.RUNJOB_TIMEOUT) # Any problems, return an error status if errors > 0: diff --git a/tangoObjects.py b/tangoObjects.py index 83da888f..dcd97c67 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -105,23 +105,23 @@ def __init__( self._assigned = False self._retries = 0 - self.vm = vm + self._vm = vm if input is None: - self.input = [] + self._input = [] else: - self.input = input - - self.outputFile = outputFile - self.name = name - self.notifyURL = notifyURL - self.timeout = timeout - self.trace = [] - self.maxOutputFileSize = maxOutputFileSize + self._input = input + + self._outputFile = outputFile + self._name = name + self._notifyURL = notifyURL + self._timeout = timeout # How long to run the autodriver on the job for before timing out. + self._trace = [] + self._maxOutputFileSize = maxOutputFileSize self._remoteLocation = None - self.accessKeyId = accessKeyId - self.accessKey = accessKey - self.disableNetwork = disableNetwork - self.stopBefore = "stopBefore" + self._accessKeyId = accessKeyId + self._accessKey = accessKey + self._disableNetwork = disableNetwork + self._stopBefore = "stopBefore" def __repr__(self): self.syncRemote() @@ -138,6 +138,72 @@ def retries(self): self.syncRemote() return self._retries + @property + def vm(self): + self.syncRemote() + return self._vm + + @property + def input(self): + self.syncRemote() + return self._input + + @property + def outputFile(self): + self.syncRemote() + return self._outputFile + + @property + def name(self): + self.syncRemote() + return self._name + + @property + def notifyURL(self): + self.syncRemote() + return self._notifyURL + + @property + def timeout(self): + self.syncRemote() + return self._timeout + + @property + def trace(self): + self.syncRemote() + return self._trace + + @property + def maxOutputFileSize(self): + self.syncRemote() + return self._maxOutputFileSize + + @property + def remoteLocation(self): + self.syncRemote() + return self._remoteLocation + + @property + def accessKeyId(self): + self.syncRemote() + return self._accessKeyId + + @property + def accessKey(self): + self.syncRemote() + return self._accessKey + + @property + def disableNetwork(self): + self.syncRemote() + return self._disableNetwork + + @property + def stopBefore(self): + self.syncRemote() + return self._stopBefore + + def makeAssigned(self): self.syncRemote() self._assigned = True @@ -155,7 +221,7 @@ def incrementRetries(self): def makeVM(self, vm): self.syncRemote() - self.vm = vm + self._vm = vm self.updateRemote() def makeUnassigned(self): @@ -169,7 +235,7 @@ def isNotAssigned(self): def appendTrace(self, trace_str): self.syncRemote() - self.trace.append(trace_str) + self._trace.append(trace_str) self.updateRemote() def setId(self, new_id): @@ -181,19 +247,24 @@ def setId(self, new_id): dictionary.delete(key) self._remoteLocation = dict_hash + ":" + str(new_id) self.updateRemote() + + def setTimeout(self, new_timeout): + self.syncRemote() + self._timeout = new_timeout + self.updateRemote() # Private method def __updateSelf(self, other_job): self._assigned = other_job._assigned self._retries = other_job._retries - self.vm = other_job.vm - self.input = other_job.input - self.outputFile = other_job.outputFile - self.name = other_job.name - self.notifyURL = other_job.notifyURL - self.timeout = other_job.timeout - self.trace = other_job.trace - self.maxOutputFileSize = other_job.maxOutputFileSize + self._vm = other_job._vm + self._input = other_job._input + self._outputFile = other_job._outputFile + self._name = other_job._name + self._notifyURL = other_job._notifyURL + self._timeout = other_job._timeout + self._trace = other_job._trace + self._maxOutputFileSize = other_job._maxOutputFileSize def syncRemote(self): From 9d57597555ed9ee4aa2fd0415c166ef7c9300401 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 24 Aug 2025 13:00:51 -0400 Subject: [PATCH 03/22] fixed the local/remote desyncing issues, left some print statements in there --- jobQueue.py | 40 ++++++++++++++++++++++++---------------- tango.py | 1 + tangoObjects.py | 7 +++++-- worker.py | 13 +++++++------ 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/jobQueue.py b/jobQueue.py index 1b0612e8..707b4e3f 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -136,7 +136,7 @@ def add(self, job): # Since we assume that the job is new, we set the number of retries # of this job to 0 - job.retries = 0 + assert(job.retries == 0) # Add the job to the queue. Careful not to append the trace until we # know the job has actually been added to the queue. @@ -297,28 +297,36 @@ def unassignJob(self, jobId): self.queueLock.release() self.log.debug("unassignJob| Released lock to job queue.") - def makeDead(self, id, reason): + def makeDead(self, job: TangoJob, reason): """makeDead - move a job from live queue to dead queue""" - self.log.info("makeDead| Making dead job ID: " + str(id)) + self.log.info("makeDead| Making dead job ID: " + str(job.id)) self.queueLock.acquire() self.log.debug("makeDead| Acquired lock to job queue.") status = -1 # Check to make sure that the job is in the live jobs queue - if id in self.liveJobs: - self.log.info("makeDead| Found job ID: %s in the live queue" % (id)) - status = 0 - job = self.liveJobs.get(id) - self.log.info("Terminated job %s:%s: %s" % (job.name, job.id, reason)) - # Add the job to the dead jobs dictionary - self.deadJobs.set(id, job) - # Remove the job from the live jobs dictionary - self.liveJobs.delete(id) + if job.id not in self.liveJobs: + self.log.error("makeDead| Job ID: %s not found in live jobs" % (job.id)) + return -1 + + self.log.info("makeDead| Found job ID: %s in the live queue" % (job.id)) + status = 0 + self.log.info("Terminated job %s:%s: %s" % (job.name, job.id, reason)) + + print(f"Anthony: {job.id} has remote location {job._remoteLocation} before swapping") + + # Add the job to the dead jobs dictionary + self.deadJobs.set(job.id, job) + # Remove the job from the live jobs dictionary + self.liveJobs.delete(job.id) - # unassign, remove from unassigned jobs queue - job.makeUnassigned() - self.unassignedJobs.remove(int(id)) + print(f"Anthony: {job.id} has remote location {job._remoteLocation} after swapping") + print(f"Anthony: {job.id} has remote location {job._remoteLocation} after syncing") + + # unassign, remove from unassigned jobs queue + job.makeUnassigned() + self.unassignedJobs.remove(int(job.id)) - job.appendTrace("%s|%s" % (datetime.utcnow().ctime(), reason)) + job.appendTrace("%s|%s" % (datetime.utcnow().ctime(), reason)) self.queueLock.release() self.log.debug("makeDead| Released lock to job queue.") return status diff --git a/tango.py b/tango.py index e487481a..8841a38c 100755 --- a/tango.py +++ b/tango.py @@ -278,6 +278,7 @@ def resetTango(self, vmms): self.log.error("resetTango: Call to VMMS %s failed: %s" % (vmms_name, err)) os._exit(1) + # Returns 0 if the job is valid, -1 if the job is invalid def __validateJob(self, job, vmms): """validateJob - validate the input arguments in an addJob request.""" errors = 0 diff --git a/tangoObjects.py b/tangoObjects.py index dcd97c67..1a0156d0 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -272,7 +272,10 @@ def syncRemote(self): dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] dictionary = TangoDictionary(dict_hash) - temp_job = dictionary.get(key) + temp_job = dictionary.get(key) # Key should be in dictionary + if temp_job is None: + print(f"Job {key} not found in dictionary {dict_hash}") # TODO: add better error handling for TangoJob + return self.__updateSelf(temp_job) def updateRemote(self): @@ -446,7 +449,7 @@ def set(self, id, obj): pickled_obj = pickle.dumps(obj) if hasattr(obj, "_remoteLocation"): - obj._remoteLocation = self.hash_name + ":" + str(id) + obj._remoteLocation = self.hash_name + ":" + str(id) # TODO: don't violate the encapsulation of TangoJob self.r.hset(self.hash_name, str(id), pickled_obj) return str(id) diff --git a/worker.py b/worker.py index f06e1f10..1dd8d8e7 100644 --- a/worker.py +++ b/worker.py @@ -13,7 +13,7 @@ from datetime import datetime from config import Config - +from jobQueue import JobQueue # # Worker - The worker class is very simple and very dumb. The goal is # to walk through the VMMS interface, track the job's progress, and if @@ -32,7 +32,7 @@ def __init__(self, job, vmms, jobQueue, preallocator, preVM): self.daemon = True self.job = job self.vmms = vmms - self.jobQueue = jobQueue + self.jobQueue : JobQueue = jobQueue self.preallocator = preallocator self.preVM = preVM threading.Thread.__init__(self) @@ -64,6 +64,7 @@ def detachVM(self, return_vm=False, replace_vm=False): # pool is empty and creates a spurious vm. self.preallocator.removeVM(self.job.vm) + # TODO: figure out what hdrfile, ret and err are def rescheduleJob(self, hdrfile, ret, err): """rescheduleJob - Reschedule a job that has failed because of a system error, such as a VM timing out or a connection @@ -96,7 +97,7 @@ def rescheduleJob(self, hdrfile, ret, err): "%s|Giving up on job %s:%d" % (datetime.now().ctime(), self.job.name, self.job.id) ) - self.jobQueue.makeDead(self.job.id, err) # Note: this reports the error that caused the last call to rescheduleJob to fail + self.jobQueue.makeDead(self.job, err) # Note: this reports the error that caused the last call to rescheduleJob to fail self.appendMsg( hdrfile, @@ -165,13 +166,13 @@ def notifyServer(self, job): self.log.debug("Error in notifyServer: %s" % str(e)) def afterJobExecution(self, hdrfile, msg, returnVM, killVM=True): - self.jobQueue.makeDead(self.job.id, msg) - + self.jobQueue.makeDead(self.job, msg) + # Update the text that users see in the autodriver output file self.appendMsg(hdrfile, msg) self.catFiles(hdrfile, self.job.outputFile) os.chmod(self.job.outputFile, 0o644) - + # Thread exit after termination if killVM: self.detachVM(return_vm=returnVM) From 3bd80cf3bbb3a7c0eff7992b3ffeb0bedac7ef03 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 24 Aug 2025 17:06:50 -0400 Subject: [PATCH 04/22] Better encapsulation of TangoJob._remoteLocation --- jobQueue.py | 15 +++++---------- tangoObjects.py | 17 +++++++++++++---- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/jobQueue.py b/jobQueue.py index 707b4e3f..ae23d4f9 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -145,7 +145,7 @@ def add(self, job): self.log.debug("add| Acquired lock to job queue.") # Adds the job to the live jobs dictionary - self.liveJobs.set(job.id, job) + job.addToDict(self.liveJobs) # Add this to the unassigned job queue too self.unassignedJobs.put(int(job.id)) @@ -194,7 +194,7 @@ def addDead(self, job): self.log.debug("addDead|Acquired lock to job queue.") # We add the job into the dead jobs dictionary - self.deadJobs.set(job.id, job) + job.addToDict(self.deadJobs) self.queueLock.release() self.log.debug("addDead|Released lock to job queue.") @@ -312,15 +312,10 @@ def makeDead(self, job: TangoJob, reason): status = 0 self.log.info("Terminated job %s:%s: %s" % (job.name, job.id, reason)) - print(f"Anthony: {job.id} has remote location {job._remoteLocation} before swapping") - - # Add the job to the dead jobs dictionary - self.deadJobs.set(job.id, job) # Remove the job from the live jobs dictionary - self.liveJobs.delete(job.id) - - print(f"Anthony: {job.id} has remote location {job._remoteLocation} after swapping") - print(f"Anthony: {job.id} has remote location {job._remoteLocation} after syncing") + job.deleteFromDict(self.liveJobs) + # Add the job to the dead jobs dictionary + job.addToDict(self.deadJobs) # unassign, remove from unassigned jobs queue job.makeUnassigned() diff --git a/tangoObjects.py b/tangoObjects.py index 1a0156d0..9dcdf5fe 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -1,3 +1,4 @@ +from __future__ import annotations # tangoREST.py # # Implements objects used to pass state within Tango. @@ -284,6 +285,18 @@ def updateRemote(self): key = self._remoteLocation.split(":")[1] dictionary = TangoDictionary(dict_hash) dictionary.set(key, self) + + def deleteFromDict(self, dictionary : TangoDictionary) -> None: + dictionary.delete(self.id) + self._remoteLocation = None + + def addToDict(self, dictionary : TangoDictionary) -> None: + dictionary.set(self.id, self) + assert self._remoteLocation is None, "Job already has a remote location" + if Config.USE_REDIS: + self._remoteLocation = dictionary.hash_name + ":" + str(self.id) + self.updateRemote() + def TangoIntValue(object_name, obj): @@ -448,9 +461,6 @@ def __contains__(self, id): def set(self, id, obj): pickled_obj = pickle.dumps(obj) - if hasattr(obj, "_remoteLocation"): - obj._remoteLocation = self.hash_name + ":" + str(id) # TODO: don't violate the encapsulation of TangoJob - self.r.hset(self.hash_name, str(id), pickled_obj) return str(id) @@ -474,7 +484,6 @@ def values(self): return valslist def delete(self, id): - self._remoteLocation = None self.r.hdel(self.hash_name, id) def _clean(self): From f0c0d3262bf3fc0327d0198776808cefb16370dd Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 24 Aug 2025 19:01:26 -0400 Subject: [PATCH 05/22] Fixing uses of makeDead that was changed 2 commits ago --- jobManager.py | 4 ++-- jobQueue.py | 4 +++- tangoObjects.py | 10 ++++++++++ tests/testJobQueue.py | 2 +- 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/jobManager.py b/jobManager.py index 2be2147f..1010827c 100644 --- a/jobManager.py +++ b/jobManager.py @@ -28,7 +28,7 @@ class JobManager(object): def __init__(self, queue): self.daemon = True - self.jobQueue = queue + self.jobQueue: JobQueue = queue self.preallocator = self.jobQueue.preallocator self.vmms = self.preallocator.vmms self.log = logging.getLogger("JobManager") @@ -124,7 +124,7 @@ def __manage(self): self.log.info("job_manager: job is None") else: self.log.error("job failed during creation %d %s" % (job.id, str(err))) - self.jobQueue.makeDead(job.id, str(err)) + self.jobQueue.makeDead(job, str(err)) if __name__ == "__main__": diff --git a/jobQueue.py b/jobQueue.py index ae23d4f9..e1c35cec 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -209,6 +209,7 @@ def delJob(self, id, deadjob): """ status = -1 if deadjob == 0: + temp_job = self.liveJobs.getExn(id) try: # Remove the job from the unassigned live jobs queue, if it # is yet to be assigned. @@ -218,7 +219,7 @@ def delJob(self, id, deadjob): self.log.info("delJob | Job ID %s was already assigned" % (id)) return status - return self.makeDead(id, "Requested by operator") + return self.makeDead(temp_job, "Requested by operator") else: self.queueLock.acquire() self.log.debug("delJob| Acquired lock to job queue.") @@ -297,6 +298,7 @@ def unassignJob(self, jobId): self.queueLock.release() self.log.debug("unassignJob| Released lock to job queue.") + # Takes in a job in order to switch its remote location, in which you can't rely on syncing def makeDead(self, job: TangoJob, reason): """makeDead - move a job from live queue to dead queue""" self.log.info("makeDead| Making dead job ID: " + str(job.id)) diff --git a/tangoObjects.py b/tangoObjects.py index 9dcdf5fe..3e57dccb 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -472,6 +472,11 @@ def get(self, id): else: return None + def getExn(self, id): + job = self.get(id) + assert job is not None, f"ID {id} does not exist in this remote dictionary" + return job + def keys(self): keys = map(lambda key: key.decode(), self.r.hkeys(self.hash_name)) return list(keys) @@ -519,6 +524,11 @@ def get(self, id): else: return None + def getExn(self, id): + job = self.get(id) + assert job is not None, f"ID {id} does not exist in this native dictionary" + return job + def keys(self): return list(self.dict.keys()) diff --git a/tests/testJobQueue.py b/tests/testJobQueue.py index 00861d23..27fd3d4f 100644 --- a/tests/testJobQueue.py +++ b/tests/testJobQueue.py @@ -130,7 +130,7 @@ def test_makeDead(self): info = self.jobQueue.getInfo() self.assertEqual(info["size_deadjobs"], 0) self.assertEqual(info["size_unassignedjobs"], 2) - self.jobQueue.makeDead(self.jobId1, "test") + self.jobQueue.makeDead(self.job1, "test") info = self.jobQueue.getInfo() self.assertEqual(info["size_deadjobs"], 1) self.assertEqual(info["size_unassignedjobs"], 1) From ba83509eb984a52f51346992840887b60b1a2e04 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sat, 30 Aug 2025 18:25:38 -0400 Subject: [PATCH 06/22] comments, logging and todos --- clients/tango-cli.py | 2 +- jobManager.py | 4 ++++ jobQueue.py | 4 ++++ tangoObjects.py | 3 ++- vmms/ec2SSH.py | 1 + 5 files changed, 12 insertions(+), 2 deletions(-) diff --git a/clients/tango-cli.py b/clients/tango-cli.py index d9f5ab8d..f060bd1d 100755 --- a/clients/tango-cli.py +++ b/clients/tango-cli.py @@ -131,7 +131,7 @@ parser.add_argument("--accessKeyId", default="", help="AWS account access key ID") parser.add_argument("--accessKey", default="", help="AWS account access key content") parser.add_argument("--instanceType", default="", help="AWS EC2 instance type") -parser.add_argument("--ec2", action="store_true", help="Enable ec2SSH VMMS") +parser.add_argument("--ec2", action="store_true", help="Enable ec2SSH VMMS") # TODO: shouldn't this be taken from config.py? parser.add_argument("--stopBefore", default="", help="Stops the worker before a function is executed") def checkKey(): diff --git a/jobManager.py b/jobManager.py index 59c1ebad..7f254cce 100644 --- a/jobManager.py +++ b/jobManager.py @@ -19,10 +19,12 @@ import tango # Written this way to avoid circular imports from config import Config +from jobQueue import JobQueue from tangoObjects import TangoQueue from worker import Worker + class JobManager(object): def __init__(self, queue): self.daemon = True @@ -63,6 +65,7 @@ def __manage(self): # Blocks until we get a next job job = self.jobQueue.getNextPendingJob() if not job.accessKey and Config.REUSE_VMS: + self.log.info(f"job has access key {job.accessKey} and is calling reuseVM") vm = None while vm is None: vm = self.jobQueue.reuseVM(job) @@ -89,6 +92,7 @@ def __manage(self): "EC2 SSH VM initialization failed: see log" ) else: + self.log.info(f"job {job.id} is not an ec2 vmms job") # Try to find a vm on the free list and allocate it to # the worker if successful. if Config.REUSE_VMS: diff --git a/jobQueue.py b/jobQueue.py index e1c35cec..45dd40ac 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -246,6 +246,8 @@ def get(self, id): self.log.debug("get| Released lock to job queue.") return job + # TODO: this function is a little weird. It sets the state of job to be "assigned", but not to which worker. + # TODO: It does assign the job to a particular VM though. def assignJob(self, jobId, vm=None): """assignJob - marks a job to be assigned""" self.queueLock.acquire() @@ -389,3 +391,5 @@ def reuseVM(self, job): return job.vm else: raise Exception("Job assigned without vm") + + diff --git a/tangoObjects.py b/tangoObjects.py index 3e57dccb..64c5f309 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -7,6 +7,7 @@ from queue import Queue import pickle import redis +from typing import Optional redisConnection = None @@ -91,7 +92,7 @@ class TangoJob(object): def __init__( self, - vm=None, + vm: Optional[TangoMachine] = None, outputFile=None, name=None, input=None, diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index b2621192..318b6d49 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -162,6 +162,7 @@ def release_vm_semaphore(): """Releases the VM sempahore""" Ec2SSH._vm_semaphore.release() + # TODO: the arguments accessKeyId and accessKey don't do anything def __init__(self, accessKeyId=None, accessKey=None): """log - logger for the instance connection - EC2Connection object that stores the connection From 600803ccd43980c2a2617a6435272a38f19c7322 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Mon, 15 Sep 2025 22:06:33 -0400 Subject: [PATCH 07/22] Code to enable spot instances --- jobManager.py | 4 ++-- jobQueue.py | 2 +- vmms/ec2SSH.py | 24 +++++++++++++++++------- worker.py | 3 ++- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/jobManager.py b/jobManager.py index 7f254cce..df54dec8 100644 --- a/jobManager.py +++ b/jobManager.py @@ -20,7 +20,7 @@ import tango # Written this way to avoid circular imports from config import Config from jobQueue import JobQueue -from tangoObjects import TangoQueue +from tangoObjects import TangoJob, TangoQueue from worker import Worker @@ -63,7 +63,7 @@ def __manage(self): self.running = True while True: # Blocks until we get a next job - job = self.jobQueue.getNextPendingJob() + job: TangoJob = self.jobQueue.getNextPendingJob() if not job.accessKey and Config.REUSE_VMS: self.log.info(f"job has access key {job.accessKey} and is calling reuseVM") vm = None diff --git a/jobQueue.py b/jobQueue.py index 45dd40ac..e3dc4256 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -347,7 +347,7 @@ def reset(self): self.deadJobs._clean() self.unassignedJobs._clean() - def getNextPendingJob(self): + def getNextPendingJob(self) -> TangoJob: """Gets the next unassigned live job. Note that this is a blocking function and we will block till there is an available job. diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index 318b6d49..1e21da79 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -22,6 +22,7 @@ import config from tangoObjects import TangoMachine +from typing import Optional # suppress most boto logging logging.getLogger("boto3").setLevel(logging.CRITICAL) @@ -192,7 +193,7 @@ def __init__(self, accessKeyId=None, accessKey=None): self.img2ami = {} self.images = [] try: - self.boto3resource = boto3.resource("ec2", config.Config.EC2_REGION) + self.boto3resource: boto3.EC2.ServiceResource = boto3.resource("ec2", config.Config.EC2_REGION) self.boto3client = boto3.client("ec2", config.Config.EC2_REGION) # Get images from ec2 @@ -248,7 +249,7 @@ def domainName(self, vm): # VMMS helper methods # - def tangoMachineToEC2Instance(self, vm: TangoMachine): + def tangoMachineToEC2Instance(self, vm: TangoMachine) -> dict: """tangoMachineToEC2Instance - returns an object with EC2 instance type and AMI. Only general-purpose instances are used. Defalt AMI is currently used. @@ -328,7 +329,7 @@ def createSecurityGroup(self): # # VMMS API functions # - def initializeVM(self, vm): + def initializeVM(self, vm: TangoMachine) -> Optional[TangoMachine]: """initializeVM - Tell EC2 to create a new VM instance. Returns a boto.ec2.instance.Instance object. @@ -350,13 +351,21 @@ def initializeVM(self, vm): self.key_pair_name = self.keyPairName(vm.id, vm.name) self.createKeyPair() - reservation = self.boto3resource.create_instances( + instance_market_options = { + "MarketType": "spot", + "SpotOptions": { + "SpotInstanceType": "one-time", + "InstanceInterruptionBehavior": "terminate" + } + } + reservation: list[boto3.ec2.Instance] = self.boto3resource.create_instances( ImageId=ec2instance["ami"], KeyName=self.key_pair_name, SecurityGroups=[config.Config.DEFAULT_SECURITY_GROUP], InstanceType=ec2instance["instance_type"], MaxCount=1, MinCount=1, + InstanceMarketOptions=instance_market_options, ) # Sleep for a while to prevent random transient errors observed @@ -365,8 +374,9 @@ def initializeVM(self, vm): # reservation is a list of instances created. there is only # one instance created so get index 0. - newInstance = reservation[0] + newInstance: boto3.ec2.Instance = reservation[0] if not newInstance: + # TODO: when does this happen? raise ValueError("Cannot find new instance for %s" % vm.name) # Wait for instance to reach 'running' state @@ -436,7 +446,7 @@ def initializeVM(self, vm): self.log.debug("initializeVM Failed: %s" % e) # if the new instance exists, terminate it - if newInstance: + if newInstance is not None: try: self.boto3resource.instances.filter( InstanceIds=[newInstance.id] @@ -448,7 +458,7 @@ def initializeVM(self, vm): return None return None - def waitVM(self, vm, max_secs): + def waitVM(self, vm, max_secs) -> 0 | -1: """waitVM - Wait at most max_secs for a VM to become ready. Return error if it takes too long. diff --git a/worker.py b/worker.py index 1dd8d8e7..59a17239 100644 --- a/worker.py +++ b/worker.py @@ -247,7 +247,7 @@ def run(self): ) # Host name returned from EC2 is stored in the vm object - self.vmms.initializeVM(self.job.vm) + self.vmms.initializeVM(self.job.vm) # TODO: This can return None if the step fails, check for that self.log.debug("Asigned job to a new VM") vm = self.job.vm @@ -322,6 +322,7 @@ def run(self): self.job.vm.notes = str(self.job.id) + "_" + self.job.name self.job.vm.keep_for_debugging = True self.afterJobExecution(hdrfile, msg, False) + # TODO: no reschedule? return self.log.info( From d4501129334b2bbce4506217d96cf48c7688db1b Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Tue, 16 Sep 2025 16:01:11 -0400 Subject: [PATCH 08/22] type annotations --- jobManager.py | 23 +++++------ jobQueue.py | 1 + mypy.ini | 5 +++ restful_tango/server.py | 6 +-- restful_tango/tangoREST.py | 5 --- tango.py | 6 +-- tangoObjects.py | 78 ++++++++++++++++++++++++++++++++------ tests/validate.py | 4 +- vmms/ec2SSH.py | 54 +++++++++++++------------- vmms/interface.py | 42 ++++++++++++++++++++ vmms/localDocker.py | 2 +- 11 files changed, 162 insertions(+), 64 deletions(-) create mode 100644 mypy.ini create mode 100644 vmms/interface.py diff --git a/jobManager.py b/jobManager.py index df54dec8..65d62116 100644 --- a/jobManager.py +++ b/jobManager.py @@ -26,7 +26,7 @@ class JobManager(object): - def __init__(self, queue): + def __init__(self, queue: JobQueue): self.daemon = True self.jobQueue: JobQueue = queue self.preallocator = self.jobQueue.preallocator @@ -36,7 +36,7 @@ def __init__(self, queue): self.nextId = 10000 self.running = False - def start(self): + def start(self) -> None: if self.running: return thread = threading.Thread(target=self.__manage) @@ -59,7 +59,7 @@ def _getNextID(self): self.nextId = 10000 return id - def __manage(self): + def __manage(self) -> None: self.running = True while True: # Blocks until we get a next job @@ -83,7 +83,8 @@ def __manage(self): newVM = copy.deepcopy(job.vm) newVM.id = self._getNextID() try: - preVM = vmms.initializeVM(newVM) + vmms.initializeVM(newVM) + preVM = newVM except Exception as e: self.log.error("ERROR initialization VM: %s", e) self.log.error(traceback.format_exc()) @@ -138,19 +139,19 @@ def __manage(self): JobManager" ) else: - tango = tango.TangoServer() - tango.log.debug("Resetting Tango VMs") - tango.resetTango(tango.preallocator.vmms) - for key in tango.preallocator.machines.keys(): - tango.preallocator.machines.set(key, [[], TangoQueue(key)]) + tango_server = tango.TangoServer() + tango_server.log.debug("Resetting Tango VMs") + tango_server.resetTango(tango_server.preallocator.vmms) + for key in tango_server.preallocator.machines.keys(): + tango_server.preallocator.machines.set(key, [[], TangoQueue(key)]) # The above call sets the total pool empty. But the free pool which # is a queue in redis, may not be empty. When the job manager restarts, # resetting the free queue using the key doesn't change its content. # Therefore we empty the queue, thus the free pool, to keep it consistent # with the total pool. - tango.preallocator.machines.get(key)[1].make_empty() - jobs = JobManager(tango.jobQueue) + tango_server.preallocator.machines.get(key)[1].make_empty() + jobs = JobManager(tango_server.jobQueue) print("Starting the stand-alone Tango JobManager") jobs.run() diff --git a/jobQueue.py b/jobQueue.py index e3dc4256..2d0a9934 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -270,6 +270,7 @@ def assignJob(self, jobId, vm=None): self.log.debug("assignJob| Released lock to job queue.") # return job + # TODO: Rename this job to be more accurate in its description def unassignJob(self, jobId): """unassignJob - marks a job to be unassigned Note: We assume here that a job is to be rescheduled or diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..db776d15 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,5 @@ +[mypy] +explicit_package_bases = True + +[mypy-vmms.tashiSSH] +ignore_errors = True \ No newline at end of file diff --git a/restful_tango/server.py b/restful_tango/server.py index 2eba21fc..b31f0e8b 100755 --- a/restful_tango/server.py +++ b/restful_tango/server.py @@ -9,13 +9,9 @@ import tornado.web from tempfile import NamedTemporaryFile -from tangoREST import TangoREST +from restful_tango.tangoREST import TangoREST import asyncio -currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) -parentdir = os.path.dirname(currentdir) -sys.path.insert(0, parentdir) - from config import Config tangoREST = TangoREST() diff --git a/restful_tango/tangoREST.py b/restful_tango/tangoREST.py index c60cf44d..90dbc300 100644 --- a/restful_tango/tangoREST.py +++ b/restful_tango/tangoREST.py @@ -6,16 +6,11 @@ import sys import os -import inspect import hashlib import json import logging import docker -currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) -parentdir = os.path.dirname(currentdir) -sys.path.insert(0, parentdir) - from config import Config from tangoObjects import TangoJob, TangoMachine, InputFile from tango import TangoServer diff --git a/tango.py b/tango.py index 8841a38c..3f94cd12 100755 --- a/tango.py +++ b/tango.py @@ -75,8 +75,8 @@ def __init__(self): vmms = DistDocker() - self.preallocator = Preallocator({Config.VMMS_NAME: vmms}) - self.jobQueue = JobQueue(self.preallocator) + self.preallocator: Preallocator = Preallocator({Config.VMMS_NAME: vmms}) + self.jobQueue: JobQueue = JobQueue(self.preallocator) if not Config.USE_REDIS: # creates a local Job Manager if there is no persistent # memory between processes. Otherwise, JobManager will @@ -89,7 +89,7 @@ def __init__(self): level=Config.LOGLEVEL, ) self.start_time = time.time() - self.log = logging.getLogger("TangoServer") + self.log: logging.Logger = logging.getLogger("TangoServer") self.log.info("Starting Tango server") def addJob(self, job): diff --git a/tangoObjects.py b/tangoObjects.py index 64c5f309..a5554e1e 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -7,7 +7,8 @@ from queue import Queue import pickle import redis -from typing import Optional +from typing import Optional, Protocol, TypeVar +from abc import abstractmethod redisConnection = None @@ -117,9 +118,9 @@ def __init__( self._name = name self._notifyURL = notifyURL self._timeout = timeout # How long to run the autodriver on the job for before timing out. - self._trace = [] + self._trace: list[str] = [] self._maxOutputFileSize = maxOutputFileSize - self._remoteLocation = None + self._remoteLocation: Optional[str] = None self._accessKeyId = accessKeyId self._accessKey = accessKey self._disableNetwork = disableNetwork @@ -245,7 +246,7 @@ def setId(self, new_id): if self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] - dictionary = TangoDictionary(dict_hash) + dictionary = TangoDictionary.create(dict_hash) dictionary.delete(key) self._remoteLocation = dict_hash + ":" + str(new_id) self.updateRemote() @@ -438,23 +439,72 @@ def make_empty(self): if item is None: break +T = TypeVar('T') +class TangoDictionary(Protocol[T]): + + @staticmethod + def create(dictionary_name: str) -> TangoDictionary[T]: + if Config.USE_REDIS: + return TangoRemoteDictionary(dictionary_name) + else: + return TangoNativeDictionary() + + @property + @abstractmethod + def hash_name(self) -> str: + ... + + @abstractmethod + def __contains__(self, id: str) -> bool: + ... + @abstractmethod + def set(self, id: str, obj: T) -> str: + ... + @abstractmethod + def get(self, id: str) -> Optional[T]: + ... + @abstractmethod + def getExn(self, id: str) -> T: + ... + @abstractmethod + def keys(self) -> list[str]: + ... + @abstractmethod + def values(self) -> list[T]: + ... + @abstractmethod + def delete(self, id: str) -> None: + ... + @abstractmethod + def _clean(self) -> None: + ... + @abstractmethod + def items(self) -> list[tuple[str, T]]: + ... + # This is an abstract class that decides on # if we should initiate a TangoRemoteDictionary or TangoNativeDictionary # Since there are no abstract classes in Python, we use a simple method -def TangoDictionary(object_name): - if Config.USE_REDIS: - return TangoRemoteDictionary(object_name) - else: - return TangoNativeDictionary() +# def TangoDictionary(object_name): +# if Config.USE_REDIS: +# return TangoRemoteDictionary(object_name) +# else: +# return TangoNativeDictionary() + + -class TangoRemoteDictionary(object): +class TangoRemoteDictionary(TangoDictionary[T]): def __init__(self, object_name): self.r = getRedisConnection() - self.hash_name = object_name + self._hash_name = object_name + + @property + def hash_name(self) -> str: + return self._hash_name def __contains__(self, id): return self.r.hexists(self.hash_name, str(id)) @@ -506,10 +556,14 @@ def items(self): ) -class TangoNativeDictionary(object): +class TangoNativeDictionary(TangoDictionary[T]): def __init__(self): self.dict = {} + @property + def hash_name(self) -> str: + raise ValueError("TangoNativeDictionary does not have a hash name") + def __repr__(self): return str(self.dict) diff --git a/tests/validate.py b/tests/validate.py index 546db78b..fe0ab3bf 100644 --- a/tests/validate.py +++ b/tests/validate.py @@ -4,6 +4,8 @@ # directories. # +from typing import List + try: import pyflakes @@ -46,7 +48,7 @@ args = parser.parse_args() # Setup -skip_paths = [] +skip_paths: List[str] = [] # Stats file_count = 0 diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index 1e21da79..0e7297a8 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -22,7 +22,9 @@ import config from tangoObjects import TangoMachine -from typing import Optional +from typing import Optional, Literal, List +from mypy_boto3_ec2 import EC2ServiceResource +from mypy_boto3_ec2.service_resource import Instance # suppress most boto logging logging.getLogger("boto3").setLevel(logging.CRITICAL) @@ -193,7 +195,7 @@ def __init__(self, accessKeyId=None, accessKey=None): self.img2ami = {} self.images = [] try: - self.boto3resource: boto3.EC2.ServiceResource = boto3.resource("ec2", config.Config.EC2_REGION) + self.boto3resource: EC2ServiceResource = boto3.resource("ec2", config.Config.EC2_REGION) self.boto3client = boto3.client("ec2", config.Config.EC2_REGION) # Get images from ec2 @@ -276,26 +278,26 @@ def tangoMachineToEC2Instance(self, vm: TangoMachine) -> dict: def createKeyPair(self): # TODO: SUPPORT raise - # try to delete the key to avoid collision - self.key_pair_path = "%s/%s.pem" % ( - config.Config.DYNAMIC_SECURITY_KEY_PATH, - self.key_pair_name, - ) - self.deleteKeyPair() - key = self.connection.create_key_pair(self.key_pair_name) - key.save(config.Config.DYNAMIC_SECURITY_KEY_PATH) - # change the SSH_FLAG accordingly - self.ssh_flags[1] = self.key_pair_path + # # try to delete the key to avoid collision + # self.key_pair_path: str = "%s/%s.pem" % ( + # config.Config.DYNAMIC_SECURITY_KEY_PATH, + # self.key_pair_name, + # ) + # self.deleteKeyPair() + # key = self.connection.create_key_pair(self.key_pair_name) + # key.save(config.Config.DYNAMIC_SECURITY_KEY_PATH) + # # change the SSH_FLAG accordingly + # self.ssh_flags[1] = self.key_pair_path def deleteKeyPair(self): # TODO: SUPPORT raise - self.boto3client.delete_key_pair(self.key_pair_name) - # try to delete may not exist key file - try: - os.remove(self.key_pair_path) - except OSError: - pass + # self.boto3client.delete_key_pair(self.key_pair_name) + # # try to delete may not exist key file + # try: + # os.remove(self.key_pair_path) + # except OSError: + # pass def createSecurityGroup(self): try: @@ -329,7 +331,7 @@ def createSecurityGroup(self): # # VMMS API functions # - def initializeVM(self, vm: TangoMachine) -> Optional[TangoMachine]: + def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: """initializeVM - Tell EC2 to create a new VM instance. Returns a boto.ec2.instance.Instance object. @@ -343,8 +345,8 @@ def initializeVM(self, vm: TangoMachine) -> Optional[TangoMachine]: # ensure that security group exists self.createSecurityGroup() if self.useDefaultKeyPair: - self.key_pair_name = config.Config.SECURITY_KEY_NAME - self.key_pair_path = config.Config.SECURITY_KEY_PATH + self.key_pair_name: str = config.Config.SECURITY_KEY_NAME + self.key_pair_path: str = config.Config.SECURITY_KEY_PATH else: # TODO: SUPPORT raise @@ -358,7 +360,7 @@ def initializeVM(self, vm: TangoMachine) -> Optional[TangoMachine]: "InstanceInterruptionBehavior": "terminate" } } - reservation: list[boto3.ec2.Instance] = self.boto3resource.create_instances( + reservation: List[Instance] = self.boto3resource.create_instances( ImageId=ec2instance["ami"], KeyName=self.key_pair_name, SecurityGroups=[config.Config.DEFAULT_SECURITY_GROUP], @@ -440,7 +442,7 @@ def initializeVM(self, vm: TangoMachine) -> Optional[TangoMachine]: vm.domain_name = newInstance.public_ip_address vm.instance_id = newInstance.id self.log.debug("VM %s: %s" % (instanceName, newInstance)) - return vm + return 0 except Exception as e: self.log.debug("initializeVM Failed: %s" % e) @@ -455,10 +457,10 @@ def initializeVM(self, vm: TangoMachine) -> Optional[TangoMachine]: self.log.error( "Exception handling failed for %s: %s" % (vm.name, e) ) - return None - return None + return -1 + return -1 - def waitVM(self, vm, max_secs) -> 0 | -1: + def waitVM(self, vm, max_secs) -> Literal[0, -1]: """waitVM - Wait at most max_secs for a VM to become ready. Return error if it takes too long. diff --git a/vmms/interface.py b/vmms/interface.py new file mode 100644 index 00000000..d71500f9 --- /dev/null +++ b/vmms/interface.py @@ -0,0 +1,42 @@ +from typing import Protocol, Optional, Literal, List +from tangoObjects import TangoMachine +from abc import abstractmethod + + +class VMMSInterface(Protocol): + @abstractmethod + def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: + ... + + @abstractmethod + def waitVM(self, vm: TangoMachine, max_secs: int) -> Literal[0, -1]: + ... + + @abstractmethod + def copyIn(self, vm: TangoMachine, inputFiles: List[str], job_id: Optional[int] = None) -> Literal[0, -1]: + ... + + @abstractmethod + def runJob(self, vm: TangoMachine, runTimeout: int, maxOutputFileSize: int, disableNetwork: bool) -> int: # -1 to infinity + ... + + @abstractmethod + def copyOut(self, vm: TangoMachine, destFile: str) -> Literal[0, -1]: + ... + + @abstractmethod + def destroyVM(self, vm: TangoMachine) -> Literal[0, -1]: + ... + + @abstractmethod + def safeDestroyVM(self, vm: TangoMachine) -> Literal[0, -1]: + ... + + @abstractmethod + def getVMs(self) -> List[TangoMachine]: + ... + + @abstractmethod + def existsVM(self, vm: TangoMachine) -> bool: + ... + diff --git a/vmms/localDocker.py b/vmms/localDocker.py index 1a1e1dd0..ada77c09 100644 --- a/vmms/localDocker.py +++ b/vmms/localDocker.py @@ -117,7 +117,7 @@ def domainName(self, vm): # def initializeVM(self, vm): """initializeVM - Nothing to do for initializeVM""" - return vm + return 0 def waitVM(self, vm, max_secs): """waitVM - Nothing to do for waitVM""" From 029c1a9f386551c886d01d6393e93faf30db6c64 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Tue, 16 Sep 2025 16:21:04 -0400 Subject: [PATCH 09/22] finished mypy stuff up to not using --check-untyped-defs --- Dockerfile | 3 +++ requirements.txt | 49 +++++++++++++++++++++++++++++++++++++++++------- vmms/ec2SSH.py | 45 ++++++++++++++++++++++---------------------- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/Dockerfile b/Dockerfile index c120e15f..d6c7dac8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -70,5 +70,8 @@ RUN mkdir -p /var/log/docker /var/log/supervisor RUN cp /opt/TangoService/Tango/deployment/config/nginx.conf /etc/nginx/nginx.conf RUN cp /opt/TangoService/Tango/deployment/config/supervisord.conf /etc/supervisor/supervisord.conf +# Set up PYTHONPATH +ENV PYTHONPATH="/opt/TangoService/Tango" + # Reload new config scripts CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"] diff --git a/requirements.txt b/requirements.txt index 548d9625..8ebee4d7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,47 @@ +async-timeout==5.0.1 +backoff==2.2.1 backports.ssl-match-hostname==3.7.0.1 -boto3 +boto3==1.37.38 +boto3-stubs==1.40.31 +botocore==1.37.38 +botocore-stubs==1.38.30 +certifi==2025.8.3 +cffi==1.17.1 +charset-normalizer==3.4.3 +cryptography==45.0.7 +distlib==0.4.0 +docker==5.0.3 +filelock==3.16.1 +idna==3.10 +jmespath==1.0.1 +mypy==1.14.1 +mypy-boto3-ec2==1.40.24 +mypy-extensions==1.1.0 +platformdirs==4.3.6 +pycparser==2.23 pyflakes==2.1.1 +python-dateutil==2.9.0.post0 +pytz==2025.2 +PyYAML==6.0.2 redis==4.4.4 requests==2.31.0 -# needed for tashi, rpyc==4.1.4 +s3transfer==0.11.5 +six==1.17.0 +supervisor==4.1.0 +tomli==2.2.1 tornado==6.4.1 -urllib3==1.26.19 -docker==5.0.3 -backoff==2.2.1 -pytz -pyyaml +types-awscrt==0.27.6 +types-cffi==1.16.0.20241221 +types-docker==7.1.0.20241229 +types-jmespath==1.0.2.20240106 +types-pyflakes==3.2.0.20240813 +types-pyOpenSSL==24.1.0.20240722 +types-PyYAML==6.0.12.20241230 +types-redis==4.6.0.20241004 +types-requests==2.32.0.20241016 +types-s3transfer==0.13.1 +types-setuptools==75.8.0.20250110 +typing-extensions==4.13.2 +urllib3==2.2.3 +virtualenv==20.34.0 +websocket-client==1.8.0 diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index 0e7297a8..8f59e1d0 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -22,9 +22,10 @@ import config from tangoObjects import TangoMachine -from typing import Optional, Literal, List +from typing import Optional, Literal, List, Sequence from mypy_boto3_ec2 import EC2ServiceResource from mypy_boto3_ec2.service_resource import Instance +from mypy_boto3_ec2.type_defs import FilterTypeDef # suppress most boto logging logging.getLogger("boto3").setLevel(logging.CRITICAL) @@ -189,7 +190,14 @@ def __init__(self, accessKeyId=None, accessKey=None): self.useDefaultKeyPair = True # key pair settings, for now, use default security key - + if self.useDefaultKeyPair: + self.key_pair_name: str = config.Config.SECURITY_KEY_NAME + self.key_pair_path: str = config.Config.SECURITY_KEY_PATH + else: + # TODO: SUPPORT. Know that this if/else block used to be under initializeVM, using vm for a unique identifier + raise + # self.key_pair_name = self.keyPairName(vm.id, vm.name) + # self.createKeyPair() # create boto3resource self.img2ami = {} @@ -336,7 +344,7 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: Returns a boto.ec2.instance.Instance object. """ - newInstance = None + newInstance: Optional[Instance] = None # Create the instance and obtain the reservation try: instanceName = self.instanceName(vm.id, vm.name) @@ -344,22 +352,8 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: self.log.debug("instanceName: %s" % instanceName) # ensure that security group exists self.createSecurityGroup() - if self.useDefaultKeyPair: - self.key_pair_name: str = config.Config.SECURITY_KEY_NAME - self.key_pair_path: str = config.Config.SECURITY_KEY_PATH - else: - # TODO: SUPPORT - raise - self.key_pair_name = self.keyPairName(vm.id, vm.name) - self.createKeyPair() - - instance_market_options = { - "MarketType": "spot", - "SpotOptions": { - "SpotInstanceType": "one-time", - "InstanceInterruptionBehavior": "terminate" - } - } + + reservation: List[Instance] = self.boto3resource.create_instances( ImageId=ec2instance["ami"], KeyName=self.key_pair_name, @@ -367,7 +361,14 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: InstanceType=ec2instance["instance_type"], MaxCount=1, MinCount=1, - InstanceMarketOptions=instance_market_options, + InstanceMarketOptions= + { + "MarketType": "spot", + "SpotOptions": { + "SpotInstanceType": "one-time", + "InstanceInterruptionBehavior": "terminate" + } + }, ) # Sleep for a while to prevent random transient errors observed @@ -376,7 +377,7 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: # reservation is a list of instances created. there is only # one instance created so get index 0. - newInstance: boto3.ec2.Instance = reservation[0] + newInstance = reservation[0] if not newInstance: # TODO: when does this happen? raise ValueError("Cannot find new instance for %s" % vm.name) @@ -385,7 +386,7 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: start_time = time.time() while True: - filters = [ + filters: Sequence[FilterTypeDef] = [ {"Name": "instance-state-name", "Values": ["running"]} ] instances = self.boto3resource.instances.filter(Filters=filters) From 85bdf7c2398092bef6d15def3915721c6e57dfdf Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Mon, 22 Sep 2025 17:44:24 -0400 Subject: [PATCH 10/22] doesn't quite work, but wanted a checkpoint before muddling with the preallocator --- jobQueue.py | 11 +++++----- mypy.ini | 3 +++ preallocator.py | 11 ++++++---- tangoObjects.py | 7 +++--- tests/testObjects.py | 2 +- vmms/ec2SSH.py | 10 ++++++--- vmms/localDocker.py | 4 +++- worker.py | 51 ++++++++++++++++++++++++++++++++++++++------ 8 files changed, 75 insertions(+), 24 deletions(-) diff --git a/jobQueue.py b/jobQueue.py index 2d0a9934..ee5b2956 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -53,8 +53,8 @@ def __init__(self, preallocator): is needed since there are multiple worker threads and they might be using the makeUnassigned api. """ - self.liveJobs = TangoDictionary("liveJobs") - self.deadJobs = TangoDictionary("deadJobs") + self.liveJobs: TangoDictionary[TangoJob] = TangoDictionary.create("liveJobs") + self.deadJobs: TangoDictionary[TangoJob] = TangoDictionary.create("deadJobs") self.unassignedJobs = TangoQueue("unassignedLiveJobs") self.queueLock = threading.Lock() self.preallocator = preallocator @@ -187,7 +187,7 @@ def addDead(self, job): self.log.info("addDead|Unassigning job %s" % str(job.id)) job.makeUnassigned() - job.retries = 0 + job.resetRetries() self.log.debug("addDead|Acquiring lock to job queue.") self.queueLock.acquire() @@ -248,12 +248,13 @@ def get(self, id): # TODO: this function is a little weird. It sets the state of job to be "assigned", but not to which worker. # TODO: It does assign the job to a particular VM though. - def assignJob(self, jobId, vm=None): + # Precondition: jobId is in self.liveJobs + def assignJob(self, jobId, vm=None) -> None: """assignJob - marks a job to be assigned""" self.queueLock.acquire() self.log.debug("assignJob| Acquired lock to job queue.") - job = self.liveJobs.get(jobId) + job = self.liveJobs.getExn(jobId) print(str(job), jobId) # print(str(self.unassignedJobs)) # print(str(self.liveJobs)) diff --git a/mypy.ini b/mypy.ini index db776d15..f6332a89 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,4 +2,7 @@ explicit_package_bases = True [mypy-vmms.tashiSSH] +ignore_errors = True + +[mypy-preallocator] ignore_errors = True \ No newline at end of file diff --git a/preallocator.py b/preallocator.py index 93e9c822..6607c552 100644 --- a/preallocator.py +++ b/preallocator.py @@ -6,7 +6,7 @@ import time import copy -from tangoObjects import TangoDictionary, TangoQueue, TangoIntValue +from tangoObjects import TangoDictionary, TangoQueue, TangoIntValue, TangoMachine from config import Config # @@ -22,7 +22,7 @@ class Preallocator(object): def __init__(self, vmms): - self.machines = TangoDictionary("machines") + self.machines: TangoDictionary[TangoMachine] = TangoDictionary.create("machines") self.lock = threading.Lock() self.nextID = TangoIntValue("nextID", 1000) self.vmms = vmms @@ -33,7 +33,7 @@ def poolSize(self, vmName): if vmName not in self.machines: return 0 else: - return len(self.machines.get(vmName)[0]) + return len(self.machines.getExn(vmName)[0]) def update(self, vm, num): """update - Updates the number of machines of a certain type @@ -47,7 +47,10 @@ def update(self, vm, num): self.lock.acquire() if vm.name not in self.machines: self.machines.set(vm.name, [[], TangoQueue(vm.name)]) - self.machines.get(vm.name)[1].make_empty() + self.machines.getExn(vm.name)[1].make_empty() # TODO: oh bruh this is incorrect. + # TODO: when used with a TangoRemoteDictionary, this will create a transient copy of the queue (from the redis pickle), + # TODO: then modify it, and then discard it :). self.machines will not be updated by a .get/.getExn call. + # TODO: TangoDictionary's should have value semantics, so the value type should probably be a tuple self.log.debug("Creating empty pool of %s instances" % (vm.name)) self.lock.release() diff --git a/tangoObjects.py b/tangoObjects.py index a5554e1e..9bf60b65 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -246,7 +246,7 @@ def setId(self, new_id): if self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] - dictionary = TangoDictionary.create(dict_hash) + dictionary: TangoDictionary[TangoJob] = TangoDictionary.create(dict_hash) dictionary.delete(key) self._remoteLocation = dict_hash + ":" + str(new_id) self.updateRemote() @@ -274,7 +274,7 @@ def syncRemote(self): if Config.USE_REDIS and self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] - dictionary = TangoDictionary(dict_hash) + dictionary: TangoDictionary[TangoJob] = TangoDictionary.create(dict_hash) temp_job = dictionary.get(key) # Key should be in dictionary if temp_job is None: print(f"Job {key} not found in dictionary {dict_hash}") # TODO: add better error handling for TangoJob @@ -285,7 +285,7 @@ def updateRemote(self): if Config.USE_REDIS and self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] - dictionary = TangoDictionary(dict_hash) + dictionary: TangoDictionary[TangoJob] = TangoDictionary.create(dict_hash) dictionary.set(key, self) def deleteFromDict(self, dictionary : TangoDictionary) -> None: @@ -440,6 +440,7 @@ def make_empty(self): break T = TypeVar('T') +# Dictionary from string to T class TangoDictionary(Protocol[T]): @staticmethod diff --git a/tests/testObjects.py b/tests/testObjects.py index f0b08ea6..d477073f 100644 --- a/tests/testObjects.py +++ b/tests/testObjects.py @@ -18,7 +18,7 @@ def setUp(self): } def runDictionaryTests(self): - test_dict = TangoDictionary("test") + test_dict = TangoDictionary.create("test") self.assertEqual(test_dict.keys(), []) self.assertEqual(test_dict.values(), []) diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index 8f59e1d0..02a3f577 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -27,6 +27,9 @@ from mypy_boto3_ec2.service_resource import Instance from mypy_boto3_ec2.type_defs import FilterTypeDef +from vmms.interface import VMMSInterface + + # suppress most boto logging logging.getLogger("boto3").setLevel(logging.CRITICAL) logging.getLogger("botocore").setLevel(logging.CRITICAL) @@ -143,7 +146,7 @@ class ec2CallError(Exception): pass -class Ec2SSH(object): +class Ec2SSH(VMMSInterface): _SSH_FLAGS = [ "-i", config.Config.SECURITY_KEY_PATH, @@ -200,10 +203,11 @@ def __init__(self, accessKeyId=None, accessKey=None): # self.createKeyPair() # create boto3resource - self.img2ami = {} + self.img2ami = {} # this is a bad name, should really be img_name to img self.images = [] try: - self.boto3resource: EC2ServiceResource = boto3.resource("ec2", config.Config.EC2_REGION) + # This is a service resource + self.boto3resource: EC2ServiceResource = boto3.resource("ec2", config.Config.EC2_REGION) # TODO: rename this ot self.ec2resource self.boto3client = boto3.client("ec2", config.Config.EC2_REGION) # Get images from ec2 diff --git a/vmms/localDocker.py b/vmms/localDocker.py index ada77c09..8ccfb7d5 100644 --- a/vmms/localDocker.py +++ b/vmms/localDocker.py @@ -14,6 +14,8 @@ import config from tangoObjects import TangoMachine +from vmms.interface import VMMSInterface + def timeout(command, time_out=1): """timeout - Run a unix command with a timeout. Return -1 on @@ -72,7 +74,7 @@ def timeoutWithReturnStatus(command, time_out, returnValue=0): # -class LocalDocker(object): +class LocalDocker(VMMSInterface): def __init__(self): """Checks if the machine is ready to run docker containers. Initialize boot2docker if running on OS X. diff --git a/worker.py b/worker.py index 59a17239..14da23f6 100644 --- a/worker.py +++ b/worker.py @@ -6,7 +6,7 @@ import logging import tempfile import requests -from requests.packages.urllib3.util.retry import Retry +from urllib3.util.retry import Retry from requests.adapters import HTTPAdapter import os import shutil @@ -41,6 +41,7 @@ def __init__(self, job, vmms, jobQueue, preallocator, preVM): # # Worker helper functions # + # TODO: These should not have default values in my opinion def detachVM(self, return_vm=False, replace_vm=False): """detachVM - Detach the VM from this worker. The options are to return it to the pool's free list (return_vm), destroy it @@ -106,8 +107,8 @@ def rescheduleJob(self, hdrfile, ret, err): ) self.appendMsg( hdrfile, - "Job status: waitVM=%s copyIn=%s runJob=%s copyOut=%s" - % (ret["waitvm"], ret["copyin"], ret["runjob"], ret["copyout"]), + "Job status: waitVM=%s initializeVM=%s copyIn=%s runJob=%s copyOut=%s" + % (ret["waitvm"], ret["initializevm"], ret["copyin"], ret["runjob"], ret["copyout"]), ) self.catFiles(hdrfile, self.job.outputFile) @@ -165,7 +166,10 @@ def notifyServer(self, job): except Exception as e: self.log.debug("Error in notifyServer: %s" % str(e)) - def afterJobExecution(self, hdrfile, msg, returnVM, killVM=True): + def afterJobExecution(self, hdrfile, msg, returnVM, killVM=True): + # TODO: I don't think killVM is a good variable name, it can refer to either returning or destroying the VM + # TODO: Also, what does it mean to not kill the VM? (i.e. not returning it)? It only gets called when we have a job.stopBefore. + # TODO: This directly contradicts the documentation of detachVM ("The worker must always call this function before returning.") self.jobQueue.makeDead(self.job, msg) # Update the text that users see in the autodriver output file @@ -188,6 +192,7 @@ def run(self): # Hash of return codes for each step ret = {} ret["waitvm"] = None + ret["initializevm"] = None ret["copyin"] = None ret["runjob"] = None ret["copyout"] = None @@ -201,6 +206,7 @@ def run(self): # Assigning job to a preallocated VM if self.preVM: # self.preVM: + assert not Config.VMMS_NAME == "ec2ssh", "Unimplemented" self.log.debug("Assigning job to preallocated VM") self.job.makeVM(self.preVM) self.log.info( @@ -221,6 +227,7 @@ def run(self): ) ) self.log.debug("Assigned job to preallocated VM") + ret["initializevm"] = 0 # Vacuous success since it doesn't happen # Assigning job to a new VM else: self.log.debug("Assigning job to a new VM") @@ -247,7 +254,15 @@ def run(self): ) # Host name returned from EC2 is stored in the vm object - self.vmms.initializeVM(self.job.vm) # TODO: This can return None if the step fails, check for that + ret["initializevm"] = self.vmms.initializeVM(self.job.vm) + if ret["initializevm"] != 0: + self.rescheduleJob( + hdrfile, + ret, + "Internal error: initializeVM failed" + ) + return + self.log.debug("Asigned job to a new VM") vm = self.job.vm @@ -321,8 +336,12 @@ def run(self): msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) self.job.vm.notes = str(self.job.id) + "_" + self.job.name self.job.vm.keep_for_debugging = True - self.afterJobExecution(hdrfile, msg, False) - # TODO: no reschedule? + self.log.debug(msg) + self.rescheduleJob( + hdrfile, + ret, + msg + ) return self.log.info( @@ -350,6 +369,13 @@ def run(self): Config.runjob_errors += 1 if ret["runjob"] == -1: Config.runjob_timeouts += 1 + self.rescheduleJob( + hdrfile, + ret, + "Internal error: runJob failed" + ) + return + self.log.info( "Job %s:%d executed [status=%s]" % (self.job.name, self.job.id, ret["runjob"]) @@ -368,6 +394,13 @@ def run(self): ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile) if ret["copyout"] != 0: Config.copyout_errors += 1 + self.rescheduleJob( + hdrfile, + ret, + "Internal error: copyOut failed" + ) + return + self.log.info( "Output copied for job %s:%d [status=%d]" % (self.job.name, self.job.id, ret["copyout"]) @@ -382,6 +415,10 @@ def run(self): # normal termination and doesn't reschedule the job. self.log.info("Success: job %s:%d finished" % (self.job.name, self.job.id)) + for status in ret.values(): + assert status == 0, "Should not get to the success point if any stage failed" + # TODO: test this, then remove everything below this point + # Move the job from the live queue to the dead queue # with an explanatory message msg = "Success: Autodriver returned normally" From 7e6751e61e71b7f7d8f5bb2859c76c567580a520 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Mon, 22 Sep 2025 21:08:06 -0400 Subject: [PATCH 11/22] more typing changes, streamlining the used of the TangoDictionary in preallocator (do note that my initial worry about incorrectness was unfounded due to weird Redis sharing behavior) --- jobManager.py | 8 +++++--- jobQueue.py | 2 +- preallocator.py | 15 ++++++++------- tangoObjects.py | 42 ++++++++++++++++++++++++++++++++---------- tests/testObjects.py | 2 +- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/jobManager.py b/jobManager.py index 65d62116..c21d316f 100644 --- a/jobManager.py +++ b/jobManager.py @@ -20,7 +20,8 @@ import tango # Written this way to avoid circular imports from config import Config from jobQueue import JobQueue -from tangoObjects import TangoJob, TangoQueue +from tangoObjects import TangoJob, TangoQueue, TangoMachine +from typing import List, Tuple from worker import Worker @@ -143,14 +144,15 @@ def __manage(self) -> None: tango_server.log.debug("Resetting Tango VMs") tango_server.resetTango(tango_server.preallocator.vmms) for key in tango_server.preallocator.machines.keys(): - tango_server.preallocator.machines.set(key, [[], TangoQueue(key)]) + machine: Tuple[List[TangoMachine], TangoQueue] = ([], TangoQueue.create(key)) + machine[1].make_empty() + tango_server.preallocator.machines.set(key, machine) # The above call sets the total pool empty. But the free pool which # is a queue in redis, may not be empty. When the job manager restarts, # resetting the free queue using the key doesn't change its content. # Therefore we empty the queue, thus the free pool, to keep it consistent # with the total pool. - tango_server.preallocator.machines.get(key)[1].make_empty() jobs = JobManager(tango_server.jobQueue) print("Starting the stand-alone Tango JobManager") diff --git a/jobQueue.py b/jobQueue.py index ee5b2956..34c3ae5b 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -55,7 +55,7 @@ def __init__(self, preallocator): """ self.liveJobs: TangoDictionary[TangoJob] = TangoDictionary.create("liveJobs") self.deadJobs: TangoDictionary[TangoJob] = TangoDictionary.create("deadJobs") - self.unassignedJobs = TangoQueue("unassignedLiveJobs") + self.unassignedJobs = TangoQueue.create("unassignedLiveJobs") self.queueLock = threading.Lock() self.preallocator = preallocator self.log = logging.getLogger("JobQueue") diff --git a/preallocator.py b/preallocator.py index 6607c552..69291933 100644 --- a/preallocator.py +++ b/preallocator.py @@ -8,6 +8,7 @@ from tangoObjects import TangoDictionary, TangoQueue, TangoIntValue, TangoMachine from config import Config +from typing import Tuple, List # # Preallocator - This class maintains a pool of active VMs for future @@ -22,7 +23,7 @@ class Preallocator(object): def __init__(self, vmms): - self.machines: TangoDictionary[TangoMachine] = TangoDictionary.create("machines") + self.machines: TangoDictionary[Tuple[List[TangoMachine], TangoQueue]] = TangoDictionary.create("machines") self.lock = threading.Lock() self.nextID = TangoIntValue("nextID", 1000) self.vmms = vmms @@ -46,11 +47,9 @@ def update(self, vm, num): """ self.lock.acquire() if vm.name not in self.machines: - self.machines.set(vm.name, [[], TangoQueue(vm.name)]) - self.machines.getExn(vm.name)[1].make_empty() # TODO: oh bruh this is incorrect. - # TODO: when used with a TangoRemoteDictionary, this will create a transient copy of the queue (from the redis pickle), - # TODO: then modify it, and then discard it :). self.machines will not be updated by a .get/.getExn call. - # TODO: TangoDictionary's should have value semantics, so the value type should probably be a tuple + initial_queue = TangoQueue.create(vm.name) + initial_queue.make_empty() + self.machines.set(vm.name, ([], initial_queue)) self.log.debug("Creating empty pool of %s instances" % (vm.name)) self.lock.release() @@ -209,7 +208,9 @@ def destroyVM(self, vmName, id): for i in range(size): vm = self.machines.get(vmName)[1].get_nowait() if vm.id != id: - self.machines.get(vmName)[1].put(vm) + machine = self.machines.get(vmName) + machine[1].put(vm) + self.machines.set(vmName, machine) else: dieVM = vm self.lock.release() diff --git a/tangoObjects.py b/tangoObjects.py index 9bf60b65..7d0dfc55 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -342,16 +342,35 @@ def get(self): def set(self, val): self.val = val return val + +class TangoQueue(Protocol): + @staticmethod + def create(key_name: str) -> TangoQueue: + if Config.USE_REDIS: + return TangoRemoteQueue(key_name) + else: + return ExtendedQueue() -def TangoQueue(object_name): - if Config.USE_REDIS: - return TangoRemoteQueue(object_name) - else: - return ExtendedQueue() - + @abstractmethod + def qsize(self) -> int: + ... + def empty(self) -> bool: + ... + def put(self, item) -> None: + ... + def get(self, block=True, timeout=None) -> Optional[T]: + ... + def get_nowait(self) -> Optional[T]: + ... + def remove(self, item) -> None: + ... + def _clean(self) -> None: + ... + def make_empty(self) -> None: + ... -class ExtendedQueue(Queue): +class ExtendedQueue(Queue, TangoQueue): """Python Thread safe Queue with the remove and clean function added""" def test(self): @@ -369,9 +388,12 @@ def remove(self, value): def _clean(self): with self.mutex: self.queue.clear() + + def make_empty(self): + self._clean() + - -class TangoRemoteQueue(object): +class TangoRemoteQueue(TangoQueue): """Simple Queue with Redis Backend""" @@ -433,7 +455,7 @@ def remove(self, item): def _clean(self): self.__db.delete(self.key) - def make_empty(self): + def make_empty(self) -> None: while True: item = self.__db.lpop(self.key) if item is None: diff --git a/tests/testObjects.py b/tests/testObjects.py index d477073f..8ba81270 100644 --- a/tests/testObjects.py +++ b/tests/testObjects.py @@ -69,7 +69,7 @@ def addAllToQueue(self): self.assertEqual(self.testQueue.qsize(), self.expectedSize) def runQueueTests(self): - self.testQueue = TangoQueue("self.testQueue") + self.testQueue = TangoQueue.create("self.testQueue") self.expectedSize = 0 self.assertEqual(self.testQueue.qsize(), self.expectedSize) self.assertTrue(self.testQueue.empty()) From 87b2d4b9a237e7c2689306162e894bd9d9dd21b4 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Mon, 22 Sep 2025 23:20:38 -0400 Subject: [PATCH 12/22] tested on a single cli job --- vmms/ec2SSH.py | 75 +++++++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index 02a3f577..9ea0d4ba 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -168,6 +168,40 @@ def acquire_vm_semaphore(): def release_vm_semaphore(): """Releases the VM sempahore""" Ec2SSH._vm_semaphore.release() + + def __get_valid_images(self) -> dict: + # Get images from ec2 + images = self.boto3resource.images.filter(Owners=["self"]) + + img_name_to_img_obj = {} + for image in images: + if image.tags: + for tag in image.tags: + if tag["Key"] == "Name" and tag["Value"]: + if tag["Value"] in img_name_to_img_obj: + self.log.info( + "Ignore %s for duplicate name tag %s" + % (image.id, tag["Value"]) + ) + else: + img_name_to_img_obj[tag["Value"]] = image + self.log.info( + "Found image: %s with name tag %s" + % (image.id, tag["Value"]) + ) + + imageAMIs = [item.id for item in images] + taggedAMIs = [img_name_to_img_obj[key].id for key in img_name_to_img_obj] + ignoredAMIs = list(set(imageAMIs) - set(taggedAMIs)) + + if len(ignoredAMIs) > 0: + self.log.info( + "Ignored images %s for lack of or ill-formed name tag" + % str(ignoredAMIs) + ) + + return img_name_to_img_obj + # TODO: the arguments accessKeyId and accessKey don't do anything def __init__(self, accessKeyId=None, accessKey=None): @@ -203,44 +237,17 @@ def __init__(self, accessKeyId=None, accessKey=None): # self.createKeyPair() # create boto3resource - self.img2ami = {} # this is a bad name, should really be img_name to img - self.images = [] try: # This is a service resource self.boto3resource: EC2ServiceResource = boto3.resource("ec2", config.Config.EC2_REGION) # TODO: rename this ot self.ec2resource self.boto3client = boto3.client("ec2", config.Config.EC2_REGION) - # Get images from ec2 - images = self.boto3resource.images.filter(Owners=["self"]) except Exception as e: self.log.error("EC2SSH failed initialization: %s" % (e)) raise + self.img2ami = self.__get_valid_images() - for image in images: - if image.tags: - for tag in image.tags: - if tag["Key"] == "Name" and tag["Value"]: - if tag["Value"] in self.img2ami: - self.log.info( - "Ignore %s for duplicate name tag %s" - % (image.id, tag["Value"]) - ) - else: - self.img2ami[tag["Value"]] = image - self.log.info( - "Found image: %s with name tag %s" - % (image.id, tag["Value"]) - ) - imageAMIs = [item.id for item in images] - taggedAMIs = [self.img2ami[key].id for key in self.img2ami] - ignoredAMIs = list(set(imageAMIs) - set(taggedAMIs)) - - if len(ignoredAMIs) > 0: - self.log.info( - "Ignored images %s for lack of or ill-formed name tag" - % str(ignoredAMIs) - ) def instanceName(self, id, name): """instanceName - Constructs a VM instance name. Always use @@ -282,7 +289,16 @@ def tangoMachineToEC2Instance(self, vm: TangoMachine) -> dict: ec2instance["instance_type"] = config.Config.DEFAULT_INST_TYPE # for now, ami is config default - ec2instance["ami"] = self.img2ami[vm.image].id + if vm.image in self.img2ami: + ec2instance["ami"] = self.img2ami[vm.image].id + else: + # We may need to rescan for new images + self.img2ami = self.__get_valid_images() + if vm.image in self.img2ami: + ec2instance["ami"] = self.img2ami[vm.image].id + else: + self.log.error("Image %s not found" % vm.image) + raise self.log.info("tangoMachineToEC2Instance: %s" % str(ec2instance)) return ec2instance @@ -801,6 +817,7 @@ def existsVM(self, vm): def getImages(self): """getImages - return a constant; actually use the ami specified in config""" + self.img2ami = self.__get_valid_images() return [key for key in self.img2ami] # getTag: to do later From 03e1f3181e23e26d738cb06ce06fb10f82ded926 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 28 Sep 2025 14:08:25 -0400 Subject: [PATCH 13/22] refactored afterJobExecution and detachVM --- worker.py | 92 +++++++++++++++++++++++++++---------------------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/worker.py b/worker.py index 14da23f6..271a9b7f 100644 --- a/worker.py +++ b/worker.py @@ -10,6 +10,7 @@ from requests.adapters import HTTPAdapter import os import shutil +from enum import Enum from datetime import datetime from config import Config @@ -25,6 +26,11 @@ # anything else in the system. # +class DetachMethod(Enum): + RETURN_TO_POOL = "return_to_pool" + DESTROY_WITHOUT_REPLACEMENT = "destroy_without_replacement" + DESTROY_AND_REPLACE = "replace" + class Worker(threading.Thread): def __init__(self, job, vmms, jobQueue, preallocator, preVM): @@ -37,12 +43,18 @@ def __init__(self, job, vmms, jobQueue, preallocator, preVM): self.preVM = preVM threading.Thread.__init__(self) self.log = logging.getLogger("Worker") + self.cleanupStatus = False # # Worker helper functions # - # TODO: These should not have default values in my opinion - def detachVM(self, return_vm=False, replace_vm=False): + def __del__(self): + if self.job.stopBefore == "": # We don't want to cleanup the VM if we are stopping early for debugging + assert self.cleanupStatus, "Worker must call detachVM before returning" + + + # TODO: Return_vm should not have default values in my opinion + def detachVM(self, detachMethod: DetachMethod): """detachVM - Detach the VM from this worker. The options are to return it to the pool's free list (return_vm), destroy it (not return_vm), and if destroying it, whether to replace it @@ -50,20 +62,25 @@ def detachVM(self, return_vm=False, replace_vm=False): this function before returning. """ # job-owned instance, simply destroy after job is completed + self.cleanupStatus = True if self.job.vm.ec2_vmms: self.vmms.safeDestroyVM(self.job.vm) - elif return_vm: - self.preallocator.freeVM(self.job.vm) else: - self.vmms.safeDestroyVM(self.job.vm) - if replace_vm: + if detachMethod == DetachMethod.RETURN_TO_POOL: + self.preallocator.freeVM(self.job.vm) + elif detachMethod == DetachMethod.DESTROY_WITHOUT_REPLACEMENT: + self.vmms.safeDestroyVM(self.job.vm) + self.preallocator.removeVM(self.job.vm) + elif detachMethod == DetachMethod.DESTROY_AND_REPLACE: + self.vmms.safeDestroyVM(self.job.vm) self.preallocator.createVM(self.job.vm) - - # Important: don't remove the VM from the pool until its - # replacement has been created. Otherwise there is a - # potential race where the job manager thinks that the - # pool is empty and creates a spurious vm. - self.preallocator.removeVM(self.job.vm) + # Important: don't remove the VM from the pool until its + # replacement has been created. Otherwise there is a + # potential race where the job manager thinks that the + # pool is empty and creates a spurious vm. + self.preallocator.removeVM(self.job.vm) + else: + raise ValueError(f"Invalid detach method: {detachMethod}") # TODO: figure out what hdrfile, ret and err are def rescheduleJob(self, hdrfile, ret, err): @@ -88,32 +105,18 @@ def rescheduleJob(self, hdrfile, ret, err): os.remove(hdrfile) except OSError: pass - self.detachVM(return_vm=False, replace_vm=True) + self.detachVM(DetachMethod.DESTROY_AND_REPLACE) self.jobQueue.unassignJob(self.job.id) # Here is where we give up else: - self.log.error("Giving up on job %s:%d" % (self.job.name, self.job.id)) + full_err = f"Error: {err}. Unable to complete job after {Config.JOB_RETRIES} tries. Please resubmit.\nJob status: waitVM={ret['waitvm']} initializeVM={ret['initializevm']} copyIn={ret['copyin']} runJob={ret['runjob']} copyOut={ret['copyout']}" + self.log.error(f"Giving up on job %s:%d. %s" % (self.job.name, self.job.id, full_err)) self.job.appendTrace( - "%s|Giving up on job %s:%d" - % (datetime.now().ctime(), self.job.name, self.job.id) + "%s|Giving up on job %s:%d. %s" + % (datetime.now().ctime(), self.job.name, self.job.id, full_err) ) - self.jobQueue.makeDead(self.job, err) # Note: this reports the error that caused the last call to rescheduleJob to fail - - self.appendMsg( - hdrfile, - "Internal error: Unable to complete job after %d tries. Pleae resubmit" - % (Config.JOB_RETRIES), - ) - self.appendMsg( - hdrfile, - "Job status: waitVM=%s initializeVM=%s copyIn=%s runJob=%s copyOut=%s" - % (ret["waitvm"], ret["initializevm"], ret["copyin"], ret["runjob"], ret["copyout"]), - ) - - self.catFiles(hdrfile, self.job.outputFile) - self.detachVM(return_vm=False, replace_vm=True) - self.notifyServer(self.job) + self.afterJobExecution(hdrfile, full_err, DetachMethod.DESTROY_AND_REPLACE) def appendMsg(self, filename, msg): """appendMsg - Append a timestamped Tango message to a file""" @@ -166,10 +169,7 @@ def notifyServer(self, job): except Exception as e: self.log.debug("Error in notifyServer: %s" % str(e)) - def afterJobExecution(self, hdrfile, msg, returnVM, killVM=True): - # TODO: I don't think killVM is a good variable name, it can refer to either returning or destroying the VM - # TODO: Also, what does it mean to not kill the VM? (i.e. not returning it)? It only gets called when we have a job.stopBefore. - # TODO: This directly contradicts the documentation of detachVM ("The worker must always call this function before returning.") + def afterJobExecution(self, hdrfile, msg, detachMethod: Optional[DetachMethod]): self.jobQueue.makeDead(self.job, msg) # Update the text that users see in the autodriver output file @@ -178,8 +178,8 @@ def afterJobExecution(self, hdrfile, msg, returnVM, killVM=True): os.chmod(self.job.outputFile, 0o644) # Thread exit after termination - if killVM: - self.detachVM(return_vm=returnVM) + if detachMethod is not None: + self.detachVM(detachMethod) self.notifyServer(self.job) return @@ -286,7 +286,7 @@ def run(self): if self.job.stopBefore == "waitvm": msg = "Execution stopped before %s" % self.job.stopBefore returnVM = True - self.afterJobExecution(hdrfile, msg, returnVM, False) + self.afterJobExecution(hdrfile, msg, detachMethod=None) return ret["waitvm"] = self.vmms.waitVM(vm, Config.WAITVM_TIMEOUT) @@ -324,7 +324,7 @@ def run(self): if (self.job.stopBefore == "copyin"): msg = "Execution stopped before %s" % self.job.stopBefore returnVM = True - self.afterJobExecution(hdrfile, msg, returnVM, False) + self.afterJobExecution(hdrfile, msg, detachMethod=None) return # Copy input files to VM self.log.debug(f"Before copyIn: ret[copyin] = {ret['copyin']}, job_id: {str(self.job.id)}") @@ -356,7 +356,7 @@ def run(self): if (self.job.stopBefore == "runjob"): msg = "Execution stopped before %s" % self.job.stopBefore returnVM = True - self.afterJobExecution(hdrfile, msg, returnVM, False) + self.afterJobExecution(hdrfile, msg, detachMethod=None) return # Run the job on the virtual machine ret["runjob"] = self.vmms.runJob( @@ -388,7 +388,7 @@ def run(self): if (self.job.stopBefore == "copyout"): msg = "Execution stopped before %s" % self.job.stopBefore returnVM = True - self.afterJobExecution(hdrfile, msg, returnVM, False) + self.afterJobExecution(hdrfile, msg, detachMethod=None) return # Copy the output back. ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile) @@ -422,7 +422,7 @@ def run(self): # Move the job from the live queue to the dead queue # with an explanatory message msg = "Success: Autodriver returned normally" - (returnVM, replaceVM) = (True, False) + detachMethod = DetachMethod.RETURN_TO_POOL if ret["copyin"] != 0: msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) elif ret["runjob"] != 0: @@ -436,7 +436,7 @@ def run(self): # and do not retry the job since the job may have damaged # the VM. msg = "Error: OS error while running job on VM" - (returnVM, replaceVM) = (False, True) + detachMethod = DetachMethod.DESTROY_WITHOUT_REPLACEMENT self.job.vm.notes = str(self.job.id) + "_" + self.job.name self.job.vm.keep_for_debugging = True else: # This should never happen @@ -447,7 +447,7 @@ def run(self): elif ret["copyout"] != 0: msg += "Error: Copy out from VM failed (status=%d)" % (ret["copyout"]) - self.afterJobExecution(hdrfile, msg, returnVM) + self.afterJobExecution(hdrfile, msg, detachMethod) return # @@ -463,4 +463,4 @@ def run(self): if self.preVM and not vm: vm = self.job.vm = self.preVM if vm: - self.detachVM(return_vm=False, replace_vm=True) + self.detachVM(DetachMethod.DESTROY_AND_REPLACE) From bc8c8a7c0b33fb33c49af83bcd9c33a8be1ef66b Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 28 Sep 2025 15:48:00 -0400 Subject: [PATCH 14/22] error messages --- worker.py | 81 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 49 insertions(+), 32 deletions(-) diff --git a/worker.py b/worker.py index 271a9b7f..a57e9e5f 100644 --- a/worker.py +++ b/worker.py @@ -53,7 +53,6 @@ def __del__(self): assert self.cleanupStatus, "Worker must call detachVM before returning" - # TODO: Return_vm should not have default values in my opinion def detachVM(self, detachMethod: DetachMethod): """detachVM - Detach the VM from this worker. The options are to return it to the pool's free list (return_vm), destroy it @@ -65,6 +64,7 @@ def detachVM(self, detachMethod: DetachMethod): self.cleanupStatus = True if self.job.vm.ec2_vmms: self.vmms.safeDestroyVM(self.job.vm) + # TODO: what about the preallocator? else: if detachMethod == DetachMethod.RETURN_TO_POOL: self.preallocator.freeVM(self.job.vm) @@ -110,7 +110,7 @@ def rescheduleJob(self, hdrfile, ret, err): # Here is where we give up else: - full_err = f"Error: {err}. Unable to complete job after {Config.JOB_RETRIES} tries. Please resubmit.\nJob status: waitVM={ret['waitvm']} initializeVM={ret['initializevm']} copyIn={ret['copyin']} runJob={ret['runjob']} copyOut={ret['copyout']}" + full_err = f"Internal Error: {err}. Unable to complete job after {Config.JOB_RETRIES} tries. Please resubmit.\nJob status: waitVM={ret['waitvm']} initializeVM={ret['initializevm']} copyIn={ret['copyin']} runJob={ret['runjob']} copyOut={ret['copyout']}" self.log.error(f"Giving up on job %s:%d. %s" % (self.job.name, self.job.id, full_err)) self.job.appendTrace( "%s|Giving up on job %s:%d. %s" @@ -333,7 +333,7 @@ def run(self): if ret["copyin"] != 0: Config.copyin_errors += 1 - msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) + msg = "Copy in to VM failed (status=%d)" % (ret["copyin"]) self.job.vm.notes = str(self.job.id) + "_" + self.job.name self.job.vm.keep_for_debugging = True self.log.debug(msg) @@ -366,13 +366,31 @@ def run(self): self.job.disableNetwork, ) if ret["runjob"] != 0: - Config.runjob_errors += 1 - if ret["runjob"] == -1: + if ret["runjob"] == 1: # This should never happen + msg = "RunJob: Autodriver usage error (status=%d)" % (ret["runjob"]) + elif ret["runjob"] == 2: + msg = "RunJob: Job timed out after %d seconds" % (self.job.timeout) + elif ret["runjob"] == 3: # EXIT_OSERROR in Autodriver + # Abnormal job termination (Autodriver encountered an OS + # error). Assume that the VM is damaged. Destroy this VM + # and do not retry the job since the job may have damaged + # the VM. + msg = "RunJob: OS error while running job on VM" + # TODO: do we need to not reschedule the job? + self.job.vm.notes = str(self.job.id) + "_" + self.job.name + self.job.vm.keep_for_debugging = True + elif ret["runjob"] == -1: Config.runjob_timeouts += 1 + # TODO: difference between 2 and -1? + else: # This should never happen + msg = "RunJob: Unknown autodriver error (status=%d)" % ( + ret["runjob"] + ) + Config.runjob_errors += 1 self.rescheduleJob( hdrfile, ret, - "Internal error: runJob failed" + msg ) return @@ -397,7 +415,7 @@ def run(self): self.rescheduleJob( hdrfile, ret, - "Internal error: copyOut failed" + f"Internal error: copyOut failed (status={ret['copyout']})" ) return @@ -422,32 +440,31 @@ def run(self): # Move the job from the live queue to the dead queue # with an explanatory message msg = "Success: Autodriver returned normally" - detachMethod = DetachMethod.RETURN_TO_POOL - if ret["copyin"] != 0: - msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) - elif ret["runjob"] != 0: - if ret["runjob"] == 1: # This should never happen - msg = "Error: Autodriver usage error (status=%d)" % (ret["runjob"]) - elif ret["runjob"] == 2: - msg = "Error: Job timed out after %d seconds" % (self.job.timeout) - elif ret["runjob"] == 3: # EXIT_OSERROR in Autodriver - # Abnormal job termination (Autodriver encountered an OS - # error). Assume that the VM is damaged. Destroy this VM - # and do not retry the job since the job may have damaged - # the VM. - msg = "Error: OS error while running job on VM" - detachMethod = DetachMethod.DESTROY_WITHOUT_REPLACEMENT - self.job.vm.notes = str(self.job.id) + "_" + self.job.name - self.job.vm.keep_for_debugging = True - else: # This should never happen - msg = "Error: Unknown autodriver error (status=%d)" % ( - ret["runjob"] - ) - - elif ret["copyout"] != 0: - msg += "Error: Copy out from VM failed (status=%d)" % (ret["copyout"]) + self.afterJobExecution(hdrfile, msg, DetachMethod.RETURN_TO_POOL) + # if ret["copyin"] != 0: + # msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) + # elif ret["runjob"] != 0: + # if ret["runjob"] == 1: # This should never happen + # msg = "Error: Autodriver usage error (status=%d)" % (ret["runjob"]) + # elif ret["runjob"] == 2: + # msg = "Error: Job timed out after %d seconds" % (self.job.timeout) + # elif ret["runjob"] == 3: # EXIT_OSERROR in Autodriver + # # Abnormal job termination (Autodriver encountered an OS + # # error). Assume that the VM is damaged. Destroy this VM + # # and do not retry the job since the job may have damaged + # # the VM. + # msg = "Error: OS error while running job on VM" + # detachMethod = DetachMethod.DESTROY_WITHOUT_REPLACEMENT + # self.job.vm.notes = str(self.job.id) + "_" + self.job.name + # self.job.vm.keep_for_debugging = True + # else: # This should never happen + # msg = "Error: Unknown autodriver error (status=%d)" % ( + # ret["runjob"] + # ) + + # elif ret["copyout"] != 0: + # msg += "Error: Copy out from VM failed (status=%d)" % (ret["copyout"]) - self.afterJobExecution(hdrfile, msg, detachMethod) return # From 20b833b0017de3f25161c59d54087de89850a81c Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 28 Sep 2025 16:55:49 -0400 Subject: [PATCH 15/22] code cleanup: worker always gets initialized with a preallocated VM (else preVM.name will fail) --- vmms/ec2SSH.py | 1 + worker.py | 88 ++++++++++++++------------------------------------ 2 files changed, 25 insertions(+), 64 deletions(-) diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index 02a3f577..73e63a1b 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -347,6 +347,7 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: """initializeVM - Tell EC2 to create a new VM instance. Returns a boto.ec2.instance.Instance object. + Reads from vm's id and name, writes to vm's instance_id and domain_name """ newInstance: Optional[Instance] = None # Create the instance and obtain the reservation diff --git a/worker.py b/worker.py index a57e9e5f..8bc84d5f 100644 --- a/worker.py +++ b/worker.py @@ -15,6 +15,8 @@ from datetime import datetime from config import Config from jobQueue import JobQueue +from typing import Optional +from tangoObjects import TangoMachine # # Worker - The worker class is very simple and very dumb. The goal is # to walk through the VMMS interface, track the job's progress, and if @@ -32,8 +34,9 @@ class DetachMethod(Enum): DESTROY_AND_REPLACE = "replace" +# We always preallocate a VM for the worker to use class Worker(threading.Thread): - def __init__(self, job, vmms, jobQueue, preallocator, preVM): + def __init__(self, job, vmms, jobQueue, preallocator, preVM: TangoMachine): threading.Thread.__init__(self) self.daemon = True self.job = job @@ -204,69 +207,30 @@ def run(self): hdrfile = tempfile.mktemp() self.appendMsg(hdrfile, "Received job %s:%d" % (self.job.name, self.job.id)) - # Assigning job to a preallocated VM - if self.preVM: # self.preVM: - assert not Config.VMMS_NAME == "ec2ssh", "Unimplemented" - self.log.debug("Assigning job to preallocated VM") - self.job.makeVM(self.preVM) - self.log.info( - "Assigned job %s:%d existing VM %s" - % ( - self.job.name, - self.job.id, - self.vmms.instanceName(self.preVM.id, self.preVM.name), - ) - ) - self.job.appendTrace( - "%s|Assigned job %s:%d existing VM %s" - % ( - datetime.now().ctime(), - self.job.name, - self.job.id, - self.vmms.instanceName(self.preVM.id, self.preVM.name), - ) - ) - self.log.debug("Assigned job to preallocated VM") - ret["initializevm"] = 0 # Vacuous success since it doesn't happen - # Assigning job to a new VM - else: - self.log.debug("Assigning job to a new VM") - self.job.syncRemote() - self.job.vm.id = self.job.id - self.job.updateRemote() - - self.log.info( - "Assigned job %s:%d new VM %s" - % ( - self.job.name, - self.job.id, - self.vmms.instanceName(self.job.vm.id, self.job.vm.name), - ) + # Assigning job to the preallocated VM + self.log.debug("Assigning job to preallocated VM") + self.job.makeVM(self.preVM) + self.log.info( + "Assigned job %s:%d existing VM %s" + % ( + self.job.name, + self.job.id, + self.vmms.instanceName(self.preVM.id, self.preVM.name), ) - self.job.appendTrace( - "%s|Assigned job %s:%d new VM %s" - % ( - datetime.now().ctime(), - self.job.name, - self.job.id, - self.vmms.instanceName(self.job.vm.id, self.job.vm.name), - ) + ) + self.job.appendTrace( + "%s|Assigned job %s:%d existing VM %s" + % ( + datetime.now().ctime(), + self.job.name, + self.job.id, + self.vmms.instanceName(self.preVM.id, self.preVM.name), ) - - # Host name returned from EC2 is stored in the vm object - ret["initializevm"] = self.vmms.initializeVM(self.job.vm) - if ret["initializevm"] != 0: - self.rescheduleJob( - hdrfile, - ret, - "Internal error: initializeVM failed" - ) - return - - self.log.debug("Asigned job to a new VM") + ) + self.log.debug("Assigned job to preallocated VM") + ret["initializevm"] = 0 # Vacuous success since it doesn't happen vm = self.job.vm - returnVM = True # Wait for the instance to be ready self.log.debug( @@ -285,7 +249,6 @@ def run(self): self.log.debug("Waiting for VM") if self.job.stopBefore == "waitvm": msg = "Execution stopped before %s" % self.job.stopBefore - returnVM = True self.afterJobExecution(hdrfile, msg, detachMethod=None) return ret["waitvm"] = self.vmms.waitVM(vm, Config.WAITVM_TIMEOUT) @@ -323,7 +286,6 @@ def run(self): if (self.job.stopBefore == "copyin"): msg = "Execution stopped before %s" % self.job.stopBefore - returnVM = True self.afterJobExecution(hdrfile, msg, detachMethod=None) return # Copy input files to VM @@ -355,7 +317,6 @@ def run(self): if (self.job.stopBefore == "runjob"): msg = "Execution stopped before %s" % self.job.stopBefore - returnVM = True self.afterJobExecution(hdrfile, msg, detachMethod=None) return # Run the job on the virtual machine @@ -405,7 +366,6 @@ def run(self): if (self.job.stopBefore == "copyout"): msg = "Execution stopped before %s" % self.job.stopBefore - returnVM = True self.afterJobExecution(hdrfile, msg, detachMethod=None) return # Copy the output back. From 138e33299479eca5ccf10c60bded480a5fcd488c Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 26 Oct 2025 22:17:55 -0400 Subject: [PATCH 16/22] fixed logging type safety --- jobManager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jobManager.py b/jobManager.py index 635df83e..e4e4cb54 100644 --- a/jobManager.py +++ b/jobManager.py @@ -135,7 +135,7 @@ def __manage(self) -> None: if __name__ == "__main__": if not Config.USE_REDIS: - tango.log.error( + print( "You need to have Redis running to be able to initiate stand-alone\ JobManager" ) @@ -155,5 +155,5 @@ def __manage(self) -> None: # with the total pool. jobs = JobManager(tango_server.jobQueue) - tango.log.info("Starting the stand-alone Tango JobManager") + tango_server.log.info("Starting the stand-alone Tango JobManager") jobs.run() From d1a8b27aa7c5725b33f13c3f8104fbddc8f319df Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Sun, 26 Oct 2025 22:25:14 -0400 Subject: [PATCH 17/22] always assert that detachVM is called (taken care of with .keep_for_debuggin) --- worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/worker.py b/worker.py index 2c3e078a..569bb89d 100644 --- a/worker.py +++ b/worker.py @@ -51,8 +51,7 @@ def __init__(self, job, vmms, jobQueue, preallocator, preVM: TangoMachine): # Worker helper functions # def __del__(self): - if self.job.stopBefore == "": # We don't want to cleanup the VM if we are stopping early for debugging - assert self.cleanupStatus, "Worker must call detachVM before returning" + assert self.cleanupStatus, "Worker must call detachVM before returning" def detachVM(self, detachMethod: DetachMethod): From de383297745dcba1b27d70aaf55aeae62a683857 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Mon, 27 Oct 2025 01:12:56 -0400 Subject: [PATCH 18/22] more todos --- tangoObjects.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tangoObjects.py b/tangoObjects.py index 63ed8f99..adfa71db 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -106,7 +106,7 @@ def __init__( stopBefore="", ): self._assigned = False - self._retries = 0 + self._retries: int = 0 self._vm = vm if input is None: @@ -130,6 +130,7 @@ def __repr__(self): self.syncRemote() return f"ID: {self.id} - Name: {self.name}" + # TODO: reduce code size/duplication by setting TangoJob as a dataclass # Getters for private variables @property def assigned(self): From 299e9998eaa2b19f664042e51de6f4b6050946b4 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Tue, 28 Oct 2025 15:21:47 -0400 Subject: [PATCH 19/22] stop Before bug fix --- tangoObjects.py | 7 ++++++- worker.py | 16 ++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/tangoObjects.py b/tangoObjects.py index adfa71db..b0ce2da0 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -124,7 +124,7 @@ def __init__( self._accessKeyId = accessKeyId self._accessKey = accessKey self._disableNetwork = disableNetwork - self._stopBefore = "stopBefore" + self._stopBefore = stopBefore def __repr__(self): self.syncRemote() @@ -257,6 +257,11 @@ def setTimeout(self, new_timeout): self._timeout = new_timeout self.updateRemote() + def setKeepForDebugging(self, keep_for_debugging: bool): + self.syncRemote() + self._vm.keep_for_debugging = keep_for_debugging + self.updateRemote() + # Private method def __updateSelf(self, other_job): self._assigned = other_job._assigned diff --git a/worker.py b/worker.py index 569bb89d..c68400f0 100644 --- a/worker.py +++ b/worker.py @@ -246,7 +246,7 @@ def run(self): self.log.debug("Waiting for VM") if self.job.stopBefore == "waitvm": msg = "Execution stopped before %s" % self.job.stopBefore - self.job.vm.keep_for_debugging = True + self.job.setKeepForDebugging(True) self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) return ret["waitvm"] = self.vmms.waitVM(vm, Config.WAITVM_TIMEOUT) @@ -281,11 +281,11 @@ def run(self): self.job.id, ) ) - if (self.job.stopBefore == "copyin"): msg = "Execution stopped before %s" % self.job.stopBefore - self.job.vm.keep_for_debugging = True + self.job.setKeepForDebugging(True) self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) + self.log.debug(msg) return # Copy input files to VM self.log.debug(f"Before copyIn: ret[copyin] = {ret['copyin']}, job_id: {str(self.job.id)}") @@ -296,7 +296,7 @@ def run(self): Config.copyin_errors += 1 msg = "Copy in to VM failed (status=%d)" % (ret["copyin"]) self.job.vm.notes = str(self.job.id) + "_" + self.job.name - self.job.vm.keep_for_debugging = True + self.job.setKeepForDebugging(True) self.log.debug(msg) self.rescheduleJob( hdrfile, @@ -316,7 +316,7 @@ def run(self): if (self.job.stopBefore == "runjob"): msg = "Execution stopped before %s" % self.job.stopBefore - self.job.vm.keep_for_debugging = True + self.job.setKeepForDebugging(True) self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) return # Run the job on the virtual machine @@ -339,7 +339,7 @@ def run(self): msg = "RunJob: OS error while running job on VM" # TODO: do we need to not reschedule the job? self.job.vm.notes = str(self.job.id) + "_" + self.job.name - self.job.vm.keep_for_debugging = True + self.job.setKeepForDebugging(True) elif ret["runjob"] == -1: Config.runjob_timeouts += 1 # TODO: difference between 2 and -1? @@ -366,7 +366,7 @@ def run(self): if (self.job.stopBefore == "copyout"): msg = "Execution stopped before %s" % self.job.stopBefore - self.job.vm.keep_for_debugging = True + self.job.setKeepForDebugging(True) self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) return # Copy the output back. @@ -417,7 +417,7 @@ def run(self): # msg = "Error: OS error while running job on VM" # detachMethod = DetachMethod.DESTROY_WITHOUT_REPLACEMENT # self.job.vm.notes = str(self.job.id) + "_" + self.job.name - # self.job.vm.keep_for_debugging = True + # self.job.setKeepForDebugging(True) # else: # This should never happen # msg = "Error: Unknown autodriver error (status=%d)" % ( # ret["runjob"] From 2325e54236133135eaeed797211b2440cb18cedd Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Tue, 11 Nov 2025 15:36:35 -0500 Subject: [PATCH 20/22] empty line --- vmms/localDocker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vmms/localDocker.py b/vmms/localDocker.py index 8ccfb7d5..338bc6d4 100644 --- a/vmms/localDocker.py +++ b/vmms/localDocker.py @@ -17,6 +17,7 @@ from vmms.interface import VMMSInterface + def timeout(command, time_out=1): """timeout - Run a unix command with a timeout. Return -1 on timeout, otherwise return the return value from the command, which From 5b34e24f76e39cc19cc2a6fb48328d67ef311f1f Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Tue, 18 Nov 2025 12:52:13 -0500 Subject: [PATCH 21/22] replaced _clean with make_empty for both TangoQueue and TangoDictionary --- jobQueue.py | 8 ++++---- tangoObjects.py | 17 ++++------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/jobQueue.py b/jobQueue.py index 34c3ae5b..d00969fc 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -341,13 +341,13 @@ def getInfo(self): return info - def reset(self): + def reset(self) -> None: """reset - resets and clears all the internal dictionaries and queues """ - self.liveJobs._clean() - self.deadJobs._clean() - self.unassignedJobs._clean() + self.liveJobs.make_empty() + self.deadJobs.make_empty() + self.unassignedJobs.make_empty() def getNextPendingJob(self) -> TangoJob: """Gets the next unassigned live job. Note that this is a diff --git a/tangoObjects.py b/tangoObjects.py index b0ce2da0..191bf141 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -371,8 +371,6 @@ def get_nowait(self) -> Optional[T]: ... def remove(self, item) -> None: ... - def _clean(self) -> None: - ... def make_empty(self) -> None: ... @@ -391,14 +389,10 @@ def remove(self, value): with self.mutex: self.queue.remove(value) - def _clean(self): + def make_empty(self): with self.mutex: self.queue.clear() - def make_empty(self): - self._clean() - - class TangoRemoteQueue(TangoQueue): """Simple Queue with Redis Backend""" @@ -458,9 +452,6 @@ def remove(self, item): pickled_item = pickle.dumps(item) return self.__db.lrem(self.key, 0, pickled_item) - def _clean(self): - self.__db.delete(self.key) - def make_empty(self) -> None: self.__db.delete(self.key) @@ -502,7 +493,7 @@ def values(self) -> list[T]: def delete(self, id: str) -> None: ... @abstractmethod - def _clean(self) -> None: + def make_empty(self) -> None: ... @abstractmethod def items(self) -> list[tuple[str, T]]: @@ -568,7 +559,7 @@ def values(self): def delete(self, id): self.r.hdel(self.hash_name, id) - def _clean(self): + def make_empty(self): # only for testing self.r.delete(self.hash_name) @@ -629,6 +620,6 @@ def items(self): ] ) - def _clean(self): + def make_empty(self): # only for testing return From ee45c4d9c024271e8dfc1398277e493965404811 Mon Sep 17 00:00:00 2001 From: Anthony Yip Date: Tue, 18 Nov 2025 12:52:59 -0500 Subject: [PATCH 22/22] removed dead code --- worker.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/worker.py b/worker.py index c68400f0..cf62b61f 100644 --- a/worker.py +++ b/worker.py @@ -402,30 +402,6 @@ def run(self): # with an explanatory message msg = "Success: Autodriver returned normally" self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.RETURN_TO_POOL) - # if ret["copyin"] != 0: - # msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) - # elif ret["runjob"] != 0: - # if ret["runjob"] == 1: # This should never happen - # msg = "Error: Autodriver usage error (status=%d)" % (ret["runjob"]) - # elif ret["runjob"] == 2: - # msg = "Error: Job timed out after %d seconds" % (self.job.timeout) - # elif ret["runjob"] == 3: # EXIT_OSERROR in Autodriver - # # Abnormal job termination (Autodriver encountered an OS - # # error). Assume that the VM is damaged. Destroy this VM - # # and do not retry the job since the job may have damaged - # # the VM. - # msg = "Error: OS error while running job on VM" - # detachMethod = DetachMethod.DESTROY_WITHOUT_REPLACEMENT - # self.job.vm.notes = str(self.job.id) + "_" + self.job.name - # self.job.setKeepForDebugging(True) - # else: # This should never happen - # msg = "Error: Unknown autodriver error (status=%d)" % ( - # ret["runjob"] - # ) - - # elif ret["copyout"] != 0: - # msg += "Error: Copy out from VM failed (status=%d)" % (ret["copyout"]) - return #