Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions Doc/library/profiling.sampling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ The default configuration works well for most use cases:
- Disabled
* - Default for ``--subprocesses``
- Disabled
* - Default for ``--blocking``
- Disabled (non-blocking sampling)


Sampling interval and duration
Expand Down Expand Up @@ -362,6 +364,50 @@ This option is particularly useful when investigating concurrency issues or
when work is distributed across a thread pool.


.. _blocking-mode:

Blocking mode
-------------

By default, Tachyon reads the target process's memory without stopping it.
This non-blocking approach is ideal for most profiling scenarios because it
imposes virtually zero overhead on the target application: the profiled
program runs at full speed and is unaware it is being observed.

However, non-blocking sampling can occasionally produce incomplete or
inconsistent stack traces. This happens in applications with many generators
or coroutines that rapidly switch between yield points, and in programs with
very fast-changing call stacks, where functions enter and exit between the
start and end of a single stack read. The reconstructed stack may then mix
frames from different execution states, or describe a call chain that never
actually existed.

For these cases, the :option:`--blocking` option stops the target process during
each sample::

python -m profiling.sampling run --blocking script.py
python -m profiling.sampling attach --blocking 12345

When blocking mode is enabled, the profiler suspends the target process,
reads its stack, then resumes it. This guarantees that each captured stack
represents a real, consistent snapshot of what the process was doing at that
instant. The trade-off is that the target process runs slower because it is
repeatedly paused.

.. warning::

Do not use very high sample rates (low ``--interval`` values) with blocking
mode. Suspending and resuming a process takes time, and if the sampling
interval is too short, the target will spend more time stopped than running.
For blocking mode, intervals of 1000 microseconds (1 millisecond) or higher
are recommended. The default 100 microsecond interval may cause noticeable
slowdown in the target application.

Use blocking mode only when you observe inconsistent stacks in your profiles,
particularly with generator-heavy or coroutine-heavy code. For most
applications, the default non-blocking mode provides accurate results with
virtually no impact on the target process.


Special frames
--------------

Expand Down Expand Up @@ -1296,6 +1342,13 @@ Sampling options
Also profile subprocesses. Each subprocess gets its own profiler
instance and output file. Incompatible with ``--live``.

.. option:: --blocking

Stop the target process during each sample. This ensures consistent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if pause might sound better than stop here

Suggested change
Stop the target process during each sample. This ensures consistent
Pause the target process during each sample. This ensures consistent

stack traces at the cost of slowing down the target. Use with longer
intervals (1000 µs or higher) to minimize impact. See :ref:`blocking-mode`
for details.


Mode options
------------
Expand Down
20 changes: 20 additions & 0 deletions Lib/profiling/sampling/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ def _add_sampling_options(parser):
action="store_true",
help="Also profile subprocesses. Each subprocess gets its own profiler and output file.",
)
sampling_group.add_argument(
"--blocking",
action="store_true",
help="Stop all threads in target process before sampling to get consistent snapshots. "
"Uses thread_suspend on macOS and ptrace on Linux. Adds overhead but ensures memory "
"reads are from a frozen state.",
)


