From 9f50f432d2d77e3a10b339ba9da63845abca55a3 Mon Sep 17 00:00:00 2001 From: ITheEqualizer Date: Sat, 11 Apr 2026 17:55:10 +0330 Subject: [PATCH] refactor: improve scraper Python code quality Replace all print() calls with proper logging module usage in both scraper.py and generator.py for better log level control and structured output in Docker containers. Pre-compile regex patterns as module-level constants instead of recompiling on every function call. Remove dead code: unused _extract_return_type method, unused subtype_match variable, and unused current_section variable. Cache process_types/process_methods results in generate_all to avoid redundant processing passes. Co-Authored-By: Claude Opus 4.6 (1M context) --- scraper/src/generator.py | 116 ++++++++++++++++---------------- scraper/src/scraper.py | 138 ++++++++++++++++++--------------------- 2 files changed, 121 insertions(+), 133 deletions(-) diff --git a/scraper/src/generator.py b/scraper/src/generator.py index 71ef299..ef9aa6a 100644 --- a/scraper/src/generator.py +++ b/scraper/src/generator.py @@ -5,11 +5,15 @@ """ import json +import logging import os import re from pathlib import Path + from jinja2 import Environment, BaseLoader +logger = logging.getLogger(__name__) + # Type mappings RUST_TYPE_MAP = { @@ -39,13 +43,20 @@ 'true', 'type', 'unsafe', 'use', 'where', 'while', 'async', 'await', 'dyn', } +_RE_WHITESPACE = re.compile(r'\s+') +_RE_ARRAY_OF = re.compile(r'Array\s*of\s*', re.IGNORECASE) +_RE_ARRAY_ANGLE = re.compile(r'Array\s*<\s*(.+?)\s*>') +_RE_WORD_TOKEN = re.compile(r'[A-Za-z_][A-Za-z0-9_]*') +_RE_GLUED_OR = re.compile(r'(.+?)or\s*([A-Z].+)') +_RE_GLUED_AND = re.compile(r'(.+?)and\s*([A-Z].+)') + CUSTOM_TYPE_NAMES: set[str] = set() def extract_custom_type_tokens(type_str: str) -> set[str]: """Extract referenced custom Telegram type names from a raw type expression.""" normalized = normalize_type_expr(type_str) - tokens = set(re.findall(r'[A-Za-z_][A-Za-z0-9_]*', normalized)) + tokens = 
set(_RE_WORD_TOKEN.findall(normalized)) return {token for token in tokens if token in CUSTOM_TYPE_NAMES} @@ -75,10 +86,10 @@ def normalize_type_expr(type_str: str) -> str: if not s: return 'serde_json::Value' - s = re.sub(r'\s+', ' ', s) - s = re.sub(r'Array\s*of\s*', 'Array of ', s, flags=re.IGNORECASE) + s = _RE_WHITESPACE.sub(' ', s) + s = _RE_ARRAY_OF.sub('Array of ', s) - return re.sub(r'\s+', ' ', s).strip() + return _RE_WHITESPACE.sub(' ', s).strip() def is_known_atomic_type(token: str) -> bool: @@ -107,9 +118,8 @@ def split_union_parts(type_str: str) -> list[str]: if len(parts) > 1: return parts - # Handle glued unions like InputFileor String and AandB. - for connector in ('or', 'and'): - glued = re.fullmatch(rf'(.+?){connector}\s*([A-Z].+)', type_str) + for pattern in (_RE_GLUED_OR, _RE_GLUED_AND): + glued = pattern.fullmatch(type_str) if not glued: continue @@ -135,7 +145,7 @@ def parse_type(type_str: str, lang: str = 'rust', ts_use_namespace: bool = False return array_wrapper.format(inner_type) # Handle "Array" as an alternative notation. 
- array_match = re.fullmatch(r'Array\s*<\s*(.+?)\s*>', type_str) + array_match = _RE_ARRAY_ANGLE.fullmatch(type_str) if array_match: inner = array_match.group(1).strip() inner_type = parse_type(inner, lang, ts_use_namespace) @@ -301,8 +311,8 @@ def load_data(self) -> None: data = json.load(f) self.methods_data = data['methods'] - print(f"šŸ“‚ Loaded {len(self.types_data)} types") - print(f"šŸ“‚ Loaded {len(self.methods_data)} methods") + logger.info("Loaded %d types", len(self.types_data)) + logger.info("Loaded %d methods", len(self.methods_data)) def process_types(self) -> list: """Process types for code generation.""" @@ -392,66 +402,56 @@ def process_methods(self) -> list: return processed - def generate_rust_types(self, output_file: str) -> None: - """Generate Rust types file.""" - types = self.process_types() - template = self.env.from_string(RUST_TYPES_TEMPLATE) - output = template.render(types=types) - - os.makedirs(os.path.dirname(output_file), exist_ok=True) - with open(output_file, 'w', encoding='utf-8') as f: - f.write(output) - - print(f"āœ… Generated Rust types: {output_file} ({len(types)} types)") - - def generate_rust_methods(self, output_file: str) -> None: - """Generate Rust methods file.""" - methods = self.process_methods() - template = self.env.from_string(RUST_METHODS_TEMPLATE) - output = template.render(methods=methods) + def _render_to_file( + self, template_str: str, context: dict, output_file: str, + ) -> None: + """Render a Jinja2 template to an output file.""" + template = self.env.from_string(template_str) + output = template.render(**context) os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: f.write(output) - print(f"āœ… Generated Rust methods: {output_file} ({len(methods)} methods)") + def generate_all( + self, rust_output_dir: str, ts_output_dir: str, + ) -> None: + """Generate all code files. 
- def generate_ts_types(self, output_file: str) -> None: - """Generate TypeScript types file.""" + Processes types and methods once, then renders all four + output files (Rust types/methods, TypeScript types/methods). + """ types = self.process_types() - template = self.env.from_string(TS_TYPES_TEMPLATE) - output = template.render(types=types) - - os.makedirs(os.path.dirname(output_file), exist_ok=True) - with open(output_file, 'w', encoding='utf-8') as f: - f.write(output) - - print(f"āœ… Generated TypeScript types: {output_file} ({len(types)} types)") - - def generate_ts_methods(self, output_file: str) -> None: - """Generate TypeScript methods file.""" methods = self.process_methods() - template = self.env.from_string(TS_METHODS_TEMPLATE) - output = template.render(methods=methods) - - os.makedirs(os.path.dirname(output_file), exist_ok=True) - with open(output_file, 'w', encoding='utf-8') as f: - f.write(output) - print(f"āœ… Generated TypeScript methods: {output_file} ({len(methods)} methods)") - - def generate_all(self, rust_output_dir: str, ts_output_dir: str) -> None: - """Generate all code files.""" - self.generate_rust_types(os.path.join(rust_output_dir, 'types.rs')) - self.generate_rust_methods(os.path.join(rust_output_dir, 'methods.rs')) - self.generate_ts_types(os.path.join(ts_output_dir, 'types.ts')) - self.generate_ts_methods(os.path.join(ts_output_dir, 'methods.ts')) + outputs = [ + (RUST_TYPES_TEMPLATE, {'types': types}, + os.path.join(rust_output_dir, 'types.rs'), + f"{len(types)} types"), + (RUST_METHODS_TEMPLATE, {'methods': methods}, + os.path.join(rust_output_dir, 'methods.rs'), + f"{len(methods)} methods"), + (TS_TYPES_TEMPLATE, {'types': types}, + os.path.join(ts_output_dir, 'types.ts'), + f"{len(types)} types"), + (TS_METHODS_TEMPLATE, {'methods': methods}, + os.path.join(ts_output_dir, 'methods.ts'), + f"{len(methods)} methods"), + ] + + for template_str, context, path, desc in outputs: + self._render_to_file(template_str, context, path) + 
logger.info("Generated %s: %s", desc, path) def main(): """Main entry point.""" - print("šŸš€ Simula - Code Generator") - print("=" * 50) + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s: %(message)s", + ) + + logger.info("Simula - Code Generator") script_dir = Path(__file__).resolve().parent scraper_root = script_dir.parent @@ -482,7 +482,7 @@ def main(): ts_output_dir=str(ts_output_dir) ) - print("\nāœ… Code generation completed!") + logger.info("Code generation completed!") if __name__ == "__main__": diff --git a/scraper/src/scraper.py b/scraper/src/scraper.py index c4a809d..2df3ed5 100644 --- a/scraper/src/scraper.py +++ b/scraper/src/scraper.py @@ -10,12 +10,35 @@ """ import json -import re +import logging import os +import re +from dataclasses import asdict, dataclass, field from typing import Optional -from dataclasses import dataclass, asdict, field -from bs4 import BeautifulSoup, Tag + import requests +from bs4 import BeautifulSoup, Tag + +logger = logging.getLogger(__name__) + +# Pre-compiled regex patterns for return type extraction +_RE_RETURNS_TRUE = re.compile(r'Returns?\s+True\b', re.IGNORECASE) +_RE_ARRAY_OF_LINK = re.compile( + r'[Aa]rray\s+of\s+<a[^>]*>([A-Z][a-zA-Z]+)', +) +_RE_RETURNS_LINK = re.compile( + r'[Rr]eturns?[^<]*<a[^>]*>([A-Z][a-zA-Z]+)', +) +_RE_SENT_LINK = re.compile( + r'the\s+sent\s+<a[^>]*>([A-Z][a-zA-Z]+)', +) +_RE_SUCCESS_LINK = re.compile( + r'[Oo]n\s+success[^<]*<a[^>]*>([A-Z][a-zA-Z]+)', +) +_RE_FORM_OF_LINK = re.compile( + r'form\s+of\s+(?:a|an)\s+<a[^>]*>([A-Z][a-zA-Z]+)', +) +_RE_RETURN_OR_SUCCESS = re.compile(r'[Rr]eturn|success') # Data structures @@ -68,11 +91,11 @@ def fetch_page(self) -> str: """Fetch the API documentation page.""" # Check cache first if self.use_cache and os.path.exists(self.CACHE_FILE): - print(f"šŸ“‚ Loading from cache: {self.CACHE_FILE}") + logger.info("Loading from cache: %s", self.CACHE_FILE) with open(self.CACHE_FILE, 'r', encoding='utf-8') as f: return f.read() - print(f"🌐 Fetching: 
{self.API_URL}") + logger.info("Fetching: %s", self.API_URL) headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } @@ -83,7 +106,7 @@ def fetch_page(self) -> str: os.makedirs("output", exist_ok=True) with open(self.CACHE_FILE, 'w', encoding='utf-8') as f: f.write(response.text) - print(f"šŸ’¾ Cached to: {self.CACHE_FILE}") + logger.info("Cached to: %s", self.CACHE_FILE) return response.text @@ -97,10 +120,9 @@ def parse(self, html: str) -> None: raise ValueError("Could not find main content div") # Process all h4 elements (methods and types are defined under h4) - current_section = None h4_elements = content.find_all('h4') - print(f"šŸ“ Found {len(h4_elements)} h4 elements") + logger.info("Found %d h4 elements", len(h4_elements)) for h4 in h4_elements: name = h4.get_text(strip=True) @@ -120,8 +142,8 @@ def parse(self, html: str) -> None: if telegram_type: self.types.append(telegram_type) - print(f"āœ… Parsed {len(self.methods)} methods") - print(f"āœ… Parsed {len(self.types)} types") + logger.info("Parsed %d methods", len(self.methods)) + logger.info("Parsed %d types", len(self.types)) def _parse_method(self, h4: Tag) -> Optional[Method]: """Parse a method definition.""" @@ -171,7 +193,7 @@ def _extract_return_type_from_element(self, element: Tag) -> Optional[str]: text = element.get_text() # Check for True/Boolean return first - if re.search(r'Returns?\s+True\b', text, re.IGNORECASE): + if _RE_RETURNS_TRUE.search(text): return "Boolean" # Find all links in the element @@ -189,61 +211,33 @@ def _extract_return_type_from_element(self, element: Tag) -> Optional[str]: html = str(element) # Check for "Array of X" pattern - array_match = re.search(r'[Aa]rray\s+of\s+<a[^>]*>([A-Z][a-zA-Z]+)', html) + array_match = _RE_ARRAY_OF_LINK.search(html) if array_match: return f"Array<{array_match.group(1)}>" # Check for "Returns ... 
Type" pattern - returns_link_match = re.search( - r'[Rr]eturns?[^<]*<a[^>]*>([A-Z][a-zA-Z]+)', - html - ) + returns_link_match = _RE_RETURNS_LINK.search(html) if returns_link_match: return returns_link_match.group(1) # Check for "the sent Message" pattern - sent_match = re.search( - r'the\s+sent\s+<a[^>]*>([A-Z][a-zA-Z]+)', - html - ) + sent_match = _RE_SENT_LINK.search(html) if sent_match: return sent_match.group(1) # Check for "On success, Type is returned" - success_match = re.search( - r'[Oo]n\s+success[^<]*<a[^>]*>([A-Z][a-zA-Z]+)', - html - ) + success_match = _RE_SUCCESS_LINK.search(html) if success_match: return success_match.group(1) # Check for "form of a Type" - form_match = re.search( - r'form\s+of\s+(?:a|an)\s+<a[^>]*>([A-Z][a-zA-Z]+)', - html - ) + form_match = _RE_FORM_OF_LINK.search(html) if form_match: return form_match.group(1) # If we have type links and the text mentions returns/success - if type_links and re.search(r'[Rr]eturn|success', text): - return type_links[-1] # Usually the return type is last mentioned - - return None - - def _extract_return_type(self, text: str) -> Optional[str]: - """Extract return type from plain text (fallback).""" - # Check for True/Boolean return first - if re.search(r'Returns?\s+True\b', text, re.IGNORECASE): - return "Boolean" - - # Pattern for "Array of X" - array_match = re.search( - r'(?:Returns?|returns)\s+(?:an?\s+)?[Aa]rray\s+of\s+([A-Z][a-zA-Z]+)', - text - ) - if array_match: - return f"Array<{array_match.group(1)}>" + if type_links and _RE_RETURN_OR_SUCCESS.search(text): + return type_links[-1] # Usually the return type is last return None @@ -267,12 +261,6 @@ def _parse_type(self, h4: Tag) -> Optional[TelegramType]: text = sibling.get_text(strip=True) description_parts.append(text) - # Check for inheritance - subtype_match = re.search( - r'(?:This object represents|Describes)\s+(?:a|an|the)?\s*([a-z\s]+)', - text, re.IGNORECASE - ) - elif sibling.name == 'ul': # Sometimes types have a list of subtypes pass @@ -383,13 
+371,13 @@ def save_output(self, output_dir: str = "output") -> None: methods_file = os.path.join(output_dir, "methods.json") with open(methods_file, 'w', encoding='utf-8') as f: json.dump(methods_data, f, indent=2, ensure_ascii=False) - print(f"šŸ’¾ Saved {len(self.methods)} methods to {methods_file}") + logger.info("Saved %d methods to %s", len(self.methods), methods_file) # Save types types_file = os.path.join(output_dir, "types.json") with open(types_file, 'w', encoding='utf-8') as f: json.dump(types_data, f, indent=2, ensure_ascii=False) - print(f"šŸ’¾ Saved {len(self.types)} types to {types_file}") + logger.info("Saved %d types to %s", len(self.types), types_file) def _method_to_dict(self, method: Method) -> dict: """Convert Method to dictionary.""" @@ -410,33 +398,33 @@ def _type_to_dict(self, t: TelegramType) -> dict: "properties": [asdict(p) for p in t.properties] } - def print_summary(self) -> None: - """Print a summary of scraped data.""" - print("\n" + "="*60) - print("šŸ“Š SCRAPING SUMMARY") - print("="*60) + def log_summary(self) -> None: + """Log a summary of scraped data.""" + logger.info("SCRAPING SUMMARY") - print(f"\nšŸ“Œ Methods ({len(self.methods)}):") + logger.info("Methods (%d):", len(self.methods)) for m in self.methods[:10]: params = len(m.parameters) - print(f" • {m.name}({params} params) -> {m.return_type}") + logger.info(" %s(%d params) -> %s", m.name, params, m.return_type) if len(self.methods) > 10: - print(f" ... and {len(self.methods) - 10} more") + logger.info(" ... and %d more", len(self.methods) - 10) - print(f"\nšŸ“Œ Types ({len(self.types)}):") + logger.info("Types (%d):", len(self.types)) for t in self.types[:10]: props = len(t.properties) - print(f" • {t.name} ({props} properties)") + logger.info(" %s (%d properties)", t.name, props) if len(self.types) > 10: - print(f" ... and {len(self.types) - 10} more") - - print("\n" + "="*60) + logger.info(" ... 
and %d more", len(self.types) - 10) def main(): """Main entry point.""" - print("šŸš€ Simula - Telegram Bot API Scraper") - print("="*50) + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s: %(message)s", + ) + + logger.info("Simula - Telegram Bot API Scraper") # Check if we should use cache (for offline development) use_cache = os.environ.get('USE_CACHE', '').lower() == 'true' @@ -451,17 +439,17 @@ def main(): # Save output scraper.save_output() - # Print summary - scraper.print_summary() + # Log summary + scraper.log_summary() - print("\nāœ… Scraping completed successfully!") + logger.info("Scraping completed successfully!") except requests.RequestException as e: - print(f"āŒ Network error: {e}") - print("šŸ’” Tip: Set USE_CACHE=true to use cached HTML") + logger.error("Network error: %s", e) + logger.info("Tip: Set USE_CACHE=true to use cached HTML") return 1 except Exception as e: - print(f"āŒ Error: {e}") + logger.error("Error: %s", e) raise return 0