3 changes: 3 additions & 0 deletions .gitignore
@@ -26,3 +26,6 @@ asu.egg-info/
 .coverage
 **/__pycache__/
 *.pyc
+*.db
+*.db-shm
+*.db-wal
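The two extra patterns beyond `*.db` cover SQLite's WAL-mode sidecar files: once `asu/database.py` sets `PRAGMA journal_mode=WAL`, SQLite keeps a `.db-wal` write-ahead log and a `.db-shm` shared-memory index next to the database file, and neither belongs in version control.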
23 changes: 18 additions & 5 deletions asu/build.py
@@ -7,8 +7,6 @@
 from typing import Union
 from time import perf_counter
 
-from rq import get_current_job
-from rq.utils import parse_timeout
 from podman import errors
 
 from asu.build_request import BuildRequest
@@ -31,7 +29,7 @@
     run_cmd,
 )
 
-log = logging.getLogger("rq.worker")
+log = logging.getLogger("asu.worker")
 
 
 def _build(build_request: BuildRequest, job=None):
@@ -40,7 +38,8 @@ def _build(build_request: BuildRequest, job=None):
     The `request` dict contains properties of the requested image.
 
     Args:
-        request (dict): Contains all properties of requested image
+        build_request: BuildRequest object containing all properties of requested image
+        job: Job object for tracking build progress
     """
 
     build_start: float = perf_counter()
@@ -50,7 +49,18 @@ def _build(build_request: BuildRequest, job=None):
     bin_dir.mkdir(parents=True, exist_ok=True)
     log.debug(f"Bin dir: {bin_dir}")
 
-    job = job or get_current_job()
+    # Job is now passed as parameter instead of using get_current_job()
+    if job is None:
+        # Create a minimal job-like object for testing
+        class MinimalJob:
+            def __init__(self):
+                self.meta = {}
+
+            def save_meta(self):
+                pass
+
+        job = MinimalJob()
+
     job.meta["detail"] = "init"
     job.meta["imagebuilder_status"] = "init"
     job.meta["request"] = build_request
@@ -168,6 +178,9 @@ def _build(build_request: BuildRequest, job=None):
 
     log.debug("Mounts: %s", mounts)
 
+    # Parse timeout locally instead of using rq.utils.parse_timeout
+    from asu.job_queue import parse_timeout
+
     container = podman.containers.create(
         image,
         command=["sleep", str(parse_timeout(settings.job_timeout))],
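Since the `rq.utils` import is gone, `parse_timeout` now has to come from the new `asu.job_queue` module, whose hunk is not shown in this diff. The container's `sleep` command needs the timeout in whole seconds, so the helper presumably converts `settings.job_timeout` (e.g. `"10m"`) to an integer. A minimal sketch, assuming it mirrors the semantics of the removed `rq.utils.parse_timeout`:

```python
# Hypothetical sketch of parse_timeout in asu/job_queue.py; the actual
# implementation is not part of this diff.
from typing import Optional, Union


def parse_timeout(timeout: Union[int, str, None]) -> Optional[int]:
    """Convert '30s', '10m', or '2h' (or a bare number string) into seconds."""
    if timeout is None or isinstance(timeout, int):
        return timeout
    value = timeout.strip().lower()
    units = {"s": 1, "m": 60, "h": 3600}
    if value and value[-1] in units:
        return int(value[:-1]) * units[value[-1]]
    return int(value)  # already plain seconds, e.g. "600"
```

With `"10m"` this returns `600`, so `str(parse_timeout(settings.job_timeout))` yields `"600"` for the container's `sleep` command.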
11 changes: 9 additions & 2 deletions asu/config.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Union
+from typing import Union, Optional
 
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
@@ -64,7 +64,8 @@ class Settings(BaseSettings):
     model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
 
     public_path: Path = Path.cwd() / "public"
-    redis_url: str = "redis://localhost:6379"
+    database_path: Optional[Path] = None  # Will default to public_path / "asu.db"
+    worker_threads: int = 4
     upstream_url: str = "https://downloads.openwrt.org"
     allow_defaults: bool = False
     async_queue: bool = True
@@ -98,5 +99,11 @@ class Settings(BaseSettings):
     max_pending_jobs: int = 200
     job_timeout: str = "10m"
 
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        # Set default database_path if not provided
+        if self.database_path is None:
+            self.database_path = self.public_path / "asu.db"
+
 
 settings = Settings()
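Because the fallback runs in `__init__` after normal field loading, `database_path` is always populated by the time `settings` is constructed. A quick sketch of the resulting behavior (the explicit path below is just an illustrative value):

```python
from asu.config import Settings

s = Settings()
# No explicit value: the __init__ hook fills in <public_path>/asu.db
assert s.database_path == s.public_path / "asu.db"

s = Settings(database_path="/tmp/asu-test.db")
# An explicit value (or a DATABASE_PATH entry in .env) is left untouched
assert str(s.database_path) == "/tmp/asu-test.db"
```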
173 changes: 173 additions & 0 deletions asu/database.py
@@ -0,0 +1,173 @@
+"""Database module for ASU using SQLAlchemy.
+
+This module provides SQLAlchemy models and database utilities for managing
+build jobs and statistics without requiring Redis.
+"""
+
+import logging
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+
+from sqlalchemy import (
+    JSON,
+    Column,
+    DateTime,
+    Integer,
+    String,
+    create_engine,
+    event,
+)
+from sqlalchemy.engine import Engine
+from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker
+
+log = logging.getLogger("asu.worker")
+
+Base = declarative_base()
+
+
+class Job(Base):
+    """Model for storing job information."""
+
+    __tablename__ = "jobs"
+
+    id = Column(String, primary_key=True)
+    status = Column(String, default="queued")  # queued, started, finished, failed
+    meta = Column(JSON, default=dict)
+    result = Column(JSON, nullable=True)
+    enqueued_at = Column(DateTime, default=lambda: datetime.now(UTC))
+    started_at = Column(DateTime, nullable=True)
+    finished_at = Column(DateTime, nullable=True)
+    failure_ttl = Column(Integer, nullable=True)  # in seconds
+    result_ttl = Column(Integer, nullable=True)  # in seconds
+    exc_string = Column(String, nullable=True)  # Exception string for failed jobs
+
+    def __repr__(self):
+        return f"<Job(id={self.id}, status={self.status})>"
+
+
+class BuildStats(Base):
+    """Model for storing build statistics."""
+
+    __tablename__ = "build_stats"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    event_type = Column(String, index=True)
+    timestamp = Column(DateTime, default=lambda: datetime.now(UTC), index=True)
+    event_metadata = Column(JSON, default=dict)
+
+    def __repr__(self):
+        return f"<BuildStats(event_type={self.event_type}, timestamp={self.timestamp})>"
+
+
+# Global session factory
+_session_factory = None
+_engine = None
+
+
+@event.listens_for(Engine, "connect")
+def set_sqlite_pragma(dbapi_conn, connection_record):
+    """Enable SQLite optimizations for multi-threaded use."""
+    cursor = dbapi_conn.cursor()
+    cursor.execute("PRAGMA journal_mode=WAL")
+    cursor.execute("PRAGMA synchronous=NORMAL")
+    cursor.execute("PRAGMA cache_size=-64000")  # 64MB cache
+    cursor.close()
+
+
+def init_database(database_path: Path) -> None:
+    """Initialize the database and create tables.
+
+    Args:
+        database_path: Path to the SQLite database file
+    """
+    global _session_factory, _engine
+
+    database_path.parent.mkdir(parents=True, exist_ok=True)
+    database_url = f"sqlite:///{database_path}"
+
+    _engine = create_engine(
+        database_url,
+        connect_args={"check_same_thread": False},
+        pool_size=5,
+        max_overflow=10,
+        pool_pre_ping=True,
+    )
+
+    Base.metadata.create_all(_engine)
+
+    _session_factory = scoped_session(
+        sessionmaker(autocommit=False, autoflush=False, bind=_engine)
+    )
+
+    log.info(f"Database initialized at {database_path}")
+
+
+def get_session():
+    """Get a database session.
+
+    Returns:
+        SQLAlchemy session
+    """
+    if _session_factory is None:
+        raise RuntimeError("Database not initialized. Call init_database() first.")
+    return _session_factory()
+
+
+def cleanup_expired_jobs() -> int:
+    """Remove expired jobs from the database.
+
+    Returns:
+        Number of jobs removed
+    """
+    session = get_session()
+    try:
+        now = datetime.now(UTC)
+        expired_count = 0
+
+        # Find expired finished jobs
+        finished_jobs = (
+            session.query(Job)
+            .filter(Job.status == "finished", Job.result_ttl.isnot(None))
+            .all()
+        )
+
+        for job in finished_jobs:
+            if job.finished_at:
+                expiry_time = job.finished_at + timedelta(seconds=job.result_ttl)
+                if now > expiry_time:
+                    session.delete(job)
+                    expired_count += 1
+
+        # Find expired failed jobs
+        failed_jobs = (
+            session.query(Job)
+            .filter(Job.status == "failed", Job.failure_ttl.isnot(None))
+            .all()
+        )
+
+        for job in failed_jobs:
+            if job.finished_at:
+                expiry_time = job.finished_at + timedelta(seconds=job.failure_ttl)
+                if now > expiry_time:
+                    session.delete(job)
+                    expired_count += 1
+
+        session.commit()
+        return expired_count
+    finally:
+        session.close()
+
+
+def close_database() -> None:
+    """Close database connections."""
+    global _session_factory, _engine
+
+    if _session_factory:
+        _session_factory.remove()
+        _session_factory = None
+
+    if _engine:
+        _engine.dispose()
+        _engine = None
+
+    log.info("Database connections closed")