Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
ce52b32
Fixed the change by making the retries attribute private (and also th…
anthony-yip Mar 21, 2025
28db9e5
Made all of tangoJob's attributes read only
anthony-yip Mar 23, 2025
9d57597
fixed the local/remote desyncing issues, left some print statements i…
anthony-yip Aug 24, 2025
3bd80cf
Better encapsulation of TangoJob._remoteLocation
anthony-yip Aug 24, 2025
f0c0d32
Fixing uses of makeDead that was changed 2 commits ago
anthony-yip Aug 24, 2025
5b5cbc0
Merge branch 'copy-in' into anthonyyip/infinite_retries_bugfix
anthony-yip Aug 25, 2025
ba83509
comments, logging and todos
anthony-yip Aug 30, 2025
600803c
Code to enable spot instances
anthony-yip Sep 16, 2025
d450112
type annotations
anthony-yip Sep 16, 2025
029c1a9
finished mypy stuff up to not using --check-untyped-defs
anthony-yip Sep 16, 2025
85bdf7c
doesn't quite work, but wanted a checkpoint before muddling with the …
anthony-yip Sep 22, 2025
7e6751e
more typing changes, streamlining the used of the TangoDictionary in …
anthony-yip Sep 23, 2025
df044e3
Merge branch 'copy-in' into anthonyyip/infinite_retries_bugfix
anthony-yip Sep 23, 2025
4c8a521
Merge branch 'anthonyyip/infinite_retries_bugfix' into anthonyyip/spo…
anthony-yip Sep 23, 2025
03e1f31
refactored afterJobExecution and detachVM
anthony-yip Sep 28, 2025
bc8c8a7
error messages
anthony-yip Sep 28, 2025
20b833b
code cleanup: worker always gets initialized with a preallocated VM (…
anthony-yip Sep 28, 2025
d2e069c
Merge branch 'copy-in' into anthonyyip/spot_instances
anthony-yip Oct 27, 2025
138e332
fixed logging type safety
anthony-yip Oct 27, 2025
d1a8b27
always assert that detachVM is called (taken care of with .keep_for_d…
anthony-yip Oct 27, 2025
de38329
more todos
anthony-yip Oct 27, 2025
299e999
stop Before bug fix
anthony-yip Oct 28, 2025
f44735e
tangodictionary expanded key type
anthony-yip Nov 6, 2025
06b8022
Merge branch 'anthonyyip/spot_instances' into anthonyyip/type_annotat…
anthony-yip Nov 6, 2025
f7b4acb
more no-op type annotations
anthony-yip Nov 6, 2025
b643a98
'no issues found in 25 source files'
anthony-yip Nov 6, 2025
5439c15
changed ret in worker.py to not have Nones
anthony-yip Nov 6, 2025
2dc0ee7
moved timeout into VMMSUtils
anthony-yip Nov 11, 2025
bababf9
tidied code: return statement in timeout_with_retries
anthony-yip Nov 11, 2025
ee7d71e
added to the VMMSInterfae
anthony-yip Nov 11, 2025
6abab6c
all the changes that are basically no-ops
anthony-yip Nov 11, 2025
2325e54
empty line
anthony-yip Nov 11, 2025
5b34e24
replaced _clean with make_empty for both TangoQueue and TangoDictionary
anthony-yip Nov 18, 2025
ee45c4d
removed dead code
anthony-yip Nov 18, 2025
ff65d39
changed some dead code in createSecurityGroup to allow for type safety
anthony-yip Dec 2, 2025
0c7d45b
removed unused variable
anthony-yip Dec 2, 2025
40b7ace
fixed error logging in server.py
anthony-yip Dec 2, 2025
25a9cf2
proper decoding of bytes
anthony-yip Dec 2, 2025
d4a8c55
preallocator changes
anthony-yip Dec 2, 2025
e71b5b1
fixes
anthony-yip Dec 2, 2025
ed2984e
Merge branch 'anthonyyip/spot_instances' into anthonyyip/type_annotat…
anthony-yip Dec 2, 2025
b30279f
Merge branch 'ec2-new-implementation' into anthonyyip/type_annotations
anthony-yip Dec 2, 2025
620b949
added type annotations to everything such that mypy . doesn't give an…
anthony-yip Dec 2, 2025
8eed4db
local docker type annotations
anthony-yip Dec 2, 2025
9b1d898
linting fix
anthony-yip Dec 2, 2025
b0bb611
clean up worker.py
anthony-yip Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 190 additions & 116 deletions clients/tango-cli.py

Large diffs are not rendered by default.

24 changes: 14 additions & 10 deletions jobManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from tangoObjects import TangoJob, TangoQueue, TangoMachine
from typing import List, Tuple
from worker import Worker

from vmms.interface import VMMSInterface


class JobManager(object):
Expand Down Expand Up @@ -66,13 +66,16 @@ def __manage(self) -> None:
# Blocks until we get a next job
job: TangoJob = self.jobQueue.getNextPendingJob()
if not job.accessKey and Config.REUSE_VMS:
self.log.info(f"job has access key {job.accessKey} and is calling reuseVM")
self.log.info(
f"job has access key {job.accessKey} and is calling reuseVM"
)
vm = None
while vm is None:
vm = self.jobQueue.reuseVM(job)
# Sleep for a bit and then check again
time.sleep(Config.DISPATCH_PERIOD)

vmms: VMMSInterface
try:
# if the job is a ec2 vmms job
# spin up an ec2 instance for that job
Expand All @@ -90,9 +93,7 @@ def __manage(self) -> None:
self.log.error("ERROR initialization VM: %s", e)
self.log.error(traceback.format_exc())
if preVM is None:
raise Exception(
"EC2 SSH VM initialization failed: see log"
)
raise Exception("EC2 SSH VM initialization failed: see log")
else:
self.log.info(f"job {job.id} is not an ec2 vmms job")
# Try to find a vm on the free list and allocate it to
Expand Down Expand Up @@ -120,15 +121,15 @@ def __manage(self) -> None:
)
# Mark the job assigned
self.jobQueue.assignJob(job.id, preVM)
Worker(
job, vmms, self.jobQueue, self.preallocator, preVM
).start()
Worker(job, vmms, self.jobQueue, self.preallocator, preVM).start()

except Exception as err:
if job is None:
self.log.info("job_manager: job is None")
else:
self.log.error("job failed during creation %d %s" % (job.id, str(err)))
self.log.error(
"job failed during creation %d %s" % (job.id, str(err))
)
self.jobQueue.makeDead(job, str(err))


Expand All @@ -144,7 +145,10 @@ def __manage(self) -> None:
tango_server.log.debug("Resetting Tango VMs")
tango_server.resetTango(tango_server.preallocator.vmms)
for key in tango_server.preallocator.machines.keys():
machine: Tuple[List[TangoMachine], TangoQueue] = ([], TangoQueue.create(key))
machine: Tuple[List[TangoMachine], TangoQueue] = (
[],
TangoQueue.create(key),
)
machine[1].make_empty()
tango_server.preallocator.machines.set(key, machine)

Expand Down
37 changes: 21 additions & 16 deletions jobQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import time

from datetime import datetime
from tangoObjects import TangoDictionary, TangoJob, TangoQueue
from tangoObjects import TangoDictionary, TangoJob, TangoQueue, TangoMachine
from config import Config
from preallocator import Preallocator
from typing import Optional

#
# JobQueue - This class defines the job queue and the functions for
Expand All @@ -31,7 +33,7 @@


class JobQueue(object):
def __init__(self, preallocator):
def __init__(self, preallocator: Preallocator) -> None:
"""
Here we maintain several data structures used to keep track of the
jobs present for the autograder.
Expand All @@ -54,10 +56,12 @@ def __init__(self, preallocator):
using the makeUnassigned api.
"""
self.liveJobs: TangoDictionary[TangoJob] = TangoDictionary.create("liveJobs")
self.deadJobs: TangoDictionary[TangoJob] = TangoDictionary.create("deadJobs")
self.unassignedJobs = TangoQueue.create("unassignedLiveJobs")
self.deadJobs: TangoDictionary[TangoJob] = TangoDictionary.create(
"deadJobs"
) # Servees as a record of both failed and completed jobs
self.unassignedJobs: TangoQueue[int] = TangoQueue.create("unassignedLiveJobs")
self.queueLock = threading.Lock()
self.preallocator = preallocator
self.preallocator: Preallocator = preallocator
self.log = logging.getLogger("JobQueue")
self.nextID = 1

Expand Down Expand Up @@ -136,7 +140,7 @@ def add(self, job):

# Since we assume that the job is new, we set the number of retries
# of this job to 0
assert(job.retries == 0)
assert job.retries == 0

# Add the job to the queue. Careful not to append the trace until we
# know the job has actually been added to the queue.
Expand Down Expand Up @@ -168,7 +172,8 @@ def add(self, job):

return str(job.id)

def addDead(self, job):
# TODO: get rid of this return value, it is not used anywhere
def addDead(self, job) -> int:
"""addDead - add a job to the dead queue.
Called by validateJob when a job validation fails.
Returns -1 on failure and the job id on success
Expand Down Expand Up @@ -246,10 +251,10 @@ def get(self, id):
self.log.debug("get| Released lock to job queue.")
return job

# TODO: this function is a little weird. It sets the state of job to be "assigned", but not to which worker.
# TODO: this function is a little weird. It sets the state of job to be "assigned", but not to which worker.
# TODO: It does assign the job to a particular VM though.
# Precondition: jobId is in self.liveJobs
def assignJob(self, jobId, vm=None) -> None:
def assignJob(self, jobId, vm=None):
"""assignJob - marks a job to be assigned"""
self.queueLock.acquire()
self.log.debug("assignJob| Acquired lock to job queue.")
Expand All @@ -272,7 +277,7 @@ def assignJob(self, jobId, vm=None) -> None:
# return job

# TODO: Rename this job to be more accurate in its description
def unassignJob(self, jobId):
def unassignJob(self, jobId: int) -> None:
"""unassignJob - marks a job to be unassigned
Note: We assume here that a job is to be rescheduled or
'retried' when you unassign it. This retry is done by
Expand All @@ -282,7 +287,7 @@ def unassignJob(self, jobId):
self.log.debug("unassignJob| Acquired lock to job queue.")

# Get the current job
job = self.liveJobs.get(jobId)
job = self.liveJobs.getExn(jobId)

# Increment the number of retires
if job.retries is None:
Expand Down Expand Up @@ -313,11 +318,11 @@ def makeDead(self, job: TangoJob, reason):
if job.id not in self.liveJobs:
self.log.error("makeDead| Job ID: %s not found in live jobs" % (job.id))
return -1

self.log.info("makeDead| Found job ID: %s in the live queue" % (job.id))
status = 0
self.log.info("Terminated job %s:%s: %s" % (job.name, job.id, reason))

# Remove the job from the live jobs dictionary
job.deleteFromDict(self.liveJobs)
# Add the job to the dead jobs dictionary
Expand Down Expand Up @@ -356,6 +361,9 @@ def getNextPendingJob(self) -> TangoJob:
"""
# Blocks till the next item is added
id = self.unassignedJobs.get()
assert (
id is not None
), ".get with default arguments should block and never return None"

self.log.debug("_getNextPendingJob|Acquiring lock to job queue.")
self.queueLock.acquire()
Expand All @@ -365,7 +373,6 @@ def getNextPendingJob(self) -> TangoJob:
job = self.liveJobs.get(id)
if job is None:
raise Exception("Cannot find unassigned job in live jobs")

self.log.debug("getNextPendingJob| Releasing lock to job queue.")
self.queueLock.release()
self.log.debug("getNextPendingJob| Released lock to job queue.")
Expand Down Expand Up @@ -393,5 +400,3 @@ def reuseVM(self, job):
return job.vm
else:
raise Exception("Job assigned without vm")


7 changes: 5 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ explicit_package_bases = True
[mypy-vmms.tashiSSH]
ignore_errors = True

[mypy-preallocator]
ignore_errors = True
[mypy-vmms.distDocker]
ignore_errors = True

[mypy-tests.*]
ignore_errors = True
Loading