# -*- coding: utf-8 -*-
"""mpylab.env.univers.AmplifierTest module."""
import re
import sys
import time
import numpy as np
from scipy.interpolate import interp1d
from scipy.optimize import minimize_scalar
from scuq import quantities, si
from mpylab.env.Measure import Measure, AmplifierProtectionError
from mpylab.env.univers.name_maps import coerce_amplifier_names
from mpylab.tools import util
from mpylab.tools.aunits import POWERRATIO
from mpylab.tools.mgraph import MGraph
extrap1d = util.extrap1d
# --- unit conversion helpers -------------------------------------------------
def dBm2W(v):
    """Convert a power level from dBm to Watt.

    :param v: input value(s) in dBm
    :type v: float or array-like
    :return: input value(s) converted to Watt
    :rtype: float for scalar input, array for array-like input
    """
    # Plain Python sequences do not support elementwise float arithmetic
    # (``[0, 30] * 0.1`` raises TypeError), so turn them into arrays first.
    # Scalars and numpy arrays are passed through untouched.
    if isinstance(v, (list, tuple)):
        v = np.asarray(v, dtype=float)
    return 10 ** (v * 0.1) * 0.001
def W2dBm(v):
    """Convert a power level from Watt to dBm.

    :param v: input value(s) in Watt
    :type v: float or array-like
    :return: input value(s) converted to dBm
    :rtype: float for scalar input, array for array-like input
    """
    # ``[p1, p2] * 1000`` would repeat the list 1000 times instead of scaling
    # it, so plain sequences are converted to arrays before the arithmetic.
    if isinstance(v, (list, tuple)):
        v = np.asarray(v, dtype=float)
    return 10 * np.log10(v * 1000)
