forked from aqua5230/usage
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsubscription.py
More file actions
106 lines (93 loc) · 3.51 KB
/
subscription.py
File metadata and controls
106 lines (93 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (C) 2026 lollapalooza <https://github.com/aqua5230>
#
# Part of "usage". Free software licensed under the GNU Affero General Public
# License v3.0 only; see the LICENSE file for full terms and the warranty disclaimer.
"""Read the locally-stored subscription plan + start date for each agent.
Everything here is read-only and stays on disk — we only pull the plan name and
the subscription start date out of the OAuth account files that Claude Code and
Codex already keep. Tokens, emails and account IDs are never read or returned.
"""
from __future__ import annotations
import base64
import binascii
import json
from pathlib import Path
from typing import Any
CLAUDE_CONFIG = Path.home() / ".claude.json"
CODEX_AUTH = Path.home() / ".codex" / "auth.json"
_CLAUDE_PLAN_NAMES = {
"claude_pro": "Claude Pro",
"claude_max": "Claude Max",
"claude_team": "Claude Team",
"claude_enterprise": "Claude Enterprise",
}
def _decode_jwt_payload(token: str) -> dict[str, Any]:
if not isinstance(token, str):
return {}
parts = token.split(".")
if len(parts) < 2:
return {}
payload = parts[1]
payload += "=" * (-len(payload) % 4)
try:
claims = json.loads(base64.urlsafe_b64decode(payload))
if not isinstance(claims, dict):
return {}
return claims
except (binascii.Error, ValueError):
return {}
def _load_claude_subscription() -> dict[str, str | None] | None:
try:
data = json.loads(CLAUDE_CONFIG.read_text(encoding="utf-8"))
except (OSError, ValueError):
return None
if not isinstance(data, dict):
return None
account = data.get("oauthAccount")
if not isinstance(account, dict):
account = {}
org_type = account.get("organizationType")
since = account.get("subscriptionCreatedAt")
if not isinstance(org_type, str):
org_type = None
if not isinstance(since, str):
since = None
if not org_type and not since:
return None
plan = _CLAUDE_PLAN_NAMES.get(org_type or "")
if not plan:
plan = (org_type or "Claude").replace("claude_", "Claude ").replace("_", " ").title()
return {"agent": "Claude Code", "plan": plan, "since": since[:10] if since else None}
def _load_codex_subscription() -> dict[str, str | None] | None:
try:
data = json.loads(CODEX_AUTH.read_text(encoding="utf-8"))
except (OSError, ValueError):
return None
if not isinstance(data, dict):
return None
tokens = data.get("tokens")
if not isinstance(tokens, dict):
tokens = {}
claims = _decode_jwt_payload(tokens.get("id_token", ""))
auth = claims.get("https://api.openai.com/auth")
if not isinstance(auth, dict):
auth = {}
plan_type = auth.get("chatgpt_plan_type")
since = auth.get("chatgpt_subscription_active_start")
if not isinstance(plan_type, str):
plan_type = None
if not isinstance(since, str):
since = None
if not plan_type and not since:
return None
plan = f"ChatGPT {plan_type.title()}" if plan_type else "ChatGPT"
return {"agent": "Codex", "plan": plan, "since": since[:10] if since else None}
def load_subscriptions() -> list[dict[str, str | None]]:
"""Return ``[{agent, plan, since}]`` for whichever agents we can detect."""
subs: list[dict[str, str | None]] = []
for loader in (_load_claude_subscription, _load_codex_subscription):
sub = loader()
if sub:
subs.append(sub)
return subs