diff --git a/Makefile b/Makefile
index 6c9a631..cd0a160 100644
--- a/Makefile
+++ b/Makefile
@@ -21,7 +21,11 @@ PYPY_VERSION := 3.11-v7.3.20-linux64
 backend/pypy${PYPY_VERSION}/bin/pypy3:
 	wget https://downloads.python.org/pypy/pypy${PYPY_VERSION}.tar.bz2 -O backend/pypy${PYPY_VERSION}.tar.bz2
 	tar xf backend/pypy${PYPY_VERSION}.tar.bz2 -C backend
+
+# there is the issue of parsing query string in socketify with asgi, https://github.com/cirospaciari/socketify.py/issues/218
+install-backend-deps: backend/pypy${PYPY_VERSION}/bin/pypy3
 	cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -m ensurepip && ./pypy${PYPY_VERSION}/bin/pypy3 -mpip install -r requirements.txt
+	cd backend && sed -i 's/"query_string": ffi.unpack(info.query_string, info.query_string_size)/"query_string": ffi.unpack(info.query_string, info.query_string_size)[1:]/g' pypy${PYPY_VERSION}/lib/pypy3.11/site-packages/socketify/asgi.py
 
 dev-frontend: ~/.pixi/bin/pixi
 	cd frontend && pixi run npm install -f
@@ -53,7 +57,7 @@ migrate-dev-db: backend/pypy${PYPY_VERSION}/bin/pypy3
 start-test-db:
 	docker compose up test-db -d
 
-migrate-test-db: backend/pypy${PYPY_VERSION}/bin/pypy3
+migrate-test-db: install-backend-deps
 	cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -m aerich init-db || echo "DB already existed"
 	cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -m aerich upgrade
 
@@ -85,11 +89,11 @@ start-traefik: ~/.pixi/bin/pixi
 	cd traefik && pixi run traefik --configFile=traefik.dev.yaml
 
 ## Tests: Currently testing for backend only
-.PHONY: start-test-infra test-auth test-health test-organization test-all
+.PHONY: start-test-infra test-auth test-health test-organization test-all install-backend-deps install-test-deps test-credential test-analysis test-project test-storage test-job
 
 start-test-infra: start-test-db start-redis start-localstack start-slurm
 	@echo "Start test infra setup"
 
-install-test-deps: backend/pypy${PYPY_VERSION}/bin/pypy3
+install-test-deps: install-backend-deps
 	cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -mpip install pytest pytest-cov pytest-asyncio
 
 test-auth: install-test-deps
@@ -129,7 +133,7 @@ staging: .env
 production: .env.prod
 	docker compose --env-file .env.prod build
 	sed -e 's#HOST#$(shell cat .env.prod|grep APP_DOMAIN|cut -f2 -d=)#g' traefik/base/prod.yaml > traefik/dynamic/prod.yaml
-	docker compose --env-file .env.prod up traefik
+	docker compose --env-file .env.prod up traefik -d
 
 # CLEAN
 .PHONY: clean-dev-db clean-test-db clean-redis
diff --git a/backend/.gitignore b/backend/.gitignore
index 90eaee5..15ba092 100644
--- a/backend/.gitignore
+++ b/backend/.gitignore
@@ -100,7 +100,7 @@ venv.bak/
 # mypy
 .mypy_cache/
 .pytest
 # pixi environments