def _add_mode_options(parser):
Expand Down Expand Up @@ -545,6 +552,15 @@ def _validate_args(args, parser):
args: Parsed command-line arguments
parser: ArgumentParser instance for error reporting
"""
# Warn about blocking mode with aggressive sampling intervals
if args.blocking and args.interval < 100:
print(
f"Warning: --blocking with a {args.interval} µs interval will stop all threads "
f"{1_000_000 // args.interval} times per second. "
"Consider using --interval 1000 or higher to reduce overhead.",
file=sys.stderr
)

# Check if live mode is available
if hasattr(args, 'live') and args.live and LiveStatsCollector is None:
parser.error(
Expand Down Expand Up @@ -778,6 +794,7 @@ def _handle_attach(args):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)
_handle_output(collector, args, args.pid, mode)

Expand Down Expand Up @@ -848,6 +865,7 @@ def _handle_run(args):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)
_handle_output(collector, args, process.pid, mode)
finally:
Expand Down Expand Up @@ -893,6 +911,7 @@ def _handle_live_attach(args, pid):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)


Expand Down Expand Up @@ -940,6 +959,7 @@ def _handle_live_run(args):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)
finally:
# Clean up the subprocess
Expand Down
48 changes: 36 additions & 12 deletions Lib/profiling/sampling/sample.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import _remote_debugging
import contextlib
import os
import pstats
import statistics
Expand All @@ -9,6 +10,21 @@
from _colorize import ANSIColors

from .pstats_collector import PstatsCollector


@contextlib.contextmanager
def _pause_threads(unwinder, blocking):
    """Optionally pause the unwinder's threads around the ``with`` body.

    Args:
        unwinder: Object exposing ``pause_threads()`` and ``resume_threads()``.
        blocking: When false, the body runs without touching *unwinder*.
            When true, ``unwinder.pause_threads()`` is called before the body
            and ``unwinder.resume_threads()`` after it.
    """
    if not blocking:
        # Non-blocking sampling: nothing to pause, just run the body.
        yield
        return

    unwinder.pause_threads()
    try:
        yield
    finally:
        # Resume even when the body raised.
        unwinder.resume_threads()


from .stack_collector import CollapsedStackCollector, FlamegraphCollector
from .heatmap_collector import HeatmapCollector
from .gecko_collector import GeckoCollector
Expand All @@ -28,12 +44,13 @@


class SampleProfiler:
def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False):
def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False, blocking=False):
self.pid = pid
self.sample_interval_usec = sample_interval_usec
self.all_threads = all_threads
self.mode = mode # Store mode for later use
self.collect_stats = collect_stats
self.blocking = blocking
try:
self.unwinder = self._new_unwinder(native, gc, opcodes, skip_non_matching_threads)
except RuntimeError as err:
Expand Down Expand Up @@ -63,12 +80,11 @@ def sample(self, collector, duration_sec=10, *, async_aware=False):
running_time = 0
num_samples = 0
errors = 0
interrupted = False
start_time = next_time = time.perf_counter()
last_sample_time = start_time
realtime_update_interval = 1.0 # Update every second
last_realtime_update = start_time
interrupted = False

try:
while running_time < duration_sec:
# Check if live collector wants to stop
Expand All @@ -78,20 +94,22 @@ def sample(self, collector, duration_sec=10, *, async_aware=False):
current_time = time.perf_counter()
if next_time < current_time:
try:
if async_aware == "all":
stack_frames = self.unwinder.get_all_awaited_by()
elif async_aware == "running":
stack_frames = self.unwinder.get_async_stack_trace()
else:
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError:
with _pause_threads(self.unwinder, self.blocking):
if async_aware == "all":
stack_frames = self.unwinder.get_all_awaited_by()
elif async_aware == "running":
stack_frames = self.unwinder.get_async_stack_trace()
else:
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError as e:
duration_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError) as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError) as e:
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):

collector.collect_failed_sample()
errors += 1
except Exception as e:
print(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
print(e)

if not _is_process_running(self.pid):
break
raise e from None
Expand Down Expand Up @@ -303,6 +321,7 @@ def sample(
native=False,
gc=True,
opcodes=False,
blocking=False,
):
"""Sample a process using the provided collector.

