Create Initial Semantic Group By Operation #253

@mdr223

Description

The goal of this issue is to create a semantic group by operation such that users can execute an operation like the following:

# assume there is a directory of product-reviews/ laid out like:
# product-reviews/
# |
# |-- review1.txt
# |-- review2.txt
# | ...
# |-- review100.txt
#
# the following creates a dataset with default fields `contents` and `filename` for each review
ds = pz.TextFileDataset(id="reviews", dir="product-reviews/")

# this operation computes groups of complaints and then returns the count of reviews which belong to each complaint 
ds = ds.sem_groupby(gby_fields=['complaint'], agg_fields=['contents'], agg_funcs=['count'])

# run and print output
out = ds.run()
print(out.to_df())

# should output something like:
# 
#                       complaint       count
# 0               size not right            57
# 1    did not match description            33
# 2                not ergonomic            10

Here is a high-level overview of the changes which need to be made (although this may not be an exhaustive list):

1. Add sem_groupby() operation to pz.Dataset

In src/core/data/dataset.py you will need to add a function for sem_groupby() to the pz.Dataset class. (Classes like pz.TextFileDataset inherit from this base class, so defining the sem_groupby() function here will make this available to all Dataset classes).

The function will need to have the following signature:

def sem_groupby(self, gby_fields: list[str], agg_fields: list[str], agg_funcs: list[str]) -> Dataset

At this point, you should pause reading this issue and read the description of #252.

This issue should similarly assume that pz.GroupBySig() no longer exists, and should pass these fields directly into the logical GroupByAggregate() operator. You can begin to implement the sem_groupby() function by copy-pasting the groupby() function, but be aware that you will likely need to pull the logic from pz.GroupBySig.output_schema into this function so that you can pass an output_schema to the GroupByAggregate() operator.
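To make the "pull the logic from pz.GroupBySig.output_schema into this function" step concrete, here is a hypothetical sketch of what that logic might look like once it lives in sem_groupby(). Plain dicts stand in for palimpzest's actual Schema/field classes, and the output-field naming (e.g. a bare "count" column) is an assumption based on the example output above, not the real API:

```python
# Hypothetical sketch: build the group-by output schema from the three
# field lists, in place of the old pz.GroupBySig.output_schema property.
# Plain dicts stand in for palimpzest's real Schema class here.
def build_groupby_output_schema(
    gby_fields: list[str], agg_fields: list[str], agg_funcs: list[str]
) -> dict[str, type]:
    schema: dict[str, type] = {}
    # group-by fields pass through to the output as string-valued fields
    for field in gby_fields:
        schema[field] = str
    # each (func, field) pair produces one aggregate output field; "count"
    # gets a bare column name to match the example output in this issue
    for func, field in zip(agg_funcs, agg_fields):
        out_name = func if func == "count" else f"{func}({field})"
        schema[out_name] = int if func == "count" else float
    return schema

print(build_groupby_output_schema(["complaint"], ["contents"], ["count"]))
# {'complaint': <class 'str'>, 'count': <class 'int'>}
```

The resulting schema dict would then be passed as the output_schema argument to the GroupByAggregate() operator.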

2. Update GroupByAggregate() to No Longer Accept pz.GroupBySig

You will likely need to pass the gby_fields, agg_fields, and agg_funcs directly into the logical operator's constructor (in src/core/data/dataset.py::sem_groupby()), and update any internal logic to use these fields wherever GroupByAggregate previously accessed the pz.GroupBySig object (in src/query/operators/logical.py).

For GroupByAggregate.get_logical_id_params() and GroupByAggregate.get_logical_op_params(), make sure that the param dictionaries which are returned by each function contain the new gby_fields, agg_fields, and agg_funcs parameters that you provide to __init__().
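The merge pattern described above can be sketched in miniature as follows. The class and method bodies here are stand-ins for the real operators in src/query/operators/logical.py, just to illustrate how the subclass folds its new constructor args into the parent's param dict:

```python
# Minimal stand-in sketch of the param-dict merge pattern: the subclass
# adds its new constructor args on top of whatever the parent returns.
class LogicalOperator:
    def get_logical_id_params(self) -> dict:
        # the real base class returns its own params; this is a placeholder
        return {"op_name": type(self).__name__}

class GroupByAggregate(LogicalOperator):
    def __init__(self, gby_fields: list[str], agg_fields: list[str], agg_funcs: list[str]):
        self.gby_fields = gby_fields
        self.agg_fields = agg_fields
        self.agg_funcs = agg_funcs

    def get_logical_id_params(self) -> dict:
        id_params = super().get_logical_id_params()
        # merge the three new parameters into the parent's dict
        return {
            "gby_fields": self.gby_fields,
            "agg_fields": self.agg_fields,
            "agg_funcs": self.agg_funcs,
            **id_params,
        }

op = GroupByAggregate(["complaint"], ["contents"], ["count"])
print(op.get_logical_id_params())
```

get_logical_op_params() would follow the same shape, merging into super().get_logical_op_params() instead.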

3. (The Fun Part!) Create Your Semantic Group By Implementation

In src/query/operators/aggregate.py, you should create a new class which contains your implementation of the semantic group by. I've pasted a template for you to get started with below:

class SemanticGroupByOp(AggregateOp):
    """
    Implementation of a GroupBy operator. This operator groups records by a set of fields
    and applies a function to each group. The group_by_sig object contains the fields to
    group by and the aggregation functions to apply to each group.
    """
    def __init__(self, gby_fields: list[str], agg_fields: list[str], agg_funcs: list[str], *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.gby_fields = gby_fields
        self.agg_fields = agg_fields
        self.agg_funcs = agg_funcs

    def __str__(self):
        op = super().__str__()
        op += f"    Group-by Fields: {self.gby_fields}\n"
        op += f"    Agg. Fields: {self.agg_fields}\n"
        op += f"    Agg. Funcs: {self.agg_funcs}\n"
        return op

    def get_id_params(self):
        id_params = super().get_id_params()
        return {"gby_fields": self.gby_fields, "agg_fields": self.agg_fields, "agg_funcs": self.agg_funcs, **id_params}

    def get_op_params(self):
        op_params = super().get_op_params()
        return {"gby_fields": self.gby_fields, "agg_fields": self.agg_fields, "agg_funcs": self.agg_funcs, **op_params}

    def naive_cost_estimates(self, source_op_cost_estimates: OperatorCostEstimates) -> OperatorCostEstimates:
        # for now, assume the group by outputs NAIVE_EST_NUM_GROUPS records
        return OperatorCostEstimates(
            cardinality=NAIVE_EST_NUM_GROUPS,
            time_per_record=0,
            cost_per_record=0,
            quality=1.0,
        )

    def __call__(self, candidates: list[DataRecord]) -> DataRecordSet:
        # TODO: you will be given a list of `candidates` which are input `DataRecord`s;
        #       each `DataRecord` will have the values `dr.contents` and `dr.filename`.
        #
        #       1. Use some method (perhaps start with asking an LLM) to compute the unique
        #          group(s) across all `candidates` according to the `gby_fields`.
        #          (`gby_fields` may be a singleton, but it could also be a list of two or
        #          more fields (e.g. `["state", "city"]`); if it helps, just assume it's a
        #          singleton for now.)
        #       2. Use some method (again, perhaps asking an LLM) to assign each input
        #          `candidate` to one of the unique group(s) based on its `agg_fields`.
        #       3. Apply the `agg_funcs` over each group of records (e.g. if `agg_funcs` is
        #          just `["count"]`, then count the number of records in each group; for now,
        #          assume `agg_funcs` is a singleton and the only aggregation method is "count").
        #       4. For now, return a `list[dict]`, where each `dict` in the list contains the
        #          `gby_fields` and aggregate for one unique group. For example, you might return:
        #          [{"complaint": "size not right", "count": 57}, {"complaint": "did not match description", "count": 33}, {"complaint": "not ergonomic", "count": 10}]
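As a starting point, the four steps above can be prototyped without any LLM calls: exact string matching on a pre-extracted field stands in for the LLM-based group discovery and assignment of steps 1 and 2, and only the "count" aggregate from step 3 is handled. Plain dicts stand in for DataRecords here; this is a baseline sketch, not the semantic implementation the issue ultimately asks for:

```python
# Non-LLM baseline for the four TODO steps, operating on plain dicts in
# place of DataRecords. Exact string matching stands in for the LLM calls
# in steps 1 and 2; only the "count" aggregate from step 3 is supported.
from collections import Counter

def naive_semantic_groupby(
    candidates: list[dict], gby_fields: list[str], agg_funcs: list[str]
) -> list[dict]:
    assert len(gby_fields) == 1 and agg_funcs == ["count"], "baseline handles singleton count only"
    gby_field = gby_fields[0]
    # steps 1-2: an LLM would infer groups from `contents` and assign each
    # candidate to one; here we group on an exact match of the field instead
    counts = Counter(candidate[gby_field] for candidate in candidates)
    # steps 3-4: one output dict per unique group, with its aggregate
    return [{gby_field: group, "count": n} for group, n in counts.items()]

records = [
    {"complaint": "size not right"},
    {"complaint": "size not right"},
    {"complaint": "not ergonomic"},
]
print(naive_semantic_groupby(records, ["complaint"], ["count"]))
# [{'complaint': 'size not right', 'count': 2}, {'complaint': 'not ergonomic', 'count': 1}]
```

Swapping the Counter line for the two LLM-backed steps (group discovery, then per-record assignment) would turn this baseline into the semantic version.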

Final Notes

Once you have accomplished steps 1-3, let's check in to talk about the final steps (plugging this into the optimizer) as well as the unit tests we can run to make sure this solution works!

In order to test that your code works, I would suggest running the following script:

import pandas as pd
import palimpzest as pz
from palimpzest.query.operators.aggregate import SemanticGroupByOp

# create list of candidates from text file dataset
ds = pz.TextFileDataset(id="reviews", dir="product-reviews/")
output = ds.run()
candidates = [dr for dr in output]

# create instance of your physical operator
sem_group_by_op = SemanticGroupByOp(gby_fields=['complaint'], agg_fields=['contents'], agg_funcs=['count'])
grouped_output = sem_group_by_op(candidates)
print(pd.DataFrame(grouped_output))
