feat(stream): implement streaming part of now executor#6408
feat(stream): implement streaming part of now executor#6408mergify[bot] merged 8 commits intomainfrom
Conversation
24845d9 to
9100591
Compare
Codecov Report
@@ Coverage Diff @@
## main #6408 +/- ##
========================================
Coverage 73.29% 73.29%
========================================
Files 1012 1014 +2
Lines 162172 162331 +159
========================================
+ Hits 118862 118987 +125
- Misses 43310 43344 +34
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
|
we should use |
386cb81 to
ec1ddf4
Compare
done |
ce2b319 to
42a69db
Compare
42a69db to
e0fc47c
Compare
6cb4643 to
de1d1bf
Compare
|
related #6472 |
29a6845 to
794cec8
Compare
src/stream/src/executor/now.rs
Outdated
| let mut last_timestamp = state_row.and_then(|row| row[0].clone()); | ||
|
|
||
| while let Some(barrier) = barrier_receiver.recv().await { | ||
| if !barrier.is_update() && !barrier.is_resume() { |
There was a problem hiding this comment.
Suggest maintaining a paused state instead of relying on the assumption of "pause -> update -> resume`, in case we have other barriers inside.
There was a problem hiding this comment.
Suggest maintaining a
pausedstate instead of relying on the assumption of "pause -> update -> resume`, in case we have other barriers inside.
fixed
| state_table.init_epoch(barrier.epoch); | ||
|
|
||
| // The first barrier message should be propagated. | ||
| yield Message::Barrier(barrier); |
There was a problem hiding this comment.
So we don't emit the timestamp on the first barrier? Is this expected?
There was a problem hiding this comment.
So we don't emit the timestamp on the first barrier? Is this expected?
We can't because no message is permitted before the first barrier.
There was a problem hiding this comment.
How about emitting the timestamp right after the barrier?
There was a problem hiding this comment.
How about emitting the timestamp right after the barrier?
The timestamp will be updated on the second barrier so I think that it is unnecessary.
src/stream/src/executor/now.rs
Outdated
| if last_timestamp.is_some() { | ||
| let chunk_popped = data_chunk_builder | ||
| .append_one_row_from_datums([&last_timestamp].into_iter()); | ||
| debug_assert!(chunk_popped.is_none()); | ||
| } | ||
| let data_chunk = data_chunk_builder | ||
| .append_one_row_from_datums([×tamp].into_iter()) | ||
| .unwrap(); |
There was a problem hiding this comment.
Directly use DataChunk::from_rows?
There was a problem hiding this comment.
Directly use
DataChunk::from_rows?
fixed
210f748 to
2fc2346
Compare
73cbf0f to
2dfbafd
Compare
2dfbafd to
cde35d6
Compare
|
Generally LGTM. I'd prefer to hold the PR for a while, and wait for the frontend part, so that we can test e2e with dynamic filter. |
There will be too much things in this PR:
The latter two cannot be placed in a separate PR before this because they need to use If it is recommended, I will put these into this PR. |
In multiple PRs, but merge at the same time? |
Do you mean that create multiple PRs and other PRs also include codes of |
Therefore let's work on this base PR: #6677 of following multiple PRs first. It defines interfaces of |
Actually, if we expect this PR is > 90% correct, then I don't see the harm of merging first, and then once we have e2e with dynamic filter, we can fix any bugs we encounter. And incremental change is easier to review. IMO Now executor is also independent of dynamic filter. wdyt @TennyZhuang ? |
* feat(stream): implement now executor (close #6407) * use if let * use barrier receiver * timestamp sanity check * add now executor builder * fix gen proto * rebase main Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
implement streaming part of now executor, which will return current time
now()emit on most barrier
frontend part not done
Checklist
./risedev check(or alias,./risedev c)Documentation
If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.
Types of user-facing changes
Please keep the types that apply to your changes, and remove those that do not apply.
Release note
Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.
Refer to a related PR or issue link (optional)
#6407