Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions .github/tests/ast_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""Tests for the LOTUS AST module (LazyFrame functionality).

Tests cover practical end-to-end flows:
- Semantic operations through the LazyFrame API
- Pandas operations in lazy pipelines
- Multi-source pipelines (concat, from_fn, merge)
- Execution caching
- Optimization (predicate pushdown)
- Complex mixed pipelines
"""

import os

import pandas as pd
import pytest

import lotus
from lotus.ast import LazyFrame, PandasFilterNode, SemFilterNode, SourceNode
from lotus.ast.optimizer.predicate_pushdown import PredicatePushdownOptimizer
from lotus.models import LM

################################################################################
# Setup
################################################################################
# Set logger level to DEBUG
lotus.logger.setLevel("DEBUG")

# Environment flags to enable/disable tests
ENABLE_OPENAI_TESTS = os.getenv("ENABLE_OPENAI_TESTS", "false").lower() == "true"
ENABLE_OLLAMA_TESTS = os.getenv("ENABLE_OLLAMA_TESTS", "false").lower() == "true"

MODEL_NAME_TO_ENABLED = {
"gpt-4o-mini": ENABLE_OPENAI_TESTS,
"gpt-4o": ENABLE_OPENAI_TESTS,
"ollama/llama3.1": ENABLE_OLLAMA_TESTS,
}
ENABLED_MODEL_NAMES = set([model_name for model_name, is_enabled in MODEL_NAME_TO_ENABLED.items() if is_enabled])


def get_enabled(*candidate_models: str) -> list[str]:
return [model for model in candidate_models if model in ENABLED_MODEL_NAMES]


@pytest.fixture(scope="session")
def setup_models():
models = {}
for model_path in ENABLED_MODEL_NAMES:
models[model_path] = LM(model=model_path)
return models


@pytest.fixture(autouse=True)
def print_usage_after_each_test(setup_models):
yield # this runs the test
models = setup_models
for model_name, model in models.items():
print(f"\nUsage stats for {model_name} after test:")
model.print_total_usage()
model.reset_stats()
model.reset_cache()


################################################################################
# Semantic Operations Tests
################################################################################


@pytest.mark.parametrize("model", get_enabled("gpt-4o-mini", "ollama/llama3.1"))
def test_sem_filter_lazyframe(setup_models, model):
"""Test sem_filter operation on LazyFrame."""
lm = setup_models[model]
lotus.settings.configure(lm=lm)

df = pd.DataFrame({"Text": ["I am really excited!", "I am very sad"]})
lf = LazyFrame(df=df).sem_filter("{Text} is a positive sentiment")
result = lf.execute({})

assert len(result) == 1
assert "I am really excited!" in result["Text"].values
assert "I am very sad" not in result["Text"].values


@pytest.mark.parametrize("model", get_enabled("gpt-4o-mini"))
def test_sem_agg_lazyframe(setup_models, model):
"""Test sem_agg operation on LazyFrame."""
lm = setup_models[model]
lotus.settings.configure(lm=lm)

df = pd.DataFrame({"Text": ["My name is John", "My name is Jane", "My name is John"]})
lf = LazyFrame(df=df).sem_agg("What is the most common name in {Text}?", suffix="output")
result = lf.execute({})

assert len(result) == 1
assert "output" in result.columns
assert "john" in result["output"].iloc[0].lower()


@pytest.mark.parametrize("model", get_enabled("gpt-4o-mini", "ollama/llama3.1"))
def test_sem_join_lazyframe(setup_models, model):
"""Test sem_join operation on LazyFrame."""
lm = setup_models[model]
lotus.settings.configure(lm=lm)

df1 = pd.DataFrame({"School": ["UC Berkeley", "Stanford"]})
df2 = pd.DataFrame({"School Type": ["Public School", "Private School"]})

lf1 = LazyFrame(df=df1)
lf2 = LazyFrame(df=df2)
lf = lf1.sem_join(lf2, "{School} is a {School Type}")
result = lf.execute({lf1: df1, lf2: df2})

joined_pairs = set(zip(result["School"], result["School Type"]))
expected_pairs = {("UC Berkeley", "Public School"), ("Stanford", "Private School")}
assert joined_pairs == expected_pairs


################################################################################
# Multi-Source Pipeline Tests
################################################################################


def test_pipeline_from_fn_basic():
"""Test custom processing with LazyFrame.from_fn."""
df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"a": [3, 4]})

def combine(dfs):
return pd.concat(dfs)

p1 = LazyFrame()
p2 = LazyFrame()
combined = LazyFrame.from_fn(combine, [p1, p2])

result = combined.execute({p1: df1, p2: df2})
assert len(result) == 4


def test_multi_source_execution():
"""Test merging a LazyFrame with a static DataFrame."""
left_df = pd.DataFrame({"key": [1, 2], "val": ["a", "b"]})
right_df = pd.DataFrame({"key": [1, 2], "other": ["x", "y"]})

lf = LazyFrame().merge(right_df, on="key")
result = lf.execute(left_df)

assert "other" in result.columns
assert len(result) == 2


################################################################################
# Execution Caching Tests
################################################################################


def test_lazyframe_execution_caching():
"""Test that re-executing a LazyFrameRun uses cached results."""
df = pd.DataFrame({"a": [3, 1, 2]})
lf = LazyFrame(df=df).sort_values("a")

run = lf.run(df)
result1 = run.execute()
stats1 = run.cache_stats

result2 = run.execute()
stats2 = run.cache_stats

pd.testing.assert_frame_equal(result1, result2)
assert stats2["hits"] > stats1["hits"]
assert stats2["misses"] == stats1["misses"]


################################################################################
# Optimization Tests
################################################################################


def test_predicate_pushdown_optimization():
"""Test that predicate pushdown moves pandas filters before sem_filters.

