feat: add max_row_group_bytes option to WriterProperties #9357
Conversation
parquet/src/file/properties.rs
Outdated
/// Sets maximum size of a row group in bytes, or `None` for unlimited.
///
/// Row groups are flushed when their estimated encoded size exceeds this threshold.
/// This is similar to parquet-mr's `parquet.block.size` behavior.
parquet-mr is just the official Java implementation of Parquet; you can rewrite the comment to clarify that this matches the official Parquet Java implementation
Done
Also, I think parquet-mr is now officially called "parquet-java": https://github.com/apache/parquet-java
@@ -575,7 +595,34 @@ impl WriterPropertiesBuilder {
/// If the value is set to 0.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
Should we deprecate this function?
Probably. Done
Wait, not so fast: this is a breaking change, as clippy will fail for users. I was only asking; it might belong in a different PR, but I'm open for discussion. If you keep it, please update the PR description under changes to users.
I do agree that this API should be deprecated. Thanks for pointing it out!
this is a breaking change
This is inherently not a breaking change; the purpose of marking APIs as deprecated is to warn users before making a breaking change, without actually making this change.
This PR already calls for a minor bump due to the new APIs introduced; deprecating the old one does not change the version semantics for this PR.
as clippy will fail for users
It's a rustc warning:
The deprecated attribute marks an item as deprecated. rustc will issue warnings on usage of #[deprecated] items
So unless users add -D warnings, compilation won't break.
it might be in a different pr
I don't mind either way: Leaving it here or opening a new PR for deprecating the old API. LMK what's your preference and I'll do it.
if you keep it please update the pr description under changes to users
Done!
Wait, not so fast: this is a breaking change, as clippy will fail for users. I was only asking; it might belong in a different PR, but I'm open for discussion. If you keep it, please update the PR description under changes to users.
Yeah, in general I agree with @yonipeleg33 -- I don't think a clippy failure is a breaking change per se -- the rust compiler will be happy to compile it. If downstream projects want to take a stricter "clippy must pass" stance, I don't think that is technically an API breakage
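For reference, a standalone sketch of the deprecation being discussed (illustrative only, not the PR's actual code; the version and note text follow this thread). It shows that `#[deprecated]` only produces a rustc warning at call sites, so existing code keeps compiling unless warnings are denied (e.g. via `-D warnings`):

// Minimal sketch of a builder with the old setter deprecated in favor of the new one.
pub struct Builder {
    max_row_group_row_count: Option<usize>,
}

impl Builder {
    #[deprecated(since = "58.0.0", note = "use set_max_row_group_row_count instead")]
    pub fn set_max_row_group_size(self, value: usize) -> Self {
        // Forward to the new API; callers get a deprecation warning, not an error.
        self.set_max_row_group_row_count(Some(value))
    }

    pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
        assert_ne!(value, Some(0), "Cannot have a 0 max row group row count");
        self.max_row_group_row_count = value;
        self
    }
}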
}

/// Helper to create a test batch with the given number of rows.
/// Each row is approximately 4 bytes (one i32).
Suggested change:
- /// Each row is approximately 4 bytes (one i32).
+ /// Each row is 4 bytes (one `i32`).
Done
    ArrowDataType::Int32,
    false,
)]));
let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
Suggested change:
- let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ let array = Int32Array::from_iter(0..num_rows as i32);
Nice! Done
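Putting the two suggestions above together, the test helper presumably ends up looking something like this (a sketch; the helper's name, the import paths, and its exact shape are assumed):

use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType as ArrowDataType, Field, Schema};

/// Builds a single-column Int32 batch with `num_rows` rows (one `i32` per row).
fn create_test_batch(num_rows: usize) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));
    let array = Int32Array::from_iter(0..num_rows as i32);
    RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
}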

#[test]
fn test_row_group_limit_rows_only() {
    // When only max_row_group_size is set, respect the row limit
the comment is not on the correct line
Done

#[test]
fn test_row_group_limit_none_writes_single_row_group() {
    // When both limits are None, all data should go into a single row group
the comment is not on the correct line
Done (moved to above the test function)
    false,
)]));

// Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
the comment is not on the correct line (it should be on the Some(3500) one)
Done

#[test]
fn test_row_group_limit_both_row_wins() {
    // When both limits are set, the row limit triggers first
the comment is not on the correct line
Done
parquet/src/file/properties.rs
Outdated
pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as parquet-mr's parquet.block.size)
same as my other parquet-mr comment
Done
parquet/src/file/properties.rs
Outdated
if let Some(v) = value {
    assert!(v > 0, "Cannot have a 0 max row group bytes");
}
Suggested change:
- if let Some(v) = value {
-     assert!(v > 0, "Cannot have a 0 max row group bytes");
- }
+ assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
Neat, done
}

#[test]
fn test_row_group_limit_both_row_wins() {
Can you add a similar test where bytes win, with the same structure as this one, i.e. writing a single large batch but only changing the config (same test with only a config change)?
No can do; the first batch is always written as a whole, because we need some statistics in order to calculate the average row size. This is also noted in the PR description:
This means that the first batch will always be written as a whole (unless row count limit is also set).
you don't need statistics, you can calculate it from the data types you need to encode
AFAICT, that defeats the purpose of this configuration: its purpose is to control the IO profile of the writer (i.e. how much and when it writes to disk), and for that, the data needs to already be encoded before calculating the row group size.
This is also backed by the Java source code:
it calculates memSize using columnStore.getBufferedSize(), which is documented as follows:
@return approximate size of the buffered encoded binary data
}

#[test]
fn test_row_group_limit_both_bytes_wins() {
Can you have a similar test where rows win, with the same structure but only a config change?
Done - see test_row_group_limit_both_row_wins_multiple_batches vs. test_row_group_limit_both_row_wins_single_batch
#[test]
fn test_row_group_limit_both_bytes_wins() {
    // When both limits are set, the byte limit triggers first
    // Write in multiple small batches so byte-based splitting can work
According to the comment on the method, the way you write batches should not affect the result, only the config: if the byte-based limit is hit first, it should flush then; if the row limit is hit first, it should flush then. Also, it should work regardless of how you feed the data.
Unfortunately, the way data is fed does affect the row group splits, because of the first batch issue (noted in the PR description):
This means that the first batch will always be written as a whole (unless row count limit is also set).
And even beyond the first batch, the behaviour is not predictable: the byte-based limit is enforced by calculating the average row size based on previous batches. This is more dynamic than the row-based limit.
I'm not sure what's actionable from this comment. If you think there's still a missing test case, please LMK.
parquet/src/file/properties.rs
Outdated
/// Sets maximum size of a row group in bytes, or `None` for unlimited.
///
/// Row groups are flushed when their estimated encoded size exceeds this threshold.
/// This is similar to the official `parquet.block.size` behavior.
Suggested change:
- /// This is similar to the official `parquet.block.size` behavior.
+ /// This is similar to the official Java implementation `parquet.block.size` behavior.
this is not part of the spec, so there is nothing "official" about it
Done
yonipeleg33
left a comment
Thanks @rluvaton, PTAL
etseidl
left a comment
Thanks @yonipeleg33. Flushing partial review for now, but I think this is looking pretty sound so far.
parquet/src/file/properties.rs
Outdated
/// # Panics
/// If the value is `Some(0)`.
pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
    assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
Suggested change:
- assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
+ assert_ne!(value, Some(0), "Cannot have a 0 max row group row count");
Done
parquet/src/file/properties.rs
Outdated
/// # Panics
/// If the value is set to 0.
#[deprecated(
    since = "57.3.0",
Suggested change:
- since = "57.3.0",
+ since = "58.0.0",
57.3.0 has already been released
Thanks, done
parquet/src/file/properties.rs
Outdated
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java
/// implementation for `parquet.block.size`)
pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
This constant appears to be unused. I'd vote for less clutter and getting rid of it.
Or should we set it?
Removed. Good catch! (Leftover from previous implementations)
Or should we set it?
@alamb I think not, as it changes behaviour without users opting-in for that new behaviour. None preserves the existing behaviour by default, which is no byte count limit at all.
parquet/src/file/properties.rs
Outdated
/// This is similar to the official Java implementation for `parquet.block.size`'s behavior.
///
/// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
/// the row group with the smallest limit will be applied.
Suggested change:
- /// the row group with the smallest limit will be applied.
+ /// the row group with the smaller limit will be produced.
Done
parquet/src/file/properties.rs
Outdated
/// Sets maximum number of rows in a row group, or `None` for unlimited.
///
/// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
/// the row group with the smallest limit will be applied.
Suggested change:
- /// the row group with the smallest limit will be applied.
+ /// the row group with the smaller limit will be produced.
Done
@@ -314,8 +320,12 @@ impl<W: Write + Send> ArrowWriter<W> {
/// Encodes the provided [`RecordBatch`]
///
/// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
Suggested change:
- /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+ /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
?
Indeed! Done
- Change deprecation notice to 58.0.0
- Improve wording in comments
- Cleanup references to the newly deprecated API
yonipeleg33
left a comment
Thanks @etseidl, PTAL
/// Returns maximum number of rows in a row group, or `usize::MAX` if unlimited.
///
/// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`]
pub fn max_row_group_size(&self) -> usize {
Given the introduction of max_row_group_count, what would you think about deprecating max_row_group_size and directing people to that new setting?
That makes sense, as we also deprecate the corresponding setter. Done!
parquet/src/file/properties.rs
Outdated
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_SIZE),
Could we also please align the constant name to the parameter name (e.g. DEFAULT_MAX_ROW_GROUP_COUNT)?
Done
Thank you @yonipeleg33 -- sorry I forgot to submit my review from the other day when I reviewed this PR
yonipeleg33
left a comment
Thank you @yonipeleg33 -- sorry I forgot to submit my review from the other day when I reviewed this PR
Happens to the best of us 😄
Done, thanks for the review! PTAL
etseidl
left a comment
Thanks @yonipeleg33, this looks good to me.
Thanks so much for the review, guys!
alamb
left a comment
Looks good to me too -- thanks @yonipeleg33 @etseidl and @rluvaton
let a = batch.slice(0, to_write);
let b = batch.slice(to_write, batch.num_rows() - to_write);
self.write(&a)?;
return self.write(&b);
Since this recurses, it could potentially blow out the stack with pathological inputs (e.g. a RecordBatch with 1M rows and a max_row_group_row_count of 1). I don't think it is necessary to fix now, I just wanted to point it out
here is a reproducer (I will file a follow-on ticket)
#[test]
fn test_row_group_limit_rows_only_pathological_stack_overflow_demo() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));
    let array = Int32Array::from((0..1_000_000_i32).collect::<Vec<_>>());
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1))
        .set_max_row_group_bytes(None)
        .build();
    let file = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
    // This currently recurses once per row-group split and can overflow the stack.
    writer.write(&batch).unwrap();
}
I filed a ticket to track
Given the prior code path for max_row_group_size also uses recursion, I don't think this is a new bug introduced by this PR (though the max bytes path is now also susceptible to the same issue).
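For illustration, the recursion can be avoided with a loop. A self-contained sketch of the idea, using a plain slice instead of a RecordBatch (an assumption for brevity; this is not the fix from the follow-up ticket):

// Iterative split: peel off at most `max_rows` rows per group, no recursion,
// so the stack depth stays constant regardless of how many groups are produced.
fn split_into_row_groups(rows: &[i32], max_rows: usize) -> Vec<&[i32]> {
    assert!(max_rows > 0, "max_rows must be non-zero");
    let mut groups = Vec::new();
    let mut offset = 0;
    while offset < rows.len() {
        let take = max_rows.min(rows.len() - offset);
        groups.push(&rows[offset..offset + take]);
        offset += take;
    }
    groups
}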
Thanks for looking into it!
I made a small follow-on PR to add some additional tests
Which issue does this PR close?
This PR implements another suggestion introduced in #1213:
So not "Closes" anything new.
Rationale for this change
A best effort to match Spark's (or more specifically, Hadoop's) `parquet.block.size` configuration behaviour, as documented in parquet-hadoop's README.

Since arrow's parquet writer writes batches, it's inherently different than Hadoop's per-record writer behaviour - so the behaviour of `max_row_group_bytes` will be different than Hadoop's `parquet.block.size`, but this is the closest I could reasonably get (see details below).

What changes are included in this PR?
Configuration changes
- Added a `max_row_group_bytes` configuration option in `WriterProperties`
- Renamed the `max_row_group_size` private property to `max_row_group_row_count`
- `set_max_row_group_size()` and `max_row_group_size()` still remain with their existing signatures.
- Added `set_max_row_group_row_count()` and `max_row_group_row_count()`, which expose the `Option<usize>` type.
- If `set_max_row_group_row_count(None)` is called, `max_row_group_size()` will return `usize::MAX`.

Writer changes

- `ArrowWriter::write` now supports any combination of these two properties (row count and row bytes).
- Byte limit is calculated once per batch (as opposed to Hadoop's per-record calculation), as sketched below:
Before writing each batch, compute the average row size in bytes based on previous writes, and flush or split the batch according to that average before hitting the limit.
This means that the first batch will always be written as a whole (unless row count limit is also set).
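To make that mechanism concrete, here is a hedged sketch of the per-batch estimate described above (illustrative only; the function name and the exact formula are assumptions, not this PR's code):

// Estimate how many more rows fit in the current row group, based on the
// average encoded row size observed so far. Returns None until at least one
// batch has been written, which is why the first batch is flushed as a whole.
fn rows_that_still_fit(
    encoded_bytes_so_far: usize,
    rows_so_far: usize,
    in_progress_bytes: usize,
    max_row_group_bytes: usize,
) -> Option<usize> {
    if rows_so_far == 0 {
        return None; // no statistics yet
    }
    let avg_row_bytes = (encoded_bytes_so_far / rows_so_far).max(1);
    Some(max_row_group_bytes.saturating_sub(in_progress_bytes) / avg_row_bytes)
}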
Are these changes tested?
Yes - added unit tests to check all different combinations of these two properties being set.
Are there any user-facing changes?
Yes:

- New APIs (and returning `usize::MAX` from `max_row_group_size()` if it was unset by the user).
- Deprecated the `set_max_row_group_size` and `max_row_group_size` APIs.
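For completeness, a usage sketch of the options described above (builder method names as they appear in this PR's tests; the import path is the parquet crate's usual one):

use parquet::file::properties::WriterProperties;

fn main() {
    // Flush a row group after at most 100k rows or an estimated ~128 MB of
    // encoded data, whichever limit is hit first.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(100_000))
        .set_max_row_group_bytes(Some(128 * 1024 * 1024))
        .build();
    let _ = props;
}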