diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/ShardScanExecutionContext.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/ShardScanExecutionContext.java
index fb0df3f1301d3..8e6a2fc7dfbe6 100644
--- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/ShardScanExecutionContext.java
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/ShardScanExecutionContext.java
@@ -10,7 +10,10 @@
import org.apache.arrow.memory.BufferAllocator;
import org.opensearch.analytics.spi.CommonExecutionContext;
+import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.IndexReaderProvider.Reader;
+import org.opensearch.index.mapper.MapperService;
import org.opensearch.tasks.Task;
/**
@@ -26,6 +29,9 @@ public class ShardScanExecutionContext implements CommonExecutionContext {
private final Task task;
private byte[] fragmentBytes;
private BufferAllocator allocator;
+ private MapperService mapperService;
+ private IndexSettings indexSettings;
+ private NamedWriteableRegistry namedWriteableRegistry;
/**
* Constructs an execution context.
@@ -73,4 +79,34 @@ public BufferAllocator getAllocator() {
public void setAllocator(BufferAllocator allocator) {
this.allocator = allocator;
}
+
+ /** Returns the shard's mapper service for field type resolution. */
+ public MapperService getMapperService() {
+ return mapperService;
+ }
+
+ /** Sets the shard's mapper service. */
+ public void setMapperService(MapperService mapperService) {
+ this.mapperService = mapperService;
+ }
+
+ /** Returns the shard's index settings. */
+ public IndexSettings getIndexSettings() {
+ return indexSettings;
+ }
+
+ /** Sets the shard's index settings. */
+ public void setIndexSettings(IndexSettings indexSettings) {
+ this.indexSettings = indexSettings;
+ }
+
+ /** Returns the NamedWriteableRegistry for deserializing delegated expressions. */
+ public NamedWriteableRegistry getNamedWriteableRegistry() {
+ return namedWriteableRegistry;
+ }
+
+ /** Sets the NamedWriteableRegistry. */
+ public void setNamedWriteableRegistry(NamedWriteableRegistry namedWriteableRegistry) {
+ this.namedWriteableRegistry = namedWriteableRegistry;
+ }
}
diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java
index e4722784197f6..37ae28cf0e168 100644
--- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java
@@ -8,6 +8,8 @@
package org.opensearch.analytics.spi;
+import java.util.List;
+
/**
* SPI extension point for backend query engine plugins.
*
@@ -81,4 +83,34 @@ default ExchangeSinkProvider getExchangeSinkProvider() {
default FragmentInstructionHandlerFactory getInstructionHandlerFactory() {
throw new UnsupportedOperationException("getInstructionHandlerFactory not implemented for [" + name() + "]");
}
+
+ /**
+ * Prepare a filter delegation handle for the given delegated expressions.
+ * Called by Core after all instruction handlers have run, when the plan has delegation.
+ *
+ * <p>The accepting backend initializes its internal state (e.g., DirectoryReader,
+ * QueryShardContext, compiled Queries) and returns a handle that the driving backend
+ * will call into during execution.
+ *
+ * @param expressions the delegated expressions (annotationId + serialized query bytes)
+ * @param ctx the shared execution context (Reader, MapperService, IndexSettings)
+ * @return a handle the driving backend calls into via FFM upcalls
+ */
+ default FilterDelegationHandle getFilterDelegationHandle(List<DelegatedExpression> expressions, CommonExecutionContext ctx) {
+ throw new UnsupportedOperationException("getFilterDelegationHandle not implemented for [" + name() + "]");
+ }
+
+ /**
+ * Configure the driving backend to use the given delegation handle during execution.
+ * Called by Core after obtaining the handle from the accepting backend.
+ *
+ * <p>The driving backend registers the handle so that FFM upcalls from Rust
+ * (createProvider, createCollector, collectDocs) route to it.
+ *
+ * @param handle the delegation handle from the accepting backend
+ * @param backendContext the driving backend's execution context (from instruction handlers)
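+ *
+ * <p>Illustrative orchestration in Core (a sketch; variable names hypothetical):
+ * <pre>{@code
+ * FilterDelegationHandle handle = accepting.getFilterDelegationHandle(descriptor.delegatedExpressions(), ctx);
+ * driving.configureFilterDelegation(handle, backendContext);
+ * // ... execute the fragment; Rust upcalls route through the handle ...
+ * handle.close();
+ * }</pre>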
+ */
+ default void configureFilterDelegation(FilterDelegationHandle handle, BackendExecutionContext backendContext) {
+ throw new UnsupportedOperationException("configureFilterDelegation not implemented for [" + name() + "]");
+ }
}
diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationDescriptor.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationDescriptor.java
new file mode 100644
index 0000000000000..86c641517edd8
--- /dev/null
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationDescriptor.java
@@ -0,0 +1,52 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.analytics.spi;
+
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Describes the delegation metadata for a plan alternative. Carried on the wire
+ * alongside the instruction list so that Core can orchestrate the handle exchange
+ * between accepting and driving backends at the data node.
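+ *
+ * <p>Illustrative round-trip (a sketch; {@code BytesStreamOutput} is OpenSearch's in-memory stream):
+ * <pre>{@code
+ * BytesStreamOutput out = new BytesStreamOutput();
+ * descriptor.writeTo(out);
+ * DelegationDescriptor copy = new DelegationDescriptor(out.bytes().streamInput());
+ * }</pre>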
+ *
+ * @opensearch.internal
+ */
+public record DelegationDescriptor(FilterTreeShape treeShape, int delegatedPredicateCount, List<DelegatedExpression> delegatedExpressions)
+ implements
+ Writeable {
+
+ public DelegationDescriptor(StreamInput in) throws IOException {
+ this(in.readEnum(FilterTreeShape.class), in.readVInt(), readExpressions(in));
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeEnum(treeShape);
+ out.writeVInt(delegatedPredicateCount);
+ out.writeVInt(delegatedExpressions.size());
+ for (DelegatedExpression expr : delegatedExpressions) {
+ expr.writeTo(out);
+ }
+ }
+
+ private static List<DelegatedExpression> readExpressions(StreamInput in) throws IOException {
+ int count = in.readVInt();
+ List<DelegatedExpression> expressions = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ expressions.add(new DelegatedExpression(in));
+ }
+ return expressions;
+ }
+}
diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationHandle.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationHandle.java
new file mode 100644
index 0000000000000..6f7f914a36e1a
--- /dev/null
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationHandle.java
@@ -0,0 +1,77 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.analytics.spi;
+
+import java.io.Closeable;
+import java.lang.foreign.MemorySegment;
+
+/**
+ * Callback surface for filter delegation between a driving backend and an accepting backend.
+ *
+ * <p>One handle per query per shard. The accepting backend implements this interface;
+ * the driving backend calls into it via FFM upcalls during execution. Core closes it
+ * after execution completes.
+ *
+ * <p>Lifecycle:
+ * <ol>
+ * <li>Rust calls {@link #createProvider(int)} once per delegated predicate (per annotationId)</li>
+ * <li>Rust calls {@link #createCollector(int, int, int, int)} per (segment, doc range)</li>
+ * <li>Rust calls {@link #collectDocs(int, int, int, MemorySegment)} per row group</li>
+ * <li>Rust calls {@link #releaseCollector(int)} when done with a segment</li>
+ * <li>Rust calls {@link #releaseProvider(int)} when the query ends</li>
+ * </ol>
+ *
+ * @opensearch.internal
+ */
+public interface FilterDelegationHandle extends Closeable {
+
+ /**
+ * Create a provider for the given annotation ID. The accepting backend looks up
+ * the pre-compiled query for this annotation and prepares it for segment iteration.
+ *
+ * @param annotationId the annotation ID identifying the delegated predicate
+ * @return a provider key {@code >= 0}, or {@code -1} on failure
+ */
+ int createProvider(int annotationId);
+
+ /**
+ * Create a collector for one (segment, [minDoc, maxDoc)) range.
+ *
+ * @param providerKey key returned by {@link #createProvider(int)}
+ * @param segmentOrd the segment ordinal
+ * @param minDoc inclusive lower bound
+ * @param maxDoc exclusive upper bound
+ * @return a collector key {@code >= 0}, or {@code -1} on failure
+ */
+ int createCollector(int providerKey, int segmentOrd, int minDoc, int maxDoc);
+
+ /**
+ * Fill {@code out} with the matching doc-id bitset for the given collector.
+ *
+ * <p>Bit layout: word {@code i} contains matches for docs
+ * {@code [minDoc + i*64, minDoc + (i+1)*64)}, LSB-first within each word.
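+ * For example (hypothetical values): with {@code minDoc = 100}, matching doc 163
+ * sets bit 63 of word 0, and matching doc 164 sets bit 0 of word 1.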
+ *
+ * @param collectorKey key returned by {@link #createCollector(int, int, int, int)}
+ * @param minDoc inclusive lower bound
+ * @param maxDoc exclusive upper bound
+ * @param out destination buffer; implementation writes up to {@code out.byteSize() / 8} words
+ * @return number of words written, or {@code -1} on error
+ */
+ int collectDocs(int collectorKey, int minDoc, int maxDoc, MemorySegment out);
+
+ /**
+ * Release resources for a collector.
+ */
+ void releaseCollector(int collectorKey);
+
+ /**
+ * Release resources for a provider.
+ */
+ void releaseProvider(int providerKey);
+}
diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationInstructionNode.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationInstructionNode.java
index d56a5c5bed775..11a947d86ca13 100644
--- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationInstructionNode.java
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationInstructionNode.java
@@ -44,7 +44,7 @@ public FilterDelegationInstructionNode(StreamInput in) throws IOException {
@Override
public InstructionType type() {
- return InstructionType.SETUP_FILTER_DELEGATION_FOR_INDEX;
+ return InstructionType.SETUP_SHARD_SCAN_WITH_DELEGATION;
}
@Override
diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FragmentInstructionHandlerFactory.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FragmentInstructionHandlerFactory.java
index 6e62fa10f012e..f40d7472c2d4d 100644
--- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FragmentInstructionHandlerFactory.java
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FragmentInstructionHandlerFactory.java
@@ -35,6 +35,9 @@ Optional<InstructionNode> createFilterDelegationNode(
List<byte[]> delegatedQueries
);
+ /** Creates a shard scan with delegation instruction node — combines scan setup with delegation config. */
+ Optional<InstructionNode> createShardScanWithDelegationNode(FilterTreeShape treeShape, int delegatedPredicateCount);
+
/** Creates a partial aggregate instruction node. */
Optional<InstructionNode> createPartialAggregateNode();
diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/InstructionType.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/InstructionType.java
index 490f60a967707..d426e3c8c7c0c 100644
--- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/InstructionType.java
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/InstructionType.java
@@ -23,14 +23,14 @@ public enum InstructionType {
/** Base scan setup — reader acquisition, SessionContext creation, default table provider. */
SETUP_SHARD_SCAN,
/**
- * Filter delegation to an index backend — bridge setup, UDF registration, IndexedTableProvider.
+ * Filter delegation to an index backend — bridge setup, UDF registration, custom scan operator.
*
* <p>TODO: add a DelegationStrategy field (BACKEND_DRIVEN vs CENTRALLY_DRIVEN) to the
* instruction node when centrally-driven delegation is implemented. Currently only
* BACKEND_DRIVEN exists — derived from the backend declaring
* {@code supportedDelegations(DelegationType.FILTER)}.
*/
- SETUP_FILTER_DELEGATION_FOR_INDEX,
+ SETUP_SHARD_SCAN_WITH_DELEGATION,
/** Partial aggregate mode — disable combine optimizer, cut plan to partial-only. */
SETUP_PARTIAL_AGGREGATE,
/** Final aggregate for coordinator reduce — ExchangeSink path, final-only agg. */
@@ -40,7 +40,7 @@ public enum InstructionType {
public InstructionNode readNode(StreamInput in) throws IOException {
return switch (this) {
case SETUP_SHARD_SCAN -> new ShardScanInstructionNode(in);
- case SETUP_FILTER_DELEGATION_FOR_INDEX -> new FilterDelegationInstructionNode(in);
+ case SETUP_SHARD_SCAN_WITH_DELEGATION -> new ShardScanWithDelegationInstructionNode(in);
case SETUP_PARTIAL_AGGREGATE -> new PartialAggregateInstructionNode(in);
case SETUP_FINAL_AGGREGATE -> new FinalAggregateInstructionNode(in);
};
diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ShardScanWithDelegationInstructionNode.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ShardScanWithDelegationInstructionNode.java
new file mode 100644
index 0000000000000..18af354e02355
--- /dev/null
+++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ShardScanWithDelegationInstructionNode.java
@@ -0,0 +1,59 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.analytics.spi;
+
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * Instruction node for shard scan with filter delegation — extends base shard scan
+ * with {@link FilterTreeShape} and delegated predicate count so the driving backend
+ * can configure its indexed execution path (UDF registration, IndexedTableProvider)
+ * in a single FFM call.
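+ *
+ * <p>Wire format sketch (mirrors {@link #writeTo}): the base shard scan fields, then
+ * {@code treeShape} via {@code writeEnum}, then {@code delegatedPredicateCount} as a vInt.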
+ *
+ * @opensearch.internal
+ */
+public class ShardScanWithDelegationInstructionNode extends ShardScanInstructionNode {
+
+ private final FilterTreeShape treeShape;
+ private final int delegatedPredicateCount;
+
+ public ShardScanWithDelegationInstructionNode(FilterTreeShape treeShape, int delegatedPredicateCount) {
+ this.treeShape = treeShape;
+ this.delegatedPredicateCount = delegatedPredicateCount;
+ }
+
+ public ShardScanWithDelegationInstructionNode(StreamInput in) throws IOException {
+ super(in);
+ this.treeShape = in.readEnum(FilterTreeShape.class);
+ this.delegatedPredicateCount = in.readVInt();
+ }
+
+ @Override
+ public InstructionType type() {
+ return InstructionType.SETUP_SHARD_SCAN_WITH_DELEGATION;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeEnum(treeShape);
+ out.writeVInt(delegatedPredicateCount);
+ }
+
+ public FilterTreeShape getTreeShape() {
+ return treeShape;
+ }
+
+ public int getDelegatedPredicateCount() {
+ return delegatedPredicateCount;
+ }
+}
diff --git a/sandbox/plugins/analytics-backend-datafusion/build.gradle b/sandbox/plugins/analytics-backend-datafusion/build.gradle
index e2504d6f49c55..d382603186f40 100644
--- a/sandbox/plugins/analytics-backend-datafusion/build.gradle
+++ b/sandbox/plugins/analytics-backend-datafusion/build.gradle
@@ -6,12 +6,6 @@
* compatible open source license.
*/
-apply plugin: 'opensearch.internal-cluster-test'
-
-// SQL Unified Query API version (aligned with OpenSearch build version) — required by
-// the PPL transport plugin used in CoordinatorReduceIT.
-def sqlUnifiedQueryVersion = '3.6.0.0-SNAPSHOT'
-
opensearchplugin {
description = 'DataFusion native execution engine plugin for the query engine.'
classname = 'org.opensearch.be.datafusion.DataFusionPlugin'
@@ -85,25 +79,6 @@ dependencies {
testImplementation project(':sandbox:plugins:analytics-backend-lucene')
testCompileOnly 'org.immutables:value-annotations:2.8.8'
- // ── internalClusterTest: end-to-end coordinator-reduce IT ───────────────────
- // Pulls in every sibling plugin needed to construct a parquet-backed composite
- // index, dispatch a multi-shard PPL aggregate, and exercise DatafusionReduceSink.
- internalClusterTestImplementation project(':sandbox:plugins:analytics-engine')
- internalClusterTestImplementation project(':sandbox:plugins:parquet-data-format')
- internalClusterTestImplementation project(':sandbox:plugins:composite-engine')
- internalClusterTestImplementation project(':sandbox:plugins:analytics-backend-lucene')
- internalClusterTestImplementation project(':plugins:arrow-flight-rpc')
- internalClusterTestImplementation("org.opensearch.query:unified-query-api:${sqlUnifiedQueryVersion}") {
- exclude group: 'org.opensearch'
- }
- internalClusterTestImplementation("org.opensearch.query:unified-query-core:${sqlUnifiedQueryVersion}") {
- exclude group: 'org.opensearch'
- }
- internalClusterTestImplementation("org.opensearch.query:unified-query-ppl:${sqlUnifiedQueryVersion}") {
- exclude group: 'org.opensearch'
- }
- // PPL front-end plugin — provides UnifiedPPLExecuteAction transport action used by the IT.
- internalClusterTestImplementation project(':sandbox:plugins:test-ppl-frontend')
}
test {
@@ -169,30 +144,10 @@ task cargoTest(type: Exec) {
check.dependsOn cargoTest
-internalClusterTest {
- jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED'
- jvmArgs '--add-opens=java.base/java.lang=ALL-UNNAMED'
- jvmArgs '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED'
- jvmArgs '--enable-native-access=ALL-UNNAMED'
- jvmArgs '-Darrow.memory.debug.allocator=false'
- jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
- systemProperty 'io.netty.allocator.numDirectArenas', '1'
- systemProperty 'io.netty.noUnsafe', 'false'
- systemProperty 'io.netty.tryUnsafe', 'true'
- systemProperty 'io.netty.tryReflectionSetAccessible', 'true'
- systemProperty 'native.lib.path', project(':sandbox:libs:dataformat-native').ext.nativeLibPath.absolutePath
- dependsOn ':sandbox:libs:dataformat-native:buildRustLibrary'
-}
-
configurations.all {
- // okhttp-aws-signer is a transitive dep of unified-query-common (via unified-query-core),
- // only published on JitPack, not needed for PPL parsing/planning
exclude group: 'com.github.babbel', module: 'okhttp-aws-signer'
resolutionStrategy {
- // Align transitive versions with OpenSearch's managed versions — required because the
- // unified-query-* artifacts pull in older versions of common libs that conflict with
- // OpenSearch's enforced versions on internalClusterTest classpath.
force 'com.google.guava:guava:33.4.0-jre'
force 'com.google.guava:failureaccess:1.0.2'
force 'com.google.errorprone:error_prone_annotations:2.36.0'
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs
index d6bfab0c9fd37..3ccc46e679dc5 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs
@@ -557,10 +557,15 @@ pub unsafe fn sender_send(
// `from_ffi` takes the array by value (consumes it) and the schema by
// reference (it is still dropped when `ffi_schema` goes out of scope).
- let array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| {
+ let mut array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| {
DataFusionError::Execution(format!("Failed to import Arrow C Data array: {}", e))
})?;
+ // Buffers from Java's Flight RPC deserialization may not meet Rust's
+ // native alignment requirements. align_buffers() is a no-op for
+ // already-aligned buffers; only misaligned ones are reallocated.
+ array_data.align_buffers();
+
let struct_array = StructArray::from(array_data);
let batch = RecordBatch::from(struct_array);
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs
index ce180c060f563..168ee40e9a10c 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs
@@ -435,6 +435,27 @@ pub unsafe extern "C" fn df_create_session_context(
.map_err(|e| e.to_string())
}
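+
+/// Indexed variant of `df_create_session_context`: additionally records the
+/// Java-provided filter tree shape and delegated predicate count on the returned
+/// session handle, which is what `df_execute_with_context` checks (via
+/// `indexed_config`) to route execution to the indexed path.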
+#[ffm_safe]
+#[no_mangle]
+pub unsafe extern "C" fn df_create_session_context_indexed(
+ shard_view_ptr: i64,
+ runtime_ptr: i64,
+ table_name_ptr: *const u8,
+ table_name_len: i64,
+ context_id: i64,
+ tree_shape: i32,
+ delegated_predicate_count: i32,
+) -> i64 {
+ let table_name = str_from_raw(table_name_ptr, table_name_len)
+ .map_err(|e| format!("df_create_session_context_indexed: {}", e))?;
+ let mgr = get_rt_manager()?;
+ mgr.io_runtime
+ .block_on(crate::session_context::create_session_context_indexed(
+ runtime_ptr, shard_view_ptr, table_name, context_id, tree_shape, delegated_predicate_count,
+ ))
+ .map_err(|e| e.to_string())
+}
+
#[ffm_safe]
#[no_mangle]
pub unsafe extern "C" fn df_cache_manager_remove_files(
@@ -565,11 +586,24 @@ pub unsafe extern "C" fn df_execute_with_context(
let mgr = get_rt_manager()?;
let plan_bytes = slice::from_raw_parts(plan_ptr, plan_len as usize);
let cpu_executor = mgr.cpu_executor();
- mgr.io_runtime
- .block_on(crate::query_executor::execute_with_context(
- session_ctx_ptr,
- plan_bytes,
- cpu_executor,
- ))
- .map_err(|e| e.to_string())
+
+ // Route based on whether the session was configured for indexed execution
+ let handle_ref = &*(session_ctx_ptr as *const crate::session_context::SessionContextHandle);
+ if handle_ref.indexed_config.is_some() {
+ mgr.io_runtime
+ .block_on(crate::indexed_executor::execute_indexed_with_context(
+ session_ctx_ptr,
+ plan_bytes.to_vec(),
+ cpu_executor,
+ ))
+ .map_err(|e| e.to_string())
+ } else {
+ mgr.io_runtime
+ .block_on(crate::query_executor::execute_with_context(
+ session_ctx_ptr,
+ plan_bytes,
+ cpu_executor,
+ ))
+ .map_err(|e| e.to_string())
+ }
}
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs
index 12ce9342f5989..a312996595296 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs
@@ -11,7 +11,7 @@
//!
//! Per-leaf lifecycle at query time (one compiled-query + per-segment matcher
//! per Collector leaf):
-//! 1. `createProvider(query_bytes)` FFM upcall → `provider_key` (once per
+//! 1. `createProvider(annotation_id)` FFM upcall → `provider_key` (once per
//! Collector leaf, once per query).
//! 2. `createCollector(provider_key, seg, min, max)` FFM upcall → collector
//! (once per SegmentChunk × Collector leaf).
@@ -81,6 +81,10 @@ use crate::indexed_table::page_pruner::{build_pruning_predicate, PagePruneMetric
/// was built from a catalog snapshot). `num_partitions` comes from the caller's
/// session config. `query_memory_pool` is the per-query tracker (same as
/// vanilla path) — `None` disables tracking and uses the global pool.
+// TODO: remove once api.rs migrates to the instruction-based path directly.
+// Kept as a thin wrapper so existing tests exercise execute_indexed_with_context
+// with minimal changes.
pub async fn execute_indexed_query(
substrait_bytes: Vec<u8>,
table_name: String,
@@ -134,18 +138,300 @@ pub async fn execute_indexed_query(
let ctx = SessionContext::new_with_state(state);
ctx.register_udf(create_index_filter_udf());
- // Resolve the object store for this shard's table URL (file://, s3://,
- // gs://, ... whatever the global runtime has registered). We pass this
- // store down to the parquet bridge so per-RG reads go through it instead
- // of hitting the local filesystem directly.
+ // Register default ListingTable so substrait consumer can resolve the table
+ let listing_options = datafusion::datasource::listing::ListingOptions::new(
+ Arc::new(datafusion::datasource::file_format::parquet::ParquetFormat::new()))
+ .with_file_extension(".parquet")
+ .with_collect_stat(true);
+ let resolved_schema = listing_options
+ .infer_schema(&ctx.state(), &shard_view.table_path)
+ .await?;
+ let table_config = datafusion::datasource::listing::ListingTableConfig::new(shard_view.table_path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(resolved_schema);
+ let provider = Arc::new(datafusion::datasource::listing::ListingTable::try_new(table_config)?);
+ ctx.register_table(&table_name, provider)?;
+
+ // Build SessionContextHandle and delegate to execute_indexed_with_context
+ let handle = crate::session_context::SessionContextHandle {
+ ctx,
+ table_path: shard_view.table_path.clone(),
+ object_metas: shard_view.object_metas.clone(),
+ query_context: crate::query_memory_pool_tracker::QueryTrackingContext::new(0, runtime.runtime_env.memory_pool.clone()),
+ table_name: table_name.clone(),
+ indexed_config: None, // derive classification from tree
+ };
+ let ptr = Box::into_raw(Box::new(handle)) as i64;
+ unsafe { execute_indexed_with_context(ptr, substrait_bytes, cpu_executor).await }
+}
+
+// ── Helpers ───────────────────────────────────────────────────────────
+
+/// Collect all `Predicate(expr)` leaves in DFS order. Used by the
+/// dispatcher to build a per-leaf `PruningPredicate` cache keyed by
+/// `Arc::as_ptr` identity.
+fn collect_predicate_exprs(tree: &BoolNode, out: &mut Vec<Arc<dyn PhysicalExpr>>) {
+ match tree {
+ BoolNode::And(c) | BoolNode::Or(c) => {
+ c.iter().for_each(|ch| collect_predicate_exprs(ch, out))
+ }
+ BoolNode::Not(inner) => collect_predicate_exprs(inner, out),
+ BoolNode::Collector { .. } => {}
+ BoolNode::Predicate(expr) => out.push(Arc::clone(expr)),
+ }
+}
+
+fn collect_predicate_column_indices(extraction: Option<&ExtractionResult>) -> Vec<usize> {
+ let Some(e) = extraction else { return vec![] };
+ let mut exprs = Vec::new();
+ collect_predicate_exprs(&e.tree, &mut exprs);
+ let mut indices = BTreeSet::new();
+ for expr in &exprs {
+ let _ = expr.apply(|node| {
+ if let Some(col) = node.as_any().downcast_ref::<Column>() {
+ indices.insert(col.index());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ });
+ }
+ indices.into_iter().collect()
+}
+/// For a tree classified as `SingleCollector`, walk it to find the single
+/// Collector leaf and return its query bytes.
+fn single_collector_id(tree: &BoolNode) -> Option<i32> {
+ match tree {
+ BoolNode::Collector { annotation_id } => Some(*annotation_id),
+ BoolNode::And(children) => {
+ for child in children {
+ if let Some(id) = single_collector_id(child) {
+ return Some(id);
+ }
+ }
+ None
+ }
+ _ => None,
+ }
+}
+
+/// For a tree classified as `SingleCollector`, return the residual
+/// (all non-Collector parts of the AND tree, re-assembled into a
+/// single BoolNode). Recursively strips Collector leaves from nested
+/// ANDs. Returns `None` if the tree is a bare Collector or the entire
+/// tree is collectors-only (no residual predicates).
+fn extract_single_collector_residual(tree: &BoolNode) -> Option {
+ fn strip_collectors(node: &BoolNode) -> Option {
+ match node {
+ BoolNode::Collector { .. } => None,
+ BoolNode::Predicate(_) => Some(node.clone()),
+ BoolNode::And(children) => {
+ let residuals: Vec<BoolNode> =
+ children.iter().filter_map(strip_collectors).collect();
+ match residuals.len() {
+ 0 => None,
+ 1 => Some(residuals.into_iter().next().unwrap()),
+ _ => Some(BoolNode::And(residuals)),
+ }
+ }
+ // OR/NOT with no collectors pass through unchanged (they're
+ // pure-predicate subtrees in a SingleCollector-classified tree).
+ other => Some(other.clone()),
+ }
+ }
+ strip_collectors(tree)
+}
+
+// ── Placeholder provider used only for substrait consume pass ─────────
+
+struct PlaceholderProvider {
+ schema: SchemaRef,
+}
+
+impl fmt::Debug for PlaceholderProvider {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("PlaceholderProvider").finish()
+ }
+}
+
+#[async_trait::async_trait]
+impl TableProvider for PlaceholderProvider {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ _projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+ Err(DataFusionError::Internal(
+ "PlaceholderProvider should not be scanned".into(),
+ ))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::indexed_table::bool_tree::BoolNode;
+ use datafusion::arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::common::ScalarValue;
+ use datafusion::logical_expr::Operator;
+ use datafusion::physical_expr::expressions::{BinaryExpr, Column as PhysColumn, Literal};
+ use datafusion::physical_expr::PhysicalExpr;
+ use std::sync::Arc;
+
+ fn collector(id: i32) -> BoolNode {
+ BoolNode::Collector {
+ annotation_id: id,
+ }
+ }
+
+ fn pred() -> BoolNode {
+ let left: Arc<dyn PhysicalExpr> = Arc::new(PhysColumn::new("price", 0));
+ let right: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int32(Some(0))));
+ BoolNode::Predicate(Arc::new(BinaryExpr::new(left, Operator::Eq, right)))
+ }
+
+ fn is_predicate(node: &BoolNode) -> bool {
+ matches!(node, BoolNode::Predicate(_))
+ }
+
+ // ── extract_single_collector_residual ─────────────────────────────
+
+ #[test]
+ fn residual_bare_collector_is_none() {
+ assert!(extract_single_collector_residual(&collector(10)).is_none());
+ }
+
+ #[test]
+ fn residual_and_collector_plus_predicate() {
+ let tree = BoolNode::And(vec![collector(10), pred()]);
+ let r = extract_single_collector_residual(&tree).unwrap();
+ assert!(is_predicate(&r));
+ }
+
+ #[test]
+ fn residual_and_only_collectors_is_none() {
+ let tree = BoolNode::And(vec![collector(10), collector(11)]);
+ assert!(extract_single_collector_residual(&tree).is_none());
+ }
+
+ #[test]
+ fn residual_nested_and_strips_collectors() {
+ // AND(C₁, AND(C₂, P)) → residual is P
+ let tree = BoolNode::And(vec![
+ collector(10),
+ BoolNode::And(vec![collector(11), pred()]),
+ ]);
+ let r = extract_single_collector_residual(&tree).unwrap();
+ assert!(is_predicate(&r));
+ }
+
+ #[test]
+ fn residual_deeply_nested_and() {
+ // AND(P₁, AND(C₁, AND(C₂, P₂))) → AND(P₁, P₂)
+ let p1 = pred();
+ let p2 = pred();
+ let tree = BoolNode::And(vec![
+ p1,
+ BoolNode::And(vec![
+ collector(0),
+ BoolNode::And(vec![collector(1), p2]),
+ ]),
+ ]);
+ let r = extract_single_collector_residual(&tree).unwrap();
+ match r {
+ BoolNode::And(children) => {
+ assert_eq!(children.len(), 2);
+ assert!(children.iter().all(is_predicate));
+ }
+ _ => panic!("expected AND, got {:?}", r),
+ }
+ }
+
+ #[test]
+ fn residual_nested_and_with_or_predicate() {
+ // AND(C, AND(P, OR(P, P))) → AND(P, OR(P, P))
+ let tree = BoolNode::And(vec![
+ collector(10),
+ BoolNode::And(vec![
+ pred(),
+ BoolNode::Or(vec![pred(), pred()]),
+ ]),
+ ]);
+ let r = extract_single_collector_residual(&tree).unwrap();
+ match r {
+ BoolNode::And(children) => {
+ assert_eq!(children.len(), 2);
+ assert!(is_predicate(&children[0]));
+ assert!(matches!(children[1], BoolNode::Or(_)));
+ }
+ _ => panic!("expected AND, got {:?}", r),
+ }
+ }
+
+ #[test]
+ fn residual_nested_and_all_collectors_is_none() {
+ // AND(AND(C₁, C₂), AND(C₃, C₄)) → no residual
+ let tree = BoolNode::And(vec![
+ BoolNode::And(vec![collector(0), collector(1)]),
+ BoolNode::And(vec![collector(2), collector(3)]),
+ ]);
+ assert!(extract_single_collector_residual(&tree).is_none());
+ }
+}
+
+/// Instruction-based indexed execution path. Consumes a pre-configured SessionContextHandle
+/// (with UDF registered and IndexedExecutionConfig set) and routes to the appropriate
+/// evaluator based on the Java-provided FilterTreeShape.
+///
+/// TODO: extract the logic shared with `execute_indexed_query` to avoid duplication.
+/// For now `execute_indexed_query` is a thin wrapper that builds a
+/// SessionContextHandle and delegates here.
+pub async unsafe fn execute_indexed_with_context(
+ session_ctx_ptr: i64,
+ substrait_bytes: Vec<u8>,
+ cpu_executor: DedicatedExecutor,
+) -> Result<i64, DataFusionError> {
+ let handle = *Box::from_raw(session_ctx_ptr as *mut crate::session_context::SessionContextHandle);
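+ // Map the Java-provided shape codes onto a FilterClass. The numeric codes are
+ // assumed here to mirror FilterTreeShape's wire encoding (1 = AND-only tree,
+ // 2 = general tree); (1, 1), i.e. a single delegated predicate in an AND-only
+ // tree, takes the cheaper SingleCollector path.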
+ let classification_override = handle.indexed_config.map(|config| {
+ match (config.tree_shape, config.delegated_predicate_count) {
+ (1, 1) => FilterClass::SingleCollector,
+ (1, _) | (2, _) => FilterClass::Tree,
+ _ => FilterClass::None,
+ }
+ });
+
+ let query_config = Arc::new(crate::datafusion_query_config::DatafusionQueryConfig::default());
+ let num_partitions = query_config.target_partitions.max(1);
+ let ctx = handle.ctx;
+ let table_name = handle.table_name;
+ let table_path = handle.table_path;
+ let object_metas = handle.object_metas;
+ let query_context = handle.query_context;
+
+ // SessionContext already has RuntimeEnv, caches, memory pool, UDF from create_session_context_indexed.
+ // Deregister the default ListingTable (registered by create_session_context) — will be replaced
+ // with IndexedTableProvider after plan decoding.
+ ctx.deregister_table(&table_name)?;
+
let store = ctx
.state()
.runtime_env()
- .object_store(&shard_view.table_path)?;
+ .object_store(&table_path)?;
- let (segments, schema) = build_segments(Arc::clone(&store), shard_view.object_metas.as_ref())
+ let (segments, schema) = build_segments(Arc::clone(&store), object_metas.as_ref())
.await
.map_err(DataFusionError::Execution)?;
let placeholder: Arc<dyn TableProvider> = Arc::new(PlaceholderProvider {
schema: schema.clone(),
@@ -164,14 +450,19 @@ pub async fn execute_indexed_query(
.map_err(|e| DataFusionError::Execution(format!("expr_to_bool_tree: {}", e)))?,
),
};
- let classification = match &extraction {
- None => FilterClass::None,
- Some(e) => classify_filter(&e.tree),
+
+ // Resolve classification: from Java config if available, otherwise derive from tree
+ let classification = match classification_override {
+ Some(c) => c,
+ None => match &extraction {
+ None => FilterClass::None,
+ Some(e) => classify_filter(&e.tree),
+ },
};
// Derive the parquet pushdown predicate from the BoolNode tree.
// `scan()` ignores DataFusion's filters argument (which contains
- // the `index_filter` UDF marker whose body panics) and uses this
+ // the `delegated_predicate` UDF marker whose body panics) and uses this
// field instead.
//
// SingleCollector: residual (non-Collector top-AND children) →
@@ -207,13 +498,13 @@ pub async fn execute_indexed_query(
"classify_filter returned SingleCollector but extraction is None".into(),
)
})?;
- let bytes = single_collector_bytes(&extraction.tree).ok_or_else(|| {
+ let annotation_id = single_collector_id(&extraction.tree).ok_or_else(|| {
DataFusionError::Internal(
"SingleCollector classified but leaf extraction failed".into(),
)
})?;
let provider =
- Arc::new(create_provider(&bytes).map_err(|e| DataFusionError::External(e.into()))?);
+ Arc::new(create_provider(annotation_id).map_err(|e| DataFusionError::External(e.into()))?);
let schema_for_pruner = schema.clone();
// Extract the residual (non-Collector children of top-level
@@ -243,16 +534,16 @@ pub async fn execute_indexed_query(
chunk.doc_min,
chunk.doc_max,
)
- .map_err(|e| {
- format!(
- "FfmSegmentCollector::create(provider={}, seg={}, doc_range=[{},{})): {}",
- provider.key(),
- segment.segment_ord,
- chunk.doc_min,
- chunk.doc_max,
- e
- )
- })?;
+ .map_err(|e| {
+ format!(
+ "FfmSegmentCollector::create(provider={}, seg={}, doc_range=[{},{})): {}",
+ provider.key(),
+ segment.segment_ord,
+ chunk.doc_min,
+ chunk.doc_max,
+ e
+ )
+ })?;
let pruner = Arc::new(PagePruner::new(
&schema_for_pruner,
Arc::clone(&segment.metadata),
@@ -283,11 +574,11 @@ pub async fn execute_indexed_query(
// get absorbed into the surrounding Or if applicable.
let tree = extraction.tree.push_not_down().flatten();
// One provider per Collector leaf (DFS order).
- let leaf_bytes = tree.collector_leaves();
- let mut providers: Vec<Arc<ProviderHandle>> = Vec::with_capacity(leaf_bytes.len());
- for bytes in &leaf_bytes {
+ let leaf_ids = tree.collector_leaves();
+ let mut providers: Vec<Arc<ProviderHandle>> = Vec::with_capacity(leaf_ids.len());
+ for annotation_id in &leaf_ids {
providers.push(Arc::new(
- create_provider(bytes).map_err(|e| DataFusionError::External(e.into()))?,
+ create_provider(*annotation_id).map_err(|e| DataFusionError::External(e.into()))?,
));
}
let tree = Arc::new(tree);
@@ -328,7 +619,7 @@ pub async fn execute_indexed_query(
chunk.doc_min,
chunk.doc_max,
)
- .map_err(|e| format!("leaf {} collector: {}", idx, e))?;
+ .map_err(|e| format!("leaf {} collector: {}", idx, e))?;
per_leaf.push((
provider.key(),
Arc::new(collector) as Arc,
@@ -372,7 +663,7 @@ pub async fn execute_indexed_query(
// DataFusion's FileScanConfig. The full URL includes the path
// (e.g. "file:///Users/.../parquet/"); ObjectStoreUrl wants only
// the scheme+authority ("file:///").
- let url_str = shard_view.table_path.as_str();
+ let url_str = table_path.as_str();
let parsed = url::Url::parse(url_str)
.map_err(|e| DataFusionError::Execution(format!("parse table_path URL: {}", e)))?;
let store_url = ObjectStoreUrl::parse(format!("{}://{}", parsed.scheme(), parsed.authority()))?;
@@ -400,242 +691,6 @@ pub async fn execute_indexed_query(
let cross_rt_stream = CrossRtStream::new_with_df_error_stream(df_stream, cpu_executor);
let schema = cross_rt_stream.schema();
let wrapped = RecordBatchStreamAdapter::new(schema, cross_rt_stream);
- Ok(Box::into_raw(Box::new(wrapped)) as i64)
-}
-
-// ── Helpers ───────────────────────────────────────────────────────────
-
-/// Collect all `Predicate(expr)` leaves in DFS order. Used by the
-/// dispatcher to build a per-leaf `PruningPredicate` cache keyed by
-/// `Arc::as_ptr` identity.
-fn collect_predicate_exprs(tree: &BoolNode, out: &mut Vec<Arc<dyn PhysicalExpr>>) {
- match tree {
- BoolNode::And(c) | BoolNode::Or(c) => {
- c.iter().for_each(|ch| collect_predicate_exprs(ch, out))
- }
- BoolNode::Not(inner) => collect_predicate_exprs(inner, out),
- BoolNode::Collector { .. } => {}
- BoolNode::Predicate(expr) => out.push(Arc::clone(expr)),
- }
-}
-
-fn collect_predicate_column_indices(extraction: Option<&ExtractionResult>) -> Vec<usize> {
- let Some(e) = extraction else { return vec![] };
- let mut exprs = Vec::new();
- collect_predicate_exprs(&e.tree, &mut exprs);
- let mut indices = BTreeSet::new();
- for expr in &exprs {
- let _ = expr.apply(|node| {
- if let Some(col) = node.as_any().downcast_ref::<Column>() {
- indices.insert(col.index());
- }
- Ok(TreeNodeRecursion::Continue)
- });
- }
- indices.into_iter().collect()
-}
-/// For a tree classified as `SingleCollector`, walk it to find the single
-/// Collector leaf and return its query bytes.
-fn single_collector_bytes(tree: &BoolNode) -> Option<Arc<[u8]>> {
- match tree {
- BoolNode::Collector { query_bytes } => Some(Arc::clone(query_bytes)),
- BoolNode::And(children) => {
- let mut all: Vec<Arc<[u8]>> = Vec::new();
- for child in children {
- if let Some(b) = single_collector_bytes(child) {
- all.push(b);
- }
- }
- match all.len() {
- 0 => None,
- 1 => Some(all.remove(0)),
- _ => {
- let mut merged = Vec::new();
- for (i, b) in all.iter().enumerate() {
- if i > 0 {
- merged.push(b'\n');
- }
- merged.extend_from_slice(b);
- }
- Some(Arc::from(merged.as_slice()))
- }
- }
- }
- _ => None,
- }
-}
-
-/// For a tree classified as `SingleCollector`, return the residual
-/// (all non-Collector parts of the AND tree, re-assembled into a
-/// single BoolNode). Recursively strips Collector leaves from nested
-/// ANDs. Returns `None` if the tree is a bare Collector or the entire
-/// tree is collectors-only (no residual predicates).
-fn extract_single_collector_residual(tree: &BoolNode) -> Option {
- fn strip_collectors(node: &BoolNode) -> Option {
- match node {
- BoolNode::Collector { .. } => None,
- BoolNode::Predicate(_) => Some(node.clone()),
- BoolNode::And(children) => {
- let residuals: Vec<BoolNode> =
- children.iter().filter_map(strip_collectors).collect();
- match residuals.len() {
- 0 => None,
- 1 => Some(residuals.into_iter().next().unwrap()),
- _ => Some(BoolNode::And(residuals)),
- }
- }
- // OR/NOT with no collectors pass through unchanged (they're
- // pure-predicate subtrees in a SingleCollector-classified tree).
- other => Some(other.clone()),
- }
- }
- strip_collectors(tree)
-}
-
-// ── Placeholder provider used only for substrait consume pass ─────────
-
-struct PlaceholderProvider {
- schema: SchemaRef,
-}
-
-impl fmt::Debug for PlaceholderProvider {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("PlaceholderProvider").finish()
- }
-}
-
-#[async_trait::async_trait]
-impl TableProvider for PlaceholderProvider {
- fn as_any(&self) -> &dyn std::any::Any {
- self
- }
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
- fn table_type(&self) -> TableType {
- TableType::Base
- }
- async fn scan(
- &self,
- _state: &dyn Session,
- _projection: Option<&Vec<usize>>,
- _filters: &[Expr],
- _limit: Option<usize>,
- ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
- Err(DataFusionError::Internal(
- "PlaceholderProvider should not be scanned".into(),
- ))
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::indexed_table::bool_tree::BoolNode;
- use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::common::ScalarValue;
- use datafusion::logical_expr::Operator;
- use datafusion::physical_expr::expressions::{BinaryExpr, Column as PhysColumn, Literal};
- use datafusion::physical_expr::PhysicalExpr;
- use std::sync::Arc;
-
- fn collector(tag: &[u8]) -> BoolNode {
- BoolNode::Collector {
- query_bytes: Arc::from(tag),
- }
- }
-
- fn pred() -> BoolNode {
- let left: Arc<dyn PhysicalExpr> = Arc::new(PhysColumn::new("price", 0));
- let right: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int32(Some(0))));
- BoolNode::Predicate(Arc::new(BinaryExpr::new(left, Operator::Eq, right)))
- }
-
- fn is_predicate(node: &BoolNode) -> bool {
- matches!(node, BoolNode::Predicate(_))
- }
-
- // ── extract_single_collector_residual ─────────────────────────────
-
- #[test]
- fn residual_bare_collector_is_none() {
- assert!(extract_single_collector_residual(&collector(b"x")).is_none());
- }
-
- #[test]
- fn residual_and_collector_plus_predicate() {
- let tree = BoolNode::And(vec![collector(b"x"), pred()]);
- let r = extract_single_collector_residual(&tree).unwrap();
- assert!(is_predicate(&r));
- }
-
- #[test]
- fn residual_and_only_collectors_is_none() {
- let tree = BoolNode::And(vec![collector(b"x"), collector(b"y")]);
- assert!(extract_single_collector_residual(&tree).is_none());
- }
-
- #[test]
- fn residual_nested_and_strips_collectors() {
- // AND(C₁, AND(C₂, P)) → residual is P
- let tree = BoolNode::And(vec![
- collector(b"x"),
- BoolNode::And(vec![collector(b"y"), pred()]),
- ]);
- let r = extract_single_collector_residual(&tree).unwrap();
- assert!(is_predicate(&r));
- }
-
- #[test]
- fn residual_deeply_nested_and() {
- // AND(P₁, AND(C₁, AND(C₂, P₂))) → AND(P₁, P₂)
- let p1 = pred();
- let p2 = pred();
- let tree = BoolNode::And(vec![
- p1,
- BoolNode::And(vec![
- collector(b"a"),
- BoolNode::And(vec![collector(b"b"), p2]),
- ]),
- ]);
- let r = extract_single_collector_residual(&tree).unwrap();
- match r {
- BoolNode::And(children) => {
- assert_eq!(children.len(), 2);
- assert!(children.iter().all(is_predicate));
- }
- _ => panic!("expected AND, got {:?}", r),
- }
- }
-
- #[test]
- fn residual_nested_and_with_or_predicate() {
- // AND(C, AND(P, OR(P, P))) → AND(P, OR(P, P))
- let tree = BoolNode::And(vec![
- collector(b"x"),
- BoolNode::And(vec![
- pred(),
- BoolNode::Or(vec![pred(), pred()]),
- ]),
- ]);
- let r = extract_single_collector_residual(&tree).unwrap();
- match r {
- BoolNode::And(children) => {
- assert_eq!(children.len(), 2);
- assert!(is_predicate(&children[0]));
- assert!(matches!(children[1], BoolNode::Or(_)));
- }
- _ => panic!("expected AND, got {:?}", r),
- }
- }
-
- #[test]
- fn residual_nested_and_all_collectors_is_none() {
- // AND(AND(C₁, C₂), AND(C₃, C₄)) → no residual
- let tree = BoolNode::And(vec![
- BoolNode::And(vec![collector(b"a"), collector(b"b")]),
- BoolNode::And(vec![collector(b"c"), collector(b"d")]),
- ]);
- assert!(extract_single_collector_residual(&tree).is_none());
- }
+ let stream_handle = crate::api::QueryStreamHandle::new(wrapped, query_context);
+ Ok(Box::into_raw(Box::new(stream_handle)) as i64)
}
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bool_tree.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bool_tree.rs
index 63854c4088105..bb2d081b99e5b 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bool_tree.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bool_tree.rs
@@ -15,7 +15,7 @@
//! Two flavors:
//!
//! - [`BoolNode`] — unresolved. Produced by `expr_to_bool_tree`.
-//! `Collector` leaves carry the serialized query bytes
+//! `Collector` leaves carry the annotation ID identifying the delegated predicate
-//! (as extracted from the `index_filter(bytes)` UDF call);
+//! (as extracted from the `delegated_predicate(annotationId)` UDF call);
//! `Predicate` leaves carry an arbitrary DataFusion
//! [`PhysicalExpr`](datafusion::physical_expr::PhysicalExpr) —
@@ -38,12 +38,12 @@ pub enum BoolNode {
And(Vec),
Or(Vec),
Not(Box),
- /// index-backend query payload. The caller (typically the indexed_executor)
- /// upcalls into Java with these bytes at query-resolve time to get a
- /// `provider_key`, then creates per-segment collectors. Bytes are opaque
- /// to Rust; only the Java factory knows how to interpret them.
+ /// Delegated predicate identified by annotation ID. At query-resolve time,
+ /// the indexed executor upcalls into Java with this ID to get a `provider_key`,
+ /// then creates per-segment collectors. The annotation ID maps to a pre-compiled
+ /// query on the Java side (via FilterDelegationHandle).
Collector {
- query_bytes: Arc<[u8]>,
+ annotation_id: i32,
},
/// Arbitrary boolean-valued DataFusion expression. At refinement
/// time, `expr.evaluate(batch)` produces the per-row mask; at page-
@@ -91,13 +91,13 @@ impl BoolNode {
/// `resolve` (via the `*next` index) relies on this invariant; if you
/// change one traversal you MUST change the other in lockstep, or
/// collector-to-leaf matching will silently become wrong.
- pub fn collector_leaves(&self) -> Vec<Arc<[u8]>> {
+ pub fn collector_leaves(&self) -> Vec<i32> {
let mut out = Vec::new();
self.collect_leaves(&mut out);
out
}
- fn collect_leaves(&self, out: &mut Vec<Arc<[u8]>>) {
+ fn collect_leaves(&self, out: &mut Vec<i32>) {
match self {
BoolNode::And(children) | BoolNode::Or(children) => {
for c in children {
@@ -105,8 +105,8 @@ impl BoolNode {
}
}
BoolNode::Not(child) => child.collect_leaves(out),
- BoolNode::Collector { query_bytes } => {
- out.push(Arc::clone(query_bytes));
+ BoolNode::Collector { annotation_id } => {
+ out.push(*annotation_id);
}
BoolNode::Predicate(_) => {}
}
@@ -167,7 +167,7 @@ impl BoolNode {
/// `(column, op, value)`.
///
/// Caller is responsible for creating the collectors — typically by
- /// upcalling Java `createProvider(query_bytes)` per leaf to get a
+ /// upcalling Java `createProvider(annotation_id)` per leaf to get a
/// `provider_key`, then `createCollector(provider_key, seg, min, max)`
/// per chunk.
///
@@ -363,9 +363,9 @@ mod tests {
}
}
- fn collector(bytes: &[u8]) -> BoolNode {
+ fn collector(id: i32) -> BoolNode {
BoolNode::Collector {
- query_bytes: Arc::from(bytes),
+ annotation_id: id,
}
}
@@ -382,8 +382,8 @@ mod tests {
#[test]
fn leaf_count_counts_only_collectors() {
let tree = BoolNode::And(vec![
- collector(b"a"),
- BoolNode::Or(vec![collector(b"b"), predicate("x", Operator::Eq, 1)]),
+ collector(0),
+ BoolNode::Or(vec![collector(1), predicate("x", Operator::Eq, 1)]),
predicate("y", Operator::Eq, 2),
]);
assert_eq!(tree.collector_leaf_count(), 2);
@@ -392,21 +392,21 @@ mod tests {
#[test]
fn leaves_dfs_order() {
let tree = BoolNode::And(vec![
- collector(b"x"),
- BoolNode::Or(vec![collector(b"y"), collector(b"z")]),
+ collector(10),
+ BoolNode::Or(vec![collector(11), collector(12)]),
]);
let leaves = tree.collector_leaves();
assert_eq!(leaves.len(), 3);
- assert_eq!(&*leaves[0], b"x");
- assert_eq!(&*leaves[1], b"y");
- assert_eq!(&*leaves[2], b"z");
+ assert_eq!(leaves[0], 10);
+ assert_eq!(leaves[1], 11);
+ assert_eq!(leaves[2], 12);
}
// ── push_not_down (De Morgan) ─────────────────────────────────────
#[test]
fn not_collector_stays_wrapped() {
- let tree = BoolNode::Not(Box::new(collector(b"x")));
+ let tree = BoolNode::Not(Box::new(collector(10)));
let n = tree.push_not_down();
assert!(matches!(n, BoolNode::Not(b) if matches!(*b, BoolNode::Collector { .. })));
}
@@ -414,8 +414,8 @@ mod tests {
#[test]
fn de_morgan_not_and_to_or() {
let tree = BoolNode::Not(Box::new(BoolNode::And(vec![
- collector(b"a"),
- collector(b"b"),
+ collector(0),
+ collector(1),
])));
match tree.push_not_down() {
BoolNode::Or(children) => {
@@ -447,7 +447,7 @@ mod tests {
#[test]
fn double_negation_cancels() {
- let tree = BoolNode::Not(Box::new(BoolNode::Not(Box::new(collector(b"x")))));
+ let tree = BoolNode::Not(Box::new(BoolNode::Not(Box::new(collector(10)))));
let n = tree.push_not_down();
assert!(matches!(n, BoolNode::Collector { .. }));
}
@@ -455,8 +455,8 @@ mod tests {
#[test]
fn nested_not_recurses_through_and_or() {
let tree = BoolNode::Not(Box::new(BoolNode::And(vec![
- BoolNode::Or(vec![collector(b"a"), collector(b"b")]),
- collector(b"c"),
+ BoolNode::Or(vec![collector(0), collector(1)]),
+ collector(2),
])));
match tree.push_not_down() {
BoolNode::Or(outer) => {
@@ -473,8 +473,8 @@ mod tests {
#[test]
fn flatten_collapses_nested_and() {
let tree = BoolNode::And(vec![
- BoolNode::And(vec![collector(b"a"), collector(b"b")]),
- collector(b"c"),
+ BoolNode::And(vec![collector(0), collector(1)]),
+ collector(2),
]);
match tree.flatten() {
BoolNode::And(children) => {
@@ -490,10 +490,10 @@ mod tests {
#[test]
fn flatten_collapses_nested_or() {
let tree = BoolNode::Or(vec![
- collector(b"a"),
+ collector(0),
BoolNode::Or(vec![
- collector(b"b"),
- BoolNode::Or(vec![collector(b"c"), collector(b"d")]),
+ collector(1),
+ BoolNode::Or(vec![collector(2), collector(3)]),
]),
]);
match tree.flatten() {
@@ -505,9 +505,9 @@ mod tests {
#[test]
fn flatten_preserves_mixed_connectives() {
let tree = BoolNode::And(vec![
- collector(b"a"),
- BoolNode::Or(vec![collector(b"b"), collector(b"c")]),
- BoolNode::And(vec![collector(b"d"), collector(b"e")]),
+ collector(0),
+ BoolNode::Or(vec![collector(1), collector(2)]),
+ BoolNode::And(vec![collector(3), collector(4)]),
]);
match tree.flatten() {
BoolNode::And(children) => {
@@ -521,8 +521,8 @@ mod tests {
#[test]
fn flatten_descends_into_not() {
let tree = BoolNode::Not(Box::new(BoolNode::And(vec![
- BoolNode::And(vec![collector(b"a"), collector(b"b")]),
- collector(b"c"),
+ BoolNode::And(vec![collector(0), collector(1)]),
+ collector(2),
])));
match tree.flatten() {
BoolNode::Not(inner) => match *inner {
@@ -537,7 +537,7 @@ mod tests {
#[test]
fn resolve_replaces_collector_bytes_with_refs() {
- let tree = BoolNode::And(vec![collector(b"a"), collector(b"b")]);
+ let tree = BoolNode::And(vec![collector(0), collector(1)]);
let a: Arc = Arc::new(StubCollector(1));
let b: Arc = Arc::new(StubCollector(2));
let resolved = tree.resolve(&[(10, a), (20, b)]).unwrap();
@@ -572,14 +572,14 @@ mod tests {
#[test]
fn resolve_out_of_range_errors() {
- let tree = collector(b"x");
+ let tree = collector(10);
let err = tree.resolve(&[]).unwrap_err();
assert!(err.contains("out of range"), "got: {}", err);
}
#[test]
fn resolve_not_collector_still_wraps() {
- let tree = BoolNode::Not(Box::new(collector(b"x")));
+ let tree = BoolNode::Not(Box::new(collector(10)));
let c: Arc = Arc::new(StubCollector(0));
let resolved = tree.resolve(&[(1, c)]).unwrap();
match resolved {
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/ffm_callbacks.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/ffm_callbacks.rs
index 3b418762f4be8..35bfa67c86787 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/ffm_callbacks.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/ffm_callbacks.rs
@@ -11,7 +11,7 @@
//! Four callback slots, populated once at startup by
//! `df_register_filter_tree_callbacks` (see `ffm.rs`):
//!
-//! - `createProvider(queryBytes, queryBytesLen) -> providerKey|-1`
+//! - `createProvider(annotationId) -> providerKey|-1`
//! - `createCollector(providerKey, segmentOrd, minDoc, maxDoc) -> collectorKey|-1`
//! - `collectDocs(collectorKey, minDoc, maxDoc, outBuf, outWordCap) -> wordsWritten|-1`
//! - `releaseCollector(collectorKey)`
@@ -26,7 +26,7 @@ use super::index::RowGroupDocsCollector;
// ── Callback signatures ───────────────────────────────────────────────
-type CreateProviderFn = unsafe extern "C" fn(*const u8, i64) -> i32;
+type CreateProviderFn = unsafe extern "C" fn(i32) -> i32;
type ReleaseProviderFn = unsafe extern "C" fn(i32);
type CreateCollectorFn = unsafe extern "C" fn(i32, i32, i32, i32) -> i32;
type CollectDocsFn = unsafe extern "C" fn(i32, i32, i32, *mut u64, i64) -> i64;
@@ -133,14 +133,14 @@ impl Drop for ProviderHandle {
}
}
-/// Create a provider from serialized backend query bytes by upcalling Java.
-pub fn create_provider(query_bytes: &[u8]) -> Result<ProviderHandle, String> {
+/// Create a provider for an annotation ID by upcalling Java.
+pub fn create_provider(annotation_id: i32) -> Result<ProviderHandle, String> {
let create = load_create_provider()?;
- let key = unsafe { create(query_bytes.as_ptr(), query_bytes.len() as i64) };
+ let key = unsafe { create(annotation_id) };
if key < 0 {
return Err(format!(
- "createProvider failed: len={} -> {}",
- query_bytes.len(),
+ "createProvider failed: annotation_id={} -> {}",
+ annotation_id,
key
));
}
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/substrait_to_tree.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/substrait_to_tree.rs
index 40dbeecc12a9b..272d8ffe69879 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/substrait_to_tree.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/substrait_to_tree.rs
@@ -14,7 +14,7 @@
//!
//! - `AND` / `OR` / `NOT` → `BoolNode::And` / `Or` / `Not`
-//! - `ScalarFunction` named `COLLECTOR_FUNCTION_NAME` with one `Binary`
+//! - `ScalarFunction` named `COLLECTOR_FUNCTION_NAME` with one `Int32`
-//! literal argument → `BoolNode::Collector { query_bytes }`. Those bytes
+//! literal argument → `BoolNode::Collector { annotation_id }`. The ID
-//! are the serialized backend query payload; they're handed to a Java
+//! identifies a pre-compiled backend query; it's handed to a Java
//! factory at query-resolve time to create a provider.
//! - **Anything else** → lowered to [`Arc<dyn PhysicalExpr>`] via
@@ -25,7 +25,7 @@
//!
//! **The substrait plan is the wire format.** Java never serializes an
//! `IndexFilterTree`; it rewrites `column = 'value'` on indexed columns to
-//! `index_filter(query_bytes)` UDF calls during the Calcite marking phase,
+//! `delegated_predicate(annotationId)` UDF calls during the Calcite marking phase,
//! and that survives the substrait round-trip. Rust just reads it back out
//! of the decoded `LogicalPlan`.
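+//!
+//! Illustrative example (annotation ID and columns made up): a filter
+//! `category = 'shoes' AND price > 100` with `category` indexed arrives as
+//! `delegated_predicate(0) AND price > 100` and decodes to
+//! `And([Collector { annotation_id: 0 }, Predicate(price > 100)])`.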
@@ -46,7 +46,7 @@ use datafusion::physical_expr::PhysicalExpr;
use super::bool_tree::BoolNode;
/// The UDF name Calcite emits for indexed-column filter markers.
-pub const COLLECTOR_FUNCTION_NAME: &str = "index_filter";
+pub const COLLECTOR_FUNCTION_NAME: &str = "delegated_predicate";
/// Classification of a query's filter expression — drives the evaluator choice.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -152,17 +152,17 @@ fn convert_expr(
}
}
-/// `index_filter(query_bytes)` — a single `Binary` literal arg.
+/// `delegated_predicate(annotationId)` — a single `Int32` literal arg.
fn convert_collector_function(args: &[Expr]) -> Result {
if args.len() != 1 {
return Err(format!(
- "{} expects 1 arg (query_bytes), got {}",
+ "{} expects 1 arg (annotationId), got {}",
COLLECTOR_FUNCTION_NAME,
args.len()
));
}
- let bytes = extract_binary_literal(&args[0])?;
- Ok(BoolNode::Collector { query_bytes: bytes })
+ let annotation_id = extract_int32_literal(&args[0])?;
+ Ok(BoolNode::Collector { annotation_id })
}
/// Strip table qualifiers from `Column` references in an `Expr` tree.
@@ -184,13 +184,11 @@ fn strip_column_qualifiers(expr: &Expr) -> Expr {
.data
}
-fn extract_binary_literal(expr: &Expr) -> Result<Arc<[u8]>, String> {
+fn extract_int32_literal(expr: &Expr) -> Result<i32, String> {
match expr {
- Expr::Literal(ScalarValue::Binary(Some(v)), _) => Ok(Arc::from(v.as_slice())),
- Expr::Literal(ScalarValue::LargeBinary(Some(v)), _) => Ok(Arc::from(v.as_slice())),
- Expr::Literal(ScalarValue::FixedSizeBinary(_, Some(v)), _) => Ok(Arc::from(v.as_slice())),
+ Expr::Literal(ScalarValue::Int32(Some(v)), _) => Ok(*v),
_ => Err(format!(
- "{} arg must be a Binary literal, got {:?}",
+ "{} arg must be an Int32 literal, got {:?}",
COLLECTOR_FUNCTION_NAME, expr
)),
}
@@ -233,7 +231,7 @@ fn is_and_only_collector_tree(tree: &BoolNode) -> bool {
}
}
-/// Create the `index_filter(query_bytes) → Boolean` UDF.
+/// Create the `delegated_predicate(annotationId) → Boolean` UDF.
///
/// This UDF exists solely as a marker for `classify_filter` / `expr_to_bool_tree`.
/// Its body is deliberately wired to return a hard `DataFusionError` if it
@@ -252,15 +250,10 @@ struct IndexFilterUdf {
impl IndexFilterUdf {
fn new() -> Self {
- // The UDF takes exactly one binary payload; LargeBinary is accepted
- // for payloads that overflow Binary's 2 GiB limit. FixedSizeBinary is
- // also accepted at decode time (see `extract_binary_literal`) but we
- // don't enumerate every fixed size in the signature.
Self {
signature: Signature::one_of(
vec![
- TypeSignature::Exact(vec![DataType::Binary]),
- TypeSignature::Exact(vec![DataType::LargeBinary]),
+ TypeSignature::Exact(vec![DataType::Int32]),
],
Volatility::Immutable,
),
@@ -418,12 +411,12 @@ mod tests {
let udf = Arc::new(create_index_filter_udf());
let expr = Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction::new_udf(
udf,
- vec![lit(ScalarValue::Binary(Some(b"hello-query".to_vec())))],
+ vec![lit(ScalarValue::Int32(Some(42)))],
));
let r = expr_to_bool_tree(&expr, &test_schema()).unwrap();
match r.tree {
- BoolNode::Collector { query_bytes } => {
- assert_eq!(&*query_bytes, b"hello-query");
+ BoolNode::Collector { annotation_id } => {
+ assert_eq!(annotation_id, 42);
}
_ => panic!("expected Collector"),
}
@@ -431,12 +424,12 @@ mod tests {
#[test]
fn mixed_tree() {
- // AND(collector(bytes), OR(price > 100, qty < 50))
+ // AND(collector(annotationId), OR(price > 100, qty < 50))
let udf = Arc::new(create_index_filter_udf());
let collector_expr =
Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction::new_udf(
udf,
- vec![lit(ScalarValue::Binary(Some(b"mixed".to_vec())))],
+ vec![lit(ScalarValue::Int32(Some(0)))],
));
let or_branch = col("price").gt(lit(100i32)).or(col("qty").lt(lit(50i32)));
let expr = Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr::new(
@@ -450,9 +443,9 @@ mod tests {
// ── classify_filter ──────────────────────────────────────────────
- fn collector(tag: &[u8]) -> BoolNode {
+ fn collector(id: i32) -> BoolNode {
BoolNode::Collector {
- query_bytes: Arc::from(tag),
+ annotation_id: id,
}
}
fn dummy_predicate() -> BoolNode {
@@ -479,40 +472,40 @@ mod tests {
#[test]
fn classify_bare_collector_is_single() {
assert_eq!(
- classify_filter(&collector(b"x")),
+ classify_filter(&collector(10)),
FilterClass::SingleCollector
);
}
#[test]
fn classify_and_of_collector_and_predicates_is_single() {
- let tree = BoolNode::And(vec![collector(b"x"), dummy_predicate(), dummy_predicate()]);
+ let tree = BoolNode::And(vec![collector(10), dummy_predicate(), dummy_predicate()]);
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
}
#[test]
fn classify_and_with_two_collectors_is_single() {
// AND(C, C, P) — all collectors under AND-only path → SingleCollector.
- let tree = BoolNode::And(vec![collector(b"x"), collector(b"y"), dummy_predicate()]);
+ let tree = BoolNode::And(vec![collector(10), collector(11), dummy_predicate()]);
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
}
#[test]
fn classify_or_containing_collector_is_tree() {
- let tree = BoolNode::Or(vec![collector(b"x"), dummy_predicate()]);
+ let tree = BoolNode::Or(vec![collector(10), dummy_predicate()]);
assert_eq!(classify_filter(&tree), FilterClass::Tree);
}
#[test]
fn classify_not_of_collector_is_tree() {
- let tree = BoolNode::Not(Box::new(collector(b"x")));
+ let tree = BoolNode::Not(Box::new(collector(10)));
assert_eq!(classify_filter(&tree), FilterClass::Tree);
}
#[test]
fn classify_and_with_nested_collector_is_tree() {
let tree = BoolNode::And(vec![
- BoolNode::Or(vec![collector(b"x"), dummy_predicate()]),
+ BoolNode::Or(vec![collector(10), dummy_predicate()]),
dummy_predicate(),
]);
assert_eq!(classify_filter(&tree), FilterClass::Tree);
@@ -524,8 +517,8 @@ mod tests {
fn classify_nested_and_collector_plus_predicate_is_single() {
// AND(C₁, AND(C₂, P)) — nested AND, all collectors under AND-only path.
let tree = BoolNode::And(vec![
- collector(b"x"),
- BoolNode::And(vec![collector(b"y"), dummy_predicate()]),
+ collector(10),
+ BoolNode::And(vec![collector(11), dummy_predicate()]),
]);
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
}
@@ -536,10 +529,10 @@ mod tests {
let tree = BoolNode::And(vec![
dummy_predicate(),
BoolNode::And(vec![
- collector(b"a"),
+ collector(0),
BoolNode::And(vec![
- collector(b"b"),
- BoolNode::And(vec![collector(b"c"), dummy_predicate()]),
+ collector(1),
+ BoolNode::And(vec![collector(2), dummy_predicate()]),
]),
]),
]);
@@ -550,8 +543,8 @@ mod tests {
fn classify_nested_and_only_collectors_is_single() {
// AND(AND(C₁, C₂), AND(C₃, C₄)) — nested AND of only collectors.
let tree = BoolNode::And(vec![
- BoolNode::And(vec![collector(b"a"), collector(b"b")]),
- BoolNode::And(vec![collector(b"c"), collector(b"d")]),
+ BoolNode::And(vec![collector(0), collector(1)]),
+ BoolNode::And(vec![collector(2), collector(3)]),
]);
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
}
@@ -560,7 +553,7 @@ mod tests {
fn classify_nested_and_with_or_predicate_is_single() {
// AND(C, AND(P, OR(P, P))) — OR contains only predicates, no collectors.
let tree = BoolNode::And(vec![
- collector(b"x"),
+ collector(10),
BoolNode::And(vec![
dummy_predicate(),
BoolNode::Or(vec![dummy_predicate(), dummy_predicate()]),
@@ -573,7 +566,7 @@ mod tests {
fn classify_nested_and_with_not_predicate_is_single() {
// AND(C, NOT(P)) — NOT wraps a predicate, not a collector.
let tree = BoolNode::And(vec![
- collector(b"x"),
+ collector(10),
BoolNode::Not(Box::new(dummy_predicate())),
]);
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
@@ -583,9 +576,9 @@ mod tests {
fn classify_nested_and_or_containing_collector_is_tree() {
// AND(C₁, AND(OR(C₂, P), P)) — OR above C₂ → Tree.
let tree = BoolNode::And(vec![
- collector(b"x"),
+ collector(10),
BoolNode::And(vec![
- BoolNode::Or(vec![collector(b"y"), dummy_predicate()]),
+ BoolNode::Or(vec![collector(11), dummy_predicate()]),
dummy_predicate(),
]),
]);
@@ -596,9 +589,9 @@ mod tests {
fn classify_nested_and_not_containing_collector_is_tree() {
// AND(C₁, AND(NOT(C₂), P)) — NOT above C₂ → Tree.
let tree = BoolNode::And(vec![
- collector(b"x"),
+ collector(10),
BoolNode::And(vec![
- BoolNode::Not(Box::new(collector(b"y"))),
+ BoolNode::Not(Box::new(collector(11))),
dummy_predicate(),
]),
]);
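
The rule these tests pin down is compact: a tree stays on the `SingleCollector` path only when every `Collector` leaf has nothing but `And` nodes above it; any `Or` or `Not` on the path to a collector forces the `Tree` path. A rough Java transcription of `is_and_only_collector_tree`, for illustration only (the authoritative implementation is the Rust above):

```java
import java.util.List;

// Illustrative Java transcription of the Rust rule; BoolNode here is a stand-in
// for the Rust enum, not a type that exists in this PR's Java code.
sealed interface BoolNode {
    record And(List<BoolNode> children) implements BoolNode {}
    record Or(List<BoolNode> children) implements BoolNode {}
    record Not(BoolNode child) implements BoolNode {}
    record Collector(int annotationId) implements BoolNode {}
    record Predicate(String expr) implements BoolNode {}
}

final class FilterClassifier {
    private FilterClassifier() {}

    /** True iff no Collector leaf sits below an Or or a Not (the SingleCollector condition). */
    static boolean isAndOnlyCollectorTree(BoolNode node) {
        return switch (node) {
            case BoolNode.And(List<BoolNode> cs) -> cs.stream().allMatch(FilterClassifier::isAndOnlyCollectorTree);
            case BoolNode.Or(List<BoolNode> cs) -> cs.stream().noneMatch(FilterClassifier::containsCollector);
            case BoolNode.Not(BoolNode child) -> !containsCollector(child);
            case BoolNode.Collector ignored -> true;
            case BoolNode.Predicate ignored -> true;
        };
    }

    static boolean containsCollector(BoolNode node) {
        return switch (node) {
            case BoolNode.And(List<BoolNode> cs) -> cs.stream().anyMatch(FilterClassifier::containsCollector);
            case BoolNode.Or(List<BoolNode> cs) -> cs.stream().anyMatch(FilterClassifier::containsCollector);
            case BoolNode.Not(BoolNode child) -> containsCollector(child);
            case BoolNode.Collector ignored -> true;
            case BoolNode.Predicate ignored -> false;
        };
    }
}
```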
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/boolean_algebra.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/boolean_algebra.rs
index 062a036b22f08..22b872292b807 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/boolean_algebra.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/boolean_algebra.rs
@@ -289,7 +289,7 @@ impl LeafId {
// Collector leaves first.
if let Some(provider_id) = self.as_collector_provider_id() {
return BoolNode::Collector {
- query_bytes: Arc::from(&[provider_id][..]),
+ annotation_id: provider_id as i32,
};
}
// Simple comparison leaves via ReferencePred.
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/harness.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/harness.rs
index eb4b116d9c2dd..d7411d90ead83 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/harness.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/harness.rs
@@ -504,11 +504,11 @@ fn extract_single_collector(tree: &BoolNode) -> Option<(u8, BoolNode)> {
let mut residuals: Vec<BoolNode> = Vec::new();
for child in children {
match child {
- BoolNode::Collector { query_bytes } => {
+ BoolNode::Collector { annotation_id } => {
if tag.is_some() {
return None;
}
- tag = Some(query_bytes[0]);
+ tag = Some(*annotation_id as u8);
}
other => residuals.push(other.clone()),
}
@@ -929,7 +929,7 @@ mod tests {
let lit: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int32(Some(1000))));
let predicate = BoolNode::Predicate(Arc::new(BinaryExpr::new(col, Operator::Lt, lit)));
let collector = BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
};
let tree_node = BoolNode::And(vec![collector, predicate]);
let matching: Vec<i32> = (0..100i32).collect();
@@ -947,7 +947,7 @@ mod tests {
let corpus = build_corpus(FixtureConfig::small(0x2222));
let loaded = load_segment(&corpus);
let collector = BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
};
let matching: Vec<i32> = (0..100i32).collect();
let gt = GeneratedTree {
@@ -1038,7 +1038,7 @@ mod tests {
let lit: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int32(Some(1000))));
let predicate = BoolNode::Predicate(Arc::new(BinaryExpr::new(col, Operator::Lt, lit)));
let collector_leaf = BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
};
let tree_node = BoolNode::And(vec![collector_leaf, predicate]);
@@ -1108,10 +1108,10 @@ mod tests {
// Multi-collector → classifies as Tree path.
let c1 = BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
};
let c2 = BoolNode::Collector {
- query_bytes: Arc::from(&[1u8][..]),
+ annotation_id: 1,
};
let tree_node = BoolNode::And(vec![BoolNode::Or(vec![c1, c2]), predicate]);
@@ -1142,12 +1142,12 @@ mod tests {
let c1_expr =
Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction::new_udf(
Arc::new(idx_filter_udf.clone()),
- vec![lit(ScalarValue::Binary(Some(vec![0u8])))],
+ vec![lit(ScalarValue::Int32(Some(0)))],
));
let c2_expr =
Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction::new_udf(
Arc::new(idx_filter_udf),
- vec![lit(ScalarValue::Binary(Some(vec![1u8])))],
+ vec![lit(ScalarValue::Int32(Some(1)))],
));
let or_expr = datafusion::logical_expr::or(c1_expr, c2_expr);
let price_lt = col("price").lt(lit(ScalarValue::Int32(Some(1000))));
@@ -1191,7 +1191,7 @@ mod tests {
let lit: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int32(Some(1000))));
let predicate = BoolNode::Predicate(Arc::new(BinaryExpr::new(col, Operator::Lt, lit)));
let collector_leaf = BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
};
let tree_node = BoolNode::And(vec![collector_leaf, predicate]);
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/oracle.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/oracle.rs
index 684fec0b30de9..d0693a1f28ce8 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/oracle.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/oracle.rs
@@ -103,8 +103,8 @@ fn eval_row(node: &BoolNode, corpus: &Corpus, row: i32, collector_sets: &[HashSe
acc
}
BoolNode::Not(child) => eval_row(child, corpus, row, collector_sets).not(),
- BoolNode::Collector { query_bytes } => {
- let tag = query_bytes[0] as usize;
+ BoolNode::Collector { annotation_id } => {
+ let tag = *annotation_id as u8 as usize;
let set = collector_sets
.get(tag)
.unwrap_or_else(|| panic!("oracle: Collector tag {} has no matching set", tag));
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/tree_gen.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/tree_gen.rs
index 30c402f1ca945..0fcda5fc48546 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/tree_gen.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/fuzz/tree_gen.rs
@@ -145,7 +145,7 @@ fn gen_leaf(rng: &mut StdRng, schema: &SchemaRef, num_collectors: usize) -> Bool
if make_collector {
let id = rng.gen_range(0..num_collectors) as u8;
BoolNode::Collector {
- query_bytes: Arc::from(&[id][..]),
+ annotation_id: id as i32,
}
} else {
gen_predicate_leaf(rng, schema)
@@ -396,8 +396,8 @@ pub(in crate::indexed_table::tests_e2e) fn collect_collector_tags(tree: &BoolNod
match n {
BoolNode::And(cs) | BoolNode::Or(cs) => cs.iter().for_each(|c| walk(c, out)),
BoolNode::Not(c) => walk(c, out),
- BoolNode::Collector { query_bytes } => {
- out.push(query_bytes[0]);
+ BoolNode::Collector { annotation_id } => {
+ out.push(*annotation_id as u8);
}
BoolNode::Predicate(_) => {}
}
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/mod.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/mod.rs
index 84ecd599cf5df..14b0967da7b76 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/mod.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/mod.rs
@@ -327,7 +327,7 @@ async fn run_tree_and_plan(
fn index_leaf(tag: u8) -> BoolNode {
BoolNode::Collector {
- query_bytes: Arc::from(&[tag][..]),
+ annotation_id: tag as i32,
}
}
@@ -375,8 +375,8 @@ fn wire(node: &BoolNode, out: &mut Vec>) {
match node {
BoolNode::And(c) | BoolNode::Or(c) => c.iter().for_each(|x| wire(x, out)),
BoolNode::Not(inner) => wire(inner, out),
- BoolNode::Collector { query_bytes } => {
- let c: Arc = match query_bytes.first().copied() {
+ BoolNode::Collector { annotation_id } => {
+ let c: Arc = match Some(*annotation_id as u8) {
Some(0) => brand_eq("amazon"),
Some(1) => brand_eq("apple"),
Some(2) => status_eq("archived"),
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/multi_segment.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/multi_segment.rs
index 86b64dadd1537..57352e63fe947 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/multi_segment.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/multi_segment.rs
@@ -983,7 +983,7 @@ async fn wide_multi_segment_collector_and_two_predicates() {
let specs = wide_four_seg_specs();
let tree = BoolNode::And(vec![
BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
},
pred_wide_int("price", Operator::Gt, 50),
pred_wide_int("qty", Operator::Lt, 3),
@@ -1001,7 +1001,7 @@ async fn wide_multi_segment_or_of_predicates_under_collector() {
let specs = wide_four_seg_specs();
let tree = BoolNode::And(vec![
BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
},
BoolNode::Or(vec![
pred_wide_str("region", Operator::Eq, "us-east"),
@@ -1021,7 +1021,7 @@ async fn wide_multi_segment_not_and_three_column_predicates() {
let specs = wide_four_seg_specs();
let tree = BoolNode::And(vec![
BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
},
BoolNode::Not(Box::new(pred_wide_int("price", Operator::Lt, 30))),
pred_wide_int("qty", Operator::Gt, 2),
@@ -1043,7 +1043,7 @@ async fn wide_multi_segment_deep_tree_four_predicate_columns() {
let specs = wide_four_seg_specs();
let tree = BoolNode::And(vec![
BoolNode::Collector {
- query_bytes: Arc::from(&[0u8][..]),
+ annotation_id: 0,
},
BoolNode::Or(vec![
BoolNode::And(vec![
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/null_columns.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/null_columns.rs
index 2d65eff354166..d54221e774e0b 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/null_columns.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/null_columns.rs
@@ -233,7 +233,7 @@ fn to_engine_tree_null(tree: &NT, coll_seq: &mut u8) -> BoolNode {
let tag = *coll_seq;
*coll_seq += 1;
BoolNode::Collector {
- query_bytes: Arc::from(&[tag][..]),
+ annotation_id: tag as i32,
}
}
NT::Leaf(NullLeaf::AllNullGe(v)) => pred_int_local("all_null_col", Operator::GtEq, *v),
@@ -274,8 +274,8 @@ fn wire_null_rec(
cs.iter().for_each(|c| wire_null_rec(c, matching_sets, out))
}
BoolNode::Not(inner) => wire_null_rec(inner, matching_sets, out),
- BoolNode::Collector { query_bytes } => {
- let tag = query_bytes.first().copied().expect("empty tag bytes") as usize;
+ BoolNode::Collector { annotation_id } => {
+ let tag = *annotation_id as usize;
let set = &matching_sets[tag];
out.push(Arc::new(RgScopedCollector {
matching_rows: set.clone(),
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/page_pruning.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/page_pruning.rs
index 3bada7e3c5870..2466849df08b8 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/page_pruning.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/page_pruning.rs
@@ -141,7 +141,7 @@ fn pred_node(expr: Arc) -> BoolNode {
fn collector_leaf(tag: u8) -> BoolNode {
BoolNode::Collector {
- query_bytes: Arc::from(&[tag][..]),
+ annotation_id: tag as i32,
}
}
@@ -271,7 +271,7 @@ fn build_pp_map(
fn wire_collectors_dfs(node: &BoolNode, out: &mut Vec>) {
match node {
- BoolNode::Collector { query_bytes } => out.push(collector_for_tag(query_bytes[0])),
+ BoolNode::Collector { annotation_id } => out.push(collector_for_tag(*annotation_id as u8)),
BoolNode::And(cs) | BoolNode::Or(cs) => cs.iter().for_each(|c| wire_collectors_dfs(c, out)),
BoolNode::Not(c) => wire_collectors_dfs(c, out),
BoolNode::Predicate(_) => {}
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/streaming_at_scale.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/streaming_at_scale.rs
index a8b140f304124..68e667c2cd495 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/streaming_at_scale.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/streaming_at_scale.rs
@@ -248,24 +248,20 @@ fn to_engine_tree_large(tree: &LT) -> BoolNode {
match tree {
LT::Leaf(l) => match l {
LLeaf::LBrand(b) => BoolNode::Collector {
- query_bytes: Arc::from(
- &[match *b {
- "amazon" => 0u8,
- "apple" => 1,
- "google" => 2,
- "samsung" => 3,
- _ => panic!("unknown brand {}", b),
- }][..],
- ),
+ annotation_id: match *b {
+ "amazon" => 0,
+ "apple" => 1,
+ "google" => 2,
+ "samsung" => 3,
+ _ => panic!("unknown brand {}", b),
+ },
},
LLeaf::LStatus(s) => BoolNode::Collector {
- query_bytes: Arc::from(
- &[match *s {
- "active" => 4u8,
- "archived" => 5,
- _ => panic!("unknown status {}", s),
- }][..],
- ),
+ annotation_id: match *s {
+ "active" => 4,
+ "archived" => 5,
+ _ => panic!("unknown status {}", s),
+ },
},
LLeaf::LPriceGe(v) => pred_large_int("price", Operator::GtEq, *v),
LLeaf::LPriceLt(v) => pred_large_int("price", Operator::Lt, *v),
@@ -292,8 +288,8 @@ fn wire_large_rec(node: &BoolNode, out: &mut Vec>
match node {
BoolNode::And(cs) | BoolNode::Or(cs) => cs.iter().for_each(|c| wire_large_rec(c, out)),
BoolNode::Not(inner) => wire_large_rec(inner, out),
- BoolNode::Collector { query_bytes } => {
- let tag = query_bytes.first().copied().expect("empty tag bytes");
+ BoolNode::Collector { annotation_id } => {
+ let tag = *annotation_id as u8;
out.push(large_collector_for(tag));
}
BoolNode::Predicate(_) => {}
diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs
index 9de9caaa968a5..4afcc6d414975 100644
--- a/sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs
+++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs
@@ -37,6 +37,15 @@ pub struct SessionContextHandle {
pub table_path: ListingTableUrl,
pub object_metas: Arc>,
pub query_context: QueryTrackingContext,
+ pub table_name: String,
+ /// When set, indicates this session uses the indexed execution path with filter delegation.
+ pub indexed_config: Option<IndexedExecutionConfig>,
+}
+
+/// Configuration for indexed execution with filter delegation, provided by Java.
+pub struct IndexedExecutionConfig {
+ pub tree_shape: i32,
+ pub delegated_predicate_count: i32,
}
/// Creates a SessionContext with per-query RuntimeEnv and registers the default
@@ -134,6 +143,8 @@ pub async unsafe fn create_session_context(
table_path: shard_view.table_path.clone(),
object_metas: shard_view.object_metas.clone(),
query_context,
+ table_name: table_name.to_string(),
+ indexed_config: None,
};
Ok(Box::into_raw(Box::new(handle)) as i64)
}
@@ -147,3 +158,28 @@ pub unsafe fn close_session_context(ptr: i64) {
let _ = Box::from_raw(ptr as *mut SessionContextHandle);
}
}
+
+/// Creates a SessionContext configured for indexed execution with filter delegation.
+/// Registers the `delegated_predicate` UDF and stores the tree shape + predicate count
+/// for use during execution.
+pub async unsafe fn create_session_context_indexed(
+ runtime_ptr: i64,
+ shard_view_ptr: i64,
+ table_name: &str,
+ context_id: i64,
+ tree_shape: i32,
+ delegated_predicate_count: i32,
+) -> Result<i64, String> {
+ // Create base session context (same as non-indexed path)
+ let ptr = create_session_context(runtime_ptr, shard_view_ptr, table_name, context_id).await?;
+
+ // Augment with indexed config and UDF registration
+ let handle = &mut *(ptr as *mut SessionContextHandle);
+ handle.ctx.register_udf(crate::indexed_table::substrait_to_tree::create_index_filter_udf());
+ handle.indexed_config = Some(IndexedExecutionConfig {
+ tree_shape,
+ delegated_predicate_count,
+ });
+
+ Ok(ptr)
+}
diff --git a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java b/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java
deleted file mode 100644
index 717f3054cb97a..0000000000000
--- a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.be.datafusion;
-
-import org.opensearch.Version;
-import org.opensearch.action.admin.indices.create.CreateIndexResponse;
-import org.opensearch.analytics.AnalyticsPlugin;
-import org.opensearch.arrow.flight.transport.FlightStreamPlugin;
-import org.opensearch.be.lucene.LucenePlugin;
-import org.opensearch.cluster.metadata.IndexMetadata;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.util.FeatureFlags;
-import org.opensearch.composite.CompositeDataFormatPlugin;
-import org.opensearch.parquet.ParquetDataFormatPlugin;
-import org.opensearch.plugins.Plugin;
-import org.opensearch.plugins.PluginInfo;
-import org.opensearch.ppl.TestPPLPlugin;
-import org.opensearch.ppl.action.PPLRequest;
-import org.opensearch.ppl.action.PPLResponse;
-import org.opensearch.ppl.action.UnifiedPPLExecuteAction;
-import org.opensearch.test.OpenSearchIntegTestCase;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * End-to-end smoke test for the streaming coordinator-reduce path:
- *
- *
* <p>All calls delegate to the currently installed {@link FilterDelegationHandle}.
+ * The handle is set per-query-per-shard before execution and cleared after.
*
* <p><b>Error-handling contract</b>
*
* <p>Every method catches all {@link Throwable}s and returns {@code -1}
* (or silently returns for void methods). A Java exception escaping through
* an FFM upcall stub crashes the JVM.
+ *
+ * TODO: remove old Registries-based code path and CollectorRegistry/FilterProviderRegistry
+ * once all tests are migrated to the FilterDelegationHandle path.
*/
public final class FilterTreeCallbacks {
private static final Logger LOGGER = LogManager.getLogger(FilterTreeCallbacks.class);
- /** Both registries in one snapshot — one AtomicReference read per upcall. */
- record Registries(FilterProviderRegistry providers, CollectorRegistry collectors) {
- }
-
- private static final AtomicReference<Registries> REGISTRIES = new AtomicReference<>();
+ private static final AtomicReference<FilterDelegationHandle> HANDLE = new AtomicReference<>();
private FilterTreeCallbacks() {}
/**
- * Install the registries. Called once at plugin startup.
+ * Install the delegation handle for the current execution.
+ * Called by {@code configureFilterDelegation} before query execution.
* Tests may call with {@code null} to reset.
*/
- public static void setRegistries(FilterProviderRegistry providers, CollectorRegistry collectors) {
- REGISTRIES.set(providers == null ? null : new Registries(providers, collectors));
+ public static void setHandle(FilterDelegationHandle handle) {
+ HANDLE.set(handle);
}
// ── Provider lifecycle (cold path, once per query) ────────────────
/**
- * {@code createProvider(queryBytes, queryBytesLen) -> providerKey|-1}.
+ * {@code createProvider(annotationId) -> providerKey|-1}.
*/
- public static int createProvider(MemorySegment queryBytesPtr, long queryBytesLen) {
+ public static int createProvider(int annotationId) {
try {
- Registries reg = REGISTRIES.get();
- if (reg == null) {
+ FilterDelegationHandle handle = HANDLE.get();
+ if (handle == null) {
return -1;
}
- MemorySegment view = queryBytesPtr.reinterpret(queryBytesLen);
- byte[] bytes = view.toArray(ValueLayout.JAVA_BYTE);
- return reg.providers().createProvider(bytes);
- } catch (Throwable t) {
- LOGGER.error("createProvider failed", t);
+ return handle.createProvider(annotationId);
+ } catch (Throwable throwable) {
+ LOGGER.error("createProvider failed for annotationId={}", annotationId, throwable);
return -1;
}
}
@@ -76,12 +70,12 @@ public static int createProvider(MemorySegment queryBytesPtr, long queryBytesLen
*/
public static void releaseProvider(int providerKey) {
try {
- Registries reg = REGISTRIES.get();
- if (reg != null) {
- reg.providers().releaseProvider(providerKey);
+ FilterDelegationHandle handle = HANDLE.get();
+ if (handle != null) {
+ handle.releaseProvider(providerKey);
}
- } catch (Throwable t) {
- LOGGER.error(new ParameterizedMessage("releaseProvider({}) failed", providerKey), t);
+ } catch (Throwable throwable) {
+ LOGGER.error(new ParameterizedMessage("releaseProvider({}) failed", providerKey), throwable);
}
}
@@ -92,12 +86,12 @@ public static void releaseProvider(int providerKey) {
*/
public static int createCollector(int providerKey, int segmentOrd, int minDoc, int maxDoc) {
try {
- Registries reg = REGISTRIES.get();
- if (reg == null) {
+ FilterDelegationHandle handle = HANDLE.get();
+ if (handle == null) {
return -1;
}
- return reg.providers().createCollector(providerKey, segmentOrd, minDoc, maxDoc);
- } catch (Throwable t) {
+ return handle.createCollector(providerKey, segmentOrd, minDoc, maxDoc);
+ } catch (Throwable throwable) {
LOGGER.error(
new ParameterizedMessage(
"createCollector(providerKey={}, seg={}, [{}, {})) failed",
@@ -106,7 +100,7 @@ public static int createCollector(int providerKey, int segmentOrd, int minDoc, i
minDoc,
maxDoc
),
- t
+ throwable
);
return -1;
}
@@ -114,26 +108,22 @@ public static int createCollector(int providerKey, int segmentOrd, int minDoc, i
/**
* {@code collectDocs(collectorKey, minDoc, maxDoc, outPtr, outWordCap) -> wordsWritten|-1}.
- *
- * <p>Single map lookup into {@link CollectorRegistry}. The provider
- * reference is already captured in the {@link CollectorHandle}.
*/
public static long collectDocs(int collectorKey, int minDoc, int maxDoc, MemorySegment outPtr, long outWordCap) {
try {
- Registries reg = REGISTRIES.get();
- if (reg == null) {
- return -1L;
- }
- CollectorHandle handle = reg.collectors().collector(collectorKey);
+ FilterDelegationHandle handle = HANDLE.get();
if (handle == null) {
return -1L;
}
int maxWords = (int) Math.min(outWordCap, (long) Integer.MAX_VALUE);
MemorySegment view = outPtr.reinterpret((long) maxWords * Long.BYTES);
- int n = handle.provider().collectDocs(handle.innerCollectorKey(), minDoc, maxDoc, view);
- return (n < 0) ? -1L : n;
- } catch (Throwable t) {
- LOGGER.error(new ParameterizedMessage("collectDocs(collectorKey={}, [{}, {})) failed", collectorKey, minDoc, maxDoc), t);
+ int wordsWritten = handle.collectDocs(collectorKey, minDoc, maxDoc, view);
+ return (wordsWritten < 0) ? -1L : wordsWritten;
+ } catch (Throwable throwable) {
+ LOGGER.error(
+ new ParameterizedMessage("collectDocs(collectorKey={}, [{}, {})) failed", collectorKey, minDoc, maxDoc),
+ throwable
+ );
return -1L;
}
}
@@ -143,18 +133,12 @@ public static long collectDocs(int collectorKey, int minDoc, int maxDoc, MemoryS
*/
public static void releaseCollector(int collectorKey) {
try {
- Registries reg = REGISTRIES.get();
- if (reg == null) {
- return;
- }
- CollectorHandle handle = reg.collectors().collector(collectorKey);
- if (handle == null) {
- return;
+ FilterDelegationHandle handle = HANDLE.get();
+ if (handle != null) {
+ handle.releaseCollector(collectorKey);
}
- handle.provider().releaseCollector(handle.innerCollectorKey());
- reg.collectors().unregisterCollector(collectorKey);
- } catch (Throwable t) {
- LOGGER.error(new ParameterizedMessage("releaseCollector({}) failed", collectorKey), t);
+ } catch (Throwable throwable) {
+ LOGGER.error(new ParameterizedMessage("releaseCollector({}) failed", collectorKey), throwable);
}
}
}
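
The javadoc's per-query-per-shard contract implies an install/clear bracket around native execution. `configureFilterDelegation` is named in the javadoc but its body is not part of this diff, so the try/finally shape below is an assumption rather than the PR's code:

```java
// Assumed install/clear bracket; executeFragment() is a placeholder for the
// native plan execution that triggers the Rust upcalls.
FilterDelegationHandle handle = backend.getFilterDelegationHandle(expressions, executionContext);
FilterTreeCallbacks.setHandle(handle);
try {
    executeFragment(); // Rust upcalls route through the installed handle
} finally {
    FilterTreeCallbacks.setHandle(null); // clear before closing so late upcalls see "no handle"
    handle.close();
}
```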
diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java
index 7afa5eb24feff..b9660d05e83dd 100644
--- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java
+++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java
@@ -70,6 +70,7 @@ public final class NativeBridge {
private static final MethodHandle CACHE_MANAGER_GET_TOTAL_MEMORY;
private static final MethodHandle CACHE_MANAGER_CONTAINS_BY_TYPE;
private static final MethodHandle CREATE_SESSION_CONTEXT;
+ private static final MethodHandle CREATE_SESSION_CONTEXT_INDEXED;
private static final MethodHandle CLOSE_SESSION_CONTEXT;
private static final MethodHandle EXECUTE_WITH_CONTEXT;
@@ -269,6 +270,20 @@ public final class NativeBridge {
)
);
+ CREATE_SESSION_CONTEXT_INDEXED = linker.downcallHandle(
+ lib.find("df_create_session_context_indexed").orElseThrow(),
+ FunctionDescriptor.of(
+ ValueLayout.JAVA_LONG,
+ ValueLayout.JAVA_LONG,
+ ValueLayout.JAVA_LONG,
+ ValueLayout.ADDRESS,
+ ValueLayout.JAVA_LONG,
+ ValueLayout.JAVA_LONG,
+ ValueLayout.JAVA_INT,
+ ValueLayout.JAVA_INT
+ )
+ );
+
// i64 df_cache_manager_add_files(runtime_ptr, files_ptr, files_len_ptr, files_count)
CACHE_MANAGER_ADD_FILES = linker.downcallHandle(
lib.find("df_cache_manager_add_files").orElseThrow(),
@@ -353,7 +368,7 @@ private static void installFilterTreeCallbacks(Linker linker) {
MethodHandle createProvider = lookup.findStatic(
cb,
"createProvider",
- java.lang.invoke.MethodType.methodType(int.class, java.lang.foreign.MemorySegment.class, long.class)
+ java.lang.invoke.MethodType.methodType(int.class, int.class)
);
MethodHandle releaseProvider = lookup.findStatic(
cb,
@@ -385,7 +400,7 @@ private static void installFilterTreeCallbacks(Linker linker) {
java.lang.foreign.MemorySegment createProviderStub = linker.upcallStub(
createProvider,
- FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.ADDRESS, ValueLayout.JAVA_LONG),
+ FunctionDescriptor.of(ValueLayout.JAVA_INT, ValueLayout.JAVA_INT),
arena
);
java.lang.foreign.MemorySegment releaseProviderStub = linker.upcallStub(
@@ -692,6 +707,37 @@ public static SessionContextHandle createSessionContext(long readerPtr, long run
}
}
+ /**
+ * Creates a SessionContext configured for indexed execution with filter delegation.
+ * Registers the delegated_predicate UDF and stores treeShape + delegatedPredicateCount
+ * on the Rust handle for use during execution.
+ */
+ public static SessionContextHandle createSessionContextForIndexedExecution(
+ long readerPtr,
+ long runtimePtr,
+ String tableName,
+ long contextId,
+ int treeShapeOrdinal,
+ int delegatedPredicateCount
+ ) {
+ NativeHandle.validatePointer(readerPtr, "reader");
+ NativeHandle.validatePointer(runtimePtr, "runtime");
+ try (NativeCall call = new NativeCall()) {
+ NativeCall.Str table = call.str(tableName);
+ long ptr = call.invoke(
+ CREATE_SESSION_CONTEXT_INDEXED,
+ readerPtr,
+ runtimePtr,
+ table.segment(),
+ table.len(),
+ contextId,
+ treeShapeOrdinal,
+ delegatedPredicateCount
+ );
+ return new SessionContextHandle(ptr);
+ }
+ }
+
/**
* Executes a Substrait plan against the configured SessionContext.
* Consumes the session context handle (freed internally when stream closes).
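
A hypothetical call site for the new entry point; the `treeShapeOrdinal` parameter name suggests the `FilterTreeShape` enum crosses the FFI boundary as its ordinal, which the Rust side stores as a raw `i32`:

```java
// Hypothetical call site; every name except the method itself is illustrative.
SessionContextHandle sessionHandle = NativeBridge.createSessionContextForIndexedExecution(
    readerPtr,                      // native reader/shard-view pointer
    runtimePtr,                     // native runtime pointer
    "logs",                         // table name registered on the Rust side
    contextId,                      // query-tracking context id
    treeShape.ordinal(),            // FilterTreeShape -> i32 tree_shape
    delegatedExpressions.size()     // delegated_predicate_count
);
```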
diff --git a/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/FilterDelegationForIndexFullConversionTests.java b/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/FilterDelegationForIndexFullConversionTests.java
index bcbe2c3ef7c92..c0b7e1eb6ffe0 100644
--- a/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/FilterDelegationForIndexFullConversionTests.java
+++ b/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/FilterDelegationForIndexFullConversionTests.java
@@ -54,6 +54,7 @@
import org.opensearch.analytics.spi.ScalarFunction;
import org.opensearch.analytics.spi.ScanCapability;
import org.opensearch.analytics.spi.ShardScanInstructionNode;
+import org.opensearch.analytics.spi.ShardScanWithDelegationInstructionNode;
import org.opensearch.be.lucene.LuceneAnalyticsBackendPlugin;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -468,6 +469,11 @@ public Optional createFilterDelegationNode(
return Optional.of(new FilterDelegationInstructionNode(treeShape, delegatedPredicateCount, delegatedExpressions));
}
+ @Override
+ public Optional createShardScanWithDelegationNode(FilterTreeShape treeShape, int delegatedPredicateCount) {
+ return Optional.of(new ShardScanWithDelegationInstructionNode(treeShape, delegatedPredicateCount));
+ }
+
@Override
public Optional createPartialAggregateNode() {
return Optional.empty();
diff --git a/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/IndexFilterCallbackTests.java b/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/IndexFilterCallbackTests.java
index 5ea0da63d9a8f..1606b76facbb2 100644
--- a/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/IndexFilterCallbackTests.java
+++ b/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/IndexFilterCallbackTests.java
@@ -8,172 +8,174 @@
package org.opensearch.be.datafusion.indexfilter;
-import org.opensearch.analytics.spi.IndexFilterProvider;
-import org.opensearch.analytics.spi.IndexFilterProviderFactory;
+import org.opensearch.analytics.spi.FilterDelegationHandle;
import org.opensearch.test.OpenSearchTestCase;
-import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
/**
- * Tests the Java-side provider/collector registries + FFM callback dispatch
- * glue without going through the full substrait → native pipeline.
+ * Tests the Java-side FFM callback dispatch via {@link FilterTreeCallbacks}
+ * routing to a {@link FilterDelegationHandle} without going through the full
+ * substrait → native pipeline.
*/
public class IndexFilterCallbackTests extends OpenSearchTestCase {
- private FilterProviderRegistry providers;
- private CollectorRegistry collectors;
-
@Override
public void setUp() throws Exception {
super.setUp();
- collectors = new CollectorRegistry();
- providers = new FilterProviderRegistry(collectors);
- FilterTreeCallbacks.setRegistries(providers, collectors);
+ FilterTreeCallbacks.setHandle(null);
}
@Override
public void tearDown() throws Exception {
- FilterTreeCallbacks.setRegistries(null, null);
+ FilterTreeCallbacks.setHandle(null);
super.tearDown();
}
- public void testCreateCollectReleaseRoundTrip() {
- MockProvider provider = new MockProvider(new long[] { 0x5L, 0x0L });
- // Register provider directly via lifecycle, bypassing factory upcall.
- int providerKey = providers.createProvider(new byte[0]);
- // That returns -1 because no factory is set. Register manually instead.
- providerKey = registerProviderDirectly(provider);
-
+ public void testFullRoundTrip() {
+ long[] cannedWords = new long[] { 0x5L, 0x3L };
+ MockHandle handle = new MockHandle(cannedWords);
+ FilterTreeCallbacks.setHandle(handle);
+
+ // createProvider
+ int providerKey = FilterTreeCallbacks.createProvider(42);
+ assertTrue("providerKey >= 0", providerKey >= 0);
+ assertEquals("handle received annotationId", 42, handle.lastAnnotationId);
+
+ // createCollector
+ int collectorKey = FilterTreeCallbacks.createCollector(providerKey, 2, 0, 128);
+ assertTrue("collectorKey >= 0", collectorKey >= 0);
+ assertEquals("handle received providerKey", providerKey, handle.lastProviderKey);
+ assertEquals("handle received segmentOrd", 2, handle.lastSegmentOrd);
+ assertEquals("handle received minDoc", 0, handle.lastMinDoc);
+ assertEquals("handle received maxDoc", 128, handle.lastMaxDoc);
+
+ // collectDocs
try (Arena arena = Arena.ofConfined()) {
- int collectorKey = FilterTreeCallbacks.createCollector(providerKey, 0, 0, 64);
- assertTrue("collectorKey >= 0", collectorKey >= 0);
-
- MemorySegment buf = arena.allocate(Long.BYTES);
- int n = provider.collectDocs(collectors.collector(collectorKey).innerCollectorKey(), 0, 64, buf);
- assertEquals(1, n);
+ MemorySegment buf = arena.allocate(Long.BYTES * 2);
+ long wordsWritten = FilterTreeCallbacks.collectDocs(collectorKey, 0, 128, buf, 2);
+ assertEquals("wordsWritten matches canned length", 2L, wordsWritten);
assertEquals(0x5L, buf.getAtIndex(ValueLayout.JAVA_LONG, 0));
-
- FilterTreeCallbacks.releaseCollector(collectorKey);
- assertNull("collector removed from registry", collectors.collector(collectorKey));
- } finally {
- FilterTreeCallbacks.releaseProvider(providerKey);
- assertNull("provider removed from registry", providers.provider(providerKey));
+ assertEquals(0x3L, buf.getAtIndex(ValueLayout.JAVA_LONG, 1));
}
- }
- public void testCreateWithUnknownProviderReturnsError() {
- assertEquals(-1, FilterTreeCallbacks.createCollector(Integer.MAX_VALUE, 0, 0, 16));
+ // releaseCollector
+ FilterTreeCallbacks.releaseCollector(collectorKey);
+ assertEquals("handle received collectorKey for release", collectorKey, handle.lastReleasedCollectorKey);
+
+ // releaseProvider
+ FilterTreeCallbacks.releaseProvider(providerKey);
+ assertEquals("handle received providerKey for release", providerKey, handle.lastReleasedProviderKey);
}
- public void testReleaseWithUnknownCollectorIsSafe() {
- FilterTreeCallbacks.releaseCollector(Integer.MAX_VALUE);
+ public void testNoHandleReturnsNegativeOne() {
+ FilterTreeCallbacks.setHandle(null);
+ assertEquals(-1, FilterTreeCallbacks.createProvider(1));
+ assertEquals(-1, FilterTreeCallbacks.createCollector(1, 0, 0, 64));
+ try (Arena arena = Arena.ofConfined()) {
+ MemorySegment buf = arena.allocate(Long.BYTES);
+ assertEquals(-1L, FilterTreeCallbacks.collectDocs(1, 0, 64, buf, 1));
+ }
}
- public void testReleaseWithUnknownProviderIsSafe() {
+ public void testReleaseWithNoHandleIsSafe() {
+ FilterTreeCallbacks.setHandle(null);
+ FilterTreeCallbacks.releaseCollector(Integer.MAX_VALUE);
FilterTreeCallbacks.releaseProvider(Integer.MAX_VALUE);
}
- public void testCreateProviderDispatchesToRegisteredFactory() throws IOException {
- byte[] expected = new byte[] { 1, 2, 3, 4 };
- StubFactory factory = new StubFactory(expected);
- providers.setFactory(factory);
+ public void testHandleReturningNegativeOnePropagates() {
+ FilterDelegationHandle failingHandle = new FilterDelegationHandle() {
+ @Override
+ public int createProvider(int annotationId) {
+ return -1;
+ }
- try (Arena arena = Arena.ofConfined()) {
- MemorySegment seg = arena.allocate(expected.length);
- MemorySegment.copy(expected, 0, seg, ValueLayout.JAVA_BYTE, 0, expected.length);
+ @Override
+ public int createCollector(int providerKey, int segOrd, int minDoc, int maxDoc) {
+ return -1;
+ }
- int key = FilterTreeCallbacks.createProvider(seg, expected.length);
- assertTrue("providerKey >= 0", key >= 0);
- assertEquals("factory invoked exactly once", 1, factory.callCount);
+ @Override
+ public int collectDocs(int collectorKey, int minDoc, int maxDoc, MemorySegment out) {
+ return -1;
+ }
- IndexFilterProvider registered = providers.provider(key);
- assertNotNull("provider registered under returned key", registered);
- assertSame("registered provider is the one factory produced", factory.lastProvider, registered);
+ @Override
+ public void releaseCollector(int collectorKey) {}
- FilterTreeCallbacks.releaseProvider(key);
- assertNull(providers.provider(key));
- }
- }
+ @Override
+ public void releaseProvider(int providerKey) {}
- public void testCreateProviderWithNoFactoryReturnsError() {
- // Fresh lifecycle with no factory set.
- CollectorRegistry emptyColl = new CollectorRegistry();
- FilterProviderRegistry empty = new FilterProviderRegistry(emptyColl);
- FilterTreeCallbacks.setRegistries(empty, emptyColl);
+ @Override
+ public void close() {}
+ };
+ FilterTreeCallbacks.setHandle(failingHandle);
+
+ assertEquals(-1, FilterTreeCallbacks.createProvider(1));
+ assertEquals(-1, FilterTreeCallbacks.createCollector(1, 0, 0, 64));
try (Arena arena = Arena.ofConfined()) {
- MemorySegment seg = arena.allocate(1);
- int key = FilterTreeCallbacks.createProvider(seg, 1);
- assertEquals("no factory → -1", -1, key);
+ MemorySegment buf = arena.allocate(Long.BYTES);
+ assertEquals(-1L, FilterTreeCallbacks.collectDocs(1, 0, 64, buf, 1));
}
}
- /**
- * Helper: register a provider directly into the lifecycle's internal map
- * for tests that bypass the factory. Uses reflection-free approach by
- * setting a factory that returns the given provider, calling createProvider,
- * then the factory is consumed.
- */
- private int registerProviderDirectly(IndexFilterProvider provider) {
- // Use a one-shot factory that returns the given provider.
- FilterProviderRegistry directLifecycle = new FilterProviderRegistry(collectors);
- directLifecycle.setFactory(bytes -> provider);
- int key = directLifecycle.createProvider(new byte[0]);
- // Swap the lifecycle so FilterTreeCallbacks sees this provider.
- // We need to keep the collectors registry.
- this.providers = directLifecycle;
- FilterTreeCallbacks.setRegistries(directLifecycle, collectors);
- return key;
- }
-
- /** Stub factory that records its input and emits a MockProvider. */
- private static final class StubFactory implements IndexFilterProviderFactory {
- private final byte[] expectedBytes;
- int callCount = 0;
- IndexFilterProvider lastProvider;
-
- StubFactory(byte[] expectedBytes) {
- this.expectedBytes = expectedBytes;
+ /** Mock handle that records arguments and returns canned bitset words. */
+ private static final class MockHandle implements FilterDelegationHandle {
+ private final long[] cannedWords;
+ private int nextKey = 1;
+
+ int lastAnnotationId = -1;
+ int lastProviderKey = -1;
+ int lastSegmentOrd = -1;
+ int lastMinDoc = -1;
+ int lastMaxDoc = -1;
+ int lastCollectorKey = -1;
+ int lastReleasedCollectorKey = -1;
+ int lastReleasedProviderKey = -1;
+
+ MockHandle(long[] cannedWords) {
+ this.cannedWords = cannedWords;
}
@Override
- public IndexFilterProvider create(byte[] queryBytes) {
- callCount++;
- assertArrayEquals("factory receives the exact bytes from upcall", expectedBytes, queryBytes);
- lastProvider = new MockProvider(new long[] { 0xAL });
- return lastProvider;
- }
- }
-
- /** In-memory provider that returns canned bitset words. */
- private static final class MockProvider implements IndexFilterProvider {
- private final long[] cannedWords;
- private int nextCollector = 1;
-
- MockProvider(long[] cannedWords) {
- this.cannedWords = cannedWords;
+ public int createProvider(int annotationId) {
+ this.lastAnnotationId = annotationId;
+ return nextKey++;
}
@Override
- public int createCollector(int segmentOrd, int minDoc, int maxDoc) {
- return nextCollector++;
+ public int createCollector(int providerKey, int segmentOrd, int minDoc, int maxDoc) {
+ this.lastProviderKey = providerKey;
+ this.lastSegmentOrd = segmentOrd;
+ this.lastMinDoc = minDoc;
+ this.lastMaxDoc = maxDoc;
+ return nextKey++;
}
@Override
public int collectDocs(int collectorKey, int minDoc, int maxDoc, MemorySegment out) {
- int n = Math.min(cannedWords.length, (int) (out.byteSize() / Long.BYTES));
- for (int i = 0; i < n; i++) {
+ this.lastCollectorKey = collectorKey;
+ int wordCount = Math.min(cannedWords.length, (int) (out.byteSize() / Long.BYTES));
+ for (int i = 0; i < wordCount; i++) {
out.setAtIndex(ValueLayout.JAVA_LONG, i, cannedWords[i]);
}
- return n;
+ return wordCount;
}
@Override
- public void releaseCollector(int collectorKey) {}
+ public void releaseCollector(int collectorKey) {
+ this.lastReleasedCollectorKey = collectorKey;
+ }
+
+ @Override
+ public void releaseProvider(int providerKey) {
+ this.lastReleasedProviderKey = providerKey;
+ }
@Override
- public void close() throws IOException {}
+ public void close() {}
}
}
diff --git a/sandbox/plugins/analytics-backend-lucene/build.gradle b/sandbox/plugins/analytics-backend-lucene/build.gradle
index 3275ec49f65e5..4ad216c021736 100644
--- a/sandbox/plugins/analytics-backend-lucene/build.gradle
+++ b/sandbox/plugins/analytics-backend-lucene/build.gradle
@@ -11,6 +11,7 @@ apply plugin: 'opensearch.internal-cluster-test'
opensearchplugin {
description = 'OpenSearch plugin providing Lucene-based search execution engine'
classname = 'org.opensearch.be.lucene.LucenePlugin'
+ extendedPlugins = ['analytics-engine']
}
java { sourceCompatibility = JavaVersion.toVersion(25); targetCompatibility = JavaVersion.toVersion(25) }
@@ -24,9 +25,9 @@ configurations {
sourceSets.test.compileClasspath += configurations.calciteTestCompile
dependencies {
- // Shared types and SPI interfaces (EngineBridge, AnalyticsBackEndPlugin, etc.)
- // Also provides calcite-core transitively via api.
- api project(':sandbox:libs:analytics-framework')
+ // Shared types and SPI interfaces — provided at runtime by the parent analytics-engine plugin (extendedPlugins above).
+ compileOnly project(':sandbox:libs:analytics-framework')
+ compileOnly project(':sandbox:plugins:analytics-engine')
implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}"
implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/ConversionUtils.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/ConversionUtils.java
new file mode 100644
index 0000000000000..fcd4edf7f311c
--- /dev/null
+++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/ConversionUtils.java
@@ -0,0 +1,80 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.be.lucene;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.opensearch.analytics.spi.FieldStorageInfo;
+import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.index.query.QueryBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Reusable utilities for extracting fields and values from PPL relevance function
+ * RexCall structures and serializing QueryBuilders.
+ *
+ * <p>PPL relevance functions encode arguments as MAP_VALUE_CONSTRUCTOR pairs:
+ * {@code func(MAP('field', $ref), MAP('query', literal), [MAP('param', literal)]...)}
+ * Each MAP has exactly 2 operands: key at index 0, value at index 1.
+ */
+final class ConversionUtils {
+
+ private ConversionUtils() {}
+
+ /**
+ * Extracts field name from a MAP_VALUE_CONSTRUCTOR operand: MAP('field', $inputRef).
+ */
+ static String extractFieldFromRelevanceMap(RexCall call, int operandIndex, List<FieldStorageInfo> fieldStorage) {
+ RexNode operand = call.getOperands().get(operandIndex);
+ if (operand instanceof RexCall mapCall) {
+ RexNode value = mapCall.getOperands().get(1);
+ if (value instanceof RexInputRef inputRef) {
+ return FieldStorageInfo.resolve(fieldStorage, inputRef.getIndex()).getFieldName();
+ }
+ }
+ if (operand instanceof RexInputRef inputRef) {
+ return FieldStorageInfo.resolve(fieldStorage, inputRef.getIndex()).getFieldName();
+ }
+ throw new IllegalArgumentException("Cannot extract field name from operand " + operandIndex + ": " + operand);
+ }
+
+ /**
+ * Extracts string value from a MAP_VALUE_CONSTRUCTOR operand: MAP('key', 'value').
+ */
+ static String extractStringFromRelevanceMap(RexCall call, int operandIndex) {
+ RexNode operand = call.getOperands().get(operandIndex);
+ if (operand instanceof RexCall mapCall) {
+ RexNode value = mapCall.getOperands().get(1);
+ if (value instanceof RexLiteral literal) {
+ return literal.getValueAs(String.class);
+ }
+ }
+ if (operand instanceof RexLiteral literal) {
+ return literal.getValueAs(String.class);
+ }
+ throw new IllegalArgumentException("Cannot extract string from operand " + operandIndex + ": " + operand);
+ }
+
+ /**
+ * Serializes a QueryBuilder into bytes using NamedWriteable protocol.
+ */
+ static byte[] serializeQueryBuilder(QueryBuilder queryBuilder) {
+ try (BytesStreamOutput output = new BytesStreamOutput()) {
+ output.writeNamedWriteable(queryBuilder);
+ return BytesReference.toBytes(output.bytes());
+ } catch (IOException exception) {
+ throw new IllegalStateException("Failed to serialize delegated query: " + queryBuilder, exception);
+ }
+ }
+}
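
These helpers generalize the MATCH-only serializer the PR deletes from `LuceneAnalyticsBackendPlugin` below. A registry entry built on them would plausibly look like the following sketch, reconstructed from the removed `serializeMatch` rather than copied from `QuerySerializerRegistry` (whose contents this diff does not show):

```java
import java.util.List;

import org.apache.calcite.rex.RexCall;
import org.opensearch.analytics.spi.FieldStorageInfo;
import org.opensearch.index.query.MatchQueryBuilder;

// Sketch of a MATCH serializer rebuilt on ConversionUtils; same package, so the
// package-private helpers are reachable.
static byte[] serializeMatch(RexCall call, List<FieldStorageInfo> fieldStorage) {
    String fieldName = ConversionUtils.extractFieldFromRelevanceMap(call, 0, fieldStorage);
    String queryText = ConversionUtils.extractStringFromRelevanceMap(call, 1);
    return ConversionUtils.serializeQueryBuilder(new MatchQueryBuilder(fieldName, queryText));
}
```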
diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneAnalyticsBackendPlugin.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneAnalyticsBackendPlugin.java
index a99bc90e35f15..5a59dda788db0 100644
--- a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneAnalyticsBackendPlugin.java
+++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneAnalyticsBackendPlugin.java
@@ -8,27 +8,25 @@
package org.opensearch.be.lucene;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.opensearch.analytics.backend.ShardScanExecutionContext;
import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin;
import org.opensearch.analytics.spi.BackendCapabilityProvider;
+import org.opensearch.analytics.spi.CommonExecutionContext;
+import org.opensearch.analytics.spi.DelegatedExpression;
import org.opensearch.analytics.spi.DelegatedPredicateSerializer;
import org.opensearch.analytics.spi.DelegationType;
import org.opensearch.analytics.spi.EngineCapability;
-import org.opensearch.analytics.spi.FieldStorageInfo;
import org.opensearch.analytics.spi.FieldType;
import org.opensearch.analytics.spi.FilterCapability;
+import org.opensearch.analytics.spi.FilterDelegationHandle;
import org.opensearch.analytics.spi.ScalarFunction;
-import org.opensearch.common.io.stream.BytesStreamOutput;
-import org.opensearch.core.common.bytes.BytesReference;
-import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.index.query.QueryShardContext;
-import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -100,12 +98,6 @@ public class LuceneAnalyticsBackendPlugin implements AnalyticsSearchBackendPlugi
FILTER_CAPS = caps;
}
- // TODO: add serializers for MATCH_PHRASE, FUZZY, WILDCARD, REGEXP, and standard ops (for performance delegation)
- private static final Map<ScalarFunction, DelegatedPredicateSerializer> SERIALIZERS = Map.of(
- ScalarFunction.MATCH,
- LuceneAnalyticsBackendPlugin::serializeMatch
- );
-
private final LucenePlugin plugin;
public LuceneAnalyticsBackendPlugin(LucenePlugin plugin) {
@@ -137,58 +129,44 @@ public Set acceptedDelegations() {
@Override
public Map<ScalarFunction, DelegatedPredicateSerializer> delegatedPredicateSerializers() {
- return SERIALIZERS;
+ return QuerySerializerRegistry.getSerializers();
}
};
}
private static final Logger LOGGER = LogManager.getLogger(LuceneAnalyticsBackendPlugin.class);
- // ---- Serializers ----
-
- private static byte[] serializeMatch(RexCall call, List<FieldStorageInfo> fieldStorage) {
- String fieldName = extractFieldName(call, 0, fieldStorage);
- String queryText = extractStringLiteral(call, 1);
- MatchQueryBuilder queryBuilder = new MatchQueryBuilder(fieldName, queryText);
- byte[] bytes = serializeQueryBuilder(queryBuilder);
- LOGGER.debug(
- "Serialized MATCH delegation: field=[{}], query=[{}], QueryBuilder=[{}], bytes={}",
- fieldName,
- queryText,
- queryBuilder,
- bytes.length
- );
- return bytes;
+ @Override
+ public FilterDelegationHandle getFilterDelegationHandle(List<DelegatedExpression> expressions, CommonExecutionContext ctx) {
+ ShardScanExecutionContext shardCtx = (ShardScanExecutionContext) ctx;
+ DirectoryReader directoryReader = shardCtx.getReader().getReader(plugin.getDataFormat(), DirectoryReader.class);
+ IndexSearcher searcher = new IndexSearcher(directoryReader);
+ QueryShardContext queryShardContext = buildMinimalQueryShardContext(shardCtx, searcher);
+ return new LuceneFilterDelegationHandle(expressions, queryShardContext, directoryReader, shardCtx.getNamedWriteableRegistry());
}
- // ---- Helpers ----
-
- private static String extractFieldName(RexCall call, int operandIndex, List<FieldStorageInfo> fieldStorage) {
- RexNode operand = call.getOperands().get(operandIndex);
- if (!(operand instanceof RexInputRef inputRef)) {
- throw new IllegalArgumentException(
- "Expected RexInputRef at operand " + operandIndex + ", got: " + operand.getClass().getSimpleName()
- );
- }
- return FieldStorageInfo.resolve(fieldStorage, inputRef.getIndex()).getFieldName();
+ private QueryShardContext buildMinimalQueryShardContext(ShardScanExecutionContext ctx, IndexSearcher searcher) {
+ return new QueryShardContext(
+ 0,
+ ctx.getIndexSettings(),
+ null, // bigArrays
+ null, // bitsetFilterCache
+ null, // indexFieldDataLookup
+ ctx.getMapperService(),
+ null, // similarityService
+ null, // scriptService
+ null, // xContentRegistry
+ null, // namedWriteableRegistry
+ null, // client
+ searcher,
+ System::currentTimeMillis,
+ null, // clusterAlias
+ s -> true, // indexNameMatcher
+ () -> true, // allowExpensiveQueries
+ null // valuesSourceRegistry
+ );
}
- private static String extractStringLiteral(RexCall call, int operandIndex) {
- RexNode operand = call.getOperands().get(operandIndex);
- if (!(operand instanceof RexLiteral literal)) {
- throw new IllegalArgumentException(
- "Expected RexLiteral at operand " + operandIndex + ", got: " + operand.getClass().getSimpleName()
- );
- }
- return literal.getValueAs(String.class);
- }
- private static byte[] serializeQueryBuilder(QueryBuilder queryBuilder) {
- try (BytesStreamOutput output = new BytesStreamOutput()) {
- output.writeNamedWriteable(queryBuilder);
- return BytesReference.toBytes(output.bytes());
- } catch (IOException exception) {
- throw new IllegalStateException("Failed to serialize delegated query: " + queryBuilder, exception);
- }
- }
}
diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFilterDelegationHandle.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFilterDelegationHandle.java
new file mode 100644
index 0000000000000..6566adf5a5096
--- /dev/null
+++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFilterDelegationHandle.java
@@ -0,0 +1,201 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.be.lucene;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.FixedBitSet;
+import org.opensearch.analytics.spi.DelegatedExpression;
+import org.opensearch.analytics.spi.FilterDelegationHandle;
+import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
+import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.index.query.QueryShardContext;
+
+import java.io.IOException;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.ValueLayout;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Lucene implementation of {@link FilterDelegationHandle}. Compiles delegated expressions
+ * into Lucene Queries, creates Weights on demand, and produces bitsets via Scorers.
+ *
+ * @opensearch.internal
+ */
+final class LuceneFilterDelegationHandle implements FilterDelegationHandle {
+
+ private static final Logger LOGGER = LogManager.getLogger(LuceneFilterDelegationHandle.class);
+
+ private final Map<Integer, Query> queriesByAnnotationId;
+ private final DirectoryReader directoryReader;
+ private final List<LeafReaderContext> leaves;
+
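+ // Provider and collector keys are opaque ints handed across the delegation
+ // boundary; concurrent maps allow registration and lookup from multiple scan threads.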
+ private final ConcurrentHashMap<Integer, Weight> weightsByProviderKey = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, ScorerHandle> scorersByCollectorKey = new ConcurrentHashMap<>();
+ private final AtomicInteger nextProviderKey = new AtomicInteger(1);
+ private final AtomicInteger nextCollectorKey = new AtomicInteger(1);
+
+ // TODO: NamedWriteableRegistry should ideally come from LucenePlugin.createComponents
+ // instead of being threaded through ShardScanExecutionContext from Core.
+ LuceneFilterDelegationHandle(
+ List<DelegatedExpression> expressions,
+ QueryShardContext queryShardContext,
+ DirectoryReader directoryReader,
+ NamedWriteableRegistry namedWriteableRegistry
+ ) {
+ this.directoryReader = directoryReader;
+ this.leaves = directoryReader.leaves();
+ this.queriesByAnnotationId = compileQueries(expressions, queryShardContext, namedWriteableRegistry);
+ }
+
+ private static Map<Integer, Query> compileQueries(
+ List<DelegatedExpression> expressions,
+ QueryShardContext context,
+ NamedWriteableRegistry registry
+ ) {
+ Map<Integer, Query> queries = new HashMap<>();
+ for (DelegatedExpression expr : expressions) {
+ try {
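+ // Wrap the raw bytes in a registry-aware stream so readNamedWriteable can
+ // resolve the concrete QueryBuilder subclass (e.g. MatchQueryBuilder) by name.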
+ StreamInput rawInput = StreamInput.wrap(expr.getExpressionBytes());
+ StreamInput input = new NamedWriteableAwareStreamInput(rawInput, registry);
+ QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class);
+ Query query = queryBuilder.toQuery(context);
+ queries.put(expr.getAnnotationId(), query);
+ } catch (IOException exception) {
+ throw new IllegalStateException(
+ "Failed to deserialize delegated expression for annotationId=" + expr.getAnnotationId(),
+ exception
+ );
+ }
+ }
+ return queries;
+ }
+
+ @Override
+ public int createProvider(int annotationId) {
+ Query query = queriesByAnnotationId.get(annotationId);
+ if (query == null) {
+ return -1;
+ }
+ try {
+ IndexSearcher searcher = new IndexSearcher(directoryReader);
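+ // COMPLETE_NO_SCORES: only matching doc IDs are needed for the bitset,
+ // so scoring work is skipped entirely.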
+ Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
+ int providerKey = nextProviderKey.getAndIncrement();
+ weightsByProviderKey.put(providerKey, weight);
+ return providerKey;
+ } catch (IOException exception) {
+ LOGGER.error("createProvider failed for annotationId={}", annotationId, exception);
+ return -1;
+ }
+ }
+
+ @Override
+ public int createCollector(int providerKey, int segmentOrd, int minDoc, int maxDoc) {
+ Weight weight = weightsByProviderKey.get(providerKey);
+ if (weight == null) {
+ return -1;
+ }
+ try {
+ // TODO: segmentOrd translation — parquet segment ord may differ from Lucene leaf ord
+ LeafReaderContext leaf = leaves.get(segmentOrd);
+ Scorer scorer = weight.scorer(leaf);
+ int collectorKey = nextCollectorKey.getAndIncrement();
+ scorersByCollectorKey.put(collectorKey, new ScorerHandle(scorer, minDoc, maxDoc));
+ return collectorKey;
+ } catch (IOException exception) {
+ LOGGER.error("createCollector failed for providerKey={}, seg={}", providerKey, segmentOrd, exception);
+ return -1;
+ }
+ }
+
+ @Override
+ public int collectDocs(int collectorKey, int minDoc, int maxDoc, MemorySegment out) {
+ ScorerHandle handle = scorersByCollectorKey.get(collectorKey);
+ if (handle == null) {
+ return -1;
+ }
+ if (maxDoc <= minDoc) {
+ return 0;
+ }
+ int span = maxDoc - minDoc;
+ FixedBitSet bits = new FixedBitSet(span);
+
+ if (handle.scorer != null) {
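+ // Clamp the requested window to the partition bounds captured at
+ // createCollector time; docs outside those bounds are left unset.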
+ int scanFrom = Math.max(minDoc, handle.partitionMinDoc);
+ int scanTo = Math.min(maxDoc, handle.partitionMaxDoc);
+
+ if (scanFrom < scanTo) {
+ try {
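+ // Resume from where the previous collectDocs call stopped; a Lucene
+ // iterator only moves forward, so the position is carried in the handle.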
+ DocIdSetIterator iterator = handle.scorer.iterator();
+ int docId = handle.currentDoc;
+ if (docId != DocIdSetIterator.NO_MORE_DOCS) {
+ if (docId < scanFrom) {
+ docId = iterator.advance(scanFrom);
+ }
+ while (docId != DocIdSetIterator.NO_MORE_DOCS && docId < scanTo) {
+ bits.set(docId - minDoc);
+ docId = iterator.nextDoc();
+ }
+ handle.currentDoc = docId;
+ }
+ } catch (IOException exception) {
+ LOGGER.warn("IOException during collectDocs, returning partial bitset", exception);
+ }
+ }
+ }
+
+ long[] words = bits.getBits();
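+ // Number of 64-bit words needed to cover span bits, i.e. ceil(span / 64).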
+ int wordCount = (span + 63) >>> 6;
+ MemorySegment.copy(words, 0, out, ValueLayout.JAVA_LONG, 0, wordCount);
+ return wordCount;
+ }
+
+ @Override
+ public void releaseCollector(int collectorKey) {
+ scorersByCollectorKey.remove(collectorKey);
+ }
+
+ @Override
+ public void releaseProvider(int providerKey) {
+ weightsByProviderKey.remove(providerKey);
+ }
+
+ @Override
+ public void close() {
+ weightsByProviderKey.clear();
+ scorersByCollectorKey.clear();
+ }
+
+ private static final class ScorerHandle {
+ final Scorer scorer;
+ final int partitionMinDoc;
+ final int partitionMaxDoc;
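+ // -1 means the iterator has not been positioned yet;
+ // DocIdSetIterator.NO_MORE_DOCS marks an exhausted scorer.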
+ int currentDoc = -1;
+
+ ScorerHandle(Scorer scorer, int partitionMinDoc, int partitionMaxDoc) {
+ this.scorer = scorer;
+ this.partitionMinDoc = partitionMinDoc;
+ this.partitionMaxDoc = partitionMaxDoc;
+ }
+ }
+}
diff --git a/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/QuerySerializerRegistry.java b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/QuerySerializerRegistry.java
new file mode 100644
index 0000000000000..13bda07674b22
--- /dev/null
+++ b/sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/QuerySerializerRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.be.lucene;
+
+import org.apache.calcite.rex.RexCall;
+import org.opensearch.analytics.spi.DelegatedPredicateSerializer;
+import org.opensearch.analytics.spi.FieldStorageInfo;
+import org.opensearch.analytics.spi.ScalarFunction;
+import org.opensearch.index.query.MatchQueryBuilder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Registry of per-function query serializers for delegated predicates.
+ * Each serializer converts a Calcite RexCall into serialized QueryBuilder bytes
+ * that the Lucene backend can deserialize at the data node.
+ *
+ *