-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgithub_client.py
More file actions
106 lines (82 loc) · 3.75 KB
/
github_client.py
File metadata and controls
106 lines (82 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""
Safe GitHub API client with built-in rate limiting.
Author: Jonathan Melton (@JonathanMelton-FusionAL)
License: MIT
"""
import os
import httpx
from typing import Optional, Any, Dict, List
import logging
from .rate_limiter import GitHubRateLimiter, get_limiter
logger = logging.getLogger(__name__)
class SafeGitHubClient:
"""GitHub API client with production-grade safety features."""
def __init__(
self,
token: Optional[str] = None,
limiter: Optional[GitHubRateLimiter] = None,
user_agent: str = "github-mcp-safe/1.0",
base_url: str = "https://api.github.com"
):
self.token = token or os.getenv("GITHUB_TOKEN")
if not self.token:
raise ValueError("GitHub token required (set GITHUB_TOKEN env var)")
self.limiter = limiter or get_limiter()
self.user_agent = user_agent
self.base_url = base_url.rstrip("/")
self._client: Optional[httpx.AsyncClient] = None
async def __aenter__(self):
self._client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._client:
await self._client.aclose()
def _get_headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self.token}",
"User-Agent": self.user_agent,
"X-GitHub-Api-Version": "2022-11-28",
"Accept": "application/vnd.github+json"
}
async def get(self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs) -> Any:
"""Safe GET request with rate limiting."""
url = f"{self.base_url}{path}" if path.startswith("/") else path
async def _request():
if not self._client:
raise RuntimeError("Client not initialized. Use 'async with SafeGitHubClient()'")
resp = await self._client.get(url, params=params, headers=self._get_headers(), **kwargs)
resp.raise_for_status()
self.limiter.update_from_headers(dict(resp.headers))
return resp.json()
return await self.limiter.execute(_request)
async def post(self, path: str, json: Optional[Dict[str, Any]] = None, **kwargs) -> Any:
"""Safe POST request with rate limiting."""
url = f"{self.base_url}{path}" if path.startswith("/") else path
async def _request():
if not self._client:
raise RuntimeError("Client not initialized")
resp = await self._client.post(url, json=json, headers=self._get_headers(), **kwargs)
resp.raise_for_status()
self.limiter.update_from_headers(dict(resp.headers))
return resp.json()
return await self.limiter.execute(_request)
async def graphql(self, query: str, variables: Optional[Dict[str, Any]] = None) -> Any:
"""Execute GraphQL query with rate limiting."""
async def _request():
if not self._client:
raise RuntimeError("Client not initialized")
resp = await self._client.post(
f"{self.base_url}/graphql",
json={"query": query, "variables": variables or {}},
headers=self._get_headers()
)
resp.raise_for_status()
self.limiter.update_from_headers(dict(resp.headers))
data = resp.json()
if "errors" in data:
raise Exception(f"GraphQL errors: {data['errors']}")
return data.get("data", {})
return await self.limiter.execute(_request)