Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 143 additions & 1 deletion backend/app/api/v1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from sqlalchemy import func, select
from sqlalchemy.orm import selectinload

from sqlalchemy import update

from app.api.deps import DBSession, PrincipalDep, RequirePermission
from app.api.v1.schemas import PaginatedResponse
from app.core.security import generate_token_secret, hash_password_async, generate_device_code
from app.models import Device, Permission, Role, User, UserRole, RolePermission
from app.models import Device, FilamentRating, OAuthIdentity, Permission, Role, User, UserApiKey, UserPermission, UserRole, UserSession, RolePermission, SpoolEvent

router = APIRouter(prefix="/admin", tags=["admin"])

Expand Down Expand Up @@ -281,6 +283,146 @@ async def delete_user(
await db.commit()


class MergeUserRequest(BaseModel):
source_user_id: int


@router.post("/users/{user_id}/merge", response_model=UserDetailResponse)
async def merge_users(
user_id: int,
data: MergeUserRequest,
db: DBSession,
principal = RequirePermission("admin:users_manage"),
):
"""Merge source user into target user.

All data (sessions, API keys, OAuth identities, spool events, filament ratings,
and roles) is transferred from the source user to the target user. The source
user is then soft-deleted.
"""
if user_id == data.source_user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"code": "bad_request", "message": "Cannot merge a user with themselves"},
)

# Load target user
target_result = await db.execute(
select(User).where(User.id == user_id, User.deleted_at.is_(None))
)
target_user = target_result.scalar_one_or_none()
if not target_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"code": "not_found", "message": "Target user not found"},
)

# Load source user
source_result = await db.execute(
select(User).where(User.id == data.source_user_id, User.deleted_at.is_(None))
)
source_user = source_result.scalar_one_or_none()
if not source_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"code": "not_found", "message": "Source user not found"},
)

# Prevent merging the last superadmin
if source_user.is_superadmin:
count_res = await db.execute(
select(func.count())
.select_from(User)
.where(User.is_superadmin.is_(True))
.where(User.deleted_at.is_(None))
)
superadmin_count = count_res.scalar() or 0
if superadmin_count <= 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"code": "bad_request", "message": "Cannot merge the last superuser"},
)

# Transfer sessions
await db.execute(
update(UserSession)
.where(UserSession.user_id == data.source_user_id)
.values(user_id=user_id)
)

# Transfer API keys
await db.execute(
update(UserApiKey)
.where(UserApiKey.user_id == data.source_user_id)
.values(user_id=user_id)
)

# Transfer OAuth identities
await db.execute(
update(OAuthIdentity)
.where(OAuthIdentity.user_id == data.source_user_id)
.values(user_id=user_id)
)

# Transfer spool events (nullable user_id)
await db.execute(
update(SpoolEvent)
.where(SpoolEvent.user_id == data.source_user_id)
.values(user_id=user_id)
)

# Transfer filament ratings, skipping those where target already has a rating
# for the same filament (keep target's rating, discard source's duplicate)
target_rated_result = await db.execute(
select(FilamentRating.filament_id).where(FilamentRating.user_id == user_id)
)
target_rated_ids = {row[0] for row in target_rated_result.fetchall()}

source_ratings_result = await db.execute(
select(FilamentRating).where(FilamentRating.user_id == data.source_user_id)
)
source_ratings = source_ratings_result.scalars().all()

for rating in source_ratings:
if rating.filament_id in target_rated_ids:
await db.delete(rating)
else:
rating.user_id = user_id

# Merge roles: add source roles that the target doesn't already have
target_roles_result = await db.execute(
select(UserRole.role_id).where(UserRole.user_id == user_id)
)
target_role_ids = {row[0] for row in target_roles_result.fetchall()}

source_roles_result = await db.execute(
select(UserRole).where(UserRole.user_id == data.source_user_id)
)
source_role_rows = source_roles_result.scalars().all()

for user_role in source_role_rows:
if user_role.role_id not in target_role_ids:
db.add(UserRole(user_id=user_id, role_id=user_role.role_id))
await db.delete(user_role)

# Soft-delete source user
source_user.deleted_at = datetime.utcnow()
await db.commit()

# Return updated target user with roles
updated_result = await db.execute(
select(User)
.where(User.id == user_id)
.options(selectinload(User.roles))
)
updated_user = updated_result.scalar_one()

return UserDetailResponse(
**{k: getattr(updated_user, k) for k in UserResponse.model_fields},
roles=[r.key for r in updated_user.roles],
)


