diff --git a/.gitignore b/.gitignore index 0a784701375d9..599ea7878a1c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .claude CLAUDE.md .cursor* +.kiro* # intellij files .idea/ diff --git a/sandbox/libs/composite-engine-lib/build.gradle b/sandbox/libs/composite-engine-lib/build.gradle new file mode 100644 index 0000000000000..1fa0c39d443f3 --- /dev/null +++ b/sandbox/libs/composite-engine-lib/build.gradle @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Shared concurrent queue utilities for the composite indexing engine. + * No external dependencies — pure Java concurrency primitives. + */ + +dependencies { +} + +testingConventions.enabled = false + +tasks.named('forbiddenApisMain').configure { + replaceSignatureFiles 'jdk-signatures' +} diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java new file mode 100644 index 0000000000000..08eee90a5f54a --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java @@ -0,0 +1,140 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * A striped concurrent queue that distributes entries across multiple internal + * queues using thread-affinity-based hashing. 
/**
 * A striped concurrent queue that distributes entries across multiple internal
 * queues ("stripes") using thread-affinity-based hashing. This reduces contention
 * by allowing concurrent threads to operate on different stripes without blocking
 * each other.
 *
 * <p>NOTE(review): the generic type parameters were reconstructed from usage; the
 * original source had them stripped (likely by markup processing).
 *
 * @param <T> the type of elements held in this queue
 * @opensearch.experimental
 */
public final class ConcurrentQueue<T> {

    static final int MIN_CONCURRENCY = 1;
    static final int MAX_CONCURRENCY = 256;

    /** Number of stripes (parallel lock/queue pairs). */
    private final int concurrency;
    /** One lock per stripe; locks[i] guards queues[i]. */
    private final Lock[] locks;
    /** The striped backing queues. */
    private final Queue<T>[] queues;

