LCOV - code coverage report
Current view: top level - src/pairinteraction_gui - worker.py (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 51 0.0 %
Date: 2025-04-29 15:59:54 Functions: 0 12 0.0 %

          Line data    Source code
       1             : # SPDX-FileCopyrightText: 2025 Pairinteraction Developers
       2             : # SPDX-License-Identifier: LGPL-3.0-or-later
       3             : 
       4           0 : import logging
       5           0 : from functools import wraps
       6           0 : from multiprocessing import Process, SimpleQueue
       7           0 : from typing import TYPE_CHECKING, Any, Callable, TypeVar
       8             : 
       9           0 : from PySide6.QtCore import QObject, QThread, Signal
      10             : 
      11           0 : from pairinteraction_gui.app import Application
      12             : 
      13             : if TYPE_CHECKING:
      14             :     from typing_extensions import ParamSpec
      15             : 
      16             :     P = ParamSpec("P")
      17             :     R = TypeVar("R")
      18             : 
      19           0 : logger = logging.getLogger(__name__)
      20             : 
      21             : 
      22           0 : class WorkerSignals(QObject):
      23             :     """Signals to be used by the Worker class."""
      24             : 
      25           0 :     finished = Signal(bool)
      26           0 :     error = Signal(Exception)
      27           0 :     result = Signal(object)
      28             : 
      29             : 
      30           0 : class Worker(QThread):
      31             :     """Simple worker class to run a function in a separate thread."""
      32             : 
      33           0 :     def __init__(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
      34           0 :         super().__init__(Application.instance())
      35             : 
      36           0 :         Application.all_threads.add(self)
      37             : 
      38           0 :         self.fn = fn
      39           0 :         self.args = args
      40           0 :         self.kwargs = kwargs
      41             : 
      42           0 :         self.signals = WorkerSignals()
      43           0 :         self.finished.connect(self.finish_up)
      44             : 
      45           0 :     def run(self) -> None:
      46             :         """Initialise the runner function with passed args, kwargs."""
      47           0 :         logger.debug("Starting thread %s", self)
      48           0 :         success = False
      49           0 :         try:
      50           0 :             result = self.fn(*self.args, **self.kwargs)
      51           0 :             success = True
      52           0 :         except Exception as err:
      53           0 :             self.signals.error.emit(err)
      54             :         else:
      55           0 :             self.signals.result.emit(result)
      56             :         finally:
      57           0 :             self.signals.finished.emit(success)
      58             : 
      59           0 :     def finish_up(self) -> None:
      60             :         """Perform any final cleanup or actions before the thread exits."""
      61           0 :         logger.debug("Finishing up thread %s", self)
      62           0 :         Application.all_threads.discard(self)
      63             : 
      64             : 
      65           0 : def run_in_other_process(func: Callable["P", "R"]) -> Callable["P", "R"]:
      66           0 :     @wraps(func)
      67           0 :     def wrapper_func(*args: "P.args", **kwargs: "P.kwargs") -> "R":
      68           0 :         def mp_func(queue: "SimpleQueue[R]", *args: "P.args", **kwargs: "P.kwargs") -> None:
      69           0 :             result = func(*args, **kwargs)
      70           0 :             queue.put(result)
      71             : 
      72           0 :         queue: SimpleQueue[R] = SimpleQueue()
      73           0 :         process = Process(target=mp_func, args=(queue, *args), kwargs=kwargs, daemon=True)
      74           0 :         Application.all_processes.add(process)
      75           0 :         process.start()
      76           0 :         logger.debug("Starting process %s", process.pid)
      77           0 :         result = queue.get()
      78           0 :         process.join()
      79           0 :         logger.debug("Closing process %s", process.pid)
      80           0 :         process.close()
      81           0 :         Application.all_processes.discard(process)
      82           0 :         return result
      83             : 
      84           0 :     return wrapper_func

Generated by: LCOV version 1.16