Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/api/v1/endpoints/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ async def get_job_result(
detail={"code": "JOB_NOT_FOUND", "message": "Job not found."},
)

if job.status in {JobStatus.QUEUED, JobStatus.PROCESSING}:
result = await repository.get_job_result(jobId)
if job.status in {JobStatus.QUEUED, JobStatus.PROCESSING} and not result:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={"code": "RESULT_NOT_READY", "message": f"Job is currently {job.status.value}."},
)

result = await repository.get_job_result(jobId)
content = await repository.get_crawled_content(jobId)
link_stats = await repository.get_link_stats(jobId)

Expand Down
6 changes: 3 additions & 3 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ class Settings(BaseSettings):
instagram_og_wait_timeout_ms: int = 3000
instagram_block_resource_types: str = "image,font,media"
instagram_image_fetch_timeout_ms: int = 8000
instagram_image_fetch_max_images: int = 10
instagram_image_fetch_max_next_clicks: int = 10
instagram_image_fetch_max_images: int = 20
instagram_image_fetch_max_next_clicks: int = 20
instagram_rate_limit_cooldown_seconds: int = 1800
instagram_cooldown_key: str = "processing:cooldown:instagram"

Expand All @@ -119,7 +119,7 @@ class Settings(BaseSettings):
playwright_disable_dev_shm_usage: bool = True

# TODO(next stage): enable when embedding-based candidate extraction is implemented.
extraction_max_candidates: int = 12
extraction_max_candidates: int = 40
extraction_max_sentence_length: int = 280

# TODO(next stage): enable when Kakao Local enrichment is implemented.
Expand Down
2 changes: 1 addition & 1 deletion app/infra/llm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def build_extraction_system_prompt(max_candidates: int) -> str:
)


EXTRACTION_SYSTEM_PROMPT = build_extraction_system_prompt(12)
EXTRACTION_SYSTEM_PROMPT = build_extraction_system_prompt(40)


class HFExtractionError(Exception):
Expand Down
216 changes: 194 additions & 22 deletions app/worker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import re
import time
from dataclasses import dataclass
from dataclasses import dataclass, replace
from typing import Protocol
from uuid import UUID

Expand All @@ -31,6 +31,8 @@
logger = logging.getLogger("processing.worker.processor")

UNSUPPORTED_PLATFORM_URL_ERROR_CODE = "UNSUPPORTED_PLATFORM_URL"
INSTAGRAM_OCR_BATCH_SIZE = 5
INSTAGRAM_PARTIAL_RESULT_MIN_NEW_RESOLVED_PLACES = 5


