diff --git a/clients/tango-cli.py b/clients/tango-cli.py index 701e66cd..d51d2da3 100755 --- a/clients/tango-cli.py +++ b/clients/tango-cli.py @@ -28,6 +28,7 @@ def get_arg(name, default=None): @dataclass class RequestObj: """Dataclass for job request objects""" + image: str files: str timeout: int @@ -46,6 +47,7 @@ class RequestObj: @dataclass class VmObj: """Dataclass for VM allocation objects""" + vmms: str cores: int memory: int @@ -161,45 +163,48 @@ class VmObj: parser.add_argument("--accessKeyId", default="", help="AWS account access key ID") parser.add_argument("--accessKey", default="", help="AWS account access key content") parser.add_argument("--instanceType", default="", help="AWS EC2 instance type") -parser.add_argument("--stopBefore", default="", help="Stops the worker before a function is executed") +parser.add_argument( + "--stopBefore", default="", help="Stops the worker before a function is executed" +) + def checkKey(): - if get_arg('key') is None: + if get_arg("key") is None: print("Key must be specified with -k") return -1 return 0 def checkCourselab(): - if get_arg('courselab') is None: + if get_arg("courselab") is None: print("Courselab must be specified with -l") return -1 return 0 def checkFilename(): - if get_arg('filename') is None: + if get_arg("filename") is None: print("Filename must be specified with --filename") return -1 return 0 def checkInfiles(): - if get_arg('infiles') is None: + if get_arg("infiles") is None: print("Input files must be specified with --infiles") return -1 return 0 def checkDeadjobs(): - if get_arg('deadJobs') is None: + if get_arg("deadJobs") is None: print("Deadjobs must be specified with --deadJobs") return -1 return 0 def checkImageName(): - if get_arg('imageName') is None: + if get_arg("imageName") is None: print("Image name must be specified with --imageName") return -1 return 0 @@ -218,18 +223,24 @@ def tango_open(): response = requests.get( "%s://%s:%d/open/%s/%s/" - % (_tango_protocol, get_arg('server'), 
get_arg('port'), get_arg('key'), get_arg('courselab')) + % ( + _tango_protocol, + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + ) ) print( "Sent request to %s:%d/open/%s/%s/" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')) + % (get_arg("server"), get_arg("port"), get_arg("key"), get_arg("courselab")) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/open/%s/%s/" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')) + % (get_arg("server"), get_arg("port"), get_arg("key"), get_arg("courselab")) ) print(str(err)) sys.exit(0) @@ -244,28 +255,46 @@ def tango_upload(): if res != 0: raise Exception("Invalid usage: [upload] " + upload_help) - dirs = get_arg('filename').split("/") + dirs = get_arg("filename").split("/") filename = dirs[len(dirs) - 1] header = {"Filename": filename} - f = open(get_arg('filename'), 'rb') + f = open(get_arg("filename"), "rb") response = requests.post( "%s://%s:%d/upload/%s/%s/" - % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')), + % ( + _tango_protocol, + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + ), data=f.read(), headers=header, ) f.close() print( "Sent request to %s:%d/upload/%s/%s/ filename=%s" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), get_arg('filename')) + % ( + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + get_arg("filename"), + ) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/upload/%s/%s/ filename=%s" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), get_arg('filename')) + % ( + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + get_arg("filename"), + ) ) print(str(err)) sys.exit(0) @@ -276,42 +305,59 @@ def tango_upload(): def tango_addJob(): try: - 
requestObj = {} res = checkKey() + checkCourselab() + checkInfiles() if res != 0: raise Exception("Invalid usage: [addJob] " + addJob_help) requestObj = RequestObj( - image=get_arg('image'), - files=get_arg('infiles'), - timeout=get_arg('timeout'), - max_kb=get_arg('maxsize'), - output_file=get_arg('outputFile'), - jobName=get_arg('jobname'), - accessKeyId=get_arg('accessKeyId'), - accessKey=get_arg('accessKey'), - disable_network=get_arg('disableNetwork'), - instanceType=get_arg('instanceType'), - stopBefore=get_arg('stopBefore'), - notifyURL=get_arg('notifyURL'), - callback_url=get_arg('callbackURL'), + image=get_arg("image"), + files=get_arg("infiles"), + timeout=get_arg("timeout"), + max_kb=get_arg("maxsize"), + output_file=get_arg("outputFile"), + jobName=get_arg("jobname"), + accessKeyId=get_arg("accessKeyId"), + accessKey=get_arg("accessKey"), + disable_network=get_arg("disableNetwork"), + instanceType=get_arg("instanceType"), + stopBefore=get_arg("stopBefore"), + notifyURL=get_arg("notifyURL"), + callback_url=get_arg("callbackURL"), ) response = requests.post( "%s://%s:%d/addJob/%s/%s/" - % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')), + % ( + _tango_protocol, + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + ), data=json.dumps(asdict(requestObj)), ) print( "Sent request to %s:%d/addJob/%s/%s/ \t jobObj=%s" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), json.dumps(asdict(requestObj))) + % ( + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + json.dumps(asdict(requestObj)), + ) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/addJob/%s/%s/ \t jobObj=%s" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), json.dumps(asdict(requestObj)) if 'requestObj' in locals() else 'N/A') + % ( + get_arg("server"), + get_arg("port"), + get_arg("key"), + 
get_arg("courselab"), + json.dumps(asdict(requestObj)) if "requestObj" in locals() else "N/A", + ) ) print(str(err)) sys.exit(0) @@ -326,19 +372,19 @@ def tango_getPartialOutput(): "%s://%s:%d/getPartialOutput/%s/%s/" % ( _tango_protocol, - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('jobid'), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("jobid"), ) ) print( "Sent request to %s:%d/getPartialOutput/%s/%s/" % ( - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('jobid'), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("jobid"), ) ) print(response.text) @@ -346,10 +392,10 @@ def tango_getPartialOutput(): print( "Failed to send request to %s:%d/getPartialOutput/%s/%s/" % ( - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('jobid'), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("jobid"), ) ) print(str(err)) @@ -369,21 +415,21 @@ def tango_poll(): "%s://%s:%d/poll/%s/%s/%s/" % ( _tango_protocol, - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('courselab'), - urllib.parse.quote(get_arg('outputFile')), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + urllib.parse.quote(get_arg("outputFile")), ) ) print( "Sent request to %s:%d/poll/%s/%s/%s/" % ( - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('courselab'), - urllib.parse.quote(get_arg('outputFile')), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + urllib.parse.quote(get_arg("outputFile")), ) ) print(response.text) @@ -392,11 +438,11 @@ def tango_poll(): print( "Failed to send request to %s:%d/poll/%s/%s/%s/" % ( - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('courselab'), - urllib.parse.quote(get_arg('outputFile')), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("courselab"), + urllib.parse.quote(get_arg("outputFile")), ) ) print(str(err)) @@ -413,15 
+459,19 @@ def tango_info(): raise Exception("Invalid usage: [info] " + info_help) response = requests.get( - "%s://%s:%d/info/%s/" % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key')) + "%s://%s:%d/info/%s/" + % (_tango_protocol, get_arg("server"), get_arg("port"), get_arg("key")) + ) + print( + "Sent request to %s:%d/info/%s/" + % (get_arg("server"), get_arg("port"), get_arg("key")) ) - print("Sent request to %s:%d/info/%s/" % (get_arg('server'), get_arg('port'), get_arg('key'))) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/info/%s/" - % (get_arg('server'), get_arg('port'), get_arg('key')) + % (get_arg("server"), get_arg("port"), get_arg("key")) ) print(str(err)) sys.exit(0) @@ -438,18 +488,24 @@ def tango_jobs(): response = requests.get( "%s://%s:%d/jobs/%s/%d/" - % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('deadJobs')) + % ( + _tango_protocol, + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("deadJobs"), + ) ) print( "Sent request to %s:%d/jobs/%s/%d/" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('deadJobs')) + % (get_arg("server"), get_arg("port"), get_arg("key"), get_arg("deadJobs")) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/jobs/%s/%d/" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('deadJobs')) + % (get_arg("server"), get_arg("port"), get_arg("key"), get_arg("deadJobs")) ) print(str(err)) sys.exit(0) @@ -466,18 +522,24 @@ def tango_pool(): response = requests.get( "%s://%s:%d/pool/%s/%s/" - % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image')) + % ( + _tango_protocol, + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("image"), + ) ) print( "Sent request to %s:%d/pool/%s/%s/" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image')) + % (get_arg("server"), get_arg("port"), get_arg("key"), 
get_arg("image")) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/pool/%s/%s/" - % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image')) + % (get_arg("server"), get_arg("port"), get_arg("key"), get_arg("image")) ) print(str(err)) sys.exit(0) @@ -493,23 +555,30 @@ def tango_prealloc(): if res != 0: raise Exception("Invalid usage: [prealloc] " + prealloc_help) - vmObj["vmms"] = get_arg('vmms') - vmObj["cores"] = get_arg('cores') - vmObj["memory"] = get_arg('memory') + vmObj["vmms"] = get_arg("vmms") + vmObj["cores"] = get_arg("cores") + vmObj["memory"] = get_arg("memory") response = requests.post( "%s://%s:%d/prealloc/%s/%s/%s/" - % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image'), get_arg('num')), + % ( + _tango_protocol, + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("image"), + get_arg("num"), + ), data=json.dumps(vmObj), ) print( "Sent request to %s:%d/prealloc/%s/%s/%s/ \t vmObj=%s" % ( - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('image'), - get_arg('num'), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("image"), + get_arg("num"), json.dumps(vmObj), ) ) @@ -519,11 +588,11 @@ def tango_prealloc(): print( "Failed to send request to %s:%d/prealloc/%s/%s/%s/ \t vmObj=%s" % ( - get_arg('server'), - get_arg('port'), - get_arg('key'), - get_arg('image'), - get_arg('num'), + get_arg("server"), + get_arg("port"), + get_arg("key"), + get_arg("image"), + get_arg("num"), json.dumps(vmObj), ) ) @@ -549,21 +618,24 @@ def tango_build(): if res != 0: raise Exception("Invalid usage: [build] " + build_help) - f = open(get_arg('filename'), "rb") - header = {"imageName": get_arg('imageName')} + f = open(get_arg("filename"), "rb") + header = {"imageName": get_arg("imageName")} response = requests.post( "%s://%s:%d/build/%s/" - % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key')), + % (_tango_protocol, 
get_arg("server"), get_arg("port"), get_arg("key")), data=f.read(), headers=header, ) - print("Sent request to %s:%d/build/%s/" % (get_arg('server'), get_arg('port'), get_arg('key'))) + print( + "Sent request to %s:%d/build/%s/" + % (get_arg("server"), get_arg("port"), get_arg("key")) + ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/build/%s/" - % (get_arg('server'), get_arg('port'), get_arg('key')) + % (get_arg("server"), get_arg("port"), get_arg("key")) ) print(str(err)) sys.exit(0) @@ -573,11 +645,11 @@ def tango_build(): def tango_runJob(): - if get_arg('runJob') is None: + if get_arg("runJob") is None: print("Invalid usage: [runJob]") sys.exit(0) - dir = get_arg('runJob') + dir = get_arg("runJob") infiles = [ file for file in os.listdir(dir) if os.path.isfile(os.path.join(dir, file)) ] @@ -586,7 +658,7 @@ def tango_runJob(): args.jobname += "-0" args.outputFile += "-0" - for i in range(1, get_arg('numJobs') + 1): + for i in range(1, get_arg("numJobs") + 1): print( "----------------------------------------- STARTING JOB " + str(i) @@ -609,27 +681,27 @@ def tango_runJob(): def router(): - if get_arg('open'): + if get_arg("open"): tango_open() - elif get_arg('upload'): + elif get_arg("upload"): tango_upload() - elif get_arg('addJob'): + elif get_arg("addJob"): tango_addJob() - elif get_arg('poll'): + elif get_arg("poll"): tango_poll() - elif get_arg('info'): + elif get_arg("info"): tango_info() - elif get_arg('jobs'): + elif get_arg("jobs"): tango_jobs() - elif get_arg('pool'): + elif get_arg("pool"): tango_pool() - elif get_arg('prealloc'): + elif get_arg("prealloc"): tango_prealloc() - elif get_arg('runJob'): + elif get_arg("runJob"): tango_runJob() - elif get_arg('getPartialOutput'): + elif get_arg("getPartialOutput"): tango_getPartialOutput() - elif get_arg('build'): + elif get_arg("build"): tango_build() @@ -638,32 +710,34 @@ def router(): # args = parser.parse_args() if ( - not get_arg('open') - and not 
get_arg('upload') - and not get_arg('addJob') - and not get_arg('poll') - and not get_arg('info') - and not get_arg('jobs') - and not get_arg('pool') - and not get_arg('prealloc') - and not get_arg('runJob') - and not get_arg('getPartialOutput') - and not get_arg('build') + not get_arg("open") + and not get_arg("upload") + and not get_arg("addJob") + and not get_arg("poll") + and not get_arg("info") + and not get_arg("jobs") + and not get_arg("pool") + and not get_arg("prealloc") + and not get_arg("runJob") + and not get_arg("getPartialOutput") + and not get_arg("build") ): parser.print_help() sys.exit(0) -if get_arg('ssl'): +if get_arg("ssl"): _tango_protocol = "https" - if get_arg('port') == 3000: + if get_arg("port") == 3000: args.port = 443 try: - response = requests.get("%s://%s:%d/" % (_tango_protocol, get_arg('server'), get_arg('port'))) + response = requests.get( + "%s://%s:%d/" % (_tango_protocol, get_arg("server"), get_arg("port")) + ) response.raise_for_status() except BaseException: - print("Tango not reachable on %s:%d!\n" % (get_arg('server'), get_arg('port'))) + print("Tango not reachable on %s:%d!\n" % (get_arg("server"), get_arg("port"))) sys.exit(0) router() diff --git a/jobManager.py b/jobManager.py index e4e4cb54..a9f85f46 100644 --- a/jobManager.py +++ b/jobManager.py @@ -23,7 +23,7 @@ from tangoObjects import TangoJob, TangoQueue, TangoMachine from typing import List, Tuple from worker import Worker - +from vmms.interface import VMMSInterface class JobManager(object): @@ -66,13 +66,16 @@ def __manage(self) -> None: # Blocks until we get a next job job: TangoJob = self.jobQueue.getNextPendingJob() if not job.accessKey and Config.REUSE_VMS: - self.log.info(f"job has access key {job.accessKey} and is calling reuseVM") + self.log.info( + f"job has access key {job.accessKey} and is calling reuseVM" + ) vm = None while vm is None: vm = self.jobQueue.reuseVM(job) # Sleep for a bit and then check again time.sleep(Config.DISPATCH_PERIOD) + vmms: 
VMMSInterface try: # if the job is a ec2 vmms job # spin up an ec2 instance for that job @@ -90,9 +93,7 @@ def __manage(self) -> None: self.log.error("ERROR initialization VM: %s", e) self.log.error(traceback.format_exc()) if preVM is None: - raise Exception( - "EC2 SSH VM initialization failed: see log" - ) + raise Exception("EC2 SSH VM initialization failed: see log") else: self.log.info(f"job {job.id} is not an ec2 vmms job") # Try to find a vm on the free list and allocate it to @@ -120,15 +121,15 @@ def __manage(self) -> None: ) # Mark the job assigned self.jobQueue.assignJob(job.id, preVM) - Worker( - job, vmms, self.jobQueue, self.preallocator, preVM - ).start() + Worker(job, vmms, self.jobQueue, self.preallocator, preVM).start() except Exception as err: if job is None: self.log.info("job_manager: job is None") else: - self.log.error("job failed during creation %d %s" % (job.id, str(err))) + self.log.error( + "job failed during creation %d %s" % (job.id, str(err)) + ) self.jobQueue.makeDead(job, str(err)) @@ -144,7 +145,10 @@ def __manage(self) -> None: tango_server.log.debug("Resetting Tango VMs") tango_server.resetTango(tango_server.preallocator.vmms) for key in tango_server.preallocator.machines.keys(): - machine: Tuple[List[TangoMachine], TangoQueue] = ([], TangoQueue.create(key)) + machine: Tuple[List[TangoMachine], TangoQueue] = ( + [], + TangoQueue.create(key), + ) machine[1].make_empty() tango_server.preallocator.machines.set(key, machine) diff --git a/jobQueue.py b/jobQueue.py index d00969fc..56bc2ad7 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -12,8 +12,10 @@ import time from datetime import datetime -from tangoObjects import TangoDictionary, TangoJob, TangoQueue +from tangoObjects import TangoDictionary, TangoJob, TangoQueue, TangoMachine from config import Config +from preallocator import Preallocator +from typing import Optional # # JobQueue - This class defines the job queue and the functions for @@ -31,7 +33,7 @@ class JobQueue(object): - 
def __init__(self, preallocator): + def __init__(self, preallocator: Preallocator) -> None: """ Here we maintain several data structures used to keep track of the jobs present for the autograder. @@ -54,10 +56,12 @@ def __init__(self, preallocator): using the makeUnassigned api. """ self.liveJobs: TangoDictionary[TangoJob] = TangoDictionary.create("liveJobs") - self.deadJobs: TangoDictionary[TangoJob] = TangoDictionary.create("deadJobs") - self.unassignedJobs = TangoQueue.create("unassignedLiveJobs") + self.deadJobs: TangoDictionary[TangoJob] = TangoDictionary.create( + "deadJobs" + ) # Serves as a record of both failed and completed jobs + self.unassignedJobs: TangoQueue[int] = TangoQueue.create("unassignedLiveJobs") self.queueLock = threading.Lock() - self.preallocator = preallocator + self.preallocator: Preallocator = preallocator self.log = logging.getLogger("JobQueue") self.nextID = 1 @@ -136,7 +140,7 @@ def add(self, job): # Since we assume that the job is new, we set the number of retries # of this job to 0 - assert(job.retries == 0) + assert job.retries == 0 # Add the job to the queue. Careful not to append the trace until we # know the job has actually been added to the queue. @@ -168,7 +172,8 @@ def add(self, job): return str(job.id) - def addDead(self, job): + # TODO: get rid of this return value, it is not used anywhere + def addDead(self, job) -> int: """addDead - add a job to the dead queue. Called by validateJob when a job validation fails. Returns -1 on failure and the job id on success @@ -246,10 +251,10 @@ def get(self, id): self.log.debug("get| Released lock to job queue.") return job - # TODO: this function is a little weird. It sets the state of job to be "assigned", but not to which worker. + # TODO: this function is a little weird. It sets the state of job to be "assigned", but not to which worker. # TODO: It does assign the job to a particular VM though. 
# Precondition: jobId is in self.liveJobs - def assignJob(self, jobId, vm=None) -> None: + def assignJob(self, jobId, vm=None): """assignJob - marks a job to be assigned""" self.queueLock.acquire() self.log.debug("assignJob| Acquired lock to job queue.") @@ -272,7 +277,7 @@ def assignJob(self, jobId, vm=None) -> None: # return job # TODO: Rename this job to be more accurate in its description - def unassignJob(self, jobId): + def unassignJob(self, jobId: int) -> None: """unassignJob - marks a job to be unassigned Note: We assume here that a job is to be rescheduled or 'retried' when you unassign it. This retry is done by @@ -282,7 +287,7 @@ def unassignJob(self, jobId): self.log.debug("unassignJob| Acquired lock to job queue.") # Get the current job - job = self.liveJobs.get(jobId) + job = self.liveJobs.getExn(jobId) # Increment the number of retires if job.retries is None: @@ -313,11 +318,11 @@ def makeDead(self, job: TangoJob, reason): if job.id not in self.liveJobs: self.log.error("makeDead| Job ID: %s not found in live jobs" % (job.id)) return -1 - + self.log.info("makeDead| Found job ID: %s in the live queue" % (job.id)) status = 0 self.log.info("Terminated job %s:%s: %s" % (job.name, job.id, reason)) - + # Remove the job from the live jobs dictionary job.deleteFromDict(self.liveJobs) # Add the job to the dead jobs dictionary @@ -356,6 +361,9 @@ def getNextPendingJob(self) -> TangoJob: """ # Blocks till the next item is added id = self.unassignedJobs.get() + assert ( + id is not None + ), ".get with default arguments should block and never return None" self.log.debug("_getNextPendingJob|Acquiring lock to job queue.") self.queueLock.acquire() @@ -365,7 +373,6 @@ def getNextPendingJob(self) -> TangoJob: job = self.liveJobs.get(id) if job is None: raise Exception("Cannot find unassigned job in live jobs") - self.log.debug("getNextPendingJob| Releasing lock to job queue.") self.queueLock.release() self.log.debug("getNextPendingJob| Released lock to job queue.") @@ 
-393,5 +400,3 @@ def reuseVM(self, job): return job.vm else: raise Exception("Job assigned without vm") - - diff --git a/mypy.ini b/mypy.ini index f6332a89..f607e0a5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -4,5 +4,8 @@ explicit_package_bases = True [mypy-vmms.tashiSSH] ignore_errors = True -[mypy-preallocator] -ignore_errors = True \ No newline at end of file +[mypy-vmms.distDocker] +ignore_errors = True + +[mypy-tests.*] +ignore_errors = True diff --git a/preallocator.py b/preallocator.py index 69291933..da7f8b1d 100644 --- a/preallocator.py +++ b/preallocator.py @@ -8,7 +8,8 @@ from tangoObjects import TangoDictionary, TangoQueue, TangoIntValue, TangoMachine from config import Config -from typing import Tuple, List +from typing import Tuple, List, Dict +from vmms.interface import VMMSInterface # # Preallocator - This class maintains a pool of active VMs for future @@ -22,21 +23,23 @@ class Preallocator(object): - def __init__(self, vmms): - self.machines: TangoDictionary[Tuple[List[TangoMachine], TangoQueue]] = TangoDictionary.create("machines") + def __init__(self, vmms: Dict[str, VMMSInterface]) -> None: + self.machines: TangoDictionary[ + Tuple[List[TangoMachine], TangoQueue[TangoMachine]] + ] = TangoDictionary.create("machines") self.lock = threading.Lock() self.nextID = TangoIntValue("nextID", 1000) - self.vmms = vmms + self.vmms: Dict[str, VMMSInterface] = vmms self.log = logging.getLogger("Preallocator") - def poolSize(self, vmName): + def poolSize(self, vmName: str): """poolSize - returns the size of the vmName pool, for external callers""" if vmName not in self.machines: return 0 else: return len(self.machines.getExn(vmName)[0]) - def update(self, vm, num): + def update(self, vm: TangoMachine, num: int): """update - Updates the number of machines of a certain type to be preallocated. 
@@ -47,13 +50,13 @@ def update(self, vm, num): """ self.lock.acquire() if vm.name not in self.machines: - initial_queue = TangoQueue.create(vm.name) + initial_queue: TangoQueue[TangoMachine] = TangoQueue.create(vm.name) initial_queue.make_empty() self.machines.set(vm.name, ([], initial_queue)) self.log.debug("Creating empty pool of %s instances" % (vm.name)) self.lock.release() - delta = num - len(self.machines.get(vm.name)[0]) + delta = num - len(self.machines.getExn(vm.name)[0]) if delta > 0: # We need more self.machines, spin them up. self.log.debug("update: Creating %d new %s instances" % (delta, vm.name)) @@ -69,14 +72,14 @@ def update(self, vm, num): # If delta == 0 then we are the perfect number! - def allocVM(self, vmName): + def allocVM(self, vmName: str): """allocVM - Allocate a VM from the free list""" vm = None if vmName in self.machines: self.lock.acquire() - if not self.machines.get(vmName)[1].empty(): - vm = self.machines.get(vmName)[1].get_nowait() + if not self.machines.getExn(vmName)[1].empty(): + vm = self.machines.getExn(vmName)[1].get_nowait() self.lock.release() @@ -86,14 +89,14 @@ def allocVM(self, vmName): return vm - def freeVM(self, vm): + def freeVM(self, vm: TangoMachine): """freeVM - Returns a VM instance to the free list""" # Sanity check: Return a VM to the free list only if it is # still a member of the pool. 
not_found = False self.lock.acquire() - if vm and vm.id in self.machines.get(vm.name)[0]: - machine = self.machines.get(vm.name) + if vm and vm.id in self.machines.getExn(vm.name)[0]: + machine = self.machines.getExn(vm.name) machine[1].put(vm) self.machines.set(vm.name, machine) else: @@ -105,18 +108,18 @@ def freeVM(self, vm): vmms = self.vmms[vm.vmms] vmms.safeDestroyVM(vm) - def addVM(self, vm): + def addVM(self, vm: TangoMachine): """addVM - add a particular VM instance to the pool""" self.lock.acquire() - machine = self.machines.get(vm.name) + machine = self.machines.getExn(vm.name) machine[0].append(vm.id) self.machines.set(vm.name, machine) self.lock.release() - def removeVM(self, vm): + def removeVM(self, vm: TangoMachine): """removeVM - remove a particular VM instance from the pool""" self.lock.acquire() - machine = self.machines.get(vm.name) + machine = self.machines.getExn(vm.name) machine[0].remove(vm.id) self.machines.set(vm.name, machine) self.lock.release() @@ -137,7 +140,7 @@ def _getNextID(self): self.lock.release() return id - def __create(self, vm, cnt): + def __create(self, vm: TangoMachine, cnt: int): """__create - Creates count VMs and adds them to the pool This function should always be called in a thread since it @@ -157,7 +160,7 @@ def __create(self, vm, cnt): self.freeVM(newVM) self.log.debug("__create: Added vm %s to pool %s " % (newVM.id, newVM.name)) - def __destroy(self, vm): + def __destroy(self, vm: TangoMachine): """__destroy - Removes a VM from the pool If the user asks for fewer preallocated VMs, then we will @@ -167,7 +170,7 @@ def __destroy(self, vm): the free list is empty. 
""" self.lock.acquire() - dieVM = self.machines.get(vm.name)[1].get_nowait() + dieVM = self.machines.getExn(vm.name)[1].get_nowait() self.lock.release() if dieVM: @@ -175,7 +178,7 @@ def __destroy(self, vm): vmms = self.vmms[vm.vmms] vmms.safeDestroyVM(dieVM) - def createVM(self, vm): + def createVM(self, vm: TangoMachine): """createVM - Called in non-thread context to create a single VM and add it to the pool """ @@ -192,7 +195,7 @@ def createVM(self, vm): self.freeVM(newVM) self.log.debug("createVM: Added vm %s to pool %s" % (newVM.id, newVM.name)) - def destroyVM(self, vmName, id): + def destroyVM(self, vmName: str, id: int): """destroyVM - Called by the delVM API function to remove and destroy a particular VM instance from a pool. We only allow this function when the system is queiscent (pool size == free @@ -203,12 +206,14 @@ def destroyVM(self, vmName, id): dieVM = None self.lock.acquire() - size = self.machines.get(vmName)[1].qsize() - if size == len(self.machines.get(vmName)[0]): - for i in range(size): - vm = self.machines.get(vmName)[1].get_nowait() + size = self.machines.getExn(vmName)[1].qsize() + if size == len(self.machines.getExn(vmName)[0]): + for _ in range(size): + vm = self.machines.getExn(vmName)[1].get_nowait() + if vm is None: + break if vm.id != id: - machine = self.machines.get(vmName) + machine = self.machines.getExn(vmName) machine[1].put(vm) self.machines.set(vmName, machine) else: @@ -217,7 +222,7 @@ def destroyVM(self, vmName, id): if dieVM: self.removeVM(dieVM) - vmms = self.vmms[vm.vmms] + vmms = self.vmms[dieVM.vmms] vmms.safeDestroyVM(dieVM) return 0 else: @@ -229,9 +234,10 @@ def getAllPools(self): result[vmName] = self.getPool(vmName) return result - def getPool(self, vmName): + # TODO: replace with a named tuple + def getPool(self, vmName: str): """getPool - returns the members of a pool and its free list""" - result = {} + result: Dict[str, List[TangoMachine]] = {} if vmName not in self.machines: return result @@ -239,15 +245,17 
@@ def getPool(self, vmName): result["free"] = [] free_list = [] self.lock.acquire() - size = self.machines.get(vmName)[1].qsize() - for i in range(size): - vm = self.machines.get(vmName)[1].get_nowait() + size = self.machines.getExn(vmName)[1].qsize() + for _ in range(size): + vm = self.machines.getExn(vmName)[1].get_nowait() + if vm is None: + break free_list.append(vm.id) - machine = self.machines.get(vmName) + machine = self.machines.getExn(vmName) machine[1].put(vm) self.machines.set(vmName, machine) self.lock.release() - result["total"] = self.machines.get(vmName)[0] + result["total"] = self.machines.getExn(vmName)[0] result["free"] = free_list return result diff --git a/restful_tango/server.py b/restful_tango/server.py index b31f0e8b..5b4c495d 100755 --- a/restful_tango/server.py +++ b/restful_tango/server.py @@ -46,7 +46,9 @@ def prepare(self): if not os.path.exists(tempdir): os.mkdir(tempdir, 0o700) if os.path.exists(tempdir) and not os.path.isdir(tempdir): - tangoREST.log("Cannot process uploads, %s is not a directory" % (tempdir,)) + tangoREST.log.error( + "Cannot process uploads, %s is not a directory" % (tempdir,) + ) return self.send_error() self.tempfile = NamedTemporaryFile(prefix="upload", dir=tempdir, delete=False) self.hasher = hashlib.md5() @@ -129,7 +131,9 @@ def prepare(self): if not os.path.exists(tempdir): os.mkdir(tempdir, 0o700) if os.path.exists(tempdir) and not os.path.isdir(tempdir): - tangoREST.log("Cannot process uploads, %s is not a directory" % (tempdir,)) + tangoREST.log.error( + "Cannot process uploads, %s is not a directory" % (tempdir,) + ) return self.send_error() self.tempfile = NamedTemporaryFile(prefix="docker", dir=tempdir, delete=False) diff --git a/restful_tango/tangoREST.py b/restful_tango/tangoREST.py index 0e67b0b2..1b0f9881 100644 --- a/restful_tango/tangoREST.py +++ b/restful_tango/tangoREST.py @@ -195,7 +195,7 @@ def convertJobObj(self, dirName, jobObj): accessKey=accessKey, accessKeyId=accessKeyId, 
disableNetwork=disableNetwork, - stopBefore=stopBefore + stopBefore=stopBefore, ) self.log.debug("inputFiles: %s" % [file.localFile for file in input]) @@ -306,7 +306,9 @@ def upload(self, key, courselab, file, tempfile, fileMD5): os.unlink(tempfile) return self.status.wrong_courselab except Exception as e: - exc_type, exc_obj, exc_tb = sys.exc_info() + exc_type, _, exc_tb = sys.exc_info() + assert exc_type is not None + assert exc_tb is not None # currently handling an exception fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] print(exc_type, fname, exc_tb.tb_lineno) self.log.error("upload request failed: %s" % str(e)) @@ -335,7 +337,9 @@ def addJob(self, key, courselab, jobStr): result["jobId"] = jobId return result except Exception as e: - exc_type, exc_obj, exc_tb = sys.exc_info() + exc_type, _, exc_tb = sys.exc_info() + assert exc_type is not None + assert exc_tb is not None # currently handling an exception fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] print(exc_type, fname, exc_tb.tb_lineno) self.log.error("addJob request failed: %s" % str(e)) diff --git a/tango.py b/tango.py index 3f94cd12..a8cfa7dd 100755 --- a/tango.py +++ b/tango.py @@ -48,16 +48,17 @@ from jobQueue import JobQueue from tangoObjects import TangoJob from config import Config +from vmms.interface import VMMSInterface class TangoServer(object): """TangoServer - Implements the API functions that the server accepts""" - def __init__(self): + def __init__(self) -> None: self.daemon = True - vmms = None + vmms: VMMSInterface if Config.VMMS_NAME == "tashiSSH": from vmms.tashiSSH import TashiSSH @@ -354,15 +355,6 @@ def __validateJob(self, job, vmms): ) errors += 1 - # Check for max output file size parameter - if not job.maxOutputFileSize: - self.log.debug( - "validateJob: Setting job.maxOutputFileSize " - "to default value: %d bytes", - Config.MAX_OUTPUT_FILE_SIZE, - ) - job.maxOutputFileSize = Config.MAX_OUTPUT_FILE_SIZE - # Check the list of input files hasMakefile 
= False for inputFile in job.input: diff --git a/tangoObjects.py b/tangoObjects.py index 191bf141..37dbf0dd 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -1,4 +1,5 @@ from __future__ import annotations + # tangoREST.py # # Implements objects used to pass state within Tango. @@ -7,7 +8,7 @@ from queue import Queue import pickle import redis -from typing import Optional, Protocol, TypeVar +from typing import Optional, Protocol, TypeVar, Union from abc import abstractmethod redisConnection = None @@ -91,9 +92,10 @@ class TangoJob(object): TangoJob - A job that is to be run on a TangoMachine """ + # TODO: do we really want all of these default values? def __init__( self, - vm: Optional[TangoMachine] = None, + vm: TangoMachine, outputFile=None, name=None, input=None, @@ -108,7 +110,7 @@ def __init__( self._assigned = False self._retries: int = 0 - self._vm = vm + self._vm: TangoMachine = vm if input is None: self._input = [] else: @@ -117,7 +119,9 @@ def __init__( self._outputFile = outputFile self._name = name self._notifyURL = notifyURL - self._timeout = timeout # How long to run the autodriver on the job for before timing out. + self._timeout = ( + timeout # How long to run the autodriver on the job for before timing out. + ) self._trace: list[str] = [] self._maxOutputFileSize = maxOutputFileSize self._remoteLocation: Optional[str] = None @@ -125,18 +129,21 @@ def __init__( self._accessKey = accessKey self._disableNetwork = disableNetwork self._stopBefore = stopBefore + self._id: Optional[ + int + ] = None # uninitialized until it gets added to either the live or dead queue def __repr__(self): self.syncRemote() - return f"ID: {self.id} - Name: {self.name}" - + return f"ID: {self._id} - Name: {self.name}" + # TODO: reduce code size/duplication by setting TangoJob as a dataclass # Getters for private variables @property def assigned(self): - self.syncRemote() # Is it necessary to sync here? + self.syncRemote() # Is it necessary to sync here? 
return self._assigned - + @property def retries(self): self.syncRemote() @@ -146,67 +153,72 @@ def retries(self): def vm(self): self.syncRemote() return self._vm - + @property def input(self): self.syncRemote() return self._input - + @property def outputFile(self): self.syncRemote() return self._outputFile - + @property def name(self): self.syncRemote() return self._name - + @property def notifyURL(self): self.syncRemote() return self._notifyURL - + @property def timeout(self): self.syncRemote() return self._timeout - + @property def trace(self): self.syncRemote() return self._trace - + @property def maxOutputFileSize(self): self.syncRemote() return self._maxOutputFileSize - + @property def remoteLocation(self): self.syncRemote() return self._remoteLocation - + @property def accessKeyId(self): self.syncRemote() return self._accessKeyId - + @property def accessKey(self): self.syncRemote() return self._accessKey - + @property def disableNetwork(self): self.syncRemote() return self._disableNetwork - + @property def stopBefore(self): self.syncRemote() return self._stopBefore - + + @property + def id(self) -> int: + self.syncRemote() + assert self._id is not None, "Job ID is not set, add it to the job queue first" + return self._id def makeAssigned(self): self.syncRemote() @@ -217,13 +229,13 @@ def resetRetries(self): self.syncRemote() self._retries = 0 self.updateRemote() - + def incrementRetries(self): self.syncRemote() self._retries += 1 self.updateRemote() - def makeVM(self, vm): + def makeVM(self, vm: TangoMachine) -> None: self.syncRemote() self._vm = vm self.updateRemote() @@ -242,8 +254,8 @@ def appendTrace(self, trace_str): self._trace.append(trace_str) self.updateRemote() - def setId(self, new_id): - self.id = new_id + def setId(self, new_id: int) -> None: + self._id = new_id if self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] @@ -251,16 +263,17 @@ def setId(self, new_id): 
dictionary.delete(key) self._remoteLocation = dict_hash + ":" + str(new_id) self.updateRemote() - + def setTimeout(self, new_timeout): self.syncRemote() self._timeout = new_timeout self.updateRemote() def setKeepForDebugging(self, keep_for_debugging: bool): - self.syncRemote() - self._vm.keep_for_debugging = keep_for_debugging - self.updateRemote() + if self._vm is not None: + self.syncRemote() + self._vm.keep_for_debugging = keep_for_debugging + self.updateRemote() # Private method def __updateSelf(self, other_job): @@ -274,37 +287,40 @@ def __updateSelf(self, other_job): self._timeout = other_job._timeout self._trace = other_job._trace self._maxOutputFileSize = other_job._maxOutputFileSize + self._id = other_job._id - - def syncRemote(self): + def syncRemote(self) -> None: if Config.USE_REDIS and self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] dictionary: TangoDictionary[TangoJob] = TangoDictionary.create(dict_hash) - temp_job = dictionary.get(key) # Key should be in dictionary + temp_job = dictionary.get(key) # Key should be in dictionary if temp_job is None: - print(f"Job {key} not found in dictionary {dict_hash}") # TODO: add better error handling for TangoJob + print( + f"Job {key} not found in dictionary {dict_hash}" + ) # TODO: add better error handling for TangoJob return self.__updateSelf(temp_job) - def updateRemote(self): + def updateRemote(self) -> None: if Config.USE_REDIS and self._remoteLocation is not None: dict_hash = self._remoteLocation.split(":")[0] key = self._remoteLocation.split(":")[1] dictionary: TangoDictionary[TangoJob] = TangoDictionary.create(dict_hash) dictionary.set(key, self) - - def deleteFromDict(self, dictionary : TangoDictionary) -> None: - dictionary.delete(self.id) + + def deleteFromDict(self, dictionary: TangoDictionary) -> None: + assert self._id is not None + dictionary.delete(self._id) self._remoteLocation = None - - def addToDict(self, dictionary : 
TangoDictionary) -> None: - dictionary.set(self.id, self) + + def addToDict(self, dictionary: TangoDictionary) -> None: + assert self._id is not None + dictionary.set(self._id, self) assert self._remoteLocation is None, "Job already has a remote location" if Config.USE_REDIS: - self._remoteLocation = dictionary.hash_name + ":" + str(self.id) + self._remoteLocation = dictionary.hash_name + ":" + str(self._id) self.updateRemote() - def TangoIntValue(object_name, obj): @@ -348,11 +364,14 @@ def get(self): def set(self, val): self.val = val return val - -class TangoQueue(Protocol): + +QueueElem = TypeVar("QueueElem") + + +class TangoQueue(Protocol[QueueElem]): @staticmethod - def create(key_name: str) -> TangoQueue: + def create(key_name: str) -> TangoQueue[QueueElem]: if Config.USE_REDIS: return TangoRemoteQueue(key_name) else: @@ -361,20 +380,27 @@ def create(key_name: str) -> TangoQueue: @abstractmethod def qsize(self) -> int: ... + def empty(self) -> bool: ... - def put(self, item) -> None: + + def put(self, item: QueueElem) -> None: ... - def get(self, block=True, timeout=None) -> Optional[T]: + + def get(self, block=True, timeout=None) -> Optional[QueueElem]: ... - def get_nowait(self) -> Optional[T]: + + def get_nowait(self) -> Optional[QueueElem]: ... - def remove(self, item) -> None: + + def remove(self, item: QueueElem) -> None: ... + def make_empty(self) -> None: ... 
-class ExtendedQueue(Queue, TangoQueue): + +class ExtendedQueue(Queue, TangoQueue[QueueElem]): """Python Thread safe Queue with the remove and clean function added""" def test(self): @@ -392,7 +418,8 @@ def remove(self, value): def make_empty(self): with self.mutex: self.queue.clear() - + + class TangoRemoteQueue(TangoQueue): """Simple Queue with Redis Backend""" @@ -455,46 +482,55 @@ def remove(self, item): def make_empty(self) -> None: self.__db.delete(self.key) -T = TypeVar('T') + +T = TypeVar("T") +KeyType = Union[str, int] # Dictionary from string to T class TangoDictionary(Protocol[T]): - @staticmethod def create(dictionary_name: str) -> TangoDictionary[T]: if Config.USE_REDIS: return TangoRemoteDictionary(dictionary_name) else: return TangoNativeDictionary() - + @property @abstractmethod def hash_name(self) -> str: ... - + @abstractmethod - def __contains__(self, id: str) -> bool: + def __contains__(self, id: KeyType) -> bool: ... + @abstractmethod - def set(self, id: str, obj: T) -> str: + def set(self, id: KeyType, obj: T) -> str: ... + @abstractmethod - def get(self, id: str) -> Optional[T]: + def get(self, id: KeyType) -> Optional[T]: ... + @abstractmethod - def getExn(self, id: str) -> T: + def getExn(self, id: KeyType) -> T: ... + @abstractmethod def keys(self) -> list[str]: ... + @abstractmethod def values(self) -> list[T]: ... + @abstractmethod - def delete(self, id: str) -> None: + def delete(self, id: KeyType) -> None: ... + @abstractmethod def make_empty(self) -> None: ... + @abstractmethod def items(self) -> list[tuple[str, T]]: ... 
@@ -512,8 +548,6 @@ def items(self) -> list[tuple[str, T]]: # return TangoNativeDictionary() - - class TangoRemoteDictionary(TangoDictionary[T]): def __init__(self, object_name): self.r = getRedisConnection() diff --git a/tests/sample_test/expected_output.txt b/tests/sample_test/expected_output.txt index 70c379b6..802992c4 100644 --- a/tests/sample_test/expected_output.txt +++ b/tests/sample_test/expected_output.txt @@ -1 +1 @@ -Hello world \ No newline at end of file +Hello world diff --git a/tests/stressTest.py b/tests/stressTest.py index 6494a8c4..eeb46bfa 100644 --- a/tests/stressTest.py +++ b/tests/stressTest.py @@ -14,7 +14,17 @@ start_time = time.time() expected_output = "" -def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"): + +def printProgressBar( + iteration, + total, + prefix="", + suffix="", + decimals=1, + length=100, + fill="█", + printEnd="\r", +): """ Call in a loop to create terminal progress bar @params: @@ -29,39 +39,65 @@ def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, """ percent = ("{0:." 
+ str(decimals) + "f}").format(100 * (iteration / float(total))) filledLength = int(length * iteration // total) - bar = fill * filledLength + '-' * (length - filledLength) - print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) + bar = fill * filledLength + "-" * (length - filledLength) + print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) # Print New Line on Complete - if iteration == total: + if iteration == total: print() -def run_stress_test(num_submissions, submission_delay, autograder_image, output_file, tango_port, cli_path, - job_name, job_path, instance_type, timeout, ec2): - printProgressBar(0, num_submissions, prefix = 'Jobs Added:', suffix = 'Complete', length = 50) - with open(output_file, 'a') as f: + +def run_stress_test( + num_submissions, + submission_delay, + autograder_image, + output_file, + tango_port, + cli_path, + job_name, + job_path, + instance_type, + timeout, + ec2, +): + printProgressBar( + 0, num_submissions, prefix="Jobs Added:", suffix="Complete", length=50 + ) + with open(output_file, "a") as f: f.write(f"Stress testing with {num_submissions} submissions\n") - + for i in range(1, num_submissions + 1): command = [ - 'python3', cli_path, - '-P', str(tango_port), - '-k', 'test', - '-l', job_name, - '--runJob', job_path, - '--image', autograder_image, - '--instanceType', instance_type, - '--timeout', str(timeout), - '--callbackURL', ("http://localhost:8888/autograde_done?id=%d" % (i)) + "python3", + cli_path, + "-P", + str(tango_port), + "-k", + "test", + "-l", + job_name, + "--runJob", + job_path, + "--image", + autograder_image, + "--instanceType", + instance_type, + "--timeout", + str(timeout), + "--callbackURL", + ("http://localhost:8888/autograde_done?id=%d" % (i)), ] if ec2: - command += ['--ec2'] + command += ["--ec2"] subprocess.run(command, stdout=f, stderr=f) f.write(f"Submission {i} completed\n") - printProgressBar(i, num_submissions, prefix = 'Jobs Added:', suffix = 'Complete', length = 50) + 
printProgressBar( + i, num_submissions, prefix="Jobs Added:", suffix="Complete", length=50 + ) if submission_delay > 0: time.sleep(submission_delay) print() + class AutogradeDoneHandler(tornado.web.RequestHandler): def post(self): global finished_tests @@ -71,10 +107,16 @@ def post(self): id = self.get_query_argument("id") fileBody = self.request.files["file"][0]["body"].decode() scoreJson = fileBody.split("\n")[-2] - with open(os.path.join(test_dir, "output", "output%s.txt" % id), 'w') as f: + with open(os.path.join(test_dir, "output", "output%s.txt" % id), "w") as f: f.write(fileBody) finished_tests[str(id)] = scoreJson - printProgressBar(len(finished_tests), sub_num, prefix = 'Tests Done:', suffix = 'Complete', length = 50) + printProgressBar( + len(finished_tests), + sub_num, + prefix="Tests Done:", + suffix="Complete", + length=50, + ) self.write("ok") self.flush() @@ -87,6 +129,7 @@ def on_finish(self): print("\nShutting down server...") tornado.ioloop.IOLoop.current().stop() + def create_summary(): success = [] failed = [] @@ -95,7 +138,7 @@ def create_summary(): success.append(i) else: failed.append(i) - with open(os.path.join(test_dir, "summary.txt"), 'w') as f: + with open(os.path.join(test_dir, "summary.txt"), "w") as f: f.write("Total Time: %d seconds\n" % (time.time() - start_time)) f.write("Total Succeeded: %d / %d\n" % (len(success), sub_num)) f.write("Total Failed: %d / %d\n" % (len(failed), sub_num)) @@ -107,21 +150,28 @@ def create_summary(): for i in range(0, len(failed)): f.write("Test Case #%d: %s\n" % (failed[i], finished_tests[str(failed[i])])) + def make_app(): - return tornado.web.Application([ - (r"/autograde_done", AutogradeDoneHandler), - ]) + return tornado.web.Application( + [ + (r"/autograde_done", AutogradeDoneHandler), + ] + ) + def notifyServer(): global shutdown_event app = make_app() app.listen(8888) - printProgressBar(0, sub_num, prefix = 'Tests Done:', suffix = 'Complete', length = 50) + printProgressBar(0, sub_num, 
prefix="Tests Done:", suffix="Complete", length=50) tornado.ioloop.IOLoop.current().start() + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Stress test script for Tango") - parser.add_argument('--test_dir', type=str, required=True, help="Directory to run the test in") + parser.add_argument( + "--test_dir", type=str, required=True, help="Directory to run the test in" + ) args = parser.parse_args() @@ -129,10 +179,10 @@ def notifyServer(): test_dir = args.test_dir - with open(os.path.join(args.test_dir, dirname + '.yaml'), 'r') as f: + with open(os.path.join(args.test_dir, dirname + ".yaml"), "r") as f: data = yaml.load(f, Loader=yaml.SafeLoader) - - with open(os.path.join(args.test_dir, data["expected_output"]), 'r') as f: + + with open(os.path.join(args.test_dir, data["expected_output"]), "r") as f: expected_output = f.read() sub_num = data["num_submissions"] @@ -152,10 +202,10 @@ def notifyServer(): data["tango_port"], data["cli_path"], dirname, - os.path.join(args.test_dir, 'input'), + os.path.join(args.test_dir, "input"), data["instance_type"], data["timeout"], - data["ec2"] + data["ec2"], ) - notifyServer() \ No newline at end of file + notifyServer() diff --git a/vmms/distDocker.py b/vmms/distDocker.py index b6d498e5..ab90bcb6 100644 --- a/vmms/distDocker.py +++ b/vmms/distDocker.py @@ -21,61 +21,11 @@ import socket import config from tangoObjects import TangoMachine +from vmms.interface import VMMSInterface +from vmms.sharedUtils import VMMSUtils -def timeout(command, time_out=1): - """timeout - Run a unix command with a timeout. Return -1 on - timeout, otherwise return the return value from the command, which - is typically 0 for success, 1-255 for failure. 
- """ - - # Launch the command - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - - # Wait for the command to complete - t = 0.0 - while t < time_out and p.poll() is None: - time.sleep(config.Config.TIMER_POLL_INTERVAL) - t += config.Config.TIMER_POLL_INTERVAL - - # Determine why the while loop terminated - if p.poll() is None: - try: - os.kill(p.pid, 9) - except OSError: - pass - returncode = -1 - else: - returncode = p.poll() - return returncode - - -def timeoutWithReturnStatus(command, time_out, returnValue=0): - """timeoutWithReturnStatus - Run a Unix command with a timeout, - until the expected value is returned by the command; On timeout, - return last error code obtained from the command. - """ - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - t = 0.0 - while t < time_out: - ret = p.poll() - if ret is None: - time.sleep(config.Config.TIMER_POLL_INTERVAL) - t += config.Config.TIMER_POLL_INTERVAL - elif ret == returnValue: - return ret - else: - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - return ret - - -class DistDocker(object): +class DistDocker(VMMSInterface, VMMSUtils): _SSH_FLAGS = ["-q", "-o", "BatchMode=yes"] _SSH_AUTH_FLAGS = [ @@ -171,7 +121,7 @@ def waitVM(self, vm, max_secs): # If the call to ssh returns timeout (-1) or ssh error # (255), then success. Otherwise, keep trying until we run # out of time. - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -187,7 +137,7 @@ def waitVM(self, vm, max_secs): # Sleep a bit before trying again time.sleep(config.Config.TIMER_POLL_INTERVAL) - def copyIn(self, vm, inputFiles): + def copyIn(self, vm, inputFiles, job_id=None): """copyIn - Create a directory to be mounted as a volume for the docker containers on the host machine for this VM. Copy input files to this directory on the host machine. 
@@ -196,7 +146,7 @@ def copyIn(self, vm, inputFiles): volumePath = self.getVolumePath(instanceName) if vm.use_ssh_master: - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -208,7 +158,7 @@ def copyIn(self, vm, inputFiles): return ret # Create a fresh volume - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -224,7 +174,7 @@ def copyIn(self, vm, inputFiles): return ret for file in inputFiles: - ret = timeout( + ret = VMMSUtils.timeout( ["scp"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -260,7 +210,7 @@ def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): volumePath = self.getVolumePath(instanceName) if vm.use_ssh_master: - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -302,7 +252,7 @@ def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): self.log.debug("Running job: %s" % args) - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -323,7 +273,7 @@ def copyOut(self, vm, destFile): volumePath = self.getVolumePath(instanceName) if vm.use_ssh_master: - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -334,7 +284,7 @@ def copyOut(self, vm, destFile): self.log.debug("Lost persistent SSH connection") return ret - ret = timeout( + ret = VMMSUtils.timeout( ["scp"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -355,7 +305,7 @@ def destroyVM(self, vm): instanceName = self.instanceName(vm.id, vm.image) volumePath = self.getVolumePath(instanceName) if vm.use_ssh_master: - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -371,7 +321,7 @@ def destroyVM(self, vm): # Do a hard kill on corresponding docker container. # Return status does not matter. 
args = "(docker rm -f %s)" % (instanceName) - timeout( + VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -379,7 +329,7 @@ def destroyVM(self, vm): config.Config.DOCKER_RM_TIMEOUT, ) # Destroy corresponding volume if it exists. - timeout( + VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -388,7 +338,7 @@ def destroyVM(self, vm): ) self.log.debug("Deleted volume %s" % instanceName) if vm.use_ssh_master: - timeout( + VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags @@ -489,7 +439,7 @@ def getPartialOutput(self, vm): instanceName = self.instanceName(vm.id, vm.image) if vm.use_ssh_master: - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + DistDocker._SSH_FLAGS + vm.ssh_flags diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index 914de719..f55c3c05 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -2,11 +2,7 @@ # ec2SSH.py - Implements the Tango VMMS interface to run Tango jobs on Amazon EC2. # # This implementation uses the AWS EC2 SDK to manage the virtual machines and -# ssh and scp to access them. The following excecption are raised back -# to the caller: -# -# Ec2Exception - EC2 raises this if it encounters any problem -# ec2CallError - raised by ec2Call() function +# ssh and scp to access them. 
# import logging @@ -21,57 +17,37 @@ from botocore.exceptions import ClientError import config -from tangoObjects import TangoMachine -from typing import Optional, Literal, List, Sequence +from tangoObjects import TangoMachine, InputFile +from typing import Optional, Literal, List, Dict, Sequence, Set, get_args +from typing_extensions import TypeGuard from mypy_boto3_ec2 import EC2ServiceResource -from mypy_boto3_ec2.service_resource import Instance -from mypy_boto3_ec2.type_defs import FilterTypeDef +from mypy_boto3_ec2.literals import InstanceTypeType +from mypy_boto3_ec2.service_resource import Instance, Image +from mypy_boto3_ec2.type_defs import FilterTypeDef, TagTypeDef from vmms.interface import VMMSInterface - +from vmms.sharedUtils import VMMSUtils # suppress most boto logging logging.getLogger("boto3").setLevel(logging.CRITICAL) logging.getLogger("botocore").setLevel(logging.CRITICAL) logging.getLogger("urllib3.connectionpool").setLevel(logging.CRITICAL) +valid_instance_types: Set[InstanceTypeType] = set(get_args(InstanceTypeType)) -def timeout(command, time_out=1): - """timeout - Run a unix command with a timeout. Return -1 on - timeout, otherwise return the return value from the command, which - is typically 0 for success, 1-255 for failure. 
- """ - # Launch the command - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - - # Wait for the command to complete - t = 0.0 - while t < time_out and p.poll() is None: - time.sleep(config.Config.TIMER_POLL_INTERVAL) - t += config.Config.TIMER_POLL_INTERVAL - if t >= time_out: - print("ERROR: timeout trying ", command) - # Determine why the while loop terminated - if p.poll() is None: - try: - os.kill(p.pid, 9) - except OSError: - pass - returncode = -1 - else: - returncode = p.poll() - return returncode +def check_instance_type(instance_type: str) -> TypeGuard[InstanceTypeType]: + return instance_type in valid_instance_types -def timeout_with_retries(command, time_out=1, retries=3, retry_delay=2): +def timeout_with_retries( + command: List[str], time_out: float = 1, retries: int = 3, retry_delay: float = 2 +) -> int: """timeout - Run a unix command with a timeout. Return -1 on timeout, otherwise return the return value from the command, which is typically 0 for success, 1-255 for failure. 
""" - for attempt in range(retries + 1): + for _ in range(retries + 1): # Launch the command p = subprocess.Popen( command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT @@ -86,49 +62,24 @@ def timeout_with_retries(command, time_out=1, retries=3, retry_delay=2): print("ERROR: timeout trying ", command) # Determine why the while loop terminated - if p.poll() is None: + poll_result: Optional[int] = p.poll() + if poll_result is None: try: os.kill(p.pid, 9) except OSError: pass returncode = -1 else: - returncode = p.poll() + returncode = poll_result # try to retry the command on a timeout if returncode == -1: - if attempt < retries: - print(f"Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - else: - # attempt == retries -> failure - print("All retries exhausted.") - return -1 + print(f"Retrying in {retry_delay} seconds...") + time.sleep(retry_delay) else: return returncode - - -def timeoutWithReturnStatus(command, time_out, returnValue=0): - """timeoutWithReturnStatus - Run a Unix command with a timeout, - until the expected value is returned by the command; On timeout, - return last error code obtained from the command. 
- """ - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - t = 0.0 - while t < time_out: - ret = p.poll() - if ret is None: - time.sleep(config.Config.TIMER_POLL_INTERVAL) - t += config.Config.TIMER_POLL_INTERVAL - elif ret == returnValue: - return ret - else: - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - return ret + print("All retries exhausted.") + return -1 @backoff.on_exception(backoff.expo, ClientError, max_tries=3, jitter=None) @@ -136,17 +87,7 @@ def try_load_instance(newInstance): newInstance.load() -# -# User defined exceptions -# -# ec2Call() exception - - -class ec2CallError(Exception): - pass - - -class Ec2SSH(VMMSInterface): +class Ec2SSH(VMMSInterface, VMMSUtils): _SSH_FLAGS = [ "-i", config.Config.SECURITY_KEY_PATH, @@ -160,17 +101,19 @@ class Ec2SSH(VMMSInterface): _vm_semaphore = threading.Semaphore(config.Config.MAX_EC2_VMS) @staticmethod - def acquire_vm_semaphore(): + def acquire_vm_semaphore() -> None: """Blocks until a VM is available to limit load""" Ec2SSH._vm_semaphore.acquire() # This blocks until a slot is available @staticmethod - def release_vm_semaphore(): + def release_vm_semaphore() -> None: """Releases the VM sempahore""" Ec2SSH._vm_semaphore.release() # TODO: the arguments accessKeyId and accessKey don't do anything - def __init__(self, accessKeyId=None, accessKey=None): + def __init__( + self, accessKeyId: Optional[str] = None, accessKey: Optional[str] = None + ) -> None: """log - logger for the instance connection - EC2Connection object that stores the connection info to the EC2 network @@ -203,11 +146,15 @@ def __init__(self, accessKeyId=None, accessKey=None): # self.createKeyPair() # create boto3resource - self.img2ami = {} # this is a bad name, should really be img_name to img - self.images = [] + self.img2ami: Dict[ + str, Image + ] = {} # this is a bad name, should really be img_name to img + self.images: List[Image] = [] try: # 
This is a service resource - self.boto3resource: EC2ServiceResource = boto3.resource("ec2", config.Config.EC2_REGION) # TODO: rename this ot self.ec2resource + self.boto3resource: EC2ServiceResource = boto3.resource( + "ec2", config.Config.EC2_REGION + ) # TODO: rename this ot self.ec2resource self.boto3client = boto3.client("ec2", config.Config.EC2_REGION) # Get images from ec2 @@ -242,18 +189,11 @@ def __init__(self, accessKeyId=None, accessKey=None): % str(ignoredAMIs) ) - def instanceName(self, id, name): - """instanceName - Constructs a VM instance name. Always use - this function when you need a VM instance name. Never generate - instance names manually. - """ - return "%s-%d-%s" % (config.Config.PREFIX, id, name) - - def keyPairName(self, id, name): + def keyPairName(self, id: int, name: str) -> str: """keyPairName - Constructs a unique key pair name.""" return "%s-%d-%s" % (config.Config.PREFIX, id, name) - def domainName(self, vm): + def domainName(self, vm: TangoMachine) -> str: """Returns the domain name that is stored in the vm instance. """ @@ -262,8 +202,11 @@ def domainName(self, vm): # # VMMS helper methods # + def instanceName(self, id: int, name: str) -> str: + return VMMSUtils.constructInstanceName(id, name) - def tangoMachineToEC2Instance(self, vm: TangoMachine) -> dict: + # TODO: return a dataclass with the instance_type member of type InstanceTypeType of type str + def tangoMachineToEC2Instance(self, vm: TangoMachine) -> Dict[str, str]: """tangoMachineToEC2Instance - returns an object with EC2 instance type and AMI. Only general-purpose instances are used. Defalt AMI is currently used. 
@@ -287,7 +230,7 @@ def tangoMachineToEC2Instance(self, vm: TangoMachine) -> dict: self.log.info("tangoMachineToEC2Instance: %s" % str(ec2instance)) return ec2instance - def createKeyPair(self): + def createKeyPair(self) -> None: # TODO: SUPPORT raise # # try to delete the key to avoid collision @@ -301,7 +244,7 @@ def createKeyPair(self): # # change the SSH_FLAG accordingly # self.ssh_flags[1] = self.key_pair_path - def deleteKeyPair(self): + def deleteKeyPair(self) -> None: # TODO: SUPPORT raise # self.boto3client.delete_key_pair(self.key_pair_name) @@ -311,10 +254,12 @@ def deleteKeyPair(self): # except OSError: # pass - def createSecurityGroup(self): + # Creates a security group if it doesn't exist. + # ^ Note: strangely, the security group id is never used. + def createSecurityGroup(self) -> None: try: # Check if the security group already exists - response = self.boto3client.describe_security_groups( + description_response = self.boto3client.describe_security_groups( Filters=[ { "Name": "group-name", @@ -322,21 +267,21 @@ def createSecurityGroup(self): } ] ) - if response["SecurityGroups"]: - security_group_id = response["SecurityGroups"][0]["GroupId"] + if description_response["SecurityGroups"]: + security_group_id = description_response["SecurityGroups"][0]["GroupId"] return except Exception as e: self.log.debug("ERROR checking for existing security group: %s", e) - + # ! Note: We've never encountered the lines below before (there was a type error), + # ! because we've always had a security group. + # ! Difficult to test because it involves deleting all security groups. 
try: - response = self.boto3resource.create_security_group( + security_group_response = self.boto3client.create_security_group( GroupName=config.Config.DEFAULT_SECURITY_GROUP, Description="Autolab security group - allowing all traffic", ) - security_group_id = response["GroupId"] - self.boto3resource.authorize_security_group_ingress( - GroupId=security_group_id - ) + security_group_id = security_group_response["GroupId"] + self.boto3client.authorize_security_group_ingress(GroupId=security_group_id) except Exception as e: self.log.debug("ERROR in creating security group: %s", e) @@ -354,25 +299,26 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: try: instanceName = self.instanceName(vm.id, vm.name) ec2instance = self.tangoMachineToEC2Instance(vm) + instance_type = ec2instance["instance_type"] + if not check_instance_type(instance_type): + raise ValueError(f"Invalid instance type: {instance_type}") self.log.debug("instanceName: %s" % instanceName) # ensure that security group exists self.createSecurityGroup() - reservation: List[Instance] = self.boto3resource.create_instances( ImageId=ec2instance["ami"], KeyName=self.key_pair_name, SecurityGroups=[config.Config.DEFAULT_SECURITY_GROUP], - InstanceType=ec2instance["instance_type"], + InstanceType=instance_type, MaxCount=1, MinCount=1, - InstanceMarketOptions= - { + InstanceMarketOptions={ "MarketType": "spot", "SpotOptions": { "SpotInstanceType": "one-time", - "InstanceInterruptionBehavior": "terminate" - } + "InstanceInterruptionBehavior": "terminate", + }, }, ) @@ -400,18 +346,13 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: # reload the state of the new instance try_load_instance(newInstance) for inst in instances.filter(InstanceIds=[newInstance.id]): - self.log.debug( - "VM %s %s: is running" % (vm.name, newInstance.id) - ) + self.log.debug("VM %s %s: is running" % (vm.name, newInstance.id)) instanceRunning = True if instanceRunning: break - if ( - time.time() - start_time - > 
config.Config.INITIALIZEVM_TIMEOUT - ): + if time.time() - start_time > config.Config.INITIALIZEVM_TIMEOUT: raise ValueError( "VM %s %s: timeout (%d seconds) before reaching 'running' state" % ( @@ -466,7 +407,7 @@ def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: return -1 return -1 - def waitVM(self, vm, max_secs) -> Literal[0, -1]: + def waitVM(self, vm: TangoMachine, max_secs: int) -> Literal[0, -1]: """waitVM - Wait at most max_secs for a VM to become ready. Return error if it takes too long. @@ -517,7 +458,7 @@ def waitVM(self, vm, max_secs) -> Literal[0, -1]: # If the call to ssh returns timeout (-1) or ssh error # (255), then success. Otherwise, keep trying until we run # out of time. - ret = timeout( + ret = VMMSUtils.timeout( ["ssh"] + self.ssh_flags + ["%s@%s" % (self.ec2User, domain_name), "(:)"], @@ -532,18 +473,21 @@ def waitVM(self, vm, max_secs) -> Literal[0, -1]: # Sleep a bit before trying again time.sleep(config.Config.TIMER_POLL_INTERVAL) - def copyIn(self, vm, inputFiles, job_id=None): + def copyIn( + self, + vm: TangoMachine, + inputFiles: List[InputFile], + job_id: Optional[int] = None, + ) -> int: """copyIn - Copy input files to VM Args: - vm is a TangoMachine object - - inputFiles is a list of objects with attributes localFile and destFile. + - inputFiles is a list of objects with attributes localFile and destFile. localFile is the file on the host, destFile is the file on the VM. - - job_id is the job id of the job being run on the VM. + - job_id is the job id of the job being run on the VM. It is used for logging purposes only. 
""" - self.log.info( - "copyIn %s - writing files" % self.instanceName(vm.id, vm.name) - ) + self.log.info("copyIn %s - writing files" % self.instanceName(vm.id, vm.name)) domain_name = self.domainName(vm) @@ -565,18 +509,14 @@ def copyIn(self, vm, inputFiles, job_id=None): self.log.info("%s for job %s" % (line, job_id)) self.log.info("Return Code: %s, job: %s" % (result.returncode, job_id)) if result.stderr != 0: - self.log.info( - "Standard Error: %s, job: %s" % (result.stderr, job_id) - ) + self.log.info("Standard Error: %s, job: %s" % (result.stderr, job_id)) # Validate inputFiles structure if not inputFiles or not all( hasattr(file, "localFile") and hasattr(file, "destFile") for file in inputFiles ): - self.log.info( - "Error: Invalid inputFiles Structure, job: %s" % job_id - ) + self.log.info("Error: Invalid inputFiles Structure, job: %s" % job_id) for file in inputFiles: self.log.info("%s - %s" % (file.localFile, file.destFile)) @@ -585,8 +525,7 @@ def copyIn(self, vm, inputFiles, job_id=None): + self.ssh_flags + [ file.localFile, - "%s@%s:~/autolab/%s" - % (self.ec2User, domain_name, file.destFile), + "%s@%s:~/autolab/%s" % (self.ec2User, domain_name, file.destFile), ], config.Config.COPYIN_TIMEOUT, ) @@ -596,7 +535,13 @@ def copyIn(self, vm, inputFiles, job_id=None): return 0 - def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): + def runJob( + self, + vm: TangoMachine, + runTimeout: int, + maxOutputFileSize: int, + disableNetwork: bool, + ) -> int: """runJob - Run the make command on a VM using SSH and redirect output to file "output". """ @@ -618,17 +563,15 @@ def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): ) # no logging for now - ret = timeout( - ["ssh"] - + self.ssh_flags - + ["%s@%s" % (self.ec2User, domain_name), runcmd], + ret = VMMSUtils.timeout( + ["ssh"] + self.ssh_flags + ["%s@%s" % (self.ec2User, domain_name), runcmd], runTimeout * 2, ) # runTimeout * 2 is a temporary hack. 
The driver will handle the timout return ret - def copyOut(self, vm, destFile): + def copyOut(self, vm: TangoMachine, destFile: str) -> int: """copyOut - Copy the file output on the VM to the file outputFile on the Tango host. """ @@ -673,11 +616,11 @@ def copyOut(self, vm, destFile): except subprocess.CalledProcessError as xxx_todo_changeme: # Error copying out the timing data (probably runJob failed) - re.error = xxx_todo_changeme + # re.error = xxx_todo_changeme # Error copying out the timing data (probably runJob failed) pass - return timeout( + return VMMSUtils.timeout( ["scp"] + self.ssh_flags + [ @@ -687,7 +630,7 @@ def copyOut(self, vm, destFile): config.Config.COPYOUT_TIMEOUT, ) - def destroyVM(self, vm): + def destroyVM(self, vm: TangoMachine) -> None: """destroyVM - Removes a VM from the system""" self.log.info( "destroyVM: %s %s %s %s" @@ -699,9 +642,7 @@ def destroyVM(self, vm): InstanceIds=[vm.instance_id] ) if not instances: - self.log.debug( - "no instances found with instance id %s", vm.instance_id - ) + self.log.debug("no instances found with instance id %s", vm.instance_id) # Keep the vm and mark with meaningful tags for debugging if ( hasattr(config.Config, "KEEP_VM_AFTER_FAILURE") @@ -728,22 +669,20 @@ def destroyVM(self, vm): if not self.useDefaultKeyPair: self.deleteKeyPair() except Exception as e: - self.log.error( - "destroyVM failed: %s for vm %s" % (e, vm.instance_id) - ) + self.log.error("destroyVM failed: %s for vm %s" % (e, vm.instance_id)) Ec2SSH.release_vm_semaphore() - def safeDestroyVM(self, vm): + def safeDestroyVM(self, vm: TangoMachine) -> None: return self.destroyVM(vm) - def getVMs(self): + def getVMs(self) -> List[TangoMachine]: """getVMs - Returns the complete list of VMs on this account. Each list entry is a boto.ec2.instance.Instance object. 
""" try: - vms = list() - filters = [ + vms: List[TangoMachine] = [] + filters: Sequence[FilterTypeDef] = [ { "Name": "instance-state-name", "Values": ["running", "pending"], @@ -764,18 +703,12 @@ def getVMs(self): instance.tags, "Name" ) # inst name PREFIX-serial-IMAGE # Name tag is the standard form of prefix-serial-image - if not ( - instName - and re.match("%s-" % config.Config.PREFIX, instName) - ): - self.log.debug( - "getVMs: Instance id %s skipped" % vm.instance_id - ) + if not (instName and re.match("%s-" % config.Config.PREFIX, instName)): + self.log.debug("getVMs: Instance id %s skipped" % vm.instance_id) continue # instance without name tag or proper prefix vm.name = instName vm.id = int(instName.split("-")[1]) - vm.pool = instName.split("-")[2] vm.name = instName # needed for SSH @@ -784,18 +717,19 @@ def getVMs(self): vms.append(vm) self.log.debug( - "getVMs: Instance id %s, name %s" - % (vm.instance_id, vm.name) + "getVMs: Instance id %s, name %s" % (vm.instance_id, vm.name) ) except Exception as e: self.log.debug("getVMs Failed: %s" % e) return vms - def existsVM(self, vm): + def existsVM(self, vm: TangoMachine) -> bool: """existsVM - Checks whether a VM exists in the vmms.""" # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/migrationec2.html - filters = [{"Name": "instance-state-name", "Values": ["running"]}] + filters: Sequence[FilterTypeDef] = [ + {"Name": "instance-state-name", "Values": ["running"]} + ] # gets all running instances instances = self.boto3resource.instances.filter(Filters=filters) for instance in instances: @@ -807,19 +741,18 @@ def existsVM(self, vm): # for instance in instances.filter(InstanceIds) return False - def getImages(self): + def getImages(self) -> List[str]: """getImages - return a constant; actually use the ami specified in config""" return [key for key in self.img2ami] - # getTag: to do later - def getTag(self, tagList, tagKey): + def getTag(self, tagList: List[TagTypeDef], tagKey: str) -> 
Optional[str]: if tagList: for tag in tagList: if tag["Key"] == tagKey: return tag["Value"] return None - def getPartialOutput(self, vm): + def getPartialOutput(self, vm: TangoMachine) -> str: domain_name = self.domainName(vm) runcmd = "head -c %s /home/autograde/output.log" % ( @@ -827,13 +760,11 @@ def getPartialOutput(self, vm): ) sshcmd = ( - ["ssh"] - + self.ssh_flags - + ["%s@%s" % (self.ec2User, domain_name), runcmd] + ["ssh"] + self.ssh_flags + ["%s@%s" % (self.ec2User, domain_name), runcmd] ) - output = subprocess.check_output( - sshcmd, stderr=subprocess.STDOUT - ).decode("utf-8") + output = subprocess.check_output(sshcmd, stderr=subprocess.STDOUT).decode( + "utf-8" + ) return output diff --git a/vmms/interface.py b/vmms/interface.py index d71500f9..33454f13 100644 --- a/vmms/interface.py +++ b/vmms/interface.py @@ -1,5 +1,5 @@ from typing import Protocol, Optional, Literal, List -from tangoObjects import TangoMachine +from tangoObjects import TangoMachine, InputFile from abc import abstractmethod @@ -13,23 +13,34 @@ def waitVM(self, vm: TangoMachine, max_secs: int) -> Literal[0, -1]: ... @abstractmethod - def copyIn(self, vm: TangoMachine, inputFiles: List[str], job_id: Optional[int] = None) -> Literal[0, -1]: + def copyIn( + self, + vm: TangoMachine, + inputFiles: List[InputFile], + job_id: Optional[int] = None, + ) -> int: ... @abstractmethod - def runJob(self, vm: TangoMachine, runTimeout: int, maxOutputFileSize: int, disableNetwork: bool) -> int: # -1 to infinity + def runJob( + self, + vm: TangoMachine, + runTimeout: int, + maxOutputFileSize: int, + disableNetwork: bool, + ) -> int: # -1 to infinity ... @abstractmethod - def copyOut(self, vm: TangoMachine, destFile: str) -> Literal[0, -1]: + def copyOut(self, vm: TangoMachine, destFile: str) -> int: ... @abstractmethod - def destroyVM(self, vm: TangoMachine) -> Literal[0, -1]: + def destroyVM(self, vm: TangoMachine) -> None: ... 
- + @abstractmethod - def safeDestroyVM(self, vm: TangoMachine) -> Literal[0, -1]: + def safeDestroyVM(self, vm: TangoMachine) -> None: ... @abstractmethod @@ -40,3 +51,14 @@ def getVMs(self) -> List[TangoMachine]: def existsVM(self, vm: TangoMachine) -> bool: ... + @abstractmethod + def instanceName(self, id: int, name: str) -> str: + ... + + @abstractmethod + def getImages(self) -> List[str]: + ... + + @abstractmethod + def getPartialOutput(self, vm: TangoMachine) -> str: + ... diff --git a/vmms/localDocker.py b/vmms/localDocker.py index 338bc6d4..5617b807 100644 --- a/vmms/localDocker.py +++ b/vmms/localDocker.py @@ -12,62 +12,10 @@ import sys import shutil import config -from tangoObjects import TangoMachine - +from tangoObjects import TangoMachine, InputFile +from typing import List, Literal, Optional from vmms.interface import VMMSInterface - - - -def timeout(command, time_out=1): - """timeout - Run a unix command with a timeout. Return -1 on - timeout, otherwise return the return value from the command, which - is typically 0 for success, 1-255 for failure. - """ - - # Launch the command - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - - # Wait for the command to complete - t = 0.0 - while t < time_out and p.poll() is None: - time.sleep(config.Config.TIMER_POLL_INTERVAL) - t += config.Config.TIMER_POLL_INTERVAL - - # Determine why the while loop terminated - if p.poll() is None: - try: - os.kill(p.pid, 9) - except OSError: - pass - returncode = -1 - else: - returncode = p.poll() - return returncode - - -def timeoutWithReturnStatus(command, time_out, returnValue=0): - """timeoutWithReturnStatus - Run a Unix command with a timeout, - until the expected value is returned by the command; On timeout, - return last error code obtained from the command. 
- """ - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - t = 0.0 - while t < time_out: - ret = p.poll() - if ret is None: - time.sleep(config.Config.TIMER_POLL_INTERVAL) - t += config.Config.TIMER_POLL_INTERVAL - elif ret == returnValue: - return ret - else: - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - return ret +from vmms.sharedUtils import VMMSUtils # @@ -75,7 +23,7 @@ def timeoutWithReturnStatus(command, time_out, returnValue=0): # -class LocalDocker(VMMSInterface): +class LocalDocker(VMMSInterface, VMMSUtils): def __init__(self): """Checks if the machine is ready to run docker containers. Initialize boot2docker if running on OS X. @@ -91,25 +39,21 @@ def __init__(self): self.log.error(str(e)) exit(1) - def instanceName(self, id, name): - """instanceName - Constructs a VM instance name. Always use - this function when you need a VM instance name. Never generate - instance names manually. - """ - return "%s-%s-%s" % (config.Config.PREFIX, id, name) + def instanceName(self, id: int, name: str) -> str: + return VMMSUtils.constructInstanceName(id, name) - def getVolumePath(self, instanceName): + def getVolumePath(self, instanceName: str) -> str: volumePath = config.Config.DOCKER_VOLUME_PATH # Last empty string to cause trailing '/' volumePath = os.path.join(volumePath, instanceName, "") return volumePath - def getDockerVolumePath(self, dockerPath, instanceName): + def getDockerVolumePath(self, dockerPath: str, instanceName: str) -> str: # Last empty string to cause trailing '/' volumePath = os.path.join(dockerPath, instanceName, "") return volumePath - def domainName(self, vm): + def domainName(self, vm: TangoMachine) -> str: """Returns the domain name that is stored in the vm instance. 
""" @@ -118,15 +62,20 @@ def domainName(self, vm): # # VMMS API functions # - def initializeVM(self, vm): + def initializeVM(self, vm: TangoMachine) -> Literal[0, -1]: """initializeVM - Nothing to do for initializeVM""" return 0 - def waitVM(self, vm, max_secs): + def waitVM(self, vm: TangoMachine, max_secs: int) -> Literal[0, -1]: """waitVM - Nothing to do for waitVM""" - return + return 0 - def copyIn(self, vm, inputFiles, job_id=None): + def copyIn( + self, + vm: TangoMachine, + inputFiles: List[InputFile], + job_id: Optional[int] = None, + ) -> int: """copyIn - Create a directory to be mounted as a volume for the docker containers. Copy input files to this directory. """ @@ -145,7 +94,13 @@ def copyIn(self, vm, inputFiles, job_id=None): ) return 0 - def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): + def runJob( + self, + vm: TangoMachine, + runTimeout: int, + maxOutputFileSize: int, + disableNetwork: bool, + ) -> int: """runJob - Run a docker container by doing the follows: - mount directory corresponding to this job to /home/autolab in the container @@ -154,10 +109,9 @@ def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): """ instanceName = self.instanceName(vm.id, vm.image) volumePath = self.getVolumePath(instanceName) - if os.getenv("DOCKER_TANGO_HOST_VOLUME_PATH"): - volumePath = self.getDockerVolumePath( - os.getenv("DOCKER_TANGO_HOST_VOLUME_PATH"), instanceName - ) + host_volume_path = os.getenv("DOCKER_TANGO_HOST_VOLUME_PATH") + if host_volume_path: + volumePath = self.getDockerVolumePath(host_volume_path, instanceName) args = ["docker", "run", "--name", instanceName, "-v"] args = args + ["%s:%s" % (volumePath, "/home/mount")] if vm.cores: @@ -186,12 +140,12 @@ def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): ] self.log.debug("Running job: %s" % str(args)) - ret = timeout(args, runTimeout * 2) + ret = VMMSUtils.timeout(args, runTimeout * 2) self.log.debug("runJob returning %d" % ret) return ret - 
def copyOut(self, vm, destFile): + def copyOut(self, vm: TangoMachine, destFile: str) -> int: """copyOut - Copy the autograder feedback from container to destFile on the Tango host. Then, destroy that container. Containers are never reused. @@ -204,20 +158,22 @@ def copyOut(self, vm, destFile): return 0 - def destroyVM(self, vm): + def destroyVM(self, vm: TangoMachine) -> None: """destroyVM - Delete the docker container.""" instanceName = self.instanceName(vm.id, vm.image) volumePath = self.getVolumePath("") # Do a hard kill on corresponding docker container. # Return status does not matter. - timeout(["docker", "rm", "-f", instanceName], config.Config.DOCKER_RM_TIMEOUT) + VMMSUtils.timeout( + ["docker", "rm", "-f", instanceName], config.Config.DOCKER_RM_TIMEOUT + ) # Destroy corresponding volume if it exists. if instanceName in os.listdir(volumePath): shutil.rmtree(volumePath + instanceName) self.log.debug("Deleted volume %s" % instanceName) return - def safeDestroyVM(self, vm): + def safeDestroyVM(self, vm: TangoMachine) -> None: """safeDestroyVM - Delete the docker container and make sure it is removed. """ @@ -229,7 +185,7 @@ def safeDestroyVM(self, vm): self.destroyVM(vm) return - def getVMs(self): + def getVMs(self) -> List[TangoMachine]: """getVMs - Executes and parses `docker ps`. This function is a lot of parsing and can break easily. """ @@ -247,15 +203,15 @@ def getVMs(self): machines.append(machine) return machines - def existsVM(self, vm): + def existsVM(self, vm: TangoMachine) -> bool: """existsVM - Executes `docker inspect CONTAINER`, which returns a non-zero status upon not finding a container. 
""" instanceName = self.instanceName(vm.id, vm.name) - ret = timeout(["docker", "inspect", instanceName]) + ret = VMMSUtils.timeout(["docker", "inspect", instanceName]) return ret == 0 - def getImages(self): + def getImages(self) -> List[str]: """getImages - Executes `docker images` and returns a list of images that can be used to boot a docker container with. This function is a lot of parsing and so can break easily. @@ -272,7 +228,7 @@ def getImages(self): result.add(re.sub(r".*/([^/]*)", r"\1", row_l[0])) return list(result) - def getPartialOutput(self, vm): + def getPartialOutput(self, vm: TangoMachine) -> str: """getPartialOutput - Get the partial output of a job. It does not check if the docker container exists before executing as the command will not fail even if the container does not exist. diff --git a/vmms/sharedUtils.py b/vmms/sharedUtils.py new file mode 100644 index 00000000..15cb2777 --- /dev/null +++ b/vmms/sharedUtils.py @@ -0,0 +1,46 @@ +import subprocess +import time +import os +from typing import List +import config + + +class VMMSUtils: + @staticmethod + def constructInstanceName(id: int, name: str) -> str: + """instanceName - Constructs a VM instance name. Always use + this function when you need a VM instance name. Never generate + instance names manually. + """ + return "%s-%d-%s" % (config.Config.PREFIX, id, name) + + @staticmethod + def timeout(command: List[str], time_out: float = 1) -> int: + """timeout - Run a unix command with a timeout. Return -1 on + timeout, otherwise return the return value from the command, which + is typically 0 for success, 1-255 for failure. 
+ """ + + # Launch the command + p = subprocess.Popen( + command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT + ) + + # Wait for the command to complete + t = 0.0 + while t < time_out and p.poll() is None: + time.sleep(config.Config.TIMER_POLL_INTERVAL) + t += config.Config.TIMER_POLL_INTERVAL + + # Determine why the while loop terminated + returncode: int + poll_result = p.poll() + if poll_result is None: + try: + os.kill(p.pid, 9) + except OSError: + pass + returncode = -1 + else: + returncode = poll_result + return returncode diff --git a/vmms/tashiSSH.py b/vmms/tashiSSH.py index e0e7c58e..6562abfe 100644 --- a/vmms/tashiSSH.py +++ b/vmms/tashiSSH.py @@ -54,40 +54,6 @@ def timeout(command, time_out=1): return returncode -def timeoutWithReturnStatus(command, time_out, returnValue=0): - """timeoutWithReturnStatus - Run a Unix command with a timeout, - until the expected value is returned by the command; On timeout, - return last error code obtained from the command. - """ - if (config.Config.LOGLEVEL is logging.DEBUG) and ( - "ssh" in command or "scp" in command - ): - out = sys.stdout - err = sys.stderr - else: - out = open("/dev/null", "w") - err = sys.stdout - - # Launch the command - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - - t = 0.0 - while t < time_out: - ret = p.poll() - if ret is None: - time.sleep(config.Config.TIMER_POLL_INTERVAL) - t += config.Config.TIMER_POLL_INTERVAL - elif ret == returnValue: - return ret - else: - p = subprocess.Popen( - command, stdout=open("/dev/null", "w"), stderr=subprocess.STDOUT - ) - return ret - - # # User defined exceptions # @@ -98,7 +64,7 @@ class tashiCallError(Exception): pass -class TashiSSH(object): +class TashiSSH(VMMSInterface): _SSH_FLAGS = [ "-q", "-i", @@ -254,7 +220,7 @@ def waitVM(self, vm, max_secs): # Sleep a bit before trying again time.sleep(config.Config.TIMER_POLL_INTERVAL) - def copyIn(self, vm, inputFiles): + def copyIn(self, vm, 
inputFiles, job_id=None): """copyIn - Copy input files to VM""" domain_name = self.domainName(vm.id, vm.name) self.log.debug("Creating autolab directory on VM") @@ -292,7 +258,7 @@ def copyIn(self, vm, inputFiles): return ret return 0 - def runJob(self, vm, runTimeout, maxOutputFileSize): + def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): """runJob - Run the make command on a VM using SSH and redirect output to file "output". """ diff --git a/worker.py b/worker.py index cf62b61f..aab2e46b 100644 --- a/worker.py +++ b/worker.py @@ -15,7 +15,11 @@ from datetime import datetime from config import Config from jobQueue import JobQueue -from tangoObjects import TangoMachine +from tangoObjects import TangoMachine, TangoJob +from typing import Dict, Optional +from vmms.interface import VMMSInterface +from preallocator import Preallocator + # # Worker - The worker class is very simple and very dumb. The goal is # to walk through the VMMS interface, track the job's progress, and if @@ -27,20 +31,28 @@ # anything else in the system. 
# + class DetachMethod(Enum): RETURN_TO_POOL = "return_to_pool" DESTROY_WITHOUT_REPLACEMENT = "destroy_without_replacement" DESTROY_AND_REPLACE = "replace" -# We always preallocate a VM for the worker to use +# We always preallocate a VM for the worker to use, hence it isn't Optional class Worker(threading.Thread): - def __init__(self, job, vmms, jobQueue, preallocator, preVM: TangoMachine): + def __init__( + self, + job: TangoJob, + vmms: VMMSInterface, + jobQueue: JobQueue, + preallocator: Preallocator, + preVM: TangoMachine, + ): threading.Thread.__init__(self) self.daemon = True self.job = job - self.vmms = vmms - self.jobQueue : JobQueue = jobQueue + self.vmms: VMMSInterface = vmms + self.jobQueue: JobQueue = jobQueue self.preallocator = preallocator self.preVM = preVM threading.Thread.__init__(self) @@ -52,8 +64,7 @@ def __init__(self, job, vmms, jobQueue, preallocator, preVM: TangoMachine): # def __del__(self): assert self.cleanupStatus, "Worker must call detachVM before returning" - - + def detachVM(self, detachMethod: DetachMethod): """detachVM - Detach the VM from this worker. The options are to return it to the pool's free list (return_vm), destroy it @@ -83,8 +94,7 @@ def detachVM(self, detachMethod: DetachMethod): else: raise ValueError(f"Invalid detach method: {detachMethod}") - # TODO: figure out what hdrfile, ret and err are - def rescheduleJob(self, hdrfile, ret, err): + def rescheduleJob(self, hdrfile: str, ret: Dict[str, int], err: str) -> None: """rescheduleJob - Reschedule a job that has failed because of a system error, such as a VM timing out or a connection failure. 
@@ -97,7 +107,10 @@ def rescheduleJob(self, hdrfile, ret, err): # Try a few times before giving up if self.job.retries < Config.JOB_RETRIES: - self.log.error("Retrying job %s:%d, retries: %d" % (self.job.name, self.job.id, self.job.retries)) + self.log.error( + "Retrying job %s:%d, retries: %d" + % (self.job.name, self.job.id, self.job.retries) + ) self.job.appendTrace( "%s|Retrying job %s:%d, retries: %d" % (datetime.now().ctime(), self.job.name, self.job.id, self.job.retries) @@ -112,20 +125,22 @@ def rescheduleJob(self, hdrfile, ret, err): # Here is where we give up else: full_err = f"Internal Error: {err}. Unable to complete job after {Config.JOB_RETRIES} tries. Please resubmit.\nJob status: waitVM={ret['waitvm']} initializeVM={ret['initializevm']} copyIn={ret['copyin']} runJob={ret['runjob']} copyOut={ret['copyout']}" - self.log.error(f"Giving up on job %s:%d. %s" % (self.job.name, self.job.id, full_err)) + self.log.error( + f"Giving up on job %s:%d. %s" % (self.job.name, self.job.id, full_err) + ) self.job.appendTrace( "%s|Giving up on job %s:%d. 
%s" % (datetime.now().ctime(), self.job.name, self.job.id, full_err) ) self.afterJobExecution(hdrfile, full_err, DetachMethod.DESTROY_AND_REPLACE) - def appendMsg(self, filename, msg): + def appendMsg(self, filename: str, msg: str) -> None: """appendMsg - Append a timestamped Tango message to a file""" f = open(filename, "a") f.write("Autograder [%s]: %s\n" % (datetime.now().ctime(), msg)) f.close() - def catFiles(self, f1, f2): + def catFiles(self, f1: str, f2: str) -> None: """catFiles - cat f1 f2 > f2, where f1 is the Tango header and f2 is the output from the Autodriver """ @@ -144,7 +159,7 @@ def catFiles(self, f1, f2): os.rename(tmpname, f2) os.remove(f1) - def notifyServer(self, job): + def notifyServer(self, job: TangoJob) -> None: try: if job.notifyURL: outputFileName = job.outputFile.split("/")[-1] # get filename from path @@ -154,7 +169,7 @@ def notifyServer(self, job): self.log.debug("Sending request to %s" % job.notifyURL) with requests.session() as s: # urllib3 retry, allow POST to be retried, use backoffs - r = Retry(total=10, allowed_methods=False, backoff_factor=1) + r = Retry(total=10, allowed_methods=None, backoff_factor=1) s.mount("http://", HTTPAdapter(max_retries=r)) s.mount("https://", HTTPAdapter(max_retries=r)) response = s.post( @@ -162,22 +177,26 @@ def notifyServer(self, job): ) self.log.info( "Response from callback to %s:%s" - % (job.notifyURL, response.content) + % (job.notifyURL, response.content.decode()) ) fh.close() else: - self.log.info("No callback URL for job %s:%d" % (self.job.name, self.job.id)) + self.log.info( + "No callback URL for job %s:%d" % (self.job.name, self.job.id) + ) except Exception as e: self.log.debug("Error in notifyServer: %s" % str(e)) - def afterJobExecution(self, hdrfile, msg, detachMethod: DetachMethod): + def afterJobExecution( + self, hdrfile: str, msg: str, detachMethod: DetachMethod + ) -> None: self.jobQueue.makeDead(self.job, msg) - + # Update the text that users see in the autodriver output file 
self.appendMsg(hdrfile, msg) self.catFiles(hdrfile, self.job.outputFile) os.chmod(self.job.outputFile, 0o644) - + # Thread exit after termination self.detachVM(detachMethod) self.notifyServer(self.job) @@ -186,17 +205,11 @@ def afterJobExecution(self, hdrfile, msg, detachMethod: DetachMethod): # # Main worker function # - def run(self): + def run(self) -> None: """run - Step a job through its execution sequence""" try: # Hash of return codes for each step - ret = {} - ret["waitvm"] = None - ret["initializevm"] = None - ret["copyin"] = None - ret["runjob"] = None - ret["copyout"] = None - print("HELLO") + ret: Dict[str, int] = {} self.log.debug("Run worker") vm = None @@ -225,7 +238,7 @@ def run(self): ) ) self.log.debug("Assigned job to preallocated VM") - ret["initializevm"] = 0 # Vacuous success since it doesn't happen + ret["initializevm"] = 0 # Vacuous success since it doesn't happen vm = self.job.vm @@ -247,7 +260,9 @@ def run(self): if self.job.stopBefore == "waitvm": msg = "Execution stopped before %s" % self.job.stopBefore self.job.setKeepForDebugging(True) - self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) + self.afterJobExecution( + hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE + ) return ret["waitvm"] = self.vmms.waitVM(vm, Config.WAITVM_TIMEOUT) @@ -281,16 +296,19 @@ def run(self): self.job.id, ) ) - if (self.job.stopBefore == "copyin"): + if self.job.stopBefore == "copyin": msg = "Execution stopped before %s" % self.job.stopBefore self.job.setKeepForDebugging(True) - self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) + self.afterJobExecution( + hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE + ) self.log.debug(msg) return # Copy input files to VM - self.log.debug(f"Before copyIn: ret[copyin] = {ret['copyin']}, job_id: {str(self.job.id)}") ret["copyin"] = self.vmms.copyIn(vm, self.job.input, self.job.id) - self.log.debug(f"After copyIn: ret[copyin] = 
{ret['copyin']}, job_id: {str(self.job.id)}") + self.log.debug( + f"After copyIn: ret[copyin] = {ret['copyin']}, job_id: {str(self.job.id)}" + ) if ret["copyin"] != 0: Config.copyin_errors += 1 @@ -298,11 +316,7 @@ def run(self): self.job.vm.notes = str(self.job.id) + "_" + self.job.name self.job.setKeepForDebugging(True) self.log.debug(msg) - self.rescheduleJob( - hdrfile, - ret, - msg - ) + self.rescheduleJob(hdrfile, ret, msg) return self.log.info( @@ -314,10 +328,12 @@ def run(self): % (datetime.now().ctime(), self.job.name, self.job.id, ret["copyin"]) ) - if (self.job.stopBefore == "runjob"): + if self.job.stopBefore == "runjob": msg = "Execution stopped before %s" % self.job.stopBefore self.job.setKeepForDebugging(True) - self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) + self.afterJobExecution( + hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE + ) return # Run the job on the virtual machine ret["runjob"] = self.vmms.runJob( @@ -348,13 +364,9 @@ def run(self): ret["runjob"] ) Config.runjob_errors += 1 - self.rescheduleJob( - hdrfile, - ret, - msg - ) + self.rescheduleJob(hdrfile, ret, msg) return - + self.log.info( "Job %s:%d executed [status=%s]" % (self.job.name, self.job.id, ret["runjob"]) @@ -364,10 +376,12 @@ def run(self): % (datetime.now().ctime(), self.job.name, self.job.id, ret["runjob"]) ) - if (self.job.stopBefore == "copyout"): + if self.job.stopBefore == "copyout": msg = "Execution stopped before %s" % self.job.stopBefore self.job.setKeepForDebugging(True) - self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE) + self.afterJobExecution( + hdrfile, msg, detachMethod=DetachMethod.DESTROY_AND_REPLACE + ) return # Copy the output back. 
ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile) @@ -376,10 +390,10 @@ def run(self): self.rescheduleJob( hdrfile, ret, - f"Internal error: copyOut failed (status={ret['copyout']})" + f"Internal error: copyOut failed (status={ret['copyout']})", ) return - + self.log.info( "Output copied for job %s:%d [status=%d]" % (self.job.name, self.job.id, ret["copyout"]) @@ -395,13 +409,17 @@ def run(self): self.log.info("Success: job %s:%d finished" % (self.job.name, self.job.id)) for status in ret.values(): - assert status == 0, "Should not get to the success point if any stage failed" + assert ( + status == 0 + ), "Should not get to the success point if any stage failed" # TODO: test this, then remove everything below this point - + # Move the job from the live queue to the dead queue # with an explanatory message msg = "Success: Autodriver returned normally" - self.afterJobExecution(hdrfile, msg, detachMethod=DetachMethod.RETURN_TO_POOL) + self.afterJobExecution( + hdrfile, msg, detachMethod=DetachMethod.RETURN_TO_POOL + ) return # @@ -410,11 +428,5 @@ def run(self): except Exception as err: self.log.exception("Internal Error") self.appendMsg(self.job.outputFile, "Internal Error: %s" % err) - # if vm is set, then the normal job assignment completed, - # and detachVM can be run - # if vm is not set but self.preVM is set, we still need - # to return the VM, but have to initialize self.job.vm first - if self.preVM and not vm: - vm = self.job.vm = self.preVM - if vm: - self.detachVM(DetachMethod.DESTROY_AND_REPLACE) + # error must have occurred after the job had its VM set + self.detachVM(DetachMethod.DESTROY_AND_REPLACE)