diff --git a/asu/build.py b/asu/build.py index 945d8f1c..bf903544 100644 --- a/asu/build.py +++ b/asu/build.py @@ -54,17 +54,25 @@ def _build(build_request: BuildRequest, job=None): # Create a minimal job-like object for testing class MinimalJob: def __init__(self): - self.meta = {} + self._meta = {} + + @property + def meta(self): + return self._meta + + def update_meta(self, updates): + self._meta.update(updates) def save_meta(self): pass job = MinimalJob() - job.meta["detail"] = "init" - job.meta["imagebuilder_status"] = "init" - job.meta["request"] = build_request - job.save_meta() + job.update_meta({ + "detail": "init", + "imagebuilder_status": "init", + "request": build_request.model_dump(), + }) log.debug(f"Building {build_request}") @@ -101,8 +109,7 @@ def save_meta(self): } ) - job.meta["imagebuilder_status"] = "container_setup" - job.save_meta() + job.update_meta({"imagebuilder_status": "container_setup"}) log.info(f"Pulling {image}...") try: diff --git a/asu/job_queue.py b/asu/job_queue.py index cafa8d9b..0e6ff606 100644 --- a/asu/job_queue.py +++ b/asu/job_queue.py @@ -1,7 +1,7 @@ """Thread-based job queue implementation to replace RQ. -This module provides Queue and Job classes that mimic RQ's interface but use -SQLite and threading instead of Redis. +This module provides Queue and BuildJob classes for managing firmware build jobs +using SQLite and threading. """ import logging @@ -10,6 +10,8 @@ from datetime import datetime from typing import Any, Callable, Optional +from sqlalchemy.orm import attributes + from asu.database import get_session, Job as JobModel log = logging.getLogger("asu.worker") @@ -42,213 +44,167 @@ def parse_timeout(timeout_str: str) -> int: return int(timeout_str) -class Job: - """Job class that mimics RQ's Job interface. - - This class wraps a database Job model and provides methods compatible - with RQ's Job API. - """ +class BuildJob: + """Represents a firmware build job with simplified state management.""" def __init__(self, job_id: str): - """Initialize Job with job ID. + """Initialize BuildJob with job ID. Args: job_id: Unique job identifier """ self.id = job_id + self._cached_state = None self._future: Optional[Future] = None + self._meta_cache = None # Cache for meta dict to track modifications - @property - def meta(self) -> dict: - """Get job metadata. - - Returns: - Job metadata dictionary - """ - session = get_session() - try: - job_model = session.query(JobModel).filter_by(id=self.id).first() - if job_model: - return job_model.meta or {} - return {} - finally: - session.close() + def get_state(self, refresh: bool = False) -> Optional[dict]: + """Get full job state in one query. - def get_meta(self) -> dict: - """Get job metadata (alias for meta property). + Args: + refresh: Force refresh from database Returns: - Job metadata dictionary + dict with keys: id, status, meta, result, enqueued_at, started_at, + finished_at, exc_string, queue_position (if queued), or None if not found """ - return self.meta + if self._cached_state is None or refresh: + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + if not job_model: + return None + + state = { + "id": job_model.id, + "status": job_model.status, + "meta": job_model.meta or {}, + "result": job_model.result, + "enqueued_at": job_model.enqueued_at, + "started_at": job_model.started_at, + "finished_at": job_model.finished_at, + "exc_string": job_model.exc_string, + } + + # Calculate queue position if queued + if job_model.status == "queued": + position = ( + session.query(JobModel) + .filter( + JobModel.status == "queued", + JobModel.enqueued_at < job_model.enqueued_at, + ) + .count() + ) + state["queue_position"] = position + else: + state["queue_position"] = None + + self._cached_state = state + self._meta_cache = None # Invalidate meta cache when state refreshes + finally: + session.close() - def save_meta(self) -> None: - """Save job metadata to database. + return self._cached_state - Note: Since meta is accessed via property, this method ensures - any changes to meta are persisted. - """ - # Meta is accessed and saved via the meta property setter - # This method is here for API compatibility - pass - - @meta.setter - def meta(self, value: dict) -> None: - """Set job metadata. + def update_meta(self, meta_updates: dict): + """Update job metadata efficiently. Args: - value: Metadata dictionary to set + meta_updates: Dictionary of metadata to update/add """ session = get_session() try: job_model = session.query(JobModel).filter_by(id=self.id).first() if job_model: - job_model.meta = value + current_meta = dict(job_model.meta or {}) + current_meta.update(meta_updates) + # Force SQLAlchemy to detect the change by creating a new dict + job_model.meta = current_meta + # Mark the attribute as modified to ensure SQLAlchemy detects the change + attributes.flag_modified(job_model, "meta") session.commit() + # Invalidate caches + self._cached_state = None + self._meta_cache = None finally: session.close() - def set_meta(self, key: str, value: Any) -> None: - """Set a specific metadata key. + def update_status(self, status: str, **fields): + """Update job status and optional fields. Args: - key: Metadata key - value: Value to set - """ - current_meta = self.meta - current_meta[key] = value - self.meta = current_meta - - @property - def enqueued_at(self) -> Optional[datetime]: - """Get when the job was enqueued. - - Returns: - Enqueue timestamp + status: New status (queued, started, finished, failed) + **fields: Additional fields to update (result, exc_string, etc.) """ session = get_session() try: job_model = session.query(JobModel).filter_by(id=self.id).first() if job_model: - return job_model.enqueued_at - return None - finally: - session.close() - - def get_position(self) -> Optional[int]: - """Get position in queue. - - Returns: - Position in queue (0-based), or None if not queued - """ - if not self.is_queued: - return None - - session = get_session() - try: - # Count jobs that were enqueued before this one and are still queued - position = ( - session.query(JobModel) - .filter( - JobModel.status == "queued", - JobModel.enqueued_at < self.enqueued_at, - ) - .count() - ) - return position - finally: - session.close() - - @property - def is_queued(self) -> bool: - """Check if job is queued. - - Returns: - True if job is queued - """ - session = get_session() - try: - job_model = session.query(JobModel).filter_by(id=self.id).first() - return job_model.status == "queued" if job_model else False + job_model.status = status + for key, value in fields.items(): + setattr(job_model, key, value) + session.commit() + # Invalidate caches + self._cached_state = None + self._meta_cache = None finally: session.close() @property - def is_started(self) -> bool: - """Check if job has started. - - Returns: - True if job is started - """ - session = get_session() - try: - job_model = session.query(JobModel).filter_by(id=self.id).first() - return job_model.status == "started" if job_model else False - finally: - session.close() + def status(self) -> Optional[str]: + """Get current job status.""" + state = self.get_state() + return state["status"] if state else None @property - def is_finished(self) -> bool: - """Check if job is finished. - - Returns: - True if job is finished + def meta(self) -> dict: + """Get job metadata as a mutable dictionary. + + Returns a dict that can be modified. Call save_meta() to persist changes. """ - session = get_session() - try: - job_model = session.query(JobModel).filter_by(id=self.id).first() - return job_model.status == "finished" if job_model else False - finally: - session.close() - - @property - def is_failed(self) -> bool: - """Check if job has failed. + if self._meta_cache is None: + state = self.get_state() + if state: + # Create a copy so modifications don't affect cached state + self._meta_cache = dict(state["meta"]) + else: + self._meta_cache = {} + return self._meta_cache - Returns: - True if job failed - """ - session = get_session() - try: - job_model = session.query(JobModel).filter_by(id=self.id).first() - return job_model.status == "failed" if job_model else False - finally: - session.close() - - def latest_result(self): - """Get the latest result (for failed jobs, contains exception). + @meta.setter + def meta(self, value: dict) -> None: + """Set job metadata. - Returns: - Result object with exc_string attribute + Args: + value: Metadata dictionary to set (replaces all existing metadata) """ session = get_session() try: job_model = session.query(JobModel).filter_by(id=self.id).first() if job_model: - - class Result: - def __init__(self, exc_string): - self.exc_string = exc_string - - return Result(job_model.exc_string) - return None + job_model.meta = value + session.commit() + # Invalidate caches + self._cached_state = None + self._meta_cache = None finally: session.close() - def return_value(self) -> Any: - """Get the return value for finished jobs. - - Returns: - Job result + def save_meta(self) -> None: + """Save current metadata to database. + + This persists any modifications made to the dict returned by the meta property. """ - session = get_session() - try: - job_model = session.query(JobModel).filter_by(id=self.id).first() - if job_model: - return job_model.result - return None - finally: - session.close() + if self._meta_cache is not None: + # Save the modified meta cache + self.meta = self._meta_cache + + @property + def enqueued_at(self) -> Optional[datetime]: + """Get when the job was enqueued.""" + state = self.get_state() + return state["enqueued_at"] if state else None class Queue: @@ -280,7 +236,7 @@ def enqueue( failure_ttl: Optional[str] = None, job_timeout: Optional[str] = None, **kwargs, - ) -> Job: + ) -> BuildJob: """Enqueue a job for execution. Args: @@ -293,7 +249,7 @@ def enqueue( kwargs: Keyword arguments for function Returns: - Job object + BuildJob object """ if not job_id: import uuid @@ -319,7 +275,7 @@ def enqueue( finally: session.close() - job = Job(job_id) + job = BuildJob(job_id) # Execute job if self.is_async: @@ -358,7 +314,7 @@ def _execute_job( session.close() # Create Job wrapper to pass to function - job_wrapper = Job(job_id) + job_wrapper = BuildJob(job_id) # Execute function try: @@ -390,20 +346,20 @@ def _execute_job( if session: session.close() - def fetch_job(self, job_id: str) -> Optional[Job]: + def fetch_job(self, job_id: str) -> Optional[BuildJob]: """Fetch a job by ID. Args: job_id: Job ID Returns: - Job object or None + BuildJob object or None """ session = get_session() try: job_model = session.query(JobModel).filter_by(id=job_id).first() if job_model: - return Job(job_id) + return BuildJob(job_id) return None finally: session.close() diff --git a/asu/routers/api.py b/asu/routers/api.py index c935c999..10743c16 100644 --- a/asu/routers/api.py +++ b/asu/routers/api.py @@ -140,46 +140,73 @@ def valid_profile(profile: str, build_request: BuildRequest) -> bool: return ({}, None) -def return_job_v1(job) -> tuple[dict, int, dict]: - response: dict = job.get_meta() - imagebuilder_status: str = "done" - queue_position: int = 0 - - if job.meta: - response.update(job.meta) - - if job.is_failed: - error_message: str = job.latest_result().exc_string - if "stderr" in response: - error_message = response["stderr"] + "\n" + error_message - detail: str = response.get("detail", "failed") - if detail == "init": # Happens when container startup fails. - detail = "failed" - response.update(status=500, detail=detail, stderr=error_message) - imagebuilder_status = "failed" +def build_job_response(job) -> tuple[dict, int, dict]: + """Build API response from job state. - elif job.is_queued: - queue_position = job.get_position() or 0 - response.update(status=202, detail="queued", queue_position=queue_position) - imagebuilder_status = "queued" + Args: + job: BuildJob instance - elif job.is_started: - response.update(status=202, detail="started") - imagebuilder_status = response.get("imagebuilder_status", "init") + Returns: + Tuple of (response_dict, status_code, headers_dict) + """ + state = job.get_state(refresh=True) - elif job.is_finished: - response.update(status=200, **job.return_value()) + if not state: + return {"status": 404, "detail": "Job not found"}, 404, {} + + # Base response with metadata (only include safe metadata fields) + meta = state.get("meta", {}) or {} + response = { + "request_hash": state["id"], + "enqueued_at": state["enqueued_at"], + **meta, # Include all metadata + } + + # Status-specific handling + status_code = 200 + imagebuilder_status = "done" + queue_position = 0 + + if state["status"] == "queued": + status_code = 202 + response["status"] = 202 + response["detail"] = "queued" + response["queue_position"] = state.get("queue_position", 0) + queue_position = state.get("queue_position", 0) + imagebuilder_status = "queued" + + elif state["status"] == "started": + status_code = 202 + response["status"] = 202 + response["detail"] = "started" + imagebuilder_status = meta.get("imagebuilder_status", "init") + + elif state["status"] == "finished": + status_code = 200 + response["status"] = 200 + if state["result"]: + response.update(state["result"]) imagebuilder_status = "done" + elif state["status"] == "failed": + status_code = 500 + response["status"] = 500 + error_message = state.get("exc_string", "Unknown error") + if "stderr" in meta: + error_message = meta["stderr"] + "\n" + error_message + detail = meta.get("detail", "failed") + if detail == "init": + detail = "failed" + response.update(status=500, detail=detail, stderr=error_message) + imagebuilder_status = "failed" + headers = { "X-Imagebuilder-Status": imagebuilder_status, "X-Queue-Position": str(queue_position), } - response.update(enqueued_at=job.enqueued_at, request_hash=job.id) - logging.debug(response) - return response, response["status"], headers + return response, status_code, headers @router.head("/build/{request_hash}") @@ -194,7 +221,7 @@ def api_v1_build_get(request: Request, request_hash: str, response: Response) -> "detail": "could not find provided request hash", } - content, status, headers = return_job_v1(job) + content, status, headers = build_job_response(job) response.headers.update(headers) response.status_code = status @@ -259,10 +286,10 @@ def api_v1_build_post( job_timeout=settings.job_timeout, ) else: - if job.is_finished: + if job.status == "finished": add_build_event("cache-hits") - content, status, headers = return_job_v1(job) + content, status, headers = build_job_response(job) response.headers.update(headers) response.status_code = status