From 233fe8cd5e1bf60b3c4daa12c7d417b2029abe0e Mon Sep 17 00:00:00 2001 From: Areeb29 Date: Wed, 7 Jan 2026 12:57:01 +0100 Subject: [PATCH] Sem map/join: provenance + stable sqlite path + idempotent seed --- examples/db_examples/sql_sem_join.py | 146 ++++++++++++ examples/db_examples/sql_sem_map.py | 119 ++++++++++ lotus/sem_ops/sem_join.py | 325 ++++++--------------------- lotus/sem_ops/sem_map.py | 242 +++++++------------- 4 files changed, 417 insertions(+), 415 deletions(-) create mode 100644 examples/db_examples/sql_sem_join.py create mode 100644 examples/db_examples/sql_sem_map.py diff --git a/examples/db_examples/sql_sem_join.py b/examples/db_examples/sql_sem_join.py new file mode 100644 index 00000000..bb7e5eae --- /dev/null +++ b/examples/db_examples/sql_sem_join.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import lotus +from lotus.data_connectors import DataConnector +from lotus.models import LM + +# Always place DB next to this file (stable, no “random working dir” DBs) +DB_PATH = Path(__file__).resolve().with_name("example_movies.db") + +# Choose behavior: +# - "insert_missing": only insert rows whose id doesn't exist (no overwrites) +# - "upsert": insert new rows and update existing rows if ids match +SEED_MODE = "insert_missing" + +MOVIES_ROWS = [ + (0, "The Matrix", "Wachowskis", 8.7, 1999, "A hacker discovers the reality is simulated."), + (1, "The Godfather", "Francis Coppola", 9.2, 1972, "The rise and fall of a powerful mafia family."), + (2, "Inception", "Christopher Nolan", 8.8, 2010, "A thief enters dreams to steal secrets."), + (3, "Parasite", "Bong Joon-ho", 8.6, 2019, "A poor family schemes to infiltrate a rich household."), + (4, "Interstellar", "Christopher Nolan", 8.6, 2014, "A team travels through a wormhole to save humanity."), + (5, "Titanic", "James Cameron", 7.8, 1997, "A love story set during the Titanic tragedy."), +] + +CATEGORIES_ROWS = [ + (10, "Science Fiction"), + (11, "Crime / Mafia"), + (12, "Thriller / Heist"), + (13, "Drama"), + (14, "Romance"), +] + + +def seed(db_path: Path, mode: str = "insert_missing") -> None: + db_path.parent.mkdir(parents=True, exist_ok=True) + + conn = sqlite3.connect(str(db_path)) + cur = conn.cursor() + + # Tables + cur.execute( + """ + CREATE TABLE IF NOT EXISTS movies ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + director TEXT NOT NULL, + rating REAL NOT NULL, + release_year INTEGER NOT NULL, + description TEXT NOT NULL + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS categories ( + id INTEGER PRIMARY KEY, + category TEXT NOT NULL + ) + """ + ) + + if mode == "insert_missing": + # Insert only new rows; ignore if id already exists + cur.executemany( + """ + INSERT OR IGNORE INTO movies (id, title, director, rating, release_year, description) + VALUES (?, ?, ?, ?, ?, ?) + """, + MOVIES_ROWS, + ) + cur.executemany( + """ + INSERT OR IGNORE INTO categories (id, category) + VALUES (?, ?) + """, + CATEGORIES_ROWS, + ) + + elif mode == "upsert": + # Insert new rows; update existing rows on id collision + cur.executemany( + """ + INSERT INTO movies (id, title, director, rating, release_year, description) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + title = excluded.title, + director = excluded.director, + rating = excluded.rating, + release_year = excluded.release_year, + description = excluded.description + """, + MOVIES_ROWS, + ) + cur.executemany( + """ + INSERT INTO categories (id, category) + VALUES (?, ?) + ON CONFLICT(id) DO UPDATE SET + category = excluded.category + """, + CATEGORIES_ROWS, + ) + else: + conn.close() + raise ValueError("mode must be 'insert_missing' or 'upsert'") + + conn.commit() + conn.close() + + +def main() -> None: + seed(DB_PATH, mode=SEED_MODE) + + lotus.settings.configure(lm=LM(model="gpt-4o-mini")) + + movies = DataConnector.load_from_db( + f"sqlite:///{DB_PATH}", + query="SELECT id, title, description FROM movies", + ) + cats = DataConnector.load_from_db( + f"sqlite:///{DB_PATH}", + query="SELECT id, category FROM categories", + ) + + # provenance == DB primary keys + movies = movies.set_index("id") + cats = cats.set_index("id") + + out = movies.sem_join( + cats, + "the {title} belongs to the {category}.", + return_provenance=True, + provenance_left_col="movie_id", + provenance_right_col="category_id", + ) + + print(f"\nDB location: {DB_PATH}") + print("\n=== SEM_JOIN OUTPUT ===") + print(out) + + +if __name__ == "__main__": + main() diff --git a/examples/db_examples/sql_sem_map.py b/examples/db_examples/sql_sem_map.py new file mode 100644 index 00000000..d6724e6e --- /dev/null +++ b/examples/db_examples/sql_sem_map.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import lotus +from lotus.data_connectors import DataConnector +from lotus.models import LM + +# Always place DB next to this file (stable, no “random working dir” DBs) +DB_PATH = Path(__file__).resolve().with_name("example_papers.db") + +# Choose behavior: +# - "insert_missing": only insert rows whose id doesn't exist (fast, no changes to existing) +# - "upsert": insert new rows and update title/abstract for existing ids +SEED_MODE = "insert_missing" + + +ROWS = [ + ( + 100, + "Quantum Networks", + "This paper explores quantum entanglement to build distributed communication networks with improved security.", + ), + ( + 101, + "AI Ethics", + "We discuss fairness, accountability, and transparency challenges in autonomous AI systems deployed at scale.", + ), + ( + 102, + "Climate Modeling", + "This study models long-term climate prediction using deep learning simulation techniques and uncertainty estimation.", + ), + ( + 103, + "Database Optimization", + "We propose indexing strategies and query rewriting methods to reduce latency for analytical workloads.", + ), +] + + +def seed(db_path: Path, rows: list[tuple[int, str, str]], mode: str = "insert_missing") -> None: + db_path.parent.mkdir(parents=True, exist_ok=True) + + conn = sqlite3.connect(str(db_path)) + cur = conn.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS papers ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + abstract TEXT NOT NULL + ) + """ + ) + + if mode == "insert_missing": + # Only insert new ids; do not overwrite existing rows + cur.executemany( + """ + INSERT OR IGNORE INTO papers (id, title, abstract) + VALUES (?, ?, ?) + """, + rows, + ) + elif mode == "upsert": + # Insert new ids; update existing ids + cur.executemany( + """ + INSERT INTO papers (id, title, abstract) + VALUES (?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + title = excluded.title, + abstract = excluded.abstract + """, + rows, + ) + else: + conn.close() + raise ValueError("mode must be 'insert_missing' or 'upsert'") + + conn.commit() + conn.close() + + +def main() -> None: + seed(DB_PATH, ROWS, mode=SEED_MODE) + + lotus.settings.configure(lm=LM(model="gpt-4o-mini")) + + df = DataConnector.load_from_db( + f"sqlite:///{DB_PATH}", + query="SELECT id, title, abstract FROM papers", + ) + + # make provenance equal DB primary key + df = df.set_index("id") + + out = df.sem_map( + "Summarize {abstract} in one concise sentence.", + return_provenance=True, + provenance_col="paper_id", + track_pipeline=True, + op_name="sql_sem_map", + progress_bar_desc="SQL SemMap", + ) + + print(f"\nDB location: {DB_PATH}") + print("\n=== SEM_MAP OUTPUT ===") + print(out) + + print("\n=== PIPELINE PROVENANCE (attrs['_prov']) ===") + print(out.attrs.get("_prov", [])) + + +if __name__ == "__main__": + main() diff --git a/lotus/sem_ops/sem_join.py b/lotus/sem_ops/sem_join.py index 14a4881a..ba49120c 100644 --- a/lotus/sem_ops/sem_join.py +++ b/lotus/sem_ops/sem_join.py @@ -13,6 +13,15 @@ from .sem_filter import sem_filter +def _unique_col_name(df: pd.DataFrame, base: str) -> str: + if base not in df.columns: + return base + i = 1 + while f"{base}_{i}" in df.columns: + i += 1 + return f"{base}_{i}" + + def sem_join( l1: pd.Series, l2: pd.Series, @@ -33,69 +42,10 @@ def sem_join( ) -> SemanticJoinOutput: """ Joins two pandas Series using a language model based on semantic similarity. - - This function performs a semantic join between two Series by evaluating each - pair of elements using a natural language instruction. It returns pairs that - satisfy the join condition as determined by the language model. - - Args: - l1 (pd.Series): The left Series to join. Contains the first set of - elements to be compared. - l2 (pd.Series): The right Series to join. Contains the second set of - elements to be compared. - ids1 (list[int]): The IDs corresponding to elements in l1. Used to - track which elements match in the join results. - ids2 (list[int]): The IDs corresponding to elements in l2. Used to - track which elements match in the join results. - col1_label (str): The label/name for the first column. Used in - formatting the input to the language model. - col2_label (str): The label/name for the second column. Used in - formatting the input to the language model. - model (lotus.models.LM): The language model instance to use for - evaluating join conditions. Must be properly configured. - user_instruction (str): The natural language instruction that defines - the join condition. Should describe when two elements should be - considered a match. - examples_multimodal_data (list[dict[str, Any]] | None, optional): Example - pairs for few-shot learning. Each example should contain both - left and right elements. Defaults to None. - examples_answers (list[bool] | None, optional): Expected boolean outputs - for the example pairs. Should have the same length as - examples_multimodal_data. Defaults to None. - cot_reasoning (list[str] | None, optional): Chain-of-thought reasoning - for the example pairs. Used when strategy includes COT reasoning. - Defaults to None. - default (bool, optional): The default value to use when the model - output cannot be parsed as a boolean. Defaults to True. - strategy (ReasoningStrategy | None, optional): The reasoning strategy - to use. Can be None, COT, or ZS_COT. Defaults to None. - safe_mode (bool, optional): Whether to enable safe mode with cost - estimation. Defaults to False. - show_progress_bar (bool, optional): Whether to show a progress bar - during processing. Defaults to True. - progress_bar_desc (str, optional): Description for the progress bar. - Defaults to "Join comparisons". - - Returns: - SemanticJoinOutput: An object containing the join results (matching pairs), - filter outputs, raw outputs, and explanations (if applicable). - - Raises: - ValueError: If the model is not properly configured or if there are - issues with the input parameters. - - Example: - >>> l1 = pd.Series(['Machine learning', 'Data science']) - >>> l2 = pd.Series(['AI', 'Statistics']) - >>> model = LM(model="gpt-4o") - >>> result = sem_join(l1, l2, [0, 1], [0, 1], 'left', 'right', - ... model, "Are these topics related?") - >>> print(result.join_results) # List of matching pairs """ filter_outputs = [] all_raw_outputs = [] all_explanations = [] - join_results = [] left_multimodal_data = task_instructions.df2multimodal_info(l1.to_frame(col1_label), [col1_label]) @@ -118,6 +68,7 @@ def sem_join( estimated_total_cost = estimated_tokens_per_call * estimated_total_calls print("Sem_Join:") show_safe_mode(estimated_total_cost, estimated_total_calls) + if show_progress_bar: pbar = tqdm( total=len(l1) * len(l2), @@ -157,8 +108,8 @@ def sem_join( join_results.extend( [ (all_ids1[i], all_ids2[i], explanation) - for i, (output, explanation) in enumerate(zip(outputs, explanations)) - if output + for i, (out_i, explanation) in enumerate(zip(outputs, explanations)) + if out_i ] ) @@ -197,38 +148,7 @@ def sem_join_cascade( safe_mode: bool = False, ) -> SemanticJoinOutput: """ - Joins two series using a cascade helper model and a oracle model. - - Args: - l1 (pd.Series): The first series. - l2 (pd.Series): The second series. - ids1 (list[int]): The ids for the first series. - ids2 (list[int]): The ids for the second series. - col1_label (str): The label for the first column. - col2_label (str): The label for the second column. - user_instruction (str): The user instruction for join. - cascade_args (CascadeArgs): The cascade arguments. - examples_multimodal_data (list[dict[str, Any]] | None): The examples multimodal data. Defaults to None. - examples_answers (list[bool] | None): The answers for examples. Defaults to None. - map_instruction (str | None): The map instruction. Defaults to None. - map_examples (pd.DataFrame | None): The map examples. Defaults to None. - cot_reasoning (list[str] | None): The reasoning for CoT. Defaults to None. - default (bool): The default value for the join in case of parsing errors. Defaults to True. - strategy (str | None): The reasoning strategy. Defaults to None. - - Returns: - SemanticJoinOutput: The join results, filter outputs, all raw outputs, all explanations, and stats. - - Note that filter_outputs, all_raw_outputs, and all_explanations are empty list because - the helper model do not generate these outputs. - - SemanticJoinOutput.stats: - join_resolved_by_helper_model: total number of join records resolved by the helper model - join_helper_positive: number of high confidence positive results from the helper model - join_helper_negative: number of high confidence negative results from the helper model - join_resolved_by_large_model: total number of joins resolved by the oracle model - optimized_join_cost: number of LM calls from finding optimal join plan - total_LM_calls: the total number of LM calls from join cascade, ie: optimized_join_cost + join_resolved_by_helper_model + Joins two series using a cascade helper model and an oracle model. """ filter_outputs: list[bool] = [] all_raw_outputs: list[str] = [] @@ -238,7 +158,6 @@ def sem_join_cascade( num_helper = 0 num_large = 0 - # Determine the join plan helper_high_conf, helper_low_conf, num_helper_high_conf_neg, join_optimization_cost = join_optimizer( l1, l2, @@ -260,10 +179,8 @@ def sem_join_cascade( num_large = len(helper_low_conf) if safe_mode: - # TODO: implement safe mode lotus.logger.warning("Safe mode is not implemented yet.") - # Accept helper results with high confidence join_results = [(row["_left_id"], row["_right_id"], None) for _, row in helper_high_conf.iterrows()] pbar = tqdm( @@ -271,7 +188,7 @@ def sem_join_cascade( desc="Running predicate evals with oracle model", bar_format="{l_bar}{bar} {n}/{total} LM calls [{elapsed}<{remaining}, {rate_fmt}{postfix}]", ) - # Send low confidence rows to large LM + left_multimodal_data = task_instructions.df2multimodal_info( helper_low_conf[[col1_label]].drop_duplicates(), [col1_label] ) @@ -306,15 +223,14 @@ def sem_join_cascade( join_results.extend( [ (all_ids1[i], all_ids2[i], explanation) - for i, (output, explanation) in enumerate(zip(output.outputs, output.explanations)) - if output + for i, (out_i, explanation) in enumerate(zip(output.outputs, output.explanations)) + if out_i ] ) lotus.logger.debug(f"outputs: {filter_outputs}") lotus.logger.debug(f"explanations: {all_explanations}") - # Log join cascade stats: stats = { "join_resolved_by_helper_model": num_helper + num_helper_high_conf_neg, "join_helper_positive": num_helper, @@ -335,33 +251,22 @@ def sem_join_cascade( def run_sem_sim_join(l1: pd.Series, l2: pd.Series, col1_label: str, col2_label: str) -> pd.DataFrame: """ - Wrapper function to run sem_sim_join in sem_join then calibrate the scores for approximate join - - Args: - l1 (pd.Series): The first series. - l2 (pd.Series): The second series. - col1_label (str): The label for the first column. - col2_label (str): The label for the second column. - - Returns: - pd.DataFrame: The similarity join results. + Wrapper function to run sem_sim_join in sem_join then calibrate the scores for approximate join. """ - # Transform the series into DataFrame if isinstance(l1, pd.Series): l1_df = l1.to_frame(name=col1_label) elif isinstance(l1, pd.DataFrame): l1_df = l1 else: lotus.logger.error("l1 must be a pandas Series or DataFrame") + raise ValueError("l1 must be a pandas Series or DataFrame") l2_df = l2.to_frame(name=col2_label) l2_df = l2_df.sem_index(col2_label, f"{col2_label}_index") K = len(l2) - # Run sem_sim_join as helper on the sampled data out = l1_df.sem_sim_join(l2_df, left_on=col1_label, right_on=col2_label, K=K, keep_index=True) - # Correct helper scores out["_scores"] = calibrate_sem_sim_join(out["_scores"].tolist()) return out @@ -375,16 +280,6 @@ def map_l1_to_l2( ) -> tuple[pd.DataFrame, str]: """ Wrapper function to run sem_map in sem_join. - - Args: - l1 (pd.Series): The first series. - col1_label (str): The label for the first column. - col2_label (str): The label for the second column. - map_instruction (str): The map instruction. Defaults to None. - map_examples (pd.DataFrame): The map examples. Defaults to None. - - Returns: - tuple[pd.DataFrame, str]: The mapped DataFrame and the mapped column name. """ if ":left" in col1_label: real_left_on = col1_label.split(":left")[0] @@ -396,18 +291,17 @@ def map_l1_to_l2( else: real_right_on = col2_label - inst = "" if map_instruction: inst = map_instruction else: - default_map_instruction = f"Given {{{real_left_on}}}, identify the most relevant {real_right_on}. Always write your answer as a list of 2-10 comma-separated {real_right_on}." - inst = default_map_instruction + inst = ( + f"Given {{{real_left_on}}}, identify the most relevant {real_right_on}. " + f"Always write your answer as a list of 2-10 comma-separated {real_right_on}." + ) - # Transform l1 into DataFrame for sem_map l1_df = l1.to_frame(name=real_left_on) mapped_col1_name = f"_{col1_label}" - # Map l1 to l2 out = l1_df.sem_map(inst, suffix=mapped_col1_name, examples=map_examples, progress_bar_desc="Mapping examples") out = out.rename(columns={real_left_on: col1_label}) @@ -433,33 +327,10 @@ def join_optimizer( """ Find most cost-effective join plan between Search-Filter and Map-Search-Filter while satisfying the recall and precision target. - - Args: - l1 (pd.Series): The first series. - l2 (pd.Series): The second series. - col1_label (str): The label for the first column. - col2_label (str): The label for the second column. - user_instruction (str): The user instruction for join. - cascade_args (CascadeArgs): The cascade arguments. - examples_multimodal_data (list[dict[str, Any]] | None): The examples multimodal data. Defaults to None. - examples_answers (list[bool] | None): The answers for examples. Defaults to None. - map_instruction (str | None): The map instruction. Defaults to None. - map_examples (pd.DataFrame | None): The map examples. Defaults to None. - cot_reasoning (list[str] | None): The reasoning for CoT. Defaults to None. - default (bool): The default value for the join in case of parsing errors. Defaults to True. - strategy (str | None): The reasoning strategy. Defaults to None. - - returns: - tuple[pd.DataFrame, pd.DataFrame]: The high confidence and low confidence join results. - int: The number of high confidence negative results. - int: The number of LM calls from optimizing join plan. """ - - # Helper is currently default to similiarity join if lotus.settings.helper_lm is not None: lotus.logger.debug("Helper model is not supported yet. Default to similarity join.") - # Learn search-filter thresholds sf_helper_join = run_sem_sim_join(l1, l2, col1_label, col2_label) sf_t_pos, sf_t_neg, sf_learn_cost = learn_join_cascade_threshold( sf_helper_join, @@ -479,7 +350,6 @@ def join_optimizer( sf_low_conf = sf_helper_join[(sf_helper_join["_scores"] < sf_t_pos) & (sf_helper_join["_scores"] > sf_t_neg)] sf_cost = len(sf_low_conf) - # Learn map-search-filter thresholds mapped_l1, mapped_col1_label = map_l1_to_l2( l1, col1_label, col2_label, map_instruction=map_instruction, map_examples=map_examples ) @@ -501,9 +371,8 @@ def join_optimizer( msf_high_conf_neg = len(msf_helper_join[msf_helper_join["_scores"] <= msf_t_neg]) msf_low_conf = msf_helper_join[(msf_helper_join["_scores"] < msf_t_pos) & (msf_helper_join["_scores"] > msf_t_neg)] msf_cost = len(msf_low_conf) - msf_learn_cost += len(l1) # cost from map l1 to l2 + msf_learn_cost += len(l1) - # Select the cheaper join plan lotus.logger.info("Join Optimizer: plan cost analysis:") lotus.logger.info(f" Search-Filter: {sf_cost} LLM calls.") lotus.logger.info( @@ -541,26 +410,8 @@ def learn_join_cascade_threshold( strategy: ReasoningStrategy | None = None, ) -> tuple[float, float, int]: """ - Extract a small sample of the data and find the optimal threshold pair that satisfies the recall and - precision target. - - Args: - helper_join (pd.DataFrame): The helper join results. - cascade_args (CascadeArgs): The cascade arguments. - col1_label (str): The label for the first column. - col2_label (str): The label for the second column. - model (lotus.models.LM): The language model. - user_instruction (str): The user instruction for join. - cascade_args (CascadeArgs): The cascade arguments. - examples_multimodal_data (list[dict[str, Any]] | None): The examples multimodal data. Defaults to None. - examples_answers (list[bool] | None): The answers for examples. Defaults to None. - cot_reasoning (list[str] | None): The reasoning for CoT. Defaults to None. - default (bool): The default value for the join in case of parsing errors. Defaults to True. - strategy (str | None): The reasoning strategy. Defaults to None. - Returns: - tuple: The positive threshold, negative threshold, and the number of LM calls from learning thresholds. + Extract a small sample of the data and find the optimal threshold pair that satisfies the recall and precision target. """ - # Sample a small subset of the helper join result helper_scores = helper_join["_scores"].tolist() sample_indices, correction_factors = importance_sampling(helper_scores, cascade_args) @@ -605,58 +456,6 @@ def learn_join_cascade_threshold( @pd.api.extensions.register_dataframe_accessor("sem_join") class SemJoinDataframe: - """ - Applies semantic join over a dataframe. - - Args: - other (pd.DataFrame | pd.Series): The other dataframe or series to join with. - join_instruction (str): The user instruction for join. - return_explanations (bool): Whether to return explanations. Defaults to False. - how (str): The type of join to perform. Defaults to "inner". - suffix (str): The suffix for the new columns. Defaults to "_join". - examples (pd.DataFrame | None): The examples dataframe. Defaults to None. - strategy (str | None): The reasoning strategy. Defaults to None. - default (bool): The default value for the join in case of parsing errors. Defaults to True. - cascade_args (CascadeArgs | None): The arguments for join cascade. Defaults to None. - recall_target (float | None): The target recall. Defaults to None. - precision_target (float | None): The target precision when cascading. Defaults to None. - sampling_percentage (float): The percentage of the data to sample when cascading. Defaults to 0.1. - failure_probability (float): The failure probability when cascading. Defaults to 0.2. - map_instruction (str): The map instruction when cascading. Defaults to None. - map_examples (pd.DataFrame): The map examples when cascading. Defaults to None. - return_stats (bool): Whether to return stats. Defaults to False. - - Returns: - pd.DataFrame: The dataframe with the new joined columns. - - Example: - >>> import pandas as pd - >>> import lotus - >>> from lotus.models import LM - >>> lotus.settings.configure(lm=LM(model="gpt-4o-mini")) - - >>> df1 = pd.DataFrame({ - 'article': ['Machine learning tutorial', 'Data science guide', 'Python basics', 'AI in finance', 'Cooking healthy food', "Recipes for the holidays"], - }) - >>> df2 = pd.DataFrame({ - 'category': ['Computer Science', 'AI', 'Cooking'], - }) - - >>> df1.sem_join(df2, "the {article} belongs to the {category}.") - Join comparisons: 100%|█████████████████████████████████████████████████████████ 18/18 LM Calls [00:05<00:00, 3.57it/s] - article category - 0 Machine learning tutorial Computer Science - 0 Machine learning tutorial AI - 1 Data science guide Computer Science - 1 Data science guide AI - 2 Python basics Computer Science - 2 Python basics AI - 3 AI in finance Computer Science - 3 AI in finance AI - 4 Cooking healthy food Cooking - 5 Recipes for the holidays Cooking - """ - def __init__(self, pandas_obj: Any): self._validate(pandas_obj) self._obj = pandas_obj @@ -681,6 +480,9 @@ def __call__( return_stats: bool = False, safe_mode: bool = False, progress_bar_desc: str = "Join comparisons", + return_provenance: bool = False, + provenance_left_col: str = "provenance_left_index", + provenance_right_col: str = "provenance_right_index", ) -> pd.DataFrame: model = lotus.settings.lm if model is None: @@ -699,6 +501,9 @@ def __call__( cols = lotus.nl_expression.parse_cols(join_instruction) left_on = None right_on = None + real_left_on = None + real_right_on = None + for col in cols: if ":left" in col: left_on = col @@ -712,22 +517,21 @@ def __call__( if col in self._obj.columns: left_on = col real_left_on = col - if col in other.columns: raise ValueError("Column found in both dataframes") break + if right_on is None: for col in cols: if col in other.columns: right_on = col real_right_on = col - if col in self._obj.columns: raise ValueError("Column found in both dataframes") break - assert left_on is not None, "Column not found in left dataframe" - assert right_on is not None, "Column not found in right dataframe" + assert left_on is not None and real_left_on is not None, "Column not found in left dataframe" + assert right_on is not None and real_right_on is not None, "Column not found in right dataframe" examples_multimodal_data = None examples_answers = None @@ -741,7 +545,16 @@ def __call__( return_explanations = True cot_reasoning = examples["Reasoning"].tolist() - num_full_join = len(self._obj) * len(other) + df1 = self._obj.copy() + df2 = other.copy() + + left_id_col = _unique_col_name(df1, "_left_id") + right_id_col = _unique_col_name(df2, "_right_id") + + df1[left_id_col] = df1.index + df2[right_id_col] = df2.index + + num_full_join = len(df1) * len(df2) if ( (cascade_args is not None) @@ -752,11 +565,12 @@ def __call__( cascade_args.precision_target = ( 1.0 if cascade_args.precision_target is None else cascade_args.precision_target ) + output = sem_join_cascade( - self._obj[real_left_on], - other[real_right_on], - self._obj.index, - other.index, + df1[real_left_on], + df2[real_right_on], + df1[left_id_col].tolist(), + df2[right_id_col].tolist(), left_on, right_on, model, @@ -773,10 +587,10 @@ def __call__( ) else: output = sem_join( - self._obj[real_left_on], - other[real_right_on], - self._obj.index, - other.index, + df1[real_left_on], + df2[real_right_on], + df1[left_id_col].tolist(), + df2[right_id_col].tolist(), left_on, right_on, model, @@ -789,33 +603,32 @@ def __call__( safe_mode=safe_mode, progress_bar_desc=progress_bar_desc, ) - join_results = output.join_results - all_raw_outputs = output.all_raw_outputs - lotus.logger.debug(f"join_results: {join_results}") - lotus.logger.debug(f"all_raw_outputs: {all_raw_outputs}") + join_results = output.join_results - df1 = self._obj.copy() - df2 = other.copy() - df1["_left_id"] = self._obj.index - df2["_right_id"] = other.index - # add suffix to column names - for col in df1.columns: - if col in df2.columns: - df1.rename(columns={col: col + ":left"}, inplace=True) - df2.rename(columns={col: col + ":right"}, inplace=True) + for c in list(df1.columns): + if c in df2.columns and c not in (left_id_col, right_id_col): + df1.rename(columns={c: c + ":left"}, inplace=True) + df2.rename(columns={c: c + ":right"}, inplace=True) if return_explanations: - temp_df = pd.DataFrame(join_results, columns=["_left_id", "_right_id", f"explanation{suffix}"]) + temp_df = pd.DataFrame(join_results, columns=[left_id_col, right_id_col, f"explanation{suffix}"]) else: - temp_df = pd.DataFrame([(jr[0], jr[1]) for jr in join_results], columns=["_left_id", "_right_id"]) + temp_df = pd.DataFrame([(jr[0], jr[1]) for jr in join_results], columns=[left_id_col, right_id_col]) joined_df = ( - df1.join(temp_df.set_index("_left_id"), how="right", on="_left_id") - .join(df2.set_index("_right_id"), how="left", on="_right_id") - .drop(columns=["_left_id", "_right_id"]) + df1.join(temp_df.set_index(left_id_col), how="right", on=left_id_col) + .join(df2.set_index(right_id_col), how="left", on=right_id_col) ) + if return_provenance: + if provenance_left_col in joined_df.columns or provenance_right_col in joined_df.columns: + raise ValueError("Provenance column name already exists in output. Use different names.") + joined_df[provenance_left_col] = joined_df[left_id_col] + joined_df[provenance_right_col] = joined_df[right_id_col] + + joined_df = joined_df.drop(columns=[left_id_col, right_id_col]) + if output.stats and return_stats: return joined_df, output.stats diff --git a/lotus/sem_ops/sem_map.py b/lotus/sem_ops/sem_map.py index 2c8b7074..8b7b47eb 100644 --- a/lotus/sem_ops/sem_map.py +++ b/lotus/sem_ops/sem_map.py @@ -1,4 +1,7 @@ +from __future__ import annotations + from typing import Any, Callable +import time import pandas as pd @@ -11,6 +14,32 @@ from .postprocessors import map_postprocess +def _unique_col_name(df: pd.DataFrame, base: str) -> str: + if base not in df.columns: + return base + i = 1 + while f"{base}_{i}" in df.columns: + i += 1 + return f"{base}_{i}" + + +def _get_lm_model_name() -> str: + lm = getattr(lotus.settings, "lm", None) + if lm is None: + return "unconfigured" + return str(getattr(lm, "model", "unknown")) + + +def _append_pipeline_prov(df: pd.DataFrame, entry: dict[str, Any]) -> None: + prov = df.attrs.get("_prov") + if prov is None: + prov = [] + elif not isinstance(prov, list): + prov = [prov] + prov.append(entry) + df.attrs["_prov"] = prov + + def sem_map( docs: list[dict[str, Any]], model: lotus.models.LM, @@ -25,58 +54,7 @@ def sem_map( progress_bar_desc: str = "Mapping", **model_kwargs: Any, ) -> SemanticMapOutput: - """ - Maps a list of documents to a list of outputs using a language model. - - This function applies a natural language instruction to each document in the - input list, transforming them into new outputs. It supports few-shot learning - through examples and various reasoning strategies including chain-of-thought. - - Args: - docs (list[dict[str, Any]]): The list of documents to map. Each document - should be a dictionary containing multimodal information (text, images, etc.). - model (lotus.models.LM): The language model instance to use for mapping. - Must be properly configured with appropriate API keys and settings. - user_instruction (str): The natural language instruction that guides the - mapping process. This instruction tells the model how to transform - each input document. - system_prompt (str | None, optional): The system prompt to use. - postprocessor (Callable, optional): A function to post-process the model - outputs. Should take (outputs, model, use_cot) and return - SemanticMapPostprocessOutput. Defaults to map_postprocess. - examples_multimodal_data (list[dict[str, Any]] | None, optional): Example - documents for few-shot learning. Each example should have the same - structure as the input docs. Defaults to None. - examples_answers (list[str] | None, optional): Expected outputs for the - example documents. Should have the same length as examples_multimodal_data. - Defaults to None. - cot_reasoning (list[str] | None, optional): Chain-of-thought reasoning - for the example documents. Used when strategy includes COT reasoning. - Defaults to None. - strategy (ReasoningStrategy | None, optional): The reasoning strategy to use. - Can be None, COT, or ZS_COT. Defaults to None. - safe_mode (bool, optional): Whether to enable safe mode with cost estimation. - Defaults to False. - progress_bar_desc (str, optional): Description for the progress bar. - Defaults to "Mapping". - **model_kwargs: Any: Additional keyword arguments to pass to the model. - Returns: - SemanticMapOutput: An object containing the processed outputs, raw outputs, - and explanations (if applicable). - - Raises: - ValueError: If the model is not properly configured or if there are - issues with the input parameters. - - Example: - >>> docs = [{"text": "Document 1"}, {"text": "Document 2"}] - >>> model = LM(model="gpt-4o") - >>> result = sem_map(docs, model, "Summarize the text in one sentence") - >>> print(result.outputs) - """ - - # prepare model inputs - inputs = [] + inputs: list[list[dict[str, Any]]] = [] for doc in docs: prompt = task_instructions.map_formatter( model, @@ -88,26 +66,18 @@ def sem_map( strategy=strategy, system_prompt=system_prompt, ) - lotus.logger.debug(f"input to model: {prompt}") - lotus.logger.debug(f"inputs content to model: {[x.get('content') for x in prompt]}") inputs.append(prompt) - # check if safe_mode is enabled if safe_mode: - estimated_cost = sum(model.count_tokens(input) for input in inputs) + estimated_cost = sum(model.count_tokens(inp) for inp in inputs) estimated_LM_calls = len(docs) show_safe_mode(estimated_cost, estimated_LM_calls) - # call model lm_output: LMOutput = model(inputs, progress_bar_desc=progress_bar_desc, **model_kwargs) - # post process results - postprocess_output = postprocessor( - lm_output.outputs, model, strategy in [ReasoningStrategy.COT, ReasoningStrategy.ZS_COT] - ) - lotus.logger.debug(f"raw_outputs: {lm_output.outputs}") - lotus.logger.debug(f"outputs: {postprocess_output.outputs}") - lotus.logger.debug(f"explanations: {postprocess_output.explanations}") + use_cot = strategy in (ReasoningStrategy.COT, ReasoningStrategy.ZS_COT) + postprocess_output = postprocessor(lm_output.outputs, model, use_cot) + if safe_mode: model.print_total_usage() @@ -120,94 +90,12 @@ def sem_map( @pd.api.extensions.register_dataframe_accessor("sem_map") class SemMapDataframe: - """ - Apply semantic mapping over a DataFrame. - - This method performs semantic mapping on the DataFrame content using - a natural language instruction. It can process specific columns identified - in the instruction and supports few-shot learning through examples. - - Args: - user_instruction (str): The natural language instruction that guides - the mapping process. Should describe how to transform each row. - system_prompt (str | None, optional): The system prompt to use. - postprocessor (Callable, optional): A function to post-process the model - outputs. Should take (outputs, model, use_cot) and return - SemanticMapPostprocessOutput. Defaults to map_postprocess. - return_explanations (bool, optional): Whether to include explanations - in the output DataFrame. Useful for debugging and understanding - model reasoning, when strategy is COT or ZS_COT. Defaults to False. - return_raw_outputs (bool, optional): Whether to include raw model - outputs in the output DataFrame. Useful for debugging. - Defaults to False. - suffix (str, optional): The suffix for the output column names. - Defaults to "_map". - examples (pd.DataFrame | None, optional): Example DataFrame for - few-shot learning. Should have the same column structure as the - input DataFrame plus an "Answer" column. Defaults to None. - strategy (ReasoningStrategy | None, optional): The reasoning strategy - to use. Can be None, COT, or ZS_COT. Defaults to None. - safe_mode (bool, optional): Whether to enable safe mode with cost - estimation. Defaults to False. - progress_bar_desc (str, optional): Description for the progress bar. - Defaults to "Mapping". - **model_kwargs: Any: Additional keyword arguments to pass to the model. - - Returns: - pd.DataFrame: A DataFrame containing the original data plus the mapped - outputs. Additional columns may be added for explanations and raw - outputs if requested. - - Raises: - ValueError: If the language model is not configured, if specified - columns don't exist in the DataFrame, or if the examples DataFrame - doesn't have the required "Answer" column. - - Example: - >>> import pandas as pd - >>> import lotus - >>> from lotus.models import LM, SentenceTransformersRM - >>> lotus.settings.configure(lm=LM(model="gpt-4o-mini")) - >>> df = pd.DataFrame({ - ... 'document': ['Harry is happy and love cats', 'Harry is feeling nauseous'] - ... }) - # Example 1: simple mapping - >>> result1 = df.sem_map("Label the sentiment of Harry in the {document} as positive/negative/neutral. Answer in one word.") - Mapping: 100%|████████████████████████████████████████████████████████████████████ 2/2 LM calls [00:00<00:00, 3.18it/s] - document _map - 0 Harry is happy and love cats Positive - 1 Harry is feeling nauseous Negative - - # Example 2: with zero-shot chain-of-thought (ZS-COT) reasoning - >>> from lotus.types import ReasoningStrategy - >>> df.sem_map("Label the sentiment of Harry in the {document} as positive/negative/neutral. Answer in one word.", return_explanations=True, strategy=ReasoningStrategy.ZS_COT) - Mapping: 100%|████████████████████████████████████████████████████████████████████ 2/2 LM calls [00:02<00:00, 1.04s/it] - document _map explanation_map - 0 Harry is happy and love cats positive Reasoning: The document states that "Harry is ... - 1 Harry is feeling nauseous negative Reasoning: The phrase "Harry is feeling nauseo... - """ - def __init__(self, pandas_obj: pd.DataFrame): - """ - Initialize the semantic mapping accessor. - - Args: - pandas_obj (pd.DataFrame): The pandas DataFrame object to attach the accessor to. - """ self._validate(pandas_obj) self._obj = pandas_obj @staticmethod def _validate(obj: pd.DataFrame) -> None: - """ - Validate that the object is a pandas DataFrame. - - Args: - obj (pd.DataFrame): The object to validate. - - Raises: - AttributeError: If the object is not a pandas DataFrame. - """ if not isinstance(obj, pd.DataFrame): raise AttributeError("Must be a DataFrame") @@ -224,6 +112,10 @@ def __call__( strategy: ReasoningStrategy | None = None, safe_mode: bool = False, progress_bar_desc: str = "Mapping", + return_provenance: bool = False, + provenance_col: str = "provenance_index", + track_pipeline: bool = False, + op_name: str = "sem_map", **model_kwargs: Any, ) -> pd.DataFrame: if lotus.settings.lm is None: @@ -231,28 +123,37 @@ def __call__( "The language model must be an instance of LM. Please configure a valid language model using lotus.settings.configure()" ) - col_li = lotus.nl_expression.parse_cols(user_instruction) + start_t = time.time() + rows_in = len(self._obj) - # check that column exists + col_li = lotus.nl_expression.parse_cols(user_instruction) for column in col_li: if column not in self._obj.columns: raise ValueError(f"Column {column} not found in DataFrame") - multimodal_data = task_instructions.df2multimodal_info(self._obj, col_li) + df_in = self._obj.copy() + + tmp_prov = None + if return_provenance or track_pipeline: + tmp_prov = _unique_col_name(df_in, "__lotus_prov_index__") + df_in[tmp_prov] = df_in.index + + multimodal_data = task_instructions.df2multimodal_info(df_in, col_li) formatted_usr_instr = lotus.nl_expression.nle2str(user_instruction, col_li) examples_multimodal_data = None examples_answers = None cot_reasoning = None - if examples is not None: - assert "Answer" in examples.columns, "Answer must be a column in examples dataframe" + if "Answer" not in examples.columns: + raise ValueError("Answer must be a column in examples dataframe") examples_multimodal_data = task_instructions.df2multimodal_info(examples, col_li) examples_answers = examples["Answer"].tolist() - if strategy == ReasoningStrategy.COT or strategy == ReasoningStrategy.ZS_COT: + if strategy in (ReasoningStrategy.COT, ReasoningStrategy.ZS_COT): return_explanations = True - cot_reasoning = examples["Reasoning"].tolist() + if "Reasoning" in examples.columns: + cot_reasoning = examples["Reasoning"].tolist() output = sem_map( multimodal_data, @@ -269,11 +170,34 @@ def __call__( **model_kwargs, ) - new_df = self._obj.copy() - new_df[suffix] = output.outputs + out_df = df_in.copy() + out_df[suffix] = output.outputs + if return_explanations: - new_df["explanation" + suffix] = output.explanations + out_df["explanation" + suffix] = output.explanations if return_raw_outputs: - new_df["raw_output" + suffix] = output.raw_outputs - - return new_df + out_df["raw_output" + suffix] = output.raw_outputs + + if return_provenance: + if provenance_col in out_df.columns: + raise ValueError("provenance_col already exists in output. Use a different name.") + if tmp_prov is None: + tmp_prov = _unique_col_name(out_df, "__lotus_prov_index__") + out_df[tmp_prov] = self._obj.index + out_df.rename(columns={tmp_prov: provenance_col}, inplace=True) + else: + if tmp_prov is not None and tmp_prov in out_df.columns: + out_df.drop(columns=[tmp_prov], inplace=True) + + if track_pipeline: + entry = { + "op": op_name, + "langex": user_instruction, + "rows_in": rows_in, + "rows_out": len(out_df), + "duration_s": round(time.time() - start_t, 4), + "model": _get_lm_model_name(), + } + _append_pipeline_prov(out_df, entry) + + return out_df