-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
128 lines (102 loc) · 3.54 KB
/
Copy pathutils.py
File metadata and controls
128 lines (102 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""OSIRIS Utils — Helpers partagés (retry, extraction de domaine).
Fournit des utilitaires réutilisables par tous les axes et le scanner.
"""
from __future__ import annotations
import asyncio
import functools
import logging
from collections.abc import Callable
from typing import Any, TypeVar
from urllib.parse import urlparse
logger = logging.getLogger("osiris")
F = TypeVar("F", bound=Callable[..., Any])
def async_retry(
max_retries: int = 3,
backoff: float = 2.0,
retry_on: tuple[type[Exception], ...] = (RuntimeError,),
) -> Callable[[F], F]:
"""Décorateur de retry avec backoff exponentiel pour coroutines async.
Args:
max_retries: Nombre maximum de tentatives (incluant la première).
backoff: Facteur multiplicatif du délai entre chaque retry.
retry_on: Types d'exceptions déclenchant un retry.
Returns:
Décorateur applicable à une coroutine async.
"""
def decorator(func: F) -> F:
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
last_exception: Exception | None = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except retry_on as e:
last_exception = e
if attempt < max_retries - 1:
delay = backoff**attempt
logger.debug(
"Retry %d/%d pour %s après %.1fs : %s",
attempt + 1,
max_retries,
func.__name__,
delay,
e,
)
await asyncio.sleep(delay)
raise last_exception # type: ignore[misc]
return wrapper # type: ignore[return-value]
return decorator
def extract_domain(url: str) -> str:
"""Extrait le domaine d'une URL de manière robuste.
Gère les TLD composés (.co.uk, .com.au), les ports, les sous-domaines.
Utilise uniquement la stdlib (urllib.parse).
Args:
url: URL complète (ex: https://sub.example.co.uk:8080/path).
Returns:
Domaine sans protocole ni port (ex: sub.example.co.uk).
Retourne "unknown" si l'extraction échoue.
"""
parsed = urlparse(url)
return parsed.hostname or "unknown"
def extract_root_domain(url: str) -> str:
"""Extrait le domaine racine (sans sous-domaine) d'une URL.
Heuristique stdlib : prend les 2 derniers segments du hostname.
Gère les TLD composés courants (.co.uk, .com.au, .org.uk, etc.).
Args:
url: URL complète ou domaine brut.
Returns:
Domaine racine (ex: 'example.com' pour 'sub.example.com').
"""
# Si c'est déjà un domaine (pas de ://), on le wrappe
if "://" not in url:
url = f"https://{url}"
hostname = extract_domain(url)
parts = hostname.split(".")
# TLD composés courants (2 parties)
compound_tlds = {
"co.uk",
"com.au",
"com.br",
"co.nz",
"co.za",
"co.in",
"org.uk",
"net.au",
"co.jp",
"or.jp",
"ne.jp",
"ac.uk",
"gov.uk",
"com.mx",
"com.ar",
"com.co",
"co.kr",
"go.kr",
}
if len(parts) >= 3:
potential_tld = ".".join(parts[-2:])
if potential_tld in compound_tlds:
return ".".join(parts[-3:])
if len(parts) >= 2:
return ".".join(parts[-2:])
return hostname