-.pixi/*
+.pixi
 !.pixi/config.toml
 bombardier-linux-amd64
 *tar.bz2
diff --git a/backend/Dockerfile b/backend/Dockerfile
index 71deeb3..ea31cb6 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -1,16 +1,14 @@
 FROM pypy:3.11-slim
-
 ENV PYTHONUNBUFFERED=1
 WORKDIR /app
-
 RUN apt-get update && apt-get install -y \
-    build-essential pkg-config \
+    build-essential pkg-config libuv1 \
     libffi-dev libssl-dev libsodium-dev make libpq-dev \
     && rm -rf /var/lib/apt/lists/*
-
 COPY requirements.txt .
 RUN pip install --upgrade pip && pip install --no-cache-dir -r requirements.txt
-
+# Fix socketify ASGI query string parsing issue
+RUN sed -i 's/"query_string": ffi.unpack(info.query_string, info.query_string_size)/"query_string": ffi.unpack(info.query_string, info.query_string_size)[1:]/g' /opt/pypy/lib/pypy3.11/site-packages/socketify/asgi.py
 COPY app ./app
 COPY domain ./domain
 COPY migrations ./migrations
@@ -18,7 +16,6 @@ COPY pyproject.toml .
 COPY settings.yaml .
 COPY run_server.sh ./
 RUN chmod +x run_server.sh
-RUN apt-get update && apt-get install -y libuv1
 
 EXPOSE 8000
 RUN useradd -m appuser
diff --git a/backend/app/main.py b/backend/app/main.py
index d5ebdaf..6a5abb2 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -86,7 +86,6 @@ async def configure_redis(app):
         JobController,
     ]
 )
-print("Failed here")
 # Database 2 3
 register_tortoise(
     app,
diff --git a/backend/app/service_analysis/controller.py b/backend/app/service_analysis/controller.py
index f4858fa..9fc73ba 100644
--- a/backend/app/service_analysis/controller.py
+++ b/backend/app/service_analysis/controller.py
@@ -8,7 +8,7 @@ from tortoise.expressions import Q
 
 from app.service_credential.models.base import Credential
 from app.utils.version_control.manager import get_version_control_manager
-from .serializers import AnalysisSerializerIn, AnalysisSerializerOut
+from .serializers import AnalysisSerializerIn, AnalysisSerializerOut, AnalysisUpdateSerializer
 from .models import Analysis
 
 
@@ -95,7 +95,7 @@ async def get_analysis(
 
         return ok(
             {
-                "item": result,
+                "items": result,
                 "page": page,
                 "page_size": page_size,
                 "total": total_count,
@@ -181,17 +181,19 @@ async def get_profile_schema(
 
     @auth("authenticated")
     @put("/{id}/")
-    async def update_analysis(id: UUID, request: Request) -> Response:
+    async def update_analysis(
+        request: Request,
+        id: UUID,
+        data: AnalysisUpdateSerializer,
+    ) -> Response:
         created_by = await request.identity.get_user()
-        data = await request.json()
-        if "allow_access" not in data.keys():
-            return bad_request({"message": "Missing allow_access field"})
-        if "has_base_url" not in data.keys():
-            return bad_request({"message": "Missing has_base_url field"})
-
-        await Analysis.filter(id=id, created_by=created_by.email).update(
-            allow_access=data["allow_access"], has_base_url=data["has_base_url"]
+
+        updated_count = await Analysis.filter(id=id, created_by=created_by.email).update(
+            allow_access=data.allow_access, has_base_url=data.has_base_url
         )
+        if updated_count == 0:
+            return bad_request({"message": "Analysis not found or unauthorized"})
+
         return no_content()
 
     @auth("authenticated")
diff --git a/backend/app/service_analysis/serializers.py b/backend/app/service_analysis/serializers.py
index d77a3bd..1553ef2 100644
--- a/backend/app/service_analysis/serializers.py
+++ b/backend/app/service_analysis/serializers.py
@@ -1,4 +1,5 @@
 from tortoise.contrib.pydantic import pydantic_model_creator
+from pydantic import BaseModel
 
 from .models import Analysis
 
@@ -14,3 +15,8 @@
     Analysis,
     name="AnalysisOut",
 )
+
+
+class AnalysisUpdateSerializer(BaseModel):
+    has_base_url: bool
+    allow_access: bool
diff --git a/backend/app/service_analysis/tests/test_analysis.py b/backend/app/service_analysis/tests/test_analysis.py
index aa137ee..74b1772 100644
--- a/backend/app/service_analysis/tests/test_analysis.py
+++ b/backend/app/service_analysis/tests/test_analysis.py
@@ -51,16 +51,16 @@ async def test_basic_analysis(
 
     # list instance as admin
     res_admin = await list_instance(test_client, f"{BASE_URL}/", cookies_admin, 200)
-    assert len(res_admin["item"]) == 1
-    assert_instance_fields(res_admin["item"][0], EXPECTED_FIELDS)
+    assert len(res_admin["items"]) == 1
+    assert_instance_fields(res_admin["items"][0], EXPECTED_FIELDS)
 
     # list instance as contributor
     res_contrib = await list_instance(test_client, f"{BASE_URL}/", cookies_contributor, 200)
-    assert len(res_contrib["item"]) == 1
-    assert_instance_fields(res_contrib["item"][0], EXPECTED_FIELDS)
+    assert len(res_contrib["items"]) == 1
+    assert_instance_fields(res_contrib["items"][0], EXPECTED_FIELDS)
 
     # delete as admin (should fail)
-    delete_url = f"{BASE_URL}/{res_admin['item'][0]['id']}/"
+    delete_url = f"{BASE_URL}/{res_admin['items'][0]['id']}/"
     await delete_instance(test_client, delete_url, cookies_admin, 204)
 
     # delete as superuser (should pass)
@@ -93,23 +93,23 @@ async def get_page(page, page_size):
     # page 1
     data = await get_page(1, 10)
     assert data["page"] == 1
-    assert len(data["item"]) == 10
+    assert len(data["items"]) == 10
 
     # page 2
     data = await get_page(2, 10)
     assert data["page"] == 2
-    assert len(data["item"]) == 10
+    assert len(data["items"]) == 10
 
     # last page
     data = await get_page(4, 10)
     assert data["page"] == 4
-    assert len(data["item"]) == 0
+    assert len(data["items"]) == 0
 
     # too large page
     data = await get_page(99, 10)
-    assert data["item"] == []
+    assert data["items"] == []
 
     # search by url
     res = await test_client.get(f"{BASE_URL}/", query={"search": "repo-1"}, cookies=cookies_admin)
     data = await res.json()
     assert res.status == 200
-    assert any("repo-1" in item["url"] for item in data["item"])
+    assert any("repo-1" in item["url"] for item in data["items"])
diff --git a/backend/app/service_credential/models/personal.py b/backend/app/service_credential/models/personal.py
index 226de32..9db68ec 100644
--- a/backend/app/service_credential/models/personal.py
+++ b/backend/app/service_credential/models/personal.py
@@ -25,4 +25,4 @@ class Github(models.Model):
     credential = fields.OneToOneField("models.Credential", related_name="github", on_delete=fields.CASCADE)
     id = fields.UUIDField(primary_key=True, default=uuid.uuid4)
     username = fields.CharField(max_length=50, null=False)
-    token = fields.CharField(max_length=200, null=False)
+    token = EncryptedTextField(max_length=200, null=False)
diff --git a/backend/app/service_job/controller.py b/backend/app/service_job/controller.py
index 3947c13..6bbc9ee 100644
--- a/backend/app/service_job/controller.py
+++ b/backend/app/service_job/controller.py
@@ -54,10 +54,11 @@ async def create_job(self, project_id: UUID, job: JobSerializerIn, request: Requ
         project = await Project.get_or_none(id=project_id)
         params["bucket_name"] = project.bucket_name
         await project.fetch_related("storage")
+
+        # Validate that storage credentials are configured (don't generate temp creds here)
         storage_manager = await get_storage_manager(project.storage)
-        temp_secrets = storage_manager.get_temp_secret(session_name=user.email, duration=job["time"])
-        if not temp_secrets:
-            return bad_request({"message": "The ARN failed to create"})
+        if not storage_manager.is_valid():
+            return bad_request({"message": "Storage credentials are not valid"})
 
         if analysis.allow_access:
             job = await WebJob.create(**job)
@@ -74,9 +75,12 @@ async def create_job(self, project_id: UUID, job: JobSerializerIn, request: Requ
         job_out = job_out.model_dump()
         user = await request.identity.get_user()
         job_out["created_by"] = user.email
-        # Submit to monitor job
-        params.update(temp_secrets)
-        submit_job.apply_async((job.id, analysis.allow_access, params))
+
+        # SECURITY: Pass only storage_id and user_email, not credentials
+        # Worker will regenerate credentials just-in-time to avoid Redis exposure
+        submit_job.apply_async(
+            (str(job.id), analysis.allow_access, params, str(project.storage.pk), user.email, job.time)
+        )
         return created(job_out)
 
     @auth("viewer")
@@ -149,7 +153,7 @@ async def list_web_job(
 
         return ok(
             {
-                "item": result,
+                "items": result,
                 "page": page,
                 "page_size": page_size,
                 "total": total_count,
diff --git a/backend/app/service_job/models.py b/backend/app/service_job/models.py
index ef58399..28c566b 100644
--- a/backend/app/service_job/models.py
+++ b/backend/app/service_job/models.py
@@ -12,7 +12,7 @@ class JobStatus(str, Enum):
     PENDING = "PENDING"
     FAILED = "FAILED"
     COMPLETED = "COMPLETED"
-    SERVER_ERROR = "SERVER_ERROR"
+    HPC_DISCONNECTED = "HPC_DISCONNECTED"
     CANCELLED = "CANCELLED"
     CANCELLING = "CANCELLED+"
     TIMEOUT = "TIMEOUT"
diff --git a/backend/app/service_job/tasks.py b/backend/app/service_job/tasks.py
index 3be64d5..cbed903 100644
--- a/backend/app/service_job/tasks.py
+++ b/backend/app/service_job/tasks.py
@@ -6,8 +6,24 @@
 
 
 @shared_task
-def submit_job(job_id: str, is_web_job: bool, params: dict):
+def submit_job(job_id: str, is_web_job: bool, params: dict, storage_id: str, user_email: str, duration: int):
+    """
+    Submit job to compute executor.
+
+    SECURITY: Regenerates AWS credentials just-in-time in the worker to avoid
+    passing credentials through Redis/Celery broker.
+
+    Args:
+        job_id: Job UUID
+        is_web_job: Whether this is a web job
+        params: Job parameters (without credentials)
+        storage_id: Storage credential ID to fetch from database
+        user_email: User email for STS session naming
+        duration: Duration in seconds for temporary credentials
+    """
     from app.utils.executor.manager import get_compute_executor
+    from app.utils.storage.manager import get_storage_manager
+    from app.service_credential.models.base import Credential
     import logging
 
     logger = logging.getLogger(__name__)
@@ -23,13 +39,36 @@ async def _handle():
             return
 
         try:
+            # SECURITY: Fetch storage credential from DB and generate temp credentials
+            # This happens in the worker, so credentials never pass through Redis
+            storage_credential = await Credential.get_or_none(id=storage_id)
+            if not storage_credential:
+                logger.error(f"Storage credential {storage_id} not found for job {job_id}")
+                job.status = JobStatus.FAILED
+                await job.save()
+                return
+
+            storage_manager = await get_storage_manager(storage_credential)
+            temp_secrets = storage_manager.get_temp_secret(session_name=user_email, duration=duration)
+
+            if not temp_secrets:
+                logger.error(f"Failed to generate temporary credentials for job {job_id}")
+                job.status = JobStatus.FAILED
+                await job.save()
+                return
+
+            # Add temp credentials to params
+            params.update(temp_secrets)
+            logger.info(f"Generated temporary credentials for job {job_id} (duration: {duration}s)")
+
+            # Submit job with credentials
             await job.fetch_related("compute")
             executor = await get_compute_executor(job.compute)
             result = executor.submit(job, params)
             job.external_id = result.get("job_id")
             if job.external_id is None:
-                job.status = JobStatus.SERVER_ERROR
+                job.status = JobStatus.HPC_DISCONNECTED
             else:
                 job.status = JobStatus.SUBMITTED
             await job.save()
@@ -77,13 +116,19 @@ async def _handle():
                 JobStatus.CANCELLED,
                 JobStatus.CANCELLING,
                 JobStatus.TIMEOUT,
-                JobStatus.SERVER_ERROR,
             ]:
                 logger.info(f"Rescheduling monitoring for job {job.id}, current status: {job.status}")
                 monitor_job.apply_async(args=[job.id, is_web_job], countdown=5)
             else:
logger.info(f"Monitoring finished for job {job.id}, status: {job.status}") + # SECURITY: Cleanup params.json from HPC in case job failed before it could delete it + try: + await executor.cleanup_credentials(job) + logger.info(f"Cleaned up credential files for job {job.id}") + except Exception as e: + logger.warning(f"Failed to cleanup credentials for job {job.id}: {e}") + except Exception: logger.exception(f"Monitoring failed for WebJob {job_id}") job.status = JobStatus.FAILED diff --git a/backend/app/service_job/tests/test_api_web_job.py b/backend/app/service_job/tests/test_api_web_job.py index 94a5a8b..1e078ff 100644 --- a/backend/app/service_job/tests/test_api_web_job.py +++ b/backend/app/service_job/tests/test_api_web_job.py @@ -80,8 +80,8 @@ async def test_create_list_job( ) assert res.status == 200 result = await res.json() - assert "item" in result - assert len(result["item"]) == 5 + assert "items" in result + assert len(result["items"]) == 5 assert result["total"] >= 15 # List jobs with pagination (page=3, page_size=5) @@ -92,8 +92,8 @@ async def test_create_list_job( ) assert res.status == 200 result = await res.json() - assert "item" in result - assert len(result["item"]) == 5 + assert "items" in result + assert len(result["items"]) == 5 # Search for a specific job by name search_name = "customer-03" @@ -104,7 +104,7 @@ async def test_create_list_job( ) assert res.status == 200 result = await res.json() - assert any(job["name"] == search_name for job in result["item"]) + assert any(job["name"] == search_name for job in result["items"]) # get log of current running job # TODO: test job log diff --git a/backend/app/service_organization/controller.py b/backend/app/service_organization/controller.py index 67638d6..3842d0b 100644 --- a/backend/app/service_organization/controller.py +++ b/backend/app/service_organization/controller.py @@ -69,7 +69,7 @@ async def list_organization( return ok( { - "item": result, + "items": result, "page": page, "page_size": page_size, "total": total_count, diff --git a/backend/app/service_organization/tests/test_organization.py b/backend/app/service_organization/tests/test_organization.py index 96a59dd..7fe2038 100644 --- a/backend/app/service_organization/tests/test_organization.py +++ b/backend/app/service_organization/tests/test_organization.py @@ -45,8 +45,8 @@ async def check_page(page, expected_count): res_data = await res.json() assert res.status == 200 assert res_data["page"] == page - assert len(res_data["item"]) == expected_count - for item in res_data["item"]: + assert len(res_data["items"]) == expected_count + for item in res_data["items"]: assert_instance_fields(item, expected_fields) await check_page(1, 10) @@ -58,19 +58,19 @@ async def check_page(page, expected_count): res_data = await res.json() assert res.status == 200 assert res_data["total"] == 1 - assert "29" in res_data["item"][0]["name"] + assert "29" in res_data["items"][0]["name"] # search by description res = await test_client.get(f"{BASE_URL}", query={"search": "Description-1"}, cookies=cookies_org_manager) res_data = await res.json() assert res.status == 200 assert res_data["total"] == 11 - assert len(res_data["item"]) == 11 - for item in res_data["item"]: + assert len(res_data["items"]) == 11 + for item in res_data["items"]: assert_instance_fields(item, expected_fields) # Update - update_org = res_data["item"][0] + update_org = res_data["items"][0] updated_org = await update_instance( test_client, f"{BASE_URL}/{update_org['id']}/", @@ -99,4 +99,4 @@ async def check_page(page, 
         cookies=cookies_org_manager,
     )
     res_data = await res.json()
-    assert len(res_data["item"]) == 29
+    assert len(res_data["items"]) == 29
diff --git a/backend/app/service_project/controller.py b/backend/app/service_project/controller.py
index e3aac61..4d300d8 100644
--- a/backend/app/service_project/controller.py
+++ b/backend/app/service_project/controller.py
@@ -148,7 +148,7 @@ async def list_projects(
 
         return ok(
             {
-                "item": result,
+                "items": result,
                 "page": page,
                 "page_size": page_size,
                 "total": total_count,
diff --git a/backend/app/service_project/tests/test_project.py b/backend/app/service_project/tests/test_project.py
index 6d0c08a..6117db2 100644
--- a/backend/app/service_project/tests/test_project.py
+++ b/backend/app/service_project/tests/test_project.py
@@ -184,8 +184,8 @@ async def test_project_CRUD_and_search(
     res_data = await res.json()
     assert res.status == 200
     assert res_data["page"] == 1
-    assert len(res_data["item"]) == 5
-    for item in res_data["item"]:
+    assert len(res_data["items"]) == 5
+    for item in res_data["items"]:
         assert_instance_fields(item, expected_fields)
 
     # second page
@@ -193,8 +193,8 @@ async def test_project_CRUD_and_search(
     res_data = await res.json()
     assert res.status == 200
     assert res_data["page"] == 2
-    assert len(res_data["item"]) == 3
-    for item in res_data["item"]:
+    assert len(res_data["items"]) == 3
+    for item in res_data["items"]:
         assert_instance_fields(item, expected_fields)
 
     # Search name
@@ -203,7 +203,7 @@ async def test_project_CRUD_and_search(
     res_data = await res.json()
     assert res.status == 200
     assert res_data["total"] == 2
-    assert "public project 1" in res_data["item"][0]["name"]
+    assert "public project 1" in res_data["items"][0]["name"]
 
     # failed due to invalid format filters
     res = await test_client.get(
@@ -233,7 +233,7 @@ async def test_project_CRUD_and_search(
     res_data = await res.json()
     assert res.status == 200
     assert res_data["total"] == 2
-    assert "public project 1" in res_data["item"][0]["name"]
+    assert "public project 1" in res_data["items"][0]["name"]
 
     # Search by role
     res = await test_client.get(
@@ -265,8 +265,8 @@ async def test_project_CRUD_and_search(
     res_data = await res.json()
     assert res.status == 200
     assert res_data["total"] == 8
-    assert len(res_data["item"]) == 5
-    for item in res_data["item"]:
+    assert len(res_data["items"]) == 5
+    for item in res_data["items"]:
         assert_instance_fields(item, expected_fields)
 
     # Get by desc sorting (asc by default)
@@ -283,13 +283,13 @@ async def test_project_CRUD_and_search(
     res_data = await res.json()
     assert res.status == 200
     assert res_data["total"] == 8
-    assert len(res_data["item"]) == 5
-    for item in res_data["item"]:
+    assert len(res_data["items"]) == 5
+    for item in res_data["items"]:
         assert_instance_fields(item, expected_fields)
-    assert res_data["item"][0]["name"] == "user public project 1"  # this one if not sorting will be last
+    assert res_data["items"][0]["name"] == "user public project 1"  # this one if not sorting will be last
 
     # Test RBAC for update/delete
-    project_id = res_data["item"][0]["id"]
+    project_id = res_data["items"][0]["id"]
     update_data = {"name": "Edited name", "description": "Edited description"}
 
     for cookies in [cookies_contributor, cookies_viewer]:
diff --git a/backend/app/settings.py b/backend/app/settings.py
index d8478f8..09adf2f 100644
--- a/backend/app/settings.py
+++ b/backend/app/settings.py
@@ -27,11 +27,8 @@
 ENV = os.environ.get("APP_ENV", "test")
 SECRET = os.environ.get("APP_SECRET", "default_secret")
 DOMAIN = os.environ.get("APP_DOMAIN", "localhost")
os.environ.get("APP_DOMAIN", "localhost") -BASE_API_HOST = "localhost" -IS_SSL = False -if DOMAIN != "localhost": - IS_SSL = True - BASE_API_HOST = "river-backend" +BASE_API_HOST = "localhost" if ENV != "prod" else "river-backend" +IS_SSL = False if DOMAIN == "localhost" else True DEBUG = os.environ.get("DEBUG", 0) SUPERUSER_EMAILS = os.environ.get("SUPERUSER_EMAILS", "").split(",") diff --git a/backend/app/utils/executor/ssh.py b/backend/app/utils/executor/ssh.py index 918cfb5..5e19fb1 100644 --- a/backend/app/utils/executor/ssh.py +++ b/backend/app/utils/executor/ssh.py @@ -201,13 +201,20 @@ def _create_tunnel(job: WebJob): def _delete_tunnel(job: WebJob): try: + killed = False for proc in psutil.process_iter(["pid", "name", "connections"]): - for conn in proc.info.get("connections", []): - if conn.laddr and conn.laddr.port == job.local_port: - logger.info(f"[INFO] Killing process {proc.pid} on port {job.local_port}") - proc.kill() + try: + for conn in proc.info.get("connections", []): + if conn.laddr and conn.laddr.port == job.local_port: + logger.info(f"[INFO] Killing process {proc.pid} on port {job.local_port}") + proc.kill() + killed = True + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + if not killed: + logger.info(f"[INFO] No process found on port {job.local_port}") except Exception as e: - logger.error(f"[ERROR] Failed to kill tunnel: {e}") + logger.error(f"Failed to kill tunnel: {e}") def _create_traefik_config(job: WebJob): config_path = os.path.join(TRAEFIK_DYNAMIC_FOLDER, f"{job.id}.yaml") @@ -235,8 +242,14 @@ def _create_traefik_config(job: WebJob): address: "{BASE_API}/api/pros/{job.project.id}/job/{job.id}/{job.access_by.value}" trustForwardHeader: true authResponseHeaders: - - X-User-ID - - X-User-Role + - X-User-ID + - X-User-Role + authRequestHeaders: + - Cookie + - Authorization + - X-Forwarded-For + - X-Forwarded-Proto + - X-Forwarded-Host strip-job-{job.id}: stripPrefix: @@ -275,7 +288,13 @@ def _create_traefik_config(job: WebJob): trustForwardHeader: true authResponseHeaders: - X-User-ID - - X-User-Role + - X-User-Role + authRequestHeaders: + - Cookie + - Authorization + - X-Forwarded-For + - X-Forwarded-Proto + - X-Forwarded-Host job-header-{job.id}: headers: @@ -294,9 +313,11 @@ def _create_traefik_config(job: WebJob): # Monitor job STAGING_STATUS = [ JobStatus.SUBMITTED, + JobStatus.SUBMITTING, JobStatus.PENDING, JobStatus.RUNNING, JobStatus.CANCELLING, + JobStatus.HPC_DISCONNECTED, ] if job.status in STAGING_STATUS: update_info = {} @@ -305,9 +326,12 @@ def _create_traefik_config(job: WebJob): out, error, exit_code = self._run_cmd( f"squeue --job {job.external_id} --format='%.18i %.9T %.10M %.20V %R' --noheader | head -n1" ) - logger.info(f"squeue output: {out.strip() if out else ''}, error: {error.strip()}, exit_code: {exit_code}") - - if exit_code == 0 and out.strip(): + logger.info( + f"squeue output: {out.strip() if out else ''}, error: {error.strip() if error else ''}, exit_code: {exit_code}" + ) + if exit_code is None: + update_info["status"] = JobStatus.HPC_DISCONNECTED + elif exit_code == 0 and out.strip(): _get_update_info(update_info, out, job, method="squeue") else: # Fallback to sacct (for completed, failed, or recently finished jobs) @@ -330,7 +354,6 @@ def _create_traefik_config(job: WebJob): if job.local_port and job.remote_port and not _is_local_tunnel_existed(job.local_port): _create_tunnel(job) - # Render the config to make reverse proxy config_path = _create_traefik_config(job) if job.status != 
@@ -503,17 +526,22 @@ def _generate_script(self, server: str, job: WebJob, params: dict) -> str:
 mkdir -p $NXF_SINGULARITY_CACHEDIR
 
 # === Install pixi ===
-which pixi || curl -fsSL https://pixi.sh/install.sh | sh
+if [ ! -f "$HOME/.pixi/bin/pixi" ]; then
+    curl -fsSL https://pixi.sh/install.sh | sh
+else
+    echo "Pixi already installed."
+fi
+
 export PATH=$PATH:$HOME/.pixi/bin
 
 # Only append channels if not already present
-if ! pixi config get default-channels --global | grep -q bioconda; then
+if ! pixi config list default-channels --global | grep -q bioconda; then
     pixi config append default-channels bioconda --global
 fi
-if ! pixi config get default-channels --global | grep -q conda-forge; then
+if ! pixi config list default-channels --global | grep -q conda-forge; then
     pixi config append default-channels conda-forge --global
 fi
 
 # Only install packages if not already installed
-REQUIRED_PIXI_PKGS="nextflow jq git singularity python=3.14"
+REQUIRED_PIXI_PKGS="nextflow jq git singularity python=3.12"
 for pkg in $REQUIRED_PIXI_PKGS; do
     if ! pixi global list | grep -E "^$pkg\b"; then
         pixi global install $pkg
@@ -521,7 +549,7 @@ def _generate_script(self, server: str, job: WebJob, params: dict) -> str:
 done
 pixi config append default-channels bioconda --global
 pixi config append default-channels conda-forge --global
-pixi global install nextflow jq git singularity python=3.14
+pixi global install nextflow jq git singularity python=3.12
 
 # === Install Goofys if missing ===
 if [ ! -f "$GOOFYS_PATH" ]; then
diff --git a/backend/migrations/models/5_20260105205918_update.py b/backend/migrations/models/5_20260105205918_update.py
new file mode 100644
index 0000000..44afd44
--- /dev/null
+++ b/backend/migrations/models/5_20260105205918_update.py
@@ -0,0 +1,49 @@
+from tortoise import BaseDBAsyncClient
+
+
+async def upgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        COMMENT ON COLUMN "batchjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+HPC_DISCONNECTED: HPC_DISCONNECTED
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';
+        COMMENT ON COLUMN "webjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+HPC_DISCONNECTED: HPC_DISCONNECTED
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';"""
+
+
+async def downgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        COMMENT ON COLUMN "webjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+SERVER_ERROR: SERVER_ERROR
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';
+        COMMENT ON COLUMN "batchjob"."status" IS 'SUBMITTING: SUBMITTING
+SUBMITTED: SUBMITTED
+RUNNING: RUNNING
+PENDING: PENDING
+FAILED: FAILED
+COMPLETED: COMPLETED
+SERVER_ERROR: SERVER_ERROR
+CANCELLED: CANCELLED
+CANCELLING: CANCELLED+
+TIMEOUT: TIMEOUT';"""
diff --git a/backend/migrations/models/6_20260105224743_update.py b/backend/migrations/models/6_20260105224743_update.py
new file mode 100644
index 0000000..53b0033
--- /dev/null
+++ b/backend/migrations/models/6_20260105224743_update.py
@@ -0,0 +1,11 @@
+from tortoise import BaseDBAsyncClient
+
+
+async def upgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        ALTER TABLE "github" ALTER COLUMN "token" TYPE TEXT USING "token"::TEXT;"""
+
+
+async def downgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        ALTER TABLE "github" ALTER COLUMN "token" TYPE VARCHAR(200) USING "token"::VARCHAR(200);"""
ALTER COLUMN "token" TYPE VARCHAR(200) USING "token"::VARCHAR(200);""" diff --git a/backend/perf/install.sh b/backend/perf/install.sh index d1db931..580098d 100644 --- a/backend/perf/install.sh +++ b/backend/perf/install.sh @@ -12,7 +12,11 @@ tar xf pypy3.11-v7.3.20-linux64.tar.bz2 ./pypy3.11-v7.3.20-linux64/bin/pypy -mpip install -U pip wheel ./pypy3.11-v7.3.20-linux64/bin/pypy -mpip install -r requirements.txt +# download for bombardier +wget https://github.com/codesenberg/bombardier/releases/download/v2.0.2/bombardier-linux-amd64 +chmod +x bombardier-linux-amd64 +./bombardier-linux-amd64 -c 2000 -d 10s -l http://river-backend:8000/api/health # pypy3.11 socketify ./pypy3.11-v7.3.20-linux64/bin/pypy -m socketify app:app --worker 2 --port 8000 diff --git a/backend/requirements.txt b/backend/requirements.txt index ed5fe02..488ca76 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -18,4 +18,5 @@ celery==5.5.3 socketify==0.0.31 psycopg[binary]==3.3.2 psycopg-pool==3.3.0 -anyio==4.12.0 \ No newline at end of file +anyio==4.12.0 +cffi==1.18.0.dev0 \ No newline at end of file diff --git a/backend/run_server.sh b/backend/run_server.sh index 28917fb..66f3c8e 100644 --- a/backend/run_server.sh +++ b/backend/run_server.sh @@ -13,4 +13,4 @@ echo "Starting BlackSheep app..." pypy -m socketify app.main:app \ --host 0.0.0.0 \ --port 8000 \ - --workers "$(nproc --all)" \ No newline at end of file + --workers $(nproc --all) \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index f5baf6a..f4f6d11 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -76,6 +76,8 @@ services: ports: - "6379:6379" restart: unless-stopped + environment: + - TZ=UTC celery: build: ./backend diff --git a/frontend/src/features/analysis/contexts/AnalysisContext.tsx b/frontend/src/features/analysis/contexts/AnalysisContext.tsx index 9df95a9..f575c73 100755 --- a/frontend/src/features/analysis/contexts/AnalysisContext.tsx +++ b/frontend/src/features/analysis/contexts/AnalysisContext.tsx @@ -330,7 +330,7 @@ export const AnalysisContextProvider: React.FC<{ children: ReactNode }> = ({ `analysis/${updatedTool.id}/`, removeIDUpdateTool ); - if (response.status === 200) { + if (response.status === 204) { fetchTools(0, 20); return response; } diff --git a/frontend/src/features/credential/contexts/CredentialContext.tsx b/frontend/src/features/credential/contexts/CredentialContext.tsx index 9009f07..5f0e905 100755 --- a/frontend/src/features/credential/contexts/CredentialContext.tsx +++ b/frontend/src/features/credential/contexts/CredentialContext.tsx @@ -137,6 +137,7 @@ export const CredentialContextProvider: React.FC<{ children: ReactNode }> = ({ case CredentialTypeEnum.GITHUB: if ( existing.type === CredentialTypeEnum.GITHUB + && existing.username ) { update = true; } diff --git a/frontend/src/features/project/components/Projects.tsx b/frontend/src/features/project/components/Projects.tsx index b6d55a1..8bae452 100755 --- a/frontend/src/features/project/components/Projects.tsx +++ b/frontend/src/features/project/components/Projects.tsx @@ -45,7 +45,7 @@ const Projects: React.FC = () => { const [sorting, setSorting] = useState([]); const [pagination, setPagination] = useState({ pageIndex: 0, - pageSize: 5, + pageSize: 10, }); // Fetch projects