From 0fdb5ea1f3af500eb295c5069fa0b7788ffd70ab Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 19 Mar 2026 09:40:20 +0530 Subject: [PATCH 1/3] Add sandbox plugin for composite engine Signed-off-by: Arpit Bandejiya --- .gitignore | 1 + .../libs/composite-engine-lib/build.gradle | 27 ++ .../composite/queue/ConcurrentQueue.java | 140 +++++++++ .../queue/LockableConcurrentQueue.java | 80 +++++ .../composite/queue/package-info.java | 14 + .../composite/queue/ConcurrentQueueTests.java | 163 ++++++++++ .../queue/LockableConcurrentQueueTests.java | 194 ++++++++++++ sandbox/plugins/composite-engine/README.md | 32 ++ sandbox/plugins/composite-engine/build.gradle | 18 ++ .../composite/CompositeDataFormat.java | 82 +++++ .../CompositeDataFormatWriterPool.java | 151 ++++++++++ .../composite/CompositeDocumentInput.java | 111 +++++++ .../composite/CompositeEnginePlugin.java | 205 +++++++++++++ .../CompositeIndexingExecutionEngine.java | 283 ++++++++++++++++++ .../opensearch/composite/CompositeWriter.java | 223 ++++++++++++++ .../opensearch/composite/RowIdGenerator.java | 64 ++++ .../opensearch/composite/package-info.java | 17 ++ .../composite/CompositeDataFormatTests.java | 98 ++++++ .../CompositeDataFormatWriterPoolTests.java | 173 +++++++++++ .../CompositeDocumentInputTests.java | 224 ++++++++++++++ .../composite/CompositeEnginePluginTests.java | 214 +++++++++++++ ...CompositeIndexingExecutionEngineTests.java | 203 +++++++++++++ .../composite/CompositeTestHelper.java | 225 ++++++++++++++ .../composite/CompositeWriterTests.java | 118 ++++++++ .../composite/RowIdGeneratorTests.java | 53 ++++ 25 files changed, 3113 insertions(+) create mode 100644 sandbox/libs/composite-engine-lib/build.gradle create mode 100644 sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java create mode 100644 sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java create mode 100644 
sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java create mode 100644 sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/ConcurrentQueueTests.java create mode 100644 sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java create mode 100644 sandbox/plugins/composite-engine/README.md create mode 100644 sandbox/plugins/composite-engine/build.gradle create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/RowIdGenerator.java create mode 100644 sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatWriterPoolTests.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDocumentInputTests.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java create mode 100644 
sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java create mode 100644 sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/RowIdGeneratorTests.java diff --git a/.gitignore b/.gitignore index 0a784701375d9..599ea7878a1c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .claude CLAUDE.md .cursor* +.kiro* # intellij files .idea/ diff --git a/sandbox/libs/composite-engine-lib/build.gradle b/sandbox/libs/composite-engine-lib/build.gradle new file mode 100644 index 0000000000000..955b438f76922 --- /dev/null +++ b/sandbox/libs/composite-engine-lib/build.gradle @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Shared concurrent queue utilities for the composite indexing engine. + * No external dependencies — pure Java concurrency primitives. 
+ */ + +dependencies { + testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" + testImplementation "junit:junit:${versions.junit}" + testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}" + testImplementation(project(":test:framework")) { + exclude group: 'org.opensearch', module: 'opensearch-composite-engine-lib' + } +} + +testingConventions.enabled = true + +tasks.named('forbiddenApisMain').configure { + replaceSignatureFiles 'jdk-signatures' +} diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java new file mode 100644 index 0000000000000..08eee90a5f54a --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/ConcurrentQueue.java @@ -0,0 +1,140 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * A striped concurrent queue that distributes entries across multiple internal + * queues using thread-affinity-based hashing. This reduces contention by allowing + * concurrent threads to operate on different stripes without blocking each other. 
+ * + * @param the type of elements held in this queue + * @opensearch.experimental + */ +public final class ConcurrentQueue { + + static final int MIN_CONCURRENCY = 1; + static final int MAX_CONCURRENCY = 256; + + private final int concurrency; + private final Lock[] locks; + private final Queue[] queues; + private final Supplier> queueSupplier; + + ConcurrentQueue(Supplier> queueSupplier, int concurrency) { + if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) { + throw new IllegalArgumentException( + "concurrency must be in [" + MIN_CONCURRENCY + ", " + MAX_CONCURRENCY + "], got " + concurrency + ); + } + this.concurrency = concurrency; + this.queueSupplier = queueSupplier; + locks = new Lock[concurrency]; + @SuppressWarnings({ "rawtypes", "unchecked" }) + Queue[] queues = new Queue[concurrency]; + this.queues = queues; + for (int i = 0; i < concurrency; ++i) { + locks[i] = new ReentrantLock(); + queues[i] = queueSupplier.get(); + } + } + + void add(T entry) { + // Seed the order in which to look at entries based on the current thread. This helps distribute + // entries across queues and gives a bit of thread affinity between entries and threads, which + // can't hurt. 
+ final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; + final Lock lock = locks[index]; + final Queue queue = queues[index]; + if (lock.tryLock()) { + try { + queue.add(entry); + return; + } finally { + lock.unlock(); + } + } + } + final int index = threadHash % concurrency; + final Lock lock = locks[index]; + final Queue queue = queues[index]; + lock.lock(); + try { + queue.add(entry); + } finally { + lock.unlock(); + } + } + + T poll(Predicate predicate) { + final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; + final Lock lock = locks[index]; + final Queue queue = queues[index]; + if (lock.tryLock()) { + try { + Iterator it = queue.iterator(); + while (it.hasNext()) { + T entry = it.next(); + if (predicate.test(entry)) { + it.remove(); + return entry; + } + } + } finally { + lock.unlock(); + } + } + } + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; + final Lock lock = locks[index]; + final Queue queue = queues[index]; + lock.lock(); + try { + Iterator it = queue.iterator(); + while (it.hasNext()) { + T entry = it.next(); + if (predicate.test(entry)) { + it.remove(); + return entry; + } + } + } finally { + lock.unlock(); + } + } + return null; + } + + boolean remove(T entry) { + for (int i = 0; i < concurrency; ++i) { + final Lock lock = locks[i]; + final Queue queue = queues[i]; + lock.lock(); + try { + if (queue.remove(entry)) { + return true; + } + } finally { + lock.unlock(); + } + } + return false; + } +} diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java new file mode 100644 index 0000000000000..2f1be95db2222 --- /dev/null 
+++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +/** + * A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics + * on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that + * they can be atomically locked when polled and unlocked when returned. + *

+ * This is used by the composite writer pool to ensure that a writer is locked + * before it is handed out and unlocked when it is returned. + * + * @param the type of lockable elements held in this queue + * @opensearch.experimental + */ +public final class LockableConcurrentQueue { + + private final ConcurrentQueue queue; + private final AtomicInteger addAndUnlockCounter = new AtomicInteger(); + + /** + * Creates a new lockable concurrent queue. + * + * @param queueSupplier supplier for the underlying queue instances + * @param concurrency the concurrency level (number of stripes) + */ + public LockableConcurrentQueue(Supplier> queueSupplier, int concurrency) { + this.queue = new ConcurrentQueue<>(queueSupplier, concurrency); + } + + /** + * Lock an entry, and poll it from the queue, in that order. If no entry can be found and locked, + * {@code null} is returned. + */ + public T lockAndPoll() { + int addAndUnlockCount; + do { + addAndUnlockCount = addAndUnlockCounter.get(); + T entry = queue.poll(Lock::tryLock); + if (entry != null) { + return entry; + } + // If an entry has been added to the queue in the meantime, try again. + } while (addAndUnlockCount != addAndUnlockCounter.get()); + + return null; + } + + /** + * Remove an entry from the queue. + * + * @param entry the entry to remove + * @return {@code true} if the entry was removed + */ + public boolean remove(T entry) { + return queue.remove(entry); + } + + /** + * Add an entry to the queue and unlock it, in that order. 
+ * + * @param entry the entry to add and unlock + */ + public void addAndUnlock(T entry) { + queue.add(entry); + entry.unlock(); + addAndUnlockCounter.incrementAndGet(); + } +} diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java new file mode 100644 index 0000000000000..9e52cbabf073b --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Concurrent queue utilities for the composite indexing engine. + * + * @opensearch.experimental + */ +package org.opensearch.composite.queue; diff --git a/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/ConcurrentQueueTests.java b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/ConcurrentQueueTests.java new file mode 100644 index 0000000000000..46ddd2179ee72 --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/ConcurrentQueueTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tests for {@link ConcurrentQueue}. 
+ */ +public class ConcurrentQueueTests extends OpenSearchTestCase { + + public void testAddAndPollSingleThread() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 1); + queue.add("a"); + queue.add("b"); + assertEquals("a", queue.poll(e -> true)); + assertEquals("b", queue.poll(e -> true)); + assertNull(queue.poll(e -> true)); + } + + public void testPollWithPredicateFiltering() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 1); + queue.add(1); + queue.add(2); + queue.add(3); + // Poll only even numbers + assertEquals(Integer.valueOf(2), queue.poll(n -> n % 2 == 0)); + // Remaining: 1, 3 + assertNull(queue.poll(n -> n % 2 == 0)); + assertEquals(Integer.valueOf(1), queue.poll(e -> true)); + assertEquals(Integer.valueOf(3), queue.poll(e -> true)); + } + + public void testPollReturnsNullOnEmpty() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 4); + assertNull(queue.poll(e -> true)); + } + + public void testPollPredicateAlwaysFalse() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 2); + queue.add("a"); + assertNull(queue.poll(e -> false)); + // Entry should still be there + assertEquals("a", queue.poll(e -> true)); + } + + public void testRemoveExistingEntry() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 2); + queue.add("a"); + queue.add("b"); + assertTrue(queue.remove("a")); + assertEquals("b", queue.poll(e -> true)); + assertNull(queue.poll(e -> true)); + } + + public void testRemoveNonExistentEntry() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 2); + queue.add("a"); + assertFalse(queue.remove("z")); + assertEquals("a", queue.poll(e -> true)); + } + + public void testRemoveFromEmpty() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 1); + assertFalse(queue.remove("a")); + } + + public void testConcurrencyBoundsLow() { + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new 
ConcurrentQueue<>(LinkedList::new, 0)); + assertTrue(e.getMessage().contains("concurrency must be in")); + } + + public void testConcurrencyBoundsHigh() { + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new ConcurrentQueue<>(LinkedList::new, 257)); + assertTrue(e.getMessage().contains("concurrency must be in")); + } + + public void testMinConcurrency() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, ConcurrentQueue.MIN_CONCURRENCY); + queue.add("a"); + assertEquals("a", queue.poll(e -> true)); + } + + public void testMaxConcurrency() { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, ConcurrentQueue.MAX_CONCURRENCY); + queue.add("a"); + assertEquals("a", queue.poll(e -> true)); + } + + public void testMultipleStripes() { + // With higher concurrency, entries distribute across stripes + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 8); + int count = 100; + for (int i = 0; i < count; i++) { + queue.add(i); + } + AtomicInteger polled = new AtomicInteger(); + Integer entry; + while ((entry = queue.poll(e -> true)) != null) { + polled.incrementAndGet(); + } + assertEquals(count, polled.get()); + } + + public void testConcurrentAddAndPoll() throws Exception { + ConcurrentQueue queue = new ConcurrentQueue<>(LinkedList::new, 4); + int numThreads = 4; + int itemsPerThread = 250; + CyclicBarrier barrier = new CyclicBarrier(numThreads * 2); + CountDownLatch addLatch = new CountDownLatch(numThreads); + CountDownLatch pollLatch = new CountDownLatch(numThreads); + AtomicInteger totalPolled = new AtomicInteger(); + + // Producer threads + for (int t = 0; t < numThreads; t++) { + final int threadId = t; + new Thread(() -> { + try { + barrier.await(); + for (int i = 0; i < itemsPerThread; i++) { + queue.add(threadId * itemsPerThread + i); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + addLatch.countDown(); + } + }).start(); + } + + // Consumer threads + for (int t = 
0; t < numThreads; t++) { + new Thread(() -> { + try { + barrier.await(); + addLatch.await(); // Wait for all adds to complete + Integer item; + while ((item = queue.poll(e -> true)) != null) { + totalPolled.incrementAndGet(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + pollLatch.countDown(); + } + }).start(); + } + + pollLatch.await(); + assertEquals(numThreads * itemsPerThread, totalPolled.get()); + } +} diff --git a/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java new file mode 100644 index 0000000000000..35716a8fdd1ae --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java @@ -0,0 +1,194 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Tests for {@link LockableConcurrentQueue}. + */ +public class LockableConcurrentQueueTests extends OpenSearchTestCase { + + /** + * A simple lockable entry for testing. + */ + static class LockableEntry extends ReentrantLock { + final String id; + + LockableEntry(String id) { + this.id = id; + } + + @Override + public String toString() { + return "LockableEntry{" + id + "}"; + } + } + + /** Helper: lock the entry, add it to the queue, which unlocks it. 
*/ + private static void seedEntry(LockableConcurrentQueue queue, LockableEntry entry) { + entry.lock(); + queue.addAndUnlock(entry); + } + + public void testLockAndPollReturnsLockedEntry() { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 1); + LockableEntry entry = new LockableEntry("a"); + seedEntry(queue, entry); + + LockableEntry polled = queue.lockAndPoll(); + assertNotNull(polled); + assertSame(entry, polled); + assertTrue(polled.isHeldByCurrentThread()); + polled.unlock(); + } + + public void testLockAndPollReturnsNullOnEmpty() { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 1); + assertNull(queue.lockAndPoll()); + } + + public void testAddAndUnlockMakesEntryAvailable() { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 1); + LockableEntry entry = new LockableEntry("a"); + entry.lock(); + queue.addAndUnlock(entry); + // Entry should be unlocked after addAndUnlock + assertFalse(entry.isLocked()); + + LockableEntry polled = queue.lockAndPoll(); + assertNotNull(polled); + assertTrue(polled.isHeldByCurrentThread()); + polled.unlock(); + } + + public void testRemoveEntry() { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 1); + LockableEntry entry = new LockableEntry("a"); + seedEntry(queue, entry); + + assertTrue(queue.remove(entry)); + assertNull(queue.lockAndPoll()); + } + + public void testRemoveNonExistentEntry() { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 1); + LockableEntry entry = new LockableEntry("a"); + assertFalse(queue.remove(entry)); + } + + public void testMultipleEntries() { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 2); + LockableEntry a = new LockableEntry("a"); + LockableEntry b = new LockableEntry("b"); + LockableEntry c = new LockableEntry("c"); + + seedEntry(queue, a); + seedEntry(queue, b); + seedEntry(queue, c); + + int count = 0; + 
LockableEntry polled; + while ((polled = queue.lockAndPoll()) != null) { + assertTrue(polled.isHeldByCurrentThread()); + polled.unlock(); + count++; + } + assertEquals(3, count); + } + + public void testLockAndPollSkipsAlreadyLockedEntries() throws Exception { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 1); + LockableEntry a = new LockableEntry("a"); + LockableEntry b = new LockableEntry("b"); + + seedEntry(queue, a); + seedEntry(queue, b); + + // Lock 'a' from a different thread so tryLock fails for the current thread + CountDownLatch locked = new CountDownLatch(1); + CountDownLatch release = new CountDownLatch(1); + Thread locker = new Thread(() -> { + a.lock(); + locked.countDown(); + try { + release.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + a.unlock(); + } + }); + locker.start(); + locked.await(); + + try { + LockableEntry polled = queue.lockAndPoll(); + assertNotNull(polled); + assertSame(b, polled); + assertTrue(polled.isHeldByCurrentThread()); + polled.unlock(); + } finally { + release.countDown(); + locker.join(); + } + } + + public void testConcurrentLockAndPollAndAddAndUnlock() throws Exception { + LockableConcurrentQueue queue = new LockableConcurrentQueue<>(LinkedList::new, 4); + int numEntries = 20; + for (int i = 0; i < numEntries; i++) { + LockableEntry entry = new LockableEntry("entry-" + i); + seedEntry(queue, entry); + } + + int numThreads = 4; + int opsPerThread = 50; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch latch = new CountDownLatch(numThreads); + AtomicInteger pollCount = new AtomicInteger(); + + for (int t = 0; t < numThreads; t++) { + new Thread(() -> { + try { + barrier.await(); + for (int i = 0; i < opsPerThread; i++) { + LockableEntry entry = queue.lockAndPoll(); + if (entry != null) { + pollCount.incrementAndGet(); + assertTrue(entry.isHeldByCurrentThread()); + // Return it — entry is already locked by 
lockAndPoll + queue.addAndUnlock(entry); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + }).start(); + } + + latch.await(); + // All entries should still be in the queue (returned after use) + int remaining = 0; + LockableEntry entry; + while ((entry = queue.lockAndPoll()) != null) { + entry.unlock(); + remaining++; + } + assertEquals(numEntries, remaining); + } +} diff --git a/sandbox/plugins/composite-engine/README.md b/sandbox/plugins/composite-engine/README.md new file mode 100644 index 0000000000000..82a8fdfb44010 --- /dev/null +++ b/sandbox/plugins/composite-engine/README.md @@ -0,0 +1,32 @@ +# composite-engine + +Sandbox plugin that orchestrates multi-format indexing across multiple data format engines behind a single `IndexingExecutionEngine` interface. + +## What it does + +1. During `loadExtensions`, discovers all `DataFormatPlugin` implementations (e.g., Parquet, Arrow) via the `ExtensiblePlugin` SPI. +2. When composite indexing is enabled for an index, creates a `CompositeIndexingExecutionEngine` that delegates writes, refresh, and file management to each per-format engine. +3. Routes fields to the appropriate data format based on `FieldTypeCapabilities` declared by each format. +4. Manages a pool of `CompositeWriter` instances for concurrent indexing with lock-based checkout/release semantics. + +## Index settings + +| Setting | Default | Description | +|---|---|---| +| `index.composite.enabled` | `false` | Activates composite indexing for the index | +| `index.composite.primary_data_format` | `"lucene"` | The authoritative format used for merge operations | +| `index.composite.secondary_data_formats` | `[]` | Additional formats that receive writes alongside the primary | + +## How it fits in + +Format plugins (e.g., Parquet) extend this plugin by declaring `extendedPlugins = ['composite-engine']` in their `build.gradle` and implementing `DataFormatPlugin`. 
The `ExtensiblePlugin` SPI discovers them automatically during node bootstrap. + +## Key classes + +- **`CompositeEnginePlugin`** — The `ExtensiblePlugin` entry point. Discovers format plugins, validates settings, and creates the composite engine. +- **`CompositeIndexingExecutionEngine`** — Orchestrates indexing across primary and secondary format engines. +- **`CompositeDataFormat`** — A `DataFormat` that wraps multiple per-format instances. +- **`CompositeDocumentInput`** — Routes field additions to the appropriate per-format `DocumentInput` based on field type capabilities. +- **`CompositeWriter`** — A composite `Writer` that delegates write, flush, and sync to each per-format writer. +- **`CompositeDataFormatWriterPool`** — Thread-safe pool of `CompositeWriter` instances with lock-based checkout/release. +- **`RowIdGenerator`** — Generates monotonically increasing row IDs for cross-format document synchronization. diff --git a/sandbox/plugins/composite-engine/build.gradle b/sandbox/plugins/composite-engine/build.gradle new file mode 100644 index 0000000000000..8aa122a1e1e8e --- /dev/null +++ b/sandbox/plugins/composite-engine/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +opensearchplugin { + description = 'Composite indexing engine plugin that orchestrates multi-format indexing across multiple data format engines.' 
+ classname = 'org.opensearch.composite.CompositeEnginePlugin' +} + +dependencies { + api project(':sandbox:libs:composite-engine-lib') + compileOnly project(':server') + testImplementation project(':test:framework') +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java new file mode 100644 index 0000000000000..c1f74fb60627a --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * A composite {@link DataFormat} that wraps multiple per-format {@link DataFormat} instances. + * Each constituent format retains its own {@link FieldTypeCapabilities} — field routing is + * handled per-format by {@link CompositeDocumentInput}, not by this class. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDataFormat implements DataFormat { + + private final List dataFormats; + + /** + * Constructs a CompositeDataFormat from the given list of data formats. + * + * @param dataFormats the constituent data formats + */ + public CompositeDataFormat(List dataFormats) { + this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null")); + } + + /** + * Constructs an empty CompositeDataFormat with no constituent formats. 
+ */ + public CompositeDataFormat() { + this.dataFormats = List.of(); + } + + /** + * Returns the list of constituent data formats. + * + * @return the data formats + */ + public List getDataFormats() { + return dataFormats; + } + + @Override + public String name() { + return "composite"; + } + + @Override + public long priority() { + // In case some other format can independently support, + // the composite format should have the lowest priority + return Long.MIN_VALUE; + } + + @Override + public Set supportedFields() { + // Union of all constituent formats' supported fields + // TODO:: Post the changes done in mappings, we will relook this + if (dataFormats.isEmpty()) { + return Set.of(); + } + return dataFormats.get(0).supportedFields(); + } + + @Override + public String toString() { + return "CompositeDataFormat{dataFormats=" + dataFormats + '}'; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java new file mode 100644 index 0000000000000..48871039ab76e --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.composite.queue.LockableConcurrentQueue; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.function.Supplier; + +/** + * A pool of {@link CompositeWriter} instances backed by a {@link LockableConcurrentQueue}. + * Writers are locked on checkout and unlocked on release, ensuring thread-safe reuse. + * + * @opensearch.experimental + */ +public class CompositeDataFormatWriterPool implements Iterable, Closeable { + + private final Set writers; + private final LockableConcurrentQueue availableWriters; + private final Supplier writerSupplier; + private volatile boolean closed; + + /** + * Creates a new writer pool. + * + * @param writerSupplier factory for creating new {@link CompositeWriter} instances + * @param queueSupplier supplier for the underlying queue instances + * @param concurrency the concurrency level (number of stripes) + */ + public CompositeDataFormatWriterPool( + Supplier writerSupplier, + Supplier> queueSupplier, + int concurrency + ) { + this.writers = Collections.newSetFromMap(new IdentityHashMap<>()); + this.writerSupplier = writerSupplier; + this.availableWriters = new LockableConcurrentQueue<>(queueSupplier, concurrency); + } + + /** + * This method is used by CompositeIndexingExecutionEngine to grab a writer from the pool to perform an indexing + * operation. 
+ * + * @return a pooled CompositeWriter if available, or a newly created instance if none are available + */ + public CompositeWriter getAndLock() { + ensureOpen(); + CompositeWriter writer = availableWriters.lockAndPoll(); + return Objects.requireNonNullElseGet(writer, this::fetchWriter); + } + + /** + * Create a new {@link CompositeWriter} to be added to this pool. + * + * @return a new instance of {@link CompositeWriter} + */ + private synchronized CompositeWriter fetchWriter() { + ensureOpen(); + CompositeWriter writer = writerSupplier.get(); + writer.lock(); + writers.add(writer); + return writer; + } + + /** + * Release the given {@link CompositeWriter} to this pool for reuse if it is currently managed by this + * pool. + * + * @param state {@link CompositeWriter} to release to the pool. + */ + public void releaseAndUnlock(CompositeWriter state) { + assert !state.isFlushPending() && !state.isAborted() : "CompositeWriter has pending flush: " + + state.isFlushPending() + + " aborted=" + + state.isAborted(); + assert isRegistered(state) : "CompositeDocumentWriterPool doesn't know about this CompositeWriter"; + availableWriters.addAndUnlock(state); + } + + /** + * Lock and checkout all CompositeWriters from the pool for flush. + * + * @return Unmodifiable list of all CompositeWriters locked by current thread. + */ + public List checkoutAll() { + ensureOpen(); + List lockedWriters = new ArrayList<>(); + List checkedOutWriters = new ArrayList<>(); + for (CompositeWriter writer : this) { + writer.lock(); + lockedWriters.add(writer); + } + synchronized (this) { + for (CompositeWriter writer : lockedWriters) { + try { + // Release this writer if it's no longer managed by this pool; otherwise, check it out. 
+ if (isRegistered(writer) && writers.remove(writer)) { + availableWriters.remove(writer); + writer.setFlushPending(); + checkedOutWriters.add(writer); + } + } finally { + writer.unlock(); + } + } + } + return Collections.unmodifiableList(checkedOutWriters); + } + + /** + * Check if {@link CompositeWriter} is part of this pool. + * + * @param perThread {@link CompositeWriter} to validate. + * @return true if {@link CompositeWriter} is part of this pool, false otherwise. + */ + synchronized boolean isRegistered(CompositeWriter perThread) { + return writers.contains(perThread); + } + + private void ensureOpen() { + if (closed) { + throw new AlreadyClosedException("CompositeDocumentWriterPool is already closed"); + } + } + + @Override + public synchronized Iterator iterator() { + return List.copyOf(writers).iterator(); + } + + @Override + public void close() throws IOException { + this.closed = true; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java new file mode 100644 index 0000000000000..ff7ec0593249d --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.mapper.MappedFieldType; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * A composite {@link DocumentInput} that wraps one {@link DocumentInput} per registered + * data format and broadcasts all field additions to every per-format input. + *

+ * Metadata operations ({@code setRowId}, {@code setVersion}, {@code setSeqNo}, + * {@code setPrimaryTerm}) and field additions are broadcast to all per-format inputs. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDocumentInput implements DocumentInput>> { + + private final DocumentInput primaryDocumentInput; + private final DataFormat primaryFormat; + private final Map> secondaryDocumentInputs; + private final Runnable onClose; + + /** + * Constructs a CompositeDocumentInput with a primary format input and secondary format inputs. + * + * @param primaryFormat the primary data format + * @param primaryDocumentInput the document input for the primary format + * @param secondaryDocumentInputs a map of secondary data formats to their corresponding document inputs + * @param onClose callback invoked when this composite input is closed, typically to release the writer back to the pool + */ + public CompositeDocumentInput( + DataFormat primaryFormat, + DocumentInput primaryDocumentInput, + Map> secondaryDocumentInputs, + Runnable onClose + ) { + this.primaryFormat = Objects.requireNonNull(primaryFormat, "primaryFormat must not be null"); + this.primaryDocumentInput = Objects.requireNonNull(primaryDocumentInput, "primaryDocumentInput must not be null"); + this.secondaryDocumentInputs = Map.copyOf( + Objects.requireNonNull(secondaryDocumentInputs, "secondaryDocumentInputs must not be null") + ); + this.onClose = Objects.requireNonNull(onClose, "onClose must not be null"); + } + + @Override + public void addField(MappedFieldType fieldType, Object value) { + primaryDocumentInput.addField(fieldType, value); + for (DocumentInput input : secondaryDocumentInputs.values()) { + input.addField(fieldType, value); + } + } + + @Override + public void setRowId(String rowIdFieldName, long rowId) { + primaryDocumentInput.setRowId(rowIdFieldName, rowId); + for (DocumentInput input : secondaryDocumentInputs.values()) { + input.setRowId(rowIdFieldName, 
rowId); + } + } + + @Override + public List> getFinalInput() { + return null; + } + + @Override + public void close() { + onClose.run(); + } + + /** + * Returns the primary format's document input. + * + * @return the primary document input + */ + public DocumentInput getPrimaryInput() { + return primaryDocumentInput; + } + + /** + * Returns the primary data format. + * + * @return the primary data format + */ + public DataFormat getPrimaryFormat() { + return primaryFormat; + } + + /** + * Returns an unmodifiable map of secondary data formats to their document inputs. + * + * @return the secondary inputs + */ + public Map> getSecondaryInputs() { + return secondaryDocumentInputs; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java new file mode 100644 index 0000000000000..8836409a9af72 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java @@ -0,0 +1,205 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.ExtensiblePlugin; +import org.opensearch.plugins.Plugin; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Sandbox plugin that provides a {@link CompositeIndexingExecutionEngine} for + * orchestrating multi-format indexing. Discovers {@link DataFormatPlugin} instances + * during node bootstrap via the {@link ExtensiblePlugin} SPI and creates a composite + * engine when composite indexing is enabled for an index. + *

+ * Registers two index settings: + *

    + *
  • {@code index.composite.enabled} — activates composite indexing (default {@code false})
  • + *
  • {@code index.composite.primary_data_format} — designates the primary format (default {@code "lucene"})
  • + *
+ *

+ * Format plugins (e.g., Parquet) extend this plugin by declaring + * {@code extendedPlugins = ['composite-engine']} in their {@code build.gradle} + * and implementing {@link DataFormatPlugin}. The {@link ExtensiblePlugin} SPI + * discovers them automatically during node bootstrap. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeEnginePlugin extends Plugin implements ExtensiblePlugin, DataFormatPlugin { + + private static final Logger logger = LogManager.getLogger(CompositeEnginePlugin.class); + + /** + * Index setting to enable composite indexing for an index. + * When {@code true}, the composite engine orchestrates writes across all registered data formats. + * Validates that the primary data format is non-empty when enabled. + */ + public static final Setting COMPOSITE_ENABLED = Setting.boolSetting( + "index.composite.enabled", + false, + new Setting.Validator<>() { + @Override + public void validate(Boolean value) {} + + @Override + public void validate(Boolean enabled, Map, Object> settings) { + if (enabled) { + String primary = (String) settings.get(PRIMARY_DATA_FORMAT); + if (primary == null || primary.isEmpty()) { + throw new IllegalArgumentException( + "[index.composite.enabled] requires [index.composite.primary_data_format] to be set" + ); + } + } + } + + @Override + public Iterator> settings() { + return List.>of(PRIMARY_DATA_FORMAT).iterator(); + } + }, + Setting.Property.IndexScope + ); + + /** + * Index setting that designates the primary data format for an index. + * The primary format is the authoritative format used for merge operations. + */ + public static final Setting PRIMARY_DATA_FORMAT = Setting.simpleString( + "index.composite.primary_data_format", + "lucene", + Setting.Property.IndexScope + ); + + /** + * Index setting that lists the secondary data formats for an index. + * Secondary formats receive writes alongside the primary but are not used + * as the merge authority. 
+ */ + public static final Setting> SECONDARY_DATA_FORMATS = Setting.listSetting( + "index.composite.secondary_data_formats", + Collections.emptyList(), + s -> s, + Setting.Property.IndexScope + ); + + /** + * Discovered {@link DataFormatPlugin} instances keyed by format name. + * When multiple plugins declare the same format name, the one with the highest + * {@link DataFormat#priority()} is retained. + */ + private volatile Map dataFormatPlugins = Map.of(); + + /** Creates a new composite engine plugin. */ + public CompositeEnginePlugin() {} + + @Override + public void loadExtensions(ExtensionLoader loader) { + List formatPlugins = loader.loadExtensions(DataFormatPlugin.class); + Map registry = new HashMap<>(); + for (DataFormatPlugin plugin : formatPlugins) { + DataFormat format = plugin.getDataFormat(); + if (format == null) { + logger.warn("DataFormatPlugin [{}] returned null DataFormat, skipping", plugin.getClass().getName()); + continue; + } + String name = format.name(); + DataFormatPlugin existing = registry.get(name); + if (existing != null) { + long existingPriority = existing.getDataFormat().priority(); + if (format.priority() <= existingPriority) { + logger.debug( + "Skipping DataFormatPlugin [{}] for format [{}] (priority {} <= existing {})", + plugin.getClass().getName(), + name, + format.priority(), + existingPriority + ); + continue; + } + logger.info( + "Replacing DataFormatPlugin for format [{}] (priority {} > existing {})", + name, + format.priority(), + existingPriority + ); + } + registry.put(name, plugin); + logger.info("Registered DataFormatPlugin [{}] for format [{}]", plugin.getClass().getName(), name); + } + this.dataFormatPlugins = Collections.unmodifiableMap(registry); + } + + @Override + public List> getSettings() { + return List.of(COMPOSITE_ENABLED, PRIMARY_DATA_FORMAT, SECONDARY_DATA_FORMATS); + } + + @Override + public void onIndexModule(IndexModule indexModule) { + Settings settings = indexModule.getSettings(); + boolean 
compositeEnabled = COMPOSITE_ENABLED.get(settings); + if (compositeEnabled == false) { + return; + } + + String primaryFormatName = PRIMARY_DATA_FORMAT.get(settings); + List secondaryFormatNames = SECONDARY_DATA_FORMATS.get(settings); + CompositeIndexingExecutionEngine.validateFormatsRegistered(dataFormatPlugins, primaryFormatName, secondaryFormatNames); + } + + @Override + public DataFormat getDataFormat() { + // TODO: Dataformat for Composite is per index, while this one talks about cluster level. Switching it off for now + return null; + } + + @Override + @SuppressWarnings("unchecked") + public > IndexingExecutionEngine indexingEngine( + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { + return (IndexingExecutionEngine) new CompositeIndexingExecutionEngine( + dataFormatPlugins, + indexSettings, + mapperService, + shardPath + ); + } + + /** + * Returns the discovered data format plugins keyed by format name. + * + * @return unmodifiable map of format name to plugin + */ + public Map getDataFormatPlugins() { + return dataFormatPlugins; + } +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java new file mode 100644 index 0000000000000..0208d8d21da22 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -0,0 +1,283 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FileInfos; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.dataformat.Merger; +import org.opensearch.index.engine.dataformat.RefreshInput; +import org.opensearch.index.engine.dataformat.RefreshResult; +import org.opensearch.index.engine.dataformat.Writer; +import org.opensearch.index.engine.exec.Segment; +import org.opensearch.index.engine.exec.WriterFileSet; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A composite {@link IndexingExecutionEngine} that orchestrates indexing across + * multiple per-format engines behind a single interface. + *

+ * The engine delegates writer creation, refresh, file deletion, and document input + * creation to each per-format engine. A primary engine is designated based on the + * configured primary format name and is used for merge operations. + *

+ * The composite {@link DataFormat} exposed by this engine represents the union of + * all per-format supported field type capabilities. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine { + + private static final Logger logger = LogManager.getLogger(CompositeIndexingExecutionEngine.class); + + private final IndexingExecutionEngine primaryEngine; + private final List> secondaryEngines; + private final CompositeDataFormat compositeDataFormat; + private final CompositeDataFormatWriterPool dataFormatWriterPool; + private final AtomicLong writerGenerationCounter; + + /** + * Constructs a CompositeIndexingExecutionEngine by reading index settings to + * determine the primary and secondary data formats, validating that all configured + * formats are registered, and creating per-format engines via the discovered + * {@link DataFormatPlugin} instances. + *

+ * The primary engine is the authoritative format used for merge operations and + * commit coordination. Secondary engines receive writes alongside the primary but + * are not used as the merge authority. + * + * @param dataFormatPlugins the discovered data format plugins keyed by format name + * @param indexSettings the index settings containing composite configuration + * @param mapperService the mapper service for field mapping resolution + * @param shardPath the shard path for file storage + * @throws IllegalStateException if composite indexing is not enabled + * @throws IllegalArgumentException if any configured format is not registered + */ + public CompositeIndexingExecutionEngine( + Map dataFormatPlugins, + IndexSettings indexSettings, + MapperService mapperService, + ShardPath shardPath + ) { + Objects.requireNonNull(dataFormatPlugins, "dataFormatPlugins must not be null"); + Objects.requireNonNull(indexSettings, "indexSettings must not be null"); + + Settings settings = indexSettings.getSettings(); + boolean compositeEnabled = CompositeEnginePlugin.COMPOSITE_ENABLED.get(settings); + if (compositeEnabled == false) { + throw new IllegalStateException("Composite indexing is not enabled for index [" + indexSettings.getIndex().getName() + "]"); + } + + String primaryFormatName = CompositeEnginePlugin.PRIMARY_DATA_FORMAT.get(settings); + List secondaryFormatNames = CompositeEnginePlugin.SECONDARY_DATA_FORMATS.get(settings); + + validateFormatsRegistered(dataFormatPlugins, primaryFormatName, secondaryFormatNames); + + List allFormats = new ArrayList<>(); + DataFormatPlugin primaryPlugin = dataFormatPlugins.get(primaryFormatName); + this.primaryEngine = primaryPlugin.indexingEngine(mapperService, shardPath, indexSettings); + allFormats.add(primaryPlugin.getDataFormat()); + + List> secondaries = new ArrayList<>(); + for (String secondaryName : secondaryFormatNames) { + if (secondaryName.equals(primaryFormatName)) { + logger.warn("Secondary data format [{}] is the 
same as primary, skipping duplicate", secondaryName); + continue; + } + DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName); + secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings)); + allFormats.add(secondaryPlugin.getDataFormat()); + } + this.secondaryEngines = List.copyOf(secondaries); + + // Register CompositeDataFormat with the primary and secondary dataformats for that index + this.compositeDataFormat = new CompositeDataFormat(allFormats); + this.writerGenerationCounter = new AtomicLong(0); + this.dataFormatWriterPool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(this, writerGenerationCounter.getAndIncrement()), + ConcurrentLinkedQueue::new, + Runtime.getRuntime().availableProcessors() + ); + } + + /** + * Validates that the primary and all secondary data format plugins are registered. + * + * @param dataFormatPlugins the discovered data format plugins keyed by format name + * @param primaryFormatName the configured primary format name + * @param secondaryFormatNames the configured secondary format names + * @throws IllegalArgumentException if any configured format is not registered + */ + static void validateFormatsRegistered( + Map dataFormatPlugins, + String primaryFormatName, + List secondaryFormatNames + ) { + if (dataFormatPlugins.containsKey(primaryFormatName) == false) { + throw new IllegalArgumentException( + "Primary data format [" + + primaryFormatName + + "] is not registered on this node. Available formats: " + + dataFormatPlugins.keySet() + ); + } + for (String secondaryName : secondaryFormatNames) { + if (secondaryName.equals(primaryFormatName)) { + throw new IllegalStateException( + "Secondary data format [" + secondaryName + "] is the same as primary :[" + primaryFormatName + "]" + ); + } + if (dataFormatPlugins.containsKey(secondaryName) == false) { + throw new IllegalArgumentException( + "Secondary data format [" + + secondaryName + + "] is not registered on this node. 
Available formats: " + + dataFormatPlugins.keySet() + ); + } + } + } + + @Override + public Writer createWriter(long writerGeneration) { + return new CompositeWriter(this, writerGeneration); + } + + @Override + public Merger getMerger() { + return primaryEngine.getMerger(); + } + + @Override + public RefreshResult refresh(RefreshInput refreshInput) throws IOException { + List dataFormatWriters = dataFormatWriterPool.checkoutAll(); + List refreshedSegments = new ArrayList<>(refreshInput.existingSegments()); + List newSegmentList = new ArrayList<>(); + + logger.debug( + "Refreshing composite engine: flushing {} writers, existing segments={}", + dataFormatWriters.size(), + refreshedSegments.size() + ); + + // Flush each writer to disk and build segments from the file infos + for (CompositeWriter writer : dataFormatWriters) { + FileInfos fileInfos = writer.flush(); + Segment.Builder segmentBuilder = Segment.builder(writer.getWriterGeneration()); + boolean hasFiles = false; + for (Map.Entry entry : fileInfos.writerFilesMap().entrySet()) { + logger.debug( + "Writer gen={} flushed format=[{}] files={}", + writer.getWriterGeneration(), + entry.getKey().name(), + entry.getValue().files() + ); + segmentBuilder.addSearchableFiles(entry.getKey(), entry.getValue()); + hasFiles = true; + } + writer.close(); + if (hasFiles) { + newSegmentList.add(segmentBuilder.build()); + } + } + + if (newSegmentList.isEmpty()) { + logger.debug("No new segments produced from flush"); + return null; + } + + logger.debug("Produced {} new segments from flush", newSegmentList.size()); + refreshedSegments.addAll(newSegmentList); + + // Delegate refresh to each per-format engine + RefreshInput emptyInput = RefreshInput.builder().build(); + primaryEngine.refresh(emptyInput); + for (IndexingExecutionEngine engine : secondaryEngines) { + engine.refresh(emptyInput); + } + + return new RefreshResult(refreshedSegments); + } + + @Override + public CompositeDataFormat getDataFormat() { + return 
compositeDataFormat; + } + + @Override + public long getNativeBytesUsed() { + long total = primaryEngine.getNativeBytesUsed(); + for (IndexingExecutionEngine engine : secondaryEngines) { + total += engine.getNativeBytesUsed(); + } + return total; + } + + @Override + public void deleteFiles(Map> filesToDelete) throws IOException { + primaryEngine.deleteFiles(filesToDelete); + for (IndexingExecutionEngine engine : secondaryEngines) { + engine.deleteFiles(filesToDelete); + } + } + + @Override + public CompositeDocumentInput newDocumentInput() { + CompositeWriter writer = dataFormatWriterPool.getAndLock(); + DocumentInput primaryInput = primaryEngine.newDocumentInput(); + Map> secondaryInputMap = new LinkedHashMap<>(); + for (IndexingExecutionEngine engine : secondaryEngines) { + secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput()); + } + return new CompositeDocumentInput( + primaryEngine.getDataFormat(), + primaryInput, + secondaryInputMap, + () -> dataFormatWriterPool.releaseAndUnlock(writer) + ); + } + + /** + * Returns the primary delegate engine. + * + * @return the primary engine + */ + public IndexingExecutionEngine getPrimaryDelegate() { + return primaryEngine; + } + + /** + * Returns the secondary delegate engines. + * + * @return the secondary engines + */ + public List> getSecondaryDelegates() { + return secondaryEngines; + } + +} diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java new file mode 100644 index 0000000000000..ccf91af29c50f --- /dev/null +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java @@ -0,0 +1,223 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.composite; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FileInfos; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.dataformat.WriteResult; +import org.opensearch.index.engine.dataformat.Writer; +import org.opensearch.index.engine.exec.WriterFileSet; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A composite {@link Writer} that wraps one {@link Writer} per registered data format + * and delegates write operations to each per-format writer. + *

+ * Constructed from a {@link CompositeIndexingExecutionEngine}, it iterates the engine's + * delegates to create per-format writers. The primary format's writer is always first + * in the {@code writers} list. A {@code postWrite} callback releases this writer back + * to the pool after each write cycle. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeWriter implements Writer, Lock { + + private static final Logger logger = LogManager.getLogger(CompositeWriter.class); + + private final Map.Entry>> primaryWriter; + private final List>>> secondaryWriters; + private final ReentrantLock lock; + private final long writerGeneration; + private final RowIdGenerator rowIdGenerator; + private volatile boolean aborted; + private volatile boolean flushPending; + + /** + * Constructs a CompositeWriter from the given engine and writer generation. + *

+ * Creates a per-format {@link Writer} for each delegate engine (primary first, + * then secondaries). The {@code postWrite} callback releases this writer back + * to the engine's writer pool. + * + * @param engine the composite indexing execution engine + * @param writerGeneration the writer generation number + */ + @SuppressWarnings("unchecked") + public CompositeWriter(CompositeIndexingExecutionEngine engine, long writerGeneration) { + this.lock = new ReentrantLock(); + this.aborted = false; + this.flushPending = false; + this.writerGeneration = writerGeneration; + + IndexingExecutionEngine primaryDelegate = engine.getPrimaryDelegate(); + this.primaryWriter = new AbstractMap.SimpleImmutableEntry<>( + primaryDelegate.getDataFormat(), + (Writer>) primaryDelegate.createWriter(writerGeneration) + ); + + List>>> secondaries = new ArrayList<>(); + for (IndexingExecutionEngine delegate : engine.getSecondaryDelegates()) { + secondaries.add( + new AbstractMap.SimpleImmutableEntry<>( + delegate.getDataFormat(), + (Writer>) delegate.createWriter(writerGeneration) + ) + ); + } + this.secondaryWriters = List.copyOf(secondaries); + this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName()); + } + + @Override + public WriteResult addDoc(CompositeDocumentInput doc) throws IOException { + // Write to primary first + WriteResult primaryResult = primaryWriter.getValue().addDoc(doc.getPrimaryInput()); + switch (primaryResult) { + case WriteResult.Success s -> logger.trace("Successfully added document in primary format [{}]", primaryWriter.getKey().name()); + case WriteResult.Failure f -> { + logger.debug("Failed to add document in primary format [{}]", primaryWriter.getKey().name()); + return primaryResult; + } + } + + // Then write to each secondary + List> secondaryInputs = new ArrayList<>(doc.getSecondaryInputs().values()); + for (int i = 0; i < secondaryWriters.size(); i++) { + Map.Entry>> entry = secondaryWriters.get(i); + DocumentInput input = 
secondaryInputs.get(i); + WriteResult result = entry.getValue().addDoc(input); + switch (result) { + case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", entry.getKey().name()); + case WriteResult.Failure f -> { + logger.debug("Failed to add document in secondary format [{}]", entry.getKey().name()); + return result; + } + } + } + + return primaryResult; + } + + @Override + public FileInfos flush() throws IOException { + FileInfos.Builder builder = FileInfos.builder(); + // Flush primary + Optional primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey()); + primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet)); + // Flush secondaries + for (Map.Entry>> entry : secondaryWriters) { + Optional wfs = entry.getValue().flush().getWriterFileSet(entry.getKey()); + wfs.ifPresent(writerFileSet -> builder.putWriterFileSet(entry.getKey(), writerFileSet)); + } + return builder.build(); + } + + @Override + public void sync() throws IOException { + primaryWriter.getValue().sync(); + for (Map.Entry>> entry : secondaryWriters) { + entry.getValue().sync(); + } + } + + @Override + public void close() throws IOException { + primaryWriter.getValue().close(); + for (Map.Entry>> entry : secondaryWriters) { + entry.getValue().close(); + } + } + + /** + * Returns the writer generation number. + * + * @return the writer generation + */ + public long getWriterGeneration() { + return writerGeneration; + } + + /** + * Marks this writer as aborted. + */ + public void abort() { + this.aborted = true; + } + + /** + * Returns whether this writer has been aborted. + * + * @return {@code true} if aborted + */ + public boolean isAborted() { + return aborted; + } + + /** + * Marks this writer as having a pending flush. + */ + public void setFlushPending() { + this.flushPending = true; + } + + /** + * Returns whether this writer has a pending flush. 
+     *
+     * @return {@code true} if a flush is pending
+     */
+    public boolean isFlushPending() {
+        return flushPending;
+    }
+
+    @Override
+    public void lock() {
+        lock.lock();
+    }
+
+    @Override
+    public void lockInterruptibly() throws InterruptedException {
+        lock.lockInterruptibly();
+    }
+
+    @Override
+    public boolean tryLock() {
+        return lock.tryLock();
+    }
+
+    @Override
+    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+        return lock.tryLock(time, unit);
+    }
+
+    @Override
+    public void unlock() {
+        lock.unlock();
+    }
+
+    @Override
+    public Condition newCondition() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/RowIdGenerator.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/RowIdGenerator.java
new file mode 100644
index 0000000000000..9f6c223ddb5db
--- /dev/null
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/RowIdGenerator.java
@@ -0,0 +1,64 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.composite;
+
+import org.opensearch.common.annotation.ExperimentalApi;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Generates monotonically increasing row IDs for cross-format document synchronization.
+ * Each writer instance gets its own {@code RowIdGenerator} so that row IDs are unique
+ * within a writer's segment scope.
+ *
+ * @opensearch.experimental
+ */
+@ExperimentalApi
+public class RowIdGenerator {
+
+    private final String source;
+    private final AtomicLong counter;
+
+    /**
+     * Constructs a RowIdGenerator with the given source identifier.
+     *
+     * @param source a human-readable label identifying the generator's owner (e.g. class name)
+     */
+    public RowIdGenerator(String source) {
+        this.source = source;
+        this.counter = new AtomicLong(0);
+    }
+
+    /**
+     * Returns the next row ID.
+     *
+     * @return the next monotonically increasing row ID
+     */
+    public long nextRowId() {
+        return counter.getAndIncrement();
+    }
+
+    /**
+     * Returns the current row ID value without incrementing.
+     *
+     * @return the current row ID
+     */
+    public long currentRowId() {
+        return counter.get();
+    }
+
+    /**
+     * Returns the source identifier for this generator.
+     *
+     * @return the source label
+     */
+    public String getSource() {
+        return source;
+    }
+}
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java
new file mode 100644
index 0000000000000..216fe2c5469ae
--- /dev/null
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * Composite indexing engine plugin that orchestrates multi-format indexing
+ * across multiple data format engines.
+ * @opensearch.experimental
+ */
+@ExperimentalApi
+package org.opensearch.composite;
+
+import org.opensearch.common.annotation.ExperimentalApi;
diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java
new file mode 100644
index 0000000000000..b6be1f41767d9
--- /dev/null
+++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatTests.java
@@ -0,0 +1,98 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.composite;
+
+import org.opensearch.index.engine.dataformat.DataFormat;
+import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests for {@link CompositeDataFormat}.
+ */
+public class CompositeDataFormatTests extends OpenSearchTestCase {
+
+    public void testNameReturnsComposite() {
+        CompositeDataFormat format = new CompositeDataFormat(List.of(mockFormat("lucene", 1, Set.of())));
+        assertEquals("composite", format.name());
+    }
+
+    public void testPriorityReturnsMinValue() {
+        CompositeDataFormat format = new CompositeDataFormat(List.of(mockFormat("lucene", 1, Set.of())));
+        assertEquals(Long.MIN_VALUE, format.priority());
+    }
+
+    public void testDefaultConstructorReturnsEmptyFormats() {
+        CompositeDataFormat format = new CompositeDataFormat();
+        assertTrue(format.getDataFormats().isEmpty());
+        assertEquals(Set.of(), format.supportedFields());
+    }
+
+    public void testSupportedFieldsDelegatesToFirstFormat() {
+        FieldTypeCapabilities cap1 = new FieldTypeCapabilities("keyword", Set.of(FieldTypeCapabilities.Capability.FULL_TEXT_SEARCH));
+        FieldTypeCapabilities cap2 = new FieldTypeCapabilities("integer", Set.of(FieldTypeCapabilities.Capability.COLUMNAR_STORAGE));
+        DataFormat primary = mockFormat("lucene", 1, Set.of(cap1));
+        DataFormat secondary = mockFormat("parquet", 2, Set.of(cap2));
+
+        CompositeDataFormat composite = new CompositeDataFormat(List.of(primary, secondary));
+        // supportedFields() returns the first format's fields
+        assertEquals(Set.of(cap1), composite.supportedFields());
+    }
+
+    public void testSupportedFieldsEmptyWhenNoFormats() {
+        CompositeDataFormat composite = new CompositeDataFormat(List.of());
+        assertEquals(Set.of(), composite.supportedFields());
+    }
+
+    public void testGetDataFormatsReturnsAllFormats() {
+        DataFormat f1 = mockFormat("lucene", 1, Set.of());
+        DataFormat f2 = mockFormat("parquet", 2, Set.of());
+        CompositeDataFormat composite = new CompositeDataFormat(List.of(f1, f2));
+        assertEquals(2, composite.getDataFormats().size());
+        assertSame(f1, composite.getDataFormats().get(0));
+        assertSame(f2, composite.getDataFormats().get(1));
+    }
+
+    public void testGetDataFormatsIsUnmodifiable() {
+        CompositeDataFormat composite = new CompositeDataFormat(List.of(mockFormat("lucene", 1, Set.of())));
+        expectThrows(UnsupportedOperationException.class, () -> composite.getDataFormats().add(mockFormat("x", 0, Set.of())));
+    }
+
+    public void testConstructorRejectsNull() {
+        expectThrows(NullPointerException.class, () -> new CompositeDataFormat(null));
+    }
+
+    public void testToStringContainsClassName() {
+        CompositeDataFormat composite = new CompositeDataFormat(List.of(mockFormat("lucene", 1, Set.of())));
+        String str = composite.toString();
+        assertTrue(str.contains("CompositeDataFormat"));
+        assertTrue(str.contains("dataFormats="));
+    }
+
+    private DataFormat mockFormat(String name, long priority, Set<FieldTypeCapabilities> fields) {
+        return new DataFormat() {
+            @Override
+            public String name() {
+                return name;
+            }
+
+            @Override
+            public long priority() {
+                return priority;
+            }
+
+            @Override
+            public Set<FieldTypeCapabilities> supportedFields() {
+                return fields;
+            }
+        };
+    }
+}
diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatWriterPoolTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatWriterPoolTests.java
new file mode 100644
index 0000000000000..bcc05c5139e87
--- /dev/null
+++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatWriterPoolTests.java
@@ -0,0 +1,173 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.composite;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tests for {@link CompositeDataFormatWriterPool}.
+ */ +public class CompositeDataFormatWriterPoolTests extends OpenSearchTestCase { + + private CompositeIndexingExecutionEngine engine; + + @Override + public void setUp() throws Exception { + super.setUp(); + engine = CompositeTestHelper.createStubEngine("lucene"); + } + + public void testGetAndLockReturnsNewWriterWhenPoolEmpty() { + AtomicLong gen = new AtomicLong(0); + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, gen.getAndIncrement()), + ConcurrentLinkedQueue::new, + 2 + ); + + CompositeWriter writer = pool.getAndLock(); + assertNotNull(writer); + assertEquals(0L, writer.getWriterGeneration()); + pool.releaseAndUnlock(writer); + } + + public void testReleaseAndUnlockMakesWriterReusable() { + AtomicLong gen = new AtomicLong(0); + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, gen.getAndIncrement()), + ConcurrentLinkedQueue::new, + 2 + ); + + CompositeWriter writer = pool.getAndLock(); + pool.releaseAndUnlock(writer); + + CompositeWriter reused = pool.getAndLock(); + assertSame(writer, reused); + pool.releaseAndUnlock(reused); + } + + public void testGetAndLockThrowsWhenClosed() throws IOException { + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, 0), + ConcurrentLinkedQueue::new, + 2 + ); + pool.close(); + expectThrows(AlreadyClosedException.class, pool::getAndLock); + } + + public void testCheckoutAllReturnsRegisteredWritersWithFlushPending() { + AtomicLong gen = new AtomicLong(0); + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, gen.getAndIncrement()), + ConcurrentLinkedQueue::new, + 4 + ); + + // Create two writers and release them back + CompositeWriter w1 = pool.getAndLock(); + CompositeWriter w2 = pool.getAndLock(); + pool.releaseAndUnlock(w1); + pool.releaseAndUnlock(w2); + + List checkedOut = pool.checkoutAll(); 
+ assertEquals(2, checkedOut.size()); + for (CompositeWriter w : checkedOut) { + assertTrue(w.isFlushPending()); + } + } + + public void testCheckoutAllReturnsEmptyWhenNoWriters() { + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, 0), + ConcurrentLinkedQueue::new, + 2 + ); + + List checkedOut = pool.checkoutAll(); + assertTrue(checkedOut.isEmpty()); + } + + public void testCheckoutAllReturnsUnmodifiableList() { + AtomicLong gen = new AtomicLong(0); + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, gen.getAndIncrement()), + ConcurrentLinkedQueue::new, + 2 + ); + + CompositeWriter w = pool.getAndLock(); + pool.releaseAndUnlock(w); + + List checkedOut = pool.checkoutAll(); + expectThrows(UnsupportedOperationException.class, () -> checkedOut.add(new CompositeWriter(engine, 99))); + } + + public void testIsRegisteredReturnsTrueForPooledWriter() { + AtomicLong gen = new AtomicLong(0); + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, gen.getAndIncrement()), + ConcurrentLinkedQueue::new, + 2 + ); + + CompositeWriter writer = pool.getAndLock(); + assertTrue(pool.isRegistered(writer)); + pool.releaseAndUnlock(writer); + } + + public void testIsRegisteredReturnsFalseForUnknownWriter() { + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, 0), + ConcurrentLinkedQueue::new, + 2 + ); + + CompositeWriter unknown = new CompositeWriter(engine, 99); + assertFalse(pool.isRegistered(unknown)); + } + + public void testCheckoutAllThrowsWhenClosed() throws IOException { + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, 0), + ConcurrentLinkedQueue::new, + 2 + ); + pool.close(); + expectThrows(AlreadyClosedException.class, pool::checkoutAll); + } + + public void 
testIteratorReturnsSnapshotOfWriters() { + AtomicLong gen = new AtomicLong(0); + CompositeDataFormatWriterPool pool = new CompositeDataFormatWriterPool( + () -> new CompositeWriter(engine, gen.getAndIncrement()), + ConcurrentLinkedQueue::new, + 4 + ); + + CompositeWriter w1 = pool.getAndLock(); + CompositeWriter w2 = pool.getAndLock(); + pool.releaseAndUnlock(w1); + pool.releaseAndUnlock(w2); + + int count = 0; + for (CompositeWriter w : pool) { + assertNotNull(w); + count++; + } + assertEquals(2, count); + } +} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDocumentInputTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDocumentInputTests.java new file mode 100644 index 0000000000000..958532689fdd7 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDocumentInputTests.java @@ -0,0 +1,224 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.apache.lucene.search.Query; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.TextSearchInfo; +import org.opensearch.index.mapper.ValueFetcher; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Tests for {@link CompositeDocumentInput}. 
+ */ +public class CompositeDocumentInputTests extends OpenSearchTestCase { + + public void testAddFieldBroadcastsToAllFormats() { + RecordingDocumentInput primaryInput = new RecordingDocumentInput(); + RecordingDocumentInput secondaryInput = new RecordingDocumentInput(); + + DataFormat primaryFormat = mockFormat("lucene", 1, Set.of()); + DataFormat secondaryFormat = mockFormat("parquet", 2, Set.of()); + + CompositeDocumentInput composite = new CompositeDocumentInput( + primaryFormat, + primaryInput, + Map.of(secondaryFormat, secondaryInput), + () -> {} + ); + + MappedFieldType keywordField = mockFieldType("keyword"); + composite.addField(keywordField, "value1"); + + assertEquals(1, primaryInput.addedFields.size()); + assertEquals(1, secondaryInput.addedFields.size()); + } + + public void testSetRowIdBroadcastsToAllInputs() { + RecordingDocumentInput primaryInput = new RecordingDocumentInput(); + RecordingDocumentInput secondary1 = new RecordingDocumentInput(); + RecordingDocumentInput secondary2 = new RecordingDocumentInput(); + + DataFormat primaryFormat = mockFormat("lucene", 1, Set.of()); + DataFormat secondaryFormat1 = mockFormat("parquet", 2, Set.of()); + DataFormat secondaryFormat2 = mockFormat("arrow", 3, Set.of()); + + Map> secondaries = new HashMap<>(); + secondaries.put(secondaryFormat1, secondary1); + secondaries.put(secondaryFormat2, secondary2); + + CompositeDocumentInput composite = new CompositeDocumentInput(primaryFormat, primaryInput, secondaries, () -> {}); + + composite.setRowId("_row_id", 42L); + + assertEquals(1, primaryInput.rowIds.size()); + assertEquals(42L, (long) primaryInput.rowIds.get(0)); + assertEquals(1, secondary1.rowIds.size()); + assertEquals(42L, (long) secondary1.rowIds.get(0)); + assertEquals(1, secondary2.rowIds.size()); + assertEquals(42L, (long) secondary2.rowIds.get(0)); + } + + public void testGetFinalInputReturnsNull() { + CompositeDocumentInput composite = new CompositeDocumentInput( + mockFormat("lucene", 1, Set.of()), 
+ new RecordingDocumentInput(), + Map.of(), + () -> {} + ); + assertNull(composite.getFinalInput()); + } + + public void testCloseInvokesOnCloseCallback() { + AtomicBoolean closed = new AtomicBoolean(false); + CompositeDocumentInput composite = new CompositeDocumentInput( + mockFormat("lucene", 1, Set.of()), + new RecordingDocumentInput(), + Map.of(), + () -> closed.set(true) + ); + + composite.close(); + assertTrue(closed.get()); + } + + public void testGetPrimaryInputReturnsPrimaryDocumentInput() { + RecordingDocumentInput primaryInput = new RecordingDocumentInput(); + CompositeDocumentInput composite = new CompositeDocumentInput(mockFormat("lucene", 1, Set.of()), primaryInput, Map.of(), () -> {}); + assertSame(primaryInput, composite.getPrimaryInput()); + } + + public void testGetPrimaryFormatReturnsPrimaryDataFormat() { + DataFormat primaryFormat = mockFormat("lucene", 1, Set.of()); + CompositeDocumentInput composite = new CompositeDocumentInput(primaryFormat, new RecordingDocumentInput(), Map.of(), () -> {}); + assertSame(primaryFormat, composite.getPrimaryFormat()); + } + + public void testGetSecondaryInputsReturnsUnmodifiableMap() { + DataFormat secondaryFormat = mockFormat("parquet", 2, Set.of()); + RecordingDocumentInput secondaryInput = new RecordingDocumentInput(); + + CompositeDocumentInput composite = new CompositeDocumentInput( + mockFormat("lucene", 1, Set.of()), + new RecordingDocumentInput(), + Map.of(secondaryFormat, secondaryInput), + () -> {} + ); + + Map> secondaries = composite.getSecondaryInputs(); + assertEquals(1, secondaries.size()); + expectThrows( + UnsupportedOperationException.class, + () -> secondaries.put(mockFormat("x", 0, Set.of()), new RecordingDocumentInput()) + ); + } + + public void testConstructorRejectsNullPrimaryFormat() { + expectThrows(NullPointerException.class, () -> new CompositeDocumentInput(null, new RecordingDocumentInput(), Map.of(), () -> {})); + } + + public void testConstructorRejectsNullPrimaryInput() { + 
expectThrows( + NullPointerException.class, + () -> new CompositeDocumentInput(mockFormat("lucene", 1, Set.of()), null, Map.of(), () -> {}) + ); + } + + public void testConstructorRejectsNullSecondaryInputs() { + expectThrows( + NullPointerException.class, + () -> new CompositeDocumentInput(mockFormat("lucene", 1, Set.of()), new RecordingDocumentInput(), null, () -> {}) + ); + } + + public void testConstructorRejectsNullOnClose() { + expectThrows( + NullPointerException.class, + () -> new CompositeDocumentInput(mockFormat("lucene", 1, Set.of()), new RecordingDocumentInput(), Map.of(), null) + ); + } + + // --- helpers --- + + private DataFormat mockFormat(String name, long priority, Set fields) { + return new DataFormat() { + @Override + public String name() { + return name; + } + + @Override + public long priority() { + return priority; + } + + @Override + public Set supportedFields() { + return fields; + } + }; + } + + private MappedFieldType mockFieldType(String typeName) { + return new MappedFieldType(typeName, true, false, true, TextSearchInfo.NONE, Map.of()) { + @Override + public String typeName() { + return typeName; + } + + @Override + public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) { + return null; + } + + @Override + public Query termQuery(Object value, QueryShardContext context) { + return null; + } + }; + } + + /** + * Simple recording implementation of DocumentInput for test assertions. 
+ */ + static class RecordingDocumentInput implements DocumentInput { + final List addedFields = new ArrayList<>(); + final List rowIds = new ArrayList<>(); + + @Override + public void addField(MappedFieldType fieldType, Object value) { + addedFields.add(value); + } + + @Override + public void setRowId(String rowIdFieldName, long rowId) { + rowIds.add(rowId); + } + + @Override + public Object getFinalInput() { + return null; + } + + @Override + public void close() {} + } +} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java new file mode 100644 index 0000000000000..c0399d32de735 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java @@ -0,0 +1,214 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.plugins.ExtensiblePlugin; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Tests for {@link CompositeEnginePlugin}. 
+ */ +public class CompositeEnginePluginTests extends OpenSearchTestCase { + + public void testGetSettingsReturnsAllThreeSettings() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + List> settings = plugin.getSettings(); + assertEquals(3, settings.size()); + assertTrue(settings.contains(CompositeEnginePlugin.COMPOSITE_ENABLED)); + assertTrue(settings.contains(CompositeEnginePlugin.PRIMARY_DATA_FORMAT)); + assertTrue(settings.contains(CompositeEnginePlugin.SECONDARY_DATA_FORMATS)); + } + + public void testCompositeEnabledDefaultsToFalse() { + Settings settings = Settings.builder().build(); + assertFalse(CompositeEnginePlugin.COMPOSITE_ENABLED.get(settings)); + } + + public void testPrimaryDataFormatDefaultsToLucene() { + Settings settings = Settings.builder().build(); + assertEquals("lucene", CompositeEnginePlugin.PRIMARY_DATA_FORMAT.get(settings)); + } + + public void testSecondaryDataFormatsDefaultsToEmpty() { + Settings settings = Settings.builder().build(); + assertTrue(CompositeEnginePlugin.SECONDARY_DATA_FORMATS.get(settings).isEmpty()); + } + + public void testCompositeEnabledValidatorRejectsEmptyPrimaryFormat() { + // Directly invoke the cross-setting validator with enabled=true and empty primary + Setting.Validator validator = extractValidator(); + Map, Object> deps = Map.of(CompositeEnginePlugin.PRIMARY_DATA_FORMAT, ""); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> validator.validate(true, deps)); + assertTrue(ex.getMessage().contains("index.composite.primary_data_format")); + } + + public void testCompositeEnabledValidatorAcceptsNonEmptyPrimaryFormat() { + Setting.Validator validator = extractValidator(); + Map, Object> deps = Map.of(CompositeEnginePlugin.PRIMARY_DATA_FORMAT, "lucene"); + // Should not throw + validator.validate(true, deps); + } + + public void testCompositeEnabledValidatorSkipsWhenDisabled() { + Setting.Validator validator = extractValidator(); + Map, Object> deps = 
Map.of(CompositeEnginePlugin.PRIMARY_DATA_FORMAT, ""); + // Should not throw when composite is disabled, even with empty primary + validator.validate(false, deps); + } + + @SuppressWarnings("unchecked") + private Setting.Validator extractValidator() { + // The COMPOSITE_ENABLED setting has a validator; extract it via the setting's properties + // We test the validator logic by constructing the dependency map directly + return new Setting.Validator<>() { + @Override + public void validate(Boolean value) {} + + @Override + public void validate(Boolean enabled, Map, Object> settings) { + if (enabled) { + String primary = (String) settings.get(CompositeEnginePlugin.PRIMARY_DATA_FORMAT); + if (primary == null || primary.isEmpty()) { + throw new IllegalArgumentException( + "[index.composite.enabled] requires [index.composite.primary_data_format] to be set" + ); + } + } + } + + @Override + public java.util.Iterator> settings() { + return List.>of(CompositeEnginePlugin.PRIMARY_DATA_FORMAT).iterator(); + } + }; + } + + public void testGetDataFormatReturnsNull() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + assertNull(plugin.getDataFormat()); + } + + public void testLoadExtensionsRegistersPlugins() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + DataFormatPlugin lucenePlugin = CompositeTestHelper.stubPlugin("lucene", 1); + DataFormatPlugin parquetPlugin = CompositeTestHelper.stubPlugin("parquet", 2); + + plugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { + @Override + @SuppressWarnings("unchecked") + public List loadExtensions(Class extensionPointType) { + if (extensionPointType == DataFormatPlugin.class) { + return (List) List.of(lucenePlugin, parquetPlugin); + } + return Collections.emptyList(); + } + }); + + Map plugins = plugin.getDataFormatPlugins(); + assertEquals(2, plugins.size()); + assertTrue(plugins.containsKey("lucene")); + assertTrue(plugins.containsKey("parquet")); + } + + public void 
testLoadExtensionsHigherPriorityWins() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + DataFormatPlugin lowPriority = CompositeTestHelper.stubPlugin("lucene", 1); + DataFormatPlugin highPriority = CompositeTestHelper.stubPlugin("lucene", 100); + + plugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { + @Override + @SuppressWarnings("unchecked") + public List loadExtensions(Class extensionPointType) { + if (extensionPointType == DataFormatPlugin.class) { + return (List) List.of(lowPriority, highPriority); + } + return Collections.emptyList(); + } + }); + + Map plugins = plugin.getDataFormatPlugins(); + assertEquals(1, plugins.size()); + // The high priority one should win + assertEquals(100, plugins.get("lucene").getDataFormat().priority()); + } + + public void testLoadExtensionsSkipsNullDataFormat() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + DataFormatPlugin nullPlugin = new DataFormatPlugin() { + @Override + public DataFormat getDataFormat() { + return null; + } + + @Override + public < + T extends DataFormat, + P extends org.opensearch.index.engine.dataformat.DocumentInput> + org.opensearch.index.engine.dataformat.IndexingExecutionEngine + indexingEngine( + org.opensearch.index.mapper.MapperService mapperService, + org.opensearch.index.shard.ShardPath shardPath, + IndexSettings indexSettings + ) { + return null; + } + }; + + plugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { + @Override + @SuppressWarnings("unchecked") + public List loadExtensions(Class extensionPointType) { + if (extensionPointType == DataFormatPlugin.class) { + return (List) List.of(nullPlugin); + } + return Collections.emptyList(); + } + }); + + assertTrue(plugin.getDataFormatPlugins().isEmpty()); + } + + public void testLoadExtensionsWithEmptyList() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + plugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { + @Override + public List loadExtensions(Class 
extensionPointType) { + return Collections.emptyList(); + } + }); + + assertTrue(plugin.getDataFormatPlugins().isEmpty()); + } + + public void testGetDataFormatPluginsReturnsUnmodifiableMap() { + CompositeEnginePlugin plugin = new CompositeEnginePlugin(); + plugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { + @Override + @SuppressWarnings("unchecked") + public List loadExtensions(Class extensionPointType) { + if (extensionPointType == DataFormatPlugin.class) { + return (List) List.of(CompositeTestHelper.stubPlugin("lucene", 1)); + } + return Collections.emptyList(); + } + }); + + Map plugins = plugin.getDataFormatPlugins(); + expectThrows(UnsupportedOperationException.class, () -> plugins.put("new", CompositeTestHelper.stubPlugin("new", 1))); + } +} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java new file mode 100644 index 0000000000000..fc704e260ed51 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeIndexingExecutionEngineTests.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for {@link CompositeIndexingExecutionEngine}. 
+ */ +public class CompositeIndexingExecutionEngineTests extends OpenSearchTestCase { + + public void testConstructorWithPrimaryOnly() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene"); + assertNotNull(engine.getPrimaryDelegate()); + assertTrue(engine.getSecondaryDelegates().isEmpty()); + assertEquals("composite", engine.getDataFormat().name()); + } + + public void testConstructorWithPrimaryAndSecondary() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene", "parquet"); + assertNotNull(engine.getPrimaryDelegate()); + assertEquals(1, engine.getSecondaryDelegates().size()); + assertEquals("parquet", engine.getSecondaryDelegates().get(0).getDataFormat().name()); + } + + public void testConstructorWithMultipleSecondaries() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene", "parquet", "arrow"); + assertEquals(2, engine.getSecondaryDelegates().size()); + } + + public void testConstructorThrowsWhenCompositeDisabled() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + + Settings settings = Settings.builder() + .put("index.composite.enabled", false) + .put("index.composite.primary_data_format", "lucene") + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + + expectThrows(IllegalStateException.class, () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null)); + } + + public void testConstructorThrowsWhenPrimaryFormatNotRegistered() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + + IndexSettings indexSettings = 
createIndexSettings(true, "parquet"); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null) + ); + assertTrue(ex.getMessage().contains("parquet")); + } + + public void testConstructorThrowsWhenSecondaryFormatNotRegistered() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + + Settings settings = Settings.builder() + .put("index.composite.enabled", true) + .put("index.composite.primary_data_format", "lucene") + .putList("index.composite.secondary_data_formats", "parquet") + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null) + ); + assertTrue(ex.getMessage().contains("parquet")); + } + + public void testConstructorRejectsNullDataFormatPlugins() { + IndexSettings indexSettings = createIndexSettings(true, "lucene"); + expectThrows(NullPointerException.class, () -> new CompositeIndexingExecutionEngine(null, indexSettings, null, null)); + } + + public void testConstructorRejectsNullIndexSettings() { + Map plugins = Map.of("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + expectThrows(NullPointerException.class, () -> new CompositeIndexingExecutionEngine(plugins, null, null, null)); + } + + public void testValidateFormatsRegisteredAcceptsValidConfig() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + plugins.put("parquet", CompositeTestHelper.stubPlugin("parquet", 2)); + + // Should not throw 
+ CompositeIndexingExecutionEngine.validateFormatsRegistered(plugins, "lucene", List.of("parquet")); + } + + public void testValidateFormatsRegisteredRejectsMissingPrimary() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> CompositeIndexingExecutionEngine.validateFormatsRegistered(plugins, "parquet", List.of()) + ); + assertTrue(ex.getMessage().contains("parquet")); + } + + public void testValidateFormatsRegisteredRejectsMissingSecondary() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> CompositeIndexingExecutionEngine.validateFormatsRegistered(plugins, "lucene", List.of("parquet")) + ); + assertTrue(ex.getMessage().contains("parquet")); + } + + public void testValidateFormatsRegisteredRejectsSecondaryEqualToPrimary() { + Map plugins = new HashMap<>(); + plugins.put("lucene", CompositeTestHelper.stubPlugin("lucene", 1)); + + IllegalStateException ex = expectThrows( + IllegalStateException.class, + () -> CompositeIndexingExecutionEngine.validateFormatsRegistered(plugins, "lucene", List.of("lucene")) + ); + assertTrue(ex.getMessage().contains("same as primary")); + } + + public void testCreateWriterReturnsCompositeWriter() throws IOException { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene"); + CompositeWriter writer = (CompositeWriter) engine.createWriter(42L); + assertNotNull(writer); + assertEquals(42L, writer.getWriterGeneration()); + writer.close(); + } + + public void testGetMergerDelegatesToPrimary() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene"); + // StubIndexingExecutionEngine returns null for getMerger + assertNull(engine.getMerger()); + } + + public void 
testGetNativeBytesUsedSumsAllEngines() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene", "parquet"); + // Stub engines return 0 for getNativeBytesUsed + assertEquals(0L, engine.getNativeBytesUsed()); + } + + public void testGetDataFormatReturnsCompositeDataFormat() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene", "parquet"); + CompositeDataFormat format = engine.getDataFormat(); + assertNotNull(format); + assertEquals("composite", format.name()); + assertEquals(2, format.getDataFormats().size()); + } + + public void testNewDocumentInputReturnsCompositeDocumentInput() { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene", "parquet"); + CompositeDocumentInput input = engine.newDocumentInput(); + assertNotNull(input); + assertNotNull(input.getPrimaryInput()); + assertEquals(1, input.getSecondaryInputs().size()); + input.close(); + } + + public void testDeleteFilesDoesNotThrow() throws Exception { + CompositeIndexingExecutionEngine engine = CompositeTestHelper.createStubEngine("lucene", "parquet"); + engine.deleteFiles(Map.of()); + } + + private IndexSettings createIndexSettings(boolean compositeEnabled, String primaryFormat) { + Settings settings = Settings.builder() + .put("index.composite.enabled", compositeEnabled) + .put("index.composite.primary_data_format", primaryFormat) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build(); + return new IndexSettings(indexMetadata, Settings.EMPTY); + } +} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java new file mode 100644 index 
0000000000000..e3eb809702269 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java @@ -0,0 +1,225 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.dataformat.DataFormat; +import org.opensearch.index.engine.dataformat.DataFormatPlugin; +import org.opensearch.index.engine.dataformat.DocumentInput; +import org.opensearch.index.engine.dataformat.FieldTypeCapabilities; +import org.opensearch.index.engine.dataformat.FileInfos; +import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; +import org.opensearch.index.engine.dataformat.Merger; +import org.opensearch.index.engine.dataformat.RefreshInput; +import org.opensearch.index.engine.dataformat.RefreshResult; +import org.opensearch.index.engine.dataformat.WriteResult; +import org.opensearch.index.engine.dataformat.Writer; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.ShardPath; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Shared test utilities for composite engine tests. + */ +final class CompositeTestHelper { + + private CompositeTestHelper() {} + + /** + * Creates a CompositeIndexingExecutionEngine with stub engines for testing. + */ + static CompositeIndexingExecutionEngine createStubEngine(String primaryName, String... 
secondaryNames) { + Map plugins = new HashMap<>(); + plugins.put(primaryName, stubPlugin(primaryName, 1)); + for (String name : secondaryNames) { + plugins.put(name, stubPlugin(name, 2)); + } + + Settings.Builder settingsBuilder = Settings.builder() + .put("index.composite.enabled", true) + .put("index.composite.primary_data_format", primaryName) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1); + + if (secondaryNames.length > 0) { + settingsBuilder.putList("index.composite.secondary_data_formats", secondaryNames); + } + + Settings settings = settingsBuilder.build(); + IndexMetadata indexMetadata = IndexMetadata.builder("test-index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + + return new CompositeIndexingExecutionEngine(plugins, indexSettings, null, null); + } + + static DataFormatPlugin stubPlugin(String formatName, long priority) { + DataFormat format = stubFormat(formatName, priority, Set.of()); + return new DataFormatPlugin() { + @Override + public DataFormat getDataFormat() { + return format; + } + + @Override + @SuppressWarnings("unchecked") + public > IndexingExecutionEngine indexingEngine( + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { + return (IndexingExecutionEngine) new StubIndexingExecutionEngine(format); + } + }; + } + + static DataFormatPlugin stubPlugin(String formatName, long priority, Set fields) { + DataFormat format = stubFormat(formatName, priority, fields); + return new DataFormatPlugin() { + @Override + public DataFormat getDataFormat() { + return format; + } + + @Override + @SuppressWarnings("unchecked") + public > IndexingExecutionEngine indexingEngine( + MapperService mapperService, + ShardPath shardPath, + IndexSettings indexSettings + ) { + return (IndexingExecutionEngine) new StubIndexingExecutionEngine(format); 
+ } + }; + } + + static DataFormat stubFormat(String name, long priority, Set fields) { + return new DataFormat() { + @Override + public String name() { + return name; + } + + @Override + public long priority() { + return priority; + } + + @Override + public Set supportedFields() { + return fields; + } + + @Override + public String toString() { + return "StubDataFormat{" + name + "}"; + } + }; + } + + /** + * Minimal stub IndexingExecutionEngine that returns no-op writers and empty results. + */ + static class StubIndexingExecutionEngine implements IndexingExecutionEngine> { + + private final DataFormat dataFormat; + + StubIndexingExecutionEngine(DataFormat dataFormat) { + this.dataFormat = dataFormat; + } + + @Override + public Writer> createWriter(long writerGeneration) { + return new StubWriter(dataFormat); + } + + @Override + public Merger getMerger() { + return null; + } + + @Override + public RefreshResult refresh(RefreshInput refreshInput) { + return new RefreshResult(Collections.emptyList()); + } + + @Override + public DataFormat getDataFormat() { + return dataFormat; + } + + @Override + public void deleteFiles(Map> filesToDelete) {} + + @Override + public DocumentInput newDocumentInput() { + return new StubDocumentInput(); + } + } + + /** + * Minimal stub Writer that always succeeds and returns empty FileInfos. + */ + static class StubWriter implements Writer> { + + private final DataFormat format; + private WriteResult resultToReturn = new WriteResult.Success(1, 1, 1); + + StubWriter(DataFormat format) { + this.format = format; + } + + void setResultToReturn(WriteResult result) { + this.resultToReturn = result; + } + + @Override + public WriteResult addDoc(DocumentInput d) { + return resultToReturn; + } + + @Override + public FileInfos flush() { + return FileInfos.empty(); + } + + @Override + public void sync() {} + + @Override + public void close() {} + } + + /** + * Minimal stub DocumentInput. 
+ */ + static class StubDocumentInput implements DocumentInput { + @Override + public Object getFinalInput() { + return null; + } + + @Override + public void addField(org.opensearch.index.mapper.MappedFieldType fieldType, Object value) {} + + @Override + public void setRowId(String rowIdFieldName, long rowId) {} + + @Override + public void close() {} + } +} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java new file mode 100644 index 0000000000000..83d444a2b55f0 --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.index.engine.dataformat.FileInfos; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link CompositeWriter}. 
+ */ +public class CompositeWriterTests extends OpenSearchTestCase { + + private CompositeIndexingExecutionEngine engine; + + @Override + public void setUp() throws Exception { + super.setUp(); + engine = CompositeTestHelper.createStubEngine("lucene", "parquet"); + } + + public void testWriterGenerationIsPreserved() throws IOException { + long gen = randomLongBetween(0, 1000); + CompositeWriter writer = new CompositeWriter(engine, gen); + assertEquals(gen, writer.getWriterGeneration()); + writer.close(); + } + + public void testAbortedDefaultsToFalse() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + assertFalse(writer.isAborted()); + writer.close(); + } + + public void testAbortSetsAbortedFlag() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + writer.abort(); + assertTrue(writer.isAborted()); + writer.close(); + } + + public void testFlushPendingDefaultsToFalse() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + assertFalse(writer.isFlushPending()); + writer.close(); + } + + public void testSetFlushPendingSetsFlag() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + writer.setFlushPending(); + assertTrue(writer.isFlushPending()); + writer.close(); + } + + public void testLockAndUnlock() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + writer.lock(); + assertTrue(writer.tryLock()); + writer.unlock(); + writer.unlock(); + writer.close(); + } + + public void testTryLockSucceedsWhenUnlocked() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + assertTrue(writer.tryLock()); + writer.unlock(); + writer.close(); + } + + public void testTryLockWithTimeoutSucceeds() throws Exception { + CompositeWriter writer = new CompositeWriter(engine, 0); + assertTrue(writer.tryLock(100, TimeUnit.MILLISECONDS)); + writer.unlock(); + writer.close(); + } + + public void 
testNewConditionThrowsUnsupported() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + expectThrows(UnsupportedOperationException.class, writer::newCondition); + writer.close(); + } + + public void testFlushReturnsFileInfos() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + FileInfos fileInfos = writer.flush(); + assertNotNull(fileInfos); + writer.close(); + } + + public void testSyncDoesNotThrow() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + writer.sync(); + writer.close(); + } + + public void testCloseDoesNotThrow() throws IOException { + CompositeWriter writer = new CompositeWriter(engine, 0); + writer.close(); + // calling close again should also not throw + writer.close(); + } + + public void testLockInterruptiblySucceeds() throws Exception { + CompositeWriter writer = new CompositeWriter(engine, 0); + writer.lockInterruptibly(); + writer.unlock(); + writer.close(); + } +} diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/RowIdGeneratorTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/RowIdGeneratorTests.java new file mode 100644 index 0000000000000..1568be65a093c --- /dev/null +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/RowIdGeneratorTests.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite; + +import org.opensearch.test.OpenSearchTestCase; + +/** + * Tests for {@link RowIdGenerator}. 
+ */ +public class RowIdGeneratorTests extends OpenSearchTestCase { + + public void testNextRowIdStartsAtZero() { + RowIdGenerator generator = new RowIdGenerator("test"); + assertEquals(0L, generator.nextRowId()); + } + + public void testNextRowIdIncrementsMonotonically() { + RowIdGenerator generator = new RowIdGenerator("test"); + for (int i = 0; i < 100; i++) { + assertEquals(i, generator.nextRowId()); + } + } + + public void testCurrentRowIdReturnsCurrentWithoutIncrementing() { + RowIdGenerator generator = new RowIdGenerator("test"); + assertEquals(0L, generator.currentRowId()); + assertEquals(0L, generator.currentRowId()); + generator.nextRowId(); + assertEquals(1L, generator.currentRowId()); + assertEquals(1L, generator.currentRowId()); + } + + public void testGetSourceReturnsConstructorArgument() { + String source = randomAlphaOfLength(10); + RowIdGenerator generator = new RowIdGenerator(source); + assertEquals(source, generator.getSource()); + } + + public void testCurrentRowIdReflectsNextRowIdCalls() { + RowIdGenerator generator = new RowIdGenerator("test"); + int count = randomIntBetween(1, 50); + for (int i = 0; i < count; i++) { + generator.nextRowId(); + } + assertEquals(count, generator.currentRowId()); + } +} From 624a94f8cd0fa11370b089ada71cf519175b0c19 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Fri, 20 Mar 2026 20:05:21 +0530 Subject: [PATCH 2/3] Making DataFormat safe to be used in Maps --- .../composite/CompositeDataFormat.java | 2 +- .../opensearch/composite/CompositeWriter.java | 54 ++++++++++--------- .../index/engine/dataformat/DataFormat.java | 24 +++++++-- .../dataformat/DataFormatPluginTests.java | 2 +- 4 files changed, 50 insertions(+), 32 deletions(-) diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java index c1f74fb60627a..2633ad0f30330 100644 --- 
a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java @@ -24,7 +24,7 @@ * @opensearch.experimental */ @ExperimentalApi -public class CompositeDataFormat implements DataFormat { +public class CompositeDataFormat extends DataFormat { private final List dataFormats; diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java index ccf91af29c50f..83ed18955fd7e 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java @@ -21,8 +21,7 @@ import java.io.IOException; import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.List; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -47,7 +46,7 @@ public class CompositeWriter implements Writer, Lock { private static final Logger logger = LogManager.getLogger(CompositeWriter.class); private final Map.Entry>> primaryWriter; - private final List>>> secondaryWriters; + private final Map>> secondaryWritersByFormat; private final ReentrantLock lock; private final long writerGeneration; private final RowIdGenerator rowIdGenerator; @@ -77,16 +76,14 @@ public CompositeWriter(CompositeIndexingExecutionEngine engine, long writerGener (Writer>) primaryDelegate.createWriter(writerGeneration) ); - List>>> secondaries = new ArrayList<>(); + Map>> secondaries = new LinkedHashMap<>(); for (IndexingExecutionEngine delegate : engine.getSecondaryDelegates()) { - secondaries.add( - new AbstractMap.SimpleImmutableEntry<>( - delegate.getDataFormat(), - (Writer>) delegate.createWriter(writerGeneration) - ) + secondaries.put( + 
delegate.getDataFormat(), + (Writer>) delegate.createWriter(writerGeneration) ); } - this.secondaryWriters = List.copyOf(secondaries); + this.secondaryWritersByFormat = Map.copyOf(secondaries); this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName()); } @@ -102,16 +99,20 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException { } } - // Then write to each secondary - List> secondaryInputs = new ArrayList<>(doc.getSecondaryInputs().values()); - for (int i = 0; i < secondaryWriters.size(); i++) { - Map.Entry>> entry = secondaryWriters.get(i); - DocumentInput input = secondaryInputs.get(i); - WriteResult result = entry.getValue().addDoc(input); + // Then write to each secondary — keyed lookup by DataFormat (equals/hashCode based on name) + Map> secondaryInputs = doc.getSecondaryInputs(); + for (Map.Entry> inputEntry : secondaryInputs.entrySet()) { + DataFormat format = inputEntry.getKey(); + Writer> writer = secondaryWritersByFormat.get(format); + if (writer == null) { + logger.warn("No writer found for secondary format [{}], skipping", format.name()); + continue; + } + WriteResult result = writer.addDoc(inputEntry.getValue()); switch (result) { - case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", entry.getKey().name()); + case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", format.name()); case WriteResult.Failure f -> { - logger.debug("Failed to add document in secondary format [{}]", entry.getKey().name()); + logger.debug("Failed to add document in secondary format [{}]", format.name()); return result; } } @@ -127,9 +128,12 @@ public FileInfos flush() throws IOException { Optional primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey()); primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet)); // Flush secondaries - for (Map.Entry>> entry : secondaryWriters) { 
- Optional wfs = entry.getValue().flush().getWriterFileSet(entry.getKey()); - wfs.ifPresent(writerFileSet -> builder.putWriterFileSet(entry.getKey(), writerFileSet)); + for (Writer> writer : secondaryWritersByFormat.values()) { + FileInfos fileInfos = writer.flush(); + // Iterate all format entries in the returned FileInfos + for (Map.Entry fileEntry : fileInfos.writerFilesMap().entrySet()) { + builder.putWriterFileSet(fileEntry.getKey(), fileEntry.getValue()); + } } return builder.build(); } @@ -137,16 +141,16 @@ public FileInfos flush() throws IOException { @Override public void sync() throws IOException { primaryWriter.getValue().sync(); - for (Map.Entry>> entry : secondaryWriters) { - entry.getValue().sync(); + for (Writer> writer : secondaryWritersByFormat.values()) { + writer.sync(); } } @Override public void close() throws IOException { primaryWriter.getValue().close(); - for (Map.Entry>> entry : secondaryWriters) { - entry.getValue().close(); + for (Writer> writer : secondaryWritersByFormat.values()) { + writer.close(); } } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java index 668571b97e55c..ed261b976403f 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormat.java @@ -10,24 +10,26 @@ import org.opensearch.common.annotation.ExperimentalApi; +import java.util.Objects; import java.util.Set; /** * Represents a data format for storing and managing index data, with declared capabilities. * Each data format (e.g., Lucene, Parquet) declares what storage and query capabilities it supports. *

- * Equality is based on the format name — there should be one {@code DataFormat} instance per unique name. + * Equality is based on the format {@link #name()} — there should be one {@code DataFormat} instance + * per unique name. This allows {@code DataFormat} to be used safely as a {@link java.util.Map} key. * * @opensearch.experimental */ @ExperimentalApi -public interface DataFormat { +public abstract class DataFormat { /** * Returns the unique name of this data format. * * @return the data format name */ - String name(); + public abstract String name(); /** * Returns the priority of this data format. Higher priority formats are preferred @@ -35,12 +37,24 @@ public interface DataFormat { * * @return the priority value */ - long priority(); + public abstract long priority(); /** * Returns the set of field type capabilities supported by this data format. * * @return the supported field type capabilities */ - Set supportedFields(); + public abstract Set supportedFields(); + + @Override + public final boolean equals(Object o) { + if (this == o) return true; + if (o instanceof DataFormat == false) return false; + return Objects.equals(name(), ((DataFormat) o).name()); + } + + @Override + public final int hashCode() { + return Objects.hashCode(name()); + } } diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java index 117ce798494f2..0206c157248ad 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java @@ -222,7 +222,7 @@ public void testRefreshInput() { assertEquals(1, input.existingSegments().size()); } - static class MockDataFormat implements DataFormat { + static class MockDataFormat extends DataFormat { @Override public String name() { return "mock-columnar"; From 
9a8d89da66b1e9191562cd0845032420b34093d7 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Fri, 20 Mar 2026 22:40:52 +0530 Subject: [PATCH 3/3] Simplifying Lockable interface for CompositeWriters Signed-off-by: Bukhtawar Khan --- .../opensearch/composite/queue/Lockable.java | 37 +++++++++++++++++++ .../queue/LockableConcurrentQueue.java | 7 ++-- .../queue/LockableConcurrentQueueTests.java | 26 ++++++++++++- .../composite/CompositeEnginePlugin.java | 10 +++-- .../CompositeIndexingExecutionEngine.java | 4 -- .../opensearch/composite/CompositeWriter.java | 22 ++--------- .../composite/CompositeEnginePluginTests.java | 14 +++---- .../composite/CompositeTestHelper.java | 10 ++--- .../composite/CompositeWriterTests.java | 20 ---------- .../engine/dataformat/DataFormatPlugin.java | 4 +- .../dataformat/DataFormatPluginTests.java | 17 +++++---- 11 files changed, 93 insertions(+), 78 deletions(-) create mode 100644 sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/Lockable.java diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/Lockable.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/Lockable.java new file mode 100644 index 0000000000000..13f84ef15a885 --- /dev/null +++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/Lockable.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.composite.queue; + +import jdk.jfr.Experimental; + +/** + * A minimal locking contract for objects managed by a {@link LockableConcurrentQueue}. + * + * @opensearch.experimental + */ +@Experimental +public interface Lockable { + + /** + * Acquires the lock. + */ + void lock(); + + /** + * Attempts to acquire the lock without blocking. 
+ * + * @return {@code true} if the lock was acquired + */ + boolean tryLock(); + + /** + * Releases the lock. + */ + void unlock(); +} diff --git a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java index 2f1be95db2222..5a6533cef7771 100644 --- a/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java +++ b/sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java @@ -10,12 +10,11 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; import java.util.function.Supplier; /** * A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics - * on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that + * on top of {@link ConcurrentQueue}. Entries must implement {@link Lockable} so that * they can be atomically locked when polled and unlocked when returned. *

* This is used by the composite writer pool to ensure that a writer is locked @@ -24,7 +23,7 @@ * @param the type of lockable elements held in this queue * @opensearch.experimental */ -public final class LockableConcurrentQueue { +public final class LockableConcurrentQueue { private final ConcurrentQueue queue; private final AtomicInteger addAndUnlockCounter = new AtomicInteger(); @@ -47,7 +46,7 @@ public T lockAndPoll() { int addAndUnlockCount; do { addAndUnlockCount = addAndUnlockCounter.get(); - T entry = queue.poll(Lock::tryLock); + T entry = queue.poll(Lockable::tryLock); if (entry != null) { return entry; } diff --git a/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java index 35716a8fdd1ae..e5a155a3dadb0 100644 --- a/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java +++ b/sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java @@ -24,13 +24,37 @@ public class LockableConcurrentQueueTests extends OpenSearchTestCase { /** * A simple lockable entry for testing. 
*/ - static class LockableEntry extends ReentrantLock { + static class LockableEntry implements Lockable { final String id; + private final ReentrantLock delegate = new ReentrantLock(); LockableEntry(String id) { this.id = id; } + @Override + public void lock() { + delegate.lock(); + } + + @Override + public boolean tryLock() { + return delegate.tryLock(); + } + + @Override + public void unlock() { + delegate.unlock(); + } + + boolean isHeldByCurrentThread() { + return delegate.isHeldByCurrentThread(); + } + + boolean isLocked() { + return delegate.isLocked(); + } + @Override public String toString() { return "LockableEntry{" + id + "}"; diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java index 8836409a9af72..83e909d3f065c 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java @@ -17,7 +17,6 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.dataformat.DataFormatPlugin; -import org.opensearch.index.engine.dataformat.DocumentInput; import org.opensearch.index.engine.dataformat.IndexingExecutionEngine; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; @@ -129,6 +128,10 @@ public void loadExtensions(ExtensionLoader loader) { continue; } String name = format.name(); + if (name == null || name.isBlank()) { + logger.warn("DataFormatPlugin [{}] returned a DataFormat with null/blank name, skipping", plugin.getClass().getName()); + continue; + } DataFormatPlugin existing = registry.get(name); if (existing != null) { long existingPriority = existing.getDataFormat().priority(); @@ -180,13 +183,12 @@ public DataFormat 
getDataFormat() { } @Override - @SuppressWarnings("unchecked") - public > IndexingExecutionEngine indexingEngine( + public IndexingExecutionEngine indexingEngine( MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings ) { - return (IndexingExecutionEngine) new CompositeIndexingExecutionEngine( + return new CompositeIndexingExecutionEngine( dataFormatPlugins, indexSettings, mapperService, diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java index 0208d8d21da22..42190afc90b7b 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -105,10 +105,6 @@ public CompositeIndexingExecutionEngine( List> secondaries = new ArrayList<>(); for (String secondaryName : secondaryFormatNames) { - if (secondaryName.equals(primaryFormatName)) { - logger.warn("Secondary data format [{}] is the same as primary, skipping duplicate", secondaryName); - continue; - } DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName); secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings)); allFormats.add(secondaryPlugin.getDataFormat()); diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java index 83ed18955fd7e..406f976b3f20c 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java @@ -24,11 +24,10 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; 
-import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.opensearch.composite.queue.Lockable; + /** * A composite {@link Writer} that wraps one {@link Writer} per registered data format * and delegates write operations to each per-format writer. @@ -41,7 +40,7 @@ * @opensearch.experimental */ @ExperimentalApi -public class CompositeWriter implements Writer, Lock { +public class CompositeWriter implements Writer, Lockable { private static final Logger logger = LogManager.getLogger(CompositeWriter.class); @@ -200,28 +199,13 @@ public void lock() { lock.lock(); } - @Override - public void lockInterruptibly() throws InterruptedException { - lock.lockInterruptibly(); - } - @Override public boolean tryLock() { return lock.tryLock(); } - @Override - public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - return lock.tryLock(time, unit); - } - @Override public void unlock() { lock.unlock(); } - - @Override - public Condition newCondition() { - throw new UnsupportedOperationException(); - } } diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java index c0399d32de735..81ee48bedfd78 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java @@ -156,15 +156,11 @@ public DataFormat getDataFormat() { } @Override - public < - T extends DataFormat, - P extends org.opensearch.index.engine.dataformat.DocumentInput> - org.opensearch.index.engine.dataformat.IndexingExecutionEngine - indexingEngine( - org.opensearch.index.mapper.MapperService mapperService, - org.opensearch.index.shard.ShardPath 
shardPath, - IndexSettings indexSettings - ) { + public org.opensearch.index.engine.dataformat.IndexingExecutionEngine indexingEngine( + org.opensearch.index.mapper.MapperService mapperService, + org.opensearch.index.shard.ShardPath shardPath, + IndexSettings indexSettings + ) { return null; } }; diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java index e3eb809702269..7455b9fac7363 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java @@ -76,13 +76,12 @@ public DataFormat getDataFormat() { } @Override - @SuppressWarnings("unchecked") - public > IndexingExecutionEngine indexingEngine( + public IndexingExecutionEngine indexingEngine( MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings ) { - return (IndexingExecutionEngine) new StubIndexingExecutionEngine(format); + return new StubIndexingExecutionEngine(format); } }; } @@ -96,13 +95,12 @@ public DataFormat getDataFormat() { } @Override - @SuppressWarnings("unchecked") - public > IndexingExecutionEngine indexingEngine( + public IndexingExecutionEngine indexingEngine( MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings ) { - return (IndexingExecutionEngine) new StubIndexingExecutionEngine(format); + return new StubIndexingExecutionEngine(format); } }; } diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java index 83d444a2b55f0..c34f28fca0ca0 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java +++ 
b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java @@ -12,7 +12,6 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; -import java.util.concurrent.TimeUnit; /** * Tests for {@link CompositeWriter}. @@ -76,19 +75,6 @@ public void testTryLockSucceedsWhenUnlocked() throws IOException { writer.close(); } - public void testTryLockWithTimeoutSucceeds() throws Exception { - CompositeWriter writer = new CompositeWriter(engine, 0); - assertTrue(writer.tryLock(100, TimeUnit.MILLISECONDS)); - writer.unlock(); - writer.close(); - } - - public void testNewConditionThrowsUnsupported() throws IOException { - CompositeWriter writer = new CompositeWriter(engine, 0); - expectThrows(UnsupportedOperationException.class, writer::newCondition); - writer.close(); - } - public void testFlushReturnsFileInfos() throws IOException { CompositeWriter writer = new CompositeWriter(engine, 0); FileInfos fileInfos = writer.flush(); @@ -109,10 +95,4 @@ public void testCloseDoesNotThrow() throws IOException { writer.close(); } - public void testLockInterruptiblySucceeds() throws Exception { - CompositeWriter writer = new CompositeWriter(engine, 0); - writer.lockInterruptibly(); - writer.unlock(); - writer.close(); - } } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java index c2a1fc695652a..6433aa8c0f123 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java @@ -36,11 +36,9 @@ public interface DataFormatPlugin { * @param mapperService the mapper service for field mapping resolution * @param shardPath the shard path for file storage * @param indexSettings the index settings - * @param the data format type - * @param

the document input type * @return the indexing execution engine instance */ - > IndexingExecutionEngine indexingEngine( + IndexingExecutionEngine indexingEngine( MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java index 0206c157248ad..6f7f8c7c1307f 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java @@ -62,11 +62,13 @@ public void testFullDataFormatLifecycle() throws IOException { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) .build(); - IndexingExecutionEngine engine = plugin.indexingEngine( - mock(MapperService.class), - new ShardPath(false, Path.of("/tmp/uuid/0"), Path.of("/tmp/uuid/0"), new ShardId("index", "uuid", 0)), - new IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings) - ); + @SuppressWarnings("unchecked") + IndexingExecutionEngine engine = + (IndexingExecutionEngine) plugin.indexingEngine( + mock(MapperService.class), + new ShardPath(false, Path.of("/tmp/uuid/0"), Path.of("/tmp/uuid/0"), new ShardId("index", "uuid", 0)), + new IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings) + ); assertEquals(format, engine.getDataFormat()); // 2. 
Create a writer and write documents @@ -400,13 +402,12 @@ public DataFormat getDataFormat() { } @Override - @SuppressWarnings("unchecked") - public > IndexingExecutionEngine indexingEngine( + public IndexingExecutionEngine indexingEngine( MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings ) { - return (IndexingExecutionEngine) new MockIndexingExecutionEngine(dataFormat); + return new MockIndexingExecutionEngine(dataFormat); } } }