From 5d40a130d5a16e15997aeddf307d85cc7280de1b Mon Sep 17 00:00:00 2001 From: Gaurav Agarwal Date: Thu, 18 Jun 2026 18:03:56 -0400 Subject: [PATCH 1/3] Added drivers for minicircuits switches, QICK RFBoard, and a less fragile VNA driver (does not crash with multi traces) --- .../Keysight/Keysight_P9374A_GA.py | 563 +++++++++++++ .../MiniCircuits_SwitchMatrix_RC-8SPDT-A18.py | 202 +++++ .../qcodes_drivers/QICK/QickSoC_RFBoard.py | 788 ++++++++++++++++++ 3 files changed, 1553 insertions(+) create mode 100644 src/cqedtoolbox/instruments/qcodes_drivers/Keysight/Keysight_P9374A_GA.py create mode 100644 src/cqedtoolbox/instruments/qcodes_drivers/MiniCircuits/MiniCircuits_SwitchMatrix_RC-8SPDT-A18.py create mode 100644 src/cqedtoolbox/instruments/qcodes_drivers/QICK/QickSoC_RFBoard.py diff --git a/src/cqedtoolbox/instruments/qcodes_drivers/Keysight/Keysight_P9374A_GA.py b/src/cqedtoolbox/instruments/qcodes_drivers/Keysight/Keysight_P9374A_GA.py new file mode 100644 index 0000000..17949e6 --- /dev/null +++ b/src/cqedtoolbox/instruments/qcodes_drivers/Keysight/Keysight_P9374A_GA.py @@ -0,0 +1,563 @@ +# -*- coding: utf-8 -*- +""" +A driver to control the Keysight VNA P9374A using pyVISA and qcodes +@author: Hatlab: Ryan Kaufman, Gaurav Agarwal; UIUC: Wolfgang Pfaff + +""" + +import logging +from typing import Any, Union, Dict, List, Tuple + +import numpy as np +from qcodes import (VisaInstrument, Parameter, ParameterWithSetpoints, InstrumentChannel, validators as vals) +from qcodes.instrument.parameter import ParamRawDataType +import time +""" +Some basic concepts for this VNA: + +Measurement: + The VNA object that actually holds the information of measurement of the device. It will not automatically + show up on the VNA software interface when created. +Trace: + The VNA object that can be shown on the VNA software, needs to link to a certain measurement. + As a working definition in our setting, we can think the trace and measurement are equivalent as there are always + created together. +Channel: + One channel can hold many traces. So that the common settings (frequency range, number of points, etc...) + can be applied to all the measurement. In this driver we are only use one channel. More needs to do if we want to + support multiple channel functions. +Window: + Place to show the selected traces on the VNA software. + +The settings that are independent to each trace/measurement (S-parameters, trace data) will be method/parameters for +Trace object while common settings for the whole channel will be method/parameters for the Keysight_P9374A_SingleChannel +object. + +For example: + vna.trace_1.s_parameter() + get the S-parameter for the first trace/measurement. + + vna.fstart() + get the start frequency for the whole channel. + +""" + + +class SParameterData(Parameter): + """ + Qcodes parameter that can be used to set/get the S-parameter of this measurement. + """ + + def __init__(self, trace_number: int, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.trace_number = trace_number + + def get_raw(self) -> ParamRawDataType: + _, traces, _ = self.root_instrument.get_existing_traces() + if self.instrument.npts() == 0 or self.trace_number not in traces: + return 'Trace is not on' + + data = self.root_instrument.ask(f":CALC1:MEAS{self.trace_number}:PAR?").strip('"') + return data + + def set_raw(self, S_parameter: str = 'S21') -> None: + _, traces, _ = self.root_instrument.get_existing_traces() + if self.instrument.npts() == 0 or self.trace_number not in traces: + return 'Trace is not on' + + self.root_instrument.write(f":CALC1:MEAS{self.trace_number}:PAR {S_parameter}") + + +class FrequencyData(Parameter): + """ + Qcodes parameter that can be used to get the frequency data of this measurment. + """ + + def __init__(self, trace_number: int, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.trace_number = trace_number + + def get_raw(self) -> ParamRawDataType: + _, traces, _ = self.root_instrument.get_existing_traces() + if self.instrument.npts() == 0 or self.trace_number not in traces: + return np.array([]) + + data = self.root_instrument.ask(f"CALC:MEAS{self.trace_number}:X?") + return np.array(data.split(',')).astype(float) + + +class TraceData(ParameterWithSetpoints): + """ + Qcodes ParameterWithSetpoints that can be used to get the data of this measurement. + VNA will be set to polar format "POL" to acquire data by default. + If the VNA is in a different measurement format, it will be reset to that format after measurement, other wise VNA + will remain in format of self.data_fmt after the measurement. + """ + + def __init__(self, trace_number: int, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.trace_number = trace_number + self.data_fmt = "POL" + + def get_raw(self) -> ParamRawDataType: + _, traces, _ = self.root_instrument.get_existing_traces() + if self.instrument.npts() == 0 or self.trace_number not in traces: + return np.array([]) + + # get the values of relevant parameters before taking the trace + prev_fmt = None + if self.data_fmt is not None: + prev_fmt = self.root_instrument.ask(f"CALC:MEAS{self.trace_number}:FORM?") + self.root_instrument.write(f"CALC:MEAS{self.trace_number}:FORM {self.data_fmt}") + + # Code will check if VNA average trpe is SWEEP. + # If not a type error will be raised. + try: + prev_trigger_source, prev_sweep_mode, prev_averaging = self.root_instrument.average() + data = self.root_instrument.ask(f"CALC:MEAS{self.trace_number}:DATA:FDATA?") + # set relevant parameters back to their old values + self.root_instrument.trigger_source(prev_trigger_source) + self.root_instrument.sweep_mode(prev_sweep_mode) + self.root_instrument.averaging(prev_averaging) + if prev_fmt is not None: + self.root_instrument.write(f"CALC:MEAS{self.trace_number}:FORM {prev_fmt}") + + # process complex data correctly + data = np.array(data.split(',')).astype(float) + if self.data_fmt in ['POL'] and data.size % 2 == 0: + data = data.reshape((int(data.size / 2), 2)) + data = data[:, 0] + 1j * data[:, 1] + return data + + except Exception as e: + print(f"Data taking failed. {type(e)}: {e.args}") + return np.zeros(self.instrument.npts()) + + +class Trace(InstrumentChannel): + """ + A Qcode InstrumentChannel creates from the parameter instrument: Keysight_P9374A_SingleChannel. + + This InstrumentChannel contains all the parameters and functions that are unique for each different + trace/measurement. + """ + + def __init__(self, parent: "Keysight_P9374A_SingleChannel", number: int, name: str, **kwargs: Any): + self._number = number + super().__init__(parent, name=name, **kwargs) + + self.add_parameter( + name='npts', + unit='', + get_cmd=self._get_npts, + docstring='number of points in the trace', + ) + + self.add_parameter( + name='frequency', + unit='Hz', + parameter_class=FrequencyData, + trace_number=self._number, + vals=vals.Arrays(shape=(self.npts.get_latest,)), + snapshot_exclude=True, + ) + + self.add_parameter( + name='data', + unit='', + setpoints=(self.frequency,), + parameter_class=TraceData, + trace_number=self._number, + vals=vals.Arrays(shape=(self.npts.get_latest,), + valid_types=(np.floating, np.complexfloating)), + snapshot_exclude=True, + ) + + self.add_parameter( + name='s_parameter', + unit='', + parameter_class=SParameterData, + trace_number=self._number, + vals=vals.Enum('S11', 'S12', 'S21', 'S22'), + get_parser=str + ) + + def _get_npts(self): + return len(self._get_xdata()) + + def _get_xdata(self) -> np.ndarray: + _, traces, _ = self.root_instrument.get_existing_traces() + if self._number not in traces: + return np.array([]) + data = self.ask(f"CALC:MEAS{self._number}:X?") + return np.array(data.split(',')).astype(float) + + +class Keysight_P9374A_SingleChannel(VisaInstrument): + """ + This is a very simple driver for the Keysight_P9374A Vector Network Analyzer + Performs basic manipulations of parameters and data acquisition + + Note: this version does not include a way of averaging via a BUS trigger + + """ + + def __init__(self, name, address=None, **kwargs): + + """ + Initializes the Keysight_P9374A, and communicates with the wrapper. + + Input: + name (string) : name of the instrument + address (string) : GPIB address + reset (bool) : resets to default values, default=False + """ + if address is None: + raise Exception('TCP IP address needed') + logging.info(__name__ + ' : Initializing instrument $@$&% PNA') + + super().__init__(name, address, terminator='\n', **kwargs) + + self.write('CALC1:PAR:MNUM 1') # sets the active msmt to the first channel/trace + self.del_time = 0.2 # From Hat_P9374A_RK + self.last_msmt_msg = None + # Add in parameters + self.add_parameter('fstart', + get_cmd=':SENS1:FREQ:STAR?', + set_cmd=':SENS1:FREQ:STAR {}', + vals=vals.Numbers(), + get_parser=float, + unit='Hz' + ) + self.add_parameter('fstop', + get_cmd=':SENS1:FREQ:STOP?', + set_cmd=':SENS1:FREQ:STOP {}', + vals=vals.Numbers(), + get_parser=float, + unit='Hz' + ) + self.add_parameter('fcenter', + get_cmd=':SENS1:FREQ:CENT?', + set_cmd=':SENS1:FREQ:CENT {}', + vals=vals.Numbers(), + get_parser=float, + unit='Hz' + ) + self.add_parameter('fspan', + get_cmd=':SENS1:FREQ:SPAN?', + set_cmd=':SENS1:FREQ:SPAN {}', + vals=vals.Numbers(), + get_parser=float, + unit='Hz' + ) + self.add_parameter('rfout', + get_cmd=':OUTP?', + set_cmd=':OUTP {}', + vals=vals.Ints(0, 1), + get_parser=int + ) + self.add_parameter('num_points', + get_cmd=':SENS1:SWE:POIN?', + set_cmd=':SENS1:SWE:POIN {}', + vals=vals.Ints(1, 100000), + get_parser=int + ) + self.add_parameter('ifbw', + get_cmd=':SENS1:BWID?', + set_cmd=':SENS1:BWID {}', + vals=vals.Numbers(10, 1.5e6), + get_parser=float) + self.add_parameter('power', + get_cmd=":SOUR1:POW?", + set_cmd=":SOUR1:POW {}", + unit='dBm', + get_parser=float, + vals=vals.Numbers(-85, 20) + ) + self.add_parameter('power_start', + get_cmd=':SOUR1:POW:STAR?', + set_cmd=':SOUR1:POW:STAR {}', + unit='dBm', + get_parser=float, + vals=vals.Numbers(-85, 10) + ) + self.add_parameter('power_stop', + get_cmd=':SOUR:POW:STOP?', + set_cmd=':SOUR1:POW:STOP {}', + unit='dBm', + get_parser=float, + vals=vals.Numbers(-85, 10)) + self.add_parameter('averaging', + get_cmd=':SENS1:AVER?', + set_cmd=':SENS1:AVER {}', + get_parser=int, + vals=vals.Ints(0, 1) + ) + # TODO: this throws an error currently. + # self.add_parameter('average_trigger', + # get_cmd=':TRIG:AVER?', + # set_cmd=':TRIG:AVER {}', + # get_parser=int, + # vals=vals.Ints(0, 1) + # ) + + self.add_parameter('avg_num', + get_cmd='SENS1:AVER:COUN?', + set_cmd='SENS1:AVER:COUN {}', + vals=vals.Ints(1), + get_parser=int + ) + self.add_parameter('avg_type', + get_cmd='SENS1:AVER:MODE?', + set_cmd='SENS1:AVER:MODE {}', + vals=vals.Enum('POIN', 'SWEEP'), + get_parser=str + ) + self.add_parameter('phase_offset', + get_cmd='CALC1:CORR:OFFS:PHAS?', + set_cmd='CALC1:CORR:OFFS:PHAS {}', + get_parser=float, + vals=vals.Numbers()) + self.add_parameter('electrical_delay', + get_cmd='CALC1:CORR:EDEL:TIME?', + set_cmd='CALC1:CORR:EDEL:TIME {}', + unit='s', + get_parser=float, + vals=vals.Numbers() + ) + self.add_parameter('trigger_source', + get_cmd='TRIG:SOUR?', + set_cmd='TRIG:SOUR {}', + vals=vals.Enum('IMM', 'EXT', 'MAN') + ) + self.add_parameter('sweep_mode', + get_cmd='SENS1:SWE:MODE?', + set_cmd='SENS1:SWE:MODE {}', + vals=vals.Enum('HOLD', + 'CONT', + 'GRO', + 'SING')) + self.add_parameter('trigger_mode', + get_cmd='SENS:SWE:TRIG:MODE?', + set_cmd='SENS:SWE:TRIG:MODE {}', + vals=vals.Enum('CHAN', 'SWE', 'POIN', 'TRAC') + ) + self.add_parameter('trform', + get_cmd=':CALC1:FORM?', + set_cmd=':CALC1:FORM {}', + vals=vals.Enum('MLOG', 'PHAS', + 'GDEL', + 'SCOM', 'SMIT', 'SADM', + 'POL', 'MLIN', + 'SWR', 'REAL', 'IMAG', + 'UPH', 'PPH', 'SLIN', 'SLOG', ) + ) + self.add_parameter('math', + get_cmd=':CALC1:MATH:FUNC?', + set_cmd=':CALC1:MATH:FUNC {}', + vals=vals.Enum('ADD', 'SUBT', 'DIV', 'MULT', 'NORM') + ) + self.add_parameter('sweep_type', + get_cmd=':SENS1:SWE:TYPE?', + set_cmd=':SENS1:SWE:TYPE {}', + vals=vals.Enum('LIN', 'LOG', 'SEGM', 'POW') + ) + self.add_parameter('correction', + get_cmd=':SENS1:CORR:STAT?', + set_cmd=':SENS1:CORR:STAT {}', + get_parser=int) + self.add_parameter('smoothing', + get_cmd=':CALC1:SMO:STAT?', + set_cmd=':CALC1:SMO:STAT {}', + get_parser=float + ) + self.add_parameter('sweep_time', + get_cmd=':SENS1:SWE:TIME?', + set_cmd=None, # generally just adjust ifbw and number of pts to change it, + get_parser=float, + unit='s' + ) + + for i in range(1, 17): + trace = Trace(self, number=i, name=f"trace_{i}") + self.add_submodule(f"trace_{i}", trace) + + self.connect_message() + + def clear_all_traces(self): + """remove all currently defined traces.""" + self.write("CALC:MEAS:DEL:ALL") + + def get_existing_traces_by_channel(self) -> Dict[int, List[Tuple[int, str]]]: + """Returns all currently available traces. + Assumes that traces/measurements do not have custom names not ending with the + measurement number. + + Returns + A dictionary, with keys being the channel indices that have traces in them. + values are tuples of trace/measurement number and parameter measured. + """ + ret = {} + for i in range(1, 9): + traces = self.ask(f"CALC{i}:PAR:CAT:EXT?").strip('"') + if traces == "NO CATALOG": + continue + else: + ret[i] = [] + traces = traces.split(',') + names = traces[::2] + params = traces[1::2] + for n, p in zip(names, params): + ret[i].append((int(n.split('_')[-1]), p)) + return ret + + def get_existing_traces(self) -> Tuple[List[int], List[int], List[str]]: + """ + Return three lists, with one item per current trace: channel, trace/measurement number, parameter + """ + chans, numbers, params = [], [], [] + trace_dict = self.get_existing_traces_by_channel() + for chan, traces in trace_dict.items(): + for number, param in traces: + chans.append(chan) + numbers.append(number) + params.append(param) + return chans, numbers, params + + def get_sweep_data(self): + """ + Gets stimulus data in displayed range of active measurement, returns array + Will return different data depending on sweep type. + + For example: + power sweep: 1xN array of powers in dBm + frequency sweep: 1xN array of freqs in Hz + Input: + None + Output: + sweep_values (Hz, dBm, etc...) + """ + logging.info(__name__ + ' : get stim data') + strdata = str(self.ask(':SENS1:X:VAL?')) + return np.array(list(map(float, strdata.split(',')))) + + def data_to_mem(self): + """ + Calls for data to be stored in memory + """ + logging.debug(__name__ + ": data to mem called") + self.write(":CALC1:MATH:MEM") + + def remove_trace(self, number: int): + """ + Remove selected trace + + Note that when removing a new trace, vna will restart average. + """ + _, traces, _ = self.get_existing_traces() + if number not in traces: + print('Trace does not exist. Nothing happens.') + else: + logging.debug(__name__ + f": remove trace{number}") + self.write(f"CALC1:MEAS{number}:DEL") + print('Trace is successfully removed.') + + def add_trace(self, number: int = 1, s_parameter: str = "S21"): + """ + Adds a trace with a specific s_parameter + + Note that when adding a new trace, vna will restart average. + """ + _, traces, _ = self.get_existing_traces() + if number in traces: + print('Trace exist. Please use another trace number or remove the current one with remove_trace(number).') + else: + logging.debug(__name__ + f": add trace{number} with S-parameter {s_parameter}") + self.write(f"CALC1:MEAS{number}:DEF '{s_parameter}'") + self.write(f"DISP:MEAS{number}:FEED 1") # always show this trace in the window 1 (FEED number). + print('Trace is successfully created.') + + def average(self) -> Tuple[str, str, int]: + # def average(self, number: int = None, avg_over_freq: bool = False) -> Tuple[np.ndarray, str, str, int]: + """ + Do the average number (if provided) else, self.avg_num() times. + + Read the trigger settings (trigger source and mode) before doing the average and return them. + During the average the VNA will be in manual trigger source with single trigger mode. + A single trigger signal is generated with 'INIT:IMM' for self.avg_num() times to complete the whole average + process. + + Note that after the average the VNA will remain in the trigger settings for the average process. + This is to give time for user to use another command to take the data from the trace/measurement. + + The trace.data() will automatically take the old settings and put them back. But you can also just take the + return values of this function and reset by yourself. + + Will check if VNA average type is in SWEEP. If not, a type error will be raised. + """ + # if number is None: + # number = self.avg_num() + # else: + # self.avg_num(number) + # assert number > 0 + if self.avg_type() == 'POIN': + raise TypeError( + 'VNA average type is set to POINT, neeed to be SWEEP. Use vna.avg_type() function to change') + + prev_trigger_source = self.trigger_source() + prev_sweep_mode = self.sweep_mode() + prev_averaging = self.averaging() + # The following trigger settings are necessary for VNA to take the average + self.trigger_source('MAN') + self.sweep_mode('SING') + self.averaging(1) #turn on averaging + self.write("SENS:AVER:CLE") # does not apply to point averaging + total_time = self.sweep_time() * self.avg_num() + 0.5 + print(f"Waiting {np.round(total_time, 1)}s for {self.avg_num()} averages...") + for i in range(self.avg_num()): + self.write('INIT:IMM') + averaged = 0 + while averaged == 0: + averaged = self.ask("*OPC?") + print('Average completed') + # if avg_over_freq: + # data = np.average(self.trace_1.data(), axis=1).reshape((2, 1)) + # else: + # data = self.trace_1.data() + self.trigger_source(prev_trigger_source) + self.sweep_mode(prev_sweep_mode) + self.averaging(prev_averaging) + + + return prev_trigger_source, prev_sweep_mode, prev_averaging + + def clear_averages(self) -> None: + """Reset averaging and wait for new trigger to start over.""" + self.write('SENS:AVER:CLE') + + def trigger(self): + self.write(':TRIG:SING') + return None + + def set_to_manual(self): + self.rfout(1) + self.averaging(1) + self.avg_num(3) + self.trform('MLOG') + self.trigger_source('IMM') + self.avg_type('SWEEP') + + def renormalize(self, num_avgs: int, pwr_bump: float = 0): + self.power(self.power() + pwr_bump) + self.averaging(1) + self.avg_num(num_avgs) + self.prev_elec_delay = self.electrical_delay() + s_per_trace = self.sweep_time() + wait_time = s_per_trace * num_avgs * 1.3 + 2 + print(f'Renormalizing, waiting {wait_time} seconds for averaging...') + time.sleep(wait_time) + self.data_to_mem() + self.math('DIV') + self.electrical_delay(0) + self.power(self.power() - pwr_bump) + self.set_to_manual() \ No newline at end of file diff --git a/src/cqedtoolbox/instruments/qcodes_drivers/MiniCircuits/MiniCircuits_SwitchMatrix_RC-8SPDT-A18.py b/src/cqedtoolbox/instruments/qcodes_drivers/MiniCircuits/MiniCircuits_SwitchMatrix_RC-8SPDT-A18.py new file mode 100644 index 0000000..f560606 --- /dev/null +++ b/src/cqedtoolbox/instruments/qcodes_drivers/MiniCircuits/MiniCircuits_SwitchMatrix_RC-8SPDT-A18.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +""" +@author: Chao Zhou, Gaurav Agarwal + +A simple driver for controlling multiple MiniCircuits RC-8SPDT-A18 switch matrices as one big matrix using QCoDes, +transferred from the one written by Xi Cao and Pinlei Lu. + + +""" + +import logging +# from functools import partial +from typing import Union, List +from qcodes import Instrument +from urllib.request import urlopen + +class MiniCircuits_SwitchMatrix_Multi(Instrument): + def __init__(self, name:str, name_list:List[str], address_list:List[str], mode_dict:dict={}, reset=False, **kwargs): + """ + :param name: name of all the switches as one instrument + :param name_list: list of individual names of each switch matrix + :param address_list: list of address + :param mode_dict: dictionary that contains the preset modes + :param reset: + :param kwargs: + """ + super().__init__(name, **kwargs) + logging.info(__name__ + ' : Initializing MiniCircuits RC-8SPDT-A18') + self._address_list = address_list + self._mode_dict = mode_dict + self.switch_dict: {str: MiniCircuits_SwitchMatrix} = {} + for i, swt_name in enumerate(name_list): + swt_ = MiniCircuits_SwitchMatrix(swt_name, address_list[i]) + setattr(self, swt_name, swt_) + self.switch_dict[swt_name] = swt_ + + self.add_parameter('portvalue_dict', + label='portvalue_dict', + get_cmd=self.do_get_portvalue_dict, + set_cmd=self.do_set_portvalue_dict) + + self.add_parameter('mode', + label='mode', + get_cmd=self.do_get_mode, + set_cmd=self.do_set_mode) + + self.add_parameter('available_modes', + label='available_modes', + get_cmd=self.get_mode_options, + set_cmd=self.update_mode_dict) + + # self.add_function('modify_add_mode', + # label='modify_add_mode', + # set_cmd=self.add_mode) + + # self.add_function('remove_mode', + # label='remove_mode', + # set_cmd=self.remove_mode) + + if reset: + self.reset() + + def do_get_portvalue_dict(self): + port_value_dict = {} + for swt_name, swt_ in self.switch_dict.items(): + port_value_dict[swt_name] = swt_.portvalue() + return port_value_dict + + def do_set_portvalue_dict(self, port_value_dict:dict): + for swt_name, port_value in port_value_dict.items(): + self.switch_dict[swt_name].set_switch("P", port_value) + + + def do_get_mode(self): + current_states = self.portvalue_dict() + matched_modes = [] + for mode_name, mode in self._mode_dict.items(): + mode_match = True + for i, (swt_name, portvalue_) in enumerate(current_states.items()): + for j, s in enumerate(mode[i]): + if (s in ["0", "1"]) and (s != portvalue_[j]): + mode_match = False + break + if mode_match: + matched_modes.append(mode_name) + return matched_modes + + + def do_set_mode(self, mode_name): + current_states = self.portvalue_dict() + if mode_name in self._mode_dict: + for i, (swt_name, swt_) in enumerate(self.switch_dict.items()): + swt_.set_switch(self._create_new_mode_string(current_states[swt_name], self._mode_dict[mode_name][i])) + else: + print( 'Confucius say there is no such mode. Nothing has been changed.') + + + def _create_new_mode_string(self, current_state, new_state): + if len(current_state) != len(new_state): + raise ValueError("current_state and new_state must be the same length.") + output = "" + for i in range(len(new_state)): + if (new_state[i] not in ["0", "1"]): + output += current_state[i] + else: + output += new_state[i] + return output + + def get_mode_options(self): + return self._mode_dict + + def update_mode_dict(self, mode_key_dict: dict): + self._mode_dict.update(mode_key_dict) + return + + + def set_single_switch(self, switchName:str, chanel: str, state: Union[int, str] ): + self.switch_dict[switchName].set_switch(chanel, state) + + def reset(self): + for swt_ in self.switch_dict.values(): + swt_.reset() + + +class MiniCircuits_SwitchMatrix(Instrument): + def __init__(self, name, address, reset=False, **kwargs): + ''' + Initializes the Mini_Circuits switch, and communicates with the wrapper. + + Input: + name (string) : name of the instrument + address (string) : http address + reset (bool) : resets to default values, default=False + ''' + super().__init__(name, **kwargs) + logging.info(__name__ + ' : Initializing MiniCircuits RC-8SPDT-A18') + self._address = address + self.add_parameter('portvalue', + label='portvalue', + get_cmd=self.do_get_portvalue, + set_cmd=self.set_switch) + + + if reset: + self.reset() + + def set_switch(self, state: Union[int, str], chanel:str = 'P'): + ''' + :param chanel: switch 'A' through 'H' or 'P' if you want to control all the gates at same time + :param state: 0 or 1 to choose output. 0=1 (green), 1=2 (red) + ''' + state = str(state) + logging.info(__name__ + ' : Set switch%s' % chanel +' to state %s' % state) + if chanel != 'P': + ret = urlopen(self._address + "/SET" + chanel + "=" + state) + else: + if (len(state)) != 8: + print(len(state)) + raise Exception("Wrong input length!") + newstate = 0 + for x in range(0,len(state)): + if (int(state[x]) != 0) & (int(state[x]) != 1): + raise Exception("Wrong input value at %ith" % x + " switch!") + else: + newstate += int(state[x])*(2**x) + + ret = urlopen(self._address + "/SETP" + "=" + str(newstate)) + + self.get('portvalue') + + def do_get_portvalue(self): + logging.debug(__name__+' : get portvalue') + ret = urlopen(self._address + "/SWPORT?" ) + result = ret.readlines()[0] + result = int(result) + result = format(result,'08b') + result = result[::-1] + return result + + def reset(self): + self.set_switch("P", "0" * 8) + + +if __name__ == "__main__": + modes = {'2_IN': ['xxxxxxxx', 'xxxx00xx', 'xxxxxxxx'], + '3_IN': ['xxxxxxxx', 'xxxx01xx', 'xxxxxxxx'], + '12_IN': ['xxxxxxxx', 'xxxx1xx0', 'xxxxxxxx'], + '18_IN': ['xxxxxxxx', 'xxxx1xx1', 'xxxxxxxx'], + 'A_Out': ['xxxxxxxx', '00xxxxxx', 'xxxxxxxx'], + 'B_Out': ['xxxxxxxx', '01xxxxxx', 'xxxxxxxx'], + 'H_Out': ['xxxxxxxx', '1x1xxxxx', 'xxxxxxxx'], + 'E_Out': ['xxxxxxxx', '1x0xxxxx', 'xxxxxxxx'], + 'VNAInOut': ['xxxxxxxx', 'xxx0xx0x', 'xxxxxxxx'], + 'PXIInOut': ['xxx1xxxx', 'xxx1xx1x', 'xxxxxxxx'], + 'Cav1In': ['01xxxxxx', 'xxxxxxxx', 'xxxxxxxx'], + 'Cav4In': ['11xxxxxx', 'xxxxxxxx', 'xxxxxxxx'], + 'Cav6In': ['x00xxxxx', 'xxxxxxxx', 'xxxxxxxx'], + 'SAQuCaIn': ['x010xxxx', 'xxxxxxxx', 'xxxxxxxx'] + } + SWT = MiniCircuits_SwitchMatrix_Multi('SWT',name_list=["SWT1", "SWT2", "SWT3"], + address_list=['http://169.254.254.251', 'http://169.254.254.249', 'http://169.254.254.252'], + mode_dict= modes) diff --git a/src/cqedtoolbox/instruments/qcodes_drivers/QICK/QickSoC_RFBoard.py b/src/cqedtoolbox/instruments/qcodes_drivers/QICK/QickSoC_RFBoard.py new file mode 100644 index 0000000..68577a7 --- /dev/null +++ b/src/cqedtoolbox/instruments/qcodes_drivers/QICK/QickSoC_RFBoard.py @@ -0,0 +1,788 @@ +""" +@author: Gaurav Agarwal + +QCoDes driver for QICK SoC with RF Board support. +Automatically detects available DAC/ADC channels and creates parameters for: +- Attenuators (RF signal chains) +- Filters (ADMV8818) +- DC biases +- LO synthesizers +- Gain controls + +Note: This driver is designed to work with both local and remote (Pyro4) QICK SoC instances. +Since Pyro4 does not allow direct attribute access to object properties, this driver: +- Uses configuration dictionary (_soc_cfg) for channel detection instead of object attributes +- Caches attenuator values locally since they cannot be read back through Pyro4 +- Uses method calls (_soc.rfb_set_*) for all hardware control operations +- Does not attempt to read back filter or gain states (returns defaults/cached values) + +""" + +import logging +from typing import Optional, Dict +from qcodes import Instrument, InstrumentChannel, ChannelList +from qcodes.utils.validators import Numbers, Enum, Bool + +# logger = logging.getLogger(__name__) + + +class DACChannel(InstrumentChannel): + """ + Channel class for DAC (generator) channels. + Handles RF attenuators, filters, and DC bias configuration. + """ + + def __init__(self, parent: Instrument, name: str, channel: int, gen_config: dict, **kwargs): + super().__init__(parent, name, **kwargs) + self._channel = channel + self._gen_config = gen_config + + # Cache for attenuator values (cannot read back through Pyro4) + self._att1_cache = 10.0 # Default attenuation + self._att2_cache = 10.0 + + #TODO Confirm capabilities from config + self._has_rf_chain = True + self._has_dc_chain = False + self._has_filter = True + self._has_attenuator = True + + # Add basic channel info parameters + # self.add_parameter( + # 'channel_index', + # get_cmd=lambda: self._channel, + # snapshot_value=True, + # label='Channel Index', + # docstring='Index in the gens list' + # ) + + # self.add_parameter( + # 'dac_tile', + # get_cmd=lambda: int(gen_config['dac'][0]), + # snapshot_value=True, + # label='DAC Tile', + # docstring='DAC tile number' + # ) + + # self.add_parameter( + # 'dac_block', + # get_cmd=lambda: int(gen_config['dac'][1]), + # snapshot_value=True, + # label='DAC Block', + # docstring='DAC block number' + # ) + + # self.add_parameter( + # 'sampling_freq', + # get_cmd=lambda: self.root_instrument._soc_cfg['rf']['dacs'][gen_config['dac']]['fs'], + # unit='MHz', + # snapshot_value=True, + # label='Sampling Frequency', + # docstring='DAC sampling frequency in MHz' + # ) + + # RF Chain parameters (attenuators) + if self._has_rf_chain and self._has_attenuator: + self.add_parameter( + 'rf_enabled', + get_cmd=self._get_rf_enabled, + set_cmd=self._set_rf_enabled, + vals=Bool(), + label='RF Output Enabled', + docstring='Enable/disable RF output' + ) + + self.add_parameter( + 'att1', + get_cmd=lambda: self._att1_cache, + set_cmd=lambda val: self._set_attenuator(val, None), + vals=Numbers(0, 31.75), + unit='dB', + label='Attenuator 1', + docstring='First stage attenuation (0-31.75 dB)' + ) + + self.add_parameter( + 'att2', + get_cmd=lambda: self._att2_cache, + set_cmd=lambda val: self._set_attenuator(None, val), + vals=Numbers(0, 31.75), + unit='dB', + label='Attenuator 2', + docstring='Second stage attenuation (0-31.75 dB)' + ) + + # Filter parameters + if self._has_filter: + self.add_parameter( + 'filter_enabled', + get_cmd=lambda: self._get_filter_state() != 'bypass', + set_cmd=self._set_filter_enabled, + vals=Bool(), + label='Filter Enabled', + docstring='Enable/disable filter' + ) + + self.add_parameter( + 'filter_center_freq', + get_cmd=self._get_filter_fc, + set_cmd=lambda val: self._set_filter(val, None, None), + vals=Numbers(2.0, 18.0), + unit='GHz', + label='Filter Center Frequency', + docstring='Filter center frequency in GHz' + ) + + self.add_parameter( + 'filter_bandwidth', + get_cmd=self._get_filter_bw, + set_cmd=lambda val: self._set_filter(None, val, None), + vals=Numbers(0.5, 4.0), + unit='GHz', + label='Filter Bandwidth', + docstring='Filter bandwidth in GHz' + ) + + self.add_parameter( + 'filter_type', + get_cmd=self._get_filter_type, + set_cmd=lambda val: self._set_filter(None, None, val), + vals=Enum('bandpass', 'highpass', 'lowpass', 'bypass'), + label='Filter Type', + docstring='Filter type' + ) + + # DC Chain parameters + if self._has_dc_chain: + self.add_parameter( + 'dc_enabled', + get_cmd=self._get_dc_enabled, + set_cmd=self._set_dc_enabled, + vals=Bool(), + label='DC Output Enabled', + docstring='Enable/disable DC output' + ) + + def _get_rf_enabled(self) -> bool: + """Check if RF output is enabled.""" + # TODO: RF is considered enabled + return True + + def _set_rf_enabled(self, enable: bool): + """Enable or disable RF output.""" + if enable: + # Enable with default attenuation values (10 dB each) + # Cannot read back current values through Pyro4 + self.root_instrument._soc.rfb_set_gen_rf(self._channel, 10, 10) + else: + # Disable by setting max attenuation + self.root_instrument._soc.rfb_set_gen_rf(self._channel, 31.75, 31.75) + + def _set_attenuator(self, att1: Optional[float], att2: Optional[float]): + """Set attenuator values.""" + # Use cached values as defaults + new_att1 = att1 if att1 is not None else self._att1_cache + new_att2 = att2 if att2 is not None else self._att2_cache + + # Update cache + if att1 is not None: + self._att1_cache = att1 + if att2 is not None: + self._att2_cache = att2 + + self.root_instrument._soc.rfb_set_gen_rf(self._channel, new_att1, new_att2) + + def _get_filter_state(self) -> str: + """Get current filter state.""" + try: + if hasattr(self._rfb_ch, 'filt') and hasattr(self._rfb_ch.filt, 'band'): + return self._rfb_ch.filt.band + return 'bypass' + except: + return 'bypass' + + def _get_filter_fc(self) -> float: + """Get filter center frequency.""" + try: + if hasattr(self._rfb_ch, 'filt') and hasattr(self._rfb_ch.filt, 'fc'): + return self._rfb_ch.filt.fc + return 0.0 + except: + return 0.0 + + def _get_filter_bw(self) -> float: + """Get filter bandwidth.""" + try: + if hasattr(self._rfb_ch, 'filt') and hasattr(self._rfb_ch.filt, 'bw'): + return self._rfb_ch.filt.bw + return 1.0 + except: + return 1.0 + + def _get_filter_type(self) -> str: + """Get filter type.""" + state = self._get_filter_state() + if state == 'bypass': + return 'bypass' + elif 'LPF' in state: + return 'lowpass' + elif 'HPF' in state: + return 'highpass' + elif 'BPF' in state: + return 'bandpass' + return 'bypass' + + def _set_filter_enabled(self, enable: bool): + """Enable or disable filter.""" + if not enable: + self.root_instrument._soc.rfb_set_gen_filter(self._channel, fc=0, ftype='bypass') + else: + # Enable with current or default settings + fc = self._get_filter_fc() if self._get_filter_fc() > 0 else 6.0 + bw = self._get_filter_bw() + self.root_instrument._soc.rfb_set_gen_filter(self._channel, fc=fc, ftype='bandpass', bw=bw) + + def _set_filter(self, fc: Optional[float], bw: Optional[float], ftype: Optional[str]): + """Set filter parameters.""" + current_fc = self._get_filter_fc() if self._get_filter_fc() > 0 else 6.0 + current_bw = self._get_filter_bw() + current_type = self._get_filter_type() + + new_fc = fc if fc is not None else current_fc + new_bw = bw if bw is not None else current_bw + new_type = ftype if ftype is not None else current_type + + self.root_instrument._soc.rfb_set_gen_filter(self._channel, fc=new_fc, ftype=new_type, bw=new_bw) + + def _get_dc_enabled(self) -> bool: + """Check if DC output is enabled.""" + # DC state cannot be read back through Pyro4 + return False + + def _set_dc_enabled(self, enable: bool): + """Enable or disable DC output.""" + if enable: + self.root_instrument._soc.rfb_set_gen_dc(self._channel) + + +class ADCChannel(InstrumentChannel): + """ + Channel class for ADC (readout) channels. + Handles RF attenuators, filters, and gain configuration. + """ + + def __init__(self, parent: Instrument, name: str, channel: int, adc_config: dict, **kwargs): + super().__init__(parent, name, **kwargs) + self._channel = channel + self._adc_config = adc_config + + + self._att_cache = 10.0 # Default attenuation + + self._has_rf_chain = True + self._has_dc_chain = False + self._has_filter = True + self._has_attenuator = True + + # # Add basic channel info parameters + # self.add_parameter( + # 'channel_index', + # get_cmd=lambda: self._channel, + # snapshot_value=True, + # label='Channel Index', + # docstring='Index in the avg_bufs list' + # ) + + # self.add_parameter( + # 'adc_tile', + # get_cmd=lambda: int(adc_config['adc'][0]), + # snapshot_value=True, + # label='ADC Tile', + # docstring='ADC tile number' + # ) + + # self.add_parameter( + # 'adc_block', + # get_cmd=lambda: int(adc_config['adc'][1]), + # snapshot_value=True, + # label='ADC Block', + # docstring='ADC block number' + # ) + + # self.add_parameter( + # 'sampling_freq', + # get_cmd=lambda: self.root_instrument._soc_cfg['rf']['adcs'][adc_config['adc']]['fs'], + # unit='MHz', + # snapshot_value=True, + # label='Sampling Frequency', + # docstring='ADC sampling frequency in MHz' + # ) + + # RF Chain parameters (attenuator) + if self._has_rf_chain and self._has_attenuator: + self.add_parameter( + 'rf_enabled', + get_cmd=self._get_rf_enabled, + set_cmd=self._set_rf_enabled, + vals=Bool(), + label='RF Input Enabled', + docstring='Enable/disable RF input' + ) + + self.add_parameter( + 'att', + get_cmd=lambda: self._att_cache, + set_cmd=self._set_attenuator, + vals=Numbers(0, 31.75), + unit='dB', + label='Attenuator', + docstring='Input attenuation (0-31.75 dB)' + ) + + # Filter parameters + if self._has_filter: + self.add_parameter( + 'filter_enabled', + get_cmd=lambda: self._get_filter_state() != 'bypass', + set_cmd=self._set_filter_enabled, + vals=Bool(), + label='Filter Enabled', + docstring='Enable/disable filter' + ) + + self.add_parameter( + 'filter_center_freq', + get_cmd=self._get_filter_fc, + set_cmd=lambda val: self._set_filter(val, None, None), + vals=Numbers(2.0, 18.0), + unit='GHz', + label='Filter Center Frequency', + docstring='Filter center frequency in GHz' + ) + + self.add_parameter( + 'filter_bandwidth', + get_cmd=self._get_filter_bw, + set_cmd=lambda val: self._set_filter(None, val, None), + vals=Numbers(0.5, 4.0), + unit='GHz', + label='Filter Bandwidth', + docstring='Filter bandwidth in GHz' + ) + + self.add_parameter( + 'filter_type', + get_cmd=self._get_filter_type, + set_cmd=lambda val: self._set_filter(None, None, val), + vals=Enum('bandpass', 'highpass', 'lowpass', 'bypass'), + label='Filter Type', + docstring='Filter type' + ) + + # DC Chain parameters (gain) + if self._has_dc_chain: + self.add_parameter( + 'dc_enabled', + get_cmd=self._get_dc_enabled, + set_cmd=self._set_dc_enabled, + vals=Bool(), + label='DC Input Enabled', + docstring='Enable/disable DC input' + ) + + self.add_parameter( + 'dc_gain', + get_cmd=self._get_dc_gain, + set_cmd=lambda val: self.root_instrument._soc.rfb_set_ro_dc(self._channel, val), + vals=Numbers(-6, 26), + unit='dB', + label='DC Gain', + docstring='DC input gain (-6 to 26 dB)' + ) + + def _get_rf_enabled(self) -> bool: + """Check if RF input is enabled.""" + #TODO + return True + + def _set_rf_enabled(self, enable: bool): + """Enable or disable RF input.""" + if enable: + # Enable with default attenuation (10 dB) + self.root_instrument._soc.rfb_set_ro_rf(self._channel, 10) + else: + self.root_instrument._soc.rfb_set_ro_rf(self._channel, 31.75) + + def _set_attenuator(self, val: float): + """Set attenuator value and cache it.""" + self._att_cache = val + self.root_instrument._soc.rfb_set_ro_rf(self._channel, val) + + def _get_filter_state(self) -> str: + """Get current filter state.""" + try: + if hasattr(self._rfb_ch, 'filt') and hasattr(self._rfb_ch.filt, 'band'): + return self._rfb_ch.filt.band + return 'bypass' + except: + return 'bypass' + + def _get_filter_fc(self) -> float: + """Get filter center frequency.""" + try: + if hasattr(self._rfb_ch, 'filt') and hasattr(self._rfb_ch.filt, 'fc'): + return self._rfb_ch.filt.fc + return 0.0 + except: + return 0.0 + + def _get_filter_bw(self) -> float: + """Get filter bandwidth.""" + try: + if hasattr(self._rfb_ch, 'filt') and hasattr(self._rfb_ch.filt, 'bw'): + return self._rfb_ch.filt.bw + return 1.0 + except: + return 1.0 + + def _get_filter_type(self) -> str: + """Get filter type.""" + state = self._get_filter_state() + if state == 'bypass': + return 'bypass' + elif 'LPF' in state: + return 'lowpass' + elif 'HPF' in state: + return 'highpass' + elif 'BPF' in state: + return 'bandpass' + return 'bypass' + + def _set_filter_enabled(self, enable: bool): + """Enable or disable filter.""" + if not enable: + self.root_instrument._soc.rfb_set_ro_filter(self._channel, fc=0, ftype='bypass') + else: + fc = self._get_filter_fc() if self._get_filter_fc() > 0 else 6.0 + bw = self._get_filter_bw() + self.root_instrument._soc.rfb_set_ro_filter(self._channel, fc=fc, ftype='bandpass', bw=bw) + + def _set_filter(self, fc: Optional[float], bw: Optional[float], ftype: Optional[str]): + """Set filter parameters.""" + current_fc = self._get_filter_fc() if self._get_filter_fc() > 0 else 6.0 + current_bw = self._get_filter_bw() + current_type = self._get_filter_type() + + new_fc = fc if fc is not None else current_fc + new_bw = bw if bw is not None else current_bw + new_type = ftype if ftype is not None else current_type + + self.root_instrument._soc.rfb_set_ro_filter(self._channel, fc=new_fc, ftype=new_type, bw=new_bw) + + def _get_dc_enabled(self) -> bool: + """Check if DC input is enabled.""" + # DC state cannot be read back through Pyro4 + return False + + def _set_dc_enabled(self, enable: bool): + """Enable or disable DC input.""" + if enable: + gain = self._get_dc_gain() + self.root_instrument._soc.rfb_set_ro_dc(self._channel, gain) + + def _get_dc_gain(self) -> float: + """Get DC gain.""" + # DC gain cannot be read back through Pyro4 + # Store locally if needed, or return default + return 0.0 + + +class BiasChannel(InstrumentChannel): + """ + Channel class for DC bias outputs. + """ + + def __init__(self, parent: Instrument, name: str, channel: int, **kwargs): + super().__init__(parent, name, **kwargs) + self._channel = channel + self._voltage_step = 0.0001 # 0.1 mV step size + self._resistor = 500 #Ohms + self.step_delay_sec = 0.01 #sec + + self.add_parameter( + 'channel_index', + get_cmd=lambda: self._channel, + snapshot_value=True, + label='Channel Index', + docstring='Bias channel index' + ) + + self.add_parameter( + 'voltage', + get_cmd=lambda: self.root_instrument._soc.rfb_get_bias(self._channel), + set_cmd=lambda val: self.ramp_voltage(val), + vals=Numbers(-10, 10), + unit='V', + label='Bias Voltage', + docstring='Bias voltage (-10 to 10 V)' + ) + + self.add_parameter( + 'current', + get_cmd=lambda: self.root_instrument._soc.rfb_get_bias(self._channel)/self._resistor * 1e3, + set_cmd=lambda val: self.ramp_voltage(val*self._resistor*1e-3), + vals=Numbers(-100, 100), + unit='mA', + label='Bias Current', + docstring='Bias Current' + ) + + + # self.add_parameter( + # 'current_step', + # get_cmd=lambda : self._voltage_step/self._resistor, + # set_cmd=lambda new_current_step: setattr(self, '_voltage_step', new_current_step*self._resistor), + # vals=Numbers(0, 1), + # unit='V', + # label='Voltage Step', + # docstring='Voltage step size (0 to 1 V)' + # ) + + self.add_parameter( + 'current_ramp_rate', + get_cmd=lambda : self._voltage_step/self._resistor * 1/self.step_delay_sec * 1e6, + set_cmd=lambda new_current_step: setattr(self, '_voltage_step', new_current_step*self._resistor* self.step_delay_sec * 1e-6), + vals=Numbers(0, 500), + unit='uA per Sec', + label='Current Ramp Rate', + docstring='Usually around 10uA per sec' + ) + + self.add_parameter( + 'Resistor', + get_cmd = lambda : self._resistor, + set_cmd = lambda value: setattr(self, '_resistor', value), + unit = 'Ohms', + label = 'Resistor Value', + docstring='Resistor used to calculate the ramp rate' + ) + + def ramp_voltage(self, target_voltage: float, step_delay_sec: float = 0.01): + """Ramp bias voltage to target value in steps.""" + import time + from numpy import arange, round + if self.step_delay_sec : step_delay_sec = self.step_delay_sec + + current_voltage = self.root_instrument._soc.rfb_get_bias(self._channel) + step = self._voltage_step if target_voltage > current_voltage else -self._voltage_step + logging.info(__name__ + f"Ramping bias channel {self._channel}: {current_voltage}V ({current_voltage/self._resistor * 1e3}mA) to {target_voltage}V({target_voltage/self._resistor * 1e3}mA) in steps of {step}V ({step/self._resistor * 1e3}mA) with {step_delay_sec} s delay") + + for voltage in arange(current_voltage, target_voltage, step): + voltage = round(voltage, 7) # round off random floats to 7 decimals + self.root_instrument._soc.rfb_set_bias(self._channel, voltage) + time.sleep(step_delay_sec) + + # Ensure final voltage is set + self.root_instrument._soc.rfb_set_bias(self._channel, target_voltage) + + +class QickSoC_RFBoard(Instrument): + """ + QCoDes driver for QICK SoC with RF Board. + + Automatically detects available DAC and ADC channels and creates + appropriate parameters for attenuators, filters, DC biases, etc. + + Parameters + ---------- + name : str + Name of the instrument + soc : QickSoc or RFQickSoc + The QICK SoC object (already initialized) + **kwargs + Additional keyword arguments passed to Instrument + + Examples + -------- + >>> from qick import QickSoc + >>> soc = QickSoc() + >>> qick_driver = QickSoC_RFBoard('qick', soc=soc) + >>> qick_driver.dac0.att1(10) # Set attenuator 1 to 10 dB + >>> qick_driver.adc0.filter_center_freq(6.5) # Set filter center freq to 6.5 GHz + """ + + def __init__(self, name: str,nameserver_host,nameserver_port,nameserver_name, **kwargs): + super().__init__(name, **kwargs) + + + import Pyro4 + from qick import QickConfig + Pyro4.config.SERIALIZER = 'pickle' + Pyro4.config.PICKLE_PROTOCOL_VERSION = 4 + ns = Pyro4.locateNS(host=nameserver_host, port=nameserver_port) + soc = Pyro4.Proxy(ns.lookup(nameserver_name)) + soccfg = QickConfig(soc.get_cfg()) + self._soc = soc + self._soc_cfg = soccfg.get_cfg() #get the dictionary version of the soc config + + # self._soc = soc + # self._soc_cfg = soccfg.get_cfg() #get the dictionary version of the soc config + + # Detect and add DAC channels + # self._detect_dac_channels() + + # Detect and add ADC channels + # self._detect_adc_channels() + + # Detect and add bias channels + self._detect_bias_channels() + + # Add board-level parameters + self._add_board_parameters() + + logging.info(__name__ + f"Initialized QickSoC_RFBoard driver '{name}' with " + # f"{len(self.dac_channels)} DAC channels, " + # f"{len(self.adc_channels)} ADC channels, and " + f"{len(self.bias_channels) if hasattr(self, 'bias_channels') else 0} bias channels") + + def _detect_dac_channels(self): + """Detect and create DAC channel instances.""" + self.dac_channels = ChannelList(self, "dac_channels", DACChannel) + + if 'gens' in self._soc_cfg: + for i, gen_config in enumerate(self._soc_cfg['gens']): + channel = DACChannel(self, f'dac{i}', i, gen_config) + self.dac_channels.append(channel) + self.add_submodule(f'dac{i}', channel) + + def _detect_adc_channels(self): + """Detect and create ADC channel instances.""" + self.adc_channels = ChannelList(self, "adc_channels", ADCChannel) + + if 'readouts' in self._soc_cfg: + for i, adc_config in enumerate(self._soc_cfg['readouts']): + channel = ADCChannel(self, f'adc{i}', i, adc_config) + self.adc_channels.append(channel) + self.add_submodule(f'adc{i}', channel) + + def _detect_bias_channels(self): + """Detect and create bias channel instances.""" + self.bias_channels = ChannelList(self, "bias_channels", BiasChannel) + + # Typically there are 8 bias channels on RF boards + try: + # Try to access bias channel 0 to see if it's supported + self._soc.rfb_get_bias(0) + # If successful, assume 8 bias channels (standard) + for i in range(8): + channel = BiasChannel(self, f'bias{i}', i) + self.bias_channels.append(channel) + self.add_submodule(f'bias{i}', channel) + except: + # No bias channels available + pass + + def _add_board_parameters(self): + """Add board-level parameters.""" + self.add_parameter( + 'board_type', + get_cmd=lambda: str(self._soc_cfg['board']), + snapshot_value=True, + label='Board Type', + docstring='QICK board type (ZCU111, ZCU216, etc.)' + ) + + self.add_parameter( + 'firmware_version', + get_cmd=lambda: str(self._soc_cfg['fw_timestamp']), + snapshot_value=True, + label='Firmware Version', + docstring='Firmware build timestamp' + ) + + self.add_parameter( + 'software_version', + get_cmd=lambda: str(self._soc_cfg['sw_version']), + snapshot_value=True, + label='Software Version', + docstring='QICK software version' + ) + try: + self.add_parameter( + 'ref_clock_freq', + get_cmd=lambda: float(self._soc_cfg['refclk_freq']), + unit='MHz', + snapshot_value=True, + label='Reference Clock Frequency', + docstring='RF reference clock frequency in MHz' + ) + except: + pass + + def get_idn(self) -> Dict[str, Optional[str]]: + """Return instrument identification.""" + return { + 'vendor': 'QICK', + 'model': self._soc_cfg['board'], + 'serial': None, + 'firmware': self._soc_cfg['fw_timestamp'] + } + + def print_overview(self): + """Print a comprehensive overview of all channels and their capabilities.""" + print("="*80) + print(f"QICK SoC RF Board Driver: {self.name}") + print("="*80) + print(f"Board: {self.board_type()}") + print(f"Firmware: {self.firmware_version()}") + print(f"Software: {self.software_version()}") + if 'refclk_freq' in self._soc_cfg: + print(f"Reference Clock: {self.ref_clock_freq()} MHz") + + print("\n" + "="*80) + print(f"DAC Channels ({len(self.dac_channels)}):") + print("="*80) + for dac in self.dac_channels: + print(f"\n{dac.short_name}:") + # print(f" Tile: {dac.dac_tile()}, Block: {dac.dac_block()}") + # print(f" Sampling Freq: {dac.sampling_freq()} MHz") + if hasattr(dac, 'att1'): + print(f" RF Chain: Yes (Att1={dac.att1()} dB, Att2={dac.att2()} dB)") + if hasattr(dac, 'filter_enabled'): + filter_status = "Enabled" if dac.filter_enabled() else "Disabled" + print(f" Filter: {filter_status}") + if dac.filter_enabled(): + print(f" Type: {dac.filter_type()}, FC: {dac.filter_center_freq()} GHz, BW: {dac.filter_bandwidth()} GHz") + if hasattr(dac, 'dc_enabled'): + print(f" DC Chain: Available") + + print("\n" + "="*80) + print(f"ADC Channels ({len(self.adc_channels)}):") + print("="*80) + for adc in self.adc_channels: + print(f"\n{adc.short_name}:") + # print(f" Tile: {adc.adc_tile()}, Block: {adc.adc_block()}") + # print(f" Sampling Freq: {adc.sampling_freq()} MHz") + if hasattr(adc, 'att'): + print(f" RF Chain: Yes (Att={adc.att()} dB)") + if hasattr(adc, 'filter_enabled'): + filter_status = "Enabled" if adc.filter_enabled() else "Disabled" + print(f" Filter: {filter_status}") + if adc.filter_enabled(): + print(f" Type: {adc.filter_type()}, FC: {adc.filter_center_freq()} GHz, BW: {adc.filter_bandwidth()} GHz") + if hasattr(adc, 'dc_enabled'): + print(f" DC Chain: Available (Gain: {adc.dc_gain() if hasattr(adc, 'dc_gain') else 'N/A'} dB)") + + if hasattr(self, 'bias_channels') and len(self.bias_channels) > 0: + print("\n" + "="*80) + print(f"Bias Channels ({len(self.bias_channels)}):") + print("="*80) + for bias in self.bias_channels: + print(f"{bias.short_name}: {bias.voltage()} V") + + print("\n" + "="*80) + + +if __name__ == "__main__": + + print("I want to quit") From fc3d2ebe6581bf4703924084281c5bf9b659eec0 Mon Sep 17 00:00:00 2001 From: Gaurav Agarwal Date: Thu, 18 Jun 2026 18:05:15 -0400 Subject: [PATCH 2/3] added support for live data streaming form qick, ability to change sweep name for qick_sweep. --- .../instruments/qick/qick_streaming_sweep.py | 519 ++++++++++++++++++ .../instruments/qick/qick_sweep_v2.py | 14 +- 2 files changed, 527 insertions(+), 6 deletions(-) create mode 100644 src/cqedtoolbox/instruments/qick/qick_streaming_sweep.py diff --git a/src/cqedtoolbox/instruments/qick/qick_streaming_sweep.py b/src/cqedtoolbox/instruments/qick/qick_streaming_sweep.py new file mode 100644 index 0000000..9f2500e --- /dev/null +++ b/src/cqedtoolbox/instruments/qick/qick_streaming_sweep.py @@ -0,0 +1,519 @@ +"""Streaming helper for using QICK programs with the Sweep framework. + +This module provides `QickBoardStreamingSweep`, a decorator that uses a program's +`stream_acquire()` method (if present) to receive incremental IQ chunks and +yield them as dictionaries with independent axes. + +Behavior: +- Precomputes sweep-value arrays for independent specs (PulseVariable, TimeVariable etc). +- Contracts round and per-round rep indices into a single 'rep' = (round + per-round rep) axis. +- For each streaming event/blocking acquire call, yields a single aggregated + dictionary containing: + - 'ro_channel_and_readout_trigger': identifier string for each data point + (format: "_r") + - 'channel': list of channel indices corresponding to each data point + - 'readout': list of readout trigger indices corresponding to each data point + - 'data': list of complex IQ values flattened in arrival order + - 'rep': list of global rep indices (collapsed from round and per-round indices) + - One key per independent sweep spec, mapping to a list of sweep values + +- Falls back to calling `acquire()` (blocking) if `stream_acquire()` is not + available on the program, yielding the same dictionary structure at the end just with all of the experiment's data at once + +All data is yielded as raw in dictionaries without any processing, only reshaping and tagging/labelling. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional +from collections import OrderedDict +import numpy as np + +from labcore.measurement import DataSpec +from labcore.measurement.record import make_data_spec + +from labcore.measurement.sweep import AsyncRecord +import logging + +logger = logging.getLogger(__name__) + + +config = None + + + +@dataclass +class ComplexQICKData(DataSpec): + """Complex IQ readout data spec with full dimensionality tracking. + + Attributes: + i_data_stream: label for I component (default 'I') + q_data_stream: label for Q component (default 'Q') + """ + i_data_stream: str = 'I' + q_data_stream: str = 'Q' + # ro_ch: Optional[int] = None + # sweep_dims: Optional[tuple] = None + # n_readouts: Optional[int] = None + + def set_name(self, name: str) -> 'ComplexQICKData': + """Change the name of this dataspec and return self for chaining.""" + self.name = name + return self + + +@dataclass +class PulseVariable(DataSpec): + pulse_parameter: Optional[str] = None + sweep_parameter: Optional[str] = None + loop_idx: Optional[int] = None + loop_name: Optional[str] = None + + +@dataclass +class TimeVariable(DataSpec): + time_parameter: Optional[str] = None + loop_idx: Optional[int] = None + loop_name: Optional[str] = None + +@dataclass +class RepVariable(DataSpec): + """Rep number variable spec for QICK sweeps.""" + loop_idx: Optional[int] = None + loop_name: Optional[str] = 'reps' + + + + +class QickBoardStreamingSweep(AsyncRecord): + """Decorator to run QICK programs and save streaming data incrementally. + The data specs must be defined in loop/measurement order. + """ + + def __init__(self, *specs, **kwargs): + self.communicator = {} + self.unordered_nonexhaustive_specs = list(specs) + self.specs = [] + for s in specs: + spec = make_data_spec(s) + self.specs.append(spec) + + def setup(self, func, *args, **kwargs): + + if config is None: + raise Exception("QickSweep: config is not set") + + self.config = config + conf = config.config() #TODO change this to be passed from kwargs, not from global + qick_program = func(soccfg=conf[0], reps=conf[1].get('reps'), final_delay=conf[1].get('final_delay'), cfg=conf[1]) + self.communicator['qick_program'] = qick_program + prog = qick_program + # at this point, the qick program has been compiled into asm. we can retreive loop info. + + ################################################ + ### ========= Get Independnent Sweep Axes ============== + # get loops info + loop_dict : OrderedDict = prog.loop_dict # { (reps, 10000), (gain_sweep, 100), (freq_sweep, 50) ... } + loop_dims : tuple = prog.loop_dims + + # construct data specs in the order of loops in the program + loop_idx = 0 + ordered_specs = [] + for name, dim in loop_dict.items(): + # check if spec loop name exists + # if it exists, add it according to order + if name == 'reps': + ordered_specs.append(RepVariable(name='rep', loop_name='reps', loop_idx=loop_idx)) + loop_idx +=1 + continue + + elif any( (spec.loop_name == name) for spec in self.unordered_nonexhaustive_specs): + for spec in self.unordered_nonexhaustive_specs: + if spec.loop_name == name: + spec.loop_idx = loop_idx + loop_idx +=1 + new_spec = make_data_spec(spec) + ordered_specs.append(new_spec) + break + else : pass + + else: + raise Exception(f"QickStreamingSweep: Spec for loop '{name}' not provided in specs.") + self.communicator['ordered_specs'] = ordered_specs + + # ========== Get sweep arrays ============ + # prepare sweep value iterators for independent specs, that will render sweep values + self.sweep_arrays: OrderedDict[str, np.ndarray] = OrderedDict() + + for ds in ordered_specs: + if isinstance(ds, PulseVariable): + arr = prog.get_pulse_param(ds.pulse_parameter, ds.sweep_parameter, as_array=True) + self.sweep_arrays[ds.name] = np.asarray(arr).flatten() + elif isinstance(ds, TimeVariable): + arr = prog.get_time_param(ds.time_parameter, 't', as_array=True) + self.sweep_arrays[ds.name] = np.asarray(arr).flatten() + # coalesce rounds and reps into single rep axis + elif isinstance(ds, RepVariable): + self.sweep_arrays[ds.name] = np.arange(1, prog.reps * conf[1].get('rounds') + 1, 1) + + # get number of triggers etc + readout_spec_number = sum([1 for ds in self.unordered_nonexhaustive_specs if isinstance(ds, ComplexQICKData)]) + + # Extract ComplexQICKData spec names in order for use in collect() + complex_data_specs = [ds for ds in self.unordered_nonexhaustive_specs if isinstance(ds, ComplexQICKData)] + self.communicator['complex_data_spec_names'] = [ds.name for ds in complex_data_specs] + + readout_dict = prog.ro_chs #outputs ordered dict, 'ro_ch_number : int' : { '#trigs': int, 'length': int, 'length_us': float, ...} + reads_per_shot = [ro['trigs'] for ro in prog.ro_chs.values()] # list of [# readouts for ro channel , # readouts for ro channel, ... ] + + # sanity check + assert sum(reads_per_shot) == readout_spec_number, "Mismatch in defined readout specs and program readout triggers. Check defined data specs." + + # Store loop metadata needed for chunking stream events by first ordered spec value + ordered_spec_names = [ds.name for ds in ordered_specs] + sweep_spec_names = list(self.sweep_arrays.keys()) + if ordered_spec_names != sweep_spec_names: + raise RuntimeError( + "QickStreamingSweep: Internal spec ordering mismatch between ordered_specs and sweep_arrays." + ) + + # Save spec ordering and loop dimensions for use in collect() chunking logic + self.communicator['sweep_spec_names'] = sweep_spec_names + self.communicator['loop_dims'] = tuple(loop_dims) + # points_per_round = product of all loop dimensions = total flattened indices per round + self.communicator['points_per_round'] = int(np.prod(loop_dims)) if len(loop_dims) > 0 else 1 + # points_per_outer_value = product of inner loop dimensions; used to map flat index to outermost axis value + self.communicator['points_per_outer_value'] = int(np.prod(loop_dims[1:])) if len(loop_dims) > 1 else 1 + + + def collect(self, len_normalize: bool = True, stream: bool = True, **kwargs): + """Collect streaming IQ data from QICK program and yield aggregated dictionaries. + Falls back to blocking acquire() if streaming not available. + + Parameters: + - `rounds`: number of averaging rounds forwarded to `stream_acquire`. + - `len_normalize`: if True, normalizes IQ points by their readout window values. + - `stream`: if True, uses streaming API; otherwise falls back to blocking acquire. + + Yields: + - dict with keys: { 'ro_channel_and_readout_idx' : np.array(IQ Values), + 'reps' : np.array(rep number corresponding to data above), + 'loop_name' : np.array(sweep loop value corresponding to data above),} + """ + prog = self.communicator['qick_program'] + complex_data_spec_names = self.communicator.get('complex_data_spec_names', []) + sweep_spec_names = self.communicator.get('sweep_spec_names', list(self.sweep_arrays.keys())) + loop_dims = self.communicator.get('loop_dims', tuple()) + points_per_round = self.communicator.get('points_per_round', 1) + points_per_outer_value = max(1, self.communicator.get('points_per_outer_value', 1)) + + # ========== Collect streaming data ================ + stream_fn = getattr(prog, 'stream_acquire', None) + # Use streaming API if available; otherwise fall back to a blocking acquire + if callable(stream_fn) and stream: + gen = prog.stream_acquire( + self.config.soc, + rounds=kwargs.get('rounds', 1), + progress=True, + len_normalize=len_normalize, + remove_offset=False, + include_full=False, + return_end_of_exp_raw=False, + ) + + # State for buffering data across stream events by outermost loop value + current_outer_idx: Optional[int] = None # Current outermost index being accumulated + chunk_buffer: Dict[str, List[Any]] = {} # Accumulates IQ and sweep-axis data until outermost value changes + loop_dims_warning_logged = False # Prevent log spam for loop dimension mismatch + + def _flush_chunk() -> Optional[Dict[str, np.ndarray]]: + """Convert accumulated lists to numpy arrays and reset buffer for next chunk.""" + nonlocal chunk_buffer + if not chunk_buffer: + return None + out = {k: np.asarray(v) for k, v in chunk_buffer.items()} + chunk_buffer = {} + return out + + # iterate events and yield one aggregated dictionary per outermost sweep-loop value + for ev in gen: + if ev.get('event') == 'data': + partial = ev.get('partial', {}) + round_idx = int(ev.get('round', 0)) + count_start_flat, count_stop_flat = ev.get('rep_slice', (0, 0)) # in per-round flattened space + new_points = max(0, int(count_stop_flat) - int(count_start_flat)) + if new_points == 0: + continue + + # Get ComplexQICKData spec names (in definition order) to use as keys + spec_idx = 0 + warning_logged = False + event_series: Dict[str, np.ndarray] = {} + + for ch in partial.keys(): + comp_data = partial[ch].dot([1,1j]) #data in shape of (new_points, nreads) + if comp_data.ndim == 1: + comp_data = comp_data[:, np.newaxis] + for ro_no in range(comp_data.shape[1]): + # Use spec name if available, otherwise fall back to generated key + # using extra logic in case some dataspec in decorator was mislabelled/undefined. Result : still collect data but warn user. + if spec_idx < len(complex_data_spec_names): + key = complex_data_spec_names[spec_idx] + else: + if not warning_logged: + logger.warning( + f"ComplexQICKData specs mismatch: Expected more specs than defined, received more data from QICK. All collected data is probably mislabeled. ") + warning_logged = True + key = f"roch{ch}_read{ro_no}" + event_series[key] = comp_data[:, ro_no] + spec_idx += 1 + + # Map each incoming point to its outermost loop index, buffering across stream events + for i in range(new_points): + per_round_flat_idx = int(count_start_flat) + i # Within-round flattened index from stream event + global_flat_idx = round_idx * points_per_round + per_round_flat_idx # Global across all rounds + outer_idx = global_flat_idx // points_per_outer_value # Which value of first ordered spec? + + # On outermost index change: flush accumulated chunk and start new one + if current_outer_idx is None: + current_outer_idx = outer_idx + elif outer_idx != current_outer_idx: + out = _flush_chunk() + if out is not None: + yield out # Yield one chunk per outermost loop value + current_outer_idx = outer_idx + + # Append IQ data from this point to chunk buffer + for key, vals in event_series.items(): + chunk_buffer.setdefault(key, []).append(vals[i]) + + # Map per-round flat index to multi-dimensional loop indices and append sweep values + if len(loop_dims) == len(sweep_spec_names) and len(loop_dims) > 0: + # Unravel flat index back to (outermost, inner1, inner2, ...) coordinates + point_indices = np.unravel_index(per_round_flat_idx % points_per_round, loop_dims) + for spec_pos, spec_name in enumerate(sweep_spec_names): + spec_vals = self.sweep_arrays[spec_name] + if spec_pos == 0: + # First spec is outermost; use global rep index (clamped if collect() uses more rounds than config) + arr_idx = outer_idx if outer_idx < len(spec_vals) else len(spec_vals) - 1 + else: + # Inner specs use coordinates from unraveled indices + arr_idx = int(point_indices[spec_pos]) + chunk_buffer.setdefault(spec_name, []).append(spec_vals[arr_idx]) + else: + # Fallback if loop dimension structure doesn't match spec count (edge case) + if not loop_dims_warning_logged: + logger.warning( + "QickStreamingSweep: loop_dims does not match spec ordering; using index-zero fallback for sweep values." + ) + loop_dims_warning_logged = True + for spec_name in sweep_spec_names: + spec_vals = self.sweep_arrays[spec_name] + chunk_buffer.setdefault(spec_name, []).append(spec_vals[0]) # Safe fallback to first value + + # Safety check: if chunk has accumulated enough points for one outermost value, yield it immediately + if chunk_buffer.get(sweep_spec_names[0]) is not None and len(chunk_buffer[sweep_spec_names[0]]) >= points_per_outer_value: + out = _flush_chunk() + if out is not None: + yield out # Yield complete chunk + current_outer_idx = None # Reset for next chunk + + elif ev.get('event') == 'round-complete': + # Prepare for next round: continue buffering without yielding + continue + + # After all rounds, flush any remaining buffered chunk + out = _flush_chunk() + if out is not None: + yield out + + + else: #TODO : acquire() fallback + logger.critical("Streaming not callable, check qick libraries. Falling back to blocking acquire().") + + + + + # def collect_streaming(self, rounds: int = 1, include_full: bool = False, remove_offset: bool = False): + # """Stream or acquire IQ data and yield aggregated dictionaries. + + # Yields one dictionary per streaming event (or one for blocking acquire) + # containing complex IQ data with corresponding metadata: channel, readout + # trigger indices, rep numbers, and sweep parameter values. + + # Parameters: + # - `rounds`: number of averaging rounds forwarded to `stream_acquire`. + # - `include_full`: passed to `stream_acquire` if available. + # - `remove_offset`: passed to `stream_acquire` if available. + + # Yields: + # - dict with keys: 'ro_channel_and_readout_trigger', 'channel', 'readout', + # 'data', 'rep', and one key per sweep spec. + # """ + # prog = self.communicator['qick_program'] + # cfg = self.config.config()[1] + + # # ------- Figure out independent axes and non-IQ dependent specs------------ + # # Extract loop dimensions and readout configuration from program, + # # this contains the loops as qick will execute them ; including reps + # # We try to follow QICK's terminology of shot, rep, round, loops. + # loop_dims = getattr(prog, 'loop_dims', []) # contains rep number, could be any one of them depending on execution + # loop_dims = tuple(loop_dims) # because order matters, immutable + # # number of per-round reps (flattened loop length) + # total_reps_per_round = int(np.prod(loop_dims)) if len(loop_dims) > 0 else 1 + + # # Prepare sweep-value arrays for independent specs so we can map flat indices -> values + # sweep_arrays: Dict[str, np.ndarray] = {} + # for ds in self.specs: + # #get independent specs only + # if ds.depends_on is None and not isinstance(ds, ComplexQICKData): + # spec_name = ds.name + # try: + # if isinstance(ds, PulseVariable): + # arr = prog.get_pulse_param(ds.pulse_parameter, ds.sweep_parameter, as_array=True) + # elif isinstance(ds, TimeVariable): + # arr = prog.get_time_param(ds.time_parameter, 't', as_array=True) * (cfg['n_echoes'] + 1) + # else: + # arr = np.asarray(ds.default if hasattr(ds, 'default') else []) + # except Exception: + # logger.error(f"Could not get sweep array for spec {spec_name}, will fill with None") + # arr = np.array([]) + + # arr = np.asarray(arr) # ensure numpy array, qick may return lists + # if arr.size == 0: + # # fill with None so indexing is safe + # sweep_arrays[spec_name] = np.array([None] * total_reps_per_round, dtype=object) + # else: + # # try to flatten arr into per-round flattened ordering + # if arr.size == total_reps_per_round: + # sweep_arrays[spec_name] = arr.flatten() + # else: + # # if arr has same shape as loop_dims, flatten in C-order + # if arr.shape == loop_dims: + # sweep_arrays[spec_name] = np.asarray(arr).flatten() + # else: + # # best-effort: resize/repeat to match length + # sweep_arrays[spec_name] = np.resize(np.asarray(arr).flatten(), total_reps_per_round) + + # aggregated_sweep_iterable = + # # Use streaming API if available; otherwise fall back to a blocking acquire + # stream_fn = getattr(prog, 'stream_acquire', None) + # if callable(stream_fn): + # gen = prog.stream_acquire( + # self.config.soc, + # rounds=rounds, + # include_full=include_full, + # remove_offset=remove_offset, + # progress=True, + # return_end_of_exp_raw=False, + # ) + + # # iterate events and yield a single aggregated dictionary per data event + # for ev in gen: + # if ev.get('event') == 'data': + # partial = ev.get('partial', {}) + # round_idx = int(ev.get('round', 0)) + # rep_slice = ev.get('rep_slice', (0, 0)) # (start_rep, stop_rep) in per-round flattened space + + # rep_start_flat, rep_stop_flat = rep_slice + + # # aggregated lists across channels and readouts for this event + # data_list: List[complex] = [] + # rep_list: List[int] = [] + # ro_ch_readout_list: List[str] = [] + # channel_list: List[Any] = [] + # readout_list: List[int] = [] + # sweep_values_for_specs: Dict[str, List[Any]] = {k: [] for k in sweep_arrays.keys()} + + # for ch, arr in partial.items(): + # arr = np.asarray(arr) + # if arr.size == 0: + # continue + # new_points, nreads, _ = arr.shape + + # for i in range(new_points): + # per_round_flat = rep_start_flat + i + # # global rep index collapses round and per-round index + # global_rep = round_idx * total_reps_per_round + per_round_flat + + # for readout_idx in range(nreads): + # val = arr[i, readout_idx, 0] + 1j * arr[i, readout_idx, 1] + # data_list.append(val) + # rep_list.append(global_rep) + # ro_ch_readout_list.append(f"{ch}_r{readout_idx}") + # channel_list.append(ch) + # readout_list.append(readout_idx) + + # for spec_name, vals in sweep_arrays.items(): + # try: + # sweep_values_for_specs[spec_name].append(vals[per_round_flat]) + # except Exception: + # sweep_values_for_specs[spec_name].append(None) + + # out: Dict[str, Any] = { + # 'ro_channel_and_readout_trigger': ro_ch_readout_list, + # 'channel': channel_list, + # 'readout': readout_list, + # 'data': data_list, + # 'rep': rep_list, + # } + # out.update(sweep_values_for_specs) + + # yield out + + # # ignore other event types for now (e.g., 'round-complete') + # return + + # else: + # # blocking fallback: acquire full buffers then yield same structured dicts + # try: + # data = prog.acquire(self.config.soc, progress=True) + # except Exception: + # raise + + # channels = data[0] if isinstance(data, tuple) else data + + # # aggregate across channels for blocking acquire + # data_list: List[complex] = [] + # rep_list: List[int] = [] + # ro_ch_readout_list: List[str] = [] + # channel_list: List[Any] = [] + # readout_list: List[int] = [] + # sweep_values_for_specs: Dict[str, List[Any]] = {k: [] for k in sweep_arrays.keys()} + + # for ch_idx, arr in enumerate(channels): + # arr = np.asarray(arr) + # # arr expected shape: (nreps, nreads, 2) + # if arr.ndim < 3: + # continue + # nreps, nreads, _ = arr.shape + + # for rep in range(nreps): + # for readout_idx in range(nreads): + # val = arr[rep, readout_idx, 0] + 1j * arr[rep, readout_idx, 1] + # data_list.append(val) + # rep_list.append(rep) + # ro_ch_readout_list.append(f"{ch_idx}_r{readout_idx}") + # channel_list.append(ch_idx) + # readout_list.append(readout_idx) + + # for spec_name, vals in sweep_arrays.items(): + # try: + # sweep_values_for_specs[spec_name].append(vals[rep]) + # except Exception: + # sweep_values_for_specs[spec_name].append(None) + + # out: Dict[str, Any] = { + # 'ro_channel_and_readout_trigger': ro_ch_readout_list, + # 'channel': channel_list, + # 'readout': readout_list, + # 'data': data_list, + # 'rep': rep_list, + # } + # out.update(sweep_values_for_specs) + + # yield out + # return + +__all__ = ["QickBoardStreamingSweep", "ComplexQICKData", "PulseVariable", "TimeVariable"] diff --git a/src/cqedtoolbox/instruments/qick/qick_sweep_v2.py b/src/cqedtoolbox/instruments/qick/qick_sweep_v2.py index c6142b0..864b4de 100755 --- a/src/cqedtoolbox/instruments/qick/qick_sweep_v2.py +++ b/src/cqedtoolbox/instruments/qick/qick_sweep_v2.py @@ -18,7 +18,7 @@ steps reps final_delay -""" +""" import numpy as np from collections.abc import Iterable, Generator @@ -39,6 +39,11 @@ class ComplexQICKData(DataSpec): i_data_stream: str = 'I' q_data_stream: str = 'Q' + def set_name(self, name: str) -> 'ComplexQICKData': + """Change the name of this dataspec and return self for chaining.""" + self.name = name + return self + @dataclass class PulseVariable(DataSpec): @@ -82,7 +87,7 @@ def collect(self, *args, **kwargs): """ Get the measurement data. Note that one can overload the given acquire function if one needs - to perform other specific tasks. e.g. Needs to plot each non-averaged + to perform other specific tasks. e.g. Needs to plot each non-averaged points in the I-Q plane for a readout fidelity experiment. Assumptions * Given DataSpecs are either independent, dependent, or ComplexQICKData. @@ -90,7 +95,7 @@ def collect(self, *args, **kwargs): # TODO: How can I extend this to multiple measurement rounds? (e.g. active reset) # Run the program - data = self.communicator["qick_program"].acquire(self.config.soc, progress=False)[0] + data = self.communicator["qick_program"].acquire(self.config.soc, progress=True)[0] cfg = self.config.config()[1] return_data = {} @@ -138,6 +143,3 @@ def collect(self, *args, **kwargs): shapeIdx += 1 yield return_data - - - From e8834087998137c97ed0aaa00d78f5771813b274 Mon Sep 17 00:00:00 2001 From: Gaurav Agarwal Date: Thu, 18 Jun 2026 18:05:34 -0400 Subject: [PATCH 3/3] added new api for SWMR reading and writing based on ddh5_xr --- src/cqedtoolbox/setup_measurements.py | 53 +++++++++++++++++---------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/src/cqedtoolbox/setup_measurements.py b/src/cqedtoolbox/setup_measurements.py index 10b7be5..e1a6923 100644 --- a/src/cqedtoolbox/setup_measurements.py +++ b/src/cqedtoolbox/setup_measurements.py @@ -9,11 +9,11 @@ from instrumentserver.client import Client, ProxyInstrument from instrumentserver.helpers import nestedAttributeFromString -from labcore.data.datadict import DataDict -from labcore.data.datadict_storage import data_info -from labcore.measurement.storage import run_and_save_sweep -from labcore.measurement import Sweep -from labcore.utils.misc import get_environment_packages, commit_changes_in_repo +from .data.datadict import DataDict +from .data.datadict_storage import data_info +from .measurement.storage import run_and_save_sweep, run_and_save_sweep_swmr +from .measurement import Sweep +from .utils.misc import get_environment_packages, commit_changes_in_repo # constants WD = os.getcwd() @@ -29,7 +29,7 @@ class Options: options = Options() -# TODO: This should be a general helper, not from setup_measurements + def param_from_name(name: str, ): return nestedAttributeFromString(options.parameters, name) @@ -38,8 +38,8 @@ def getp(name: str, default=None, raise_if_missing=False): if options.parameters is None: logger.error("No parameter manager defined. cannot get/set params!") return None - - try: + + try: p = param_from_name(name) return p() except AttributeError: @@ -123,7 +123,7 @@ def find_or_create_remote_instrument(cli: Client, ins_name: str, ins_class: Opti return ins -def run_measurement(sweep: Sweep, name: str, safe_write_mode: bool = False, **kwargs) -> Tuple[Union[str, Path], Optional[DataDict]]: +def run_measurement(sweep: Sweep, name: str, safe_write_mode: bool = False, save_path=None, save_gridded: bool = False, swmr: bool = False, **kwargs) -> Tuple[Union[str, Path], Optional[DataDict]]: """ Wrapper function around run_and_save_sweep that makes sure you are saving your measurement with all the necessary metadata around it. @@ -168,15 +168,20 @@ def run_measurement(sweep: Sweep, name: str, safe_write_mode: bool = False, **kw raise RuntimeError(f"Could not find snapshot method for client {n}. Please update all packages.") kwargs['parameters'] = options.parameters.toParamDict - + py_env = get_environment_packages() current_dir = Path.cwd() commit_hash = commit_changes_in_repo(current_dir) - + if commit_hash is None: logger.warning("The current directory is not a git repository, your measurement code will not be tracked.") - + + if save_path is not None: # allow user to override the default save path + if not os.path.exists(save_path): + os.makedirs(save_path) + DATADIR = save_path + save_kwargs = { 'sweep': sweep, 'data_dir': DATADIR, @@ -189,13 +194,23 @@ def run_measurement(sweep: Sweep, name: str, safe_write_mode: bool = False, **kw if commit_hash is not None: save_kwargs['current_commit'] = {"measurement-hash": commit_hash} - data_location, data = run_and_save_sweep(**save_kwargs) + if swmr: + data_location, data = run_and_save_sweep_swmr(**save_kwargs) + else: + data_location, data = run_and_save_sweep(**save_kwargs) logger.info(f""" -========== -Saved data at {data_location}: -{data_info(data_location, do_print=False)} -=========""") - return data_location, data - + ========== + Saved data at {data_location}: + {data_info(data_location, do_print=False)} + =========""") + if save_gridded: + from labcore.data import ddh5_to_gridded_ddh5 + data_location = ddh5_to_gridded_ddh5(data_location / 'data.ddh5') + logger.info(f""" + ========== + Saved gridded data at {data_location}: + {data_info(data_location, fn= '', do_print=False)} + =========""") + return data_location, data