-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
230 lines (198 loc) · 8.9 KB
/
auth.py
File metadata and controls
230 lines (198 loc) · 8.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/env python3
# Author: skondla@me.com
# Purpose: Authentication router — web UI login/signup/logout + OAuth2 JWT API endpoints
# -*- coding: utf-8 -*-
from datetime import timedelta
from typing import Optional
from fastapi import APIRouter, Depends, Form, HTTPException, Request, status
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
import models
import schemas
import security
from database import get_db
# Router that collects every route declared in this module; all of them are
# tagged "authentication" unless a route supplies its own extra tags.
router = APIRouter(tags=["authentication"])
# Jinja2 loader for the HTML pages rendered below (login.html, signup.html),
# looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
# ═══════════════════════════════════════════════════════════════════════════════
# Web UI — Login / Signup / Logout
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/login", response_class=HTMLResponse)
async def login_page(
    request: Request,
    signup: Optional[str] = None,
    next: str = "/restore",
    current_user: Optional[models.User] = Depends(security.get_optional_user),
):
    """Render login form. Redirect to /restore if already authenticated."""
    # Docstring kept verbatim: FastAPI publishes it as the route description.
    # A visitor who already has a session has no use for the form.
    if current_user:
        return RedirectResponse(url="/restore", status_code=302)

    # The signup handler redirects here with ?signup=success; only that exact
    # value triggers the confirmation banner.
    if signup == "success":
        banner = "Account created successfully — please log in."
    else:
        banner = None

    # `next` is echoed into the template context for the form to use.
    context = {"request": request, "message": banner, "next": next}
    return templates.TemplateResponse("login.html", context)
def _safe_redirect_target(target: str, default: str = "/restore") -> str:
    """Return *target* if it is a same-site absolute path, else *default*.

    Guards the post-login redirect against open-redirect abuse: the value
    comes straight from a form field and must never send the browser to
    another origin.
    """
    # Must be a rooted path; this rejects "", relative paths and full URLs
    # such as "https://evil.example".
    if not target.startswith("/"):
        return default
    # "//host" is a scheme-relative URL, and browsers treat "/\host" the same.
    if target.startswith(("//", "/\\")):
        return default
    # Browsers strip tabs/newlines from URLs, so "/\t/host" could collapse
    # into "//host"; refuse anything containing control characters.
    if any(ord(ch) < 0x20 or ch == "\x7f" for ch in target):
        return default
    return target


