diff --git a/app/api/v1/endpoints/jobs.py b/app/api/v1/endpoints/jobs.py index 9f41fd6..4bb4f31 100644 --- a/app/api/v1/endpoints/jobs.py +++ b/app/api/v1/endpoints/jobs.py @@ -172,13 +172,13 @@ async def get_job_result( detail={"code": "JOB_NOT_FOUND", "message": "Job not found."}, ) - if job.status in {JobStatus.QUEUED, JobStatus.PROCESSING}: + result = await repository.get_job_result(jobId) + if job.status in {JobStatus.QUEUED, JobStatus.PROCESSING} and not result: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail={"code": "RESULT_NOT_READY", "message": f"Job is currently {job.status.value}."}, ) - result = await repository.get_job_result(jobId) content = await repository.get_crawled_content(jobId) link_stats = await repository.get_link_stats(jobId) diff --git a/app/core/config.py b/app/core/config.py index e250dd2..9ed5bca 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -97,8 +97,8 @@ class Settings(BaseSettings): instagram_og_wait_timeout_ms: int = 3000 instagram_block_resource_types: str = "image,font,media" instagram_image_fetch_timeout_ms: int = 8000 - instagram_image_fetch_max_images: int = 10 - instagram_image_fetch_max_next_clicks: int = 10 + instagram_image_fetch_max_images: int = 20 + instagram_image_fetch_max_next_clicks: int = 20 instagram_rate_limit_cooldown_seconds: int = 1800 instagram_cooldown_key: str = "processing:cooldown:instagram" @@ -119,7 +119,7 @@ class Settings(BaseSettings): playwright_disable_dev_shm_usage: bool = True # TODO(next stage): enable when embedding-based candidate extraction is implemented. - extraction_max_candidates: int = 12 + extraction_max_candidates: int = 40 extraction_max_sentence_length: int = 280 # TODO(next stage): enable when Kakao Local enrichment is implemented. diff --git a/app/infra/llm/client.py b/app/infra/llm/client.py index 9489220..2848527 100644 --- a/app/infra/llm/client.py +++ b/app/infra/llm/client.py @@ -91,7 +91,7 @@ def build_extraction_system_prompt(max_candidates: int) -> str: ) -EXTRACTION_SYSTEM_PROMPT = build_extraction_system_prompt(12) +EXTRACTION_SYSTEM_PROMPT = build_extraction_system_prompt(40) class HFExtractionError(Exception): diff --git a/app/worker/processor.py b/app/worker/processor.py index 5f6d3be..c47a183 100644 --- a/app/worker/processor.py +++ b/app/worker/processor.py @@ -4,7 +4,7 @@ import logging import re import time -from dataclasses import dataclass +from dataclasses import dataclass, replace from typing import Protocol from uuid import UUID @@ -31,6 +31,8 @@ logger = logging.getLogger("processing.worker.processor") UNSUPPORTED_PLATFORM_URL_ERROR_CODE = "UNSUPPORTED_PLATFORM_URL" +INSTAGRAM_OCR_BATCH_SIZE = 5 +INSTAGRAM_PARTIAL_RESULT_MIN_NEW_RESOLVED_PLACES = 5 def build_instagram_ocr_augmented_content( @@ -54,6 +56,82 @@ def build_instagram_ocr_augmented_content( return "\n\n".join(f"{header}\n{body}" for header, body in sections).strip() +def _iter_batches(values: list[str], batch_size: int): + size = max(1, int(batch_size)) + for index in range(0, len(values), size): + yield values[index : index + size] + + +def _merge_extraction_results( + results: list[ExtractionResult], + *, + max_places: int, +) -> ExtractionResult | None: + places: list[ExtractedPlace] = [] + seen_keys: set[str] = set() + for result in results: + for place in extracted_places_from_result(result): + key = _extracted_place_dedupe_key(place) + if key in seen_keys: + continue + places.append(place) + seen_keys.add(key) + if len(places) >= max(1, int(max_places)): + break + if len(places) >= max(1, int(max_places)): + break + + if not places: + return None + + first_place = places[0] + return ExtractionResult( + store_name=first_place.store_name, + address=first_place.address, + store_name_evidence=first_place.store_name_evidence, + address_evidence=first_place.address_evidence, + certainty=first_place.certainty, + places=places, + ) + + +def _extracted_place_dedupe_key(place: ExtractedPlace) -> str: + return "|".join( + ( + str(place.store_name or "").strip(), + str(place.address or "").strip(), + str(place.store_name_evidence or "").strip(), + str(place.address_evidence or "").strip(), + ) + ) + + +def _merge_place_dicts( + existing: list[dict[str, object]], + new_items: list[dict[str, object]], +) -> list[dict[str, object]]: + merged = list(existing) + seen_keys = {_place_dict_dedupe_key(place) for place in merged} + for place in new_items: + key = _place_dict_dedupe_key(place) + if key in seen_keys: + continue + merged.append(place) + seen_keys.add(key) + return merged + + +def _place_dict_dedupe_key(place: dict[str, object]) -> str: + kakao_place_id = str(place.get("kakao_place_id") or "").strip() + if kakao_place_id: + return f"id:{kakao_place_id}" + return ( + f"name:{place.get('place_name') or ''}|" + f"{place.get('address_name') or ''}|" + f"{place.get('road_address_name') or ''}" + ) + + @dataclass(slots=True) class JobProcessOutcome: processed: bool @@ -259,6 +337,16 @@ async def process_job(self, job_id: UUID) -> JobProcessOutcome: extraction_result, crawl_artifact, ) + if not self._has_extracted_places(extraction_result): + fallback_outputs = await self._extract_with_instagram_image_batches( + job=job, + original_url=job.original_url, + crawl_artifact=crawl_artifact, + initial_extraction_result=extraction_result, + ) + if fallback_outputs is not None: + extraction_result, place_candidates, resolved_places = fallback_outputs + logger.info( "job crawl completed job_id=%s content_text_len=%s place_candidates=%s resolved_places=%s", job.job_id, @@ -435,17 +523,9 @@ async def _extract_result( if not self._extraction_client: return None - extraction_result: ExtractionResult | None = None if crawl_artifact.content_text: - extraction_result = await self._run_extraction(original_url, crawl_artifact) - if self._has_extracted_places(extraction_result): - return extraction_result - - fallback_result = await self._extract_with_instagram_image_fallback( - original_url, - crawl_artifact, - ) - return fallback_result if fallback_result is not None else extraction_result + return await self._run_extraction(original_url, crawl_artifact) + return None async def _run_extraction( self, @@ -464,34 +544,123 @@ async def _run_extraction( raise return None - async def _extract_with_instagram_image_fallback( + async def _extract_with_instagram_image_batches( self, + *, + job: JobRecord, original_url: str, crawl_artifact: CrawlArtifact, - ) -> ExtractionResult | None: - if not self._ocr_client: + initial_extraction_result: ExtractionResult | None, + ) -> tuple[ExtractionResult | None, list[dict[str, object]], list[dict[str, object]]] | None: + if not self._extraction_client or not self._ocr_client: return None if not self._should_run_instagram_image_fallback(original_url, crawl_artifact): return None try: image_result = await fetch_instagram_post_images(crawl_artifact.url, self._settings) - ocr_texts = await self._ocr_client.extract_texts_from_image_urls(image_result.image_urls) - augmented_text = build_instagram_ocr_augmented_content( - caption=crawl_artifact.content_text, - ocr_texts=ocr_texts, + original_caption = crawl_artifact.content_text + all_ocr_texts: list[str] = [] + extraction_results: list[ExtractionResult] = [] + if initial_extraction_result is not None: + extraction_results.append(initial_extraction_result) + place_candidates: list[dict[str, object]] = [] + resolved_places: list[dict[str, object]] = [] + last_persisted_resolved_count = 0 + + for image_batch in _iter_batches( + image_result.image_urls, + INSTAGRAM_OCR_BATCH_SIZE, + ): + try: + ocr_texts = await self._ocr_client.extract_texts_from_image_urls(image_batch) + all_ocr_texts.extend(ocr_texts) + batch_text = build_instagram_ocr_augmented_content( + caption=original_caption, + ocr_texts=ocr_texts, + ) + if not batch_text: + continue + + batch_artifact = replace(crawl_artifact, content_text=batch_text) + batch_extraction_result = await self._run_extraction(original_url, batch_artifact) + if batch_extraction_result is None: + continue + + extraction_results.append(batch_extraction_result) + batch_place_candidates, batch_resolved_places = await self._enrich_place( + batch_extraction_result, + batch_artifact, + ) + except Exception: + logger.exception( + "instagram image OCR batch failed original_url=%s accumulated_resolved_places=%s", + original_url, + len(resolved_places), + ) + if resolved_places or _merge_extraction_results( + extraction_results, + max_places=self._settings.extraction_max_candidates, + ): + break + if self._settings.extraction_failure_retry_enabled: + raise + continue + + place_candidates = _merge_place_dicts(place_candidates, batch_place_candidates) + resolved_places = _merge_place_dicts(resolved_places, batch_resolved_places) + + if ( + len(resolved_places) - last_persisted_resolved_count + >= INSTAGRAM_PARTIAL_RESULT_MIN_NEW_RESOLVED_PLACES + ): + crawl_artifact.content_text = build_instagram_ocr_augmented_content( + caption=original_caption, + ocr_texts=all_ocr_texts, + ) + final_extraction_result = _merge_extraction_results( + extraction_results, + max_places=self._settings.extraction_max_candidates, + ) + self._record_instagram_ocr_fallback( + crawl_artifact, + image_count=len(image_result.image_urls), + ocr_text_count=len(all_ocr_texts), + image_fetch_timed_out=image_result.timed_out, + image_fetch_error=image_result.error, + batch_size=INSTAGRAM_OCR_BATCH_SIZE, + ) + await self._persist_outputs( + job=job, + crawl_artifact=crawl_artifact, + extraction_result=final_extraction_result, + place_candidates=place_candidates, + resolved_places=resolved_places, + ) + last_persisted_resolved_count = len(resolved_places) + + final_augmented_text = build_instagram_ocr_augmented_content( + caption=original_caption, + ocr_texts=all_ocr_texts, ) + if final_augmented_text: + crawl_artifact.content_text = final_augmented_text self._record_instagram_ocr_fallback( crawl_artifact, image_count=len(image_result.image_urls), - ocr_text_count=len(ocr_texts), + ocr_text_count=len(all_ocr_texts), image_fetch_timed_out=image_result.timed_out, image_fetch_error=image_result.error, + batch_size=INSTAGRAM_OCR_BATCH_SIZE, ) - if not augmented_text: + + final_extraction_result = _merge_extraction_results( + extraction_results, + max_places=self._settings.extraction_max_candidates, + ) + if final_extraction_result is None and not place_candidates and not resolved_places: return None - crawl_artifact.content_text = augmented_text - return await self._run_extraction(original_url, crawl_artifact) + return final_extraction_result, place_candidates, resolved_places except Exception: logger.exception("instagram image OCR fallback failed original_url=%s", original_url) if self._settings.extraction_failure_retry_enabled: @@ -521,6 +690,7 @@ def _record_instagram_ocr_fallback( ocr_text_count: int, image_fetch_timed_out: bool, image_fetch_error: str | None, + batch_size: int | None = None, ) -> None: raw_metadata = dict(crawl_artifact.raw_metadata or {}) instagram_metadata = dict(raw_metadata.get("instagram") or {}) @@ -531,6 +701,8 @@ def _record_instagram_ocr_fallback( "image_fetch_timed_out": image_fetch_timed_out, "image_fetch_error": image_fetch_error, } + if batch_size is not None: + instagram_metadata["ocr_fallback"]["batch_size"] = batch_size raw_metadata["instagram"] = instagram_metadata raw_metadata["extraction_source"] = "instagram_og_meta_with_image_ocr_fallback" crawl_artifact.raw_metadata = raw_metadata diff --git a/tests/test_config.py b/tests/test_config.py index e15f724..22e569b 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -90,8 +90,9 @@ def test_instagram_image_fetch_and_hf_ocr_defaults_are_separate_from_caption_ext settings = Settings() assert settings.instagram_image_fetch_timeout_ms == 8000 - assert settings.instagram_image_fetch_max_images == 10 - assert settings.instagram_image_fetch_max_next_clicks == 10 + assert settings.instagram_image_fetch_max_images == 20 + assert settings.instagram_image_fetch_max_next_clicks == 20 + assert settings.extraction_max_candidates == 40 assert settings.hf_ocr_endpoint_url == "" assert settings.hf_ocr_api_token == "" assert settings.hf_ocr_model_name == "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8" diff --git a/tests/test_jobs_api_result.py b/tests/test_jobs_api_result.py index 036cb27..c06e048 100644 --- a/tests/test_jobs_api_result.py +++ b/tests/test_jobs_api_result.py @@ -44,7 +44,7 @@ async def get_job(self, job_id: UUID) -> JobRecord | None: class FakeJobRepository: def __init__( self, - result: JobResultRecord, + result: JobResultRecord | None, content: CrawledContentRecord | None = None, link_stats: LinkStatsRecord | None = None, ) -> None: @@ -53,7 +53,7 @@ def __init__( self.link_stats = link_stats async def get_job_result(self, job_id: UUID) -> JobResultRecord | None: - return self.result if job_id == self.result.job_id else None + return self.result if self.result and job_id == self.result.job_id else None async def get_crawled_content(self, job_id: UUID) -> CrawledContentRecord | None: return self.content if self.content and job_id == self.content.job_id else None @@ -81,7 +81,7 @@ def get(self, *args, **kwargs): def _client( job: JobRecord, - result: JobResultRecord, + result: JobResultRecord | None, content: CrawledContentRecord | None = None, link_stats: LinkStatsRecord | None = None, ) -> SkippingTestClient: @@ -255,6 +255,65 @@ def test_get_job_result_returns_public_contract_only() -> None: assert "y" not in payload["resolved_places"][0] +def test_get_job_result_returns_partial_result_while_processing() -> None: + now = datetime.now(timezone.utc) + job_id = uuid4() + place_result = { + "kakao_place_id": "123", + "place_name": "Partial Cafe", + "address_name": "Seoul Mapo-gu", + "road_address_name": "Seoul Mapo-gu Road 1", + "x": "126.92", + "y": "37.55", + "place_url": "https://place.map.kakao.com/123", + } + job = JobRecord( + job_id=job_id, + room_id=uuid4(), + original_url="https://www.instagram.com/p/example/", + canonical_url="https://www.instagram.com/p/example/", + status=JobStatus.PROCESSING, + error_message=None, + created_at=now, + updated_at=now, + ) + result = JobResultRecord( + job_id=job_id, + extraction_result=None, + place_candidates=[place_result], + resolved_places=[place_result], + created_at=now, + updated_at=now, + ) + + response = _client(job, result).get(f"/jobs/{job_id}/result", headers=_headers()) + + assert response.status_code == 200 + payload = response.json() + assert payload["status"] == "PROCESSING" + assert payload["resolved_places"][0]["place_name"] == "Partial Cafe" + + +def test_get_job_result_still_conflicts_while_processing_without_result() -> None: + now = datetime.now(timezone.utc) + job_id = uuid4() + job = JobRecord( + job_id=job_id, + room_id=uuid4(), + original_url="https://www.instagram.com/p/example/", + canonical_url="https://www.instagram.com/p/example/", + status=JobStatus.PROCESSING, + error_message=None, + created_at=now, + updated_at=now, + ) + + response = _client(job, None).get(f"/jobs/{job_id}/result", headers=_headers()) + + assert response.status_code == 409 + assert response.json()["detail"]["code"] == "RESULT_NOT_READY" + + def test_get_job_debug_result_returns_internal_fields() -> None: now = datetime.now(timezone.utc) job_id = uuid4() diff --git a/tests/test_worker_processor.py b/tests/test_worker_processor.py index 81ac8b9..83b32a1 100644 --- a/tests/test_worker_processor.py +++ b/tests/test_worker_processor.py @@ -49,6 +49,7 @@ class FakeRepository: def __init__(self, job: JobRecord) -> None: self._job = job self.saved_result: dict | None = None + self.saved_results: list[dict] = [] self.saved_content: dict | None = None self.saved_link_stats: dict | None = None self.succeeded = False @@ -62,6 +63,7 @@ async def claim_job(self, job_id: UUID) -> JobRecord | None: async def upsert_job_result(self, **kwargs): self.saved_result = kwargs + self.saved_results.append(kwargs) return None async def upsert_crawled_content(self, **kwargs): @@ -138,6 +140,30 @@ async def extract_texts_from_image_urls(self, image_urls: list[str]) -> list[str return self.texts +class SequentialOCRClient: + def __init__(self, texts_by_call: list[list[str]]) -> None: + self.texts_by_call = list(texts_by_call) + self.calls: list[list[str]] = [] + + async def extract_texts_from_image_urls(self, image_urls: list[str]) -> list[str]: + self.calls.append(image_urls) + if not self.texts_by_call: + return [] + return self.texts_by_call.pop(0) + + +class FailingAfterFirstOCRClient: + def __init__(self, first_texts: list[str]) -> None: + self.first_texts = first_texts + self.calls: list[list[str]] = [] + + async def extract_texts_from_image_urls(self, image_urls: list[str]) -> list[str]: + self.calls.append(image_urls) + if len(self.calls) == 1: + return self.first_texts + raise RuntimeError("ocr rate limited") + + class FakeInstagramCooldownStore: def __init__(self, ttl: int = 0) -> None: self.ttl = ttl @@ -277,6 +303,28 @@ def _place_candidate( ) +def _extraction_result_with_places(names: list[str]) -> ExtractionResult: + places = [ + ExtractedPlace( + store_name=name, + address=f"서울 테스트구 {index}길", + store_name_evidence=name, + address_evidence=f"서울 테스트구 {index}길", + certainty=ExtractionCertainty.HIGH, + ) + for index, name in enumerate(names, start=1) + ] + first = places[0] if places else None + return ExtractionResult( + store_name=first.store_name if first else None, + address=first.address if first else None, + store_name_evidence=first.store_name_evidence if first else None, + address_evidence=first.address_evidence if first else None, + certainty=first.certainty if first else ExtractionCertainty.LOW, + places=places, + ) + + @pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") def test_processor_success(monkeypatch) -> None: job = _new_job() @@ -605,6 +653,7 @@ async def fake_fetch_images(url: str, _settings: Settings): "ocr_text_count": 1, "image_fetch_timed_out": False, "image_fetch_error": None, + "batch_size": 5, } assert repo.saved_result is not None assert repo.saved_result["extraction_result"]["store_name"] == "OCR Cafe" @@ -664,6 +713,233 @@ async def fake_fetch_images(_url: str, _settings: Settings): assert repo.saved_content["content_text"] == "Caption Cafe" +@pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") +def test_processor_persists_instagram_ocr_batch_results_for_each_new_resolved_threshold(monkeypatch) -> None: + now = datetime.now(timezone.utc) + job = JobRecord( + job_id=uuid4(), + room_id=uuid4(), + original_url="https://www.instagram.com/p/example/", + canonical_url="https://www.instagram.com/p/example/", + status=JobStatus.QUEUED, + error_message=None, + created_at=now, + updated_at=now, + ) + repo = FakeRepository(job) + batch_names = [ + [f"Batch1 Place {index}" for index in range(1, 7)], + [f"Batch2 Place {index}" for index in range(1, 3)], + [f"Batch3 Place {index}" for index in range(1, 4)], + [f"Batch4 Place {index}" for index in range(1, 5)], + ] + all_names = [name for names in batch_names for name in names] + extractor = SequentialExtractionClient( + [None] + [_extraction_result_with_places(names) for names in batch_names] + ) + ocr = SequentialOCRClient( + [[f"OCR text {batch_index}-{image_index}" for image_index in range(5)] for batch_index in range(1, 5)] + ) + place_search = KeywordAwarePlaceSearchClient( + { + name: [ + _place_candidate( + kakao_place_id=str(index), + place_name=name, + query=name, + ) + ] + for index, name in enumerate(all_names, start=1) + } + ) + image_urls = [f"https://cdn.example/image{index}.jpg" for index in range(1, 21)] + + async def fake_crawl(url: str, _settings: Settings) -> CrawlArtifact: + return CrawlArtifact( + url=url, + html=None, + content_text="맛집 모음", + media_type="post", + source_type="INSTAGRAM", + extraction_method="INSTAGRAM_OG_META", + raw_metadata={"instagram": {"og_source": "og:description"}}, + ) + + async def fake_fetch_images(_url: str, _settings: Settings): + return SimpleNamespace( + image_urls=image_urls, + timed_out=False, + error=None, + ) + + monkeypatch.setattr("app.worker.processor.crawl_and_parse", fake_crawl) + monkeypatch.setattr("app.worker.processor.fetch_instagram_post_images", fake_fetch_images) + + processor = JobProcessor( + repository=repo, + settings=Settings(), + extraction_client=extractor, + ocr_client=ocr, + place_search_client=place_search, + ) + + _run(processor.process_job(job.job_id)) + + assert [len(call) for call in ocr.calls] == [5, 5, 5, 5] + assert len(extractor.calls) == 5 + assert [len(result["resolved_places"]) for result in repo.saved_results] == [6, 11, 15] + assert repo.saved_results[-1]["extraction_result"]["places"][0]["store_name"] == "Batch1 Place 1" + assert repo.saved_results[-1]["extraction_result"]["places"][-1]["store_name"] == "Batch4 Place 4" + assert repo.saved_content is not None + assert repo.saved_content["raw_metadata"]["instagram"]["ocr_fallback"]["batch_size"] == 5 + assert repo.saved_content["raw_metadata"]["instagram"]["ocr_fallback"]["ocr_text_count"] == 20 + assert repo.succeeded is True + + +@pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") +def test_processor_waits_to_persist_instagram_ocr_result_until_new_resolved_threshold(monkeypatch) -> None: + now = datetime.now(timezone.utc) + job = JobRecord( + job_id=uuid4(), + room_id=uuid4(), + original_url="https://www.instagram.com/p/example/", + canonical_url="https://www.instagram.com/p/example/", + status=JobStatus.QUEUED, + error_message=None, + created_at=now, + updated_at=now, + ) + repo = FakeRepository(job) + batch_names = [ + ["First A", "First B"], + ["Second A", "Second B", "Second C", "Second D", "Second E"], + ] + all_names = [name for names in batch_names for name in names] + extractor = SequentialExtractionClient( + [None] + [_extraction_result_with_places(names) for names in batch_names] + ) + ocr = SequentialOCRClient( + [[f"OCR text {batch_index}-{image_index}" for image_index in range(5)] for batch_index in range(1, 3)] + ) + place_search = KeywordAwarePlaceSearchClient( + { + name: [ + _place_candidate( + kakao_place_id=str(index), + place_name=name, + query=name, + ) + ] + for index, name in enumerate(all_names, start=1) + } + ) + image_urls = [f"https://cdn.example/image{index}.jpg" for index in range(1, 11)] + + async def fake_crawl(url: str, _settings: Settings) -> CrawlArtifact: + return CrawlArtifact( + url=url, + html=None, + content_text="맛집 모음", + media_type="post", + source_type="INSTAGRAM", + extraction_method="INSTAGRAM_OG_META", + raw_metadata={"instagram": {"og_source": "og:description"}}, + ) + + async def fake_fetch_images(_url: str, _settings: Settings): + return SimpleNamespace( + image_urls=image_urls, + timed_out=False, + error=None, + ) + + monkeypatch.setattr("app.worker.processor.crawl_and_parse", fake_crawl) + monkeypatch.setattr("app.worker.processor.fetch_instagram_post_images", fake_fetch_images) + + processor = JobProcessor( + repository=repo, + settings=Settings(), + extraction_client=extractor, + ocr_client=ocr, + place_search_client=place_search, + ) + + _run(processor.process_job(job.job_id)) + + assert [len(call) for call in ocr.calls] == [5, 5] + assert [len(result["resolved_places"]) for result in repo.saved_results] == [7, 7] + assert repo.saved_results[0]["resolved_places"][0]["place_name"] == "First A" + assert repo.succeeded is True + + +@pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") +def test_processor_keeps_accumulated_instagram_ocr_results_when_later_batch_fails(monkeypatch) -> None: + now = datetime.now(timezone.utc) + job = JobRecord( + job_id=uuid4(), + room_id=uuid4(), + original_url="https://www.instagram.com/p/example/", + canonical_url="https://www.instagram.com/p/example/", + status=JobStatus.QUEUED, + error_message=None, + created_at=now, + updated_at=now, + ) + repo = FakeRepository(job) + names = [f"Batch1 Place {index}" for index in range(1, 7)] + extractor = SequentialExtractionClient([None, _extraction_result_with_places(names)]) + ocr = FailingAfterFirstOCRClient([f"OCR text {index}" for index in range(1, 6)]) + place_search = KeywordAwarePlaceSearchClient( + { + name: [ + _place_candidate( + kakao_place_id=str(index), + place_name=name, + query=name, + ) + ] + for index, name in enumerate(names, start=1) + } + ) + image_urls = [f"https://cdn.example/image{index}.jpg" for index in range(1, 11)] + + async def fake_crawl(url: str, _settings: Settings) -> CrawlArtifact: + return CrawlArtifact( + url=url, + html=None, + content_text="맛집 모음", + media_type="post", + source_type="INSTAGRAM", + extraction_method="INSTAGRAM_OG_META", + raw_metadata={"instagram": {"og_source": "og:description"}}, + ) + + async def fake_fetch_images(_url: str, _settings: Settings): + return SimpleNamespace( + image_urls=image_urls, + timed_out=False, + error=None, + ) + + monkeypatch.setattr("app.worker.processor.crawl_and_parse", fake_crawl) + monkeypatch.setattr("app.worker.processor.fetch_instagram_post_images", fake_fetch_images) + + processor = JobProcessor( + repository=repo, + settings=Settings(), + extraction_client=extractor, + ocr_client=ocr, + place_search_client=place_search, + ) + + _run(processor.process_job(job.job_id)) + + assert [len(call) for call in ocr.calls] == [5, 5] + assert [len(result["resolved_places"]) for result in repo.saved_results] == [6, 6] + assert repo.succeeded is True + assert repo.failed is None + + @pytest.mark.skipif(not EVENT_LOOP_AVAILABLE, reason="Event loop creation is blocked in this environment") def test_processor_tries_broader_location_hints_before_keyword_only(monkeypatch) -> None: job = _new_job()