Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.claude
CLAUDE.md
.cursor*
.kiro*

# intellij files
.idea/
Expand Down
21 changes: 21 additions & 0 deletions sandbox/libs/composite-engine-lib/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Shared concurrent queue utilities for the composite indexing engine.
* No external dependencies — pure Java concurrency primitives.
*/

dependencies {
}

testingConventions.enabled = false

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.composite.queue;

import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* A striped concurrent queue that distributes entries across multiple internal
* queues using thread-affinity-based hashing. This reduces contention by allowing
* concurrent threads to operate on different stripes without blocking each other.
*
* @param <T> the type of elements held in this queue
* @opensearch.experimental
*/
public final class ConcurrentQueue<T> {

static final int MIN_CONCURRENCY = 1;
static final int MAX_CONCURRENCY = 256;

private final int concurrency;
private final Lock[] locks;
private final Queue<T>[] queues;
private final Supplier<Queue<T>> queueSupplier;

ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
throw new IllegalArgumentException(
"concurrency must be in [" + MIN_CONCURRENCY + ", " + MAX_CONCURRENCY + "], got " + concurrency
);
}
this.concurrency = concurrency;
this.queueSupplier = queueSupplier;
locks = new Lock[concurrency];
@SuppressWarnings({ "rawtypes", "unchecked" })
Queue<T>[] queues = new Queue[concurrency];
this.queues = queues;
for (int i = 0; i < concurrency; ++i) {
locks[i] = new ReentrantLock();
queues[i] = queueSupplier.get();
}
}

void add(T entry) {
// Seed the order in which to look at entries based on the current thread. This helps distribute
// entries across queues and gives a bit of thread affinity between entries and threads, which
// can't hurt.
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final Queue<T> queue = queues[index];
if (lock.tryLock()) {
try {
queue.add(entry);
return;
} finally {
lock.unlock();
}
}
}
final int index = threadHash % concurrency;
final Lock lock = locks[index];
final Queue<T> queue = queues[index];
lock.lock();
try {
queue.add(entry);
} finally {
lock.unlock();
}
}

T poll(Predicate<T> predicate) {
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final Queue<T> queue = queues[index];
if (lock.tryLock()) {
try {
Iterator<T> it = queue.iterator();
while (it.hasNext()) {
T entry = it.next();
if (predicate.test(entry)) {
it.remove();
return entry;
}
}
} finally {
lock.unlock();
}
}
}
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final Queue<T> queue = queues[index];
lock.lock();
try {
Iterator<T> it = queue.iterator();
while (it.hasNext()) {
T entry = it.next();
if (predicate.test(entry)) {
it.remove();
return entry;
}
}
} finally {
lock.unlock();
}
}
return null;
}

boolean remove(T entry) {
for (int i = 0; i < concurrency; ++i) {
final Lock lock = locks[i];
final Queue<T> queue = queues[i];
lock.lock();
try {
if (queue.remove(entry)) {
return true;
}
} finally {
lock.unlock();
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.composite.queue;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

/**
* A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics
* on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that
* they can be atomically locked when polled and unlocked when returned.
* <p>
* This is used by the composite writer pool to ensure that a writer is locked
* before it is handed out and unlocked when it is returned.
*
* @param <T> the type of lockable elements held in this queue
* @opensearch.experimental
*/
public final class LockableConcurrentQueue<T extends Lock> {

private final ConcurrentQueue<T> queue;
private final AtomicInteger addAndUnlockCounter = new AtomicInteger();

public LockableConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
this.queue = new ConcurrentQueue<>(queueSupplier, concurrency);
}

/**
* Lock an entry, and poll it from the queue, in that order. If no entry can be found and locked,
* {@code null} is returned.
*/
public T lockAndPoll() {
int addAndUnlockCount;
do {
addAndUnlockCount = addAndUnlockCounter.get();
T entry = queue.poll(Lock::tryLock);
if (entry != null) {
return entry;
}
// If an entry has been added to the queue in the meantime, try again.
} while (addAndUnlockCount != addAndUnlockCounter.get());

return null;
}

/** Remove an entry from the queue. */
public boolean remove(T entry) {
return queue.remove(entry);
}

/** Add an entry to the queue and unlock it, in that order. */
public void addAndUnlock(T entry) {
queue.add(entry);
entry.unlock();
addAndUnlockCounter.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Concurrent queue utilities for the composite indexing engine.
*
* @opensearch.experimental
*/
package org.opensearch.composite.queue;
18 changes: 18 additions & 0 deletions sandbox/plugins/composite-engine/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

opensearchplugin {
description = 'Composite indexing engine plugin that orchestrates multi-format indexing across multiple data format engines.'
classname = 'org.opensearch.composite.CompositeEnginePlugin'
}

dependencies {
api project(':sandbox:libs:composite-engine-lib')
compileOnly project(':server')
testImplementation project(':test:framework')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;

import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
* A composite {@link DataFormat} that wraps multiple per-format {@link DataFormat} instances.
* Each constituent format retains its own {@link FieldTypeCapabilities} — field routing is
* handled per-format by {@link CompositeDocumentInput}, not by this class.
*
* @opensearch.experimental
*/
@ExperimentalApi
public class CompositeDataFormat implements DataFormat {

private final List<DataFormat> dataFormats;
private final DataFormat primaryDataFormat;

/**
* Constructs a CompositeDataFormat from the given list of data formats.
*
* @param dataFormats the constituent data formats
* @param primaryDataFormat the primary data format (must be contained in {@code dataFormats})
*/
public CompositeDataFormat(List<DataFormat> dataFormats, DataFormat primaryDataFormat) {
this.dataFormats = dataFormats;
this.primaryDataFormat = primaryDataFormat;
}

public CompositeDataFormat() {
this.dataFormats = List.of();
this.primaryDataFormat = null;
}

/**
* Returns the primary data format.
*
* @return the primary data format
*/
public DataFormat getPrimaryDataFormat() {
return primaryDataFormat;
}

/**
* Returns the list of constituent data formats.
*
* @return the data formats
*/
public List<DataFormat> getDataFormats() {
return dataFormats;
}

@Override
public String name() {
return "composite";
}

@Override
public long priority() {
return Long.MAX_VALUE;
}

@Override
public Set<FieldTypeCapabilities> supportedFields() {
return primaryDataFormat.supportedFields();
}

@Override
public String toString() {
return "CompositeDataFormat{" + "dataFormats=" + dataFormats + ", primaryDataFormat=" + primaryDataFormat + '}';
}
}
Loading
Loading