Skip to content

Commit ffa70bf

Browse files
committed
refactor: removing threading in bbdev
1 parent a97a197 commit ffa70bf

2 files changed

Lines changed: 6 additions & 49 deletions

File tree

api/motia.config.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ import endpointPlugin from '@motiadev/plugin-endpoint/plugin'
33
import logsPlugin from '@motiadev/plugin-logs/plugin'
44
import observabilityPlugin from '@motiadev/plugin-observability/plugin'
55
import statesPlugin from '@motiadev/plugin-states/plugin'
6-
import bullmqPlugin from '@motiadev/plugin-bullmq/plugin'
76

87
export default defineConfig({
9-
plugins: [observabilityPlugin, statesPlugin, endpointPlugin, logsPlugin, bullmqPlugin],
8+
plugins: [observabilityPlugin, statesPlugin, endpointPlugin, logsPlugin],
109
})

bbdev

Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -449,41 +449,11 @@ if __name__ == "__main__":
449449
available_port = find_available_port(start_port=5100, end_port=5500)
450450
print(f"Starting server on port {available_port}...")
451451
proc = subprocess.Popen(
452-
["pnpm", "dev", "--port", str(available_port)],
453-
cwd=workflow_dir,
454-
start_new_session=True,
455-
stdout=subprocess.PIPE,
456-
stderr=subprocess.PIPE,
452+
["pnpm", "dev", "--port", str(available_port)], cwd=workflow_dir
457453
)
458454

459-
# Forward stdout/stderr in background threads; stopped before
460-
# shutdown to suppress BullMQ teardown errors and pnpm's
461-
# ELIFECYCLE message (Motia framework limitation).
462-
import threading
463-
464-
output_active = threading.Event()
465-
output_active.set()
466-
467-
def forward_stream(stream, dest):
468-
for line in iter(stream.readline, b""):
469-
if not output_active.is_set():
470-
break
471-
dest.write(line)
472-
dest.flush()
473-
474-
threading.Thread(
475-
target=forward_stream,
476-
args=(proc.stdout, sys.stdout.buffer),
477-
daemon=True,
478-
).start()
479-
threading.Thread(
480-
target=forward_stream,
481-
args=(proc.stderr, sys.stderr.buffer),
482-
daemon=True,
483-
).start()
484-
485455
# Wait for service to start
486-
max_retries = 600
456+
max_retries = 30
487457
for i in range(max_retries):
488458
try:
489459
# Disable proxy, connect directly to localhost
@@ -536,22 +506,10 @@ if __name__ == "__main__":
536506
)
537507

538508
# 3. Shutdown service ================================
539-
output_active.clear()
540-
import signal
541-
509+
# Give observability plugin time to finish async operations (e.g., Redis writes)
542510
time.sleep(1)
543-
try:
544-
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
545-
except ProcessLookupError:
546-
pass
547-
try:
548-
proc.wait(timeout=10)
549-
except subprocess.TimeoutExpired:
550-
try:
551-
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
552-
except ProcessLookupError:
553-
pass
554-
proc.wait()
511+
proc.terminate()
512+
proc.wait()
555513
print(
556514
f"\nTask completed. Command running on http://localhost:{available_port} is finished"
557515
)

0 commit comments

Comments
 (0)