diff --git a/.gitignore b/.gitignore index 8ad46643..641188e2 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ asu.egg-info/ .coverage **/__pycache__/ *.pyc +*.db +*.db-shm +*.db-wal diff --git a/asu/build.py b/asu/build.py index 7cb55b38..945d8f1c 100644 --- a/asu/build.py +++ b/asu/build.py @@ -7,8 +7,6 @@ from typing import Union from time import perf_counter -from rq import get_current_job -from rq.utils import parse_timeout from podman import errors from asu.build_request import BuildRequest @@ -31,7 +29,7 @@ run_cmd, ) -log = logging.getLogger("rq.worker") +log = logging.getLogger("asu.worker") def _build(build_request: BuildRequest, job=None): @@ -40,7 +38,8 @@ def _build(build_request: BuildRequest, job=None): The `request` dict contains properties of the requested image. Args: - request (dict): Contains all properties of requested image + build_request: BuildRequest object containing all properties of requested image + job: Job object for tracking build progress """ build_start: float = perf_counter() @@ -50,7 +49,18 @@ def _build(build_request: BuildRequest, job=None): bin_dir.mkdir(parents=True, exist_ok=True) log.debug(f"Bin dir: {bin_dir}") - job = job or get_current_job() + # Job is now passed as parameter instead of using get_current_job() + if job is None: + # Create a minimal job-like object for testing + class MinimalJob: + def __init__(self): + self.meta = {} + + def save_meta(self): + pass + + job = MinimalJob() + job.meta["detail"] = "init" job.meta["imagebuilder_status"] = "init" job.meta["request"] = build_request @@ -168,6 +178,9 @@ def _build(build_request: BuildRequest, job=None): log.debug("Mounts: %s", mounts) + # Parse timeout locally instead of using rq.utils.parse_timeout + from asu.job_queue import parse_timeout + container = podman.containers.create( image, command=["sleep", str(parse_timeout(settings.job_timeout))], diff --git a/asu/config.py b/asu/config.py index 92c57041..ec1b3028 100644 --- a/asu/config.py +++ b/asu/config.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Union +from typing import Union, Optional from pydantic_settings import BaseSettings, SettingsConfigDict @@ -64,7 +64,8 @@ class Settings(BaseSettings): model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") public_path: Path = Path.cwd() / "public" - redis_url: str = "redis://localhost:6379" + database_path: Optional[Path] = None # Will default to public_path / "asu.db" + worker_threads: int = 4 upstream_url: str = "https://downloads.openwrt.org" allow_defaults: bool = False async_queue: bool = True @@ -98,5 +99,11 @@ class Settings(BaseSettings): max_pending_jobs: int = 200 job_timeout: str = "10m" + def __init__(self, **kwargs): + super().__init__(**kwargs) + # Set default database_path if not provided + if self.database_path is None: + self.database_path = self.public_path / "asu.db" + settings = Settings() diff --git a/asu/database.py b/asu/database.py new file mode 100644 index 00000000..ef1b3a51 --- /dev/null +++ b/asu/database.py @@ -0,0 +1,173 @@ +"""Database module for ASU using SQLAlchemy. + +This module provides SQLAlchemy models and database utilities for managing +build jobs and statistics without requiring Redis. 
+"""
+
+import logging
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+
+from sqlalchemy import (
+    JSON,
+    Column,
+    DateTime,
+    Integer,
+    String,
+    create_engine,
+    event,
+)
+from sqlalchemy.engine import Engine
+from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker
+
+log = logging.getLogger("asu.worker")
+
+Base = declarative_base()
+
+
+class Job(Base):
+    """Model for storing job information."""
+
+    __tablename__ = "jobs"
+
+    id = Column(String, primary_key=True)
+    status = Column(String, default="queued")  # queued, started, finished, failed
+    meta = Column(JSON, default=dict)
+    result = Column(JSON, nullable=True)
+    enqueued_at = Column(DateTime, default=lambda: datetime.now(UTC))
+    started_at = Column(DateTime, nullable=True)
+    finished_at = Column(DateTime, nullable=True)
+    failure_ttl = Column(Integer, nullable=True)  # in seconds
+    result_ttl = Column(Integer, nullable=True)  # in seconds
+    exc_string = Column(String, nullable=True)  # Exception string for failed jobs
+
+    def __repr__(self):
+        return f"<Job {self.id} ({self.status})>"
+
+
+class BuildStats(Base):
+    """Model for storing build statistics."""
+
+    __tablename__ = "build_stats"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    event_type = Column(String, index=True)
+    timestamp = Column(DateTime, default=lambda: datetime.now(UTC), index=True)
+    event_metadata = Column(JSON, default=dict)
+
+    def __repr__(self):
+        return f"<BuildStats {self.event_type} at {self.timestamp}>"
+
+
+# Global session factory
+_session_factory = None
+_engine = None
+
+
+@event.listens_for(Engine, "connect")
+def set_sqlite_pragma(dbapi_conn, connection_record):
+    """Enable SQLite optimizations for multi-threaded use."""
+    cursor = dbapi_conn.cursor()
+    cursor.execute("PRAGMA journal_mode=WAL")
+    cursor.execute("PRAGMA synchronous=NORMAL")
+    cursor.execute("PRAGMA cache_size=-64000")  # 64MB cache
+    cursor.close()
+
+
+def init_database(database_path: Path) -> None:
+    """Initialize the database and create tables.
+
+    Args:
+        database_path: Path to the SQLite database file
+    """
+    global _session_factory, _engine
+
+    database_path.parent.mkdir(parents=True, exist_ok=True)
+    database_url = f"sqlite:///{database_path}"
+
+    _engine = create_engine(
+        database_url,
+        connect_args={"check_same_thread": False},
+        pool_size=5,
+        max_overflow=10,
+        pool_pre_ping=True,
+    )
+
+    Base.metadata.create_all(_engine)
+
+    _session_factory = scoped_session(
+        sessionmaker(autocommit=False, autoflush=False, bind=_engine)
+    )
+
+    log.info(f"Database initialized at {database_path}")
+
+
+def get_session():
+    """Get a database session.
+
+    Returns:
+        SQLAlchemy session
+    """
+    if _session_factory is None:
+        raise RuntimeError("Database not initialized. Call init_database() first.")
+    return _session_factory()
+
+
+def cleanup_expired_jobs() -> int:
+    """Remove expired jobs from the database.
+ + Returns: + Number of jobs removed + """ + session = get_session() + try: + now = datetime.now(UTC) + expired_count = 0 + + # Find expired finished jobs + finished_jobs = ( + session.query(Job) + .filter(Job.status == "finished", Job.result_ttl.isnot(None)) + .all() + ) + + for job in finished_jobs: + if job.finished_at: + expiry_time = job.finished_at + timedelta(seconds=job.result_ttl) + if now > expiry_time: + session.delete(job) + expired_count += 1 + + # Find expired failed jobs + failed_jobs = ( + session.query(Job) + .filter(Job.status == "failed", Job.failure_ttl.isnot(None)) + .all() + ) + + for job in failed_jobs: + if job.finished_at: + expiry_time = job.finished_at + timedelta(seconds=job.failure_ttl) + if now > expiry_time: + session.delete(job) + expired_count += 1 + + session.commit() + return expired_count + finally: + session.close() + + +def close_database() -> None: + """Close database connections.""" + global _session_factory, _engine + + if _session_factory: + _session_factory.remove() + _session_factory = None + + if _engine: + _engine.dispose() + _engine = None + + log.info("Database connections closed") diff --git a/asu/job_queue.py b/asu/job_queue.py new file mode 100644 index 00000000..335da21f --- /dev/null +++ b/asu/job_queue.py @@ -0,0 +1,486 @@ +"""Thread-based job queue implementation to replace RQ. + +This module provides Queue and Job classes that mimic RQ's interface but use +SQLite and threading instead of Redis. +""" + +import logging +import traceback +from concurrent.futures import ThreadPoolExecutor, Future +from datetime import UTC, datetime +from typing import Any, Callable, Optional + +from asu.database import get_session, Job as JobModel + +log = logging.getLogger("asu.worker") + + +def parse_timeout(timeout_str: str) -> int: + """Parse timeout string like '10m' or '1h' to seconds. + + Args: + timeout_str: Timeout string (e.g., '10m', '1h', '30s') + + Returns: + Timeout in seconds + """ + if timeout_str is None: + return None + + timeout_str = str(timeout_str).strip() + + if timeout_str.endswith("s"): + return int(timeout_str[:-1]) + elif timeout_str.endswith("m"): + return int(timeout_str[:-1]) * 60 + elif timeout_str.endswith("h"): + return int(timeout_str[:-1]) * 3600 + elif timeout_str.endswith("d"): + return int(timeout_str[:-1]) * 86400 + else: + # Assume it's already in seconds + return int(timeout_str) + + +class Job: + """Job class that mimics RQ's Job interface. + + This class wraps a database Job model and provides methods compatible + with RQ's Job API. + """ + + def __init__(self, job_id: str): + """Initialize Job with job ID. + + Args: + job_id: Unique job identifier + """ + self.id = job_id + self._future: Optional[Future] = None + + @property + def meta(self) -> dict: + """Get job metadata. + + Returns: + Job metadata dictionary + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + if job_model: + return job_model.meta or {} + return {} + finally: + session.close() + + def get_meta(self) -> dict: + """Get job metadata (alias for meta property). + + Returns: + Job metadata dictionary + """ + return self.meta + + def save_meta(self) -> None: + """Save job metadata to database. + + Note: Since meta is accessed via property, this method ensures + any changes to meta are persisted. + """ + # Meta is accessed and saved via the meta property setter + # This method is here for API compatibility + pass + + @meta.setter + def meta(self, value: dict) -> None: + """Set job metadata. 
+ + Args: + value: Metadata dictionary to set + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + if job_model: + job_model.meta = value + session.commit() + finally: + session.close() + + def set_meta(self, key: str, value: Any) -> None: + """Set a specific metadata key atomically. + + Args: + key: Metadata key + value: Value to set + """ + session = get_session() + try: + # Use atomic update with JSON manipulation + job_model = session.query(JobModel).filter_by(id=self.id).first() + if job_model: + current_meta = job_model.meta or {} + current_meta[key] = value + job_model.meta = current_meta + session.commit() + finally: + session.close() + + @property + def enqueued_at(self) -> Optional[datetime]: + """Get when the job was enqueued. + + Returns: + Enqueue timestamp + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + if job_model: + return job_model.enqueued_at + return None + finally: + session.close() + + def get_position(self) -> Optional[int]: + """Get position in queue. + + Returns: + Position in queue (0-based), or None if not queued + """ + if not self.is_queued: + return None + + session = get_session() + try: + # Count jobs that were enqueued before this one and are still queued + position = ( + session.query(JobModel) + .filter( + JobModel.status == "queued", + JobModel.enqueued_at < self.enqueued_at, + ) + .count() + ) + return position + finally: + session.close() + + @property + def is_queued(self) -> bool: + """Check if job is queued. + + Returns: + True if job is queued + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + return job_model.status == "queued" if job_model else False + finally: + session.close() + + @property + def is_started(self) -> bool: + """Check if job has started. + + Returns: + True if job is started + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + return job_model.status == "started" if job_model else False + finally: + session.close() + + @property + def is_finished(self) -> bool: + """Check if job is finished. + + Returns: + True if job is finished + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + return job_model.status == "finished" if job_model else False + finally: + session.close() + + @property + def is_failed(self) -> bool: + """Check if job has failed. + + Returns: + True if job failed + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + return job_model.status == "failed" if job_model else False + finally: + session.close() + + def latest_result(self): + """Get the latest result (for failed jobs, contains exception). + + Returns: + Result object with exc_string attribute + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + if job_model: + + class Result: + def __init__(self, exc_string): + self.exc_string = exc_string + + return Result(job_model.exc_string) + return None + finally: + session.close() + + def return_value(self) -> Any: + """Get the return value for finished jobs. 
+ + Returns: + Job result + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=self.id).first() + if job_model: + return job_model.result + return None + finally: + session.close() + + +class Queue: + """Queue class that mimics RQ's Queue interface. + + This class manages job submission and execution using a ThreadPoolExecutor. + """ + + def __init__(self, max_workers: int = 4, is_async: bool = True): + """Initialize Queue. + + Args: + max_workers: Maximum number of worker threads + is_async: Whether to execute jobs asynchronously + """ + self.max_workers = max_workers + self.is_async = is_async + self._executor = ( + ThreadPoolExecutor(max_workers=max_workers) if is_async else None + ) + log.info(f"Queue initialized with {max_workers} workers (async={is_async})") + + def enqueue( + self, + func: Callable, + *args, + job_id: Optional[str] = None, + result_ttl: Optional[str] = None, + failure_ttl: Optional[str] = None, + job_timeout: Optional[str] = None, + **kwargs, + ) -> Job: + """Enqueue a job for execution. + + Args: + func: Function to execute + args: Positional arguments for function + job_id: Unique job ID + result_ttl: Time to keep successful results + failure_ttl: Time to keep failed results + job_timeout: Job execution timeout + kwargs: Keyword arguments for function + + Returns: + Job object + """ + if not job_id: + import uuid + + job_id = str(uuid.uuid4()) + + # Parse TTL values + result_ttl_seconds = parse_timeout(result_ttl) if result_ttl else None + failure_ttl_seconds = parse_timeout(failure_ttl) if failure_ttl else None + + # Create job in database + session = get_session() + try: + job_model = JobModel( + id=job_id, + status="queued", + meta={}, + result_ttl=result_ttl_seconds, + failure_ttl=failure_ttl_seconds, + ) + session.add(job_model) + session.commit() + finally: + session.close() + + job = Job(job_id) + + # Execute job + if self.is_async: + future = self._executor.submit( + self._execute_job, job_id, func, args, kwargs + ) + job._future = future + else: + # Synchronous execution for testing + self._execute_job(job_id, func, args, kwargs) + + return job + + def _execute_job( + self, job_id: str, func: Callable, args: tuple, kwargs: dict + ) -> None: + """Execute a job. 
+ + Args: + job_id: Job ID + func: Function to execute + args: Positional arguments + kwargs: Keyword arguments + """ + session = get_session() + try: + # Update job status to started + job_model = session.query(JobModel).filter_by(id=job_id).first() + if not job_model: + log.error(f"Job {job_id} not found in database") + return + + job_model.status = "started" + job_model.started_at = datetime.now(UTC) + session.commit() + + # Create Job wrapper to pass to function + job_wrapper = Job(job_id) + + # Execute function + try: + log.info(f"Starting job {job_id}") + result = func(*args, job=job_wrapper, **kwargs) + + # Update job with result + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=job_id).first() + if job_model: + job_model.status = "finished" + job_model.result = result + job_model.finished_at = datetime.now(UTC) + session.commit() + finally: + session.close() + log.info(f"Job {job_id} completed successfully") + + except Exception as e: + log.error(f"Job {job_id} failed: {e}", exc_info=True) + # Update job with error + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=job_id).first() + if job_model: + job_model.status = "failed" + job_model.exc_string = traceback.format_exc() + job_model.finished_at = datetime.now(UTC) + session.commit() + finally: + session.close() + + finally: + session.close() + + def fetch_job(self, job_id: str) -> Optional[Job]: + """Fetch a job by ID. + + Args: + job_id: Job ID + + Returns: + Job object or None + """ + session = get_session() + try: + job_model = session.query(JobModel).filter_by(id=job_id).first() + if job_model: + return Job(job_id) + return None + finally: + session.close() + + def __len__(self) -> int: + """Get number of queued jobs. + + Returns: + Number of queued jobs + """ + session = get_session() + try: + return session.query(JobModel).filter_by(status="queued").count() + finally: + session.close() + + def shutdown(self, wait: bool = True) -> None: + """Shutdown the queue and thread pool. + + Args: + wait: Whether to wait for running jobs to complete + """ + if self._executor: + log.info("Shutting down job queue...") + self._executor.shutdown(wait=wait) + log.info("Job queue shutdown complete") + + +# Global queue instance +_queue: Optional[Queue] = None + + +def init_queue(max_workers: int = 4, is_async: bool = True) -> Queue: + """Initialize the global queue. + + Args: + max_workers: Maximum number of worker threads + is_async: Whether to execute jobs asynchronously + + Returns: + Queue instance + """ + global _queue + _queue = Queue(max_workers=max_workers, is_async=is_async) + return _queue + + +def get_queue() -> Queue: + """Get the global queue instance. + + Returns: + Queue instance + """ + if _queue is None: + raise RuntimeError("Queue not initialized. Call init_queue() first.") + return _queue + + +def shutdown_queue(wait: bool = True) -> None: + """Shutdown the global queue. 
+ + Args: + wait: Whether to wait for running jobs to complete + """ + global _queue + if _queue: + _queue.shutdown(wait=wait) + _queue = None diff --git a/asu/main.py b/asu/main.py index 8602bac4..f1054ebb 100644 --- a/asu/main.py +++ b/asu/main.py @@ -28,6 +28,39 @@ base_path = Path(__file__).resolve().parent app = FastAPI() + + +@app.on_event("startup") +async def startup_event(): + """Initialize database and job queue on startup.""" + from asu.database import init_database + from asu.job_queue import init_queue + + # Initialize database + init_database(settings.database_path) + + # Initialize job queue + init_queue(max_workers=settings.worker_threads, is_async=settings.async_queue) + + logging.info(f"Database initialized at {settings.database_path}") + logging.info(f"Job queue initialized with {settings.worker_threads} workers") + + +@app.on_event("shutdown") +async def shutdown_event(): + """Cleanup database and job queue on shutdown.""" + from asu.database import close_database + from asu.job_queue import shutdown_queue + + # Shutdown job queue + shutdown_queue(wait=True) + + # Close database connections + close_database() + + logging.info("Application shutdown complete") + + app.include_router(api.router, prefix="/api/v1") app.include_router(stats.router, prefix="/api/v1") diff --git a/asu/routers/api.py b/asu/routers/api.py index b26c85de..c935c999 100644 --- a/asu/routers/api.py +++ b/asu/routers/api.py @@ -3,7 +3,6 @@ from fastapi import APIRouter, Header, Request from fastapi.responses import RedirectResponse, Response -from rq.job import Job from asu.build import build from asu.build_request import BuildRequest @@ -141,7 +140,7 @@ def valid_profile(profile: str, build_request: BuildRequest) -> bool: return ({}, None) -def return_job_v1(job: Job) -> tuple[dict, int, dict]: +def return_job_v1(job) -> tuple[dict, int, dict]: response: dict = job.get_meta() imagebuilder_status: str = "done" queue_position: int = 0 @@ -186,7 +185,7 @@ def return_job_v1(job: Job) -> tuple[dict, int, dict]: @router.head("/build/{request_hash}") @router.get("/build/{request_hash}") def api_v1_build_get(request: Request, request_hash: str, response: Response) -> dict: - job: Job = get_queue().fetch_job(request_hash) + job = get_queue().fetch_job(request_hash) if not job: response.status_code = 404 return { @@ -215,7 +214,7 @@ def api_v1_build_post( add_build_event("requests") request_hash: str = get_request_hash(build_request) - job: Job = get_queue().fetch_job(request_hash) + job = get_queue().fetch_job(request_hash) status: int = 200 result_ttl: str = settings.build_ttl if build_request.defaults: diff --git a/asu/routers/stats.py b/asu/routers/stats.py index c2185ffc..505732e8 100644 --- a/asu/routers/stats.py +++ b/asu/routers/stats.py @@ -2,7 +2,7 @@ from fastapi import APIRouter -from asu.util import get_redis_ts +from asu.config import settings router = APIRouter() @@ -29,94 +29,134 @@ def start_stop(duration, interval): @router.get("/builds-per-day") def get_builds_per_day() -> dict: """ - References: - https://redis.readthedocs.io/en/latest/redismodules.html#redis.commands.timeseries.commands.TimeSeriesCommands.range - https://www.chartjs.org/docs/latest/charts/line.html + Get builds per day statistics. + + This is a simplified implementation using SQLite. + TODO: Implement TimeSeries-style aggregation for better performance. 
""" + if not settings.server_stats: + return {"labels": [], "datasets": []} start, stop, stamps, labels = start_stop(N_DAYS, DAY_MS) - ts = get_redis_ts() - rc = ts.client - range_options = dict( - from_time=start, - to_time=stop, - align=start, # Ensures alignment of X values with "stamps". - aggregation_type="sum", - bucket_size_msec=DAY_MS, - ) - - def get_dataset(event: str, color: str) -> dict: - """Fills "data" array completely, supplying 0 for missing values.""" - key = f"stats:build:{event}" - result = ts.range(key, **range_options) if rc.exists(key) else [] - data_map = dict(result) + from asu.database import get_session, BuildStats + + session = get_session() + try: + # Convert timestamps from milliseconds to datetime + start_dt = dt.fromtimestamp(start / 1000, UTC) + stop_dt = dt.fromtimestamp(stop / 1000, UTC) + + # Query stats for each event type + def get_dataset(event: str, color: str) -> dict: + """Get dataset for a specific event type.""" + key = f"stats:build:{event}" + stats = ( + session.query(BuildStats) + .filter( + BuildStats.event_type == key, + BuildStats.timestamp >= start_dt, + BuildStats.timestamp < stop_dt, + ) + .all() + ) + + # Group by day + data_map = {} + for stat in stats: + timestamp_ms = int(stat.timestamp.timestamp() * 1000) + # Align to bucket + bucket_index = (timestamp_ms - start) // DAY_MS + bucket_timestamp = start + (bucket_index * DAY_MS) + data_map[bucket_timestamp] = data_map.get(bucket_timestamp, 0) + 1 + + return { + "label": event.title(), + "data": [data_map.get(stamp, 0) for stamp in stamps], + "color": color, + } + return { - "label": event.title(), - "data": [data_map.get(stamp, 0) for stamp in stamps], - "color": color, + "labels": labels, + "datasets": [ + # See add_build_event for valid "event" values. + get_dataset("requests", "green"), + get_dataset("cache-hits", "orange"), + get_dataset("failures", "red"), + ], } - - return { - "labels": labels, - "datasets": [ - # See add_build_event for valid "event" values. - get_dataset("requests", "green"), - get_dataset("cache-hits", "orange"), - get_dataset("failures", "red"), - ], - } + finally: + session.close() @router.get("/builds-by-version") -def get_builds_by_version(branch: str = None) -> dict(): - """If 'branch' is None, then data will be returned "by branch", +def get_builds_by_version(branch: str = None) -> dict: + """Get builds by version. + + If 'branch' is None, then data will be returned "by branch", so you get one curve for each of 23.05, 24.10, 25.12 etc. If you specify a branch, say "24.10", then the results are for - all versions on that branch, 24.10.0, 24.1.1 and so on.""" + all versions on that branch, 24.10.0, 24.1.1 and so on. + + This is a simplified implementation using SQLite. + TODO: Implement TimeSeries-style aggregation for better performance. + """ + if not settings.server_stats: + return {"labels": [], "datasets": []} interval = 7 * DAY_MS # Each bucket is a week. duration = 26 # Number of weeks of data, about 6 months. start, stop, stamps, labels = start_stop(duration, interval) - bucket = {} - - def sum_data(version, data): - data_map = dict(data) - if version not in bucket: - bucket[version] = [0.0] * len(stamps) - for i, stamp in enumerate(stamps): - bucket[version][i] += data_map.get(stamp, 0) - - range_options = dict( - filters=["stats=builds"], - with_labels=True, - from_time=start, - to_time=stop, - align=start, # Ensures alignment of X values with "stamps". 
- aggregation_type="sum", - bucket_size_msec=interval, - ) - - result = get_redis_ts().mrange(**range_options) - for row in result: - for data in row.values(): - version = data[0]["version"] + from asu.database import get_session, BuildStats + + session = get_session() + try: + # Convert timestamps from milliseconds to datetime + start_dt = dt.fromtimestamp(start / 1000, UTC) + stop_dt = dt.fromtimestamp(stop / 1000, UTC) + + # Query stats for builds + stats = ( + session.query(BuildStats) + .filter( + BuildStats.event_type.like("stats:builds:%"), + BuildStats.timestamp >= start_dt, + BuildStats.timestamp < stop_dt, + ) + .all() + ) + + bucket = {} + + for stat in stats: + version = stat.event_metadata.get("version", "unknown") + if branch and not version.startswith(branch): continue elif branch is None and "." in version: version = version[:5] - sum_data(version, data[1]) - - return { - "labels": labels, - "datasets": [ - { - "label": version, - "data": bucket[version], - } - for version in sorted(bucket) - ], - } + + if version not in bucket: + bucket[version] = [0.0] * len(stamps) + + timestamp_ms = int(stat.timestamp.timestamp() * 1000) + bucket_index = (timestamp_ms - start) // interval + + if 0 <= bucket_index < len(stamps): + bucket[version][bucket_index] += 1 + + return { + "labels": labels, + "datasets": [ + { + "label": version, + "data": bucket[version], + } + for version in sorted(bucket) + ], + } + finally: + session.close() diff --git a/asu/util.py b/asu/util.py index 7f8e8f68..911ad5e8 100644 --- a/asu/util.py +++ b/asu/util.py @@ -17,43 +17,49 @@ from httpx import Response from podman import PodmanClient from podman.domain.containers import Container -from rq import Queue -from rq.job import Job -import redis from asu.build_request import BuildRequest from asu.config import settings -log: logging.Logger = logging.getLogger("rq.worker") +log: logging.Logger = logging.getLogger("asu.worker") log.propagate = False # Suppress duplicate log messages. # Create a shared HTTP client _http_client = httpx.Client() -def get_redis_client(unicode: bool = True) -> redis.client.Redis: - return redis.from_url(settings.redis_url, decode_responses=unicode) - - -def get_redis_ts(): - return get_redis_client().ts() - - def client_get(url: str) -> Response: return _http_client.get(url) def add_timestamp(key: str, labels: dict[str, str] = {}, value: int = 1) -> None: + """Add a timestamp/stats event to the database. + + This is a simplified implementation that stores stats in SQLite. + TODO: Implement TimeSeries-style aggregation if needed. 
+ + Args: + key: Stats key + labels: Metadata labels + value: Value to record + """ if not settings.server_stats: return + log.debug(f"Adding timestamp to {key}: {labels}") - get_redis_ts().add( - key, - value=value, - timestamp="*", - labels=labels, - duplicate_policy="sum", - ) + + from asu.database import get_session, BuildStats + + session = get_session() + try: + stat = BuildStats(event_type=key, event_metadata={**labels, "value": value}) + session.add(stat) + session.commit() + except Exception as e: + log.warning(f"Failed to add timestamp: {e}") + session.rollback() + finally: + session.close() def add_build_event(event: str) -> None: @@ -79,13 +85,15 @@ def add_build_event(event: str) -> None: add_timestamp(key, {"stats": "summary"}) -def get_queue() -> Queue: +def get_queue(): """Return the current queue Returns: - Queue: The current RQ work queue + Queue: The current work queue """ - return Queue(connection=get_redis_client(False), is_async=settings.async_queue) + from asu.job_queue import get_queue as get_job_queue + + return get_job_queue() def get_branch(version_or_branch: str) -> dict[str, str]: @@ -321,7 +329,7 @@ def run_cmd( return returncode, stdout, stderr -def report_error(job: Job, msg: str) -> None: +def report_error(job, msg: str) -> None: log.warning(f"Error: {msg}") job.meta["detail"] = f"Error: {msg}" job.meta["imagebuilder_status"] = "failed" diff --git a/pyproject.toml b/pyproject.toml index 4494809b..eed0d959 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,9 +11,8 @@ dependencies = [ "fastapi[standard]>=0.119.0", "pynacl>=1.6.0", "podman>=5.6.0", - "redis>=6.4.0", "pydantic-settings>=2.12.0", - "rq>=2.6.0", + "sqlalchemy>=2.0.0", "uvicorn>=0.37.0", "fastapi-cache2>=0.2.2", "httpx>=0.28.1", @@ -25,7 +24,6 @@ dev = [ "ruff>=0.14.9", "coverage>=7.13.0", "isort>=7.0.0", - "fakeredis>=2.32.0", "pytest-httpserver>=1.1.3", ] diff --git a/tests/conftest.py b/tests/conftest.py index df4ff57d..aa7e6065 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,65 +3,11 @@ from pathlib import Path import pytest -from fakeredis import FakeStrictRedis -from rq import Queue from fastapi.testclient import TestClient from asu.config import settings -def redis_load_mock_data(redis): - return - redis.sadd( - "packages:1.2:1.2.3:testtarget/testsubtarget", - "test1", - "test2", - "test3", - "valid_new_package", - ) - redis.sadd("profiles:1.2:1.2.3:testtarget/testsubtarget", "testprofile") - redis.sadd("profiles:SNAPSHOT:SNAPSHOT:ath79/generic", "tplink_tl-wdr4300-v1") - redis.sadd("packages:SNAPSHOT:SNAPSHOT:ath79/generic", "vim", "tmux") - redis.sadd("packages:SNAPSHOT:SNAPSHOT:x86/64", "vim", "tmux") - - redis.sadd("branches", "SNAPSHOT", "1.2", "21.02", "19.07") - redis.sadd("versions:SNAPSHOT", "SNAPSHOT") - redis.sadd("versions:1.2", "1.2.3") - redis.sadd("versions:21.02", "21.02.7", "21.02.0", "21.02.0-rc4", "21.02-SNAPSHOT") - redis.sadd("versions:19.07", "19.07.7", "19.07.6") - - redis.sadd("profiles:21.02:21.02.7:ath79/generic", "tplink_tl-wdr4300-v1") - redis.sadd("packages:21.02:21.02.7:ath79/generic", "vim", "tmux") - redis.sadd("packages:21.02:21.02.7:x86/64", "vim", "tmux") - - redis.sadd("profiles:21.02:21.02.7:x86/64", "generic") - redis.set("revision:21.02.7:x86/64", "r16847-f8282da11e") - - redis.hset( - "mapping:1.2:1.2.3:testtarget/testsubtarget", - mapping={"testvendor,testprofile": "testprofile"}, - ) - redis.hset("targets:1.2", mapping={"testtarget/testsubtarget": "testarch"}) - redis.hset("targets:SNAPSHOT", mapping={"ath79/generic": "", "x86/64": 
""}) - redis.hset( - "targets:21.02", - mapping={ - "testtarget/testsubtarget": "testarch", - "ath79/generic": "", - "x86/64": "", - }, - ) - redis.hset("mapping-abi", mapping={"test1-1": "test1"}) - - -@pytest.fixture -def redis_server(): - r = FakeStrictRedis() - redis_load_mock_data(r) - yield r - r.flushall() - - def pytest_addoption(parser): parser.addoption( "--runslow", action="store_true", default=False, help="run slow tests" @@ -90,17 +36,17 @@ def test_path(): @pytest.fixture -def app(redis_server, test_path, monkeypatch, upstream): - def mocked_redis_client(*args, **kwargs): - return redis_server - - def mocked_redis_queue(): - return Queue(connection=redis_server, is_async=settings.async_queue) - +def app(test_path, monkeypatch, upstream): + from asu.database import init_database, close_database + from asu.job_queue import init_queue, shutdown_queue + settings.public_path = Path(test_path) / "public" + settings.database_path = Path(test_path) / "test.db" settings.async_queue = False settings.upstream_url = "http://localhost:8123" settings.server_stats = "stats" + settings.worker_threads = 2 + for branch in "1.2", "19.07", "21.02": if branch not in settings.branches: settings.branches[branch] = { @@ -108,13 +54,17 @@ def mocked_redis_queue(): "enabled": True, } - monkeypatch.setattr("asu.util.get_queue", mocked_redis_queue) - monkeypatch.setattr("asu.routers.api.get_queue", mocked_redis_queue) - monkeypatch.setattr("asu.util.get_redis_client", mocked_redis_client) + # Initialize database and queue + init_database(settings.database_path) + init_queue(max_workers=settings.worker_threads, is_async=settings.async_queue) from asu.main import app as real_app yield real_app + + # Cleanup + shutdown_queue(wait=False) + close_database() @pytest.fixture
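
For reviewers trying the new queue outside the API, here is a minimal end-to-end sketch of how the pieces introduced above fit together (init_database, init_queue, Queue.enqueue). The database path, job id, and toy_task function are illustrative stand-ins and not part of this change. Note that Queue._execute_job invokes the enqueued callable with a job= keyword argument, so anything passed to enqueue() must accept it.

# Illustrative usage sketch, not part of the patch; paths, ids, and the task are made up.
import time
from pathlib import Path

from asu.database import close_database, init_database
from asu.job_queue import init_queue, shutdown_queue


def toy_task(name: str, job=None) -> dict:
    # The queue passes the Job wrapper via the job= keyword; set_meta()
    # persists a single progress key, similar to how the build worker
    # reports its status fields.
    if job is not None:
        job.set_meta("detail", f"processing {name}")
    return {"greeting": f"hello {name}"}


def main() -> None:
    init_database(Path("/tmp/asu-example.db"))  # hypothetical location
    queue = init_queue(max_workers=2, is_async=True)

    job = queue.enqueue(toy_task, "world", job_id="example-1", result_ttl="1h")
    while not (job.is_finished or job.is_failed):
        time.sleep(0.1)  # poll the SQLite-backed status

    print(job.return_value(), job.meta)

    shutdown_queue(wait=True)
    close_database()


if __name__ == "__main__":
    main()

In the service itself the FastAPI startup hook in asu/main.py performs the init calls, so request handlers only ever go through get_queue().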
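
The conftest.py changes above run the queue synchronously (settings.async_queue is False), which makes enqueue() execute the job inline before returning. A pytest-style sketch of that mode, using pytest's tmp_path fixture and a throwaway task rather than the real build function:

# Illustrative test sketch, not part of the patch; add() is a stand-in task.
from asu.database import close_database, init_database
from asu.job_queue import init_queue, shutdown_queue


def add(a: int, b: int, job=None) -> int:
    return a + b


def test_sync_queue(tmp_path):
    init_database(tmp_path / "queue-test.db")
    queue = init_queue(max_workers=1, is_async=False)
    try:
        job = queue.enqueue(add, 2, 3, job_id="sync-1")
        assert job.is_finished
        assert job.return_value() == 5
        assert len(queue) == 0  # nothing left in the "queued" state
    finally:
        shutdown_queue(wait=False)
        close_database()

Because job state lives in SQLite rather than in the Queue object, any Job handle constructed later in the same process observes the same status and metadata.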