@router.post("/login", response_class=HTMLResponse)
async def login_post(
    request: Request,
    email: str = Form(...),
    password: str = Form(...),
    remember: Optional[str] = Form(default=None),
    next: str = Form(default="/restore"),
    db: Session = Depends(get_db),
):
    """Process login form, issue JWT stored in HttpOnly cookie.

    On bad credentials the login page is re-rendered with HTTP 401. On
    success the browser is redirected (302) to ``next`` with the auth cookies
    set; ``next`` is honoured only when it is a same-site path, otherwise the
    redirect falls back to ``/restore``.
    """
    # `next` is attacker-controllable form input: validate it before it is
    # used as a redirect location or echoed back into the template.
    target = _safe_redirect_target(next)

    user = db.query(models.User).filter(models.User.email == email).first()
    # A single generic message for both failure modes avoids revealing
    # whether the e-mail address is registered.
    if not user or not security.verify_password(password, user.password):
        return templates.TemplateResponse(
            "login.html",
            {"request": request, "error": "Invalid email or password.", "next": target},
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    # 7 days (604_800 s) when "remember me" is ticked, otherwise the
    # configured access-token lifetime (ACCESS_MIN minutes).
    max_age = 604_800 if remember else ACCESS_MIN * 60
    access_token = security.create_access_token(
        data={"sub": user.email},
        expires_delta=timedelta(seconds=max_age),
    )
    refresh_token = security.create_refresh_token(data={"sub": user.email})

    response = RedirectResponse(url=target, status_code=302)
    _set_auth_cookies(response, access_token, refresh_token, max_age)
    return response
@router.get("/signup", response_class=HTMLResponse)
async def signup_page(
    request: Request,
    current_user: Optional[models.User] = Depends(security.get_optional_user),
):
    """Render sign-up form."""
    # Docstring kept verbatim: FastAPI publishes it as the route description.
    # Anonymous visitors get the form; an authenticated user is sent on to
    # /restore instead.
    if not current_user:
        return templates.TemplateResponse("signup.html", {"request": request})
    return RedirectResponse(url="/restore", status_code=302)
@router.post("/signup", response_class=HTMLResponse)
async def signup_post(
    request: Request,
    email: str = Form(...),
    name: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Create new user account and redirect to login."""
    # Docstring kept verbatim: FastAPI publishes it as the route description.
    # Refuse a second account for an e-mail address that is already stored.
    same_email = models.User.email == email
    already_registered = db.query(models.User).filter(same_email).first()
    if already_registered:
        context = {"request": request, "error": "Email address already registered."}
        return templates.TemplateResponse(
            "signup.html",
            context,
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    # Only the hash is persisted; the plaintext password never reaches the DB.
    password_hash = security.get_password_hash(password)
    new_user = models.User(email=email, name=name, password=password_hash)
    db.add(new_user)
    db.commit()

    # The login page shows a confirmation banner for ?signup=success.
    return RedirectResponse(url="/login?signup=success", status_code=302)
@router.get("/logout")
async def logout():
    """Clear auth cookies and redirect to login page."""
    # Docstring kept verbatim: FastAPI publishes it as the route description.
    redirect = RedirectResponse(url="/login", status_code=302)
    # Drop both cookies that the login flow sets, access token first.
    for cookie_name in ("access_token", "refresh_token"):
        redirect.delete_cookie(cookie_name)
    return redirect
# ═══════════════════════════════════════════════════════════════════════════════
# OAuth2 / JWT API Endpoints (JSON)
# ═══════════════════════════════════════════════════════════════════════════════
@router.post(
    "/auth/token",
    response_model=schemas.Token,
    summary="OAuth2 Password Flow — get JWT",
    tags=["OAuth2"],
)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """
    Standard OAuth2 password-flow token endpoint.
    - **username**: your email address
    - **password**: your password
    - Returns `access_token` (30 min) and `refresh_token` (7 days)
    - Use `Authorization: Bearer <access_token>` on subsequent API calls
    """
    # Docstring kept verbatim: FastAPI publishes it as the route description.
    # The OAuth2 form's "username" field carries the account e-mail.
    account = (
        db.query(models.User)
        .filter(models.User.email == form_data.username)
        .first()
    )

    # Unknown account and wrong password deliberately share one 401 response.
    if not (account and security.verify_password(form_data.password, account.password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Access token lifetime comes from the security module's configured value.
    lifetime = timedelta(minutes=security.ACCESS_TOKEN_EXPIRE_MINUTES)
    issued_access = security.create_access_token(
        data={"sub": account.email},
        expires_delta=lifetime,
    )
    issued_refresh = security.create_refresh_token(data={"sub": account.email})

    return schemas.Token(
        access_token=issued_access,
        refresh_token=issued_refresh,
        token_type="bearer",
    )
@router.post(
    "/auth/refresh",
    response_model=schemas.Token,
    summary="Refresh access token",
    tags=["OAuth2"],
)
async def refresh_access_token(request: Request, db: Session = Depends(get_db)):
    """
    Exchange a valid refresh token (from cookie or JSON body) for a new access token.

    The `refresh_token` cookie is used when present; otherwise a JSON object
    body of the form `{"refresh_token": "<token>"}` is accepted.

    Responds with 401 when no token is supplied, when the token cannot be
    decoded or is not of type "refresh", or when its subject no longer
    matches a user.
    """
    refresh_tok = request.cookies.get("refresh_token")

    if not refresh_tok:
        # Cookie-less API clients: honour the documented JSON-body fallback.
        try:
            body = await request.json()
        except ValueError:
            # Empty or non-JSON body (JSONDecodeError / UnicodeDecodeError are
            # both ValueError subclasses): treat as "no token supplied".
            body = None
        candidate = body.get("refresh_token") if isinstance(body, dict) else None
        # Only a string can be a token; ignore numbers, lists, null, etc.
        if isinstance(candidate, str):
            refresh_tok = candidate

    if not refresh_tok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No refresh token provided",
        )

    payload = security.decode_token(refresh_tok)
    # The "type" claim must mark this as a refresh token, so that an access
    # token cannot be replayed here to mint new access tokens.
    if not payload or payload.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired refresh token",
        )

    # The account may have been removed since the token was issued.
    user = db.query(models.User).filter(models.User.email == payload.get("sub")).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")

    new_access = security.create_access_token(data={"sub": user.email})
    return schemas.Token(access_token=new_access, token_type="bearer")
@router.get(
    "/auth/me",
    response_model=schemas.UserResponse,
    summary="Get current user info",
    tags=["OAuth2"],
)
async def get_me(
    current_user: models.User = Depends(security.get_current_user),
):
    """Return the profile of the currently authenticated user."""
    # Docstring kept verbatim: FastAPI publishes it as the route description.
    # The dependency resolves the caller; the response_model declared on the
    # route (schemas.UserResponse) shapes what is sent back.
    return current_user
@router.post(
    "/auth/register",
    response_model=schemas.UserResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Register new user (API)",
    tags=["OAuth2"],
)
async def api_register(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
    """API endpoint to create a new user account (returns JSON)."""
    # Docstring kept verbatim: FastAPI publishes it as the route description.
    # Reject the request when the e-mail address is already taken.
    duplicate = (
        db.query(models.User)
        .filter(models.User.email == user_data.email)
        .first()
    )
    if duplicate:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Store the hash only, never the submitted plaintext.
    password_hash = security.get_password_hash(user_data.password)
    created = models.User(
        email=user_data.email,
        name=user_data.name,
        password=password_hash,
    )
    db.add(created)
    db.commit()
    # Reload the row after the commit before returning it.
    db.refresh(created)
    return created
# ─── Private helpers ──────────────────────────────────────────────────────────
# Access-token lifetime in minutes, aliased from the security module.
# login_post multiplies it by 60 to get the cookie/token lifetime in seconds
# when "remember me" is not ticked.
ACCESS_MIN = security.ACCESS_TOKEN_EXPIRE_MINUTES
def _set_auth_cookies(
    response,
    access_token: str,
    refresh_token: str,
    access_max_age: int,
    *,
    refresh_max_age: int = 604_800,
    secure: bool = False,
) -> None:
    """Attach the access and refresh token cookies to *response*.

    Args:
        response: Any object exposing ``set_cookie(key, value, **kwargs)``.
        access_token: Encoded access token; stored as ``"Bearer <token>"``.
        refresh_token: Encoded refresh token; stored as-is.
        access_max_age: Lifetime of the access cookie, in seconds.
        refresh_max_age: Lifetime of the refresh cookie, in seconds
            (defaults to 604_800, i.e. 7 days).
        secure: Value of the cookies' ``Secure`` flag. Defaults to ``False``
            so plain-HTTP setups keep working; pass ``True`` when the app is
            served over HTTPS so the tokens are never sent in clear text.
    """
    # Both cookies are HttpOnly (not readable from page scripts) and
    # SameSite=Lax.
    shared_flags = {"httponly": True, "samesite": "lax", "secure": secure}
    response.set_cookie(
        "access_token",
        f"Bearer {access_token}",
        max_age=access_max_age,
        **shared_flags,
    )
    response.set_cookie(
        "refresh_token",
        refresh_token,
        max_age=refresh_max_age,
        **shared_flags,
    )