diff --git a/scraper/src/generator.py b/scraper/src/generator.py index 71ef299..ef9aa6a 100644 --- a/scraper/src/generator.py +++ b/scraper/src/generator.py @@ -5,11 +5,15 @@ """ import json +import logging import os import re from pathlib import Path + from jinja2 import Environment, BaseLoader +logger = logging.getLogger(__name__) + # Type mappings RUST_TYPE_MAP = { @@ -39,13 +43,20 @@ 'true', 'type', 'unsafe', 'use', 'where', 'while', 'async', 'await', 'dyn', } +_RE_WHITESPACE = re.compile(r'\s+') +_RE_ARRAY_OF = re.compile(r'Array\s*of\s*', re.IGNORECASE) +_RE_ARRAY_ANGLE = re.compile(r'Array\s*<\s*(.+?)\s*>') +_RE_WORD_TOKEN = re.compile(r'[A-Za-z_][A-Za-z0-9_]*') +_RE_GLUED_OR = re.compile(r'(.+?)or\s*([A-Z].+)') +_RE_GLUED_AND = re.compile(r'(.+?)and\s*([A-Z].+)') + CUSTOM_TYPE_NAMES: set[str] = set() def extract_custom_type_tokens(type_str: str) -> set[str]: """Extract referenced custom Telegram type names from a raw type expression.""" normalized = normalize_type_expr(type_str) - tokens = set(re.findall(r'[A-Za-z_][A-Za-z0-9_]*', normalized)) + tokens = set(_RE_WORD_TOKEN.findall(normalized)) return {token for token in tokens if token in CUSTOM_TYPE_NAMES} @@ -75,10 +86,10 @@ def normalize_type_expr(type_str: str) -> str: if not s: return 'serde_json::Value' - s = re.sub(r'\s+', ' ', s) - s = re.sub(r'Array\s*of\s*', 'Array of ', s, flags=re.IGNORECASE) + s = _RE_WHITESPACE.sub(' ', s) + s = _RE_ARRAY_OF.sub('Array of ', s) - return re.sub(r'\s+', ' ', s).strip() + return _RE_WHITESPACE.sub(' ', s).strip() def is_known_atomic_type(token: str) -> bool: @@ -107,9 +118,8 @@ def split_union_parts(type_str: str) -> list[str]: if len(parts) > 1: return parts - # Handle glued unions like InputFileor String and AandB. - for connector in ('or', 'and'): - glued = re.fullmatch(rf'(.+?){connector}\s*([A-Z].+)', type_str) + for pattern in (_RE_GLUED_OR, _RE_GLUED_AND): + glued = pattern.fullmatch(type_str) if not glued: continue @@ -135,7 +145,7 @@ def parse_type(type_str: str, lang: str = 'rust', ts_use_namespace: bool = False return array_wrapper.format(inner_type) # Handle "Array" as an alternative notation. - array_match = re.fullmatch(r'Array\s*<\s*(.+?)\s*>', type_str) + array_match = _RE_ARRAY_ANGLE.fullmatch(type_str) if array_match: inner = array_match.group(1).strip() inner_type = parse_type(inner, lang, ts_use_namespace) @@ -301,8 +311,8 @@ def load_data(self) -> None: data = json.load(f) self.methods_data = data['methods'] - print(f"šŸ“‚ Loaded {len(self.types_data)} types") - print(f"šŸ“‚ Loaded {len(self.methods_data)} methods") + logger.info("Loaded %d types", len(self.types_data)) + logger.info("Loaded %d methods", len(self.methods_data)) def process_types(self) -> list: """Process types for code generation.""" @@ -392,66 +402,56 @@ def process_methods(self) -> list: return processed - def generate_rust_types(self, output_file: str) -> None: - """Generate Rust types file.""" - types = self.process_types() - template = self.env.from_string(RUST_TYPES_TEMPLATE) - output = template.render(types=types) - - os.makedirs(os.path.dirname(output_file), exist_ok=True) - with open(output_file, 'w', encoding='utf-8') as f: - f.write(output) - - print(f"āœ… Generated Rust types: {output_file} ({len(types)} types)") - - def generate_rust_methods(self, output_file: str) -> None: - """Generate Rust methods file.""" - methods = self.process_methods() - template = self.env.from_string(RUST_METHODS_TEMPLATE) - output = template.render(methods=methods) + def _render_to_file( + self, template_str: str, context: dict, output_file: str, + ) -> None: + """Render a Jinja2 template to an output file.""" + template = self.env.from_string(template_str) + output = template.render(**context) os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: f.write(output) - print(f"āœ… Generated Rust methods: {output_file} ({len(methods)} methods)") + def generate_all( + self, rust_output_dir: str, ts_output_dir: str, + ) -> None: + """Generate all code files. - def generate_ts_types(self, output_file: str) -> None: - """Generate TypeScript types file.""" + Processes types and methods once, then renders all four + output files (Rust types/methods, TypeScript types/methods). + """ types = self.process_types() - template = self.env.from_string(TS_TYPES_TEMPLATE) - output = template.render(types=types) - - os.makedirs(os.path.dirname(output_file), exist_ok=True) - with open(output_file, 'w', encoding='utf-8') as f: - f.write(output) - - print(f"āœ… Generated TypeScript types: {output_file} ({len(types)} types)") - - def generate_ts_methods(self, output_file: str) -> None: - """Generate TypeScript methods file.""" methods = self.process_methods() - template = self.env.from_string(TS_METHODS_TEMPLATE) - output = template.render(methods=methods) - - os.makedirs(os.path.dirname(output_file), exist_ok=True) - with open(output_file, 'w', encoding='utf-8') as f: - f.write(output) - print(f"āœ… Generated TypeScript methods: {output_file} ({len(methods)} methods)") - - def generate_all(self, rust_output_dir: str, ts_output_dir: str) -> None: - """Generate all code files.""" - self.generate_rust_types(os.path.join(rust_output_dir, 'types.rs')) - self.generate_rust_methods(os.path.join(rust_output_dir, 'methods.rs')) - self.generate_ts_types(os.path.join(ts_output_dir, 'types.ts')) - self.generate_ts_methods(os.path.join(ts_output_dir, 'methods.ts')) + outputs = [ + (RUST_TYPES_TEMPLATE, {'types': types}, + os.path.join(rust_output_dir, 'types.rs'), + f"{len(types)} types"), + (RUST_METHODS_TEMPLATE, {'methods': methods}, + os.path.join(rust_output_dir, 'methods.rs'), + f"{len(methods)} methods"), + (TS_TYPES_TEMPLATE, {'types': types}, + os.path.join(ts_output_dir, 'types.ts'), + f"{len(types)} types"), + (TS_METHODS_TEMPLATE, {'methods': methods}, + os.path.join(ts_output_dir, 'methods.ts'), + f"{len(methods)} methods"), + ] + + for template_str, context, path, desc in outputs: + self._render_to_file(template_str, context, path) + logger.info("Generated %s: %s", desc, path) def main(): """Main entry point.""" - print("šŸš€ Simula - Code Generator") - print("=" * 50) + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s: %(message)s", + ) + + logger.info("Simula - Code Generator") script_dir = Path(__file__).resolve().parent scraper_root = script_dir.parent @@ -482,7 +482,7 @@ def main(): ts_output_dir=str(ts_output_dir) ) - print("\nāœ… Code generation completed!") + logger.info("Code generation completed!") if __name__ == "__main__": diff --git a/scraper/src/scraper.py b/scraper/src/scraper.py index c4a809d..2df3ed5 100644 --- a/scraper/src/scraper.py +++ b/scraper/src/scraper.py @@ -10,12 +10,35 @@ """ import json -import re +import logging import os +import re +from dataclasses import asdict, dataclass, field from typing import Optional -from dataclasses import dataclass, asdict, field -from bs4 import BeautifulSoup, Tag + import requests +from bs4 import BeautifulSoup, Tag + +logger = logging.getLogger(__name__) + +# Pre-compiled regex patterns for return type extraction +_RE_RETURNS_TRUE = re.compile(r'Returns?\s+True\b', re.IGNORECASE) +_RE_ARRAY_OF_LINK = re.compile( + r'[Aa]rray\s+of\s+]*>([A-Z][a-zA-Z]+)', +) +_RE_RETURNS_LINK = re.compile( + r'[Rr]eturns?[^<]*]*>([A-Z][a-zA-Z]+)', +) +_RE_SENT_LINK = re.compile( + r'the\s+sent\s+]*>([A-Z][a-zA-Z]+)', +) +_RE_SUCCESS_LINK = re.compile( + r'[Oo]n\s+success[^<]*]*>([A-Z][a-zA-Z]+)', +) +_RE_FORM_OF_LINK = re.compile( + r'form\s+of\s+(?:a|an)\s+]*>([A-Z][a-zA-Z]+)', +) +_RE_RETURN_OR_SUCCESS = re.compile(r'[Rr]eturn|success') # Data structures @@ -68,11 +91,11 @@ def fetch_page(self) -> str: """Fetch the API documentation page.""" # Check cache first if self.use_cache and os.path.exists(self.CACHE_FILE): - print(f"šŸ“‚ Loading from cache: {self.CACHE_FILE}") + logger.info("Loading from cache: %s", self.CACHE_FILE) with open(self.CACHE_FILE, 'r', encoding='utf-8') as f: return f.read() - print(f"🌐 Fetching: {self.API_URL}") + logger.info("Fetching: %s", self.API_URL) headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } @@ -83,7 +106,7 @@ def fetch_page(self) -> str: os.makedirs("output", exist_ok=True) with open(self.CACHE_FILE, 'w', encoding='utf-8') as f: f.write(response.text) - print(f"šŸ’¾ Cached to: {self.CACHE_FILE}") + logger.info("Cached to: %s", self.CACHE_FILE) return response.text @@ -97,10 +120,9 @@ def parse(self, html: str) -> None: raise ValueError("Could not find main content div") # Process all h4 elements (methods and types are defined under h4) - current_section = None h4_elements = content.find_all('h4') - print(f"šŸ“ Found {len(h4_elements)} h4 elements") + logger.info("Found %d h4 elements", len(h4_elements)) for h4 in h4_elements: name = h4.get_text(strip=True) @@ -120,8 +142,8 @@ def parse(self, html: str) -> None: if telegram_type: self.types.append(telegram_type) - print(f"āœ… Parsed {len(self.methods)} methods") - print(f"āœ… Parsed {len(self.types)} types") + logger.info("Parsed %d methods", len(self.methods)) + logger.info("Parsed %d types", len(self.types)) def _parse_method(self, h4: Tag) -> Optional[Method]: """Parse a method definition.""" @@ -171,7 +193,7 @@ def _extract_return_type_from_element(self, element: Tag) -> Optional[str]: text = element.get_text() # Check for True/Boolean return first - if re.search(r'Returns?\s+True\b', text, re.IGNORECASE): + if _RE_RETURNS_TRUE.search(text): return "Boolean" # Find all links in the element @@ -189,61 +211,33 @@ def _extract_return_type_from_element(self, element: Tag) -> Optional[str]: html = str(element) # Check for "Array of X" pattern - array_match = re.search(r'[Aa]rray\s+of\s+]*>([A-Z][a-zA-Z]+)', html) + array_match = _RE_ARRAY_OF_LINK.search(html) if array_match: return f"Array<{array_match.group(1)}>" # Check for "Returns ... Type" pattern - returns_link_match = re.search( - r'[Rr]eturns?[^<]*]*>([A-Z][a-zA-Z]+)', - html - ) + returns_link_match = _RE_RETURNS_LINK.search(html) if returns_link_match: return returns_link_match.group(1) # Check for "the sent Message" pattern - sent_match = re.search( - r'the\s+sent\s+]*>([A-Z][a-zA-Z]+)', - html - ) + sent_match = _RE_SENT_LINK.search(html) if sent_match: return sent_match.group(1) # Check for "On success, Type is returned" - success_match = re.search( - r'[Oo]n\s+success[^<]*]*>([A-Z][a-zA-Z]+)', - html - ) + success_match = _RE_SUCCESS_LINK.search(html) if success_match: return success_match.group(1) # Check for "form of a Type" - form_match = re.search( - r'form\s+of\s+(?:a|an)\s+]*>([A-Z][a-zA-Z]+)', - html - ) + form_match = _RE_FORM_OF_LINK.search(html) if form_match: return form_match.group(1) # If we have type links and the text mentions returns/success - if type_links and re.search(r'[Rr]eturn|success', text): - return type_links[-1] # Usually the return type is last mentioned - - return None - - def _extract_return_type(self, text: str) -> Optional[str]: - """Extract return type from plain text (fallback).""" - # Check for True/Boolean return first - if re.search(r'Returns?\s+True\b', text, re.IGNORECASE): - return "Boolean" - - # Pattern for "Array of X" - array_match = re.search( - r'(?:Returns?|returns)\s+(?:an?\s+)?[Aa]rray\s+of\s+([A-Z][a-zA-Z]+)', - text - ) - if array_match: - return f"Array<{array_match.group(1)}>" + if type_links and _RE_RETURN_OR_SUCCESS.search(text): + return type_links[-1] # Usually the return type is last return None @@ -267,12 +261,6 @@ def _parse_type(self, h4: Tag) -> Optional[TelegramType]: text = sibling.get_text(strip=True) description_parts.append(text) - # Check for inheritance - subtype_match = re.search( - r'(?:This object represents|Describes)\s+(?:a|an|the)?\s*([a-z\s]+)', - text, re.IGNORECASE - ) - elif sibling.name == 'ul': # Sometimes types have a list of subtypes pass @@ -383,13 +371,13 @@ def save_output(self, output_dir: str = "output") -> None: methods_file = os.path.join(output_dir, "methods.json") with open(methods_file, 'w', encoding='utf-8') as f: json.dump(methods_data, f, indent=2, ensure_ascii=False) - print(f"šŸ’¾ Saved {len(self.methods)} methods to {methods_file}") + logger.info("Saved %d methods to %s", len(self.methods), methods_file) # Save types types_file = os.path.join(output_dir, "types.json") with open(types_file, 'w', encoding='utf-8') as f: json.dump(types_data, f, indent=2, ensure_ascii=False) - print(f"šŸ’¾ Saved {len(self.types)} types to {types_file}") + logger.info("Saved %d types to %s", len(self.types), types_file) def _method_to_dict(self, method: Method) -> dict: """Convert Method to dictionary.""" @@ -410,33 +398,33 @@ def _type_to_dict(self, t: TelegramType) -> dict: "properties": [asdict(p) for p in t.properties] } - def print_summary(self) -> None: - """Print a summary of scraped data.""" - print("\n" + "="*60) - print("šŸ“Š SCRAPING SUMMARY") - print("="*60) + def log_summary(self) -> None: + """Log a summary of scraped data.""" + logger.info("SCRAPING SUMMARY") - print(f"\nšŸ“Œ Methods ({len(self.methods)}):") + logger.info("Methods (%d):", len(self.methods)) for m in self.methods[:10]: params = len(m.parameters) - print(f" • {m.name}({params} params) -> {m.return_type}") + logger.info(" %s(%d params) -> %s", m.name, params, m.return_type) if len(self.methods) > 10: - print(f" ... and {len(self.methods) - 10} more") + logger.info(" ... and %d more", len(self.methods) - 10) - print(f"\nšŸ“Œ Types ({len(self.types)}):") + logger.info("Types (%d):", len(self.types)) for t in self.types[:10]: props = len(t.properties) - print(f" • {t.name} ({props} properties)") + logger.info(" %s (%d properties)", t.name, props) if len(self.types) > 10: - print(f" ... and {len(self.types) - 10} more") - - print("\n" + "="*60) + logger.info(" ... and %d more", len(self.types) - 10) def main(): """Main entry point.""" - print("šŸš€ Simula - Telegram Bot API Scraper") - print("="*50) + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s: %(message)s", + ) + + logger.info("Simula - Telegram Bot API Scraper") # Check if we should use cache (for offline development) use_cache = os.environ.get('USE_CACHE', '').lower() == 'true' @@ -451,17 +439,17 @@ def main(): # Save output scraper.save_output() - # Print summary - scraper.print_summary() + # Log summary + scraper.log_summary() - print("\nāœ… Scraping completed successfully!") + logger.info("Scraping completed successfully!") except requests.RequestException as e: - print(f"āŒ Network error: {e}") - print("šŸ’” Tip: Set USE_CACHE=true to use cached HTML") + logger.error("Network error: %s", e) + logger.info("Tip: Set USE_CACHE=true to use cached HTML") return 1 except Exception as e: - print(f"āŒ Error: {e}") + logger.error("Error: %s", e) raise return 0