Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions lovdata_pipeline/domain/embedding_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Embedding provider abstraction.

Decouples the embedding logic from specific providers (OpenAI, Cohere, local models, etc.)
"""

from typing import Protocol


class EmbeddingProvider(Protocol):
"""Protocol for embedding providers.

This allows swapping between different embedding implementations
(OpenAI, Cohere, local models, etc.) without changing business logic.
"""

def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""Embed a batch of texts.

Args:
texts: List of text strings to embed

Returns:
List of embedding vectors (one per input text)

Raises:
Exception: If embedding fails
"""
...

def get_model_name(self) -> str:
"""Get the model identifier used for these embeddings.

Returns:
Model name/identifier string
"""
...
1 change: 1 addition & 0 deletions lovdata_pipeline/domain/services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Domain services for legal document processing."""
58 changes: 58 additions & 0 deletions lovdata_pipeline/domain/services/chunking_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Chunking service for legal articles.

Responsible for splitting articles into appropriately-sized chunks.
Single Responsibility: Coordinate article chunking.
"""

from lovdata_pipeline.domain.models import ChunkMetadata
from lovdata_pipeline.domain.parsers.xml_chunker import LegalArticle
from lovdata_pipeline.domain.services.xml_parsing_service import ParsedArticle
from lovdata_pipeline.domain.splitters.recursive_splitter import XMLAwareRecursiveSplitter


class ChunkingService:
"""Service for chunking legal articles.

Single Responsibility: Split articles into chunks that fit within token limits.
"""

def __init__(self, max_tokens: int):
"""Initialize chunking service.

Args:
max_tokens: Maximum tokens per chunk
"""
self._max_tokens = max_tokens
self._splitter = XMLAwareRecursiveSplitter(max_tokens=max_tokens)

def chunk_article(
self,
article: ParsedArticle,
doc_id: str,
dataset: str,
) -> list[ChunkMetadata]:
"""Split an article into chunks.

Args:
article: Parsed article to chunk
doc_id: Document identifier
dataset: Dataset name

Returns:
List of chunk metadata objects
"""
# Convert ParsedArticle to LegalArticle format expected by splitter
# Note: We don't parse paragraphs separately in current implementation
legal_article = LegalArticle(
article_id=article.article_id,
content=article.content,
paragraphs=[], # Current implementation extracts text directly
section_heading=article.heading,
absolute_address=article.address,
document_id=doc_id,
)

# Use the splitter to create chunks
chunks = self._splitter.split_article(legal_article, dataset_name=dataset)

return chunks
81 changes: 81 additions & 0 deletions lovdata_pipeline/domain/services/embedding_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Embedding service for generating vector embeddings.

Responsible for coordinating the embedding of text chunks.
Single Responsibility: Generate embeddings for chunks using a provider.
"""

from datetime import UTC, datetime

from lovdata_pipeline.domain.embedding_provider import EmbeddingProvider
from lovdata_pipeline.domain.models import ChunkMetadata, EnrichedChunk


class EmbeddingService:
"""Service for embedding text chunks.

Single Responsibility: Generate embeddings using a provider,
decoupled from progress tracking and specific embedding implementations.
"""

def __init__(self, provider: EmbeddingProvider, batch_size: int = 100):
"""Initialize embedding service.

Args:
provider: Embedding provider implementation
batch_size: Number of chunks to embed in each batch
"""
self._provider = provider
self._batch_size = batch_size

def embed_chunks(
self,
chunks: list[ChunkMetadata],
progress_callback: callable[[int, int], None] | None = None,
) -> list[EnrichedChunk]:
"""Embed chunks in batches.

Args:
chunks: List of chunks to embed
progress_callback: Optional callback(current, total) for progress tracking

Returns:
List of enriched chunks with embeddings

Raises:
Exception: If embedding fails
"""
all_enriched = []
total_chunks = len(chunks)
model_name = self._provider.get_model_name()

for i in range(0, len(chunks), self._batch_size):
batch = chunks[i : i + self._batch_size]
texts = [c.text for c in batch]

# Get embeddings from provider
embeddings = self._provider.embed_batch(texts)

# Create enriched chunks
embedded_at = datetime.now(UTC).isoformat()
for chunk, embedding in zip(batch, embeddings, strict=True):
enriched = EnrichedChunk(
chunk_id=chunk.chunk_id,
document_id=chunk.document_id,
dataset_name=chunk.dataset_name,
content=chunk.content,
token_count=chunk.token_count,
section_heading=chunk.section_heading,
absolute_address=chunk.absolute_address,
split_reason=chunk.split_reason,
parent_chunk_id=chunk.parent_chunk_id,
embedding=embedding,
embedding_model=model_name,
embedded_at=embedded_at,
)
all_enriched.append(enriched)

# Call progress callback if provided
if progress_callback:
progress_callback(len(all_enriched), total_chunks)

return all_enriched
153 changes: 153 additions & 0 deletions lovdata_pipeline/domain/services/file_processing_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""File processing service for legal documents.

Responsible for coordinating the complete processing of a single file.
Single Responsibility: Orchestrate parse -> chunk -> embed -> index for one file.
"""

