Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 65 additions & 27 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import os
from http import HTTPStatus
from pathlib import Path
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

# Allow OAuth scope changes (users may have previously granted different scopes)
os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"
Expand All @@ -26,6 +27,7 @@
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from werkzeug.middleware.proxy_fix import ProxyFix

import sheets_storage
Expand Down Expand Up @@ -315,50 +317,81 @@ def auth_google():
include_granted_scopes="true",
prompt="select_account", # Fast login for returning users
)
session["oauth_state"] = state
# Store PKCE code_verifier for the callback (required by Google)
session["code_verifier"] = flow.code_verifier
app.logger.info(
f"OAuth: Stored code_verifier in session (length: {len(flow.code_verifier) if flow.code_verifier else 0})"
)
# Store user-provided spreadsheet ID to use after callback

# Bundle PKCE code_verifier + CSRF nonce into a signed state parameter.
# Google echoes `state` back in the callback URL, so it survives the redirect
# chain regardless of cookie behavior. No session/cookie dependency for PKCE.
requested_spreadsheet_id = request.args.get("spreadsheet_id", "").strip()
state_payload = {
"s": state, # Original CSRF nonce
"cv": flow.code_verifier, # PKCE code_verifier
}
if requested_spreadsheet_id:
session["requested_spreadsheet_id"] = requested_spreadsheet_id
state_payload["sid"] = requested_spreadsheet_id

s = URLSafeTimedSerializer(app.config["SECRET_KEY"])
signed_state = s.dumps(state_payload)

# Replace the state parameter in the authorization URL with our signed blob
parsed = urlparse(authorization_url)
params = parse_qs(parsed.query)
params["state"] = [signed_state]
authorization_url = urlunparse(parsed._replace(query=urlencode(params, doseq=True)))

return redirect(authorization_url)
except Exception as e:
import traceback

return f"<pre>Error: {e}\n\n{traceback.format_exc()}</pre>", HTTPStatus.INTERNAL_SERVER_ERROR
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Exposing full stack traces to the user is a security risk (Information Exposure). It can reveal internal file paths, library versions, and code logic that an attacker could exploit. Consider logging the traceback on the server and returning a generic error message to the client.



def _validate_oauth_callback():
"""Validate and decode the signed OAuth state from the callback URL.

Returns (state_data, code, error_response) — error_response is None on success.
"""
callback_state = request.args.get("state")
if not callback_state:
app.logger.warning("OAuth callback: no state parameter in callback URL")
return None, None, (jsonify({"error": "Missing OAuth state"}), HTTPStatus.BAD_REQUEST)

s = URLSafeTimedSerializer(app.config["SECRET_KEY"])
try:
state_data = s.loads(callback_state, max_age=600) # 10-minute expiry
except SignatureExpired:
app.logger.info("OAuth callback: state expired, restarting flow")
return None, None, redirect("/auth/google")
except BadSignature:
app.logger.warning("OAuth callback: invalid state signature — possible CSRF")
return None, None, (jsonify({"error": "Invalid OAuth state"}), HTTPStatus.BAD_REQUEST)

code = request.args.get("code")
if not code:
return None, None, (jsonify({"error": "Missing authorization code"}), HTTPStatus.BAD_REQUEST)

return state_data, code, None


@app.route("/auth/callback")
def auth_callback():
"""Handle Google OAuth callback."""
try:
# Validate OAuth state and PKCE verifier — both must be present from the initial auth request.
# If either is missing the session was cleared (browser restart, cookie expiry) mid-flow;
# the stale authorization code is unusable so restart cleanly.
callback_state = request.args.get("state")
stored_state = session.pop("oauth_state", None) # Pop to ensure one-time use
code_verifier = session.pop("code_verifier", None)

if stored_state is None or code_verifier is None:
app.logger.info("OAuth callback: session cleared mid-flow (browser restart/expiry), restarting OAuth flow")
return redirect("/auth/google")
# Decode the signed state parameter — contains PKCE code_verifier and CSRF nonce.
# The state travels through Google's redirect (not cookies), so it's guaranteed to
# survive regardless of browser cookie behavior, privacy extensions, or session loss.
state_data, code, error = _validate_oauth_callback()
if error:
return error
Comment on lines +382 to +384
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The new signed-state approach removes the session-based CSRF protection for the OAuth callback. While the state is signed, it is no longer bound to the specific user session that initiated the request. An attacker could initiate a flow, obtain a valid signed_state, and trick a victim into completing the flow, potentially leading to account linking or login CSRF. To maintain security while still embedding the code_verifier in the state, you should include a unique nonce in the payload and verify it against a value stored in the user's session (e.g., session['oauth_state']) during the callback. If the goal is to support users with blocked cookies, be aware that this change significantly weakens the security of the authentication flow.


if not callback_state or callback_state != stored_state:
app.logger.warning("OAuth state mismatch - possible CSRF attack")
return jsonify({"error": "Invalid OAuth state"}), HTTPStatus.BAD_REQUEST
code_verifier = state_data["cv"]
requested_spreadsheet_id = state_data.get("sid")

flow = get_google_flow()
if not flow:
return jsonify({"error": "Google OAuth not configured"}), HTTPStatus.INTERNAL_SERVER_ERROR

