From cc64abf8fbf71cbc0d04f27a385886c8883f99d5 Mon Sep 17 00:00:00 2001 From: "Kai (OpenClaw)" Date: Wed, 25 Feb 2026 14:08:02 +0200 Subject: [PATCH 1/4] feat: add native GCS watch directory support - Add GCSWatchSource in gcs_watcher.py for listing/downloading GCS blobs - Modify scan_and_process_once() to detect gs:// URIs and process them - Dedup key = gs://bucket/path (stable across runs, not temp file path) - Add 'gcs' optional dependency: pip install dmaf[gcs] - Temp files cleaned up after processing (finally block) - Clear ImportError if google-cloud-storage not installed - Local directory scanning unchanged - Add tests with mocked GCS client --- config.cloud.example.yaml | 3 + pyproject.toml | 5 +- src/dmaf/gcs_watcher.py | 115 ++++++++++++++++++ src/dmaf/watcher.py | 242 +++++++++++++++++++++++--------------- tests/test_gcs_watcher.py | 135 +++++++++++++++++++++ 5 files changed, 407 insertions(+), 93 deletions(-) create mode 100644 src/dmaf/gcs_watcher.py create mode 100644 tests/test_gcs_watcher.py diff --git a/config.cloud.example.yaml b/config.cloud.example.yaml index a298d09..e547781 100644 --- a/config.cloud.example.yaml +++ b/config.cloud.example.yaml @@ -2,6 +2,9 @@ # This config is used by Cloud Run Job # WhatsApp incoming media from GCS bucket +# GCS URIs are supported natively — just use gs://bucket/prefix/ and install dmaf[gcs]. +# The dedup key is the full gs:// path, so files won't be reprocessed across runs. +# You can mix local paths and GCS URIs in the same list. watch_dirs: - "gs://dmaf-production-whatsapp-media" diff --git a/pyproject.toml b/pyproject.toml index 66b1f46..8f8867f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,8 +65,11 @@ auraface = [ "onnxruntime>=1.15.0", "huggingface_hub>=0.20.0", ] +gcs = [ + "google-cloud-storage>=2.0", +] all = [ - "dmaf[face-recognition,insightface,auraface]", + "dmaf[face-recognition,insightface,auraface,gcs]", ] dev = [ "pytest>=7.4.0", diff --git a/src/dmaf/gcs_watcher.py b/src/dmaf/gcs_watcher.py new file mode 100644 index 0000000..905159c --- /dev/null +++ b/src/dmaf/gcs_watcher.py @@ -0,0 +1,115 @@ +""" +GCS watch source for DMAF. + +Enables using Google Cloud Storage buckets as watch directories. +Usage in config: watch_dirs: ["gs://my-bucket/prefix/"] + +Requires: pip install dmaf[gcs] +""" + +from __future__ import annotations + +import logging +import tempfile +from pathlib import Path +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + +IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", ".webp"} + + +def _get_storage_client(): + """Get a GCS client, raising a clear error if not installed.""" + try: + from google.cloud import storage + except ImportError as e: + raise ImportError( + "google-cloud-storage is required for GCS watch directories. " + "Install with: pip install dmaf[gcs]" + ) from e + return storage.Client() + + +def parse_gcs_uri(uri: str) -> tuple[str, str]: + """ + Parse a gs:// URI into (bucket_name, prefix). + + Args: + uri: GCS URI like 'gs://bucket/prefix/' or 'gs://bucket' + + Returns: + (bucket_name, prefix) where prefix may be empty string + """ + parsed = urlparse(uri) + bucket = parsed.netloc + prefix = parsed.path.lstrip("/") + return bucket, prefix + + +def list_gcs_images(uri: str) -> list[str]: + """ + List all image files in a GCS bucket/prefix. + + Args: + uri: GCS URI like 'gs://bucket/prefix/' + + Returns: + List of full GCS paths like 'gs://bucket/path/to/image.jpg' + """ + client = _get_storage_client() + bucket_name, prefix = parse_gcs_uri(uri) + bucket = client.bucket(bucket_name) + + gcs_paths = [] + for blob in bucket.list_blobs(prefix=prefix): + # Skip "directory" markers + if blob.name.endswith("/"): + continue + suffix = Path(blob.name).suffix.lower() + if suffix in IMAGE_EXTENSIONS: + gcs_paths.append(f"gs://{bucket_name}/{blob.name}") + return gcs_paths + + +def download_gcs_blob(gcs_path: str) -> Path: + """ + Download a GCS blob to a temporary file. + + Args: + gcs_path: Full GCS path like 'gs://bucket/path/to/image.jpg' + + Returns: + Path to the downloaded temporary file. Caller must clean up with cleanup_temp_file(). + """ + client = _get_storage_client() + bucket_name, blob_name = parse_gcs_uri(gcs_path) + # blob_name from parse_gcs_uri is the prefix, but for a full path it's the object key + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_name) + + suffix = Path(blob_name).suffix + tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix, prefix="dmaf_gcs_") + blob.download_to_filename(tmp.name) + tmp.close() + + logger.debug(f"Downloaded {gcs_path} -> {tmp.name}") + return Path(tmp.name) + + +def cleanup_temp_file(local_path: Path) -> None: + """ + Remove a temporary file created by download_gcs_blob. + + Args: + local_path: Path to temporary file + """ + try: + local_path.unlink(missing_ok=True) + except Exception as e: + logger.warning(f"Failed to clean up temp file {local_path}: {e}") + + +def is_gcs_uri(path: str) -> bool: + """Check if a path is a GCS URI.""" + return path.startswith("gs://") diff --git a/src/dmaf/watcher.py b/src/dmaf/watcher.py index af1af72..12e95b6 100644 --- a/src/dmaf/watcher.py +++ b/src/dmaf/watcher.py @@ -135,6 +135,87 @@ def run_watch(dirs, handler): obs.join() +def _process_image_file( + image_path: Path, + dedup_key: str, + handler, + logger: logging.Logger, +) -> tuple[bool, bool]: + """ + Process a single image file through face recognition and upload pipeline. + + Args: + image_path: Local path to the image file (for reading pixels) + dedup_key: String key for deduplication (may differ from image_path for GCS) + handler: NewImageHandler with process_fn, db_conn, cfg, alert_manager + logger: Logger instance + + Returns: + (was_matched, had_error) tuple + """ + img = Image.open(image_path).convert("RGB") + np_img = np.array(img) + h = sha256_of_file(image_path) + + result = handler.process_fn(np_img) + + if len(result) == 3: + is_matched, who, scores = result + else: + is_matched, who = result + scores = {} + + best_score = max(scores.values()) if scores else None + best_person = max(scores, key=scores.get) if scores else None + + if handler.alert_manager and best_score is not None and not is_matched: + tolerance = handler.cfg.recognition.tolerance + borderline_offset = handler.cfg.alerting.borderline_offset + threshold = 1.0 - tolerance + borderline_low = threshold - borderline_offset + + if borderline_low <= best_score < threshold: + handler.alert_manager.record_borderline( + dedup_key, best_score, tolerance, best_person + ) + + handler.db_conn.add_file_with_score( + dedup_key, + h, + int(is_matched), + 0, + best_score, + best_person if is_matched else None, + ) + + had_error = False + if is_matched: + logger.info(f"Match {Path(dedup_key).name} -> {who}") + try: + handler.on_match(image_path, who) + if handler.cfg.delete_source_after_upload: + try: + image_path.unlink() + logger.info(f"Deleted source: {image_path.name}") + except Exception as e: + logger.warning(f"Failed to delete {image_path.name}: {e}") + except Exception as e: + logger.error(f"Upload failed for {Path(dedup_key).name}: {e}") + had_error = True + if handler.alert_manager: + handler.alert_manager.record_error("upload", str(e), dedup_key) + else: + logger.info(f"No match {Path(dedup_key).name}") + if handler.cfg.delete_unmatched_after_processing: + try: + image_path.unlink() + logger.info(f"Deleted unmatched: {image_path.name}") + except Exception as e: + logger.warning(f"Failed to delete unmatched {image_path.name}: {e}") + + return is_matched, had_error + + def scan_and_process_once(dirs, handler) -> ScanResult: """ Scan all directories once, process new images, then exit. @@ -142,13 +223,18 @@ def scan_and_process_once(dirs, handler) -> ScanResult: Used for batch/scheduled execution instead of continuous watching. Processes all image files in the directories that haven't been seen before. + Supports both local directories and GCS URIs (gs://bucket/prefix/). + For GCS URIs, the dedup key is the full gs:// path (not the temp file path). + Args: - dirs: List of directory paths to scan + dirs: List of directory paths or GCS URIs to scan handler: NewImageHandler instance with process_fn and db_conn Returns: ScanResult with statistics about the scan operation """ + from dmaf.gcs_watcher import cleanup_temp_file, download_gcs_blob, is_gcs_uri, list_gcs_images + logger = logging.getLogger(__name__) new_files = 0 @@ -159,105 +245,77 @@ def scan_and_process_once(dirs, handler) -> ScanResult: image_extensions = {".jpg", ".jpeg", ".png", ".heic", ".webp"} for dir_path in dirs: - pd = Path(dir_path) - if not pd.exists(): - logger.warning(f"Directory does not exist: {pd}") - continue + if is_gcs_uri(dir_path): + # --- GCS watch source --- + logger.info(f"Scanning GCS: {dir_path}") + try: + gcs_paths = list_gcs_images(dir_path) + except Exception as e: + logger.error(f"Failed to list GCS bucket {dir_path}: {e}") + errors += 1 + continue - logger.info(f"Scanning directory: {pd}") + for gcs_path in gcs_paths: + if handler.db_conn.seen(gcs_path): + continue - # Find all image files in directory - for image_path in pd.iterdir(): - if not image_path.is_file(): - continue - if image_path.suffix.lower() not in image_extensions: + new_files += 1 + local_path = None + try: + local_path = download_gcs_blob(gcs_path) + is_matched, had_error = _process_image_file( + local_path, gcs_path, handler, logger + ) + processed += 1 + if is_matched: + matched += 1 + if had_error: + errors += 1 + except Exception as e: + logger.error(f"Error processing {gcs_path}: {e}") + errors += 1 + if handler.alert_manager: + handler.alert_manager.record_error("processing", str(e), gcs_path) + finally: + if local_path is not None: + cleanup_temp_file(local_path) + else: + # --- Local watch source --- + pd = Path(dir_path) + if not pd.exists(): + logger.warning(f"Directory does not exist: {pd}") continue - # Check if already processed - if handler.db_conn.seen(str(image_path)): - continue + logger.info(f"Scanning directory: {pd}") - new_files += 1 + for image_path in pd.iterdir(): + if not image_path.is_file(): + continue + if image_path.suffix.lower() not in image_extensions: + continue - try: - # Process the image (same logic as _handle_file) - img = Image.open(image_path).convert("RGB") - np_img = np.array(img) - h = sha256_of_file(image_path) - - # Call process_fn - it now returns scores if configured - result = handler.process_fn(np_img) - - # Handle both old (matched, who) and new (matched, who, scores) formats - if len(result) == 3: - is_matched, who, scores = result - else: - is_matched, who = result - scores = {} - - # Extract best score and person - best_score = max(scores.values()) if scores else None - best_person = max(scores, key=scores.get) if scores else None - - # Check for borderline events (close to but below threshold) - if handler.alert_manager and best_score is not None and not is_matched: - tolerance = handler.cfg.recognition.tolerance - borderline_offset = handler.cfg.alerting.borderline_offset - threshold = 1.0 - tolerance - borderline_low = threshold - borderline_offset - - if borderline_low <= best_score < threshold: - handler.alert_manager.record_borderline( - str(image_path), best_score, tolerance, best_person - ) + dedup_key = str(image_path) + if handler.db_conn.seen(dedup_key): + continue - # Store with scores - handler.db_conn.add_file_with_score( - str(image_path), - h, - int(is_matched), - 0, - best_score, - best_person if is_matched else None, - ) - processed += 1 - - if is_matched: - matched += 1 - logger.info(f"Match {image_path.name} -> {who}") - - # Call on_match hook for uploading - try: - handler.on_match(image_path, who) - # Delete source if configured (after successful upload) - if handler.cfg.delete_source_after_upload: - try: - image_path.unlink() - logger.info(f"Deleted source: {image_path.name}") - except Exception as e: - logger.warning(f"Failed to delete {image_path.name}: {e}") - except Exception as e: - logger.error(f"Upload failed for {image_path.name}: {e}") - errors += 1 - # Record error if alert_manager available - if handler.alert_manager: - handler.alert_manager.record_error("upload", str(e), str(image_path)) - else: - logger.info(f"No match {image_path.name}") - # Delete unmatched if configured (staging cleanup) - if handler.cfg.delete_unmatched_after_processing: - try: - image_path.unlink() - logger.info(f"Deleted unmatched: {image_path.name}") - except Exception as e: - logger.warning(f"Failed to delete unmatched {image_path.name}: {e}") + new_files += 1 - except Exception as e: - logger.error(f"Error processing {image_path.name}: {e}") - errors += 1 - # Record error if alert_manager available - if handler.alert_manager: - handler.alert_manager.record_error("processing", str(e), str(image_path)) + try: + is_matched, had_error = _process_image_file( + image_path, dedup_key, handler, logger + ) + processed += 1 + if is_matched: + matched += 1 + if had_error: + errors += 1 + except Exception as e: + logger.error(f"Error processing {image_path.name}: {e}") + errors += 1 + if handler.alert_manager: + handler.alert_manager.record_error( + "processing", str(e), str(image_path) + ) success = errors == 0 and processed == new_files diff --git a/tests/test_gcs_watcher.py b/tests/test_gcs_watcher.py new file mode 100644 index 0000000..3f52c46 --- /dev/null +++ b/tests/test_gcs_watcher.py @@ -0,0 +1,135 @@ +"""Tests for GCS watch directory support.""" + +from __future__ import annotations + +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from dmaf.gcs_watcher import cleanup_temp_file, is_gcs_uri, parse_gcs_uri + + +class TestParseGcsUri: + def test_bucket_only(self): + assert parse_gcs_uri("gs://my-bucket") == ("my-bucket", "") + + def test_bucket_with_prefix(self): + assert parse_gcs_uri("gs://my-bucket/some/prefix/") == ("my-bucket", "some/prefix/") + + def test_bucket_with_file(self): + assert parse_gcs_uri("gs://my-bucket/path/to/file.jpg") == ( + "my-bucket", + "path/to/file.jpg", + ) + + +class TestIsGcsUri: + def test_gcs_uri(self): + assert is_gcs_uri("gs://bucket") is True + + def test_local_path(self): + assert is_gcs_uri("/tmp/local") is False + + def test_relative_path(self): + assert is_gcs_uri("./relative") is False + + +class TestCleanupTempFile: + def test_cleanup_existing(self, tmp_path): + f = tmp_path / "test.jpg" + f.write_bytes(b"data") + cleanup_temp_file(f) + assert not f.exists() + + def test_cleanup_missing_ok(self, tmp_path): + f = tmp_path / "nonexistent.jpg" + cleanup_temp_file(f) # should not raise + + +class TestGcsScanIntegration: + """Test that GCS paths are used as dedup keys, not temp paths.""" + + @patch("dmaf.gcs_watcher._get_storage_client") + def test_dedup_key_is_gcs_path(self, mock_get_client): + """The database should store gs://... paths, not /tmp/... paths.""" + from dmaf.watcher import scan_and_process_once + + # Mock GCS to return one blob + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_bucket = MagicMock() + mock_client.bucket.return_value = mock_bucket + + mock_blob = MagicMock() + mock_blob.name = "photos/test.jpg" + mock_bucket.list_blobs.return_value = [mock_blob] + + # Create a real temp image for download + import numpy as np + from PIL import Image + + test_img = Image.fromarray(np.zeros((100, 100, 3), dtype=np.uint8)) + tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") + test_img.save(tmp.name) + tmp.close() + + # Mock blob download to copy our test image + def fake_download(filename): + import shutil + shutil.copy(tmp.name, filename) + + mock_bucket.blob.return_value.download_to_filename = fake_download + + # Mock handler + handler = MagicMock() + handler.db_conn.seen.return_value = False + handler.process_fn.return_value = (False, [], {}) + handler.cfg.delete_unmatched_after_processing = False + handler.alert_manager = None + + result = scan_and_process_once(["gs://my-bucket/photos/"], handler) + + # Verify dedup key is the GCS path, not a temp path + seen_call_arg = handler.db_conn.seen.call_args[0][0] + assert seen_call_arg == "gs://my-bucket/photos/test.jpg" + + add_file_call_arg = handler.db_conn.add_file_with_score.call_args[0][0] + assert add_file_call_arg == "gs://my-bucket/photos/test.jpg" + assert not add_file_call_arg.startswith("/tmp") + + # Verify temp file was cleaned up + # (the temp files created by download_gcs_blob should be removed) + assert result.new_files == 1 + assert result.processed == 1 + + # Clean up our test image + Path(tmp.name).unlink(missing_ok=True) + + def test_local_dirs_unchanged(self, tmp_path): + """Local directory scanning should work exactly as before.""" + from dmaf.watcher import scan_and_process_once + + import numpy as np + from PIL import Image + + # Create a test image in local dir + img_path = tmp_path / "test.jpg" + test_img = Image.fromarray(np.zeros((100, 100, 3), dtype=np.uint8)) + test_img.save(img_path) + + handler = MagicMock() + handler.db_conn.seen.return_value = False + handler.process_fn.return_value = (False, [], {}) + handler.cfg.delete_unmatched_after_processing = False + handler.alert_manager = None + + result = scan_and_process_once([str(tmp_path)], handler) + + # Dedup key should be the local path + seen_call_arg = handler.db_conn.seen.call_args[0][0] + assert seen_call_arg == str(img_path) + + assert result.new_files == 1 + assert result.processed == 1 From effdb30888ccb369944f71ac9b7327ee2db2b2b3 Mon Sep 17 00:00:00 2001 From: yhyatt Date: Wed, 25 Feb 2026 14:20:24 +0200 Subject: [PATCH 2/4] Update tests/test_gcs_watcher.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_gcs_watcher.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_gcs_watcher.py b/tests/test_gcs_watcher.py index 3f52c46..da1c710 100644 --- a/tests/test_gcs_watcher.py +++ b/tests/test_gcs_watcher.py @@ -6,8 +6,6 @@ from pathlib import Path from unittest.mock import MagicMock, patch -import pytest - from dmaf.gcs_watcher import cleanup_temp_file, is_gcs_uri, parse_gcs_uri From 5b6c68c5a4e0ee1cb301baf33e30e9765207ed5a Mon Sep 17 00:00:00 2001 From: yhyatt Date: Wed, 25 Feb 2026 14:21:17 +0200 Subject: [PATCH 3/4] Update src/dmaf/gcs_watcher.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/dmaf/gcs_watcher.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/dmaf/gcs_watcher.py b/src/dmaf/gcs_watcher.py index 905159c..b4b085c 100644 --- a/src/dmaf/gcs_watcher.py +++ b/src/dmaf/gcs_watcher.py @@ -40,9 +40,16 @@ def parse_gcs_uri(uri: str) -> tuple[str, str]: Returns: (bucket_name, prefix) where prefix may be empty string + + Raises: + ValueError: If the URI is not a valid GCS URI with scheme 'gs' and a non-empty bucket. """ parsed = urlparse(uri) + if parsed.scheme != "gs": + raise ValueError(f"Invalid GCS URI '{uri}': scheme must be 'gs'") bucket = parsed.netloc + if not bucket: + raise ValueError(f"Invalid GCS URI '{uri}': bucket name is missing") prefix = parsed.path.lstrip("/") return bucket, prefix From 3d9fd9633cb386d41d4b7fbbcc50d0bca0993867 Mon Sep 17 00:00:00 2001 From: yhyatt Date: Wed, 25 Feb 2026 14:21:40 +0200 Subject: [PATCH 4/4] Update src/dmaf/watcher.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/dmaf/watcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dmaf/watcher.py b/src/dmaf/watcher.py index 12e95b6..e518b52 100644 --- a/src/dmaf/watcher.py +++ b/src/dmaf/watcher.py @@ -153,8 +153,8 @@ def _process_image_file( Returns: (was_matched, had_error) tuple """ - img = Image.open(image_path).convert("RGB") - np_img = np.array(img) + with Image.open(image_path) as img: + np_img = np.array(img.convert("RGB")) h = sha256_of_file(image_path) result = handler.process_fn(np_img)