
feat(core): Implement memory based backpressure mechanism #605

Draft

iambriccardo wants to merge 8 commits into main from
riccardobusetti/etl-525-implement-memory-aware-processing
Conversation

@iambriccardo (Contributor)

No description provided.

coderabbitai bot commented Feb 17, 2026

📝 Walkthrough

Summary by CodeRabbit

Release Notes

  • New Features

    • Implemented memory backpressure monitoring to dynamically throttle data processing and prevent system overload
    • Added PostgreSQL connection lifecycle tracking for enhanced reliability and diagnostic visibility
    • Enhanced stream processing pipeline with integrated backpressure coordination across replication workflows
  • Chores

    • Added dependencies for system monitoring and streaming infrastructure

Walkthrough

This PR adds a MemoryMonitor that samples system memory and publishes a hysteresis-based blocked/unblocked signal. Two stream wrappers—BackpressureStream and BatchBackpressureStream—are introduced to observe the memory signal (and optional Postgres connection updates) and pause/resume or flush streams/batches accordingly. MemoryMonitor is instantiated at pipeline startup and threaded through ApplyWorker/ApplyLoop/TableSyncWorker and table_copy call paths so table-copy and streaming flows can react to memory pressure.
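The hysteresis decision the monitor applies on each sample can be sketched as a pure function. This is a sketch only: `compute_next_blocked` is named in the diagram below, but the watermark values, the two-argument signature, and `main` are illustrative assumptions, not the PR's actual code.

```rust
// Hypothetical watermarks; the PR's real thresholds are not shown here.
const HIGH_WATERMARK_PERCENT: f64 = 85.0;
const LOW_WATERMARK_PERCENT: f64 = 75.0;

/// Hysteresis: block when usage crosses the high watermark, and only
/// unblock again once it falls back below the low watermark, so the
/// blocked signal does not flap around a single threshold.
fn compute_next_blocked(currently_blocked: bool, used_percent: f64) -> bool {
    if currently_blocked {
        used_percent >= LOW_WATERMARK_PERCENT
    } else {
        used_percent >= HIGH_WATERMARK_PERCENT
    }
}

fn main() {
    assert!(!compute_next_blocked(false, 80.0)); // under high watermark: stay unblocked
    assert!(compute_next_blocked(false, 90.0)); // crossed high watermark: block
    assert!(compute_next_blocked(true, 80.0)); // still above low watermark: stay blocked
    assert!(!compute_next_blocked(true, 70.0)); // fell below low watermark: unblock
}
```

The gap between the two watermarks is what keeps the watch channel from broadcasting rapid block/unblock transitions while memory hovers near the limit.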

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Monitor as MemoryMonitor (bg task)
    participant System as sysinfo::System
    participant Watch as watch::Sender<bool>
    participant Subscriber as MemoryMonitorSubscription

    loop Every MEMORY_REFRESH_INTERVAL
        Monitor->>System: sample memory stats
        System-->>Monitor: MemorySnapshot
        Monitor->>Monitor: compute_next_blocked(used_percent)
        Monitor->>Watch: send blocked state (if changed)
        Watch-->>Subscriber: broadcast update
    end

    Subscriber->>Subscriber: poll_update()/current_blocked()
```
```mermaid
sequenceDiagram
    participant Consumer as Consumer
    participant Stream as BackpressureStream
    participant Inner as EventsStream
    participant Memory as MemoryMonitorSubscription

    Consumer->>Stream: poll_next()
    Stream->>Memory: poll_update(cx)
    alt memory blocked
        Memory-->>Stream: Some(true)
        Stream->>Consumer: Pending
    else memory not blocked
        Memory-->>Stream: None/false
        Stream->>Inner: poll_next()
        Inner-->>Stream: Item / Pending / Done
        Stream-->>Consumer: Item / Pending / Done
    end
```

Assessment against linked issues

| Objective | Addressed | Explanation |
| --- | --- | --- |
| Stop streaming data from table copies and streaming when memory exceeds threshold [ETL-525] | | |

Out-of-scope changes

| Code Change | Explanation |
| --- | --- |
| Added multiple top-level Cargo dependencies (Cargo.toml) | Dependency additions (tracing-appender, tracing-log, tracing-subscriber, utoipa, utoipa-swagger-ui, uuid, x509-cert, etc.) are unrelated to implementing memory-aware processing. |
| Added workspace dependency/features and tokio-stream (etl/Cargo.toml) | Expanding tokio-postgres features and adding tokio-stream are not required by ETL-525 and appear beyond the single objective. |

Possibly related PRs


Comment @coderabbitai help to get the list of available commands and usage tips.

coveralls commented Feb 17, 2026

Pull Request Test Coverage Report for Build 22102540804

Details

  • 432 of 546 (79.12%) changed or added relevant lines in 10 files are covered.
  • 1879 unchanged lines in 22 files lost coverage.
  • Overall coverage decreased (-5.0%) to 69.175%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
| --- | --- | --- | --- |
| etl/src/pipeline.rs | 2 | 3 | 66.67% |
| etl/src/replication/apply.rs | 16 | 17 | 94.12% |
| etl/src/workers/apply.rs | 4 | 6 | 66.67% |
| etl/src/workers/table_sync.rs | 3 | 5 | 60.0% |
| etl/src/replication/stream.rs | 2 | 7 | 28.57% |
| etl/src/concurrency/memory_monitor.rs | 120 | 129 | 93.02% |
| etl/src/workers/table_sync_copy.rs | 15 | 24 | 62.5% |
| etl/src/replication/client.rs | 33 | 67 | 49.25% |
| etl/src/concurrency/stream.rs | 235 | 286 | 82.17% |
| Files with Coverage Reduction | New Missed Lines | % |
| --- | --- | --- |
| etl/src/workers/table_sync_copy.rs | 1 | 69.34% |
| etl/src/concurrency/stream.rs | 8 | 80.57% |
| etl-config/src/shared/destination.rs | 14 | 23.21% |
| etl/src/metrics.rs | 15 | 57.14% |
| etl-postgres/src/replication/state.rs | 20 | 71.12% |
| etl-config/src/shared/connection.rs | 21 | 83.14% |
| etl/src/store/both/memory.rs | 24 | 0.0% |
| etl-postgres/src/replication/slots.rs | 31 | 74.02% |
| etl-config/src/shared/pipeline.rs | 36 | 38.05% |
| etl/src/workers/apply.rs | 38 | 73.09% |
Totals

  • Change from base Build 22071808522: -5.0%
  • Covered Lines: 18229
  • Relevant Lines: 26352

💛 - Coveralls

coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl/src/concurrency/stream.rs`:
- Around line 47-70: The task stalls because when memory_monitor.poll_update(cx)
yields Ready(...) we set this.paused_for_memory but do not register the waker
for future updates; before returning Poll::Pending you must continue polling
memory_monitor.poll_update(cx) (or loop until it returns Poll::Pending) so the
watch channel registers the current waker. Modify the poll logic around
memory_monitor.poll_update(cx) in the stream's poll method (the match that sets
*this.paused_for_memory and calls this.memory_monitor.current_blocked()) to
consume Ready variants and only stop when poll_update returns Poll::Pending,
updating *this.paused_for_memory on each Ready, and then return Poll::Pending
(so the waker is registered for the next change).
- Around line 190-202: In BatchBackpressureStream's poll implementation, when
*this.paused_for_memory is true and this.items is empty you currently return
Poll::Pending without registering the waker; change the branch so you capture
and store the current task waker (e.g. this.waker = Some(cx.waker().clone()) or
equivalent) before returning Poll::Pending so the stream can be woken when
memory state changes, keeping the existing behavior of flushing when items exist
(the symbols to modify are this.paused_for_memory, this.items, this.reset_timer
and the Poll::Pending return).

Comment on lines 47 to 70
```rust
match this.memory_monitor.poll_update(cx) {
    Poll::Ready(Some(blocked)) => {
        *this.paused_for_memory = blocked;
    }
    Poll::Ready(None) => {
        *this.paused_for_memory = false;
    }
    Poll::Pending => {
        let currently_blocked = this.memory_monitor.current_blocked();
        if *this.paused_for_memory != currently_blocked {
            *this.paused_for_memory = currently_blocked;
        }
    }
}

if !was_paused && *this.paused_for_memory {
    info!("backpressure active, stream paused");
} else if was_paused && !*this.paused_for_memory {
    info!("backpressure released, stream resumed");
}

if *this.paused_for_memory {
    return Poll::Pending;
}
```

⚠️ Potential issue | 🔴 Critical

Critical: Missing waker registration causes indefinite task stall when backpressure activates.

When poll_update(cx) returns Ready(Some(true)), the waker is not registered with the watch channel because a Ready result was obtained. Returning Poll::Pending on line 69 without a registered waker means the task will never be woken when memory pressure is released.

After receiving a Ready from the watch stream, you must poll again to register for the next update before returning Pending.
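The re-poll requirement can also be written as a drain loop, so every buffered update is consumed before suspending and the channel ends up holding the current waker. This is a sketch against the symbols shown in the snippet above (`poll_update`, `paused_for_memory`), not runnable on its own:

```rust
// Sketch: consume all Ready updates, then stop on Pending, at which
// point the watch channel has registered cx's waker for the next change.
loop {
    match this.memory_monitor.poll_update(cx) {
        Poll::Ready(Some(blocked)) => *this.paused_for_memory = blocked,
        Poll::Ready(None) => {
            // Sender side dropped: treat as unblocked and stop polling.
            *this.paused_for_memory = false;
            break;
        }
        Poll::Pending => break, // waker is now registered
    }
}
```

Either shape satisfies the `Future`/`Stream` contract: a `Poll::Pending` return is only sound if some source has stored the waker from the current `Context`.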

Proposed fix
         if *this.paused_for_memory {
+            // Ensure waker is registered for the next state change.
+            let _ = this.memory_monitor.poll_update(cx);
             return Poll::Pending;
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```rust
match this.memory_monitor.poll_update(cx) {
    Poll::Ready(Some(blocked)) => {
        *this.paused_for_memory = blocked;
    }
    Poll::Ready(None) => {
        *this.paused_for_memory = false;
    }
    Poll::Pending => {
        let currently_blocked = this.memory_monitor.current_blocked();
        if *this.paused_for_memory != currently_blocked {
            *this.paused_for_memory = currently_blocked;
        }
    }
}

if !was_paused && *this.paused_for_memory {
    info!("backpressure active, stream paused");
} else if was_paused && !*this.paused_for_memory {
    info!("backpressure released, stream resumed");
}

if *this.paused_for_memory {
    // Ensure waker is registered for the next state change.
    let _ = this.memory_monitor.poll_update(cx);
    return Poll::Pending;
}
```

Comment on lines +190 to +202
```rust
if *this.paused_for_memory {
    if !this.items.is_empty() {
        info!(
            buffered_items = this.items.len(),
            "backpressure active, flushing buffered batch"
        );
        *this.reset_timer = true;

        return Poll::Ready(Some(ShutdownResult::Ok(std::mem::take(this.items))));
    }

    return Poll::Pending;
}
```

⚠️ Potential issue | 🔴 Critical

Same waker registration issue in BatchBackpressureStream.

The same bug exists here: when returning Poll::Pending after receiving a Ready(Some(blocked)) update, no waker is registered for subsequent state changes.

Proposed fix:

```diff
             if *this.paused_for_memory {
                 if !this.items.is_empty() {
                     info!(
                         buffered_items = this.items.len(),
                         "backpressure active, flushing buffered batch"
                     );
                     *this.reset_timer = true;

                     return Poll::Ready(Some(ShutdownResult::Ok(std::mem::take(this.items))));
                 }

+                // Ensure waker is registered for the next state change.
+                let _ = this.memory_monitor.poll_update(cx);
                 return Poll::Pending;
             }
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```rust
if *this.paused_for_memory {
    if !this.items.is_empty() {
        info!(
            buffered_items = this.items.len(),
            "backpressure active, flushing buffered batch"
        );
        *this.reset_timer = true;

        return Poll::Ready(Some(ShutdownResult::Ok(std::mem::take(this.items))));
    }

    // Ensure waker is registered for the next state change.
    let _ = this.memory_monitor.poll_update(cx);
    return Poll::Pending;
}
```

coderabbitai bot left a comment

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@etl/src/concurrency/stream.rs`:
- Around line 95-121: The code sets paused_for_memory on Ready from
memory_subscription.poll_update but doesn't register the current waker for
future updates, so returning Poll::Pending can sleep forever; fix by repeatedly
calling memory_subscription.poll_update(cx) in a loop (or otherwise re-invoking
it) until it returns Poll::Pending so the waker is registered for the next
change, updating *this.paused_for_memory on each Ready(Some/None) result; apply
this change to the same polling logic in both BackpressureStream and
BatchBackpressureStream (use the existing memory_subscription.poll_update,
paused_for_memory and was_paused symbols to locate and update the code).
