Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3ca6f4c
feat(workspaces): add rwx perms to blueprints and ranges
Adamkadaban Jul 27, 2025
cfd38d8
fix(tests): fix unit test for range crud admin check to check perms i…
Adamkadaban Jul 27, 2025
8b8887a
fix(types): fix mypy issues in rwx
Adamkadaban Jul 27, 2025
350593b
test(rwx): add tests for blueprint and range permissions
Adamkadaban Jul 27, 2025
718acf9
style(rwx): fix ruff and black errors for range and blueprint perms
Adamkadaban Jul 27, 2025
2cb5136
Merge branch 'workspaces-staging' into adam/workspaces2/perms
Adamkadaban Jul 27, 2025
43133e7
fix(rwx): move id and perm type to mixin
Adamkadaban Jul 31, 2025
b08cbb4
fix(rwx): make all schemas inherit rwx
Adamkadaban Jul 31, 2025
5ec03d1
fix(rwx): make all schemas inherit rwx computed fields
Adamkadaban Jul 31, 2025
a42af7f
fix(rwx): no longer delete rw fields now that theyre in base schema
Adamkadaban Jul 31, 2025
439ad61
fix(rwx): remove duplicate test data
Adamkadaban Jul 31, 2025
f1d57b5
fix(rwx): move from literalstring to enum for perm types
Adamkadaban Jul 31, 2025
7b58b3a
fix(rwx): better permission checks in crud
Adamkadaban Jul 31, 2025
6be6579
fix(rwx): better permission checks in crud
Adamkadaban Jul 31, 2025
80f6bf8
fix(rwx): improve type hints for range model
Adamkadaban Jul 31, 2025
763a8ce
fix(rwx): various formatting and doc fixes
Adamkadaban Aug 2, 2025
ff934c9
fix(rwx): styling and type fixes
Adamkadaban Aug 3, 2025
d482741
fix(rwx): clean up redundant tests
Adamkadaban Aug 3, 2025
6022ece
fix(rwx): clean up redundant tests
Adamkadaban Aug 3, 2025
06ea902
fix(rwx): type safety issues
Adamkadaban Aug 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 313 additions & 0 deletions api/src/app/crud/crud_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
import logging

from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from ..enums.permissions import BlueprintPermissionType, DeployedRangePermissionType
from ..models.permission_models import (
BlueprintRangePermissionModel,
DeployedRangePermissionModel,
)
from ..models.range_models import BlueprintRangeModel, DeployedRangeModel

logger = logging.getLogger(__name__)


async def grant_blueprint_permission(
db: AsyncSession,
blueprint_range_id: int,
user_id: int,
permission_type: BlueprintPermissionType,
requesting_user_id: int,
) -> BlueprintRangePermissionModel:
"""Grant permission to a blueprint range.

Args:
----
db: Database session
blueprint_range_id: ID of blueprint range to grant permission for
user_id: ID of user to grant permission to
permission_type: Type of permission to grant
requesting_user_id: ID of user requesting to grant permission (must be owner)

Returns:
-------
BlueprintRangePermissionModel: The created permission

Raises:
------
SQLAlchemyError: If database operation fails
ValueError: If requesting user is not the owner

"""
stmt = select(BlueprintRangeModel).where(
BlueprintRangeModel.id == blueprint_range_id
)
result = await db.execute(stmt)
blueprint_range = result.scalar_one_or_none()

if not blueprint_range:
msg = f"Blueprint range {blueprint_range_id} not found"
raise ValueError(msg)

if blueprint_range.owner_id != requesting_user_id:
msg = f"Only the owner can grant permissions on blueprint range {blueprint_range_id}"
raise ValueError(msg)

permission = BlueprintRangePermissionModel(
blueprint_range_id=blueprint_range_id,
user_id=user_id,
permission_type=permission_type,
)

db.add(permission)

try:
await db.flush()
await db.refresh(permission)
logger.debug(
"Granted %s permission on blueprint range %s to user %s",
permission_type,
blueprint_range_id,
user_id,
)
except SQLAlchemyError as e:
logger.exception(
"Failed to grant %s permission on blueprint range %s to user %s: %s",
permission_type,
blueprint_range_id,
user_id,
e,
)
raise

return permission


async def grant_deployed_permission(
db: AsyncSession,
deployed_range_id: int,
user_id: int,
permission_type: DeployedRangePermissionType,
requesting_user_id: int,
) -> DeployedRangePermissionModel:
"""Grant permission to a deployed range.

Args:
----
db: Database session
deployed_range_id: ID of deployed range to grant permission for
user_id: ID of user to grant permission to
permission_type: Type of permission to grant
requesting_user_id: ID of user requesting to grant permission (must be owner)

Returns:
-------
DeployedRangePermissionModel: The created permission

Raises:
------
SQLAlchemyError: If database operation fails
ValueError: If requesting user is not the owner

"""
stmt = select(DeployedRangeModel).where(DeployedRangeModel.id == deployed_range_id)
result = await db.execute(stmt)
deployed_range = result.scalar_one_or_none()

