From 9ed5fc4620e940df43e8ebcb5cc2f6ca3404db1b Mon Sep 17 00:00:00 2001 From: Justus Adam Date: Thu, 3 Jun 2021 18:30:13 +0200 Subject: [PATCH 1/3] Fixed a bug in project During `query_through` the ordering of columns in `emit` was not maintained, now it is. --- server/dataflow/src/ops/project.rs | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/server/dataflow/src/ops/project.rs b/server/dataflow/src/ops/project.rs index 69ed543b8..39fa967cd 100644 --- a/server/dataflow/src/ops/project.rs +++ b/server/dataflow/src/ops/project.rs @@ -173,13 +173,26 @@ impl Ingredient for Project { vec![] }; - new_r.extend( - r.into_owned() - .into_iter() - .enumerate() - .filter(|(i, _)| emit.iter().any(|e| e == i)) - .map(|(_, c)| c), - ); + // new_r.extend( + // r.into_owned() + // .into_iter() + // .enumerate() + // .filter(|(i, _)| emit.iter().any(|e| e == i)) + // .map(|(_, c)| c), + // ); + + { + let o = r.into_owned(); + let l = new_r.len(); + debug_assert!(emit.iter().all(|e| e < &o.len())); + unsafe { new_r.set_len(l + emit.len()); } + for (i, c) in o.into_iter().enumerate() { + match emit.iter().enumerate().find(|(_, it)| **it == i) { + Some((idx, _)) => new_r[idx + l] = c, + None => () + } + } + } new_r.append(&mut expr); if let Some(ref a) = additional { From e938d0fb1d320fed63739f4b2b12cf8588770e12 Mon Sep 17 00:00:00 2001 From: Justus Adam Date: Fri, 4 Jun 2021 16:28:02 +0200 Subject: [PATCH 2/3] Making the column ordering project fix better Added a test case Added safety assertions Added some explanation comments to the code --- server/dataflow/src/ops/project.rs | 47 +++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/server/dataflow/src/ops/project.rs b/server/dataflow/src/ops/project.rs index 39fa967cd..e60d00b11 100644 --- a/server/dataflow/src/ops/project.rs +++ b/server/dataflow/src/ops/project.rs @@ -173,19 +173,38 @@ impl Ingredient for Project { vec![] }; - // new_r.extend( - // r.into_owned() - // .into_iter() - // .enumerate() - // .filter(|(i, _)| emit.iter().any(|e| e == i)) - // .map(|(_, c)| c), - // ); - + // This block of code is ugly, but the most + // efficient way I could think of for appending to + // the vector while also respecting the ordering of + // columns and without allocating some special + // intermediate structure to map indices. Basic + // issue is that the order of columns in `emit` is + // not necessarily monotonically increasing, so we + // have to either arbitrary retrieve from `r`, but + // then we can't *move* items and need to copy. + // Instead we arbitrarily insert. { - let o = r.into_owned(); + let o : Vec = r.into_owned(); let l = new_r.len(); + // Make sure none of the `emit` indices are out + // of bounds, because the loop does not check + // it. debug_assert!(emit.iter().all(|e| e < &o.len())); - unsafe { new_r.set_len(l + emit.len()); } + // Asserts `emit` has no duplicate elements (and + // invariant of using `find` later in the loop) + debug_assert_eq!(emit.len(), { + let mut v = emit.iter().collect::>(); + v.dedup(); + v.len() + }, "Invariant broken, duplicate column in `emit`: {:?}", emit); + let target_len = l + emit.len(); + // Just to be sure we have enough space (in + // theory has been checked earlier, but since + // this is cheap we do it again ) + new_r.reserve(target_len); + // Safe because we guarantee to insert + // `emit.len()`elements in the following loop + unsafe { new_r.set_len(target_len); } for (i, c) in o.into_iter().enumerate() { match emit.iter().enumerate().find(|(_, it)| **it == i) { Some((idx, _)) => new_r[idx + l] = c, @@ -562,6 +581,14 @@ mod tests { assert_eq!(expected, iter.next().unwrap().into_owned()); } + #[test] + fn it_queries_trough_respecting_column_order() { + let state = box RowMemoryState::default(); + let (p, states) = setup_query_through(state, &[0, 2, 1], None, None); + let expected: Vec = vec![1.into(), 3.into(), 2.into()]; + assert_query_through(p, 0, 1.into(), states, expected); + } + #[test] fn it_queries_through_all() { let state = Box::new(MemoryState::default()); From 68de705f108fde39e734f2155835a4f910e71e10 Mon Sep 17 00:00:00 2001 From: Justus Adam Date: Sun, 6 Jun 2021 00:07:14 +0100 Subject: [PATCH 3/3] Small alteration of the project operator fix Passing the correct length to `reserve` (turns the length passed to `reserve` is for *additional* elements) --- server/dataflow/src/ops/project.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/dataflow/src/ops/project.rs b/server/dataflow/src/ops/project.rs index e60d00b11..a4d212f13 100644 --- a/server/dataflow/src/ops/project.rs +++ b/server/dataflow/src/ops/project.rs @@ -197,14 +197,13 @@ impl Ingredient for Project { v.dedup(); v.len() }, "Invariant broken, duplicate column in `emit`: {:?}", emit); - let target_len = l + emit.len(); // Just to be sure we have enough space (in // theory has been checked earlier, but since // this is cheap we do it again ) - new_r.reserve(target_len); + new_r.reserve(emit.len()); // Safe because we guarantee to insert // `emit.len()`elements in the following loop - unsafe { new_r.set_len(target_len); } + unsafe { new_r.set_len(l + emit.len()); } for (i, c) in o.into_iter().enumerate() { match emit.iter().enumerate().find(|(_, it)| **it == i) { Some((idx, _)) => new_r[idx + l] = c,