Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.composite.queue;

import jdk.jfr.Experimental;

import org.opensearch.common.annotation.ExperimentalApi;

/**
 * A minimal locking contract for objects managed by a {@link LockableConcurrentQueue}.
 * <p>
 * Implementations provide exclusive-lock semantics analogous to
 * {@link java.util.concurrent.locks.Lock}, but expose only the three operations
 * the queue requires: a blocking acquire, a non-blocking attempt, and a release.
 * Implementations in this package delegate to a
 * {@link java.util.concurrent.locks.ReentrantLock}.
 *
 * @opensearch.experimental
 */
@ExperimentalApi
public interface Lockable {

    /**
     * Acquires the lock, blocking until it becomes available.
     */
    void lock();

    /**
     * Attempts to acquire the lock without blocking.
     *
     * @return {@code true} if the lock was acquired, {@code false} if it is
     *         currently held elsewhere
     */
    boolean tryLock();

    /**
     * Releases the lock. Callers must invoke this only while holding the lock;
     * ownership-enforcing implementations (e.g. ones backed by
     * {@link java.util.concurrent.locks.ReentrantLock}) throw
     * {@link IllegalMonitorStateException} otherwise.
     */
    void unlock();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@

import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

/**
* A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics
* on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that
* on top of {@link ConcurrentQueue}. Entries must implement {@link Lockable} so that
* they can be atomically locked when polled and unlocked when returned.
* <p>
* This is used by the composite writer pool to ensure that a writer is locked
Expand All @@ -24,7 +23,7 @@
* @param <T> the type of lockable elements held in this queue
* @opensearch.experimental
*/
public final class LockableConcurrentQueue<T extends Lock> {
public final class LockableConcurrentQueue<T extends Lockable> {

private final ConcurrentQueue<T> queue;
private final AtomicInteger addAndUnlockCounter = new AtomicInteger();
Expand All @@ -47,7 +46,7 @@ public T lockAndPoll() {
int addAndUnlockCount;
do {
addAndUnlockCount = addAndUnlockCounter.get();
T entry = queue.poll(Lock::tryLock);
T entry = queue.poll(Lockable::tryLock);
if (entry != null) {
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,37 @@ public class LockableConcurrentQueueTests extends OpenSearchTestCase {
/**
* A simple lockable entry for testing.
*/
static class LockableEntry extends ReentrantLock {
static class LockableEntry implements Lockable {
final String id;
private final ReentrantLock delegate = new ReentrantLock();

LockableEntry(String id) {
this.id = id;
}

@Override
public void lock() {
delegate.lock();
}

@Override
public boolean tryLock() {
return delegate.tryLock();
}

@Override
public void unlock() {
delegate.unlock();
}

boolean isHeldByCurrentThread() {
return delegate.isHeldByCurrentThread();
}

boolean isLocked() {
return delegate.isLocked();
}

@Override
public String toString() {
return "LockableEntry{" + id + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public class CompositeDataFormat implements DataFormat {
public class CompositeDataFormat extends DataFormat {

private final List<DataFormat> dataFormats;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
import org.opensearch.index.engine.dataformat.DocumentInput;
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardPath;
Expand Down Expand Up @@ -129,6 +128,10 @@ public void loadExtensions(ExtensionLoader loader) {
continue;
}
String name = format.name();
if (name == null || name.isBlank()) {
logger.warn("DataFormatPlugin [{}] returned a DataFormat with null/blank name, skipping", plugin.getClass().getName());
continue;
}
DataFormatPlugin existing = registry.get(name);
if (existing != null) {
long existingPriority = existing.getDataFormat().priority();
Expand Down Expand Up @@ -180,13 +183,12 @@ public DataFormat getDataFormat() {
}

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat, P extends DocumentInput<?>> IndexingExecutionEngine<T, P> indexingEngine(
public IndexingExecutionEngine<?, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
) {
return (IndexingExecutionEngine<T, P>) new CompositeIndexingExecutionEngine(
return new CompositeIndexingExecutionEngine(
dataFormatPlugins,
indexSettings,
mapperService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ public CompositeIndexingExecutionEngine(

List<IndexingExecutionEngine<?, ?>> secondaries = new ArrayList<>();
for (String secondaryName : secondaryFormatNames) {
if (secondaryName.equals(primaryFormatName)) {
logger.warn("Secondary data format [{}] is the same as primary, skipping duplicate", secondaryName);
continue;
}
DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName);
secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings));
allFormats.add(secondaryPlugin.getDataFormat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.opensearch.composite.queue.Lockable;

/**
* A composite {@link Writer} that wraps one {@link Writer} per registered data format
* and delegates write operations to each per-format writer.
Expand All @@ -42,12 +40,12 @@
* @opensearch.experimental
*/
@ExperimentalApi
public class CompositeWriter implements Writer<CompositeDocumentInput>, Lock {
public class CompositeWriter implements Writer<CompositeDocumentInput>, Lockable {

private static final Logger logger = LogManager.getLogger(CompositeWriter.class);

private final Map.Entry<DataFormat, Writer<DocumentInput<?>>> primaryWriter;
private final Map<DataFormat, Writer<DocumentInput<?>>> secondaryWriters;
private final Map<DataFormat, Writer<DocumentInput<?>>> secondaryWritersByFormat;
private final ReentrantLock lock;
private final long writerGeneration;
private final RowIdGenerator rowIdGenerator;
Expand Down Expand Up @@ -79,9 +77,12 @@ public CompositeWriter(CompositeIndexingExecutionEngine engine, long writerGener

Map<DataFormat, Writer<DocumentInput<?>>> secondaries = new LinkedHashMap<>();
for (IndexingExecutionEngine<?, ?> delegate : engine.getSecondaryDelegates()) {
secondaries.put(delegate.getDataFormat(), (Writer<DocumentInput<?>>) delegate.createWriter(writerGeneration));
secondaries.put(
delegate.getDataFormat(),
(Writer<DocumentInput<?>>) delegate.createWriter(writerGeneration)
);
}
this.secondaryWriters = Collections.unmodifiableMap(secondaries);
this.secondaryWritersByFormat = Map.copyOf(secondaries);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not be unmodifiableMap?

this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName());
}

Expand All @@ -97,19 +98,20 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
}
}

// Then write to each secondary by matching format keys
Map<DataFormat, DocumentInput<?>> secondaryInputMap = doc.getSecondaryInputs();
for (Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry : secondaryWriters.entrySet()) {
DocumentInput<?> input = secondaryInputMap.get(entry.getKey());
if (input == null) {
logger.warn("No secondary input found for format [{}], skipping", entry.getKey().name());
// Then write to each secondary — keyed lookup by DataFormat (equals/hashCode based on name)
Map<DataFormat, DocumentInput<?>> secondaryInputs = doc.getSecondaryInputs();
for (Map.Entry<DataFormat, DocumentInput<?>> inputEntry : secondaryInputs.entrySet()) {
DataFormat format = inputEntry.getKey();
Writer<DocumentInput<?>> writer = secondaryWritersByFormat.get(format);
if (writer == null) {
logger.warn("No writer found for secondary format [{}], skipping", format.name());
continue;
}
WriteResult result = entry.getValue().addDoc(input);
WriteResult result = writer.addDoc(inputEntry.getValue());
switch (result) {
case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", entry.getKey().name());
case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", format.name());
case WriteResult.Failure f -> {
logger.debug("Failed to add document in secondary format [{}]", entry.getKey().name());
logger.debug("Failed to add document in secondary format [{}]", format.name());
return result;
}
}
Expand All @@ -125,25 +127,28 @@ public FileInfos flush() throws IOException {
Optional<WriterFileSet> primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey());
primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet));
// Flush secondaries
for (Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry : secondaryWriters.entrySet()) {
Optional<WriterFileSet> wfs = entry.getValue().flush().getWriterFileSet(entry.getKey());
wfs.ifPresent(writerFileSet -> builder.putWriterFileSet(entry.getKey(), writerFileSet));
for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
FileInfos fileInfos = writer.flush();
// Iterate all format entries in the returned FileInfos
for (Map.Entry<DataFormat, WriterFileSet> fileEntry : fileInfos.writerFilesMap().entrySet()) {
builder.putWriterFileSet(fileEntry.getKey(), fileEntry.getValue());
}
}
return builder.build();
}

@Override
public void sync() throws IOException {
primaryWriter.getValue().sync();
for (Writer<DocumentInput<?>> writer : secondaryWriters.values()) {
for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
writer.sync();
}
}

@Override
public void close() throws IOException {
primaryWriter.getValue().close();
for (Writer<DocumentInput<?>> writer : secondaryWriters.values()) {
for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
writer.close();
}
}
Expand Down Expand Up @@ -194,28 +199,13 @@ public void lock() {
lock.lock();
}

@Override
public void lockInterruptibly() throws InterruptedException {
lock.lockInterruptibly();
}

@Override
public boolean tryLock() {
return lock.tryLock();
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return lock.tryLock(time, unit);
}

@Override
public void unlock() {
lock.unlock();
}

@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,11 @@ public DataFormat getDataFormat() {
}

@Override
public <
T extends DataFormat,
P extends org.opensearch.index.engine.dataformat.DocumentInput<?>>
org.opensearch.index.engine.dataformat.IndexingExecutionEngine<T, P>
indexingEngine(
org.opensearch.index.mapper.MapperService mapperService,
org.opensearch.index.shard.ShardPath shardPath,
IndexSettings indexSettings
) {
public org.opensearch.index.engine.dataformat.IndexingExecutionEngine<?, ?> indexingEngine(
org.opensearch.index.mapper.MapperService mapperService,
org.opensearch.index.shard.ShardPath shardPath,
IndexSettings indexSettings
) {
return null;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ public DataFormat getDataFormat() {
}

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat, P extends DocumentInput<?>> IndexingExecutionEngine<T, P> indexingEngine(
public IndexingExecutionEngine<?, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
) {
return (IndexingExecutionEngine<T, P>) new StubIndexingExecutionEngine(format);
return new StubIndexingExecutionEngine(format);
}
};
}
Expand All @@ -96,13 +95,12 @@ public DataFormat getDataFormat() {
}

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat, P extends DocumentInput<?>> IndexingExecutionEngine<T, P> indexingEngine(
public IndexingExecutionEngine<?, ?> indexingEngine(
MapperService mapperService,
ShardPath shardPath,
IndexSettings indexSettings
) {
return (IndexingExecutionEngine<T, P>) new StubIndexingExecutionEngine(format);
return new StubIndexingExecutionEngine(format);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* Tests for {@link CompositeWriter}.
Expand Down Expand Up @@ -76,19 +75,6 @@ public void testTryLockSucceedsWhenUnlocked() throws IOException {
writer.close();
}

public void testTryLockWithTimeoutSucceeds() throws Exception {
CompositeWriter writer = new CompositeWriter(engine, 0);
assertTrue(writer.tryLock(100, TimeUnit.MILLISECONDS));
writer.unlock();
writer.close();
}

public void testNewConditionThrowsUnsupported() throws IOException {
CompositeWriter writer = new CompositeWriter(engine, 0);
expectThrows(UnsupportedOperationException.class, writer::newCondition);
writer.close();
}

public void testFlushReturnsFileInfos() throws IOException {
CompositeWriter writer = new CompositeWriter(engine, 0);
FileInfos fileInfos = writer.flush();
Expand All @@ -109,10 +95,4 @@ public void testCloseDoesNotThrow() throws IOException {
writer.close();
}

public void testLockInterruptiblySucceeds() throws Exception {
CompositeWriter writer = new CompositeWriter(engine, 0);
writer.lockInterruptibly();
writer.unlock();
writer.close();
}
}
Loading
Loading