Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 54 additions & 48 deletions peppy/cache.py
Original file line number Diff line number Diff line change
@@ -1,123 +1,129 @@
"""Caching functionality for codebase indices."""

import json
import os
import hashlib
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime
from datetime import datetime, timedelta


class IndexCache:
"""Manages caching of codebase indices."""

def __init__(self, cache_dir: Optional[Path] = None, ttl_seconds: Optional[int] = None):
    """Initialize the cache manager.

    Args:
        cache_dir: Directory to store cache files. Defaults to
            ``.peppy_cache`` in the user's home directory. The directory
            (and any missing parents) is created if it does not exist.
        ttl_seconds: Optional cache TTL. If set, entries older than this
            many seconds are treated as stale. ``None`` disables
            time-based expiry.
    """
    if cache_dir is None:
        cache_dir = Path.home() / ".peppy_cache"
    # Normalise to Path so callers may also pass a plain string.
    self.cache_dir = Path(cache_dir)
    self.cache_dir.mkdir(parents=True, exist_ok=True)
    self.ttl_seconds = ttl_seconds

def _get_cache_key(self, path: Path) -> str:
    """Generate a cache key for a given path.

    Args:
        path: The codebase path.

    Returns:
        Hex SHA-256 digest of the resolved path string.
    """
    # Resolve first so that different spellings of the same location
    # (relative, with "..", etc.) hash to the same key.
    abs_path = Path(path).resolve()
    # "surrogateescape" yields the same bytes as the default encoding for
    # ordinary text, but does not raise on undecodable file-name bytes that
    # the OS layer surfaced as lone surrogates.
    encoded = str(abs_path).encode("utf-8", errors="surrogateescape")
    return hashlib.sha256(encoded).hexdigest()

def _get_cache_path(self, cache_key: str) -> Path:
    """Get the cache file path for a given key.

    Args:
        cache_key: The cache key.

    Returns:
        ``<cache_dir>/<cache_key>.json``.
    """
    return self.cache_dir / f"{cache_key}.json"

def _compute_signature(self, path: Path, index: Optional[Dict[str, Any]] = None) -> Optional[str]:
    """Compute a lightweight freshness signature for indexed files.

    The signature is the SHA-256 hex digest of
    ``"<file count>:<max mtime, whole seconds>:<total size>"`` taken over
    the entries of ``index["files"]``.

    Args:
        path: Codebase root; relative entry paths are joined onto it.
        index: Index data. Its ``"files"`` value is read as a list of
            dicts, each with a ``"path"`` entry.

    Returns:
        The signature, or ``None`` when it cannot be computed: no index,
        no files listed, a malformed entry, or a listed file that cannot
        be stat'ed.
    """
    if not isinstance(index, dict):
        return None
    files = index.get("files", [])
    # Malformed "files" data means "no signature" rather than an exception,
    # so that a corrupt cache entry is simply treated as unverifiable.
    if not files or not isinstance(files, (list, tuple)):
        return None

    root = Path(path)
    count = 0
    total_size = 0
    max_mtime = 0.0
    for file_info in files:
        if not isinstance(file_info, dict):
            return None
        try:
            file_path = Path(file_info.get("path", ""))
            if not file_path.is_absolute():
                file_path = root / file_path
            st = file_path.stat()
        except (OSError, TypeError, ValueError):
            # Missing/unreadable file, non-string path, or a path that the
            # OS rejects (e.g. embedded NUL).
            return None
        count += 1
        total_size += st.st_size
        max_mtime = max(max_mtime, st.st_mtime)

    raw = f"{count}:{int(max_mtime)}:{total_size}"
    return hashlib.sha256(raw.encode()).hexdigest()

def get(self, path: Path) -> Optional[Dict[str, Any]]:
    """Retrieve cached index for a path.

    Args:
        path: The codebase path.

    Returns:
        The stored cache entry (a dict with ``"path"``, ``"timestamp"``,
        ``"signature"`` and ``"index"`` keys), or ``None`` if there is no
        entry or the entry is corrupt, expired, or stale.
    """
    cache_key = self._get_cache_key(path)
    cache_path = self._get_cache_path(cache_key)

    if not cache_path.exists():
        return None

    try:
        with open(cache_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Valid JSON that is not an object (e.g. a list) is still a
        # corrupt cache entry.
        if not isinstance(data, dict):
            return None

        cached_time = datetime.fromisoformat(data.get("timestamp", ""))
        codebase_path = Path(data.get("path", ""))

        # Simple validation: the cached codebase must still exist.
        if not codebase_path.exists():
            return None

        if self.ttl_seconds is not None:
            if datetime.now() - cached_time > timedelta(seconds=self.ttl_seconds):
                return None
    except (json.JSONDecodeError, KeyError, TypeError, ValueError, OSError):
        # Cache is corrupted or invalid. TypeError covers non-string
        # timestamp/path values and timezone-aware timestamps.
        return None

    index = data.get("index")
    if not isinstance(index, dict):
        return None

    # Entries written without a signature skip the freshness check.
    cached_signature = data.get("signature")
    if cached_signature:
        try:
            current_signature = self._compute_signature(codebase_path, index=index)
        except (AttributeError, TypeError, ValueError):
            # Malformed file list inside the cached index.
            current_signature = None
        if not current_signature or current_signature != cached_signature:
            return None

    return data


def set(self, path: Path, index_data: Dict[str, Any]) -> None:
    """Store index data in cache.

    Caching is best-effort: any failure to serialise or write the entry
    is reported as a warning on stdout and otherwise ignored.

    Args:
        path: The codebase path.
        index_data: The index data to cache. Must be JSON-serialisable.
    """
    cache_key = self._get_cache_key(path)
    cache_path = self._get_cache_path(cache_key)

    # Add metadata
    resolved_path = Path(path).resolve()
    cache_entry = {
        "path": str(resolved_path),
        "timestamp": datetime.now().isoformat(),
        "signature": self._compute_signature(resolved_path, index=index_data),
        "index": index_data,
    }

    # Serialise before opening the file: opening with "w" truncates, so a
    # serialisation error mid-dump would otherwise destroy a previously
    # valid entry.
    try:
        payload = json.dumps(cache_entry, indent=2)
    except (TypeError, ValueError) as e:
        print(f"Warning: Failed to write cache: {e}")
        return

    try:
        with open(cache_path, "w", encoding="utf-8") as f:
            f.write(payload)
    except OSError as e:
        # Failed to write cache, but don't fail the operation
        print(f"Warning: Failed to write cache: {e}")


def clear(self, path: Optional[Path] = None) -> None:
"""Clear cache for a specific path or all caches.

Args:
path: Optional path to clear cache for. If None, clears all caches.
"""
"""Clear cache for a specific path or all caches."""
if path is None:
# Clear all caches
for cache_file in self.cache_dir.glob("*.json"):
try:
cache_file.unlink()
except OSError:
pass
else:
# Clear specific cache
cache_key = self._get_cache_key(path)
cache_path = self._get_cache_path(cache_key)
if cache_path.exists():
Expand Down
Loading