diff --git a/docs/docs.json b/docs/docs.json index a32f350..81a4b98 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -159,42 +159,37 @@ "geneva/index", "geneva/overview/index", { - "group": "User Defined Functions (UDFs)", + "group": "Transforms", "pages": [ "geneva/udfs/index", - { - "group": "Built-in Providers", - "pages": [ - "geneva/udfs/providers/index", - "geneva/udfs/providers/openai", - "geneva/udfs/providers/gemini", - "geneva/udfs/providers/sentence-transformers" - ] - }, - "geneva/udfs/blobs", + "geneva/udfs/udfs", + "geneva/udfs/scalar-udtfs", + "geneva/udfs/batch-udtfs", "geneva/udfs/error_handling", - "geneva/udfs/advanced-configuration" + "geneva/udfs/blobs" ] }, { - "group": "Jobs", + "group": "Built-in Transforms", "pages": [ - "geneva/jobs/lifecycle", - "geneva/jobs/backfilling", - "geneva/jobs/conflicts", - "geneva/jobs/materialized-views", - "geneva/jobs/performance", - "geneva/jobs/job_metrics", - "geneva/jobs/troubleshooting" + "geneva/udfs/providers/index", + "geneva/udfs/providers/openai", + "geneva/udfs/providers/gemini", + "geneva/udfs/providers/sentence-transformers" ] }, { - "group": "Operations", + "group": "Running Jobs", "pages": [ "geneva/jobs/contexts", - "geneva/deployment/dependency-verification", - "geneva/jobs/startup", - "geneva/jobs/console" + "geneva/jobs/backfilling", + "geneva/jobs/materialized-views", + "geneva/jobs/lifecycle", + "geneva/jobs/conflicts", + "geneva/jobs/performance", + "geneva/jobs/job_metrics", + "geneva/jobs/console", + "geneva/jobs/troubleshooting" ] }, { @@ -202,6 +197,9 @@ "pages": [ "geneva/deployment/index", "geneva/deployment/helm", + "geneva/jobs/startup", + "geneva/deployment/dependency-verification", + "geneva/udfs/advanced-configuration", "geneva/deployment/troubleshooting" ] }, diff --git a/docs/geneva/index.mdx b/docs/geneva/index.mdx index 57a80c9..3e296b1 100644 --- a/docs/geneva/index.mdx +++ b/docs/geneva/index.mdx @@ -23,7 +23,7 @@ Feature Engineering and the `geneva` Python package 
are currently only available in scaling up your feature engineering workloads for your AI and multimodal use cases. -The `geneva` package uses Python [User Defined Functions (UDFs)](/geneva/udfs/) to define features +The `geneva` package uses Python [User Defined Functions (UDFs)](/geneva/udfs/udfs) to define features as columns in a Lance dataset. Adding a feature is straightforward: @@ -31,7 +31,7 @@ as columns in a Lance dataset. Adding a feature is straightforward: Prototype your Python function in your favorite environment. - Wrap the function with a small UDF decorator (see [UDFs](/geneva/udfs/)). + Wrap the function with a small UDF decorator (see [UDFs](/geneva/udfs/udfs)). Register the UDF as a virtual column using `Table.add_columns()`. @@ -50,6 +50,6 @@ You can build your Python feature generator function in an IDE or a notebook usi Visit the following pages to learn more about featuring engineering in LanceDB Enterprise: - **Overview**: [What is Feature Engineering?](/geneva/overview/) -- **UDFs**: [Using UDFs](/geneva/udfs/) · [Blob helpers](/geneva/udfs/blobs/) · [Error handling](/geneva/udfs/error_handling) · [Advanced configuration](/geneva/udfs/advanced-configuration) +- **UDFs**: [Using UDFs](/geneva/udfs/udfs) · [Blob helpers](/geneva/udfs/blobs/) · [Error handling](/geneva/udfs/error_handling) · [Advanced configuration](/geneva/udfs/advanced-configuration) - **Jobs**: [Backfilling](/geneva/jobs/backfilling/) · [Startup optimizations](/geneva/jobs/startup/) · [Materialized views](/geneva/jobs/materialized-views/) · [Execution contexts](/geneva/jobs/contexts/) · [Geneva console](/geneva/jobs/console) · [Performance](/geneva/jobs/performance/) - **Deployment**: [Deployment overview](/geneva/deployment/) · [Helm deployment](/geneva/deployment/helm/) · [Troubleshooting](/geneva/deployment/troubleshooting/) \ No newline at end of file diff --git a/docs/geneva/udfs/batch-udtfs.mdx b/docs/geneva/udfs/batch-udtfs.mdx new file mode 100644 index 
0000000..bc3f9f3 --- /dev/null +++ b/docs/geneva/udfs/batch-udtfs.mdx @@ -0,0 +1,305 @@ +--- +title: Batch User-Defined Table Functions (UDTFs) +sidebarTitle: Batch UDTFs +description: Use batch UDTFs for N:M transformations like deduplication, clustering, and aggregation across entire tables or partitions. +icon: layer-group +--- + +Beta — introduced in Geneva 0.11.0 + +Geneva's standard UDFs operate **row-at-a-time** — one input row produces exactly one output value. Batch User-Defined Table Functions (UDTFs) lift this restriction, enabling **N:M transformations** where the output can have a completely different schema and row count than the input. + +| Workflow | Input | Output | Cardinality | +|----------|-------|--------|-------------| +| Deduplication | N rows | M rows (M ≤ N) | N:M | +| Clustering | N rows | K cluster rows | N:K | +| Aggregation | N rows | 1 summary row | N:1 | +| Cross-row join/merge | N rows | M rows | N:M | + +## Defining a Batch UDTF + +Use the `@udtf` decorator on a class or function. The UDTF receives a query builder over the source data and **yields** `pa.RecordBatch` objects with an arbitrary output schema. 
+ +### Class-based + +```python +from geneva import udtf +import pyarrow as pa +from collections.abc import Iterator + +@udtf( + output_schema=pa.schema([ + pa.field("row_id", pa.int64()), + pa.field("cluster_id", pa.int64()), + pa.field("is_duplicate", pa.bool_()), + ]), + input_columns=["row_id", "phash"], + num_cpus=4, + memory=8 * 1024**3, # 8 GiB +) +class PHashDedupe: + def __init__(self, threshold: int = 4): + self.threshold = threshold + + def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + data = source.to_arrow().select(["row_id", "phash"]) + clusters = self._cluster(data) + yield pa.RecordBatch.from_pydict({ + "row_id": clusters["row_id"], + "cluster_id": clusters["cluster_id"], + "is_duplicate": clusters["is_duplicate"], + }) +``` + +### Function-based + +```python +@udtf(output_schema=pa.schema([ + pa.field("label", pa.string()), + pa.field("count", pa.int64()), + pa.field("mean_score", pa.float64()), +])) +def group_stats(source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + df = source.to_pandas() + agg = df.groupby("label").agg( + count=("label", "size"), + mean_score=("score", "mean"), + ).reset_index() + yield pa.RecordBatch.from_pandas(agg) +``` + +### Decorator Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `output_schema` | `pa.Schema` | **Required.** Arrow schema of the output table. | +| `input_columns` | `list[str] \| None` | Restrict which source columns are visible. `None` means all. | +| `partition_by` | `str \| None` | Column name for partition-parallel execution. | +| `partition_by_indexed_column` | `str \| None` | Column name with an IVF index for index-based partitioning. Mutually exclusive with `partition_by`. | +| `num_cpus` | `float` | Ray CPU resource request per worker. | +| `num_gpus` | `float` | Ray GPU resource request per worker. | +| `memory` | `int \| None` | Ray memory resource request in bytes. 
| +| `on_error` | | Error handling configuration (see [Error Handling](#error-handling)). | + +## Creating and Refreshing a UDTF View + +Batch UDTFs are always attached to a persistent view via `create_udtf_view()`. Call `refresh()` to populate or update the view. + +```python +conn = geneva.connect("/data/mydb") +images = conn.open_table("images") + +# Create the UDTF view +deduped = conn.create_udtf_view( + "deduped_images", + source=images.search(None).select(["row_id", "phash"]), + udtf=PHashDedupe(threshold=4), +) + +# Populate the view +deduped.refresh() +``` + +UDTF views use the same cluster infrastructure as other Geneva jobs: + +```python +# KubeRay cluster +with conn.context(cluster="my-cluster", manifest="my-manifest"): + deduped.refresh() + +# Local Ray +with conn.local_ray_context(): + deduped.refresh() +``` + +### Version-aware refresh + +On each refresh, Geneva checks the source table's version against the version stored in the view metadata. If the source has not changed, the refresh is skipped entirely — an O(1) check. + +## Execution Modes + +### Single-worker (no partitioning) + +When neither `partition_by` nor `partition_by_indexed_column` is set, the UDTF runs as a **single Ray task** with access to the entire source dataset. Use this for global operations that need cross-row visibility. + +```python +@udtf(output_schema=...) +class GlobalAggregation: + def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + all_data = source.to_arrow() + result = expensive_cross_row_computation(all_data) + yield result.to_batches()[0] +``` + +### Partition-parallel (`partition_by`) + +The framework groups source data by the partition column and dispatches each partition as an independent Ray task. Use this when the computation is naturally parallelizable by some grouping key. 
+ +```python +@udtf( + output_schema=pa.schema([ + ("row_id_a", pa.string()), + ("row_id_b", pa.string()), + ("hamming_dist", pa.int32()), + ]), + partition_by="partition_id", + num_cpus=2, +) +class EdgeDetection: + def __init__(self, threshold: int = 4): + self.threshold = threshold + + def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + data = source.to_arrow() + edges = self._pairwise_compare(data, self.threshold) + if edges: + yield pa.RecordBatch.from_pydict(edges) +``` + +### Index-based partitioning (`partition_by_indexed_column`) + +Instead of partitioning by a materialized column, the framework reads partition assignments directly from an existing **IVF vector index** (IVF_FLAT, IVF_PQ, IVF_HNSW_SQ, etc.). This avoids materializing a `partition_id` column and keeps partitions synchronized with the index. + +```python +from geneva.partitioning import create_ivf_flat_index + +# 1. Build an IVF index on the source table +create_ivf_flat_index(images, "phash", k=16) + +# 2. 
Define the UDTF with index-based partitioning +@udtf( + output_schema=pa.schema([ + ("row_id_a", pa.string()), + ("row_id_b", pa.string()), + ("hamming_dist", pa.int32()), + ]), + partition_by_indexed_column="phash", + num_cpus=2, +) +class IndexPartitionedEdgeDetection: + def __init__(self, threshold: int = 4): + self.threshold = threshold + + def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + data = source.to_arrow() + edges = self._pairwise_compare(data, self.threshold) + if edges: + yield pa.RecordBatch.from_pydict(edges) +``` + +| | `partition_by` | `partition_by_indexed_column` | +|---|---|---| +| Partition source | Column values (SQL filter) | IVF index partitions (row ID take) | +| Requires materialized column | Yes | No — reads from index metadata | +| Partition count | Number of distinct values | Number of non-empty index partitions | +| Sync with index | Manual | Automatic — always reads latest index | + + +`partition_by` and `partition_by_indexed_column` are **mutually exclusive**. Setting both raises `ValueError`. + + +## Yielding Batches + +The UDTF yields one or more `pa.RecordBatch` or `pa.Table` objects. Each batch must conform to `output_schema`. The framework validates each batch, writes it, and optionally checkpoints it. + +```python +# Streaming — yield per source batch (memory-efficient) +def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + for batch in source.to_batches(batch_size=1024): + yield transform(batch) + +# Bulk — load all, compute, yield once +def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + all_data = source.to_arrow() + result = expensive_cross_row_computation(all_data) + yield result.to_batches()[0] +``` + + +Use the streaming pattern for memory-efficient processing. Use the bulk pattern when the computation inherently requires all data in memory (e.g., clustering, global deduplication). 
+ + +## Error Handling + +Error handling operates at **partition granularity** — the unit of work is the entire `__call__()` execution for a partition (or the full table in single-worker mode). + +| Mode | Behavior | Use case | +|---|---|---| +| **Fail** (default) | Exception kills the partition, refresh fails | Correctness-critical UDTFs | +| **Retry** | Retry the entire partition with configurable backoff | Transient failures (network, OOM) | +| **Skip** | Log error, continue with remaining partitions | Best-effort / tolerant workloads | + + +Unlike standard UDF error handling, there is no row-level skip — UDTFs yield whole batches, so the smallest error unit is the partition. + + +## Checkpointing + +Each yielded batch is checkpointed before reporting completion. On resume after a failure, completed batches are skipped and entire partitions with a `__done__` marker are skipped. + +Checkpoint keys include the source table version, so stale checkpoints from a previous source version are automatically ignored when the source changes. + + +Checkpointed UDTFs must be **deterministic** — the same input must yield the same batch sequence for resume to work correctly. 
+ + +## Examples + +### K-Means Clustering + +```python +@udtf( + output_schema=pa.schema([ + pa.field("row_id", pa.int64()), + pa.field("cluster_id", pa.int64()), + pa.field("distance_to_centroid", pa.float64()), + ]), + num_cpus=4, + memory=16 * 1024**3, +) +class KMeansClustering: + def __init__(self, k: int = 100): + self.k = k + + def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + import numpy as np + embeddings = source.to_arrow().select(["row_id", "embedding"]) + row_ids = embeddings.column("row_id").to_pylist() + vectors = np.stack(embeddings.column("embedding").to_pylist()) + centroids, assignments, distances = self._fit(vectors) + + chunk_size = 10_000 + for start in range(0, len(row_ids), chunk_size): + end = min(start + chunk_size, len(row_ids)) + yield pa.RecordBatch.from_pydict({ + "row_id": row_ids[start:end], + "cluster_id": assignments[start:end].tolist(), + "distance_to_centroid": distances[start:end].tolist(), + }) +``` + +### Aggregation + +```python +@udtf( + output_schema=pa.schema([ + pa.field("label", pa.string()), + pa.field("count", pa.int64()), + pa.field("mean_score", pa.float64()), + ]), +) +class GroupStats: + def __init__(self, group_by: str = "label"): + self.group_by = group_by + + def __call__(self, source: geneva.GenevaQueryBuilder) -> Iterator[pa.RecordBatch]: + df = source.to_pandas() + agg = df.groupby(self.group_by).agg( + count=("label", "size"), + mean_score=("score", "mean"), + ).reset_index() + yield pa.RecordBatch.from_pandas(agg) +``` + +Reference: +* [`create_udtf_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_udtf_view) diff --git a/docs/geneva/udfs/blobs.mdx b/docs/geneva/udfs/blobs.mdx index ac731d1..f6bd4c3 100644 --- a/docs/geneva/udfs/blobs.mdx +++ b/docs/geneva/udfs/blobs.mdx @@ -9,7 +9,7 @@ Geneva supports UDFs that take [Lance Blobs](https://lancedb.github.io/lance/gui ## Reading Blobs -Defining functions that read blob columns is 
straight forward. +Defining functions that read blob columns is straightforward. For scalar UDFs, blob columns are expected to be of type `BlobFile` @@ -62,7 +62,7 @@ def batch_to_blob(batch: pa.RecordBatch) -> pa.Array: blobs = [] for i in range(batch.num_rows): # do something that returns bytes - blob_data = ... + blob_data = ... blobs.append(blob_data) return pa.array(blobs, type=pa.large_binary()) ``` diff --git a/docs/geneva/udfs/index.mdx b/docs/geneva/udfs/index.mdx index 5074beb..fe0037f 100644 --- a/docs/geneva/udfs/index.mdx +++ b/docs/geneva/udfs/index.mdx @@ -1,284 +1,97 @@ --- -title: User-Defined Functions -sidebarTitle: Working with UDFs -icon: function +title: Understanding Transforms +sidebarTitle: Understanding Transforms +description: Understand the three types of user-defined functions in Geneva — UDFs, scalar UDTFs, and batch UDTFs — and when to use each. +icon: code-compare --- -## Converting functions into UDFs +Geneva provides three types of user-defined functions for transforming data. Each type has a different input/output cardinality and is suited to different workflows. -Converting your Python code to a Geneva UDF is simple. There are three kinds of UDFs that you can provide — scalar UDFs, batched UDFs and stateful UDFs. +## Choosing the Right Type -In all cases, Geneva uses Python type hints from your functions to infer the input and output -[arrow data types](https://arrow.apache.org/docs/python/api/datatypes.html) that LanceDB uses. +- **Adding a column to each row?** Use a [**UDF**](/geneva/udfs/udfs). +- **Splitting each row into multiple rows?** Use a [**Scalar UDTF**](/geneva/udfs/scalar-udtfs). +- **Computing across rows with a different output shape?** Use a [**Batch UDTF**](/geneva/udfs/batch-udtfs). 
-### Scalar UDFs +## At a Glance -The **simplest** form is a scalar UDF, which processes one row at a time: +| | UDF | Scalar UDTF | Batch UDTF | +|---|---|---|---| +| **Cardinality** | 1:1 | 1:N | N:M | +| **Decorator** | `@udf` | `@scalar_udtf` | `@udtf` | +| **Refresh** | Incremental | Incremental | Full | +| **Parallelism** | Fragment-parallel | Fragment-parallel | Partition-parallel | +| **Inherited columns** | N/A — adds to existing rows | Automatic from query | Independent output schema | +| **Registration** | `table.add_columns()` | `db.create_materialized_view(udtf=)` | `db.create_udtf_view()` | -```python -from geneva import udf +## UDFs (1:1) -@udf -def area_udf(x: int, y: int) -> int: - return x * y -``` +Standard UDFs produce exactly **one output value per input row**. Use them to add computed columns to existing tables or materialized views. -This UDF will take the value of `x` and value of `y` from each row and return the product. The `@udf` wrapper is all that is needed. +| id | text | **embedding** | +|----|------|---------------| +| 1 | "hello world" | **→ [0.12, 0.34, ...]** | +| 2 | "foo bar" | **→ [0.56, 0.78, ...]** | +| 3 | "baz qux" | **→ [0.90, 0.11, ...]** | -### Batched UDFs +Each input row produces exactly one output value. The new column is added to the same table. -For **better performance**, you can also define batch UDFs that process multiple rows at once. +**Use cases**: Embeddings, data enrichment, format conversion, scoring. -You can use `pyarrow.Array`s: +See [UDFs](/geneva/udfs/udfs) for the full guide. -```python -import pyarrow as pa -from geneva import udf +## Scalar UDTFs (1:N) -@udf(data_type=pa.int32()) -def batch_filename_len(filename: pa.Array) -> pa.Array: - lengths = [len(str(f)) for f in filename] - return pa.array(lengths, type=pa.int32()) -``` +Scalar UDTFs **expand each source row into multiple output rows**. The output is a materialized view that inherits parent columns and supports incremental refresh. 
-Or take entire rows using `pyarrow.RecordBatch`: +**Source: `documents`** -```python -import pyarrow as pa -from geneva import udf +| doc_id | title | text | +|--------|-------|------| +| 1 | "Intro to AI" | "Machine learning is..." | +| 2 | "Data Guide" | "Data pipelines are..." | -@udf(data_type=pa.int32()) -def recordbatch_filename_len(batch: pa.RecordBatch) -> pa.Array: - filenames = batch["filename"] - lengths = [len(str(f)) for f in filenames] - return pa.array(lengths, type=pa.int32()) -``` +**Derived: `chunks`** (1:N expansion via `@scalar_udtf`) -> **Note**: Batch UDFS require you to specify `data_type` in the ``@udf`` decorator for batched UDFs which defines `pyarrow.DataType` of the returned `pyarrow.Array`. +| doc_id | title | chunk_index | chunk_text | +|--------|-------|-------------|------------| +| 1 | "Intro to AI" | 0 | "Machine learning..." | +| 1 | "Intro to AI" | 1 | "Neural networks..." | +| 1 | "Intro to AI" | 2 | "Training data..." | +| | | | | +| 2 | "Data Guide" | 0 | "Data pipelines..." | +| 2 | "Data Guide" | 1 | "ETL processes..." | -### Struct fields and list inputs +Each source row produces **one or more** output rows. Parent columns (`doc_id`, `title`) are inherited automatically. -You can pass nested `struct` fields directly into a UDF by specifying `input_columns` with dot notation. For list-typed inputs, Geneva can pass a NumPy array when the argument is annotated as `np.ndarray` (use `np.ndarray | None` for nullable lists). +**Use cases**: Document chunking, video segmentation, image tiling. -```python -import numpy as np -import pyarrow as pa -from geneva import udf +See [Scalar UDTFs](/geneva/udfs/scalar-udtfs) for the full guide. 
-struct_type = pa.struct([("vals", pa.list_(pa.int32()))]) -schema = pa.schema([pa.field("info", struct_type)]) +## Batch UDTFs (N:M) -@udf(data_type=pa.int32(), input_columns=["info.vals"]) -def sum_vals(vals: np.ndarray | None) -> int | None: - if vals is None: - return None - assert isinstance(vals, np.ndarray) - return int(np.sum(vals)) -``` +Batch UDTFs read from a source table (or partition) and **produce output with an arbitrary schema and row count**. They always perform a full refresh. -### Stateful UDFs +**Source: `sales`** -You can also define a **stateful** UDF that retains its state across calls. +| product | region | amount | +|---------|--------|--------| +| Widget | East | 100 | +| Widget | East | 250 | +| Widget | West | 175 | +| Gadget | East | 300 | +| Gadget | West | 400 | +| Gadget | West | 150 | -This can be used to share code and **parameterize your UDFs**. In the example below, the model being used is a parameter that can be specified at UDF registration time. It can also be used to paramterize input column names of `pa.RecordBatch` batch UDFS. +**Derived: `sales_summary`** (N:M aggregation via `@udtf`) -This also can be used to **optimize expensive initialization** that may require heavy resource on the distributed workers. For example, this can be used to load an model to the GPU once for all records sent to a worker instead of once per record or per batch of records. +| product | total_amount | avg_amount | num_sales | +|---------|-------------|------------|-----------| +| Widget | 525 | 175.0 | 3 | +| Gadget | 850 | 283.3 | 3 | -A stateful UDF is a `Callable` class, with `__call__()` method. The call method can be a scalar function or a batched function. +6 input rows become 2 output rows with a completely different schema. The output shape is determined entirely by the UDTF logic — it could be fewer rows (aggregation), more rows (clustering), or the same count with different columns. 
-```python -from typing import Callable -from openai import OpenAI +**Use cases**: Deduplication, clustering, aggregation, cross-row joins. -@udf(data_type=pa.list_(pa.float32(), 1536)) -class OpenAIEmbedding(Callable): - def __init__(self, model: str = "text-embedding-3-small"): - self.model = model - # Per-worker openai client - self.client: OpenAI | None = None - - def __call__(self, text: str) -> pa.Array: - if self.client is None: - self.client = OpenAI() - - resp = self.client.embeddings.create(model=self.model, input=text) - return pa.array(resp.data[0].embeddings) -``` - - -For common providers like OpenAI and Gemini, Geneva ships [built-in UDFs](/geneva/udfs/providers) that handle API keys, retries, and batching for you — no custom class needed. - - -> **Note**: The state is will be independently managed on each distributed Worker. - -## UDF options - -The `udf` can have extra annotations that specify resource requirements and operational characteristics. -These are just add parameters to the `udf(...)`. - -### Resource requirements for UDFs - -Some workers may require specific resources such as gpus, cpus and certain amounts of RAM. - -You can provide these requirements by adding `num_cpus`, `num_gpus`, and `memory` parameters to the UDF. - -```python -@udf(..., num_cpus=1, num_gpus=0.5, memory = 4 * 1024**3) # require 1 CPU, 0.5 GPU, and 4GiB RAM -def func(...): - ... -``` - -### Operational parameters for UDFs - -#### checkpoint_size - -`checkpoint_size` controls how many rows are processed before checkpointing, and therefore reporting and saving progress. - -UDFs can be quite varied: some can be simple operations where thousands of calls can be completed per second, while others may be slow and require 30s per row. So a simple default like "every 1000 rows" might write once a second or once every 8 hours! - -Geneva will handle this internally, using an experimental feature that will adapt checkpoint sizing as a UDF progresses. 
However, if you want to see writes more or less frequently, you can set this manually. There are three parameters: - -- `checkpoint_size`: the seed for the initial checkpoint size -- `min_checkpoint_size`: the minimum value that Geneva will use while adapting checkpoint size -- `max_checkpoint_size`: the maximum value that Geneva will use while adapting checkpoint size - -Therefore, to force a checkpoint size (and effectively disable adaptive batch sizing), set all three of these parameters to the same value. - -### Error handling - -Depending on the UDF, you may want Geneva to ignore rows that hit failures, retry, or fail the entire job. For simple cases, Geneva provides a simple parameter, `on_error`, with the following options: - -| Function | Behavior | -|----------|----------| -| `retry_transient()` | Retry `ConnectionError`, `TimeoutError`, `OSError` with exponential backoff | -| `retry_all()` | Retry any exception with exponential backoff | -| `skip_on_error()` | Return `None` for any exception (skip the row) | -| `fail_fast()` | Fail immediately on any exception (default behavior) | - -If those are not specific enough, Geneva also provides [many more error handling options](/geneva/udfs/error_handling). - -## Registering Features with UDFs - -Registering a feature is done by providing the `Table.add_columns()` function a new column name and the Geneva UDF. 
- -```python -import geneva -import numpy as np -import pyarrow as pa - -lancedb_uri="gs://bucket/db" -db = geneva.connect(lancedb_uri) - -# Define schema for the video table -schema = pa.schema([ - ("filename", pa.string()), - ("duration_sec", pa.float32()), - ("x", pa.int32()), - ("y", pa.int32()), -]) -tbl = db.create_table("videos", schema=schema, mode="overwrite") - -# Generate fake data -N = 10 -data = { - "filename": [f"video_{i}.mp4" for i in range(N)], - "duration_sec": np.random.uniform(10, 300, size=N).astype(np.float32), - "x": np.random.choice([640, 1280, 1920], size=N), - "y": np.random.choice([360, 720, 1080], size=N), - "caption": [f"this is video {i}" for i in range(N)] -} - -# Convert to Arrow Table and add to LanceDB -batch = pa.table(data, schema=schema) -tbl.add(batch) -``` - -Here's how to register a simple UDF: -```python -@udf -def area_udf(x: int, y: int) -> int: - return x * y - -@udf -def download_udf(filename: str) -> bytes: - ... - -# {'new column name': , ...} -# simple_udf's arguments are `x` and `y` so the input columns are -# inferred to be columns `x` amd `y` -tbl.add_columns({"area": area_udf, "content": download_udf }) -``` - -Batched UDFs require return type in their `udf` annotations - -```python -@udf(data_type=pa.int32()) -def batch_filename_len(filename: pa.Array) -> pa.Array: - ... - -# {'new column name': } -# batch_filename_len's input, `filename` input column is -# specified by the UDF's argument name. -tbl.add_columns({"filename_len": batch_filename_len}) -``` - -or - -```python -@udf(data_type=pa.int32()) -def recordbatch_filename_len(batch: pa.RecordBatch) -> pa.Array: - ... - -# {'new column name': } -# batch_filename_len's input. pa.RecordBatch typed UDF -# argument pulls in all the column values for each row. -tbl.add_columns({"filename_len": recordbatch_filename_len}) -``` - -Similarly, a stateful UDF is registered by providing an instance of the Callable object. 
The call method may be a per-record function or a batch function. -```python -@udf(data_type=pa.list_(pa.float32(), 1536)) -class OpenAIEmbedding(Callable): - ... - def __call__(self, text: str) -> pa.Array: - ... - -# OpenAIEmbedding's call method input is inferred to be 'text' of -# type string from the __call__'s arguments, and its output type is -# a fixed size list of float32. -tbl.add_columns({"embedding": OpenAIEmbedding()}) -``` - -## Changing data in computed columns - -Let's say you backfilled data with your UDF then you noticed that your data has some issues. Here are a few scenarios: - -1. All the values are incorrect due to a bug in the UDF. -2. Most values are correct but some values are incorrect due to a failure in UDF execution. -3. Values calculated correctly and you want to perform a second pass to fixup some of the values. - -In scenario 1, you'll most likely want to replaced the UDF with a new version and recalulate all the values. You should perform a `alter_table` and then `backfill`. - -In scenario 2, you'll most likely want to re-execute `backfill` to fill in the values. If the error is in your code (certain cases not handled), you can modify the UDF, and perform an `alter_table`, and then `backfill` with some filters. - -In scenario 3, you have a few options. A) You could `alter` your UDF and include the fixup operations in the UDF. You'd `alter_table` and then `backfill` recalculating all the values. B) You could have a chain of computed columns -- create a new column, calculate the "fixed" up values and have your application use the new column or a combination of the original column. This is similar to A but does not recalulate A and can incur more storage. C) You could `update` the values in the the column with the fixed up values. This may be expedient but also sacrifices reproducability. - -The next section shows you how to change your column definition by `alter`ing the UDF. - -## Altering UDFs - -You now want to revise the code. 
To make the change, you'd update the UDF used to compute the column using the `alter_columns` API and the updated function. The example below replaces the definition of column `area` to use the `area_udf_v2` function. - -```python -table.alter_columns({"path": "area", "udf": area_udf_v2} ) -``` - -After making this change, the existing data already in the table does not change. However, when you perform your next basic `backfill` operation, all values would be recalculated and updated. If you only wanted some rows updated, you could perform a filtered backfill, targeting the specific rows that need the new upates. - -For example, this filter would only update the rows where area was currently null. -```python -table.backfill("area", where="area is null") -``` - -Reference: -* [`alter_columns` API](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.alter_columns) +See [Batch UDTFs](/geneva/udfs/batch-udtfs) for the full guide. diff --git a/docs/geneva/udfs/scalar-udtfs.mdx b/docs/geneva/udfs/scalar-udtfs.mdx new file mode 100644 index 0000000..e826a65 --- /dev/null +++ b/docs/geneva/udfs/scalar-udtfs.mdx @@ -0,0 +1,229 @@ +--- +title: Scalar User-Defined Table Functions (UDTFs) +sidebarTitle: Scalar UDTFs +description: Use scalar UDTFs for 1:N row expansion — split videos into clips, chunk documents, or tile images with automatic parent column inheritance and incremental refresh. +icon: diagram-subtask +--- + +Beta — introduced in Geneva 0.11.0 + +Standard UDFs produce exactly **one output value per input row**. Scalar UDTFs enable **1:N row expansion** — each source row can produce multiple output rows. The results are stored as a materialized view with MV-style incremental refresh. 
+ +| Source Table | Derived Table | Expansion | +|---|---|---| +| 1 video row | → N clip rows | Video segmentation | +| 1 document row | → N chunk rows | Text chunking | +| 1 image row | → N tile rows | Image tiling | + +## Defining a Scalar UDTF + +Use the `@scalar_udtf` decorator on a function that **yields** output rows. Geneva infers the output schema from the return type annotation. + +```python +from geneva import scalar_udtf +from typing import Iterator, NamedTuple + +class Clip(NamedTuple): + clip_start: float + clip_end: float + clip_bytes: bytes + +@scalar_udtf +def extract_clips(video_path: str, duration: float) -> Iterator[Clip]: + """Yields multiple clips per video.""" + clip_length = 10.0 + for start in range(0, int(duration), int(clip_length)): + end = min(start + clip_length, duration) + clip_data = extract_video_segment(video_path, start, end) + yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data) +``` + +Input parameters are bound to source columns **by name** — the parameter `video_path` binds to source column `video_path`, just like standard UDFs. + + +A scalar UDTF can yield **zero rows** for a source row. The source row is still marked as processed and will not be retried on the next refresh. + + +### List return pattern + +If you prefer to build the full list in memory rather than yielding, you can return a `list` instead of an `Iterator`: + +```python +@scalar_udtf +def extract_clips(video_path: str, duration: float) -> list[Clip]: + clips = [] + for start in range(0, int(duration), 10): + end = min(start + 10, duration) + clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b"...")) + return clips +``` + +### Batched scalar UDTF + +For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows: + +```python +@scalar_udtf(batch=True) +def extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch: + """Process rows in batches. Same 1:N semantic per row.""" + ... 
+``` + +## Creating a Scalar UDTF View + +Scalar UDTFs use the existing `create_materialized_view` API with a `udtf=` parameter: + +```python +import geneva + +db = geneva.connect("/data/mydb") +videos = db.open_table("videos") + +# Create the 1:N materialized view +clips = db.create_materialized_view( + "clips", + query=videos.search(None).select(["video_path", "metadata"]), + udtf=extract_clips, +) + +# Populate — runs the UDTF on every source row +clips.refresh() +``` + +The `query` parameter controls which source columns are inherited. Columns listed in `.select()` are carried into every child row automatically. + +### Inheriting source columns + +```python +# Only video_path and metadata are inherited into the clips table +clips = db.create_materialized_view( + "clips", + query=videos.search(None).select(["video_path", "metadata"]), + udtf=extract_clips, +) +``` + +## Inherited Columns + +Child rows automatically include the parent's columns — no manual join required. The columns available in the child table are determined by the query's `.select()`: + +### `videos` table (source) + +| video_path | duration | metadata | +|------------|----------|----------| +| /v/a.mp4 | 120.0 | \{fps: 30\} | +| /v/b.mp4 | 60.0 | \{fps: 24\} | + +### `clips` table (derived, 1:N) + +| video_path | metadata | clip_start | clip_end | clip_bytes | +|------------|----------|------------|----------|------------| +| /v/a.mp4 | \{fps: 30\} | 0.0 | 10.0 | b"\x00\x1a..." | +| /v/a.mp4 | \{fps: 30\} | 10.0 | 20.0 | b"\x00\x2b..." | +| /v/a.mp4 | \{fps: 30\} | 20.0 | 30.0 | b"\x00\x3c..." | +| | | | | | +| /v/b.mp4 | \{fps: 24\} | 0.0 | 10.0 | b"\x00\x4d..." | +| /v/b.mp4 | \{fps: 24\} | 10.0 | 20.0 | b"\x00\x5e..." | + +The first three rows come from the `/v/a.mp4` source row, the last two from `/v/b.mp4`. Inherited columns (`video_path`, `metadata`) are carried over automatically; `clip_start`, `clip_end`, and `clip_bytes` are generated by the UDTF. 
+ +## Adding Computed Columns After Creation + +Since scalar UDTF views are materialized views, you can add UDF-computed columns to the child table and backfill them: + +```python +@udf(data_type=pa.list_(pa.float32(), 512)) +def clip_embedding(clip_bytes: bytes) -> list[float]: + return embed_model.encode(clip_bytes) + +# Add an embedding column to the clips table +clips.add_columns({"embedding": clip_embedding}) + +# Backfill computes embeddings for all existing clips +clips.refresh() +``` + +This is a powerful pattern: expand source rows with a scalar UDTF, then enrich the expanded rows with standard UDFs. + +## Incremental Refresh + +Scalar UDTFs support **incremental refresh**, just like standard materialized views: + +- **New source rows**: The UDTF runs on new rows, inserting child rows. +- **Deleted source rows**: Child rows linked to the deleted parent are cascade-deleted. +- **Updated source rows**: Old children are deleted, UDTF re-runs, new children inserted. + +```python +# Add new videos to the source table +videos.add(new_video_data) + +# Incremental refresh — only processes the new videos +clips.refresh() +``` + +Only the new source rows are processed. Existing clips from previous refreshes are untouched. 
+ +## Chaining UDTF Views + +Scalar UDTF views are standard materialized views, so they can serve as the source for further views: + +```python +# videos → clips (1:N) +clips = db.create_materialized_view( + "clips", query=videos.search(None), udtf=extract_clips +) + +# clips → frames (1:N) +frames = db.create_materialized_view( + "frames", query=clips.search(None), udtf=extract_frames +) +``` + +## Full Example: Document Chunking + +```python +from geneva import connect, scalar_udtf, udf +from typing import Iterator, NamedTuple +import pyarrow as pa + +class Chunk(NamedTuple): + chunk_index: int + chunk_text: str + +@scalar_udtf +def chunk_document(text: str) -> Iterator[Chunk]: + """Split a document into overlapping chunks.""" + words = text.split() + chunk_size = 500 + overlap = 50 + for i, start in enumerate(range(0, len(words), chunk_size - overlap)): + chunk_words = words[start:start + chunk_size] + yield Chunk(chunk_index=i, chunk_text=" ".join(chunk_words)) + +db = connect("/data/mydb") +docs = db.open_table("documents") + +# Create chunked view — inherits doc_id, title, etc. from source +chunks = db.create_materialized_view( + "doc_chunks", + query=docs.search(None).select(["doc_id", "title", "text"]), + udtf=chunk_document, +) +chunks.refresh() + +# Add embeddings to chunks for semantic search +@udf(data_type=pa.list_(pa.float32(), 1536)) +def embed_text(chunk_text: str) -> list[float]: + return embedding_model.encode(chunk_text) + +chunks.add_columns({"embedding": embed_text}) +chunks.refresh() # Backfills embeddings on all existing chunks + +# Query — parent columns available alongside chunk columns +chunks.search(None).select(["doc_id", "title", "chunk_text", "embedding"]).to_pandas() +``` + +For a comparison of all three function types (UDFs, Scalar UDTFs, Batch UDTFs), see [Understanding Transforms](/geneva/udfs). 
+ +Reference: +* [`create_materialized_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_materialized_view) diff --git a/docs/geneva/udfs/udfs.mdx b/docs/geneva/udfs/udfs.mdx new file mode 100644 index 0000000..d50ad37 --- /dev/null +++ b/docs/geneva/udfs/udfs.mdx @@ -0,0 +1,287 @@ +--- +title: User-Defined Functions (UDFs) +sidebarTitle: UDFs +description: Define 1:1 transforms that add computed columns to your tables — embeddings, enrichment, scoring, and more. +icon: function +--- + +UDFs are the core building block for feature engineering in Geneva. A UDF wraps a Python function and applies it to every row in a table, producing exactly **one output value per input row** (1:1). Use UDFs to compute embeddings, enrich data with external APIs, transform formats, or derive new features from existing columns. + +## Defining a UDF + +Converting your Python code to a Geneva UDF is simple. There are three kinds of UDFs that you can provide — scalar UDFs, batched UDFs and stateful UDFs. + +In all cases, Geneva uses Python type hints from your functions to infer the input and output +[arrow data types](https://arrow.apache.org/docs/python/api/datatypes.html) that LanceDB uses. + +### Scalar UDFs + +The **simplest** form is a scalar UDF, which processes one row at a time: + +```python +from geneva import udf + +@udf +def area_udf(x: int, y: int) -> int: + return x * y +``` + +This UDF will take the value of `x` and value of `y` from each row and return the product. The `@udf` wrapper is all that is needed. + +### Batched UDFs + +For **better performance**, you can also define batch UDFs that process multiple rows at once. 
+ +You can use `pyarrow.Array`s: + +```python +import pyarrow as pa +from geneva import udf + +@udf(data_type=pa.int32()) +def batch_filename_len(filename: pa.Array) -> pa.Array: + lengths = [len(str(f)) for f in filename] + return pa.array(lengths, type=pa.int32()) +``` + +Or take entire rows using `pyarrow.RecordBatch`: + +```python +import pyarrow as pa +from geneva import udf + +@udf(data_type=pa.int32()) +def recordbatch_filename_len(batch: pa.RecordBatch) -> pa.Array: + filenames = batch["filename"] + lengths = [len(str(f)) for f in filenames] + return pa.array(lengths, type=pa.int32()) +``` + +> **Note**: Batched UDFs require you to specify `data_type` in the `@udf` decorator, which defines the `pyarrow.DataType` of the returned `pyarrow.Array`. + +### Struct fields and list inputs + +You can pass nested `struct` fields directly into a UDF by specifying `input_columns` with dot notation. For list-typed inputs, Geneva can pass a NumPy array when the argument is annotated as `np.ndarray` (use `np.ndarray | None` for nullable lists). + +```python +import numpy as np +import pyarrow as pa +from geneva import udf + +struct_type = pa.struct([("vals", pa.list_(pa.int32()))]) +schema = pa.schema([pa.field("info", struct_type)]) + +@udf(data_type=pa.int32(), input_columns=["info.vals"]) +def sum_vals(vals: np.ndarray | None) -> int | None: + if vals is None: + return None + assert isinstance(vals, np.ndarray) + return int(np.sum(vals)) +``` + +### Stateful UDFs + +You can also define a **stateful** UDF that retains its state across calls. + +This can be used to share code and **parameterize your UDFs**. In the example below, the model being used is a parameter that can be specified at UDF registration time. It can also be used to parameterize input column names of `pa.RecordBatch` batch UDFs. + +This also can be used to **optimize expensive initialization** that may require heavy resources on the distributed workers.
For example, this can be used to load a model to the GPU once for all records sent to a worker instead of once per record or per batch of records. + +A stateful UDF is a `Callable` class with a `__call__()` method. The call method can be a scalar function or a batched function. + +```python +from typing import Callable +from openai import OpenAI + +@udf(data_type=pa.list_(pa.float32(), 1536)) +class OpenAIEmbedding(Callable): + def __init__(self, model: str = "text-embedding-3-small"): + self.model = model + # Per-worker openai client + self.client: OpenAI | None = None + + def __call__(self, text: str) -> pa.Array: + if self.client is None: + self.client = OpenAI() + + resp = self.client.embeddings.create(model=self.model, input=text) + return pa.array(resp.data[0].embeddings) +``` + + +For common providers like OpenAI and Gemini, Geneva ships [built-in UDFs](/geneva/udfs/providers) that handle API keys, retries, and batching for you — no custom class needed. + + +> **Note**: The state will be independently managed on each distributed worker. + +## UDF options + +The `udf` can have extra annotations that specify resource requirements and operational characteristics. +These are simply additional parameters to the `udf(...)` decorator. + +### Resource requirements for UDFs + +Some workers may require specific resources such as GPUs, CPUs, and certain amounts of RAM. + +You can provide these requirements by adding `num_cpus`, `num_gpus`, and `memory` parameters to the UDF. + +```python +@udf(..., num_cpus=1, num_gpus=0.5, memory = 4 * 1024**3) # require 1 CPU, 0.5 GPU, and 4GiB RAM +def func(...): + ... +``` + +### Operational parameters for UDFs + +#### checkpoint_size + +`checkpoint_size` controls how many rows are processed before checkpointing, and therefore reporting and saving progress. + +UDFs can be quite varied: some can be simple operations where thousands of calls can be completed per second, while others may be slow and require 30s per row.
So a simple default like "every 1000 rows" might write once a second or once every 8 hours! + +Geneva will handle this internally, using an experimental feature that will adapt checkpoint sizing as a UDF progresses. However, if you want to see writes more or less frequently, you can set this manually. There are three parameters: + +- `checkpoint_size`: the seed for the initial checkpoint size +- `min_checkpoint_size`: the minimum value that Geneva will use while adapting checkpoint size +- `max_checkpoint_size`: the maximum value that Geneva will use while adapting checkpoint size + +Therefore, to force a checkpoint size (and effectively disable adaptive batch sizing), set all three of these parameters to the same value. + +### Error handling + +Depending on the UDF, you may want Geneva to ignore rows that hit failures, retry, or fail the entire job. For simple cases, Geneva provides a simple parameter, `on_error`, with the following options: + +| Function | Behavior | +|----------|----------| +| `retry_transient()` | Retry `ConnectionError`, `TimeoutError`, `OSError` with exponential backoff | +| `retry_all()` | Retry any exception with exponential backoff | +| `skip_on_error()` | Return `None` for any exception (skip the row) | +| `fail_fast()` | Fail immediately on any exception (default behavior) | + +If those are not specific enough, Geneva also provides [many more error handling options](/geneva/udfs/error_handling). + +## Registering Features with UDFs + +Registering a feature is done by providing the `Table.add_columns()` function a new column name and the Geneva UDF. 
+ +```python +import geneva +import numpy as np +import pyarrow as pa + +lancedb_uri="gs://bucket/db" +db = geneva.connect(lancedb_uri) + +# Define schema for the video table +schema = pa.schema([ + ("filename", pa.string()), + ("duration_sec", pa.float32()), + ("x", pa.int32()), + ("y", pa.int32()), +]) +tbl = db.create_table("videos", schema=schema, mode="overwrite") + +# Generate fake data +N = 10 +data = { + "filename": [f"video_{i}.mp4" for i in range(N)], + "duration_sec": np.random.uniform(10, 300, size=N).astype(np.float32), + "x": np.random.choice([640, 1280, 1920], size=N), + "y": np.random.choice([360, 720, 1080], size=N), + "caption": [f"this is video {i}" for i in range(N)] +} + +# Convert to Arrow Table and add to LanceDB +batch = pa.table(data, schema=schema) +tbl.add(batch) +``` + +Here's how to register a simple UDF: +```python +@udf +def area_udf(x: int, y: int) -> int: + return x * y + +@udf +def download_udf(filename: str) -> bytes: + ... + +# {'new column name': , ...} +# area_udf's arguments are `x` and `y` so the input columns are +# inferred to be columns `x` and `y` +tbl.add_columns({"area": area_udf, "content": download_udf }) +``` + +Batched UDFs require a return type in their `udf` annotations + +```python +@udf(data_type=pa.int32()) +def batch_filename_len(filename: pa.Array) -> pa.Array: + ... + +# {'new column name': } +# batch_filename_len's input, `filename` input column is +# specified by the UDF's argument name. +tbl.add_columns({"filename_len": batch_filename_len}) +``` + +or + +```python +@udf(data_type=pa.int32()) +def recordbatch_filename_len(batch: pa.RecordBatch) -> pa.Array: + ... + +# {'new column name': } +# recordbatch_filename_len's input. pa.RecordBatch typed UDF +# argument pulls in all the column values for each row. +tbl.add_columns({"filename_len": recordbatch_filename_len}) +``` + +Similarly, a stateful UDF is registered by providing an instance of the Callable object.
The call method may be a per-record function or a batch function. +```python +@udf(data_type=pa.list_(pa.float32(), 1536)) +class OpenAIEmbedding(Callable): + ... + def __call__(self, text: str) -> pa.Array: + ... + +# OpenAIEmbedding's call method input is inferred to be 'text' of +# type string from the __call__'s arguments, and its output type is +# a fixed size list of float32. +tbl.add_columns({"embedding": OpenAIEmbedding()}) +``` + +## Changing data in computed columns + +Let's say you backfilled data with your UDF then you noticed that your data has some issues. Here are a few scenarios: + +1. All the values are incorrect due to a bug in the UDF. +2. Most values are correct but some values are incorrect due to a failure in UDF execution. +3. Values calculated correctly and you want to perform a second pass to fixup some of the values. + +In scenario 1, you'll most likely want to replace the UDF with a new version and recalculate all the values. You should perform an `alter_columns` and then `backfill`. + +In scenario 2, you'll most likely want to re-execute `backfill` to fill in the values. If the error is in your code (certain cases not handled), you can modify the UDF, and perform an `alter_columns`, and then `backfill` with some filters. + +In scenario 3, you have a few options. A) You could `alter` your UDF and include the fixup operations in the UDF. You'd `alter_columns` and then `backfill` recalculating all the values. B) You could have a chain of computed columns -- create a new column, calculate the "fixed" up values and have your application use the new column or a combination of the original column. This is similar to A but does not recalculate A and can incur more storage. C) You could `update` the values in the column with the fixed up values. This may be expedient but also sacrifices reproducibility. + +The next section shows you how to change your column definition by `alter`ing the UDF. + +## Altering UDFs + +You now want to revise the code.
To make the change, you'd update the UDF used to compute the column using the `alter_columns` API and the updated function. The example below replaces the definition of column `area` to use the `area_udf_v2` function. + +```python +table.alter_columns({"path": "area", "udf": area_udf_v2} ) +``` + +After making this change, the existing data already in the table does not change. However, when you perform your next basic `backfill` operation, all values would be recalculated and updated. If you only wanted some rows updated, you could perform a filtered backfill, targeting the specific rows that need the new updates. + +For example, this filter would only update the rows where area was currently null. +```python +table.backfill("area", where="area is null") +``` + +Reference: +* [`alter_columns` API](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.alter_columns)