Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
Expand All @@ -619,7 +618,6 @@
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main-base</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
Expand All @@ -634,12 +632,10 @@
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-parser</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-analyzer</artifactId>
<scope>test</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -708,7 +704,6 @@
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<scope>test</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -782,7 +777,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Expand All @@ -798,6 +792,11 @@
<ignoredNonTestScopedDependency>org.apache.httpcomponents.core5:httpcore5</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>com.amazonaws:aws-java-sdk-s3</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>com.amazonaws:aws-java-sdk-core</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>org.apache.commons:commons-math3</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>com.facebook.presto:presto-parser</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>com.facebook.presto:presto-analyzer</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>org.jetbrains:annotations</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>jakarta.servlet:jakarta.servlet-api</ignoredNonTestScopedDependency>
</ignoredNonTestScopedDependencies>
<!-- <ignoredUsedUndeclaredDependencies>-->
<!-- &lt;!&ndash; Ignore these because they are picked up as false-positives when configuring logging in the IcebergQueryRunner &ndash;&gt;-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
predicateColumns));
// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType()));

RowExpression subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService);
Expand Down Expand Up @@ -689,7 +689,7 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
table.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager),
getColumnsForWrite(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
Expand Down Expand Up @@ -932,7 +932,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
icebergTableHandle.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
Expand Down Expand Up @@ -1336,7 +1336,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
}

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (IcebergColumnHandle columnHandle : getColumns(schema, icebergTable.spec(), typeManager)) {
for (IcebergColumnHandle columnHandle : getColumns(session, schema, icebergTable.spec(), typeManager)) {
columnHandles.put(columnHandle.getName(), columnHandle);
}
if (table.getIcebergTableName().getTableType() != CHANGELOG) {
Expand Down Expand Up @@ -2344,7 +2344,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(
storageTableHandle.getIcebergTableName(),
toPrestoSchema(storageTable.schema(), typeManager),
toPrestoPartitionSpec(storageTable.spec(), typeManager),
getColumns(storageTable.schema(), storageTable.spec(), typeManager),
getColumns(session, storageTable.schema(), storageTable.spec(), typeManager),
storageTable.location(),
getFileFormat(storageTable),
getCompressionCodec(session),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ public static IcebergColumnHandle create(Types.NestedField column, TypeManager t
columnType);
}

/**
 * Builds a column handle for {@code column}, using {@code partitionFieldId}
 * as the id handed to {@code createColumnIdentity}.
 *
 * @param partitionFieldId id passed to {@code createColumnIdentity} together with the column's name and type
 * @param column Iceberg field that supplies the name, type and (possibly null) doc string
 * @param typeManager used by {@code toPrestoType} to convert the Iceberg type
 * @param columnType column kind forwarded unchanged to the handle
 * @return a new {@code IcebergColumnHandle}; its optional doc is empty when {@code column.doc()} is null
 */
public static IcebergColumnHandle create(int partitionFieldId, Types.NestedField column, TypeManager typeManager, ColumnType columnType)
{
    // A field without documentation yields Optional.empty() rather than a null.
    Optional<String> doc = Optional.ofNullable(column.doc());

    return new IcebergColumnHandle(
            createColumnIdentity(column.name(), partitionFieldId, column.type()),
            toPrestoType(column.type(), typeManager),
            doc,
            columnType);
}

public static Subfield getPushedDownSubfield(IcebergColumnHandle column)
{
checkArgument(isPushedDownSubfield(column), format("not a valid pushed down subfield: type=%s, subfields=%s", column.getColumnType(), column.getRequiredSubfields()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), Optional.empty()),
toPrestoSchema(metadata.schema(), typeManager),
toPrestoPartitionSpec(metadata.spec(), typeManager),
getColumnsForWrite(metadata.schema(), metadata.spec(), typeManager),
// TODO: confirm getColumnsForWrite (rather than getColumns) is the right choice here; the session is now passed through
getColumnsForWrite(session, metadata.schema(), metadata.spec(), typeManager),
targetPath,
fileFormat,
getCompressionCodec(session),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), Optional.empty()),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager),
// Uses getColumnsForWrite, passing the session through
getColumnsForWrite(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
fileFormat,
getCompressionCodec(session),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ public IcebergPageSink(
this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
String tempDirectoryPath = locationProvider.newDataLocation("sort-tmp-files");
this.tempDirectory = new Path(tempDirectoryPath);
List<Type> types = getColumnsForWrite(outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream()
// Uses getColumnsForWrite, passing the session through
List<Type> types = getColumnsForWrite(session, outputSchema, partitionSpec, requireNonNull(sortParameters.getTypeManager(), "typeManager is null")).stream()
.map(IcebergColumnHandle::getType)
.collect(toImmutableList());
this.sortParameters = sortParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ public ConnectorPageSource createPageSource(
// add any additional columns which may need to be read from storage
// by delete filters
boolean equalityDeletesRequired = table.getIcebergTableName().getTableType() == IcebergTableType.DATA;
requiredColumnsForDeletes(tableSchema, partitionSpec, split.getDeletes(), equalityDeletesRequired)
requiredColumnsForDeletes(session, tableSchema, partitionSpec, split.getDeletes(), equalityDeletesRequired)
.stream()
.filter(not(icebergColumns::contains))
.forEach(columnsToReadFromStorage::add);
Expand Down Expand Up @@ -918,7 +918,7 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) {
pageIndexerFactory,
hdfsEnvironment,
hdfsContext,
getColumns(tableSchema, partitionSpec, typeManager),
getColumns(session, tableSchema, partitionSpec, typeManager),
jsonCodec,
session,
split.getFileFormat(),
Expand Down Expand Up @@ -948,7 +948,7 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) {
return dataSource;
}

private Set<IcebergColumnHandle> requiredColumnsForDeletes(Schema schema,
private Set<IcebergColumnHandle> requiredColumnsForDeletes(ConnectorSession session, Schema schema,
PartitionSpec partitionSpec,
List<DeleteFile> deletes,
boolean equalityDeletesRequired)
Expand All @@ -959,7 +959,7 @@ private Set<IcebergColumnHandle> requiredColumnsForDeletes(Schema schema,
requiredColumns.add(IcebergColumnHandle.create(ROW_POSITION, typeManager, IcebergColumnHandle.ColumnType.REGULAR));
}
else if (deleteFile.content() == EQUALITY_DELETES && equalityDeletesRequired) {
getColumns(deleteFile.equalityFieldIds().stream(), schema, partitionSpec, typeManager)
getColumns(session, deleteFile.equalityFieldIds().stream(), schema, partitionSpec, typeManager)
.forEach(requiredColumns::add);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.units.DataSize;
import com.facebook.presto.FullConnectorSession;
import com.facebook.presto.common.GenericInternalException;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.common.type.TimestampType;
Expand Down Expand Up @@ -147,6 +149,7 @@
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static com.facebook.presto.iceberg.IcebergPartitionType.ALL;
import static com.facebook.presto.iceberg.IcebergPartitionType.IDENTITY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
Expand Down Expand Up @@ -398,14 +401,21 @@ public static Map<String, List<String>> getPartitionFields(PartitionSpec partiti
return partitionFields;
}

public static List<IcebergColumnHandle> getColumns(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
public static List<IcebergColumnHandle> getColumns(ConnectorSession session, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
return getColumns(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
return getColumns(session, schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
}

public static List<IcebergColumnHandle> getColumns(Stream<Integer> fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
public static List<IcebergColumnHandle> getColumns(ConnectorSession session, Stream<Integer> fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
Set<String> partitionFieldNames = getPartitionFields(partitionSpec, IDENTITY).keySet();
IcebergPartitionType partitionType = IDENTITY;
if (session instanceof FullConnectorSession) {
Optional<QueryType> queryType = ((FullConnectorSession) session).getSession().getQueryType();
if (queryType.isPresent() && queryType.get() == QueryType.INSERT) {
partitionType = ALL;
}
}
Set<String> partitionFieldNames = getPartitionFields(partitionSpec, partitionType).keySet();

return fields
.map(schema::findField)
Expand All @@ -415,20 +425,25 @@ public static List<IcebergColumnHandle> getColumns(Stream<Integer> fields, Schem
.collect(toImmutableList());
}

public static List<IcebergColumnHandle> getColumnsForWrite(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
public static List<IcebergColumnHandle> getColumnsForWrite(ConnectorSession session, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
return getColumnsForWrite(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
return getColumnsForWrite(session, schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager);
}

private static List<IcebergColumnHandle> getColumnsForWrite(Stream<Integer> fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
private static List<IcebergColumnHandle> getColumnsForWrite(ConnectorSession session, Stream<Integer> fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager)
{
Set<Integer> partitionSourceIds = partitionSpec.fields().stream()
.map(PartitionField::sourceId)
.collect(toImmutableSet());
IcebergPartitionType partitionType = IDENTITY;
if (session instanceof FullConnectorSession) {
Optional<QueryType> queryType = ((FullConnectorSession) session).getSession().getQueryType();
if (queryType.isPresent() && queryType.get() == QueryType.INSERT) {
partitionType = ALL;
}
}
Set<String> partitionFieldNames = getPartitionFields(partitionSpec, partitionType).keySet();

return fields
.map(schema::findField)
.map(column -> partitionSourceIds.contains(column.fieldId()) ?
.map(column -> partitionFieldNames.contains(column.name()) ?
IcebergColumnHandle.create(column, typeManager, PARTITION_KEY) :
IcebergColumnHandle.create(column, typeManager, REGULAR))
.collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ChangelogSplitSource(
{
requireNonNull(session, "session is null");
requireNonNull(typeManager, "typeManager is null");
this.columnHandles = getColumns(table.schema(), table.spec(), typeManager);
this.columnHandles = getColumns(session, table.schema(), table.spec(), typeManager);
this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec
tableHandle.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
getColumns(session, icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ release: #: Build the release version
$(MAKE) cmake BUILD_DIR=release BUILD_TYPE=Release && \
$(MAKE) build BUILD_DIR=release

cmake-and-build: #: cmake and build without updating submodules which requires git
cmake-and-build: #: cmake and build without updating submodules which requires git
cmake -B "$(BUILD_BASE_DIR)/$(BUILD_DIR)" $(FORCE_COLOR) $(CMAKE_FLAGS) $(EXTRA_CMAKE_FLAGS) && \
cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ HivePrestoToVeloxConnector::toVeloxTableHandle(
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
HivePrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
const TypeParser& typeParser) const {
const TypeParser& typeParser,
memory::MemoryPool* pool) const {
auto hiveOutputTableHandle =
std::dynamic_pointer_cast<protocol::hive::HiveOutputTableHandle>(
createHandle->handle.connectorHandle);
Expand Down Expand Up @@ -423,7 +424,8 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle(
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
HivePrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
const TypeParser& typeParser) const {
const TypeParser& typeParser,
velox::memory::MemoryPool* pool) const {
auto hiveInsertTableHandle =
std::dynamic_pointer_cast<protocol::hive::HiveInsertTableHandle>(
insertHandle->handle.connectorHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector {
std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
const TypeParser& typeParser) const final;
const TypeParser& typeParser,
velox::memory::MemoryPool* pool) const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::InsertHandle* insertHandle,
const TypeParser& typeParser) const final;
const TypeParser& typeParser,
velox::memory::MemoryPool* pool) const final;

std::unique_ptr<velox::core::PartitionFunctionSpec>
createVeloxPartitionFunctionSpec(
Expand Down
Loading
Loading