fix: resolution of complex type variants in Avro unions #9328
base: main
Conversation
Move the newly added tests above the two large comprehensive e2e tests.
Test an Avro reader schema evolution where a field is added to a nested record, and the record is typed as nullable in the reader schema. Reuse the existing nested_records.avro test file which provides a wealth of nested record types to exercise different cases of nesting, nullability and defaults.
Move the nullability index value into the NullablePlan enum used by the decoder.
When an Avro reader schema has a union type that needs to be resolved against the type in the writer schema, resolution information other than primitive type promotions was not properly handled when creating the decoder. Extend the union resolution information in the decoder with variant data for enum remapping and record projection. The Projector data structure with Skipper decoders forms part of this information, which necessitated some refactoring.
Apply resolutions other than promotions; previously, only promotions were actually worked into the decoder, when decode_with_promotion was called. A default value is a valid resolution for fields added in the reader schema with a default, so don't return an error; implement it as a ResolutionPlan::DefaultValue variant.
In the Avro decoder, the NullabilityPlan::ReadTag variant did not preserve the resolution information that needs to be applied to conform the writer's data to the reader schema.
Change the expected results to the new behavior: the resolution for a writer union type that is not an exact match is always Union to signal that the tag needs to be read. Certain ResolutionInfo values could be equivalently replaced with None which means tag reading and direct promotion, but the nullability plan will be constructed the same way regardless. Conversely, the resolution for a non-union writer type may be other than Union.
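For orientation, here is a minimal sketch of the decoder-side data model these commits describe. The names ResolutionPlan, NullabilityPlan, Projector, Skipper, and AvroLiteral come from the commit messages and review comments; the stub types and exact layout below are illustrative assumptions, not the crate's actual definitions.

// Illustrative stand-ins for the crate's own types (assumptions).
struct EnumMapping;           // writer-to-reader enum symbol index table
struct Projector;             // record field projection built with Skipper decoders
enum AvroLiteral { Int(i32) } // parsed default values, e.g. Int(42)

// Per-variant work to apply when resolving a writer value to the reader type.
enum ResolutionPlan {
    None,                      // exact match, or a plain primitive promotion
    Enum(EnumMapping),         // remap writer enum indices to reader indices
    Record(Projector),         // project writer fields; fill added fields from defaults
    DefaultValue(AvroLiteral), // the field exists only in the reader schema
}

// How a nullable reader value is decoded from the writer's data.
enum NullabilityPlan {
    NoTag, // the writer type is not a union; there is no tag to read
    // Read the writer's union tag, then apply the resolution to the value
    // (the resolution carried here is what was previously being dropped).
    ReadTag {
        non_null_index: usize,
        resolution: ResolutionPlan,
    },
}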
jecsand838
left a comment
@mzabaluev LGTM!
Overall these changes are extremely clean. Just left a few comments.
arrow-avro/src/reader/record.rs
Outdated
panic!(
    "unexpected union resolution info for non-union writer and union reader type",
);
This new panic in decoder construction can abort the process for malformed/partial union resolution state. Since this path already returns Result, I think returning AvroError::SchemaError (or ParseError) instead of panicking would be better, so callers can decide how to handle the failure.
arrow-avro/src/reader/record.rs
Outdated
let Some((_, resolution)) =
    info.writer_to_reader[nullability.non_null_index()].as_ref()
else {
    panic!("unexpected union resolution info for nullable writer type");
Same here. The panic on unexpected nullable-union mapping turns a recoverable schema mismatch into a hard crash.
arrow-avro/src/reader/record.rs
Outdated
| panic!("enum mapping resolution provided for non-enum decoder"); | ||
| }; | ||
| let raw = buf.get_int()?; | ||
| let resolved = res.resolve(raw)?; | ||
| indices.push(resolved); | ||
| Ok(()) | ||
| } | ||
| ResolutionPlan::Record(proj) => { | ||
| let Self::Record(_, encodings, _, _) = self else { | ||
| panic!("record projection provided for non-record decoder"); |
Same here as well. We should probably be passing AvroErrors back to the caller instead of potentially crashing, IMO.
I meant to catch cases where any other combination coming out of schema parsing is clearly a programming error, and where a graceful failure would be too cryptic and require a debugging session to investigate. But OK.
arrow-avro/src/reader/record.rs
Outdated
    ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?),
),
(_, ResolutionInfo::Record(_)) => {
    unreachable!("record resolution on non-record decoder")
Using unreachable! here is potentially unsafe. Probably best to also pass an AvroError back here too. Perhaps it's unreachable right now, but all it takes is for an edge case to occur or some change down the line to create the potential for a crash.
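To illustrate the change being requested, here is a hedged, self-contained sketch of reporting such a mismatch as an error instead of panicking. DecoderKind, ResolutionKind, and SchemaError are illustrative stand-ins; in the crate the surrounding constructor already returns a Result, so the equivalent fix is to return its schema error variant (referred to above as AvroError::SchemaError / ParseError) from these arms.

// Validate a (decoder, resolution) pairing, returning an error on a mismatch
// rather than aborting the process with panic!/unreachable!.
#[derive(Debug)]
struct SchemaError(String);

enum DecoderKind { Record, Enum, Other }
enum ResolutionKind { Record, Enum, None }

fn check_resolution(decoder: &DecoderKind, resolution: &ResolutionKind) -> Result<(), SchemaError> {
    match (decoder, resolution) {
        (DecoderKind::Record, ResolutionKind::Record)
        | (DecoderKind::Enum, ResolutionKind::Enum)
        | (_, ResolutionKind::None) => Ok(()),
        // Previously an unreachable! ("record resolution on non-record decoder").
        (_, ResolutionKind::Record) => Err(SchemaError(
            "record resolution provided for a non-record decoder".to_string(),
        )),
        // Previously a panic! ("enum mapping resolution provided for non-enum decoder").
        (_, ResolutionKind::Enum) => Err(SchemaError(
            "enum mapping resolution provided for a non-enum decoder".to_string(),
        )),
    }
}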
type_ids: Vec<i8>,
offsets: Vec<i32>,
branches: Vec<Decoder>,
counts: Vec<i32>,
reader_type_codes: Vec<i8>,
branches: UnionDecoderBranches,
Nice! Love the clean up here :)
The borrow checker made me do this, but yes, it's also become cleaner.
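A hedged sketch of what the grouping in the diff above might look like: the per-branch vectors from the old layout bundled into a single UnionDecoderBranches value so they can be borrowed and passed around together. The field names echo the removed fields; the comments are assumptions and the actual struct in the PR may differ.

struct Decoder; // stand-in for the crate's per-branch value decoder

// State the union decoder keeps across its reader branches.
struct UnionDecoderBranches {
    type_ids: Vec<i8>,      // dense-union type id recorded for each decoded value
    offsets: Vec<i32>,      // offset of each value within its branch's child array
    decoders: Vec<Decoder>, // one child decoder per reader branch
    counts: Vec<i32>,       // number of values appended to each branch so far
}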
let children = reader_variants
    .iter()
    .map(|variant| self.parse_type(variant, namespace))
    .collect::<Result<Vec<_>, _>>()?;
I think there's a risk of a regression here by not using the resolve_type to preserve field level resolution, especially the DefaultValue.
See the test below and let me know what you think.
#[test]
fn test_resolve_writer_non_union_record_to_reader_union_preserves_defaults() {
// Writer: record Inner{a: int}
// Reader: union [Inner{a: int, b: int default 42}, string]
// The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
let writer = Schema::Complex(ComplexType::Record(Record {
name: "Inner",
namespace: None,
doc: None,
aliases: vec![],
fields: vec![AvroFieldSchema {
name: "a",
doc: None,
r#type: mk_primitive(PrimitiveType::Int),
default: None,
aliases: vec![],
}],
attributes: Attributes::default(),
}));
let reader = mk_union(vec![
Schema::Complex(ComplexType::Record(Record {
name: "Inner",
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroFieldSchema {
name: "a",
doc: None,
r#type: mk_primitive(PrimitiveType::Int),
default: None,
aliases: vec![],
},
AvroFieldSchema {
name: "b",
doc: None,
r#type: mk_primitive(PrimitiveType::Int),
default: Some(Value::Number(serde_json::Number::from(42))),
aliases: vec![],
},
],
attributes: Attributes::default(),
})),
mk_primitive(PrimitiveType::String),
]);
let mut maker = Maker::new(false, false);
let dt = maker
.make_data_type(&writer, Some(&reader), None)
.expect("resolution should succeed");
// Verify the union resolution structure
let resolved = match dt.resolution.as_ref() {
Some(ResolutionInfo::Union(u)) => u,
other => panic!("expected union resolution info, got {other:?}"),
};
assert!(!resolved.writer_is_union && resolved.reader_is_union);
// The matching child (Inner at index 0) should have field b with DefaultValue
let children = match dt.codec() {
Codec::Union(children, _, _) => children,
other => panic!("expected union codec, got {other:?}"),
};
let inner_fields = match children[0].codec() {
Codec::Struct(f) => f,
other => panic!("expected struct codec for Inner, got {other:?}"),
};
assert_eq!(inner_fields.len(), 2);
assert_eq!(inner_fields[1].name(), "b");
assert_eq!(
inner_fields[1].data_type().resolution,
Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
"field b should have DefaultValue(Int(42)) from schema resolution"
);
}
Since the writer type is not a union in this case, the only resolution we need is against the best-matching union variant found above.
A case could be made for not processing the variant schemas a second time to build the field information for the codec, but the previous code does that as well, since the loop in find_best_union_match exits early upon finding a direct match. Since schema resolution is not on a performance-critical path, I left it working as before.
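For context, a hedged sketch of the matching behaviour described here: scan the reader union's variants, return immediately on a direct match with the writer type, and otherwise fall back to the first variant the writer type can resolve to. The signature and the exact_match / can_resolve helpers are illustrative assumptions, not the crate's actual find_best_union_match.

struct Schema; // stand-in for the crate's Avro schema type
fn exact_match(_writer: &Schema, _reader: &Schema) -> bool { false } // placeholder
fn can_resolve(_writer: &Schema, _reader: &Schema) -> bool { false } // placeholder

fn find_best_union_match(writer: &Schema, reader_variants: &[Schema]) -> Option<usize> {
    let mut fallback = None;
    for (i, variant) in reader_variants.iter().enumerate() {
        if exact_match(writer, variant) {
            return Some(i); // early exit on a direct match, as noted above
        }
        if fallback.is_none() && can_resolve(writer, variant) {
            fallback = Some(i); // remember the first resolvable variant
        }
    }
    fallback
}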
You are right, the test fails because inner resolution information is not preserved. The same should be done when resolving the union-to-union case.
The default value should be retrieved when parsing any Avro schema, not only when resolving. The current representation looks like a data model mismatch.
I have tried to address this the best I could, but the fix (making DefaultValue resolutions for record fields in any parsed schema) invites more test failures.
While default values are a property of record fields, other nested resolutions are particular to each writer-to-reader resolution case. If the writer schema is a union with multiple branches resolving to the same reader branch, fields in any nested records may need to be resolved differently depending on which branch was encoded. The current data structure does not allow for that.
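A hedged sketch of the shape this argues for: the union resolution keeps, per writer branch, both the matched reader branch index and the resolution computed for that specific pair. The writer_to_reader name appears in the code excerpts above; the rest is illustrative.

struct ResolutionInfo; // stand-in for per-pair resolution data (projection, enum map, ...)

struct UnionResolution {
    // One entry per writer branch: the reader branch it resolves to and the
    // resolution for that particular writer-to-reader pair; None if the writer
    // branch has no matching reader branch.
    writer_to_reader: Vec<Option<(usize, ResolutionInfo)>>,
}

With this shape, two writer record branches that both resolve to the same reader record can each carry their own field projection, which a single shared resolution value cannot express.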
Unexpected cases in the schema resolution code of the arrow-avro reader were handled as panics to help catch programmer errors and refactoring gotchas. Per the review suggestion, convert them to graceful errors.
Specifically, an inner record type in a non-union writer schema is resolved to an evolved record type with a default value for an added field.
Modify the ResolvedRecord per-field information to include the resolution needed for this writer-to-reader pair. This is needed to correctly implement e.g. union-to-union resolution when multiple writer schema branches can resolve to the same reader schema branch.
Which issue does this PR close?
Rationale for this change
When an Avro reader schema has a union type that needs to be resolved against the type in the writer schema, resolution information other than primitive type promotions is not properly handled when creating the decoder.
For example, when the reader schema has a nullable record field that has an added nested field on top of the fields defined in the writer schema, the record type resolution needs to be applied, using a projection with the default field value.
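To make this concrete, here is a hedged pair of writer/reader schemas (written as Rust string constants) illustrating that case; the schemas are invented for illustration and are not taken from the PR's test data.

// Writer: the nested record has a single field and is not nullable.
const WRITER_SCHEMA: &str = r#"{
  "type": "record", "name": "Outer",
  "fields": [
    {"name": "inner", "type": {
      "type": "record", "name": "Inner",
      "fields": [{"name": "a", "type": "int"}]
    }}
  ]
}"#;

// Reader: "inner" becomes nullable and Inner gains field "b" with a default.
// Resolving a writer value therefore means selecting the record branch of the
// reader union and projecting the record, filling "b" from its default of 42.
const READER_SCHEMA: &str = r#"{
  "type": "record", "name": "Outer",
  "fields": [
    {"name": "inner", "type": ["null", {
      "type": "record", "name": "Inner",
      "fields": [
        {"name": "a", "type": "int"},
        {"name": "b", "type": "int", "default": 42}
      ]
    }], "default": null}
  ]
}"#;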
What changes are included in this PR?
Extend the union resolution information in the decoder with variant data for enum remapping and record projection. The Projector data structure with Skipper decoders forms part of this information, which necessitated some refactoring.
Are these changes tested?
Yes, tests are added; see the commits above, including a reader schema evolution test over the existing nested_records.avro file.
Are there any user-facing changes?
No.