Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{{$NEXT}}

- Add --spawn-worker-at-max option to allow multiple collectors
- When max-open-jobs is reached, fork worker collectors to handle backlog

1.000163 2026-02-23 20:07:43-08:00 America/Los_Angeles

- Fix missing Changes entries
Expand Down
12 changes: 9 additions & 3 deletions lib/App/Yath/Command/collector.pm
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,28 @@ sub run {

my $run = Test2::Harness::Run->new(%{decode_json(<STDIN>)});

my $collector = $collector_class->new(
my $collector;
$collector = $collector_class->new(
%args,
settings => $settings,
workdir => $dir,
run_id => $run_id,
runner_pid => $runner_pid,
run => $run,
output_fh => $fh,
# as_json may already have the json form of the event cached, if so
# we can avoid doing an extra call to encode_json
action => sub { print $fh defined($_[0]) ? $_[0]->as_json . "\n" : "null\n"; },
# we can avoid doing an extra call to encode_json.
# Workers pass raw JSON strings, so check ref() before calling as_json.
action => sub { print {$collector->output_fh} defined($_[0]) ? ref($_[0]) ? $_[0]->as_json . "\n" : $_[0] : "null\n"; },
);

local $SIG{PIPE} = 'IGNORE';
my $ok = eval { $collector->process(); 1 };
my $err = $@;

# Avoid refcycle ($collector->action closes over $collector)
delete $collector->{action};

eval { print $fh "null\n"; 1 } or warn $@;

die $err unless $ok;
Expand Down
6 changes: 6 additions & 0 deletions lib/App/Yath/Options/Collector.pm
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ option_group {prefix => 'collector', category => "Collector Options"} => sub {
short_examples => [' 18'],
);

option spawn_worker_at_max => (
type => 'b',
description => 'Spawn an extra collector whenever we hit max_open_jobs. (Default: off)',
default => 0,
);

option max_poll_events => (
type => 's',
description => 'Maximum number of events to poll from a job before jumping to the next job. (Default: 1000)',
Expand Down
159 changes: 139 additions & 20 deletions lib/Test2/Harness/Collector.pm
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use Test2::Harness::Util::UUID qw/gen_uuid/;
use Test2::Harness::Util::Queue;
use Time::HiRes qw/sleep time/;
use File::Spec;
use POSIX qw/:sys_wait_h/;

use File::Path qw/remove_tree/;

Expand All @@ -24,7 +25,8 @@ use Test2::Harness::Util::HashBase qw{
<run_dir
<runner_pid +runner_exited <persistent_runner

<backed_up
<output_fh
<backed_up <workers <worker <jobs_buffer

+runner_stdout +runner_stderr +runner_aux_dir +runner_aux_handles

Expand All @@ -48,7 +50,97 @@ sub init {

$self->{+WAIT_TIME} //= 0.02;

$self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $self->settings, about => {no_display => 1}));
$self->{+WORKERS} = [];
}

sub spawn_worker {
my $self = shift;

my $workers = $self->{+WORKERS} //= [];

my ($rh, $wh);
pipe($rh, $wh) or die "Could not open pipe: $!";

my $pid = fork // die "Could not fork: $!";
if ($pid) {
close($wh);
$rh->blocking(0);
my $buffer = "";
push @$workers => [$pid, $rh, \$buffer];

# Child will process the jobs we had, we start from scratch
$self->{+JOBS} = {};

return $pid;
}

# Child: State Management
$self->{+WORKER} = 1;
$self->{+JOBS_DONE} = 1;
$self->{+TASKS_DONE} = 1;
$self->{+WORKERS} = [];

close($rh);

$wh->autoflush(1);
$self->{+OUTPUT_FH} = $wh;

return;
}

sub process_workers {
my $self = shift;

my $workers = $self->{+WORKERS} or return 0;
my $count = 0;

my $max_poll_events = $self->settings->collector->max_poll_events;

for my $worker (@$workers) {
my ($pid, $rh, $buffer) = @$worker;
$count++;

my $check = waitpid($pid, WNOHANG);
my $exit = $?;
my $done = $check < 0 || $check == $pid;

if ($done) {
my $e = $self->_harness_event(0, undef, time, info => [{details => "Collector $pid complete ($exit)", tag => "INTERNAL", debug => 0, important => 0}]);
$self->{+ACTION}->($e);

@$workers = grep { "$worker" ne "$_" } @$workers;

$rh->blocking(1);
for my $line (<$rh>) {
$$buffer .= $line;
}
}
else {
my $lcount = 0;
for my $line (<$rh>) {
$$buffer .= $line;
last if $lcount++ >= $max_poll_events;
}
}

my $oldbuf = $$buffer;
$$buffer = "";

for my $line (split /^/, $oldbuf) {
unless ($done || substr($line, -1, 1) eq "\n") {
$$buffer = $line;
last;
}

$count++;
$self->{+ACTION}->($line);
}

# Do this here to make sure all events from the collector are received
die "Collector worker ($pid) did not exit properly (check: $check, exit: $exit)" if $done && $exit || $check < 0;
}

return $count;
}

sub process {
Expand All @@ -57,16 +149,23 @@ sub process {
my %warning_seen;
my $settings = $self->settings;

$self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $settings, about => {no_display => 1}));

while (1) {
my $count = 0;
$count += $self->process_workers if @{$self->{+WORKERS}};
$count += $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT};
$count += $self->process_tasks();

my $jobs = $self->jobs;

unless (keys %$jobs) {
unless (keys %$jobs || @{$self->{+WORKERS}}) {
next if $count;

if ($self->{+WORKER}) {
last;
}

if ($self->persistent_runner) {
last if $self->{+JOBS_DONE};
last if $self->runner_done;
Expand Down Expand Up @@ -118,10 +217,12 @@ sub process {
delete $self->{+PENDING}->{$jdir->job_id} unless $done->{retry};
}

last if !$count && $self->runner_exited;
last if !$count && !@{$self->{+WORKERS}} && ($self->runner_exited || $self->{+WORKER});
sleep $self->{+WAIT_TIME} unless $count;
}

exit(0) if $self->{+WORKER};

# One last slurp
$self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT};

Expand Down Expand Up @@ -153,6 +254,8 @@ sub runner_exited {
sub process_runner_output {
my $self = shift;

return 0 if $self->{+WORKER};

my $out = 0;
return $out unless $self->{+SHOW_RUNNER_OUTPUT};

Expand Down Expand Up @@ -225,6 +328,7 @@ sub process_runner_output {
sub process_tasks {
my $self = shift;

return 0 if $self->{+WORKER};
return 0 if $self->{+TASKS_DONE};

my $queue = $self->tasks_queue or return 0;
Expand All @@ -250,8 +354,16 @@ sub process_tasks {
return $count;
}

sub send_backed_up {
sub handle_backed_up {
my $self = shift;

if ($self->settings->collector->spawn_worker_at_max) {
my $pid = $self->spawn_worker() or return;
my $e = $self->_harness_event(0, undef, time, info => [{details => "Spawned an extra collector: $pid", tag => "INTERNAL", debug => 0, important => 0}]);
$self->{+ACTION}->($e);
return;
}

return if $self->{+BACKED_UP}++;

# This is an unlikely code path. If we're here, it means the last loop couldn't process any results.
Expand All @@ -264,7 +376,8 @@ sub send_backed_up {

Set a higher --max-open-jobs collector setting to prevent this problem in the
future, but be advised that could result in too many open filehandles on some
systems.
systems. You can also use --spawn-worker-at-max to automatically spawn extra
collectors when the limit is reached.

This message will only be shown once.
EOT
Expand All @@ -277,22 +390,31 @@ sub jobs {
my $self = shift;

my $jobs = $self->{+JOBS} //= {};
my $buffer = $self->{+JOBS_BUFFER} //= [];

return $jobs if $self->{+JOBS_DONE};
return $jobs if $self->{+WORKER};
return $jobs if $self->{+JOBS_DONE} && !@$buffer;

my $queue = $self->jobs_queue or return $jobs;

# Don't monitor more than 'max_open_jobs' or we might have too many open file handles and crash
# Max open files handles on a process applies. Usually this is 1024 so we
# can't have everything open at once when we're behind.
my $max_open_jobs = $self->settings->collector->max_open_jobs // 1024;
my $additional_jobs_to_parse = $max_open_jobs - keys %$jobs;
if($additional_jobs_to_parse <= 0) {
$self->send_backed_up;
return $jobs;
}
push @$buffer => $queue->poll();

my $queue = $self->jobs_queue or return $jobs;
while (my $item = shift @$buffer) {
if (keys(%$jobs) >= $max_open_jobs) {
$self->handle_backed_up;

# This may have changed after spawn_worker
$jobs = $self->{+JOBS};

return $jobs if $self->{+WORKER};

if (keys %$jobs) {
unshift @$buffer => $item;
return $jobs;
}
}

for my $item ($queue->poll($additional_jobs_to_parse)) {
my ($spos, $epos, $job) = @$item;

unless ($job) {
Expand Down Expand Up @@ -340,9 +462,6 @@ sub jobs {
);
}

# The collector didn't read in all the jobs because it'd run out of file handles. We need to let the stream know we're behind.
$self->send_backed_up if $max_open_jobs <= keys %$jobs;

return $jobs;
}

Expand Down