Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ security_audit.jsonl.*
**/security_audit.jsonl.*
app/security_audit.jsonl
app/security_audit.jsonl.*
app/r_key.txt
app/r_secret.txt
app/r_key.enc
app/r_secret.enc
app/.pt_salt
app/.pt_cred_meta

# Database files (SQLite, etc.)
*.db
Expand Down
185 changes: 160 additions & 25 deletions app/pt_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import tempfile
import threading
import time
from dataclasses import dataclass, asdict
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

Expand All @@ -25,6 +25,17 @@

logger = logging.getLogger(__name__)


def _get_security_logger():
"""Return SecurityLogger singleton if available, else None."""
try:
from pt_security_logger import get_security_logger

return get_security_logger()
except Exception:
return None


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -90,6 +101,7 @@ class PermissionAuditResult:
missing_trading: List[str]
audit_passed: bool
message: str
excess_permissions: List[str] = field(default_factory=list)

def to_dict(self) -> dict:
return asdict(self)
Expand Down Expand Up @@ -369,6 +381,9 @@ def decrypt_credentials(self) -> Optional[Tuple[str, str]]:
cipher = Fernet(self._derive_key(self._get_machine_password(), salt))
api_key = cipher.decrypt(key_blob).decode("utf-8").strip()
private_key = cipher.decrypt(secret_blob).decode("utf-8").strip()
sec_logger = _get_security_logger()
if sec_logger is not None:
sec_logger.log_credential_use("robinhood", "decrypt_credentials")
return api_key, private_key
except Exception as exc:
logger.debug(
Expand Down Expand Up @@ -410,6 +425,12 @@ def decrypt_credentials(self) -> Optional[Tuple[str, str]]:
self._save_metadata(refreshed)
except Exception as exc:
logger.warning("Re-encrypt after legacy decrypt failed: %s", exc)
sec_logger = _get_security_logger()
if sec_logger is not None:
sec_logger.log_credential_use(
"robinhood",
"decrypt_credentials_legacy_migration",
)
return api_key, private_key

# ------------------------------------------------------------------
Expand Down Expand Up @@ -483,12 +504,22 @@ def rotate_credentials(
except OSError:
pass
logger.info("Credentials rotated successfully")
sec_logger = _get_security_logger()
if sec_logger is not None:
sec_logger.log_credential_rotation("robinhood", True)
return True

raise RuntimeError("encrypt_credentials returned False")

except Exception as exc:
logger.error("Credential rotation failed: %s", exc)
sec_logger = _get_security_logger()
if sec_logger is not None:
sec_logger.log_credential_rotation(
"robinhood",
False,
details={"error": str(exc)},
)
if backed_up:
try:
# os.replace is atomic (POSIX rename): no partial-restore window
Expand Down Expand Up @@ -644,6 +675,10 @@ def validate(
missing_trading = (
sorted(TRADING_PERMISSIONS - granted) if require_trading else []
)
required_now = set(REQUIRED_PERMISSIONS)
if require_trading:
required_now |= TRADING_PERMISSIONS
excess_permissions = sorted(granted - required_now)
has_required = len(missing_required) == 0
has_trading = len(missing_trading) == 0
audit_passed = has_required and (has_trading if require_trading else True)
Expand All @@ -662,6 +697,13 @@ def validate(
f"Live trading will be unavailable."
)
logger.warning(message)
if excess_permissions:
compliance = (
f"PERMISSION COMPLIANCE WARNING: API key has more permissions than "
f"required: {excess_permissions}. Least-privilege is recommended."
)
logger.warning(compliance)
message = f"{message} {compliance}"

result = PermissionAuditResult(
timestamp=now,
Expand All @@ -670,12 +712,31 @@ def validate(
granted_permissions=sorted(granted),
missing_required=missing_required,
missing_trading=missing_trading,
excess_permissions=excess_permissions,
audit_passed=audit_passed,
message=message,
)
self._log_audit(result)
self._log_security_audit(result)
return result

def _log_security_audit(self, result: PermissionAuditResult) -> None:
sec_logger = _get_security_logger()
if sec_logger is None:
return
sec_logger.log_credential_use("robinhood", "permission_validation")
for permission in result.missing_required + result.missing_trading:
sec_logger.log_permission_denied(
"robinhood",
permission,
details={"granted_permissions": result.granted_permissions},
)
if result.excess_permissions:
sec_logger.log_permission_compliance_warning(
"robinhood",
result.excess_permissions,
)

def _log_audit(self, result: PermissionAuditResult) -> None:
"""Append audit result to JSONL log. Rotates when MAX_AUDIT_LINES is
reached (renames active log to ``*.1``, drops older rotations). This
Expand Down Expand Up @@ -856,38 +917,53 @@ def get_credentials() -> Optional[Tuple[str, str]]:
Returns (api_key, private_key_b64) or None.
"""
manager = SecureCredentialManager()
sec_logger = _get_security_logger()
if sec_logger is None:
logger.warning(
"Security logger unavailable; credential access events will not be "
"recorded in security_audit.jsonl."
)

if manager.has_encrypted_credentials():
return manager.decrypt_credentials()
creds = manager.decrypt_credentials()
if sec_logger is not None:
if creds:
sec_logger.log_credential_use("robinhood", "get_credentials_vault")
else:
sec_logger.log_auth_attempt(
"robinhood",
False,
details={"operation": "get_credentials_vault"},
)
return creds

env_key = os.environ.get("POWERTRADER_ROBINHOOD_API_KEY")
env_secret = os.environ.get("POWERTRADER_ROBINHOOD_PRIVATE_KEY")
if env_key and env_secret:
if sec_logger is not None:
sec_logger.log_credential_use("robinhood", "get_credentials_environment")
return env_key.strip(), env_secret.strip()

if manager.has_plaintext_credentials():
if manager.migrate_from_plaintext():
return manager.decrypt_credentials()
# Plaintext fallback: migration failed (e.g. vault write permission
# denied). Return plaintext creds rather than locking the user out.
# Logged at error level so the degraded security posture is visible.
creds = manager.decrypt_credentials()
if sec_logger is not None:
if creds:
sec_logger.log_credential_use(
"robinhood", "get_credentials_migrated"
)
else:
sec_logger.log_auth_attempt(
"robinhood",
False,
details={"operation": "get_credentials_migrated"},
)
return creds
logger.error(
"SECURITY DEGRADATION: encrypted vault write failed — returning "
"PLAINTEXT credentials. Callers cannot distinguish vault-backed "
"from plaintext via this API. Fix vault permissions and re-run "
"to migrate."
"SECURITY ALERT: Plaintext credentials were detected but migration "
"to encrypted storage failed. Refusing to use plaintext credentials."
)
try:
base_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(base_dir, "r_key.txt"), "r", encoding="utf-8") as f:
api_key = f.read().strip()
with open(
os.path.join(base_dir, "r_secret.txt"), "r", encoding="utf-8"
) as f:
private_key = f.read().strip()
return api_key, private_key
except OSError:
pass
return None

return None

Expand All @@ -911,14 +987,73 @@ def validate_credentials_on_startup(
manager = SecureCredentialManager(base_dir)
validator = PermissionValidator(base_dir)
messages = []
sec_logger = _get_security_logger()
if sec_logger is None:
logger.warning(
"Security logger unavailable during startup validation; credential "
"access audit events will not be recorded."
)

env_key = os.environ.get("POWERTRADER_ROBINHOOD_API_KEY")
env_secret = os.environ.get("POWERTRADER_ROBINHOOD_PRIVATE_KEY")
has_env_credentials = bool(env_key and env_secret)

if manager.has_encrypted_credentials():
creds = manager.decrypt_credentials()
if creds is None:
return (
False,
"SECURITY ALERT: Encrypted credential vault is present but unreadable "
"or corrupt. Startup rejected.",
)
api_key, private_key = creds
if not api_key or not private_key:
return (
False,
"SECURITY ALERT: Encrypted credential vault is present but unreadable "
"or corrupt. Startup rejected.",
)
if sec_logger is not None:
sec_logger.log_credential_use("robinhood", "startup_validation_vault")
elif manager.has_plaintext_credentials():
if not manager.migrate_from_plaintext():
return (
False,
"SECURITY ALERT: Plaintext credentials detected but migration failed. "
"Startup rejected to avoid insecure credential use.",
)
creds = manager.decrypt_credentials()
if not creds:
return (
False,
"SECURITY ALERT: Plaintext migration completed but encrypted vault "
"could not be read. Startup rejected.",
)
if sec_logger is not None:
sec_logger.log_credential_use("robinhood", "startup_validation_migrated")
elif not has_env_credentials:
return (
False,
"SECURITY ALERT: Missing API credentials. Configure encrypted credentials "
"or set POWERTRADER_ROBINHOOD_API_KEY / POWERTRADER_ROBINHOOD_PRIVATE_KEY.",
)
elif sec_logger is not None:
sec_logger.log_credential_use("robinhood", "startup_validation_environment")

warning = manager.check_rotation_warning()
if warning:
messages.append(warning)
if notify_rotation:
notify_rotation(warning)

audit = validator.validate(permission_fetcher, require_trading)
messages.append(audit.message)

return audit.audit_passed, " | ".join(messages)
if permission_fetcher is None:
skip_msg = "Permission validation skipped: no permission_fetcher provided."
logger.warning(skip_msg)
messages.append(skip_msg)
audit_passed = True
else:
audit = validator.validate(permission_fetcher, require_trading)
messages.append(audit.message)
audit_passed = audit.audit_passed

return audit_passed, " | ".join(messages)
22 changes: 21 additions & 1 deletion app/pt_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
import subprocess
import sys
from typing import Dict, List, Optional, Tuple
from typing import Callable, Dict, List, Optional, Tuple

from pkg_resources import parse_version

Expand Down Expand Up @@ -292,5 +292,25 @@ def run_dependency_audit():
return results


def validate_startup_api_permissions(
permission_fetcher: Optional[Callable[[], List[str]]] = None,
require_trading: bool = True,
base_dir: Optional[str] = None,
) -> Tuple[bool, str]:
"""
Security startup hook for credential + API permission validation.

Delegates to pt_credentials.validate_credentials_on_startup so callers of
this module can run startup checks from a security entry point.
"""
from pt_credentials import validate_credentials_on_startup

return validate_credentials_on_startup(
permission_fetcher=permission_fetcher,
require_trading=require_trading,
base_dir=base_dir,
)


if __name__ == "__main__":
run_dependency_audit()
28 changes: 28 additions & 0 deletions app/pt_security_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class SecurityEventType(Enum):
CREDENTIAL_ROTATION = "credential_rotation" # Credential rotated
SUSPICIOUS_ACTIVITY = "suspicious_activity" # Anomalous behavior detected
PERMISSION_DENIED = "permission_denied" # Insufficient API permissions
PERMISSION_COMPLIANCE = (
"permission_compliance_warning" # API key has excessive scope
)
RATE_LIMIT = "rate_limit" # Rate limit hit
TRADE_EXECUTED = "trade_executed" # Order placed
TRADE_REJECTED = "trade_rejected" # Order rejected
Expand Down Expand Up @@ -406,6 +409,31 @@ def log_permission_denied(
)
)

def log_permission_compliance_warning(
self,
api_name: str,
excess_permissions: List[str],
details: Optional[Dict[str, Any]] = None,
) -> None:
"""Log least-privilege non-compliance for API key scope."""
msg = (
f"Permission compliance warning on {api_name}: "
f"excess permissions {sorted(excess_permissions)}"
)
logger.warning("SECURITY: %s", msg)
self._emit(
self._make_event(
SecurityEventType.PERMISSION_COMPLIANCE,
msg,
source=api_name,
success=True,
details={
**(details or {}),
"excess_permissions": sorted(excess_permissions),
},
)
)

def log_trade_event(
self,
symbol: str,
Expand Down
Loading