diff --git a/api/src/app/crud/crud_permissions.py b/api/src/app/crud/crud_permissions.py new file mode 100644 index 00000000..bf66621d --- /dev/null +++ b/api/src/app/crud/crud_permissions.py @@ -0,0 +1,313 @@ +import logging + +from sqlalchemy import select +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from ..enums.permissions import BlueprintPermissionType, DeployedRangePermissionType +from ..models.permission_models import ( + BlueprintRangePermissionModel, + DeployedRangePermissionModel, +) +from ..models.range_models import BlueprintRangeModel, DeployedRangeModel + +logger = logging.getLogger(__name__) + + +async def grant_blueprint_permission( + db: AsyncSession, + blueprint_range_id: int, + user_id: int, + permission_type: BlueprintPermissionType, + requesting_user_id: int, +) -> BlueprintRangePermissionModel: + """Grant permission to a blueprint range. + + Args: + ---- + db: Database session + blueprint_range_id: ID of blueprint range to grant permission for + user_id: ID of user to grant permission to + permission_type: Type of permission to grant + requesting_user_id: ID of user requesting to grant permission (must be owner) + + Returns: + ------- + BlueprintRangePermissionModel: The created permission + + Raises: + ------ + SQLAlchemyError: If database operation fails + ValueError: If requesting user is not the owner + + """ + stmt = select(BlueprintRangeModel).where( + BlueprintRangeModel.id == blueprint_range_id + ) + result = await db.execute(stmt) + blueprint_range = result.scalar_one_or_none() + + if not blueprint_range: + msg = f"Blueprint range {blueprint_range_id} not found" + raise ValueError(msg) + + if blueprint_range.owner_id != requesting_user_id: + msg = f"Only the owner can grant permissions on blueprint range {blueprint_range_id}" + raise ValueError(msg) + + permission = BlueprintRangePermissionModel( + blueprint_range_id=blueprint_range_id, + user_id=user_id, + permission_type=permission_type, + ) + + db.add(permission) + + try: + await db.flush() + await db.refresh(permission) + logger.debug( + "Granted %s permission on blueprint range %s to user %s", + permission_type, + blueprint_range_id, + user_id, + ) + except SQLAlchemyError as e: + logger.exception( + "Failed to grant %s permission on blueprint range %s to user %s: %s", + permission_type, + blueprint_range_id, + user_id, + e, + ) + raise + + return permission + + +async def grant_deployed_permission( + db: AsyncSession, + deployed_range_id: int, + user_id: int, + permission_type: DeployedRangePermissionType, + requesting_user_id: int, +) -> DeployedRangePermissionModel: + """Grant permission to a deployed range. + + Args: + ---- + db: Database session + deployed_range_id: ID of deployed range to grant permission for + user_id: ID of user to grant permission to + permission_type: Type of permission to grant + requesting_user_id: ID of user requesting to grant permission (must be owner) + + Returns: + ------- + DeployedRangePermissionModel: The created permission + + Raises: + ------ + SQLAlchemyError: If database operation fails + ValueError: If requesting user is not the owner + + """ + stmt = select(DeployedRangeModel).where(DeployedRangeModel.id == deployed_range_id) + result = await db.execute(stmt) + deployed_range = result.scalar_one_or_none() + + if not deployed_range: + msg = f"Deployed range {deployed_range_id} not found" + raise ValueError(msg) + + if deployed_range.owner_id != requesting_user_id: + msg = f"User {requesting_user_id} is not the owner of deployed range {deployed_range_id}" + raise ValueError(msg) + + permission = DeployedRangePermissionModel( + deployed_range_id=deployed_range_id, + user_id=user_id, + permission_type=permission_type, + ) + + db.add(permission) + + try: + await db.flush() + await db.refresh(permission) + logger.debug( + "Granted %s permission on deployed range %s to user %s", + permission_type, + deployed_range_id, + user_id, + ) + except SQLAlchemyError as e: + logger.exception( + "Failed to grant %s permission on deployed range %s to user %s: %s", + permission_type, + deployed_range_id, + user_id, + e, + ) + raise + + return permission + + +async def revoke_blueprint_permission( + db: AsyncSession, + blueprint_range_id: int, + user_id: int, + permission_type: BlueprintPermissionType, + requesting_user_id: int, +) -> bool: + """Revoke permission from a blueprint range. + + Args: + ---- + db: Database session + blueprint_range_id: ID of blueprint range to revoke permission from + user_id: ID of user to revoke permission from + permission_type: Type of permission to revoke + requesting_user_id: ID of user requesting to revoke permission (must be owner) + + Returns: + ------- + bool: True if permission was revoked, False if not found + + Raises: + ------ + SQLAlchemyError: If database operation fails + ValueError: If requesting user is not the owner + + """ + # Check ownership + stmt = select(BlueprintRangeModel).where( + BlueprintRangeModel.id == blueprint_range_id + ) + result = await db.execute(stmt) + blueprint_range = result.scalar_one_or_none() + + if not blueprint_range: + msg = f"Blueprint range {blueprint_range_id} not found" + raise ValueError(msg) + + if blueprint_range.owner_id != requesting_user_id: + msg = f"User {requesting_user_id} is not the owner of blueprint range {blueprint_range_id}" + raise ValueError(msg) + + permission_stmt = select(BlueprintRangePermissionModel).where( + BlueprintRangePermissionModel.blueprint_range_id == blueprint_range_id, + BlueprintRangePermissionModel.user_id == user_id, + BlueprintRangePermissionModel.permission_type == permission_type, + ) + result = await db.execute(permission_stmt) + permission = result.scalar_one_or_none() + + if not permission: + logger.warning( + "Permission %s on blueprint range %s for user %s not found", + permission_type, + blueprint_range_id, + user_id, + ) + return False + + try: + await db.delete(permission) + await db.flush() + logger.debug( + "Revoked %s permission on blueprint range %s from user %s", + permission_type, + blueprint_range_id, + user_id, + ) + except SQLAlchemyError as e: + logger.exception( + "Failed to revoke %s permission on blueprint range %s from user %s: %s", + permission_type, + blueprint_range_id, + user_id, + e, + ) + raise + + return True + + +async def revoke_deployed_permission( + db: AsyncSession, + deployed_range_id: int, + user_id: int, + permission_type: DeployedRangePermissionType, + requesting_user_id: int, +) -> bool: + """Revoke permission from a deployed range. + + Args: + ---- + db: Database session + deployed_range_id: ID of deployed range to revoke permission from + user_id: ID of user to revoke permission from + permission_type: Type of permission to revoke + requesting_user_id: ID of user requesting to revoke permission (must be owner) + + Returns: + ------- + bool: True if permission was revoked, False if not found + + Raises: + ------ + SQLAlchemyError: If database operation fails + ValueError: If requesting user is not the owner + + """ + stmt = select(DeployedRangeModel).where(DeployedRangeModel.id == deployed_range_id) + result = await db.execute(stmt) + deployed_range = result.scalar_one_or_none() + + if not deployed_range: + msg = f"Deployed range {deployed_range_id} not found" + raise ValueError(msg) + + if deployed_range.owner_id != requesting_user_id: + msg = f"User {requesting_user_id} is not the owner of deployed range {deployed_range_id}" + raise ValueError(msg) + + permission_stmt = select(DeployedRangePermissionModel).where( + DeployedRangePermissionModel.deployed_range_id == deployed_range_id, + DeployedRangePermissionModel.user_id == user_id, + DeployedRangePermissionModel.permission_type == permission_type, + ) + result = await db.execute(permission_stmt) + permission = result.scalar_one_or_none() + + if not permission: + logger.warning( + "Permission %s on deployed range %s for user %s not found", + permission_type, + deployed_range_id, + user_id, + ) + return False + + try: + await db.delete(permission) + await db.flush() + logger.debug( + "Revoked %s permission on deployed range %s from user %s", + permission_type, + deployed_range_id, + user_id, + ) + except SQLAlchemyError as e: + logger.exception( + "Failed to revoke %s permission on deployed range %s from user %s: %s", + permission_type, + deployed_range_id, + user_id, + e, + ) + raise + + return True diff --git a/api/src/app/crud/crud_ranges.py b/api/src/app/crud/crud_ranges.py index 7adb1638..e95efb09 100644 --- a/api/src/app/crud/crud_ranges.py +++ b/api/src/app/crud/crud_ranges.py @@ -5,8 +5,13 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload +from ..models.permission_models import ( + BlueprintRangePermissionModel, + DeployedRangePermissionModel, +) from ..models.range_models import BlueprintRangeModel, DeployedRangeModel from ..models.subnet_models import BlueprintSubnetModel, DeployedSubnetModel +from ..models.user_model import UserModel from ..models.vpc_models import BlueprintVPCModel, DeployedVPCModel from ..schemas.range_schemas import ( BlueprintRangeCreateSchema, @@ -22,6 +27,72 @@ logger = logging.getLogger(__name__) +def can_read_blueprint( + range_model: BlueprintRangeModel, user_id: int, user: UserModel | None = None +) -> bool: + """Check if user can read a blueprint range.""" + if range_model.owner_id == user_id: + return True + if user and hasattr(user, "is_admin") and user.is_admin: + return True + return any( + p.user_id == user_id and p.permission_type in ("read", "write") + for p in (range_model.permissions or []) + ) + + +def can_write_blueprint( + range_model: BlueprintRangeModel, user_id: int, user: UserModel | None = None +) -> bool: + """Check if user can write a blueprint range.""" + if range_model.owner_id == user_id: + return True + if user and hasattr(user, "is_admin") and user.is_admin: + return True + return any( + p.user_id == user_id and p.permission_type == "write" + for p in (range_model.permissions or []) + ) + + +def can_read_deployed( + range_model: DeployedRangeModel, user_id: int, user: UserModel | None = None +) -> bool: + """Check if user can read a deployed range.""" + if range_model.owner_id == user_id: + return True + if user and hasattr(user, "is_admin") and user.is_admin: + return True + return any( + p.user_id == user_id and p.permission_type in ("read", "write", "execute") + for p in (range_model.permissions or []) + ) + + +def can_write_deployed( + range_model: DeployedRangeModel, user_id: int, user: UserModel | None = None +) -> bool: + """Check if user can write a deployed range.""" + if range_model.owner_id == user_id: + return True + return any( + p.user_id == user_id and p.permission_type == "write" + for p in (range_model.permissions or []) + ) + + +def can_execute_deployed( + range_model: DeployedRangeModel, user_id: int, user: UserModel | None = None +) -> bool: + """Check if user can execute a deployed range.""" + if range_model.owner_id == user_id: + return True + return any( + p.user_id == user_id and p.permission_type == "execute" + for p in (range_model.permissions or []) + ) + + # ==================== Blueprints ===================== @@ -53,7 +124,25 @@ async def get_blueprint_range_headers( ).select_from(BlueprintRangeModel) if not is_admin: - stmt = stmt.filter(BlueprintRangeModel.owner_id == user_id) + stmt = ( + stmt.outerjoin( + BlueprintRangePermissionModel, + BlueprintRangeModel.id + == BlueprintRangePermissionModel.blueprint_range_id, + ) + .filter( + (BlueprintRangeModel.owner_id == user_id) + | ( + (BlueprintRangePermissionModel.user_id == user_id) + & ( + BlueprintRangePermissionModel.permission_type.in_( + ["read", "write"] + ) + ) + ) + ) + .distinct() + ) result = await db.execute(stmt) @@ -91,6 +180,7 @@ async def get_blueprint_range( selectinload(BlueprintRangeModel.vpcs) .selectinload(BlueprintVPCModel.subnets) .selectinload(BlueprintSubnetModel.hosts), + selectinload(BlueprintRangeModel.permissions), ] range_model = await db.get(BlueprintRangeModel, range_id, options=options) @@ -100,7 +190,7 @@ async def get_blueprint_range( ) return None - if is_admin or range_model.owner_id == user_id: + if is_admin or can_read_blueprint(range_model, user_id): logger.debug("Fetched range blueprint: %s for user %s.", range_id, user_id) return BlueprintRangeSchema.model_validate(range_model) @@ -131,7 +221,8 @@ def build_blueprint_range_models( range_models: list[BlueprintRangeModel] = [] for range_schema in ranges: range_model = BlueprintRangeModel( - **range_schema.model_dump(exclude={"vpcs"}), owner_id=user_id + **range_schema.model_dump(exclude={"vpcs", "readers", "writers"}), + owner_id=user_id, ) range_model.vpcs = build_blueprint_vpc_models(range_schema.vpcs, user_id) range_models.append(range_model) @@ -226,7 +317,11 @@ async def delete_blueprint_range( Optional[BlueprintRangeHeaderSchema]: Range header data if it exists in database and was successfully deleted. """ - range_model = await db.get(BlueprintRangeModel, range_id) + range_model = await db.get( + BlueprintRangeModel, + range_id, + options=[selectinload(BlueprintRangeModel.permissions)], + ) if not range_model: logger.warning( "Range blueprint: %s not found for deletion as user: %s. Does user have permissions?", @@ -242,7 +337,7 @@ async def delete_blueprint_range( ) return None - if not is_admin and range_model.owner_id != user_id: + if not is_admin and not can_write_blueprint(range_model, user_id): logger.warning( "User: %s is not authorized to delete range blueprint: %s.", user_id, @@ -310,7 +405,24 @@ async def get_deployed_range_headers( ).select_from(DeployedRangeModel) if not is_admin: - stmt = stmt.filter(DeployedRangeModel.owner_id == user_id) + stmt = ( + stmt.outerjoin( + DeployedRangePermissionModel, + DeployedRangeModel.id == DeployedRangePermissionModel.deployed_range_id, + ) + .filter( + (DeployedRangeModel.owner_id == user_id) + | ( + (DeployedRangePermissionModel.user_id == user_id) + & ( + DeployedRangePermissionModel.permission_type.in_( + ["read", "write", "execute"] + ) + ) + ) + ) + .distinct() + ) result = await db.execute(stmt) @@ -347,7 +459,8 @@ async def get_deployed_range( options = [ selectinload(DeployedRangeModel.vpcs) .selectinload(DeployedVPCModel.subnets) - .selectinload(DeployedSubnetModel.hosts) + .selectinload(DeployedSubnetModel.hosts), + selectinload(DeployedRangeModel.permissions), ] range_model = await db.get(DeployedRangeModel, range_id, options=options) @@ -357,7 +470,7 @@ async def get_deployed_range( ) return None - if is_admin or range_model.owner_id == user_id: + if is_admin or can_read_deployed(range_model, user_id): logger.debug("Fetched deployed range: %s for user %s.", range_id, user_id) return DeployedRangeSchema.model_validate(range_model) @@ -389,16 +502,15 @@ async def get_deployed_range_key( Optional[DeployedRangeKeySchema]: Deployed range key data if the range exists and the user is authorized, otherwise None. """ - stmt = select(DeployedRangeModel.range_private_key).where( - DeployedRangeModel.id == range_id + range_model = await db.get( + DeployedRangeModel, + range_id, + options=[selectinload(DeployedRangeModel.permissions)], ) - if not is_admin: - stmt = stmt.where(DeployedRangeModel.owner_id == user_id) - - range_private_key = await db.scalar(stmt) - - if range_private_key is None: + if not range_model or ( + not is_admin and not can_execute_deployed(range_model, user_id) + ): logger.warning( "Failed to fetch deployed range key for range: %s for user %s. Range not found or unauthorized.", range_id, @@ -406,6 +518,11 @@ async def get_deployed_range_key( ) return None + stmt = select(DeployedRangeModel.range_private_key).where( + DeployedRangeModel.id == range_id + ) + range_private_key = await db.scalar(stmt) + logger.debug( "Fetched deployed range key for range: %s for user %s.", range_id, user_id ) @@ -433,7 +550,10 @@ def build_deployed_range_models( range_models: list[DeployedRangeModel] = [] for range_schema in ranges: range_model = DeployedRangeModel( - **range_schema.model_dump(exclude={"vpcs"}), owner_id=user_id + **range_schema.model_dump( + exclude={"vpcs", "readers", "writers", "executors"} + ), + owner_id=user_id, ) range_model.vpcs = build_deployed_vpc_models(range_schema.vpcs, user_id) range_models.append(range_model) @@ -490,6 +610,7 @@ async def create_deployed_range( range_model.id, user_id, ) + except SQLAlchemyError as e: logger.exception( "Database error while flushing deployed range to database session for user: %s. Exception: %s.", @@ -528,7 +649,11 @@ async def delete_deployed_range( Optional[DeployedRangeHeaderSchema]: Deployed range header data if it exists in database and was successfully deleted. """ - range_model = await db.get(DeployedRangeModel, range_id) + range_model = await db.get( + DeployedRangeModel, + range_id, + options=[selectinload(DeployedRangeModel.permissions)], + ) if not range_model: logger.warning( "Deployed range: %s not found for deletion as user: %s. Does user have permissions?", @@ -537,7 +662,7 @@ async def delete_deployed_range( ) return None - if not is_admin and range_model.owner_id != user_id: + if not is_admin and not can_write_deployed(range_model, user_id): logger.warning( "User: %s is not authorized to delete deployed range: %s.", user_id, diff --git a/api/src/app/enums/permissions.py b/api/src/app/enums/permissions.py new file mode 100644 index 00000000..bb151b3a --- /dev/null +++ b/api/src/app/enums/permissions.py @@ -0,0 +1,48 @@ +from enum import Enum + + +class BlueprintPermissionType(str, Enum): + """Permission types for blueprint ranges. + + READ: Allows user to view the blueprint range configuration and details. + Users can see the blueprint in lists and access its properties. + + WRITE: Allows user to modify the blueprint range configuration and deploy it. + Users can edit the blueprint, update its settings, and create deployed + ranges from it. Write permission includes read permission. + """ + + READ = "read" + WRITE = "write" + + @classmethod + def values(cls) -> str: + """Return formatted tuple string of all permission values.""" + return f"({', '.join(repr(t.value) for t in cls)})" + + +class DeployedRangePermissionType(str, Enum): + """Permission types for deployed ranges. + + READ: Allows user to view the deployed range status and configuration. + Users can see the range in lists, check its state, and view properties + but cannot interact with or modify it. + + WRITE: Allows user to modify deployed range settings and manage its lifecycle. + Users can start/stop the range, update configurations, and delete the + deployed range. Write permission includes read permission. + + EXECUTE: Allows user to interact with the live deployed range infrastructure. + Users can access SSH keys, connect to VMs, and interact with the + running environment. Execute permission includes read permission but + not write permission. + """ + + READ = "read" + WRITE = "write" + EXECUTE = "execute" + + @classmethod + def values(cls) -> str: + """Return formatted tuple string of all permission values.""" + return f"({', '.join(repr(t.value) for t in cls)})" diff --git a/api/src/app/models/__init__.py b/api/src/app/models/__init__.py index 29103aa2..b640a7de 100644 --- a/api/src/app/models/__init__.py +++ b/api/src/app/models/__init__.py @@ -2,6 +2,10 @@ from .host_models import BlueprintHostModel, DeployedHostModel from .mixin_models import OpenLabsUserMixin, OwnableObjectMixin +from .permission_models import ( + BlueprintRangePermissionModel, + DeployedRangePermissionModel, +) from .range_models import BlueprintRangeModel, DeployedRangeModel from .secret_model import SecretModel from .subnet_models import BlueprintSubnetModel, DeployedSubnetModel @@ -11,10 +15,12 @@ __all__ = [ "BlueprintHostModel", "BlueprintRangeModel", + "BlueprintRangePermissionModel", "BlueprintSubnetModel", "BlueprintVPCModel", "DeployedHostModel", "DeployedRangeModel", + "DeployedRangePermissionModel", "DeployedSubnetModel", "DeployedVPCModel", "OpenLabsUserMixin", diff --git a/api/src/app/models/permission_models.py b/api/src/app/models/permission_models.py new file mode 100644 index 00000000..48ac3b2b --- /dev/null +++ b/api/src/app/models/permission_models.py @@ -0,0 +1,90 @@ +from sqlalchemy import BigInteger, CheckConstraint, ForeignKey, String, UniqueConstraint +from sqlalchemy.orm import ( + Mapped, + MappedAsDataclass, + declared_attr, + mapped_column, + relationship, +) + +from ..core.db.database import Base +from ..enums.permissions import BlueprintPermissionType, DeployedRangePermissionType + + +class PermissionMixin(MappedAsDataclass): + """Mixin class for common permission fields.""" + + id: Mapped[int] = mapped_column( + BigInteger, + primary_key=True, + init=False, + comment="Primary key (BIGSERIAL)", + ) + + user_id: Mapped[int] = mapped_column( + BigInteger, + ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + ) + + permission_type: Mapped[str] = mapped_column( + String(10), + nullable=False, + ) + + @declared_attr # type: ignore[arg-type] + def user(self) -> object: + """User relationship.""" + return relationship("UserModel", init=False) + + +class BlueprintRangePermissionModel(Base, PermissionMixin): + """Model for blueprint range permission grants to users.""" + + __tablename__ = "blueprint_range_permissions" + + blueprint_range_id: Mapped[int] = mapped_column( + BigInteger, + ForeignKey("blueprint_ranges.id", ondelete="CASCADE"), + nullable=False, + ) + + blueprint_range = relationship("BlueprintRangeModel", back_populates="permissions") + __table_args__ = ( + UniqueConstraint( + "blueprint_range_id", + "user_id", + "permission_type", + name="uq_blueprint_range_permissions", + ), + CheckConstraint( + f"permission_type IN {BlueprintPermissionType.values()}", + name="ck_blueprint_range_permission_type", + ), + ) + + +class DeployedRangePermissionModel(Base, PermissionMixin): + """Model for deployed range permission grants to users.""" + + __tablename__ = "deployed_range_permissions" + + deployed_range_id: Mapped[int] = mapped_column( + BigInteger, + ForeignKey("deployed_ranges.id", ondelete="CASCADE"), + nullable=False, + ) + + deployed_range = relationship("DeployedRangeModel", back_populates="permissions") + __table_args__ = ( + UniqueConstraint( + "deployed_range_id", + "user_id", + "permission_type", + name="uq_deployed_range_permissions", + ), + CheckConstraint( + f"permission_type IN {DeployedRangePermissionType.values()}", + name="ck_deployed_range_permission_type", + ), + ) diff --git a/api/src/app/models/range_models.py b/api/src/app/models/range_models.py index dd44127e..0397395b 100644 --- a/api/src/app/models/range_models.py +++ b/api/src/app/models/range_models.py @@ -41,6 +41,13 @@ class BlueprintRangeModel(Base, OwnableObjectMixin, RangeMixin): passive_deletes=True, ) + permissions = relationship( + "BlueprintRangePermissionModel", + back_populates="blueprint_range", + cascade="all, delete-orphan", + passive_deletes=True, + ) + def is_standalone(self) -> bool: """Return whether blueprint range model is standalone. @@ -82,3 +89,10 @@ class DeployedRangeModel(Base, OwnableObjectMixin, RangeMixin): cascade="all, delete-orphan", passive_deletes=True, ) + + permissions = relationship( + "DeployedRangePermissionModel", + back_populates="deployed_range", + cascade="all, delete-orphan", + passive_deletes=True, + ) diff --git a/api/src/app/schemas/range_schemas.py b/api/src/app/schemas/range_schemas.py index c6172072..c1703ff7 100644 --- a/api/src/app/schemas/range_schemas.py +++ b/api/src/app/schemas/range_schemas.py @@ -2,8 +2,16 @@ from ipaddress import IPv4Address from typing import Any -from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator +from pydantic import ( + BaseModel, + ConfigDict, + Field, + ValidationInfo, + computed_field, + field_validator, +) +from ..enums.permissions import BlueprintPermissionType, DeployedRangePermissionType from ..enums.providers import OpenLabsProvider from ..enums.range_states import RangeState from ..enums.regions import OpenLabsRegion @@ -45,7 +53,29 @@ class BlueprintRangeBaseSchema(RangeCommonSchema): examples=["This is my test range."], ) - pass + @computed_field + def readers(self) -> list[int]: + """Get list of user IDs with read access.""" + permissions = getattr(self, "permissions", None) + if not permissions: + return [] + return [ + perm.user_id + for perm in permissions + if perm.permission_type == BlueprintPermissionType.READ.value + ] + + @computed_field + def writers(self) -> list[int]: + """Get list of user IDs with write access.""" + permissions = getattr(self, "permissions", None) + if not permissions: + return [] + return [ + perm.user_id + for perm in permissions + if perm.permission_type == BlueprintPermissionType.WRITE.value + ] class BlueprintRangeCreateSchema(BlueprintRangeBaseSchema): @@ -151,6 +181,42 @@ class DeployedRangeBaseSchema(RangeCommonSchema): description="SSH private key for the range.", ) + @computed_field + def readers(self) -> list[int]: + """Get list of user IDs with read access.""" + permissions = getattr(self, "permissions", None) + if not permissions: + return [] + return [ + perm.user_id + for perm in permissions + if perm.permission_type == DeployedRangePermissionType.READ.value + ] + + @computed_field + def writers(self) -> list[int]: + """Get list of user IDs with write access.""" + permissions = getattr(self, "permissions", None) + if not permissions: + return [] + return [ + perm.user_id + for perm in permissions + if perm.permission_type == DeployedRangePermissionType.WRITE.value + ] + + @computed_field + def executors(self) -> list[int]: + """Get list of user IDs with execute access.""" + permissions = getattr(self, "permissions", None) + if not permissions: + return [] + return [ + perm.user_id + for perm in permissions + if perm.permission_type == DeployedRangePermissionType.EXECUTE.value + ] + class DeployedRangeCreateSchema(DeployedRangeBaseSchema): """Schema to create deployed range object.""" diff --git a/api/tests/common/api/v1/test_blueprint_ranges.py b/api/tests/common/api/v1/test_blueprint_ranges.py index 5c5df9e6..77a63fe5 100644 --- a/api/tests/common/api/v1/test_blueprint_ranges.py +++ b/api/tests/common/api/v1/test_blueprint_ranges.py @@ -245,7 +245,11 @@ async def test_blueprint_range_get_range( # Check response data recieved_blueprint = response.json() remove_key_recursively(recieved_blueprint, "id") # Our payload doesn't have IDs - assert recieved_blueprint == valid_blueprint_range_create_payload + # Add expected computed permission fields for comparison + expected_payload = copy.deepcopy(valid_blueprint_range_create_payload) + expected_payload["readers"] = [] + expected_payload["writers"] = [] + assert recieved_blueprint == expected_payload async def test_blueprint_range_get_nonexistent_range( self, @@ -529,7 +533,11 @@ async def test_blueprint_range_add_remove_user_flow( remove_key_recursively( recieved_range, "id" ) # Our creation payload doesn't have IDs - assert recieved_range == valid_blueprint_range_create_payload + # Add expected computed permission fields for comparison + expected_payload = copy.deepcopy(valid_blueprint_range_create_payload) + expected_payload["readers"] = [] + expected_payload["writers"] = [] + assert recieved_range == expected_payload # Attempt to list out all of user's blueprints response = await auth_api_client.get(f"{BASE_ROUTE}/blueprints/ranges") @@ -593,6 +601,9 @@ async def test_blueprint_range_get_non_empty_list( # Mimic the header response with the valid JSON valid_blueprint_copy = copy.deepcopy(valid_blueprint_range_create_payload) del valid_blueprint_copy["vpcs"] + # Add expected computed permission fields + valid_blueprint_copy["readers"] = [] + valid_blueprint_copy["writers"] = [] remove_key_recursively( recieved_range, "id" ) # Our creation payload doesn't have IDs diff --git a/api/tests/common/api/v1/test_yaml_middleware.py b/api/tests/common/api/v1/test_yaml_middleware.py index 6072c7d3..3801c059 100644 --- a/api/tests/common/api/v1/test_yaml_middleware.py +++ b/api/tests/common/api/v1/test_yaml_middleware.py @@ -1,3 +1,5 @@ +import copy + import pytest import yaml from fastapi import status @@ -47,7 +49,11 @@ async def test_blueprint_range_yaml_to_json( # Validate response recieved_range = response.json() remove_key_recursively(recieved_range, "id") - assert recieved_range == valid_blueprint_range_create_payload + # Add expected computed permission fields for comparison + expected_payload = copy.deepcopy(valid_blueprint_range_create_payload) + expected_payload["readers"] = [] + expected_payload["writers"] = [] + assert recieved_range == expected_payload async def test_blueprint_vpc_yaml_to_json( self, auth_api_client: AsyncClient diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 4601f9c9..c2216a97 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -25,6 +25,8 @@ from testcontainers.compose import DockerCompose from testcontainers.postgres import PostgresContainer +# Import all models to ensure they're registered with SQLAlchemy metadata +import src.app.models # noqa: F401 from src.app.core.config import settings from src.app.core.db.database import Base, async_get_db from src.app.enums.job_status import OpenLabsJobStatus diff --git a/api/tests/unit/crud/crud_mocks.py b/api/tests/unit/crud/crud_mocks.py index de020629..ce24cac6 100644 --- a/api/tests/unit/crud/crud_mocks.py +++ b/api/tests/unit/crud/crud_mocks.py @@ -39,13 +39,21 @@ def is_standalone(self) -> bool: class DummyBlueprintRange(Mock): """Dummy blueprint range model for testing.""" - pass + def __init__(self, *args: Any, **kwargs: dict[str, Any]) -> None: # noqa: ANN401 + """Initialize dummy blueprint range.""" + super().__init__(*args, **kwargs) + self.permissions: list[Any] = [] + self.owner_id = 1 class DummyDeployedRange(Mock): """Dummy deployed range model for testing.""" - pass + def __init__(self, *args: Any, **kwargs: dict[str, Any]) -> None: # noqa: ANN401 + """Initialize dummy deployed range.""" + super().__init__(*args, **kwargs) + self.permissions: list[Any] = [] + self.owner_id = 1 class DummyJob(Mock): diff --git a/api/tests/unit/crud/test_crud_permissions.py b/api/tests/unit/crud/test_crud_permissions.py new file mode 100644 index 00000000..61f608f0 --- /dev/null +++ b/api/tests/unit/crud/test_crud_permissions.py @@ -0,0 +1,352 @@ +import logging +from unittest.mock import Mock + +import pytest +from sqlalchemy.exc import SQLAlchemyError + +from src.app.crud.crud_permissions import ( + grant_blueprint_permission, + grant_deployed_permission, + revoke_blueprint_permission, + revoke_deployed_permission, +) +from src.app.enums.permissions import ( + BlueprintPermissionType, + DeployedRangePermissionType, +) +from src.app.models.permission_models import ( + BlueprintRangePermissionModel, + DeployedRangePermissionModel, +) + +from .crud_mocks import DummyDB + +# ==================== Grant Blueprint Permissions ===================== + + +@pytest.mark.parametrize( + "permission_type,user_id", + [ + (BlueprintPermissionType.READ, 2), + (BlueprintPermissionType.WRITE, 3), + ], +) +async def test_grant_blueprint_permission( + permission_type: BlueprintPermissionType, + user_id: int, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test granting permissions to blueprint ranges.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_result = Mock() + mock_result.scalar_one_or_none.return_value = mock_range + dummy_db.execute.return_value = mock_result + + range_id = 1 + requesting_user_id = 1 # Owner + + with caplog.at_level(logging.DEBUG): + result = await grant_blueprint_permission( + dummy_db, range_id, user_id, permission_type, requesting_user_id + ) + + dummy_db.add.assert_called_once() + added_permission = dummy_db.add.call_args[0][0] + assert isinstance(added_permission, BlueprintRangePermissionModel) + assert added_permission.user_id == user_id + assert added_permission.permission_type == permission_type + assert added_permission.blueprint_range_id == range_id + + dummy_db.flush.assert_called_once() + dummy_db.refresh.assert_called_once_with(added_permission) + assert result == added_permission + + +async def test_grant_blueprint_permission_db_error( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that grant_blueprint_permission handles database errors.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_result = Mock() + mock_result.scalar_one_or_none.return_value = mock_range + dummy_db.execute.return_value = mock_result + + # Force a db exception + test_except_msg = "Fake DB error!" + dummy_db.flush.side_effect = SQLAlchemyError(test_except_msg) + + requesting_user_id = 1 # Owner + + with pytest.raises(SQLAlchemyError, match=test_except_msg): + await grant_blueprint_permission( + dummy_db, 1, 2, BlueprintPermissionType.READ, requesting_user_id + ) + + assert any( + record.levelno == logging.ERROR + and record.exc_info is not None + and test_except_msg in record.message + for record in caplog.records + ) + + +# ==================== Grant Deployed Permissions ===================== + + +@pytest.mark.parametrize( + "permission_type,user_id", + [ + (DeployedRangePermissionType.READ, 2), + (DeployedRangePermissionType.WRITE, 3), + (DeployedRangePermissionType.EXECUTE, 4), + ], +) +async def test_grant_deployed_permission( + permission_type: DeployedRangePermissionType, + user_id: int, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test granting permissions to deployed ranges.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_result = Mock() + mock_result.scalar_one_or_none.return_value = mock_range + dummy_db.execute.return_value = mock_result + + range_id = 1 + requesting_user_id = 1 # Owner + + with caplog.at_level(logging.DEBUG): + result = await grant_deployed_permission( + dummy_db, range_id, user_id, permission_type, requesting_user_id + ) + + dummy_db.add.assert_called_once() + added_permission = dummy_db.add.call_args[0][0] + assert isinstance(added_permission, DeployedRangePermissionModel) + assert added_permission.user_id == user_id + assert added_permission.permission_type == permission_type + assert added_permission.deployed_range_id == range_id + + dummy_db.flush.assert_called_once() + dummy_db.refresh.assert_called_once_with(added_permission) + assert result == added_permission + + +async def test_grant_deployed_permission_db_error( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that grant_deployed_permission handles database errors.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_result = Mock() + mock_result.scalar_one_or_none.return_value = mock_range + dummy_db.execute.return_value = mock_result + + # Force a db exception + test_except_msg = "Fake DB error!" + dummy_db.flush.side_effect = SQLAlchemyError(test_except_msg) + + requesting_user_id = 1 # Owner + + with pytest.raises(SQLAlchemyError, match=test_except_msg): + await grant_deployed_permission( + dummy_db, 1, 2, DeployedRangePermissionType.READ, requesting_user_id + ) + + assert any( + record.levelno == logging.ERROR + and record.exc_info is not None + and test_except_msg in record.message + for record in caplog.records + ) + + +# ==================== Revoke Blueprint Permissions ===================== + + +@pytest.mark.parametrize( + "permission_type,permission_exists", + [ + (BlueprintPermissionType.READ, True), + (BlueprintPermissionType.READ, False), + (BlueprintPermissionType.WRITE, True), + ], +) +async def test_revoke_blueprint_permission( + permission_type: BlueprintPermissionType, + permission_exists: bool, +) -> None: + """Test revoking permissions from blueprint ranges.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_ownership_result = Mock() + mock_ownership_result.scalar_one_or_none.return_value = mock_range + + mock_permission_result = Mock() + if permission_exists: + mock_permission = BlueprintRangePermissionModel( + blueprint_range_id=1, user_id=2, permission_type=permission_type + ) + mock_permission_result.scalar_one_or_none.return_value = mock_permission + else: + mock_permission_result.scalar_one_or_none.return_value = None + + dummy_db.execute.side_effect = [mock_ownership_result, mock_permission_result] + + requesting_user_id = 1 # Owner + + result = await revoke_blueprint_permission( + dummy_db, 1, 2, permission_type, requesting_user_id + ) + + if permission_exists: + dummy_db.delete.assert_called_once_with(mock_permission) + dummy_db.flush.assert_called_once() + assert result is True + else: + dummy_db.delete.assert_not_called() + dummy_db.flush.assert_not_called() + assert result is False + + +async def test_revoke_blueprint_permission_db_error( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that revoke_blueprint_permission handles database errors.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_ownership_result = Mock() + mock_ownership_result.scalar_one_or_none.return_value = mock_range + + mock_permission = BlueprintRangePermissionModel( + blueprint_range_id=1, user_id=2, permission_type=BlueprintPermissionType.READ + ) + + mock_permission_result = Mock() + mock_permission_result.scalar_one_or_none.return_value = mock_permission + + dummy_db.execute.side_effect = [mock_ownership_result, mock_permission_result] + + # Force a db exception + test_except_msg = "Fake DB error!" + dummy_db.flush.side_effect = SQLAlchemyError(test_except_msg) + + requesting_user_id = 1 # Owner + + with pytest.raises(SQLAlchemyError, match=test_except_msg): + await revoke_blueprint_permission( + dummy_db, 1, 2, BlueprintPermissionType.READ, requesting_user_id + ) + + assert any( + record.levelno == logging.ERROR + and record.exc_info is not None + and test_except_msg in record.message + for record in caplog.records + ) + + +# ==================== Revoke Deployed Permissions ===================== + + +@pytest.mark.parametrize( + "permission_type,permission_exists", + [ + (DeployedRangePermissionType.EXECUTE, True), + (DeployedRangePermissionType.WRITE, False), + (DeployedRangePermissionType.READ, True), + ], +) +async def test_revoke_deployed_permission( + permission_type: DeployedRangePermissionType, + permission_exists: bool, +) -> None: + """Test revoking permissions from deployed ranges.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_ownership_result = Mock() + mock_ownership_result.scalar_one_or_none.return_value = mock_range + + mock_permission_result = Mock() + if permission_exists: + mock_permission = DeployedRangePermissionModel( + deployed_range_id=1, user_id=2, permission_type=permission_type + ) + mock_permission_result.scalar_one_or_none.return_value = mock_permission + else: + mock_permission_result.scalar_one_or_none.return_value = None + + dummy_db.execute.side_effect = [mock_ownership_result, mock_permission_result] + + requesting_user_id = 1 # Owner + + result = await revoke_deployed_permission( + dummy_db, 1, 2, permission_type, requesting_user_id + ) + + if permission_exists: + dummy_db.delete.assert_called_once_with(mock_permission) + dummy_db.flush.assert_called_once() + assert result is True + else: + dummy_db.delete.assert_not_called() + dummy_db.flush.assert_not_called() + assert result is False + + +async def test_revoke_deployed_permission_db_error( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that revoke_deployed_permission handles database errors.""" + dummy_db = DummyDB() + + mock_range = Mock() + mock_range.owner_id = 1 + mock_ownership_result = Mock() + mock_ownership_result.scalar_one_or_none.return_value = mock_range + + mock_permission = DeployedRangePermissionModel( + deployed_range_id=1, + user_id=2, + permission_type=DeployedRangePermissionType.EXECUTE, + ) + + mock_permission_result = Mock() + mock_permission_result.scalar_one_or_none.return_value = mock_permission + + dummy_db.execute.side_effect = [mock_ownership_result, mock_permission_result] + + # Force a db exception + test_except_msg = "Fake DB error!" + dummy_db.flush.side_effect = SQLAlchemyError(test_except_msg) + + requesting_user_id = 1 # Owner + + with pytest.raises(SQLAlchemyError, match=test_except_msg): + await revoke_deployed_permission( + dummy_db, 1, 2, DeployedRangePermissionType.EXECUTE, requesting_user_id + ) + + assert any( + record.levelno == logging.ERROR + and record.exc_info is not None + and test_except_msg in record.message + for record in caplog.records + ) diff --git a/api/tests/unit/crud/test_crud_ranges.py b/api/tests/unit/crud/test_crud_ranges.py index 21d46451..e1f685eb 100644 --- a/api/tests/unit/crud/test_crud_ranges.py +++ b/api/tests/unit/crud/test_crud_ranges.py @@ -1,6 +1,6 @@ import logging import random -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock import pytest from pytest_mock import MockerFixture @@ -499,35 +499,46 @@ async def test_get_non_existent_deployed_range() -> None: @pytest.mark.parametrize( - "is_admin, expect_owner_filter", + "is_admin, user_owns_range", [ - (False, True), - (True, False), + (False, True), # Regular user, owns range - should get key + (False, False), # Regular user, doesn't own range - should not get key + (True, False), # Admin user, doesn't own range - should get key ], ) -async def test_get_deployed_range_key_filters( +async def test_get_deployed_range_key_permissions( is_admin: bool, - expect_owner_filter: bool, + user_owns_range: bool, ) -> None: - """Test the deployed range key crud function filters results appropriately.""" + """Test the deployed range key crud function respects permissions.""" dummy_db = DummyDB() + dummy_range = DummyDeployedRange() + + range_id = 1 + user_id = 1 + + # Set ownership based on test parameters + if user_owns_range: + dummy_range.owner_id = user_id + else: + dummy_range.owner_id = user_id + 1 # Configure return of mock result + dummy_db.get.return_value = dummy_range dummy_db.scalar.return_value = "fake_private_key" - range_id = random.randint(1, 100) # noqa: S311 - user_id = 1 - await get_deployed_range_key( + result = await get_deployed_range_key( dummy_db, range_id=range_id, user_id=user_id, is_admin=is_admin ) - # Build filter clauses - range_id_clause = str(DeployedRangeModel.id == range_id) - ownership_clause = str(DeployedRangeModel.owner_id == user_id) - where_clause = str(dummy_db.scalar.call_args[0][0].whereclause) + # Check if we should expect a result + should_have_access = is_admin or user_owns_range - assert range_id_clause in where_clause # Always only pull key of specified range - assert (ownership_clause in where_clause) == expect_owner_filter + if should_have_access: + assert result is not None + assert result.range_private_key == "fake_private_key" + else: + assert result is None async def test_get_non_existent_deployed_range_key() -> None: @@ -752,3 +763,245 @@ async def test_delete_deployed_range_raises_generic_errors( and test_except_msg in record.message for record in caplog.records ) + + +# ==================== Permission Integration Tests ===================== + + +class MockPermission(Mock): + """Mock permission object for testing.""" + + def __init__( + self, user_id: int, permission_type: str, *args: object, **kwargs: object + ) -> None: + """Initialize mock permission.""" + super().__init__(*args, **kwargs) + self.user_id = user_id + self.permission_type = permission_type + + +async def test_get_blueprint_range_with_read_permission(mocker: MockerFixture) -> None: + """Test that users with read permission can access blueprint ranges.""" + dummy_db = DummyDB() + dummy_range = DummyBlueprintRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [MockPermission(user_id, "read")] + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + BlueprintRangeSchema, "model_validate", return_value=dummy_range + ) + + result = await get_blueprint_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is not None + mock_model_validate.assert_called_once() + + +async def test_get_blueprint_range_denied_no_permission(mocker: MockerFixture) -> None: + """Test that users without permission cannot access blueprint ranges.""" + dummy_db = DummyDB() + dummy_range = DummyBlueprintRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [] # No permissions + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + BlueprintRangeSchema, "model_validate", return_value=dummy_range + ) + + result = await get_blueprint_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is None + mock_model_validate.assert_not_called() + + +async def test_get_deployed_range_with_read_permission(mocker: MockerFixture) -> None: + """Test that users with read permission can access deployed ranges.""" + dummy_db = DummyDB() + dummy_range = DummyDeployedRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [MockPermission(user_id, "read")] + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + DeployedRangeSchema, "model_validate", return_value=dummy_range + ) + + result = await get_deployed_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is not None + mock_model_validate.assert_called_once() + + +async def test_get_deployed_range_denied_no_permission(mocker: MockerFixture) -> None: + """Test that users without permission cannot access deployed ranges.""" + dummy_db = DummyDB() + dummy_range = DummyDeployedRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [] # No permissions + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + DeployedRangeSchema, "model_validate", return_value=dummy_range + ) + + result = await get_deployed_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is None + mock_model_validate.assert_not_called() + + +async def test_delete_blueprint_range_with_write_permission( + mocker: MockerFixture, +) -> None: + """Test that users with write permission can delete blueprint ranges.""" + dummy_db = DummyDB() + dummy_range = DummyBlueprintRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [MockPermission(user_id, "write")] + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + BlueprintRangeHeaderSchema, "model_validate", return_value=dummy_range + ) + + result = await delete_blueprint_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is not None + dummy_db.delete.assert_called_once_with(dummy_range) + mock_model_validate.assert_called_once() + + +async def test_delete_blueprint_range_denied_no_permission( + mocker: MockerFixture, +) -> None: + """Test that users without permission cannot delete blueprint ranges.""" + dummy_db = DummyDB() + dummy_range = DummyBlueprintRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [] # No permissions + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + BlueprintRangeHeaderSchema, "model_validate", return_value=dummy_range + ) + + result = await delete_blueprint_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is None + dummy_db.delete.assert_not_called() + mock_model_validate.assert_not_called() + + +async def test_delete_deployed_range_with_write_permission( + mocker: MockerFixture, +) -> None: + """Test that users with write permission can delete deployed ranges.""" + dummy_db = DummyDB() + dummy_range = DummyDeployedRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [MockPermission(user_id, "write")] + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + DeployedRangeHeaderSchema, "model_validate", return_value=dummy_range + ) + + result = await delete_deployed_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is not None + dummy_db.delete.assert_called_once_with(dummy_range) + mock_model_validate.assert_called_once() + + +async def test_delete_deployed_range_denied_no_permission( + mocker: MockerFixture, +) -> None: + """Test that users without permission cannot delete deployed ranges.""" + dummy_db = DummyDB() + dummy_range = DummyDeployedRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [] # No permissions + + dummy_db.get.return_value = dummy_range + mock_model_validate = mocker.patch.object( + DeployedRangeHeaderSchema, "model_validate", return_value=dummy_range + ) + + result = await delete_deployed_range( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is None + dummy_db.delete.assert_not_called() + mock_model_validate.assert_not_called() + + +async def test_get_deployed_range_key_with_execute_permission() -> None: + """Test that users with execute permission can access deployed range keys.""" + dummy_db = DummyDB() + dummy_range = DummyDeployedRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [MockPermission(user_id, "execute")] + + dummy_db.get.return_value = dummy_range + dummy_db.scalar.return_value = "fake_private_key" + + result = await get_deployed_range_key( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is not None + assert result.range_private_key == "fake_private_key" + + +async def test_get_deployed_range_key_denied_no_permission() -> None: + """Test that users without execute permission cannot access deployed range keys.""" + dummy_db = DummyDB() + dummy_range = DummyDeployedRange() + + user_id = 2 + dummy_range.owner_id = 1 # Different owner + dummy_range.permissions = [] # No permissions + + dummy_db.get.return_value = dummy_range + + result = await get_deployed_range_key( + dummy_db, range_id=1, user_id=user_id, is_admin=False + ) + + assert result is None + dummy_db.scalar.assert_not_called() # Should not even try to get the key diff --git a/api/tests/unit/crud/test_permissions.py b/api/tests/unit/crud/test_permissions.py new file mode 100644 index 00000000..cdeeee70 --- /dev/null +++ b/api/tests/unit/crud/test_permissions.py @@ -0,0 +1,211 @@ +"""Test permission system behavior including admin boundaries and security isolation.""" + +from unittest.mock import Mock + +import pytest + +from src.app.crud.crud_ranges import ( + can_execute_deployed, + can_read_blueprint, + can_read_deployed, + can_write_blueprint, + can_write_deployed, +) +from src.app.enums.permissions import ( + BlueprintPermissionType, + DeployedRangePermissionType, +) + + +class TestAdminPermissions: + """Test admin permission behavior per end-to-end encryption model.""" + + def test_admin_has_full_blueprint_access(self) -> None: + """Admins can read and write any blueprint (not encrypted).""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [] + + admin_user = Mock() + admin_user.id = 2 + admin_user.is_admin = True + + assert can_read_blueprint(range_model, admin_user.id, admin_user) is True + assert can_write_blueprint(range_model, admin_user.id, admin_user) is True + + def test_admin_has_read_only_deployed_access(self) -> None: + """Admins can only read deployed ranges (due to encryption).""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [] + + admin_user = Mock() + admin_user.id = 2 + admin_user.is_admin = True + + assert can_read_deployed(range_model, admin_user.id, admin_user) is True + assert can_write_deployed(range_model, admin_user.id, admin_user) is False + assert can_execute_deployed(range_model, admin_user.id, admin_user) is False + + def test_encryption_boundary_prevents_admin_deployed_modifications(self) -> None: + """E2E encryption prevents admin write/execute on deployed ranges.""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [] + + admin_user = Mock() + admin_user.id = 2 + admin_user.is_admin = True + + # Admin cannot modify or execute deployed ranges due to encryption + assert can_write_deployed(range_model, admin_user.id, admin_user) is False + assert can_execute_deployed(range_model, admin_user.id, admin_user) is False + # But can still read metadata + assert can_read_deployed(range_model, admin_user.id, admin_user) is True + + def test_owner_always_has_full_access_regardless_of_admin_status(self) -> None: + """Owners always have full access regardless of admin status.""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [] + + # Test owner who is also admin + admin_owner = Mock() + admin_owner.id = 1 + admin_owner.is_admin = True + + assert can_read_blueprint(range_model, admin_owner.id, admin_owner) is True + assert can_write_blueprint(range_model, admin_owner.id, admin_owner) is True + assert can_read_deployed(range_model, admin_owner.id, admin_owner) is True + assert can_write_deployed(range_model, admin_owner.id, admin_owner) is True + assert can_execute_deployed(range_model, admin_owner.id, admin_owner) is True + + +class TestPermissionIsolation: + """Test that permissions are properly isolated and don't grant unintended access.""" + + @pytest.mark.parametrize( + "permission_type,expected_read,expected_write", + [ + (BlueprintPermissionType.READ.value, True, False), + (BlueprintPermissionType.WRITE.value, True, True), + ], + ) + def test_blueprint_permission_boundaries( + self, permission_type: str, expected_read: bool, expected_write: bool + ) -> None: + """Blueprint permissions grant appropriate access levels.""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [Mock(user_id=2, permission_type=permission_type)] + + assert can_read_blueprint(range_model, 2) is expected_read + assert can_write_blueprint(range_model, 2) is expected_write + + @pytest.mark.parametrize( + "permission_type,expected_read,expected_write,expected_execute", + [ + (DeployedRangePermissionType.READ.value, True, False, False), + (DeployedRangePermissionType.WRITE.value, True, True, False), + (DeployedRangePermissionType.EXECUTE.value, True, False, True), + ], + ) + def test_deployed_permission_boundaries( + self, + permission_type: str, + expected_read: bool, + expected_write: bool, + expected_execute: bool, + ) -> None: + """Deployed range permissions grant appropriate access levels.""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [Mock(user_id=2, permission_type=permission_type)] + + assert can_read_deployed(range_model, 2) is expected_read + assert can_write_deployed(range_model, 2) is expected_write + assert can_execute_deployed(range_model, 2) is expected_execute + + def test_no_permission_denies_all_access(self) -> None: + """Users without permissions cannot access ranges.""" + blueprint_range = Mock() + blueprint_range.owner_id = 1 + blueprint_range.permissions = [] + + deployed_range = Mock() + deployed_range.owner_id = 1 + deployed_range.permissions = [] + + user_id = 2 # Not owner, no permissions + + # Blueprint access denied + assert can_read_blueprint(blueprint_range, user_id) is False + assert can_write_blueprint(blueprint_range, user_id) is False + + # Deployed range access denied + assert can_read_deployed(deployed_range, user_id) is False + assert can_write_deployed(deployed_range, user_id) is False + assert can_execute_deployed(deployed_range, user_id) is False + + def test_wrong_user_permission_denies_access(self) -> None: + """Permissions granted to other users don't grant access.""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [ + Mock(user_id=3, permission_type=BlueprintPermissionType.WRITE.value) + ] + + user_id = 2 # Different user than permission grant + + assert can_read_blueprint(range_model, user_id) is False + assert can_write_blueprint(range_model, user_id) is False + + def test_multiple_permissions_work_correctly(self) -> None: + """Multiple permission grants work independently.""" + range_model = Mock() + range_model.owner_id = 1 + range_model.permissions = [ + Mock(user_id=2, permission_type=DeployedRangePermissionType.READ.value), + Mock(user_id=3, permission_type=DeployedRangePermissionType.EXECUTE.value), + Mock(user_id=4, permission_type=DeployedRangePermissionType.WRITE.value), + ] + + # User 2: read only + assert can_read_deployed(range_model, 2) is True + assert can_write_deployed(range_model, 2) is False + assert can_execute_deployed(range_model, 2) is False + + # User 3: read + execute (execute includes read) + assert can_read_deployed(range_model, 3) is True + assert can_write_deployed(range_model, 3) is False + assert can_execute_deployed(range_model, 3) is True + + # User 4: read + write + assert can_read_deployed(range_model, 4) is True + assert can_write_deployed(range_model, 4) is True + assert can_execute_deployed(range_model, 4) is False + + +class TestOwnerPermissions: + """Test that owners always have full access regardless of explicit grants.""" + + def test_owner_bypasses_explicit_permissions(self) -> None: + """Owners have access even without explicit permission grants.""" + blueprint_range = Mock() + blueprint_range.owner_id = 1 + blueprint_range.permissions = [] # No explicit permissions + + deployed_range = Mock() + deployed_range.owner_id = 1 + deployed_range.permissions = [] # No explicit permissions + + owner_id = 1 + + # Owner has full blueprint access + assert can_read_blueprint(blueprint_range, owner_id) is True + assert can_write_blueprint(blueprint_range, owner_id) is True + + # Owner has full deployed range access + assert can_read_deployed(deployed_range, owner_id) is True + assert can_write_deployed(deployed_range, owner_id) is True + assert can_execute_deployed(deployed_range, owner_id) is True