@@ -18,6 +18,7 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -26,6 +27,7 @@
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
import org.apache.hudi.io.storage.row.HoodieBloomFilterRowWriteSupport;
@@ -34,16 +36,18 @@
import org.apache.hudi.storage.StoragePath;

import org.lance.spark.arrow.LanceArrowWriter;
import lombok.AllArgsConstructor;
import lombok.Builder;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
@@ -58,6 +62,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
@@ -88,7 +93,11 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow, U
private static final String LANCE_BLOB_ENCODING_KEY = "lance-encoding:blob";
private static final String LANCE_BLOB_ENCODING_VALUE = "true";

private static final int[] EMPTY_INT_ARRAY = new int[0];

private final StructType sparkSchema;
private final StructType inputSparkSchema;
private final int[] variantOrdinals;
private final Schema arrowSchema;
private final UTF8String fileName;
private final UTF8String instantTime;
@@ -157,7 +166,10 @@ private HoodieSparkLanceWriter(StoragePath file,
long flushByteWatermark) {
super(file, DEFAULT_BATCH_SIZE, allocatorSize, flushByteWatermark,
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
this.sparkSchema = enrichSparkSchemaForLance(sparkSchema);
this.inputSparkSchema = sparkSchema;
Pair<StructType, int[]> variantPlan = enrichForLanceVariant(sparkSchema);
Contributor: 🤖 nit: variantPlan reads like a LogicalPlan or QueryPlan in the Spark context — could you rename it to something like schemaAndOrdinals or variantEnrichment to make it clear this is just a (StructType, int[]) pair?

this.variantOrdinals = variantPlan.getRight();
this.sparkSchema = enrichSparkSchemaForLance(variantPlan.getLeft());
Schema baseArrow = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true);
// Force LargeBinary + `lance-encoding:blob=true` on each BLOB's nested `data` Arrow leaf.
// Can't be expressed Spark-side: toArrowSchema drops nested-field metadata, and tagging
@@ -293,6 +305,54 @@ private static Field rewriteBlobDataChild(Field blobStructField) {
return new Field(blobStructField.getName(), blobStructField.getFieldType(), rebuilt);
}

/**
* Single-pass walk that returns (a) the enriched schema with top-level
* {@code VariantType} fields replaced by Hudi's canonical
* {@code Struct[metadata: binary, value: binary]} (tagged {@code hudi_type=VARIANT}
* so {@code HoodieSparkSchemaConverters} promotes it back on read), and (b) the
* variant ordinals in ascending order. {@code LanceArrowUtils} has no VariantType
* case, so we hand it a plain struct. Top-level only - nested variants are not
* yet supported.
*/
private static Pair<StructType, int[]> enrichForLanceVariant(StructType sparkSchema) {
Contributor: 🤖 nit: the sibling method is enrichSparkSchemaForLance — it might be worth renaming this to enrichSparkSchemaForLanceVariant so the two read as a parallel pair and it's clear this one also operates on the schema.

Contributor: Could enrichForLanceVariant be incorporated into enrichSparkSchemaForLance?

StructField[] fields = sparkSchema.fields();
StructField[] newFields = null;
List<Integer> ordinals = null;
Contributor: 🤖 enrichForLanceVariant only matches top-level VariantType. If a user has a nested variant (e.g. Struct[..variant..], Array[Variant], Map[_, Variant]), isVariantType(field.dataType()) returns false and the schema falls through unmodified — LanceArrowUtils.toArrowSchema then throws an opaque UNSUPPORTED_DATATYPE. Could we add a defensive walk that fails fast with a clear "nested variants not supported in Lance" message so users aren't left to decode the lance-spark error?
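A guard like the one suggested is not in this PR; the following is a minimal sketch of what the fail-fast walk could look like, reusing the SparkAdapter hook and the ArrayType, MapType, and HoodieNotSupportedException imports this file already declares (the helper name is hypothetical):

  // Hypothetical helper (not part of the PR): fail fast on variants below the top level.
  private static void rejectNestedVariants(DataType type, String path) {
    if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(type)) {
      throw new HoodieNotSupportedException(
          "Nested variants are not supported in Lance (found at '" + path + "')");
    }
    if (type instanceof StructType) {
      for (StructField f : ((StructType) type).fields()) {
        rejectNestedVariants(f.dataType(), path + "." + f.name());
      }
    } else if (type instanceof ArrayType) {
      rejectNestedVariants(((ArrayType) type).elementType(), path + ".element");
    } else if (type instanceof MapType) {
      rejectNestedVariants(((MapType) type).keyType(), path + ".key");
      rejectNestedVariants(((MapType) type).valueType(), path + ".value");
    }
  }

enrichForLanceVariant could then call rejectNestedVariants(field.dataType(), field.name()) on each field it does not rewrite, surfacing a named error instead of the opaque lance-spark one.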

for (int i = 0; i < fields.length; i++) {
StructField field = fields[i];
if (!SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(field.dataType())) {
if (newFields != null) {
newFields[i] = field;
}
continue;
}
if (newFields == null) {
newFields = new StructField[fields.length];
System.arraycopy(fields, 0, newFields, 0, i);
ordinals = new ArrayList<>();
}
ordinals.add(i);
StructField metaField = new StructField(
HoodieSchema.Variant.VARIANT_METADATA_FIELD, DataTypes.BinaryType, false, Metadata.empty());
StructField valField = new StructField(
HoodieSchema.Variant.VARIANT_VALUE_FIELD, DataTypes.BinaryType, false, Metadata.empty());
StructType variantStruct = new StructType(new StructField[] {metaField, valField});
Metadata enriched = new MetadataBuilder()
.withMetadata(field.metadata())
.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.VARIANT.name())
.build();
newFields[i] = new StructField(field.name(), variantStruct, field.nullable(), enriched);
}
if (newFields == null) {
Contributor: 🤖 nit: could you replace this manual unboxing loop with ordinals.stream().mapToInt(Integer::intValue).toArray()? It's a bit more idiomatic Java and saves a few lines.

return Pair.of(sparkSchema, EMPTY_INT_ARRAY);
}
int[] ordinalArr = new int[ordinals.size()];
for (int i = 0; i < ordinalArr.length; i++) {
ordinalArr[i] = ordinals.get(i);
}
return Pair.of(new StructType(newFields), ordinalArr);
}
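
To make the transformation concrete, here is an illustrative call, assuming a Spark version that has VariantType (DataTypes.VariantType as the Java-side accessor is an assumption of this sketch):

  // Illustration only: one top-level variant column.
  StructType input = new StructType(new StructField[] {
      new StructField("id", DataTypes.LongType, false, Metadata.empty()),
      new StructField("payload", DataTypes.VariantType, true, Metadata.empty())
  });
  Pair<StructType, int[]> result = enrichForLanceVariant(input);
  // result.getRight() -> {1}   (ordinal of the variant column)
  // result.getLeft()  -> id: long
  //                      payload: struct<metadata: binary, value: binary>,
  //                               tagged with metadata {"hudi_type": "VARIANT"},
  //                               nullability preserved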

@Override
public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException {
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
@@ -323,7 +383,8 @@ public void writeRow(InternalRow row) throws IOException {

@Override
protected ArrowWriter<InternalRow> createArrowWriter(VectorSchemaRoot root) {
return SparkArrowWriter.of(LanceArrowWriter.create(root, sparkSchema));
return SparkArrowWriter.of(
LanceArrowWriter.create(root, sparkSchema), inputSparkSchema, variantOrdinals);
}

/**
@@ -399,13 +460,59 @@ protected void updateRecordMetadata(InternalRow row,
row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
}

@AllArgsConstructor(staticName = "of")
private static class SparkArrowWriter implements ArrowWriter<InternalRow> {
/**
* Forwards rows to the lance-spark {@link LanceArrowWriter}. When the schema
* has no {@code VariantType} columns, rows are passed through directly. When it
* does, a single {@link VariantProjectedRow} instance is reused per row to
* delegate every accessor to the underlying input row except at variant
* ordinals, where it returns a pre-allocated {@code (metadata, value)} struct
* populated by {@link org.apache.spark.sql.hudi.SparkAdapter#createVariantValueWriter}.
*/

Contributor (on lines +464 to +469): Does this projection introduce overhead? Does the Lance library or writer provide its own projection or adaptation for VariantType?

private static final class SparkArrowWriter implements ArrowWriter<InternalRow> {
private final LanceArrowWriter lanceArrowWriter;
private final VariantProjectedRow projectedRow;

private SparkArrowWriter(LanceArrowWriter lanceArrowWriter, VariantProjectedRow projectedRow) {
this.lanceArrowWriter = lanceArrowWriter;
this.projectedRow = projectedRow;
}

static SparkArrowWriter of(LanceArrowWriter lanceArrowWriter,
StructType inputSchema,
int[] variantOrdinals) {
if (variantOrdinals.length == 0) {
return new SparkArrowWriter(lanceArrowWriter, null);
}
int numFields = inputSchema.fields().length;
GenericInternalRow[] structByOrdinal = new GenericInternalRow[numFields];
List<BiConsumer<SpecializedGetters, Integer>> extractorByOrdinal = new ArrayList<>(numFields);
Contributor: 🤖 nit: the null-fill loop on the next few lines could be replaced by new ArrayList<>(Collections.nCopies(numFields, null)), making the pre-sized, null-initialized intent clearer in one line.
for (int i = 0; i < numFields; i++) {
extractorByOrdinal.add(null);
}
for (int o : variantOrdinals) {
// Pre-allocated per variant column; slot 0 = metadata, slot 1 = value
// (parquet-variant spec on-disk order). lance-spark's BinaryWriter copies
// bytes synchronously via VarBinaryVector.setSafe, so the byte[] returned by
// VariantVal.getMetadata/getValue does not need to outlive a single write().
GenericInternalRow struct = new GenericInternalRow(new Object[2]);
structByOrdinal[o] = struct;
extractorByOrdinal.set(o, SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter(
inputSchema.fields()[o].dataType(),
(byte[] v) -> struct.update(1, v),
(byte[] m) -> struct.update(0, m)));
}
VariantProjectedRow projectedRow = SparkAdapterSupport$.MODULE$.sparkAdapter()
.createVariantProjectedRow(numFields, structByOrdinal, extractorByOrdinal);
return new SparkArrowWriter(lanceArrowWriter, projectedRow);
}

@Override
public void write(InternalRow row) {
lanceArrowWriter.write(row);
if (projectedRow == null) {
lanceArrowWriter.write(row);
return;
}
lanceArrowWriter.write(projectedRow.wrap(row));
}

@Override
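VariantProjectedRow and createVariantValueWriter live behind the SparkAdapter and are not part of this diff. Below is a minimal sketch of the delegating row described in the SparkArrowWriter javadoc above, assuming a Spark 3-style InternalRow surface (newer accessors such as getVariant would delegate the same way); every name in it is illustrative:

import java.util.List;
import java.util.function.BiConsumer;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

// Illustrative sketch of the delegating row: one reusable instance per writer,
// rebound to each input row via wrap(), so the hot path allocates nothing.
final class VariantProjectedRowSketch extends InternalRow {
  private final GenericInternalRow[] structByOrdinal;  // non-null only at variant ordinals
  private final List<BiConsumer<SpecializedGetters, Integer>> extractorByOrdinal;
  private final int numFields;
  private InternalRow delegate;

  VariantProjectedRowSketch(int numFields, GenericInternalRow[] structByOrdinal,
                            List<BiConsumer<SpecializedGetters, Integer>> extractorByOrdinal) {
    this.numFields = numFields;
    this.structByOrdinal = structByOrdinal;
    this.extractorByOrdinal = extractorByOrdinal;
  }

  // Reused across rows: point at the next input row, no per-row allocation.
  InternalRow wrap(InternalRow row) {
    this.delegate = row;
    return this;
  }

  @Override
  public InternalRow getStruct(int ordinal, int numInnerFields) {
    GenericInternalRow variantStruct = structByOrdinal[ordinal];
    if (variantStruct == null) {
      return delegate.getStruct(ordinal, numInnerFields);  // not a variant column
    }
    // Copies the VariantVal's (metadata, value) bytes into the pre-allocated struct.
    extractorByOrdinal.get(ordinal).accept(delegate, ordinal);
    return variantStruct;
  }

  @Override public int numFields() { return numFields; }
  @Override public boolean isNullAt(int i) { return delegate.isNullAt(i); }
  @Override public boolean getBoolean(int i) { return delegate.getBoolean(i); }
  @Override public byte getByte(int i) { return delegate.getByte(i); }
  @Override public short getShort(int i) { return delegate.getShort(i); }
  @Override public int getInt(int i) { return delegate.getInt(i); }
  @Override public long getLong(int i) { return delegate.getLong(i); }
  @Override public float getFloat(int i) { return delegate.getFloat(i); }
  @Override public double getDouble(int i) { return delegate.getDouble(i); }
  @Override public Decimal getDecimal(int i, int p, int s) { return delegate.getDecimal(i, p, s); }
  @Override public UTF8String getUTF8String(int i) { return delegate.getUTF8String(i); }
  @Override public byte[] getBinary(int i) { return delegate.getBinary(i); }
  @Override public CalendarInterval getInterval(int i) { return delegate.getInterval(i); }
  @Override public ArrayData getArray(int i) { return delegate.getArray(i); }
  @Override public MapData getMap(int i) { return delegate.getMap(i); }
  @Override public Object get(int i, DataType t) { return delegate.get(i, t); }
  @Override public void setNullAt(int i) { throw new UnsupportedOperationException(); }
  @Override public void update(int i, Object v) { throw new UnsupportedOperationException(); }
  @Override public InternalRow copy() { throw new UnsupportedOperationException(); }
}
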
@@ -18,12 +18,15 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -149,6 +152,33 @@ public boolean hasNext() {
return false;
}

/**
* If the requested Spark field is {@code VariantType} and the matching Arrow vector is a
* struct, wrap it so Spark's positional {@link ColumnVector#getVariant(int)} reads
* {@code value} from {@code child(0)} and {@code metadata} from {@code child(1)}, while the
* Lance file keeps the parquet-variant spec on-disk order ({@code metadata}, {@code value}).
* Otherwise return a plain {@link LanceArrowColumnVector}.
*/
private static ColumnVector wrapForVariantIfNeeded(StructField sparkField, FieldVector fv) {
if (!SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(sparkField.dataType())
|| !(fv instanceof StructVector)) {
return new LanceArrowColumnVector(fv);
}
StructVector sv = (StructVector) fv;
FieldVector valueFv = sv.getChild(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
FieldVector metadataFv = sv.getChild(HoodieSchema.Variant.VARIANT_METADATA_FIELD);
if (valueFv == null || metadataFv == null) {
throw new HoodieException("Variant column '" + sparkField.name()
+ "' is missing required child fields '" + HoodieSchema.Variant.VARIANT_VALUE_FIELD
+ "' and/or '" + HoodieSchema.Variant.VARIANT_METADATA_FIELD + "'");
}
return new LanceVariantColumnVector(
sparkField.dataType(),
new LanceArrowColumnVector(fv),
new LanceArrowColumnVector(valueFv),
new LanceArrowColumnVector(metadataFv));
}

@Override
public UnsafeRow next() {
if (!hasNext()) {
@@ -182,7 +212,7 @@ private void buildColumnVectors(VectorSchemaRoot root) {
throw new HoodieException("Lance batch missing expected column '" + name
+ "' for file: " + path + "; available columns: " + byName.keySet());
}
columnVectors[i] = new LanceArrowColumnVector(fv);
columnVectors[i] = wrapForVariantIfNeeded(sparkFields[i], fv);
}
if (blobTransform != null) {
blobTransform.init(columnVectors, sparkSchema);
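
LanceVariantColumnVector, constructed in wrapForVariantIfNeeded above, is not included in this diff. Below is a minimal sketch of the child-swapping wrapper it presumably amounts to, assuming Spark 4.x semantics where ColumnVector.getVariant(rowId) reads value bytes from getChild(0) and metadata bytes from getChild(1); the class name and structure are illustrative:

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

// Illustrative sketch: presents the Lance on-disk children (metadata, value)
// to Spark in its expected positional order (value, metadata).
final class LanceVariantColumnVectorSketch extends ColumnVector {
  private final ColumnVector base;      // whole struct vector, used for null tracking
  private final ColumnVector value;     // Arrow child "value"
  private final ColumnVector metadata;  // Arrow child "metadata"

  LanceVariantColumnVectorSketch(DataType variantType, ColumnVector base,
                                 ColumnVector value, ColumnVector metadata) {
    super(variantType);
    this.base = base;
    this.value = value;
    this.metadata = metadata;
  }

  @Override
  public ColumnVector getChild(int ordinal) {
    // Lance keeps the parquet-variant spec order (metadata, value); Spark's
    // getVariant expects value at child 0 and metadata at child 1.
    return ordinal == 0 ? value : metadata;
  }

  @Override public boolean hasNull() { return base.hasNull(); }
  @Override public int numNulls() { return base.numNulls(); }
  @Override public boolean isNullAt(int rowId) { return base.isNullAt(rowId); }
  @Override public void close() { base.close(); }

  // A variant column is read only through getVariant/getChild; scalar getters
  // should never be invoked on it.
  @Override public boolean getBoolean(int rowId) { throw new UnsupportedOperationException(); }
  @Override public byte getByte(int rowId) { throw new UnsupportedOperationException(); }
  @Override public short getShort(int rowId) { throw new UnsupportedOperationException(); }
  @Override public int getInt(int rowId) { throw new UnsupportedOperationException(); }
  @Override public long getLong(int rowId) { throw new UnsupportedOperationException(); }
  @Override public float getFloat(int rowId) { throw new UnsupportedOperationException(); }
  @Override public double getDouble(int rowId) { throw new UnsupportedOperationException(); }
  @Override public Decimal getDecimal(int rowId, int precision, int scale) { throw new UnsupportedOperationException(); }
  @Override public UTF8String getUTF8String(int rowId) { throw new UnsupportedOperationException(); }
  @Override public byte[] getBinary(int rowId) { throw new UnsupportedOperationException(); }
  @Override public ColumnarArray getArray(int rowId) { throw new UnsupportedOperationException(); }
  @Override public ColumnarMap getMap(int ordinal) { throw new UnsupportedOperationException(); }
}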