    /**
     * Creates a striped queue.
     *
     * @param queueSupplier factory for the per-stripe backing queues
     * @param concurrency number of stripes, in [{@link #MIN_CONCURRENCY}, {@link #MAX_CONCURRENCY}]
     * @throws IllegalArgumentException if {@code concurrency} is out of range
     */
    ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
        if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
            throw new IllegalArgumentException(
                "concurrency must be in [" + MIN_CONCURRENCY + ", " + MAX_CONCURRENCY + "], got " + concurrency
            );
        }
        this.concurrency = concurrency;
        locks = new Lock[concurrency];
        @SuppressWarnings({ "rawtypes", "unchecked" })
        Queue<T>[] queues = new Queue[concurrency];
        this.queues = queues;
        for (int i = 0; i < concurrency; ++i) {
            locks[i] = new ReentrantLock();
            queues[i] = queueSupplier.get();
        }
    }

    /**
     * Adds an entry to one of the stripes. First makes a non-blocking pass over the
     * stripes (starting at a thread-affine index); if every stripe is contended,
     * falls back to a blocking lock on the thread's home stripe.
     *
     * @param entry the entry to add
     */
    void add(T entry) {
        // Seed the order in which to look at stripes based on the current thread. This helps
        // distribute entries across queues and gives a bit of thread affinity between entries
        // and threads, which can't hurt.
        final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
        for (int i = 0; i < concurrency; ++i) {
            final int index = (threadHash + i) % concurrency;
            if (locks[index].tryLock()) {
                try {
                    queues[index].add(entry);
                    return;
                } finally {
                    locks[index].unlock();
                }
            }
        }
        // All stripes were contended: block on the home stripe rather than spin.
        final int index = threadHash % concurrency;
        locks[index].lock();
        try {
            queues[index].add(entry);
        } finally {
            locks[index].unlock();
        }
    }

    /**
     * Polls the first entry matching {@code predicate}. Makes a non-blocking pass over
     * all stripes first, then a blocking pass, so a {@code null} return means no stripe
     * contained a matching entry at the time it was inspected.
     *
     * @param predicate filter an entry must satisfy to be polled
     * @return the removed matching entry, or {@code null} if none matched
     */
    T poll(Predicate<T> predicate) {
        final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
        for (int i = 0; i < concurrency; ++i) {
            final int index = (threadHash + i) % concurrency;
            if (locks[index].tryLock()) {
                try {
                    T entry = pollLocked(queues[index], predicate);
                    if (entry != null) {
                        return entry;
                    }
                } finally {
                    locks[index].unlock();
                }
            }
        }
        for (int i = 0; i < concurrency; ++i) {
            final int index = (threadHash + i) % concurrency;
            locks[index].lock();
            try {
                T entry = pollLocked(queues[index], predicate);
                if (entry != null) {
                    return entry;
                }
            } finally {
                locks[index].unlock();
            }
        }
        return null;
    }

    /**
     * Removes and returns the first entry of {@code queue} matching {@code predicate},
     * or {@code null}. The caller must hold the stripe lock guarding {@code queue}.
     */
    private T pollLocked(Queue<T> queue, Predicate<T> predicate) {
        Iterator<T> it = queue.iterator();
        while (it.hasNext()) {
            T entry = it.next();
            if (predicate.test(entry)) {
                it.remove();
                return entry;
            }
        }
        return null;
    }

    /**
     * Removes the given entry from whichever stripe holds it, scanning all stripes
     * under their locks.
     *
     * @param entry the entry to remove (matched via {@link Queue#remove}, i.e. equals)
     * @return {@code true} if the entry was found and removed
     */
    boolean remove(T entry) {
        for (int i = 0; i < concurrency; ++i) {
            locks[i].lock();
            try {
                if (queues[i].remove(entry)) {
                    return true;
                }
            } finally {
                locks[i].unlock();
            }
        }
        return false;
    }
}
+++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +/** + * A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics + * on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that + * they can be atomically locked when polled and unlocked when returned. + *

+ * This is used by the composite writer pool to ensure that a writer is locked + * before it is handed out and unlocked when it is returned. + * + * @param the type of lockable elements held in this queue + * @opensearch.experimental + */ +public final class LockableConcurrentQueue { + + private final ConcurrentQueue queue; + private final AtomicInteger addAndUnlockCounter = new AtomicInteger(); + + public LockableConcurrentQueue(Supplier> queueSupplier, int concurrency) { + this.queue = new ConcurrentQueue<>(queueSupplier, concurrency); + } + + /** + * Lock an entry, and poll it from the queue, in that order. If no entry can be found and locked, + * {@code null} is returned. + */ + public T lockAndPoll() { + int addAndUnlockCount; + do { + addAndUnlockCount = addAndUnlockCounter.get(); + T entry = queue.poll(Lock::tryLock); + if (entry != null) { + return entry; + } + // If an entry has been added to the queue in the meantime, try again. + } while (addAndUnlockCount != addAndUnlockCounter.get()); + + return null; + } + + /** Remove an entry from the queue. */ + public boolean remove(T entry) { + return queue.remove(entry); + } + + /** Add an entry to the queue and unlock it, in that order. */ + public void addAndUnlock(T entry) { + queue.add(entry); + entry.unlock(); + addAndUnlockCounter.incrementAndGet(); + } +} diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java new file mode 100644 index 0000000000000..9e52cbabf073b --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +/** + * Concurrent queue utilities for the composite indexing engine. + * + * @opensearch.experimental + */ +package org.opensearch.composite.queue; diff --git a/sandbox/plugins/composite-engine/build.gradle b/sandbox/plugins/composite-engine/build.gradle new file mode 100644 index 0000000000000..8aa122a1e1e8e --- /dev/null +++ b/sandbox/plugins/composite-engine/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +opensearchplugin { + description = 'Composite indexing engine plugin that orchestrates multi-format indexing across multiple data format engines.' + classname = 'org.opensearch.composite.CompositeEnginePlugin' +} + +dependencies { + api project(':sandbox:libs:composite-engine-lib') + compileOnly project(':server') + testImplementation project(':test:framework') +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java new file mode 100644 index 0000000000000..c34716c0544c5 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * A composite {@link DataFormat} that wraps multiple per-format {@link DataFormat} instances. 
+ * Each constituent format retains its own {@link FieldTypeCapabilities} — field routing is + * handled per-format by {@link CompositeDocumentInput}, not by this class. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDataFormat implements DataFormat { + + private final List dataFormats; + private final DataFormat primaryDataFormat; + + /** + * Constructs a CompositeDataFormat from the given list of data formats. + * + * @param dataFormats the constituent data formats + * @param primaryDataFormat the primary data format (must be contained in {@code dataFormats}) + */ + public CompositeDataFormat(List dataFormats, DataFormat primaryDataFormat) { + this.dataFormats = dataFormats; + this.primaryDataFormat = primaryDataFormat; + } + + public CompositeDataFormat() { + this.dataFormats = List.of(); + this.primaryDataFormat = null; + } + + /** + * Returns the primary data format. + * + * @return the primary data format + */ + public DataFormat getPrimaryDataFormat() { + return primaryDataFormat; + } + + /** + * Returns the list of constituent data formats. 
+ * + * @return the data formats + */ + public List getDataFormats() { + return dataFormats; + } + + @Override + public String name() { + return "composite"; + } + + @Override + public long priority() { + return Long.MAX_VALUE; + } + + @Override + public Set supportedFields() { + return primaryDataFormat.supportedFields(); + } + + @Override + public String toString() { + return "CompositeDataFormat{" + "dataFormats=" + dataFormats + ", primaryDataFormat=" + primaryDataFormat + '}'; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java new file mode 100644 index 0000000000000..467812eeb1624 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.composite.queue.LockableConcurrentQueue; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.function.Supplier; + +public class CompositeWriterPool implements Iterable, Closeable { + + private final Set writers; + private final LockableConcurrentQueue availableWriters; + private final Supplier writerSupplier; + private volatile boolean closed; + + public CompositeWriterPool( + Supplier writerSupplier, + Supplier> queueSupplier, + int concurrency + ) { + this.writers = Collections.newSetFromMap(new IdentityHashMap<>()); + this.writerSupplier = writerSupplier; + this.availableWriters = new LockableConcurrentQueue<>(queueSupplier, concurrency); + } + + /** + * This method is used by CompositeIndexingExecutionEngine to grab a writer from the pool to perform an indexing + * operation. + * + * @return a pooled CompositeWriter if available, or a newly created instance if none are available + */ + public CompositeWriter getAndLock() { + ensureOpen(); + CompositeWriter CompositeWriter = availableWriters.lockAndPoll(); + return Objects.requireNonNullElseGet(CompositeWriter, this::fetchWriter); + } + + /** + * Create a new {@link CompositeWriter} to be added to this pool. + * + * @return a new instance of {@link CompositeWriter} + */ + private synchronized CompositeWriter fetchWriter() { + ensureOpen(); + CompositeWriter CompositeWriter = writerSupplier.get(); + CompositeWriter.lock(); + writers.add(CompositeWriter); + return CompositeWriter; + } + + /** + * Release the given {@link CompositeWriter} to this pool for reuse if it is currently managed by this + * pool. 
+ * + * @param state {@link CompositeWriter} to release to the pool. + */ + public void releaseAndUnlock(CompositeWriter state) { + assert + !state.isFlushPending() && !state.isAborted() : + "CompositeWriter has pending flush: " + state.isFlushPending() + " aborted=" + state.isAborted(); + assert isRegistered(state) : "CompositeDocumentWriterPool doesn't know about this CompositeWriter"; + availableWriters.addAndUnlock(state); + } + + /** + * Lock and checkout all CompositeWriters from the pool for flush. + * + * @return Unmodifiable list of all CompositeWriters locked by current thread. + */ + public List checkoutAll() { + ensureOpen(); + List lockedWriters = new ArrayList<>(); + List checkedOutWriters = new ArrayList<>(); + for (CompositeWriter CompositeWriter : this) { + CompositeWriter.lock(); + lockedWriters.add(CompositeWriter); + } + synchronized (this) { + for (CompositeWriter CompositeWriter : lockedWriters) { + try { + // Release this writer if it’s no longer managed by this pool; otherwise, check it out. + if (isRegistered(CompositeWriter) && writers.remove(CompositeWriter)) { + availableWriters.remove(CompositeWriter); + CompositeWriter.setFlushPending(); + checkedOutWriters.add(CompositeWriter); + } + } finally { + CompositeWriter.unlock(); + } + } + } + return Collections.unmodifiableList(checkedOutWriters); + } + + /** + * Check if {@link CompositeWriter} is part of this pool. + * + * @param perThread {@link CompositeWriter} to validate. + * @return true if {@link CompositeWriter} is part of this pool, false otherwise. 
+ */ + synchronized boolean isRegistered(CompositeWriter perThread) { + return writers.contains(perThread); + } + + private void ensureOpen() { + if (closed) { + throw new AlreadyClosedException("CompositeDocumentWriterPool is already closed"); + } + } + + @Override + public synchronized Iterator iterator() { + return List.copyOf(writers).iterator(); + } + + @Override + public void close() throws IOException { + this.closed = true; + } +} + diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java new file mode 100644 index 0000000000000..2e614ed380d75 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; +import org.opensearch.index.mapper.MappedFieldType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * A composite {@link DocumentInput} that wraps one {@link DocumentInput} per registered + * data format and routes field additions to the appropriate per-format inputs based on + * {@link FieldTypeCapabilities}. + *

+ * Metadata operations ({@code setRowId}, {@code setVersion}, {@code setSeqNo}, + * {@code setPrimaryTerm}) are broadcast to all per-format inputs. Fields are routed + * only to formats whose {@link DataFormat#supportedFields()} includes a + * {@link FieldTypeCapabilities} matching the field's type name. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDocumentInput implements DocumentInput> { + + private static final Logger logger = LogManager.getLogger(CompositeDocumentInput.class); + + private final Map> inputs; + private final Map> fieldTypeToFormats; + + /** + * Constructs a CompositeDocumentInput wrapping the given per-format inputs. + *

+ * Precomputes a routing map from field type name to the set of data formats + * that support that field type, based on each format's + * {@link DataFormat#supportedFields()}. + * + * @param inputs a map of data format to its corresponding document input + */ + public CompositeDocumentInput(Map> inputs) { + this.inputs = Collections.unmodifiableMap(Objects.requireNonNull(inputs, "inputs must not be null")); + this.fieldTypeToFormats = buildFieldTypeToFormatsMap(inputs); + } + + private static Map> buildFieldTypeToFormatsMap(Map> inputs) { + Map> routing = new HashMap<>(); + for (DataFormat format : inputs.keySet()) { + for (FieldTypeCapabilities ftc : format.supportedFields()) { + routing.computeIfAbsent(ftc.getFieldType(), k -> new HashSet<>()).add(format); + } + } + return Collections.unmodifiableMap(routing); + } + + @Override + public void addField(MappedFieldType fieldType, Object value) { + String typeName = fieldType.typeName(); + Set formats = fieldTypeToFormats.get(typeName); + if (formats == null || formats.isEmpty()) { + logger.debug("No data format supports field type [{}], field will be dropped", typeName); + return; + } + for (DataFormat format : formats) { + inputs.get(format).addField(fieldType, value); + } + } + + @Override + public void setRowId(String rowIdFieldName, long rowId) { + for (DocumentInput input : inputs.values()) { + input.setRowId(rowIdFieldName, rowId); + } + } + + @Override + public void setVersion(String fieldName, long version) { + for (DocumentInput input : inputs.values()) { + input.setVersion(fieldName, version); + } + } + + @Override + public void setSeqNo(String fieldName, long seqNo) { + for (DocumentInput input : inputs.values()) { + input.setSeqNo(fieldName, seqNo); + } + } + + @Override + public void setPrimaryTerm(String fieldName, long primaryTerm) { + for (DocumentInput input : inputs.values()) { + input.setPrimaryTerm(fieldName, primaryTerm); + } + } + + @Override + public Map getFinalInput() { + Map result = new 
HashMap<>(); + for (Map.Entry> entry : inputs.entrySet()) { + result.put(entry.getKey(), entry.getValue().getFinalInput()); + } + return Collections.unmodifiableMap(result); + } + + @Override + public void close() { + for (DocumentInput input : inputs.values()) { + try { + input.close(); + } catch (Exception e) { + logger.warn("Failed to close per-format DocumentInput", e); + } + } + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java new file mode 100644 index 0000000000000..a89477497ff02 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java @@ -0,0 +1,200 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.ExtensiblePlugin; +import org.opensearch.plugins.Plugin; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for + * orchestrating multi-format indexing. 
Discovers {@link DataFormatPlugin} instances + * during node bootstrap via the {@link ExtensiblePlugin} SPI and creates a composite + * engine when composite indexing is enabled for an index. + *

+ * Registers two index settings: + *

    + *
  • {@code index.composite.enabled} — activates composite indexing (default {@code false})
  • + *
  • {@code index.composite.primary_data_format} — designates the primary format (default {@code "lucene"})
  • + *
+ *

+ * Format plugins (e.g., Parquet) extend this plugin by declaring + * {@code extendedPlugins = ['composite-engine']} in their {@code build.gradle} + * and implementing {@link DataFormatPlugin}. The {@link ExtensiblePlugin} SPI + * discovers them automatically during node bootstrap. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeEnginePlugin extends Plugin implements ExtensiblePlugin, DataFormatPlugin { + + private static final Logger logger = LogManager.getLogger(CompositeEnginePlugin.class); + + /** + * Index setting to enable composite indexing for an index. + * When {@code true}, the composite engine orchestrates writes across all registered data formats. + * Validates that the primary data format is non-empty when enabled. + */ + public static final Setting COMPOSITE_ENABLED = Setting.boolSetting( + "index.composite.enabled", + false, + new Setting.Validator<>() { + @Override + public void validate(Boolean value) {} + + @Override + public void validate(Boolean enabled, Map, Object> settings) { + if (enabled) { + String primary = (String) settings.get(PRIMARY_DATA_FORMAT); + if (primary == null || primary.isEmpty()) { + throw new IllegalArgumentException( + "[index.composite.enabled] requires [index.composite.primary_data_format] to be set" + ); + } + } + } + + @Override + public Iterator> settings() { + return List.>of(PRIMARY_DATA_FORMAT, SECONDARY_DATA_FORMATS).iterator(); + } + }, + Setting.Property.IndexScope + ); + + /** + * Index setting that designates the primary data format for an index. + * The primary format is the authoritative format used for merge operations. + */ + public static final Setting PRIMARY_DATA_FORMAT = Setting.simpleString( + "index.composite.primary_data_format", + "lucene", + Setting.Property.IndexScope + ); + + /** + * Index setting that lists the secondary data formats for an index. + * Secondary formats receive writes alongside the primary but are not used + * as the merge authority. 
+ */ + public static final Setting> SECONDARY_DATA_FORMATS = Setting.listSetting( + "index.composite.secondary_data_formats", + Collections.emptyList(), + s -> s, + Setting.Property.IndexScope + ); + + /** + * Discovered {@link DataFormatPlugin} instances keyed by format name. + * When multiple plugins declare the same format name, the one with the highest + * {@link DataFormat#priority()} is retained. + */ + private volatile Map dataFormatPlugins = Map.of(); + + /** Creates a new composite engine plugin. */ + public CompositeEnginePlugin() {} + + + @Override + public void loadExtensions(ExtensionLoader loader) { + List formatPlugins = loader.loadExtensions(DataFormatPlugin.class); + Map registry = new HashMap<>(); + for (DataFormatPlugin plugin : formatPlugins) { + DataFormat format = plugin.getDataFormat(); + if (format == null) { + logger.warn("DataFormatPlugin [{}] returned null DataFormat, skipping", plugin.getClass().getName()); + continue; + } + String name = format.name(); + DataFormatPlugin existing = registry.get(name); + if (existing != null) { + long existingPriority = existing.getDataFormat().priority(); + if (format.priority() <= existingPriority) { + logger.debug( + "Skipping DataFormatPlugin [{}] for format [{}] (priority {} <= existing {})", + plugin.getClass().getName(), + name, + format.priority(), + existingPriority + ); + continue; + } + logger.info( + "Replacing DataFormatPlugin for format [{}] (priority {} > existing {})", + name, + format.priority(), + existingPriority + ); + } + registry.put(name, plugin); + logger.info("Registered DataFormatPlugin [{}] for format [{}]", plugin.getClass().getName(), name); + } + this.dataFormatPlugins = Collections.unmodifiableMap(registry); + } + + + @Override + public List> getSettings() { + return List.of(COMPOSITE_ENABLED, PRIMARY_DATA_FORMAT, SECONDARY_DATA_FORMATS); + } + + @Override + public void onIndexModule(IndexModule indexModule) { + Settings settings = indexModule.getSettings(); + boolean 
compositeEnabled = COMPOSITE_ENABLED.get(settings); + if (compositeEnabled == false) { + return; + } + + String primaryFormatName = PRIMARY_DATA_FORMAT.get(settings); + List secondaryFormatNames = SECONDARY_DATA_FORMATS.get(settings); + CompositeIndexingExecutionEngine.validateFormatsRegistered(dataFormatPlugins, primaryFormatName, secondaryFormatNames); + } + + @Override + public DataFormat getDataFormat() { + // TODO: Dataformat for Composite is per index, while this one talks about cluster level. Switching it off for now + return null; + } + + @Override + @SuppressWarnings("unchecked") + public CompositeIndexingExecutionEngine indexingEngine( + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { + return new CompositeIndexingExecutionEngine(dataFormatPlugins, indexSettings, mapperService, shardPath); + } + + /** + * Returns the discovered data format plugins keyed by format name. + * + * @return unmodifiable map of format name to plugin + */ + public Map getDataFormatPlugins() { + return dataFormatPlugins; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java new file mode 100644 index 0000000000000..af313360fe2f1 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -0,0 +1,243 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.dataformat.Merger; +import org.opensearch.index.engine.dataformat.RefreshInput; +import org.opensearch.index.engine.dataformat.RefreshResult; +import org.opensearch.index.engine.dataformat.Writer; +import org.opensearch.index.engine.exec.Segment; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * A composite {@link IndexingExecutionEngine} that orchestrates indexing across + * multiple per-format engines behind a single interface. + *

+ * The engine delegates writer creation, refresh, file deletion, and document input + * creation to each per-format engine. A primary engine is designated based on the + * configured primary format name and is used for merge operations. + *

+ * The composite {@link DataFormat} exposed by this engine represents the union of + * all per-format supported field type capabilities. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine { + + private static final Logger logger = LogManager.getLogger(CompositeIndexingExecutionEngine.class); + + private final IndexingExecutionEngine primaryEngine; + private final List> secondaryEngines; + private final List> allEngines; + private final CompositeDataFormat compositeDataFormat; + + /** + * Constructs a CompositeIndexingExecutionEngine by reading index settings to + * determine the primary and secondary data formats, validating that all configured + * formats are registered, and creating per-format engines via the discovered + * {@link DataFormatPlugin} instances. + *

+ * The primary engine is the authoritative format used for merge operations and + * commit coordination. Secondary engines receive writes alongside the primary but + * are not used as the merge authority. + * + * @param dataFormatPlugins the discovered data format plugins keyed by format name + * @param indexSettings the index settings containing composite configuration + * @param mapperService the mapper service for field mapping resolution + * @param shardPath the shard path for file storage + * @throws IllegalStateException if composite indexing is not enabled + * @throws IllegalArgumentException if any configured format is not registered + */ + public CompositeIndexingExecutionEngine( + Map dataFormatPlugins, + IndexSettings indexSettings, + MapperService mapperService, + ShardPath shardPath + ) { + Objects.requireNonNull(dataFormatPlugins, "dataFormatPlugins must not be null"); + Objects.requireNonNull(indexSettings, "indexSettings must not be null"); + + Settings settings = indexSettings.getSettings(); + boolean compositeEnabled = CompositeEnginePlugin.COMPOSITE_ENABLED.get(settings); + if (compositeEnabled == false) { + throw new IllegalStateException( + "Composite indexing is not enabled for index [" + indexSettings.getIndex().getName() + "]" + ); + } + + String primaryFormatName = CompositeEnginePlugin.PRIMARY_DATA_FORMAT.get(settings); + List secondaryFormatNames = CompositeEnginePlugin.SECONDARY_DATA_FORMATS.get(settings); + + validateFormatsRegistered(dataFormatPlugins, primaryFormatName, secondaryFormatNames); + + DataFormatPlugin primaryPlugin = dataFormatPlugins.get(primaryFormatName); + this.primaryEngine = primaryPlugin.indexingEngine(mapperService, shardPath, indexSettings); + + List> secondaries = new ArrayList<>(); + for (String secondaryName : secondaryFormatNames) { + if (secondaryName.equals(primaryFormatName)) { + logger.warn("Secondary data format [{}] is the same as primary, skipping duplicate", secondaryName); + continue; + } + 
DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName); + secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings)); + } + this.secondaryEngines = List.copyOf(secondaries); + + List> all = new ArrayList<>(); + all.add(this.primaryEngine); + all.addAll(this.secondaryEngines); + this.allEngines = List.copyOf(all); + + List allFormats = new ArrayList<>(); + for (IndexingExecutionEngine engine : this.allEngines) { + allFormats.add(engine.getDataFormat()); + } + this.compositeDataFormat = new CompositeDataFormat(allFormats, this.primaryEngine.getDataFormat()); + } + + /** + * Validates that the primary and all secondary data format plugins are registered. + * + * @param dataFormatPlugins the discovered data format plugins keyed by format name + * @param primaryFormatName the configured primary format name + * @param secondaryFormatNames the configured secondary format names + * @throws IllegalArgumentException if any configured format is not registered + */ + static void validateFormatsRegistered( + Map dataFormatPlugins, + String primaryFormatName, + List secondaryFormatNames + ) { + if (dataFormatPlugins.containsKey(primaryFormatName) == false) { + throw new IllegalArgumentException( + "Primary data format [" + + primaryFormatName + + "] is not registered on this node. Available formats: " + + dataFormatPlugins.keySet() + ); + } + for (String secondaryName : secondaryFormatNames) { + if (secondaryName.equals(primaryFormatName)) { + continue; + } + if (dataFormatPlugins.containsKey(secondaryName) == false) { + throw new IllegalArgumentException( + "Secondary data format [" + + secondaryName + + "] is not registered on this node. 
Available formats: " + + dataFormatPlugins.keySet() + ); + } + } + } + + @Override + public Writer createWriter(long writerGeneration) { + Map> writerMap = new LinkedHashMap<>(); + for (IndexingExecutionEngine engine : allEngines) { + Writer writer = engine.createWriter(writerGeneration); + writerMap.put(engine.getDataFormat(), writer); + } + return new CompositeWriter(writerMap); + } + + @Override + public Merger getMerger() { + return primaryEngine.getMerger(); + } + + @Override + public RefreshResult refresh(RefreshInput refreshInput) throws IOException { + List allSegments = new ArrayList<>(); + for (IndexingExecutionEngine engine : allEngines) { + RefreshResult result = engine.refresh(refreshInput); + allSegments.addAll(result.refreshedSegments()); + } + return new RefreshResult(allSegments); + } + + @Override + public CompositeDataFormat getDataFormat() { + return compositeDataFormat; + } + + @Override + public long getNativeBytesUsed() { + long total = 0; + for (IndexingExecutionEngine engine : allEngines) { + total += engine.getNativeBytesUsed(); + } + return total; + } + + @Override + public void deleteFiles(Map> filesToDelete) throws IOException { + for (IndexingExecutionEngine engine : allEngines) { + engine.deleteFiles(filesToDelete); + } + } + + @Override + public CompositeDocumentInput newDocumentInput() { + Map> inputMap = new HashMap<>(); + for (IndexingExecutionEngine engine : allEngines) { + DocumentInput input = engine.newDocumentInput(); + inputMap.put(engine.getDataFormat(), input); + } + return new CompositeDocumentInput(inputMap); + } + + /** + * Returns the primary indexing execution engine. + * + * @return the primary engine + */ + public IndexingExecutionEngine getPrimaryEngine() { + return primaryEngine; + } + + /** + * Returns an unmodifiable list of secondary indexing execution engines. 
+ * + * @return the secondary engines + */ + public List> getSecondaryEngines() { + return secondaryEngines; + } + + /** + * Returns an unmodifiable list of all engines (primary first, then secondaries). + * + * @return all engines + */ + public List> getAllEngines() { + return allEngines; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java new file mode 100644 index 0000000000000..35f798aedd3fc --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FileInfos; +import org.opensearch.index.engine.dataformat.WriteResult; +import org.opensearch.index.engine.dataformat.Writer; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A composite {@link Writer} that wraps one {@link Writer} per registered data format + * and delegates write operations to each per-format writer. + *

+ * {@code addDoc()} extracts per-format inputs from the {@link CompositeDocumentInput}'s + * {@code getFinalInput()} map and calls {@code addDoc()} on each per-format writer. + * {@code flush()} aggregates {@link FileInfos} from all per-format writers. + * {@code sync()} and {@code close()} delegate to each per-format writer. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeWriter implements Writer, Lock { + + private static final Logger logger = LogManager.getLogger(CompositeWriter.class); + private final ReentrantLock lock; + private final Map> writers; + + /** + * Constructs a CompositeWriter wrapping the given per-format writers. + * + * @param writers a map of data format to its corresponding writer + */ + public CompositeWriter(Map> writers) { + this.writers = Collections.unmodifiableMap(Objects.requireNonNull(writers, "writers must not be null")); + this.lock = new ReentrantLock(); + } + + @SuppressWarnings("unchecked") + @Override + public WriteResult addDoc(CompositeDocumentInput doc) throws IOException { + Map perFormatInputs = doc.getFinalInput(); + WriteResult lastResult; + for (Map.Entry> entry : writers.entrySet()) { + DataFormat format = entry.getKey(); + Writer writer = entry.getValue(); + Object input = perFormatInputs.get(format); + if (input == null) { + logger.debug("No input found for data format [{}], skipping addDoc", format.name()); + continue; + } + lastResult = ((Writer>) writer).addDoc((DocumentInput) input); + + switch (lastResult) { + case WriteResult.Success s -> logger.debug("Successfully added document [{}] in [{}]", s, format); + case WriteResult.Failure f -> { + logger.debug("Failed to add document [{}] in [{}]", f, format); + return lastResult; + } + } + } + + if (lastResult == null) { + return new WriteResult.Success(false, null, -1L); + } + return lastResult; + } + + @Override + public FileInfos flush() throws IOException { + FileInfos.Builder builder = FileInfos.builder(); + for (Writer writer : 
writers.values()) { + FileInfos perFormatInfos = writer.flush(); + builder.putAll(perFormatInfos.getWriterFilesMap()); + } + return builder.build(); + } + + @Override + public void sync() throws IOException { + for (Writer writer : writers.values()) { + writer.sync(); + } + } + + @Override + public void close() { + for (Map.Entry> entry : writers.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("Failed to close per-format Writer for format [{}]", entry.getKey().name()), e); + } + } + } + + @Override + public void lock() { + lock.lock(); + } + + @Override + public void lockInterruptibly() throws InterruptedException { + lock.lockInterruptibly(); + } + + @Override + public boolean tryLock() { + return lock.tryLock(); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return lock.tryLock(time, unit); + } + + @Override + public void unlock() { + lock.unlock(); + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java new file mode 100644 index 0000000000000..216fe2c5469ae --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Composite indexing engine plugin that orchestrates multi-format indexing + * across multiple data format engines. 
 * @opensearch.experimental
 */
@ExperimentalApi
package org.opensearch.composite;

import org.opensearch.common.annotation.ExperimentalApi;
diff --git a/server/src/main/java/org/opensearch/index/engine/exec/IndexFileReferenceManager.java b/server/src/main/java/org/opensearch/index/engine/exec/IndexFileReferenceManager.java
new file mode 100644
index 0000000000000..353ebe6ec699f
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/engine/exec/IndexFileReferenceManager.java
@@ -0,0 +1,37 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.engine.exec;

import org.opensearch.common.annotation.ExperimentalApi;

/**
 * Manages reference counting for index files to prevent premature deletion.
 * Tracks which files are in use by catalog snapshots and ensures files are only
 * deleted when no longer referenced by any snapshot.
 * <p>
 * NOTE(review): add/remove calls presumably must be balanced per snapshot, and
 * implementations presumably need to be thread-safe since snapshots can be taken
 * and released concurrently — confirm and document on the implementing class.
 *
 * @opensearch.experimental
 */
@ExperimentalApi
public interface IndexFileReferenceManager {
    /**
     * Adds file references for all files in the given catalog snapshot.
     * Increments reference counts to prevent deletion while the snapshot is in use.
     *
     * @param snapshot the catalog snapshot whose files should be referenced
     */
    void addFileReferences(CatalogSnapshot snapshot);

    /**
     * Removes file references for all files in the given catalog snapshot.
     * Decrements reference counts and may trigger deletion of unreferenced files.
     *
     * @param snapshot the catalog snapshot whose file references should be removed
     */
    void removeFileReferences(CatalogSnapshot snapshot);
}