From 7cb64d48b93449451e837855c31b3001e77fa5aa Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Mon, 16 Mar 2026 10:54:10 +0100 Subject: [PATCH] Add optional celery worker restart after N runs, to pyesdl 26.2 --- pyproject.toml | 4 ++-- src/omotes_sdk/internal/worker/configs.py | 3 +++ src/omotes_sdk/internal/worker/worker.py | 3 +++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b935dc2..0e4569c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ authors = [ ] description = "Python implementation of the OMOTES SDK through jobs which may be submitted, receive status updates for submitted jobs or delete submitted jobs." readme = "README.md" -license = {file = "LICENSE"} +license = { file = "LICENSE" } classifiers = [ "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3", @@ -26,7 +26,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.4, < 9.5", "omotes-sdk-protocol ~= 1.2", - "pyesdl ~= 25.7", + "pyesdl ~= 26.2", "pamqp ~= 3.3", "celery ~= 5.3", "typing-extensions ~= 4.11", diff --git a/src/omotes_sdk/internal/worker/configs.py b/src/omotes_sdk/internal/worker/configs.py index f7b2418..73f6571 100644 --- a/src/omotes_sdk/internal/worker/configs.py +++ b/src/omotes_sdk/internal/worker/configs.py @@ -22,6 +22,8 @@ class WorkerConfig: """Name of the queue to which progress updates for the task should be send.""" log_level: str """Log level for any logging in the worker.""" + max_tasks_before_restart: int + """The number of tasks (runs) after which the celery worker will be restarted.""" def __init__(self) -> None: """Create the worker config and retrieve values from environment variables.""" @@ -33,3 +35,4 @@ def __init__(self) -> None: "TASK_PROGRESS_QUEUE_NAME", "omotes_task_progress_events" ) self.log_level = os.environ.get("LOG_LEVEL", "INFO") + self.max_tasks_before_restart = int(os.environ.get("MAX_TASKS_BEFORE_RESTART", 0)) diff --git a/src/omotes_sdk/internal/worker/worker.py b/src/omotes_sdk/internal/worker/worker.py index 188d70d..6027fd1 100644 --- a/src/omotes_sdk/internal/worker/worker.py +++ b/src/omotes_sdk/internal/worker/worker.py @@ -344,6 +344,9 @@ def start(self) -> None: # app.conf.worker_send_task_events = True # Tell the worker to send task events. self.celery_app.conf.worker_hijack_root_logger = False self.celery_app.conf.worker_redirect_stdouts = False + # optionally restart celery worker after set number of tasks (runs) + if self.config.max_tasks_before_restart != 0: + self.celery_app.conf.worker_max_tasks_per_child = self.config.max_tasks_before_restart logger.info( "Connected to broker rabbitmq (%s:%s/%s) as %s",