diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java index fe109a1ffe60b..8be2725fc7ab2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java @@ -18,6 +18,7 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -26,6 +27,7 @@ import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.io.lance.HoodieBaseLanceWriter; import org.apache.hudi.io.storage.row.HoodieBloomFilterRowWriteSupport; @@ -34,7 +36,6 @@ import org.apache.hudi.storage.StoragePath; import org.lance.spark.arrow.LanceArrowWriter; -import lombok.AllArgsConstructor; import lombok.Builder; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -42,8 +43,11 @@ import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.MetadataBuilder; @@ -58,6 +62,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Function; import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD; @@ -88,7 +93,11 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter variantPlan = enrichForLanceVariant(sparkSchema); + this.variantOrdinals = variantPlan.getRight(); + this.sparkSchema = enrichSparkSchemaForLance(variantPlan.getLeft()); Schema baseArrow = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true); // Force LargeBinary + `lance-encoding:blob=true` on each BLOB's nested `data` Arrow leaf. // Can't be expressed Spark-side: toArrowSchema drops nested-field metadata, and tagging @@ -293,6 +305,54 @@ private static Field rewriteBlobDataChild(Field blobStructField) { return new Field(blobStructField.getName(), blobStructField.getFieldType(), rebuilt); } + /** + * Single-pass walk that returns (a) the enriched schema with top-level + * {@code VariantType} fields replaced by Hudi's canonical + * {@code Struct[metadata: binary, value: binary]} (tagged {@code hudi_type=VARIANT} + * so {@code HoodieSparkSchemaConverters} promotes it back on read), and (b) the + * variant ordinals in ascending order. {@code LanceArrowUtils} has no VariantType + * case, so we hand it a plain struct. Top-level only - nested variants are not + * yet supported. 
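+   * <p>Illustrative example (column names assumed): an input schema
+   * {@code struct(id: long, payload: variant)} is rewritten to
+   * {@code struct(id: long, payload: struct(metadata: binary, value: binary))},
+   * with {@code payload} tagged {@code hudi_type=VARIANT}, and the returned
+   * ordinal array is {@code [1]}.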
+   */
+  private static Pair<StructType, int[]> enrichForLanceVariant(StructType sparkSchema) {
+    StructField[] fields = sparkSchema.fields();
+    StructField[] newFields = null;
+    List<Integer> ordinals = null;
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      if (!SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(field.dataType())) {
+        if (newFields != null) {
+          newFields[i] = field;
+        }
+        continue;
+      }
+      if (newFields == null) {
+        newFields = new StructField[fields.length];
+        System.arraycopy(fields, 0, newFields, 0, i);
+        ordinals = new ArrayList<>();
+      }
+      ordinals.add(i);
+      StructField metaField = new StructField(
+          HoodieSchema.Variant.VARIANT_METADATA_FIELD, DataTypes.BinaryType, false, Metadata.empty());
+      StructField valField = new StructField(
+          HoodieSchema.Variant.VARIANT_VALUE_FIELD, DataTypes.BinaryType, false, Metadata.empty());
+      StructType variantStruct = new StructType(new StructField[] {metaField, valField});
+      Metadata enriched = new MetadataBuilder()
+          .withMetadata(field.metadata())
+          .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.VARIANT.name())
+          .build();
+      newFields[i] = new StructField(field.name(), variantStruct, field.nullable(), enriched);
+    }
+    if (newFields == null) {
+      return Pair.of(sparkSchema, EMPTY_INT_ARRAY);
+    }
+    int[] ordinalArr = new int[ordinals.size()];
+    for (int i = 0; i < ordinalArr.length; i++) {
+      ordinalArr[i] = ordinals.get(i);
+    }
+    return Pair.of(new StructType(newFields), ordinalArr);
+  }
+
   @Override
   public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException {
     UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
@@ -323,7 +383,8 @@ public void writeRow(InternalRow row) throws IOException {
 
   @Override
   protected ArrowWriter createArrowWriter(VectorSchemaRoot root) {
-    return SparkArrowWriter.of(LanceArrowWriter.create(root, sparkSchema));
+    return SparkArrowWriter.of(
+        LanceArrowWriter.create(root, sparkSchema), inputSparkSchema, variantOrdinals);
   }
 
   /**
@@ -399,13 +460,59 @@ protected void updateRecordMetadata(InternalRow row,
     row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
   }
 
-  @AllArgsConstructor(staticName = "of")
-  private static class SparkArrowWriter implements ArrowWriter {
+  /**
+   * Forwards rows to the lance-spark {@link LanceArrowWriter}. When the schema
+   * has no {@code VariantType} columns, rows are passed through directly. When it
+   * does, a single {@link VariantProjectedRow} instance is reused per row to
+   * delegate every accessor to the underlying input row except at variant
+   * ordinals, where it returns a pre-allocated {@code (metadata, value)} struct
+   * populated by {@link org.apache.spark.sql.hudi.SparkAdapter#createVariantValueWriter}.
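+   *
+   * <p>Per-row flow, sketched for a single variant column at (assumed) ordinal 1:
+   * <pre>
+   *   // write(row) effectively becomes:
+   *   lanceArrowWriter.write(projectedRow.wrap(row));
+   *   // inside the Arrow writer, getStruct(1, 2) fires the extractor, which reads the
+   *   // VariantVal from the input row and fills the reused two-slot (metadata, value) struct
+   * </pre>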
+   */
+  private static final class SparkArrowWriter implements ArrowWriter {
     private final LanceArrowWriter lanceArrowWriter;
+    private final VariantProjectedRow projectedRow;
+
+    private SparkArrowWriter(LanceArrowWriter lanceArrowWriter, VariantProjectedRow projectedRow) {
+      this.lanceArrowWriter = lanceArrowWriter;
+      this.projectedRow = projectedRow;
+    }
+
+    static SparkArrowWriter of(LanceArrowWriter lanceArrowWriter,
+                               StructType inputSchema,
+                               int[] variantOrdinals) {
+      if (variantOrdinals.length == 0) {
+        return new SparkArrowWriter(lanceArrowWriter, null);
+      }
+      int numFields = inputSchema.fields().length;
+      GenericInternalRow[] structByOrdinal = new GenericInternalRow[numFields];
+      List<BiConsumer<SpecializedGetters, Integer>> extractorByOrdinal = new ArrayList<>(numFields);
+      for (int i = 0; i < numFields; i++) {
+        extractorByOrdinal.add(null);
+      }
+      for (int o : variantOrdinals) {
+        // Pre-allocated per variant column; slot 0 = metadata, slot 1 = value
+        // (parquet-variant spec on-disk order). lance-spark's BinaryWriter copies
+        // bytes synchronously via VarBinaryVector.setSafe, so the byte[] returned by
+        // VariantVal.getMetadata/getValue does not need to outlive a single write().
+        GenericInternalRow struct = new GenericInternalRow(new Object[2]);
+        structByOrdinal[o] = struct;
+        extractorByOrdinal.set(o, SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter(
+            inputSchema.fields()[o].dataType(),
+            (byte[] v) -> struct.update(1, v),
+            (byte[] m) -> struct.update(0, m)));
+      }
+      VariantProjectedRow projectedRow = SparkAdapterSupport$.MODULE$.sparkAdapter()
+          .createVariantProjectedRow(numFields, structByOrdinal, extractorByOrdinal);
+      return new SparkArrowWriter(lanceArrowWriter, projectedRow);
+    }
 
     @Override
     public void write(InternalRow row) {
-      lanceArrowWriter.write(row);
+      if (projectedRow == null) {
+        lanceArrowWriter.write(row);
+        return;
+      }
+      lanceArrowWriter.write(projectedRow.wrap(row));
     }
 
     @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
index 91a0421d01186..4f217a55edc6a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
@@ -18,12 +18,15 @@
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -149,6 +152,33 @@ public boolean hasNext() {
     return false;
   }
 
+  /**
+   * If the requested Spark field is {@code VariantType} and the matching Arrow vector is a
+   * struct, wrap it so Spark's positional {@link ColumnVector#getVariant(int)} reads
+   * {@code value} from {@code child(0)} and {@code metadata} from {@code child(1)}, while the
+   * Lance file keeps the parquet-variant spec on-disk order ({@code metadata}, {@code value}).
+   * Otherwise return a plain {@link LanceArrowColumnVector}.
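+   *
+   * <p>Illustrative mapping for a variant column (column name assumed):
+   * <pre>
+   *   on disk (Lance/Arrow):  payload: struct(metadata: binary, value: binary)
+   *   Spark-facing view:      getChild(0) = value, getChild(1) = metadata
+   * </pre>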
+ */ + private static ColumnVector wrapForVariantIfNeeded(StructField sparkField, FieldVector fv) { + if (!SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(sparkField.dataType()) + || !(fv instanceof StructVector)) { + return new LanceArrowColumnVector(fv); + } + StructVector sv = (StructVector) fv; + FieldVector valueFv = sv.getChild(HoodieSchema.Variant.VARIANT_VALUE_FIELD); + FieldVector metadataFv = sv.getChild(HoodieSchema.Variant.VARIANT_METADATA_FIELD); + if (valueFv == null || metadataFv == null) { + throw new HoodieException("Variant column '" + sparkField.name() + + "' is missing required child fields '" + HoodieSchema.Variant.VARIANT_VALUE_FIELD + + "' and/or '" + HoodieSchema.Variant.VARIANT_METADATA_FIELD + "'"); + } + return new LanceVariantColumnVector( + sparkField.dataType(), + new LanceArrowColumnVector(fv), + new LanceArrowColumnVector(valueFv), + new LanceArrowColumnVector(metadataFv)); + } + @Override public UnsafeRow next() { if (!hasNext()) { @@ -182,7 +212,7 @@ private void buildColumnVectors(VectorSchemaRoot root) { throw new HoodieException("Lance batch missing expected column '" + name + "' for file: " + path + "; available columns: " + byName.keySet()); } - columnVectors[i] = new LanceArrowColumnVector(fv); + columnVectors[i] = wrapForVariantIfNeeded(sparkFields[i], fv); } if (blobTransform != null) { blobTransform.init(columnVectors, sparkSchema); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceVariantColumnVector.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceVariantColumnVector.java new file mode 100644 index 0000000000000..79ea84b35776a --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceVariantColumnVector.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * {@link ColumnVector} wrapper that exposes a Hudi/parquet-variant on-disk struct + * ({@code metadata: binary, value: binary}) as a Spark {@code VariantType} column. + * + *
<p>
Spark's {@link ColumnVector#getVariant(int)} is {@code final} and reads its + * children positionally: + *
<pre>
+ *   new VariantVal(getChild(0).getBinary(rowId), getChild(1).getBinary(rowId))
+ * </pre>
+ * That is, {@code child(0) = value}, {@code child(1) = metadata}. Hudi follows the
+ * parquet-variant spec on disk ({@code metadata} first, {@code value} second), so we
+ * swap children at the {@code ColumnVector} boundary: {@code getChild(0)} returns the
+ * value column and {@code getChild(1)} returns the metadata column. The on-disk
+ * Lance file is unchanged. See #18334 for the follow-up work needed to remove this swap.
+ */
+class LanceVariantColumnVector extends ColumnVector {
+
+  private final ColumnVector innerStructVector;
+  private final ColumnVector valueChild;
+  private final ColumnVector metadataChild;
+
+  LanceVariantColumnVector(DataType variantType,
+                           ColumnVector innerStructVector,
+                           ColumnVector valueChild,
+                           ColumnVector metadataChild) {
+    super(variantType);
+    this.innerStructVector = innerStructVector;
+    this.valueChild = valueChild;
+    this.metadataChild = metadataChild;
+  }
+
+  @Override
+  public void close() {
+    // Only close the inner struct wrapper. The metadata/value child wrappers
+    // reference Arrow buffers owned by the parent struct's allocator, so closing
+    // them would double-release.
+    innerStructVector.close();
+  }
+
+  @Override
+  public boolean hasNull() {
+    return innerStructVector.hasNull();
+  }
+
+  @Override
+  public int numNulls() {
+    return innerStructVector.numNulls();
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return innerStructVector.isNullAt(rowId);
+  }
+
+  @Override
+  public ColumnVector getChild(int ordinal) {
+    if (ordinal == 0) {
+      return valueChild;
+    }
+    if (ordinal == 1) {
+      return metadataChild;
+    }
+    throw new IndexOutOfBoundsException("Variant child ordinal out of range: " + ordinal);
+  }
+
+  // The remaining accessors are unreachable: Spark's getVariant only calls isNullAt
+  // and getChild on a VariantType column. Fail loudly if anything else is invoked.
+ + @Override + public boolean getBoolean(int rowId) { + throw new UnsupportedOperationException("getBoolean not supported on variant column"); + } + + @Override + public byte getByte(int rowId) { + throw new UnsupportedOperationException("getByte not supported on variant column"); + } + + @Override + public short getShort(int rowId) { + throw new UnsupportedOperationException("getShort not supported on variant column"); + } + + @Override + public int getInt(int rowId) { + throw new UnsupportedOperationException("getInt not supported on variant column"); + } + + @Override + public long getLong(int rowId) { + throw new UnsupportedOperationException("getLong not supported on variant column"); + } + + @Override + public float getFloat(int rowId) { + throw new UnsupportedOperationException("getFloat not supported on variant column"); + } + + @Override + public double getDouble(int rowId) { + throw new UnsupportedOperationException("getDouble not supported on variant column"); + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException("getArray not supported on variant column"); + } + + @Override + public ColumnarMap getMap(int ordinal) { + throw new UnsupportedOperationException("getMap not supported on variant column"); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException("getDecimal not supported on variant column"); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException("getUTF8String not supported on variant column"); + } + + @Override + public byte[] getBinary(int rowId) { + throw new UnsupportedOperationException("getBinary not supported on variant column"); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VariantProjectedRow.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VariantProjectedRow.java new file mode 100644 index 0000000000000..bf0a76697b2fa --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VariantProjectedRow.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.spark.sql.catalyst.InternalRow; + +/** + * View over a Lance-bound {@link InternalRow} that delegates every accessor to the + * wrapped input row, except at configured variant ordinals where it returns a + * pre-allocated {@code (metadata, value)} struct populated by extractors built + * from {@link org.apache.spark.sql.hudi.SparkAdapter#createVariantValueWriter}. + * + *
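+ * <pre>
+ *   // Illustrative reuse pattern (variable names assumed); one projection per writer:
+ *   VariantProjectedRow projection = sparkAdapter.createVariantProjectedRow(
+ *       numFields, structByOrdinal, extractorByOrdinal);
+ *   lanceArrowWriter.write(projection.wrap(inputRow));
+ * </pre>
+ *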
<p>
The implementation lives in a Spark-4-specific module since it must override + * {@code InternalRow.getVariant} (introduced in Spark 4.0). This interface keeps + * the shared writer module free of Spark-4-only types. + */ +public interface VariantProjectedRow { + + /** + * Set the underlying row and return an {@link InternalRow} view of this projection. + * Implementations are typically stateful and return {@code this}; the returned row + * is valid only until the next call to {@code wrap}. + */ + InternalRow wrap(InternalRow input); +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 2934536f4ec9a..6b935e2fc17c9 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -25,6 +25,7 @@ import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.io.storage.VariantProjectedRow import org.apache.hudi.storage.StorageConfiguration import org.apache.hadoop.conf.Configuration @@ -36,7 +37,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate, SpecializedGetters} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow, InterpretedPredicate, SpecializedGetters} import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.catalyst.trees.Origin @@ -453,6 +454,29 @@ trait SparkAdapter extends Serializable { writeMetadata: Consumer[Array[Byte]] ): BiConsumer[SpecializedGetters, Integer] + /** + * Creates a [[VariantProjectedRow]] for the Lance writer's variant projection. + * Spark 3.x throws (no VariantType support); Spark 4.x returns a row that delegates + * accessors to the wrapped input row, except at the configured variant ordinals where + * it returns the pre-allocated {@code (metadata, value)} struct populated by the + * matching extractor. + * + * Kept on the adapter so the shared {@code hudi-spark-client} module never needs to + * reference Spark-4-only types like {@code VariantVal} / {@code InternalRow.getVariant}. + * + * @param numFields total number of top-level fields in the projection + * @param variantStructByOrdinal non-null only at variant ordinals; pre-allocated + * {@code GenericInternalRow(new Object[2])} + * @param extractorByOrdinal non-null only at variant ordinals; populates the + * corresponding entry in {@code variantStructByOrdinal} + * @return a stateful [[VariantProjectedRow]] (Spark 4 only) + */ + def createVariantProjectedRow( + numFields: Int, + variantStructByOrdinal: Array[GenericInternalRow], + extractorByOrdinal: java.util.List[BiConsumer[SpecializedGetters, java.lang.Integer]] + ): VariantProjectedRow + /** * Converts a VariantType field to Parquet Type. * Returns null for Spark 3.x or if the data type is not VariantType. 
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index c7039c951c4ef..1de8234e61305 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit import org.apache.hudi.common.util.JsonUtils +import org.apache.hudi.io.storage.VariantProjectedRow import org.apache.hudi.spark.internal.ReflectUtil import org.apache.parquet.schema.Type @@ -36,7 +37,7 @@ import org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate, SpecializedGetters} +import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, InterpretedPredicate, Predicate, SpecializedGetters} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -204,6 +205,14 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { throw new UnsupportedOperationException("Spark 3.x does not support VariantType") } + override def createVariantProjectedRow( + numFields: Int, + variantStructByOrdinal: Array[GenericInternalRow], + extractorByOrdinal: java.util.List[BiConsumer[SpecializedGetters, java.lang.Integer]] + ): VariantProjectedRow = { + throw new UnsupportedOperationException("Spark 3.x does not support VariantType") + } + override def convertVariantFieldToParquetType( dataType: DataType, fieldName: String, diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/io/storage/Spark4VariantProjectedRow.java b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/io/storage/Spark4VariantProjectedRow.java new file mode 100644 index 0000000000000..13af87f8dd607 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/io/storage/Spark4VariantProjectedRow.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; + +import java.util.List; +import java.util.function.BiConsumer; + +/** + * Spark 4 implementation of {@link VariantProjectedRow}. Pass-through + * {@link InternalRow} that delegates every accessor to the wrapped input row + * except at variant ordinals, where it returns a pre-allocated + * {@code (metadata, value)} {@link GenericInternalRow} that the extractor + * populates on demand. + * + *
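+ * <pre>
+ *   // Illustrative (ordinal 2 assumed to be the variant column):
+ *   InternalRow view = projectedRow.wrap(inputRow);
+ *   view.getLong(0);       // delegates to inputRow.getLong(0)
+ *   view.getStruct(2, 2);  // pre-allocated (metadata, value) struct, freshly populated
+ * </pre>
+ *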
<p>
Single-row, single-threaded; {@link #wrap(InternalRow)} mutates state. + * {@link #copy()} / {@link #update(int, Object)} / {@link #setNullAt(int)} + * throw - lance-spark consumes each row synchronously inside + * {@code LanceArrowWriter.write(InternalRow)}, so no copy is ever needed. + */ +public abstract class Spark4VariantProjectedRow extends InternalRow implements VariantProjectedRow { + + private final int numFields; + private final GenericInternalRow[] variantStructByOrdinal; + private final List> extractorByOrdinal; + protected InternalRow input; + + public Spark4VariantProjectedRow(int numFields, + GenericInternalRow[] variantStructByOrdinal, + List> extractorByOrdinal) { + this.numFields = numFields; + this.variantStructByOrdinal = variantStructByOrdinal; + this.extractorByOrdinal = extractorByOrdinal; + } + + @Override + public InternalRow wrap(InternalRow input) { + this.input = input; + return this; + } + + @Override + public int numFields() { + return numFields; + } + + @Override + public boolean isNullAt(int ordinal) { + return input.isNullAt(ordinal); + } + + @Override + public InternalRow getStruct(int ordinal, int n) { + GenericInternalRow variantStruct = variantStructByOrdinal[ordinal]; + if (variantStruct != null) { + // Populate metadata (slot 0) and value (slot 1) via the captured lambdas. + extractorByOrdinal.get(ordinal).accept(input, ordinal); + return variantStruct; + } + return input.getStruct(ordinal, n); + } + + @Override + public boolean getBoolean(int ordinal) { + return input.getBoolean(ordinal); + } + + @Override + public byte getByte(int ordinal) { + return input.getByte(ordinal); + } + + @Override + public short getShort(int ordinal) { + return input.getShort(ordinal); + } + + @Override + public int getInt(int ordinal) { + return input.getInt(ordinal); + } + + @Override + public long getLong(int ordinal) { + return input.getLong(ordinal); + } + + @Override + public float getFloat(int ordinal) { + return input.getFloat(ordinal); + } + + @Override + public double getDouble(int ordinal) { + return input.getDouble(ordinal); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return input.getDecimal(ordinal, precision, scale); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + return input.getUTF8String(ordinal); + } + + @Override + public byte[] getBinary(int ordinal) { + return input.getBinary(ordinal); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + return input.getInterval(ordinal); + } + + @Override + public VariantVal getVariant(int ordinal) { + return input.getVariant(ordinal); + } + + @Override + public ArrayData getArray(int ordinal) { + return input.getArray(ordinal); + } + + @Override + public MapData getMap(int ordinal) { + return input.getMap(ordinal); + } + + @Override + public Object get(int ordinal, DataType dataType) { + return input.get(ordinal, dataType); + } + + @Override + public void setNullAt(int i) { + throw new UnsupportedOperationException("Spark4VariantProjectedRow is read-only"); + } + + @Override + public void update(int i, Object value) { + throw new UnsupportedOperationException("Spark4VariantProjectedRow is read-only"); + } + + @Override + public InternalRow copy() { + throw new UnsupportedOperationException("Spark4VariantProjectedRow is single-row scope"); + } +} diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala index ea6e96943a692..8d8be468b9b97 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala @@ -21,6 +21,7 @@ import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodieSchemaConversi import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.JsonUtils +import org.apache.hudi.io.storage.VariantProjectedRow import org.apache.hudi.spark.internal.ReflectUtil import org.apache.hudi.storage.StorageConfiguration diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/java/org/apache/hudi/io/storage/Spark40VariantProjectedRow.java b/hudi-spark-datasource/hudi-spark4.0.x/src/main/java/org/apache/hudi/io/storage/Spark40VariantProjectedRow.java new file mode 100644 index 0000000000000..037b83d04bd92 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/java/org/apache/hudi/io/storage/Spark40VariantProjectedRow.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; + +import java.util.List; +import java.util.function.BiConsumer; + +public final class Spark40VariantProjectedRow extends Spark4VariantProjectedRow { + + public Spark40VariantProjectedRow(int numFields, + GenericInternalRow[] variantStructByOrdinal, + List> extractorByOrdinal) { + super(numFields, variantStructByOrdinal, extractorByOrdinal); + } +} diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala index 7a3d8bec2403c..1410906dd93b7 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.io.storage.{Spark40VariantProjectedRow, VariantProjectedRow} import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema.MessageType @@ -33,7 +34,7 @@ import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow, SpecializedGetters} import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ @@ -58,6 +59,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ import org.apache.spark.unsafe.types.UTF8String +import java.util.function.BiConsumer + import scala.jdk.CollectionConverters.MapHasAsScala /** @@ -126,6 +129,14 @@ class Spark4_0Adapter extends BaseSpark4Adapter { new Spark40HoodieInternalRow(metaFields, sourceRow, sourceContainsMetaFields) } + override def createVariantProjectedRow( + numFields: Int, + variantStructByOrdinal: Array[GenericInternalRow], + extractorByOrdinal: java.util.List[BiConsumer[SpecializedGetters, java.lang.Integer]] + ): VariantProjectedRow = { + new Spark40VariantProjectedRow(numFields, variantStructByOrdinal, extractorByOrdinal) + } + override def createPartitionCDCFileGroupMapping(partitionValues: InternalRow, fileSplits: List[HoodieCDCFileSplit]): HoodiePartitionCDCFileGroupMapping = { new Spark40HoodiePartitionCDCFileGroupMapping(partitionValues, fileSplits) diff --git a/hudi-spark-datasource/hudi-spark4.1.x/src/main/java/org/apache/hudi/io/storage/Spark41VariantProjectedRow.java b/hudi-spark-datasource/hudi-spark4.1.x/src/main/java/org/apache/hudi/io/storage/Spark41VariantProjectedRow.java new file mode 100644 index 0000000000000..94d7c9e8abc10 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark4.1.x/src/main/java/org/apache/hudi/io/storage/Spark41VariantProjectedRow.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; +import org.apache.spark.unsafe.types.GeographyVal; +import org.apache.spark.unsafe.types.GeometryVal; + +import java.util.List; +import java.util.function.BiConsumer; + +public final class Spark41VariantProjectedRow extends Spark4VariantProjectedRow { + + public Spark41VariantProjectedRow(int numFields, + GenericInternalRow[] variantStructByOrdinal, + List> extractorByOrdinal) { + super(numFields, variantStructByOrdinal, extractorByOrdinal); + } + + @Override + public GeometryVal getGeometry(int ordinal) { + return input.getGeometry(ordinal); + } + + @Override + public GeographyVal getGeography(int ordinal) { + return input.getGeography(ordinal); + } +} diff --git a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala index 195979548bc4f..575702403746b 100644 --- a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala @@ -22,6 +22,7 @@ import org.apache.hudi.client.model.{HoodieInternalRow, Spark41HoodieInternalRow import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit +import org.apache.hudi.io.storage.{Spark41VariantProjectedRow, VariantProjectedRow} import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, Types} @@ -32,7 +33,7 @@ import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateNamedStruct, Expression, Literal, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateNamedStruct, Expression, GenericInternalRow, Literal, SpecializedGetters, UnsafeProjection} import org.apache.spark.sql.catalyst.expressions.variant.VariantGet import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -58,6 +59,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ import org.apache.spark.unsafe.types.UTF8String +import java.util.function.BiConsumer + import 
scala.jdk.CollectionConverters.MapHasAsScala /** @@ -128,6 +131,14 @@ class Spark4_1Adapter extends BaseSpark4Adapter { new Spark41HoodieInternalRow(metaFields, sourceRow, sourceContainsMetaFields) } + override def createVariantProjectedRow( + numFields: Int, + variantStructByOrdinal: Array[GenericInternalRow], + extractorByOrdinal: java.util.List[BiConsumer[SpecializedGetters, java.lang.Integer]] + ): VariantProjectedRow = { + new Spark41VariantProjectedRow(numFields, variantStructByOrdinal, extractorByOrdinal) + } + override def createPartitionCDCFileGroupMapping(partitionValues: InternalRow, fileSplits: List[HoodieCDCFileSplit]): HoodiePartitionCDCFileGroupMapping = { new Spark41HoodiePartitionCDCFileGroupMapping(partitionValues, fileSplits)
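Reviewer note: below is a minimal, self-contained sketch (not part of the patch) of the schema rewrite that enrichForLanceVariant performs for one top-level variant column, using only public Spark APIs. The column names and the literal "hudi_type" key are assumptions for illustration (the patch reads the key from HoodieSchema.TYPE_METADATA_FIELD), and VariantType itself is never referenced, so the sketch compiles on Spark 3 and 4 alike.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class VariantSchemaRewriteSketch {
  public static void main(String[] args) {
    // On-disk shape written for a variant column: struct(metadata: binary, value: binary),
    // in parquet-variant spec order (metadata first, value second).
    StructType variantStruct = new StructType(new StructField[] {
        new StructField("metadata", DataTypes.BinaryType, false, Metadata.empty()),
        new StructField("value", DataTypes.BinaryType, false, Metadata.empty())});
    // Tag the column so the schema converter can promote it back to VariantType on read.
    Metadata enriched = new MetadataBuilder()
        .putString("hudi_type", "VARIANT") // key assumed; see HoodieSchema.TYPE_METADATA_FIELD
        .build();
    StructType lanceSchema = new StructType(new StructField[] {
        new StructField("id", DataTypes.LongType, false, Metadata.empty()),
        new StructField("payload", variantStruct, true, enriched)});
    // Prints:
    // root
    //  |-- id: long (nullable = false)
    //  |-- payload: struct (nullable = true)
    //  |    |-- metadata: binary (nullable = false)
    //  |    |-- value: binary (nullable = false)
    System.out.println(lanceSchema.treeString());
  }
}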