diff --git a/Changes b/Changes index 72d8a7503..58e4853e9 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,8 @@ {{$NEXT}} + - Add --spawn-worker-at-max option to allow multiple collectors + - When max-open-jobs is reached, fork worker collectors to handle backlog + 1.000163 2026-02-23 20:07:43-08:00 America/Los_Angeles - Fix missing Changes entries diff --git a/lib/App/Yath/Command/collector.pm b/lib/App/Yath/Command/collector.pm index 574c5ddef..e6f4d6428 100644 --- a/lib/App/Yath/Command/collector.pm +++ b/lib/App/Yath/Command/collector.pm @@ -36,22 +36,28 @@ sub run { my $run = Test2::Harness::Run->new(%{decode_json()}); - my $collector = $collector_class->new( + my $collector; + $collector = $collector_class->new( %args, settings => $settings, workdir => $dir, run_id => $run_id, runner_pid => $runner_pid, run => $run, + output_fh => $fh, # as_json may already have the json form of the event cached, if so - # we can avoid doing an extra call to encode_json - action => sub { print $fh defined($_[0]) ? $_[0]->as_json . "\n" : "null\n"; }, + # we can avoid doing an extra call to encode_json. + # Workers pass raw JSON strings, so check ref() before calling as_json. + action => sub { print {$collector->output_fh} defined($_[0]) ? ref($_[0]) ? $_[0]->as_json . "\n" : $_[0] : "null\n"; }, ); local $SIG{PIPE} = 'IGNORE'; my $ok = eval { $collector->process(); 1 }; my $err = $@; + # Avoid refcycle ($collector->action closes over $collector) + delete $collector->{action}; + eval { print $fh "null\n"; 1 } or warn $@; die $err unless $ok; diff --git a/lib/App/Yath/Options/Collector.pm b/lib/App/Yath/Options/Collector.pm index 940f9b2f0..007a0076c 100644 --- a/lib/App/Yath/Options/Collector.pm +++ b/lib/App/Yath/Options/Collector.pm @@ -14,6 +14,12 @@ option_group {prefix => 'collector', category => "Collector Options"} => sub { short_examples => [' 18'], ); + option spawn_worker_at_max => ( + type => 'b', + description => 'Spawn an extra collector whenever we hit max_open_jobs. (Default: off)', + default => 0, + ); + option max_poll_events => ( type => 's', description => 'Maximum number of events to poll from a job before jumping to the next job. (Default: 1000)', diff --git a/lib/Test2/Harness/Collector.pm b/lib/Test2/Harness/Collector.pm index 261e95749..71b1d63de 100644 --- a/lib/Test2/Harness/Collector.pm +++ b/lib/Test2/Harness/Collector.pm @@ -12,6 +12,7 @@ use Test2::Harness::Util::UUID qw/gen_uuid/; use Test2::Harness::Util::Queue; use Time::HiRes qw/sleep time/; use File::Spec; +use POSIX qw/:sys_wait_h/; use File::Path qw/remove_tree/; @@ -24,7 +25,8 @@ use Test2::Harness::Util::HashBase qw{ {+WAIT_TIME} //= 0.02; - $self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $self->settings, about => {no_display => 1})); + $self->{+WORKERS} = []; +} + +sub spawn_worker { + my $self = shift; + + my $workers = $self->{+WORKERS} //= []; + + my ($rh, $wh); + pipe($rh, $wh) or die "Could not open pipe: $!"; + + my $pid = fork // die "Could not fork: $!"; + if ($pid) { + close($wh); + $rh->blocking(0); + my $buffer = ""; + push @$workers => [$pid, $rh, \$buffer]; + + # Child will process the jobs we had, we start from scratch + $self->{+JOBS} = {}; + + return $pid; + } + + # Child: State Management + $self->{+WORKER} = 1; + $self->{+JOBS_DONE} = 1; + $self->{+TASKS_DONE} = 1; + $self->{+WORKERS} = []; + + close($rh); + + $wh->autoflush(1); + $self->{+OUTPUT_FH} = $wh; + + return; +} + +sub process_workers { + my $self = shift; + + my $workers = $self->{+WORKERS} or return 0; + my $count = 0; + + my $max_poll_events = $self->settings->collector->max_poll_events; + + for my $worker (@$workers) { + my ($pid, $rh, $buffer) = @$worker; + $count++; + + my $check = waitpid($pid, WNOHANG); + my $exit = $?; + my $done = $check < 0 || $check == $pid; + + if ($done) { + my $e = $self->_harness_event(0, undef, time, info => [{details => "Collector $pid complete ($exit)", tag => "INTERNAL", debug => 0, important => 0}]); + $self->{+ACTION}->($e); + + @$workers = grep { "$worker" ne "$_" } @$workers; + + $rh->blocking(1); + for my $line (<$rh>) { + $$buffer .= $line; + } + } + else { + my $lcount = 0; + for my $line (<$rh>) { + $$buffer .= $line; + last if $lcount++ >= $max_poll_events; + } + } + + my $oldbuf = $$buffer; + $$buffer = ""; + + for my $line (split /^/, $oldbuf) { + unless ($done || substr($line, -1, 1) eq "\n") { + $$buffer = $line; + last; + } + + $count++; + $self->{+ACTION}->($line); + } + + # Do this here to make sure all events from the collector are received + die "Collector worker ($pid) did not exit properly (check: $check, exit: $exit)" if $done && $exit || $check < 0; + } + + return $count; } sub process { @@ -57,16 +149,23 @@ sub process { my %warning_seen; my $settings = $self->settings; + $self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $settings, about => {no_display => 1})); + while (1) { my $count = 0; + $count += $self->process_workers if @{$self->{+WORKERS}}; $count += $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT}; $count += $self->process_tasks(); my $jobs = $self->jobs; - unless (keys %$jobs) { + unless (keys %$jobs || @{$self->{+WORKERS}}) { next if $count; + if ($self->{+WORKER}) { + last; + } + if ($self->persistent_runner) { last if $self->{+JOBS_DONE}; last if $self->runner_done; @@ -118,10 +217,12 @@ sub process { delete $self->{+PENDING}->{$jdir->job_id} unless $done->{retry}; } - last if !$count && $self->runner_exited; + last if !$count && !@{$self->{+WORKERS}} && ($self->runner_exited || $self->{+WORKER}); sleep $self->{+WAIT_TIME} unless $count; } + exit(0) if $self->{+WORKER}; + # One last slurp $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT}; @@ -153,6 +254,8 @@ sub runner_exited { sub process_runner_output { my $self = shift; + return 0 if $self->{+WORKER}; + my $out = 0; return $out unless $self->{+SHOW_RUNNER_OUTPUT}; @@ -225,6 +328,7 @@ sub process_runner_output { sub process_tasks { my $self = shift; + return 0 if $self->{+WORKER}; return 0 if $self->{+TASKS_DONE}; my $queue = $self->tasks_queue or return 0; @@ -250,8 +354,16 @@ sub process_tasks { return $count; } -sub send_backed_up { +sub handle_backed_up { my $self = shift; + + if ($self->settings->collector->spawn_worker_at_max) { + my $pid = $self->spawn_worker() or return; + my $e = $self->_harness_event(0, undef, time, info => [{details => "Spawned an extra collector: $pid", tag => "INTERNAL", debug => 0, important => 0}]); + $self->{+ACTION}->($e); + return; + } + return if $self->{+BACKED_UP}++; # This is an unlikely code path. If we're here, it means the last loop couldn't process any results. @@ -264,7 +376,8 @@ sub send_backed_up { Set a higher --max-open-jobs collector setting to prevent this problem in the future, but be advised that could result in too many open filehandles on some -systems. +systems. You can also use --spawn-worker-at-max to automatically spawn extra +collectors when the limit is reached. This message will only be shown once. EOT @@ -277,22 +390,31 @@ sub jobs { my $self = shift; my $jobs = $self->{+JOBS} //= {}; + my $buffer = $self->{+JOBS_BUFFER} //= []; - return $jobs if $self->{+JOBS_DONE}; + return $jobs if $self->{+WORKER}; + return $jobs if $self->{+JOBS_DONE} && !@$buffer; + + my $queue = $self->jobs_queue or return $jobs; - # Don't monitor more than 'max_open_jobs' or we might have too many open file handles and crash - # Max open files handles on a process applies. Usually this is 1024 so we - # can't have everything open at once when we're behind. my $max_open_jobs = $self->settings->collector->max_open_jobs // 1024; - my $additional_jobs_to_parse = $max_open_jobs - keys %$jobs; - if($additional_jobs_to_parse <= 0) { - $self->send_backed_up; - return $jobs; - } + push @$buffer => $queue->poll(); - my $queue = $self->jobs_queue or return $jobs; + while (my $item = shift @$buffer) { + if (keys(%$jobs) >= $max_open_jobs) { + $self->handle_backed_up; + + # This may have changed after spawn_worker + $jobs = $self->{+JOBS}; + + return $jobs if $self->{+WORKER}; + + if (keys %$jobs) { + unshift @$buffer => $item; + return $jobs; + } + } - for my $item ($queue->poll($additional_jobs_to_parse)) { my ($spos, $epos, $job) = @$item; unless ($job) { @@ -340,9 +462,6 @@ sub jobs { ); } - # The collector didn't read in all the jobs because it'd run out of file handles. We need to let the stream know we're behind. - $self->send_backed_up if $max_open_jobs <= keys %$jobs; - return $jobs; }