Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

import org.apache.arrow.memory.BufferAllocator;
import org.opensearch.analytics.spi.CommonExecutionContext;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.IndexReaderProvider.Reader;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.tasks.Task;

/**
Expand All @@ -26,6 +29,9 @@ public class ShardScanExecutionContext implements CommonExecutionContext {
private final Task task;
private byte[] fragmentBytes;
private BufferAllocator allocator;
private MapperService mapperService;
private IndexSettings indexSettings;
private NamedWriteableRegistry namedWriteableRegistry;

/**
* Constructs an execution context.
Expand Down Expand Up @@ -73,4 +79,34 @@ public BufferAllocator getAllocator() {
/** Sets the Arrow buffer allocator used by this execution context. */
public void setAllocator(BufferAllocator allocator) {
    this.allocator = allocator;
}

/**
 * Returns the shard's mapper service for field type resolution.
 * May be {@code null} until {@link #setMapperService(MapperService)} has been called.
 */
public MapperService getMapperService() {
    return mapperService;
}

/** Sets the shard's mapper service. */
public void setMapperService(MapperService mapperService) {
    this.mapperService = mapperService;
}

/**
 * Returns the shard's index settings.
 * May be {@code null} until {@link #setIndexSettings(IndexSettings)} has been called.
 */
public IndexSettings getIndexSettings() {
    return indexSettings;
}

/** Sets the shard's index settings. */
public void setIndexSettings(IndexSettings indexSettings) {
    this.indexSettings = indexSettings;
}

/**
 * Returns the {@link NamedWriteableRegistry} for deserializing delegated expressions.
 * May be {@code null} until {@link #setNamedWriteableRegistry(NamedWriteableRegistry)} has been called.
 */
public NamedWriteableRegistry getNamedWriteableRegistry() {
    return namedWriteableRegistry;
}

