From c8d90e254b6782891a81c76c5f0c488740ab8783 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 18 Mar 2026 15:08:04 +0530 Subject: [PATCH 1/2] Crappy commit --- .gitignore | 1 + sandbox/plugins/composite-engine/build.gradle | 17 ++ .../composite/CompositeDataFormat.java | 80 ++++++ .../composite/CompositeDocumentInput.java | 130 ++++++++++ .../composite/CompositeEnginePlugin.java | 126 +++++++++ .../CompositeIndexingExecutionEngine.java | 139 ++++++++++ .../opensearch/composite/CompositeWriter.java | 104 ++++++++ .../opensearch/composite/package-info.java | 17 ++ .../composite/CompositeDataFormatTests.java | 150 +++++++++++ .../composite/CompositeEnginePluginTests.java | 239 ++++++++++++++++++ .../exec/IndexFileReferenceManager.java | 37 +++ 11 files changed, 1040 insertions(+) create mode 100644 sandbox/plugins/composite-engine/build.gradle create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java create mode 100644 server/src/main/java/org/opensearch/index/engine/exec/IndexFileReferenceManager.java diff --git a/.gitignore b/.gitignore index 0a784701375d9..599ea7878a1c4 100644 --- 
a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .claude CLAUDE.md .cursor* +.kiro* # intellij files .idea/ diff --git a/sandbox/plugins/composite-engine/build.gradle b/sandbox/plugins/composite-engine/build.gradle new file mode 100644 index 0000000000000..efb110231508a --- /dev/null +++ b/sandbox/plugins/composite-engine/build.gradle @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +opensearchplugin { + description = 'Composite indexing engine plugin that orchestrates multi-format indexing across multiple data format engines.' + classname = 'org.opensearch.composite.CompositeEnginePlugin' +} + +dependencies { + compileOnly project(':server') + testImplementation project(':test:framework') +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java new file mode 100644 index 0000000000000..8a98aa69eecf1 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * A composite {@link DataFormat} that wraps multiple per-format {@link DataFormat} instances. 
+ * Each constituent format retains its own {@link FieldTypeCapabilities} — field routing is + * handled per-format by {@link CompositeDocumentInput}, not by this class. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDataFormat implements DataFormat { + + private final List dataFormats; + private final DataFormat primaryDataFormat; + + /** + * Constructs a CompositeDataFormat from the given list of data formats. + * + * @param dataFormats the constituent data formats + * @param primaryDataFormat the primary data format (must be contained in {@code dataFormats}) + */ + public CompositeDataFormat(List dataFormats, DataFormat primaryDataFormat) { + this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null")); + this.primaryDataFormat = Objects.requireNonNull(primaryDataFormat, "primaryDataFormat must not be null"); + } + + /** + * Returns the primary data format. + * + * @return the primary data format + */ + public DataFormat getPrimaryDataFormat() { + return primaryDataFormat; + } + + /** + * Returns the list of constituent data formats. 
+ * + * @return the data formats + */ + public List getDataFormats() { + return dataFormats; + } + + @Override + public String name() { + return "composite"; + } + + @Override + public long priority() { + return Long.MAX_VALUE; + } + + @Override + public Set supportedFields() { + return primaryDataFormat.supportedFields(); + } + + @Override + public String toString() { + return "CompositeDataFormat{" + "dataFormats=" + dataFormats + ", primaryDataFormat=" + primaryDataFormat + '}'; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java new file mode 100644 index 0000000000000..2e614ed380d75 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; +import org.opensearch.index.mapper.MappedFieldType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * A composite {@link DocumentInput} that wraps one {@link DocumentInput} per registered + * data format and routes field additions to the appropriate per-format inputs based on + * {@link FieldTypeCapabilities}. + *
<p>
+ * Metadata operations ({@code setRowId}, {@code setVersion}, {@code setSeqNo}, + * {@code setPrimaryTerm}) are broadcast to all per-format inputs. Fields are routed + * only to formats whose {@link DataFormat#supportedFields()} includes a + * {@link FieldTypeCapabilities} matching the field's type name. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDocumentInput implements DocumentInput> { + + private static final Logger logger = LogManager.getLogger(CompositeDocumentInput.class); + + private final Map> inputs; + private final Map> fieldTypeToFormats; + + /** + * Constructs a CompositeDocumentInput wrapping the given per-format inputs. + *
<p>
+ * Precomputes a routing map from field type name to the set of data formats + * that support that field type, based on each format's + * {@link DataFormat#supportedFields()}. + * + * @param inputs a map of data format to its corresponding document input + */ + public CompositeDocumentInput(Map> inputs) { + this.inputs = Collections.unmodifiableMap(Objects.requireNonNull(inputs, "inputs must not be null")); + this.fieldTypeToFormats = buildFieldTypeToFormatsMap(inputs); + } + + private static Map> buildFieldTypeToFormatsMap(Map> inputs) { + Map> routing = new HashMap<>(); + for (DataFormat format : inputs.keySet()) { + for (FieldTypeCapabilities ftc : format.supportedFields()) { + routing.computeIfAbsent(ftc.getFieldType(), k -> new HashSet<>()).add(format); + } + } + return Collections.unmodifiableMap(routing); + } + + @Override + public void addField(MappedFieldType fieldType, Object value) { + String typeName = fieldType.typeName(); + Set formats = fieldTypeToFormats.get(typeName); + if (formats == null || formats.isEmpty()) { + logger.debug("No data format supports field type [{}], field will be dropped", typeName); + return; + } + for (DataFormat format : formats) { + inputs.get(format).addField(fieldType, value); + } + } + + @Override + public void setRowId(String rowIdFieldName, long rowId) { + for (DocumentInput input : inputs.values()) { + input.setRowId(rowIdFieldName, rowId); + } + } + + @Override + public void setVersion(String fieldName, long version) { + for (DocumentInput input : inputs.values()) { + input.setVersion(fieldName, version); + } + } + + @Override + public void setSeqNo(String fieldName, long seqNo) { + for (DocumentInput input : inputs.values()) { + input.setSeqNo(fieldName, seqNo); + } + } + + @Override + public void setPrimaryTerm(String fieldName, long primaryTerm) { + for (DocumentInput input : inputs.values()) { + input.setPrimaryTerm(fieldName, primaryTerm); + } + } + + @Override + public Map getFinalInput() { + Map result = new 
HashMap<>(); + for (Map.Entry> entry : inputs.entrySet()) { + result.put(entry.getKey(), entry.getValue().getFinalInput()); + } + return Collections.unmodifiableMap(result); + } + + @Override + public void close() { + for (DocumentInput input : inputs.values()) { + try { + input.close(); + } catch (Exception e) { + logger.warn("Failed to close per-format DocumentInput", e); + } + } + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java new file mode 100644 index 0000000000000..d2ab39f85516b --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Setting; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.EngineFactory; +import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.plugins.EnginePlugin; +import org.opensearch.plugins.Plugin; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for + * orchestrating multi-format indexing. 
Discovers {@link DataFormatPlugin} instances + * during node bootstrap and creates a composite engine when composite indexing is + * enabled for an index. + *
<p>
+ * Registers two index settings: + *

<ul>
+ *   <li>{@code index.composite.enabled} — activates composite indexing (default {@code false})</li>
+ *   <li>{@code index.composite.primary_data_format} — designates the primary format (default {@code "lucene"})</li>
+ * </ul>
+ * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeEnginePlugin extends Plugin implements EnginePlugin { + + private static final Logger logger = LogManager.getLogger(CompositeEnginePlugin.class); + + /** + * Index setting to enable composite indexing for an index. + * When {@code true}, the composite engine orchestrates writes across all registered data formats. + */ + public static final Setting COMPOSITE_ENABLED = Setting.boolSetting( + "index.composite.enabled", + false, + Setting.Property.IndexScope + ); + + /** + * Index setting that designates the primary data format for an index. + * The primary format is the authoritative format used for merge operations. + */ + public static final Setting PRIMARY_DATA_FORMAT = Setting.simpleString( + "index.composite.primary_data_format", + "lucene", + Setting.Property.IndexScope + ); + + /** + * Discovered {@link DataFormatPlugin} instances keyed by format name. + * When multiple plugins declare the same format name, the one with the highest + * {@link DataFormat#priority()} is retained. + */ + private volatile Map dataFormatPlugins = Map.of(); + + @Override + public List> getSettings() { + return List.of(COMPOSITE_ENABLED, PRIMARY_DATA_FORMAT); + } + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (COMPOSITE_ENABLED.get(indexSettings.getSettings()) == false) { + return Optional.empty(); + } + + if (dataFormatPlugins.isEmpty()) { + logger.warn( + "Composite indexing is enabled for index [{}] but no DataFormatPlugin instances were discovered; " + + "falling back to the standard engine", + indexSettings.getIndex().getName() + ); + return Optional.empty(); + } + + String primaryFormat = PRIMARY_DATA_FORMAT.get(indexSettings.getSettings()); + if (dataFormatPlugins.containsKey(primaryFormat) == false) { + throw new IllegalArgumentException( + "Primary data format [" + + primaryFormat + + "] does not match any registered DataFormatPlugin. 
Available formats: " + + dataFormatPlugins.keySet() + ); + } + + return Optional.of(config -> new InternalEngine(config)); + } + + /** + * Receives {@link DataFormatPlugin} instances discovered during node bootstrap. + * When multiple plugins declare the same {@link DataFormat#name()}, the one with + * the highest {@link DataFormat#priority()} is retained. + * + * @param plugins the discovered data format plugins + */ + public void onDiscovery(Collection plugins) { + Map resolved = new HashMap<>(); + for (DataFormatPlugin plugin : plugins) { + String formatName = plugin.getDataFormat().name(); + DataFormatPlugin existing = resolved.get(formatName); + if (existing == null || plugin.getDataFormat().priority() > existing.getDataFormat().priority()) { + resolved.put(formatName, plugin); + } + } + this.dataFormatPlugins = Map.copyOf(resolved); + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java new file mode 100644 index 0000000000000..56997cf7015ee --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.dataformat.Merger; +import org.opensearch.index.engine.dataformat.RefreshInput; +import org.opensearch.index.engine.dataformat.RefreshResult; +import org.opensearch.index.engine.dataformat.Writer; +import org.opensearch.index.engine.exec.Segment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * A composite {@link IndexingExecutionEngine} that orchestrates indexing across + * multiple per-format engines behind a single interface. + *
<p>
+ * The engine delegates writer creation, refresh, file deletion, and document input + * creation to each per-format engine. A primary engine is designated based on the + * configured primary format name and is used for merge operations. + *
<p>
+ * The composite {@link DataFormat} exposed by this engine wraps every per-format data format; + * its {@code supportedFields()} reports only the primary format's capabilities. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine { + + private final List> engines; + private final IndexingExecutionEngine primaryEngine; + private final CompositeDataFormat compositeDataFormat; + + /** + * Constructs a CompositeIndexingExecutionEngine from the given per-format engines. + *
<p>
+ * Identifies the primary engine by matching {@code getDataFormat().name()} to the + * given {@code primaryFormatName}. Builds a {@link CompositeDataFormat} from the + * union of all engines' supported field type capabilities. + * + * @param engines the list of per-format indexing execution engines + * @param primaryFormatName the name of the primary data format + * @throws IllegalArgumentException if no engine matches the primary format name + */ + public CompositeIndexingExecutionEngine(List> engines, String primaryFormatName) { + this.engines = List.copyOf(Objects.requireNonNull(engines, "engines must not be null")); + Objects.requireNonNull(primaryFormatName, "primaryFormatName must not be null"); + + IndexingExecutionEngine foundPrimary = null; + for (IndexingExecutionEngine engine : this.engines) { + if (engine.getDataFormat().name().equals(primaryFormatName)) { + foundPrimary = engine; + break; + } + } + if (foundPrimary == null) { + throw new IllegalArgumentException("No engine found matching primary format name [" + primaryFormatName + "]"); + } + this.primaryEngine = foundPrimary; + + List allFormats = new ArrayList<>(); + for (IndexingExecutionEngine engine : this.engines) { + allFormats.add(engine.getDataFormat()); + } + this.compositeDataFormat = new CompositeDataFormat(allFormats, foundPrimary.getDataFormat()); + } + + @Override + public Writer createWriter(long writerGeneration) { + Map> writerMap = new LinkedHashMap<>(); + for (IndexingExecutionEngine engine : engines) { + Writer writer = engine.createWriter(writerGeneration); + writerMap.put(engine.getDataFormat(), writer); + } + return new CompositeWriter(writerMap); + } + + @Override + public Merger getMerger() { + return primaryEngine.getMerger(); + } + + @Override + public RefreshResult refresh(RefreshInput refreshInput) throws IOException { + List allSegments = new ArrayList<>(); + for (IndexingExecutionEngine engine : engines) { + RefreshResult result = engine.refresh(refreshInput); + 
allSegments.addAll(result.refreshedSegments()); + } + return new RefreshResult(allSegments); + } + + @Override + public DataFormat getDataFormat() { + return compositeDataFormat; + } + + @Override + public long getNativeBytesUsed() { + long total = 0; + for (IndexingExecutionEngine engine : engines) { + total += engine.getNativeBytesUsed(); + } + return total; + } + + @Override + public void deleteFiles(Map> filesToDelete) throws IOException { + for (IndexingExecutionEngine engine : engines) { + engine.deleteFiles(filesToDelete); + } + } + + @Override + public CompositeDocumentInput newDocumentInput() { + Map> inputMap = new HashMap<>(); + for (IndexingExecutionEngine engine : engines) { + DocumentInput input = engine.newDocumentInput(); + inputMap.put(engine.getDataFormat(), input); + } + return new CompositeDocumentInput(inputMap); + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java new file mode 100644 index 0000000000000..9227d6bb9e06c --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FileInfos; +import org.opensearch.index.engine.dataformat.WriteResult; +import org.opensearch.index.engine.dataformat.Writer; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * A composite {@link Writer} that wraps one {@link Writer} per registered data format + * and delegates write operations to each per-format writer. + *
<p>
+ * {@code addDoc()} extracts per-format inputs from the {@link CompositeDocumentInput}'s + * {@code getFinalInput()} map and calls {@code addDoc()} on each per-format writer. + * {@code flush()} aggregates {@link FileInfos} from all per-format writers. + * {@code sync()} and {@code close()} delegate to each per-format writer. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeWriter implements Writer { + + private static final Logger logger = LogManager.getLogger(CompositeWriter.class); + + private final Map> writers; + + /** + * Constructs a CompositeWriter wrapping the given per-format writers. + * + * @param writers a map of data format to its corresponding writer + */ + public CompositeWriter(Map> writers) { + this.writers = Collections.unmodifiableMap(Objects.requireNonNull(writers, "writers must not be null")); + } + + @SuppressWarnings("unchecked") + @Override + public WriteResult addDoc(CompositeDocumentInput doc) throws IOException { + Map perFormatInputs = doc.getFinalInput(); + WriteResult lastResult = null; + for (Map.Entry> entry : writers.entrySet()) { + DataFormat format = entry.getKey(); + Writer writer = entry.getValue(); + Object input = perFormatInputs.get(format); + if (input == null) { + logger.debug("No input found for data format [{}], skipping addDoc", format.name()); + continue; + } + lastResult = ((Writer>) writer).addDoc((DocumentInput) input); + if (lastResult.success() == false) { + return lastResult; + } + } + if (lastResult == null) { + return new WriteResult(true, null, -1L, -1L, -1L); + } + return lastResult; + } + + @Override + public FileInfos flush() throws IOException { + FileInfos.Builder builder = FileInfos.builder(); + for (Writer writer : writers.values()) { + FileInfos perFormatInfos = writer.flush(); + builder.putAll(perFormatInfos.getWriterFilesMap()); + } + return builder.build(); + } + + @Override + public void sync() throws IOException { + for (Writer writer : writers.values()) { + 
writer.sync(); + } + } + + @Override + public void close() { + for (Map.Entry> entry : writers.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("Failed to close per-format Writer for format [{}]", entry.getKey().name()), e); + } + } + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java new file mode 100644 index 0000000000000..216fe2c5469ae --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Composite indexing engine plugin that orchestrates multi-format indexing + * across multiple data format engines. + * @opensearch.experimental + */ +@ExperimentalApi +package org.opensearch.composite; + +import org.opensearch.common.annotation.ExperimentalApi; diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java new file mode 100644 index 0000000000000..e8bc784bc7b4d --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Tests for {@link CompositeDataFormat}. + */ +public class CompositeDataFormatTests extends OpenSearchTestCase { + + public void testNameReturnsComposite() { + DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); + CompositeDataFormat format = new CompositeDataFormat(List.of(primary), primary); + assertEquals("composite", format.name()); + } + + public void testPriorityReturnsMaxValue() { + DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); + CompositeDataFormat format = new CompositeDataFormat(List.of(primary), primary); + assertEquals(Long.MAX_VALUE, format.priority()); + } + + public void testSupportedFieldsDelegatesToPrimaryFormat() { + FieldTypeCapabilities primaryCap = new FieldTypeCapabilities("keyword", Set.of(FieldTypeCapabilities.Capability.FULL_TEXT_SEARCH)); + FieldTypeCapabilities secondaryCap = new FieldTypeCapabilities( + "integer", + Set.of(FieldTypeCapabilities.Capability.COLUMNAR_STORAGE) + ); + DataFormat primary = createMockDataFormat("lucene", 1, Set.of(primaryCap)); + DataFormat secondary = createMockDataFormat("parquet", 2, Set.of(secondaryCap)); + + CompositeDataFormat composite = new CompositeDataFormat(List.of(primary, secondary), primary); + + // supportedFields() should return only the primary's fields, not the union + assertEquals(1, composite.supportedFields().size()); + assertTrue(composite.supportedFields().contains(primaryCap)); + assertFalse(composite.supportedFields().contains(secondaryCap)); + } + + public void testGetPrimaryDataFormat() { + DataFormat lucene = createMockDataFormat("lucene", 1, Set.of()); + DataFormat parquet = createMockDataFormat("parquet", 2, Set.of()); 
+ CompositeDataFormat composite = new CompositeDataFormat(List.of(lucene, parquet), lucene); + assertSame(lucene, composite.getPrimaryDataFormat()); + } + + public void testGetDataFormats() { + DataFormat lucene = createMockDataFormat("lucene", 1, Set.of()); + DataFormat parquet = createMockDataFormat("parquet", 2, Set.of()); + CompositeDataFormat composite = new CompositeDataFormat(List.of(lucene, parquet), lucene); + assertEquals(2, composite.getDataFormats().size()); + assertSame(lucene, composite.getDataFormats().get(0)); + assertSame(parquet, composite.getDataFormats().get(1)); + } + + public void testGetDataFormatsIsUnmodifiable() { + DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); + CompositeDataFormat composite = new CompositeDataFormat(List.of(primary), primary); + expectThrows(UnsupportedOperationException.class, () -> composite.getDataFormats().add(createMockDataFormat("x", 0, Set.of()))); + } + + public void testConstructorRejectsNullDataFormats() { + DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); + expectThrows(NullPointerException.class, () -> new CompositeDataFormat(null, primary)); + } + + public void testConstructorRejectsNullPrimaryDataFormat() { + DataFormat format = createMockDataFormat("lucene", 1, Set.of()); + expectThrows(NullPointerException.class, () -> new CompositeDataFormat(List.of(format), null)); + } + + public void testToString() { + DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); + CompositeDataFormat composite = new CompositeDataFormat(List.of(primary), primary); + String str = composite.toString(); + assertTrue(str.contains("CompositeDataFormat")); + assertTrue(str.contains("dataFormats=")); + assertTrue(str.contains("primaryDataFormat=")); + } + + // Feature: composite-indexing-plugin, Property 2: supportedFields delegates to primary format + // Validates: Requirements 2.4 + public void testPropertySupportedFieldsDelegatesToPrimary() { + String[] fieldTypes = { "keyword", 
"text", "integer", "long", "float", "double", "date", "boolean", "geo_point", "binary" }; + FieldTypeCapabilities.Capability[] allCapabilities = FieldTypeCapabilities.Capability.values(); + + for (int iteration = 0; iteration < 100; iteration++) { + int numFormats = randomIntBetween(1, 5); + List formats = new ArrayList<>(); + + for (int f = 0; f < numFormats; f++) { + Set formatCaps = new HashSet<>(); + int numCaps = randomIntBetween(0, 4); + for (int c = 0; c < numCaps; c++) { + String fieldType = randomFrom(fieldTypes); + Set caps = new HashSet<>(); + int capCount = randomIntBetween(1, allCapabilities.length); + for (int k = 0; k < capCount; k++) { + caps.add(randomFrom(allCapabilities)); + } + formatCaps.add(new FieldTypeCapabilities(fieldType, caps)); + } + formats.add(createMockDataFormat("format-" + f, f, formatCaps)); + } + + DataFormat primary = formats.get(randomIntBetween(0, formats.size() - 1)); + CompositeDataFormat composite = new CompositeDataFormat(formats, primary); + + // supportedFields() must equal the primary's supportedFields() + assertEquals( + "supportedFields() must match primary format's fields [iteration=" + iteration + "]", + primary.supportedFields(), + composite.supportedFields() + ); + } + } + + private DataFormat createMockDataFormat(String name, long priority, Set supportedFields) { + return new DataFormat() { + @Override + public String name() { + return name; + } + + @Override + public long priority() { + return priority; + } + + @Override + public Set supportedFields() { + return supportedFields; + } + }; + } +} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java new file mode 100644 index 0000000000000..148865e94160d --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java @@ -0,0 +1,239 @@ +/* 
+ * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.EngineFactory; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * Tests for {@link CompositeEnginePlugin}. 
+ */ +public class CompositeEnginePluginTests extends OpenSearchTestCase { + + // --- getSettings tests --- + + public void testGetSettingsReturnsBothSettings() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + List> settings = plugin.getSettings(); + assertEquals(2, settings.size()); + assertTrue(settings.contains(CompositeEnginePlugin.COMPOSITE_ENABLED)); + assertTrue(settings.contains(CompositeEnginePlugin.PRIMARY_DATA_FORMAT)); + } + + // --- getEngineFactory tests --- + + public void testGetEngineFactoryReturnsEmptyWhenCompositeDisabled() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + IndexSettings indexSettings = createIndexSettings(false, "lucene"); + Optional factory = plugin.getEngineFactory(indexSettings); + assertTrue(factory.isEmpty()); + } + + public void testGetEngineFactoryReturnsFactoryWhenCompositeEnabledAndPrimaryFormatMatches() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1))); + IndexSettings indexSettings = createIndexSettings(true, "lucene"); + Optional factory = plugin.getEngineFactory(indexSettings); + assertTrue(factory.isPresent()); + } + + public void testGetEngineFactoryThrowsWhenPrimaryFormatDoesNotMatch() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1))); + IndexSettings indexSettings = createIndexSettings(true, "parquet"); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> plugin.getEngineFactory(indexSettings)); + assertTrue(ex.getMessage().contains("parquet")); + assertTrue(ex.getMessage().contains("does not match any registered DataFormatPlugin")); + } + + public void testGetEngineFactoryReturnsEmptyWhenNoPluginsDiscoveredAndCompositeEnabled() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + // No onDiscovery call — dataFormatPlugins is empty + IndexSettings indexSettings = 
createIndexSettings(true, "lucene"); + Optional factory = plugin.getEngineFactory(indexSettings); + assertTrue(factory.isEmpty()); + } + + // --- onDiscovery tests --- + + public void testOnDiscoveryWithEmptyCollection() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + plugin.onDiscovery(Collections.emptyList()); + // Composite enabled but no plugins → should return empty + IndexSettings indexSettings = createIndexSettings(true, "lucene"); + Optional factory = plugin.getEngineFactory(indexSettings); + assertTrue(factory.isEmpty()); + } + + public void testOnDiscoveryWithOnePlugin() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1))); + IndexSettings indexSettings = createIndexSettings(true, "lucene"); + Optional factory = plugin.getEngineFactory(indexSettings); + assertTrue(factory.isPresent()); + } + + public void testOnDiscoveryWithMultiplePlugins() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1), createMockDataFormatPlugin("parquet", 2))); + // Both formats should be available + IndexSettings luceneSettings = createIndexSettings(true, "lucene"); + assertTrue(plugin.getEngineFactory(luceneSettings).isPresent()); + + IndexSettings parquetSettings = createIndexSettings(true, "parquet"); + assertTrue(plugin.getEngineFactory(parquetSettings).isPresent()); + } + + public void testOnDiscoveryDuplicateFormatNameKeepsHighestPriority() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + DataFormatPlugin lowPriority = createMockDataFormatPlugin("lucene", 1); + DataFormatPlugin highPriority = createMockDataFormatPlugin("lucene", 100); + plugin.onDiscovery(List.of(lowPriority, highPriority)); + + // The plugin should still work with "lucene" format — the high-priority one wins + IndexSettings indexSettings = createIndexSettings(true, "lucene"); + 
assertTrue(plugin.getEngineFactory(indexSettings).isPresent()); + } + + public void testOnDiscoveryDuplicateFormatNameKeepsHighestPriorityRegardlessOfOrder() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + DataFormatPlugin highPriority = createMockDataFormatPlugin("lucene", 100); + DataFormatPlugin lowPriority = createMockDataFormatPlugin("lucene", 1); + // High priority first, then low priority — should still keep high priority + plugin.onDiscovery(List.of(highPriority, lowPriority)); + + IndexSettings indexSettings = createIndexSettings(true, "lucene"); + assertTrue(plugin.getEngineFactory(indexSettings).isPresent()); + } + + // Feature: composite-indexing-plugin, Property 10: Primary format validation + // Validates: Requirements 7.3 + // + // For any primary format name and any set of registered DataFormatPlugin instances, + // the CompositeEnginePlugin should accept the configuration if and only if the + // primary format name matches a registered plugin's DataFormat.name(). 
+ public void testPropertyPrimaryFormatValidation() { + int iterations = 100; + for (int i = 0; i < iterations; i++) { + // Generate a random set of format names (1-5 formats) + int formatCount = randomIntBetween(1, 5); + String[] formatNames = new String[formatCount]; + for (int j = 0; j < formatCount; j++) { + formatNames[j] = randomAlphaOfLength(randomIntBetween(3, 10)); + } + + // Register plugins for all generated format names + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + List plugins = new ArrayList<>(); + for (String name : formatNames) { + plugins.add(createMockDataFormatPlugin(name, randomLongBetween(1, 1000))); + } + plugin.onDiscovery(plugins); + + // Case 1: Primary format matches one of the registered formats → factory is present + String matchingFormat = formatNames[randomIntBetween(0, formatCount - 1)]; + IndexSettings matchSettings = createIndexSettings(true, matchingFormat); + Optional matchResult = plugin.getEngineFactory(matchSettings); + assertTrue( + "Expected factory to be present when primary format [" + + matchingFormat + + "] matches a registered plugin (iteration " + + i + + ")", + matchResult.isPresent() + ); + + // Case 2: Primary format does NOT match any registered format → throws IllegalArgumentException + String nonMatchingFormat = randomValueOtherThanMany( + candidate -> Arrays.asList(formatNames).contains(candidate), + () -> randomAlphaOfLength(randomIntBetween(3, 10)) + ); + IndexSettings noMatchSettings = createIndexSettings(true, nonMatchingFormat); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + "Expected IllegalArgumentException when primary format [" + + nonMatchingFormat + + "] does not match any registered plugin (iteration " + + i + + ")", + () -> plugin.getEngineFactory(noMatchSettings) + ); + assertTrue("Error message should contain the non-matching format name", ex.getMessage().contains(nonMatchingFormat)); + } + } + + // --- Helper methods --- + + private 
IndexSettings createIndexSettings(boolean compositeEnabled, String primaryFormat) { + Settings settings = Settings.builder() + .put("index.composite.enabled", compositeEnabled) + .put("index.composite.primary_data_format", primaryFormat) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build(); + return new IndexSettings(indexMetadata, Settings.EMPTY); + } + + private DataFormatPlugin createMockDataFormatPlugin(String formatName, long priority) { + DataFormat dataFormat = new DataFormat() { + @Override + public String name() { + return formatName; + } + + @Override + public long priority() { + return priority; + } + + @Override + public Set supportedFields() { + return Set.of(new FieldTypeCapabilities("keyword", Set.of(FieldTypeCapabilities.Capability.FULL_TEXT_SEARCH))); + } + }; + + return new DataFormatPlugin() { + @Override + public DataFormat getDataFormat() { + return dataFormat; + } + + @Override + @SuppressWarnings("unchecked") + public > IndexingExecutionEngine indexingEngine( + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { + return null; // Not needed for plugin-level tests + } + }; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/IndexFileReferenceManager.java b/server/src/main/java/org/opensearch/index/engine/exec/IndexFileReferenceManager.java new file mode 100644 index 0000000000000..353ebe6ec699f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/IndexFileReferenceManager.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.engine.exec; + +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Manages reference counting for index files to prevent premature deletion. + * Tracks which files are in use by catalog snapshots and ensures files are only + * deleted when no longer referenced by any snapshot. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface IndexFileReferenceManager { + /** + * Adds file references for all files in the given catalog snapshot. + * Increments reference counts to prevent deletion while the snapshot is in use. + * + * @param snapshot the catalog snapshot whose files should be referenced + */ + void addFileReferences(CatalogSnapshot snapshot); + + /** + * Removes file references for all files in the given catalog snapshot. + * Decrements reference counts and may trigger deletion of unreferenced files. + * + * @param snapshot the catalog snapshot whose file references should be removed + */ + void removeFileReferences(CatalogSnapshot snapshot); +} From 392e1fdba8dfdc0d6bbd1106ec6f552607b10e1d Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 18 Mar 2026 18:24:48 +0530 Subject: [PATCH 2/2] Add few changes for the indexing --- .../libs/composite-engine-lib/build.gradle | 21 ++ .../composite/queue/ConcurrentQueue.java | 140 ++++++++++ .../queue/LockableConcurrentQueue.java | 65 +++++ .../composite/queue/package-info.java | 14 + sandbox/plugins/composite-engine/build.gradle | 1 + .../composite/CompositeDataFormat.java | 9 +- .../CompositeDataFormatWriterPool.java | 138 ++++++++++ .../composite/CompositeEnginePlugin.java | 162 ++++++++---- .../CompositeIndexingExecutionEngine.java | 164 +++++++++--- .../opensearch/composite/CompositeWriter.java | 53 +++- .../composite/CompositeDataFormatTests.java | 150 ----------- .../composite/CompositeEnginePluginTests.java | 239 ------------------ 12 files changed, 685 insertions(+), 471 deletions(-) create mode 100644 
sandbox/libs/composite-engine-lib/build.gradle create mode 100644 sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java create mode 100644 sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java create mode 100644 sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java delete mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java delete mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java diff --git a/sandbox/libs/composite-engine-lib/build.gradle b/sandbox/libs/composite-engine-lib/build.gradle new file mode 100644 index 0000000000000..1fa0c39d443f3 --- /dev/null +++ b/sandbox/libs/composite-engine-lib/build.gradle @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Shared concurrent queue utilities for the composite indexing engine. + * No external dependencies — pure Java concurrency primitives. 
/**
 * A striped concurrent queue that distributes entries across multiple internal
 * queues ("stripes"), each guarded by its own lock. The stripe a thread starts with is
 * derived from the current thread's hash code, which spreads threads over the stripes
 * and lets them operate on different stripes without blocking each other.
 *
 * <p>{@code null} entries are not supported: {@link #poll(Predicate)} uses {@code null}
 * to signal that nothing matched.
 *
 * @param <T> the type of elements held in this queue
 * @opensearch.experimental
 */
public final class ConcurrentQueue<T> {

    static final int MIN_CONCURRENCY = 1;
    static final int MAX_CONCURRENCY = 256;

    private final int concurrency;
    private final Lock[] locks;
    private final Queue<T>[] queues;

    /**
     * Creates a queue with {@code concurrency} stripes.
     *
     * @param queueSupplier creates the backing queue of each stripe; called once per stripe
     * @param concurrency   number of stripes, in [{@value #MIN_CONCURRENCY}, {@value #MAX_CONCURRENCY}]
     * @throws IllegalArgumentException if {@code concurrency} is out of range
     * @throws NullPointerException     if {@code queueSupplier} is {@code null}
     */
    ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
        if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
            throw new IllegalArgumentException(
                "concurrency must be in [" + MIN_CONCURRENCY + ", " + MAX_CONCURRENCY + "], got " + concurrency
            );
        }
        if (queueSupplier == null) {
            throw new NullPointerException("queueSupplier must not be null");
        }
        this.concurrency = concurrency;
        this.locks = new Lock[concurrency];
        @SuppressWarnings({ "rawtypes", "unchecked" })
        Queue<T>[] stripes = new Queue[concurrency];
        this.queues = stripes;
        for (int i = 0; i < concurrency; ++i) {
            locks[i] = new ReentrantLock();
            stripes[i] = queueSupplier.get();
        }
    }

    /**
     * Adds {@code entry} to one stripe. Stripes are tried without blocking, starting at the
     * calling thread's preferred stripe; if every stripe is busy, the call blocks on the
     * preferred stripe.
     *
     * @param entry the entry to add, never {@code null}
     * @throws NullPointerException if {@code entry} is {@code null}
     */
    void add(T entry) {
        if (entry == null) {
            // poll() reports "nothing found" as null, so a stored null could never be told apart.
            throw new NullPointerException("entry must not be null");
        }
        // Seed the order in which to look at stripes based on the current thread. This helps
        // distribute entries across queues and gives some affinity between entries and threads.
        final int seed = threadSeed();
        for (int i = 0; i < concurrency; ++i) {
            final int index = (seed + i) % concurrency;
            final Lock lock = locks[index];
            if (lock.tryLock()) {
                try {
                    queues[index].add(entry);
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
        // Every stripe was contended: wait for the preferred one.
        final int index = seed % concurrency;
        final Lock lock = locks[index];
        lock.lock();
        try {
            queues[index].add(entry);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the first entry accepted by {@code predicate}.
     * A first pass only visits stripes whose lock is free; if that finds nothing, a second
     * pass visits every stripe, waiting for its lock.
     *
     * @param predicate selects the entry to remove; it is invoked while the stripe lock is held
     * @return the removed entry, or {@code null} if no entry matched
     */
    T poll(Predicate<T> predicate) {
        final int seed = threadSeed();
        final T fast = scan(predicate, seed, false);
        return fast != null ? fast : scan(predicate, seed, true);
    }

    /**
     * Removes one occurrence of {@code entry}, visiting the stripes in index order and
     * waiting for each stripe's lock.
     *
     * @param entry the entry to remove
     * @return {@code true} if a stripe contained the entry and it was removed
     */
    boolean remove(T entry) {
        for (int i = 0; i < concurrency; ++i) {
            final Lock lock = locks[i];
            lock.lock();
            try {
                if (queues[i].remove(entry)) {
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
        return false;
    }

    /** Returns the calling thread's stripe seed: the low 16 bits of its hash code. */
    private static int threadSeed() {
        return Thread.currentThread().hashCode() & 0xFFFF;
    }

    /**
     * Walks all stripes starting at {@code seed} and removes the first matching entry.
     * With {@code blocking == false}, stripes whose lock is currently held are skipped.
     */
    private T scan(Predicate<T> predicate, int seed, boolean blocking) {
        for (int i = 0; i < concurrency; ++i) {
            final int index = (seed + i) % concurrency;
            final Lock lock = locks[index];
            if (blocking) {
                lock.lock();
            } else if (lock.tryLock() == false) {
                continue;
            }
            try {
                for (Iterator<T> it = queues[index].iterator(); it.hasNext();) {
                    final T candidate = it.next();
                    if (predicate.test(candidate)) {
                        it.remove();
                        return candidate;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
        return null;
    }
}
+++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +/** + * A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics + * on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that + * they can be atomically locked when polled and unlocked when returned. + *

+ * This is used by the composite writer pool to ensure that a writer is locked + * before it is handed out and unlocked when it is returned. + * + * @param the type of lockable elements held in this queue + * @opensearch.experimental + */ +public final class LockableConcurrentQueue { + + private final ConcurrentQueue queue; + private final AtomicInteger addAndUnlockCounter = new AtomicInteger(); + + public LockableConcurrentQueue(Supplier> queueSupplier, int concurrency) { + this.queue = new ConcurrentQueue<>(queueSupplier, concurrency); + } + + /** + * Lock an entry, and poll it from the queue, in that order. If no entry can be found and locked, + * {@code null} is returned. + */ + public T lockAndPoll() { + int addAndUnlockCount; + do { + addAndUnlockCount = addAndUnlockCounter.get(); + T entry = queue.poll(Lock::tryLock); + if (entry != null) { + return entry; + } + // If an entry has been added to the queue in the meantime, try again. + } while (addAndUnlockCount != addAndUnlockCounter.get()); + + return null; + } + + /** Remove an entry from the queue. */ + public boolean remove(T entry) { + return queue.remove(entry); + } + + /** Add an entry to the queue and unlock it, in that order. */ + public void addAndUnlock(T entry) { + queue.add(entry); + entry.unlock(); + addAndUnlockCounter.incrementAndGet(); + } +} diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java new file mode 100644 index 0000000000000..9e52cbabf073b --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +/** + * Concurrent queue utilities for the composite indexing engine. + * + * @opensearch.experimental + */ +package org.opensearch.composite.queue; diff --git a/sandbox/plugins/composite-engine/build.gradle b/sandbox/plugins/composite-engine/build.gradle index efb110231508a..8aa122a1e1e8e 100644 --- a/sandbox/plugins/composite-engine/build.gradle +++ b/sandbox/plugins/composite-engine/build.gradle @@ -12,6 +12,7 @@ opensearchplugin { } dependencies { + api project(':sandbox:libs:composite-engine-lib') compileOnly project(':server') testImplementation project(':test:framework') } diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java index 8a98aa69eecf1..c34716c0544c5 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java @@ -36,8 +36,13 @@ public class CompositeDataFormat implements DataFormat { * @param primaryDataFormat the primary data format (must be contained in {@code dataFormats}) */ public CompositeDataFormat(List dataFormats, DataFormat primaryDataFormat) { - this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null")); - this.primaryDataFormat = Objects.requireNonNull(primaryDataFormat, "primaryDataFormat must not be null"); + this.dataFormats = dataFormats; + this.primaryDataFormat = primaryDataFormat; + } + + public CompositeDataFormat() { + this.dataFormats = List.of(); + this.primaryDataFormat = null; } /** diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java new file mode 100644 index 0000000000000..467812eeb1624 --- 
/dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.composite.queue.LockableConcurrentQueue; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.function.Supplier; + +public class CompositeWriterPool implements Iterable, Closeable { + + private final Set writers; + private final LockableConcurrentQueue availableWriters; + private final Supplier writerSupplier; + private volatile boolean closed; + + public CompositeWriterPool( + Supplier writerSupplier, + Supplier> queueSupplier, + int concurrency + ) { + this.writers = Collections.newSetFromMap(new IdentityHashMap<>()); + this.writerSupplier = writerSupplier; + this.availableWriters = new LockableConcurrentQueue<>(queueSupplier, concurrency); + } + + /** + * This method is used by CompositeIndexingExecutionEngine to grab a writer from the pool to perform an indexing + * operation. + * + * @return a pooled CompositeWriter if available, or a newly created instance if none are available + */ + public CompositeWriter getAndLock() { + ensureOpen(); + CompositeWriter CompositeWriter = availableWriters.lockAndPoll(); + return Objects.requireNonNullElseGet(CompositeWriter, this::fetchWriter); + } + + /** + * Create a new {@link CompositeWriter} to be added to this pool. 
+ * + * @return a new instance of {@link CompositeWriter} + */ + private synchronized CompositeWriter fetchWriter() { + ensureOpen(); + CompositeWriter CompositeWriter = writerSupplier.get(); + CompositeWriter.lock(); + writers.add(CompositeWriter); + return CompositeWriter; + } + + /** + * Release the given {@link CompositeWriter} to this pool for reuse if it is currently managed by this + * pool. + * + * @param state {@link CompositeWriter} to release to the pool. + */ + public void releaseAndUnlock(CompositeWriter state) { + assert + !state.isFlushPending() && !state.isAborted() : + "CompositeWriter has pending flush: " + state.isFlushPending() + " aborted=" + state.isAborted(); + assert isRegistered(state) : "CompositeDocumentWriterPool doesn't know about this CompositeWriter"; + availableWriters.addAndUnlock(state); + } + + /** + * Lock and checkout all CompositeWriters from the pool for flush. + * + * @return Unmodifiable list of all CompositeWriters locked by current thread. + */ + public List checkoutAll() { + ensureOpen(); + List lockedWriters = new ArrayList<>(); + List checkedOutWriters = new ArrayList<>(); + for (CompositeWriter CompositeWriter : this) { + CompositeWriter.lock(); + lockedWriters.add(CompositeWriter); + } + synchronized (this) { + for (CompositeWriter CompositeWriter : lockedWriters) { + try { + // Release this writer if it’s no longer managed by this pool; otherwise, check it out. + if (isRegistered(CompositeWriter) && writers.remove(CompositeWriter)) { + availableWriters.remove(CompositeWriter); + CompositeWriter.setFlushPending(); + checkedOutWriters.add(CompositeWriter); + } + } finally { + CompositeWriter.unlock(); + } + } + } + return Collections.unmodifiableList(checkedOutWriters); + } + + /** + * Check if {@link CompositeWriter} is part of this pool. + * + * @param perThread {@link CompositeWriter} to validate. + * @return true if {@link CompositeWriter} is part of this pool, false otherwise. 
+ */ + synchronized boolean isRegistered(CompositeWriter perThread) { + return writers.contains(perThread); + } + + private void ensureOpen() { + if (closed) { + throw new AlreadyClosedException("CompositeDocumentWriterPool is already closed"); + } + } + + @Override + public synchronized Iterator iterator() { + return List.copyOf(writers).iterator(); + } + + @Override + public void close() throws IOException { + this.closed = true; + } +} + diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java index d2ab39f85516b..a89477497ff02 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java @@ -12,46 +12,75 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; -import org.opensearch.index.engine.EngineFactory; -import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.dataformat.DataFormatPlugin; -import org.opensearch.plugins.EnginePlugin; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.Plugin; -import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; /** * Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for * orchestrating multi-format indexing. 
Discovers {@link DataFormatPlugin} instances - * during node bootstrap and creates a composite engine when composite indexing is - * enabled for an index. + * during node bootstrap via the {@link ExtensiblePlugin} SPI and creates a composite + * engine when composite indexing is enabled for an index. *

* Registers two index settings: *

    *
  • {@code index.composite.enabled} — activates composite indexing (default {@code false})
  • *
  • {@code index.composite.primary_data_format} — designates the primary format (default {@code "lucene"})
  • *
+ *

+ * Format plugins (e.g., Parquet) extend this plugin by declaring + * {@code extendedPlugins = ['composite-engine']} in their {@code build.gradle} + * and implementing {@link DataFormatPlugin}. The {@link ExtensiblePlugin} SPI + * discovers them automatically during node bootstrap. * * @opensearch.experimental */ @ExperimentalApi -public class CompositeEnginePlugin extends Plugin implements EnginePlugin { +public class CompositeEnginePlugin extends Plugin implements ExtensiblePlugin, DataFormatPlugin { private static final Logger logger = LogManager.getLogger(CompositeEnginePlugin.class); /** * Index setting to enable composite indexing for an index. * When {@code true}, the composite engine orchestrates writes across all registered data formats. + * Validates that the primary data format is non-empty when enabled. */ public static final Setting COMPOSITE_ENABLED = Setting.boolSetting( "index.composite.enabled", false, + new Setting.Validator<>() { + @Override + public void validate(Boolean value) {} + + @Override + public void validate(Boolean enabled, Map, Object> settings) { + if (enabled) { + String primary = (String) settings.get(PRIMARY_DATA_FORMAT); + if (primary == null || primary.isEmpty()) { + throw new IllegalArgumentException( + "[index.composite.enabled] requires [index.composite.primary_data_format] to be set" + ); + } + } + } + + @Override + public Iterator> settings() { + return List.>of(PRIMARY_DATA_FORMAT, SECONDARY_DATA_FORMATS).iterator(); + } + }, Setting.Property.IndexScope ); @@ -65,6 +94,18 @@ public class CompositeEnginePlugin extends Plugin implements EnginePlugin { Setting.Property.IndexScope ); + /** + * Index setting that lists the secondary data formats for an index. + * Secondary formats receive writes alongside the primary but are not used + * as the merge authority. 
+ */ + public static final Setting> SECONDARY_DATA_FORMATS = Setting.listSetting( + "index.composite.secondary_data_formats", + Collections.emptyList(), + s -> s, + Setting.Property.IndexScope + ); + /** * Discovered {@link DataFormatPlugin} instances keyed by format name. * When multiple plugins declare the same format name, the one with the highest @@ -72,55 +113,88 @@ public class CompositeEnginePlugin extends Plugin implements EnginePlugin { */ private volatile Map dataFormatPlugins = Map.of(); + /** Creates a new composite engine plugin. */ + public CompositeEnginePlugin() {} + + + @Override + public void loadExtensions(ExtensionLoader loader) { + List formatPlugins = loader.loadExtensions(DataFormatPlugin.class); + Map registry = new HashMap<>(); + for (DataFormatPlugin plugin : formatPlugins) { + DataFormat format = plugin.getDataFormat(); + if (format == null) { + logger.warn("DataFormatPlugin [{}] returned null DataFormat, skipping", plugin.getClass().getName()); + continue; + } + String name = format.name(); + DataFormatPlugin existing = registry.get(name); + if (existing != null) { + long existingPriority = existing.getDataFormat().priority(); + if (format.priority() <= existingPriority) { + logger.debug( + "Skipping DataFormatPlugin [{}] for format [{}] (priority {} <= existing {})", + plugin.getClass().getName(), + name, + format.priority(), + existingPriority + ); + continue; + } + logger.info( + "Replacing DataFormatPlugin for format [{}] (priority {} > existing {})", + name, + format.priority(), + existingPriority + ); + } + registry.put(name, plugin); + logger.info("Registered DataFormatPlugin [{}] for format [{}]", plugin.getClass().getName(), name); + } + this.dataFormatPlugins = Collections.unmodifiableMap(registry); + } + + @Override public List> getSettings() { - return List.of(COMPOSITE_ENABLED, PRIMARY_DATA_FORMAT); + return List.of(COMPOSITE_ENABLED, PRIMARY_DATA_FORMAT, SECONDARY_DATA_FORMATS); } @Override - public Optional 
getEngineFactory(IndexSettings indexSettings) { - if (COMPOSITE_ENABLED.get(indexSettings.getSettings()) == false) { - return Optional.empty(); + public void onIndexModule(IndexModule indexModule) { + Settings settings = indexModule.getSettings(); + boolean compositeEnabled = COMPOSITE_ENABLED.get(settings); + if (compositeEnabled == false) { + return; } - if (dataFormatPlugins.isEmpty()) { - logger.warn( - "Composite indexing is enabled for index [{}] but no DataFormatPlugin instances were discovered; " - + "falling back to the standard engine", - indexSettings.getIndex().getName() - ); - return Optional.empty(); - } + String primaryFormatName = PRIMARY_DATA_FORMAT.get(settings); + List secondaryFormatNames = SECONDARY_DATA_FORMATS.get(settings); + CompositeIndexingExecutionEngine.validateFormatsRegistered(dataFormatPlugins, primaryFormatName, secondaryFormatNames); + } - String primaryFormat = PRIMARY_DATA_FORMAT.get(indexSettings.getSettings()); - if (dataFormatPlugins.containsKey(primaryFormat) == false) { - throw new IllegalArgumentException( - "Primary data format [" - + primaryFormat - + "] does not match any registered DataFormatPlugin. Available formats: " - + dataFormatPlugins.keySet() - ); - } + @Override + public DataFormat getDataFormat() { + // TODO: Dataformat for Composite is per index, while this one talks about cluster level. Switching it off for now + return null; + } - return Optional.of(config -> new InternalEngine(config)); + @Override + @SuppressWarnings("unchecked") + public CompositeIndexingExecutionEngine indexingEngine( + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { + return new CompositeIndexingExecutionEngine(dataFormatPlugins, indexSettings, mapperService, shardPath); } /** - * Receives {@link DataFormatPlugin} instances discovered during node bootstrap. - * When multiple plugins declare the same {@link DataFormat#name()}, the one with - * the highest {@link DataFormat#priority()} is retained. 
+ * Returns the discovered data format plugins keyed by format name. * - * @param plugins the discovered data format plugins + * @return unmodifiable map of format name to plugin */ - public void onDiscovery(Collection plugins) { - Map resolved = new HashMap<>(); - for (DataFormatPlugin plugin : plugins) { - String formatName = plugin.getDataFormat().name(); - DataFormatPlugin existing = resolved.get(formatName); - if (existing == null || plugin.getDataFormat().priority() > existing.getDataFormat().priority()) { - resolved.put(formatName, plugin); - } - } - this.dataFormatPlugins = Map.copyOf(resolved); + public Map getDataFormatPlugins() { + return dataFormatPlugins; } } diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java index 56997cf7015ee..af313360fe2f1 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -8,8 +8,13 @@ package org.opensearch.composite; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.DocumentInput; import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; import org.opensearch.index.engine.dataformat.Merger; @@ -17,6 +22,8 @@ import org.opensearch.index.engine.dataformat.RefreshResult; import org.opensearch.index.engine.dataformat.Writer; import org.opensearch.index.engine.exec.Segment; +import 
org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; import java.io.IOException; import java.util.ArrayList; @@ -41,50 +48,120 @@ * @opensearch.experimental */ @ExperimentalApi -public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine { +public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine { + + private static final Logger logger = LogManager.getLogger(CompositeIndexingExecutionEngine.class); - private final List> engines; private final IndexingExecutionEngine primaryEngine; + private final List> secondaryEngines; + private final List> allEngines; private final CompositeDataFormat compositeDataFormat; /** - * Constructs a CompositeIndexingExecutionEngine from the given per-format engines. + * Constructs a CompositeIndexingExecutionEngine by reading index settings to + * determine the primary and secondary data formats, validating that all configured + * formats are registered, and creating per-format engines via the discovered + * {@link DataFormatPlugin} instances. *

- * Identifies the primary engine by matching {@code getDataFormat().name()} to the - * given {@code primaryFormatName}. Builds a {@link CompositeDataFormat} from the - * union of all engines' supported field type capabilities. + * The primary engine is the authoritative format used for merge operations and + * commit coordination. Secondary engines receive writes alongside the primary but + * are not used as the merge authority. * - * @param engines the list of per-format indexing execution engines - * @param primaryFormatName the name of the primary data format - * @throws IllegalArgumentException if no engine matches the primary format name + * @param dataFormatPlugins the discovered data format plugins keyed by format name + * @param indexSettings the index settings containing composite configuration + * @param mapperService the mapper service for field mapping resolution + * @param shardPath the shard path for file storage + * @throws IllegalStateException if composite indexing is not enabled + * @throws IllegalArgumentException if any configured format is not registered */ - public CompositeIndexingExecutionEngine(List> engines, String primaryFormatName) { - this.engines = List.copyOf(Objects.requireNonNull(engines, "engines must not be null")); - Objects.requireNonNull(primaryFormatName, "primaryFormatName must not be null"); - - IndexingExecutionEngine foundPrimary = null; - for (IndexingExecutionEngine engine : this.engines) { - if (engine.getDataFormat().name().equals(primaryFormatName)) { - foundPrimary = engine; - break; - } + public CompositeIndexingExecutionEngine( + Map dataFormatPlugins, + IndexSettings indexSettings, + MapperService mapperService, + ShardPath shardPath + ) { + Objects.requireNonNull(dataFormatPlugins, "dataFormatPlugins must not be null"); + Objects.requireNonNull(indexSettings, "indexSettings must not be null"); + + Settings settings = indexSettings.getSettings(); + boolean compositeEnabled = 
CompositeEnginePlugin.COMPOSITE_ENABLED.get(settings); + if (compositeEnabled == false) { + throw new IllegalStateException( + "Composite indexing is not enabled for index [" + indexSettings.getIndex().getName() + "]" + ); } - if (foundPrimary == null) { - throw new IllegalArgumentException("No engine found matching primary format name [" + primaryFormatName + "]"); + + String primaryFormatName = CompositeEnginePlugin.PRIMARY_DATA_FORMAT.get(settings); + List secondaryFormatNames = CompositeEnginePlugin.SECONDARY_DATA_FORMATS.get(settings); + + validateFormatsRegistered(dataFormatPlugins, primaryFormatName, secondaryFormatNames); + + DataFormatPlugin primaryPlugin = dataFormatPlugins.get(primaryFormatName); + this.primaryEngine = primaryPlugin.indexingEngine(mapperService, shardPath, indexSettings); + + List> secondaries = new ArrayList<>(); + for (String secondaryName : secondaryFormatNames) { + if (secondaryName.equals(primaryFormatName)) { + logger.warn("Secondary data format [{}] is the same as primary, skipping duplicate", secondaryName); + continue; + } + DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName); + secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings)); } - this.primaryEngine = foundPrimary; + this.secondaryEngines = List.copyOf(secondaries); + + List> all = new ArrayList<>(); + all.add(this.primaryEngine); + all.addAll(this.secondaryEngines); + this.allEngines = List.copyOf(all); List allFormats = new ArrayList<>(); - for (IndexingExecutionEngine engine : this.engines) { + for (IndexingExecutionEngine engine : this.allEngines) { allFormats.add(engine.getDataFormat()); } - this.compositeDataFormat = new CompositeDataFormat(allFormats, foundPrimary.getDataFormat()); + this.compositeDataFormat = new CompositeDataFormat(allFormats, this.primaryEngine.getDataFormat()); + } + + /** + * Validates that the primary and all secondary data format plugins are registered. 
+ * + * @param dataFormatPlugins the discovered data format plugins keyed by format name + * @param primaryFormatName the configured primary format name + * @param secondaryFormatNames the configured secondary format names + * @throws IllegalArgumentException if any configured format is not registered + */ + static void validateFormatsRegistered( + Map dataFormatPlugins, + String primaryFormatName, + List secondaryFormatNames + ) { + if (dataFormatPlugins.containsKey(primaryFormatName) == false) { + throw new IllegalArgumentException( + "Primary data format [" + + primaryFormatName + + "] is not registered on this node. Available formats: " + + dataFormatPlugins.keySet() + ); + } + for (String secondaryName : secondaryFormatNames) { + if (secondaryName.equals(primaryFormatName)) { + continue; + } + if (dataFormatPlugins.containsKey(secondaryName) == false) { + throw new IllegalArgumentException( + "Secondary data format [" + + secondaryName + + "] is not registered on this node. Available formats: " + + dataFormatPlugins.keySet() + ); + } + } } @Override public Writer createWriter(long writerGeneration) { Map> writerMap = new LinkedHashMap<>(); - for (IndexingExecutionEngine engine : engines) { + for (IndexingExecutionEngine engine : allEngines) { Writer writer = engine.createWriter(writerGeneration); writerMap.put(engine.getDataFormat(), writer); } @@ -99,7 +176,7 @@ public Merger getMerger() { @Override public RefreshResult refresh(RefreshInput refreshInput) throws IOException { List allSegments = new ArrayList<>(); - for (IndexingExecutionEngine engine : engines) { + for (IndexingExecutionEngine engine : allEngines) { RefreshResult result = engine.refresh(refreshInput); allSegments.addAll(result.refreshedSegments()); } @@ -107,14 +184,14 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException { } @Override - public DataFormat getDataFormat() { + public CompositeDataFormat getDataFormat() { return compositeDataFormat; } @Override public long 
getNativeBytesUsed() { long total = 0; - for (IndexingExecutionEngine engine : engines) { + for (IndexingExecutionEngine engine : allEngines) { total += engine.getNativeBytesUsed(); } return total; @@ -122,7 +199,7 @@ public long getNativeBytesUsed() { @Override public void deleteFiles(Map> filesToDelete) throws IOException { - for (IndexingExecutionEngine engine : engines) { + for (IndexingExecutionEngine engine : allEngines) { engine.deleteFiles(filesToDelete); } } @@ -130,10 +207,37 @@ public void deleteFiles(Map> filesToDelete) throws IO @Override public CompositeDocumentInput newDocumentInput() { Map> inputMap = new HashMap<>(); - for (IndexingExecutionEngine engine : engines) { + for (IndexingExecutionEngine engine : allEngines) { DocumentInput input = engine.newDocumentInput(); inputMap.put(engine.getDataFormat(), input); } return new CompositeDocumentInput(inputMap); } + + /** + * Returns the primary indexing execution engine. + * + * @return the primary engine + */ + public IndexingExecutionEngine getPrimaryEngine() { + return primaryEngine; + } + + /** + * Returns an unmodifiable list of secondary indexing execution engines. + * + * @return the secondary engines + */ + public List> getSecondaryEngines() { + return secondaryEngines; + } + + /** + * Returns an unmodifiable list of all engines (primary first, then secondaries). 
+     *
+     * @return all engines
+     */
+    public List> getAllEngines() {
+        return allEngines;
+    }
 }
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
index 9227d6bb9e06c..35f798aedd3fc 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
@@ -22,6 +22,10 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;

 /**
  * A composite {@link Writer} that wraps one {@link Writer} per registered data format
@@ -35,10 +39,10 @@
  * @opensearch.experimental
  */
 @ExperimentalApi
-public class CompositeWriter implements Writer {
+public class CompositeWriter implements Writer, Lock {

     private static final Logger logger = LogManager.getLogger(CompositeWriter.class);
-
+    private final ReentrantLock lock;
     private final Map> writers;

     /**
@@ -48,13 +52,14 @@ public class CompositeWriter implements Writer {
      */
     public CompositeWriter(Map> writers) {
         this.writers = Collections.unmodifiableMap(Objects.requireNonNull(writers, "writers must not be null"));
+        this.lock = new ReentrantLock();
     }

     @SuppressWarnings("unchecked")
     @Override
     public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
         Map perFormatInputs = doc.getFinalInput();
-        WriteResult lastResult = null;
+        WriteResult lastResult = null; // must be initialized: read after the loop even if no writer assigned it
         for (Map.Entry> entry : writers.entrySet()) {
             DataFormat format = entry.getKey();
             Writer writer = entry.getValue();
@@ -64,12 +69,18 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
                 continue;
             }
             lastResult = ((Writer>) writer).addDoc((DocumentInput) input);
-            if (lastResult.success() == false) {
-                return lastResult;
+
+            switch (lastResult) {
+                case WriteResult.Success s -> logger.debug("Successfully added document [{}] in [{}]", s, format);
+                case WriteResult.Failure f -> {
+                    logger.debug("Failed to add document [{}] in [{}]", f, format);
+                    return lastResult;
+                }
             }
         }
+
         if (lastResult == null) {
-            return new WriteResult(true, null, -1L, -1L, -1L);
+            return new WriteResult.Success(false, null, -1L);
         }
         return lastResult;
     }
@@ -101,4 +112,34 @@ public void close() {
             }
         }
     }
+
+    @Override
+    public void lock() {
+        lock.lock();
+    }
+
+    @Override
+    public void lockInterruptibly() throws InterruptedException {
+        lock.lockInterruptibly();
+    }
+
+    @Override
+    public boolean tryLock() {
+        return lock.tryLock();
+    }
+
+    @Override
+    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+        return lock.tryLock(time, unit);
+    }
+
+    @Override
+    public void unlock() {
+        lock.unlock();
+    }
+
+    @Override
+    public Condition newCondition() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java
deleted file mode 100644
index e8bc784bc7b4d..0000000000000
--- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */ - -package org.opensearch.composite; - -import org.opensearch.index.engine.dataformat.DataFormat; -import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; -import org.opensearch.test.OpenSearchTestCase; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Tests for {@link CompositeDataFormat}. - */ -public class CompositeDataFormatTests extends OpenSearchTestCase { - - public void testNameReturnsComposite() { - DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); - CompositeDataFormat format = new CompositeDataFormat(List.of(primary), primary); - assertEquals("composite", format.name()); - } - - public void testPriorityReturnsMaxValue() { - DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); - CompositeDataFormat format = new CompositeDataFormat(List.of(primary), primary); - assertEquals(Long.MAX_VALUE, format.priority()); - } - - public void testSupportedFieldsDelegatesToPrimaryFormat() { - FieldTypeCapabilities primaryCap = new FieldTypeCapabilities("keyword", Set.of(FieldTypeCapabilities.Capability.FULL_TEXT_SEARCH)); - FieldTypeCapabilities secondaryCap = new FieldTypeCapabilities( - "integer", - Set.of(FieldTypeCapabilities.Capability.COLUMNAR_STORAGE) - ); - DataFormat primary = createMockDataFormat("lucene", 1, Set.of(primaryCap)); - DataFormat secondary = createMockDataFormat("parquet", 2, Set.of(secondaryCap)); - - CompositeDataFormat composite = new CompositeDataFormat(List.of(primary, secondary), primary); - - // supportedFields() should return only the primary's fields, not the union - assertEquals(1, composite.supportedFields().size()); - assertTrue(composite.supportedFields().contains(primaryCap)); - assertFalse(composite.supportedFields().contains(secondaryCap)); - } - - public void testGetPrimaryDataFormat() { - DataFormat lucene = createMockDataFormat("lucene", 1, Set.of()); - DataFormat parquet = createMockDataFormat("parquet", 2, Set.of()); 
- CompositeDataFormat composite = new CompositeDataFormat(List.of(lucene, parquet), lucene); - assertSame(lucene, composite.getPrimaryDataFormat()); - } - - public void testGetDataFormats() { - DataFormat lucene = createMockDataFormat("lucene", 1, Set.of()); - DataFormat parquet = createMockDataFormat("parquet", 2, Set.of()); - CompositeDataFormat composite = new CompositeDataFormat(List.of(lucene, parquet), lucene); - assertEquals(2, composite.getDataFormats().size()); - assertSame(lucene, composite.getDataFormats().get(0)); - assertSame(parquet, composite.getDataFormats().get(1)); - } - - public void testGetDataFormatsIsUnmodifiable() { - DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); - CompositeDataFormat composite = new CompositeDataFormat(List.of(primary), primary); - expectThrows(UnsupportedOperationException.class, () -> composite.getDataFormats().add(createMockDataFormat("x", 0, Set.of()))); - } - - public void testConstructorRejectsNullDataFormats() { - DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); - expectThrows(NullPointerException.class, () -> new CompositeDataFormat(null, primary)); - } - - public void testConstructorRejectsNullPrimaryDataFormat() { - DataFormat format = createMockDataFormat("lucene", 1, Set.of()); - expectThrows(NullPointerException.class, () -> new CompositeDataFormat(List.of(format), null)); - } - - public void testToString() { - DataFormat primary = createMockDataFormat("lucene", 1, Set.of()); - CompositeDataFormat composite = new CompositeDataFormat(List.of(primary), primary); - String str = composite.toString(); - assertTrue(str.contains("CompositeDataFormat")); - assertTrue(str.contains("dataFormats=")); - assertTrue(str.contains("primaryDataFormat=")); - } - - // Feature: composite-indexing-plugin, Property 2: supportedFields delegates to primary format - // Validates: Requirements 2.4 - public void testPropertySupportedFieldsDelegatesToPrimary() { - String[] fieldTypes = { "keyword", 
"text", "integer", "long", "float", "double", "date", "boolean", "geo_point", "binary" }; - FieldTypeCapabilities.Capability[] allCapabilities = FieldTypeCapabilities.Capability.values(); - - for (int iteration = 0; iteration < 100; iteration++) { - int numFormats = randomIntBetween(1, 5); - List formats = new ArrayList<>(); - - for (int f = 0; f < numFormats; f++) { - Set formatCaps = new HashSet<>(); - int numCaps = randomIntBetween(0, 4); - for (int c = 0; c < numCaps; c++) { - String fieldType = randomFrom(fieldTypes); - Set caps = new HashSet<>(); - int capCount = randomIntBetween(1, allCapabilities.length); - for (int k = 0; k < capCount; k++) { - caps.add(randomFrom(allCapabilities)); - } - formatCaps.add(new FieldTypeCapabilities(fieldType, caps)); - } - formats.add(createMockDataFormat("format-" + f, f, formatCaps)); - } - - DataFormat primary = formats.get(randomIntBetween(0, formats.size() - 1)); - CompositeDataFormat composite = new CompositeDataFormat(formats, primary); - - // supportedFields() must equal the primary's supportedFields() - assertEquals( - "supportedFields() must match primary format's fields [iteration=" + iteration + "]", - primary.supportedFields(), - composite.supportedFields() - ); - } - } - - private DataFormat createMockDataFormat(String name, long priority, Set supportedFields) { - return new DataFormat() { - @Override - public String name() { - return name; - } - - @Override - public long priority() { - return priority; - } - - @Override - public Set supportedFields() { - return supportedFields; - } - }; - } -} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java deleted file mode 100644 index 148865e94160d..0000000000000 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java +++ /dev/null @@ -1,239 +0,0 @@ 
-/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.composite; - -import org.opensearch.Version; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.engine.EngineFactory; -import org.opensearch.index.engine.dataformat.DataFormat; -import org.opensearch.index.engine.dataformat.DataFormatPlugin; -import org.opensearch.index.engine.dataformat.DocumentInput; -import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; -import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; -import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.test.OpenSearchTestCase; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Set; - -/** - * Tests for {@link CompositeEnginePlugin}. 
- */ -public class CompositeEnginePluginTests extends OpenSearchTestCase { - - // --- getSettings tests --- - - public void testGetSettingsReturnsBothSettings() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - List> settings = plugin.getSettings(); - assertEquals(2, settings.size()); - assertTrue(settings.contains(CompositeEnginePlugin.COMPOSITE_ENABLED)); - assertTrue(settings.contains(CompositeEnginePlugin.PRIMARY_DATA_FORMAT)); - } - - // --- getEngineFactory tests --- - - public void testGetEngineFactoryReturnsEmptyWhenCompositeDisabled() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - IndexSettings indexSettings = createIndexSettings(false, "lucene"); - Optional factory = plugin.getEngineFactory(indexSettings); - assertTrue(factory.isEmpty()); - } - - public void testGetEngineFactoryReturnsFactoryWhenCompositeEnabledAndPrimaryFormatMatches() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1))); - IndexSettings indexSettings = createIndexSettings(true, "lucene"); - Optional factory = plugin.getEngineFactory(indexSettings); - assertTrue(factory.isPresent()); - } - - public void testGetEngineFactoryThrowsWhenPrimaryFormatDoesNotMatch() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1))); - IndexSettings indexSettings = createIndexSettings(true, "parquet"); - IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> plugin.getEngineFactory(indexSettings)); - assertTrue(ex.getMessage().contains("parquet")); - assertTrue(ex.getMessage().contains("does not match any registered DataFormatPlugin")); - } - - public void testGetEngineFactoryReturnsEmptyWhenNoPluginsDiscoveredAndCompositeEnabled() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - // No onDiscovery call — dataFormatPlugins is empty - IndexSettings indexSettings = 
createIndexSettings(true, "lucene"); - Optional factory = plugin.getEngineFactory(indexSettings); - assertTrue(factory.isEmpty()); - } - - // --- onDiscovery tests --- - - public void testOnDiscoveryWithEmptyCollection() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - plugin.onDiscovery(Collections.emptyList()); - // Composite enabled but no plugins → should return empty - IndexSettings indexSettings = createIndexSettings(true, "lucene"); - Optional factory = plugin.getEngineFactory(indexSettings); - assertTrue(factory.isEmpty()); - } - - public void testOnDiscoveryWithOnePlugin() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1))); - IndexSettings indexSettings = createIndexSettings(true, "lucene"); - Optional factory = plugin.getEngineFactory(indexSettings); - assertTrue(factory.isPresent()); - } - - public void testOnDiscoveryWithMultiplePlugins() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - plugin.onDiscovery(List.of(createMockDataFormatPlugin("lucene", 1), createMockDataFormatPlugin("parquet", 2))); - // Both formats should be available - IndexSettings luceneSettings = createIndexSettings(true, "lucene"); - assertTrue(plugin.getEngineFactory(luceneSettings).isPresent()); - - IndexSettings parquetSettings = createIndexSettings(true, "parquet"); - assertTrue(plugin.getEngineFactory(parquetSettings).isPresent()); - } - - public void testOnDiscoveryDuplicateFormatNameKeepsHighestPriority() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - DataFormatPlugin lowPriority = createMockDataFormatPlugin("lucene", 1); - DataFormatPlugin highPriority = createMockDataFormatPlugin("lucene", 100); - plugin.onDiscovery(List.of(lowPriority, highPriority)); - - // The plugin should still work with "lucene" format — the high-priority one wins - IndexSettings indexSettings = createIndexSettings(true, "lucene"); - 
assertTrue(plugin.getEngineFactory(indexSettings).isPresent()); - } - - public void testOnDiscoveryDuplicateFormatNameKeepsHighestPriorityRegardlessOfOrder() { - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - DataFormatPlugin highPriority = createMockDataFormatPlugin("lucene", 100); - DataFormatPlugin lowPriority = createMockDataFormatPlugin("lucene", 1); - // High priority first, then low priority — should still keep high priority - plugin.onDiscovery(List.of(highPriority, lowPriority)); - - IndexSettings indexSettings = createIndexSettings(true, "lucene"); - assertTrue(plugin.getEngineFactory(indexSettings).isPresent()); - } - - // Feature: composite-indexing-plugin, Property 10: Primary format validation - // Validates: Requirements 7.3 - // - // For any primary format name and any set of registered DataFormatPlugin instances, - // the CompositeEnginePlugin should accept the configuration if and only if the - // primary format name matches a registered plugin's DataFormat.name(). 
- public void testPropertyPrimaryFormatValidation() { - int iterations = 100; - for (int i = 0; i < iterations; i++) { - // Generate a random set of format names (1-5 formats) - int formatCount = randomIntBetween(1, 5); - String[] formatNames = new String[formatCount]; - for (int j = 0; j < formatCount; j++) { - formatNames[j] = randomAlphaOfLength(randomIntBetween(3, 10)); - } - - // Register plugins for all generated format names - CompositeEnginePlugin plugin = new CompositeEnginePlugin(); - List plugins = new ArrayList<>(); - for (String name : formatNames) { - plugins.add(createMockDataFormatPlugin(name, randomLongBetween(1, 1000))); - } - plugin.onDiscovery(plugins); - - // Case 1: Primary format matches one of the registered formats → factory is present - String matchingFormat = formatNames[randomIntBetween(0, formatCount - 1)]; - IndexSettings matchSettings = createIndexSettings(true, matchingFormat); - Optional matchResult = plugin.getEngineFactory(matchSettings); - assertTrue( - "Expected factory to be present when primary format [" - + matchingFormat - + "] matches a registered plugin (iteration " - + i - + ")", - matchResult.isPresent() - ); - - // Case 2: Primary format does NOT match any registered format → throws IllegalArgumentException - String nonMatchingFormat = randomValueOtherThanMany( - candidate -> Arrays.asList(formatNames).contains(candidate), - () -> randomAlphaOfLength(randomIntBetween(3, 10)) - ); - IndexSettings noMatchSettings = createIndexSettings(true, nonMatchingFormat); - IllegalArgumentException ex = expectThrows( - IllegalArgumentException.class, - "Expected IllegalArgumentException when primary format [" - + nonMatchingFormat - + "] does not match any registered plugin (iteration " - + i - + ")", - () -> plugin.getEngineFactory(noMatchSettings) - ); - assertTrue("Error message should contain the non-matching format name", ex.getMessage().contains(nonMatchingFormat)); - } - } - - // --- Helper methods --- - - private 
IndexSettings createIndexSettings(boolean compositeEnabled, String primaryFormat) { - Settings settings = Settings.builder() - .put("index.composite.enabled", compositeEnabled) - .put("index.composite.primary_data_format", primaryFormat) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .build(); - IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build(); - return new IndexSettings(indexMetadata, Settings.EMPTY); - } - - private DataFormatPlugin createMockDataFormatPlugin(String formatName, long priority) { - DataFormat dataFormat = new DataFormat() { - @Override - public String name() { - return formatName; - } - - @Override - public long priority() { - return priority; - } - - @Override - public Set supportedFields() { - return Set.of(new FieldTypeCapabilities("keyword", Set.of(FieldTypeCapabilities.Capability.FULL_TEXT_SEARCH))); - } - }; - - return new DataFormatPlugin() { - @Override - public DataFormat getDataFormat() { - return dataFormat; - } - - @Override - @SuppressWarnings("unchecked") - public > IndexingExecutionEngine indexingEngine( - MapperService mapperService, - ShardPath shardPath, - IndexSettings indexSettings - ) { - return null; // Not needed for plugin-level tests - } - }; - } -}