def build_instagram_ocr_augmented_content(
Expand All @@ -54,6 +56,82 @@ def build_instagram_ocr_augmented_content(
return "\n\n".join(f"{header}\n{body}" for header, body in sections).strip()


def _iter_batches(values: list[str], batch_size: int):
size = max(1, int(batch_size))
for index in range(0, len(values), size):
yield values[index : index + size]


def _merge_extraction_results(
results: list[ExtractionResult],
*,
max_places: int,
) -> ExtractionResult | None:
places: list[ExtractedPlace] = []
seen_keys: set[str] = set()
for result in results:
for place in extracted_places_from_result(result):
key = _extracted_place_dedupe_key(place)
if key in seen_keys:
continue
places.append(place)
seen_keys.add(key)
if len(places) >= max(1, int(max_places)):
break
if len(places) >= max(1, int(max_places)):
break

if not places:
return None

first_place = places[0]
return ExtractionResult(
store_name=first_place.store_name,
address=first_place.address,
store_name_evidence=first_place.store_name_evidence,
address_evidence=first_place.address_evidence,
certainty=first_place.certainty,
places=places,
)


def _extracted_place_dedupe_key(place: ExtractedPlace) -> str:
return "|".join(
(
str(place.store_name or "").strip(),
str(place.address or "").strip(),
str(place.store_name_evidence or "").strip(),
str(place.address_evidence or "").strip(),
)
)


def _merge_place_dicts(
existing: list[dict[str, object]],
new_items: list[dict[str, object]],
) -> list[dict[str, object]]:
merged = list(existing)
seen_keys = {_place_dict_dedupe_key(place) for place in merged}
for place in new_items:
key = _place_dict_dedupe_key(place)
if key in seen_keys:
continue
merged.append(place)
seen_keys.add(key)
return merged


def _place_dict_dedupe_key(place: dict[str, object]) -> str:
kakao_place_id = str(place.get("kakao_place_id") or "").strip()
if kakao_place_id:
return f"id:{kakao_place_id}"
return (
f"name:{place.get('place_name') or ''}|"
f"{place.get('address_name') or ''}|"
f"{place.get('road_address_name') or ''}"
)


@dataclass(slots=True)
class JobProcessOutcome:
processed: bool
Expand Down Expand Up @@ -259,6 +337,16 @@ async def process_job(self, job_id: UUID) -> JobProcessOutcome:
extraction_result,
crawl_artifact,
)
if not self._has_extracted_places(extraction_result):
fallback_outputs = await self._extract_with_instagram_image_batches(
job=job,
original_url=job.original_url,
crawl_artifact=crawl_artifact,
initial_extraction_result=extraction_result,
)
if fallback_outputs is not None:
extraction_result, place_candidates, resolved_places = fallback_outputs

logger.info(
"job crawl completed job_id=%s content_text_len=%s place_candidates=%s resolved_places=%s",
job.job_id,
Expand Down Expand Up @@ -435,17 +523,9 @@ async def _extract_result(
if not self._extraction_client:
return None

extraction_result: ExtractionResult | None = None
if crawl_artifact.content_text:
extraction_result = await self._run_extraction(original_url, crawl_artifact)
if self._has_extracted_places(extraction_result):
return extraction_result

fallback_result = await self._extract_with_instagram_image_fallback(
original_url,
crawl_artifact,
)
return fallback_result if fallback_result is not None else extraction_result
return await self._run_extraction(original_url, crawl_artifact)
return None

async def _run_extraction(
self,
Expand All @@ -464,34 +544,123 @@ async def _run_extraction(
raise
return None

async def _extract_with_instagram_image_fallback(
async def _extract_with_instagram_image_batches(
self,
*,
job: JobRecord,
original_url: str,
crawl_artifact: CrawlArtifact,
) -> ExtractionResult | None:
if not self._ocr_client:
initial_extraction_result: ExtractionResult | None,
) -> tuple[ExtractionResult | None, list[dict[str, object]], list[dict[str, object]]] | None:
if not self._extraction_client or not self._ocr_client:
return None
if not self._should_run_instagram_image_fallback(original_url, crawl_artifact):
return None

try:
image_result = await fetch_instagram_post_images(crawl_artifact.url, self._settings)
ocr_texts = await self._ocr_client.extract_texts_from_image_urls(image_result.image_urls)
augmented_text = build_instagram_ocr_augmented_content(
caption=crawl_artifact.content_text,
ocr_texts=ocr_texts,
original_caption = crawl_artifact.content_text
all_ocr_texts: list[str] = []
extraction_results: list[ExtractionResult] = []
if initial_extraction_result is not None:
extraction_results.append(initial_extraction_result)
place_candidates: list[dict[str, object]] = []
resolved_places: list[dict[str, object]] = []
last_persisted_resolved_count = 0

for image_batch in _iter_batches(
image_result.image_urls,
INSTAGRAM_OCR_BATCH_SIZE,
):
try:
ocr_texts = await self._ocr_client.extract_texts_from_image_urls(image_batch)
all_ocr_texts.extend(ocr_texts)
batch_text = build_instagram_ocr_augmented_content(
caption=original_caption,
ocr_texts=ocr_texts,
)
if not batch_text:
continue

batch_artifact = replace(crawl_artifact, content_text=batch_text)
batch_extraction_result = await self._run_extraction(original_url, batch_artifact)
if batch_extraction_result is None:
continue

extraction_results.append(batch_extraction_result)
batch_place_candidates, batch_resolved_places = await self._enrich_place(
batch_extraction_result,
batch_artifact,
)
except Exception:
logger.exception(
"instagram image OCR batch failed original_url=%s accumulated_resolved_places=%s",
original_url,
len(resolved_places),
)
if resolved_places or _merge_extraction_results(
extraction_results,
max_places=self._settings.extraction_max_candidates,
):
break
if self._settings.extraction_failure_retry_enabled:
raise
continue

place_candidates = _merge_place_dicts(place_candidates, batch_place_candidates)
resolved_places = _merge_place_dicts(resolved_places, batch_resolved_places)

if (
len(resolved_places) - last_persisted_resolved_count
>= INSTAGRAM_PARTIAL_RESULT_MIN_NEW_RESOLVED_PLACES
):
crawl_artifact.content_text = build_instagram_ocr_augmented_content(
caption=original_caption,
ocr_texts=all_ocr_texts,
)
final_extraction_result = _merge_extraction_results(
extraction_results,
max_places=self._settings.extraction_max_candidates,
)
self._record_instagram_ocr_fallback(
crawl_artifact,
image_count=len(image_result.image_urls),
ocr_text_count=len(all_ocr_texts),
image_fetch_timed_out=image_result.timed_out,
image_fetch_error=image_result.error,
batch_size=INSTAGRAM_OCR_BATCH_SIZE,
)
await self._persist_outputs(
job=job,
crawl_artifact=crawl_artifact,
extraction_result=final_extraction_result,
place_candidates=place_candidates,
resolved_places=resolved_places,
)
last_persisted_resolved_count = len(resolved_places)

final_augmented_text = build_instagram_ocr_augmented_content(
caption=original_caption,
ocr_texts=all_ocr_texts,
)
if final_augmented_text:
crawl_artifact.content_text = final_augmented_text
self._record_instagram_ocr_fallback(
crawl_artifact,
image_count=len(image_result.image_urls),
ocr_text_count=len(ocr_texts),
ocr_text_count=len(all_ocr_texts),
image_fetch_timed_out=image_result.timed_out,
image_fetch_error=image_result.error,
batch_size=INSTAGRAM_OCR_BATCH_SIZE,
)
if not augmented_text:

final_extraction_result = _merge_extraction_results(
extraction_results,
max_places=self._settings.extraction_max_candidates,
)
if final_extraction_result is None and not place_candidates and not resolved_places:
return None
crawl_artifact.content_text = augmented_text
return await self._run_extraction(original_url, crawl_artifact)
return final_extraction_result, place_candidates, resolved_places
except Exception:
logger.exception("instagram image OCR fallback failed original_url=%s", original_url)
if self._settings.extraction_failure_retry_enabled:
Expand Down Expand Up @@ -521,6 +690,7 @@ def _record_instagram_ocr_fallback(
ocr_text_count: int,
image_fetch_timed_out: bool,
image_fetch_error: str | None,
batch_size: int | None = None,
) -> None:
raw_metadata = dict(crawl_artifact.raw_metadata or {})
instagram_metadata = dict(raw_metadata.get("instagram") or {})
Expand All @@ -531,6 +701,8 @@ def _record_instagram_ocr_fallback(
"image_fetch_timed_out": image_fetch_timed_out,
"image_fetch_error": image_fetch_error,
}
if batch_size is not None:
instagram_metadata["ocr_fallback"]["batch_size"] = batch_size
raw_metadata["instagram"] = instagram_metadata
raw_metadata["extraction_source"] = "instagram_og_meta_with_image_ocr_fallback"
crawl_artifact.raw_metadata = raw_metadata
Expand Down
5 changes: 3 additions & 2 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ def test_instagram_image_fetch_and_hf_ocr_defaults_are_separate_from_caption_ext
settings = Settings()

assert settings.instagram_image_fetch_timeout_ms == 8000
assert settings.instagram_image_fetch_max_images == 10
assert settings.instagram_image_fetch_max_next_clicks == 10
assert settings.instagram_image_fetch_max_images == 20
assert settings.instagram_image_fetch_max_next_clicks == 20
assert settings.extraction_max_candidates == 40
assert settings.hf_ocr_endpoint_url == ""
assert settings.hf_ocr_api_token == ""
assert settings.hf_ocr_model_name == "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
Loading
Loading