-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
69 lines (59 loc) · 2.33 KB
/
utils.py
File metadata and controls
69 lines (59 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import bcrypt
import jwt
from datetime import datetime, timedelta
from config import Config
from functools import wraps
from flask import request, jsonify
def hash_password(password):
"""Hash a password using bcrypt"""
# Convert password to bytes, generate salt, hash it
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password_bytes, salt)
return hashed.decode('utf-8') # Store as string in DB
def verify_password(password, hashed_password):
"""Verify a password against its hash"""
password_bytes = password.encode('utf-8')
hashed_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(password_bytes, hashed_bytes)
def generate_jwt(user_id):
"""Generate JWT token for a user"""
payload = {
'user_id': str(user_id), # MongoDB ObjectId to string
'exp': datetime.utcnow() + timedelta(hours=Config.JWT_EXPIRATION_HOURS),
'iat': datetime.utcnow() # Issued at
}
token = jwt.encode(payload, Config.JWT_SECRET_KEY, algorithm=Config.JWT_ALGORITHM)
return token
def verify_jwt(token):
"""Verify JWT token and return payload"""
try:
payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=[Config.JWT_ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
return None # Token expired
except jwt.InvalidTokenError:
return None # Invalid token
def token_required(f):
"""Decorator to protect routes with JWT authentication"""
@wraps(f)
def decorated(*args, **kwargs):
token = None
# Check if token is in headers
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
# Expected format: "Bearer <token>"
try:
token = auth_header.split(' ')[1]
except IndexError:
return jsonify({'error': 'Invalid token format'}), 401
if not token:
return jsonify({'error': 'Token is missing'}), 401
# Verify token
payload = verify_jwt(token)
if not payload:
return jsonify({'error': 'Token is invalid or expired'}), 401
# Add user_id to kwargs so route can use it
kwargs['user_id'] = payload['user_id']
return f(*args, **kwargs)
return decorated