diff --git a/example.py b/example.py deleted file mode 100644 index 6a20967..0000000 --- a/example.py +++ /dev/null @@ -1,47 +0,0 @@ - -""" -Basic usage of PPK2 Python API. -The basic ampere mode sequence is: -1. read modifiers -2. set ampere mode -3. read stream of data -""" -import time -from ppk2_api.ppk2_api import PPK2_API - -ppk2s_connected = PPK2_API.list_devices() -if(len(ppk2s_connected) == 1): - ppk2_port = ppk2s_connected[0] - print(f'Found PPK2 at {ppk2_port}') -else: - print(f'Too many connected PPK2\'s: {ppk2s_connected}') - exit() - -ppk2_test = PPK2_API(ppk2_port) -ppk2_test.get_modifiers() -ppk2_test.use_ampere_meter() # set ampere meter mode -ppk2_test.toggle_DUT_power("OFF") # disable DUT power - -ppk2_test.start_measuring() # start measuring -# measurements are a constant stream of bytes -# the number of measurements in one sampling period depends on the wait between serial reads -# it appears the maximum number of bytes received is 1024 -# the sampling rate of the PPK2 is 100 samples per millisecond -for i in range(0, 1000): - read_data = ppk2_test.get_data() - if read_data != b'': - samples = ppk2_test.get_samples(read_data) - print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") - time.sleep(0.01) - -ppk2_test.toggle_DUT_power("ON") - -ppk2_test.start_measuring() -for i in range(0, 1000): - read_data = ppk2_test.get_data() - if read_data != b'': - samples = ppk2_test.get_samples(read_data) - print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") - time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period - -ppk2_test.stop_measuring() \ No newline at end of file diff --git a/example_mp.py b/example_mp.py deleted file mode 100644 index bb084b4..0000000 --- a/example_mp.py +++ /dev/null @@ -1,49 +0,0 @@ - -""" -Basic usage of PPK2 Python API - multiprocessing version -The basic ampere mode sequence is: -1. read modifiers -2. set ampere mode -3. read stream of data -""" -import time -from ppk2_api.ppk2_api import PPK2_MP - -ppk2s_connected = PPK2_MP.list_devices() -if(len(ppk2s_connected) == 1): - ppk2_port = ppk2s_connected[0] - print(f'Found PPK2 at {ppk2_port}') -else: - print(f'Too many connected PPK2\'s: {ppk2s_connected}') - exit() - -ppk2_test = PPK2_MP(ppk2_port) -ppk2_test.get_modifiers() -ppk2_test.use_ampere_meter() # set ampere meter mode -ppk2_test.toggle_DUT_power("OFF") # disable DUT power - -ppk2_test.start_measuring() # start measuring - -# measurements are a constant stream of bytes -# multiprocessing variant starts a process in the background which constantly -# polls the device in order to prevent losing samples. It will buffer the -# last 10s (by default) of data so get_data() can be called less frequently. -for i in range(0, 10): - read_data = ppk2_test.get_data() - if read_data != b'': - samples = ppk2_test.get_samples(read_data) - print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") - time.sleep(0.5) - -ppk2_test.toggle_DUT_power("ON") - -ppk2_test.start_measuring() -for i in range(0, 10): - read_data = ppk2_test.get_data() - if read_data != b'': - samples = ppk2_test.get_samples(read_data) - print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") - time.sleep(0.5) # lower time between sampling -> less samples read in one sampling period - -ppk2_test.stop_measuring() - diff --git a/examples/example.py b/examples/example.py new file mode 100644 index 0000000..e89ef7b --- /dev/null +++ b/examples/example.py @@ -0,0 +1,70 @@ + +""" +Basic usage of PPK2 Python API. +The basic ampere mode sequence is: +1. read modifiers +2. set ampere mode +3. read stream of data +""" +import time +from ppk2_api.ppk2_api import PPK2_API + +ppk2s_connected = PPK2_API.list_devices() +if len(ppk2s_connected) == 1: + ppk2_port = ppk2s_connected[0][0] + ppk2_serial = ppk2s_connected[0][1] + print(f"Found PPK2 at {ppk2_port} with serial number {ppk2_serial}") +else: + print(f"Too many connected PPK2's: {ppk2s_connected}") + exit() + +ppk2_test = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) +ppk2_test.get_modifiers() +ppk2_test.set_source_voltage(3300) + +ppk2_test.use_source_meter() # set source meter mode +ppk2_test.toggle_DUT_power("ON") # enable DUT power + +ppk2_test.start_measuring() # start measuring +# measurements are a constant stream of bytes +# the number of measurements in one sampling period depends on the wait between serial reads +# it appears the maximum number of bytes received is 1024 +# the sampling rate of the PPK2 is 100 samples per millisecond +for i in range(0, 1000): + read_data = ppk2_test.get_data() + if read_data != b'': + samples, raw_digital = ppk2_test.get_samples(read_data) + print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") + + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() + time.sleep(0.01) + +ppk2_test.toggle_DUT_power("OFF") # disable DUT power + +ppk2_test.use_ampere_meter() # set ampere meter mode + +ppk2_test.start_measuring() +for i in range(0, 1000): + read_data = ppk2_test.get_data() + if read_data != b'': + samples, raw_digital = ppk2_test.get_samples(read_data) + print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") + + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() + time.sleep(0.01) # lower time between sampling -> less samples read in one sampling period + +ppk2_test.stop_measuring() diff --git a/examples/example_measure_mw.py b/examples/example_measure_mw.py new file mode 100644 index 0000000..1694d26 --- /dev/null +++ b/examples/example_measure_mw.py @@ -0,0 +1,107 @@ +import time +import threading + +from ppk2_api.ppk2_api import PPK2_API + + +class PowerProfiler: + conversion_factor = 1_000_000 # uA to mW + + def __init__(self, voltage_mv=3700): + self.voltage_mv = voltage_mv + self.total_power_mW = 0 + self.total_samples = 0 + + self.lock = threading.Lock() + self.ppk2 = None + self.sampling_thread = None + self.sampling_enabled = False + + def _connect_ppk2(self): + ppk2s_connected = [] + #After reset, the PPK2 can take a moment to reappear on the system. We wait for it to show up for a few seconds before giving up. + for _ in range(10): + ppk2s_connected = PPK2_API.list_devices() + print(f"PPK2 devices found: {ppk2s_connected}") + if ppk2s_connected: + break + time.sleep(0.5) + + if not ppk2s_connected: + raise ConnectionError("No PPK2 devices found!") + + print(ppk2s_connected) + # Just select the first available PPK2 device + for ppk2_port_tuple in ppk2s_connected: + ppk2_port = ppk2_port_tuple[0] #Just get the port part of the tuple + print(f"Connecting to {ppk2_port}") + try: + self.ppk2 = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) + self.ppk2.get_modifiers() + return + except Exception: + print(f"Failed to connect to {ppk2_port}") + self.ppk2 = None + + raise ConnectionError("Could not initialize any detected PPK2 device.") + + + def _setup_ppk2(self): + self._connect_ppk2() + + self.ppk2.set_source_voltage(self.voltage_mv) + self.ppk2.use_source_meter() + self.ppk2.toggle_DUT_power("ON") + + self.ppk2.start_measuring() + print("Initialized Power Profiler") + + + def _run_sampling(self): + try: + self._setup_ppk2() + while self.sampling_enabled: + time.sleep(0.01) + read_data = self.ppk2.get_data() + + if read_data == b"": + continue + + samples, raw_digital = self.ppk2.get_samples(read_data) + if not samples: + continue + + average_current_uA = sum(samples) / len(samples) + average_power_mW = (average_current_uA * self.voltage_mv) / self.conversion_factor + formatted_power = round(average_power_mW, 2) + + with self.lock: + self.total_power_mW += formatted_power + self.total_samples += 1 + average_of_averages_mW = self.total_power_mW / self.total_samples + + print(f"{formatted_power} mW, Avg: {average_of_averages_mW:.2f} mW") + + except Exception as e: + self.sampling_enabled = False + print(f"An error occurred: {e}") + + def start_sampling(self): + self.sampling_enabled = True + self.sampling_thread = threading.Thread(target=self._run_sampling, daemon=True) + self.sampling_thread.start() + + def stop_sampling(self): + self.sampling_enabled = False + self.sampling_thread.join() + + +def main(): + sampler = PowerProfiler(voltage_mv=3800) + sampler.start_sampling() + input("Press Enter to exit...\n") + sampler.stop_sampling() + + +if __name__ == "__main__": + main() diff --git a/examples/example_mp.py b/examples/example_mp.py new file mode 100644 index 0000000..523dcd9 --- /dev/null +++ b/examples/example_mp.py @@ -0,0 +1,78 @@ + +""" +Basic usage of PPK2 Python API - multiprocessing version. +The basic ampere mode sequence is: +1. read modifiers +2. set ampere mode +3. read stream of data +""" +import time +from ppk2_api.ppk2_api import PPK2_MP as PPK2_API + +ppk2s_connected = PPK2_API.list_devices() +if len(ppk2s_connected) == 1: + ppk2_port = ppk2s_connected[0][0] + ppk2_serial = ppk2s_connected[0][1] + print(f"Found PPK2 at {ppk2_port} with serial number {ppk2_serial}") +else: + print(f"Too many connected PPK2's: {ppk2s_connected}") + exit() + +ppk2_test = PPK2_API(ppk2_port, buffer_max_size_seconds=1, buffer_chunk_seconds=0.01, timeout=1, write_timeout=1, exclusive=True) +ppk2_test.get_modifiers() +ppk2_test.set_source_voltage(3300) + +""" +Source mode example +""" +ppk2_test.use_source_meter() # set source meter mode +ppk2_test.toggle_DUT_power("ON") # enable DUT power + +ppk2_test.start_measuring() # start measuring +# measurements are a constant stream of bytes +# the number of measurements in one sampling period depends on the wait between serial reads +# it appears the maximum number of bytes received is 1024 +# the sampling rate of the PPK2 is 100 samples per millisecond +while True: + read_data = ppk2_test.get_data() + if read_data != b'': + samples, raw_digital = ppk2_test.get_samples(read_data) + print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") + + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() + + time.sleep(0.001) + +ppk2_test.toggle_DUT_power("OFF") # disable DUT power +ppk2_test.stop_measuring() + +""" +Ampere mode example +""" +ppk2_test.use_ampere_meter() # set ampere meter mode + +ppk2_test.start_measuring() +while True: + read_data = ppk2_test.get_data() + if read_data != b'': + samples, raw_digital = ppk2_test.get_samples(read_data) + print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") + + # Raw digital contains the raw digital data from the PPK2 + # The number of raw samples is equal to the number of samples in the samples list + # We have to process the raw digital data to get the actual digital data + digital_channels = ppk2_test.digital_channels(raw_digital) + for ch in digital_channels: + # Print last 10 values of each channel + print(ch[-10:]) + print() + time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period + +ppk2_test.stop_measuring() diff --git a/setup.py b/setup.py index 850f849..bab7e24 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ def read(*names, **kwargs): setup( name="ppk2-api", - version="0.0.2", + version="0.9.2", description="API for Nordic Semiconductor's Power Profiler Kit II (PPK 2).", url="https://github.com/IRNAS/ppk2-api-python", packages=find_packages("src"), diff --git a/src/power_profiler.py b/src/power_profiler.py index 7af0f6f..30d2810 100644 --- a/src/power_profiler.py +++ b/src/power_profiler.py @@ -26,48 +26,43 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): self.ppk2 = PPK2_API(serial_port) try: - ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct - print(f"Initialized ppk2 api: {ret}") + self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct + print("Initialized ppk2 api") except Exception as e: print(f"Error initializing power profiler: {e}") - ret = None raise e - if not ret: - self.ppk2 = None - raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}") - else: - self.ppk2.use_source_meter() + self.ppk2.use_source_meter() - self.source_voltage_mV = source_voltage_mV + self.source_voltage_mV = source_voltage_mV - self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V + self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V - print(f"Set power profiler source voltage: {self.source_voltage_mV}") + print(f"Set power profiler source voltage: {self.source_voltage_mV}") - self.measuring = False - self.current_measurements = [] + self.measuring = False + self.current_measurements = [] - # local variables used to calculate power consumption - self.measurement_start_time = None - self.measurement_stop_time = None + # local variables used to calculate power consumption + self.measurement_start_time = None + self.measurement_stop_time = None - time.sleep(1) + time.sleep(1) - self.stop = False + self.stop = False - self.measurement_thread = Thread(target=self.measurement_loop, daemon=True) - self.measurement_thread.start() + self.measurement_thread = Thread(target=self.measurement_loop, daemon=True) + self.measurement_thread.start() - # write to csv - self.filename = filename - if self.filename is not None: - with open(self.filename, 'w', newline='') as file: - writer = csv.writer(file) - row = [] - for key in ["ts", "avg1000"]: - row.append(key) - writer.writerow(row) + # write to csv + self.filename = filename + if self.filename is not None: + with open(self.filename, 'w', newline='') as file: + writer = csv.writer(file) + row = [] + for key in ["ts", "avg1000"]: + row.append(key) + writer.writerow(row) def write_csv_rows(self, samples): """Write csv row""" @@ -127,7 +122,7 @@ def measurement_loop(self): if self.measuring: # read data if currently measuring read_data = self.ppk2.get_data() if read_data != b'': - samples = self.ppk2.get_samples(read_data) + samples, raw_digital = self.ppk2.get_samples(read_data) self.current_measurements += samples # can easily sum lists, will append individual data time.sleep(0.001) # TODO figure out correct sleep duration @@ -192,4 +187,4 @@ def get_average_charge_mC(self): def get_measurement_duration_s(self): """Returns duration of measurement""" measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds - return measurement_duration_s \ No newline at end of file + return measurement_duration_s diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index a636044..c6e5236 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -7,10 +7,15 @@ import time import serial import struct -import logging import os import queue -import multiprocessing +import threading +from typing import Dict, List, Optional, Tuple + +DeviceInfo = Tuple[str, str] +Mask = Dict[str, int] +RawMeasurement = Tuple[Optional[float], Optional[int]] +SampleBatch = Tuple[List[float], List[int]] class PPK2_Command(): """Serial command opcodes""" @@ -46,11 +51,21 @@ class PPK2_Modes(): class PPK2_API(): - def __init__(self, port): + def __init__(self, port: str, **kwargs) -> None: + ''' + port - port where PPK2 is connected + **kwargs - keyword arguments to pass to the pySerial constructor + ''' self.ser = None - self.ser = serial.Serial(port) + self.ser = serial.Serial(port, **kwargs) self.ser.baudrate = 9600 + # Give some time for the serial to connect + time.sleep(0.05) + # Residual data can be present after a bad shutdown of the PPK2. + # This can cause issues with parsing metadata and getting clean readings. + # To mitigate this, we clear the serial buffer at initialization to ensure we start with a clean slate. + self._empty_serial_buffer() self.modifiers = { "Calibrated": None, @@ -90,34 +105,42 @@ def __init__(self, port): # adc measurement buffer remainder and len of remainder self.remainder = {"sequence": b'', "len": 0} - - def __del__(self): + + def __del__(self) -> None: """Destructor""" try: + # reset device + self._write_serial((PPK2_Command.RESET,)) + if self.ser: self.ser.close() - except Exception as e: - logging.error(f"An error occured while closing ppk2_api: {e}") + except Exception: + # Destructors should not raise; cleanup is best-effort here. + pass - def _pack_struct(self, cmd_tuple): + def _pack_struct(self, cmd_tuple: Tuple[int, ...]) -> bytes: """Returns packed struct""" return struct.pack("B" * len(cmd_tuple), *cmd_tuple) - def _write_serial(self, cmd_tuple): + def _write_serial(self, cmd_tuple: Tuple[int, ...]) -> None: """Writes cmd bytes to serial""" - try: - cmd_packed = self._pack_struct(cmd_tuple) - self.ser.write(cmd_packed) - except Exception as e: - logging.error(f"An error occured when writing to serial port: {e}") + cmd_packed = self._pack_struct(cmd_tuple) + self.ser.write(cmd_packed) + + def _empty_serial_buffer(self) -> None: + #read until there is no more data in buffer + buf = 1 + while buf: + buf = self.ser.read_all() + time.sleep(0.01) - def _twos_comp(self, val): + def _twos_comp(self, val: int) -> int: """Compute the 2's complement of int32 value""" if (val & (1 << (32 - 1))) != 0: val = val - (1 << 32) # compute negative value return val - def _convert_source_voltage(self, mV): + def _convert_source_voltage(self, mV: int) -> Tuple[int, int]: """Convert input voltage to device command""" # minimal possible mV is 800 if mV < self.vdd_low: @@ -142,19 +165,31 @@ def _convert_source_voltage(self, mV): return set_b_1, set_b_2 - def _read_metadata(self): + def _read_metadata(self) -> str: """Read metadata""" # try to get metadata from device for _ in range(0, 5): - # it appears the second reading is the metadata read = self.ser.read(self.ser.in_waiting) time.sleep(0.1) - # TODO add a read_until serial read function with a timeout - if read != b'' and "END" in read.decode("utf-8"): - return read.decode("utf-8") + if not read: + continue # No data, try again + + # Try decoding the data + try: + metadata = read.decode("utf-8") + except UnicodeDecodeError: + # If decoding fails, try again in next iteration + continue + + # Check if the metadata is valid (i.e., contains "END") + if "END" in metadata: + return metadata - def _parse_metadata(self, metadata): + # If we exit the loop, it means we couldn't get valid metadata + raise ValueError("Could not retrieve valid metadata from the device.") + + def _parse_metadata(self, metadata: str) -> None: """Parse metadata and store it to modifiers""" # TODO handle more robustly try: @@ -174,25 +209,20 @@ def _parse_metadata(self, metadata): else: self.modifiers[key][str(ind)] = float( data_pair[1]) - return True except Exception as e: - # if exception triggers serial port is probably not correct - return None + raise ValueError("Could not parse metadata.") from e - def _generate_mask(self, bits, pos): + def _generate_mask(self, bits: int, pos: int) -> Mask: pos = pos mask = ((2**bits-1) << pos) mask = self._twos_comp(mask) return {"mask": mask, "pos": pos} - def _get_masked_value(self, value, meas): + def _get_masked_value(self, value: int, meas: Mask, is_bits: bool=False) -> int: masked_value = (value & meas["mask"]) >> meas["pos"] - if meas["pos"] == 24: - if masked_value == 255: - masked_value = -1 return masked_value - def _handle_raw_data(self, adc_value): + def _handle_raw_data(self, adc_value: int) -> RawMeasurement: """Convert raw value to analog value""" try: current_measurement_range = min(self._get_masked_value( @@ -201,34 +231,50 @@ def _handle_raw_data(self, adc_value): bits = self._get_masked_value(adc_value, self.MEAS_LOGIC) analog_value = self.get_adc_result( current_measurement_range, adc_result) * 10**6 - return analog_value - except Exception as e: - print("Measurement outside of range!") - return None + return analog_value, bits + except Exception: + return None, None @staticmethod - def list_devices(): + def list_devices() -> List[DeviceInfo]: import serial.tools.list_ports + ports = serial.tools.list_ports.comports() - if os.name == 'nt': - devices = [port.device for port in ports if port.description.startswith("nRF Connect USB CDC ACM")] + if os.name == "nt": + devices = [ + (port.device, (port.serial_number or "")[:8]) + for port in ports + if (port.description or "").startswith("nRF Connect USB CDC ACM") and (port.location or "").endswith("1") + ] else: - devices = [port.device for port in ports if port.product == 'PPK2'] + devices = [ + (port.device, (port.serial_number or "")[:8]) + for port in ports + if (port.product or "") == "PPK2" and (port.location or "").endswith("1") + ] return devices - def get_data(self): + + def get_data(self) -> bytes: """Return readings of one sampling period""" sampling_data = self.ser.read(self.ser.in_waiting) return sampling_data - def get_modifiers(self): - """Gets and sets modifiers from device memory""" + def get_modifiers(self) -> None: + """ + Retrieve and parse modifiers from the device memory. + """ + + # Send command to request metadata self._write_serial((PPK2_Command.GET_META_DATA, )) - metadata = self._read_metadata() - ret = self._parse_metadata(metadata) - return ret + try: + metadata = self._read_metadata() + self._parse_metadata(metadata) + except ValueError as e: + raise ValueError(f"Failed to read valid metadata: {e}") from e - def start_measuring(self): + + def start_measuring(self) -> None: """Start continuous measurement""" if not self.current_vdd: if self.mode == PPK2_Modes.SOURCE_MODE: @@ -238,11 +284,11 @@ def start_measuring(self): self._write_serial((PPK2_Command.AVERAGE_START, )) - def stop_measuring(self): + def stop_measuring(self) -> None: """Stop continuous measurement""" self._write_serial((PPK2_Command.AVERAGE_STOP, )) - def set_source_voltage(self, mV): + def set_source_voltage(self, mV: int) -> None: """Inits device - based on observation only REGULATOR_SET is the command. The other two values correspond to the voltage level. @@ -252,7 +298,7 @@ def set_source_voltage(self, mV): self._write_serial((PPK2_Command.REGULATOR_SET, b_1, b_2)) self.current_vdd = mV - def toggle_DUT_power(self, state): + def toggle_DUT_power(self, state: str) -> None: """Toggle DUT power based on parameter""" if state == "ON": self._write_serial( @@ -262,19 +308,19 @@ def toggle_DUT_power(self, state): self._write_serial( (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) # 12,0 - def use_ampere_meter(self): + def use_ampere_meter(self) -> None: """Configure device to use ampere meter""" self.mode = PPK2_Modes.AMPERE_MODE self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.TRIGGER_SET)) # 17,1 - def use_source_meter(self): + def use_source_meter(self) -> None: """Configure device to use source meter""" self.mode = PPK2_Modes.SOURCE_MODE self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.AVG_NUM_SET)) # 17,2 - def get_adc_result(self, current_range, adc_value): + def get_adc_result(self, current_range: int, adc_value: int) -> float: """Get result of adc conversion""" current_range = str(current_range) result_without_gain = (adc_value - self.modifiers["O"][current_range]) * ( @@ -319,11 +365,31 @@ def get_adc_result(self, current_range, adc_value): self.prev_range = current_range return adc - def _digital_to_analog(self, adc_value): + def _digital_to_analog(self, adc_value: bytes) -> int: """Convert discrete value to analog value""" return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value - def get_samples(self, buf): + def digital_channels(self, bits: List[int]) -> List[List[int]]: + """ + Convert raw digital data to digital channels. + + Returns a 2d matrix with 8 rows (one for each channel). Each row contains HIGH and LOW values for the selected channel. + """ + + # Prepare 2d matrix with 8 rows (one for each channel) + digital_channels = [[], [], [], [], [], [], [], []] + for sample in bits: + digital_channels[0].append((sample & 1) >> 0) + digital_channels[1].append((sample & 2) >> 1) + digital_channels[2].append((sample & 4) >> 2) + digital_channels[3].append((sample & 8) >> 3) + digital_channels[4].append((sample & 16) >> 4) + digital_channels[5].append((sample & 32) >> 5) + digital_channels[6].append((sample & 64) >> 6) + digital_channels[7].append((sample & 128) >> 7) + return digital_channels + + def get_samples(self, buf: bytes) -> SampleBatch: """ Returns list of samples read in one sampling period. The number of sampled values depends on the delay between serial reads. @@ -334,13 +400,16 @@ def get_samples(self, buf): sample_size = 4 # one analog value is 4 bytes in size offset = self.remainder["len"] samples = [] + raw_digital_output = [] first_reading = ( self.remainder["sequence"] + buf[0:sample_size-offset])[:4] adc_val = self._digital_to_analog(first_reading) - measurement = self._handle_raw_data(adc_val) + measurement, bits = self._handle_raw_data(adc_val) if measurement is not None: samples.append(measurement) + if bits is not None: + raw_digital_output.append(bits) offset = sample_size - offset @@ -348,21 +417,25 @@ def get_samples(self, buf): next_val = buf[offset:offset + sample_size] offset += sample_size adc_val = self._digital_to_analog(next_val) - measurement = self._handle_raw_data(adc_val) + measurement, bits = self._handle_raw_data(adc_val) if measurement is not None: samples.append(measurement) + if bits is not None: + raw_digital_output.append(bits) self.remainder["sequence"] = buf[offset:len(buf)] self.remainder["len"] = len(buf)-offset - return samples # return list of samples, handle those lists in PPK2 API wrapper + # return list of samples and raw digital outputs + # handle those lists in PPK2 API wrapper + return samples, raw_digital_output -class PPK_Fetch(multiprocessing.Process): +class PPK_Fetch(threading.Thread): ''' - Background process for polling the data in multiprocessing variant + Background process for polling the data in multi-threaded variant ''' - def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): + def __init__(self, ppk2: "PPK2_API", quit_evt: threading.Event, buffer_len_s: float=10, buffer_chunk_s: float=0.5) -> None: super().__init__() self._ppk2 = ppk2 self._quit = quit_evt @@ -380,9 +453,9 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): if self._buffer_chunk % 4 != 0: self._buffer_chunk = (self._buffer_chunk // 4) * 4 - self._buffer_q = multiprocessing.Queue() + self._buffer_q = queue.Queue() - def run(self): + def run(self) -> None: s = 0 t = time.time() local_buffer = b'' @@ -397,7 +470,6 @@ def run(self): self._buffer_q.get() local_buffer = local_buffer[self._buffer_chunk:] self._last_timestamp = tm_now - #print(len(d), len(local_buffer), self._buffer_q.qsize()) # calculate stats s += len(d) @@ -418,12 +490,12 @@ def run(self): except queue.Empty: break - def get_data(self): + def get_data(self) -> bytes: ret = b'' count = 0 while True: try: - ret += self._buffer_q.get(timeout=0.01) # get_nowait sometimes skips a chunk for some reason + ret += self._buffer_q.get(timeout=0.001) # get_nowait sometimes skips a chunk for some reason count += 1 except queue.Empty: break @@ -435,17 +507,21 @@ class PPK2_MP(PPK2_API): Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns a background process on start_measuring() ''' - def __init__(self, port, buffer_seconds=10): + def __init__(self, port: str, buffer_max_size_seconds: float=10, buffer_chunk_seconds: float=0.1, **kwargs) -> None: ''' port - port where PPK2 is connected - buffer_seconds - how many seconds of data to keep in the buffer + buffer_max_size_seconds - how many seconds of data to keep in the buffer + buffer_chunk_seconds - how many seconds of data to put in the queue at once + **kwargs - keyword arguments to pass to the pySerial constructor ''' - super().__init__(port) + super().__init__(port, **kwargs) + self._fetcher = None - self._quit_evt = multiprocessing.Event() - self._buffer_seconds = buffer_seconds + self._quit_evt = threading.Event() + self._buffer_max_size_seconds = buffer_max_size_seconds + self._buffer_chunk_seconds = buffer_chunk_seconds - def __del__(self): + def __del__(self) -> None: """Destructor""" PPK2_API.stop_measuring(self) self._quit_evt.clear() @@ -456,7 +532,7 @@ def __del__(self): self._fetcher = None del self._fetcher - def start_measuring(self): + def start_measuring(self) -> None: # discard the data in the buffer self.stop_measuring() while self.get_data()!=b'': @@ -467,10 +543,10 @@ def start_measuring(self): if self._fetcher is not None: return - self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_seconds) + self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds) self._fetcher.start() - def stop_measuring(self): + def stop_measuring(self) -> None: PPK2_API.stop_measuring(self) self.get_data() # flush the serial buffer (to prevent unicode error on next command) self._quit_evt.set() @@ -478,7 +554,7 @@ def stop_measuring(self): self._fetcher.join() # join() will block if the queue isn't empty self._fetcher = None - def get_data(self): + def get_data(self) -> bytes: try: return self._fetcher.get_data() except (TypeError, AttributeError):