Verifies the optimizer reorders nodes so cheap pandas predicates run
before expensive semantic operations, reducing rows processed by the LLM.
"""
df = pd.DataFrame({"a": [1, 2, 3, 4, 5], "text": ["x", "y", "z", "w", "v"]})

lf = LazyFrame(df=df).sem_filter("{text} is interesting").filter(lambda d: d["a"] > 2)

nodes_before = lf._nodes
assert isinstance(nodes_before[0], SourceNode)
assert isinstance(nodes_before[1], SemFilterNode)
assert isinstance(nodes_before[2], PandasFilterNode)

optimizer = PredicatePushdownOptimizer()
optimized_lf = lf.optimize([optimizer])

nodes_after = optimized_lf._nodes
assert isinstance(nodes_after[0], SourceNode)
assert isinstance(nodes_after[1], PandasFilterNode)
assert isinstance(nodes_after[2], SemFilterNode)
12 changes: 4 additions & 8 deletions .github/tests/lm_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,11 @@ def test_format_logprobs_for_filter_cascade(setup_models, model):
]
response = lm(messages, logprobs=True)
formatted_logprobs = lm.format_logprobs_for_filter_cascade(response.logprobs)
true_probs = formatted_logprobs.true_probs
assert len(true_probs) == 1
positive_probs = formatted_logprobs.positive_probs
assert len(positive_probs) == 1

# Very safe (in practice its ~1)
assert true_probs[0] > 0.8
assert positive_probs[0] > 0.8
assert len(formatted_logprobs.tokens) == len(formatted_logprobs.confidences)


Expand Down Expand Up @@ -629,12 +629,8 @@ def test_llm_as_judge(setup_models, model):
}
df = pd.DataFrame(data)
judge_instruction = "Rate the accuracy and completeness of this {answer} to the {question} on a scale of 1-10, where 10 is excellent. Only output the score."
expected_scores = ["8", "1"]
df = df.llm_as_judge(judge_instruction)
assert len(list(df["_judge_0"].values)) == len(expected_scores)
for i in range(len(df)):
assert len(df.iloc[i]["_judge_0"]) >= 1
assert df.iloc[i]["_judge_0"] in expected_scores
assert len(list(df["_judge_0"].values)) == 2


@pytest.mark.parametrize("model", get_enabled("gpt-4o-mini"))
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ jobs:
file: .github/tests/multimodality_tests.py
timeout: 10
requires-openai: true
- test-suite: ast
file: .github/tests/ast_tests.py
timeout: 10
requires-openai: true
- test-suite: utility_operators
file: .github/tests/utility_operators_tests.py
timeout: 25
extra-deps: pymupdf llama-index docx2txt python-pptx python-magic
extra-deps: pymupdf llama-index docx2txt python-pptx python-magic llama-index-readers-file
- test-suite: cache
file: .github/tests/cache_tests.py
timeout: 10
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ docs/_build

test_env/

.cursor

qdrant_storage/
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ repos:
rev: v1.13.0
hooks:
- id: mypy
args: ["--config-file", "mypy.ini"]
args: ["--config-file", "mypy.ini", "--explicit-package-bases"]
exclude: \.pyi$
additional_dependencies:
- types-setuptools
- pydantic>=2.11.7
Expand Down
18 changes: 18 additions & 0 deletions benchmarks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""LOTUS benchmark registry."""

