diff --git a/app/src/App.tsx b/app/src/App.tsx index f0dc5942..d9ed9e1f 100644 --- a/app/src/App.tsx +++ b/app/src/App.tsx @@ -16,6 +16,7 @@ import NotFound from "./pages/NotFound"; import { Landing } from "./pages/Landing"; import ProtectedRoute from "./components/auth/ProtectedRoute"; import Account from "./pages/Account"; +import { Security } from "./pages/Security"; const queryClient = new QueryClient({ defaultOptions: { @@ -91,6 +92,14 @@ const App = () => ( } /> + + + + } + /> } /> } /> diff --git a/app/src/api/security.ts b/app/src/api/security.ts new file mode 100644 index 00000000..a8960516 --- /dev/null +++ b/app/src/api/security.ts @@ -0,0 +1,31 @@ +import { api } from './client'; + +export interface LoginEvent { + id: number; + ip_address: string | null; + user_agent: string | null; + success: boolean; + anomaly_score: number; + anomaly_reasons: string | null; + created_at: string; +} + +export interface LoginStats { + total_logins: number; + unique_ips: number; + unique_devices: number; + suspicious_count: number; + last_anomaly: string | null; +} + +export async function getLoginHistory(limit = 50): Promise { + return api(`/security/login-history?limit=${limit}`); +} + +export async function getAnomalies(limit = 50): Promise { + return api(`/security/anomalies?limit=${limit}`); +} + +export async function getLoginStats(): Promise { + return api('/security/login-stats'); +} diff --git a/app/src/components/layout/Navbar.tsx b/app/src/components/layout/Navbar.tsx index c7593b70..dc1f48e7 100644 --- a/app/src/components/layout/Navbar.tsx +++ b/app/src/components/layout/Navbar.tsx @@ -13,6 +13,7 @@ const navigation = [ { name: 'Reminders', href: '/reminders' }, { name: 'Expenses', href: '/expenses' }, { name: 'Analytics', href: '/analytics' }, + { name: 'Security', href: '/security' }, ]; export function Navbar() { diff --git a/app/src/pages/Security.tsx b/app/src/pages/Security.tsx new file mode 100644 index 00000000..fdc23b18 --- /dev/null +++ b/app/src/pages/Security.tsx @@ -0,0 +1,214 @@ +import { useEffect, useState } from 'react'; +import { + Shield, + AlertTriangle, + CheckCircle, + Monitor, + Globe, + Clock, + RefreshCw, +} from 'lucide-react'; +import { getLoginHistory, getLoginStats, LoginEvent, LoginStats } from '@/api/security'; +import { useToast } from '@/components/ui/use-toast'; + +function riskLabel(score: number): { label: string; color: string } { + if (score === 0) return { label: 'Safe', color: 'text-green-600' }; + if (score < 0.4) return { label: 'Low', color: 'text-yellow-500' }; + if (score < 0.7) return { label: 'Medium', color: 'text-orange-500' }; + return { label: 'High', color: 'text-red-600' }; +} + +function formatDate(iso: string | null): string { + if (!iso) return '—'; + return new Date(iso).toLocaleString(); +} + +export function Security() { + const [stats, setStats] = useState(null); + const [history, setHistory] = useState([]); + const [loading, setLoading] = useState(true); + const { toast } = useToast(); + + const load = async () => { + setLoading(true); + try { + const [s, h] = await Promise.all([getLoginStats(), getLoginHistory(20)]); + setStats(s); + setHistory(h); + } catch (err: unknown) { + toast({ + variant: 'destructive', + title: 'Failed to load security data', + description: err instanceof Error ? err.message : 'Unknown error', + }); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + load(); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + if (loading) { + return ( +
+ + Loading security data… +
+ ); + } + + return ( +
+ {/* Header */} +
+
+ +
+

Security Center

+

+ Monitor login activity and suspicious behaviour +

+
+
+ +
+ + {/* Stats cards */} + {stats && ( +
+ } + label="Total Logins" + value={stats.total_logins} + /> + } + label="Unique IPs" + value={stats.unique_ips} + /> + } + label="Unique Devices" + value={stats.unique_devices} + /> + } + label="Suspicious Events" + value={stats.suspicious_count} + highlight={stats.suspicious_count > 0} + /> +
+ )} + + {stats?.last_anomaly && ( +
+ + + Last suspicious login detected at{' '} + {formatDate(stats.last_anomaly)}. Review your login + history below. + +
+ )} + + {/* Login history */} +
+

+ + Recent Login Activity +

+ {history.length === 0 ? ( +

No login events recorded yet.

+ ) : ( +
+ + + + + + + + + + + + {history.map((evt) => { + const { label, color } = riskLabel(evt.anomaly_score); + return ( + 0 + ? 'bg-orange-50/40 hover:bg-orange-50/60' + : 'hover:bg-muted/30' + } + > + + + + + + + ); + })} + +
Date & TimeIP AddressStatusRiskDetails
+ {formatDate(evt.created_at)} + + {evt.ip_address || '—'} + + {evt.success ? ( + + + Success + + ) : ( + + + Failed + + )} + {label} + {evt.anomaly_reasons || '—'} +
+
+ )} +
+
+ ); +} + +function StatCard({ + icon, + label, + value, + highlight = false, +}: { + icon: React.ReactNode; + label: string; + value: number; + highlight?: boolean; +}) { + return ( +
+
+ {icon} + {label} +
+
{value}
+
+ ); +} diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b45..66ed75e5 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -36,7 +36,7 @@ def create_app(settings: Settings | None = None) -> Flask: ) # Logging - log_level = os.getenv("LOG_LEVEL", "INFO").upper() + log_level = (os.environ.get("LOG_LEVEL") or "INFO").upper() configure_logging(log_level) logger = logging.getLogger("finmind") logger.info("Starting FinMind backend with log level %s", log_level) @@ -110,10 +110,29 @@ def _ensure_schema_compatibility(app: Flask) -> None: NOT NULL DEFAULT 'INR' """ ) + # Create login_events table if it doesn't exist (for existing deployments) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS login_events ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + ip_address VARCHAR(45), + user_agent VARCHAR(512), + success BOOLEAN NOT NULL DEFAULT TRUE, + anomaly_score NUMERIC(4, 2) NOT NULL DEFAULT 0.0, + anomaly_reasons VARCHAR(512), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """ + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_login_events_user_id " + "ON login_events(user_id)" + ) conn.commit() except Exception: app.logger.exception( - "Schema compatibility patch failed for users.preferred_currency" + "Schema compatibility patch failed" ) conn.rollback() finally: diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189de..bdcfae45 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,20 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +-- Login anomaly detection +CREATE TABLE IF NOT EXISTS login_events ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + ip_address VARCHAR(45), + user_agent VARCHAR(512), + success BOOLEAN NOT NULL DEFAULT TRUE, + anomaly_score NUMERIC(4, 2) NOT NULL DEFAULT 0.0, + anomaly_reasons VARCHAR(512), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_login_events_user_id ON login_events(user_id); +CREATE INDEX IF NOT EXISTS idx_login_events_created_at ON login_events(created_at DESC); +CREATE INDEX IF NOT EXISTS idx_login_events_anomaly ON login_events(user_id, anomaly_score) + WHERE anomaly_score > 0; diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d44810..58b62b8a 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -133,3 +133,19 @@ class AuditLog(db.Model): user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class LoginEvent(db.Model): + """Records every login attempt with anomaly-detection metadata.""" + + __tablename__ = "login_events" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + ip_address = db.Column(db.String(45), nullable=True) # supports IPv6 + user_agent = db.Column(db.String(512), nullable=True) + success = db.Column(db.Boolean, nullable=False, default=True) + anomaly_score = db.Column(db.Numeric(4, 2), nullable=False, default=0.0) + anomaly_reasons = db.Column(db.String(512), nullable=True) + created_at = db.Column( + db.DateTime(timezone=True), default=datetime.utcnow, nullable=False + ) diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..6bbd11e1 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .security import bp as security_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(security_bp, url_prefix="/security") diff --git a/packages/backend/app/routes/auth.py b/packages/backend/app/routes/auth.py index 05a39377..490e6b83 100644 --- a/packages/backend/app/routes/auth.py +++ b/packages/backend/app/routes/auth.py @@ -10,6 +10,7 @@ ) from ..extensions import db, redis_client from ..models import User +from ..services.login_anomaly import record_login import logging import time @@ -55,10 +56,26 @@ def login(): data = request.get_json() or {} email = data.get("email") password = data.get("password") + ip = request.headers.get("X-Forwarded-For", request.remote_addr) or "" + ua = (request.headers.get("User-Agent") or "")[:512] + user = db.session.query(User).filter_by(email=email).first() if not user or not check_password_hash(user.password_hash, password): logger.warning("Login failed for email=%s", email) + # Record failed attempt if we can identify the user + if user: + try: + record_login(user.id, ip, ua, success=False) + except Exception: + logger.exception("Failed to record failed login event") return jsonify(error="invalid credentials"), 401 + + # Record successful login (anomaly score computed inside) + try: + record_login(user.id, ip, ua, success=True) + except Exception: + logger.exception("Failed to record login event") + access = create_access_token(identity=str(user.id)) refresh = create_refresh_token(identity=str(user.id)) _store_refresh_session(refresh, str(user.id)) diff --git a/packages/backend/app/routes/security.py b/packages/backend/app/routes/security.py new file mode 100644 index 00000000..ad9e33b2 --- /dev/null +++ b/packages/backend/app/routes/security.py @@ -0,0 +1,99 @@ +"""Security routes — login history and anomaly reporting.""" + +from __future__ import annotations + +from flask import Blueprint, jsonify, request +from flask_jwt_extended import jwt_required, get_jwt_identity + +from ..extensions import db +from ..models import LoginEvent + +bp = Blueprint("security", __name__) + +_DEFAULT_LIMIT = 50 + + +@bp.get("/login-history") +@jwt_required() +def login_history(): + """Return recent login events for the authenticated user.""" + uid = int(get_jwt_identity()) + limit = min(int(request.args.get("limit", _DEFAULT_LIMIT)), 200) + events = ( + db.session.query(LoginEvent) + .filter(LoginEvent.user_id == uid) + .order_by(LoginEvent.created_at.desc()) + .limit(limit) + .all() + ) + return jsonify([_event_to_dict(e) for e in events]) + + +@bp.get("/anomalies") +@jwt_required() +def anomalies(): + """Return only suspicious login events (anomaly_score > 0).""" + uid = int(get_jwt_identity()) + limit = min(int(request.args.get("limit", _DEFAULT_LIMIT)), 200) + events = ( + db.session.query(LoginEvent) + .filter(LoginEvent.user_id == uid, LoginEvent.anomaly_score > 0) + .order_by(LoginEvent.created_at.desc()) + .limit(limit) + .all() + ) + return jsonify([_event_to_dict(e) for e in events]) + + +@bp.get("/login-stats") +@jwt_required() +def login_stats(): + """Aggregate security statistics for the authenticated user.""" + uid = int(get_jwt_identity()) + + total = db.session.query(LoginEvent).filter(LoginEvent.user_id == uid).count() + unique_ips = ( + db.session.query(LoginEvent.ip_address) + .filter(LoginEvent.user_id == uid) + .distinct() + .count() + ) + unique_devices = ( + db.session.query(LoginEvent.user_agent) + .filter(LoginEvent.user_id == uid) + .distinct() + .count() + ) + suspicious_count = ( + db.session.query(LoginEvent) + .filter(LoginEvent.user_id == uid, LoginEvent.anomaly_score > 0) + .count() + ) + last_anomaly = ( + db.session.query(LoginEvent) + .filter(LoginEvent.user_id == uid, LoginEvent.anomaly_score > 0) + .order_by(LoginEvent.created_at.desc()) + .first() + ) + + return jsonify( + { + "total_logins": total, + "unique_ips": unique_ips, + "unique_devices": unique_devices, + "suspicious_count": suspicious_count, + "last_anomaly": last_anomaly.created_at.isoformat() if last_anomaly else None, + } + ) + + +def _event_to_dict(e: LoginEvent) -> dict: + return { + "id": e.id, + "ip_address": e.ip_address, + "user_agent": e.user_agent, + "success": e.success, + "anomaly_score": float(e.anomaly_score) if e.anomaly_score is not None else 0.0, + "anomaly_reasons": e.anomaly_reasons, + "created_at": e.created_at.isoformat() if e.created_at else None, + } diff --git a/packages/backend/app/services/login_anomaly.py b/packages/backend/app/services/login_anomaly.py new file mode 100644 index 00000000..53fd65e8 --- /dev/null +++ b/packages/backend/app/services/login_anomaly.py @@ -0,0 +1,143 @@ +""" +Login anomaly detection service. + +Detects suspicious login behaviour based on: +- New/unknown IP addresses +- New/unknown user-agent strings (device fingerprint) +- Brute-force: too many failed attempts in a short window +- Unusual hours (2-5 AM UTC) +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone, timedelta +from typing import List, Tuple + +from ..extensions import db +from ..models import LoginEvent + +logger = logging.getLogger("finmind.security") + +# Tunable thresholds +BRUTE_FORCE_WINDOW_MINUTES = 15 +BRUTE_FORCE_THRESHOLD = 5 +UNUSUAL_HOUR_START = 2 # 02:00 UTC inclusive +UNUSUAL_HOUR_END = 5 # 04:59 UTC inclusive + + +def _recent_events(user_id: int, minutes: int) -> List[LoginEvent]: + """Return login events for *user_id* in the last *minutes*.""" + cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes) + return ( + db.session.query(LoginEvent) + .filter( + LoginEvent.user_id == user_id, + LoginEvent.created_at >= cutoff, + ) + .order_by(LoginEvent.created_at.desc()) + .all() + ) + + +def _known_ips(user_id: int) -> set[str]: + rows = ( + db.session.query(LoginEvent.ip_address) + .filter(LoginEvent.user_id == user_id, LoginEvent.success == True) # noqa: E712 + .distinct() + .all() + ) + return {r.ip_address for r in rows if r.ip_address} + + +def _known_agents(user_id: int) -> set[str]: + rows = ( + db.session.query(LoginEvent.user_agent) + .filter(LoginEvent.user_id == user_id, LoginEvent.success == True) # noqa: E712 + .distinct() + .all() + ) + return {r.user_agent for r in rows if r.user_agent} + + +def compute_anomaly_score( + user_id: int, + ip_address: str, + user_agent: str, + success: bool, + now: datetime | None = None, +) -> Tuple[float, List[str]]: + """ + Return (score, reasons) where score is in [0.0, 1.0]. + A higher score means the login looks more suspicious. + """ + if now is None: + now = datetime.now(timezone.utc) + + score = 0.0 + reasons: List[str] = [] + + # 1. New IP address (compared to successful logins) + known_ips = _known_ips(user_id) + if known_ips and ip_address and ip_address not in known_ips: + score += 0.30 + reasons.append("login from new IP address") + + # 2. New device / user-agent + known_agents = _known_agents(user_id) + if known_agents and user_agent and user_agent not in known_agents: + score += 0.25 + reasons.append("login from new device or browser") + + # 3. Brute-force: many failed attempts recently + recent = _recent_events(user_id, BRUTE_FORCE_WINDOW_MINUTES) + recent_failures = [e for e in recent if not e.success] + if len(recent_failures) >= BRUTE_FORCE_THRESHOLD: + score += 0.40 + reasons.append( + f"{len(recent_failures)} failed login attempts in the last " + f"{BRUTE_FORCE_WINDOW_MINUTES} minutes" + ) + + # 4. Unusual hour (2-5 AM UTC) + hour = now.hour + if UNUSUAL_HOUR_START <= hour <= UNUSUAL_HOUR_END: + score += 0.15 + reasons.append(f"login at unusual hour ({hour:02d}:00 UTC)") + + # Clamp to [0, 1] + score = round(min(score, 1.0), 2) + return score, reasons + + +def record_login( + user_id: int, + ip_address: str, + user_agent: str, + success: bool, +) -> LoginEvent: + """ + Persist a LoginEvent and annotate it with the anomaly score. + Returns the saved LoginEvent. + """ + score, reasons = compute_anomaly_score(user_id, ip_address, user_agent, success) + event = LoginEvent( + user_id=user_id, + ip_address=ip_address, + user_agent=user_agent, + success=success, + anomaly_score=score, + anomaly_reasons=", ".join(reasons) if reasons else None, + ) + db.session.add(event) + db.session.commit() + + if score > 0 and success: + logger.warning( + "Suspicious login for user_id=%s score=%.2f reasons=%s ip=%s", + user_id, + score, + reasons, + ip_address, + ) + return event diff --git a/packages/backend/tests/test_login_anomaly.py b/packages/backend/tests/test_login_anomaly.py new file mode 100644 index 00000000..6c99fb5c --- /dev/null +++ b/packages/backend/tests/test_login_anomaly.py @@ -0,0 +1,295 @@ +"""Tests for login anomaly detection — service + security API endpoints.""" + +from __future__ import annotations + +from datetime import datetime, timezone, timedelta +from unittest.mock import patch, MagicMock + +import pytest +from app.extensions import db +from app.models import LoginEvent, User +from werkzeug.security import generate_password_hash + +from app.services.login_anomaly import ( + compute_anomaly_score, + record_login, + BRUTE_FORCE_WINDOW_MINUTES, + BRUTE_FORCE_THRESHOLD, +) + +# --------------------------------------------------------------------------- +# Fixture: patch redis so API route tests don't need a live Redis +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=False) +def mock_redis(monkeypatch): + """Replace the live redis_client with a simple in-memory MagicMock.""" + store: dict = {} + + mock = MagicMock() + mock.get.side_effect = lambda key: store.get(key) + mock.setex.side_effect = lambda key, _ttl, val: store.update({key: val}) + mock.delete.side_effect = lambda key: store.pop(key, None) + mock.flushdb.side_effect = lambda: store.clear() + + import app.routes.auth as auth_module + monkeypatch.setattr(auth_module, "redis_client", mock) + import app.extensions as ext_module + monkeypatch.setattr(ext_module, "redis_client", mock) + return mock + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_user(email: str = "test@example.com") -> User: + user = User( + email=email, + password_hash=generate_password_hash("pass"), + preferred_currency="INR", + ) + db.session.add(user) + db.session.commit() + return user + + +def _login_event(user_id: int, ip: str, ua: str, success: bool, score: float = 0.0): + evt = LoginEvent( + user_id=user_id, + ip_address=ip, + user_agent=ua, + success=success, + anomaly_score=score, + ) + db.session.add(evt) + db.session.commit() + return evt + + +# --------------------------------------------------------------------------- +# Service: compute_anomaly_score +# --------------------------------------------------------------------------- + + +class TestComputeAnomalyScore: + def test_first_login_returns_zero_score(self, app_fixture): + """A brand-new user with no history has no anomalies.""" + with app_fixture.app_context(): + user = _make_user() + score, reasons = compute_anomaly_score( + user.id, "1.2.3.4", "TestBrowser/1.0", success=True + ) + assert score == 0.0 + assert reasons == [] + + def test_new_ip_detected(self, app_fixture): + """Login from a new IP after an established one raises score.""" + with app_fixture.app_context(): + user = _make_user("newip@example.com") + # Seed a successful login from the "known" IP + _login_event(user.id, "10.0.0.1", "UA/1", success=True) + + score, reasons = compute_anomaly_score( + user.id, "192.168.99.1", "UA/1", success=True + ) + assert score > 0 + assert any("new IP" in r for r in reasons) + + def test_known_ip_not_flagged(self, app_fixture): + """Login from a previously seen IP is NOT flagged for IP.""" + with app_fixture.app_context(): + user = _make_user("knownip@example.com") + _login_event(user.id, "10.0.0.1", "UA/1", success=True) + + score, reasons = compute_anomaly_score( + user.id, "10.0.0.1", "UA/1", success=True + ) + assert not any("new IP" in r for r in reasons) + + def test_new_device_detected(self, app_fixture): + """Login from a new user-agent raises score.""" + with app_fixture.app_context(): + user = _make_user("newdev@example.com") + _login_event(user.id, "10.0.0.1", "KnownBrowser/1.0", success=True) + + score, reasons = compute_anomaly_score( + user.id, "10.0.0.1", "UnknownBrowser/99.0", success=True + ) + assert score > 0 + assert any("new device" in r for r in reasons) + + def test_brute_force_detected(self, app_fixture): + """Too many failures in the window should be flagged.""" + with app_fixture.app_context(): + user = _make_user("brute@example.com") + # Insert BRUTE_FORCE_THRESHOLD failed events within the window + for _ in range(BRUTE_FORCE_THRESHOLD): + _login_event(user.id, "10.0.0.1", "UA/1", success=False) + + score, reasons = compute_anomaly_score( + user.id, "10.0.0.1", "UA/1", success=True + ) + assert score > 0 + assert any("failed login" in r for r in reasons) + + def test_brute_force_not_triggered_below_threshold(self, app_fixture): + """Fewer failures than the threshold should not trigger brute-force.""" + with app_fixture.app_context(): + user = _make_user("nobrute@example.com") + for _ in range(BRUTE_FORCE_THRESHOLD - 1): + _login_event(user.id, "10.0.0.1", "UA/1", success=False) + + score, reasons = compute_anomaly_score( + user.id, "10.0.0.1", "UA/1", success=True + ) + assert not any("failed login" in r for r in reasons) + + def test_unusual_hour_flagged(self, app_fixture): + """A login at 3 AM UTC should be flagged.""" + with app_fixture.app_context(): + user = _make_user("oddhour@example.com") + odd_hour_ts = datetime(2024, 1, 15, 3, 0, 0, tzinfo=timezone.utc) + score, reasons = compute_anomaly_score( + user.id, "10.0.0.1", "UA/1", success=True, now=odd_hour_ts + ) + assert any("unusual hour" in r for r in reasons) + + def test_normal_hour_not_flagged(self, app_fixture): + """A login at 10 AM UTC should NOT be flagged for unusual hour.""" + with app_fixture.app_context(): + user = _make_user("normalhour@example.com") + normal_ts = datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc) + score, reasons = compute_anomaly_score( + user.id, "10.0.0.1", "UA/1", success=True, now=normal_ts + ) + assert not any("unusual hour" in r for r in reasons) + + def test_score_capped_at_one(self, app_fixture): + """Composite score should never exceed 1.0.""" + with app_fixture.app_context(): + user = _make_user("cap@example.com") + _login_event(user.id, "10.0.0.1", "UA/1", success=True) + # Many failures for brute force + for _ in range(BRUTE_FORCE_THRESHOLD + 5): + _login_event(user.id, "10.0.0.1", "UA/1", success=False) + + odd_ts = datetime(2024, 1, 15, 3, 0, 0, tzinfo=timezone.utc) + score, _ = compute_anomaly_score( + user.id, "9.9.9.9", "NewBrowser/2.0", success=True, now=odd_ts + ) + assert score <= 1.0 + + +# --------------------------------------------------------------------------- +# Service: record_login +# --------------------------------------------------------------------------- + + +class TestRecordLogin: + def test_records_event_in_db(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("recorder@example.com") + event = record_login(user.id, "10.0.0.1", "UA/1", success=True) + assert event.id is not None + assert event.user_id == user.id + assert event.success is True + + def test_failed_login_recorded(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("fail@example.com") + event = record_login(user.id, "10.0.0.1", "UA/1", success=False) + assert event.success is False + + +# --------------------------------------------------------------------------- +# API: /security endpoints +# --------------------------------------------------------------------------- + + +class TestSecurityRoutes: + def _register_and_login(self, client, email="sec@example.com", pw="pass1234"): + client.post("/auth/register", json={"email": email, "password": pw}) + r = client.post("/auth/login", json={"email": email, "password": pw}) + token = r.get_json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + @pytest.fixture(autouse=True) + def _use_mock_redis(self, mock_redis): + """Ensure all tests in this class use the mocked Redis.""" + pass + + def test_login_history_empty(self, client, app_fixture): + headers = self._register_and_login(client) + r = client.get("/security/login-history", headers=headers) + assert r.status_code == 200 + data = r.get_json() + assert isinstance(data, list) + # At least the login above is recorded + assert len(data) >= 1 + + def test_login_history_requires_auth(self, client): + r = client.get("/security/login-history") + assert r.status_code == 401 + + def test_anomalies_endpoint(self, client, app_fixture): + headers = self._register_and_login(client, "anom@example.com") + r = client.get("/security/anomalies", headers=headers) + assert r.status_code == 200 + assert isinstance(r.get_json(), list) + + def test_login_stats_structure(self, client, app_fixture): + headers = self._register_and_login(client, "stats@example.com") + r = client.get("/security/login-stats", headers=headers) + assert r.status_code == 200 + data = r.get_json() + assert "total_logins" in data + assert "unique_ips" in data + assert "unique_devices" in data + assert "suspicious_count" in data + assert "last_anomaly" in data + + def test_login_stats_total_increments(self, client, app_fixture): + """Each login increments total_logins.""" + email, pw = "incr@example.com", "pass1234" + client.post("/auth/register", json={"email": email, "password": pw}) + + def get_total(h): + return client.get("/security/login-stats", headers=h).get_json()[ + "total_logins" + ] + + r1 = client.post("/auth/login", json={"email": email, "password": pw}) + h1 = {"Authorization": f"Bearer {r1.get_json()['access_token']}"} + before = get_total(h1) + + client.post("/auth/login", json={"email": email, "password": pw}) + after = get_total(h1) + assert after == before + 1 + + def test_anomalies_requires_auth(self, client): + r = client.get("/security/anomalies") + assert r.status_code == 401 + + def test_login_stats_requires_auth(self, client): + r = client.get("/security/login-stats") + assert r.status_code == 401 + + def test_failed_login_recorded_as_event(self, client, app_fixture): + """Failed logins should appear in login history (success=False).""" + email, pw = "failrec@example.com", "rightpass" + client.post("/auth/register", json={"email": email, "password": pw}) + # Intentional failed attempt + client.post("/auth/login", json={"email": email, "password": "wrongpass"}) + # Successful login + r = client.post("/auth/login", json={"email": email, "password": pw}) + h = {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + history = client.get("/security/login-history", headers=h).get_json() + # At least the successful login is present; failed is also recorded + successes = [e for e in history if e["success"]] + failures = [e for e in history if not e["success"]] + assert len(successes) >= 1 + assert len(failures) >= 1