if not deployed_range:
msg = f"Deployed range {deployed_range_id} not found"
raise ValueError(msg)

if deployed_range.owner_id != requesting_user_id:
msg = f"User {requesting_user_id} is not the owner of deployed range {deployed_range_id}"
raise ValueError(msg)

permission = DeployedRangePermissionModel(
deployed_range_id=deployed_range_id,
user_id=user_id,
permission_type=permission_type,
)

db.add(permission)

try:
await db.flush()
await db.refresh(permission)
logger.debug(
"Granted %s permission on deployed range %s to user %s",
permission_type,
deployed_range_id,
user_id,
)
except SQLAlchemyError as e:
logger.exception(
"Failed to grant %s permission on deployed range %s to user %s: %s",
permission_type,
deployed_range_id,
user_id,
e,
)
raise

return permission


async def revoke_blueprint_permission(
db: AsyncSession,
blueprint_range_id: int,
user_id: int,
permission_type: BlueprintPermissionType,
requesting_user_id: int,
) -> bool:
"""Revoke permission from a blueprint range.

Args:
----
db: Database session
blueprint_range_id: ID of blueprint range to revoke permission from
user_id: ID of user to revoke permission from
permission_type: Type of permission to revoke
requesting_user_id: ID of user requesting to revoke permission (must be owner)

Returns:
-------
bool: True if permission was revoked, False if not found

Raises:
------
SQLAlchemyError: If database operation fails
ValueError: If requesting user is not the owner

"""
# Check ownership
stmt = select(BlueprintRangeModel).where(
BlueprintRangeModel.id == blueprint_range_id
)
result = await db.execute(stmt)
blueprint_range = result.scalar_one_or_none()

if not blueprint_range:
msg = f"Blueprint range {blueprint_range_id} not found"
raise ValueError(msg)

if blueprint_range.owner_id != requesting_user_id:
msg = f"User {requesting_user_id} is not the owner of blueprint range {blueprint_range_id}"
raise ValueError(msg)

permission_stmt = select(BlueprintRangePermissionModel).where(
BlueprintRangePermissionModel.blueprint_range_id == blueprint_range_id,
BlueprintRangePermissionModel.user_id == user_id,
BlueprintRangePermissionModel.permission_type == permission_type,
)
result = await db.execute(permission_stmt)
permission = result.scalar_one_or_none()

if not permission:
logger.warning(
"Permission %s on blueprint range %s for user %s not found",
permission_type,
blueprint_range_id,
user_id,
)
return False

try:
await db.delete(permission)
await db.flush()
logger.debug(
"Revoked %s permission on blueprint range %s from user %s",
permission_type,
blueprint_range_id,
user_id,
)
except SQLAlchemyError as e:
logger.exception(
"Failed to revoke %s permission on blueprint range %s from user %s: %s",
permission_type,
blueprint_range_id,
user_id,
e,
)
raise

return True


async def revoke_deployed_permission(
db: AsyncSession,
deployed_range_id: int,
user_id: int,
permission_type: DeployedRangePermissionType,
requesting_user_id: int,
) -> bool:
"""Revoke permission from a deployed range.

Args:
----
db: Database session
deployed_range_id: ID of deployed range to revoke permission from
user_id: ID of user to revoke permission from
permission_type: Type of permission to revoke
requesting_user_id: ID of user requesting to revoke permission (must be owner)

Returns:
-------
bool: True if permission was revoked, False if not found

Raises:
------
SQLAlchemyError: If database operation fails
ValueError: If requesting user is not the owner

"""
stmt = select(DeployedRangeModel).where(DeployedRangeModel.id == deployed_range_id)
result = await db.execute(stmt)
deployed_range = result.scalar_one_or_none()

if not deployed_range:
msg = f"Deployed range {deployed_range_id} not found"
raise ValueError(msg)

if deployed_range.owner_id != requesting_user_id:
msg = f"User {requesting_user_id} is not the owner of deployed range {deployed_range_id}"
raise ValueError(msg)

permission_stmt = select(DeployedRangePermissionModel).where(
DeployedRangePermissionModel.deployed_range_id == deployed_range_id,
DeployedRangePermissionModel.user_id == user_id,
DeployedRangePermissionModel.permission_type == permission_type,
)
result = await db.execute(permission_stmt)
permission = result.scalar_one_or_none()

if not permission:
logger.warning(
"Permission %s on deployed range %s for user %s not found",
permission_type,
deployed_range_id,
user_id,
)
return False

try:
await db.delete(permission)
await db.flush()
logger.debug(
"Revoked %s permission on deployed range %s from user %s",
permission_type,
deployed_range_id,
user_id,
)
except SQLAlchemyError as e:
logger.exception(
"Failed to revoke %s permission on deployed range %s from user %s: %s",
permission_type,
deployed_range_id,
user_id,
e,
)
raise

return True
Loading
Loading