diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/Lockable.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/Lockable.java
new file mode 100644
index 0000000000000..13f84ef15a885
--- /dev/null
+++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/Lockable.java
@@ -0,0 +1,37 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.composite.queue;
+
+import jdk.jfr.Experimental;
+
+/**
+ * A minimal locking contract for objects managed by a {@link LockableConcurrentQueue}.
+ *
+ * @opensearch.experimental
+ */
+@Experimental
+public interface Lockable {
+
+ /**
+ * Acquires the lock.
+ */
+ void lock();
+
+ /**
+ * Attempts to acquire the lock without blocking.
+ *
+ * @return {@code true} if the lock was acquired
+ */
+ boolean tryLock();
+
+ /**
+ * Releases the lock.
+ */
+ void unlock();
+}
diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java
index 2f1be95db2222..5a6533cef7771 100644
--- a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java
+++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java
@@ -10,12 +10,11 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
/**
* A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics
- * on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that
+ * on top of {@link ConcurrentQueue}. Entries must implement {@link Lockable} so that
* they can be atomically locked when polled and unlocked when returned.
*
* This is used by the composite writer pool to ensure that a writer is locked
@@ -24,7 +23,7 @@
* @param the type of lockable elements held in this queue
* @opensearch.experimental
*/
-public final class LockableConcurrentQueue {
+public final class LockableConcurrentQueue {
private final ConcurrentQueue queue;
private final AtomicInteger addAndUnlockCounter = new AtomicInteger();
@@ -47,7 +46,7 @@ public T lockAndPoll() {
int addAndUnlockCount;
do {
addAndUnlockCount = addAndUnlockCounter.get();
- T entry = queue.poll(Lock::tryLock);
+ T entry = queue.poll(Lockable::tryLock);
if (entry != null) {
return entry;
}
diff --git a/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java
index 35716a8fdd1ae..e5a155a3dadb0 100644
--- a/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java
+++ b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java
@@ -24,13 +24,37 @@ public class LockableConcurrentQueueTests extends OpenSearchTestCase {
/**
* A simple lockable entry for testing.
*/
- static class LockableEntry extends ReentrantLock {
+ static class LockableEntry implements Lockable {
final String id;
+ private final ReentrantLock delegate = new ReentrantLock();
LockableEntry(String id) {
this.id = id;
}
+ @Override
+ public void lock() {
+ delegate.lock();
+ }
+
+ @Override
+ public boolean tryLock() {
+ return delegate.tryLock();
+ }
+
+ @Override
+ public void unlock() {
+ delegate.unlock();
+ }
+
+ boolean isHeldByCurrentThread() {
+ return delegate.isHeldByCurrentThread();
+ }
+
+ boolean isLocked() {
+ return delegate.isLocked();
+ }
+
@Override
public String toString() {
return "LockableEntry{" + id + "}";
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java
index c1f74fb60627a..2633ad0f30330 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java
@@ -24,7 +24,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
-public class CompositeDataFormat implements DataFormat {
+public class CompositeDataFormat extends DataFormat {
private final List dataFormats;
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java
index 8836409a9af72..83e909d3f065c 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java
@@ -17,7 +17,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
-import org.opensearch.index.engine.dataformat.DocumentInput;
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardPath;
@@ -129,6 +128,10 @@ public void loadExtensions(ExtensionLoader loader) {
continue;
}
String name = format.name();
+ if (name == null || name.isBlank()) {
+ logger.warn("DataFormatPlugin [{}] returned a DataFormat with null/blank name, skipping", plugin.getClass().getName());
+ continue;
+ }
DataFormatPlugin existing = registry.get(name);
if (existing != null) {
long existingPriority = existing.getDataFormat().priority();
@@ -180,13 +183,12 @@ public DataFormat getDataFormat() {
}
@Override
- @SuppressWarnings("unchecked")
- public > IndexingExecutionEngine indexingEngine(
+ public IndexingExecutionEngine, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
) {
- return (IndexingExecutionEngine) new CompositeIndexingExecutionEngine(
+ return new CompositeIndexingExecutionEngine(
dataFormatPlugins,
indexSettings,
mapperService,
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
index 0208d8d21da22..42190afc90b7b 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
@@ -105,10 +105,6 @@ public CompositeIndexingExecutionEngine(
List> secondaries = new ArrayList<>();
for (String secondaryName : secondaryFormatNames) {
- if (secondaryName.equals(primaryFormatName)) {
- logger.warn("Secondary data format [{}] is the same as primary, skipping duplicate", secondaryName);
- continue;
- }
DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName);
secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings));
allFormats.add(secondaryPlugin.getDataFormat());
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
index 43380beb5a025..406f976b3f20c 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
@@ -21,15 +21,13 @@
import java.io.IOException;
import java.util.AbstractMap;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.opensearch.composite.queue.Lockable;
+
/**
* A composite {@link Writer} that wraps one {@link Writer} per registered data format
* and delegates write operations to each per-format writer.
@@ -42,12 +40,12 @@
* @opensearch.experimental
*/
@ExperimentalApi
-public class CompositeWriter implements Writer, Lock {
+public class CompositeWriter implements Writer, Lockable {
private static final Logger logger = LogManager.getLogger(CompositeWriter.class);
private final Map.Entry>> primaryWriter;
- private final Map>> secondaryWriters;
+ private final Map>> secondaryWritersByFormat;
private final ReentrantLock lock;
private final long writerGeneration;
private final RowIdGenerator rowIdGenerator;
@@ -79,9 +77,12 @@ public CompositeWriter(CompositeIndexingExecutionEngine engine, long writerGener
Map>> secondaries = new LinkedHashMap<>();
for (IndexingExecutionEngine, ?> delegate : engine.getSecondaryDelegates()) {
- secondaries.put(delegate.getDataFormat(), (Writer>) delegate.createWriter(writerGeneration));
+ secondaries.put(
+ delegate.getDataFormat(),
+ (Writer>) delegate.createWriter(writerGeneration)
+ );
}
- this.secondaryWriters = Collections.unmodifiableMap(secondaries);
+ this.secondaryWritersByFormat = Map.copyOf(secondaries);
this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName());
}
@@ -97,19 +98,20 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
}
}
- // Then write to each secondary by matching format keys
- Map> secondaryInputMap = doc.getSecondaryInputs();
- for (Map.Entry>> entry : secondaryWriters.entrySet()) {
- DocumentInput> input = secondaryInputMap.get(entry.getKey());
- if (input == null) {
- logger.warn("No secondary input found for format [{}], skipping", entry.getKey().name());
+ // Then write to each secondary — keyed lookup by DataFormat (equals/hashCode based on name)
+ Map> secondaryInputs = doc.getSecondaryInputs();
+ for (Map.Entry> inputEntry : secondaryInputs.entrySet()) {
+ DataFormat format = inputEntry.getKey();
+ Writer> writer = secondaryWritersByFormat.get(format);
+ if (writer == null) {
+ logger.warn("No writer found for secondary format [{}], skipping", format.name());
continue;
}
- WriteResult result = entry.getValue().addDoc(input);
+ WriteResult result = writer.addDoc(inputEntry.getValue());
switch (result) {
- case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", entry.getKey().name());
+ case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", format.name());
case WriteResult.Failure f -> {
- logger.debug("Failed to add document in secondary format [{}]", entry.getKey().name());
+ logger.debug("Failed to add document in secondary format [{}]", format.name());
return result;
}
}
@@ -125,9 +127,12 @@ public FileInfos flush() throws IOException {
Optional primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey());
primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet));
// Flush secondaries
- for (Map.Entry>> entry : secondaryWriters.entrySet()) {
- Optional wfs = entry.getValue().flush().getWriterFileSet(entry.getKey());
- wfs.ifPresent(writerFileSet -> builder.putWriterFileSet(entry.getKey(), writerFileSet));
+ for (Writer> writer : secondaryWritersByFormat.values()) {
+ FileInfos fileInfos = writer.flush();
+ // Iterate all format entries in the returned FileInfos
+ for (Map.Entry fileEntry : fileInfos.writerFilesMap().entrySet()) {
+ builder.putWriterFileSet(fileEntry.getKey(), fileEntry.getValue());
+ }
}
return builder.build();
}
@@ -135,7 +140,7 @@ public FileInfos flush() throws IOException {
@Override
public void sync() throws IOException {
primaryWriter.getValue().sync();
- for (Writer> writer : secondaryWriters.values()) {
+ for (Writer> writer : secondaryWritersByFormat.values()) {
writer.sync();
}
}
@@ -143,7 +148,7 @@ public void sync() throws IOException {
@Override
public void close() throws IOException {
primaryWriter.getValue().close();
- for (Writer> writer : secondaryWriters.values()) {
+ for (Writer> writer : secondaryWritersByFormat.values()) {
writer.close();
}
}
@@ -194,28 +199,13 @@ public void lock() {
lock.lock();
}
- @Override
- public void lockInterruptibly() throws InterruptedException {
- lock.lockInterruptibly();
- }
-
@Override
public boolean tryLock() {
return lock.tryLock();
}
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return lock.tryLock(time, unit);
- }
-
@Override
public void unlock() {
lock.unlock();
}
-
- @Override
- public Condition newCondition() {
- throw new UnsupportedOperationException();
- }
}
diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java
index c0399d32de735..81ee48bedfd78 100644
--- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java
+++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java
@@ -156,15 +156,11 @@ public DataFormat getDataFormat() {
}
@Override
- public <
- T extends DataFormat,
- P extends org.opensearch.index.engine.dataformat.DocumentInput>>
- org.opensearch.index.engine.dataformat.IndexingExecutionEngine
- indexingEngine(
- org.opensearch.index.mapper.MapperService mapperService,
- org.opensearch.index.shard.ShardPath shardPath,
- IndexSettings indexSettings
- ) {
+ public org.opensearch.index.engine.dataformat.IndexingExecutionEngine, ?> indexingEngine(
+ org.opensearch.index.mapper.MapperService mapperService,
+ org.opensearch.index.shard.ShardPath shardPath,
+ IndexSettings indexSettings
+ ) {
return null;
}
};
diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java
index e3eb809702269..7455b9fac7363 100644
--- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java
+++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java
@@ -76,13 +76,12 @@ public DataFormat getDataFormat() {
}
@Override
- @SuppressWarnings("unchecked")
- public > IndexingExecutionEngine indexingEngine(
+ public IndexingExecutionEngine, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
) {
- return (IndexingExecutionEngine) new StubIndexingExecutionEngine(format);
+ return new StubIndexingExecutionEngine(format);
}
};
}
@@ -96,13 +95,12 @@ public DataFormat getDataFormat() {
}
@Override
- @SuppressWarnings("unchecked")
- public > IndexingExecutionEngine indexingEngine(
+ public IndexingExecutionEngine, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
) {
- return (IndexingExecutionEngine) new StubIndexingExecutionEngine(format);
+ return new StubIndexingExecutionEngine(format);
}
};
}
diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java
index 83d444a2b55f0..c34f28fca0ca0 100644
--- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java
+++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java
@@ -12,7 +12,6 @@
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
/**
* Tests for {@link CompositeWriter}.
@@ -76,19 +75,6 @@ public void testTryLockSucceedsWhenUnlocked() throws IOException {
writer.close();
}
- public void testTryLockWithTimeoutSucceeds() throws Exception {
- CompositeWriter writer = new CompositeWriter(engine, 0);
- assertTrue(writer.tryLock(100, TimeUnit.MILLISECONDS));
- writer.unlock();
- writer.close();
- }
-
- public void testNewConditionThrowsUnsupported() throws IOException {
- CompositeWriter writer = new CompositeWriter(engine, 0);
- expectThrows(UnsupportedOperationException.class, writer::newCondition);
- writer.close();
- }
-
public void testFlushReturnsFileInfos() throws IOException {
CompositeWriter writer = new CompositeWriter(engine, 0);
FileInfos fileInfos = writer.flush();
@@ -109,10 +95,4 @@ public void testCloseDoesNotThrow() throws IOException {
writer.close();
}
- public void testLockInterruptiblySucceeds() throws Exception {
- CompositeWriter writer = new CompositeWriter(engine, 0);
- writer.lockInterruptibly();
- writer.unlock();
- writer.close();
- }
}
diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java
index 668571b97e55c..ed261b976403f 100644
--- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java
+++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java
@@ -10,24 +10,26 @@
import org.opensearch.common.annotation.ExperimentalApi;
+import java.util.Objects;
import java.util.Set;
/**
* Represents a data format for storing and managing index data, with declared capabilities.
* Each data format (e.g., Lucene, Parquet) declares what storage and query capabilities it supports.
*
- * Equality is based on the format name — there should be one {@code DataFormat} instance per unique name.
+ * Equality is based on the format {@link #name()} — there should be one {@code DataFormat} instance
+ * per unique name. This allows {@code DataFormat} to be used safely as a {@link java.util.Map} key.
*
* @opensearch.experimental
*/
@ExperimentalApi
-public interface DataFormat {
+public abstract class DataFormat {
/**
* Returns the unique name of this data format.
*
* @return the data format name
*/
- String name();
+ public abstract String name();
/**
* Returns the priority of this data format. Higher priority formats are preferred
@@ -35,12 +37,24 @@ public interface DataFormat {
*
* @return the priority value
*/
- long priority();
+ public abstract long priority();
/**
* Returns the set of field type capabilities supported by this data format.
*
* @return the supported field type capabilities
*/
- Set supportedFields();
+ public abstract Set supportedFields();
+
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) return true;
+ if (o instanceof DataFormat == false) return false;
+ return Objects.equals(name(), ((DataFormat) o).name());
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hashCode(name());
+ }
}
diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java
index c2a1fc695652a..6433aa8c0f123 100644
--- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java
+++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java
@@ -36,11 +36,9 @@ public interface DataFormatPlugin {
* @param mapperService the mapper service for field mapping resolution
* @param shardPath the shard path for file storage
* @param indexSettings the index settings
- * @param the data format type
- * @param the document input type
* @return the indexing execution engine instance
*/
- > IndexingExecutionEngine indexingEngine(
+ IndexingExecutionEngine, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java
index 117ce798494f2..6f7f8c7c1307f 100644
--- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java
@@ -62,11 +62,13 @@ public void testFullDataFormatLifecycle() throws IOException {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
.build();
- IndexingExecutionEngine engine = plugin.indexingEngine(
- mock(MapperService.class),
- new ShardPath(false, Path.of("/tmp/uuid/0"), Path.of("/tmp/uuid/0"), new ShardId("index", "uuid", 0)),
- new IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings)
- );
+ @SuppressWarnings("unchecked")
+ IndexingExecutionEngine engine =
+ (IndexingExecutionEngine) plugin.indexingEngine(
+ mock(MapperService.class),
+ new ShardPath(false, Path.of("/tmp/uuid/0"), Path.of("/tmp/uuid/0"), new ShardId("index", "uuid", 0)),
+ new IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings)
+ );
assertEquals(format, engine.getDataFormat());
// 2. Create a writer and write documents
@@ -222,7 +224,7 @@ public void testRefreshInput() {
assertEquals(1, input.existingSegments().size());
}
- static class MockDataFormat implements DataFormat {
+ static class MockDataFormat extends DataFormat {
@Override
public String name() {
return "mock-columnar";
@@ -400,13 +402,12 @@ public DataFormat getDataFormat() {
}
@Override
- @SuppressWarnings("unchecked")
- public > IndexingExecutionEngine indexingEngine(
+ public IndexingExecutionEngine, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
) {
- return (IndexingExecutionEngine) new MockIndexingExecutionEngine(dataFormat);
+ return new MockIndexingExecutionEngine(dataFormat);
}
}
}