# This Python file uses the following encoding: utf-8
import time
import traceback
from PySide6.QtCore import QObject, QMutex, QMutexLocker, QWaitCondition, Signal, Slot
try:
from .TestSusceptibility import TestSusceptibility
except ImportError:
from TestSusceptibility import TestSusceptibility
[docs]
class TEMFieldWorker(QObject):
finished = Signal()
error = Signal(str)
log = Signal(str, object)
test_progress = Signal(int)
eut_progress = Signal(int)
frequency_done = Signal(object)
waveform = Signal(object)
rf_state_changed = Signal(bool)
am_state_changed = Signal(bool)
[docs]
def __init__(self, *, dwell_time, e_target, names, dotfile, searchpath,
adjust_to_setting, am, freqs):
super().__init__()
self.dwell_time = dwell_time
self.e_target = e_target
self.names = names
self.dotfile = dotfile
self.searchpath = searchpath
self.adjust_to_setting = adjust_to_setting
self.am = am
self.freqs = list(freqs)
self.meas = None
self._devices_ready = False
self._stop = False
self._paused = False
self._rf_is_on = False
self._am_is_on = False
self._mutex = QMutex()
self._pause_cond = QWaitCondition()
[docs]
@Slot()
def run(self):
try:
self.meas = TestSusceptibility()
self.meas.Init(dwell_time=self.dwell_time,
e_target=self.e_target,
names=self.names,
dotfile=self.dotfile,
SearchPath=self.searchpath,
adjust_to_setting=self.adjust_to_setting)
self.meas.init_measurement(self.am)
self._devices_ready = True
self._rf_is_on = True
self.rf_state_changed.emit(True)
total = len(self.freqs)
for idx, f in enumerate(self.freqs, start=1):
if self._should_stop() or not self._wait_if_paused():
break
self.test_progress.emit(int(idx / total * 100) if total else 100)
self.log.emit(f"set freq to {f} MHz", f"Freq: {round(f * 1e-6, 2)} MHz")
self.meas.mg.SetFreq_Devices(f)
self.meas.mg.EvaluateConditions()
self._set_am(False)
self._set_rf(True)
self.log.emit("adjust Level...", None)
e_field = self.meas.adjust_level()
short = (
f"Ex = {round(e_field[0].get_expectation_value_as_float(), 2)} V/m, "
f"Ey = {round(e_field[1].get_expectation_value_as_float(), 2)} V/m, "
f"Ez = {round(e_field[2].get_expectation_value_as_float(), 2)} V/m"
)
self.log.emit(f"E-Field: Ex = {e_field[0]}, Ey = {e_field[1]}, Ez = {e_field[2]},", short)
self._emit_waveform()
self._set_am(True)
if not self._run_eut_monitor():
break
self.frequency_done.emit({
"freq": f,
"e_field": e_field,
"status": "Passed",
})
self._safe_finish()
if not self._should_stop():
self.log.emit("all frequencies processed", None)
self.test_progress.emit(100)
except Exception:
self.error.emit(traceback.format_exc())
self._safe_finish()
finally:
self.finished.emit()
def _run_eut_monitor(self):
start = now = time.time()
end = start + self.dwell_time
last_waveform = 0.0
while now < end:
if self._should_stop() or not self._wait_if_paused():
return False
time.sleep(0.01)
now = time.time()
percentage = round((now - start) / self.dwell_time, 2) * 100 if self.dwell_time else 100
self.eut_progress.emit(min(100, int(percentage)))
if now - last_waveform > 0.2:
self._emit_waveform()
last_waveform = now
self.eut_progress.emit(100)
return True
def _emit_waveform(self):
if self.meas is None:
return
err, ts, ex, ey, ez = self.meas.get_waveform()
self.waveform.emit({
"err": err,
"t": ts,
"ex": ex,
"ey": ey,
"ez": ez,
"main_e_component": self.meas.main_e_component,
})
def _safe_finish(self):
if self.meas is None:
return
try:
self._set_am(False)
self._set_rf(False)
try:
self.meas.mg.CmdDevices(False, "Standby")
except AttributeError:
pass
self.meas.mg.Quit_Devices()
self._devices_ready = False
except Exception as exc:
self.error.emit(f"Could not shut down devices cleanly: {exc}")
def _set_rf(self, state):
if self.meas is None or not self._devices_ready:
return False
try:
status = self.meas.rf_on() if state else self.meas.rf_off()
except Exception as exc:
self.error.emit(f"RF {'On' if state else 'Off'} failed: {exc}")
return False
if status is True:
self._rf_is_on = state
self.rf_state_changed.emit(state)
self.log.emit("RF On" if state else "RF Off", None)
return status
def _set_am(self, state):
if self.meas is None or not self._devices_ready:
return False
try:
status = self.meas.am_on() if state else self.meas.am_off()
except Exception as exc:
self.error.emit(f"AM {'On' if state else 'Off'} failed: {exc}")
return False
if status is True:
self._am_is_on = state
self.am_state_changed.emit(state)
self.log.emit("AM On" if state else "AM Off", None)
return status
def _should_stop(self):
with QMutexLocker(self._mutex):
return self._stop
def _wait_if_paused(self):
self._mutex.lock()
try:
while self._paused and not self._stop:
self._pause_cond.wait(self._mutex)
return not self._stop
finally:
self._mutex.unlock()
[docs]
@Slot()
def stop(self):
with QMutexLocker(self._mutex):
self._stop = True
self._paused = False
self._pause_cond.wakeAll()
self.rf_off()
[docs]
@Slot(result=bool)
def toggle_pause(self):
with QMutexLocker(self._mutex):
if self._stop:
return False
self._paused = not self._paused
paused = self._paused
if not paused:
self._pause_cond.wakeAll()
if paused:
self.rf_off()
else:
self.rf_on()
if self._am_is_on:
self.am_on()
return paused
[docs]
@Slot()
def rf_on(self):
self._set_rf(True)
[docs]
@Slot()
def rf_off(self):
self._set_rf(False)
[docs]
@Slot()
def am_on(self):
self._set_am(True)
[docs]
@Slot()
def am_off(self):
self._set_am(False)