import logging
from dataclasses import dataclass
from pathlib import Path

from lovdata_pipeline.domain.services.chunking_service import ChunkingService
from lovdata_pipeline.domain.services.embedding_service import EmbeddingService
from lovdata_pipeline.domain.services.xml_parsing_service import XMLParsingService
from lovdata_pipeline.domain.vector_store import VectorStoreRepository

logger = logging.getLogger(__name__)


@dataclass
class FileProcessingResult:
"""Result of processing a single file."""

success: bool
chunk_count: int
error_message: str | None = None


@dataclass
class FileInfo:
"""Information about a file to process."""

doc_id: str
path: Path
dataset: str
hash: str


class FileProcessingService:
"""Service for processing individual legal document files.

Single Responsibility: Coordinate the complete processing pipeline
for a single file (parse, chunk, embed, index).
"""

def __init__(
self,
xml_parser: XMLParsingService,
chunking_service: ChunkingService,
embedding_service: EmbeddingService,
vector_store: VectorStoreRepository,
):
"""Initialize file processing service.

Args:
xml_parser: Service for parsing XML files
chunking_service: Service for chunking articles
embedding_service: Service for generating embeddings
vector_store: Repository for storing vectors
"""
self._xml_parser = xml_parser
self._chunking_service = chunking_service
self._embedding_service = embedding_service
self._vector_store = vector_store

def process_file(
self,
file_info: FileInfo,
progress_callback: callable[[int, int], None] | None = None,
warning_callback: callable[[str], None] | None = None,
) -> FileProcessingResult:
"""Process a single file through the complete pipeline.

Args:
file_info: Information about the file to process
progress_callback: Optional callback(current, total) for embedding progress
warning_callback: Optional callback(message) for warnings

Returns:
FileProcessingResult with success status, chunk count, and optional error
"""
vector_ids_to_cleanup = []

try:
# 1. Validate file exists
if not file_info.path.exists():
return FileProcessingResult(
success=False,
chunk_count=0,
error_message=f"File not found: {file_info.path}",
)

# 2. Parse XML
articles = self._xml_parser.parse_file(file_info.path)
if not articles:
if warning_callback:
warning_callback(f"No articles in {file_info.doc_id}")
return FileProcessingResult(success=True, chunk_count=0)

# 3. Chunk articles
all_chunks = []
for article in articles:
chunks = self._chunking_service.chunk_article(
article,
file_info.doc_id,
file_info.dataset,
)
all_chunks.extend(chunks)

if not all_chunks:
return FileProcessingResult(success=True, chunk_count=0)

logger.debug(f" Chunked: {len(all_chunks)} chunks")

# 4. Embed chunks
enriched = self._embedding_service.embed_chunks(
all_chunks,
progress_callback=progress_callback,
)
logger.debug(f" Embedded: {len(enriched)} chunks")

# 5. Generate vector IDs
vector_ids = [f"{file_info.doc_id}_chunk_{i}" for i in range(len(enriched))]
vector_ids_to_cleanup = vector_ids # Track for cleanup on failure

# Set IDs on enriched chunks
for chunk, vid in zip(enriched, vector_ids, strict=True):
chunk.chunk_id = vid

# 6. Index in vector store (upsert = replace old if exists)
self._vector_store.upsert_chunks(enriched)
logger.debug(f" Indexed: {len(vector_ids)} vectors")

return FileProcessingResult(
success=True,
chunk_count=len(all_chunks),
)

except Exception as e:
logger.debug(f" Failed: {e}")

# Clean up any partial vectors that may have been indexed
if vector_ids_to_cleanup:
try:
self._vector_store.delete_by_document_id(vector_ids_to_cleanup)
logger.debug(f" Cleaned up {len(vector_ids_to_cleanup)} partial vectors")
except Exception as cleanup_error:
logger.debug(f" Failed to clean up partial vectors: {cleanup_error}")

return FileProcessingResult(
success=False,
chunk_count=0,
error_message=str(e),
)
Loading
Loading