class AmplifierTest(Measure):
    """Measure and evaluate the transfer characteristic of an RF amplifier.

    * :meth:`Measure` steps a signal generator through a list of levels for
      every requested frequency and stores the readings in ``self.rawData``.
    * :meth:`GetGainAndCompression` derives the small-signal gain and the
      1 dB / 3 dB compression points and stores them in
      ``self.processedData``.
    * :meth:`OutputIniFile` renders the processed data as an amplifier
      ini file.
    """

    # Raw data is keyed by ``repr(level)``, e.g. "Quantity(W, 1e-06)".  The
    # last comma separated token inside the parentheses is the numeric level.
    _SG_KEY_RE = re.compile(r'^Quantity\((?P<unit>.*),\s*(?P<level>[^,()\s]+)\)$')

    def __init__(self, SearchPaths=None):
        """Initialise the base class and create empty result containers.

        :param SearchPaths: forwarded unchanged to the ``Measure`` base class.
        """
        super().__init__(SearchPaths=SearchPaths)
        self.rawData = {}
        self.processedData = {}

    def _HandleUserInterrupt(self, dct, ignorelist=''):
        """Delegate user-interrupt handling to the shared base implementation.

        :param dct: dictionary of the caller's local variables
            (``Measure`` passes ``locals()``).
        :param ignorelist: forwarded unchanged to the common handler.
        :return: whatever ``_handle_user_interrupt_common`` returns.
        """
        return self._handle_user_interrupt_common(
            dct,
            ignorelist=ignorelist,
            set_level_cb=lambda mg, names, sg_level: self.setLevel(mg, names, sg_level),
            do_leveling_cb=lambda leveling, mg, names, scope: self.doLeveling(leveling, mg, names, scope),
            wait_handler=self._HandleUserInterrupt,
        )

    def Measure(self, description="AmplifierTest",
                dotfile='amplifier.dot',
                delay=0.2,
                freqs=None,
                levels=None,
                names=None,
                virtual=False):
        """Perform an amplifier test measurement.

        Parameters:

           - *description*: key to identify the measurement in the result dictionary
           - *dotfile*: forwarded to :class:`mpylab.tools.mgraph.MGraph` to create the measurement graph.
           - *delay*: time in seconds to wait after setting a level before pulling data from the instruments
           - *freqs*: sequence of frequencies in Hz to use for the measurements (``None`` means no frequencies).
           - *levels*: sequence of signal generator levels (quantities) applied at every frequency
             (``None`` means no levels).
           - *names*: dict with the mapping from internal names to dot-file names.
             The dict has to have keys 'sg', 'amp_in', 'amp_out', 'pm_fwd', 'pm_bwd' and 'output'.
           - *virtual*: if true, all devices are switched to virtual mode.

        Returns the error value of the device initialisation if that fails,
        otherwise the status returned by the device finalisation.

        Raises ``UserWarning`` if the operator answers the start prompt with
        "Quit" (the devices are finalised before the exception leaves).
        """
        # NOTE(review): the local variable names in this method are kept on
        # purpose -- ``locals()`` is handed to ``_HandleUserInterrupt`` and the
        # common handler may look variables up by name.
        # NOTE(review): statement nesting (what belongs to the frequency loop
        # and what follows the ``finally`` block) was reconstructed from the
        # comments in the code; confirm against the upstream source.
        names = coerce_amplifier_names(names)
        self.PreUserEvent()
        if self.autosave:
            self.messenger(util.tstamp() + " Resume amplifier test measurement from autosave...", [])
        else:
            self.messenger(util.tstamp() + " Start new amplifier test measurement...", [])

        self.rawData.setdefault(description, {})

        self.mg = mg = MGraph(dotfile, names, SearchPaths=self.SearchPaths)
        instrumentation = mg.CreateDevices()
        if virtual:
            mg.CmdDevices(False, 'SetVirtual', True)

        err = self._init_measurement_devices(mg, do_zero=False, do_rfoff=False)
        if err:
            return err

        try:
            if freqs is None:
                freqs = []
            if levels is None:
                # the default used to crash in ``enumerate(levels)``
                levels = []

            # set up pin, pout, ...
            sg_pout = {}
            amp_pin = {}
            amp_pout = {}
            if self.autosave:
                sg_pout = self.rawData[description]['sg_pout'].copy()
                amp_pin = self.rawData[description]['amp_pin'].copy()
                amp_pout = self.rawData[description]['amp_pout'].copy()
            self.autosave = False

            msg = (
                "\n"
                "Amplifier test measurement.\n"
                "Are you ready to start the measurement?\n"
                "Start: start measurement.\n"
                "Quit: quit measurement.\n"
            )
            but = ["Start", "Quit"]
            answer = self.messenger(msg, but)
            if answer == but.index('Quit'):
                self.messenger(util.tstamp() + " measurement terminated by user.", [])
                raise UserWarning  # to reach finally statement

            readlist = [mg.get_gname(dev) for dev in ('pm_fwd', 'pm_bwd')]

            # loop freqs
            for f in freqs:
                self.messenger(util.tstamp() + " Frequency %e Hz" % (f), [])
                mg.EvaluateConditions()
                # set frequency for all devices
                (minf, maxf) = mg.SetFreq_Devices(f)
                # cable corrections
                c_sg_amp = mg.get_path_correction(mg.name.sg, mg.name.amp_in, POWERRATIO)
                c_amp_out = mg.get_path_correction(mg.name.amp_out, mg.name.output, POWERRATIO)
                c_amp_pm1 = mg.get_path_correction(mg.name.amp_out, mg.name.pm_fwd, POWERRATIO)
                c_out_pm2 = mg.get_path_correction(mg.name.output, mg.name.pm_bwd, POWERRATIO)

                # ALL measurement start here
                block = {}
                nbresult = {}  # dict for NB-Read results

                # measurement..
                self.messenger(util.tstamp() + " Starting amplifier test measurement for f = %e Hz ..." % (f), [])
                mg.RFOn_Devices()
                for counter, lv in enumerate(levels):
                    maxSafe = mg.MaxSafeLevel(mg.name.sg, mg.name.amp_out)
                    if maxSafe:
                        self.messenger(util.tstamp() + " Maximum safe level: %s" % (maxSafe), [])
                    # NOTE(review): the protection verdict is queried but not
                    # acted upon; the level is applied even if it is reported
                    # as unsafe (the skip was disabled in the original code).
                    isSafe, msg = mg.AmplifierProtect(mg.name.sg, mg.name.amp_out, lv)

                    instrumentation.sg.SetLevel(lv)
                    time.sleep(delay)
                    mg.NBTrigger(readlist)
                    results = mg.Read(readlist)
                    pfwd = results[mg.name.pm_fwd]
                    pbwd = results[mg.name.pm_bwd]
                    pin = (lv * c_sg_amp).reduce_to(si.WATT)
                    pout = (pfwd / c_amp_pm1).reduce_to(si.WATT)
                    pgtem = (pout * c_amp_out).reduce_to(si.WATT)

                    # The per-level keys carry the level counter; the literal
                    # '..._%d' key used before made every level overwrite the
                    # previous one in the logger block.
                    self.__addLoggerEntry(block, 'sg_pout_%d' % counter,
                                          'Amplifier test sg level reading', lv, f)
                    self.__addLoggerEntry(block, 'pfwd_%d' % counter,
                                          'Fwd power meter reading reading', pfwd, f, lv)
                    self.__addLoggerEntry(block, 'pbwd_%d' % counter,
                                          'Bwd power meter reading reading', pbwd, f, lv)
                    self.__addLoggerEntry(block, 'c_sg_amp', 'Correction from sg to amp', c_sg_amp, f)
                    self.__addLoggerEntry(block, 'c_amp_pm1', 'Correction from amp to pm1', c_amp_pm1, f)
                    self.__addLoggerEntry(block, 'c_out_pm2', 'Correction from out to pm2', c_out_pm2, f)
                    self.__addLoggerEntry(block, 'c_amp_out', 'Correction from amp to out', c_amp_out, f)
                    self.__addLoggerEntry(block, 'amp_pin_%d' % counter,
                                          'Amplifier test input level reading', pin, f, lv)
                    self.__addLoggerEntry(block, 'amp_pout_%d' % counter,
                                          'Amplifier test output level reading', pout, f, lv)

                    sg_pout = self.__insert_it(sg_pout, lv, pfwd, pbwd, f, lv)
                    amp_pin = self.__insert_it(amp_pin, pin, pfwd, pbwd, f, lv)
                    amp_pout = self.__insert_it(amp_pout, pout, pfwd, pbwd, f, lv)

                mg.RFOff_Devices()
                self.messenger(util.tstamp() + " Amplifier test measurement done.", [])

                for log in self.logger:
                    log(block)

                self._HandleUserInterrupt(locals())
                # END OF f LOOP

            lowBatList = mg.getBatteryLow_Devices()
            if len(lowBatList):
                self.messenger(util.tstamp() + " WARNING: Low battery status detected for: %s" % (str(lowBatList)), [])
            self.rawData[description].update({'sg_pout': sg_pout})
            self.rawData[description].update({'amp_pin': amp_pin})
            self.rawData[description].update({'amp_pout': amp_pout})
            # autosave class instance
            if self.asname and (time.time() - self.lastautosave > self.autosave_interval):
                self.messenger(util.tstamp() + " autosave ...", [])
                self.do_autosave()
                self.messenger(util.tstamp() + " ... done", [])
            # Amplifier test finished
        finally:
            # executed with and without an exception -> safe exit
            stat = self._finalize_measurement_devices(mg, do_rfoff=True, do_quit=True)
        self.messenger(util.tstamp() + " End of Amplifier test Measurement. Status: %d" % stat, [])
        self.PostUserEvent()
        return stat

    def __insert_it(self, field, value, pf, pb, f, sglv, dct=None):
        """Append one reading to a nested result dictionary and return it.

        ``field[f][repr(sglv)]`` is a list of dicts of the form
        ``{'value': value, 'pfwd': pf, 'pbwd': pb}``; the entries of *dct*
        (if given) are merged into the new dict.

        :param field: nested dictionary, modified in place
        :param value: the value to store
        :param pf: forward power reading stored along with the value
        :param pb: backward power reading stored along with the value
        :param f: frequency, used as first-level key
        :param sglv: signal generator level; its ``repr`` is the second-level key
        :param dct: optional extra items for the new entry
        :return: *field*
        """
        entry = {'value': value, 'pfwd': pf, 'pbwd': pb}
        if dct is not None:
            entry.update(dct)
        field.setdefault(f, {}).setdefault(repr(sglv), []).append(entry)
        return field

    def __addLoggerBlock(self, parent, key, comment, val, parameter):
        """Store a logger block under ``parent[key]``.

        The stored block is
        ``{'comment': comment, 'value': val, 'parameter': parameter}``.
        *parent* must be a dict; *parameter* should be a dict of the same
        form as *parent* or an empty dict.
        """
        parent[key] = {
            'comment': comment,
            'value': val,
            'parameter': parameter,
        }

    def __addLoggerEntry(self, block, key, comment, val, freq, level=None):
        """Add a logger block together with its frequency / level parameters.

        Creates ``block[key]`` and attaches a 'freq' parameter block; a 'lv'
        parameter block is attached as well when *level* is given.
        """
        self.__addLoggerBlock(block, key, comment, val, {})
        parameter = block[key]['parameter']
        self.__addLoggerBlock(parameter, 'freq', 'the frequency [Hz]', freq, {})
        if level is not None:
            self.__addLoggerBlock(parameter, 'lv', 'sg level', level, {})

    def OutputIniFile(self, description=None, fname=None, driver="amplifier.py", gpib=1):
        """Write the processed data of one measurement as an amplifier ini file.

        Channel 1 holds the gain in dB, channel 2 the 1 dB input compression
        point in dBm; only the first evaluated result per frequency is used.

        :param description: key of the data set in ``self.processedData``
        :param fname: output file name; ``None`` writes to ``sys.stdout``
        :param driver: value of the DRIVER entry
        :param gpib: value of the GPIB entry
        :return: 0 if *description* has no processed data (nothing is
            written and no file is created), otherwise ``None``
        """
        pd = self.processedData
        if description not in pd:
            # return silently -- checked before the file is opened so that an
            # existing file is neither truncated nor left open
            return 0
        pdd = pd[description]
        gain = pdd['gain']
        ic1 = pdd['input_compression_1dB']
        freqs = sorted(gain.keys())

        header = (
            "[description]\n"
            "DESCRIPTION = %s\n"
            "TYPE = AMPLIFIER\n"
            "VENDOR =\n"
            "SERIALNR =\n"
            "DEVICEID =\n"
            "DRIVER = %s\n"
            "[INIT_VALUE]\n"
            "FSTART = %f\n"
            "FSTOP = %f\n"
            "FSTEP = 0.0\n"
            "NR_OF_CHANNELS = 2\n"
            "GPIB = %d\n"
            "VIRTUAL = 0\n"
        )
        gaintmpl = (
            "[CHANNEL_1]\n"
            "NAME = S21\n"
            "UNIT = dB\n"
            "INTERPOLATION = LOG\n"
            "FILE = io.StringIO(format_block('''\n"
            "FUNIT: Hz\n"
            "UNIT: dB\n"
            "ABSERROR: 0.0\n"
        )
        maxintmpl = (
            "[CHANNEL_2]\n"
            "NAME = MAXIN\n"
            "UNIT = dBm\n"
            "INTERPOLATION = LOG\n"
            "FILE = io.StringIO(format_block('''\n"
            "FUNIT: Hz\n"
            "UNIT: dBm\n"
            "ABSERROR: 0.0\n"
        )

        # Build the complete text first: if any value is missing the error is
        # raised before the target file is touched.
        parts = [header % (description, driver, freqs[0], freqs[-1], gpib), gaintmpl]
        for f in freqs:
            g = 10 * np.log10(gain[f][0].get_expectation_value_as_float())
            parts.append(' %f %f\n' % (f, g))
        parts.append(" '''))\n")
        parts.append(maxintmpl)
        for f in freqs:
            # 30 + 10*log10(P/W) is the level in dBm
            g = 30 + 10 * np.log10(ic1[f][0].get_expectation_value_as_float())
            parts.append(' %f %f\n' % (f, g))
        parts.append(" '''))\n")
        text = ''.join(parts)

        if fname is None:
            sys.stdout.write(text)
        else:
            with open(fname, 'w') as out:
                out.write(text)

    @classmethod
    def _sg_level_from_key(cls, key):
        """Return the numeric level encoded in a raw-data key.

        :param key: ``repr`` of a signal generator level, e.g. "Quantity(W, 1e-06)"
        :raises ValueError: if *key* does not have that form
        """
        match = cls._SG_KEY_RE.match(key)
        if match is None:
            raise ValueError("cannot parse signal generator level from key %r" % (key,))
        return float(match.group('level'))

    def GetGainAndCompression(self, description=None, small_signal_factor=10):
        """Evaluate gain and compression points from the raw data.

        For every frequency and every repetition index the input/output power
        pairs are ordered by signal generator level and passed to
        :meth:`_get_gain_compression`.  The results are stored in
        ``self.processedData[description]`` as a dict with the keys 'gain',
        'input_compression_1dB', 'input_compression_3dB',
        'output_compression_1dB' and 'output_compression_3dB'; each maps a
        frequency to the list of results (one per repetition).  Frequencies
        without any result are not included.

        :param description: key of the data set in ``self.rawData``
        :param small_signal_factor: forwarded to :meth:`_get_gain_compression`
        """
        rd = self.rawData[description]
        freqs = sorted(rd['amp_pin'])  # all frequencies in the raw data
        result_names = ('gain',
                        'input_compression_1dB', 'input_compression_3dB',
                        'output_compression_1dB', 'output_compression_3dB')
        pd = {name: {f: [] for f in freqs} for name in result_names}

        idx = 0  # repetition index within the per-level reading lists
        while True:
            processed_any = False
            for f in freqs:
                pin = rd['amp_pin'][f]
                pout = rd['amp_pout'][f]
                pinlst = []
                poutlst = []
                # Use the stored keys directly and only order them by level;
                # re-assembling the key from split pieces is fragile.
                for sgkey in sorted(pin, key=self._sg_level_from_key):
                    try:
                        pinlst.append(pin[sgkey][idx]['value'])
                        poutlst.append(pout[sgkey][idx]['value'])
                    except IndexError:
                        # no reading with this repetition index for this level
                        pass
                if pinlst and len(pinlst) == len(poutlst):  # list complete for this freq
                    gain, offset, pinc1, poutc1, pinc3, poutc3 = self._get_gain_compression(
                        pinlst, poutlst, small_signal_factor=small_signal_factor)
                    pd['gain'][f].append(gain)
                    pd['input_compression_1dB'][f].append(pinc1)
                    pd['input_compression_3dB'][f].append(pinc3)
                    pd['output_compression_1dB'][f].append(poutc1)
                    pd['output_compression_3dB'][f].append(poutc3)
                    processed_any = True
            if not processed_any:
                # idx is past the last repetition of every frequency
                break
            idx += 1

        # Drop frequencies that never produced a result.  Only empty entries
        # are removed, so results of the last frequency are kept.
        for per_freq in pd.values():
            for f in [f for f, vals in per_freq.items() if not vals]:
                del per_freq[f]

        self.processedData[description] = pd

    @staticmethod
    def _bounded_minimum(func, lower, upper):
        """Return the abscissa minimising *func* on ``[lower, upper]``.

        Uses the bounded scalar minimiser with an absolute x tolerance of
        1e-7 and at most 1000 iterations.
        """
        result = minimize_scalar(func,
                                 bounds=(lower, upper),
                                 method='bounded',
                                 options={'xatol': 1e-7, 'maxiter': 1000})
        # ``x`` is normally a scalar; tolerate a one-element array as well
        return float(np.ravel(result.x)[0])

    def _get_gain_compression(self, pin, pout, small_signal_factor=10):
        """Determine linear gain and compression points of one transfer curve.

        The linear (ideal) characteristic is a first-order fit through the
        small-signal points, i.e. the leading points whose input power does
        not exceed ``small_signal_factor`` times the first input power.  The
        compression points are the input powers where the ideal output is
        1.259 (1 dB) respectively 1.995 (3 dB) times the interpolated
        measured output.

        :param pin: input powers (quantities), ordered by increasing level
        :param pout: output powers (quantities) belonging to *pin*
        :param small_signal_factor: width of the small-signal range relative
            to the first input power
        :return: tuple ``(gain, offset, pinc1, poutc1, pinc3, poutc3)`` of
            quantities
        """
        in_unit = pin[0]._unit
        out_unit = pout[0]._unit
        assert (in_unit.get_dimension() == out_unit.get_dimension())

        pin_vals = [abs(p.get_expectation_value_as_float()) for p in pin]
        pout_vals = [abs(p.get_expectation_value_as_float()) for p in pout]
        pin_ss = [pi for pi in pin_vals if pi <= pin_vals[0] * small_signal_factor]
        pout_ss = pout_vals[:len(pin_ss)]

        # polyfit returns the coefficients in the data domain, lowest degree
        # first: [offset, gain]
        offset, gain = np.polynomial.polynomial.polyfit(pin_ss, pout_ss, 1)
        offset = float(offset)
        gain = float(gain)

        measured = extrap1d(interp1d(pin_vals, pout_vals))

        def ideal(pi):
            """Output power of the ideal linear amplifier."""
            return offset + gain * pi

        def orig(pi):
            """Measured (interpolated / extrapolated) output power as float."""
            return float(np.ravel(measured(pi))[0])

        def c1func(pi):
            """Distance from the 1 dB compression condition."""
            return abs(ideal(pi) - orig(pi) * 1.259)

        def c3func(pi):
            """Distance from the 3 dB compression condition."""
            return abs(ideal(pi) - orig(pi) * 1.995)

        upper = 10 * pin_vals[-1]
        lower = pin_ss[-1]
        for _ in range(100):
            pinc1 = self._bounded_minimum(c1func, lower, upper)
            if c1func(pinc1) > 0.01:
                # residual still too large: continue the search above the
                # point found so far
                lower = pinc1
            else:
                break
        pinc3 = self._bounded_minimum(c3func, pinc1, upper)

        poutc1 = orig(pinc1)
        poutc3 = orig(pinc3)

        # make quantities
        pinc1 = quantities.Quantity(in_unit, pinc1)
        pinc3 = quantities.Quantity(in_unit, pinc3)
        poutc1 = quantities.Quantity(out_unit, poutc1)
        poutc3 = quantities.Quantity(out_unit, poutc3)
        gain = quantities.Quantity(out_unit / in_unit, gain)
        offset = quantities.Quantity(out_unit, offset)
        return gain, offset, pinc1, poutc1, pinc3, poutc3
if __name__ == '__main__':
    # Example run: measure an amplifier described by 'gtem-immunity.dot' and
    # pickle the resulting AmplifierTest instance to 'at.p'.
    import pickle
    from numpy import linspace
    from scuq.quantities import Quantity
    from scuq.si import WATT

    dot = 'gtem-immunity.dot'
    # keys: names in program, values: names in graph
    names = {'sg': 'sg',
             'amp_in': 'amp_in',
             'amp_out': 'amp_out',
             'pm_fwd': 'pm1',
             'pm_bwd': 'pm2',
             'output': 'gtem'}

    AT = AmplifierTest()
    AT.set_logfile('at.log')
    AT.Measure(description="IFI SMX25 Band1",
               dotfile=dot,
               names=names,
               freqs=linspace(10e3, 200e6, 10),
               levels=[Quantity(WATT, dBm2W(dBmval)) for dBmval in linspace(-30, 0, 3)])
    # close the pickle file deterministically instead of leaking the handle
    with open('at.p', 'wb') as pickle_file:
        pickle.dump(AT, pickle_file, 2)