From 6839faa80f0fbc06d536cb1cbcd8e9ce3dd32011 Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Wed, 27 Aug 2025 11:58:41 -0400 Subject: [PATCH 001/106] Fix broken dependencies (#227) --- .gitignore | 2 ++ pyproject.toml | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 5d2646acd..4f7a5b048 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ docs/build/* docs/source/generated/* dist/* .vscode/* +.idea/* .chroma .chroma-biodex .chroma-mmqa @@ -31,6 +32,7 @@ testdata/*.tar.gz # virtual environment(s) venv/ +uv.lock # tmp testdata/maildir/ diff --git a/pyproject.toml b/pyproject.toml index 36332dc79..cfd452802 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "palimpzest" version = "0.8.2" description = "Palimpzest is a system which enables anyone to process AI-powered analytical queries simply by defining them in a declarative language" readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.10" keywords = ["relational", "optimization", "llm", "AI programming", "extraction", "tools", "document", "search", "integration"] authors = [ {name="MIT DSG Semantic Management Lab", email="michjc@csail.mit.edu"}, @@ -53,7 +53,6 @@ docs = [ "mkdocs>=1.6.1", "mkdocs-material>=9.6.3", "mkdocstrings-python>=1.15.0", - "mkdocs-material[imaging]", ] vllm = [ "vllm>=0.10.1.1", From ba97d2167ffda5b61caeace4d6342e2162570c46 Mon Sep 17 00:00:00 2001 From: mdr223 Date: Wed, 27 Aug 2025 13:37:17 -0400 Subject: [PATCH 002/106] Move DataRecord Internal Fields to Have Leading Underscore (#229) * update README * 1. support add_columns in Dataset; 2. support run().to_df(); 3. add demo in df-newinterface.py (#78) * Support add_columns in Dataset. Support demo in df-newinterface.py Currently we have to do records, _ = qr3.run() outputDf = DataRecord.to_df(records) I'll try to make qr3.run().to_df() work in another PR. * ruff check --fix * Support run().to_df() Update run() to DataRecordCollection, so that it will be easier for use to support more features for run() output. We support to_df() in this change. I'll send out following commits to update other demos. * run check --fix * fix typo in DataRecordCollection * Update records.py * fix tiny bug in mab processor. The code will run into issue if we don't return any stats for this function in ``` max_quality_record_set = self.pick_highest_quality_output(all_source_record_sets) if ( not prev_logical_op_is_filter or ( prev_logical_op_is_filter and max_quality_record_set.record_op_stats[0].passed_operator ) ``` * update record.to_df interface update to record.to_df(records: list[DataRecord], project_cols: list[str] | None = None) which is consistent with other function in this class. * Update demo for the new execute() output format * better way to get plan from output.run() * fix getting plan from DataRecordCollection. people used to get plan from execute() of streaming processor, which is not a good practice. I update plan_str to plan_stats, and they need to get physical plan from processor. Consider use better ways to provide executed physical plan to DataRecordCollection, possibly from stats. * Update df-newinterface.py * update code based on comments from Matt. 1. add cardinality param in add_columns 2. remove extra testdata files 3. add __iter__ in DataRecordCollection to help iter over streaming output. 
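For reference, a rough usage sketch of the interface described above — run(), to_df(), and iteration over the DataRecordCollection come from these notes, while the Dataset constructor arguments and source path are placeholders and may not match this version of the code:

```python
import palimpzest as pz

# placeholder source; constructor arguments are an assumption for illustration
dataset = pz.Dataset("testdata/enron-tiny")

output = dataset.run()      # run() now returns a DataRecordCollection
df = output.to_df()         # new: materialize the results as a pandas DataFrame
for record in output:       # new: __iter__ lets callers loop over the (streaming) output
    print(record)
```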
* see if copilot just saved me 20 minutes * fix package name * use sed to get version from pyproject.toml * bump project version; keep docs behind to test ci pipeline * bumping docs version to match code version * use new __iter__ method in demos where possible * add type hint for output of __iter__; use __iter__ in unit tests * Update download-testdata.sh (#89) Added enron-tiny.csv * Clean up the retrieve API (#79) * Clean up the retrieve operator interface * fix comments * Update to the new to_df() API * Code update for https://github.com/mitdbg/palimpzest/issues/84 (#101) * Create chat.rst (#96) * Create chat.rst * Update pyproject.toml Hotfix for chat * Update conf.py Hotfix for chat.rst * code update for https://github.com/mitdbg/palimpzest/issues/84 This implementation basically resolves https://github.com/mitdbg/palimpzest/issues/84. One implementation is different from the #84: .add_columns( cols=[ {"name": "sender", "type": "string", "udf": compute_sender}, ... ] ) If add_columns() uses cols, udf, types as params, it will make this function confusing again. Instead, if users need to specify different udfs for different columns, they should just call add_columns() multiple times for different columns. * changed types to make use of Python type system; updated use of types in tests; updated docs and README * update test to match no longer allowing None default --------- Co-authored-by: Gerardo Vitagliano Co-authored-by: Matthew Russo * Skip an operator if this is a duplicate op instead of raise error (#102) * Create chat.rst (#96) * Create chat.rst * Update pyproject.toml Hotfix for chat * Update conf.py Hotfix for chat.rst * Skip an operator when it doesn't need any logicalOP instead of raise error #Final Effects 1. Dataset() init only has one responsibility: wrap a datasource to a Dataset. I think this is a better interface. 2. No extra convert() will be added to the plan. 3. When users add the same op multiple times dataset.convert(File).convert(File), the system will just dedup the same op instead of raise error. #Issue Currently Dataset(src, schema) initiation has 2 responsibilities: 1. read source 2. convert source to schema. When we use default schema for Dataset init(source, schema=DefaultSchema) for users, the code works like: 1. Read source to schema that DataSource provides. This schema is derived by system, so the users don't know (don't need to know). 2. Convert Source schema to DefaultSchema. So everytime, the system will make one more convert call to convert SourceSchema to DefaultSchema, which is definitely wrong. #Solution 1. We use schema from Datasource if exists, which is reasonable. 2. If we do 1, then we'll get a dataset node that no actual op as its input_schema ==output_schema, so I updated a line in optimizer to just skip the node if it doesn't do anything instead raiseerror. #Real Examples ##Before Generated plan: 0. MarshalAndScanDataOp -> PDFFile 1. PDFFile -> LLMConvertBonded -> DefaultSchema (contents, filename, text_conte) -> (value) Model: Model.GPT_4o Prompt Strategy: PromptStrategy.COT_QA 2. DefaultSchema -> MixtureOfAgentsConvert -> ScientificPaper (value) -> (contents, filename, paper_auth) Prompt Strategy: None Proposer Models: [GPT_4o] Temperatures: [0.0] Aggregator Model: Model.GPT_4o Proposer Prompt Strategy: chain-of-thought-mixture-of-agents-proposer Aggregator Prompt Strategy: chain-of-thought-mixture-of-agents-aggregation 3. 
ScientificPaper -> LLMFilter -> ScientificPaper (contents, filename, paper_auth) -> (contents, filename, paper_auth) Model: Model.GPT_4o Filter: The paper mentions phosphorylation of Exo1 4. ScientificPaper -> MixtureOfAgentsConvert -> Reference (contents, filename, paper_auth) -> (reference_first_author, refere) Prompt Strategy: None Proposer Models: [GPT_4o] Temperatures: [0.8] Aggregator Model: Model.GPT_4o Proposer Prompt Strategy: chain-of-thought-mixture-of-agents-proposer Aggregator Prompt Strategy: chain-of-thought-mixture-of-agents-aggregation ##After Generated plan: 0. MarshalAndScanDataOp -> PDFFile 1. PDFFile -> LLMConvertBonded -> ScientificPaper (contents, filename, text_conte) -> (contents, filename, paper_auth) Model: Model.GPT_4o Prompt Strategy: PromptStrategy.COT_QA 2. ScientificPaper -> LLMFilter -> ScientificPaper (contents, filename, paper_auth) -> (contents, filename, paper_auth) Model: Model.GPT_4o Filter: The paper mentions phosphorylation of Exo1 3. ScientificPaper -> MixtureOfAgentsConvert -> Reference (contents, filename, paper_auth) -> (reference_first_author, refere) Prompt Strategy: None Proposer Models: [GPT_4o] Temperatures: [0.8] Aggregator Model: Model.GPT_4o Proposer Prompt Strategy: chain-of-thought-mixture-of-agents-proposer Aggregator Prompt Strategy: chain-of-thought-mixture-of-agents-aggregation * make equality check for new field names a bit more explicit * fix fixture usage * update all plans within code base to explicitly convert when needed; and removed unnecessary schemas for reading from datasource --------- Co-authored-by: Gerardo Vitagliano Co-authored-by: Matthew Russo * Refactor demos to use .sem_add_columns or .add_columns instead of convert(), remove Schema from demos when possible. (#104) * Create chat.rst (#96) * Create chat.rst * Update pyproject.toml Hotfix for chat * Update conf.py Hotfix for chat.rst * code update for https://github.com/mitdbg/palimpzest/issues/84 This implementation basically resolves https://github.com/mitdbg/palimpzest/issues/84. One implementation is different from the #84: .add_columns( cols=[ {"name": "sender", "type": "string", "udf": compute_sender}, ... ] ) If add_columns() uses cols, udf, types as params, it will make this function confusing again. Instead, if users need to specify different udfs for different columns, they should just call add_columns() multiple times for different columns. * use field_values instead of field_types as field_values have the actual values, use field_values instead of field_types as field_values have the actual values, since field_values have the actual key-value pairs, while field_types are just contain fields and their types. records[0].schema is the schema of the output, which doesn't mean we already populate the schema into record. * Remove .convert() and use .sem_add_columns or .add_columns instead This change is based on #101 and #102, please review them first then this change. 1. This is to refactor all demos to use .sem_add_columns or .add_columns, and remove .convert(). 2. Remove Schema from demos, except demos using ValidationDataSource and dataset.retrieve() that need schema now. We can refactor these cases later. 
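For illustration, a hedged sketch of the column APIs these commits describe; compute_sender, the source path, and the sem_add_columns column spec are invented for the example, while the add_columns dict keys follow the snippet quoted above (with a Python type per the type-system change):

```python
import palimpzest as pz

def compute_sender(record):
    # hypothetical UDF: pull the sender out of a raw email body
    return record["contents"].split("From:", 1)[-1].splitlines()[0].strip()

emails = pz.Dataset("testdata/enron-tiny")  # placeholder source

# computed column: call add_columns() once per UDF instead of mixing udfs in a single call
emails = emails.add_columns(cols=[{"name": "sender", "type": str, "udf": compute_sender}])

# semantic column: described in natural language and filled in by the LLM
# (the exact column-spec format for sem_add_columns is an assumption here)
emails = emails.sem_add_columns([{"name": "subject", "type": str, "desc": "the subject line of the email"}])
```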
* ruff check --fix * fix unittest * demos fixed and unit tests running * fix add_columns --> sem_add_columns in demo * udpate quickstart to reflect code changes; shorten text as much as possible * passing unit tests * remove convert() everywhere * fixes to correct errors in demos; update quickstart and docs --------- Co-authored-by: Gerardo Vitagliano Co-authored-by: Matthew Russo * Simplify Datasource (#103) ## Summary of PR changes **Note 1:** I did not change anything related to val_datasource (including tangential functions like Dataset._set_data_source()) as that will all be modified in a subsequent PR to reflect our discussion re: validation data. **Note 2:** I have completely commented out datamanager.py and config.py; for now I am willing to leave the code around in case we desperately need it for PalimpChat. However, my hope is that PalimpChat can be tweaked to work without the data manager and those files can be deleted before merging dev into main **Note 3:** Despite the branch name, fixing the progress managers will be part of a separate PR. - Collapsed all four `DataSource` classes down to a single `DataReader` class - Limit the number of methods the user needs to implement to just `__len__()` and `__getitem__()` - (Switched from using `get_item() --> __getitem__()` in `DataReader`) - Provided `DataReader` directly to scan operators (also renamed `DataSourcePhysicalOp --> ScanPhysicalOp` - Removed `DataDirectory()` from `src/` entirely; this included commenting out things which made use of the cache (e.g. caching computed `DataRecords` and codegen examples) - Got rid of `dataset_id` everywhere (which tracks with the previous bullet) - Removed the `Config` class which was a relic of a bygone era (and also intertwined with the `DataDirectory()`) - Updated all demos to use `import palimpzest as pz` to make the import statement(s) more welcoming - Fixed one bug resulting from converts now producing union schemas. Instead of including the `output_schema` in an operators' `get_id_params()` we simply report the `generated_fields`. - Changed `source_id --> source_idx` everywhere (this eliminated some weird renaming logic) - Finally, I added a large set of documentation for the DataSource class(es) * Multi-LLM Refinement Pipeline for Query Output Validation (#118) * Multi-LLM Refinement Pipeline for Query Output Validation (#92) ## Summary of PR This PR contains the work to add a new `CriticConvert` physical operator to PZ. At a high-level, this operator runs a bonded convert, and then asks a critic model if the answer produced by the bonded convert can be improved upon. The original output and the critique are then fed into a refinement model, which produces the improved output. The work to implement this includes: 1. Defining the physical operator in `src/palimpzest/query/operators/critique_and_refine_convert.py` 2. Adding an implementation rule for this physical operator in `src/palimpzest/query/optimizer/rules.py` 3. Adding boolean flag(s) to enable allowing / disallowing this physical optimization 4. Adding base prompts for the critique and refinement generations One other change which this work spawned was an attempt to improve the management and construction of our prompts -- and to decouple this logic from the `BaseGenerator` class. On the management side, I split our single `prompts.py` file into a set of files. On the construction side, I created a `PromptFactory` class which templates prompts based on the `prompt_strategy` and input record. 
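To make the idea concrete, here is a generic sketch of strategy-keyed prompt templating; the class layout and method names below are illustrative only and are not the actual palimpzest implementation:

```python
from enum import Enum

class PromptStrategy(Enum):
    COT_QA = "chain-of-thought-question-answering"
    COT_MOA_PROPOSER = "chain-of-thought-mixture-of-agents-proposer"

class PromptFactory:
    """Builds a prompt from the input record's fields and the chosen strategy."""

    def __init__(self, prompt_strategy: PromptStrategy):
        self.prompt_strategy = prompt_strategy

    def create_prompt(self, input_fields: dict[str, str], output_fields: list[str]) -> str:
        context = "\n".join(f"{name}: {value}" for name, value in input_fields.items())
        targets = ", ".join(output_fields)
        if self.prompt_strategy is PromptStrategy.COT_MOA_PROPOSER:
            return f"Propose candidate values for: {targets}.\nThink step-by-step.\n\n{context}"
        return f"Answer with values for: {targets}.\nThink step-by-step before answering.\n\n{context}"
```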
The `PromptFactory` is not a perfect solution, but I think it is a step in the right direction. Finally, I fixed an error which previously filtered out `RAGConvert` operators from being considered by the `Optimizer`, and I made 2-3 more miscellaneous small tweaks. --------- Co-authored-by: Yash Agarwal Co-authored-by: Yash Agarwal * MkDocs Site for Palimpzest API Documentation (#116) ## Summary of PR Changes 1. Changed `docs` to use [MkDocs](https://www.mkdocs.org/) instead of Sphinx 2. Created initial `Getting Started` content 3. Created placeholders for `User Guide` content (to follow in a subsequent PR) 4. Added autogenerated docs for our most user-facing code (we will need to add docstrings to our code in a subsequent PR) 5. Made small tweaks to `src/` to allow users to specify policy using kwargs in `.run()` 6. Renamed the `testdata/enron-tiny/` files so that they're not so damn weird --------- Co-authored-by: Yash Agarwal Co-authored-by: Yash Agarwal * remove registration of sources from CI; only check version bump if there is a code change * remove filter for only checking version bump when src files changed * Rename `nocache` --> `cache` everywhere (#128) * first commit * Removed myenv * added to git ignore * addressed the comments in review * flip one minor comment * minor spacing fix * fix spaces in a few more spots --------- Co-authored-by: Bari LeBari Co-authored-by: muhamed Co-authored-by: Matthew Russo * adding citation (and making 'others' explicit) (#136) * Make Generator thread-safe (#139) * fix moa prompt * fix moa prompt aggregator * update version * make generator thread-safe * update generator to return messages * address comments * Begin Process of Improving Index Abstraction(s) in PZ (#138) * quick and dirty implementation which tracks retrieve costs * bug fixes and currently unused index code * add default search func which I forgot to implement and add chromadb to pyproject.toml * leaving TODO * hotfix to add cost for retrieve operation * another hotfix to add ragatouille dependency * Add logger for PZ (#134) * add logger for PZ 1. When verbose=True, we save all logs to log_file and print them on console; 2. when verbose=False, we only save ERROR+ log to file and print ERROR+. I just add logging to somewhere I think might be important for the execution, we always can add/remove for more or less. Also I might update the logging message based on my later annotation work. But this PR should setup the logging mechanism for now. * ruff fix * update code based on comments 1. not logging output_records 2. not logging plan_stats 3. make the files to ".pz_logs" --------- Co-authored-by: Matthew Russo * fix merge bug (#141) * ruff fix * update log dir and fix tiny bug * fix merge bug * Use a singleton API client for operators (#140) * fix moa prompt * fix moa prompt aggregator * update version * make generator thread-safe * update generator to return messages * address comments * create a singleton API client * fix linting * fix logging in generators * also create parent dir. if missing * CUAD benchmark (#143) * fix moa prompt * fix moa prompt aggregator * update version * make generator thread-safe * update generator to return messages * address comments * create a singleton API client * fix linting * fix logging in generators * fix CUAD benchmarlk * fix type * minor fixes * Limit the Scope of Logging within the Optimizer (#144) * making it possible to set log level based on env. 
variable; adding time limit on seven filters test * deleting instead of commenting out * Remove Conventional LLM Convert; Update Bonded LLM Convert retry logic (#145) * use NullHandler in __init__ and let application control logging config (#146) * use NullHandler in __init__ and let application control logging config * ruff fix * Fix Progress Manager and Simplify `execute_plan` methods (#148) * modifying ProgressManager class to allow for dynamically adding tasks * beginning to use new progress manager * initial rewrite of execute_plan methods with new progress manager * unit tests passing * trim a few lines * unit tests passing; changes applied everywhere; MAB and Random coming in a separate PR * enable final operator to show progress in parallel * address comments * The great deletion (#149) * Adding Preliminary Work on Abacus and MAB Sentinel Execution (#147) * updating models to avoid llama3 * fix parsing bugs and some generation errors * don't require json for proposer and code synth generations; fix prompt format instruction for proposers * fix typo/bug * fix bugs in generator prep for field_answers; fix bug in filter impl.; other improvements * adding new file for abacus workload * fix len * fix errors with dataset copy; prompt construction; and more * remove JSON instruction from MOA proposer * fixed bugs in optimizer configuration, llama 3.3 generation, and filter generation * clean up demos; fix missing base prompt from map * add one more missing base prompt * prepare demo for full run; get embedding cost info from RAGConvert; use reasoning output from Critique * add script to generate text-embedding-3-small reaction embeddings * write to .chroma * run full scale generation * compute embeddings slowly and add progress bar * add sleep * fix import * add total iters * create embeddings before ingesting * fix index start and finish * load embeddings and insert directly * make chroma use cosine sim.; finish initial search fcn. for biodex workload; naming tweak in rag convert * capturing gen stats in Retrieve * added UDF map operator; rewrote biodex pipeline to match docetl impl.; switched to using __name__ for functions instead of str() * add optimizations back in * write data to csv in demo * limit to same model choice(s) as docetl and lotus * fix punctuation error(s) * try run without filter * remove unused demo file * remove print * remove prints * remove costed_phys_op_ids which were used for debugging * try slightly diff. 
approach * remove temp changes while branch is in PR review * remove depends_on for map * fix iteration bug in sentinel processors * one more hotfix * fix more errors w/SentinelPlanStats and sentinel processors * remove logger lib to reduce confusion (#159) * Update research.md (#160) AISD @ NAACL 2025 * Add Pneuma-Palimpzest Integration Demo (#158) * Add Pneuma demo * Remove dataset semantic column addition * Fix progress managers episode 2 attack of the clones (#156) * modifying ProgressManager class to allow for dynamically adding tasks * beginning to use new progress manager * initial rewrite of execute_plan methods with new progress manager * unit tests passing * trim a few lines * unit tests passing; changes applied everywhere; MAB and Random coming in a separate PR * enable final operator to show progress in parallel * initial work to refactor sentinel processors * passing unit tests * checking in minor changes * remove use of setup_logger inside library * stuff seems to be working * big print * turn off rag for test * try debugging exception * checking in code before changes to scoring * finished initial refactoring of mab sentinel execution strategy * get random sampling execution working with changes * passing unit tests * nosentinel progress looks good * eyeball test is working for progress bars * remove the old gods * revert small change * pull up progress manager logic in parallel execution * catch errors in generating embeddings * fix comments * Merging in Changes for Sentinel Progress Bars; Split Convert (off by default); `demos/enron-demo.py`; and MMQA Benchmark (#163) * modifying ProgressManager class to allow for dynamically adding tasks * beginning to use new progress manager * initial rewrite of execute_plan methods with new progress manager * unit tests passing * trim a few lines * unit tests passing; changes applied everywhere; MAB and Random coming in a separate PR * enable final operator to show progress in parallel * initial work to refactor sentinel processors * passing unit tests * checking in minor changes * remove use of setup_logger inside library * stuff seems to be working * big print * turn off rag for test * try debugging exception * checking in code before changes to scoring * finished initial refactoring of mab sentinel execution strategy * get random sampling execution working with changes * passing unit tests * nosentinel progress looks good * eyeball test is working for progress bars * remove the old gods * revert small change * pull up progress manager logic in parallel execution * adding prints to generator; turn progress off in favor of verbose for now * catch errors in generating embeddings * inspect frontier updates * remove args.workload * fix num_inputs in selectivity computation * pdb in score * fixed score fn issue * use execution cache to avoid unnecessary computation; use sentinel stats for updating frontier * fix progress counter * debug * fix empty stats * only count stats from newly computed results * fix tuple unpacking * only update sample counts for llm ops * de-dup duplicate record * ugh * dont forget to increment * plz * more plz * increment * recycle ops back onto reservoir so they may be reconsidered in the future * remove pdb * add progress to script args * try without rag * use term recall * just check in on term recall * make it easier to turn off progress * remove pdb * try to get re-rank to keep all inputs * try to generate more reactions * track total LLM calls * 10x parallelism * try retrieve directly on fulltext * up max 
workers * adding enron-demo w/optimization * remove config option * adding recall and precision to output * allow operators to be recycled back onto frontier * revert to using reactions instead of fulltext for similarity * better cycling of off-frontier operators * safety check on reservoir ops * remove pdb * fixing 5 results per query * investigate sampling behavior * check on seeds * remove pdb * test SplitConvert * debug chunking * fix bug in rag and split convert * run with chunks * test chunking logic * fix chunking logic * sum list * remove split merge for now * minor fixes to CUAD script * add embedding scripts for mmqa tables and image titles * address issue with empty titles and title collisions * prepare script for using clip embeddings for images * fix bug * get full space of possible extensions * debug * weird bug fix? * more debug * fix idiotic mistake * handle corrupted images and minor things * add another corrupted image * another one * anotha * more bad images * last disallow file * prepare cuad for runs * specify execution strategy * up samples * add sentinel execution strategy to output name * adding plan str and more stats * specify no prior * verbose=False * fix comment; comment out prints * make split merge optional for now * addressing comments * applying syntax changes to pneuma demo and supporting strings within retrieve * bump version; fix lint; fix docs * more docs tweaks; tweaking dependencies * fix install issues * one more version fix * one more version fix * one more version fix * one more version fix * last try * change runner python version * actually changing runner python version * increase time limit for runners * increase time limit for runners * Merge in Changes From Final Abacus Work (WIP) (#173) * modifying ProgressManager class to allow for dynamically adding tasks * beginning to use new progress manager * initial rewrite of execute_plan methods with new progress manager * unit tests passing * trim a few lines * unit tests passing; changes applied everywhere; MAB and Random coming in a separate PR * enable final operator to show progress in parallel * initial work to refactor sentinel processors * passing unit tests * checking in minor changes * remove use of setup_logger inside library * stuff seems to be working * big print * turn off rag for test * try debugging exception * checking in code before changes to scoring * finished initial refactoring of mab sentinel execution strategy * get random sampling execution working with changes * passing unit tests * nosentinel progress looks good * eyeball test is working for progress bars * remove the old gods * revert small change * pull up progress manager logic in parallel execution * adding prints to generator; turn progress off in favor of verbose for now * catch errors in generating embeddings * inspect frontier updates * remove args.workload * fix num_inputs in selectivity computation * pdb in score * fixed score fn issue * use execution cache to avoid unnecessary computation; use sentinel stats for updating frontier * fix progress counter * debug * fix empty stats * only count stats from newly computed results * fix tuple unpacking * only update sample counts for llm ops * de-dup duplicate record * ugh * dont forget to increment * plz * more plz * increment * recycle ops back onto reservoir so they may be reconsidered in the future * remove pdb * add progress to script args * try without rag * use term recall * just check in on term recall * make it easier to turn off progress * remove pdb * try to 
get re-rank to keep all inputs * try to generate more reactions * track total LLM calls * 10x parallelism * try retrieve directly on fulltext * up max workers * adding enron-demo w/optimization * remove config option * adding recall and precision to output * allow operators to be recycled back onto frontier * revert to using reactions instead of fulltext for similarity * better cycling of off-frontier operators * safety check on reservoir ops * remove pdb * fixing 5 results per query * investigate sampling behavior * check on seeds * remove pdb * test SplitConvert * debug chunking * fix bug in rag and split convert * run with chunks * test chunking logic * fix chunking logic * sum list * remove split merge for now * minor fixes to CUAD script * add embedding scripts for mmqa tables and image titles * address issue with empty titles and title collisions * prepare script for using clip embeddings for images * fix bug * get full space of possible extensions * debug * weird bug fix? * more debug * fix idiotic mistake * handle corrupted images and minor things * add another corrupted image * another one * anotha * more bad images * last disallow file * prepare cuad for runs * specify execution strategy * up samples * add sentinel execution strategy to output name * adding plan str and more stats * specify no prior * verbose=False * fix comment; comment out prints * make split merge optional for now * addressing comments * applying syntax changes to pneuma demo and supporting strings within retrieve * add prints * debug sample sets * checking in code before tweaks to mab * state of repo after running final Abacus experiments * revert to opt-profiling-data * removing print statement * remove prints * final fixes * removing ragatouille dependency * fix ruff lint checks * bump version * passing tests locally * remove pdb * fix complaint about match * Move Abacus Research Scripts into Separate Folder (#175) * re-organizing abacus research-related scripts * fix model selection and other tweaks * add data download script * bump version * remove scripts from root * removing python files which were merged back in from main * Fixed Issue(s) with Aggregate Operator Computation for Movie Queries (WIP) (#182) * queries 1-4 working for movies * removing RandomSampling * Create `Context` Class + `compute` and `search` operators (#186) * checking in changes * refactored Dataset * checking in * checking in * checking in * queries extract final answer now * checking in changes w/search operator * adding changes to agents * add isinstance checks to all executors * removing script * remove tools; include in future PR * Remove `pz.Schema` in Favor of Using `pydantic.BaseModel` (#188) * made changes throughout codebase and updated unit tests * checking in; debugging failure with image use case * simple demo / paper demos working * eliminate caching features (#195) * removing all code synthesis (#198) * removing all code synthesis * remove unused import * Using LiteLLM to Manage Generator Clients / Completion APIs (#200) * use LiteLLM for generators * remove unused function; add TODO * Added Anthropic Support; Simplified Rules; Removed Redundant Model Helpers (#202) * changes after simplifying rules * passing unit tests; removed unnecessary model helpers * simplified primitives slightly * fixing the assertion which used FieldInfo instead of FieldInfo.annotation (#204) * add support for o4-mini, gemini-2.5-pro, gemini-2.0-flash, llama-4-maverick (#205) * Adding Semantic Join Operator (#206) * initial changes to 
support validator class; fixed bug in generator for images * adding validator based optimization * validator agent example working * using o1 model; made validation more efficient * added initial nested loops join implementation * passing tests * unit tests passing * unit tests passing * enron-demo.py working * join demos in place * parallel join and other bugfixes (#207) * audio-demo (#208) * remove pdb * adding option to only use gemini models in audio demo * adding parallelism; fixed bug w/unique_logical_op_id (#209) * fixed issue which removed pipelined execution of operators in parallel setting (#210) * Movie bugfixes (#211) * fixed error in cost computation for gemini models; tested join on movie queries * make join count monotonic * removing progress bar updates for join for now * adding reasoning effort (#212) * made progress manager more efficient; made join op calculations accurate (#213) * make groupby ignore None values * make it possible to specify schema for MemoryDataset; reasoning model fixes * adding audio-only match in substitution (#214) * quick fix for audio prompt missing in MoA * support passing in gemini/vertex credentials path; fix minor bugs in audio generation (#216) * adding Distinct operator to PZ (#217) * masking filepaths for sembench; fix audio pricing (#218) * make GroupBySig a pz. import * remove email demo * reproduce abacus results * add notes about deprecation to scripts for generating priors * remove unsupported demos * sem_add_columns -> sem_map * Dev staging (#220) * edit cuad abacus scripts to use loacl data * edit cuad abacus scripts to use local data * edit cuad abacus scripts to use local data * fix: cuad data loader doesn't work via huggingface anymore (#215) * edit cuad abacus scripts to use loacl data * edit cuad abacus scripts to use local data * edit cuad abacus scripts to use local data --------- Co-authored-by: mdr223 --------- Co-authored-by: Shreya Shankar * adding early support for vllm models * changes to appease linter * remove models now that we have access to gpt-5 * only perform time check on local; CI runners are slow * Support google api and desc (#222) * support shreya models and re-support desc * adding gpt-5-nano to gpt-5 models * bump version * fixed merge error * fixing bug where id column in schema overrides DataRecord.id --------- Co-authored-by: Jun <130543538+chjuncn@users.noreply.github.com> Co-authored-by: Gerardo Vitagliano Co-authored-by: Sivaprasad Sudhir Co-authored-by: Yash Agarwal Co-authored-by: Yash Agarwal Co-authored-by: Bari Bo LeBari <143016395+lilbarbar@users.noreply.github.com> Co-authored-by: Bari LeBari Co-authored-by: muhamed Co-authored-by: Tranway1 Co-authored-by: Luthfi Balaka Co-authored-by: Shreya Shankar --- src/palimpzest/core/elements/groupbysig.py | 2 +- src/palimpzest/core/elements/records.py | 108 +++++++++--------- src/palimpzest/prompts/prompt_factory.py | 2 +- .../all_sample_execution_strategy.py | 2 +- .../query/execution/execution_strategy.py | 8 +- .../query/execution/mab_execution_strategy.py | 3 +- .../execution/parallel_execution_strategy.py | 2 +- .../single_threaded_execution_strategy.py | 16 +-- src/palimpzest/query/generators/generators.py | 2 + src/palimpzest/query/operators/aggregate.py | 18 +-- src/palimpzest/query/operators/compute.py | 8 +- src/palimpzest/query/operators/convert.py | 6 +- src/palimpzest/query/operators/distinct.py | 14 +-- src/palimpzest/query/operators/filter.py | 10 +- src/palimpzest/query/operators/join.py | 16 +-- src/palimpzest/query/operators/limit.py 
| 8 +- src/palimpzest/query/operators/physical.py | 4 +- src/palimpzest/query/operators/project.py | 8 +- src/palimpzest/query/operators/retrieve.py | 6 +- src/palimpzest/query/operators/scan.py | 12 +- src/palimpzest/query/operators/search.py | 24 ++-- src/palimpzest/validator/validator.py | 4 +- tests/pytest/fixtures/champion_outputs.py | 10 +- tests/pytest/fixtures/execution_data.py | 84 +++++++------- tests/pytest/fixtures/expected_records.py | 6 +- tests/pytest/test_convert.py | 2 +- tests/pytest/test_records.py | 6 +- 27 files changed, 197 insertions(+), 194 deletions(-) diff --git a/src/palimpzest/core/elements/groupbysig.py b/src/palimpzest/core/elements/groupbysig.py index 316538a0f..c57792d35 100644 --- a/src/palimpzest/core/elements/groupbysig.py +++ b/src/palimpzest/core/elements/groupbysig.py @@ -16,7 +16,7 @@ def __init__(self, group_by_fields: list[str], agg_funcs: list[str], agg_fields: self.agg_funcs = agg_funcs self.agg_fields = agg_fields - def validate_schema(self, input_schema: BaseModel) -> tuple[bool, str | None]: + def validate_schema(self, input_schema: type[BaseModel]) -> tuple[bool, str | None]: for f in self.group_by_fields: if f not in input_schema.model_fields: return (False, "Supplied schema has no field " + f) diff --git a/src/palimpzest/core/elements/records.py b/src/palimpzest/core/elements/records.py index 74f357809..084e3db3f 100644 --- a/src/palimpzest/core/elements/records.py +++ b/src/palimpzest/core/elements/records.py @@ -28,7 +28,7 @@ class DataRecord: def __init__( self, - schema: BaseModel, + schema: type[BaseModel], source_indices: str | list[str], parent_ids: str | list[str] | None = None, cardinality_idx: int | None = None, @@ -45,26 +45,26 @@ def __init__( parent_ids = [parent_ids] # schema for the data record - self.schema = schema + self._schema = schema # mapping from field names to Field objects; effectively a mapping from a field name to its type - self.field_types: dict[str, FieldInfo] = schema.model_fields + self._field_types: dict[str, FieldInfo] = schema.model_fields # mapping from field names to their values - self.field_values: dict[str, Any] = {} + self._field_values: dict[str, Any] = {} # the index in the root Dataset from which this DataRecord is derived; # each source index takes the form: f"{root_dataset.id}-{idx}" - self.source_indices = sorted(source_indices) + self._source_indices = sorted(source_indices) # the id(s) of the parent record(s) from which this DataRecord is derived - self.parent_ids = parent_ids + self._parent_ids = parent_ids # store the cardinality index - self.cardinality_idx = cardinality_idx + self._cardinality_idx = cardinality_idx # indicator variable which may be flipped by filter operations to signal when a record has been filtered out - self.passed_operator = True + self._passed_operator = True # NOTE: Record ids are hashed based on: # 0. 
their schema (keys) @@ -79,27 +79,29 @@ def __init__( # unique identifier for the record id_str = ( - str(schema) + str(parent_ids) if parent_ids is not None else str(self.source_indices) + str(schema) + str(parent_ids) if parent_ids is not None else str(self._source_indices) if cardinality_idx is None - else str(schema) + str(cardinality_idx) + str(parent_ids) if parent_ids is not None else str(self.source_indices) + else str(schema) + str(cardinality_idx) + str(parent_ids) if parent_ids is not None else str(self._source_indices) ) - # TODO(Jun): build-in id should has a special name, the current self.id is too general which would conflict with user defined schema too easily. - # the options: built_in_id, generated_id - self.id = hash_for_id(id_str) + # TODO: hide special fields in BaseModel when we deprecate DataRecord + self._id = hash_for_id(id_str) + # TODO: raise an exception if one of these fields is present in the schema + # - put these in a constant list up top + # - import the constant list in Dataset (if possible) and check at plan creation time def __setattr__(self, name: str, value: Any, /) -> None: - if name in ["schema", "field_types", "field_values", "source_indices", "parent_ids", "cardinality_idx", "passed_operator", "id"]: + if name in ["_schema", "_field_types", "_field_values", "_source_indices", "_parent_ids", "_cardinality_idx", "_passed_operator", "_id"]: super().__setattr__(name, value) else: - self.field_values[name] = value + self._field_values[name] = value def __getattr__(self, name: str) -> Any: - if name == "field_values": + if name == "_field_values": pass - elif name in self.field_values: - return self.field_values[name] + elif name in self._field_values: + return self._field_values[name] else: raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") @@ -114,44 +116,44 @@ def __setitem__(self, field: str, value: Any) -> None: def __str__(self, truncate: int | None = 15) -> str: if truncate is not None: - items = (f"{k}={str(v)[:truncate]!r}{'...' if len(str(v)) > truncate else ''}" for k, v in sorted(self.field_values.items())) + items = (f"{k}={str(v)[:truncate]!r}{'...' 
if len(str(v)) > truncate else ''}" for k, v in sorted(self._field_values.items())) else: - items = (f"{k}={v!r}" for k, v in sorted(self.field_values.items())) + items = (f"{k}={v!r}" for k, v in sorted(self._field_values.items())) return "{}({})".format(type(self).__name__, ", ".join(items)) def __repr__(self) -> str: return self.__str__(truncate=None) def __eq__(self, other): - return isinstance(other, DataRecord) and self.field_values == other.field_values and self.schema == other.schema + return isinstance(other, DataRecord) and self._field_values == other._field_values and self._schema == other._schema def __hash__(self): return hash(self.to_json_str(bytes_to_str=True, sorted=True)) def __iter__(self): - yield from self.field_values.items() + yield from self._field_values.items() def get_field_names(self): - return list(self.field_values.keys()) + return list(self._field_values.keys()) def get_field_type(self, field_name: str) -> FieldInfo: - return self.field_types[field_name] + return self._field_types[field_name] def copy(self, include_bytes: bool = True, project_cols: list[str] | None = None): # make copy of the current record new_dr = DataRecord( - self.schema, - source_indices=self.source_indices, - parent_ids=self.parent_ids, - cardinality_idx=self.cardinality_idx, + self._schema, + source_indices=self._source_indices, + parent_ids=self._parent_ids, + cardinality_idx=self._cardinality_idx, ) # copy the passed_operator attribute - new_dr.passed_operator = self.passed_operator + new_dr._passed_operator = self._passed_operator # get the set of fields to copy from the parent record copy_field_names = project_cols if project_cols is not None else self.get_field_names() @@ -169,7 +171,7 @@ def copy(self, include_bytes: bool = True, project_cols: list[str] | None = None continue # set field and value - new_dr.field_types[field_name] = field_type + new_dr._field_types[field_name] = field_type new_dr[field_name] = field_value return new_dr @@ -177,7 +179,7 @@ def copy(self, include_bytes: bool = True, project_cols: list[str] | None = None @staticmethod def from_parent( - schema: BaseModel, + schema: type[BaseModel], parent_record: DataRecord, project_cols: list[str] | None = None, cardinality_idx: int | None = None, @@ -187,18 +189,18 @@ def from_parent( # otherwise, it's a ProjectSchema new_schema = None if project_cols is None: - new_schema = union_schemas([schema, parent_record.schema]) + new_schema = union_schemas([schema, parent_record._schema]) elif project_cols == []: new_schema = schema else: - new_schema = union_schemas([schema, parent_record.schema]) + new_schema = union_schemas([schema, parent_record._schema]) new_schema = project(new_schema, project_cols) # make new record which has parent_record as its parent (and the same source_indices) new_dr = DataRecord( new_schema, - source_indices=parent_record.source_indices, - parent_ids=[parent_record.id], + source_indices=parent_record._source_indices, + parent_ids=[parent_record._id], cardinality_idx=cardinality_idx, ) @@ -208,7 +210,7 @@ def from_parent( # copy fields from the parent for field_name in copy_field_names: - new_dr.field_types[field_name] = parent_record.get_field_type(field_name) + new_dr._field_types[field_name] = parent_record.get_field_type(field_name) new_dr[field_name] = parent_record[field_name] return new_dr @@ -216,7 +218,7 @@ def from_parent( @staticmethod def from_agg_parents( - schema: BaseModel, + schema: type[BaseModel], parent_records: DataRecordSet, cardinality_idx: int | None = None, ) -> 
DataRecord: @@ -224,20 +226,20 @@ def from_agg_parents( source_indices = [ source_idx for parent_record in parent_records - for source_idx in parent_record.source_indices + for source_idx in parent_record._source_indices ] # make new record which has all parent records as its parents return DataRecord( schema, source_indices=source_indices, - parent_ids=[parent_record.id for parent_record in parent_records], + parent_ids=[parent_record._id for parent_record in parent_records], cardinality_idx=cardinality_idx, ) @staticmethod def from_join_parents( - schema: BaseModel, + schema: type[BaseModel], left_parent_record: DataRecord, right_parent_record: DataRecord, project_cols: list[str] | None = None, @@ -246,8 +248,8 @@ def from_join_parents( # make new record which has left and right parent record as its parents new_dr = DataRecord( schema, - source_indices=list(left_parent_record.source_indices) + list(right_parent_record.source_indices), - parent_ids=[left_parent_record.id, right_parent_record.id], + source_indices=list(left_parent_record._source_indices) + list(right_parent_record._source_indices), + parent_ids=[left_parent_record._id, right_parent_record._id], cardinality_idx=cardinality_idx, ) @@ -267,14 +269,14 @@ def from_join_parents( # copy fields from the parents for field_name in left_copy_field_names: - new_dr.field_types[field_name] = left_parent_record.get_field_type(field_name) + new_dr._field_types[field_name] = left_parent_record.get_field_type(field_name) new_dr[field_name] = left_parent_record[field_name] for field_name in right_copy_field_names: new_field_name = field_name if field_name in left_copy_field_names: new_field_name = f"{field_name}_right" - new_dr.field_types[new_field_name] = right_parent_record.get_field_type(field_name) + new_dr._field_types[new_field_name] = right_parent_record.get_field_type(field_name) new_dr[new_field_name] = right_parent_record[field_name] return new_dr @@ -282,7 +284,7 @@ def from_join_parents( # TODO: unused outside of unit tests @staticmethod - def from_df(df: pd.DataFrame, schema: BaseModel | None = None) -> list[DataRecord]: + def from_df(df: pd.DataFrame, schema: type[BaseModel] | None = None) -> list[DataRecord]: """Create a list of DataRecords from a pandas DataFrame Args: @@ -310,8 +312,8 @@ def from_df(df: pd.DataFrame, schema: BaseModel | None = None) -> list[DataRecor for idx, row in df.iterrows(): row_dict = row.to_dict() record = DataRecord(schema=schema, source_indices=[f"{dataset_id}-{idx}"]) - record.field_values = row_dict - record.field_types = {field_name: schema.model_fields[field_name] for field_name in row_dict} + record._field_values = row_dict + record._field_types = {field_name: schema.model_fields[field_name] for field_name in row_dict} records.append(record) return records @@ -348,7 +350,7 @@ def to_dict(self, include_bytes: bool = True, bytes_to_str: bool = False, projec field_values = { k: v.description if isinstance(v, context.Context) else v - for k, v in self.field_values.items() + for k, v in self._field_values.items() } dct = pd.Series(field_values).to_dict() @@ -358,7 +360,7 @@ def to_dict(self, include_bytes: bool = True, bytes_to_str: bool = False, projec if not include_bytes: for k in dct: - field_type = self.field_types[k] + field_type = self._field_types[k] if field_type.annotation in [bytes, AudioBase64, ImageBase64, list[bytes], list[ImageBase64]]: dct[k] = "" @@ -374,7 +376,7 @@ def to_dict(self, include_bytes: bool = True, bytes_to_str: bool = False, projec if mask_filepaths: for k in dct: - 
field_type = self.field_types[k] + field_type = self._field_types[k] if field_type.annotation in [AudioBase64, AudioFilepath, ImageBase64, ImageFilepath, ImageURL]: dct[k] = "" @@ -399,9 +401,9 @@ def __init__( # set data_records, parent_ids, and source_indices; note that it is possible for # data_records to be an empty list in the event of a failed convert self.data_records = data_records - self.parent_ids = data_records[0].parent_ids if len(data_records) > 0 else None - self.source_indices = data_records[0].source_indices if len(data_records) > 0 else None - self.schema = data_records[0].schema if len(data_records) > 0 else None + self.parent_ids = data_records[0]._parent_ids if len(data_records) > 0 else None + self.source_indices = data_records[0]._source_indices if len(data_records) > 0 else None + self.schema = data_records[0]._schema if len(data_records) > 0 else None # the input to the operator which produced the data_records; type is tuple[DataRecord] | tuple[int] # - for scan operators, input is a singleton tuple[int] which wraps the source_idx, e.g.: (source_idx,) diff --git a/src/palimpzest/prompts/prompt_factory.py b/src/palimpzest/prompts/prompt_factory.py index e9a213f35..dd7b1fce8 100644 --- a/src/palimpzest/prompts/prompt_factory.py +++ b/src/palimpzest/prompts/prompt_factory.py @@ -305,7 +305,7 @@ def _get_output_fields_desc(self, output_fields: list[str], **kwargs) -> str: str: The output fields description. """ output_fields_desc = "" - output_schema: BaseModel = kwargs.get("output_schema") + output_schema: type[BaseModel] = kwargs.get("output_schema") if self.prompt_strategy.is_convert_prompt(): assert output_schema is not None, "Output schema must be provided for convert prompts." diff --git a/src/palimpzest/query/execution/all_sample_execution_strategy.py b/src/palimpzest/query/execution/all_sample_execution_strategy.py index 6b6c50275..102a6d665 100644 --- a/src/palimpzest/query/execution/all_sample_execution_strategy.py +++ b/src/palimpzest/query/execution/all_sample_execution_strategy.py @@ -146,7 +146,7 @@ def update_inputs(self, source_idx_to_record_sets: dict[int, DataRecordSet]): input = [] max_quality_record_set = self.pick_highest_quality_output(record_sets) for record in max_quality_record_set: - input.append(record if record.passed_operator else None) + input.append(record if record._passed_operator else None) self.source_indices_to_inputs[source_idx] = input diff --git a/src/palimpzest/query/execution/execution_strategy.py b/src/palimpzest/query/execution/execution_strategy.py index 44257c61d..00b2ddcce 100644 --- a/src/palimpzest/query/execution/execution_strategy.py +++ b/src/palimpzest/query/execution/execution_strategy.py @@ -182,7 +182,7 @@ def is_perfect_quality_op(op: PhysicalOperator): elif isinstance(op, LLMFilter): filter_str = op.filter_obj.filter_condition input_record: DataRecord = record_set.input - output = record_set.data_records[0].passed_operator + output = record_set.data_records[0]._passed_operator full_hash = f"{filter_str}{hash(input_record)}" if full_hash not in full_hashes: full_hash_to_bool_output[full_hash] = output @@ -195,7 +195,7 @@ def is_perfect_quality_op(op: PhysicalOperator): for left_idx, left_input_record in enumerate(record_set.input[0]): for right_idx, right_input_record in enumerate(record_set.input[1]): record_idx = left_idx * len(record_set.input[1]) + right_idx - output = record_set.data_records[record_idx].passed_operator + output = record_set.data_records[record_idx]._passed_operator full_hash = 
f"{condition}{hash(left_input_record)}{hash(right_input_record)}" if full_hash not in full_hashes: full_hash_to_bool_output[full_hash] = output @@ -246,7 +246,7 @@ def is_perfect_quality_op(op: PhysicalOperator): elif isinstance(op, LLMFilter): filter_str = op.filter_obj.filter_condition input_record: DataRecord = record_set.input - output = record_set.data_records[0].passed_operator + output = record_set.data_records[0]._passed_operator full_hash = f"{filter_str}{hash(input_record)}" if output == full_hash_to_bool_output[full_hash]: record_set.record_op_stats[0].quality = full_hash_to_score[full_hash] @@ -258,7 +258,7 @@ def is_perfect_quality_op(op: PhysicalOperator): for left_idx, left_input_record in enumerate(record_set.input[0]): for right_idx, right_input_record in enumerate(record_set.input[1]): record_idx = left_idx * len(record_set.input[1]) + right_idx - output = record_set.data_records[record_idx].passed_operator + output = record_set.data_records[record_idx]._passed_operator full_hash = f"{condition}{hash(left_input_record)}{hash(right_input_record)}" if output == full_hash_to_bool_output[full_hash]: record_set.record_op_stats[record_idx].quality = full_hash_to_score[full_hash] diff --git a/src/palimpzest/query/execution/mab_execution_strategy.py b/src/palimpzest/query/execution/mab_execution_strategy.py index fa4163710..57d1a8136 100644 --- a/src/palimpzest/query/execution/mab_execution_strategy.py +++ b/src/palimpzest/query/execution/mab_execution_strategy.py @@ -608,11 +608,10 @@ def update_inputs(self, source_unique_logical_op_id: str, source_indices_to_reco input = [] max_quality_record_set = self.pick_highest_quality_output(record_sets) for record in max_quality_record_set: - input.append(record if record.passed_operator else None) + input.append(record if record._passed_operator else None) self.source_indices_to_inputs[source_unique_logical_op_id][source_indices] = input - class MABExecutionStrategy(SentinelExecutionStrategy): """ This class implements the Multi-Armed Bandit (MAB) execution strategy for SentinelQueryProcessors. 
diff --git a/src/palimpzest/query/execution/parallel_execution_strategy.py b/src/palimpzest/query/execution/parallel_execution_strategy.py index 43b63f0f7..99ff2f256 100644 --- a/src/palimpzest/query/execution/parallel_execution_strategy.py +++ b/src/palimpzest/query/execution/parallel_execution_strategy.py @@ -77,7 +77,7 @@ def _process_future_results(self, unique_full_op_id: str, future_queues: dict[st plan_stats.add_record_op_stats(unique_full_op_id, record_op_stats) # add records which aren't filtered to the output records - output_records.extend([record for record in records if record.passed_operator]) + output_records.extend([record for record in records if record._passed_operator]) # update the progress manager if total_inputs_processed > 0: diff --git a/src/palimpzest/query/execution/single_threaded_execution_strategy.py b/src/palimpzest/query/execution/single_threaded_execution_strategy.py index d07d23de8..205e9cd1b 100644 --- a/src/palimpzest/query/execution/single_threaded_execution_strategy.py +++ b/src/palimpzest/query/execution/single_threaded_execution_strategy.py @@ -52,7 +52,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li record_set = operator(candidates=input_queues[unique_full_op_id][source_unique_full_op_id]) records = record_set.data_records record_op_stats = record_set.record_op_stats - num_outputs = sum(record.passed_operator for record in records) + num_outputs = sum(record._passed_operator for record in records) # update the progress manager self.progress_manager.incr(unique_full_op_id, num_inputs=1, num_outputs=num_outputs, total_cost=record_set.get_total_cost()) @@ -70,7 +70,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li record_set, num_inputs_processed = operator(left_input_records, right_input_records) records = record_set.data_records record_op_stats = record_set.record_op_stats - num_outputs = sum(record.passed_operator for record in records) + num_outputs = sum(record._passed_operator for record in records) # update the progress manager self.progress_manager.incr(unique_full_op_id, num_inputs=num_inputs_processed, num_outputs=num_outputs, total_cost=record_set.get_total_cost()) @@ -82,7 +82,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li record_set = operator(input_record) records.extend(record_set.data_records) record_op_stats.extend(record_set.record_op_stats) - num_outputs = sum(record.passed_operator for record in record_set.data_records) + num_outputs = sum(record._passed_operator for record in record_set.data_records) # update the progress manager self.progress_manager.incr(unique_full_op_id, num_inputs=1, num_outputs=num_outputs, total_cost=record_set.get_total_cost()) @@ -95,7 +95,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li plan_stats.add_record_op_stats(unique_full_op_id, record_op_stats) # update next input_queue (if it exists) - output_records = [record for record in records if record.passed_operator] + output_records = [record for record in records if record._passed_operator] next_unique_full_op_id = plan.get_next_unique_full_op_id(topo_idx, operator) if next_unique_full_op_id is not None: input_queues[next_unique_full_op_id][unique_full_op_id] = output_records @@ -207,7 +207,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li record_set = operator(candidates=input_records) records = record_set.data_records record_op_stats = record_set.record_op_stats - 
num_outputs = sum(record.passed_operator for record in records) + num_outputs = sum(record._passed_operator for record in records) # update the progress manager self.progress_manager.incr(unique_full_op_id, num_inputs=1, num_outputs=num_outputs, total_cost=record_set.get_total_cost()) @@ -225,7 +225,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li record_set, num_inputs_processed = operator(left_input_records, right_input_records) records = record_set.data_records record_op_stats = record_set.record_op_stats - num_outputs = sum(record.passed_operator for record in records) + num_outputs = sum(record._passed_operator for record in records) # update the progress manager self.progress_manager.incr(unique_full_op_id, num_inputs=num_inputs_processed, num_outputs=num_outputs, total_cost=record_set.get_total_cost()) @@ -237,7 +237,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li record_set = operator(input_record) records = record_set.data_records record_op_stats = record_set.record_op_stats - num_outputs = sum(record.passed_operator for record in records) + num_outputs = sum(record._passed_operator for record in records) # update the progress manager self.progress_manager.incr(unique_full_op_id, num_inputs=1, num_outputs=num_outputs, total_cost=record_set.get_total_cost()) @@ -246,7 +246,7 @@ def _execute_plan(self, plan: PhysicalPlan, input_queues: dict[str, dict[str, li plan_stats.add_record_op_stats(unique_full_op_id, record_op_stats) # update next input_queue or final_output_records - output_records = [record for record in records if record.passed_operator] + output_records = [record for record in records if record._passed_operator] next_unique_full_op_id = plan.get_next_unique_full_op_id(topo_idx, operator) if next_unique_full_op_id is not None: input_queues[next_unique_full_op_id][unique_full_op_id].extend(output_records) diff --git a/src/palimpzest/query/generators/generators.py b/src/palimpzest/query/generators/generators.py index 32a661dd9..736fc70e6 100644 --- a/src/palimpzest/query/generators/generators.py +++ b/src/palimpzest/query/generators/generators.py @@ -402,6 +402,8 @@ def __call__(self, candidate: DataRecord, fields: dict[str, FieldInfo] | None, r prompt += "