diff --git a/app/core/config.py b/app/core/config.py index 6b31df8..e250dd2 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -96,6 +96,9 @@ class Settings(BaseSettings): instagram_navigation_timeout: int = 12 instagram_og_wait_timeout_ms: int = 3000 instagram_block_resource_types: str = "image,font,media" + instagram_image_fetch_timeout_ms: int = 8000 + instagram_image_fetch_max_images: int = 10 + instagram_image_fetch_max_next_clicks: int = 10 instagram_rate_limit_cooldown_seconds: int = 1800 instagram_cooldown_key: str = "processing:cooldown:instagram" @@ -134,6 +137,12 @@ class Settings(BaseSettings): hf_extraction_max_attempts: int = 3 hf_extraction_retry_base_seconds: float = 0.0 hf_extraction_retry_backoff_multiplier: float = 2.0 + hf_ocr_endpoint_url: str = "" + hf_ocr_api_token: str = "" + hf_ocr_model_name: str = "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8" + hf_ocr_timeout_seconds: int = 30 + hf_ocr_max_new_tokens: int = 1024 + hf_ocr_max_attempts: int = 2 extraction_failure_retry_enabled: bool = True @field_validator("processing_schema") diff --git a/app/infra/llm/__init__.py b/app/infra/llm/__init__.py index c15270e..70d967e 100644 --- a/app/infra/llm/__init__.py +++ b/app/infra/llm/__init__.py @@ -1,6 +1,8 @@ from app.infra.llm.client import ( HFExtractionClient, HFExtractionError, + HFOCRClient, + HFOCRError, extract_json_object, extract_text_from_hf_payload, ) @@ -8,6 +10,8 @@ __all__ = [ "HFExtractionClient", "HFExtractionError", + "HFOCRClient", + "HFOCRError", "extract_json_object", "extract_text_from_hf_payload", ] diff --git a/app/infra/llm/client.py b/app/infra/llm/client.py index 692f1ab..9489220 100644 --- a/app/infra/llm/client.py +++ b/app/infra/llm/client.py @@ -98,6 +98,10 @@ class HFExtractionError(Exception): pass +class HFOCRError(Exception): + pass + + class HFExtractionClient: def __init__( self, @@ -227,6 +231,117 @@ def _limit_places(self, result: ExtractionResult) -> ExtractionResult: return result +OCR_PROMPT = ( + "You are an OCR engine. Extract all visible Korean and English text from this " + "Instagram post image. Preserve useful line breaks. Return only the extracted " + "text, with no preface, explanation, Markdown, or translation. If no text is " + "visible, return an empty string." +) + + +class HFOCRClient: + def __init__( + self, + settings: Settings, + *, + transport: httpx.AsyncBaseTransport | None = None, + ) -> None: + self._settings = settings + self._transport = transport + + async def extract_text_from_image_url(self, image_url: str) -> str: + if not str(image_url or "").strip(): + return "" + endpoint_url = self._endpoint_url() + api_token = self._api_token() + if not endpoint_url: + raise HFOCRError("HF OCR endpoint URL is empty") + if not api_token: + raise HFOCRError("HF OCR API token is empty") + + payload = self._build_payload(image_url=image_url) + headers = { + "Authorization": f"Bearer {api_token}", + "Content-Type": "application/json", + } + timeout = httpx.Timeout(self._settings.hf_ocr_timeout_seconds) + max_attempts = max(1, self._settings.hf_ocr_max_attempts) + last_error: HFOCRError | None = None + + async with httpx.AsyncClient(timeout=timeout, transport=self._transport) as client: + for attempt in range(1, max_attempts + 1): + try: + response = await client.post( + endpoint_url, + headers=headers, + json=payload, + ) + except (httpx.TimeoutException, httpx.NetworkError) as exc: + last_error = HFOCRError(str(exc) or exc.__class__.__name__) + if attempt >= max_attempts: + raise last_error from exc + await self._sleep_before_retry(attempt) + continue + + if response.status_code >= 400: + error = HFOCRError(f"HF OCR request failed ({response.status_code})") + if not _is_retryable_status(response.status_code) or attempt >= max_attempts: + raise error + last_error = error + await self._sleep_before_retry(attempt) + continue + + return self._parse_response(response) + + if last_error is not None: + raise last_error + raise HFOCRError("HF OCR failed") + + async def extract_texts_from_image_urls(self, image_urls: list[str]) -> list[str]: + texts: list[str] = [] + for image_url in image_urls: + text = (await self.extract_text_from_image_url(image_url)).strip() + if text: + texts.append(text) + return texts + + def _build_payload(self, *, image_url: str) -> dict[str, Any]: + return { + "model": self._settings.hf_ocr_model_name, + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": OCR_PROMPT}, + {"type": "image_url", "image_url": {"url": image_url}}, + ], + } + ], + "temperature": 0.0, + "max_tokens": self._settings.hf_ocr_max_new_tokens, + } + + def _endpoint_url(self) -> str: + return self._settings.hf_ocr_endpoint_url or self._settings.hf_extraction_endpoint_url + + def _api_token(self) -> str: + return self._settings.hf_ocr_api_token or self._settings.hf_extraction_api_token + + def _parse_response(self, response: httpx.Response) -> str: + try: + response_payload = response.json() + except json.JSONDecodeError as exc: + raise HFOCRError("HF OCR response is not valid JSON") from exc + return extract_text_from_hf_payload(response_payload).strip() + + async def _sleep_before_retry(self, attempt: int) -> None: + base_seconds = max(0.0, self._settings.hf_extraction_retry_base_seconds) + if base_seconds <= 0: + return + multiplier = max(1.0, self._settings.hf_extraction_retry_backoff_multiplier) + await asyncio.sleep(base_seconds * (multiplier ** max(0, attempt - 1))) + + def extract_text_from_hf_payload(payload: Any) -> str: if isinstance(payload, str): return payload diff --git a/app/services/crawler/playwright_service.py b/app/services/crawler/playwright_service.py index ede26cc..9c6a4d3 100644 --- a/app/services/crawler/playwright_service.py +++ b/app/services/crawler/playwright_service.py @@ -20,6 +20,7 @@ from app.core.config import Settings from app.services.crawler.instagram_context import ( INSTAGRAM_BROWSER_ARGS, + InstagramPageRouteStats, OG_EXTRACTION_JS, OG_READY_PREDICATE_JS, configure_instagram_page, @@ -126,6 +127,107 @@ }; }""" +INSTAGRAM_IMAGE_URL_EXTRACTION_JS = r"""() => { + const urls = []; + const push = (value) => { + if (!value || typeof value !== "string") return; + const trimmed = value.trim(); + if (!trimmed || trimmed.startsWith("data:") || trimmed.startsWith("blob:")) return; + try { + urls.push(new URL(trimmed, window.location.href).href); + } catch (e) {} + }; + const pushBestCandidate = (media) => { + const candidates = media && media.image_versions2 && media.image_versions2.candidates; + if (!Array.isArray(candidates) || candidates.length === 0) return; + const sorted = [...candidates].sort((a, b) => ((b.width || 0) * (b.height || 0)) - ((a.width || 0) * (a.height || 0))); + push(sorted[0] && sorted[0].url); + }; + const shortcode = (window.location.pathname || "").split("/").filter(Boolean)[1] || ""; + const seenObjects = new Set(); + const findCurrentMedia = (value) => { + if (!value || typeof value !== "object" || seenObjects.has(value)) return null; + seenObjects.add(value); + if ( + (value.code === shortcode || value.shortcode === shortcode) && + (value.image_versions2 || Array.isArray(value.carousel_media)) + ) { + return value; + } + if (Array.isArray(value)) { + for (const item of value) { + const found = findCurrentMedia(item); + if (found) return found; + } + return null; + } + for (const item of Object.values(value)) { + const found = findCurrentMedia(item); + if (found) return found; + } + return null; + }; + + for (const script of Array.from(document.querySelectorAll('script[type="application/json"]'))) { + const text = script.textContent || ""; + if (!shortcode || !text.includes(shortcode)) continue; + try { + const media = findCurrentMedia(JSON.parse(text)); + if (!media) continue; + const carousel = Array.isArray(media.carousel_media) ? media.carousel_media : []; + if (carousel.length > 0) { + carousel.forEach(pushBestCandidate); + } else { + pushBestCandidate(media); + } + if (urls.length > 0) return urls; + } catch (e) {} + } + + const pushSrcset = (value) => { + if (!value || typeof value !== "string") return; + for (const part of value.split(",")) { + const candidate = part.trim().split(/\s+/)[0]; + push(candidate); + } + }; + + document.querySelectorAll( + 'meta[property="og:image"], meta[name="twitter:image"], meta[property="twitter:image"]' + ).forEach((el) => push(el.getAttribute("content"))); + document.querySelectorAll("link[as='image']").forEach((el) => { + push(el.getAttribute("href")); + pushSrcset(el.getAttribute("imagesrcset")); + }); + document.querySelectorAll("article img, main img").forEach((el) => { + push(el.currentSrc || el.src || el.getAttribute("src")); + pushSrcset(el.getAttribute("srcset")); + }); + document.querySelectorAll("picture source").forEach((el) => { + push(el.getAttribute("src")); + pushSrcset(el.getAttribute("srcset")); + }); + + const scripts = Array.from(document.querySelectorAll("script")); + const cdnPattern = /https?:\\?\/\\?\/[^"'\\\s<>]+(?:cdninstagram|fbcdn)[^"'\\\s<>]+?\.(?:jpg|jpeg|png|webp)(?:\?[^"'\\\s<>]*)?/gi; + for (const script of scripts) { + const text = script.textContent || ""; + for (const match of text.matchAll(cdnPattern)) { + push(match[0].replaceAll("\\/", "/").replaceAll("\\u0026", "&")); + } + } + + return urls; +}""" + +INSTAGRAM_CAROUSEL_NEXT_SELECTORS = [ + 'button[aria-label="Next"]', + 'button[aria-label="다음"]', + 'button[aria-label="次へ"]', + 'button[aria-label="Siguiente"]', + 'button[aria-label="Suivant"]', +] + @dataclass(slots=True) class InstagramFetchResult: @@ -154,6 +256,22 @@ class InstagramFetchResult: empty_body: bool = False +@dataclass(slots=True) +class InstagramImageFetchResult: + source_url: str + image_urls: list[str] + response_status: int | None = None + response_url: str | None = None + final_url: str | None = None + total_ms: int = 0 + navigation_ms: int = 0 + collection_ms: int = 0 + next_clicks: int = 0 + timed_out: bool = False + blocked_resource_count: int = 0 + error: str | None = None + + @dataclass(slots=True) class NaverBlogFetchResult: html: str | None @@ -310,6 +428,141 @@ async def _instagram_page_diagnostics(page) -> dict[str, Any]: return {} +def _normalize_instagram_image_urls(urls: list[Any], *, max_images: int) -> list[str]: + normalized: list[str] = [] + seen_by_key: dict[str, int] = {} + limit = max(1, int(max_images)) + for value in urls: + text = str(value or "").strip() + if not text or text.startswith(("data:", "blob:")): + continue + parsed = urlparse(text) + if parsed.scheme not in {"http", "https"}: + continue + if not parsed.netloc: + continue + clean = parsed._replace(fragment="").geturl() + if not _is_probable_instagram_post_image_url(clean): + continue + key = _instagram_image_dedupe_key(parsed) + existing_index = seen_by_key.get(key) + if existing_index is not None: + if _instagram_image_quality_score(clean) > _instagram_image_quality_score( + normalized[existing_index] + ): + normalized[existing_index] = clean + continue + seen_by_key[key] = len(normalized) + normalized.append(clean) + if len(normalized) >= limit: + break + return normalized + + +def _is_probable_instagram_post_image_url(url: str) -> bool: + lower = url.lower() + if "cdninstagram.com" not in lower and "fbcdn.net" not in lower: + return False + if re.search(r"/t\d+\.\d+-19/", lower): + return False + if "profile_pic" in lower or "s150x150" in lower: + return False + return any(ext in lower for ext in (".jpg", ".jpeg", ".png", ".webp")) + + +def _instagram_image_dedupe_key(parsed) -> str: + filename = (parsed.path or "").rsplit("/", 1)[-1] + return filename or parsed._replace(query="", fragment="").geturl() + + +def _instagram_image_quality_score(url: str) -> int: + lower = url.lower() + score = 0 + if "_dst-jpg" not in lower and "a_dst-jpg" not in lower: + score += 2 + if re.search(r"[?&]stp=[^&]*c\d+\.", lower): + score -= 2 + if "s640x640" in lower: + score -= 1 + return score + + +async def _extract_instagram_image_urls_from_page(page, *, max_images: int) -> list[str]: + try: + raw = await page.evaluate(INSTAGRAM_IMAGE_URL_EXTRACTION_JS) + except Exception: + logger.debug("instagram image url extraction evaluation failed", exc_info=True) + return [] + if not isinstance(raw, list): + return [] + return _normalize_instagram_image_urls(raw, max_images=max_images) + + +async def _configure_instagram_image_page(page, settings: Settings) -> InstagramPageRouteStats: + blocked_types = settings.instagram_block_resource_type_set - {"image"} + stats = InstagramPageRouteStats() + if not blocked_types: + return stats + + async def _on_route(route) -> None: + resource_type = route.request.resource_type + if resource_type in blocked_types: + stats.blocked_resource_count += 1 + await route.abort() + return + await route.continue_() + + await page.route("**/*", _on_route) + return stats + + +async def _click_instagram_carousel_next(page) -> bool: + for selector in INSTAGRAM_CAROUSEL_NEXT_SELECTORS: + try: + locator = page.locator(selector).first + if await locator.count() <= 0: + continue + if not await locator.is_visible(): + continue + await locator.click(timeout=1000) + return True + except PlaywrightTimeoutError: + continue + except Exception: + logger.debug("instagram carousel next click failed selector=%s", selector, exc_info=True) + continue + return False + + +async def _collect_instagram_post_image_urls(page, settings: Settings) -> tuple[list[str], int, int]: + max_images = max(1, int(settings.instagram_image_fetch_max_images)) + max_next_clicks = max(0, int(settings.instagram_image_fetch_max_next_clicks)) + urls: list[str] = [] + next_clicks = 0 + started = time.monotonic() + + def merge(new_urls: list[str]) -> None: + nonlocal urls + urls = _normalize_instagram_image_urls(urls + new_urls, max_images=max_images) + + merge(await _extract_instagram_image_urls_from_page(page, max_images=max_images)) + for _ in range(max_next_clicks): + if len(urls) >= max_images: + break + clicked = await _click_instagram_carousel_next(page) + if not clicked: + break + next_clicks += 1 + await page.wait_for_timeout(350) + before = len(urls) + merge(await _extract_instagram_image_urls_from_page(page, max_images=max_images)) + if len(urls) == before: + continue + + collection_ms = int((time.monotonic() - started) * 1000) + return urls, next_clicks, collection_ms + + def _instagram_result_metadata(result: InstagramFetchResult) -> dict[str, Any]: return { "extraction_source": "instagram_og_meta", @@ -835,6 +1088,92 @@ async def fetch_instagram_media_result(url: str, settings: Settings) -> Instagra ) +async def _fetch_instagram_post_images_inner(url: str, settings: Settings) -> InstagramImageFetchResult: + started = time.monotonic() + navigation_timeout_ms = max(1, settings.instagram_navigation_timeout) * 1000 + logger.info( + "crawler start mode=instagram_images url=%s navigation_timeout_ms=%s max_images=%s max_next_clicks=%s", + safe_url_for_log(url), + navigation_timeout_ms, + settings.instagram_image_fetch_max_images, + settings.instagram_image_fetch_max_next_clicks, + ) + launch_args = _browser_args(settings) + list(INSTAGRAM_BROWSER_ARGS) + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True, args=launch_args) + try: + context = await new_instagram_browser_context(browser, settings) + try: + page = await context.new_page() + route_stats = await _configure_instagram_image_page(page, settings) + navigation_started = time.monotonic() + response = await page.goto( + url, + wait_until="domcontentloaded", + timeout=navigation_timeout_ms, + ) + navigation_ms = int((time.monotonic() - navigation_started) * 1000) + image_urls, next_clicks, collection_ms = await _collect_instagram_post_image_urls( + page, + settings, + ) + total_ms = int((time.monotonic() - started) * 1000) + result = InstagramImageFetchResult( + source_url=url, + image_urls=image_urls, + response_status=response.status if response is not None else None, + response_url=response.url if response is not None else None, + final_url=getattr(page, "url", None), + total_ms=total_ms, + navigation_ms=navigation_ms, + collection_ms=collection_ms, + next_clicks=next_clicks, + blocked_resource_count=route_stats.blocked_resource_count, + ) + logger.info( + ( + "crawler done mode=instagram_images url=%s total_ms=%s " + "navigation_ms=%s collection_ms=%s image_count=%s next_clicks=%s " + "response_status=%s final_url=%s blocked_resource_count=%s" + ), + safe_url_for_log(url), + result.total_ms, + result.navigation_ms, + result.collection_ms, + len(result.image_urls), + result.next_clicks, + result.response_status, + safe_url_for_log(result.final_url), + result.blocked_resource_count, + ) + return result + finally: + await context.close() + finally: + await browser.close() + + +async def fetch_instagram_post_images(url: str, settings: Settings) -> InstagramImageFetchResult: + timeout_seconds = max( + 5.0, + (max(1, settings.instagram_navigation_timeout) * 1000 + max(0, settings.instagram_image_fetch_timeout_ms)) + / 1000.0 + + max(0.0, settings.crawler_hard_timeout_margin_seconds), + ) + try: + return await asyncio.wait_for( + _fetch_instagram_post_images_inner(str(url), settings), + timeout=timeout_seconds, + ) + except (asyncio.TimeoutError, TimeoutError): + return InstagramImageFetchResult( + source_url=str(url), + image_urls=[], + timed_out=True, + error="timeout", + ) + + async def fetch_generic_web_content(url: str, settings: Settings) -> tuple[str | None, str]: nav_ms = max(1, settings.crawler_timeout) * 1000 hard_timeout = max(5.0, nav_ms / 1000.0 + max(0.0, settings.crawler_hard_timeout_margin_seconds)) diff --git a/app/worker/processor.py b/app/worker/processor.py index 3835205..5f6d3be 100644 --- a/app/worker/processor.py +++ b/app/worker/processor.py @@ -22,9 +22,10 @@ extracted_places_from_result, ) from app.domain.job.service import INSTAGRAM_RATE_LIMITED_ERROR_CODE -from app.domain.url_contract import crawl_url_for, is_instagram_media_url +from app.domain.url_contract import crawl_url_for, is_instagram_media_url, is_instagram_post_url from app.infra.llm import HFExtractionError from app.infra.kakao import KakaoNonRetryableError +from app.services.crawler.playwright_service import fetch_instagram_post_images from app.services.crawler.extractors.registry import UnsupportedPlatformUrlError logger = logging.getLogger("processing.worker.processor") @@ -32,6 +33,27 @@ UNSUPPORTED_PLATFORM_URL_ERROR_CODE = "UNSUPPORTED_PLATFORM_URL" +def build_instagram_ocr_augmented_content( + *, + caption: str, + ocr_texts: list[str], +) -> str: + sections: list[tuple[str, str]] = [] + clean_caption = (caption or "").strip() + if clean_caption: + sections.append(("[caption]", clean_caption)) + + clean_ocr_texts = [text.strip() for text in ocr_texts if text and text.strip()] + if clean_ocr_texts: + image_text = "\n\n".join( + f"image {index}:\n{text}" + for index, text in enumerate(clean_ocr_texts, 1) + ) + sections.append(("[image_ocr]", image_text)) + + return "\n\n".join(f"{header}\n{body}" for header, body in sections).strip() + + @dataclass(slots=True) class JobProcessOutcome: processed: bool @@ -74,6 +96,10 @@ async def extract( ) -> ExtractionResult | None: ... +class OCRPort(Protocol): + async def extract_texts_from_image_urls(self, image_urls: list[str]) -> list[str]: ... + + class InstagramCooldownPort(Protocol): async def set_instagram_cooldown(self, seconds: int) -> None: ... @@ -99,12 +125,14 @@ def __init__( repository: JobRepositoryPort, settings: Settings, extraction_client: ExtractionPort | None = None, + ocr_client: OCRPort | None = None, place_search_client: PlaceSearchPort | None = None, cooldown_store: InstagramCooldownPort | None = None, ) -> None: self._repository = repository self._settings = settings self._extraction_client = extraction_client + self._ocr_client = ocr_client self._place_search_client = place_search_client self._cooldown_store = cooldown_store @@ -404,9 +432,26 @@ async def _extract_result( original_url: str, crawl_artifact: CrawlArtifact, ) -> ExtractionResult | None: - if not self._extraction_client or not crawl_artifact.content_text: + if not self._extraction_client: return None + extraction_result: ExtractionResult | None = None + if crawl_artifact.content_text: + extraction_result = await self._run_extraction(original_url, crawl_artifact) + if self._has_extracted_places(extraction_result): + return extraction_result + + fallback_result = await self._extract_with_instagram_image_fallback( + original_url, + crawl_artifact, + ) + return fallback_result if fallback_result is not None else extraction_result + + async def _run_extraction( + self, + original_url: str, + crawl_artifact: CrawlArtifact, + ) -> ExtractionResult | None: try: return await self._extraction_client.extract( text=crawl_artifact.content_text, @@ -419,6 +464,77 @@ async def _extract_result( raise return None + async def _extract_with_instagram_image_fallback( + self, + original_url: str, + crawl_artifact: CrawlArtifact, + ) -> ExtractionResult | None: + if not self._ocr_client: + return None + if not self._should_run_instagram_image_fallback(original_url, crawl_artifact): + return None + + try: + image_result = await fetch_instagram_post_images(crawl_artifact.url, self._settings) + ocr_texts = await self._ocr_client.extract_texts_from_image_urls(image_result.image_urls) + augmented_text = build_instagram_ocr_augmented_content( + caption=crawl_artifact.content_text, + ocr_texts=ocr_texts, + ) + self._record_instagram_ocr_fallback( + crawl_artifact, + image_count=len(image_result.image_urls), + ocr_text_count=len(ocr_texts), + image_fetch_timed_out=image_result.timed_out, + image_fetch_error=image_result.error, + ) + if not augmented_text: + return None + crawl_artifact.content_text = augmented_text + return await self._run_extraction(original_url, crawl_artifact) + except Exception: + logger.exception("instagram image OCR fallback failed original_url=%s", original_url) + if self._settings.extraction_failure_retry_enabled: + raise + return None + + @staticmethod + def _has_extracted_places(result: ExtractionResult | None) -> bool: + return bool(result and extracted_places_from_result(result)) + + @staticmethod + def _should_run_instagram_image_fallback( + original_url: str, + crawl_artifact: CrawlArtifact, + ) -> bool: + return ( + crawl_artifact.source_type == "INSTAGRAM" + and crawl_artifact.media_type == "post" + and is_instagram_post_url(original_url) + ) + + @staticmethod + def _record_instagram_ocr_fallback( + crawl_artifact: CrawlArtifact, + *, + image_count: int, + ocr_text_count: int, + image_fetch_timed_out: bool, + image_fetch_error: str | None, + ) -> None: + raw_metadata = dict(crawl_artifact.raw_metadata or {}) + instagram_metadata = dict(raw_metadata.get("instagram") or {}) + instagram_metadata["ocr_fallback"] = { + "attempted": True, + "image_count": image_count, + "ocr_text_count": ocr_text_count, + "image_fetch_timed_out": image_fetch_timed_out, + "image_fetch_error": image_fetch_error, + } + raw_metadata["instagram"] = instagram_metadata + raw_metadata["extraction_source"] = "instagram_og_meta_with_image_ocr_fallback" + crawl_artifact.raw_metadata = raw_metadata + async def _enrich_place( self, extraction_result: ExtractionResult | None, diff --git a/app/worker/runner.py b/app/worker/runner.py index 73ecaa4..15eda95 100644 --- a/app/worker/runner.py +++ b/app/worker/runner.py @@ -9,7 +9,7 @@ from app.core.config import get_settings from app.infra.db import JobRepository, create_db_pool from app.infra.kakao import KakaoLocalClient -from app.infra.llm import HFExtractionClient +from app.infra.llm import HFExtractionClient, HFOCRClient from app.infra.queue import RedisJobQueue from app.services.crawler.playwright_service import prewarm_crawler_runtime, shutdown_crawler_runtime from app.worker.processor import ExtractionPort, JobProcessor @@ -93,6 +93,15 @@ def build_extraction_client(settings) -> ExtractionPort | None: return HFExtractionClient(settings) +def build_ocr_client(settings) -> HFOCRClient | None: + endpoint_url = settings.hf_ocr_endpoint_url or settings.hf_extraction_endpoint_url + api_token = settings.hf_ocr_api_token or settings.hf_extraction_api_token + if not endpoint_url or not api_token: + logger.info("worker OCR client disabled (HF OCR endpoint URL/token is empty)") + return None + return HFOCRClient(settings) + + def build_place_search_client(settings): if not settings.kakao_rest_api_key: logger.warning("worker kakao client disabled (KAKAO_REST_API_KEY is empty)") @@ -119,6 +128,7 @@ async def run_worker() -> None: repository=repository, settings=settings, extraction_client=build_extraction_client(settings), + ocr_client=build_ocr_client(settings), place_search_client=build_place_search_client(settings), cooldown_store=queue, ) diff --git a/tests/test_config.py b/tests/test_config.py index 34f7161..e15f724 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -84,3 +84,14 @@ def test_youtube_defaults_do_not_require_live_api_key() -> None: assert settings.youtube_content_max_chars == 20000 assert settings.youtube_description_max_chars == 5000 assert settings.youtube_comment_max_chars == 2000 + + +def test_instagram_image_fetch_and_hf_ocr_defaults_are_separate_from_caption_extraction() -> None: + settings = Settings() + + assert settings.instagram_image_fetch_timeout_ms == 8000 + assert settings.instagram_image_fetch_max_images == 10 + assert settings.instagram_image_fetch_max_next_clicks == 10 + assert settings.hf_ocr_endpoint_url == "" + assert settings.hf_ocr_api_token == "" + assert settings.hf_ocr_model_name == "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8" diff --git a/tests/test_hf_extraction_client.py b/tests/test_hf_extraction_client.py index 5751218..ab7f776 100644 --- a/tests/test_hf_extraction_client.py +++ b/tests/test_hf_extraction_client.py @@ -11,6 +11,8 @@ from app.infra.llm import ( HFExtractionClient, HFExtractionError, + HFOCRClient, + HFOCRError, extract_json_object, extract_text_from_hf_payload, ) @@ -337,3 +339,102 @@ def test_hf_extraction_client_raises_when_endpoint_is_missing() -> None: media_type=None, ) ) + + +def test_hf_ocr_client_sends_image_url_chat_payload() -> None: + seen_requests: list[dict[str, object]] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + seen_requests.append(json.loads(request.content.decode("utf-8"))) + return httpx.Response( + 200, + json={"choices": [{"message": {"content": "서울 종로구\n커먼맨션"}}]}, + ) + + client = HFOCRClient( + Settings( + hf_ocr_endpoint_url="https://example.test/v1/chat/completions", + hf_ocr_api_token="test-token", + hf_ocr_model_name="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", + hf_ocr_max_new_tokens=512, + ), + transport=httpx.MockTransport(handler), + ) + + text = _run( + client.extract_text_from_image_url( + "https://scontent.cdninstagram.com/v/t51.29350-15/image.jpg" + ) + ) + + assert text == "서울 종로구\n커먼맨션" + assert seen_requests[0]["model"] == "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8" + assert seen_requests[0]["max_tokens"] == 512 + message = seen_requests[0]["messages"][0] + assert message["role"] == "user" + assert message["content"][0]["type"] == "text" + assert message["content"][1] == { + "type": "image_url", + "image_url": {"url": "https://scontent.cdninstagram.com/v/t51.29350-15/image.jpg"}, + } + + +def test_hf_ocr_client_retries_transient_failure() -> None: + calls = 0 + + async def handler(request: httpx.Request) -> httpx.Response: + nonlocal calls + calls += 1 + if calls == 1: + return httpx.Response(503, json={"error": "busy"}) + return httpx.Response(200, json={"generated_text": "OCR text"}) + + client = HFOCRClient( + Settings( + hf_ocr_endpoint_url="https://example.test/v1/chat/completions", + hf_ocr_api_token="test-token", + hf_ocr_max_attempts=2, + ), + transport=httpx.MockTransport(handler), + ) + + assert _run(client.extract_text_from_image_url("https://example.test/image.jpg")) == "OCR text" + assert calls == 2 + + +def test_hf_ocr_client_raises_when_endpoint_is_missing() -> None: + client = HFOCRClient( + Settings( + hf_ocr_endpoint_url="", + hf_ocr_api_token="test-token", + hf_extraction_endpoint_url="", + hf_extraction_api_token="", + ) + ) + + with pytest.raises(HFOCRError): + _run(client.extract_text_from_image_url("https://example.test/image.jpg")) + + +def test_hf_ocr_client_falls_back_to_extraction_hf_router_settings() -> None: + seen_urls: list[str] = [] + seen_auth: list[str | None] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + seen_urls.append(str(request.url)) + seen_auth.append(request.headers.get("Authorization")) + return httpx.Response(200, json={"generated_text": "fallback OCR"}) + + client = HFOCRClient( + Settings( + hf_ocr_endpoint_url="", + hf_ocr_api_token="", + hf_extraction_endpoint_url="https://example.test/v1/chat/completions", + hf_extraction_api_token="fallback-token", + ), + transport=httpx.MockTransport(handler), + ) + + assert _run(client.extract_text_from_image_url("https://example.test/image.jpg")) == "fallback OCR" + assert seen_urls == ["https://example.test/v1/chat/completions"] + assert seen_auth == ["Bearer fallback-token"] diff --git a/tests/test_instagram_ocr_fallback_live.py b/tests/test_instagram_ocr_fallback_live.py new file mode 100644 index 0000000..7b2c8d9 --- /dev/null +++ b/tests/test_instagram_ocr_fallback_live.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import asyncio +import os + +import pytest + +from app.core.config import Settings +from app.domain.job import extracted_places_from_result +from app.infra.llm import HFExtractionClient, HFOCRClient +from app.services.crawler.playwright_service import ( + fetch_instagram_media_result, + fetch_instagram_post_images, +) +from app.worker.processor import build_instagram_ocr_augmented_content + + +LIVE_POST_URLS = [ + "https://www.instagram.com/p/DLmXSK3znl-", + "https://www.instagram.com/p/C6Kp79xRiGC", + "https://www.instagram.com/p/DYgvDx0gc_X", + "https://www.instagram.com/p/DX8pE6WEeom", +] + + +def _run_with_subprocesses(coro): + if hasattr(asyncio, "WindowsProactorEventLoopPolicy"): + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + try: + return asyncio.run(coro) + finally: + if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + +async def _run_live_ocr_fallback_pipeline(url: str, settings: Settings): + caption_result = await fetch_instagram_media_result(url, settings) + image_result = await fetch_instagram_post_images(url, settings) + ocr_texts = await HFOCRClient(settings).extract_texts_from_image_urls(image_result.image_urls) + augmented = build_instagram_ocr_augmented_content( + caption=caption_result.caption, + ocr_texts=ocr_texts, + ) + extraction = await HFExtractionClient(settings).extract( + text=augmented, + original_url=url, + media_type="post", + ) + return image_result, ocr_texts, augmented, extraction + + +@pytest.mark.skipif( + os.getenv("RUN_LIVE_INSTAGRAM_OCR_FALLBACK_TESTS") != "1", + reason="Set RUN_LIVE_INSTAGRAM_OCR_FALLBACK_TESTS=1 to call live Instagram and HF APIs.", +) +@pytest.mark.parametrize("url", LIVE_POST_URLS) +def test_live_instagram_ocr_fallback_pipeline_extracts_places(url: str) -> None: + settings = Settings( + hf_ocr_timeout_seconds=60, + hf_ocr_max_attempts=1, + hf_ocr_max_new_tokens=1024, + hf_extraction_timeout_seconds=60, + hf_extraction_max_new_tokens=2048, + instagram_image_fetch_max_images=10, + instagram_image_fetch_timeout_ms=12000, + crawler_browser_reuse_enabled=False, + ) + if not (settings.hf_extraction_endpoint_url and settings.hf_extraction_api_token): + pytest.skip("HF extraction endpoint/token are not configured") + + image_result, ocr_texts, augmented, extraction = _run_with_subprocesses( + _run_live_ocr_fallback_pipeline(url, settings) + ) + + assert image_result.response_status == 200 + assert len(image_result.image_urls) >= 1 + assert ocr_texts + assert "[image_ocr]" in augmented + assert extraction is not None + assert extracted_places_from_result(extraction) diff --git a/tests/test_playwright_service.py b/tests/test_playwright_service.py index da9674c..d0500fd 100644 --- a/tests/test_playwright_service.py +++ b/tests/test_playwright_service.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import os from contextlib import asynccontextmanager from types import SimpleNamespace @@ -35,6 +36,16 @@ def _run(coro): pytest.skip(f"Event loop creation is blocked in this environment: {exc}") +def _run_with_subprocesses(coro): + if not hasattr(asyncio, "WindowsProactorEventLoopPolicy"): + return _run(coro) + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + try: + return asyncio.run(coro) + finally: + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + class _FakeRuntime: class _BrowserStub: @staticmethod @@ -64,6 +75,47 @@ async def shutdown(self) -> None: self._launched = False +class _FakeInstagramImagePage: + def __init__(self) -> None: + self.step = 0 + + async def evaluate(self, script: str): + assert script == service.INSTAGRAM_IMAGE_URL_EXTRACTION_JS + if self.step == 0: + return [ + "https://scontent.cdninstagram.com/v/t51.29350-15/one.jpg?x=1#frag", + "https://scontent.cdninstagram.com/v/t51.29350-15/one.jpg?x=1#frag", + ] + return [ + "https://scontent.cdninstagram.com/v/t51.29350-15/two.jpg?x=2", + ] + + def locator(self, _selector: str): + return _FakeInstagramNextLocator(self) + + async def wait_for_timeout(self, _timeout_ms: int): + return None + + +class _FakeInstagramNextLocator: + def __init__(self, page: _FakeInstagramImagePage) -> None: + self._page = page + + @property + def first(self): + return self + + async def count(self) -> int: + return 1 if self._page.step == 0 else 0 + + async def is_visible(self) -> bool: + return self._page.step == 0 + + async def click(self, timeout: int): + assert timeout == 1000 + self._page.step += 1 + + def test_naver_blog_log_no_extraction_from_path_and_query() -> None: assert service.extract_naver_blog_log_no("https://blog.naver.com/example/123") == "123" assert ( @@ -91,6 +143,41 @@ def test_naver_blog_text_normalization_removes_zero_width_and_compacts_blank_lin assert service.normalize_naver_blog_text(text) == "hello world\n\nsecond line" +def test_instagram_image_url_normalization_deduplicates_and_limits() -> None: + urls = service._normalize_instagram_image_urls( + [ + "https://scontent.cdninstagram.com/a.jpg?x=1#fragment", + "https://scontent.cdninstagram.com/a.jpg?x=1#fragment", + "data:image/png;base64,abc", + "not-a-url", + "https://scontent.cdninstagram.com/v/t51.2885-19/profile.jpg?stp=dst-jpg_s150x150", + "https://scontent.cdninstagram.com/b.webp", + ], + max_images=1, + ) + + assert urls == ["https://scontent.cdninstagram.com/a.jpg?x=1"] + + +@pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") +def test_collect_instagram_post_image_urls_clicks_carousel_next() -> None: + page = _FakeInstagramImagePage() + + urls, next_clicks, collection_ms = _run( + service._collect_instagram_post_image_urls( + page, + Settings(instagram_image_fetch_max_images=5, instagram_image_fetch_max_next_clicks=3), + ) + ) + + assert urls == [ + "https://scontent.cdninstagram.com/v/t51.29350-15/one.jpg?x=1", + "https://scontent.cdninstagram.com/v/t51.29350-15/two.jpg?x=2", + ] + assert next_clicks == 1 + assert collection_ms >= 0 + + def test_instagram_fetch_metadata_sanitizes_diagnostic_urls() -> None: result = service.InstagramFetchResult( caption="", @@ -304,3 +391,32 @@ def test_prewarm_runtime_calls_ensure_browser(monkeypatch) -> None: assert warmed is True assert runtime.ensure_calls == 1 assert runtime.launch_count == 1 + + +@pytest.mark.skipif( + os.getenv("RUN_LIVE_INSTAGRAM_IMAGE_TESTS") != "1", + reason="Set RUN_LIVE_INSTAGRAM_IMAGE_TESTS=1 to crawl live Instagram post images.", +) +@pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") +@pytest.mark.parametrize( + "url", + [ + "https://www.instagram.com/p/DX8pE6WEeom/?", + "https://www.instagram.com/p/DYgvDx0gc_X/?img_index=1", + "https://www.instagram.com/p/C6Kp79xRiGC", + "https://www.instagram.com/p/DLmXSK3znl-", + ], +) +def test_live_instagram_post_image_fetch_returns_images(url: str) -> None: + settings = Settings( + instagram_image_fetch_timeout_ms=12000, + instagram_image_fetch_max_images=12, + instagram_image_fetch_max_next_clicks=12, + crawler_browser_reuse_enabled=False, + ) + + result = _run_with_subprocesses(service.fetch_instagram_post_images(url, settings)) + + assert result.timed_out is False + assert result.response_status != 429 + assert result.image_urls diff --git a/tests/test_worker_processor.py b/tests/test_worker_processor.py index c7e55df..81ac8b9 100644 --- a/tests/test_worker_processor.py +++ b/tests/test_worker_processor.py @@ -3,6 +3,7 @@ import asyncio from dataclasses import dataclass from datetime import datetime, timezone +from types import SimpleNamespace from uuid import UUID, uuid4 import pytest @@ -17,7 +18,7 @@ JobStatus, PlaceCandidate, ) -from app.worker.processor import JobProcessor +from app.worker.processor import JobProcessor, build_instagram_ocr_augmented_content from app.services.crawler.extractors.registry import UnsupportedPlatformUrlError if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): @@ -103,6 +104,40 @@ async def extract( return self.result +class SequentialExtractionClient: + def __init__(self, results: list[ExtractionResult | None]) -> None: + self.results = list(results) + self.calls: list[dict[str, object]] = [] + + async def extract( + self, + *, + text: str, + original_url: str, + media_type: str | None, + ) -> ExtractionResult | None: + self.calls.append( + { + "text": text, + "original_url": original_url, + "media_type": media_type, + } + ) + if not self.results: + return None + return self.results.pop(0) + + +class FakeOCRClient: + def __init__(self, texts: list[str]) -> None: + self.texts = texts + self.calls: list[list[str]] = [] + + async def extract_texts_from_image_urls(self, image_urls: list[str]) -> list[str]: + self.calls.append(image_urls) + return self.texts + + class FakeInstagramCooldownStore: def __init__(self, ttl: int = 0) -> None: self.ttl = ttl @@ -486,6 +521,149 @@ async def fake_crawl(url: str, _settings: Settings) -> CrawlArtifact: assert repo.failed is None +def test_instagram_ocr_augmented_content_includes_caption_and_image_text() -> None: + text = build_instagram_ocr_augmented_content( + caption="caption text", + ocr_texts=["상호명\n주소", "", "메뉴"], + ) + + assert text == ( + "[caption]\ncaption text\n\n" + "[image_ocr]\n" + "image 1:\n상호명\n주소\n\n" + "image 2:\n메뉴" + ) + + +@pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") +def test_processor_runs_instagram_image_ocr_fallback_when_caption_extraction_is_empty(monkeypatch) -> None: + now = datetime.now(timezone.utc) + job = JobRecord( + job_id=uuid4(), + room_id=uuid4(), + original_url="https://www.instagram.com/p/example/", + canonical_url="https://www.instagram.com/p/example/", + status=JobStatus.QUEUED, + error_message=None, + created_at=now, + updated_at=now, + ) + repo = FakeRepository(job) + final_result = ExtractionResult( + store_name="OCR Cafe", + address="서울 마포구 연남로 1", + store_name_evidence="OCR Cafe", + address_evidence="서울 마포구 연남로 1", + certainty=ExtractionCertainty.HIGH, + ) + extractor = SequentialExtractionClient([None, final_result]) + ocr = FakeOCRClient(["OCR Cafe\n서울 마포구 연남로 1"]) + + async def fake_crawl(url: str, _settings: Settings) -> CrawlArtifact: + return CrawlArtifact( + url=url, + html=None, + content_text="맛집 모음", + media_type="post", + source_type="INSTAGRAM", + extraction_method="INSTAGRAM_OG_META", + raw_metadata={"instagram": {"og_source": "og:description"}}, + ) + + async def fake_fetch_images(url: str, _settings: Settings): + assert url == "https://www.instagram.com/p/example/" + return SimpleNamespace( + image_urls=["https://cdn.example/image1.jpg"], + timed_out=False, + error=None, + ) + + monkeypatch.setattr("app.worker.processor.crawl_and_parse", fake_crawl) + monkeypatch.setattr("app.worker.processor.fetch_instagram_post_images", fake_fetch_images) + + processor = JobProcessor( + repository=repo, + settings=Settings(), + extraction_client=extractor, + ocr_client=ocr, + ) + + _run(processor.process_job(job.job_id)) + + assert len(extractor.calls) == 2 + assert extractor.calls[0]["text"] == "맛집 모음" + assert extractor.calls[1]["text"] == ( + "[caption]\n맛집 모음\n\n" + "[image_ocr]\nimage 1:\nOCR Cafe\n서울 마포구 연남로 1" + ) + assert ocr.calls == [["https://cdn.example/image1.jpg"]] + assert repo.saved_content is not None + assert repo.saved_content["content_text"] == extractor.calls[1]["text"] + assert repo.saved_content["raw_metadata"]["instagram"]["ocr_fallback"] == { + "attempted": True, + "image_count": 1, + "ocr_text_count": 1, + "image_fetch_timed_out": False, + "image_fetch_error": None, + } + assert repo.saved_result is not None + assert repo.saved_result["extraction_result"]["store_name"] == "OCR Cafe" + + +@pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") +def test_processor_skips_instagram_image_ocr_fallback_when_caption_has_places(monkeypatch) -> None: + now = datetime.now(timezone.utc) + job = JobRecord( + job_id=uuid4(), + room_id=uuid4(), + original_url="https://www.instagram.com/p/example/", + canonical_url="https://www.instagram.com/p/example/", + status=JobStatus.QUEUED, + error_message=None, + created_at=now, + updated_at=now, + ) + repo = FakeRepository(job) + result = ExtractionResult( + store_name="Caption Cafe", + address=None, + store_name_evidence="Caption Cafe", + address_evidence=None, + certainty=ExtractionCertainty.HIGH, + ) + extractor = SequentialExtractionClient([result]) + ocr = FakeOCRClient(["should not be called"]) + + async def fake_crawl(url: str, _settings: Settings) -> CrawlArtifact: + return CrawlArtifact( + url=url, + html=None, + content_text="Caption Cafe", + media_type="post", + source_type="INSTAGRAM", + ) + + async def fake_fetch_images(_url: str, _settings: Settings): + raise AssertionError("image fallback should not run") + + monkeypatch.setattr("app.worker.processor.crawl_and_parse", fake_crawl) + monkeypatch.setattr("app.worker.processor.fetch_instagram_post_images", fake_fetch_images) + + processor = JobProcessor( + repository=repo, + settings=Settings(), + extraction_client=extractor, + ocr_client=ocr, + ) + + _run(processor.process_job(job.job_id)) + + assert len(extractor.calls) == 1 + assert ocr.calls == [] + assert repo.saved_content is not None + assert repo.saved_content["content_text"] == "Caption Cafe" + + @pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") def test_processor_tries_broader_location_hints_before_keyword_only(monkeypatch) -> None: job = _new_job() diff --git a/tests/test_worker_runner.py b/tests/test_worker_runner.py index cbd1e11..6061576 100644 --- a/tests/test_worker_runner.py +++ b/tests/test_worker_runner.py @@ -2,8 +2,8 @@ from app.core.config import Settings from app.infra.kakao import KakaoLocalClient -from app.infra.llm import HFExtractionClient -from app.worker.runner import build_extraction_client, build_place_search_client +from app.infra.llm import HFExtractionClient, HFOCRClient +from app.worker.runner import build_extraction_client, build_ocr_client, build_place_search_client def test_build_extraction_client_returns_none_without_endpoint() -> None: @@ -33,6 +33,48 @@ def test_build_extraction_client_returns_hf_client_when_configured() -> None: assert isinstance(build_extraction_client(settings), HFExtractionClient) +def test_build_ocr_client_returns_none_without_endpoint() -> None: + settings = Settings( + hf_ocr_endpoint_url="", + hf_ocr_api_token="test-token", + hf_extraction_endpoint_url="", + hf_extraction_api_token="", + ) + + assert build_ocr_client(settings) is None + + +def test_build_ocr_client_returns_none_without_token() -> None: + settings = Settings( + hf_ocr_endpoint_url="https://router.huggingface.co/v1/chat/completions", + hf_ocr_api_token="", + hf_extraction_endpoint_url="", + hf_extraction_api_token="", + ) + + assert build_ocr_client(settings) is None + + +def test_build_ocr_client_returns_hf_ocr_client_when_configured() -> None: + settings = Settings( + hf_ocr_endpoint_url="https://router.huggingface.co/v1/chat/completions", + hf_ocr_api_token="test-token", + ) + + assert isinstance(build_ocr_client(settings), HFOCRClient) + + +def test_build_ocr_client_can_reuse_extraction_hf_router_settings() -> None: + settings = Settings( + hf_ocr_endpoint_url="", + hf_ocr_api_token="", + hf_extraction_endpoint_url="https://router.huggingface.co/v1/chat/completions", + hf_extraction_api_token="test-token", + ) + + assert isinstance(build_ocr_client(settings), HFOCRClient) + + def test_build_place_search_client_returns_none_without_key() -> None: settings = Settings(kakao_rest_api_key="")