Source code for mpylab.env.Measure

# -*- coding: utf-8 -*-
"""This is :mod:`mpylab.env.Measure` with :class:`mpylab.env.Measure.Measure` being the base class for e.g. :class:`mpylab.env.msc.MSC.MSC`

   :author: Hans Georg Krauthäuser (main author)

   :license: GPL-3 or higher
"""

from typing import Any
import gzip
import inspect
import os
import pickle
import sys
import tempfile
import time

#try:
#    import mpylab.tools.unixcrt as crt
#except ImportError:
#    class CRT:
#        def unbuffer_stdin(self):
#            pass
#
#        def restore_stdin(self):
#            pass
#
#    crt = CRT()

from mpylab.tools import util, calling
from mpylab.env.ui.ui_adapter import TUIAdapter
from scuq.quantities import Quantity
from scuq.si import WATT

try:
    import pyttsx3
    _tts = pyttsx3.init()
    _tts.setProperty('volume', 1.0)
    vs = _tts.getProperty('voices')
    for v in vs:
        if 'en_GB' in v.languages:  # take first british speaker
            _tts.setProperty('voice', v.id)
            break
    #import festival
    #festival.execCommand("(voice_en1_mbrola)")
    #_tts = festival
    #_tts.say = _tts.sayText
    #def __runAndWait():
    #    pass
    #_tts.runAndWait = __runAndWait
except ImportError:
    #festival = None
    pyttsx3 = None
    _tts = None


