11import { ChildProcess , spawn } from 'child_process' ;
22import { getQueueStatus } from './http' ;
33import { REPO_ROOT } from './paths' ;
4- import { TestFixture } from './fixture' ;
4+ import { getFreePort , TestFixture } from './fixture' ;
55import { waitFor } from './wait' ;
66
77export interface ProcessorHandle {
@@ -14,79 +14,93 @@ export async function startProcessor(
1414 fixture : TestFixture ,
1515 env : Record < string , string > = { }
1616) : Promise < ProcessorHandle > {
17- let outputBuffer = '' ;
18- let exited = false ;
17+ for ( let attempt = 0 ; attempt < 5 ; attempt ++ ) {
18+ let outputBuffer = '' ;
19+ let exited = false ;
1920
20- const child = spawn ( process . execPath , [ 'dist/queue-processor.js' ] , {
21- cwd : REPO_ROOT ,
22- env : {
23- ...process . env ,
24- NODE_ENV : 'test' ,
25- HOME : fixture . homeDir ,
26- TINYCLAW_HOME : fixture . tinyclawHome ,
27- TINYCLAW_API_PORT : String ( fixture . apiPort ) ,
28- ...env ,
29- } ,
30- stdio : [ 'ignore' , 'pipe' , 'pipe' ] ,
31- } ) ;
21+ const child = spawn ( process . execPath , [ 'dist/queue-processor.js' ] , {
22+ cwd : REPO_ROOT ,
23+ env : {
24+ ...process . env ,
25+ NODE_ENV : 'test' ,
26+ HOME : fixture . homeDir ,
27+ TINYCLAW_HOME : fixture . tinyclawHome ,
28+ TINYCLAW_API_PORT : String ( fixture . apiPort ) ,
29+ ...env ,
30+ } ,
31+ stdio : [ 'ignore' , 'pipe' , 'pipe' ] ,
32+ } ) ;
3233
33- child . stdout ?. setEncoding ( 'utf8' ) ;
34- child . stderr ?. setEncoding ( 'utf8' ) ;
35- child . stdout ?. on ( 'data' , ( chunk : string ) => {
36- outputBuffer += chunk ;
37- } ) ;
38- child . stderr ?. on ( 'data' , ( chunk : string ) => {
39- outputBuffer += chunk ;
40- } ) ;
41- child . on ( 'exit' , ( ) => {
42- exited = true ;
43- } ) ;
34+ child . stdout ?. setEncoding ( 'utf8' ) ;
35+ child . stderr ?. setEncoding ( 'utf8' ) ;
36+ child . stdout ?. on ( 'data' , ( chunk : string ) => {
37+ outputBuffer += chunk ;
38+ } ) ;
39+ child . stderr ?. on ( 'data' , ( chunk : string ) => {
40+ outputBuffer += chunk ;
41+ } ) ;
42+ child . on ( 'exit' , ( ) => {
43+ exited = true ;
44+ } ) ;
4445
45- await waitFor ( async ( ) => {
46- if ( exited ) {
47- throw new Error ( `Queue processor exited before readiness:\n${ outputBuffer } ` ) ;
48- }
4946 try {
50- return await getQueueStatus ( fixture . baseUrl ) ;
51- } catch {
52- return undefined ;
53- }
54- } , 10_000 ) ;
47+ await waitFor ( async ( ) => {
48+ if ( exited ) {
49+ throw new Error ( `Queue processor exited before readiness:\n${ outputBuffer } ` ) ;
50+ }
51+ try {
52+ return await getQueueStatus ( fixture . baseUrl ) ;
53+ } catch {
54+ return undefined ;
55+ }
56+ } , 10_000 ) ;
5557
56- return {
57- child,
58- async stop ( ) {
59- if ( exited ) {
60- return ;
61- }
58+ return {
59+ child,
60+ async stop ( ) {
61+ if ( exited ) {
62+ return ;
63+ }
6264
63- const exitPromise = new Promise < void > ( ( resolve ) => {
64- child . once ( 'exit' , ( ) => resolve ( ) ) ;
65- } ) ;
66- let timeoutId : NodeJS . Timeout | undefined ;
65+ const exitPromise = new Promise < void > ( ( resolve ) => {
66+ child . once ( 'exit' , ( ) => resolve ( ) ) ;
67+ } ) ;
68+ let timeoutId : NodeJS . Timeout | undefined ;
6769
68- child . kill ( 'SIGTERM' ) ;
69- await Promise . race ( [
70- exitPromise ,
71- new Promise < void > ( ( resolve , reject ) => {
72- timeoutId = setTimeout ( ( ) => {
73- if ( ! exited ) {
74- child . kill ( 'SIGKILL' ) ;
70+ child . kill ( 'SIGTERM' ) ;
71+ await Promise . race ( [
72+ exitPromise ,
73+ new Promise < void > ( ( resolve , reject ) => {
74+ timeoutId = setTimeout ( ( ) => {
75+ if ( ! exited ) {
76+ child . kill ( 'SIGKILL' ) ;
77+ }
78+ reject ( new Error ( `Timed out stopping queue processor:\n${ outputBuffer } ` ) ) ;
79+ } , 5_000 ) ;
80+ } ) ,
81+ ] ) . catch ( async ( error ) => {
82+ await exitPromise ;
83+ throw error ;
84+ } ) . finally ( ( ) => {
85+ if ( timeoutId ) {
86+ clearTimeout ( timeoutId ) ;
7587 }
76- reject ( new Error ( `Timed out stopping queue processor:\n${ outputBuffer } ` ) ) ;
77- } , 5_000 ) ;
78- } ) ,
79- ] ) . catch ( async ( error ) => {
80- await exitPromise ;
88+ } ) ;
89+ } ,
90+ output ( ) {
91+ return outputBuffer ;
92+ } ,
93+ } ;
94+ } catch ( error ) {
95+ if ( ! outputBuffer . includes ( 'EADDRINUSE' ) || attempt === 4 ) {
8196 throw error ;
82- } ) . finally ( ( ) => {
83- if ( timeoutId ) {
84- clearTimeout ( timeoutId ) ;
85- }
86- } ) ;
87- } ,
88- output ( ) {
89- return outputBuffer ;
90- } ,
91- } ;
97+ }
98+
99+ fixture . apiPort = await getFreePort ( ) ;
100+ fixture . baseUrl = `http://127.0.0.1:${ fixture . apiPort } ` ;
101+ await new Promise ( resolve => child . once ( 'exit' , resolve ) ) ;
102+ }
103+ }
104+
105+ throw new Error ( 'Queue processor could not start after retrying port allocation' ) ;
92106}
0 commit comments