Source code for temfield_mpylab.measurement_worker

# This Python file uses the following encoding: utf-8
import time
import traceback

from PySide6.QtCore import QObject, QMutex, QMutexLocker, QWaitCondition, Signal, Slot

try:
    from .TestSusceptibility import TestSusceptibility
except ImportError:
    from TestSusceptibility import TestSusceptibility


[docs] class TEMFieldWorker(QObject): finished = Signal() error = Signal(str) log = Signal(str, object) test_progress = Signal(int) eut_progress = Signal(int) frequency_done = Signal(object) waveform = Signal(object) rf_state_changed = Signal(bool) am_state_changed = Signal(bool)
[docs] def __init__(self, *, dwell_time, e_target, names, dotfile, searchpath, adjust_to_setting, am, freqs): super().__init__() self.dwell_time = dwell_time self.e_target = e_target self.names = names self.dotfile = dotfile self.searchpath = searchpath self.adjust_to_setting = adjust_to_setting self.am = am self.freqs = list(freqs) self.meas = None self._devices_ready = False self._stop = False self._paused = False self._rf_is_on = False self._am_is_on = False self._mutex = QMutex() self._pause_cond = QWaitCondition()
[docs] @Slot() def run(self): try: self.meas = TestSusceptibility() self.meas.Init(dwell_time=self.dwell_time, e_target=self.e_target, names=self.names, dotfile=self.dotfile, SearchPath=self.searchpath, adjust_to_setting=self.adjust_to_setting) self.meas.init_measurement(self.am) self._devices_ready = True self._rf_is_on = True self.rf_state_changed.emit(True) total = len(self.freqs) for idx, f in enumerate(self.freqs, start=1): if self._should_stop() or not self._wait_if_paused(): break self.test_progress.emit(int(idx / total * 100) if total else 100) self.log.emit(f"set freq to {f} MHz", f"Freq: {round(f * 1e-6, 2)} MHz") self.meas.mg.SetFreq_Devices(f) self.meas.mg.EvaluateConditions() self._set_am(False) self._set_rf(True) self.log.emit("adjust Level...", None) e_field = self.meas.adjust_level() short = ( f"Ex = {round(e_field[0].get_expectation_value_as_float(), 2)} V/m, " f"Ey = {round(e_field[1].get_expectation_value_as_float(), 2)} V/m, " f"Ez = {round(e_field[2].get_expectation_value_as_float(), 2)} V/m" ) self.log.emit(f"E-Field: Ex = {e_field[0]}, Ey = {e_field[1]}, Ez = {e_field[2]},", short) self._emit_waveform() self._set_am(True) if not self._run_eut_monitor(): break self.frequency_done.emit({ "freq": f, "e_field": e_field, "status": "Passed", }) self._safe_finish() if not self._should_stop(): self.log.emit("all frequencies processed", None) self.test_progress.emit(100) except Exception: self.error.emit(traceback.format_exc()) self._safe_finish() finally: self.finished.emit()
def _run_eut_monitor(self): start = now = time.time() end = start + self.dwell_time last_waveform = 0.0 while now < end: if self._should_stop() or not self._wait_if_paused(): return False time.sleep(0.01) now = time.time() percentage = round((now - start) / self.dwell_time, 2) * 100 if self.dwell_time else 100 self.eut_progress.emit(min(100, int(percentage))) if now - last_waveform > 0.2: self._emit_waveform() last_waveform = now self.eut_progress.emit(100) return True def _emit_waveform(self): if self.meas is None: return err, ts, ex, ey, ez = self.meas.get_waveform() self.waveform.emit({ "err": err, "t": ts, "ex": ex, "ey": ey, "ez": ez, "main_e_component": self.meas.main_e_component, }) def _safe_finish(self): if self.meas is None: return try: self._set_am(False) self._set_rf(False) try: self.meas.mg.CmdDevices(False, "Standby") except AttributeError: pass self.meas.mg.Quit_Devices() self._devices_ready = False except Exception as exc: self.error.emit(f"Could not shut down devices cleanly: {exc}") def _set_rf(self, state): if self.meas is None or not self._devices_ready: return False try: status = self.meas.rf_on() if state else self.meas.rf_off() except Exception as exc: self.error.emit(f"RF {'On' if state else 'Off'} failed: {exc}") return False if status is True: self._rf_is_on = state self.rf_state_changed.emit(state) self.log.emit("RF On" if state else "RF Off", None) return status def _set_am(self, state): if self.meas is None or not self._devices_ready: return False try: status = self.meas.am_on() if state else self.meas.am_off() except Exception as exc: self.error.emit(f"AM {'On' if state else 'Off'} failed: {exc}") return False if status is True: self._am_is_on = state self.am_state_changed.emit(state) self.log.emit("AM On" if state else "AM Off", None) return status def _should_stop(self): with QMutexLocker(self._mutex): return self._stop def _wait_if_paused(self): self._mutex.lock() try: while self._paused and not self._stop: self._pause_cond.wait(self._mutex) return not self._stop finally: self._mutex.unlock()
[docs] @Slot() def stop(self): with QMutexLocker(self._mutex): self._stop = True self._paused = False self._pause_cond.wakeAll() self.rf_off()
[docs] @Slot(result=bool) def toggle_pause(self): with QMutexLocker(self._mutex): if self._stop: return False self._paused = not self._paused paused = self._paused if not paused: self._pause_cond.wakeAll() if paused: self.rf_off() else: self.rf_on() if self._am_is_on: self.am_on() return paused
[docs] @Slot() def rf_on(self): self._set_rf(True)
[docs] @Slot() def rf_off(self): self._set_rf(False)
[docs] @Slot() def am_on(self): self._set_am(True)
[docs] @Slot() def am_off(self): self._set_am(False)