Line data Source code
# SPDX-FileCopyrightText: 2025 Pairinteraction Developers
# SPDX-License-Identifier: LGPL-3.0-or-later

import logging
import traceback
from functools import wraps
from multiprocessing import Process, SimpleQueue
from typing import TYPE_CHECKING, Any, Callable, TypeVar

from PySide6.QtCore import QObject, QThread, Signal

from pairinteraction_gui.app import Application

if TYPE_CHECKING:
    from typing_extensions import ParamSpec

    P = ParamSpec("P")
    R = TypeVar("R")

logger = logging.getLogger(__name__)


class WorkerSignals(QObject):
    """Signals to be used by the Worker class."""

    # Emitted last by Worker.run; carries True if the function returned, False if it raised.
    finished = Signal(bool)
    # Emitted by Worker.run with the exception raised by the function.
    error = Signal(Exception)
    # Emitted by Worker.run with the return value of the function.
    result = Signal(object)


class Worker(QThread):
    """Simple worker class to run a function in a separate thread.

    The outcome of the function is reported through ``self.signals``
    (see :class:`WorkerSignals`).
    """

    def __init__(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Store the function and its arguments and register the thread.

        Args:
            fn: Function that is called in :meth:`run`.
            *args: Positional arguments passed to ``fn``.
            **kwargs: Keyword arguments passed to ``fn``.

        """
        super().__init__(Application.instance())

        # Registered here and removed again in finish_up.
        Application.all_threads.add(self)

        self.fn = fn
        self.args = args
        self.kwargs = kwargs

        self.signals = WorkerSignals()
        self.finished.connect(self.finish_up)

    def run(self) -> None:
        """Call the stored function and report its outcome through the signals.

        Emits ``signals.result`` with the return value on success or
        ``signals.error`` with the exception on failure, and in both cases
        ``signals.finished`` with the success flag afterwards.
        """
        logger.debug("Starting thread %s", self)
        success = False
        try:
            result = self.fn(*self.args, **self.kwargs)
            success = True
        except Exception as err:
            self.signals.error.emit(err)
        else:
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit(success)

    def finish_up(self) -> None:
        """Perform any final cleanup or actions before the thread exits."""
        logger.debug("Finishing up thread %s", self)
        Application.all_threads.discard(self)


def run_in_other_process(func: Callable["P", "R"]) -> Callable["P", "R"]:
    """Decorate ``func`` so that every call is executed in a separate process.

    The wrapper starts a daemon process, blocks until the process has sent back
    its outcome and returns the result of ``func``.

    Raises (from the wrapper):
        Exception: An exception raised by ``func`` in the child process is
            re-raised in the calling process, chained to a ``RuntimeError``
            that carries the traceback text of the child.
        RuntimeError: If the result (or the exception) of ``func`` could not be
            sent back to the calling process.

    """

    @wraps(func)
    def wrapper_func(*args: "P.args", **kwargs: "P.kwargs") -> "R":
        def mp_func(queue: "SimpleQueue[tuple[bool, Any, str]]", *args: "P.args", **kwargs: "P.kwargs") -> None:
            # Always send exactly one (succeeded, value, traceback_text) tuple:
            # the parent blocks on queue.get() and would wait forever otherwise.
            try:
                outcome: tuple[bool, Any, str] = (True, func(*args, **kwargs), "")
            except Exception as err:
                outcome = (False, err, traceback.format_exc())

            try:
                queue.put(outcome)
            except Exception as transfer_err:
                # SimpleQueue.put pickles the object before writing, so nothing has been
                # sent yet if the result/exception is not picklable; send a plain error instead.
                succeeded, value, child_traceback = outcome
                what = "result" if succeeded else f"exception {value!r}"
                msg = f"Could not send the {what} of {func.__name__!r} to the parent process: {transfer_err!r}"
                queue.put((False, RuntimeError(msg), child_traceback))

        queue: "SimpleQueue[tuple[bool, Any, str]]" = SimpleQueue()
        process = Process(target=mp_func, args=(queue, *args), kwargs=kwargs, daemon=True)
        Application.all_processes.add(process)
        try:
            process.start()
            logger.debug("Starting process %s", process.pid)
            # NOTE(review): a child that is killed or exits without sending anything
            # (e.g. via SystemExit) still leaves this call blocked.
            succeeded, value, child_traceback = queue.get()
            process.join()
            logger.debug("Closing process %s", process.pid)
            process.close()
        finally:
            # Never leave a stale entry in the registry, even if something above raised.
            Application.all_processes.discard(process)

        if succeeded:
            return value  # type: ignore [no-any-return]
        if child_traceback:
            # The original traceback is lost when pickling; attach its text as the cause.
            raise value from RuntimeError(f"Traceback from the child process:\n{child_traceback}")
        raise value

    return wrapper_func