
Add per-operator physical implementation hints #295

Open
tareqmahmood wants to merge 2 commits into mitdbg:main from tareqmahmood:add-physical-op-hints

Conversation

@tareqmahmood

Summary

Adds a physical= parameter to semantic Dataset operations (sem_filter, sem_map, sem_flat_map, sem_join, sem_agg) that lets users override the optimizer's physical implementation choice per operator.

Today, palimpzest's Cascades optimizer explores all valid physical implementations (LLMFilter, RAGFilter, MixtureOfAgentsFilter, etc.) for each logical operator and picks the best one according to the policy. This works well in general, but there are cases where the user knows which implementation they want — for benchmarking, debugging, cost control, or when the optimizer's choice is suboptimal for a specific workload.

The physical= dict gives users a direct way to pin a specific physical operator class and its constructor kwargs for any semantic operation, while leaving other operators in the query free for the optimizer to handle.

Usage

import palimpzest as pz
from palimpzest.query.operators.convert import LLMConvertBonded

plan = (
    ds
    .sem_map(
        [{"name": "genre", "desc": "The genre", "type": str}],
        physical={"implementation": LLMConvertBonded, "model": pz.Model.GPT_4o_MINI},
    )
    .sem_filter("the genre is Action or Sci-Fi")  # no hint — optimizer picks freely
)

results = plan.run(config)

The physical dict requires an "implementation" key (the physical operator class). All other keys are forwarded as constructor kwargs to that operator, overriding the rule-generated defaults. This means any parameter the operator accepts (model, embedding_model, chunk_size, prompt_strategy, etc.) can be controlled.
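To make the override semantics concrete, here is a minimal sketch of how the hint could be split into the pinned class and its override kwargs. This is hypothetical standalone code, not palimpzest's actual implementation; `merge_physical_hint` and `FakeOp` are invented names.

```python
def merge_physical_hint(hint: dict, default_kwargs: dict):
    """Split a physical= hint into (operator class, merged kwargs).

    Every key except "implementation" is treated as a constructor kwarg
    and wins over the rule-generated defaults.
    """
    impl = hint["implementation"]  # required: the physical operator class
    overrides = {k: v for k, v in hint.items() if k != "implementation"}
    # dict-unpacking order makes hint kwargs override the defaults
    return impl, {**default_kwargs, **overrides}

class FakeOp:  # stand-in for a physical operator class
    pass

impl, kwargs = merge_physical_hint(
    {"implementation": FakeOp, "model": "gpt-4o", "reasoning_effort": "high"},
    {"model": "gpt-4o-mini", "prompt_strategy": "MAP"},
)
assert impl is FakeOp
assert kwargs == {"model": "gpt-4o", "prompt_strategy": "MAP",
                  "reasoning_effort": "high"}
```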

Validation at query construction time ensures "implementation" is present and is a class. If the hint filters out all candidates for an operator, a warning is logged.
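A sketch of what that construction-time check could look like (again hypothetical, assuming a helper named `validate_physical_hint` rather than the library's real validation code):

```python
import inspect

def validate_physical_hint(physical):
    """Hypothetical construction-time check: a non-None hint must carry an
    "implementation" key whose value is a class."""
    if physical is None:
        return  # no hint -- nothing to validate
    if "implementation" not in physical:
        raise ValueError('physical= hint must include an "implementation" key')
    if not inspect.isclass(physical["implementation"]):
        raise ValueError('"implementation" must be a physical operator class')
```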

How it works

The feature touches four files in the optimizer pipeline:

  1. LogicalOperator (logical.py) — stores physical dict, includes it in get_logical_op_params() for copy support, excludes it from get_logical_id_params() so it doesn't affect caching or logical plan identity.

  2. Dataset (dataset.py) — threads physical= through sem_filter, sem_map, sem_flat_map, sem_join, sem_agg into the logical operator constructor.

  3. ImplementationRule._perform_substitution (rules.py) — when building physical operators, extra kwargs from the physical dict are only injected when the physical_op_class matches the requested "implementation". This prevents invalid kwargs from being passed to unrelated operators.

  4. ApplyRule.perform (tasks.py) — after each implementation rule fires, a post-filter discards physical expressions whose exact type doesn't match "implementation". A warning is logged if all candidates from a rule are filtered out.
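The exact-type post-filter in step 4 can be sketched as follows. This is invented standalone code, not the real `ApplyRule.perform`; note that `type(op) is impl` deliberately rejects subclasses, matching the "exact type" behavior described above.

```python
import logging

logger = logging.getLogger(__name__)

def post_filter(candidates, hint):
    """Keep only candidate operators whose type exactly matches the hinted
    class; warn when the hint discards everything a rule produced."""
    if hint is None:
        return candidates
    impl = hint["implementation"]
    survivors = [op for op in candidates if type(op) is impl]  # no subclasses
    if candidates and not survivors:
        logger.warning("physical= hint %s filtered out all candidates",
                       impl.__name__)
    return survivors

class BaseOp: pass
class DerivedOp(BaseOp): pass

kept = post_filter([BaseOp(), DerivedOp()], {"implementation": BaseOp})
assert [type(op) for op in kept] == [BaseOp]  # DerivedOp is discarded
```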

How it interacts with the optimizer

Every key in the physical dict (beyond "implementation") overrides the corresponding constructor kwarg for all candidates of the matching class. For example, with available_models=[GPT_4o_MINI, GPT_4o]:

# Fully pinned — optimizer has no choice for this operator.
# "model" overrides the rule-generated model on every candidate,
# collapsing them all to a single operator.
physical={"implementation": LLMConvertBonded, "model": pz.Model.GPT_4o, "reasoning_effort": "high"}

# Partially pinned — optimizer can still choose between GPT_4o_MINI and GPT_4o.
physical={"implementation": LLMConvertBonded}

Operators without physical= are completely free — the optimizer explores all implementations and models as usual.
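A small self-contained sketch (plain dicts standing in for physical operator candidates; the names are invented) of why a "model" override collapses the candidate set:

```python
# The implementation rule emits one candidate per available model; applying
# a hint that pins "model" rewrites that field on every candidate, leaving
# duplicates -- i.e. a single effective operator.
available_models = ["gpt-4o-mini", "gpt-4o"]
hint = {"implementation": object, "model": "gpt-4o"}
overrides = {k: v for k, v in hint.items() if k != "implementation"}

candidates = [{"model": m} for m in available_models]   # rule-generated
pinned = [{**c, **overrides} for c in candidates]       # hint applied
distinct = {tuple(sorted(c.items())) for c in pinned}
assert len(distinct) == 1  # all candidates collapsed to one operator
```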

  • With run(): The Cascades optimizer generates all physical candidates per logical operator, then the hint filters and overrides them before costing. The optimizer costs and ranks the survivors normally.

  • With optimize_and_run(): The hint constrains which candidates enter the sentinel plan's MAB sampling. Only matching operators are sampled on the training data, avoiding wasted budget on operators the user doesn't want. After sentinel execution, the final plan selection respects the hint.

  • Transformation rules (filter push-down, convert reorder) are unaffected — the hint travels with the logical operator.

Validation

  • 18 unit tests in tests/pytest/test_hints.py covering filtering logic, propagation, copy, Dataset API, and validation.
  • All pre-existing tests pass unchanged.

Example 1: run() with hinted map

"""Example: physical= hint with .run() — map is hinted, filter is not."""

import palimpzest as pz
from palimpzest.query.operators.convert import LLMConvertBonded

ds = pz.MemoryDataset(id="movies", vals=[
    "The Dark Knight (Action, 152 min)",
    "The Godfather (Crime, 175 min)",
    "Inception (Action, 148 min)",
    "12 Angry Men (Drama, 96 min)",
    "Star Wars (Action, 121 min)",
    "Forrest Gump (Drama, 142 min)",
    "Pulp Fiction (Crime, 154 min)",
    "Interstellar (Adventure, 169 min)",
    "Fight Club (Drama, 139 min)",
    "The Shawshank Redemption (Crime, 142 min)",
])

# sem_map is hinted to LLMConvertBonded + GPT_4o_MINI
# sem_filter is unhinted — optimizer picks freely from available models
plan = (
    ds
    .sem_map(
        [{"name": "genre", "desc": "The genre of the movie", "type": str}],
        physical={"implementation": LLMConvertBonded, "model": pz.Model.GPT_4o_MINI},
    )
    .sem_filter("the genre is Action or Sci-Fi")
)

config = pz.QueryProcessorConfig(
    available_models=[pz.Model.GPT_4o_MINI],
)

results = plan.run(config)

print(f"{len(results)} records:")
for r in results:
    print(f"  genre={r.genre:20s} {r.value}")
print(f"\nPlan:\n{results.executed_plans[0]}")

Output:

3 records:
  genre=Action               Star Wars (Action, 121 min)
  genre=Action               Inception (Action, 148 min)
  genre=Action               The Dark Knight (Action, 152 min)

Plan:
0. Schema['genre', 'value'] -> LLMFilter -> Schema['genre', 'value']
    (genre, value) -> (genre, value)
    Model: openai/gpt-4o-mini-2024-07-18
    Filter: the genre is Action or Sci-Fi

  1. Schema['value'] -> LLMConvertBonded -> Schema['genre', 'value']
    (value) -> (genre, value)
    Model: openai/gpt-4o-mini-2024-07-18
    Prompt Strategy: PromptStrategy.MAP
    Reasoning Effort: default

    2. MarshalAndScanDataOp(...) -> Schema['value']
    (value)

Example 2: optimize_and_run() with hinted map + extra kwargs

"""Example: physical= hint with .optimize_and_run() — map is hinted, filter is not."""

import palimpzest as pz
from palimpzest.query.operators.convert import LLMConvertBonded

ds = pz.MemoryDataset(id="movies", vals=[
    "The Dark Knight (Action, 152 min)",
    "The Godfather (Crime, 175 min)",
    "Inception (Action, 148 min)",
    "12 Angry Men (Drama, 96 min)",
    "Star Wars (Action, 121 min)",
    "Forrest Gump (Drama, 142 min)",
    "Pulp Fiction (Crime, 154 min)",
    "Interstellar (Adventure, 169 min)",
    "Fight Club (Drama, 139 min)",
    "The Shawshank Redemption (Crime, 142 min)",
])

train_ds = pz.MemoryDataset(id="movies", vals=[
    "Alien (Sci-Fi, 117 min)",
    "Titanic (Drama, 195 min)",
    "The Matrix (Action, 136 min)",
    "Goodfellas (Crime, 146 min)",
    "Toy Story (Animation, 81 min)",
])

# sem_map is hinted to LLMConvertBonded + GPT_4o + high reasoning effort
# sem_filter is unhinted — optimizer picks freely from available models
plan = (
    ds
    .sem_map(
        [{"name": "genre", "desc": "The genre of the movie", "type": str}],
        physical={"implementation": LLMConvertBonded, "model": pz.Model.GPT_4o,
                  "reasoning_effort": "high"},
    )
    .sem_filter("the genre is Action or Sci-Fi")
)

config = pz.QueryProcessorConfig(
    policy=pz.MinCost(),
    available_models=[pz.Model.GPT_4o_MINI, pz.Model.GPT_4o],
    sample_budget=5,
)

results = plan.optimize_and_run(
    config=config,
    train_dataset=train_ds,
    validator=pz.Validator(model=pz.Model.GPT_4o),
)

print(f"{len(results)} records:")
for r in results:
    print(f"  genre={r.genre:20s} {r.value}")
print(f"\nPlan:\n{results.executed_plans[0]}")

Output:

Total opt. time: 9.77s
Total opt. cost: $0.0313

3 records:
  genre=Action               Star Wars (Action, 121 min)
  genre=Action               The Dark Knight (Action, 152 min)
  genre=Action               Inception (Action, 148 min)

Plan:
0. Schema['genre', 'value'] -> LLMFilter -> Schema['genre', 'value']
    (genre, value) -> (genre, value)
    Model: openai/gpt-4o-mini-2024-07-18
    Filter: the genre is Action or Sci-Fi

  1. Schema['value'] -> LLMConvertBonded -> Schema['genre', 'value']
    (value) -> (genre, value)
    Model: openai/gpt-4o-2024-08-06
    Prompt Strategy: PromptStrategy.MAP
    Reasoning Effort: high

    2. MarshalAndScanDataOp(...) -> Schema['value']
    (value)

The map used GPT_4o with reasoning_effort: high as hinted. The filter was free to choose — the optimizer picked GPT_4o_MINI (cheaper) after sentinel sampling with both models.

Example 3: optimize_and_run() with MixtureOfAgents (partially pinned)

Here only "implementation" is specified — the optimizer is free to choose proposer models, temperatures, and aggregator model from the available pool.

"""Example: physical= hint with .optimize_and_run() — map is hinted, filter is not."""

import palimpzest as pz
from palimpzest.query.operators.mixture_of_agents import MixtureOfAgentsConvert

ds = pz.MemoryDataset(id="movies", vals=[
    "The Dark Knight (Action, 152 min)",
    "The Godfather (Crime, 175 min)",
    "Inception (Action, 148 min)",
    "12 Angry Men (Drama, 96 min)",
    "Star Wars (Action, 121 min)",
    "Forrest Gump (Drama, 142 min)",
    "Pulp Fiction (Crime, 154 min)",
    "Interstellar (Adventure, 169 min)",
    "Fight Club (Drama, 139 min)",
    "The Shawshank Redemption (Crime, 142 min)",
])

train_ds = pz.MemoryDataset(id="movies", vals=[
    "Alien (Sci-Fi, 117 min)",
    "Titanic (Drama, 195 min)",
    "The Matrix (Action, 136 min)",
    "Goodfellas (Crime, 146 min)",
    "Toy Story (Animation, 81 min)",
])

# sem_map is hinted to MixtureOfAgentsConvert — optimizer picks models/temps
# sem_filter is unhinted — optimizer picks freely
plan = (
    ds
    .sem_map(
        [{"name": "genre", "desc": "The genre of the movie", "type": str}],
        physical={"implementation": MixtureOfAgentsConvert},
    )
    .sem_filter("the genre is Action or Sci-Fi")
)

config = pz.QueryProcessorConfig(
    policy=pz.MinCost(),
    available_models=[pz.Model.GPT_4o_MINI, pz.Model.GPT_4o],
    sample_budget=5,
)

results = plan.optimize_and_run(
    config=config,
    train_dataset=train_ds,
    validator=pz.Validator(model=pz.Model.GPT_4o),
)

print(f"{len(results)} records:")
for r in results:
    print(f"  genre={r.genre:20s} {r.value}")
print(f"\nPlan:\n{results.executed_plans[0]}")

Output:

Total opt. time: 14.10s
Total opt. cost: $0.0516

3 records:
  genre=Action               Inception (Action, 148 min)
  genre=Action               The Dark Knight (Action, 152 min)
  genre=Action               Star Wars (Action, 121 min)

Plan:
0. Schema['genre', 'value'] -> LLMFilter -> Schema['genre', 'value']
    (genre, value) -> (genre, value)
    Model: openai/gpt-4o-mini-2024-07-18
    Filter: the genre is Action or Sci-Fi

  1. Schema['value'] -> MixtureOfAgentsConvert -> Schema['genre', 'value']
    (value) -> (genre, value)
    Proposer Models: [openai/gpt-4o-mini-2024-07-18]
    Temperatures: [0.4]
    Aggregator Model: openai/gpt-4o-mini-2024-07-18

    2. MarshalAndScanDataOp(...) -> Schema['value']
    (value)

The map was pinned to MixtureOfAgentsConvert but the optimizer chose the proposer model (GPT_4o_MINI), temperature (0.4), and aggregator model (GPT_4o_MINI) freely via sentinel sampling.

Test plan

  • tests/pytest/test_hints.py — 18 unit tests (filtering, propagation, copy, validation, API)
  • End-to-end tested with run(), optimize_and_run(), and optimize_and_run() + MixtureOfAgents (examples above)
  • All pre-existing tests pass (test_rules.py, test_schemas.py, test_records.py, test_optimizer.py)

Add a `physical=` dict parameter to sem_filter, sem_map, sem_flat_map,
sem_join, and sem_agg that lets users override the optimizer's physical
operator selection.

The dict requires an "implementation" key (the physical operator class).
All other keys are forwarded as constructor kwargs to that operator,
overriding rule-generated defaults. Only the matching implementation
receives the extra kwargs — other rules build operators normally and
are filtered out post-substitution.

Changes:
- logical.py: store physical on LogicalOperator, validate at construction,
  include in get_logical_op_params() but not get_logical_id_params()
- dataset.py: thread physical= through semantic Dataset methods
- rules.py: guard extra kwargs injection by implementation class match
- tasks.py: post-filter expressions by implementation, warn on empty

18 tests covering:
- Expression filtering by implementation class (exact type match)
- Validation (rejects missing/invalid implementation key)
- Propagation through logical operators and copy
- Dataset API integration (sem_filter, sem_map, sem_flat_map)
- End-to-end usage pattern
