RUM-15735: Offload Mach profile aggregation#2878
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 831d3b7b37
Force-pushed 0a068db to c878d78 (compare)
Force-pushed 831d3b7 to 562d076 (compare)
arroz
left a comment
Overall it seems OK! I took some time to validate locking as I'm not a C++ expert, I don't see any obvious deadlocks or anything like that.
Just left some minor comments.
```diff
@@ -49,6 +49,9 @@ void dd_delete_profiling_defaults(void);
 * - Default: 5000000000ULL (5 seconds)
 * - Timeout checking occurs during sample processing
 *
```
Nitpick: empty line between these two parameters but not the others.
```cpp
// Profiling sampling backstop (see `callback`).
// Typical profile span is ~1 minute; this cutoff includes additional slack beyond that.
// The extra time avoids stopping sampling while the profile is still being processed.
static constexpr int64_t DD_PROFILER_TIMEOUT_NS = 90000000000ULL; // 1:30 minutes
```
There is a mismatch between the value and the variable type (signed vs unsigned). Although the value fits within the signed bits, both types should match.
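A minimal sketch of the suggested fix, matching the literal's signedness to the declared type (the `_ALT` name below is illustrative, not from the PR):

```cpp
#include <cstdint>

// Sketch of the reviewer's suggestion: give the signed variable a signed
// literal. INT64_C yields an int64_t-typed constant; a plain LL suffix
// is equivalent here.
static constexpr int64_t DD_PROFILER_TIMEOUT_NS = INT64_C(90000000000); // 1:30 minutes
static constexpr int64_t DD_PROFILER_TIMEOUT_NS_ALT = 90000000000LL;    // same value
```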
```cpp
/// Recycled buffers retained for future producer handoff.
std::vector<std::vector<stack_trace_t>> reusable_buffers;

std::mutex work_mutex;
```
This mutex could have a bit of documentation regarding which variables it protects.
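A hypothetical sketch of the documentation style being requested; the field names are illustrative stand-ins, not the PR's actual layout:

```cpp
#include <deque>
#include <mutex>
#include <vector>

// Illustrative only: state explicitly which members a mutex guards.
struct aggregation_worker_state {
    // work_mutex guards every member declared below it; acquire it
    // before reading or writing any of them.
    std::mutex work_mutex;
    std::deque<int> pending_batches;                 // guarded by work_mutex
    std::vector<std::vector<int>> reusable_buffers;  // guarded by work_mutex
    bool stop_requested = false;                     // guarded by work_mutex
};
```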
awforsythe
left a comment
The level of complexity required here for proper synchronization is pretty impressive. I've done my best to understand the mental model for everything that's happening here re: concurrency, but I think this is the sort of code where you can't go wrong with more inline comments 🙂
As far as C++ code quality goes, LGTM.
```swift
func testStop_unblocksPendingFlushRequest() {
    // Given
    XCTAssertEqual(dd_profiler_start(), 1)
    Thread.sleep(forTimeInterval: 0.05) // Allow sampling to begin

    let flushReturned = expectation(description: "Flush returns after stop")

    // When
    DispatchQueue.global().async {
        let profile = dd_profiler_flush_and_get_profile()
        if let profile {
            dd_pprof_destroy(profile)
        }
        flushReturned.fulfill()
    }

    dd_profiler_stop()

    // Then
    wait(for: [flushReturned], timeout: 1.0)
    XCTAssertEqual(dd_profiler_get_status(), DD_PROFILER_STATUS_STOPPED)
}
```
This is a pre-existing test that wasn't modified in these changes, but it seems like it may be relevant, since both dd_profiler_flush_and_get_profile and dd_profiler_stop now require coordination with the aggregation worker.
The test name ("stop unblocks pending flush request") seems to suggest that when you call dd_profiler_stop, you expect any pending call to dd_profiler_flush_and_get_profile to be unblocked, as in canceled immediately... but the test doesn't seem to validate that behavior. A flush call now waits on flush_cv while holding g_dd_profiler_mutex, meaning that a subsequent stop call will block (waiting for g_dd_profiler_mutex) until the flush completes.
i.e. from what I understand, if Thread A calls dd_profiler_flush_and_get_profile() and that call takes 100ms, a call to dd_profiler_stop() in Thread B will block until that 100ms flush operation is finished. Is this the expected behavior?
(If so, the name of the test seems a bit misleading, as the call to stop has no effect on whether the pending flush is unblocked: the test is just asserting that when you call both functions concurrently, they both return and you end up in the stopped state.)
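This doesn't settle the question for the PR's actual code, but a minimal sketch of the condition-variable handshake under discussion may help (names loosely mirror g_dd_profiler_mutex and flush_cv but are purely hypothetical). One relevant detail: `condition_variable::wait` releases the mutex while blocked, so a concurrent stop() can acquire the lock and signal the waiter; stop() only waits for the stretches where the flush path actually holds the lock.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical stand-ins for the PR's globals.
std::mutex g_mutex;
std::condition_variable flush_cv;
bool flush_done = false; // guarded by g_mutex

void flush_wait() {
    std::unique_lock<std::mutex> lock(g_mutex);
    // The lock is released while blocked and reacquired before returning.
    flush_cv.wait(lock, [] { return flush_done; });
}

void stop_signal() {
    {
        std::lock_guard<std::mutex> lock(g_mutex); // acquirable during the wait
        flush_done = true;
    }
    flush_cv.notify_all();
}
```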
fuzzybinary
left a comment
This is some good work! A few comments that I think should be addressed, but I won't hold up merge.
```cpp
struct work_item {
    enum class kind {
        batch,
        flush_barrier
    };
```
If we're able to use C++17 (which I think we are) this would be better served as an std::variant which would also likely save a bunch of memory.
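For reference, a sketch of the std::variant shape this comment suggests, with simplified stand-in types; the discriminant travels with the payload, and dispatch happens via std::visit instead of switching on a kind field:

```cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>
#include <variant>
#include <vector>

// Simplified stand-ins for the PR's types.
struct stack_trace_t { std::vector<std::uintptr_t> frames; };
struct batch { std::vector<stack_trace_t> traces; };
struct flush_barrier {};

// The held alternative is the discriminant; no kind enum, and no unused
// fields for whichever payload is inactive.
using work_item = std::variant<batch, flush_barrier>;

std::size_t pending_samples(const work_item& item) {
    return std::visit([](const auto& v) -> std::size_t {
        if constexpr (std::is_same_v<std::decay_t<decltype(v)>, batch>) {
            return v.traces.size();
        } else {
            return 0; // a flush barrier carries no samples
        }
    }, item);
}
```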
```cpp
uint64_t dropped_batch_count = 0;
uint64_t dropped_sample_count = 0;
uint64_t max_pending_bytes = 0;
```
We already have a structure that stores these together - maybe just make these a dd_profiler_diagnostics object?
```cpp
namespace dd::profiler {

static void destroy_stack_trace_payload(stack_trace_t* trace) {
```
nit: This feels like a weird place for this. Shouldn't this live somewhere with the declaration of stack_trace_t?
```cpp
}

void aggregation_worker::stop() {
    if (is_worker_thread()) {
```
Does it really make sense to allow the worker thread to call stop?
Even if it does, I think this whole function could use some documentation, explaining what it's doing under what circumstances and why.
```cpp
lock.unlock();
if (execute_inline && action) {
    action(action_ctx);
}
```
I'm curious if this execute_inline special case can be avoided somehow. It would be nice if whatever is processing the work_items could reliably execute the flush action.
```cpp
    }
}

void aggregation_worker::enqueue_active_buffer(std::vector<stack_trace_t>& active_buffer) {
```
I had to stare at this function for quite a while to figure out what it was doing. Comments here would be greatly appreciated.
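As a guess at the pattern (not the PR's actual implementation), the swap-and-enqueue idiom usually looks like the following, with the kind of comments being requested; all names are illustrative:

```cpp
#include <mutex>
#include <utility>
#include <vector>

struct stack_trace_t { int depth = 0; }; // simplified stand-in

struct worker {
    std::mutex work_mutex;
    std::vector<std::vector<stack_trace_t>> pending_batches;  // guarded by work_mutex
    std::vector<std::vector<stack_trace_t>> reusable_buffers; // guarded by work_mutex

    void enqueue_active_buffer(std::vector<stack_trace_t>& active_buffer) {
        std::lock_guard<std::mutex> lock(work_mutex);
        // Hand the producer's full buffer to the worker queue...
        pending_batches.push_back(std::move(active_buffer));
        // ...then give the producer back a recycled (or empty) buffer so
        // the sampling hot path can keep writing without blocking on the
        // aggregation work.
        if (!reusable_buffers.empty()) {
            active_buffer = std::move(reusable_buffers.back());
            reusable_buffers.pop_back();
        } else {
            active_buffer = {};
        }
    }
};
```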
```cpp
    }
}

void aggregation_worker::destroy_batch(std::vector<stack_trace_t>& batch) {
```
nit: naming - this isn't destroying the batch, it's clearing the data in the individual items. It looks really weird to see destroy_batch immediately followed by recycle_batch.
```cpp
if (reusable_buffers.size() < max_reusable_buffers) {
    reusable_buffers.push_back(std::move(batch));
}
```
Did we do any profiling / checks on how much time we save performing recycling on these vectors? It seems we only allocate once during the buffer swap, which, because these are all C structs, should be a single allocation without any constructor calls. I feel like the added complexity of recycling the buffers might not be worth it, especially as the number of buffers available isn't a hard limit. You'll still get a new buffer even if we're at the max number of reusable buffers.
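The saving being weighed here is vector capacity reuse: clear() destroys the elements but keeps the allocation, so a recycled buffer refills without the reallocation a fresh vector would pay for. A tiny illustration of that capacity guarantee:

```cpp
#include <cstddef>
#include <vector>

// clear() sets size to 0 but leaves capacity unchanged, so a recycled
// buffer can be refilled up to its old size with no allocation.
std::size_t capacity_after_clear(std::size_t n) {
    std::vector<int> buf(n); // capacity >= n after construction
    buf.clear();             // size -> 0, capacity unchanged
    return buf.capacity();
}
```

Whether that allocation actually shows up in profiles for trivially-destructible C structs is exactly what the comment suggests measuring.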
```cpp
dd_profiler_diagnostics_t result = empty_diagnostics();

if (profiler) {
    profiler->consume_diagnostics(&result);
```
Since this is all C++, any reason this couldn't be passed by reference instead of taking the pointer?
```cpp
if (profiler) {
    profiler->request_flush(swap_profile_action, &swap_context);
```
Does request_flush ever take an action that isn't swap_profile_action? If not, I feel like we might be able to add some type safety here by letting this take an std::function over a C-style function pointer.
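A sketch of the suggested signature change, using a hypothetical wrapper (not the PR's actual class): a std::function captures its own context, so the void* parameter disappears along with the unchecked cast at the call site.

```cpp
#include <functional>
#include <utility>

// Hypothetical illustration of request_flush taking std::function.
struct profiler {
    std::function<void()> pending_flush_action;

    // Instead of request_flush(void (*action)(void*), void* ctx):
    void request_flush(std::function<void()> action) {
        pending_flush_action = std::move(action);
    }

    void run_pending_flush() {
        if (pending_flush_action) pending_flush_action();
    }
};
```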
What and why?
This PR improves the iOS Mach profiler so aggregation work no longer blocks the sampling loop. It also reduces aggregation cost and adds bounded overload handling so normal profiling can keep sampling while heavy profile processing catches up.
How?
Mach profiling now separates the sampling hot path from the aggregation cold path. The sampling thread only captures raw stack samples and hands off completed buffers, avoiding expensive profile mutation, binary image resolution and flush/profile-rotation work on the sampling loop.
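A toy model (not the PR's code) of the serialized stream this describes: batches and flush barriers are drained strictly in order, so a barrier runs only after every batch enqueued ahead of it, giving flush a deterministic profile boundary.

```cpp
#include <deque>
#include <string>
#include <vector>

enum class kind { batch, flush_barrier };

// Drain the queue in FIFO order; a flush_barrier is never reordered
// ahead of the batches enqueued before it.
std::vector<std::string> drain(std::deque<kind> queue) {
    std::vector<std::string> log;
    while (!queue.empty()) {
        kind item = queue.front();
        queue.pop_front();
        log.push_back(item == kind::batch ? "aggregate" : "swap_profile");
    }
    return log;
}
```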
Aggregation runs on a long-lived serialized aggregation_worker. The worker drains sample batches in order, executes flush requests as ordered barriers, and performs profile swaps on the same serialized stream, so flush_and_get_profile() produces a deterministic profile boundary while sampling continues.

Additionally, the aggregation_worker also:
- Profiling Session telemetry (internal spec)

Review checklist
- make api-surface when adding new APIs