from . import failure_mode_discovery, llm_as_judge, rag_pubmedqa

_REGISTRY = {
"failure_mode_discovery": failure_mode_discovery,
"llm_as_judge": llm_as_judge,
"rag_pubmedqa": rag_pubmedqa,
}

BENCHMARKS = list(_REGISTRY.keys())


def get_benchmark(name: str):
"""Return the benchmark module for the given name."""
if name not in _REGISTRY:
raise ValueError(f"Unknown benchmark {name!r}. Choose from {BENCHMARKS}")
return _REGISTRY[name]
8 changes: 8 additions & 0 deletions benchmarks/failure_mode_discovery/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .evaluate import evaluate, make_eval_fn
from .load_data import load_data
from .pipeline import (
SUPPORTS_CASCADE,
build_pipeline,
configure_models,
optimize_pipeline,
)
55 changes: 55 additions & 0 deletions benchmarks/failure_mode_discovery/evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Evaluation for failure mode discovery. Primary metric: coverage."""

import pandas as pd

from .load_data import get_failed_traces


def _compute_coverage(
output_df: pd.DataFrame,
eval_df: pd.DataFrame,
) -> tuple[float, dict]:
generated_modes = (
list(output_df["failure_modes"].dropna().str.strip().values) if "failure_modes" in output_df.columns else []
)
generated_str = "\n".join(f"- {m}" for m in generated_modes) if generated_modes else "(none)"

failed_df = get_failed_traces(eval_df).reset_index(drop=True)
check_df = failed_df[["trace_id", "agent_trace"]].copy()
check_df["failure_list"] = generated_str

covered_df = check_df.sem_filter(
"{agent_trace} contains a failure that is described by " "at least one mode in {failure_list}"
)
n_covered = len(covered_df)
n_total = len(check_df)
coverage = n_covered / n_total if n_total > 0 else 0.0

return coverage, {
"coverage": coverage,
"n_covered": n_covered,
"n_total": n_total,
"n_modes": len(generated_modes),
}


def evaluate(
output_df: pd.DataFrame,
input_df: pd.DataFrame,
oracle_lm,
helper_lm,
) -> dict:
"""Standard evaluation interface. Returns metrics dict."""
_, info = _compute_coverage(output_df, input_df)
cost = oracle_lm.stats.physical_usage.total_cost + helper_lm.stats.physical_usage.total_cost
tokens = oracle_lm.stats.physical_usage.total_tokens + helper_lm.stats.physical_usage.total_tokens
return {**info, "cost_usd": float(cost), "total_tokens": int(tokens)}


def make_eval_fn(train_df: pd.DataFrame):
"""Standard GEPA eval_fn factory."""

def eval_fn(output_df, example=None):
return _compute_coverage(output_df, train_df)

return eval_fn
Loading
Loading