@router.put("/users/{user_id}/roles")
async def set_user_roles(

Expand Down
173 changes: 173 additions & 0 deletions backend/tests/test_merge_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import pytest
from httpx import AsyncClient
from sqlalchemy import select

from app.core.security import hash_password, generate_token_secret
from app.models import User, UserSession, UserRole, Role, UserApiKey, OAuthIdentity


async def _make_auth_client(client, user, db_session):
"""Return (client, csrf_token) for a given user."""
secret = generate_token_secret()
session = UserSession(
user_id=user.id,
session_token_hash=hash_password(secret),
)
db_session.add(session)
await db_session.commit()
await db_session.refresh(session)

token = f"sess.{session.id}.{secret}"
csrf = generate_token_secret()
client.cookies.set("session_id", token)
client.cookies.set("csrf_token", csrf)
return client, csrf


class TestMergeUsers:

@pytest.mark.asyncio
async def test_merge_success(self, client: AsyncClient, admin_user, normal_user, db_session):
"""Merging source user into target transfers data and soft-deletes source."""
client, csrf = await _make_auth_client(client, admin_user, db_session)

# Add an API key to the normal_user (source) so we can verify transfer
api_key = UserApiKey(
user_id=normal_user.id,
name="test-key",
key_hash=hash_password("somekey"),
)
db_session.add(api_key)
await db_session.commit()

response = await client.post(
f"/api/v1/admin/users/{admin_user.id}/merge",
json={"source_user_id": normal_user.id},
headers={"X-CSRF-Token": csrf},
)

assert response.status_code == 200, response.text
data = response.json()
assert data["id"] == admin_user.id

# Source user should be soft-deleted
result = await db_session.execute(select(User).where(User.id == normal_user.id))
source = result.scalar_one()
assert source.deleted_at is not None

# API key should now belong to the target user
await db_session.refresh(api_key)
assert api_key.user_id == admin_user.id

@pytest.mark.asyncio
async def test_merge_self_fails(self, client: AsyncClient, admin_user, db_session):
"""Merging a user into themselves must return 400."""
client, csrf = await _make_auth_client(client, admin_user, db_session)

response = await client.post(
f"/api/v1/admin/users/{admin_user.id}/merge",
json={"source_user_id": admin_user.id},
headers={"X-CSRF-Token": csrf},
)

assert response.status_code == 400
assert response.json()["detail"]["code"] == "bad_request"

@pytest.mark.asyncio
async def test_merge_nonexistent_source_fails(self, client: AsyncClient, admin_user, db_session):
"""Merging with a non-existent source user must return 404."""
client, csrf = await _make_auth_client(client, admin_user, db_session)

response = await client.post(
f"/api/v1/admin/users/{admin_user.id}/merge",
json={"source_user_id": 99999},
headers={"X-CSRF-Token": csrf},
)

assert response.status_code == 404

@pytest.mark.asyncio
async def test_merge_nonexistent_target_fails(self, client: AsyncClient, admin_user, normal_user, db_session):
"""Merging into a non-existent target user must return 404."""
client, csrf = await _make_auth_client(client, admin_user, db_session)

response = await client.post(
"/api/v1/admin/users/99999/merge",
json={"source_user_id": normal_user.id},
headers={"X-CSRF-Token": csrf},
)

assert response.status_code == 404

@pytest.mark.asyncio
async def test_merge_requires_permission(self, client: AsyncClient, normal_user, admin_user, db_session):
"""Non-admin users cannot call the merge endpoint."""
client, csrf = await _make_auth_client(client, normal_user, db_session)

response = await client.post(
f"/api/v1/admin/users/{admin_user.id}/merge",
json={"source_user_id": normal_user.id},
headers={"X-CSRF-Token": csrf},
)

assert response.status_code == 403

@pytest.mark.asyncio
async def test_merge_last_superadmin_fails(self, client: AsyncClient, admin_user, db_session):
"""Merging the last superadmin as source must return 400."""
client, csrf = await _make_auth_client(client, admin_user, db_session)

# Create a non-superadmin target user for the merge
target = User(
email="merge-target@example.com",
password_hash=hash_password("pw"),
is_superadmin=False,
is_active=True,
)
db_session.add(target)
await db_session.commit()
await db_session.refresh(target)

response = await client.post(
f"/api/v1/admin/users/{target.id}/merge",
json={"source_user_id": admin_user.id},
headers={"X-CSRF-Token": csrf},
)

assert response.status_code == 400
assert response.json()["detail"]["code"] == "bad_request"

@pytest.mark.asyncio
async def test_merge_roles_combined(self, client: AsyncClient, admin_user, db_session):
"""Roles from both users are combined in the target after merge."""
client, csrf = await _make_auth_client(client, admin_user, db_session)

# Create source user with a 'user' role
source = User(
email="source-roles@example.com",
password_hash=hash_password("pw"),
is_superadmin=False,
is_active=True,
)
db_session.add(source)
await db_session.commit()
await db_session.refresh(source)

result = await db_session.execute(select(Role).where(Role.key == "user"))
user_role = result.scalar_one_or_none()

if user_role:
db_session.add(UserRole(user_id=source.id, role_id=user_role.id))
await db_session.commit()

response = await client.post(
f"/api/v1/admin/users/{admin_user.id}/merge",
json={"source_user_id": source.id},
headers={"X-CSRF-Token": csrf},
)

assert response.status_code == 200
data = response.json()
# If source had the "user" role and admin didn't, the role should now be in target
if user_role:
assert "user" in data["roles"]
Loading