-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgithub_api_client.py
More file actions
891 lines (729 loc) · 37.1 KB
/
github_api_client.py
File metadata and controls
891 lines (729 loc) · 37.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
"""
GitHub API Client with enhanced connection handling and error recovery.
This module provides a robust client for GitHub API interactions with:
- Circuit breaker pattern for preventing repeated failures
- Advanced rate limit tracking and preemptive throttling
- Improved connection pooling and error handling
- Caching for repository metadata
- DNS and network diagnostics
"""
import os
import time
import random
import socket
import asyncio
import logging
import platform
import traceback
from urllib.parse import urlparse
import aiohttp
from aiohttp.client_exceptions import (
ClientConnectorError, ServerDisconnectedError,
ClientOSError, ClientPayloadError, ContentTypeError
)
from collections import defaultdict
import json
# Configure module-wide logging: INFO level with timestamped, named records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by the classes below.
logger = logging.getLogger('github_api_client')
class CircuitBreaker:
    """
    Implements the Circuit Breaker pattern to prevent repeated failed requests.

    The circuit transitions between three states:
    - CLOSED: Normal operation, requests are allowed
    - OPEN: Requests are blocked due to multiple failures
    - HALF_OPEN: Testing if the service has recovered

    State is tracked both globally and per endpoint.
    """

    # Circuit states
    CLOSED = 'CLOSED'
    OPEN = 'OPEN'
    HALF_OPEN = 'HALF_OPEN'

    # Class-level logger so the breaker does not depend on module globals.
    _log = logging.getLogger('github_api_client')

    def __init__(self, failure_threshold=5, recovery_timeout=30, request_timeout=10):
        """
        Initialize a new circuit breaker.

        Args:
            failure_threshold: Number of failures before circuit opens
            recovery_timeout: Seconds to wait before attempting recovery
            request_timeout: Timeout for individual requests in seconds
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.request_timeout = request_timeout
        # Global (endpoint-agnostic) state
        self.state = self.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        # Per-endpoint state, created lazily on first use
        self.endpoints = defaultdict(lambda: {
            'state': self.CLOSED,
            'failure_count': 0,
            'last_failure_time': None
        })

    def record_success(self, endpoint=None):
        """Record a successful request and close the (endpoint's) circuit."""
        if endpoint:
            self.endpoints[endpoint]['state'] = self.CLOSED
            self.endpoints[endpoint]['failure_count'] = 0
        else:
            self.state = self.CLOSED
            self.failure_count = 0

    def record_failure(self, endpoint=None):
        """Record a failed request, opening the circuit when warranted.

        Bug fix: a failure while HALF_OPEN now sends the circuit straight
        back to OPEN. Previously only the CLOSED state was checked, so one
        failed recovery probe left the circuit HALF_OPEN forever and every
        subsequent request was allowed through.
        """
        now = time.time()
        if endpoint:
            entry = self.endpoints[endpoint]
            entry['failure_count'] += 1
            entry['last_failure_time'] = now
            if entry['state'] == self.HALF_OPEN:
                # Recovery probe failed: re-open immediately.
                self._log.warning(f"Circuit re-OPEN for endpoint {endpoint} after failed recovery probe")
                entry['state'] = self.OPEN
            elif (entry['state'] == self.CLOSED and
                    entry['failure_count'] >= self.failure_threshold):
                self._log.warning(f"Circuit OPEN for endpoint {endpoint} due to {self.failure_threshold} failures")
                entry['state'] = self.OPEN
        else:
            self.failure_count += 1
            self.last_failure_time = now
            if self.state == self.HALF_OPEN:
                # Recovery probe failed: re-open immediately.
                self._log.warning("Circuit re-OPEN after failed recovery probe")
                self.state = self.OPEN
            elif self.state == self.CLOSED and self.failure_count >= self.failure_threshold:
                self._log.warning(f"Circuit OPEN due to {self.failure_threshold} failures")
                self.state = self.OPEN

    def allow_request(self, endpoint=None):
        """Check if a request should be allowed.

        Returns:
            True when the relevant circuit is CLOSED or HALF_OPEN, or when an
            OPEN circuit's recovery timeout has elapsed (transitioning it to
            HALF_OPEN); False while the circuit is OPEN and cooling down.
        """
        if endpoint and endpoint in self.endpoints:
            state = self.endpoints[endpoint]['state']
            last_failure = self.endpoints[endpoint]['last_failure_time']
        else:
            state = self.state
            last_failure = self.last_failure_time
        if state == self.CLOSED:
            return True
        if state == self.OPEN:
            # After the recovery timeout, move to HALF_OPEN and allow a probe.
            now = time.time()
            if last_failure and now - last_failure > self.recovery_timeout:
                if endpoint:
                    self._log.info(f"Circuit HALF_OPEN for endpoint {endpoint}, testing recovery")
                    self.endpoints[endpoint]['state'] = self.HALF_OPEN
                else:
                    self._log.info("Circuit HALF_OPEN, testing recovery")
                    self.state = self.HALF_OPEN
                return True
            return False
        # HALF_OPEN state allows a request to test recovery
        return True
class RateLimitTracker:
    """
    Tracks GitHub API rate limits and provides preemptive throttling.

    Keeps separate counters for the core API and the (stricter) search API,
    plus optional per-endpoint quotas reported via response headers.
    """

    def __init__(self):
        """Initialize the tracker with unauthenticated GitHub defaults."""
        # Core API: 60 requests/hour when unauthenticated
        self.core_limit = 60
        self.core_remaining = 60
        self.core_reset = 0
        # Search API has stricter limits
        self.search_limit = 10
        self.search_remaining = 10
        self.search_reset = 0
        # Quotas reported for specific endpoints
        self.endpoint_limits = {}
        # Timestamp of the most recent header update
        self.last_updated = 0
        # Flips to True once the limits indicate an authenticated token
        self.authenticated = False

    def update_from_headers(self, headers, endpoint=None):
        """Update rate limit information from API response headers."""
        now = time.time()
        self.last_updated = now
        has_limit_header = 'X-RateLimit-Limit' in headers
        if has_limit_header:
            if '/search/' in str(endpoint):
                self.search_limit = int(headers.get('X-RateLimit-Limit', self.search_limit))
                self.search_remaining = int(headers.get('X-RateLimit-Remaining', self.search_remaining))
                self.search_reset = int(headers.get('X-RateLimit-Reset', self.search_reset))
                logger.debug(f"Search rate limits: {self.search_remaining}/{self.search_limit}, " +
                             f"resets in {max(0, self.search_reset - now):.0f}s")
            else:
                self.core_limit = int(headers.get('X-RateLimit-Limit', self.core_limit))
                self.core_remaining = int(headers.get('X-RateLimit-Remaining', self.core_remaining))
                self.core_reset = int(headers.get('X-RateLimit-Reset', self.core_reset))
                # A limit above the anonymous 60/hour implies an authenticated token
                self.authenticated = self.core_limit > 60
                logger.debug(f"Rate limits updated: {self.core_remaining}/{self.core_limit}, " +
                             f"resets in {max(0, self.core_reset - now):.0f}s")
        # Remember endpoint-specific quota when one was reported
        if endpoint and has_limit_header:
            self.endpoint_limits[endpoint] = {
                'limit': int(headers.get('X-RateLimit-Limit')),
                'remaining': int(headers.get('X-RateLimit-Remaining')),
                'reset': int(headers.get('X-RateLimit-Reset'))
            }

    async def wait_if_needed(self, endpoint=None):
        """Sleep preemptively near the quota; return True only after a full reset wait."""
        now = time.time()
        searching = bool(endpoint) and '/search/' in endpoint
        if searching:
            remaining, limit, reset_at = self.search_remaining, self.search_limit, self.search_reset
        else:
            remaining, limit, reset_at = self.core_remaining, self.core_limit, self.core_reset
        # An endpoint-specific quota overrides the generic counters when known
        per_endpoint = self.endpoint_limits.get(endpoint) if endpoint else None
        if per_endpoint is not None:
            remaining = per_endpoint['remaining']
            reset_at = per_endpoint['reset']
        # Start throttling once below ~10% of the quota
        threshold = max(1, int(limit * 0.1))
        if remaining <= threshold:
            if remaining <= 2:
                # Nearly exhausted: block until the quota resets (plus a buffer)
                pause = max(0, reset_at - now) + 2
                logger.warning(f"Rate limit almost depleted ({remaining} remaining). " +
                               f"Waiting {pause:.0f}s for reset.")
                await asyncio.sleep(pause)
                return True
            # Otherwise back off progressively as the quota shrinks
            pause = (threshold - remaining + 1) / threshold * 2.0
            logger.info(f"Preemptive throttling: {remaining}/{limit} requests remaining. " +
                        f"Adding {pause:.1f}s delay.")
            await asyncio.sleep(pause)
        return False
class GitHubAPIClient:
    """
    Enhanced GitHub API client with robust error handling and connection management.

    Features:
    - Circuit breaker pattern for preventing repeated failures
    - Advanced rate limit tracking and preemptive throttling
    - Improved connection pooling and error handling
    - DNS and network diagnostics
    - Caching for repository metadata
    """

    # Base URL for GitHub API
    BASE_URL = "https://api.github.com"
    # GitHub API version (sent via the X-GitHub-Api-Version header)
    API_VERSION = "2022-11-28"

    def __init__(self, token=None, user_agent=None, cache_ttl=300):
        """
        Initialize a new GitHub API client.

        Args:
            token: GitHub API token for authentication (optional)
            user_agent: Custom User-Agent string; a detailed one is
                generated from system information when omitted
            cache_ttl: Time-to-live for cached repository/README data in seconds
        """
        self.token = token
        self.user_agent = user_agent or self._generate_user_agent()
        self.cache_ttl = cache_ttl
        # Circuit breaker guards against hammering failing endpoints
        self.circuit_breaker = CircuitBreaker()
        # Tracks remaining quota and throttles preemptively
        self.rate_limiter = RateLimitTracker()
        # Caches keyed "repo:<owner/repo>" and "readme:<owner/repo>"
        self.repo_cache = {}
        self.readme_cache = {}
        # aiohttp session is created lazily by initialize()
        self.session = None
        # Ensures DNS diagnostics are only run/logged once
        self._dns_check_performed = False
def _generate_user_agent(self):
"""Generate a detailed User-Agent string with system information."""
system = platform.system()
release = platform.release()
python_version = platform.python_version()
try:
# Try to get hostname for better diagnosis of issues
hostname = socket.gethostname()
hostname = hostname.split('.')[0] # Truncate domain portion
except:
hostname = 'unknown'
return f"GitHub-Dataset-Analyzer/1.0 ({system}; {release}; Python/{python_version}) Host/{hostname}"
async def initialize(self):
"""Initialize the client and check connectivity."""
if self.session is None:
# Set up TCP connector with improved settings
tcp_connector = aiohttp.TCPConnector(
limit=30, # Limit concurrent connections
ttl_dns_cache=300, # DNS cache TTL in seconds
family=0, # Both IPv4 and IPv6
enable_cleanup_closed=True, # Clean up closed connections
force_close=False, # Keep-alive where possible
ssl=False # No SSL verification for public API
)
# Configure timeouts
timeout = aiohttp.ClientTimeout(
total=60, # Total request timeout
connect=20, # Connection timeout
sock_connect=20, # Socket connection timeout
sock_read=30 # Socket read timeout
)
# Headers for all requests
headers = {
'Accept': 'application/vnd.github.v3+json',
'User-Agent': self.user_agent,
'X-GitHub-Api-Version': self.API_VERSION
}
# Add authorization if token is provided
if self.token:
headers['Authorization'] = f'token {self.token}'
self.rate_limiter.authenticated = True
self.rate_limiter.core_limit = 5000 # Authenticated users get 5000 requests/hour
self.rate_limiter.core_remaining = 5000
# Create session
self.session = aiohttp.ClientSession(
headers=headers,
connector=tcp_connector,
timeout=timeout,
raise_for_status=False # We'll handle status checking ourselves
)
# Check connectivity and rate limits
await self._check_connectivity()
async def close(self):
"""Close the client session."""
if self.session:
await self.session.close()
self.session = None
    async def _check_connectivity(self):
        """Verify GitHub API connectivity and seed the rate limiter.

        Runs a one-time DNS diagnostic, then queries /rate_limit and copies
        the reported core/search quotas into self.rate_limiter.

        Returns:
            True when /rate_limit was reachable and parsed, False otherwise
            (including when any exception occurred).
        """
        try:
            # Perform DNS check first (only once per client instance)
            if not self._dns_check_performed:
                self._dns_check_performed = True
                self._check_dns_resolution()
            # Get rate limit information
            rate_limit = await self.fetch("/rate_limit")
            if rate_limit:
                limits = rate_limit.get('resources', {})
                core = limits.get('core', {})
                self.rate_limiter.core_limit = core.get('limit', 60)
                self.rate_limiter.core_remaining = core.get('remaining', 60)
                # Default the reset to one hour out if the API omits it
                self.rate_limiter.core_reset = core.get('reset', int(time.time() + 3600))
                search = limits.get('search', {})
                self.rate_limiter.search_limit = search.get('limit', 10)
                self.rate_limiter.search_remaining = search.get('remaining', 10)
                self.rate_limiter.search_reset = search.get('reset', int(time.time() + 3600))
                auth_status = "authenticated" if self.rate_limiter.authenticated else "unauthenticated"
                logger.info(f"GitHub API connected ({auth_status}): " +
                            f"{self.rate_limiter.core_remaining}/{self.rate_limiter.core_limit} " +
                            f"requests remaining, resets in " +
                            f"{max(0, self.rate_limiter.core_reset - time.time()):.0f}s")
                return True
            return False
        except Exception as e:
            # Treat any failure as "not connected"; callers only see the bool
            logger.error(f"GitHub API connectivity check failed: {str(e)}")
            return False
    def _check_dns_resolution(self):
        """Check DNS resolution and HTTPS reachability for GitHub endpoints.

        Purely diagnostic: results are logged, not acted upon.

        Returns:
            True when both hostnames resolved (regardless of the port-443
            probe outcome), False on resolution failure or any other error.
        """
        try:
            logger.info("Checking DNS resolution for GitHub API...")
            github_ip = socket.gethostbyname('api.github.com')
            github_raw_ip = socket.gethostbyname('raw.githubusercontent.com')
            logger.info(f"DNS Resolution: api.github.com -> {github_ip}")
            logger.info(f"DNS Resolution: raw.githubusercontent.com -> {github_raw_ip}")
            # Try to open a TCP connection to GitHub on port 443 (2s timeout)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(2)
            result = s.connect_ex((github_ip, 443))
            s.close()
            if result == 0:
                logger.info("Port 443 (HTTPS) is open on api.github.com")
            else:
                # connect_ex returns an errno instead of raising
                logger.warning(f"Port 443 check on api.github.com failed with code: {result}")
            return True
        except socket.gaierror:
            # gaierror is what gethostbyname raises on resolution failure
            logger.error("DNS resolution failed for GitHub API. Check your internet connection or DNS settings.")
            return False
        except Exception as e:
            logger.error(f"Network diagnostic error: {str(e)}")
            return False
    async def fetch(self, endpoint, headers=None, check_cache=True, ignore_circuit_breaker=False):
        """
        Fetch data from GitHub API with enhanced error handling.

        Applies, in order: circuit-breaker gating, preemptive rate-limit
        throttling, a small random jitter, then up to 3 attempts with
        exponential backoff on transient failures.

        Args:
            endpoint: API endpoint (with or without leading slash)
            headers: Additional headers for the request
            check_cache: Whether to check cache before making request
                (NOTE(review): currently unused in this method — caching is
                done by the get_repository_* wrappers; confirm before relying on it)
            ignore_circuit_breaker: Whether to bypass circuit breaker

        Returns:
            Parsed JSON response, raw text for non-JSON bodies, or None on failure
        """
        # Ensure session is initialized
        if not self.session:
            await self.initialize()
        # Normalize the endpoint to a leading-slash path
        if not endpoint.startswith('/'):
            endpoint = f"/{endpoint}"
        url = f"{self.BASE_URL}{endpoint}"
        # Check if endpoint is blocked by circuit breaker
        if not ignore_circuit_breaker and not self.circuit_breaker.allow_request(endpoint):
            logger.warning(f"Request to {endpoint} blocked by circuit breaker")
            return None
        # Wait if we're approaching rate limits
        await self.rate_limiter.wait_if_needed(endpoint)
        # Add jitter to avoid thundering herd problem
        await asyncio.sleep(random.uniform(0.1, 0.5))
        # Make the request with retries
        max_retries = 3
        backoff_factor = 1.5
        for attempt in range(max_retries):
            try:
                async with self.session.get(url, headers=headers) as response:
                    # Update rate limit information from every response
                    self.rate_limiter.update_from_headers(response.headers, endpoint)
                    # 403 can mean rate-limited (wait and retry) or forbidden (fail)
                    if response.status == 403:
                        if 'X-RateLimit-Remaining' in response.headers:
                            remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
                            if remaining == 0:
                                reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
                                wait_time = max(0, reset_time - time.time()) + 2  # Add buffer
                                logger.warning(f"Rate limit exceeded. Waiting {wait_time:.0f} seconds...")
                                await asyncio.sleep(wait_time)
                                continue  # Retry after waiting
                        # If it's a 403 but not rate limiting, might be authentication issue
                        logger.error(f"GitHub API returned 403 Forbidden for {url}: "
                                     f"{await response.text()}")
                        self.circuit_breaker.record_failure(endpoint)
                        return None
                    # Handle other error status codes
                    if response.status >= 400:
                        error_text = await response.text()
                        logger.error(f"GitHub API error: {response.status} for {url}: {error_text}")
                        # Only retry on transient status codes
                        if response.status in [429, 500, 502, 503, 504]:
                            wait_time = (backoff_factor ** attempt) * 2
                            logger.info(f"Retrying in {wait_time:.1f} seconds (attempt {attempt+1}/{max_retries})...")
                            await asyncio.sleep(wait_time)
                            continue
                        self.circuit_breaker.record_failure(endpoint)
                        return None
                    # Success path: prefer JSON, fall back to raw text
                    try:
                        data = await response.json()
                        self.circuit_breaker.record_success(endpoint)
                        return data
                    except ContentTypeError:
                        # Not JSON (e.g. raw README requests), return text
                        text = await response.text()
                        self.circuit_breaker.record_success(endpoint)
                        return text
                    except Exception as e:
                        logger.error(f"Error parsing response from {url}: {str(e)}")
                        self.circuit_breaker.record_failure(endpoint)
                        return None
            except (ClientConnectorError, ServerDisconnectedError) as e:
                logger.error(f"Connection error for {url} (attempt {attempt+1}/{max_retries}): {str(e)}")
                # Check for DNS resolution issues and provide guidance
                if isinstance(e, ClientConnectorError) and "getaddrinfo failed" in str(e):
                    logger.error("DNS resolution failed. Please check your internet connection.")
                    if not self._dns_check_performed:
                        self._dns_check_performed = True
                        self._check_dns_resolution()
                # Add increasing backoff between retries
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 2
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    self.circuit_breaker.record_failure(endpoint)
                    return None
            except ClientOSError as e:
                logger.error(f"OS error for {url}: {str(e)}")
                if "Connection reset by peer" in str(e):
                    logger.error("Connection reset by GitHub. This may indicate rate limiting or blocked connections.")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 3  # Longer wait for OS errors
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    self.circuit_breaker.record_failure(endpoint)
                    return None
            except ClientPayloadError as e:
                logger.error(f"Payload error for {url}: {str(e)}")
                logger.error("GitHub closed the connection before complete response was received.")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 3
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    self.circuit_breaker.record_failure(endpoint)
                    return None
            except asyncio.TimeoutError:
                logger.error(f"Timeout accessing {url} (attempt {attempt+1}/{max_retries})")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 2
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    self.circuit_breaker.record_failure(endpoint)
                    return None
            except Exception as e:
                logger.error(f"Unexpected error accessing {url}: {str(e)}")
                logger.debug(f"Stack trace: {traceback.format_exc()}")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 2
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    self.circuit_breaker.record_failure(endpoint)
                    return None
        # If we get here, all retries failed (loop exhausted via `continue`s)
        logger.error(f"All retries failed for {url}")
        self.circuit_breaker.record_failure(endpoint)
        return None
    async def fetch_raw(self, url, headers=None):
        """
        Fetch raw content from GitHub or any URL with enhanced error handling.

        Unlike fetch(), this takes a full URL, always returns the body as
        text, and does not consult the circuit breaker or the preemptive
        rate limiter (GitHub rate-limit headers are still honored on 403).

        Args:
            url: Full URL to fetch
            headers: Additional headers for the request

        Returns:
            Raw text content or None on failure
        """
        # Ensure session is initialized
        if not self.session:
            await self.initialize()
        # Add jitter to avoid thundering herd problem
        await asyncio.sleep(random.uniform(0.1, 0.3))
        # Make the request with retries
        max_retries = 3
        backoff_factor = 1.5
        for attempt in range(max_retries):
            try:
                async with self.session.get(url, headers=headers) as response:
                    # Check for rate limiting if it's GitHub
                    if 'github.com' in url and response.status == 403:
                        if 'X-RateLimit-Remaining' in response.headers:
                            self.rate_limiter.update_from_headers(response.headers)
                            remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
                            if remaining == 0:
                                reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
                                wait_time = max(0, reset_time - time.time()) + 2  # Add buffer
                                logger.warning(f"Rate limit exceeded. Waiting {wait_time:.0f} seconds...")
                                await asyncio.sleep(wait_time)
                                continue  # Retry after waiting
                    # Handle error status codes
                    if response.status >= 400:
                        logger.error(f"HTTP error {response.status} for {url}")
                        # Only retry on transient status codes
                        if response.status in [429, 500, 502, 503, 504]:
                            wait_time = (backoff_factor ** attempt) * 2
                            logger.info(f"Retrying in {wait_time:.1f} seconds (attempt {attempt+1}/{max_retries})...")
                            await asyncio.sleep(wait_time)
                            continue
                        return None
                    # Success path: return body as text
                    try:
                        return await response.text()
                    except Exception as e:
                        logger.error(f"Error reading response from {url}: {str(e)}")
                        return None
            except (ClientConnectorError, ServerDisconnectedError) as e:
                logger.error(f"Connection error for {url} (attempt {attempt+1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 2
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    return None
            except ClientOSError as e:
                logger.error(f"OS error for {url}: {str(e)}")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 3  # Longer wait for OS errors
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    return None
            except ClientPayloadError as e:
                logger.error(f"Payload error for {url}: {str(e)}")
                logger.error("Connection closed before complete response was received.")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 3
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    return None
            except asyncio.TimeoutError:
                logger.error(f"Timeout accessing {url} (attempt {attempt+1}/{max_retries})")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 2
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    return None
            except Exception as e:
                logger.error(f"Unexpected error accessing {url}: {str(e)}")
                if attempt < max_retries - 1:
                    wait_time = (backoff_factor ** attempt) * 2
                    logger.info(f"Retrying in {wait_time:.1f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    return None
        # If we get here, all retries failed (loop exhausted via `continue`s)
        logger.error(f"All retries failed for {url}")
        return None
async def get_repository_data(self, owner_repo):
"""
Get repository data with caching.
Args:
owner_repo: Repository identifier in format "owner/repo"
Returns:
Repository data including star count or None on failure
"""
# Check cache first
cache_key = f"repo:{owner_repo}"
now = time.time()
if cache_key in self.repo_cache:
cached = self.repo_cache[cache_key]
if cached['expires'] > now:
logger.debug(f"Using cached data for {owner_repo}")
return cached['data']
# Fetch repository data
endpoint = f"/repos/{owner_repo}"
repo_data = await self.fetch(endpoint)
if repo_data:
# Cache the result
self.repo_cache[cache_key] = {
'data': repo_data,
'expires': now + self.cache_ttl
}
return repo_data
return None
async def get_repository_readme(self, owner_repo):
"""
Get repository README content with caching.
Args:
owner_repo: Repository identifier in format "owner/repo"
Returns:
README content as string or None on failure
"""
# Check cache first
cache_key = f"readme:{owner_repo}"
now = time.time()
if cache_key in self.readme_cache:
cached = self.readme_cache[cache_key]
if cached['expires'] > now:
logger.debug(f"Using cached README for {owner_repo}")
return cached['data']
# Fetch README data
endpoint = f"/repos/{owner_repo}/readme"
headers = {"Accept": "application/vnd.github.v3.raw"}
readme = await self.fetch(endpoint, headers=headers)
if isinstance(readme, str):
# Cache the result
self.readme_cache[cache_key] = {
'data': readme,
'expires': now + self.cache_ttl
}
return readme
return None
async def get_repository_contents(self, owner_repo, path=""):
"""
Get contents of a repository directory.
Args:
owner_repo: Repository identifier in format "owner/repo"
path: Optional path within the repository
Returns:
List of file information or None on failure
"""
endpoint = f"/repos/{owner_repo}/contents/{path}".rstrip('/')
contents = await self.fetch(endpoint)
if isinstance(contents, list):
return contents
return None
    async def analyze_repository(self, github_url):
        """
        Analyze a GitHub repository for database format and star count.

        Fetches repository metadata and the README concurrently, derives the
        star count from the metadata and the data format from the README; if
        the README is inconclusive, falls back to scanning the top-level
        file listing.

        Args:
            github_url: URL to the GitHub repository

        Returns:
            Dictionary with "format" and "stars" keys; "Unknown"/"N/A" are
            used when the information cannot be determined.
        """
        try:
            # Parse repository owner and name from URL
            parsed_url = urlparse(github_url)
            path_parts = parsed_url.path.strip('/').split('/')
            if len(path_parts) < 2:
                logger.warning(f"Invalid GitHub URL format: {github_url}")
                return {"format": "Unknown", "stars": "N/A"}
            # Keep only "owner/repo", dropping any deeper path segments
            owner_repo = '/'.join(path_parts[:2])
            logger.info(f"Analyzing repository: {owner_repo}")
            # Get repository data and README in parallel;
            # return_exceptions=True keeps one failure from cancelling the other
            repo_data, readme = await asyncio.gather(
                self.get_repository_data(owner_repo),
                self.get_repository_readme(owner_repo),
                return_exceptions=True
            )
            # Process repo data for star count
            star_count = "N/A"
            if not isinstance(repo_data, Exception) and repo_data:
                if "stargazers_count" in repo_data:
                    star_count = repo_data["stargazers_count"]
                    logger.debug(f"Found {star_count} stars for {owner_repo}")
            elif isinstance(repo_data, Exception):
                logger.error(f"Error fetching repo data for {owner_repo}: {str(repo_data)}")
            # Determine format from README
            db_format = "Unknown"
            if not isinstance(readme, Exception) and readme:
                db_format = self._identify_database_format(readme)
                logger.debug(f"Identified format from README: {db_format}")
            elif isinstance(readme, Exception):
                logger.error(f"Error fetching README for {owner_repo}: {str(readme)}")
            # If README didn't reveal format, try repository contents
            # (only when the repo itself was reachable)
            if db_format == "Unknown" and not isinstance(repo_data, Exception) and repo_data:
                try:
                    contents = await self.get_repository_contents(owner_repo)
                    if contents:
                        file_list = [file["name"].lower() for file in contents
                                     if isinstance(file, dict) and "name" in file]
                        db_format = self._identify_format_from_files(file_list)
                        logger.debug(f"Identified format from files: {db_format}")
                except Exception as e:
                    # Contents scan is best-effort; keep whatever we have
                    logger.error(f"Error analyzing contents for {owner_repo}: {str(e)}")
            return {"format": db_format, "stars": star_count}
        except Exception as e:
            logger.error(f"Unexpected error analyzing {github_url}: {str(e)}")
            return {"format": "Unknown", "stars": "N/A"}
def _identify_database_format(self, content):
"""Identify database format from repository content."""
if not content:
return "Unknown"
content = content.lower()
# Define format patterns and corresponding database formats
format_patterns = {
"CSV": ["csv", "comma separated", "comma-separated"],
"JSON": ["json", "javascript object notation"],
"XML": ["xml", "extensible markup"],
"SQL": ["sql", "mysql", "postgresql", "sqlite"],
"Parquet": ["parquet", "apache parquet"],
"Avro": ["avro", "apache avro"],
"Excel": ["xlsx", "excel", ".xls"],
"HDF5": ["hdf5", "hierarchical data format"],
"NoSQL": ["mongodb", "dynamodb", "couchdb", "nosql"],
"TSV": ["tsv", "tab separated", "tab-separated"],
"YAML": ["yaml", "yml"],
"Plain Text": ["txt", "text file", "plain text"],
}
# Check for each format pattern in the content
detected_formats = []
for format_name, patterns in format_patterns.items():
if any(pattern in content for pattern in patterns):
detected_formats.append(format_name)
return ", ".join(detected_formats) if detected_formats else "Unknown"
def _identify_format_from_files(self, file_list):
"""Identify database format from repository file list."""
if not file_list:
return "Unknown"
extensions = [file.split('.')[-1] for file in file_list if '.' in file]
extension_counts = {}
for ext in extensions:
if ext.lower() in ['md', 'rst', 'txt', 'git', 'gitignore', 'py', 'js', 'html', 'css', 'sh', 'bat']:
continue # Skip non-data files
extension_counts[ext.lower()] = extension_counts.get(ext.lower(), 0) + 1
# Get the most common extensions
if extension_counts:
sorted_exts = sorted(extension_counts.items(), key=lambda x: x[1], reverse=True)
top_exts = [ext for ext, count in sorted_exts[:3]]
return ", ".join(top_exts)
return "Unknown"