Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d546462
fix: check for pixi before installing
nttg8100 Dec 29, 2025
a5867d2
fix: update docker images, fix cffi
nttg8100 Dec 29, 2025
9c486c9
feat: add download cmd for bombardier
nttg8100 Dec 30, 2025
c241503
fix: fix response for pagination
nttg8100 Dec 30, 2025
48bd16c
fix: fix update for credential github
nttg8100 Dec 30, 2025
891a4f6
chore: ignore failed cmt
nttg8100 Dec 31, 2025
2975876
chore: add clearer message for analysis
nttg8100 Dec 31, 2025
7ba433e
chore: fix socketify parsing query error
nttg8100 Dec 31, 2025
cf4c885
chore: fix docker socketify
nttg8100 Dec 31, 2025
b118f18
chore: add detach for production
nttg8100 Dec 31, 2025
0e0df26
fix: ignore websocket forward auth
nttg8100 Jan 1, 2026
857c875
feat: add time zone env to redis
nttg8100 Jan 1, 2026
c199776
chore: add .pixi to gitignore
nttg8100 Jan 1, 2026
5b4b0da
fix: fix update for analysis
nttg8100 Jan 1, 2026
44ea936
fix: catch correct status code when update analysis
nttg8100 Jan 1, 2026
9791232
chore: simplify TLS and domain setup
nttg8100 Jan 1, 2026
1cb2916
fix: middleware auth api domain
nttg8100 Jan 3, 2026
7514549
fix: use max cpus on API server
nttg8100 Jan 3, 2026
4419d06
feat: update message
nttg8100 Jan 3, 2026
30964d7
fix: test on analysis with pagination
nttg8100 Jan 3, 2026
0b2917f
fix: install dependencies on test env
nttg8100 Jan 3, 2026
bc8ac5e
fix: add more make cmd to phony
nttg8100 Jan 3, 2026
259339a
fix: ci setup
nttg8100 Jan 3, 2026
808a9c7
fix: ci setup
nttg8100 Jan 3, 2026
601ef5e
feat: handle server disconnect
nttg8100 Jan 5, 2026
189f04a
fix: add github token to encrypt
nttg8100 Jan 5, 2026
3882aed
feat: remove credentials from params in celery
nttg8100 Jan 7, 2026
819d862
RC-153 chore improve security (#90)
nttg8100 Jan 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ PYPY_VERSION := 3.11-v7.3.20-linux64
backend/pypy${PYPY_VERSION}/bin/pypy3:
wget https://downloads.python.org/pypy/pypy${PYPY_VERSION}.tar.bz2 -O backend/pypy${PYPY_VERSION}.tar.bz2
tar xf backend/pypy${PYPY_VERSION}.tar.bz2 -C backend

# Workaround for a query-string parsing issue in socketify's ASGI adapter; see https://github.com/cirospaciari/socketify.py/issues/218
install-backend-deps: backend/pypy${PYPY_VERSION}/bin/pypy3
cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -m ensurepip && ./pypy${PYPY_VERSION}/bin/pypy3 -mpip install -r requirements.txt
cd backend && sed -i 's/"query_string": ffi.unpack(info.query_string, info.query_string_size)/"query_string": ffi.unpack(info.query_string, info.query_string_size)[1:]/g' pypy${PYPY_VERSION}/lib/pypy3.11/site-packages/socketify/asgi.py

dev-frontend: ~/.pixi/bin/pixi
cd frontend && pixi run npm install -f
Expand Down Expand Up @@ -53,7 +57,7 @@ migrate-dev-db: backend/pypy${PYPY_VERSION}/bin/pypy3
start-test-db:
docker compose up test-db -d

migrate-test-db: backend/pypy${PYPY_VERSION}/bin/pypy3
migrate-test-db: install-backend-deps
cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -m aerich init-db || echo "DB already existed"
cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -m aerich upgrade

Expand Down Expand Up @@ -85,11 +89,11 @@ start-traefik: ~/.pixi/bin/pixi
cd traefik && pixi run traefik --configFile=traefik.dev.yaml

## Tests: Currently testing for backend only
.PHONY: start-test-infra test-auth test-health test-organization test-all
.PHONY: start-test-infra test-auth test-health test-organization test-all install-backend-deps install-test-deps test-credential test-analysis test-project test-storage test-job
start-test-infra: start-test-db start-redis start-localstack start-slurm
@echo "Start test infra setup"

install-test-deps: backend/pypy${PYPY_VERSION}/bin/pypy3
install-test-deps: install-backend-deps
cd backend && ./pypy${PYPY_VERSION}/bin/pypy3 -mpip install pytest pytest-cov pytest-asyncio

test-auth: install-test-deps
Expand Down Expand Up @@ -129,7 +133,7 @@ staging: .env
production: .env.prod
docker compose --env-file .env.prod build
sed -e 's#HOST#$(shell cat .env.prod|grep APP_DOMAIN|cut -f2 -d=)#g' traefik/base/prod.yaml > traefik/dynamic/prod.yaml
docker compose --env-file .env.prod up traefik
docker compose --env-file .env.prod up traefik -d

# CLEAN
.PHONY: clean-dev-db clean-test-db clean-redis
Expand Down
2 changes: 1 addition & 1 deletion backend/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ venv.bak/
# mypy
.mypy_cache/
.pytest # pixi environments
.pixi/*
.pixi
!.pixi/config.toml
bombardier-linux-amd64
*tar.bz2
Expand Down
9 changes: 3 additions & 6 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
FROM pypy:3.11-slim

ENV PYTHONUNBUFFERED=1
WORKDIR /app

RUN apt-get update && apt-get install -y \
build-essential pkg-config \
build-essential pkg-config libuv1 \
libffi-dev libssl-dev libsodium-dev make libpq-dev \
&& rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Fix socketify ASGI query string parsing issue
RUN sed -i 's/"query_string": ffi.unpack(info.query_string, info.query_string_size)/"query_string": ffi.unpack(info.query_string, info.query_string_size)[1:]/g' /opt/pypy/lib/pypy3.11/site-packages/socketify/asgi.py
COPY app ./app
COPY domain ./domain
COPY migrations ./migrations
COPY pyproject.toml .
COPY settings.yaml .
COPY run_server.sh ./
RUN chmod +x run_server.sh
RUN apt-get update && apt-get install -y libuv1
EXPOSE 8000

RUN useradd -m appuser
Expand Down
1 change: 0 additions & 1 deletion backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ async def configure_redis(app):
JobController,
]
)
print("Failed here")
# Database 2 3
register_tortoise(
app,
Expand Down
24 changes: 13 additions & 11 deletions backend/app/service_analysis/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from tortoise.expressions import Q
from app.service_credential.models.base import Credential
from app.utils.version_control.manager import get_version_control_manager
from .serializers import AnalysisSerializerIn, AnalysisSerializerOut
from .serializers import AnalysisSerializerIn, AnalysisSerializerOut, AnalysisUpdateSerializer
from .models import Analysis


Expand Down Expand Up @@ -95,7 +95,7 @@ async def get_analysis(

return ok(
{
"item": result,
"items": result,
"page": page,
"page_size": page_size,
"total": total_count,
Expand Down Expand Up @@ -181,17 +181,19 @@ async def get_profile_schema(

@auth("authenticated")
@put("/{id}/")
async def update_analysis(id: UUID, request: Request) -> Response:
async def update_analysis(
request: Request,
id: UUID,
data: AnalysisUpdateSerializer,
) -> Response:
created_by = await request.identity.get_user()
data = await request.json()
if "allow_access" not in data.keys():
return bad_request({"message": "Missing allow_access field"})
if "has_base_url" not in data.keys():
return bad_request({"message": "Missing has_base_url field"})

await Analysis.filter(id=id, created_by=created_by.email).update(
allow_access=data["allow_access"], has_base_url=data["has_base_url"]

updated_count = await Analysis.filter(id=id, created_by=created_by.email).update(
allow_access=data.allow_access, has_base_url=data.has_base_url
)
if updated_count == 0:
return bad_request({"message": "Analysis not found or unauthorized"})

return no_content()

@auth("authenticated")
Expand Down
6 changes: 6 additions & 0 deletions backend/app/service_analysis/serializers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tortoise.contrib.pydantic import pydantic_model_creator
from pydantic import BaseModel
from .models import Analysis


Expand All @@ -14,3 +15,8 @@
Analysis,
name="AnalysisOut",
)


class AnalysisUpdateSerializer(BaseModel):
has_base_url: bool
allow_access: bool
20 changes: 10 additions & 10 deletions backend/app/service_analysis/tests/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,16 @@ async def test_basic_analysis(

# list instance as admin
res_admin = await list_instance(test_client, f"{BASE_URL}/", cookies_admin, 200)
assert len(res_admin["item"]) == 1
assert_instance_fields(res_admin["item"][0], EXPECTED_FIELDS)
assert len(res_admin["items"]) == 1
assert_instance_fields(res_admin["items"][0], EXPECTED_FIELDS)

# list instance as contributor
res_contrib = await list_instance(test_client, f"{BASE_URL}/", cookies_contributor, 200)
assert len(res_contrib["item"]) == 1
assert_instance_fields(res_contrib["item"][0], EXPECTED_FIELDS)
assert len(res_contrib["items"]) == 1
assert_instance_fields(res_contrib["items"][0], EXPECTED_FIELDS)

# delete as admin (should fail)
delete_url = f"{BASE_URL}/{res_admin['item'][0]['id']}/"
delete_url = f"{BASE_URL}/{res_admin['items'][0]['id']}/"
await delete_instance(test_client, delete_url, cookies_admin, 204)

# delete as superuser (should pass)
Expand Down Expand Up @@ -93,23 +93,23 @@ async def get_page(page, page_size):
# page 1
data = await get_page(1, 10)
assert data["page"] == 1
assert len(data["item"]) == 10
assert len(data["items"]) == 10

# page 2
data = await get_page(2, 10)
assert data["page"] == 2
assert len(data["item"]) == 10
assert len(data["items"]) == 10

# last page
data = await get_page(4, 10)
assert data["page"] == 4
assert len(data["item"]) == 0
assert len(data["items"]) == 0
# too large page
data = await get_page(99, 10)
assert data["item"] == []
assert data["items"] == []

# search by url
res = await test_client.get(f"{BASE_URL}/", query={"search": "repo-1"}, cookies=cookies_admin)
data = await res.json()
assert res.status == 200
assert any("repo-1" in item["url"] for item in data["item"])
assert any("repo-1" in item["url"] for item in data["items"])
2 changes: 1 addition & 1 deletion backend/app/service_credential/models/personal.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ class Github(models.Model):
credential = fields.OneToOneField("models.Credential", related_name="github", on_delete=fields.CASCADE)
id = fields.UUIDField(primary_key=True, default=uuid.uuid4)
username = fields.CharField(max_length=50, null=False)
token = fields.CharField(max_length=200, null=False)
token = EncryptedTextField(max_length=200, null=False)
18 changes: 11 additions & 7 deletions backend/app/service_job/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ async def create_job(self, project_id: UUID, job: JobSerializerIn, request: Requ
project = await Project.get_or_none(id=project_id)
params["bucket_name"] = project.bucket_name
await project.fetch_related("storage")

# Validate that storage credentials are configured (don't generate temp creds here)
storage_manager = await get_storage_manager(project.storage)
temp_secrets = storage_manager.get_temp_secret(session_name=user.email, duration=job["time"])
if not temp_secrets:
return bad_request({"message": "The ARN failed to create"})
if not storage_manager.is_valid():
return bad_request({"message": "Storage credentials are not valid"})

if analysis.allow_access:
job = await WebJob.create(**job)
Expand All @@ -74,9 +75,12 @@ async def create_job(self, project_id: UUID, job: JobSerializerIn, request: Requ
job_out = job_out.model_dump()
user = await request.identity.get_user()
job_out["created_by"] = user.email
# Submit to monitor job
params.update(temp_secrets)
submit_job.apply_async((job.id, analysis.allow_access, params))

# SECURITY: Pass only storage_id and user_email, not credentials
# Worker will regenerate credentials just-in-time to avoid Redis exposure
submit_job.apply_async(
(str(job.id), analysis.allow_access, params, str(project.storage.pk), user.email, job.time)
)
return created(job_out)

@auth("viewer")
Expand Down Expand Up @@ -149,7 +153,7 @@ async def list_web_job(

return ok(
{
"item": result,
"items": result,
"page": page,
"page_size": page_size,
"total": total_count,
Expand Down
2 changes: 1 addition & 1 deletion backend/app/service_job/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class JobStatus(str, Enum):
PENDING = "PENDING"
FAILED = "FAILED"
COMPLETED = "COMPLETED"
SERVER_ERROR = "SERVER_ERROR"
HPC_DISCONNECTED = "HPC_DISCONNECTED"
CANCELLED = "CANCELLED"
CANCELLING = "CANCELLED+"
TIMEOUT = "TIMEOUT"
Expand Down
51 changes: 48 additions & 3 deletions backend/app/service_job/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,24 @@


@shared_task
def submit_job(job_id: str, is_web_job: bool, params: dict):
def submit_job(job_id: str, is_web_job: bool, params: dict, storage_id: str, user_email: str, duration: int):
"""
Submit job to compute executor.

SECURITY: Regenerates AWS credentials just-in-time in the worker to avoid
passing credentials through Redis/Celery broker.

Args:
job_id: Job UUID
is_web_job: Whether this is a web job
params: Job parameters (without credentials)
storage_id: Storage credential ID to fetch from database
user_email: User email for STS session naming
duration: Duration in seconds for temporary credentials
"""
from app.utils.executor.manager import get_compute_executor
from app.utils.storage.manager import get_storage_manager
from app.service_credential.models.base import Credential
import logging

logger = logging.getLogger(__name__)
Expand All @@ -23,13 +39,36 @@ async def _handle():
return

try:
# SECURITY: Fetch storage credential from DB and generate temp credentials
# This happens in the worker, so credentials never pass through Redis
storage_credential = await Credential.get_or_none(id=storage_id)
if not storage_credential:
logger.error(f"Storage credential {storage_id} not found for job {job_id}")
job.status = JobStatus.FAILED
await job.save()
return

storage_manager = await get_storage_manager(storage_credential)
temp_secrets = storage_manager.get_temp_secret(session_name=user_email, duration=duration)

if not temp_secrets:
logger.error(f"Failed to generate temporary credentials for job {job_id}")
job.status = JobStatus.FAILED
await job.save()
return

# Add temp credentials to params
params.update(temp_secrets)
logger.info(f"Generated temporary credentials for job {job_id} (duration: {duration}s)")

# Submit job with credentials
await job.fetch_related("compute")
executor = await get_compute_executor(job.compute)
result = executor.submit(job, params)

job.external_id = result.get("job_id")
if job.external_id is None:
job.status = JobStatus.SERVER_ERROR
job.status = JobStatus.HPC_DISCONNECTED
else:
job.status = JobStatus.SUBMITTED
await job.save()
Expand Down Expand Up @@ -77,13 +116,19 @@ async def _handle():
JobStatus.CANCELLED,
JobStatus.CANCELLING,
JobStatus.TIMEOUT,
JobStatus.SERVER_ERROR,
]:
logger.info(f"Rescheduling monitoring for job {job.id}, current status: {job.status}")
monitor_job.apply_async(args=[job.id, is_web_job], countdown=5)
else:
logger.info(f"Monitoring finished for job {job.id}, status: {job.status}")

# SECURITY: Cleanup params.json from HPC in case job failed before it could delete it
try:
await executor.cleanup_credentials(job)
logger.info(f"Cleaned up credential files for job {job.id}")
except Exception as e:
logger.warning(f"Failed to cleanup credentials for job {job.id}: {e}")

except Exception:
logger.exception(f"Monitoring failed for WebJob {job_id}")
job.status = JobStatus.FAILED
Expand Down
10 changes: 5 additions & 5 deletions backend/app/service_job/tests/test_api_web_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ async def test_create_list_job(
)
assert res.status == 200
result = await res.json()
assert "item" in result
assert len(result["item"]) == 5
assert "items" in result
assert len(result["items"]) == 5
assert result["total"] >= 15

# List jobs with pagination (page=3, page_size=5)
Expand All @@ -92,8 +92,8 @@ async def test_create_list_job(
)
assert res.status == 200
result = await res.json()
assert "item" in result
assert len(result["item"]) == 5
assert "items" in result
assert len(result["items"]) == 5

# Search for a specific job by name
search_name = "customer-03"
Expand All @@ -104,7 +104,7 @@ async def test_create_list_job(
)
assert res.status == 200
result = await res.json()
assert any(job["name"] == search_name for job in result["item"])
assert any(job["name"] == search_name for job in result["items"])

# get log of current running job
# TODO: test job log
Expand Down
2 changes: 1 addition & 1 deletion backend/app/service_organization/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def list_organization(

return ok(
{
"item": result,
"items": result,
"page": page,
"page_size": page_size,
"total": total_count,
Expand Down
Loading