/** Sets the NamedWriteableRegistry. */
public void setNamedWriteableRegistry(NamedWriteableRegistry namedWriteableRegistry) {
    this.namedWriteableRegistry = namedWriteableRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.analytics.spi;

import java.util.List;

/**
* SPI extension point for backend query engine plugins.
*
Expand Down Expand Up @@ -81,4 +83,34 @@ default ExchangeSinkProvider getExchangeSinkProvider() {
// Optional capability: backends that handle plan instructions override this; the default rejects the call.
default FragmentInstructionHandlerFactory getInstructionHandlerFactory() {
    throw new UnsupportedOperationException("getInstructionHandlerFactory not implemented for [" + name() + "]");
}

/**
 * Prepare a filter delegation handle for the given delegated expressions.
 * Called by Core after all instruction handlers have run, when the plan has delegation.
 *
 * <p>The accepting backend initializes its internal state (e.g., DirectoryReader,
 * QueryShardContext, compiled Queries) and returns a handle that the driving backend
 * will call into during execution.
 *
 * @param expressions the delegated expressions (annotationId + serialized query bytes)
 * @param ctx the shared execution context (Reader, MapperService, IndexSettings)
 * @return a handle the driving backend calls into via FFM upcalls
 * @throws UnsupportedOperationException if this backend does not override the default
 *         (i.e. it does not support accepting filter delegation)
 */
default FilterDelegationHandle getFilterDelegationHandle(List<DelegatedExpression> expressions, CommonExecutionContext ctx) {
    throw new UnsupportedOperationException("getFilterDelegationHandle not implemented for [" + name() + "]");
}

/**
 * Configure the driving backend to use the given delegation handle during execution.
 * Called by Core after obtaining the handle from the accepting backend.
 *
 * <p>The driving backend registers the handle so that FFM upcalls from Rust
 * (createProvider, createCollector, collectDocs) route to it.
 *
 * @param handle the delegation handle from the accepting backend
 * @param backendContext the driving backend's execution context (from instruction handlers)
 * @throws UnsupportedOperationException if this backend does not override the default
 *         (i.e. it cannot act as the driving backend for filter delegation)
 */
default void configureFilterDelegation(FilterDelegationHandle handle, BackendExecutionContext backendContext) {
    throw new UnsupportedOperationException("configureFilterDelegation not implemented for [" + name() + "]");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.spi;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Describes the delegation metadata for a plan alternative. Carried on the wire
 * alongside the instruction list so that Core can orchestrate the handle exchange
 * between accepting and driving backends at the data node.
 *
 * @param treeShape the shape of the delegated filter tree
 * @param delegatedPredicateCount the number of delegated predicates in the plan
 * @param delegatedExpressions the delegated expressions (annotationId + serialized query bytes)
 * @opensearch.internal
 */
public record DelegationDescriptor(FilterTreeShape treeShape, int delegatedPredicateCount, List<DelegatedExpression> delegatedExpressions)
    implements
    Writeable {

    /**
     * Canonical constructor: snapshots {@code delegatedExpressions} with {@link List#copyOf}
     * so the record stays immutable even if the caller later mutates its list.
     */
    public DelegationDescriptor {
        delegatedExpressions = List.copyOf(delegatedExpressions);
    }

    /** Deserializes a descriptor; field order must mirror {@link #writeTo}. */
    public DelegationDescriptor(StreamInput in) throws IOException {
        this(in.readEnum(FilterTreeShape.class), in.readVInt(), readExpressions(in));
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeEnum(treeShape);
        out.writeVInt(delegatedPredicateCount);
        // Length-prefixed expression list; read back by readExpressions(StreamInput).
        out.writeVInt(delegatedExpressions.size());
        for (DelegatedExpression expr : delegatedExpressions) {
            expr.writeTo(out);
        }
    }

    /** Reads the length-prefixed expression list written by {@link #writeTo}. */
    private static List<DelegatedExpression> readExpressions(StreamInput in) throws IOException {
        int count = in.readVInt();
        List<DelegatedExpression> expressions = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            expressions.add(new DelegatedExpression(in));
        }
        return expressions;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.spi;

import java.io.Closeable;
import java.lang.foreign.MemorySegment;

/**
* Callback surface for filter delegation between a driving backend and an accepting backend.
*
* <p>One handle per query per shard. The accepting backend implements this interface;
* the driving backend calls into it via FFM upcalls during execution. Core closes it
* after execution completes.
*
* <p>Lifecycle:
* <ol>
* <li>Rust calls {@link #createProvider(int)} once per delegated predicate (per annotationId)</li>
* <li>Rust calls {@link #createCollector(int, int, int, int)} per (provider × segment)</li>
* <li>Rust calls {@link #collectDocs(int, int, int, MemorySegment)} per row group</li>
* <li>Rust calls {@link #releaseCollector(int)} when done with a segment</li>
* <li>Rust calls {@link #releaseProvider(int)} when the query ends</li>
* </ol>
*
* @opensearch.internal
*/
public interface FilterDelegationHandle extends Closeable {

    /**
     * Create a provider for the given annotation ID. The accepting backend looks up
     * the pre-compiled query for this annotation and prepares it for segment iteration.
     *
     * @param annotationId the annotation ID identifying the delegated predicate
     * @return a provider key {@code >= 0}, or {@code -1} on failure
     */
    int createProvider(int annotationId);

    /**
     * Create a collector for one (segment, [minDoc, maxDoc)) range.
     *
     * @param providerKey key returned by {@link #createProvider(int)}
     * @param segmentOrd the segment ordinal
     * @param minDoc inclusive lower bound
     * @param maxDoc exclusive upper bound
     * @return a collector key {@code >= 0}, or {@code -1} on failure
     */
    int createCollector(int providerKey, int segmentOrd, int minDoc, int maxDoc);

    /**
     * Fill {@code out} with the matching doc-id bitset for the given collector.
     *
     * <p>Bit layout: word {@code i} contains matches for docs
     * {@code [minDoc + i*64, minDoc + (i+1)*64)}, LSB-first within each word.
     *
     * @param collectorKey key returned by {@link #createCollector(int, int, int, int)}
     * @param minDoc inclusive lower bound
     * @param maxDoc exclusive upper bound
     * @param out destination buffer; implementation writes up to {@code out.byteSize() / 8} words
     * @return number of words written, or {@code -1} on error
     */
    int collectDocs(int collectorKey, int minDoc, int maxDoc, MemorySegment out);

    /**
     * Release resources for a collector.
     *
     * @param collectorKey key returned by {@link #createCollector(int, int, int, int)}
     */
    void releaseCollector(int collectorKey);

    /**
     * Release resources for a provider.
     *
     * @param providerKey key returned by {@link #createProvider(int)}
     *
     * <p>NOTE(review): {@link Closeable#close()} is inherited but not documented here —
     * presumably implementations release any providers/collectors not yet released;
     * confirm and document the expected close() semantics.
     */
    void releaseProvider(int providerKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public FilterDelegationInstructionNode(StreamInput in) throws IOException {

// Identifies this node as the combined scan+delegation setup instruction
// (renamed from SETUP_FILTER_DELEGATION_FOR_INDEX).
@Override
public InstructionType type() {
    return InstructionType.SETUP_SHARD_SCAN_WITH_DELEGATION;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ Optional<InstructionNode> createFilterDelegationNode(
List<DelegatedExpression> delegatedQueries
);

/**
 * Creates a shard scan with delegation instruction node — combines scan setup with delegation config.
 *
 * @param treeShape the shape of the delegated filter tree
 * @param delegatedPredicateCount the number of delegated predicates
 * @return the instruction node, or {@code Optional.empty()} if none is produced
 *         (presumably when this factory does not support delegation — confirm)
 */
Optional<InstructionNode> createShardScanWithDelegationNode(FilterTreeShape treeShape, int delegatedPredicateCount);

/** Creates a partial aggregate instruction node. */
Optional<InstructionNode> createPartialAggregateNode();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ public enum InstructionType {
/** Base scan setup — reader acquisition, SessionContext creation, default table provider. */
SETUP_SHARD_SCAN,
/**
* Filter delegation to an index backend — bridge setup, UDF registration, IndexedTableProvider.
* Filter delegation to an index backend — bridge setup, UDF registration, custom scan operator.
*
* <p>TODO: add a DelegationStrategy field (BACKEND_DRIVEN vs CENTRALLY_DRIVEN) to the
* instruction node when centrally-driven delegation is implemented. Currently only
* BACKEND_DRIVEN exists — derived from the backend declaring
* {@code supportedDelegations(DelegationType.FILTER)}.
*/
SETUP_FILTER_DELEGATION_FOR_INDEX,
SETUP_SHARD_SCAN_WITH_DELEGATION,
/** Partial aggregate mode — disable combine optimizer, cut plan to partial-only. */
SETUP_PARTIAL_AGGREGATE,
/** Final aggregate for coordinator reduce — ExchangeSink path, final-only agg. */
Expand All @@ -40,7 +40,7 @@ public enum InstructionType {
public InstructionNode readNode(StreamInput in) throws IOException {
return switch (this) {
case SETUP_SHARD_SCAN -> new ShardScanInstructionNode(in);
case SETUP_FILTER_DELEGATION_FOR_INDEX -> new FilterDelegationInstructionNode(in);
case SETUP_SHARD_SCAN_WITH_DELEGATION -> new ShardScanWithDelegationInstructionNode(in);
case SETUP_PARTIAL_AGGREGATE -> new PartialAggregateInstructionNode(in);
case SETUP_FINAL_AGGREGATE -> new FinalAggregateInstructionNode(in);
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.spi;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Instruction node for shard scan with filter delegation — extends base shard scan
* with {@link FilterTreeShape} and delegated predicate count so the driving backend
* can configure its indexed execution path (UDF registration, IndexedTableProvider)
* in a single FFM call.
*
* @opensearch.internal
*/
public class ShardScanWithDelegationInstructionNode extends ShardScanInstructionNode {

    /** Shape of the delegated filter tree; never null. */
    private final FilterTreeShape treeShape;
    /** Number of delegated predicates; always {@code >= 0}. */
    private final int delegatedPredicateCount;

    /**
     * Constructs a scan-with-delegation node.
     *
     * @param treeShape the shape of the delegated filter tree, must not be null
     * @param delegatedPredicateCount the number of delegated predicates, must be {@code >= 0}
     * @throws IllegalArgumentException if {@code treeShape} is null or the count is negative
     */
    public ShardScanWithDelegationInstructionNode(FilterTreeShape treeShape, int delegatedPredicateCount) {
        if (treeShape == null) {
            throw new IllegalArgumentException("treeShape must not be null");
        }
        if (delegatedPredicateCount < 0) {
            // writeVInt would emit an invalid/oversized varint for a negative value; fail fast here.
            throw new IllegalArgumentException("delegatedPredicateCount must be >= 0, got " + delegatedPredicateCount);
        }
        this.treeShape = treeShape;
        this.delegatedPredicateCount = delegatedPredicateCount;
    }

    /** Deserializes a node; field order must mirror {@link #writeTo}. */
    public ShardScanWithDelegationInstructionNode(StreamInput in) throws IOException {
        super(in);
        this.treeShape = in.readEnum(FilterTreeShape.class);
        this.delegatedPredicateCount = in.readVInt();
    }

    @Override
    public InstructionType type() {
        return InstructionType.SETUP_SHARD_SCAN_WITH_DELEGATION;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeEnum(treeShape);
        out.writeVInt(delegatedPredicateCount);
    }

    /** Returns the shape of the delegated filter tree. */
    public FilterTreeShape getTreeShape() {
        return treeShape;
    }

    /** Returns the number of delegated predicates. */
    public int getDelegatedPredicateCount() {
        return delegatedPredicateCount;
    }
}
Loading
Loading