Split multithreaded executor#403
Conversation
…into a specific `MultiThreadedTaskExecutor` class.
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
…ecuting the task.
…the 'finally' section.
loichuder
left a comment
There was a problem hiding this comment.
Some basic comments but I am not very familiar with the executor architecture so I'll let someone else continue the review
| @dataclass | ||
| class _TaskExecutorState: | ||
| callbacks: Iterable[Callable[[ThreadedTaskExecutor], None]] | ||
| task_kwargs: dict |
There was a problem hiding this comment.
| task_kwargs: dict | |
| task_kwargs: Dict[str, Any] |
| """Create and execute each Ewoks task in its own dedicated thread.""" | ||
|
|
||
| sigComputationStarted = Signal() | ||
| """Signal emitted when a computation is started""" |
| log_missing_inputs=log_missing_inputs, | ||
| task_executor=task_executor, | ||
| ) | ||
| self.__add_task_executor(state) |
There was a problem hiding this comment.
I don't think having a method __add_task_executor is worthwhile since it is simply an append on the __task_executors
| None, | ||
| ) | ||
|
|
||
| def __process_ended_direct(self, task_executor: ThreadedTaskExecutor): |
There was a problem hiding this comment.
| def __process_ended_direct(self, task_executor: ThreadedTaskExecutor): | |
| def __handle_process_ended(self, task_executor: ThreadedTaskExecutor): |
| def __process_ended(self): | ||
| self.__process_ended_direct(self.sender()) | ||
|
|
||
| def _getState(self, task_executor: ThreadedTaskExecutor) -> _TaskExecutorState: |
There was a problem hiding this comment.
Other methods are in snake_case. Should this one be as well?
| def __remove_task_executor(self, task_executor: ThreadedTaskExecutor) -> None: | ||
| if task_executor is None: |
There was a problem hiding this comment.
If the type is right, this condition should never be true
There was a problem hiding this comment.
It was too complex before and it is still too complex now.
Do you think we can simplify TaskExecutor, ThreadedTaskExecutor, MultiThreadedTaskExecutor and TaskExecutorQueue?
Ultimately we are executing Ewoks tasks
from ewokscore.inittask import instantiate_task
task = instantiate_task(...)
task.execute()with multithreading or in the future multiprocessing (so they can be killed).
The execution modes iirc are
- serial (
TaskExecutorQueue), - parallel (
MultiThreadedTaskExecutor) - "drop if busy" (
ThreadedTaskExecutor)
In addition we need Qt signals for success and failure of each execution.
How about this as a basis which would work for threads, processes and Qt (chatgpt generated)
Edit: better suggestion with demo in #403 (comment)
from enum import Enum, auto
from concurrent.futures import Executor
from threading import Lock
from AnyQt.QtCore import QObject, Signal
class SubmitMode(Enum):
DROP_IF_BUSY = auto() # Ignore new submissions while one is running
SERIAL = auto() # Queue work (max_workers=1)
PARALLEL = auto() # Unlimited (executor decides)
class TaskExecutor(QObject):
succeeded = Signal(object, object) # future, result
failed = Signal(object, Exception) # future, exception
ignored = Signal()
def __init__(self, executor: Executor, mode: SubmitMode, parent=None):
super().__init__(parent)
self._executor = executor
self._mode = mode
self._running = 0
self._lock = Lock()
def submit(self, fn, *args, **kwargs):
with self._lock:
if self._mode is SubmitMode.DROP_IF_BUSY and self._running:
self.ignored.emit()
return None
self._running += 1
future = self._executor.submit(fn, *args, **kwargs)
future.add_done_callback(self._done)
return future
def _done(self, future):
with self._lock:
self._running -= 1
try:
result = future.result()
except Exception as exc:
self.failed.emit(future, exc)
else:
self.succeeded.emit(future, result)
def shutdown(self, **kwargs):
self._executor.shutdown(**kwargs)If we cannot use this with Qt because of the event loop or something, we should be able to implement a similar thing with Qt primitives.
| import warnings | ||
|
|
||
| from ..gui.concurrency.base import TaskExecutor # noqa F401 | ||
| from ..gui.concurrency.threaded import MultiThreadedTaskExecutor # noqa F401 |
There was a problem hiding this comment.
No need. This module is deprecated like many thinking under ewoksorange.bindings
|
Would be used like this from concurrent.futures import ThreadPoolExecutor
# Drop while busy
exec1 = TaskExecutor(
ThreadPoolExecutor(max_workers=1),
SubmitMode.DROP_IF_BUSY,
)
# Queue
exec2 = TaskExecutor(
ThreadPoolExecutor(max_workers=1),
SubmitMode.SERIAL,
)
# Parallel
exec3 = TaskExecutor(
ThreadPoolExecutor(),
SubmitMode.PARALLEL,
)from concurrent.futures import ProcessPoolExecutor
exec4 = TaskExecutor(
ProcessPoolExecutor(),
SubmitMode.PARALLEL,
)I would not integrate more logic in |
|
Another chatgpt generated demo with a somewhat changed logic in terms for execution mode (tested with pyqt5): import logging
import random
import sys
import time
from concurrent.futures import Executor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from enum import auto
from threading import Lock
from AnyQt.QtCore import QObject
from AnyQt.QtCore import Signal
from AnyQt.QtWidgets import QApplication
from AnyQt.QtWidgets import QGridLayout
from AnyQt.QtWidgets import QPushButton
from AnyQt.QtWidgets import QTextEdit
from AnyQt.QtWidgets import QVBoxLayout
from AnyQt.QtWidgets import QWidget
class SubmitPolicy(Enum):
ALWAYS = auto()
DROP_IF_BUSY = auto()
class QtExecutor(QObject):
submitted = Signal(object) # Future
succeeded = Signal(object, object) # Future, result
failed = Signal(object, object) # Future, exception
ignored = Signal()
def __init__(self, executor: Executor, policy=SubmitPolicy.ALWAYS):
super().__init__()
self._executor = executor
self._policy = policy
self._running = 0
self._lock = Lock()
def submit(self, fn, *args, **kwargs):
with self._lock:
if self._policy is SubmitPolicy.DROP_IF_BUSY and self._running:
logging.warning("Submission ignored: executor busy")
self.ignored.emit()
return None
self._running += 1
future = self._executor.submit(fn, *args, **kwargs)
future.add_done_callback(self._done)
self.submitted.emit(future)
return future
def _done(self, future):
with self._lock:
self._running -= 1
try:
result = future.result()
except Exception as exc:
self.failed.emit(future, exc)
else:
self.succeeded.emit(future, result)
def shutdown(self):
self._executor.shutdown(wait=False)
# Must be top-level for ProcessPoolExecutor
def worker(task_id):
delay = random.uniform(1.5, 3.5)
time.sleep(delay)
if random.random() < 0.2:
raise RuntimeError(f"Task {task_id} failed")
return f"Task {task_id} finished after {delay:.1f}s"
class Window(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("QtExecutor Demo")
self.log = QTextEdit(readOnly=True)
self.executors = {
"T DROP": QtExecutor(
ThreadPoolExecutor(max_workers=1),
SubmitPolicy.DROP_IF_BUSY,
),
"T QUEUE": QtExecutor(
ThreadPoolExecutor(max_workers=1),
SubmitPolicy.ALWAYS,
),
"T PARALLEL": QtExecutor(
ThreadPoolExecutor(max_workers=4),
SubmitPolicy.ALWAYS,
),
"P DROP": QtExecutor(
ProcessPoolExecutor(max_workers=1),
SubmitPolicy.DROP_IF_BUSY,
),
"P QUEUE": QtExecutor(
ProcessPoolExecutor(max_workers=1),
SubmitPolicy.ALWAYS,
),
"P PARALLEL": QtExecutor(
ProcessPoolExecutor(max_workers=4),
SubmitPolicy.ALWAYS,
),
}
for name, executor in self.executors.items():
executor.submitted.connect(
lambda future, n=name: self.on_submitted(n, future)
)
executor.ignored.connect(lambda n=name: self.log.append(f"[{n}] ⚠ Ignored"))
executor.succeeded.connect(
lambda future, result, n=name: self.log.append(
f"[{n}] ✅ {id(future)} -> {result}"
)
)
executor.failed.connect(
lambda future, exc, n=name: self.log.append(
f"[{n}] ❌ {id(future)} -> {exc}"
)
)
layout = QVBoxLayout(self)
grid = QGridLayout()
buttons = [
("Submit x5 (Thread Drop)", "T DROP"),
("Submit x5 (Thread Queue)", "T QUEUE"),
("Submit x5 (Thread Parallel)", "T PARALLEL"),
("Submit x5 (Process Drop)", "P DROP"),
("Submit x5 (Process Queue)", "P QUEUE"),
("Submit x5 (Process Parallel)", "P PARALLEL"),
]
for i, (text, key) in enumerate(buttons):
button = QPushButton(text)
button.clicked.connect(
lambda checked=False, k=key: self.submit_many(self.executors[k])
)
grid.addWidget(button, i // 2, i % 2)
layout.addLayout(grid)
layout.addWidget(self.log)
self.counter = 1
def on_submitted(self, name, future):
if future is None:
self.log.append(f"[{name}] Submitted -> None")
else:
self.log.append(f"[{name}] Submitted -> {id(future)}")
def submit_many(self, executor):
for _ in range(5):
executor.submit(worker, self.counter)
self.counter += 1
def closeEvent(self, event):
for executor in self.executors.values():
executor.shutdown()
super().closeEvent(event)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
app = QApplication(sys.argv)
window = Window()
window.resize(900, 500)
window.show()
sys.exit(app.exec()) |
This PR extract a multiprocessing executor from the
OWEwoksWidgetOneThreadPerRunclass.The goal is to have a clearer separation between the widget and the executor and ease refactoring proposed in #404
Note: This PR was done using Claude.