diff --git a/src/palimpzest/core/data/dataset.py b/src/palimpzest/core/data/dataset.py
index 559513be1..13e778705 100644
--- a/src/palimpzest/core/data/dataset.py
+++ b/src/palimpzest/core/data/dataset.py
@@ -577,34 +577,293 @@ def groupby(self, gby_fields, agg_fields, agg_funcs) -> Dataset:
         operator = GroupByAggregate(input_schema=self.schema, output_schema=output_schema, gby_fields=gby_fields, agg_fields=agg_fields, agg_funcs=agg_funcs)
         return Dataset(sources=[self], operator=operator, schema=output_schema)
 
-    def sem_groupby(self, gby_fields: list[str], agg_fields: list[str], agg_funcs: list[str]) -> Dataset:
+    def group_by(
+        self,
+        group_cols: list[str] | list[dict],
+        agg_func: Callable,
+        output_col: str,
+    ) -> Dataset:
+        """
+        Apply a semantic group-by operation with detailed field specifications.
+
+        Args:
+            group_cols: List of group-by field specifications. Each can be:
+                - A string (field name): uses default grouping behavior
+                - A dict with keys: 'name', 'desc', 'type', and optionally 'model'
+            agg_func: Aggregation function to apply (e.g., count, sum, average)
+            output_col: Name of the output aggregation column
+
+        Example:
+            ds.group_by(
+                group_cols=[
+                    {'name': 'era', 'desc': 'Era bucket: pre-2000, 2000s, 2010s, or 2020s', 'type': str}
+                ],
+                agg_func=count_reviews,
+                output_col="review_count"
+            )
+        """
+        # Normalize group_cols to a list of dicts
+        normalized_group_cols = []
+        for col in group_cols:
+            if isinstance(col, str):
+                normalized_group_cols.append({
+                    'name': col,
+                    'desc': f'Group by {col}',
+                    'type': str
+                })
+            elif isinstance(col, dict):
+                normalized_group_cols.append(col)
+            else:
+                raise ValueError("group_cols must be a list of strings or dicts")
+
+        # Extract field names for the logical operator
+        gby_field_names = [col['name'] for col in normalized_group_cols]
+
+        # Infer the aggregation function name from the callable.
+        # NOTE: only count-style aggregation is wired up so far; any other
+        # callable currently falls back to 'count' until custom aggregation
+        # functions are supported end-to-end.
+        agg_func_name = agg_func.__name__ if hasattr(agg_func, '__name__') else 'count'
+        if 'count' in agg_func_name.lower():
+            agg_func_str = 'count'
+        else:
+            # Fallback for not-yet-supported custom functions
+            agg_func_str = 'count'
+
+        # Create the output schema
+        output_schema = create_groupby_schema_from_fields(gby_field_names, [output_col])
+
+        # Create the logical operator
+        operator = GroupByAggregate(
+            input_schema=self.schema,
+            is_semantic=True,
+            output_schema=output_schema,
+            gby_fields=normalized_group_cols,  # pass full dict specifications
+            agg_fields=[output_col],
+            agg_funcs=[agg_func_str]
+        )
+
+        return Dataset(sources=[self], operator=operator, schema=output_schema)
+
+    def sem_groupby(self, gby_fields: list[str] | list[dict], agg_fields: list[str] | list[dict], agg_funcs: list[str]) -> Dataset:
         """
         Apply a semantic group by operation to this set using an LLM. This operator groups records
         by the specified `gby_fields` and applies the `agg_funcs` to the `agg_fields` for each group.
 
         Args:
-            gby_fields: List of field names to group by (e.g., ['complaint'])
-            agg_fields: List of field names to aggregate (e.g., ['contents'])
+            gby_fields: List of field specifications to group by. Each can be:
+                - A string (field name): uses default grouping behavior
+                - A dict with keys: 'name', 'desc', 'type', and optionally 'model'
+            agg_fields: List of field specifications to aggregate. Each can be:
+                - A string (field name): uses default aggregation behavior
+                - A dict with keys: 'name', 'desc', 'type', and optionally 'model'
             agg_funcs: List of aggregation functions to apply (e.g., ['count'])
 
         Example:
            ds = pz.TextFileDataset(id="reviews", dir="product-reviews/")
-           ds = ds.sem_groupby(gby_fields=['complaint'], agg_fields=['contents'], agg_funcs=['count'])
+           ds = ds.sem_groupby(
+               gby_fields=[{'name': 'complaint', 'desc': 'Type of complaint', 'type': str}],
+               agg_fields=['contents'],
+               agg_funcs=['count']
+           )
         """
-        output_schema = create_groupby_schema_from_fields(gby_fields, agg_fields)
+        # Normalize gby_fields to a list of dicts
+        normalized_gby_fields = []
+        for field in gby_fields:
+            if isinstance(field, str):
+                normalized_gby_fields.append({
+                    'name': field,
+                    'desc': f'Group by {field}',
+                    'type': str
+                })
+            elif isinstance(field, dict):
+                normalized_gby_fields.append(field)
+            else:
+                raise ValueError("gby_fields must be a list of strings or dicts")
+
+        # Normalize agg_fields to a list of dicts
+        normalized_agg_fields = []
+        for field in agg_fields:
+            if isinstance(field, str):
+                normalized_agg_fields.append({
+                    'name': field,
+                    'desc': f'Aggregate {field}',
+                    'type': str
+                })
+            elif isinstance(field, dict):
+                normalized_agg_fields.append(field)
+            else:
+                raise ValueError("agg_fields must be a list of strings or dicts")
+
+        # Extract field names for schema creation
+        gby_field_names = [f['name'] for f in normalized_gby_fields]
+        agg_field_names = [f['name'] for f in normalized_agg_fields]
+
+        output_schema = create_groupby_schema_from_fields(gby_field_names, agg_field_names)
 
-        # Create logical operator with direct parameters (no GroupBySig)
+        # Create the logical operator with the full dict specifications
         operator = GroupByAggregate(
            input_schema=self.schema,
            is_semantic=True,
            output_schema=output_schema,
-           gby_fields=gby_fields,
-           agg_fields=agg_fields,
+           gby_fields=normalized_gby_fields,
+           agg_fields=normalized_agg_fields,
            agg_funcs=agg_funcs
         )
-        
+
         return Dataset(sources=[self], operator=operator, schema=output_schema)
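A quick usage sketch of the two string/dict spellings accepted above, mirroring the docstring example; the dataset id and column names are illustrative only:

```python
import palimpzest as pz

ds = pz.TextFileDataset(id="reviews", dir="product-reviews/")

# String specs keep the default grouping behavior...
by_name = ds.sem_groupby(gby_fields=["complaint"], agg_fields=["contents"], agg_funcs=["count"])

# ...while dict specs give the LLM a description to group against.
by_desc = ds.sem_groupby(
    gby_fields=[{"name": "complaint", "desc": "Type of complaint", "type": str}],
    agg_fields=["contents"],
    agg_funcs=["count"],
)
```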
+ """ + from palimpzest.core.lib.schemas import create_groupby_schema_from_fields + from palimpzest.query.operators.aggregate import ApplyGroupByOp + + assert len(groupby_fields) == len(agg_fields) == len(agg_funcs), \ + "groupby_fields, agg_fields, and agg_funcs must all have the same length" + + result = self.run() + candidates = result.data_records + + def run_level(candidates, level): + gby_names = groupby_fields[level] + agg_names = agg_fields[level] + funcs = agg_funcs[level] + output_schema = create_groupby_schema_from_fields(gby_names, agg_names) + op = ApplyGroupByOp( + gby_fields=gby_names, + agg_fields=agg_names, + agg_funcs=funcs, + output_schema=output_schema, + input_schema=self.schema, + ) + if level == len(groupby_fields) - 1: + return op(candidates) + # Intermediate level: partition candidates by exact field values + outer_groups = {} + for candidate in candidates: + key = tuple(getattr(candidate, f, None) for f in gby_names) + outer_groups.setdefault(key, []).append(candidate) + return {key: run_level(grp, level + 1) for key, grp in outer_groups.items()} + + return run_level(candidates, 0) + + def hierarchical_sem_groupby( + self, + groupby_fields: list[list[str | dict]], + agg_fields: list[list[str | dict]], + agg_funcs: list[list[str]], + model=None, + prompt_strategy=None, + reasoning_effort=None, + ) -> dict: + """ + Perform hierarchical (nested) semantic groupby operations using LLMs. + + At each intermediate level the LLM assigns group labels to the original records + (without aggregation) so that inner levels can operate on the same raw records. + The final level runs a full semantic groupby with aggregation. + + Args: + groupby_fields: List of lists of field specs (str or dict with name/desc/type) per level. + agg_fields: List of lists of field specs to aggregate per level. + agg_funcs: List of lists of aggregation function names per level. + model: Optional LLM model override. + prompt_strategy: Optional prompt strategy override. + reasoning_effort: Optional reasoning effort override. + + Returns: + A DataRecordSet for a single level, or a nested dict + ``{group_key: }`` for multiple levels. + """ + from palimpzest.constants import Model, PromptStrategy + from palimpzest.core.lib.schemas import create_groupby_schema_from_fields + from palimpzest.query.operators.aggregate import SemanticGroupByOp + + assert len(groupby_fields) == len(agg_fields) == len(agg_funcs), \ + "groupby_fields, agg_fields, and agg_funcs must all have the same length" + + # Default to GPT-4o if no model specified; sem_groupby requires an explicit model + # because hierarchical_sem_groupby bypasses the query optimizer / policy system. + _model = model if model is not None else Model.GPT_4o + _prompt_strategy = prompt_strategy if prompt_strategy is not None else PromptStrategy.AGG + + from palimpzest.core.models import GenerationStats + + result = self.run() + candidates = result.data_records + + # Accumulate GenerationStats across all levels so callers can track + # total cost / token usage for the entire hierarchical operation. 
+ accumulated_stats = GenerationStats() + + def normalize_fields(fields): + out = [] + for f in fields: + if isinstance(f, str): + out.append({'name': f, 'desc': f'Group by {f}', 'type': str}) + else: + out.append(f) + return out + + def run_level(candidates, level): + nonlocal accumulated_stats + gby_specs = normalize_fields(groupby_fields[level]) + agg_specs = normalize_fields(agg_fields[level]) + funcs = agg_funcs[level] + gby_names = [s['name'] for s in gby_specs] + agg_names = [s['name'] for s in agg_specs] + output_schema = create_groupby_schema_from_fields(gby_names, agg_names) + op = SemanticGroupByOp( + gby_fields=gby_specs, + agg_fields=agg_specs, + agg_funcs=funcs, + model=_model, + prompt_strategy=_prompt_strategy, + reasoning_effort=reasoning_effort, + output_schema=output_schema, + input_schema=self.schema, + ) + if level == len(groupby_fields) - 1: + # Final level: full groupby with aggregation. + # Extract per-group RecordOpStats and fold into accumulated_stats. + dataset_result = op(candidates) + for ros in dataset_result.record_op_stats: + accumulated_stats.total_input_tokens += ros.total_input_tokens + accumulated_stats.total_output_tokens += ros.total_output_tokens + accumulated_stats.total_input_cost += ros.total_input_cost + accumulated_stats.total_output_cost += ros.total_output_cost + accumulated_stats.llm_call_duration_secs += ros.llm_call_duration_secs + return dataset_result + # Intermediate level: LLM assigns group labels without aggregation. + # Capture and accumulate the GenerationStats that were previously discarded. + group_labels, gen_stats = op._assign_groups_llm(candidates) + accumulated_stats += gen_stats + outer_groups = {} + for candidate, label in zip(candidates, group_labels): + key = (label,) if not isinstance(label, tuple) else label + outer_groups.setdefault(key, []).append(candidate) + return {key: run_level(grp, level + 1) for key, grp in outer_groups.items()} + + nested_result = run_level(candidates, 0) + return nested_result, accumulated_stats + def sem_agg(self, col: dict | type[BaseModel], agg: str, depends_on: str | list[str] | None = None) -> Dataset: """ Apply a semantic aggregation to this set. The `agg` string will be applied using an LLM @@ -696,6 +955,7 @@ def run(self, config: QueryProcessorConfig | None = None, **kwargs): """Invoke the QueryProcessor to execute the query. 
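Since `hierarchical_sem_groupby` returns the nested result together with the accumulated `GenerationStats`, a caller can walk the structure and report cost in one pass. A sketch, with the dataset and field specs assumed:

```python
result, stats = reviews.hierarchical_sem_groupby(
    groupby_fields=[
        ["era"],
        [{"name": "complaint", "desc": "Type of complaint", "type": str}],
    ],
    agg_fields=[["contents"], ["contents"]],
    agg_funcs=[["count"], ["count"]],
)

def walk(node, depth=0):
    # Intermediate levels are dicts keyed by group tuples; the final level
    # is the DataRecordSet produced by the last semantic groupby.
    if isinstance(node, dict):
        for key, child in node.items():
            print("  " * depth + f"group={key}")
            walk(child, depth + 1)
    else:
        print("  " * depth + f"leaf: {node}")

walk(result)
print(f"accumulated LLM time: {stats.llm_call_duration_secs:.1f}s")
```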
     def sem_agg(self, col: dict | type[BaseModel], agg: str, depends_on: str | list[str] | None = None) -> Dataset:
         """
         Apply a semantic aggregation to this set. The `agg` string will be applied using an LLM
@@ -696,6 +955,7 @@ def run(self, config: QueryProcessorConfig | None = None, **kwargs):
         """Invoke the QueryProcessor to execute the query. `kwargs` will be applied to the QueryProcessorConfig."""
         # TODO: this import currently needs to be here to avoid a circular import; we should fix this in a subsequent PR
         from palimpzest.query.processor.query_processor_factory import QueryProcessorFactory
+        print("Running Query Processor...")
 
         # as syntactic sugar, we will allow some keyword arguments to parameterize our policies
         policy = construct_policy_from_kwargs(**kwargs)
diff --git a/src/palimpzest/query/generators/generators.py b/src/palimpzest/query/generators/generators.py
index 34867f862..282354d60 100644
--- a/src/palimpzest/query/generators/generators.py
+++ b/src/palimpzest/query/generators/generators.py
@@ -479,6 +479,9 @@ def __call__(self, candidate: DataRecord | list[DataRecord], fields: dict[str, F
         logger.debug(f"PROMPT:\n{prompt}")
         logger.debug(Fore.GREEN + f"{completion_text}\n" + Style.RESET_ALL)
 
+        print(f"PROMPT:\n{prompt}")
+        print(Fore.GREEN + f"{completion_text}\n" + Style.RESET_ALL)
+
         # parse reasoning
         reasoning = None
         try:
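The `run()` hunk keeps the keyword-argument sugar that `construct_policy_from_kwargs` turns into a policy; the test program later in this diff exercises exactly this pattern. A condensed sketch (`grouped_reviews` is any Dataset, e.g. from `sem_groupby`):

```python
import palimpzest as pz

config = pz.QueryProcessorConfig(
    policy=pz.MaxQuality(),
    verbose=False,
    execution_strategy="sequential",
)

# Passing max_quality=True is the kwarg spelling of the MaxQuality policy;
# construct_policy_from_kwargs() resolves it inside run().
collection = grouped_reviews.run(config, max_quality=True)
print(collection.to_df().head())
```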
+ """ + from palimpzest.core.lib.schemas import create_groupby_schema_from_fields + + assert len(groupby_fields) == len(agg_fields) == len(agg_funcs), \ + "groupby_fields, agg_fields, and agg_funcs must all have the same length" + + def run_level(candidates, level): + gby_names = groupby_fields[level] + agg_names = agg_fields[level] + funcs = agg_funcs[level] + output_schema = create_groupby_schema_from_fields(gby_names, agg_names) + op = ApplyGroupByOp( + gby_fields=gby_names, + agg_fields=agg_names, + agg_funcs=funcs, + output_schema=output_schema, + input_schema=self.input_schema, + ) + if level == len(groupby_fields) - 1: + return op(candidates) + outer_groups = {} + for candidate in candidates: + key = tuple(getattr(candidate, f, None) for f in gby_names) + outer_groups.setdefault(key, []).append(candidate) + return {key: run_level(grp, level + 1) for key, grp in outer_groups.items()} + + return run_level(candidates, 0) + class AverageAggregateOp(AggregateOp): # NOTE: we don't actually need / use agg_func here (yet) @@ -682,166 +736,643 @@ def __call__(self, candidates: list[DataRecord]) -> DataRecordSet: ) return DataRecordSet([dr], [record_op_stats]) - + + +# --------------------------------------------------------------------------- +# Constants for batching / parallelism defaults +# --------------------------------------------------------------------------- +DEFAULT_GROUPBY_BATCH_SIZE = 10 +"""Default number of records to send in a single LLM call for group assignment.""" + +DEFAULT_GROUPBY_PARALLELISM = 8 +"""Default number of concurrent threads for LLM calls in semantic groupby.""" + +DEFAULT_AGG_PARALLELISM = 4 +"""Default number of concurrent threads for semantic aggregation across groups.""" + +# Standard (non-semantic) aggregation function names recognised by the operator. +STANDARD_AGG_FUNCS = frozenset({"avg", "average", "count", "sum", "min", "max", "list", "set"}) + + class SemanticGroupByOp(AggregateOp): + """Semantic GroupBy operator backed by LLM calls. + + This operator supports: + * **Semantic grouping** -- the LLM determines which group each record belongs + to based on a natural-language description. + * **Exact grouping** -- records are partitioned by literal field values (no LLM + needed for the grouping phase). + * **Standard aggregation** -- count / sum / avg / min / max / list / set applied + per-group without an LLM. + * **Semantic aggregation** -- an LLM-based aggregation function (e.g. "summarise + the most positive review") applied per-group. + + Optimisation knobs + ------------------ + ``batch_size`` + Number of records to include in a *single* LLM prompt when assigning + groups (Phase 1). Larger batches amortise prompt overhead but increase + context length and risk of the model losing track of records. Set to 1 + to fall back to one-record-at-a-time mode. + + ``groupby_parallelism`` + Number of concurrent ``ThreadPoolExecutor`` workers for the LLM calls in + the grouping phase. Each worker processes one batch. This is modelled + after ``join_parallelism`` in ``NestedLoopsJoin``. + + ``agg_parallelism`` + Number of concurrent workers for semantic aggregation calls (one call per + group x semantic-agg-field combination). """ - Implementation of a semantic GroupBy operator using LLMs. This operator groups records by a set - of fields and applies aggregation functions to each group using an LLM to determine the groups. 
- """ - def __init__(self, gby_fields: list[str], agg_fields: list[str], agg_funcs: list[str], - model: Model | None = None, prompt_strategy: PromptStrategy = PromptStrategy.AGG, - reasoning_effort: str | None = None, *args, **kwargs): + + def __init__( + self, + gby_fields: list[str] | list[dict], + agg_fields: list[str] | list[dict], + agg_funcs: list[str], + model: Model | None = None, + prompt_strategy: PromptStrategy = PromptStrategy.AGG, + reasoning_effort: str | None = None, + batch_size: int = DEFAULT_GROUPBY_BATCH_SIZE, + groupby_parallelism: int = DEFAULT_GROUPBY_PARALLELISM, + agg_parallelism: int = DEFAULT_AGG_PARALLELISM, + *args, + **kwargs, + ): super().__init__(*args, **kwargs) - self.gby_fields = gby_fields - self.agg_fields = agg_fields + + # -- field specs ------------------------------------------------- + self.gby_fields_spec = gby_fields + self.agg_fields_spec = agg_fields + + # Extract plain field names for backward compatibility / quick access + self.gby_fields = [f["name"] if isinstance(f, dict) else f for f in gby_fields] + self.agg_fields = [f["name"] if isinstance(f, dict) else f for f in agg_fields] + self.agg_funcs = agg_funcs self.model = model self.prompt_strategy = prompt_strategy self.reasoning_effort = reasoning_effort - - # Initialize the generator for LLM calls - self.generator = Generator(self.model, self.prompt_strategy, self.reasoning_effort, self.api_base) + # -- optimisation knobs ------------------------------------------ + self.batch_size = max(1, batch_size) + self.groupby_parallelism = max(1, groupby_parallelism) + self.agg_parallelism = max(1, agg_parallelism) + + # -- generator (lazily initialised for exact-only operators) ----- + self._generator: Generator | None = None + if self.model is not None: + self._generator = Generator( + self.model, + self.prompt_strategy, + self.reasoning_effort, + ) + + # Thread-safety lock for stats accumulation + self._stats_lock = threading.Lock() + + # ------------------------------------------------------------------ + # Properties / accessors + # ------------------------------------------------------------------ + @property + def generator(self) -> Generator: + """Return the generator, raising if not initialised.""" + if self._generator is None: + raise RuntimeError( + "SemanticGroupByOp.generator accessed but no model was provided. " + "Semantic operations require a model." + ) + return self._generator + + def get_model_name(self) -> str | None: + return self.model.value if self.model is not None else None + + # ------------------------------------------------------------------ + # Repr helpers + # ------------------------------------------------------------------ def __str__(self): op = super().__str__() op += f" Group-by Fields: {self.gby_fields}\n" op += f" Agg. Fields: {self.agg_fields}\n" op += f" Agg. 
+    # ------------------------------------------------------------------
+    # Properties / accessors
+    # ------------------------------------------------------------------
+    @property
+    def generator(self) -> Generator:
+        """Return the generator, raising if not initialised."""
+        if self._generator is None:
+            raise RuntimeError(
+                "SemanticGroupByOp.generator accessed but no model was provided. "
+                "Semantic operations require a model."
+            )
+        return self._generator
+
+    def get_model_name(self) -> str | None:
+        return self.model.value if self.model is not None else None
+
+    # ------------------------------------------------------------------
+    # Repr helpers
+    # ------------------------------------------------------------------
     def __str__(self):
         op = super().__str__()
         op += f"    Group-by Fields: {self.gby_fields}\n"
         op += f"    Agg. Fields: {self.agg_fields}\n"
         op += f"    Agg. Funcs: {self.agg_funcs}\n"
-        op += f"    Model: {self.model.value}\n"
+        if self.model is not None:
+            op += f"    Model: {self.model.value}\n"
         op += f"    Prompt Strategy: {self.prompt_strategy}\n"
+        op += f"    Batch Size: {self.batch_size}\n"
+        op += f"    GroupBy Parallelism: {self.groupby_parallelism}\n"
+        op += f"    Agg Parallelism: {self.agg_parallelism}\n"
         return op
 
     def get_id_params(self):
         id_params = super().get_id_params()
         return {
-            "gby_fields": self.gby_fields, 
-            "agg_fields": self.agg_fields, 
+            "gby_fields": self.gby_fields,
+            "agg_fields": self.agg_fields,
             "agg_funcs": self.agg_funcs,
-            "model": self.model.value,
-            "prompt_strategy": self.prompt_strategy.value,
+            "model": self.model.value if self.model else None,
+            "prompt_strategy": self.prompt_strategy.value if self.prompt_strategy else None,
             "reasoning_effort": self.reasoning_effort,
-            **id_params
+            "batch_size": self.batch_size,
+            **id_params,
         }
 
     def get_op_params(self):
         op_params = super().get_op_params()
         return {
-            "gby_fields": self.gby_fields, 
-            "agg_fields": self.agg_fields, 
+            "gby_fields": self.gby_fields_spec,
+            "agg_fields": self.agg_fields_spec,
             "agg_funcs": self.agg_funcs,
             "model": self.model,
             "prompt_strategy": self.prompt_strategy,
             "reasoning_effort": self.reasoning_effort,
-            **op_params
+            "batch_size": self.batch_size,
+            "groupby_parallelism": self.groupby_parallelism,
+            "agg_parallelism": self.agg_parallelism,
+            **op_params,
         }
-
-    def get_model_name(self) -> str:
-        return self.model.value
 
+    # ------------------------------------------------------------------
+    # Cost estimation
+    # ------------------------------------------------------------------
     def naive_cost_estimates(self, source_op_cost_estimates: OperatorCostEstimates) -> OperatorCostEstimates:
-        """
-        Compute naive cost estimates for the semantic group by operation using an LLM.
-        """
-        # estimate number of input and output tokens
+        """Naive cost estimate -- follows the same pattern as ``SemanticAggregate``."""
         est_num_input_tokens = NAIVE_EST_NUM_INPUT_TOKENS * source_op_cost_estimates.cardinality
         est_num_output_tokens = NAIVE_EST_NUM_OUTPUT_TOKENS * NAIVE_EST_NUM_GROUPS
 
-        # get est. of conversion time per record from model card
-        model_name = self.model.value
-        model_conversion_time_per_record = MODEL_CARDS[model_name]["seconds_per_output_token"] * est_num_output_tokens
+        if self.model is None:
+            # Exact-only groupby: negligible cost
+            return OperatorCostEstimates(
+                cardinality=NAIVE_EST_NUM_GROUPS,
+                time_per_record=0,
+                cost_per_record=0,
+                quality=1.0,
+            )
 
-        # get est. of conversion cost (in USD) per record from model card
-        usd_per_input_token = MODEL_CARDS[model_name].get("usd_per_input_token")
-        model_conversion_usd_per_record = (
+        time_per_record = self.model.get_seconds_per_output_token() * est_num_output_tokens
+
+        usd_per_input_token = self.model.get_usd_per_input_token()
+        if getattr(self, "prompt_strategy", None) is not None and self.is_audio_op():
+            usd_per_input_token = self.model.get_usd_per_audio_input_token()
+
+        cost_per_record = (
             usd_per_input_token * est_num_input_tokens
-            + MODEL_CARDS[model_name]["usd_per_output_token"] * est_num_output_tokens
+            + self.model.get_usd_per_output_token() * est_num_output_tokens
         )
 
-        # estimate quality of output based on the strength of the model being used
-        quality = (MODEL_CARDS[model_name]["overall"] / 100.0)
+        quality = self.model.get_overall_score() / 100.0
 
         return OperatorCostEstimates(
             cardinality=NAIVE_EST_NUM_GROUPS,
-            time_per_record=model_conversion_time_per_record,
-            cost_per_record=model_conversion_usd_per_record,
+            time_per_record=time_per_record,
+            cost_per_record=cost_per_record,
             quality=quality,
         )
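To make the estimate concrete, cost is linear in input cardinality while output tokens are capped by the estimated group count. A worked sketch with made-up constants and token prices (the real values come from the model card accessors):

```python
# Illustrative values only -- not real model-card numbers.
NAIVE_EST_NUM_INPUT_TOKENS = 1000    # per input record
NAIVE_EST_NUM_OUTPUT_TOKENS = 100    # per output group
NAIVE_EST_NUM_GROUPS = 5
usd_per_input_token, usd_per_output_token = 2.5e-6, 1.0e-5
cardinality = 400

est_in = NAIVE_EST_NUM_INPUT_TOKENS * cardinality              # 400,000 tokens
est_out = NAIVE_EST_NUM_OUTPUT_TOKENS * NAIVE_EST_NUM_GROUPS   # 500 tokens
cost = usd_per_input_token * est_in + usd_per_output_token * est_out
print(f"estimated cost: ${cost:.3f}")  # estimated cost: $1.005
```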
+    # ==================================================================
+    # MAIN ENTRY POINT
+    # ==================================================================
     def __call__(self, candidates: list[DataRecord]) -> DataRecordSet:
-        """
-        Execute the semantic group by operation on the given candidates using a two-phase approach:
-        Phase 1: LLM assigns each record to a group (MAP)
-        Phase 2: Apply aggregation functions to each group (REDUCE)
-        
-        Args:
-            candidates: List of DataRecords to group and aggregate
-            
-        Returns:
-            DataRecordSet containing one DataRecord per group with aggregated values
+        """Execute the semantic group-by operation.
+
+        The pipeline has three phases:
+
+        1. **Grouping** -- assign each record to a group key (semantic or exact).
+        2. **Partitioning** -- bucket records by their group key.
+        3. **Aggregation** -- compute each agg function per group (semantic or
+           standard).
+
+        Batching and parallelism are applied in Phase 1 and Phase 3.
         """
         start_time = time.time()
-        
-        # Handle empty input
+
         if len(candidates) == 0:
             return DataRecordSet([], [])
-        
-        # Use LLM to assign each record to a semantic group
-        group_assignments, gen_stats = self._assign_groups_llm(candidates)
-        
-        # Group candidates by their assigned group labels and compute aggregations
-        # Using the same approach as ApplyGroupByOp but with LLM-determined groups
-        agg_state = {}
-        for candidate, group_label in zip(candidates, group_assignments):
-            # Use group_label as the group key (tuple with single element)
-            group = (group_label,)
-            
-            # Initialize aggregation state for new groups
-            if group not in agg_state:
-                state = []
-                for fun in self.agg_funcs:
-                    state.append(ApplyGroupByOp.agg_init(fun))
+
+        # Detect modes.
+        # A field is treated as semantic if it was supplied as a dict spec (and
+        # does not explicitly set 'semantic': False); plain string column names
+        # use exact grouping.
+        is_semantic_gby = any(
+            (isinstance(f, dict) and f.get('semantic', True))
+            for f in self.gby_fields_spec
+        )
+        is_semantic_agg = any(f.lower() not in STANDARD_AGG_FUNCS for f in self.agg_funcs)
+
+        # Phase 1 -- grouping
+        group_assignments, groupby_stats = self._perform_groupby(candidates, is_semantic_gby)
+
+        # Phase 2 -- partition
+        grouped_records = self._partition_by_group(candidates, group_assignments)
+
+        # Phase 3 -- aggregation
+        drs, stats_lst = self._perform_aggregation(
+            grouped_records, is_semantic_agg, groupby_stats, candidates, start_time,
+        )
+
+        return DataRecordSet(drs, stats_lst)
+
+    # ==================================================================
+    # PHASE 1: GROUPING
+    # ==================================================================
+    def _perform_groupby(
+        self,
+        candidates: list[DataRecord],
+        is_semantic: bool,
+    ) -> tuple[list[tuple], GenerationStats]:
+        """Route to semantic or exact grouping."""
+        if is_semantic:
+            return self._perform_semantic_groupby(candidates)
+        return self._perform_exact_groupby(candidates)
+
+    # -- exact groupby -------------------------------------------------
+    def _perform_exact_groupby(
+        self,
+        candidates: list[DataRecord],
+    ) -> tuple[list[tuple], GenerationStats]:
+        """Group records by literal field values -- no LLM needed."""
+        assignments: list[tuple] = []
+        for candidate in candidates:
+            key = tuple(getattr(candidate, f, None) for f in self.gby_fields)
+            assignments.append(key)
+        return assignments, GenerationStats()
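Because the grouping and aggregation modes are detected independently, a single call can mix them, e.g. exact grouping on a literal column with an LLM-backed aggregation. A sketch (schema objects assumed):

```python
from palimpzest.constants import Model

op = SemanticGroupByOp(
    gby_fields=["director"],   # plain string spec -> exact grouping, no LLM calls
    agg_fields=["reviewText"],
    agg_funcs=["one-sentence summary of the most positive review"],  # not a standard func -> LLM
    model=Model.GPT_4o,
    input_schema=input_schema,    # placeholder
    output_schema=output_schema,  # placeholder
)
```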
+ """ + from palimpzest.core.lib.schemas import create_schema_from_fields + + # Build a tiny schema that the LLM fills in for each record + gby_schema_fields = [] + for spec in self.gby_fields_spec: + if isinstance(spec, dict): + gby_schema_fields.append({ + "name": spec["name"], + "type": spec.get("type", str), + "desc": spec.get("desc", f"Semantic group for {spec['name']}"), + }) else: - state = agg_state[group] - - # Merge values from this candidate into the aggregation state - for i in range(0, len(self.agg_funcs)): - fun = self.agg_funcs[i] - if not hasattr(candidate, self.agg_fields[i]): - raise TypeError(f"SemanticGroupByOp record missing expected field {self.agg_fields[i]}") - field = getattr(candidate, self.agg_fields[i]) - state[i] = ApplyGroupByOp.agg_merge(fun, state[i], field) - - agg_state[group] = state - - # Create output DataRecords (one per group) - drs = [] - record_op_stats_lst = [] - - # Get the output field names from the output schema - output_field_names = [f for f in self.output_schema.model_fields if f not in self.gby_fields] - - for group_key in agg_state: - # Build aggregated data item for this group - data_item = {} - - # Add group-by field value (extract from tuple) - data_item[self.gby_fields[0]] = group_key[0] - - # Add aggregation results (using agg_final to compute final values) - vals = agg_state[group_key] - for i in range(0, len(vals)): - agg_func = self.agg_funcs[i] - output_field_name = output_field_names[i] - v = ApplyGroupByOp.agg_final(agg_func, vals[i]) - data_item[output_field_name] = v - - # Create the DataRecord for this group - data_item_obj = self.output_schema(**data_item) - dr = DataRecord.from_agg_parents(data_item_obj, parent_records=candidates) + gby_schema_fields.append({ + "name": spec, + "type": str, + "desc": f"The semantic category for {spec}", + }) + groupby_schema = create_schema_from_fields(gby_schema_fields) + + # Natural-language instruction for the LLM + field_descs = "; ".join( + f"'{s['name']}': {s.get('desc', s['name'])}" + for s in ( + self.gby_fields_spec + if all(isinstance(s, dict) for s in self.gby_fields_spec) + else gby_schema_fields + ) + ) + agg_instruction = ( + f"Categorise each input record into a semantic group. " + f"The grouping fields and their descriptions are: {field_descs}. " + f"Return the group label(s) for each record." 
+ ) + + # Split candidates into batches + batches: list[list[DataRecord]] = [ + candidates[i : i + self.batch_size] + for i in range(0, len(candidates), self.batch_size) + ] + + # Prepare output containers (order-preserving) + all_labels: list[list[str | tuple] | None] = [None] * len(batches) + accumulated_stats = GenerationStats() + + logger.info( + "SemanticGroupByOp: assigning %d records across %d batches " + "(batch_size=%d, parallelism=%d)", + len(candidates), len(batches), self.batch_size, self.groupby_parallelism, + ) + + def _process_batch( + batch_idx: int, batch: list[DataRecord], + ) -> tuple[int, list[str | tuple], GenerationStats]: + """Process a single batch of records through the LLM.""" + batch_labels: list[str | tuple] = [] + batch_stats = GenerationStats() + + input_fields = list(self.gby_fields) + fields = {f: str for f in self.gby_fields} + + gen_kwargs = { + "project_cols": input_fields, + "output_schema": groupby_schema, + "agg_instruction": agg_instruction, + } + + if len(batch) == 1: + # Single-record batch -- call generator directly + field_answers, _, gen_stats, _ = self.generator( + batch[0], fields, **gen_kwargs, + ) + label = self._extract_group_label(field_answers) + batch_labels.append(label) + if gen_stats is not None: + batch_stats += gen_stats + else: + # Multi-record batch -- pass list of candidates + field_answers, _, gen_stats, _ = self.generator( + batch, fields, **gen_kwargs, + ) + if gen_stats is not None: + batch_stats += gen_stats + + # The generator may return a list per field or a single value + # depending on cardinality; normalise to one label per record + batch_labels = self._extract_batch_group_labels( + field_answers, len(batch), + ) + + return batch_idx, batch_labels, batch_stats + + # Execute batches in parallel + with ThreadPoolExecutor(max_workers=self.groupby_parallelism) as executor: + futures = { + executor.submit(_process_batch, idx, batch): idx + for idx, batch in enumerate(batches) + } + for future in as_completed(futures): + batch_idx, labels, stats = future.result() + all_labels[batch_idx] = labels + with self._stats_lock: + accumulated_stats += stats + + # Flatten ordered labels -> one tuple per candidate + group_assignments: list[tuple] = [] + for batch_labels in all_labels: + for label in batch_labels: + if isinstance(label, tuple): + group_assignments.append(label) + else: + group_assignments.append((label,)) + + logger.info( + "SemanticGroupByOp: found %d unique groups from %d records", + len(set(group_assignments)), len(candidates), + ) + + return group_assignments, accumulated_stats + + # -- label extraction helpers -------------------------------------- + @staticmethod + def _coerce_to_str(val) -> str: + """Unwrap nested lists and coerce to a hashable string.""" + while isinstance(val, list): + val = val[0] if len(val) > 0 else None + if val is None: + return "unknown" + return str(val) + + def _extract_group_label(self, field_answers: dict) -> str | tuple: + """Extract a single group label from generator output.""" + if len(self.gby_fields) == 1: + val = field_answers.get(self.gby_fields[0]) + return self._coerce_to_str(val) + + # Multi-column groupby -> tuple + parts = [] + for f in self.gby_fields: + val = field_answers.get(f) + parts.append(self._coerce_to_str(val)) + return tuple(parts) + + @staticmethod + def _unwrap_generator_list(vals: list) -> list: + """Unwrap the extra nesting added by Generator._prepare_field_answers. 
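Results are written back by batch index rather than completion order, so labels stay aligned with the input records even when batches finish out of order. The same pattern in isolation:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

records = list(range(25))
batch_size, parallelism = 10, 8
batches = [records[i:i + batch_size] for i in range(0, len(records), batch_size)]
results = [None] * len(batches)

def process(idx, batch):
    # Stand-in for one LLM call that labels a whole batch.
    return idx, [f"label-{r}" for r in batch]

with ThreadPoolExecutor(max_workers=parallelism) as executor:
    futures = [executor.submit(process, i, b) for i, b in enumerate(batches)]
    for fut in as_completed(futures):
        idx, labels = fut.result()
        results[idx] = labels  # indexed write keeps global order stable

flat = [label for batch in results for label in batch]
assert flat[0] == "label-0" and len(flat) == 25
```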
+
+    @staticmethod
+    def _unwrap_generator_list(vals: list) -> list:
+        """Unwrap the extra nesting added by Generator._prepare_field_answers.
+
+        The Generator with ONE_TO_ONE cardinality wraps every field value in a
+        list, so ``["a", "b", "c"]`` becomes ``[["a", "b", "c"]]``. For
+        batch group-label extraction we need the inner flat list.
+        """
+        if len(vals) == 1 and isinstance(vals[0], list):
+            return vals[0]
+        return vals
+
+    def _extract_batch_group_labels(
+        self, field_answers: dict, batch_size: int,
+    ) -> list[str | tuple]:
+        """Extract per-record group labels from a batched generator response."""
+        labels: list[str | tuple] = []
+
+        if len(self.gby_fields) == 1:
+            field = self.gby_fields[0]
+            vals = field_answers.get(field, [])
+            if not isinstance(vals, list):
+                vals = [vals]
+
+            # Unwrap double-nesting from Generator._prepare_field_answers
+            vals = self._unwrap_generator_list(vals)
+
+            # Pad / truncate to batch_size
+            while len(vals) < batch_size:
+                vals.append("unknown")
+            for v in vals[:batch_size]:
+                labels.append(self._coerce_to_str(v))
+        else:
+            # Multi-column: zip columns together
+            columns = []
+            for f in self.gby_fields:
+                col_vals = field_answers.get(f, [])
+                if not isinstance(col_vals, list):
+                    col_vals = [col_vals]
+
+                # Unwrap double-nesting from Generator._prepare_field_answers
+                col_vals = self._unwrap_generator_list(col_vals)
+
+                while len(col_vals) < batch_size:
+                    col_vals.append("unknown")
+                columns.append(col_vals[:batch_size])
+
+            for row_vals in zip(*columns):
+                labels.append(
+                    tuple(self._coerce_to_str(v) for v in row_vals),
+                )
+
+        return labels
+
+    # ==================================================================
+    # PHASE 2: PARTITION
+    # ==================================================================
+    @staticmethod
+    def _partition_by_group(
+        candidates: list[DataRecord],
+        group_assignments: list[tuple],
+    ) -> dict[tuple, list[DataRecord]]:
+        """Bucket candidates into a dict keyed by their group assignment."""
+        grouped: dict[tuple, list[DataRecord]] = {}
+        for candidate, key in zip(candidates, group_assignments):
+            grouped.setdefault(key, []).append(candidate)
+        return grouped
+
+    # ==================================================================
+    # PHASE 3: AGGREGATION
+    # ==================================================================
+    def _perform_aggregation(
+        self,
+        grouped_records: dict[tuple, list[DataRecord]],
+        is_semantic_agg: bool,
+        groupby_stats: GenerationStats,
+        all_candidates: list[DataRecord],
+        start_time: float,
+    ) -> tuple[list[DataRecord], list[RecordOpStats]]:
+        """Dispatch to exact or semantic aggregation."""
+        if is_semantic_agg:
+            return self._aggregate_semantic(
+                grouped_records, groupby_stats, all_candidates, start_time,
+            )
+        return self._aggregate_exact(
+            grouped_records, groupby_stats, all_candidates, start_time,
+        )
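The unwrap helper strips exactly one generator-added list layer and leaves everything else alone; a few cases that follow directly from its two-line body:

```python
# Double-nested batched output: one inner list of per-record labels.
assert SemanticGroupByOp._unwrap_generator_list([["a", "b", "c"]]) == ["a", "b", "c"]

# Already-flat output passes through unchanged.
assert SemanticGroupByOp._unwrap_generator_list(["a", "b"]) == ["a", "b"]

# A single scalar element is NOT unwrapped (only a single nested list is).
assert SemanticGroupByOp._unwrap_generator_list(["a"]) == ["a"]
```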
+    # -- exact aggregation ---------------------------------------------
+    def _aggregate_exact(
+        self,
+        grouped_records: dict[tuple, list[DataRecord]],
+        groupby_stats: GenerationStats,
+        all_candidates: list[DataRecord],
+        start_time: float,
+    ) -> tuple[list[DataRecord], list[RecordOpStats]]:
+        """Apply standard agg functions (count/sum/...) per group -- no LLM."""
+        drs: list[DataRecord] = []
+        stats_lst: list[RecordOpStats] = []
+        output_field_names = [
+            f for f in self.output_schema.model_fields if f not in self.gby_fields
+        ]
+        num_groups = len(grouped_records)
+
+        for group_key, group_candidates in grouped_records.items():
+            # Initialise & merge aggregation state
+            state = [ApplyGroupByOp.agg_init(fun) for fun in self.agg_funcs]
+            for candidate in group_candidates:
+                for i, (fun, agg_field) in enumerate(
+                    zip(self.agg_funcs, self.agg_fields),
+                ):
+                    if not hasattr(candidate, agg_field):
+                        raise TypeError(
+                            f"SemanticGroupByOp record missing expected field {agg_field}"
+                        )
+                    state[i] = ApplyGroupByOp.agg_merge(
+                        fun, state[i], getattr(candidate, agg_field),
+                    )
+
+            # Build output data item
+            data_item: dict[str, Any] = {}
+            for i, gby_field in enumerate(self.gby_fields):
+                data_item[gby_field] = group_key[i]
+            for i, agg_func in enumerate(self.agg_funcs):
+                data_item[output_field_names[i]] = ApplyGroupByOp.agg_final(
+                    agg_func, state[i],
+                )
+
+            dr = DataRecord.from_agg_parents(
+                self.output_schema(**data_item), parent_records=all_candidates,
+            )
+            drs.append(dr)
+
+            cost = (
+                groupby_stats.cost_per_record / num_groups
+                if groupby_stats.cost_per_record > 0
+                else 0.0
+            )
+            stats_lst.append(
+                RecordOpStats(
+                    record_id=dr._id,
+                    record_parent_ids=dr._parent_ids,
+                    record_source_indices=dr._source_indices,
+                    record_state=dr.to_dict(include_bytes=False),
+                    full_op_id=self.get_full_op_id(),
+                    logical_op_id=self.logical_op_id or "semantic-groupby",
+                    op_name=self.op_name(),
+                    time_per_record=(time.time() - start_time) / num_groups,
+                    cost_per_record=cost,
+                    model_name=self.get_model_name(),
+                    input_fields=self.get_input_fields(),
+                    generated_fields=list(self.output_schema.model_fields.keys()),
+                    input_text_tokens=groupby_stats.input_text_tokens / num_groups,
+                    output_text_tokens=groupby_stats.output_text_tokens / num_groups,
+                    llm_call_duration_secs=groupby_stats.llm_call_duration_secs / num_groups,
+                    total_llm_calls=groupby_stats.total_llm_calls / num_groups,
+                    op_details={k: str(v) for k, v in self.get_id_params().items()},
+                )
+            )
+
+        return drs, stats_lst
+ """ + num_groups = len(grouped_records) + output_field_names = [ + f for f in self.output_schema.model_fields if f not in self.gby_fields + ] + + # Container for ordered results + ordered_keys = list(grouped_records.keys()) + results: list[tuple[DataRecord, RecordOpStats] | None] = [None] * num_groups + + def _aggregate_one_group( + idx: int, group_key: tuple, + ) -> tuple[int, DataRecord, RecordOpStats]: + """Aggregate a single group (may involve LLM calls).""" + group_candidates = grouped_records[group_key] + data_item: dict[str, Any] = {} + group_agg_stats = GenerationStats() + + # Group-by field values + for i, gby_field in enumerate(self.gby_fields): + data_item[gby_field] = group_key[i] + + # Aggregate each field + for i, (agg_func, agg_field) in enumerate( + zip(self.agg_funcs, self.agg_fields), + ): + if agg_func.lower() not in STANDARD_AGG_FUNCS: + # Semantic aggregation via LLM + value, gen_stats = self._apply_semantic_agg_llm( + group_candidates, agg_field, agg_func, + ) + group_agg_stats += gen_stats + else: + # Standard aggregation + state = ApplyGroupByOp.agg_init(agg_func) + for candidate in group_candidates: + if not hasattr(candidate, agg_field): + raise TypeError( + f"SemanticGroupByOp record missing expected field " + f"{agg_field}" + ) + state = ApplyGroupByOp.agg_merge( + agg_func, state, getattr(candidate, agg_field), + ) + value = ApplyGroupByOp.agg_final(agg_func, state) + + data_item[output_field_names[i]] = value + + dr = DataRecord.from_agg_parents( + self.output_schema(**data_item), parent_records=all_candidates, + ) + + combined = groupby_stats + group_agg_stats record_op_stats = RecordOpStats( record_id=dr._id, record_parent_ids=dr._parent_ids, @@ -850,66 +1381,178 @@ def __call__(self, candidates: list[DataRecord]) -> DataRecordSet: full_op_id=self.get_full_op_id(), logical_op_id=self.logical_op_id or "semantic-groupby", op_name=self.op_name(), - time_per_record=(time.time() - start_time) / len(agg_state), - cost_per_record=gen_stats.cost_per_record / len(agg_state), + time_per_record=(time.time() - start_time) / num_groups, + cost_per_record=combined.cost_per_record / num_groups, model_name=self.get_model_name(), input_fields=self.get_input_fields(), generated_fields=list(self.output_schema.model_fields.keys()), - total_input_tokens=gen_stats.total_input_tokens, - total_output_tokens=gen_stats.total_output_tokens, - total_input_cost=gen_stats.total_input_cost, - total_output_cost=gen_stats.total_output_cost, - llm_call_duration_secs=gen_stats.llm_call_duration_secs, - fn_call_duration_secs=gen_stats.fn_call_duration_secs, - total_llm_calls=gen_stats.total_llm_calls, + input_text_tokens=combined.input_text_tokens / num_groups, + output_text_tokens=combined.output_text_tokens / num_groups, + llm_call_duration_secs=combined.llm_call_duration_secs / num_groups, + total_llm_calls=combined.total_llm_calls / num_groups, op_details={k: str(v) for k, v in self.get_id_params().items()}, ) - record_op_stats_lst.append(record_op_stats) - - return DataRecordSet(drs, record_op_stats_lst) - - def _assign_groups_llm(self, candidates: list[DataRecord]) -> tuple[list[str], GenerationStats]: - """ - Phase 1: Use LLM to assign each candidate to a semantic group. 
- + + return idx, dr, record_op_stats + + # Execute group aggregations in parallel + with ThreadPoolExecutor(max_workers=self.agg_parallelism) as executor: + futures = { + executor.submit(_aggregate_one_group, idx, key): idx + for idx, key in enumerate(ordered_keys) + } + for future in as_completed(futures): + idx, dr, stats = future.result() + results[idx] = (dr, stats) + + drs = [r[0] for r in results] # type: ignore[index] + stats_lst = [r[1] for r in results] # type: ignore[index] + return drs, stats_lst + + # -- single semantic aggregation call ------------------------------ + def _apply_semantic_agg_llm( + self, + group_candidates: list[DataRecord], + agg_field: str, + agg_func: str, + ) -> tuple[Any, GenerationStats]: + """Call the LLM to perform a semantic aggregation on *group_candidates*. + Args: - candidates: List of DataRecords to classify into groups - + group_candidates: Records belonging to one group. + agg_field: The field name being aggregated. + agg_func: Natural-language description of the aggregation + (e.g. ``"most positive review"``). + Returns: - Tuple of (list of group labels, generation stats) + ``(aggregated_value, generation_stats)`` """ - # Create a schema that just extracts the group-by field from palimpzest.core.lib.schemas import create_schema_from_fields - groupby_schema = create_schema_from_fields([ - {"name": self.gby_fields[0], "type": str, "desc": f"The semantic category for {self.gby_fields[0]}"} + + # Determine output type for this field + field_type: type = str + for spec in self.agg_fields_spec: + if isinstance(spec, dict) and spec.get("name") == agg_field: + field_type = spec.get("type", str) + break + else: + if agg_field in self.output_schema.model_fields: + field_type = self.output_schema.model_fields[agg_field].annotation or str + + agg_schema = create_schema_from_fields([ + {"name": agg_field, "type": field_type, "desc": agg_func}, ]) - - # Process candidates to extract group labels - group_labels = [] - total_stats = GenerationStats() - - # Get input fields once - input_fields = self.get_input_fields() - fields = {self.gby_fields[0]: str} - - for candidate in candidates: - # Ask LLM to classify this record - pass single candidate, not list - gen_kwargs = { - "project_cols": input_fields, - "output_schema": groupby_schema, - "agg_instruction": f"Determine the '{self.gby_fields[0]}' category for this record." 
+    # -- single semantic aggregation call ------------------------------
+    def _apply_semantic_agg_llm(
+        self,
+        group_candidates: list[DataRecord],
+        agg_field: str,
+        agg_func: str,
+    ) -> tuple[Any, GenerationStats]:
+        """Call the LLM to perform a semantic aggregation on *group_candidates*.
+
         Args:
-            candidates: List of DataRecords to classify into groups
-            
+            group_candidates: Records belonging to one group.
+            agg_field: The field name being aggregated.
+            agg_func: Natural-language description of the aggregation
+                (e.g. ``"most positive review"``).
+
         Returns:
-            Tuple of (list of group labels, generation stats)
+            ``(aggregated_value, generation_stats)``
         """
-        # Create a schema that just extracts the group-by field
         from palimpzest.core.lib.schemas import create_schema_from_fields
-        groupby_schema = create_schema_from_fields([
-            {"name": self.gby_fields[0], "type": str, "desc": f"The semantic category for {self.gby_fields[0]}"}
+
+        # Determine the output type for this field
+        field_type: type = str
+        for spec in self.agg_fields_spec:
+            if isinstance(spec, dict) and spec.get("name") == agg_field:
+                field_type = spec.get("type", str)
+                break
+        else:
+            if agg_field in self.output_schema.model_fields:
+                field_type = self.output_schema.model_fields[agg_field].annotation or str
+
+        agg_schema = create_schema_from_fields([
+            {"name": agg_field, "type": field_type, "desc": agg_func},
         ])
-        
-        # Process candidates to extract group labels
-        group_labels = []
-        total_stats = GenerationStats()
-        
-        # Get input fields once
-        input_fields = self.get_input_fields()
-        fields = {self.gby_fields[0]: str}
-        
-        for candidate in candidates:
-            # Ask LLM to classify this record - pass single candidate, not list
-            gen_kwargs = {
-                "project_cols": input_fields,
-                "output_schema": groupby_schema,
-                "agg_instruction": f"Determine the '{self.gby_fields[0]}' category for this record."
+
+        agg_instruction = (
+            f"Apply the following aggregation: {agg_func} on field '{agg_field}'"
+        )
+        input_fields = [agg_field]
+        fields = {agg_field: field_type}
+
+        gen_kwargs = {
+            "project_cols": input_fields,
+            "output_schema": agg_schema,
+            "agg_instruction": agg_instruction,
+        }
+
+        field_answers, _, gen_stats, _ = self.generator(
+            group_candidates, fields, **gen_kwargs,
+        )
+
+        value = None
+        answer = field_answers.get(agg_field)
+        if isinstance(answer, list) and len(answer) > 0:
+            value = answer[0]
+        elif answer is not None:
+            value = answer
+
+        return value, gen_stats if gen_stats is not None else GenerationStats()
+
+    # ==================================================================
+    # HIERARCHICAL GROUPBY
+    # ==================================================================
+    def hierarchical_groupby(
+        self,
+        candidates: list[DataRecord],
+        groupby_fields: list[list[str | dict]],
+        agg_fields: list[list[str | dict]],
+        agg_funcs: list[list[str]],
+        model: Model | None = None,
+        prompt_strategy: PromptStrategy = PromptStrategy.AGG,
+        reasoning_effort: str | None = None,
+    ) -> dict:
+        """Perform hierarchical (nested) semantic groupby operations.
+
+        At each intermediate level the LLM assigns group labels to the original
+        records (without aggregation) so that inner levels operate on the same
+        raw records. The final level runs a full semantic groupby with
+        aggregation.
+
+        Args:
+            candidates: Input DataRecords.
+            groupby_fields: List of lists of field specs per level.
+            agg_fields: List of lists of aggregate field specs per level.
+            agg_funcs: List of lists of aggregation function names per level.
+            model: Optional LLM model override (falls back to ``self.model``).
+            prompt_strategy: Prompt strategy (defaults to AGG).
+            reasoning_effort: Optional reasoning effort override.
+
+        Returns:
+            A ``DataRecordSet`` for a single level, or a nested dict for
+            multiple levels.
+        """
+        from palimpzest.core.lib.schemas import create_groupby_schema_from_fields
+
+        assert len(groupby_fields) == len(agg_fields) == len(agg_funcs), (
+            "groupby_fields, agg_fields, and agg_funcs must all have the same length"
+        )
+
+        def _normalize(fields):
+            return [
+                f
+                if isinstance(f, dict)
+                else {"name": f, "desc": f"Group by {f}", "type": str}
+                for f in fields
+            ]
+
+        _model = model or self.model
+        _ps = prompt_strategy or self.prompt_strategy
+        _re = reasoning_effort or self.reasoning_effort
+
+        def _run_level(cands, level):
+            gby_specs = _normalize(groupby_fields[level])
+            agg_specs = _normalize(agg_fields[level])
+            funcs = agg_funcs[level]
+            gby_names = [s["name"] for s in gby_specs]
+            agg_names = [s["name"] for s in agg_specs]
+            out_schema = create_groupby_schema_from_fields(gby_names, agg_names)
+
+            op = SemanticGroupByOp(
+                gby_fields=gby_specs,
+                agg_fields=agg_specs,
+                agg_funcs=funcs,
+                model=_model,
+                prompt_strategy=_ps,
+                reasoning_effort=_re,
+                batch_size=self.batch_size,
+                groupby_parallelism=self.groupby_parallelism,
+                agg_parallelism=self.agg_parallelism,
+                output_schema=out_schema,
+                input_schema=self.input_schema,
+            )
+
+            if level == len(groupby_fields) - 1:
+                return op(cands)
+
+            # Intermediate: assign labels, forward raw records
+            labels, _ = op._perform_semantic_groupby(cands)
+            outer_groups: dict[tuple, list[DataRecord]] = {}
+            for cand, label in zip(cands, labels):
+                key = label if isinstance(label, tuple) else (label,)
+                outer_groups.setdefault(key, []).append(cand)
+            return {
+                key: _run_level(grp, level + 1)
+                for key, grp in outer_groups.items()
             }
-            
-            field_answers, _, gen_stats, _ = self.generator(candidate, fields, **gen_kwargs)
-            
-            # Extract the group label - field_answers returns dict with field->list mapping
-            group_label = field_answers.get(self.gby_fields[0], [None])[0]
-            if group_label is None:
-                # Fallback: use a default group
-                group_label = "unknown"
-            group_labels.append(group_label)
-            
-            # Accumulate stats
-            total_stats += gen_stats
-        
-        return group_labels, total_stats
\ No newline at end of file
+
+        return _run_level(candidates, 0)
diff --git a/src/palimpzest/query/operators/logical.py b/src/palimpzest/query/operators/logical.py
index f9008b2d6..ffb34a218 100644
--- a/src/palimpzest/query/operators/logical.py
+++ b/src/palimpzest/query/operators/logical.py
@@ -381,8 +381,8 @@ class GroupByAggregate(LogicalOperator):
     def __init__(
         self,
         is_semantic: bool = False,
-        gby_fields: list[str] | None = None,
-        agg_fields: list[str] | None = None,
+        gby_fields: list[str] | list[dict] | None = None,
+        agg_fields: list[str] | list[dict] | None = None,
         agg_funcs: list[str] | None = None,
         *args,
         **kwargs,
@@ -395,14 +395,21 @@ def __init__(
         if gby_fields is None or agg_fields is None or agg_funcs is None:
             raise ValueError("Must provide all of (gby_fields, agg_fields, agg_funcs)")
 
-        for f in agg_fields:
+        # Store the original field specifications (may be dicts or strings)
+        self.gby_fields_spec = gby_fields
+        self.agg_fields_spec = agg_fields
+        self.agg_funcs = agg_funcs
+
+        # Extract field names for ID computation and validation
+        self.gby_fields = [f['name'] if isinstance(f, dict) else f for f in gby_fields]
+        self.agg_fields = [f['name'] if isinstance(f, dict) else f for f in agg_fields]
+
+        # Validate that the agg fields exist in the schema
+        for f in self.agg_fields:
             if f not in self.input_schema.model_fields:
                 raise TypeError(f"Supplied schema has no field {f}")
 
         self.is_semantic = is_semantic
-        self.gby_fields = gby_fields
-        self.agg_fields = agg_fields
-        self.agg_funcs = agg_funcs
 
     def __str__(self):
         return f"GroupBy(gby_fields={self.gby_fields}, agg_fields={self.agg_fields}, agg_funcs={self.agg_funcs})"
@@ -423,8 +430,8 @@ def get_logical_op_params(self) -> dict:
         logical_op_params = super().get_logical_op_params()
         logical_op_params = {
             "is_semantic": self.is_semantic,
-            "gby_fields": self.gby_fields,
-            "agg_fields": self.agg_fields,
+            "gby_fields": self.gby_fields_spec,  # pass full dict specs to physical operators
+            "agg_fields": self.agg_fields_spec,  # pass full dict specs to physical operators
             "agg_funcs": self.agg_funcs,
             **logical_op_params,
         }
diff --git a/tests/semantic groupBy tests/movies/movies_1.py b/tests/semantic groupBy tests/movies/movies_1.py
new file mode 100644
index 000000000..8b0d099d5
--- /dev/null
+++ b/tests/semantic groupBy tests/movies/movies_1.py
@@ -0,0 +1,28 @@
+"""
+Movies - Sentiment Analysis
+
+Query NL: "Group by criticName and compute the fraction of reviews with positive sentiment"
+- group_cols: ["criticName"]
+- agg_cols: [LLM("reviewText")]
+- semantic group: no
+- semantic agg: yes
+"""
+
+import pandas as pd
+
+def frac_positive(series):
+    num_pos = (series == "POSITIVE").sum()
+    total = len(series)
+    return num_pos / total
+
+df = pd.read_csv("movie_reviews.csv")
+# assume columns: criticName, reviewText, scoreSentiment
+
+result = (
+    df
+    .groupby("criticName")
+    .agg({"scoreSentiment": frac_positive})
+    .reset_index()
+)
+
+result.to_csv("movies_1.csv", index=False)
\ No newline at end of file
diff --git a/tests/semantic groupBy tests/movies/movies_1_pz.py b/tests/semantic groupBy tests/movies/movies_1_pz.py
new file mode 100644
index 000000000..97181f31d
--- /dev/null
+++ b/tests/semantic groupBy tests/movies/movies_1_pz.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python3
+"""
+Movies - Sentiment Analysis with Palimpzest
+
+This program uses Palimpzest to:
+1. Read movie reviews from CSV file
+2. Parse the sentiment (POSITIVE/NEGATIVE) from each review
+3. Group by critic name
+4. Compute the fraction of positive reviews per critic
+"""
+
+import argparse
+import os
+import sys
+import time
+from pathlib import Path
+
+import pandas as pd
+from dotenv import load_dotenv
+
+# Add the src directory to the path to import palimpzest
+repo_root = Path(__file__).resolve().parents[4]
+sys.path.insert(0, str(repo_root / "src"))
+
+import palimpzest as pz
+
+load_dotenv()
+
+
+def custom_frac_positive(group_data):
+    """
+    Custom aggregation function to compute fraction of positive sentiments.
+    This will be used for semantic aggregation.
+    """
+    sentiments = [record.scoreSentiment for record in group_data]
+    num_pos = sum(1 for s in sentiments if s == "POSITIVE")
+    total = len(sentiments)
+    return num_pos / total if total > 0 else 0.0
+
+
+def main():
+    # Parse arguments
+    parser = argparse.ArgumentParser(description="Run movies sentiment analysis with Palimpzest")
+    parser.add_argument("--verbose", default=False, action="store_true", help="Print verbose output")
+    parser.add_argument("--profile", default=False, action="store_true", help="Profile execution")
+    parser.add_argument(
+        "--policy",
+        type=str,
+        help="One of 'mincost', 'mintime', 'maxquality'",
+        default="maxquality",
+    )
+    parser.add_argument(
+        "--execution-strategy",
+        type=str,
+        help="The execution strategy to use. One of sequential, pipelined, parallel",
+        default="sequential",
+    )
+    parser.add_argument(
+        "--output",
+        type=str,
+        help="Output CSV file path",
+        default="movies_1_pz_output.csv",
+    )
+
+    args = parser.parse_args()
+
+    # Set policy
+    policy = pz.MaxQuality()
+    if args.policy == "mincost":
+        policy = pz.MinCost()
+    elif args.policy == "mintime":
+        policy = pz.MinTime()
+    elif args.policy == "maxquality":
+        policy = pz.MaxQuality()
+    else:
+        print("Policy not supported")
+        exit(1)
+
+    # Check for API keys
+    if os.getenv("OPENAI_API_KEY") is None and os.getenv("TOGETHER_API_KEY") is None and os.getenv("ANTHROPIC_API_KEY") is None:
+        print("WARNING: OPENAI_API_KEY, TOGETHER_API_KEY, and ANTHROPIC_API_KEY are unset")
+
+    # Get the path to the CSV file
+    script_dir = Path(__file__).parent
+    csv_path = script_dir / "movie_reviews.csv"
+
+    print(f"Loading movie reviews from: {csv_path}")
+    start_time = time.time()
+
+    # Read CSV file into memory using pandas (limit to first 500 rows)
+    csv_df = pd.read_csv(csv_path).head(500)
+    print(f"Loaded {len(csv_df)} reviews from CSV")
+
+    # Build the Palimpzest query plan using MemoryDataset.
+    # Let MemoryDataset infer the schema from the DataFrame;
+    # this avoids type inference issues.
+    reviews = pz.MemoryDataset(id="movie-reviews", vals=csv_df)
+
+    # Data is already in the right format, no need for sem_map.
+    # Define the GroupBy operation:
+    # group by criticName and compute the fraction of positive reviews.
+    gby_fields = ["criticName"]
+    agg_fields = ["scoreSentiment"]
+    agg_funcs = ["count"]  # we'll use count initially to demonstrate grouping
+
+    grouped_reviews = reviews.sem_groupby(gby_fields, agg_fields, agg_funcs)
+
+    # Configure and run the query
+    config = pz.QueryProcessorConfig(
+        policy=policy,
+        verbose=args.verbose,
+        execution_strategy=args.execution_strategy,
+    )
+
+    print(f"Policy: {str(policy)}")
+    print("Running Palimpzest query...")
+
+    # Pass policy as kwarg based on policy type
+    policy_kwargs = {}
+    if isinstance(policy, pz.MaxQuality):
+        policy_kwargs["max_quality"] = True
+    elif isinstance(policy, pz.MinCost):
+        policy_kwargs["min_cost"] = True
+    elif isinstance(policy, pz.MinTime):
+        policy_kwargs["min_time"] = True
+
+    print(f"Policy kwargs: {policy_kwargs}")  # debug: show what we're passing
+    data_record_collection = grouped_reviews.run(config, **policy_kwargs)
+
+    end_time = time.time()
+    print(f"Elapsed time: {end_time - start_time:.2f} seconds")
+
+    # Convert results to DataFrame
+    results_df = data_record_collection.to_df()
+    print(f"\nResults shape: {results_df.shape}")
+    print("\nFirst 10 results:")
+    # print(results_df.head(10))
+
+    # Save results to CSV
+    output_path = script_dir / args.output
+    results_df.to_csv(output_path, index=False)
+    print(f"\nResults saved to: {output_path}")
+
+    # Print execution statistics
+    if hasattr(data_record_collection, 'execution_stats'):
+        print("\nExecution Statistics:")
+        print(data_record_collection.execution_stats)
+
+
+if __name__ == "__main__":
+    main()
+Categories:
+- Adventure
+- Action
+- Comedy
+- Mystery/Crime
+- Fantasy
+- Horror
+- Romance
+- Sci-fi
+
+group_cols: [Director, LLM("Genre", "reviewText")]
+agg_cols: []
+semantic group: mixed (director name is literal, genre inferred from movie metadata)
+semantic agg: no
+"""
+
+import pandas as pd
+
+# Parameters for the templated query
+DIRECTOR = "Christopher Nolan"
+GENRE = "Science Fiction"
+
+df = pd.read_csv("movie_reviews.csv")
+# assume columns: Director, Genre, reviewText, scoreSentiment, movieTitle
+
+# Filter by director and genre
+filtered_df = df[
+    (df["Director"] == DIRECTOR) &
+    (df["Genre"] == GENRE)
+]
+
+# Group by Director and Genre, count the number of movies
+result = (
+    filtered_df
+    .groupby(["Director", "Genre"])
+    .agg({"movieTitle": "count"})
+    .reset_index()
+    .rename(columns={"movieTitle": "movie_count"})
+)
+
+result.to_csv("movies_2.csv", index=False)
+
+# TODO: Augment genre to the dataset
+# TODO: join the datasets
diff --git a/tests/semantic groupBy tests/movies/pz-programs/compare_query6_results.py b/tests/semantic groupBy tests/movies/pz-programs/compare_query6_results.py
new file mode 100644
index 000000000..38c6d8fb9
--- /dev/null
+++ b/tests/semantic groupBy tests/movies/pz-programs/compare_query6_results.py
@@ -0,0 +1,224 @@
+#!/usr/bin/env python3
+"""
+Compare Query 6 results from PZ with ground truth.
+Generates a styled summary table image similar to the analysis summary_table.png.
+"""
+
+from pathlib import Path
+
+import matplotlib
+matplotlib.use("Agg")
+import matplotlib.pyplot as plt
+import pandas as pd
+
+
+# ─── Styling (matches analyze.py) ─────────────────────────────────────────────
+
+HEADER_COLOR = "#1E3A5F"
+ROW_ALT_COLOR = "#F7F9FC"
+ROW_LABEL_COLOR = "#E8EDF5"
+EDGE_COLOR = "#CCCCCC"
+
+
+def style_table(tbl, n_data_rows: int):
+    """Apply the shared header/row styling."""
+    tbl.auto_set_font_size(False)
+    tbl.set_fontsize(9)
+    tbl.scale(1.2, 1.6)
+    for (r, c), cell in tbl.get_celld().items():
+        if r == 0:
+            cell.set_facecolor(HEADER_COLOR)
+            cell.set_text_props(color="white", fontweight="bold")
+        elif c == -1:
+            cell.set_facecolor(ROW_LABEL_COLOR)
+            cell.set_text_props(fontweight="bold")
+        else:
+            cell.set_facecolor(ROW_ALT_COLOR if r % 2 == 0 else "white")
+        cell.set_edgecolor(EDGE_COLOR)
+
+
+def make_stats_subtable(ax, comparison: pd.DataFrame):
+    """Left sub-table: summary statistics."""
+    ax.axis("off")
+
+    exact = (comparison["Difference"] < 1e-9).sum()
+    close = (comparison["Difference"] <= 0.1).sum()
+    n = len(comparison)
+
+    rows = [
+        ["Directors compared", str(n)],
+        ["Exact matches", f"{exact} ({100*exact/n:.1f}%)"],
+        ["Within ±0.1", f"{close} ({100*close/n:.1f}%)"],
+        ["Mean |difference|", f"{comparison['Difference'].mean():.4f}"],
+        ["Std deviation", f"{comparison['Difference'].std():.4f}"],
+        ["Min difference", f"{comparison['Difference'].min():.4f}"],
+        ["Max difference", f"{comparison['Difference'].max():.4f}"],
+    ]
+
+    tbl = ax.table(
+        cellText=[[r[1]] for r in rows],
+        rowLabels=[r[0] for r in rows],
+        colLabels=["Value"],
+        cellLoc="center",
+        loc="center",
+    )
+    style_table(tbl, len(rows))
+    # ax.set_title("Summary Statistics", fontsize=11, fontweight="bold", pad=10)
+
+
+def make_score_subtable(ax, comparison: pd.DataFrame):
+    """Middle sub-table: distribution of differences by bucket."""
+    ax.axis("off")
+
+    diff = comparison["Difference"]
+    buckets = [
+        ("= 0.0", diff < 1e-9),
+        ("0.0 – 0.1", (diff >= 1e-9) & (diff <= 0.1)),
+        ("0.1 – 0.2", (diff > 0.1) & (diff <= 0.2)),
+        ("0.2 – 
0.3", (diff > 0.2) & (diff <= 0.3)), + ("> 0.3", diff > 0.3), + ] + n = len(comparison) + cell_data = [[str(mask.sum()), f"{100*mask.sum()/n:.1f}%"] for _, mask in buckets] + row_labels = [label for label, _ in buckets] + + tbl = ax.table( + cellText=cell_data, + rowLabels=row_labels, + colLabels=["Count", "% of Total"], + cellLoc="center", + loc="center", + ) + style_table(tbl, len(buckets)) + # ax.set_title("Difference Distribution", fontsize=11, fontweight="bold", pad=10) + + +def make_all_directors_subtable(ax, comparison: pd.DataFrame): + """Full directors table showing all rows.""" + ax.axis("off") + + cell_data = [ + [row["Director"], f"{row['PZ Score']:.3f}", f"{row['Ground Truth Score']:.3f}", f"{row['Difference']:.3f}"] + for _, row in comparison.iterrows() + ] + + tbl = ax.table( + cellText=cell_data, + colLabels=["Director", "PZ", "GT", "Diff"], + cellLoc="center", + loc="center", + ) + tbl.auto_set_font_size(False) + tbl.set_fontsize(8) + tbl.scale(1.0, 1.4) + for (r, c), cell in tbl.get_celld().items(): + if r == 0: + cell.set_facecolor(HEADER_COLOR) + cell.set_text_props(color="white", fontweight="bold") + else: + cell.set_facecolor(ROW_ALT_COLOR if r % 2 == 0 else "white") + cell.set_edgecolor(EDGE_COLOR) + + # ax.set_title("All Directors", fontsize=11, fontweight="bold", pad=10) + + +def save_summary_figure(comparison_table: pd.DataFrame, output_path: Path): + plt.rcParams.update({ + "font.family": "sans-serif", + "font.size": 11, + "axes.spines.top": False, + "axes.spines.right": False, + }) + + n = len(comparison_table) + # Top section: stats + distribution side by side + # Bottom section: full directors table spanning full width + fig = plt.figure(figsize=(14, 4.5 + n * 0.28)) + fig.suptitle( + "Query 6 — PZ vs Ground Truth", + fontsize=13, fontweight="bold", + ) + + import matplotlib.gridspec as gridspec + gs = gridspec.GridSpec(2, 2, figure=fig, height_ratios=[1, n * 0.28 / 4.5], width_ratios=[1, 1]) + + ax_stats = fig.add_subplot(gs[0, 0]) + ax_dist = fig.add_subplot(gs[0, 1]) + ax_all = fig.add_subplot(gs[1, :]) + + make_stats_subtable(ax_stats, comparison_table) + make_score_subtable(ax_dist, comparison_table) + make_all_directors_subtable(ax_all, comparison_table) + + fig.tight_layout() + fig.savefig(output_path, dpi=150, bbox_inches="tight") + plt.close(fig) + print(f" Summary figure saved to: {output_path}") + + +# ─── Main ───────────────────────────────────────────────────────────────────── + +def main(): + script_dir = Path(__file__).parent + + pz_results = pd.read_csv(script_dir / "query6_pz_output.csv") + ground_truth = pd.read_csv(script_dir / "../queries/query6_ground_truth.csv") + + comparison = pz_results.merge( + ground_truth[["director", "normalizedScore"]], + on="director", + how="inner", + suffixes=("_pz", "_gt"), + ) + comparison["difference"] = abs(comparison["normalizedScore_pz"] - comparison["normalizedScore_gt"]) + + comparison_table = comparison[["director", "normalizedScore_pz", "normalizedScore_gt", "difference"]].copy() + comparison_table.columns = ["Director", "PZ Score", "Ground Truth Score", "Difference"] + + avg_difference = comparison_table["Difference"].mean() + variance_difference = comparison_table["Difference"].var() + std_difference = comparison_table["Difference"].std() + + print("\n" + "="*80) + print("QUERY 6 COMPARISON: PZ vs Ground Truth") + print("="*80 + "\n") + display_table = comparison_table.copy() + display_table["PZ Score"] = display_table["PZ Score"].map(lambda x: f"{x:.3f}") + display_table["Ground Truth Score"] = 
display_table["Ground Truth Score"].map(lambda x: f"{x:.3f}") + display_table["Difference"] = display_table["Difference"].map(lambda x: f"{x:.3f}") + print(display_table.to_string(index=False)) + print("\n" + "="*80) + print("STATISTICS") + print("="*80) + print(f"Number of directors compared: {len(comparison_table)}") + print(f"Average difference: {avg_difference:.4f}") + print(f"Variance of difference: {variance_difference:.4f}") + print(f"Standard deviation: {std_difference:.4f}") + print(f"Min difference: {comparison_table['Difference'].min():.4f}") + print(f"Max difference: {comparison_table['Difference'].max():.4f}") + print("="*80 + "\n") + + output_file = script_dir / "query6_comparison.csv" + comparison_table.to_csv(output_file, index=False) + + stats_file = script_dir / "query6_comparison_stats.txt" + with open(stats_file, "w") as f: + f.write("QUERY 6 COMPARISON STATISTICS\n") + f.write("="*50 + "\n") + f.write(f"Number of directors compared: {len(comparison_table)}\n") + f.write(f"Average difference: {avg_difference:.4f}\n") + f.write(f"Variance of difference: {variance_difference:.4f}\n") + f.write(f"Standard deviation: {std_difference:.4f}\n") + f.write(f"Min difference: {comparison_table['Difference'].min():.4f}\n") + f.write(f"Max difference: {comparison_table['Difference'].max():.4f}\n") + + print(f"Comparison table saved to: {output_file}") + print(f"Statistics saved to: {stats_file}") + + figure_file = script_dir / "query6_summary_table.png" + save_summary_figure(comparison_table, figure_file) + + +if __name__ == "__main__": + main() diff --git a/tests/semantic groupBy tests/movies/pz-programs/query_1_pz.py b/tests/semantic groupBy tests/movies/pz-programs/query_1_pz.py new file mode 100644 index 000000000..82ebaa146 --- /dev/null +++ b/tests/semantic groupBy tests/movies/pz-programs/query_1_pz.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +""" +Query 1 — Sentiment by Publication (Palimpzest) + +Group by publicatioName and compute the fraction of positive reviews. + +Pipeline: + 1. sem_groupby – Semantically groups the records by `publicatioName` + (the LLM normalises slight variations in publication + names) and collects the scoreSentiment values into a + list per group. + 2. Post-processing – computes frac_positive from the collected lists. 
+""" + +import argparse +import json +import os +import sys +import time +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +# Add the src directory to the path +repo_root = Path(__file__).resolve().parents[4] +sys.path.insert(0, str(repo_root / "src")) + +import palimpzest as pz + +load_dotenv() + + +def compute_frac_positive(sentiments): + """Compute fraction of positive sentiments from a collected list.""" + num_pos = sum(1 for s in sentiments if s and str(s).upper() == "POSITIVE") + total = len(sentiments) + return num_pos / total if total > 0 else 0.0 + + +def main(): + parser = argparse.ArgumentParser(description="Query 1: Sentiment by Publication") + parser.add_argument("--verbose", default=False, action="store_true") + parser.add_argument("--policy", type=str, default="maxquality", + help="One of 'mincost', 'mintime', 'maxquality'") + parser.add_argument("--output", type=str, default="query1_pz_output.csv") + parser.add_argument("--stats-output", type=str, default=None, + help="Optional path to write execution stats JSON") + parser.add_argument( + "--execution-strategy", + type=str, + default="sequential", + help="One of 'sequential', 'pipelined', 'parallel'", + ) + args = parser.parse_args() + + # Set policy + policy_map = { + "mincost": pz.MinCost(), + "mintime": pz.MinTime(), + "maxquality": pz.MaxQuality() + } + policy = policy_map.get(args.policy, pz.MaxQuality()) + + # Load data + script_dir = Path(__file__).parent + csv_path = script_dir / "../movie_reviews.csv" + print(f"Loading reviews from: {csv_path}") + + csv_df = pd.read_csv(csv_path).head(500) + print(f"Loaded {len(csv_df)} reviews") + + # ── Ingest the DataFrame ───────────────────────────────────────── + # MemoryDataset automatically creates a schema from the DataFrame. + # The CSV already contains: publicatioName, reviewText, + # scoreSentiment, etc. + reviews = pz.MemoryDataset(id="reviews", vals=csv_df) + + # ── sem_groupby – semantically group by publication name ───────── + # The LLM normalises publication names (e.g. "NY Times" vs + # "The New York Times") and groups the records accordingly. + # We collect the existing scoreSentiment values into a list per + # group so we can compute the fraction of positive reviews. 
+    grouped = reviews.sem_groupby(
+        gby_fields=["publicatioName"],
+        agg_fields=["scoreSentiment"],
+        agg_funcs=["list"],
+    )
+
+    # ── Execute ───────────────────────────────────────────────────────
+    start_time = time.time()
+    config = pz.QueryProcessorConfig(
+        policy=policy,
+        verbose=args.verbose,
+        execution_strategy=args.execution_strategy,
+    )
+    data_record_collection = grouped.run(config)
+    exec_time = time.time() - start_time
+
+    # ── Post-process – compute frac_positive per group ────────────────
+    result_df = pd.DataFrame([
+        {
+            "publicatioName": r.publicatioName,
+            "frac_positive": compute_frac_positive(
+                getattr(r, "scoreSentiment", []) or []
+            ),
+        }
+        for r in data_record_collection
+    ])
+    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
+    result_df.to_csv(args.output, index=False)
+
+    if args.stats_output is not None:
+        os.makedirs(os.path.dirname(args.stats_output) or ".", exist_ok=True)
+        with open(args.stats_output, "w") as f:
+            json.dump(data_record_collection.execution_stats.to_json(), f, indent=2)
+
+    print(f"\nExecution time: {exec_time:.2f}s")
+    print(f"Results saved to: {args.output}")
+    if args.stats_output is not None:
+        print(f"Execution stats saved to: {args.stats_output}")
+    print(f"Generated {len(result_df)} publication groups")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/semantic groupBy tests/movies/pz-programs/query_2_pz.py b/tests/semantic groupBy tests/movies/pz-programs/query_2_pz.py
new file mode 100644
index 000000000..c8ae1dccb
--- /dev/null
+++ b/tests/semantic groupBy tests/movies/pz-programs/query_2_pz.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+"""
+Query 2 — Critic Volume by Inferred Era (Palimpzest)
+
+Group reviews by movie era (pre-2000, 2000s, 2010s, 2020s) and count reviews.
+The LLM semantically infers the era from the releaseDateTheaters column.
+
+Pipeline:
+    1. Join movie_reviews with movies to get releaseDateTheaters.
+    2. sem_groupby – LLM reads releaseDateTheaters and groups into era buckets;
+       counts reviewId per group.
+"""
+
+import argparse
+import json
+import os
+import sys
+import time
+from pathlib import Path
+
+import pandas as pd
+from dotenv import load_dotenv
+
+repo_root = Path(__file__).resolve().parents[4]
+sys.path.insert(0, str(repo_root / "src"))
+
+import palimpzest as pz
+
+load_dotenv()
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Query 2: Critic Volume by Inferred Era")
+    parser.add_argument("--verbose", default=False, action="store_true")
+    parser.add_argument("--policy", type=str, default="maxquality")
+    parser.add_argument("--output", type=str, default="query2_pz_output.csv")
+    parser.add_argument("--stats-output", type=str, default=None,
+                        help="Optional path to write execution stats JSON")
+    parser.add_argument(
+        "--execution-strategy", type=str, default="sequential",
+        help="One of 'sequential', 'pipelined', 'parallel'",
+    )
+    args = parser.parse_args()

+    policy_map = {
+        "mincost": pz.MinCost(),
+        "mintime": pz.MinTime(),
+        "maxquality": pz.MaxQuality(),
+    }
+    policy = policy_map.get(args.policy, pz.MaxQuality())
+
+    script_dir = Path(__file__).parent
+
+    # Load and join data
+    reviews_df = pd.read_csv(script_dir / "../movie_reviews.csv").head(500)
+    movies_df = pd.read_csv(script_dir / "../movies.csv")[["id", "releaseDateTheaters"]]
+    merged_df = reviews_df.merge(movies_df, on="id", how="left")
+    print(f"Loaded {len(merged_df)} reviews")
+
+    reviews = pz.MemoryDataset(id="reviews", vals=merged_df)
+
+    # sem_groupby: LLM infers era from releaseDateTheaters, count reviewId per era
+    grouped = reviews.sem_groupby(
+        gby_fields=[
+            {
+                "name": "releaseDateTheaters",
+                "type": str,
+                "desc": "Reviews can be categorized into pre-2000, 2000s, 2010s, 2020s, or unknown. Return which era category the review falls into",
+            }
+        ],
+        agg_fields=[
+            {
+                "name": "reviewId",
+                "type": int,
+                "desc": "Identifier of the review",
+            }
+        ],
+        agg_funcs=["count"],
+    )
+
+    # Execute
+    start_time = time.time()
+    config = pz.QueryProcessorConfig(
+        policy=policy,
+        verbose=args.verbose,
+        execution_strategy=args.execution_strategy,
+        available_models=[pz.Model.GPT_5],
+    )
+    data_record_collection = grouped.run(config)
+    exec_time = time.time() - start_time
+
+    # Post-process: rename the semantic group key to "era"
+    result_df = pd.DataFrame([
+        {
+            "era": r.releaseDateTheaters,
+            "review_count": r.reviewId,
+        }
+        for r in data_record_collection
+    ])
+    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
+    result_df.to_csv(args.output, index=False)
+
+    if args.stats_output is not None:
+        os.makedirs(os.path.dirname(args.stats_output) or ".", exist_ok=True)
+        with open(args.stats_output, "w") as f:
+            json.dump(data_record_collection.execution_stats.to_json(), f, indent=2)
+
+    print(f"\nExecution time: {exec_time:.2f}s")
+    print(f"Results saved to: {args.output}")
+    if args.stats_output is not None:
+        print(f"Execution stats saved to: {args.stats_output}")
+    print(f"Generated {len(result_df)} era groups")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/semantic groupBy tests/movies/pz-programs/query_3_pz.py b/tests/semantic groupBy tests/movies/pz-programs/query_3_pz.py
new file mode 100644
index 000000000..23af681ab
--- /dev/null
+++ b/tests/semantic groupBy tests/movies/pz-programs/query_3_pz.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+"""
+Query 3 — Fraction Positive per Audience Type (Palimpzest)
+
+For a specific director, group reviews by MPAA-inferred audience type
+and compute fraction positive.
+
+Pipeline:
+    1. Join movie_reviews with movies filtered by director to get rating.
+    2. sem_groupby – LLM semantically normalises the MPAA rating into
+       audience-type buckets (Children, Teen, Adult, Unrated); lists
+       scoreSentiment per group.
+    3. Post-process list for frac_positive.
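+
+The audience buckets follow the same assumed MPAA mapping as the rule-based
+ground truth (query_3.py): G/PG → Children, PG-13 → Teen, R/NC-17 → Adult,
+NR → Unrated.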
+""" + +import argparse +import json +import os +import sys +import time +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +repo_root = Path(__file__).resolve().parents[4] +sys.path.insert(0, str(repo_root / "src")) + +import palimpzest as pz + +load_dotenv() + + +def main(): + parser = argparse.ArgumentParser(description="Query 3: Sentiment by Audience Type") + parser.add_argument("--director", type=str, default="Christopher Nolan", + help="Director name to filter by") + parser.add_argument("--verbose", default=False, action="store_true") + parser.add_argument("--policy", type=str, default="maxquality") + parser.add_argument("--output", type=str, default="query3_pz_output.csv") + parser.add_argument("--stats-output", type=str, default=None, + help="Optional path to write execution stats JSON") + parser.add_argument( + "--execution-strategy", type=str, default="sequential", + help="One of 'sequential', 'pipelined', 'parallel'", + ) + args = parser.parse_args() + + policy_map = { + "mincost": pz.MinCost(), + "mintime": pz.MinTime(), + "maxquality": pz.MaxQuality(), + } + policy = policy_map.get(args.policy, pz.MaxQuality()) + + script_dir = Path(__file__).parent + + # Load and filter data + reviews_df = pd.read_csv(script_dir / "../movie_reviews.csv") + movies_df = pd.read_csv(script_dir / "../movies.csv") + + # Filter for director's movies and keep the rating column + director_movies = movies_df[ + movies_df["director"].str.contains(args.director, na=False, case=False) + ][["id", "rating"]] + + merged_df = reviews_df.merge(director_movies, on="id", how="inner") + print(f"Loaded {len(merged_df)} reviews for {args.director}") + + reviews = pz.MemoryDataset(id="reviews", vals=merged_df) + + # sem_groupby: LLM maps MPAA rating → audience type bucket, list scoreSentiment + grouped = reviews.sem_groupby( + gby_fields=[ + { + "name": "rating", + "type": str, + "desc": "MPAA rating string (e.g., 'Adult', 'Teen', 'Children', 'Unrated')", + } + ], + agg_fields=[ + { + "name": "scoreSentiment", + "type": str, + "desc": "Sentiment label for the review (e.g., 'POSITIVE'/'NEGATIVE')", + } + ], + agg_funcs=["list"], + ) + + # Execute + start_time = time.time() + config = pz.QueryProcessorConfig( + policy=policy, + verbose=args.verbose, + execution_strategy=args.execution_strategy, + ) + data_record_collection = grouped.run(config) + exec_time = time.time() - start_time + + # Post-process: compute frac_positive from the sentiment lists + result_df = pd.DataFrame([ + { + "audienceType": r.rating, + "frac_positive": ( + sum(1 for s in r.scoreSentiment if str(s).upper() == "POSITIVE") + / len(r.scoreSentiment) + if len(r.scoreSentiment) > 0 + else 0.0 + ), + "review_count": len(r.scoreSentiment), + "director": args.director, + } + for r in data_record_collection + ]) + os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True) + result_df.to_csv(args.output, index=False) + + if args.stats_output is not None: + os.makedirs(os.path.dirname(args.stats_output) or ".", exist_ok=True) + with open(args.stats_output, "w") as f: + json.dump(data_record_collection.execution_stats.to_json(), f, indent=2) + + print(f"\nExecution time: {exec_time:.2f}s") + print(f"Results saved to: {args.output}") + if args.stats_output is not None: + print(f"Execution stats saved to: {args.stats_output}") + print(f"Generated {len(result_df)} audience type groups") + + +if __name__ == "__main__": + main() diff --git a/tests/semantic groupBy tests/movies/pz-programs/query_4_pz.py b/tests/semantic 
groupBy tests/movies/pz-programs/query_4_pz.py new file mode 100644 index 000000000..3d345b90a --- /dev/null +++ b/tests/semantic groupBy tests/movies/pz-programs/query_4_pz.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +""" +Query 4 — Sentiment and Top Critic Bias by Genre (Palimpzest) + +Hard query: genre must be inferred from review text itself (not available +in reviews table). Both group key and aggregation value are semantic. + +Pipeline: + 1. Load movie_reviews. + 2. sem_groupby – LLM infers primaryGenre from reviewText and groups by + [primaryGenre, isTopCritic]; lists scoreSentiment per group. + 3. Post-process list → frac_positive. +""" + +import argparse +import json +import os +import sys +import time +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +repo_root = Path(__file__).resolve().parents[4] +sys.path.insert(0, str(repo_root / "src")) + +import palimpzest as pz + +load_dotenv() + + +def main(): + parser = argparse.ArgumentParser(description="Query 4: Sentiment by Inferred Genre") + parser.add_argument("--verbose", default=False, action="store_true") + parser.add_argument("--policy", type=str, default="maxquality") + parser.add_argument("--output", type=str, default="query4_pz_output.csv") + parser.add_argument("--stats-output", type=str, default=None, + help="Optional path to write execution stats JSON") + parser.add_argument( + "--execution-strategy", type=str, default="sequential", + help="One of 'sequential', 'pipelined', 'parallel'", + ) + args = parser.parse_args() + + policy_map = { + "mincost": pz.MinCost(), + "mintime": pz.MinTime(), + "maxquality": pz.MaxQuality(), + } + policy = policy_map.get(args.policy, pz.MaxQuality()) + + script_dir = Path(__file__).parent + + reviews_df = pd.read_csv(script_dir / "../movie_reviews.csv").head(500) + print(f"Loaded {len(reviews_df)} reviews") + + reviews = pz.MemoryDataset(id="reviews", vals=reviews_df) + + # Hierarchical semantic groupby: + # Level 0 — infer primary movie genre from reviewText (constrained to 11 values) + # Level 1 — group by the existing isTopCritic boolean field + groupby_fields = [ + [ + { + "name": "reviewText", + "type": str, + "desc": ( + "The primary genre of the movie being reviewed, inferred from the review text. " + "Must be exactly one of these values — no other labels are allowed: " + "'Action', 'Adventure', 'Comedy', 'Crime', 'Documentary', " + "'Drama', 'History', 'Mystery & thriller', 'Romance', 'Sci-fi', 'War'." + ), + } + ], + [ + { + "name": "isTopCritic", + "type": str, + "desc": ( + "Whether the reviewer is a top critic. " + "Use the existing isTopCritic field value directly — " + "True maps to 'Top Critic', False maps to 'Not Top Critic'. " + "Do not use any other labels." 
+ ), + } + ], + ] + agg_fields = [ + [{"name": "scoreSentiment", "type": str, "desc": "Sentiment label for the review"}], + [{"name": "scoreSentiment", "type": str, "desc": "Sentiment label for the review"}], + ] + agg_funcs = [ + ["list"], + ["list"] + ] + + start_time = time.time() + # hierarchical_sem_groupby now returns (nested_result, accumulated_gen_stats) + nested_result, gen_stats = reviews.hierarchical_sem_groupby( + groupby_fields=groupby_fields, + agg_fields=agg_fields, + agg_funcs=agg_funcs + ) + exec_time = time.time() - start_time + + # Flatten nested results and compute frac_positive + rows = [] + for genre_key, inner_result in nested_result.items(): + genre = genre_key[0] if isinstance(genre_key, tuple) else genre_key + for r in inner_result.data_records: + # Normalize LLM string → boolean to match the GT's isTopCritic format + raw_itc = str(r.isTopCritic).strip().lower() + is_top_critic = raw_itc in ("top critic", "true", "yes", "1") + sentiments = r.scoreSentiment + frac_pos = ( + sum(1 for s in sentiments if str(s).upper() == "POSITIVE") / len(sentiments) + if len(sentiments) > 0 else 0.0 + ) + rows.append({ + "primaryGenre": genre, + "isTopCritic": is_top_critic, + "frac_positive": frac_pos, + "review_count": len(sentiments) + }) + result_df = pd.DataFrame(rows) + os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True) + result_df.to_csv(args.output, index=False) + + if args.stats_output is not None: + os.makedirs(os.path.dirname(args.stats_output) or ".", exist_ok=True) + total_cost = gen_stats.total_input_cost + gen_stats.total_output_cost + total_tokens = int(gen_stats.total_input_tokens + gen_stats.total_output_tokens) + stats = { + "total_execution_time": exec_time, + "total_execution_cost": total_cost, + "total_tokens": total_tokens, + "optimization_time": 0.0, + "plan_execution_time": exec_time, + } + with open(args.stats_output, "w") as f: + json.dump(stats, f, indent=2) + + print(f"\nExecution time: {exec_time:.2f}s") + print(f"Total cost: ${gen_stats.total_input_cost + gen_stats.total_output_cost:.4f}") + print(f"Total tokens: {int(gen_stats.total_input_tokens + gen_stats.total_output_tokens):,}") + print(f"Results saved to: {args.output}") + if args.stats_output is not None: + print(f"Execution stats saved to: {args.stats_output}") + print(f"Generated {len(result_df)} genre-topcritic groups") + + +if __name__ == "__main__": + main() diff --git a/tests/semantic groupBy tests/movies/pz-programs/query_5_pz.py b/tests/semantic groupBy tests/movies/pz-programs/query_5_pz.py new file mode 100644 index 000000000..8f86b6e52 --- /dev/null +++ b/tests/semantic groupBy tests/movies/pz-programs/query_5_pz.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +""" +Query 5 — Emotional Tone by Director and Genre (Palimpzest) + +Finer-grained emotional tone classification beyond binary sentiment. + +Pipeline: + 1. Join movie_reviews with movies filtered by director + genre. + 2. sem_groupby – LLM reads reviewText and groups by emotional tone + (Enthusiastic, Measured, Disappointed); counts reviewId per group. 
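+    3. Post-process – rename the semantic group key to emotionalTone and
+       attach the director/genre parameters to each output row.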
+""" + +import argparse +import json +import os +import sys +import time +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +repo_root = Path(__file__).resolve().parents[4] +sys.path.insert(0, str(repo_root / "src")) + +import palimpzest as pz + +load_dotenv() + + +def main(): + parser = argparse.ArgumentParser(description="Query 5: Emotional Tone by Director and Genre") + parser.add_argument("--director", type=str, default="Steven Spielberg") + parser.add_argument("--genre", type=str, default="Adventure") + parser.add_argument("--verbose", default=False, action="store_true") + parser.add_argument("--policy", type=str, default="maxquality") + parser.add_argument("--output", type=str, default="query5_pz_output.csv") + parser.add_argument("--stats-output", type=str, default=None, + help="Optional path to write execution stats JSON") + parser.add_argument( + "--execution-strategy", type=str, default="sequential", + help="One of 'sequential', 'pipelined', 'parallel'", + ) + args = parser.parse_args() + + policy_map = { + "mincost": pz.MinCost(), + "mintime": pz.MinTime(), + "maxquality": pz.MaxQuality(), + } + policy = policy_map.get(args.policy, pz.MaxQuality()) + + script_dir = Path(__file__).parent + + # Load and filter data + reviews_df = pd.read_csv(script_dir / "../movie_reviews.csv") + movies_df = pd.read_csv(script_dir / "../movies.csv") + + filtered_movies = movies_df[ + movies_df["director"].str.contains(args.director, na=False, case=False) + & movies_df["genre"].str.contains(args.genre, na=False, case=False) + ][["id"]] + + merged_df = reviews_df.merge(filtered_movies, on="id", how="inner") + print(f"Loaded {len(merged_df)} reviews for {args.director} in {args.genre}") + + reviews = pz.MemoryDataset(id="reviews", vals=merged_df) + + # sem_groupby: LLM reads reviewText and groups by emotional tone, count reviewId + grouped = reviews.sem_groupby( + gby_fields=[ + { + "name": "reviewText", + "type": str, + "desc": ( + "The emotional tone of the review. " + "Must be exactly one of these three values — no other labels are allowed: " + "'Enthusiastic', " + "'Measured', " + "'Disappointed'." 
+ ), + } + ], + agg_fields=[ + { + "name": "reviewId", + "type": int, + "desc": "Identifier of the review", + } + ], + agg_funcs=["count"], + ) + + # Execute + start_time = time.time() + config = pz.QueryProcessorConfig( + policy=policy, + verbose=args.verbose, + execution_strategy=args.execution_strategy, + ) + data_record_collection = grouped.run(config) + exec_time = time.time() - start_time + + # Post-process: rename the semantic group key to "emotionalTone" + result_df = pd.DataFrame([ + { + "emotionalTone": r.reviewText, + "review_count": r.reviewId, + "director": args.director, + "genre": args.genre, + } + for r in data_record_collection + ]) + os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True) + result_df.to_csv(args.output, index=False) + + if args.stats_output is not None: + os.makedirs(os.path.dirname(args.stats_output) or ".", exist_ok=True) + with open(args.stats_output, "w") as f: + json.dump(data_record_collection.execution_stats.to_json(), f, indent=2) + + print(f"\nExecution time: {exec_time:.2f}s") + print(f"Results saved to: {args.output}") + if args.stats_output is not None: + print(f"Execution stats saved to: {args.stats_output}") + print(f"Generated {len(result_df)} tone groups") + + +if __name__ == "__main__": + main() diff --git a/tests/semantic groupBy tests/movies/pz-programs/query_6_pz.py b/tests/semantic groupBy tests/movies/pz-programs/query_6_pz.py new file mode 100644 index 000000000..5b1c4e8e1 --- /dev/null +++ b/tests/semantic groupBy tests/movies/pz-programs/query_6_pz.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +""" +Query 6 — Most Positive Review by Director (Palimpzest) + +Pipeline: + 1. Join movie_reviews with movies to get director per review. + 2. Drop records with missing or unparseable originalScore; normalise to [0, 1]. + 3. Python groupby("director") — exact, non-semantic. + 4. For each director group: sem_map to score each review's positivity. + 5. Find the review with the highest positivity score. + +Comparison metric: |ground_truth_normalizedScore − pz_normalizedScore| per director. +""" + +import argparse +import json +import os +import sys +import time +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +repo_root = Path(__file__).resolve().parents[4] +sys.path.insert(0, str(repo_root / "src")) + +import palimpzest as pz + +load_dotenv() + + +def parse_score(score_str) -> float | None: + """Parse "3.5/4", "4/5", etc. into a float in [0, 1]. 
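+    For example (values follow directly from the division below):
+
+        >>> parse_score("3.5/4")
+        0.875
+        >>> parse_score("4/5")
+        0.8
+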
Returns None if unparseable.""" + if pd.isna(score_str) or str(score_str).strip() == "": + return None + parts = str(score_str).strip().split("/") + if len(parts) == 2: + try: + num, den = float(parts[0]), float(parts[1]) + return num / den if den != 0 else None + except ValueError: + return None + return None + + +def main(): + parser = argparse.ArgumentParser(description="Query 6: Most Positive Review by Director") + parser.add_argument("--verbose", default=False, action="store_true") + parser.add_argument("--policy", type=str, default="maxquality", + help="One of 'mincost', 'mintime', 'maxquality'") + parser.add_argument("--output", type=str, default="query6_pz_output.csv") + parser.add_argument("--stats-output", type=str, default=None, + help="Optional path to write execution stats JSON") + parser.add_argument( + "--execution-strategy", type=str, default="sequential", + help="One of 'sequential', 'pipelined', 'parallel'", + ) + args = parser.parse_args() + + policy_map = { + "mincost": pz.MinCost(), + "mintime": pz.MinTime(), + "maxquality": pz.MaxQuality(), + } + policy = policy_map.get(args.policy, pz.MaxQuality()) + + script_dir = Path(__file__).parent + + # ── Load and prepare data ───────────────────────────────────────── + reviews_df = pd.read_csv(script_dir / "../movie_reviews.csv") + movies_df = pd.read_csv(script_dir / "../movies.csv")[["id", "director"]] + + merged_df = reviews_df.merge(movies_df, on="id", how="left") + merged_df = merged_df.dropna(subset=["originalScore"]) + merged_df = merged_df[merged_df["originalScore"].str.strip() != ""] + merged_df["normalizedScore"] = merged_df["originalScore"].apply(parse_score) + merged_df = merged_df.dropna(subset=["normalizedScore", "director"]) + + directors = merged_df["director"].unique() + print(f"Loaded {len(merged_df)} reviews across {len(directors)} directors") + + # ── Non-semantic groupby + sem_agg per director ─────────────────── + rows = [] + # Accumulated execution stats across all sem_agg calls + acc_input_tokens = 0 + acc_output_tokens = 0 + acc_exec_cost = 0.0 + acc_opt_time = 0.0 + acc_plan_time = 0.0 + + wall_start = time.time() + + count = 0 + for director, group_df in merged_df.groupby("director"): + if count >= 40: + break + + count += 1 + # Keep the full group_df for lookup later + full_group_df = group_df[["reviewText", "normalizedScore"]].reset_index(drop=True) + + # Build a PZ dataset with reviewText and normalizedScore + ds = pz.MemoryDataset(id="reviews", vals=full_group_df) + + # Use sem_map to score each review's positivity (0-10 scale) + scored_ds = ds.sem_map( + cols=[{ + "name": "positivityScore", + "type": float, + "desc": "A score from 0 to 10 indicating how positive this review is, where 10 is extremely positive and 0 is very negative.", + }], + depends_on="reviewText", + ) + + # Create fresh config for each director group + config = pz.QueryProcessorConfig( + policy=policy, + verbose=args.verbose, + execution_strategy=args.execution_strategy, + ) + + result_collection = scored_ds.run(config) + + # Find the review with the highest positivity score + max_score_idx = -1 + max_positivity = -1 + scored_reviews = [] + for idx, r in enumerate(result_collection): + scored_reviews.append(r) + if r.positivityScore > max_positivity: + max_positivity = r.positivityScore + max_score_idx = idx + + # Get the most positive review + most_positive = None + norm_score = None + if max_score_idx >= 0: + best_review = scored_reviews[max_score_idx] + most_positive = best_review.reviewText + norm_score = 
float(best_review.normalizedScore) + + rows.append({ + "director": director, + "mostPositiveReview": most_positive, + "normalizedScore": norm_score, + }) + + # Accumulate execution stats from this director's run + es = result_collection.execution_stats + acc_input_tokens += es.total_input_tokens + acc_output_tokens += es.total_output_tokens + acc_exec_cost += es.total_execution_cost + acc_opt_time += es.optimization_time + acc_plan_time += es.plan_execution_time + + wall_time = time.time() - wall_start + + # ── Save results ────────────────────────────────────────────────── + result_df = pd.DataFrame(rows).sort_values("director").reset_index(drop=True) + os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True) + result_df.to_csv(args.output, index=False) + + # ── Save execution stats ────────────────────────────────────────── + if args.stats_output is not None: + stats = { + "total_execution_time": wall_time, + "total_optimization_time": acc_opt_time, + "plan_execution_time": acc_plan_time, + "total_input_tokens": acc_input_tokens, + "total_output_tokens": acc_output_tokens, + "total_tokens": acc_input_tokens + acc_output_tokens, + "total_execution_cost": acc_exec_cost, + "num_directors": len(rows), + } + os.makedirs(os.path.dirname(args.stats_output) or ".", exist_ok=True) + with open(args.stats_output, "w") as f: + json.dump(stats, f, indent=2) + + print(f"\nExecution time: {wall_time:.2f}s") + print(f"Total tokens: {acc_input_tokens + acc_output_tokens:,}") + print(f"Total cost: ${acc_exec_cost:.4f}") + print(f"Results saved to: {args.output}") + if args.stats_output is not None: + print(f"Execution stats saved to: {args.stats_output}") + print(f"Generated {len(result_df)} director groups") + + +if __name__ == "__main__": + main() diff --git a/tests/semantic groupBy tests/movies/queries/query1_ground_truth.csv b/tests/semantic groupBy tests/movies/queries/query1_ground_truth.csv new file mode 100644 index 000000000..2bfea922f --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query1_ground_truth.csv @@ -0,0 +1,271 @@ +publicatioName,frac_positive +3AW,1.0 +48 Hills,0.0 +ABC News Radio,1.0 +ABC Radio (Australia),0.5 +AIPT,1.0 +AV Club,0.8 +Antagony & Ecstasy,1.0 +Apollo Guide,0.3333333333333333 +Arkansas Democrat-Gazette,1.0 +Asian Movie Pulse,1.0 +Associated Press,1.0 +Atlantic City Weekly,1.0 +Austin Chronicle,0.6 +AwardsCircuit.com,0.0 +BBC.com,0.5 +Baret News,1.0 +Beach Reporter (Southern California),1.0 +BlackFilm.com,0.0 +Bleeding Cool,1.0 +Blu-ray.com,0.5 +Boston Globe,0.0 +Boston Herald,0.0 +Boulder Weekly,1.0 +Bowling Green Daily News,1.0 +Boxoffice Magazine,0.0 +BrianOrndorf.com,0.0 +Bust Magazine,1.0 +But Why Tho? 
A Geek Community,1.0 +CBR,1.0 +CNN.com,1.0 +"Capital Times (Madison, WI)",1.0 +Chicago Reader,1.0 +Chicago Sun-Times,0.5 +Chicago Tribune,1.0 +Cinapse,1.0 +CinePassion,1.0 +Cinema Crazed,1.0 +Cinema Signals,1.0 +Cinemalogue,0.0 +Cinemanía (Spain),0.5 +Clarín,1.0 +Cleveland Press,0.0 +Close Up,1.0 +Combustible Celluloid,1.0 +"Commercial Appeal (Memphis, TN)",1.0 +Common Sense Media,1.0 +Compuserve,1.0 +Consequence,0.0 +Contactmusic.com,1.0 +DCist,1.0 +DVDTalk.com,0.0 +Daily Express (UK),0.0 +Daily Star (UK),1.0 +Daily Telegraph (UK),0.0 +Dennis Schwartz Movie Reviews,0.75 +Deseret News (Salt Lake City),0.6666666666666666 +Digital Spy,1.0 +Dread Central,0.0 +El Mundo (Spain),0.0 +El Pais (Spain),1.0 +El antepenúltimo mohicano,0.0 +EmanuelLevy.Com,0.8 +Empire Magazine,0.6666666666666666 +Entertainment Weekly,1.0 +Epoch Times,1.0 +Espinof,1.0 +Esquire Magazine,0.0 +Eye for Film,1.0 +"F5 (Wichita, KS)",1.0 +FILMINK (Australia),1.0 +Film Blather,1.0 +Film Comment Magazine,1.0 +Film Freak Central,1.0 +Film Frenzy,1.0 +Film Inquiry,1.0 +Film Journal International,0.3333333333333333 +Film Threat,0.7142857142857143 +Film4,0.75 +Filmcritic.com,0.25 +Filmfare,1.0 +Filmmaker Magazine,1.0 +Financial Times,0.5 +Flick Filosopher,1.0 +Floating World,1.0 +Fotogramas,1.0 +Fresh Fiction,1.0 +Future Movies UK,0.0 +GeekNation,0.0 +Globe and Mail,0.6666666666666666 +Gone With The Twins,1.0 +Grantland,0.0 +Groucho Reviews,1.0 +Guardian,0.5 +HanCinema,1.0 +Herald Sun (Australia),1.0 +"HeraldNet (Everett, WA)",1.0 +HeyUGuys,1.0 +Hindustan Times,0.0 +Hollywood Reporter,0.36363636363636365 +Houston Chronicle,1.0 +Houston Press,1.0 +IONCINEMA.com,0.5 +In Film Australia,1.0 +In Review Online,1.0 +Independent (UK),0.5 +Independent Online (South Africa),1.0 +Internet Reviews,1.0 +Irish Times,1.0 +Japan Times,0.5 +JoBlo's Movie Network,0.0 +"Journal and Courier (Lafayette, IN)",1.0 +KPBS.org,1.0 +Kalamazoo Gazette,1.0 +Kansas City Kansan,1.0 +Keith & the Movies,1.0 +Killer Movie Reviews,1.0 +L.A. 
Weekly,1.0 +La Movie Boeuf,1.0 +La Nación (Argentina),1.0 +"Lagniappe (Mobile, AL)",0.0 +Las Vegas Mercury,1.0 +Las Vegas Review-Journal,1.0 +Lawrence.com,1.0 +Lessons of Darkness,1.0 +Little White Lies,0.3333333333333333 +Livemint,0.5 +Los Angeles Free Press,0.0 +Los Angeles Times,0.5 +Lybarger Links,1.0 +Maclean's Magazine,1.0 +Manhattan Movie Magazine,0.0 +Mark Reviews Movies,0.0 +Matt's Movie Reviews,1.0 +"Mountain Xpress (Asheville, NC)",1.0 +Movie Bitches,1.0 +Movie Chambers,1.0 +Movie Dearest,1.0 +Movie Metropolis,1.0 +Movie Mom,1.0 +Movie Nation,0.3333333333333333 +Movie Talk,1.0 +MovieMartyr.com,1.0 +Movieline,1.0 +NME,1.0 +NOW Toronto,0.5 +NPR,1.0 +NYC Movie Guru,0.6666666666666666 +National Post,0.0 +New York Magazine/Vulture,1.0 +New York Times,0.7692307692307693 +New Yorker,0.75 +Newark Star-Ledger,1.0 +Newcity,1.0 +Newhouse News Service,0.0 +Nolan's Pop Culture Review,1.0 +North Shore Movies,1.0 +"Northwest Herald (Crystal Lake, IL)",1.0 +Observer (UK),1.0 +Offoffoff,1.0 +Old School Reviews,1.0 +One Room With A View,1.0 +Oregonian,1.0 +Outlook,0.0 +Paste Magazine,1.0 +Philadelphia Inquirer,0.0 +Planet S Magazine,0.0 +Projected Figures,1.0 +Q Network Film Desk,1.0 +"Quad City Times (Davenport, IA)",0.0 +Radio Times,0.6666666666666666 +Rediff.com,0.0 +Reel Film Reviews,0.3333333333333333 +ReelTalk Movie Reviews,1.0 +Reeling Reviews,0.0 +Remezcla,1.0 +Reuters,1.0 +Richard Crouse,1.0 +RogerEbert.com,0.3333333333333333 +SSG Syndicate,1.0 +Sacramento News & Review,1.0 +Salt Lake Tribune,1.0 +San Francisco Chronicle,1.0 +San Francisco Examiner,1.0 +Scotsman,0.6666666666666666 +Screen International,0.6666666666666666 +Screen It!,1.0 +Screen Rant,1.0 +Screen-Space,0.0 +ScreenAnarchy,1.0 +Screenwize,1.0 +Scroll.in,1.0 +Seanax.com,1.0 +Seattle Film Blog,1.0 +Seattle Times,0.5 +Shadows on the Wall,1.0 +Showbiz Junkies,1.0 +Sight & Sound,0.0 +Sky Cinema,0.0 +Slant Magazine,0.8 +Spectrum Culture,0.0 +Spirituality & Practice,1.0 +Sunday Times (UK),0.5 +TAKE ONE Magazine,1.0 +THN,0.0 +TIME Magazine,1.0 +TV Guide,1.0 +Talking Pictures (U.S.),1.0 +The Age (Australia),0.5 +The Australian,1.0 +The Baffler,1.0 +The Daily Dot,1.0 +The Daily Review/Crikey,1.0 +The Daily Times (Tennessee),1.0 +The Dissolve,1.0 +The Film Experience,0.0 +The Film Stage,0.6666666666666666 +The Indian Express,0.0 +The List,1.0 +The MacGuffin,1.0 +"The Monitor (McAllen, TX)",1.0 +The Nation,0.0 +The National (UAE),1.0 +The New Beverly,1.0 +The Pink Lens,0.0 +The Playlist,0.6666666666666666 +The Retro Set,1.0 +The Skinny,1.0 +The Spectator,1.0 +The Spool,1.0 +"The Stranger (Seattle, WA)",1.0 +The Sun (UK),1.0 +The Times of India,1.0 +The Upcoming,0.0 +The Victoria Advocate,1.0 +The Virginian-Pilot,1.0 +The Young Folks,1.0 +TheIndependentCritic.com,1.0 +TheWrap,0.5 +This is Film,1.0 +Tilt Magazine,0.0 +Time Out,0.5555555555555556 +Times (UK),0.6666666666666666 +Times-Picayune,0.0 +Toronto Star,0.6666666666666666 +Total Film,0.0 +Trespass,1.0 +Tri-City Herald,1.0 +Tyler Morning Telegraph (Texas),1.0 +Under the Radar,0.0 +Urban Tulsa Weekly,1.0 +Vanity Fair,0.0 +Variety,0.7692307692307693 +ViewLondon,0.0 +Village Voice,1.0 +Vogue,1.0 +Vox,1.0 +"WBGR-FM (93.7 FM - Monroe, WI)",1.0 +WORLD,1.0 +Washington Post,0.6666666666666666 +Way Too Indie,1.0 +We Got This Covered,1.0 +Willamette Week,0.0 +Winnipeg Free Press,0.0 +eFilmCritic.com,0.5 +easternKicks.com,1.0 +film-authority.com,1.0 +jackiekcooper.com,1.0 +rachelsreviews.net,0.0 +rec.arts.movies.reviews,0.5 +sbs.com.au,1.0 diff --git a/tests/semantic groupBy 
tests/movies/queries/query2_ground_truth.csv b/tests/semantic groupBy tests/movies/queries/query2_ground_truth.csv new file mode 100644 index 000000000..7d351b22c --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query2_ground_truth.csv @@ -0,0 +1,6 @@ +era,review_count +2000s,24 +2010s,147 +2020s,27 +Unknown,203 +pre-2000,99 diff --git a/tests/semantic groupBy tests/movies/queries/query3_ground_truth.csv b/tests/semantic groupBy tests/movies/queries/query3_ground_truth.csv new file mode 100644 index 000000000..c6f5cf8f2 --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query3_ground_truth.csv @@ -0,0 +1,4 @@ +audienceType,frac_positive,review_count,director +Adult,0.9166666666666666,408,Christopher Nolan +Teen,0.8308709175738724,2572,Christopher Nolan +Unrated,0.75,7,Christopher Nolan diff --git a/tests/semantic groupBy tests/movies/queries/query4_ground_truth.csv b/tests/semantic groupBy tests/movies/queries/query4_ground_truth.csv new file mode 100644 index 000000000..4b92f8fa0 --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query4_ground_truth.csv @@ -0,0 +1,22 @@ +primaryGenre,isTopCritic,frac_positive,review_count +Action,False,0.38461538461538464,13 +Action,True,0.0,4 +Adventure,False,0.5,6 +Adventure,True,1.0,1 +Comedy,False,0.3888888888888889,36 +Comedy,True,0.2777777777777778,18 +Crime,False,0.8,5 +Crime,True,1.0,2 +Documentary,False,0.8823529411764706,34 +Documentary,True,0.8571428571428571,21 +Drama,False,0.8072289156626506,83 +Drama,True,0.7017543859649122,57 +History,False,0.8181818181818182,11 +History,True,0.6,5 +Mystery & thriller,False,0.8333333333333334,42 +Mystery & thriller,True,0.6,20 +Romance,False,0.6493506493506493,77 +Romance,True,0.6296296296296297,27 +Sci-fi,False,1.0,2 +Sci-fi,True,1.0,1 +War,False,1.0,1 diff --git a/tests/semantic groupBy tests/movies/queries/query5_ground_truth.csv b/tests/semantic groupBy tests/movies/queries/query5_ground_truth.csv new file mode 100644 index 000000000..9d6fd0494 --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query5_ground_truth.csv @@ -0,0 +1,4 @@ +emotionalTone,review_count,director,genre +Disappointed,495,Steven Spielberg,Adventure +Enthusiastic,586,Steven Spielberg,Adventure +Measured,1009,Steven Spielberg,Adventure diff --git a/tests/semantic groupBy tests/movies/queries/query_1.py b/tests/semantic groupBy tests/movies/queries/query_1.py new file mode 100644 index 000000000..10202d26a --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query_1.py @@ -0,0 +1,30 @@ +""" +Query 1 — Sentiment by Publication (Single Col, Semantic Agg) + +Query NL: "Group by publicationName and compute the fraction of positive reviews" +- group_cols: ["publicationName"] +- agg_cols: [LLM("reviewText") for POSITIVE/NEGATIVE] +- semantic group: no +- semantic agg: yes + +Ground truth from scoreSentiment column. 
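+
+Note: the reviews CSV spells the column literally as "publicatioName" (see the
+query1 ground-truth header above), so the code below groups on that exact name
+even though the natural-language query says "publicationName".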
+""" + +import pandas as pd + +def frac_positive(series): + num_pos = (series == "POSITIVE").sum() + return num_pos / len(series) if len(series) > 0 else 0.0 + +reviews = pd.read_csv("../movie_reviews.csv").head(500) + +result = ( + reviews + .groupby("publicatioName") + .agg(frac_positive_sentiment=("scoreSentiment", frac_positive)) + .reset_index() + .rename(columns={"frac_positive_sentiment": "frac_positive"}) +) + +result.to_csv("query1_ground_truth.csv", index=False) +print(f"Generated ground truth with {len(result)} publication groups") diff --git a/tests/semantic groupBy tests/movies/queries/query_2.py b/tests/semantic groupBy tests/movies/queries/query_2.py new file mode 100644 index 000000000..b8cdbb50d --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query_2.py @@ -0,0 +1,42 @@ +""" +Query 2 — Critic Volume by Inferred Era (Single Col, Semantic Group) + +Query NL: "Group reviews by the era of the movie they reviewed (pre-2000, 2000s, 2010s, 2020s) + and count the number of reviews per era" +- group_cols: [LLM("reviewDate")] +- agg_cols: ["reviewId" (count)] +- semantic group: yes +- semantic agg: no + +Ground truth uses date parsing and rule-based era bucketing. +""" + +import pandas as pd + +reviews = pd.read_csv("../movie_reviews.csv").head(500) +movies = pd.read_csv("../movies.csv")[["id", "releaseDateTheaters"]] + +# Join to get the movie's release year +merged = reviews.merge(movies, on="id", how="left") +merged["releaseYear"] = pd.to_datetime( + merged["releaseDateTheaters"], errors="coerce" +).dt.year + +def era_bucket(year): + if pd.isna(year): return "Unknown" + if year < 2000: return "pre-2000" + if year < 2010: return "2000s" + if year < 2020: return "2010s" + return "2020s" + +merged["era"] = merged["releaseYear"].apply(era_bucket) + +result = ( + merged + .groupby("era") + .agg(review_count=("reviewId", "count")) + .reset_index() +) + +result.to_csv("query2_ground_truth.csv", index=False) +print(f"Generated ground truth with {len(result)} era groups") diff --git a/tests/semantic groupBy tests/movies/queries/query_3.py b/tests/semantic groupBy tests/movies/queries/query_3.py new file mode 100644 index 000000000..937753c70 --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query_3.py @@ -0,0 +1,60 @@ +""" +Query 3 — Fraction Positive per Audience Type (Templatable, Semantic Group) + +Query NL: "For movies directed by {director}, group reviews by the audience type targeted + by the movie's MPAA rating (Children, Teen, Adult, Unrated) and compute the + fraction of positive reviews per audience type" +- group_cols: [LLM("rating") → audience type] +- agg_cols: [LLM("reviewText") → POSITIVE/NEGATIVE, frac_positive] +- semantic group: yes +- semantic agg: yes + +Ground truth uses MPAA rating mapping and scoreSentiment column. 
+""" + +import pandas as pd +import sys + +DIRECTOR = sys.argv[1] if len(sys.argv) > 1 else "Christopher Nolan" + +RATING_TO_AUDIENCE = { + "G": "Children", "PG": "Children", + "PG-13": "Teen", + "R": "Adult", "NC-17": "Adult", + "NR": "Unrated", "": "Unrated", +} + +def frac_positive(series): + return (series == "POSITIVE").sum() / len(series) if len(series) > 0 else 0.0 + +movies = pd.read_csv("../movies.csv") +reviews = pd.read_csv("../movie_reviews.csv") + +# Filter for director's movies +director_movies = movies[movies["director"].str.contains(DIRECTOR, na=False, case=False)][["id", "rating"]] +director_movies["audienceType"] = director_movies["rating"].map( + lambda r: RATING_TO_AUDIENCE.get(str(r).strip(), "Unrated") +) + +# merged = reviews.merge(director_movies, on="id", how="inner") + +print("director_movies shape:", director_movies.shape) +print(director_movies.head()) + +merged = director_movies.merge(reviews, on="id", how="left") +print("merged shape:", merged.shape) +print(merged.head()) + +result = ( + merged + .groupby("audienceType") + .agg( + frac_positive=("scoreSentiment", frac_positive), + review_count=("scoreSentiment", "count"), + ) + .reset_index() +) +result["director"] = DIRECTOR + +result.to_csv("query3_ground_truth.csv", index=False) +print(f"Generated ground truth for {DIRECTOR}: {len(result)} audience type groups") diff --git a/tests/semantic groupBy tests/movies/queries/query_4.py b/tests/semantic groupBy tests/movies/queries/query_4.py new file mode 100644 index 000000000..4ce1022de --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query_4.py @@ -0,0 +1,38 @@ +""" +Query 6 — Sentiment and Top Critic Bias by Genre (Multi-Col, Semantic Group + Agg) + +Query NL: "Group reviews by inferred genre of the movie and whether the reviewer is a top critic, + and compute the fraction of positive reviews" +- group_cols: [LLM("reviewText") for the genre, "isTopCritic"] +- agg_cols: [LLM("reviewText") for POSITIVE/NEGATIVE, frac_positive] +- semantic group: yes (genre inferred from review text) +- semantic agg: yes (sentiment inferred from reviewText) + +Ground truth obtained by joining to movies table for genre. 
+""" + +import pandas as pd + +def frac_positive(series): + return (series == "POSITIVE").sum() / len(series) if len(series) > 0 else 0.0 + +movies = pd.read_csv("../movies.csv")[["id", "genre"]] +reviews = pd.read_csv("../movie_reviews.csv").head(500) + +merged = reviews.merge(movies, on="id", how="left") +# Coarsen multi-genre entries to primary genre +merged["primaryGenre"] = merged["genre"].str.split(",").str[0].str.strip() + +result = ( + merged + .dropna(subset=["primaryGenre", "isTopCritic"]) + .groupby(["primaryGenre", "isTopCritic"]) + .agg( + frac_positive=("scoreSentiment", frac_positive), + review_count=("scoreSentiment", "count"), + ) + .reset_index() +) + +result.to_csv("query4_ground_truth.csv", index=False) +print(f"Generated ground truth with {len(result)} genre-topcritic groups") diff --git a/tests/semantic groupBy tests/movies/queries/query_5.py b/tests/semantic groupBy tests/movies/queries/query_5.py new file mode 100644 index 000000000..20d155979 --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query_5.py @@ -0,0 +1,59 @@ +""" +Query 7 — Sentiment by Director and Genre (Templatable, Mixed Group + Semantic Agg) + +Query NL: "For movies directed by {director} in the {genre} genre, group reviews by + the emotional tone of the review (Enthusiastic, Measured, Disappointed) and + count the number of reviews per tone" +- group_cols: ["director" (literal, filtered), "genre" (literal, filtered), + LLM("reviewText") → emotional tone] +- agg_cols: ["reviewId" (count)] +- semantic group: mixed (director and genre are filter/literal; tone is semantic) +- semantic agg: no + +Ground truth approximation: map scoreSentiment + originalScore to ternary label. +""" + +import pandas as pd +import sys + +DIRECTOR = sys.argv[1] if len(sys.argv) > 1 else "Steven Spielberg" +GENRE = sys.argv[2] if len(sys.argv) > 2 else "Adventure" + +def approx_tone(row): + sentiment = row["scoreSentiment"] + score_str = str(row["originalScore"]) + # Parse scores like "4/5", "3.5/4", "A", "B+" — use sentiment as fallback + if sentiment == "NEGATIVE": + return "Disappointed" + # Try to parse numeric score to detect Enthusiastic vs Measured + try: + parts = score_str.split("/") + if len(parts) == 2: + ratio = float(parts[0]) / float(parts[1]) + return "Enthusiastic" if ratio >= 0.8 else "Measured" + except Exception: + pass + return "Measured" # default for POSITIVE without parseable score + +movies = pd.read_csv("../movies.csv") +reviews = pd.read_csv("../movie_reviews.csv") + +filtered_movies = movies[ + movies["director"].str.contains(DIRECTOR, na=False, case=False) & + movies["genre"].str.contains(GENRE, na=False, case=False) +][["id"]] + +merged = reviews.merge(filtered_movies, on="id", how="inner") +merged["emotionalTone"] = merged.apply(approx_tone, axis=1) + +result = ( + merged + .groupby("emotionalTone") + .agg(review_count=("reviewId", "count")) + .reset_index() +) +result["director"] = DIRECTOR +result["genre"] = GENRE + +result.to_csv("query5_ground_truth.csv", index=False) +print(f"Generated ground truth for {DIRECTOR} in {GENRE}: {len(result)} tone groups") diff --git a/tests/semantic groupBy tests/movies/queries/query_6.py b/tests/semantic groupBy tests/movies/queries/query_6.py new file mode 100644 index 000000000..e53951944 --- /dev/null +++ b/tests/semantic groupBy tests/movies/queries/query_6.py @@ -0,0 +1,78 @@ +""" +Query 6 — Most Positive Review by Director (Semantic GroupBy + Numeric Agg) + +Query NL: "Group by director and find the most positive review per director" +- 
group_cols: ["director" (literal, from movies table)] +- agg_cols: [max(normalizedScore) from originalScore] +- semantic group: no (director is a literal column) +- semantic agg: yes (LLM("reviewText") used in PZ to score sentiment) + +Ground truth: + 1. Join movie_reviews with movies on id to get director per review. + 2. Drop records where originalScore is missing or unparseable. + 3. Normalize originalScore ("3.5/4", "4/5", etc.) to [0, 1]. + 4. For each director, select the review with the highest normalized score. + +do it for each director and compute the distance between the score of the most positive +review using sem_groupBy (LLM(reviewText)) to actual output from python (ground truth). + +doing directionally better. (don't worry about the exact numbers, just want to see if +it's improving or not). Show that these optimisations can get better performance and +then bake it into the query optimiser. (put it into the PZ and show the optimiser +can pick the best one) +""" + +import pandas as pd + + +def parse_score(score_str): + """ + Parse scores like "3.5/4", "4/5", "1/10" into a float in [0, 1]. + Returns None if the string is missing or unparseable. + """ + if pd.isna(score_str) or str(score_str).strip() == "": + return None + parts = str(score_str).strip().split("/") + if len(parts) == 2: + try: + numerator = float(parts[0]) + denominator = float(parts[1]) + if denominator == 0: + return None + return numerator / denominator + except ValueError: + return None + return None + + +reviews = pd.read_csv("../movie_reviews.csv") +movies = pd.read_csv("../movies.csv")[["id", "director"]] + +# Join to get director for each review +merged = reviews.merge(movies, on="id", how="left") + +# Drop records with missing originalScore +merged = merged.dropna(subset=["originalScore"]) +merged = merged[merged["originalScore"].str.strip() != ""] + +# Normalize originalScore to [0, 1] +merged["normalizedScore"] = merged["originalScore"].apply(parse_score) + +# Drop records where score could not be parsed +merged = merged.dropna(subset=["normalizedScore"]) + +# Drop records with missing director +merged = merged.dropna(subset=["director"]) + +# For each director, pick the review with the highest normalized score +result = ( + merged + .sort_values("normalizedScore", ascending=False) + .groupby("director", as_index=False) + .first()[["director", "normalizedScore", "reviewText", "originalScore"]] +) + +result = result.sort_values("director").reset_index(drop=True) + +result.to_csv("query6_ground_truth.csv", index=False) +print(f"Generated ground truth with {len(result)} directors") \ No newline at end of file diff --git a/tests/semantic groupBy tests/movies/rerun_comparison.py b/tests/semantic groupBy tests/movies/rerun_comparison.py new file mode 100644 index 000000000..ca26dc949 --- /dev/null +++ b/tests/semantic groupBy tests/movies/rerun_comparison.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 +""" +Rerun comparisons against already-generated PZ / baseline outputs. + +What this script does: + 1. Regenerates the ground-truth CSVs for Q3 and Q5 (fixing input inconsistencies). + 2. Recomputes comparison metrics (using normalized MAE quality) for every query + and system (pz / baseline) using the *existing* output CSVs — the PZ and + baseline programs themselves are NOT re-run. + 3. Writes updated comparison JSONs. + 4. Calls analyze.py to regenerate all figures and tables. 
+ +Usage: + python rerun_comparison.py [--policies maxquality] [--ids 3,5] +""" + +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +from pathlib import Path +from typing import Any + +import pandas as pd +from pandas.api.types import is_numeric_dtype + +BASE_DIR = Path(__file__).resolve().parent +QUERIES_DIR = BASE_DIR / "queries" +RESULTS_DIR = BASE_DIR / "results" +ANALYZE_SCRIPT = RESULTS_DIR / "analyze.py" + + +# ─── Quality metric (normalized MAE) ────────────────────────────────────────── + +def _compare_outputs( + gt_df: pd.DataFrame, + pred_df: pd.DataFrame, + pred_suffix: str, # "pz" or "baseline" + tol: float, +) -> dict[str, Any]: + common_cols = sorted(set(gt_df.columns).intersection(pred_df.columns)) + if not common_cols: + return { + "pass": False, + "reason": "no_common_columns", + "num_rows_gt": len(gt_df), + f"num_rows_{pred_suffix}": len(pred_df), + } + + key_cols, numeric_cols = [], [] + for col in common_cols: + if is_numeric_dtype(gt_df[col]) and is_numeric_dtype(pred_df[col]): + numeric_cols.append(col) + else: + key_cols.append(col) + + if key_cols: + merged = gt_df.merge( + pred_df, + on=key_cols, + how="outer", + suffixes=("_gt", f"_{pred_suffix}"), + indicator=True, + ) + missing_in_pred = int((merged["_merge"] == "left_only").sum()) + missing_in_gt = int((merged["_merge"] == "right_only").sum()) + compare_rows = merged[merged["_merge"] == "both"] + else: + gt_s = gt_df.sort_values(by=common_cols).reset_index(drop=True) + pred_s = pred_df.sort_values(by=common_cols).reset_index(drop=True) + n = min(len(gt_s), len(pred_s)) + compare_rows = pd.concat( + [gt_s.iloc[:n].add_suffix("_gt"), pred_s.iloc[:n].add_suffix(f"_{pred_suffix}")], + axis=1, + ) + missing_in_pred = max(0, len(gt_s) - len(pred_s)) + missing_in_gt = max(0, len(pred_s) - len(gt_s)) + + metrics: dict[str, Any] = { + "num_rows_gt": len(gt_df), + f"num_rows_{pred_suffix}": len(pred_df), + f"missing_in_{pred_suffix}": missing_in_pred, + "missing_in_gt": missing_in_gt, + "num_compared": len(compare_rows), + } + + max_abs_error = mean_abs_error = mean_norm_error = None + mismatched_rows = 0 + + if numeric_cols and len(compare_rows) > 0: + abs_errors, norm_errors = [], [] + for col in numeric_cols: + gt_col = f"{col}_gt" + pred_col = f"{col}_{pred_suffix}" + if gt_col not in compare_rows or pred_col not in compare_rows: + continue + diff = (compare_rows[gt_col] - compare_rows[pred_col]).abs() + abs_errors.append(diff) + gt_mean = compare_rows[gt_col].abs().mean() + norm_errors.append(diff / gt_mean if gt_mean > 0 else diff) + + if abs_errors: + all_abs = pd.concat(abs_errors, axis=1) + all_norm = pd.concat(norm_errors, axis=1) + max_abs_error = float(all_abs.max().max()) + mean_abs_error = float(all_abs.mean().mean()) + mean_norm_error = float(all_norm.mean().mean()) + mismatched_rows = int((all_abs.max(axis=1) > tol).sum()) + + metrics.update({ + "max_abs_error": max_abs_error, + "mean_abs_error": mean_abs_error, + "mismatched_rows": mismatched_rows, + }) + + passed = ( + missing_in_pred == 0 + and missing_in_gt == 0 + and (max_abs_error is None or max_abs_error <= tol) + and mismatched_rows == 0 + ) + metrics["pass"] = bool(passed) + + if mean_norm_error is not None: + metrics["quality_score"] = float(max(0.0, 1.0 - mean_norm_error)) + + return metrics + + +# ─── Ground-truth regeneration ──────────────────────────────────────────────── + +def _regen_ground_truth(query_ids: list[int]) -> None: + """Re-run the GT scripts for the given query IDs.""" + for 
qid in query_ids:
+        script = QUERIES_DIR / f"query_{qid}.py"
+        if not script.exists():
+            print(f"  [GT] query_{qid}.py not found — skipping")
+            continue
+        print(f"  [GT] Regenerating ground truth for query {qid}...")
+        subprocess.run(
+            [sys.executable, str(script)],
+            cwd=str(QUERIES_DIR),
+            check=True,
+        )
+
+
+# ─── Main ─────────────────────────────────────────────────────────────────────
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Rerun comparison (no PZ re-execution)")
+    parser.add_argument("--policies", default="maxquality",
+                        help="Comma-separated policies (default: maxquality)")
+    parser.add_argument("--ids", default="",
+                        help="Comma-separated query IDs to update (default: all found)")
+    parser.add_argument("--regen-gt-ids", default="5",
+                        help="Comma-separated IDs whose GT CSV should be regenerated "
+                             "(default: 5, which had the approxTone→emotionalTone fix)")
+    parser.add_argument("--tolerance", type=float, default=1e-6)
+    parser.add_argument("--skip-analyze", action="store_true",
+                        help="Skip calling analyze.py at the end")
+    args = parser.parse_args()
+
+    policies = [p.strip() for p in args.policies.split(",") if p.strip()]
+    regen_gt_ids = [int(x) for x in args.regen_gt_ids.split(",") if x.strip().isdigit()]
+    requested_ids = {int(x) for x in args.ids.split(",") if x.strip().isdigit()}
+
+    # Step 1 – regenerate ground-truth CSVs for the fixed queries
+    if regen_gt_ids:
+        print("\n── Regenerating ground-truth CSVs ──")
+        _regen_ground_truth(regen_gt_ids)
+
+    # Step 2 – find all queries that have a ground-truth CSV
+    gt_paths = {
+        int(p.stem.replace("query", "").replace("_ground_truth", "")): p
+        for p in QUERIES_DIR.glob("query*_ground_truth.csv")
+    }
+    if requested_ids:
+        gt_paths = {k: v for k, v in gt_paths.items() if k in requested_ids}
+
+    print(f"\n── Recomputing comparisons for queries: {sorted(gt_paths)} ──")
+
+    for policy in policies:
+        policy_dir = RESULTS_DIR / policy
+        if not policy_dir.exists():
+            print(f"  Policy dir not found: {policy_dir} — skipping")
+            continue
+
+        for qid, gt_path in sorted(gt_paths.items()):
+            gt_df = pd.read_csv(gt_path)
+
+            for pred_suffix, json_name, csv_name in [
+                ("pz", f"query{qid}_comparison.json", f"query{qid}_pz_output.csv"),
+                ("baseline", f"query{qid}_baseline_comparison.json", f"query{qid}_baseline_output.csv"),
+            ]:
+                pred_csv = policy_dir / csv_name
+                json_path = policy_dir / json_name
+
+                if not pred_csv.exists():
+                    print(f"  [Q{qid}][{policy}][{pred_suffix}] output CSV missing — skipping")
+                    continue
+
+                pred_df = pd.read_csv(pred_csv)
+                compare = _compare_outputs(gt_df, pred_df, pred_suffix, args.tolerance)
+
+                # Preserve execution stats from the existing JSON
+                exec_stats: dict[str, Any] = {}
+                if json_path.exists():
+                    with open(json_path) as f:
+                        old = json.load(f)
+                    for key in ("total_execution_time", "total_execution_cost",
+                                "total_tokens", "optimization_time", "plan_execution_time"):
+                        exec_stats[key] = old.get(key)
+
+                row = {
+                    "test_id": qid,
+                    "policy": policy,
+                    **exec_stats,
+                    **compare,
+                }
+                with open(json_path, "w") as f:
+                    json.dump(row, f, indent=2)
+
+                q_score = compare.get("quality_score")
+                q_str = f"{q_score:.4f}" if isinstance(q_score, float) else "n/a"
+                status = "PASS" if compare.get("pass") else "FAIL"
+                print(f"  [Q{qid}][{policy}][{pred_suffix}] {status} quality={q_str}")
+
+    # Step 3 – regenerate figures
+    if not args.skip_analyze:
+        if ANALYZE_SCRIPT.exists():
+            print(f"\n── Regenerating figures ({ANALYZE_SCRIPT.name}) ──")
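+            # analyze.py lives in results/ (see ANALYZE_SCRIPT above); it is
+            # assumed to read the comparison JSONs / summary files updated in step 2.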
subprocess.run([sys.executable, str(ANALYZE_SCRIPT)], check=True)
+        else:
+            print(f"\nanalyze.py not found at {ANALYZE_SCRIPT} — skipping figure generation")
+
+    print("\nDone.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/semantic groupBy tests/movies/run_baseline_tests.py b/tests/semantic groupBy tests/movies/run_baseline_tests.py
new file mode 100644
index 000000000..31269d01c
--- /dev/null
+++ b/tests/semantic groupBy tests/movies/run_baseline_tests.py
@@ -0,0 +1,267 @@
+#!/usr/bin/env python3
+"""
+Run baseline group-by tests: execute baseline implementations (sem_map + groupby),
+compare outputs against ground truth, and log performance metrics.
+
+Results are written to the same results/ folder as the sem_groupby tests,
+with '_baseline' suffixed filenames so both approaches can be compared side-by-side.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import subprocess
+import sys
+from pathlib import Path
+from typing import Any
+
+import pandas as pd
+from pandas.api.types import is_numeric_dtype
+
+
+BASE_DIR = Path(__file__).resolve().parent
+QUERIES_DIR = BASE_DIR / "queries"
+BASELINE_DIR = BASE_DIR / "pz-baseline"
+RESULTS_DIR = BASE_DIR / "results"
+
+
+def _discover_tests() -> list[dict[str, Path]]:
+    """Find matching pairs of ground-truth query scripts and baseline scripts."""
+    query_files = {}
+    for path in QUERIES_DIR.glob("query_*.py"):
+        parts = path.stem.split("_")
+        if len(parts) == 2 and parts[1].isdigit():
+            query_files[int(parts[1])] = path
+
+    baseline_files = {}
+    for path in BASELINE_DIR.glob("query_*_baseline.py"):
+        parts = path.stem.split("_")
+        if len(parts) == 3 and parts[1].isdigit() and parts[2] == "baseline":
+            baseline_files[int(parts[1])] = path
+
+    test_ids = sorted(set(query_files).intersection(baseline_files))
+    tests = []
+    for test_id in test_ids:
+        tests.append({
+            "id": test_id,
+            "query_script": query_files[test_id],
+            "baseline_script": baseline_files[test_id],
+        })
+    return tests
+
+
+def _run_script(script_path: Path, cwd: Path, args: list[str]) -> None:
+    cmd = [sys.executable, str(script_path), *args]
+    subprocess.run(cmd, cwd=str(cwd), check=True)
+
+
+def _ground_truth_output_path(test_id: int) -> Path:
+    return QUERIES_DIR / f"query{test_id}_ground_truth.csv"
+
+
+def _compare_outputs(gt_df: pd.DataFrame, baseline_df: pd.DataFrame, tol: float) -> dict[str, Any]:
+    common_cols = sorted(set(gt_df.columns).intersection(baseline_df.columns))
+    if not common_cols:
+        return {
+            "pass": False,
+            "reason": "no_common_columns",
+            "num_rows_gt": len(gt_df),
+            "num_rows_baseline": len(baseline_df),
+        }
+
+    key_cols = []
+    numeric_cols = []
+    for col in common_cols:
+        gt_is_num = is_numeric_dtype(gt_df[col])
+        bl_is_num = is_numeric_dtype(baseline_df[col])
+        if gt_is_num and bl_is_num:
+            numeric_cols.append(col)
+        else:
+            key_cols.append(col)
+
+    if key_cols:
+        merged = gt_df.merge(
+            baseline_df,
+            on=key_cols,
+            how="outer",
+            suffixes=("_gt", "_baseline"),
+            indicator=True,
+        )
+        missing_in_baseline = (merged["_merge"] == "left_only").sum()
+        missing_in_gt = (merged["_merge"] == "right_only").sum()
+        compare_rows = merged[merged["_merge"] == "both"]
+    else:
+        gt_sorted = gt_df.sort_values(by=common_cols).reset_index(drop=True)
+        bl_sorted = baseline_df.sort_values(by=common_cols).reset_index(drop=True)
+        min_len = min(len(gt_sorted), len(bl_sorted))
+        compare_rows = pd.concat(
+            [
+                gt_sorted.iloc[:min_len].add_suffix("_gt"),
+                bl_sorted.iloc[:min_len].add_suffix("_baseline"),
+            ],
+            axis=1,
+        )
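+        # With no shared key columns, rows are aligned positionally after
+        # sorting; the row-count difference below becomes the missing-row counts.
+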
missing_in_baseline = max(0, len(gt_sorted) - len(bl_sorted)) + missing_in_gt = max(0, len(bl_sorted) - len(gt_sorted)) + + metrics: dict[str, Any] = { + "num_rows_gt": len(gt_df), + "num_rows_baseline": len(baseline_df), + "missing_in_baseline": int(missing_in_baseline), + "missing_in_gt": int(missing_in_gt), + "num_compared": int(len(compare_rows)), + } + + max_abs_error = None + mean_abs_error = None + mismatched_rows = 0 + + if numeric_cols: + abs_errors = [] + norm_errors = [] + for col in numeric_cols: + gt_col = f"{col}_gt" + bl_col = f"{col}_baseline" + if gt_col not in compare_rows or bl_col not in compare_rows: + continue + diff = (compare_rows[gt_col] - compare_rows[bl_col]).abs() + abs_errors.append(diff) + # Normalize by GT column mean so different-scale metrics contribute equally + gt_mean = compare_rows[gt_col].abs().mean() + norm_diff = diff / gt_mean if gt_mean > 0 else diff + norm_errors.append(norm_diff) + + if abs_errors: + all_errors = pd.concat(abs_errors, axis=1) + all_norm_errors = pd.concat(norm_errors, axis=1) + max_abs_error = float(all_errors.max().max()) + mean_abs_error = float(all_errors.mean().mean()) + mean_norm_error = float(all_norm_errors.mean().mean()) + mismatched_rows = int((all_errors.max(axis=1) > tol).sum()) + + metrics.update({ + "max_abs_error": max_abs_error, + "mean_abs_error": mean_abs_error, + "mismatched_rows": mismatched_rows, + }) + + passed = ( + missing_in_baseline == 0 + and missing_in_gt == 0 + and (max_abs_error is None or max_abs_error <= tol) + and mismatched_rows == 0 + ) + + metrics["pass"] = bool(passed) + if mean_abs_error is not None: + # Normalized MAE: each column's errors are scaled by its GT mean, + # so large-magnitude metrics (e.g. review_count) don't drown out + # small-magnitude ones (e.g. frac_positive). 
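+        # Worked example (hypothetical numbers): if review_count has GT mean 100
+        # and mean abs error 5, while frac_positive has GT mean 0.5 and mean abs
+        # error 0.025, both normalize to 0.05, so quality_score = 1 - 0.05 = 0.95.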
+ metrics["quality_score"] = float(max(0.0, 1.0 - mean_norm_error)) + + return metrics + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run baseline group-by tests") + parser.add_argument("--policies", type=str, default="maxquality,mincost,mintime", + help="Comma-separated list of policies to run") + parser.add_argument("--execution-strategy", type=str, default="sequential", + help="One of 'sequential', 'pipelined', 'parallel'") + parser.add_argument("--tolerance", type=float, default=1e-6) + parser.add_argument("--regen-ground-truth", action="store_true", + help="Re-run ground truth scripts even if output already exists") + parser.add_argument("--ids", type=str, default="", + help="Comma-separated test ids to run (e.g., '1,2,3')") + args = parser.parse_args() + + policies = [p.strip() for p in args.policies.split(",") if p.strip()] + requested_ids = {int(x) for x in args.ids.split(",") if x.strip().isdigit()} + + tests = _discover_tests() + if requested_ids: + tests = [t for t in tests if t["id"] in requested_ids] + + if not tests: + print("No baseline tests found.") + return + + print(f"Found {len(tests)} test(s): {[t['id'] for t in tests]}") + print(f"Policies: {policies}") + print(f"Execution strategy: {args.execution_strategy}\n") + + RESULTS_DIR.mkdir(parents=True, exist_ok=True) + summary_rows = [] + + for test in tests: + test_id = test["id"] + + # Generate / load ground truth + gt_output = _ground_truth_output_path(test_id) + if args.regen_ground_truth or not gt_output.exists(): + print(f"[query {test_id}] Generating ground truth...") + _run_script(test["query_script"], QUERIES_DIR, []) + + if not gt_output.exists(): + print(f"[query {test_id}] Ground truth missing: {gt_output} — skipping") + continue + + gt_df = pd.read_csv(gt_output) + + for policy in policies: + policy_dir = RESULTS_DIR / policy + policy_dir.mkdir(parents=True, exist_ok=True) + + baseline_output = policy_dir / f"query{test_id}_baseline_output.csv" + stats_output = policy_dir / f"query{test_id}_baseline_stats.json" + + print(f"[query {test_id}][{policy}] Running baseline...") + _run_script( + test["baseline_script"], + BASELINE_DIR, + [ + "--policy", policy, + "--execution-strategy", args.execution_strategy, + "--output", str(baseline_output), + "--stats-output", str(stats_output), + ], + ) + + baseline_df = pd.read_csv(baseline_output) if baseline_output.exists() else pd.DataFrame() + compare_metrics = _compare_outputs(gt_df, baseline_df, args.tolerance) + + exec_metrics: dict[str, Any] = {} + if stats_output.exists(): + with open(stats_output) as f: + stats = json.load(f) + exec_metrics = { + "total_execution_time": stats.get("total_execution_time"), + "total_execution_cost": stats.get("total_execution_cost"), + "total_tokens": stats.get("total_tokens"), + "optimization_time": stats.get("optimization_time"), + "plan_execution_time": stats.get("plan_execution_time"), + } + + row = { + "test_id": test_id, + "policy": policy, + **exec_metrics, + **compare_metrics, + } + summary_rows.append(row) + + result_json = policy_dir / f"query{test_id}_baseline_comparison.json" + with open(result_json, "w") as f: + json.dump(row, f, indent=2) + + status = "PASS" if compare_metrics.get("pass") else "FAIL" + print(f"[query {test_id}][{policy}] {status}") + + summary_path = RESULTS_DIR / "baseline_summary.csv" + pd.DataFrame(summary_rows).to_csv(summary_path, index=False) + print(f"\nSummary written to: {summary_path}") + + +if __name__ == "__main__": + main() diff --git a/tests/semantic groupBy 
tests/movies/run_groupby_tests.py b/tests/semantic groupBy tests/movies/run_groupby_tests.py new file mode 100644 index 000000000..1cf5c7dab --- /dev/null +++ b/tests/semantic groupBy tests/movies/run_groupby_tests.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +""" +Run semantic group-by tests: generate ground truth, execute PZ programs, +compare outputs, and log performance metrics. +""" + +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +from pathlib import Path +from typing import Any + +import pandas as pd +from pandas.api.types import is_numeric_dtype + + +BASE_DIR = Path(__file__).resolve().parent +QUERIES_DIR = BASE_DIR / "queries" +PZ_DIR = BASE_DIR / "pz-programs" +RESULTS_DIR = BASE_DIR / "results" + + +def _discover_tests() -> list[dict[str, Path]]: + query_files = {} + for path in QUERIES_DIR.glob("query_*.py"): + parts = path.stem.split("_") + if len(parts) == 2 and parts[1].isdigit(): + query_files[int(parts[1])] = path + + pz_files = {} + for path in PZ_DIR.glob("query_*_pz.py"): + parts = path.stem.split("_") + if len(parts) == 3 and parts[1].isdigit() and parts[2] == "pz": + pz_files[int(parts[1])] = path + + test_ids = sorted(set(query_files).intersection(pz_files)) + tests = [] + for test_id in test_ids: + tests.append({ + "id": test_id, + "query_script": query_files[test_id], + "pz_script": pz_files[test_id], + }) + return tests + + +def _run_script(script_path: Path, cwd: Path, args: list[str]) -> None: + cmd = [sys.executable, str(script_path), *args] + subprocess.run(cmd, cwd=str(cwd), check=True) + + +def _ground_truth_output_path(test_id: int) -> Path: + return QUERIES_DIR / f"query{test_id}_ground_truth.csv" + + +def _compare_outputs(gt_df: pd.DataFrame, pz_df: pd.DataFrame, tol: float) -> dict[str, Any]: + common_cols = sorted(set(gt_df.columns).intersection(pz_df.columns)) + if not common_cols: + return { + "pass": False, + "reason": "no_common_columns", + "num_rows_gt": len(gt_df), + "num_rows_pz": len(pz_df), + } + + key_cols = [] + numeric_cols = [] + for col in common_cols: + gt_is_num = is_numeric_dtype(gt_df[col]) + pz_is_num = is_numeric_dtype(pz_df[col]) + if gt_is_num and pz_is_num: + numeric_cols.append(col) + else: + key_cols.append(col) + + if key_cols: + merged = gt_df.merge( + pz_df, + on=key_cols, + how="outer", + suffixes=("_gt", "_pz"), + indicator=True, + ) + missing_in_pz = (merged["_merge"] == "left_only").sum() + missing_in_gt = (merged["_merge"] == "right_only").sum() + compare_rows = merged[merged["_merge"] == "both"] + else: + gt_sorted = gt_df.sort_values(by=common_cols).reset_index(drop=True) + pz_sorted = pz_df.sort_values(by=common_cols).reset_index(drop=True) + min_len = min(len(gt_sorted), len(pz_sorted)) + compare_rows = pd.concat( + [ + gt_sorted.iloc[:min_len].add_suffix("_gt"), + pz_sorted.iloc[:min_len].add_suffix("_pz"), + ], + axis=1, + ) + missing_in_pz = max(0, len(gt_sorted) - len(pz_sorted)) + missing_in_gt = max(0, len(pz_sorted) - len(gt_sorted)) + + metrics: dict[str, Any] = { + "num_rows_gt": len(gt_df), + "num_rows_pz": len(pz_df), + "missing_in_pz": int(missing_in_pz), + "missing_in_gt": int(missing_in_gt), + "num_compared": int(len(compare_rows)), + } + + max_abs_error = None + mean_abs_error = None + mismatched_rows = 0 + + if numeric_cols: + abs_errors = [] + norm_errors = [] + for col in numeric_cols: + gt_col = f"{col}_gt" + pz_col = f"{col}_pz" + if gt_col not in compare_rows or pz_col not in compare_rows: + continue + diff = (compare_rows[gt_col] - 
compare_rows[pz_col]).abs() + abs_errors.append(diff) + # Normalize by GT column mean so different-scale metrics contribute equally + gt_mean = compare_rows[gt_col].abs().mean() + norm_diff = diff / gt_mean if gt_mean > 0 else diff + norm_errors.append(norm_diff) + + if abs_errors: + all_errors = pd.concat(abs_errors, axis=1) + all_norm_errors = pd.concat(norm_errors, axis=1) + max_abs_error = float(all_errors.max().max()) + mean_abs_error = float(all_errors.mean().mean()) + mean_norm_error = float(all_norm_errors.mean().mean()) + mismatched_rows = int((all_errors.max(axis=1) > tol).sum()) + + metrics.update({ + "max_abs_error": max_abs_error, + "mean_abs_error": mean_abs_error, + "mismatched_rows": mismatched_rows, + }) + + passed = ( + missing_in_pz == 0 + and missing_in_gt == 0 + and (max_abs_error is None or max_abs_error <= tol) + and mismatched_rows == 0 + ) + + metrics["pass"] = bool(passed) + if mean_abs_error is not None: + # Normalized MAE: each column's errors are scaled by its GT mean, + # so large-magnitude metrics (e.g. review_count) don't drown out + # small-magnitude ones (e.g. frac_positive). + quality_score = max(0.0, 1.0 - mean_norm_error) + metrics["quality_score"] = float(quality_score) + + return metrics + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run semantic group-by tests") + parser.add_argument("--policies", type=str, default="maxquality,mincost,mintime", + help="Comma-separated list of policies to run") + parser.add_argument("--execution-strategy", type=str, default="sequential", + help="One of 'sequential', 'pipelined', 'parallel'") + parser.add_argument("--tolerance", type=float, default=1e-6) + parser.add_argument("--regen-ground-truth", action="store_true") + parser.add_argument("--ids", type=str, default="", + help="Comma-separated test ids to run (e.g., '1,2,3')") + args = parser.parse_args() + + policies = [p.strip() for p in args.policies.split(",") if p.strip()] + requested_ids = {int(x) for x in args.ids.split(",") if x.strip().isdigit()} + + tests = _discover_tests() + if requested_ids: + tests = [t for t in tests if t["id"] in requested_ids] + + if not tests: + print("No tests found.") + return + + RESULTS_DIR.mkdir(parents=True, exist_ok=True) + summary_rows = [] + + for test in tests: + test_id = test["id"] + gt_output = _ground_truth_output_path(test_id) + if args.regen_ground_truth or not gt_output.exists(): + _run_script(test["query_script"], QUERIES_DIR, []) + + if not gt_output.exists(): + print(f"Ground truth missing for query {test_id}: {gt_output}") + continue + + gt_df = pd.read_csv(gt_output) + + for policy in policies: + policy_dir = RESULTS_DIR / policy + policy_dir.mkdir(parents=True, exist_ok=True) + pz_output = policy_dir / f"query{test_id}_pz_output.csv" + stats_output = policy_dir / f"query{test_id}_pz_stats.json" + + _run_script( + test["pz_script"], + PZ_DIR, + [ + "--policy", policy, + "--execution-strategy", args.execution_strategy, + "--output", str(pz_output), + "--stats-output", str(stats_output), + ], + ) + + pz_df = pd.read_csv(pz_output) if pz_output.exists() else pd.DataFrame() + compare_metrics = _compare_outputs(gt_df, pz_df, args.tolerance) + + exec_metrics: dict[str, Any] = {} + if stats_output.exists(): + with open(stats_output, "r") as f: + stats = json.load(f) + exec_metrics = { + "total_execution_time": stats.get("total_execution_time"), + "total_execution_cost": stats.get("total_execution_cost"), + "total_tokens": stats.get("total_tokens"), + "optimization_time": 
stats.get("optimization_time"), + "plan_execution_time": stats.get("plan_execution_time"), + } + + row = { + "test_id": test_id, + "policy": policy, + **exec_metrics, + **compare_metrics, + } + summary_rows.append(row) + + result_json = policy_dir / f"query{test_id}_comparison.json" + with open(result_json, "w") as f: + json.dump(row, f, indent=2) + + status = "PASS" if compare_metrics.get("pass") else "FAIL" + print(f"[query {test_id}][{policy}] {status}") + + summary_path = RESULTS_DIR / "summary.csv" + pd.DataFrame(summary_rows).to_csv(summary_path, index=False) + print(f"\nSummary written to: {summary_path}") + + +if __name__ == "__main__": + main()