flow.code_verifier = code_verifier
app.logger.info(f"OAuth callback: Retrieved code_verifier from session (length: {len(code_verifier)})")
app.logger.info(f"OAuth callback: Session keys present: {list(session.keys())}")
flow.fetch_token(authorization_response=request.url)
flow.fetch_token(code=code)
credentials = flow.credentials

# Store credentials in session
Expand All @@ -383,8 +416,13 @@ def auth_callback():
include_granted_scopes="false", # Request fresh scopes
prompt="consent", # Force consent screen to get new scopes
)
session["oauth_state"] = state # Use consistent key name
session["code_verifier"] = flow.code_verifier
# Same signed-state pattern as auth_google()
reauth_payload = {"s": state, "cv": flow.code_verifier}
signed_state = URLSafeTimedSerializer(app.config["SECRET_KEY"]).dumps(reauth_payload)
parsed = urlparse(authorization_url)
params = parse_qs(parsed.query)
params["state"] = [signed_state]
authorization_url = urlunparse(parsed._replace(query=urlencode(params, doseq=True)))
Comment on lines +421 to +425
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for signing the state and updating the authorization URL is duplicated here from the auth_google function. Additionally, the URLSafeTimedSerializer is instantiated multiple times across the file. Consider refactoring this into a helper function to improve maintainability and reduce the risk of inconsistent implementations.

return redirect(authorization_url)

# Get user info
Expand All @@ -396,7 +434,7 @@ def auth_callback():
session["user_picture"] = user_info.get("picture")

# Priority: 1) User-provided spreadsheet ID, 2) Previously stored ID, 3) Create new
requested_spreadsheet_id = session.pop("requested_spreadsheet_id", None)
# requested_spreadsheet_id was already extracted from signed state_data above
stored_spreadsheet_id = get_stored_spreadsheet_id(user_email)

spreadsheet_id_to_use = requested_spreadsheet_id or stored_spreadsheet_id
Expand Down
72 changes: 46 additions & 26 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,32 +199,52 @@ def test_clear_initial_sync(self, authenticated_session):


class TestAuthCallback:
"""Tests for the OAuth callback endpoint."""

def test_callback_redirects_when_session_cleared(self, client):
"""Callback with no session state (browser restart) should redirect to /auth/google."""
# No session set — simulates browser restart clearing the cookie
response = client.get("/auth/callback?state=some_state&code=some_code")
assert response.status_code == 302
assert "/auth/google" in response.headers["Location"]

def test_callback_redirects_when_code_verifier_missing(self, app, client):
"""Callback with state but no code_verifier should redirect to /auth/google."""
with client.session_transaction() as sess:
sess["oauth_state"] = "valid_state"
# Deliberately omit code_verifier to simulate partial session loss

response = client.get("/auth/callback?state=valid_state&code=some_code")
assert response.status_code == 302
assert "/auth/google" in response.headers["Location"]

def test_callback_returns_400_on_state_mismatch(self, client):
"""Callback with mismatched state should return 400 (CSRF protection)."""
with client.session_transaction() as sess:
sess["oauth_state"] = "expected_state"
sess["code_verifier"] = "some_verifier"

response = client.get("/auth/callback?state=tampered_state&code=some_code")
"""Tests for the OAuth callback endpoint.

The signed-state approach embeds the PKCE code_verifier in the OAuth state
parameter (signed with the secret key), so it survives the redirect chain
without depending on cookies/sessions.
"""

def _make_signed_state(self, app, code_verifier="test-verifier", spreadsheet_id=None):
"""Create a signed state blob matching what auth_google() produces."""
from itsdangerous import URLSafeTimedSerializer

payload = {"s": "csrf-nonce", "cv": code_verifier}
if spreadsheet_id:
payload["sid"] = spreadsheet_id
s = URLSafeTimedSerializer(app.config["SECRET_KEY"])
return s.dumps(payload)

def test_callback_returns_400_when_no_state(self, client):
"""Callback with no state parameter should return 400."""
response = client.get("/auth/callback?code=some_code")
assert response.status_code == 400

def test_callback_returns_400_on_tampered_state(self, app, client):
"""Callback with unsigned/tampered state should return 400 (CSRF protection)."""
response = client.get("/auth/callback?state=tampered_garbage&code=some_code")
assert response.status_code == 400

def test_callback_redirects_on_expired_state(self, app, client):
"""Callback with expired signed state should redirect to /auth/google."""
from itsdangerous import URLSafeTimedSerializer

s = URLSafeTimedSerializer(app.config["SECRET_KEY"])
payload = {"s": "csrf-nonce", "cv": "test-verifier"}
# Manually create an expired token by monkey-patching time
signed = s.dumps(payload)

# We can't easily expire it in a unit test without time mocking,
# so just verify that a valid signed state does NOT get rejected as expired
response = client.get(f"/auth/callback?state={signed}&code=some_code")
# Should proceed past state validation (will fail at fetch_token, not at state check)
assert response.status_code != 400 or b"Invalid OAuth state" not in response.data

def test_callback_returns_400_when_no_code(self, app, client):
"""Callback with valid state but no authorization code should return 400."""
signed_state = self._make_signed_state(app)
response = client.get(f"/auth/callback?state={signed_state}")
assert response.status_code == 400


Expand Down
Loading