Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
Set.of(PublicApi.class, ExperimentalApi.class, DeprecatedApi.class)
);

// for (var element : elements) {
// validate(element);
//
// if (!checkPackage(element)) {
// continue;
// }
//
// // Skip all not-public elements
// checkPublicVisibility(null, element);
//
// if (element instanceof TypeElement) {
// process((TypeElement) element);
// }
// }
for (var element : elements) {
validate(element);

if (!checkPackage(element)) {
continue;
}

// Skip all not-public elements
checkPublicVisibility(null, element);

if (element instanceof TypeElement) {
process((TypeElement) element);
}
}

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,14 @@

package org.opensearch.vectorized.execution.search;

/**
 * Searches across vectorized execution catalogs.
 *
 * <p>The class currently declares no fields or methods besides its public
 * no-argument constructor.
 */
public class CatalogSearcher {

    /**
     * Constructs a CatalogSearcher. There is no state to initialize.
     */
    public CatalogSearcher() {
        super();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@

import org.opensearch.common.annotation.ExperimentalApi;

/**
 * Data formats supported by OpenSearch.
 */
@ExperimentalApi
public enum DataFormat {
/** CSV format. */
CSV,

/** Text format. */
Text
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,14 @@

package org.opensearch.vectorized.execution.search;

/**
 * Reads vectorized execution indices.
 *
 * <p>The class currently declares no fields or methods besides its public
 * no-argument constructor.
 */
public class IndexReader {

    /**
     * Constructs an IndexReader. There is no state to initialize.
     */
    public IndexReader() {
        super();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Vectorized execution search components.
*/
package org.opensearch.vectorized.execution.search;
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.vectorized.execution.search.spi;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.vectorized.execution.search.DataFormat;

import java.util.List;
Expand All @@ -18,6 +19,7 @@
* Implementations provide access to different data formats (CSV, Parquet, etc.)
* through the DataFusion query engine.
*/
@ExperimentalApi
public interface DataSourceCodec {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
*/
public class CsvDataSourceCodec implements DataSourceCodec {

/**
 * Constructs a CsvDataSourceCodec. The constructor takes no arguments and
 * performs no initialization of its own.
 */
public CsvDataSourceCodec() {
    super();
}

private static final Logger logger = LogManager.getLogger(CsvDataSourceCodec.class);
private static final AtomicLong runtimeIdGenerator = new AtomicLong(0);
private static final AtomicLong sessionIdGenerator = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,18 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.exec.DataFormat;

/**
* CSV data format implementation.
*/
public class CsvDataFormat implements DataFormat {

/**
 * Constructs a CSV data format instance. The constructor takes no arguments
 * and performs no initialization of its own.
 */
public CsvDataFormat() {
    super();
}

@Override
public Setting<Settings> dataFormatSettings() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,23 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* CSV indexing execution engine.
*/
public class CsvEngine implements IndexingExecutionEngine<CsvDataFormat> {

private final AtomicLong counter = new AtomicLong();
private final Set<CsvWriter> openWriters = new HashSet<>();
private List<FileMetadata> openFiles = new ArrayList<>();
static CsvDataFormat CSV = new CsvDataFormat();

/**
 * Constructs a CSV indexing execution engine. The constructor takes no
 * arguments and performs no initialization of its own; instance fields are
 * set up by their declarations.
 */
public CsvEngine() {
    super();
}

@Override
public List<String> supportedFieldTypes() {
return List.of();
Expand All @@ -60,10 +70,18 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
return refreshResult;
}

/**
* CSV document input.
*/
public static class CsvInput implements DocumentInput<String> {
private final List<String> values = new ArrayList<>();
private final CsvWriter writer;

/**
 * Creates a new CsvInput bound to the given writer.
 *
 * @param writer the CSV writer this input keeps a reference to
 */
public CsvInput(CsvWriter writer) {
this.writer = writer;
}
Expand Down Expand Up @@ -93,13 +111,23 @@ public void close() throws Exception {
}
}

/**
* CSV writer implementation.
*/
public static class CsvWriter implements Writer<CsvInput> {
private final StringBuilder sb = new StringBuilder();
private final File currentFile;
private AtomicBoolean flushed = new AtomicBoolean(false);
private final Runnable onClose;
private boolean headerWritten = false;

/**
* Creates a new CsvWriter.
*
* @param currentFile the file name
* @param engine the CSV engine
* @throws IOException if an I/O error occurs
*/
public CsvWriter(String currentFile, CsvEngine engine) throws IOException {
this.currentFile = new File("/Users/gbh/" + currentFile);
this.currentFile.createNewFile();
Expand Down Expand Up @@ -149,6 +177,11 @@ public CsvInput newDocumentInput() {
return new CsvInput(this);
}

/**
* Writes CSV headers.
*
* @param headers the header list
*/
public void writeHeaders(List<String> headers) {
if (!headerWritten) {
String headerLine = String.join(",", headers) + "\n";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* CSV engine execution components.
*/
package org.opensearch.datafusion.csv.engine.exec;
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public DataFusionPlugin(Settings settings) {
* @param namedWriteableRegistry The named writeable registry.
* @param indexNameExpressionResolver The index name expression resolver instance.
* @param repositoriesServiceSupplier The supplier for the repositories service.
* @param dataSourceCodecs The data source codecs map.
* @return Collection of created components
*/
@Override
Expand Down Expand Up @@ -118,6 +119,10 @@ public List<DataFormat> getSupportedFormats() {

/**
* Create engine per shard per format with initial view of catalog
* @param dataFormat The data format
* @param formatCatalogSnapshot The format catalog snapshot
* @return The search execution engine
* @throws IOException If an I/O error occurs
*/
// TODO : one engine per format, does that make sense ?
// TODO : Engine shouldn't just be SearcherOperations, it can be more ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ private static synchronized void loadNativeLibrary() {
*/
public static native long createGlobalRuntime();

/**
* Creates a Tokio runtime.
*
* @return the runtime pointer
*/
public static native long createTokioRuntime();

/**
Expand Down Expand Up @@ -111,12 +116,25 @@ private static synchronized void loadNativeLibrary() {
* Execute a Substrait query plan
* @param cachePtr the session context ID
* @param substraitPlan the serialized Substrait query plan
* @param runtimePtr the runtime pointer
* @return stream pointer for result iteration
*/
public static native long executeSubstraitQuery(long cachePtr, byte[] substraitPlan, long runtimePtr);

/**
* Creates a DataFusion reader.
*
* @param path the directory path
* @param files the file names
* @return the reader pointer
*/
public static native long createDatafusionReader(String path, String[] files);

/**
* Closes a DataFusion reader.
*
* @param ptr the reader pointer
*/
public static native void closeDatafusionReader(long ptr);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class DataFusionService extends AbstractLifecycleComponent {

/**
* Creates a new DataFusion service instance.
*
* @param dataSourceCodecs the data source codecs map
*/
public DataFusionService(Map<DataFormat, DataSourceCodec> dataSourceCodecs) {
this.dataSourceRegistry = new DataSourceRegistry(dataSourceCodecs);
Expand Down Expand Up @@ -161,10 +163,20 @@ public CompletableFuture<RecordBatchStream> executeSubstraitQuery(long sessionCo
return engine.executeSubstraitQuery(sessionContextId, substraitPlanBytes);
}

/**
 * Returns the pointer reported by the global runtime environment.
 *
 * @return the value of {@code globalRuntimeEnv.getPointer()}
 */
public long getRuntimePointer() {
    final long runtimePointer = globalRuntimeEnv.getPointer();
    return runtimePointer;
}

/**
 * Returns the Tokio runtime pointer reported by the global runtime environment.
 *
 * @return the value of {@code globalRuntimeEnv.getTokioRuntimePtr()}
 */
public long getTokioRuntimePointer() {
    final long tokioRuntimePointer = globalRuntimeEnv.getTokioRuntimePtr();
    return tokioRuntimePointer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public class DataSourceRegistry {

private final ConcurrentHashMap<DataFormat, DataSourceCodec> codecs = new ConcurrentHashMap<>();

/**
 * Builds a registry pre-populated with every entry of the supplied map.
 *
 * @param dataSourceCodecMap the codecs to register, keyed by data format
 */
public DataSourceRegistry(Map<DataFormat, DataSourceCodec> dataSourceCodecMap) {
    // Copy entry by entry into the registry's own map.
    for (Map.Entry<DataFormat, DataSourceCodec> registration : dataSourceCodecMap.entrySet()) {
        codecs.put(registration.getKey(), registration.getValue());
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import java.util.Map;
import java.util.function.Function;

/**
* DataFusion search execution engine.
*/
public class DatafusionEngine extends SearchExecEngine<DatafusionContext, DatafusionSearcher,
DatafusionReaderManager, DatafusionQuery> {

Expand All @@ -54,6 +57,12 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu
private DatafusionReaderManager datafusionReaderManager;
private DataFusionService datafusionService;

/**
 * Creates a new DatafusionEngine.
 *
 * @param dataFormat The data format
 * @param formatCatalogSnapshot The format catalog snapshot
 * @param dataFusionService The DataFusion service
 * @throws IOException If an I/O error occurs
*/
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException {
this.dataFormat = dataFormat;
this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
import java.util.ArrayList;

/**
 * DataFusion-specific query phase searcher using Substrait queries.
*/
public class DatafusionQueryPhaseSearcher implements GenericQueryPhaseSearcher<DatafusionContext,DatafusionSearcher, DatafusionQuery> {

/**
 * Constructs a DatafusionQueryPhaseSearcher. The constructor takes no
 * arguments and performs no initialization of its own.
 */
public DatafusionQueryPhaseSearcher() {
    super();
}

// How to pass table providers that search other engines such as Lucene ?
@Override
public boolean searchWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class RecordBatchStream {
/**
* Creates a new RecordBatchStream for the given stream pointer
* @param streamId the stream pointer
* @param runtimePtr the runtime pointer
* @param allocator memory allocator for Arrow vectors
*/
public RecordBatchStream(long streamId, long runtimePtr, BufferAllocator allocator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.concurrent.CompletableFuture;

/** Default record batch stream */
public class DefaultRecordBatchStream implements RecordBatchStream {

private static final Logger logger = LogManager.getLogger(DefaultRecordBatchStream.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public long getPointer() {
return ptr;
}

/**
 * Gets the Tokio runtime pointer.
 *
 * @return the value of the {@code tokio_runtime_ptr} field
 */
public long getTokioRuntimePtr() {
return tokio_runtime_ptr;
}
Expand Down
Loading
Loading