Expand All @@ -318,6 +337,7 @@ def sample(
native: Whether to include native frames
gc: Whether to include GC frames
opcodes: Whether to include opcode information
blocking: Whether to stop all threads before sampling for consistent snapshots

Returns:
The collector with collected samples
Expand All @@ -343,6 +363,7 @@ def sample(
opcodes=opcodes,
skip_non_matching_threads=skip_non_matching_threads,
collect_stats=realtime_stats,
blocking=blocking,
)
profiler.realtime_stats = realtime_stats

Expand All @@ -364,6 +385,7 @@ def sample_live(
native=False,
gc=True,
opcodes=False,
blocking=False,
):
"""Sample a process in live/interactive mode with curses TUI.

Expand All @@ -379,6 +401,7 @@ def sample_live(
native: Whether to include native frames
gc: Whether to include GC frames
opcodes: Whether to include opcode information
blocking: Whether to stop all threads before sampling for consistent snapshots

Returns:
The collector with collected samples
Expand All @@ -404,6 +427,7 @@ def sample_live(
opcodes=opcodes,
skip_non_matching_threads=skip_non_matching_threads,
collect_stats=realtime_stats,
blocking=blocking,
)
profiler.realtime_stats = realtime_stats

Expand Down
82 changes: 39 additions & 43 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2931,24 +2931,24 @@ def top():
"Test only runs on Linux with process_vm_readv support",
)
def test_partial_stack_reuse(self):
"""Test that unchanged bottom frames are reused when top changes (A→B→C to A→B→D)."""
"""Test that unchanged parent frames are reused from cache when top frame moves."""
script_body = """\
def func_c():
sock.sendall(b"at_c")
def level4():
sock.sendall(b"sync1")
sock.recv(16)

def func_d():
sock.sendall(b"at_d")
sock.sendall(b"sync2")
sock.recv(16)

def func_b():
func_c()
func_d()
def level3():
level4()

def func_a():
func_b()
def level2():
level3()

def level1():
level2()

func_a()
level1()
"""

with self._target_process(script_body) as (
Expand All @@ -2958,55 +2958,51 @@ def func_a():
):
unwinder = make_unwinder(cache_frames=True)

# Sample at C: stack is A→B→C
frames_c = self._sample_frames(
# Sample 1: level4 at first sendall
frames1 = self._sample_frames(
client_socket,
unwinder,
b"at_c",
b"sync1",
b"ack",
{"func_a", "func_b", "func_c"},
{"level1", "level2", "level3", "level4"},
)
# Sample at D: stack is A→B→D (C returned, D called)
frames_d = self._sample_frames(
# Sample 2: level4 at second sendall (same stack, different line)
frames2 = self._sample_frames(
client_socket,
unwinder,
b"at_d",
b"sync2",
b"done",
{"func_a", "func_b", "func_d"},
{"level1", "level2", "level3", "level4"},
)

self.assertIsNotNone(frames_c)
self.assertIsNotNone(frames_d)
self.assertIsNotNone(frames1)
self.assertIsNotNone(frames2)

# Find func_a and func_b frames in both samples
def find_frame(frames, funcname):
for f in frames:
if f.funcname == funcname:
return f
return None

frame_a_in_c = find_frame(frames_c, "func_a")
frame_b_in_c = find_frame(frames_c, "func_b")
frame_a_in_d = find_frame(frames_d, "func_a")
frame_b_in_d = find_frame(frames_d, "func_b")

self.assertIsNotNone(frame_a_in_c)
self.assertIsNotNone(frame_b_in_c)
self.assertIsNotNone(frame_a_in_d)
self.assertIsNotNone(frame_b_in_d)

# The bottom frames (A, B) should be the SAME objects (cache reuse)
self.assertIs(
frame_a_in_c,
frame_a_in_d,
"func_a frame should be reused from cache",
)
self.assertIs(
frame_b_in_c,
frame_b_in_d,
"func_b frame should be reused from cache",
# level4 should have different line numbers (it moved)
l4_1 = find_frame(frames1, "level4")
l4_2 = find_frame(frames2, "level4")
self.assertIsNotNone(l4_1)
self.assertIsNotNone(l4_2)
self.assertNotEqual(
l4_1.location.lineno,
l4_2.location.lineno,
"level4 should be at different lines",
)

# Parent frames (level1, level2, level3) should be reused from cache
for name in ["level1", "level2", "level3"]:
f1 = find_frame(frames1, name)
f2 = find_frame(frames2, name)
self.assertIsNotNone(f1, f"{name} missing from sample 1")
self.assertIsNotNone(f2, f"{name} missing from sample 2")
self.assertIs(f1, f2, f"{name} should be reused from cache")

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
Expand Down
Loading
Loading