Skip to content
Merged

Dev #72

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
9760f5c
feat(external_worker): introduce ExternalWorkerConfig for improved co…
JulianKimmig Nov 12, 2025
a06f00b
feat(worker): enhance FuncNodesExternalWorker with nodeshelf property…
JulianKimmig Nov 12, 2025
fb6dddf
fix(tests): correct asyncio_default_fixture_loop_scope format in pyte…
JulianKimmig Nov 12, 2025
ced9c56
feat(websocket): implement graceful client connection closure and enh…
JulianKimmig Nov 12, 2025
26927c0
fix(loop): handle closed or missing event loop for tasks
JulianKimmig Nov 27, 2025
bde7dcc
chore(remote-worker): standardize exception logging variable
JulianKimmig Nov 27, 2025
26d0057
feat(external-worker): support exportable configs and nodeshelf updates
JulianKimmig Nov 27, 2025
a30647b
fix(worker): align external worker shelf updates and export
JulianKimmig Nov 27, 2025
9413bc5
test(worker): migrate fixtures and cover external worker exports
JulianKimmig Nov 27, 2025
d6a77e5
bump: version 1.2.1 → 1.3.0
JulianKimmig Nov 27, 2025
8c1f30d
refactor(tests): update worker fixture to use test name for UUID
JulianKimmig Nov 28, 2025
6bce27a
refactor(tests): migrate test_client_connection to pytest and improve…
JulianKimmig Nov 28, 2025
c6e51db
refactor(tests): migrate SocketWorker tests to pytest and utilize fix…
JulianKimmig Nov 28, 2025
d3a89d1
refactor(tests): remove unused pytestmark from test_socketworker.py
JulianKimmig Nov 28, 2025
7b371dc
refactor(tests): migrate WSWorker tests to pytest and implement fixtures
JulianKimmig Nov 29, 2025
65cff4d
refactor(tests): migrate test_loops to pytest and implement fixtures
JulianKimmig Nov 29, 2025
45bb795
fix(loop): await task shutdown and prune stale worker state
JulianKimmig Nov 30, 2025
1f7f077
feat(worker): add optional name and config parameters to external wor…
JulianKimmig Dec 5, 2025
973c5e0
bump: version 1.3.0 → 1.4.0
JulianKimmig Dec 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,39 @@
# Changelog

## 1.4.0 (2025-12-05)

### Feat

- **worker**: add optional name and config parameters to external worker initialization

### Fix

- **loop**: await task shutdown and prune stale worker state

### Refactor

- **tests**: migrate test_loops to pytest and implement fixtures
- **tests**: migrate WSWorker tests to pytest and implement fixtures
- **tests**: remove unused pytestmark from test_socketworker.py
- **tests**: migrate SocketWorker tests to pytest and utilize fixtures
- **tests**: migrate test_client_connection to pytest and improve test structure
- **tests**: update worker fixture to use test name for UUID

## 1.3.0 (2025-11-27)

### Feat

- **external-worker**: support exportable configs and nodeshelf updates
- **websocket**: implement graceful client connection closure and enhance message enqueue handling
- **worker**: enhance FuncNodesExternalWorker with nodeshelf property and logging for configuration updates
- **external_worker**: introduce ExternalWorkerConfig for improved configuration management and update FuncNodesExternalWorker to utilize it

### Fix

- **worker**: align external worker shelf updates and export
- **loop**: handle closed or missing event loop for tasks
- **tests**: correct asyncio_default_fixture_loop_scope format in pytest configuration

## 1.2.1 (2025-11-06)

### Fix
Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
[project]
name = "funcnodes-worker"
version = "1.2.1"
version = "1.4.0"
description = "Worker package for FuncNodes"
readme = "README.md"
authors = [{name = "Julian Kimmig", email = "julian.kimmig@linkdlab.de"}]

requires-python = ">=3.11"
dependencies = [
"asynctoolkit>=0.1.1",
"funcnodes-core>=1.0.1",
"funcnodes-core>=2.0.0",
"packaging>=24.2",
"pip>=25.0.1",
"pydantic>=2.12.4",
"python-slugify>=8.0.4",
]

Expand All @@ -25,6 +26,7 @@ dev = [
"funcnodes-worker[all]",
"funcnodes>=1.0.0",
"objgraph>=3.6.2",
"pytest-funcnodes>=0.2.0",
]