[docs] class Measure(object): """Base class for measurements. """
[docs] def __init__(self, SearchPaths=None): """constructor""" if SearchPaths is None: SearchPaths = [os.getcwd()] self.SearchPaths = SearchPaths self.asname = None self.ascmd = None self.autosave = False self.autosave_interval = 3600 self.lastautosave = time.time() self.logger = [self.stdlogger] self.logfile = None self.logfilename = None self._setup_ui_adapter()
def _setup_ui_adapter(self): """Create/recreate UI adapter and bind legacy interaction hooks.""" self.ui = TUIAdapter( messenger=self.stdUserMessenger, logger=self.logger, interrupt_tester=self.stdUserInterruptTester, pre_user_event=self.stdPreUserEvent, post_user_event=self.stdPostUserEvent, interactive_runner=self.stdInteractiveSession, ) self.messenger = self.ui.ask self.UserInterruptTester = self.ui.check_interrupt self.PollKey = self.ui.poll_key self.PreUserEvent = self.ui.pre_user_event self.PostUserEvent = self.ui.post_user_event def __setstate__(self, dct): """used instead of __init__ when instance is created from pickle file""" if dct['logfilename'] is None: logfile = None else: logfile = open(dct['logfilename'], "a+") self.__dict__.update(dct) self.logfile = logfile self.logger = [self.stdlogger] self._setup_ui_adapter() def __getstate__(self): """prepare a dict for pickling""" odict = self.__dict__.copy() odict.pop('logfile', None) odict.pop('logger', None) odict.pop('messenger', None) odict.pop('UserInterruptTester', None) odict.pop('PollKey', None) odict.pop('PreUserEvent', None) odict.pop('PostUserEvent', None) odict.pop('ui', None) return odict @staticmethod def _invoke_wait_handler(handler, dct): """Call a wait/interrupt handler in a backward-compatible way. Supported styles: - ``handler(dct)`` (legacy flow handler) - ``handler()`` (poll-key style) """ if not callable(handler): return None try: sig = inspect.signature(handler) params = list(sig.parameters.values()) has_var_positional = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in params) positional = [ p for p in params if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD) ] if has_var_positional or positional: return handler(dct) return handler() except (TypeError, ValueError): try: return handler(dct) except TypeError: return handler()
[docs] def wait(self, delay, dct, uitester, intervall=0.1): """A wait function that can be interrupted. - *delay*: seconds to wait - *dct*: namespace passed to legacy handler style - *uitester*: interrupt callback; supports ``fn(dct)`` and ``fn()`` - *intervall*: seconds to sleep between callback calls Return: *None* """ start = time.time() delay = abs(delay) intervall = abs(intervall) while time.time() - start < delay: self._invoke_wait_handler(uitester, dct) time.sleep(intervall)
[docs] def out(self, item): """Helper function for all output functions. Prints *item* recursively all in one line. The parameter *item* can be: - a :class:`dict` of items (`hasattr(item, 'keys')==True`) - a :class:`list` of items (`hasattr(item, 'index')==True`) - a sequence of items (using :meth:`mpylab.tools.util.issequence`) - or anything else (will be printed via `print item,`) The return value is `None`. """ if hasattr(item, 'keys'): # a dict like object print("{", end=' ') for k in list(item.keys()): print(str(k) + ":", end=' ') self.out(item[k]) print("}", end=' ') elif hasattr(item, 'append'): # a list like object print("[", end=' ') for i in item: self.out(i) print("]", end=' ') elif util.issequence(item): # other sequence print("(", end=' ') for i in item: self.out(i) print(")", end=' ') else: print(item, end=' ')
[docs] def set_autosave_interval(self, interval): """Set the intervall between auto save. *intervall*: seconds between auto save This method returns `None`. """ self.autosave_interval = interval
[docs] def stdlogger(self, block, *args): """The standard method to write messages to log file. Print *block* to `self.logfile` or to `stdout` (if `self.logfile` is `None`). If *block* has attribute `keys` (i.e. is a :class:`dict`), the elements are processed with the local function :meth:`out_block`. Else, the block is printed directly. Parameter *block*: object to log Return value: `None` """ def out_block(b): """Helper function to log something. """ assert hasattr(b, 'keys'), "Argument b has to be a dict." try: print(repr(b['comment']), end=' ') except KeyError: pass try: par = b['parameter'] for des, p in par.items(): print(des, end=' ') out_block(p) try: item = b['value'] except KeyError: item = None self.out(item) except KeyError: pass sys.stdout.flush() stdout = sys.stdout # save stdout if self.logfile is not None: sys.stdout = self.logfile try: try: for des, bd in block.items(): print(util.tstamp(), des, end=' ') out_block(bd) print() # New Line except AttributeError: print(block) finally: sys.stdout = stdout # restore stdout
[docs] def stdUserMessenger(self, msg: str = "Are you ready?", but: list[str] | None = None, level: str = '', dct: dict[Any, Any] | None = None) -> int: """The standard (default) method to present messages to the user. The behaviour depends on the value of the parameter *but*. If `len(but)` (buttons are given) the funttions waits for a user answer. Else, the *msg* is presented only. The function also calls all additional logger functions given in `self.logger` with the same arguments. Parameters: - *msg*: message to display - *but*: sequence with the text strings of the buttons - *level*: to indicate something (not used in the standard logger) - *dct*: a :class:`dict` with further parameters (not used in the standard logger) Return value: the index of the selected button (starting from `0`), or `-1` if `len(but)` is `False`. """ if but is None: but = ["Ok", "Quit"] if dct is None: dct = {} print(msg) self.ui.emit_log(msg, but, level, dct) if level in ('email',): try: util.send_email(to=dct['to'], fr=dct['from'], subj=dct['subject'], msg=msg) except (NameError, KeyError): util.LogError(self.messenger) if len(but): # button(s) are given -> wait if _tts: _tts.say(msg) _tts.runAndWait() self.PreUserEvent() try: while True: key = chr(util.keypress()) key = key.lower() for s in but: if s.lower().startswith(key): if _tts: _tts.say(s) # , pyTTS.tts_purge_before_speak) _tts.runAndWait() return but.index(s) finally: self.PostUserEvent() else: return -1
[docs] @staticmethod def stdUserInterruptTester() -> int | None: """The standard (default) user interrupt tester. Returns return value of :meth:`mpylab.util.anykeyevent()` """ return util.anykeyevent()
[docs] def set_logfile(self, name): """Tries to open a file with the given name with mode `'a+'`. If this fails, nothing will happen, else :meth:`stdloogger` will log to this file. Parameter *name*: full qualified name of the file to be used as logfile Return: `None` """ import pathvalidate # import unicodedata # import string # import re # validFilenameChars = "-_.() %s%s" % (string.ascii_letters, string.digits) # # def slugify(value, allow_unicode=False): # """ # Taken from https://github.com/django/django/blob/master/django/utils/text.py # Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated # dashes to single dashes. Remove characters that aren't alphanumerics, # underscores, or hyphens. Convert to lowercase. Also strip leading and # trailing whitespace, dashes, and underscores. # """ # value = str(value) # if allow_unicode: # value = unicodedata.normalize('NFKC', value) # else: # value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii') # value = re.sub(r'[^\w\s-]', '', value.lower()) # return re.sub(r'[-\s]+', '-', value).strip('-_') # # def removeDisallowedFilenameChars(filename): # try: # cleanedFilename = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore') # except TypeError: # cleanedFilename = unicodedata.normalize('NFKD', str(filename)).encode('ASCII', 'ignore') # return ''.join(c for c in cleanedFilename if c in validFilenameChars) log = None # name = removeDisallowedFilenameChars(name) # name = slugify(name) name = pathvalidate.sanitize_filename(name) try: log = open(name, "a+") except IOError: util.LogError(self.messenger) else: if self.logfile is not None: try: self.logfile.close() except IOError: util.LogError(self.messenger) self.logfilename = name self.logfile = log
[docs] def set_logger(self, logger=None): """Set up the list of logger fuctions (`self.logger`). If `logger is None`, :meth:`stdlogger` is used. Parameter *logger*: list of functions called to log events Return: *None* """ if logger is None: logger = [self.stdlogger] logger = util.flatten(logger) # ensure flat list self.logger = [l for l in logger if callable(l)] self.ui.set_logger(self.logger)
[docs] def set_messenger(self, messenger): """Set function to present messages. Parameter *messenger*: the messenger (see :meth:`stdUserMessenger`) Return: *None* """ if callable(messenger): self.ui.set_messenger(messenger) self.messenger = self.ui.ask
[docs] def set_user_interrupt_tester(self, tester): """Set function to test for user interrupt. Parameter *tester*: callable (see :meth:`stdUserInterruptTester`). Return: *None* """ if callable(tester): self.ui.set_interrupt_tester(tester) self.UserInterruptTester = self.ui.check_interrupt self.PollKey = self.ui.poll_key
[docs] def set_user_interrupt_Tester(self, tester): """Backward-compatible alias for :meth:`set_user_interrupt_tester`.""" self.set_user_interrupt_tester(tester)
[docs] def set_pre_user_event(self, event_cb): """Set function called before user-facing UI interactions.""" if callable(event_cb): self.ui.set_pre_user_event(event_cb) self.PreUserEvent = self.ui.pre_user_event
[docs] def set_post_user_event(self, event_cb): """Set function called after user-facing UI interactions.""" if callable(event_cb): self.ui.set_post_user_event(event_cb) self.PostUserEvent = self.ui.post_user_event
[docs] def set_interactive_runner(self, runner): """Set function used for interactive user sessions.""" if callable(runner): self.ui.set_interactive_runner(runner)
[docs] def set_autosave(self, name): """Setter for the class attribute *asname* (name of the auto save file). Parameter *name*: file name oif the auto save file Return: *None* """ self.asname = name
def _init_measurement_devices(self, mg, do_zero=False, do_rfoff=False): """Initialize measurement devices with optional safe defaults. Returns: int: status from init path (`0` means success). """ self.messenger(util.tstamp() + " Init devices...", []) err = mg.Init_Devices() if err: self.messenger(util.tstamp() + " ...faild with err %d" % (err), []) return err self.messenger(util.tstamp() + " ...done", []) if do_rfoff: mg.RFOff_Devices() if do_zero: self.messenger(util.tstamp() + " Zero devices...", []) mg.Zero_Devices() self.messenger(util.tstamp() + " ...done", []) return 0 def _finalize_measurement_devices(self, mg, do_rfoff=True, do_quit=True): """Finalize measurement devices in a fail-safe way. Returns: int: last device status seen (`0` by default). """ stat = 0 if mg is None: return stat if do_rfoff: self.messenger(util.tstamp() + " RF Off...", []) try: stat = mg.RFOff_Devices() except Exception: util.LogError(self.messenger) if do_quit: self.messenger(util.tstamp() + " Quit...", []) try: stat = mg.Quit_Devices() except Exception: util.LogError(self.messenger) return stat def _handle_user_interrupt_common( self, dct, ignorelist='', set_level_cb=None, do_leveling_cb=None, wait_handler=None, on_resume_cb=None, ): """Shared interrupt/suspend/resume flow used by measurement classes. Returns: bool: `True` if an interrupt was handled, else `False`. """ key = self.UserInterruptTester() if not key or chr(key) in ignorelist: return False # Empty key buffer. _k = self.UserInterruptTester() while _k is not None: _k = self.UserInterruptTester() mg = dct['mg'] names = dct.get('names', {}) f = dct.get('f') SGLevel = dct.get('SGLevel') leveling = dct.get('leveling') hassg = (SGLevel is not None and leveling is not None) delay = dct.get('delay', 0) nblist = dct.get('nblist', []) self.messenger(util.tstamp() + " RF Off...", []) mg.RFOff_Devices() msg1 = ( "The measurement has been interrupted by the user.\n" "How do you want to proceed?\n\n" "Continue: go ahead...\n" "Suspend: Quit devices, go ahead later after reinit...\n" "Interactive: Go to interactive mode...\n" "Quit: Quit measurement..." ) but1 = ['Continue', 'Suspend', 'Interactive', 'Quit'] answer = self.messenger(msg1, but1) if answer == but1.index('Quit'): self.messenger(util.tstamp() + " measurment terminated by user.", []) raise UserWarning if answer == but1.index('Interactive'): self.ui.run_interactive( self, "Press CTRL-D (Linux,MacOS) or CTRL-Z (Windows) plus Return to exit" ) elif answer == but1.index('Suspend'): self.messenger(util.tstamp() + " measurment suspended by user.", []) mg.Quit_Devices() msg2 = "Measurement is suspended.\n\nResume: Reinit and continue\nQuit: Quit measurement..." but2 = ['Resume', 'Quit'] answer = self.messenger(msg2, but2) if answer == but2.index('Resume'): self._init_measurement_devices(mg, do_zero=True, do_rfoff=True) if hassg and callable(set_level_cb): try: set_level_cb(mg, names, SGLevel) except AmplifierProtectionError as _e: self.messenger( util.tstamp() + " Can not set signal generator level. Amplifier protection raised with message: %s" % _e.message, [], ) if f is not None: mg.SetFreq_Devices(f) mg.EvaluateConditions() if callable(on_resume_cb): on_resume_cb(mg, names, dct) else: self.messenger(util.tstamp() + " measurment terminated by user.", []) raise UserWarning self.messenger(util.tstamp() + " RF On...", []) mg.RFOn_Devices() if hassg and callable(do_leveling_cb): do_leveling_cb(leveling, mg, names, dct) if wait_handler is None: wait_handler = self._handle_user_interrupt_common try: self.messenger(util.tstamp() + " Going to sleep for %d seconds ..." % (delay), []) self.wait(delay, dct, wait_handler) self.messenger(util.tstamp() + " ... back.", []) except Exception: pass mg.NBTrigger(nblist) return True
[docs] def do_autosave(self, name_or_obj=None, depth=None, prefixes=None): """Serialize *self* using :mod:`pickle`. Assuming a calling sequence like so:: script -> method of measurement class -> do_autosave `depth = 1` (default) will set *self.ascmd* to the command issued in the script. If depth is too large, the outermost command is used. Thus, the issued command in *script* is extracted and saved in *self.ascmd*. This can be used to redo the command after a crash. Return: *None* """ if depth is None: depth = 1 if name_or_obj is None: name_or_obj = getattr(self, 'asname', None) # we want to save the cmd that has been used # (in order to get all the calling parameters) try: self.autosave = True # mark the state calling_sequence = calling.get_calling_sequence(prefixes=prefixes) calling_sequence = [cs for cs in calling_sequence if cs != '<string>'] # print calling_sequence try: ascmd = calling_sequence[depth] except IndexError: ascmd = calling_sequence[-1] if ascmd.startswith('exec'): # print self.ascmd ascmd = ascmd[ascmd.index('(') + 1: ascmd.rindex(')')].strip() # part between brackets var = util.get_var_from_nearest_outerframe(ascmd) if var: ascmd = var self.ascmd = ascmd # print "Measure.py; 363:", self.ascmd # now, we can serialize 'self' pfile = None if isinstance(name_or_obj, str): # it's a string (filename) try: if name_or_obj.endswith(('.gz', '.zip')): # gzip pfile = gzip.open(self.asname, "wb") else: pfile = open(self.asname, "wb") # regular pickle except IOError: util.LogError(self.messenger) elif hasattr(name_or_obj, 'write'): # file-like object pfile = name_or_obj if pfile is None: fd, fname = tempfile.mkstemp(suffix='.p', prefix='autosave', dir='.', text=False) pfile = os.fdopen(fd, 'wb') # print pfile, type(pfile) try: try: pickle.dump(self, pfile, 2) self.lastautosave = time.time() except IOError: util.LogError(self.messenger) finally: try: pfile.close() except IOError: util.LogError(self.messenger) finally: self.autosave = False
# print self.ascmd def _run_with_output_target(self, fname, fn, *args, **kwargs): """Run a printer function with optional stdout redirection to file.""" stdout = sys.stdout fp = None if fname: fp = open(fname, "w") sys.stdout = fp try: return fn(*args, **kwargs) finally: try: if fp is not None: fp.close() except IOError: util.LogError(self.messenger) sys.stdout = stdout
[docs] @staticmethod def stdPreUserEvent(): #"""Just calls :meth:`mpylab.tools.unixcrt.unbuffer_stdin()`. # See there... #""" #crt.unbuffer_stdin() """stdPreUserEvent method.""" pass
[docs] @staticmethod def stdPostUserEvent(): #"""Just calls :meth:`mpylab.tools.unixcrt.restore_stdin()` # See there... #""" #crt.restore_stdin() """stdPostUserEvent method.""" pass
[docs] @staticmethod def stdInteractiveSession(obj, banner): """Default interactive session hook for terminal usage.""" util.interactive(obj=obj, banner=banner)
# def do_leveling(self, leveling, mg, names, dct): # """Perform leveling on the measurement graph. # - *leveling*: sequence of dicts with leveling records. Each record is a dict with keys # 'conditions', 'actor', 'watch', 'nominal', 'reader', 'path', 'actor_min', and 'actor_max'. # The meaning is: # - condition: has to be True in order that this lewveling takes place. The condition is evaluated in the global namespace and in C{dct}. # - actor: at the moment, this can only be a signalgenerator 'sg' # - watch: the point in the graph to be monitored (e.g. antena input) # - nominal: the desired value at watch # - reader: the device reading the value for watch (e.g. forward poer meter) # - path: Path between reader and watch # - actor_min, actor_max: valid range for actor values # - *mg*: the measurement graph # - *names*: mapping between symbolic names and real names in the dot file # - *dct*: namespace used for the evaluation of *condition* # Return: the level set at the actor # """ # for l in leveling: # if eval(l['condition'], globals(), dct): # actor = l['actor'] # watch = l['watch'] # nominal = l['nominal'] # reader = l['reader'] # path = l['path'] # ac_min = l['actor_min'] # ac_max = l['actor_max'] # if actor not in ['sg']: # self.messenger(util.tstamp()+" Only signal generator can be used as leveling actor.", []) # break # for dev in [watch, reader]: # if dev not in names: # self.messenger(util.tstamp()+" Device '%s' not found"%dev, []) # break # c_level = device.UMDCMResult(complex(0.0,mg.zero(umddevice.UMD_dB)),umddevice.UMD_dB) # for cpath in path: # if mg.find_shortest_path(names[cpath[0]],names[cpath[-1]]): # c_level *= mg.get_path_correction(names[cpath[0]],names[cpath[-1]], umddevice.UMD_dB)['total'] # elif mg.find_shortest_path(names[cpath[-1]],names[cpath[0]]): # c_level /= mg.get_path_correction(names[cpath[-1]],names[cpath[0]], umddevice.UMD_dB)['total'] # else: # self.messenger(util.tstamp()+" can't find path from %s tp %s (looked for both directions)."%(cpath[0],cpath[-1]), []) # break # if ac_min == ac_max: # return self.set_level(mg, names, ac_min) # def __objective (x, mg=mg): # self.set_level(mg, names, x) # actual = mg.Read([names[reader]])[names[reader]] # actual = device.UMDCMResult(actual) # cond, a, n = self.__test_leveling_condition(actual, nominal, c_level) # return a-n # l = util.secant_solve(__objective, ac_min, ac_max, nominal.get_u()-nominal.get_v(), 0.1) # return self.set_level(mg, names, l) # #break # only first true condition ie evaluated # return None
[docs] def set_level(self, mg, l, leveler=None): """ """ sg = mg.instrumentation[mg.name.sg] # l is in dBm -> convert to WATT l = Quantity(WATT, 10 ** (0.1 * l) * 0.001) if leveler is None: # try to use instance leveler try: leveler = self.leveler_inst # (**self.leveler_par) except AttributeError: pass # stay with None if leveler: # use MaxSafe l = min(l, leveler.MaxSafe) err, lv = sg.SetLevel(l) # is_save, message = mg.AmplifierProtect (names['sg'], names['a2'], l, sg_unit, typ='lasy') # if not is_save: # raise AmplifierProtectionError, message self.messenger(util.tstamp() + " Signal Generator set to %s" % (lv), []) return lv
# --- Backward compatible legacy aliases ---------------------------------
[docs] def setLevel(self, mg, level_or_names, level_or_leveler=None): """Backward-compatible wrapper for legacy callers. Supported call shapes: - ``setLevel(mg, level_dBm)`` - ``setLevel(mg, level_dBm, leveler)`` - ``setLevel(mg, names_dict, level_dBm)`` (legacy TEM/Univers code) """ if isinstance(level_or_names, dict): # Legacy signature: (mg, names, level_dBm) return self.set_level(mg, level_or_leveler, leveler=None) return self.set_level(mg, level_or_names, leveler=level_or_leveler)
[docs] def doLeveling(self, leveling, mg, names, dct): """Backward-compatible no-op stub for removed legacy leveling API. The legacy callers expect this method to exist and to return either a new level or ``None``. Current code path keeps behavior by returning ``None``. """ _ = (leveling, mg, names, dct) return None
# def __test_leveling_condition(self, actual, nominal, c_level): # cond = True # actual = util.flatten(actual) # ensure lists # nominal= util.flatten(nominal) # for ac,nom in zip(actual,nominal): # ac *= c_level # if hasattr(nom.get_v(), 'mag'): # a complex # nom = nom.mag() # ac = ac.mag() # ac = ac.convert(nominal.unit) # cond &= (nom.get_l() <= ac.get_v() <= nom.get_u()) # return cond, actual.get_v(), nominal.get_v()
[docs] def make_deslist(self, thedata, description): """make_deslist method.""" if description is None: description = list(thedata.keys()) if util.issequence(description): # a sequence deslist = [des for des in description if des in thedata] else: if description in thedata: deslist = [description] else: deslist = [] return deslist
[docs] def MakeDeslist(self, thedata, description): """Backward-compatible wrapper around :meth:`make_deslist`.""" return self.make_deslist(thedata, description)
[docs] def make_whatlist(self, thedata, what): """make_whatlist method.""" allwhat_withdupes = util.flatten([list(v.keys()) for v in thedata.values()]) allwhat = list(set(allwhat_withdupes)) if what is None: whatlist = allwhat else: whatlist = [] what = util.flatten(what) whatlist = [w for w in what if w in allwhat] return whatlist
[docs] def MakeWhatlist(self, thedata, what): """Backward-compatible wrapper around :meth:`make_whatlist`.""" return self.make_whatlist(thedata, what)
[docs] @staticmethod def stdEutStatusChecker(status): """stdEutStatusChecker method.""" return status in ['ok', 'OK']
[docs] @staticmethod def std_eut_status_checker(status): """Backward-compatible alias for :meth:`stdEutStatusChecker`.""" return Measure.stdEutStatusChecker(status)
class Error(Exception): """Base class for all exceptions of this module """ pass class AmplifierProtectionError(Error): """AmplifierProtectionError class.""" def __init__(self, message): self.message = message