Skip to content
2 changes: 2 additions & 0 deletions docs/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Improvements
- Fixed a bug where the PPS would crash if it was polled too frequently.
- Changed tolerances for PPS and DMM tests to more accurately match device accuracy.
- Created a new FixateError base class for all exceptions raised by fixate to use. It inherits from Exception instead of BaseExcepetion to improve error handling.
- DSO Driver function 'waveform_values' now returns a single channels x and y data as two separate lists, without re-acquiring the signal. This function should
now be called after performing signal acquisition.

*************
Version 0.6.4
Expand Down
156 changes: 112 additions & 44 deletions src/fixate/drivers/dso/agilent_mso_x.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import struct
import pyvisa
from fixate.core.exceptions import InstrumentError
from fixate.drivers.dso.helper import DSO
Expand Down Expand Up @@ -643,57 +642,126 @@ def waveform_preamble(self):
preamble[labels[index]] = val
return preamble

def waveform_values(self, signals, file_name="", file_type="csv"):
def waveform_values(self, signal, file_name="", file_type="csv"):
"""
:param signals:
The channel ie "1", "2", "3", "4", "MATH", "FUNC"
:param file_name:
If
:param file_type:
:return:
Retrieves currently present waveform data from the specified channel and optionally saves it to a file.
The oscilliscope must be in the stopped state to retrive waveform data.

This method queries the instrument for raw data points, scales them using
the waveform preamble (origin, increment, and reference values), and
converts them into time and voltage arrays.

Args:
signal (str|int): The source channel (e.g., 1, "2").
file_name (str, optional): The path/name of the file to save data to.
Defaults to "", which skips file saving.
file_type (str, optional): The format for the output file.
Supported: "csv". Defaults to "csv".

Returns:
tuple: A tuple containing (time_values, values) as lists of floats.

Raises:
ValueError: If no data is available on the selected channel.
NotImplementedError: If an unsupported file_type is requested.
"""
signals = self.digitize(signals)
return_vals = {}
for sig in signals:
return_vals[sig] = []
results = return_vals[sig]
self.write(":WAV:SOUR {}".format(sig))
self.write(":WAV:FORM BYTE")
self.write(":WAV:POIN:MODE RAW")
preamble = self.waveform_preamble()
data = self.retrieve_waveform_data()
for index, datum in enumerate(data):
time_val = index * preamble["x_increment"]
try:
# If the channel is not able to be converted to an int, then its almost definitely not an analogue source
# i.e. you might have requested "math" or "function" that is not supported by this method.
int_signal = int(signal)
except ValueError:
raise ValueError(
"Please select an analog channel. Math or function channels are not supported."
)

# Exit early if the requested channel is not currently displayed:
ch_state = int(self.instrument.query(f":CHANnel{int_signal}:DISPlay?"))
if not ch_state:
raise ValueError("Requested channel is not active!")

# Set the channel:
self.instrument.write(f":WAVeform:SOURce CHANnel{int_signal}")
# Explicitly set this to avoid confusion
self.instrument.write(":WAVeform:FORMat BYTE")
self.instrument.write(":WAVeform:UNSigned 0")

# Pick the points mode depending on the current acquisiton mode:
acq_type = str(self.instrument.query(":ACQuire:TYPE?")).strip("\n")
if acq_type == "AVER" or acq_type == "HRES":
points_mode = "NORMal"
# Use for Average and High Resoultion acquisition Types.
# If the :WAVeform:POINts:MODE is RAW, and the Acquisition Type is Average, the number of points available is 0. If :WAVeform:POINts:MODE is MAX, it may or may not return 0 points.
# If the :WAVeform:POINts:MODE is RAW, and the Acquisition Type is High Resolution, then the effect is (mostly) the same as if the Acq. Type was Normal (no box-car averaging).
# Note: if you use :SINGle to acquire the waveform in AVERage Acq. Type, no average is performed, and RAW works.
else:
points_mode = "RAW" # Use for Acq. Type NORMal or PEAK

# This command sets the points mode to MAX AND ensures that the maximum # of points to be transferred is set, though they must still be on screen
self.instrument.write(":WAVeform:POINts MAX")
# The above command sets the points mode to MAX. So we set it here to make sure its what we want.
self.instrument.write(":WAVeform:POINts:MODE " + points_mode)

# Check if there is actually data to acquire:
data_available = int(self.query(":WAVeform:POINTs?"))
if data_available == 0:
# No data is available
# Setting a channel to be a waveform source turns it on, so we need to turn it off now:
self.write(f":CHANnel{int_signal}:DISPlay OFF")
raise ValueError("No data is available")

preamble = self.waveform_preamble()
# Grab the data from the scope:
# datatype definition is "b" for byte. See struct module details.
data = self.instrument.query_binary_values(
":WAV:DATA?", datatype="b", is_big_endian=True
)

x = []
y = []
# Modify some things if we are in peak detect mode:
data_len = int(len(data) / 2) if acq_type == "PEAK" else len(data)
multiplier = 2 if acq_type == "PEAK" else 1
for i in range(data_len):
x_val = (i - preamble["x_reference"]) * preamble["x_increment"] + preamble[
"x_origin"
]

if acq_type == "PEAK":
# We need to double up on the time index
# In peak detect mode, the points come out as low(t1),high(t1),low(t2),high(t2)
y_min = (
preamble["y_origin"]
+ (data[i * multiplier] - preamble["y_reference"])
* preamble["y_increment"]
)
y_max = (
preamble["y_origin"]
+ (data[i * multiplier + 1] - preamble["y_reference"])
* preamble["y_increment"]
)
x.append(x_val)
x.append(x_val)
y.append(y_min)
y.append(y_max)

else:
y_val = (
preamble["y_origin"]
+ (datum - preamble["y_reference"]) * preamble["y_increment"]
+ (data[i] - preamble["y_reference"]) * preamble["y_increment"]
)
results.append((time_val, y_val))
if file_name and file_type == "csv": # Needs work for multiple references

x.append(x_val)
y.append(y_val)

if file_name and file_type == "csv":
with open(file_name, "w") as f:
f.write("x,y")
for label in sorted(preamble):
f.write(",{},{}".format(label, preamble[label]))
f.write("\n")
for time_val, y_val in enumerate(results):
f.write(
"{time_val},{voltage}\n".format(
time_val=time_val, voltage=y_val
)
)
f.write("x,y\n")
for x_val, y_val in zip(x, y):
f.write(f"{x_val},{y_val}")

elif file_name and file_type == "bin":
raise NotImplementedError("Binary Output not implemented")
return results

def retrieve_waveform_data(self):
self.instrument.write(":WAV:DATA?")
time.sleep(0.2)
data = self.read_raw()[:-1] # Strip \n
if data[0:1] != "#".encode():
raise InstrumentError("Pound Character missing in waveform data response")
valid_bytes = data[int(data[1:2]) + 2 :] # data[1] denotes length value digits
values = struct.unpack("%dB" % len(valid_bytes), valid_bytes)
return values
return x, y

def digitize(self, signals):
signals = [self.validate_signal(sig) for sig in signals]
Expand Down