[project.optional-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[pytest]
asyncio_mode=auto
asyncio_default_fixture_loop_scope="function"
asyncio_default_fixture_loop_scope=function
addopts = -p no:logging
3 changes: 2 additions & 1 deletion src/funcnodes_worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from .worker import Worker
from .remote_worker import RemoteWorker
from .external_worker import FuncNodesExternalWorker
from .external_worker import FuncNodesExternalWorker, ExternalWorkerConfig
from .loop import CustomLoop

if not aiohttp:
Expand All @@ -17,6 +17,7 @@


__all__ = [
"ExternalWorkerConfig",
"Worker",
"RemoteWorker",
"FuncNodesExternalWorker",
Expand Down
135 changes: 131 additions & 4 deletions src/funcnodes_worker/external_worker.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,64 @@
from __future__ import annotations
from typing import Dict, List, TypedDict
from pathlib import Path
from typing import Dict, List, TypedDict, Union, Any, Optional, Type, ClassVar
from funcnodes_worker.loop import CustomLoop
from funcnodes_core import NodeClassMixin, JSONEncoder, Encdata, EventEmitterMixin
from funcnodes_core import (
NodeClassMixin,
JSONEncoder,
Encdata,
EventEmitterMixin,
Shelf,
FUNCNODES_LOGGER,
)
from weakref import WeakValueDictionary
from pydantic import BaseModel
from weakref import ref


class ExternalWorkerConfig(BaseModel):
"""
A class that represents the configuration of an external worker.
"""

EXPORT_EXCLUDE_FIELDS: ClassVar[set[str]] = set()

@classmethod
def export_exclude_fields(cls) -> set[str]:
"""Returns field names that should be removed when exporting config."""
excluded = set(getattr(cls, "EXPORT_EXCLUDE_FIELDS", set()))
fields = getattr(cls, "model_fields", None) or getattr(cls, "__fields__", {})
for name, field in fields.items():
extra = getattr(field, "json_schema_extra", None)
if extra is None and hasattr(field, "field_info"):
extra = getattr(field.field_info, "extra", {}) or getattr(
field.field_info, "json_schema_extra", None
)
if extra and extra.get("export") is False:
excluded.add(name)
return excluded

def exportable_dict(self) -> dict:
"""Serialize config without export-excluded fields."""
return self.model_dump(mode="json", exclude=self.export_exclude_fields())


class FuncNodesExternalWorker(NodeClassMixin, EventEmitterMixin, CustomLoop):
"""
A class that represents an external worker with a loop and nodeable methods.
"""

config_cls: Type[ExternalWorkerConfig] = ExternalWorkerConfig

RUNNING_WORKERS: Dict[str, WeakValueDictionary[str, FuncNodesExternalWorker]] = {}
IS_ABSTRACT = True

def __init__(self, workerid) -> None:
def __init__(
self,
workerid,
config: Optional[Union[ExternalWorkerConfig, Dict[str, Any]]] = None,
data_path: Optional[str] = None,
name: Optional[str] = None,
) -> None:
"""
Initializes the FuncNodesExternalWorker class.

Expand All @@ -24,12 +69,84 @@ def __init__(self, workerid) -> None:
delay=1,
)
self.uuid = workerid
self._nodeshelf: Optional[Shelf] = None
self._config = self.config_cls()
self._data_path: Optional[Path] = Path(data_path) if data_path else None
if name:
self.name = name
try:
self.update_config(config)
except Exception:
pass
if self.NODECLASSID not in FuncNodesExternalWorker.RUNNING_WORKERS:
FuncNodesExternalWorker.RUNNING_WORKERS[self.NODECLASSID] = (
WeakValueDictionary()
)
FuncNodesExternalWorker.RUNNING_WORKERS[self.NODECLASSID][self.uuid] = self

@property
def data_path(self) -> Optional[Path]:
if self._data_path is None:
return None
if not self._data_path.exists():
self._data_path.mkdir(parents=True, exist_ok=True)
return self._data_path

@data_path.setter
def data_path(self, data_path: Optional[Path]):
if data_path is None:
self._data_path = None
else:
self._data_path = data_path.resolve()
if not self._data_path.exists():
self._data_path.mkdir(parents=True, exist_ok=True)

def update_config(
self, config: Optional[Union[ExternalWorkerConfig, Dict[str, Any]]] = None
):
if config is None:
return
preconfig = config if isinstance(config, dict) else config.model_dump()
self._config = self.config_cls(**{**self._config.model_dump(), **preconfig})
try:
self.post_config_update()
except Exception as e:
FUNCNODES_LOGGER.exception(e)
FUNCNODES_LOGGER.info(f"config updated for worker {self.uuid}: {self._config}")
return self._config

def post_config_update(self):
"""
This method is called after the config is updated to allow the worker to perform any necessary actions.
"""
pass

@property
def config(self) -> ExternalWorkerConfig:
return self._config

@property
def nodeshelf(self) -> Optional[ref[Shelf]]:
ns = self.get_nodeshelf()
if ns is None:
return None
return ref(ns) #

@nodeshelf.setter
def nodeshelf(self, ns: Optional[Shelf]):
self.set_nodeshelf(ns)

def get_nodeshelf(self) -> Optional[Shelf]:
return self._nodeshelf

def set_nodeshelf(self, ns: Optional[Shelf]):
if ns is None:
self._nodeshelf = ns
if not isinstance(ns, Shelf):
raise ValueError("ns must be a Shelf or None")
self._nodeshelf = ns
self.emit("nodes_update")

@classmethod
def running_instances(cls) -> List[FuncNodesExternalWorker]:
"""
Expand Down Expand Up @@ -58,17 +175,26 @@ async def stop(self):
self.cleanup()
await super().stop()

def serialize(self) -> FuncNodesExternalWorkerJson:
def serialize(self, export: bool = False) -> FuncNodesExternalWorkerJson:
"""
Serializes the FuncNodesExternalWorker class.
"""
cfg = (
self.config.exportable_dict()
if export and hasattr(self.config, "exportable_dict")
else self.config.model_dump(mode="json")
)
return FuncNodesExternalWorkerJson(
uuid=self.uuid,
nodeclassid=self.NODECLASSID,
running=self.running,
name=self.name,
config=cfg,
)

async def loop(self):
pass


class FuncNodesExternalWorkerJson(TypedDict):
"""
Expand All @@ -79,6 +205,7 @@ class FuncNodesExternalWorkerJson(TypedDict):
nodeclassid: str
running: bool
name: str
config: dict


def encode_external_worker(obj, preview=False): # noqa: F841
Expand Down
Loading