From 2c870a550c7a5f553579820a48eeb63a82cd9814 Mon Sep 17 00:00:00 2001 From: DevGwardo Date: Mon, 2 Mar 2026 17:54:44 -0500 Subject: [PATCH 1/3] perf: optimize cache.py hot paths --- peppy/cache.py | 102 ++++++++++++++++++++++++++----------------------- 1 file changed, 54 insertions(+), 48 deletions(-) diff --git a/peppy/cache.py b/peppy/cache.py index 85a2478..2a4e2b6 100644 --- a/peppy/cache.py +++ b/peppy/cache.py @@ -1,60 +1,67 @@ """Caching functionality for codebase indices.""" import json -import os import hashlib from pathlib import Path from typing import Optional, Dict, Any -from datetime import datetime +from datetime import datetime, timedelta class IndexCache: """Manages caching of codebase indices.""" - def __init__(self, cache_dir: Optional[Path] = None): + def __init__(self, cache_dir: Optional[Path] = None, ttl_seconds: Optional[int] = None): """Initialize the cache manager. Args: cache_dir: Directory to store cache files. Defaults to .peppy_cache in home dir. + ttl_seconds: Optional cache TTL. If set, entries older than this are treated as stale. """ if cache_dir is None: cache_dir = Path.home() / ".peppy_cache" self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) + self.ttl_seconds = ttl_seconds def _get_cache_key(self, path: Path) -> str: - """Generate a cache key for a given path. - - Args: - path: The codebase path - - Returns: - A hash string to use as cache key - """ - # Use absolute path for consistent hashing + """Generate a cache key for a given path.""" abs_path = path.resolve() - return hashlib.md5(str(abs_path).encode()).hexdigest() + return hashlib.sha256(str(abs_path).encode()).hexdigest() def _get_cache_path(self, cache_key: str) -> Path: - """Get the cache file path for a given key. + """Get the cache file path for a given key.""" + return self.cache_dir / f"{cache_key}.json" - Args: - cache_key: The cache key + def _compute_signature(self, path: Path, index: Optional[Dict[str, Any]] = None) -> Optional[str]: + """Compute a lightweight freshness signature for indexed files. - Returns: - Path to the cache file + Signature format is based on file count + max mtime + total size over indexed files. """ - return self.cache_dir / f"{cache_key}.json" + files = (index or {}).get("files", []) + if not files: + return None - def get(self, path: Path) -> Optional[Dict[str, Any]]: - """Retrieve cached index for a path. + count = 0 + total_size = 0 + max_mtime = 0.0 + for file_info in files: + file_path = Path(file_info.get("path", "")) + if not file_path.is_absolute(): + file_path = path / file_path + try: + st = file_path.stat() + except OSError: + return None + count += 1 + total_size += st.st_size + if st.st_mtime > max_mtime: + max_mtime = st.st_mtime - Args: - path: The codebase path + raw = f"{count}:{int(max_mtime)}:{total_size}" + return hashlib.sha256(raw.encode()).hexdigest() - Returns: - Cached index data or None if not found/expired - """ + def get(self, path: Path) -> Optional[Dict[str, Any]]: + """Retrieve cached index for a path.""" cache_key = self._get_cache_key(path) cache_path = self._get_cache_path(cache_key) @@ -62,62 +69,61 @@ def get(self, path: Path) -> Optional[Dict[str, Any]]: return None try: - with open(cache_path, "r") as f: + with open(cache_path, "r", encoding="utf-8") as f: data = json.load(f) - # Check if cache is still valid cached_time = datetime.fromisoformat(data.get("timestamp", "")) codebase_path = Path(data.get("path", "")) - - # Simple validation: check if path still exists if not codebase_path.exists(): return None + if self.ttl_seconds is not None: + if datetime.now() - cached_time > timedelta(seconds=self.ttl_seconds): + return None + + index = data.get("index") + if not isinstance(index, dict): + return None + + cached_signature = data.get("signature") + if cached_signature: + current_signature = self._compute_signature(codebase_path, index=index) + if not current_signature or current_signature != cached_signature: + return None + return data except (json.JSONDecodeError, KeyError, ValueError, OSError): - # Cache is corrupted or invalid return None def set(self, path: Path, index_data: Dict[str, Any]) -> None: - """Store index data in cache. - - Args: - path: The codebase path - index_data: The index data to cache - """ + """Store index data in cache.""" cache_key = self._get_cache_key(path) cache_path = self._get_cache_path(cache_key) - # Add metadata + resolved_path = path.resolve() cache_entry = { - "path": str(path.resolve()), + "path": str(resolved_path), "timestamp": datetime.now().isoformat(), + "signature": self._compute_signature(resolved_path, index=index_data), "index": index_data, } try: - with open(cache_path, "w") as f: + with open(cache_path, "w", encoding="utf-8") as f: json.dump(cache_entry, f, indent=2) except OSError as e: - # Failed to write cache, but don't fail the operation print(f"Warning: Failed to write cache: {e}") def clear(self, path: Optional[Path] = None) -> None: - """Clear cache for a specific path or all caches. - - Args: - path: Optional path to clear cache for. If None, clears all caches. - """ + """Clear cache for a specific path or all caches.""" if path is None: - # Clear all caches for cache_file in self.cache_dir.glob("*.json"): try: cache_file.unlink() except OSError: pass else: - # Clear specific cache cache_key = self._get_cache_key(path) cache_path = self._get_cache_path(cache_key) if cache_path.exists(): From 12360035111bb2c4efb6bf33cf6ec60d3d8e48a5 Mon Sep 17 00:00:00 2001 From: DevGwardo Date: Mon, 2 Mar 2026 17:54:45 -0500 Subject: [PATCH 2/3] perf: optimize indexer.py hot paths --- peppy/indexer.py | 128 +++++++++++++++-------------------------------- 1 file changed, 39 insertions(+), 89 deletions(-) diff --git a/peppy/indexer.py b/peppy/indexer.py index c9c3436..87cc9c7 100644 --- a/peppy/indexer.py +++ b/peppy/indexer.py @@ -2,7 +2,7 @@ import os from pathlib import Path -from typing import List, Dict, Any, Set, Optional +from typing import List, Dict, Any, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import fnmatch @@ -12,25 +12,23 @@ except ImportError: GITIGNORE_AVAILABLE = False -from .parsers import CodeParser, Symbol +from .parsers import CodeParser from .cache import IndexCache class CodebaseIndexer: """Indexes a codebase for fast searching.""" - # Common directories to ignore DEFAULT_IGNORE_DIRS = { ".git", ".svn", ".hg", "node_modules", "venv", "env", ".venv", ".env", "__pycache__", ".pytest_cache", "dist", "build", ".eggs", "*.egg-info", ".idea", ".vscode", - "target", # Rust - "vendor", # Go + "target", + "vendor", } - # File extensions to index DEFAULT_EXTENSIONS = { ".py", ".js", ".jsx", ".ts", ".tsx", ".go", ".rs", ".java", ".c", ".cpp", ".h", ".hpp", @@ -39,38 +37,29 @@ class CodebaseIndexer: } def __init__(self, cache: Optional[IndexCache] = None): - """Initialize the indexer. - - Args: - cache: Optional cache instance. If None, creates a new one. - """ self.parser = CodeParser() self.cache = cache or IndexCache() + self._ignore_exact = {p for p in self.DEFAULT_IGNORE_DIRS if "*" not in p and "?" not in p and "[" not in p} + self._ignore_globs = [p for p in self.DEFAULT_IGNORE_DIRS if p not in self._ignore_exact] + + def _is_ignored_part(self, part: str) -> bool: + if part in self._ignore_exact: + return True + return any(fnmatch.fnmatch(part, pattern) for pattern in self._ignore_globs) def should_ignore(self, path: Path, root: Path, gitignore_matcher=None) -> bool: - """Check if a path should be ignored. - - Args: - path: Path to check - root: Root directory of the codebase - gitignore_matcher: Optional gitignore matcher function - - Returns: - True if the path should be ignored - """ - # Check against default ignore patterns - parts = path.relative_to(root).parts - for part in parts: - if part in self.DEFAULT_IGNORE_DIRS: + try: + relative = path.relative_to(root) + except ValueError: + return True + + for part in relative.parts: + if self._is_ignored_part(part): return True - for pattern in self.DEFAULT_IGNORE_DIRS: - if fnmatch.fnmatch(part, pattern): - return True - # Check gitignore if gitignore_matcher and GITIGNORE_AVAILABLE: try: - if gitignore_matcher(str(path)): + if gitignore_matcher(relative.as_posix()): return True except Exception: pass @@ -78,18 +67,9 @@ def should_ignore(self, path: Path, root: Path, gitignore_matcher=None) -> bool: return False def collect_files(self, root: Path) -> List[Path]: - """Collect all files to index. - - Args: - root: Root directory of the codebase - - Returns: - List of file paths to index - """ - files = [] + files: List[Path] = [] root = Path(root).resolve() - # Try to parse .gitignore gitignore_matcher = None gitignore_path = root / ".gitignore" if GITIGNORE_AVAILABLE and gitignore_path.exists(): @@ -98,43 +78,24 @@ def collect_files(self, root: Path) -> List[Path]: except Exception as e: print(f"Warning: Failed to parse .gitignore: {e}") - # Walk the directory tree for dirpath, dirnames, filenames in os.walk(root): current_path = Path(dirpath) - - # Filter out ignored directories (modify in-place to affect os.walk) dirnames[:] = [ d for d in dirnames if not self.should_ignore(current_path / d, root, gitignore_matcher) ] - # Collect files with supported extensions for filename in filenames: file_path = current_path / filename - ext = file_path.suffix.lower() - - if ext in self.DEFAULT_EXTENSIONS: - if not self.should_ignore(file_path, root, gitignore_matcher): - files.append(file_path) + if file_path.suffix.lower() in self.DEFAULT_EXTENSIONS and not self.should_ignore(file_path, root, gitignore_matcher): + files.append(file_path) return files def index_file(self, file_path: Path) -> Dict[str, Any]: - """Index a single file. - - Args: - file_path: Path to the file - - Returns: - Dictionary containing file metadata and symbols - """ try: - # Parse symbols symbols = self.parser.parse_file(str(file_path)) - - # Get file stats stats = file_path.stat() - return { "path": str(file_path), "size": stats.st_size, @@ -151,34 +112,18 @@ def index_file(self, file_path: Path) -> Dict[str, Any]: for s in symbols ], } - except Exception as e: print(f"Warning: Failed to index {file_path}: {e}") - return { - "path": str(file_path), - "error": str(e), - "symbols": [], - } + return {"path": str(file_path), "error": str(e), "symbols": []} def index_codebase( self, path: Path, force_reindex: bool = False, - max_workers: int = 4 + max_workers: Optional[int] = None, ) -> Dict[str, Any]: - """Index an entire codebase. - - Args: - path: Root path of the codebase - force_reindex: Force re-indexing even if cache exists - max_workers: Number of parallel workers for indexing - - Returns: - Dictionary containing the complete index - """ path = Path(path).resolve() - # Check cache first if not force_reindex: cached = self.cache.get(path) if cached: @@ -186,35 +131,40 @@ def index_codebase( return cached.get("index", {}) print(f"Indexing codebase at {path}...") - - # Collect files files = self.collect_files(path) print(f"Found {len(files)} files to index") - # Index files in parallel - file_indices = [] - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = {executor.submit(self.index_file, f): f for f in files} + workers = max_workers if max_workers and max_workers > 0 else min(32, max(2, (os.cpu_count() or 4) * 2)) + + file_indices: List[Dict[str, Any]] = [] + symbol_types: Dict[str, int] = {} + file_extensions: Dict[str, int] = {} + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(self.index_file, f): f for f in files} for future in as_completed(futures): file_path = futures[future] try: result = future.result() file_indices.append(result) + + ext = Path(result.get("path", str(file_path))).suffix + file_extensions[ext] = file_extensions.get(ext, 0) + 1 + for symbol in result.get("symbols", []): + sym_type = symbol.get("type", "unknown") + symbol_types[sym_type] = symbol_types.get(sym_type, 0) + 1 except Exception as e: print(f"Error indexing {file_path}: {e}") - # Build the complete index index = { "root": str(path), "total_files": len(file_indices), "files": file_indices, "symbol_count": sum(len(f.get("symbols", [])) for f in file_indices), + "symbol_types": symbol_types, + "file_extensions": file_extensions, } - # Cache the index self.cache.set(path, index) - print(f"Indexed {index['total_files']} files with {index['symbol_count']} symbols") - return index From 7197505e24efa4be563c14650cf6dcfc76b9033b Mon Sep 17 00:00:00 2001 From: DevGwardo Date: Mon, 2 Mar 2026 17:54:46 -0500 Subject: [PATCH 3/3] perf: optimize searcher.py hot paths --- peppy/searcher.py | 181 +++++++++++++++------------------------------- 1 file changed, 59 insertions(+), 122 deletions(-) diff --git a/peppy/searcher.py b/peppy/searcher.py index 2802ea8..477360d 100644 --- a/peppy/searcher.py +++ b/peppy/searcher.py @@ -12,27 +12,31 @@ class CodebaseSearcher: """Provides search and grep functionality over indexed codebases.""" def __init__(self, cache: Optional[IndexCache] = None): - """Initialize the searcher. - - Args: - cache: Optional cache instance. If None, creates a new one. - """ self.cache = cache or IndexCache() def get_index(self, path: Path) -> Optional[Dict[str, Any]]: - """Get the index for a codebase. - - Args: - path: Root path of the codebase - - Returns: - Index dictionary or None if not found - """ cached = self.cache.get(path) if cached: return cached.get("index") return None + @staticmethod + def _matches_file_pattern(file_path: str, root: Path, file_pattern: Optional[str]) -> bool: + if not file_pattern: + return True + p = Path(file_path) + rel = p + try: + rel = p.resolve().relative_to(root.resolve()) + except Exception: + pass + rel_posix = rel.as_posix() + return ( + fnmatch.fnmatch(rel_posix, file_pattern) + or fnmatch.fnmatch(p.name, file_pattern) + or fnmatch.fnmatch(file_path, file_pattern) + ) + def search_symbols( self, codebase_path: Path, @@ -41,56 +45,32 @@ def search_symbols( file_pattern: Optional[str] = None, use_regex: bool = True, ) -> List[Dict[str, Any]]: - """Search for symbols in the indexed codebase. - - Args: - codebase_path: Root path of the codebase - query: Search query (supports regex) - symbol_type: Optional filter by symbol type (function, class, etc.) - file_pattern: Optional file pattern filter (e.g., "*.py") - use_regex: Whether to treat query as regex - - Returns: - List of matching symbols - """ index = self.get_index(codebase_path) if not index: return [] - # Compile regex pattern if needed pattern = None if use_regex: try: pattern = re.compile(query, re.IGNORECASE) except re.error: - # Invalid regex, fall back to literal search use_regex = False results = [] + query_lower = query.lower() + root = Path(index.get("root", codebase_path)) for file_info in index.get("files", []): file_path = file_info.get("path", "") - - # Apply file pattern filter - if file_pattern and not fnmatch.fnmatch(file_path, file_pattern): + if not self._matches_file_pattern(file_path, root, file_pattern): continue - # Search symbols in this file for symbol in file_info.get("symbols", []): - # Apply symbol type filter if symbol_type and symbol.get("type") != symbol_type: continue - # Check if symbol name matches query name = symbol.get("name", "") - matches = False - - if use_regex and pattern: - matches = pattern.search(name) is not None - else: - matches = query.lower() in name.lower() - - if matches: + if (use_regex and pattern and pattern.search(name)) or (not use_regex and query_lower in name.lower()): results.append( { "name": name, @@ -104,20 +84,10 @@ def search_symbols( return results def get_file_symbols(self, codebase_path: Path, file_path: str) -> List[Dict[str, Any]]: - """Get all symbols in a specific file. - - Args: - codebase_path: Root path of the codebase - file_path: Path to the file (can be relative or absolute) - - Returns: - List of symbols in the file - """ index = self.get_index(codebase_path) if not index: return [] - # Normalize both paths for comparison file_path = str(Path(file_path).resolve()) for file_info in index.get("files", []): @@ -144,24 +114,10 @@ def grep_code( use_regex: bool = True, max_results: int = 100, ) -> List[Dict[str, Any]]: - """Perform grep search across the codebase. - - Args: - codebase_path: Root path of the codebase - pattern: Search pattern (supports regex) - file_pattern: Optional file pattern filter - context_lines: Number of context lines to include - use_regex: Whether to treat pattern as regex - max_results: Maximum number of results to return - - Returns: - List of matches with context - """ index = self.get_index(codebase_path) if not index: return [] - # Compile regex pattern regex_pattern = None if use_regex: try: @@ -169,66 +125,57 @@ def grep_code( except re.error: use_regex = False - results = [] + pattern_lower = pattern.lower() + results: List[Dict[str, Any]] = [] result_count = 0 + root = Path(index.get("root", codebase_path)) for file_info in index.get("files", []): if result_count >= max_results: break file_path = file_info.get("path", "") - - # Apply file pattern filter - if file_pattern and not fnmatch.fnmatch(file_path, file_pattern): + if not self._matches_file_pattern(file_path, root, file_pattern): continue - # Read file and search try: with open(file_path, "r", encoding="utf-8", errors="ignore") as f: - lines = f.readlines() + lines = f.read().splitlines() for i, line in enumerate(lines): if result_count >= max_results: break - # Check if line matches - matches = False - if use_regex and regex_pattern: - matches = regex_pattern.search(line) is not None - else: - matches = pattern.lower() in line.lower() + is_match = (use_regex and regex_pattern and regex_pattern.search(line) is not None) or ( + not use_regex and pattern_lower in line.lower() + ) + if not is_match: + continue + + result = { + "file": file_path, + "line": i + 1, + "content": line, + "context": None, + } - if matches: - # Get context lines + if context_lines > 0: start_line = max(0, i - context_lines) end_line = min(len(lines), i + context_lines + 1) - - context = { + result["context"] = { "before": [ - { - "line": start_line + j + 1, - "content": lines[start_line + j].rstrip(), - } - for j in range(i - start_line) + {"line": ln + 1, "content": lines[ln]} + for ln in range(start_line, i) ], - "match": {"line": i + 1, "content": line.rstrip()}, + "match": {"line": i + 1, "content": line}, "after": [ - {"line": i + j + 2, "content": lines[i + j + 1].rstrip()} - for j in range(end_line - i - 1) + {"line": ln + 1, "content": lines[ln]} + for ln in range(i + 1, end_line) ], } - results.append( - { - "file": file_path, - "line": i + 1, - "context": context if context_lines > 0 else None, - "content": line.rstrip(), - } - ) - - result_count += 1 - + results.append(result) + result_count += 1 except Exception as e: print(f"Warning: Failed to grep {file_path}: {e}") continue @@ -236,32 +183,22 @@ def grep_code( return results def get_statistics(self, codebase_path: Path) -> Dict[str, Any]: - """Get statistics about the indexed codebase. - - Args: - codebase_path: Root path of the codebase - - Returns: - Dictionary with statistics - """ index = self.get_index(codebase_path) if not index: return {} - # Count symbols by type - symbol_types = {} - file_extensions = {} - - for file_info in index.get("files", []): - # Count file extensions - file_path = file_info.get("path", "") - ext = Path(file_path).suffix - file_extensions[ext] = file_extensions.get(ext, 0) + 1 - - # Count symbol types - for symbol in file_info.get("symbols", []): - sym_type = symbol.get("type", "unknown") - symbol_types[sym_type] = symbol_types.get(sym_type, 0) + 1 + symbol_types = index.get("symbol_types") + file_extensions = index.get("file_extensions") + + if symbol_types is None or file_extensions is None: + symbol_types = {} + file_extensions = {} + for file_info in index.get("files", []): + ext = Path(file_info.get("path", "")).suffix + file_extensions[ext] = file_extensions.get(ext, 0) + 1 + for symbol in file_info.get("symbols", []): + sym_type = symbol.get("type", "unknown") + symbol_types[sym_type] = symbol_types.get(sym_type, 0) + 1 return { "root": index.get("root"),