diff --git a/.gitignore b/.gitignore index 5e08b19..5e01160 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ **/__pycache__/* src/ppk2_api.egg-info build -dist \ No newline at end of file +dist!/.gitignore +.idea \ No newline at end of file diff --git a/README.md b/README.md index ed0e941..12fdfb0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ ## Description The new Nordic Semiconductor's [Power Profiler Kit II (PPK 2)](https://www.nordicsemi.com/Software-and-tools/Development-Tools/Power-Profiler-Kit-2) is very useful for real time measurement of device power consumption. The official [nRF Connect Power Profiler tool](https://github.com/NordicSemiconductor/pc-nrfconnect-ppk) provides a friendly GUI with real-time data display. However there is no support for automated power monitoring. The puropose of this Python API is to enable automated power monitoring and data logging in Python applications. +Original description comes from https://docs.nordicsemi.com/bundle/ug_ppk2/page/UG/ppk/ppk_downloadable_content.html ![Power Profiler Kit II](https://github.com/IRNAS/ppk2-api-python/blob/master/images/power-profiler-kit-II.jpg) diff --git a/doc/PCA63100_Schematic_And_PCB.pdf b/doc/PCA63100_Schematic_And_PCB.pdf new file mode 100644 index 0000000..d31cb7b Binary files /dev/null and b/doc/PCA63100_Schematic_And_PCB.pdf differ diff --git a/doc/pca63100-power-profiler-kit-ii---hw-files---1_0_1.zip b/doc/pca63100-power-profiler-kit-ii---hw-files---1_0_1.zip new file mode 100644 index 0000000..8b27d42 Binary files /dev/null and b/doc/pca63100-power-profiler-kit-ii---hw-files---1_0_1.zip differ diff --git a/doc/ppk2_block_diagram.svg b/doc/ppk2_block_diagram.svg new file mode 100644 index 0000000..9a582e7 --- /dev/null +++ b/doc/ppk2_block_diagram.svg @@ -0,0 +1,371 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Page-1 + + + + + Nordic Blue + USB data and power supply + + + + USB data and power supply + + Nordic Blue.7 + USB power supply + + + + USB power supply + + Custom 2 + MUX + + + + + + + + + + + + + + + + + + + + MUX + + Nordic Blue.14 + Adjustable Regulator + + + + Adjustable Regulator + + Dynamic connector + + + + Dynamic connector.17 + + + + Dynamic connector.18 + + + + Custom 2.19 + MUX + + + + + + + + + + + + + + + + + + + + MUX + + Nordic Blue.25 + Power in + + + + Power in + + Dynamic connector.26 + + + + Sheet.27 + Max 5 V + + + + Max 5 V + + Nordic Lake + SoC + + + + SoC + + Dynamic connector.29 + + + + Sheet.30 + USB CDC Communication + + + + USB CDC Communication + + Dynamic connector.31 + + + + Sheet.33 + Voltage Control + + + + Voltage Control + + Dynamic connector.34 + + + + Sheet.35 + Input control + + + + Input control + + Sheet.36 + + + + + + + + + Dynamic connector.41 + + + + Sheet.42 + Temperature sensor + + + + Temperature sensor + + Dynamic connector.43 + + + + Dynamic connector.44 + + + + Sheet.46 + Voltage measurement + + + + Voltage measurement + + Dynamic connector.52 + + + + Sheet.57 + Current measurement + + + + Current measurement + + Dynamic connector.58 + + + + Nordic Blue.66 + Logic port + + + + Logic port + + Dynamic connector.74 + + + + Sheet.75 + Output control + + + + Output control + + Nordic Blue.77 + LEDs + + + + LEDs + + Nordic Blue.78 + EEPROM + + + + EEPROM + + Nordic Blue.79 + Level Shifter + + + + Level Shifter + + Dynamic connector.81 + + + + Signal, shares physical pin with other signal.225 + + + + Dynamic connector.88 + + + + Signal, shares physical pin with other signal.89 + + + + Sheet.91 + + Nordic Red.219 + + + + Sheet.40 + + + + + + Sheet.92 + + Nordic Blueslate + + + + Nordic Blue.48 + Measurement circuitry + + + + Measurement circuitry + + Nordic Blue.49 + Automatic switch circuitry + + + + Automatic switch circuitry + + + Nordic Blue.65 + DUT + + + + DUT + + Dynamic connector.68 + + + + Dynamic connector.69 + + + + Dynamic connector.67 + + + + Dynamic connector.71 + + + + Sheet.95 + 8-bit bidirectional port + + + + 8-bit bidirectional port + + Sheet.98 + 0.8 V-5.0 V + + + + 0.8 V-5.0 V + + \ No newline at end of file diff --git a/doc/ppk2_pcb1a_t.png b/doc/ppk2_pcb1a_t.png new file mode 100644 index 0000000..96bdcf5 Binary files /dev/null and b/doc/ppk2_pcb1a_t.png differ diff --git a/doc/ppk2_pcb2_t.png b/doc/ppk2_pcb2_t.png new file mode 100644 index 0000000..9bc3015 Binary files /dev/null and b/doc/ppk2_pcb2_t.png differ diff --git a/doc/ppk2_userguide_1.0.1.pdf b/doc/ppk2_userguide_1.0.1.pdf new file mode 100644 index 0000000..f92672d Binary files /dev/null and b/doc/ppk2_userguide_1.0.1.pdf differ diff --git a/example.py b/example.py index e89ef7b..ff7cdf2 100644 --- a/example.py +++ b/example.py @@ -1,4 +1,3 @@ - """ Basic usage of PPK2 Python API. The basic ampere mode sequence is: @@ -6,6 +5,7 @@ 2. set ampere mode 3. read stream of data """ + import time from ppk2_api.ppk2_api import PPK2_API @@ -22,17 +22,20 @@ ppk2_test.get_modifiers() ppk2_test.set_source_voltage(3300) -ppk2_test.use_source_meter() # set source meter mode -ppk2_test.toggle_DUT_power("ON") # enable DUT power +# set source meter mode +ppk2_test.use_source_meter() +# enable DUT power +ppk2_test.toggle_DUT_power("ON") +# start measuring +ppk2_test.start_measuring() -ppk2_test.start_measuring() # start measuring # measurements are a constant stream of bytes # the number of measurements in one sampling period depends on the wait between serial reads # it appears the maximum number of bytes received is 1024 # the sampling rate of the PPK2 is 100 samples per millisecond for i in range(0, 1000): read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -46,14 +49,15 @@ print() time.sleep(0.01) -ppk2_test.toggle_DUT_power("OFF") # disable DUT power - -ppk2_test.use_ampere_meter() # set ampere meter mode +# disable DUT power +ppk2_test.toggle_DUT_power("OFF") +# set ampere meter mode +ppk2_test.use_ampere_meter() ppk2_test.start_measuring() for i in range(0, 1000): read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -65,6 +69,8 @@ # Print last 10 values of each channel print(ch[-10:]) print() - time.sleep(0.01) # lower time between sampling -> less samples read in one sampling period -ppk2_test.stop_measuring() + # lower time between sampling -> less samples read in one sampling period + time.sleep(0.01) + +ppk2_test.stop_measuring() \ No newline at end of file diff --git a/example_mp.py b/example_mp.py index 523dcd9..9de3c1c 100644 --- a/example_mp.py +++ b/example_mp.py @@ -1,4 +1,3 @@ - """ Basic usage of PPK2 Python API - multiprocessing version. The basic ampere mode sequence is: @@ -6,6 +5,7 @@ 2. set ampere mode 3. read stream of data """ + import time from ppk2_api.ppk2_api import PPK2_MP as PPK2_API @@ -18,24 +18,34 @@ print(f"Too many connected PPK2's: {ppk2s_connected}") exit() -ppk2_test = PPK2_API(ppk2_port, buffer_max_size_seconds=1, buffer_chunk_seconds=0.01, timeout=1, write_timeout=1, exclusive=True) +ppk2_test = PPK2_API( + ppk2_port, + buffer_max_size_seconds=1, + buffer_chunk_seconds=0.01, + timeout=1, + write_timeout=1, + exclusive=True, +) ppk2_test.get_modifiers() ppk2_test.set_source_voltage(3300) """ Source mode example """ -ppk2_test.use_source_meter() # set source meter mode -ppk2_test.toggle_DUT_power("ON") # enable DUT power +# set source meter mode +ppk2_test.use_source_meter() +# enable DUT power +ppk2_test.toggle_DUT_power("ON") +# start measuring +ppk2_test.start_measuring() -ppk2_test.start_measuring() # start measuring # measurements are a constant stream of bytes # the number of measurements in one sampling period depends on the wait between serial reads # it appears the maximum number of bytes received is 1024 # the sampling rate of the PPK2 is 100 samples per millisecond while True: read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -50,18 +60,21 @@ time.sleep(0.001) -ppk2_test.toggle_DUT_power("OFF") # disable DUT power + +# disable DUT power +ppk2_test.toggle_DUT_power("OFF") ppk2_test.stop_measuring() """ Ampere mode example """ -ppk2_test.use_ampere_meter() # set ampere meter mode +# set ampere meter mode +ppk2_test.use_ampere_meter() ppk2_test.start_measuring() while True: read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -73,6 +86,8 @@ # Print last 10 values of each channel print(ch[-10:]) print() - time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period -ppk2_test.stop_measuring() + # lower time between sampling -> less samples read in one sampling period + time.sleep(0.001) + +ppk2_test.stop_measuring() \ No newline at end of file diff --git a/setup.py b/setup.py index bab7e24..5c9308a 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ def read(*names, **kwargs): name="ppk2-api", version="0.9.2", description="API for Nordic Semiconductor's Power Profiler Kit II (PPK 2).", - url="https://github.com/IRNAS/ppk2-api-python", + url="https://github.com/RolandWa/ppk2-api-python", packages=find_packages("src"), package_dir={"": "src"}, py_modules=[splitext(basename(path))[0] for path in glob("src/*.py")], diff --git a/src/power_profiler.py b/src/power_profiler.py index 7af0f6f..6d9b12d 100644 --- a/src/power_profiler.py +++ b/src/power_profiler.py @@ -2,19 +2,21 @@ import csv import datetime from threading import Thread + # import numpy as np # import matplotlib.pyplot as plt # import matplotlib from ppk2_api.ppk2_api import PPK2_MP as PPK2_API -class PowerProfiler(): + +class PowerProfiler: def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): """Initialize PPK2 power profiler with serial""" self.measuring = None self.measurement_thread = None self.ppk2 = None - print(f"Initing power profiler") + print("Initing power profiler") # try: if serial_port: @@ -26,7 +28,8 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): self.ppk2 = PPK2_API(serial_port) try: - ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct + # try to read modifiers, if it fails serial port is probably not correct + ret = self.ppk2.get_modifiers() print(f"Initialized ppk2 api: {ret}") except Exception as e: print(f"Error initializing power profiler: {e}") @@ -35,13 +38,16 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): if not ret: self.ppk2 = None - raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}") + raise Exception( + f"Error when initing PowerProfiler with serial port {serial_port}" + ) else: self.ppk2.use_source_meter() self.source_voltage_mV = source_voltage_mV - self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V + # set to 3.3V + self.ppk2.set_source_voltage(self.source_voltage_mV) print(f"Set power profiler source voltage: {self.source_voltage_mV}") @@ -62,7 +68,7 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): # write to csv self.filename = filename if self.filename is not None: - with open(self.filename, 'w', newline='') as file: + with open(self.filename, "w", newline="") as file: writer = csv.writer(file) row = [] for key in ["ts", "avg1000"]: @@ -71,10 +77,10 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): def write_csv_rows(self, samples): """Write csv row""" - with open(self.filename, 'a', newline='') as file: + with open(self.filename, "a", newline="") as file: writer = csv.writer(file) for sample in samples: - row = [datetime.datetime.now().strftime('%d-%m-%Y %H:%M:%S.%f'), sample] + row = [datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f"), sample] writer.writerow(row) def delete_power_profiler(self): @@ -85,26 +91,26 @@ def delete_power_profiler(self): print("Deleting power profiler") if self.measurement_thread: - print(f"Joining measurement thread") + print("Joining measurement thread") self.measurement_thread.join() self.measurement_thread = None if self.ppk2: - print(f"Disabling ppk2 power") + print("Disabling ppk2 power") self.disable_power() del self.ppk2 - print(f"Deleted power profiler") + print("Deleted power profiler") def discover_port(self): """Discovers ppk2 serial port""" ppk2s_connected = PPK2_API.list_devices() - if(len(ppk2s_connected) == 1): + if len(ppk2s_connected) == 1: ppk2_port = ppk2s_connected[0] - print(f'Found PPK2 at {ppk2_port}') + print(f"Found PPK2 at {ppk2_port}") return ppk2_port else: - print(f'Too many connected PPK2\'s: {ppk2s_connected}') + print(f"Too many connected PPK2's: {ppk2s_connected}") return None def enable_power(self): @@ -124,16 +130,21 @@ def disable_power(self): def measurement_loop(self): """Endless measurement loop will run in a thread""" while True and not self.stop: - if self.measuring: # read data if currently measuring + # read data if currently measuring + if self.measuring: read_data = self.ppk2.get_data() - if read_data != b'': + if read_data != b"": samples = self.ppk2.get_samples(read_data) - self.current_measurements += samples # can easily sum lists, will append individual data - time.sleep(0.001) # TODO figure out correct sleep duration + # can easily sum lists, will append individual data + self.current_measurements += samples + # TODO figure out correct sleep duration + time.sleep(0.001) def _average_samples(self, list, window_size): """Average samples based on window size""" - chunks = [list[val:val + window_size] for val in range(0, len(list), window_size)] + chunks = [ + list[val : val + window_size] for val in range(0, len(list), window_size) + ] avgs = [] for chunk in chunks: avgs.append(sum(chunk) / len(chunk)) @@ -142,19 +153,24 @@ def _average_samples(self, list, window_size): def start_measuring(self): """Start measuring""" - if not self.measuring: # toggle measuring flag only if currently not measuring - self.current_measurements = [] # reset current measurements - self.measuring = True # set internal flag - self.ppk2.start_measuring() # send command to ppk2 + # toggle measuring flag only if currently not measuring + if not self.measuring: + # reset current measurements + self.current_measurements = [] + # set internal flag + self.measuring = True + # send command to ppk2 + self.ppk2.start_measuring() self.measurement_start_time = time.time() def stop_measuring(self): """Stop measuring and return average of period""" self.measurement_stop_time = time.time() self.measuring = False - self.ppk2.stop_measuring() # send command to ppk2 + # send command to ppk2 + self.ppk2.stop_measuring() - #samples_average = self._average_samples(self.current_measurements, 1000) + # samples_average = self._average_samples(self.current_measurements, 1000) if self.filename is not None: self.write_csv_rows(self.current_measurements) @@ -172,24 +188,33 @@ def get_average_current_mA(self): if len(self.current_measurements) == 0: return 0 - average_current_mA = (sum(self.current_measurements) / len(self.current_measurements)) / 1000 # measurements are in microamperes, divide by 1000 + # measurements are in microamperes, divide by 1000 + average_current_mA = ( + sum(self.current_measurements) / len(self.current_measurements) + ) / 1000 return average_current_mA def get_average_power_consumption_mWh(self): """Return average power consumption of last measurement in mWh""" average_current_mA = self.get_average_current_mA() - average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA # divide by 1000 as source voltage is in millivolts - this gives us milliwatts - measurement_duration_h = self.get_measurement_duration_s() / 3600 # duration in seconds, divide by 3600 to get hours + # divide by 1000 as source voltage is in millivolts - this gives us milliwatts + average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA + # duration in seconds, divide by 3600 to get hours + measurement_duration_h = self.get_measurement_duration_s() / 3600 average_consumption_mWh = average_power_mW * measurement_duration_h return average_consumption_mWh def get_average_charge_mC(self): """Returns average charge in milli coulomb""" average_current_mA = self.get_average_current_mA() - measurement_duration_s = self.get_measurement_duration_s() # in seconds + # in seconds + measurement_duration_s = self.get_measurement_duration_s() return average_current_mA * measurement_duration_s def get_measurement_duration_s(self): """Returns duration of measurement""" - measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds + # measurement duration in seconds + measurement_duration_s = ( + self.measurement_stop_time - self.measurement_start_time + ) return measurement_duration_s \ No newline at end of file diff --git a/src/ppk2_api/__init__.py b/src/ppk2_api/__init__.py index e69de29..afc9510 100644 --- a/src/ppk2_api/__init__.py +++ b/src/ppk2_api/__init__.py @@ -0,0 +1 @@ +from .ppk2_api import PPK2_MP \ No newline at end of file diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index 4b39a69..9c222f7 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -12,8 +12,10 @@ import queue import threading -class PPK2_Command(): + +class PPK2_Command: """Serial command opcodes""" + NO_OP = 0x00 TRIGGER_SET = 0x01 AVG_NUM_SET = 0x02 # no-firmware @@ -24,11 +26,11 @@ class PPK2_Command(): AVERAGE_STOP = 0x07 RANGE_SET = 0x08 LCD_SET = 0x09 - TRIGGER_STOP = 0x0a - DEVICE_RUNNING_SET = 0x0c - REGULATOR_SET = 0x0d - SWITCH_POINT_DOWN = 0x0e - SWITCH_POINT_UP = 0x0f + TRIGGER_STOP = 0x0A + DEVICE_RUNNING_SET = 0x0C + REGULATOR_SET = 0x0D + SWITCH_POINT_DOWN = 0x0E + SWITCH_POINT_UP = 0x0F TRIGGER_EXT_TOGGLE = 0x11 SET_POWER_MODE = 0x11 RES_USER_SET = 0x12 @@ -39,18 +41,19 @@ class PPK2_Command(): SET_USER_GAINS = 0x25 -class PPK2_Modes(): +class PPK2_Modes: """PPK2 measurement modes""" + AMPERE_MODE = "AMPERE_MODE" SOURCE_MODE = "SOURCE_MODE" -class PPK2_API(): +class PPK2_API: def __init__(self, port: str, **kwargs): - ''' + """ port - port where PPK2 is connected **kwargs - keyword arguments to pass to the pySerial constructor - ''' + """ self.ser = None self.ser = serial.Serial(port, **kwargs) @@ -66,7 +69,7 @@ def __init__(self, port: str, **kwargs): "I": {"0": 0, "1": 0, "2": 0, "3": 0, "4": 0}, "UG": {"0": 1, "1": 1, "2": 1, "3": 1, "4": 1}, "HW": None, - "IA": None + "IA": None, } self.vdd_low = 800 @@ -93,7 +96,7 @@ def __init__(self, port: str, **kwargs): self.after_spike = 0 # adc measurement buffer remainder and len of remainder - self.remainder = {"sequence": b'', "len": 0} + self.remainder = {"sequence": b"", "len": 0} def __del__(self): """Destructor""" @@ -121,7 +124,8 @@ def _write_serial(self, cmd_tuple): def _twos_comp(self, val): """Compute the 2's complement of int32 value""" if (val & (1 << (32 - 1))) != 0: - val = val - (1 << 32) # compute negative value + # compute negative value + val = val - (1 << 32) return val def _convert_source_voltage(self, mV): @@ -138,11 +142,13 @@ def _convert_source_voltage(self, mV): # get difference to baseline (the baseline is 800mV but the initial offset is 32) diff_to_baseline = mV - self.vdd_low + offset base_b_1 = 3 - base_b_2 = 0 # is actually 32 - compensated with above offset + # is actually 32 - compensated with above offset + base_b_2 = 0 # get the number of times we have to increase the first byte of the command ratio = int(diff_to_baseline / 256) - remainder = diff_to_baseline % 256 # get the remainder for byte 2 + # get the remainder for byte 2 + remainder = diff_to_baseline % 256 set_b_1 = base_b_1 + ratio set_b_2 = base_b_2 + remainder @@ -158,7 +164,7 @@ def _read_metadata(self): time.sleep(0.1) # TODO add a read_until serial read function with a timeout - if read != b'' and "END" in read.decode("utf-8"): + if read != b"" and "END" in read.decode("utf-8"): return read.decode("utf-8") def _parse_metadata(self, metadata): @@ -172,23 +178,22 @@ def _parse_metadata(self, metadata): if key == data_pair[0]: self.modifiers[key] = data_pair[1] for ind in range(0, 5): - if key+str(ind) == data_pair[0]: + if key + str(ind) == data_pair[0]: if "R" in data_pair[0]: # problem on some PPK2s with wrong calibration values - this doesn't fix it if float(data_pair[1]) != 0: - self.modifiers[key][str(ind)] = float( - data_pair[1]) + self.modifiers[key][str(ind)] = float(data_pair[1]) else: - self.modifiers[key][str(ind)] = float( - data_pair[1]) + self.modifiers[key][str(ind)] = float(data_pair[1]) return True except Exception as e: # if exception triggers serial port is probably not correct + print(f"Failde to parse metadata: {e}") return None def _generate_mask(self, bits, pos): pos = pos - mask = ((2**bits-1) << pos) + mask = (2**bits - 1) << pos mask = self._twos_comp(mask) return {"mask": mask, "pos": pos} @@ -199,15 +204,18 @@ def _get_masked_value(self, value, meas, is_bits=False): def _handle_raw_data(self, adc_value): """Convert raw value to analog value""" try: - current_measurement_range = min(self._get_masked_value( - adc_value, self.MEAS_RANGE), 4) # 5 is the number of parameters + # 5 is the number of parameters + current_measurement_range = min( + self._get_masked_value(adc_value, self.MEAS_RANGE), 4 + ) adc_result = self._get_masked_value(adc_value, self.MEAS_ADC) * 4 bits = self._get_masked_value(adc_value, self.MEAS_LOGIC) - analog_value = self.get_adc_result( - current_measurement_range, adc_result) * 10**6 + analog_value = ( + self.get_adc_result(current_measurement_range, adc_result) * 10**6 + ) return analog_value, bits except Exception as e: - print("Measurement outside of range!") + print(f"Measurement outside of range! {e}") return None, None @staticmethod @@ -219,7 +227,8 @@ def list_devices(): devices = [ (port.device, port.serial_number[:8]) for port in ports - if port.description.startswith("nRF Connect USB CDC ACM") and port.location.endswith("1") + if port.description.startswith("nRF Connect USB CDC ACM") + and port.location.endswith("1") ] else: devices = [ @@ -236,7 +245,7 @@ def get_data(self): def get_modifiers(self): """Gets and sets modifiers from device memory""" - self._write_serial((PPK2_Command.GET_META_DATA, )) + self._write_serial((PPK2_Command.GET_META_DATA,)) metadata = self._read_metadata() ret = self._parse_metadata(metadata) return ret @@ -249,14 +258,14 @@ def start_measuring(self): if self.mode == PPK2_Modes.AMPERE_MODE: raise Exception("Input voltage not set!") - self._write_serial((PPK2_Command.AVERAGE_START, )) + self._write_serial((PPK2_Command.AVERAGE_START,)) def stop_measuring(self): """Stop continuous measurement""" - self._write_serial((PPK2_Command.AVERAGE_STOP, )) + self._write_serial((PPK2_Command.AVERAGE_STOP,)) def set_source_voltage(self, mV): - """Inits device - based on observation only REGULATOR_SET is the command. + """Inits device - based on observation only REGULATOR_SET is the command. The other two values correspond to the voltage level. 800mV is the lowest setting - [3,32] - the values then increase linearly @@ -268,32 +277,44 @@ def set_source_voltage(self, mV): def toggle_DUT_power(self, state): """Toggle DUT power based on parameter""" if state == "ON": + # 12,1 self._write_serial( - (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.TRIGGER_SET)) # 12,1 + (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.TRIGGER_SET) + ) if state == "OFF": - self._write_serial( - (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) # 12,0 + # 12,0 + self._write_serial((PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) def use_ampere_meter(self): """Configure device to use ampere meter""" self.mode = PPK2_Modes.AMPERE_MODE - self._write_serial((PPK2_Command.SET_POWER_MODE, - PPK2_Command.TRIGGER_SET)) # 17,1 + # 17,1 + self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.TRIGGER_SET)) def use_source_meter(self): """Configure device to use source meter""" self.mode = PPK2_Modes.SOURCE_MODE - self._write_serial((PPK2_Command.SET_POWER_MODE, - PPK2_Command.AVG_NUM_SET)) # 17,2 + # 17,2 + self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.AVG_NUM_SET)) def get_adc_result(self, current_range, adc_value): """Get result of adc conversion""" current_range = str(current_range) result_without_gain = (adc_value - self.modifiers["O"][current_range]) * ( - self.adc_mult / self.modifiers["R"][current_range]) - adc = self.modifiers["UG"][current_range] * (result_without_gain * (self.modifiers["GS"][current_range] * result_without_gain + self.modifiers["GI"][current_range]) + ( - self.modifiers["S"][current_range] * (self.current_vdd / 1000) + self.modifiers["I"][current_range])) + self.adc_mult / self.modifiers["R"][current_range] + ) + adc = self.modifiers["UG"][current_range] * ( + result_without_gain + * ( + self.modifiers["GS"][current_range] * result_without_gain + + self.modifiers["GI"][current_range] + ) + + ( + self.modifiers["S"][current_range] * (self.current_vdd / 1000) + + self.modifiers["I"][current_range] + ) + ) prev_rolling_avg = self.rolling_avg prev_rolling_avg4 = self.rolling_avg4 @@ -302,12 +323,18 @@ def get_adc_result(self, current_range, adc_value): if self.rolling_avg is None: self.rolling_avg = adc else: - self.rolling_avg = self.spike_filter_alpha * adc + (1 - self.spike_filter_alpha) * self.rolling_avg - + self.rolling_avg = ( + self.spike_filter_alpha * adc + + (1 - self.spike_filter_alpha) * self.rolling_avg + ) + if self.rolling_avg4 is None: self.rolling_avg4 = adc else: - self.rolling_avg4 = self.spike_filter_alpha5 * adc + (1 - self.spike_filter_alpha5) * self.rolling_avg4 + self.rolling_avg4 = ( + self.spike_filter_alpha5 * adc + + (1 - self.spike_filter_alpha5) * self.rolling_avg4 + ) if self.prev_range is None: self.prev_range = current_range @@ -326,7 +353,7 @@ def get_adc_result(self, current_range, adc_value): adc = self.rolling_avg4 else: adc = self.rolling_avg - + self.after_spike -= 1 self.prev_range = current_range @@ -334,7 +361,8 @@ def get_adc_result(self, current_range, adc_value): def _digital_to_analog(self, adc_value): """Convert discrete value to analog value""" - return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value + # convert reading to analog value + return int.from_bytes(adc_value, byteorder="little", signed=False) def digital_channels(self, bits): """ @@ -363,14 +391,13 @@ def get_samples(self, buf): Manipulation of samples is left to the user. See example for more info. """ - - sample_size = 4 # one analog value is 4 bytes in size + # one analog value is 4 bytes in size + sample_size = 4 offset = self.remainder["len"] samples = [] raw_digital_output = [] - first_reading = ( - self.remainder["sequence"] + buf[0:sample_size-offset])[:4] + first_reading = (self.remainder["sequence"] + buf[0 : sample_size - offset])[:4] adc_val = self._digital_to_analog(first_reading) measurement, bits = self._handle_raw_data(adc_val) if measurement is not None: @@ -381,7 +408,7 @@ def get_samples(self, buf): offset = sample_size - offset while offset <= len(buf) - sample_size: - next_val = buf[offset:offset + sample_size] + next_val = buf[offset : offset + sample_size] offset += sample_size adc_val = self._digital_to_analog(next_val) measurement, bits = self._handle_raw_data(adc_val) @@ -390,18 +417,19 @@ def get_samples(self, buf): if bits is not None: raw_digital_output.append(bits) - self.remainder["sequence"] = buf[offset:len(buf)] - self.remainder["len"] = len(buf)-offset + self.remainder["sequence"] = buf[offset : len(buf)] + self.remainder["len"] = len(buf) - offset # return list of samples and raw digital outputs # handle those lists in PPK2 API wrapper - return samples, raw_digital_output + return samples, raw_digital_output class PPK_Fetch(threading.Thread): - ''' + """ Background process for polling the data in multi-threaded variant - ''' + """ + def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): super().__init__() self._ppk2 = ppk2 @@ -411,8 +439,10 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): self._stats = (None, None) self._last_timestamp = 0 - self._buffer_max_len = int(buffer_len_s * 100000 * 4) # 100k 4-byte samples per second - self._buffer_chunk = int(buffer_chunk_s * 100000 * 4) # put in the queue in chunks of 0.5s + # 100k 4-byte samples per second + self._buffer_max_len = int(buffer_len_s * 100000 * 4) + # put in the queue in chunks of 0.5s + self._buffer_chunk = int(buffer_chunk_s * 100000 * 4) # round buffers to a whole sample if self._buffer_max_len % 4 != 0: @@ -425,17 +455,19 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): def run(self): s = 0 t = time.time() - local_buffer = b'' + local_buffer = b"" while not self._quit.is_set(): d = PPK2_API.get_data(self._ppk2) tm_now = time.time() local_buffer += d while len(local_buffer) >= self._buffer_chunk: # FIXME: check if lock might be needed when discarding old data - self._buffer_q.put(local_buffer[:self._buffer_chunk]) - while self._buffer_q.qsize()>self._buffer_max_len/self._buffer_chunk: + self._buffer_q.put(local_buffer[: self._buffer_chunk]) + while ( + self._buffer_q.qsize() > self._buffer_max_len / self._buffer_chunk + ): self._buffer_q.get() - local_buffer = local_buffer[self._buffer_chunk:] + local_buffer = local_buffer[self._buffer_chunk :] self._last_timestamp = tm_now # calculate stats @@ -458,11 +490,12 @@ def run(self): break def get_data(self): - ret = b'' + ret = b"" count = 0 while True: try: - ret += self._buffer_q.get(timeout=0.001) # get_nowait sometimes skips a chunk for some reason + # get_nowait sometimes skips a chunk for some reason + ret += self._buffer_q.get(timeout=0.001) count += 1 except queue.Empty: break @@ -470,17 +503,20 @@ def get_data(self): class PPK2_MP(PPK2_API): - ''' + """ Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns a background process on start_measuring() - ''' - def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs): - ''' + """ + + def __init__( + self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs + ): + """ port - port where PPK2 is connected buffer_max_size_seconds - how many seconds of data to keep in the buffer buffer_chunk_seconds - how many seconds of data to put in the queue at once **kwargs - keyword arguments to pass to the pySerial constructor - ''' + """ super().__init__(port, **kwargs) self._fetcher = None @@ -502,27 +538,32 @@ def __del__(self): def start_measuring(self): # discard the data in the buffer self.stop_measuring() - while self.get_data()!=b'': + while self.get_data() != b"": pass PPK2_API.start_measuring(self) self._quit_evt.clear() if self._fetcher is not None: return - - self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds) + + self._fetcher = PPK_Fetch( + self, + self._quit_evt, + self._buffer_max_size_seconds, + self._buffer_chunk_seconds, + ) self._fetcher.start() def stop_measuring(self): PPK2_API.stop_measuring(self) - self.get_data() # flush the serial buffer (to prevent unicode error on next command) + self.get_data() # flush the serial buffer (to prevent unicode error on next command) self._quit_evt.set() if self._fetcher is not None: - self._fetcher.join() # join() will block if the queue isn't empty + self._fetcher.join() # join() will block if the queue isn't empty self._fetcher = None def get_data(self): try: return self._fetcher.get_data() except (TypeError, AttributeError): - return b'' + return b"" \ No newline at end of file diff --git a/tools/ppk2_battery_emulator_gui.py b/tools/ppk2_battery_emulator_gui.py new file mode 100644 index 0000000..a362f40 --- /dev/null +++ b/tools/ppk2_battery_emulator_gui.py @@ -0,0 +1,818 @@ +import tkinter as tk +from tkinter import filedialog, ttk, messagebox +import threading +import time +import math +import pandas as pd +import matplotlib.pyplot as plt +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +import numpy as np +import random +import re +from datetime import datetime + +# ---------------------------------------------------------------------- +# PPK2 API INTEGRATION (Forced MOCK Mode) +# ---------------------------------------------------------------------- + +# --- MOCK DEFINITIONS (Always available for fallback) --- +BASE_I_QUIET = 10.0 # Static base for quiet current (mA) +BASE_I_PEAK = 130.0 # Static base for peak current (mA) +NOISE_RANGE = 0.5 # Random fluctuation range (±0.5 mA) + +class PPK2Mock: + """Emulates the PPK2_API interface for GUI testing.""" + def start_source_meter(self, samplerate, logic_enabled=False): + self.samplerate = samplerate + self.logic_enabled = logic_enabled + self.voltage = 0 + self.start_time = time.time() + print(f"MOCK: Started Source Meter @ {samplerate} Hz (Logic: {logic_enabled})") + + def set_voltage(self, voltage_v): + self.voltage = voltage_v + + def get_data(self): + """Simulate dynamic current draw with random variance around static values.""" + t = time.time() + + # A common sampling buffer size + num_samples = 100 + data = [] + for i in range(num_samples): + # Simulate bursts of current every 5 seconds + if (t + i/self.samplerate) % 5 < 0.2: + I_base = BASE_I_PEAK + else: + I_base = BASE_I_QUIET + + I_noise = random.uniform(-NOISE_RANGE, NOISE_RANGE) + data.append(I_base + I_noise) + return data + + def get_logic_data(self): + """Simulate dynamic logic data.""" + t = time.time() + # D0 toggles every 1s, D1 toggles every 0.5s, D5 toggles every 10s + return [ + (t % 2) > 1, (t % 0.5) > 0.25, random.choice([True, False]), False, True, (t % 10) > 9.5, False, False + ] + + def stop(self): + print("MOCK: Stopped.") + +# --- API LOADING LOGIC (FORCED MOCK MODE) --- + +def init_ppk2_device(voltage_v, samplerate_hz, logic_enabled=False): + """Initializes the Mock PPK2 device.""" + mock_device = PPK2Mock() + mock_device.start_source_meter(samplerate=samplerate_hz, logic_enabled=logic_enabled) + mock_device.set_voltage(voltage_v) + return mock_device + +API_MODE = "MOCK" +print("WARNING: PPK2 API not found. Using MOCK mode for simulation.") + +# ---------------------------------------------------------------------- +# EMULATION CONSTANTS AND BATTERY PARAMETERS +# ---------------------------------------------------------------------- + +DISCHARGE_RATE = 100 # Emulation acceleration factor (100x faster than real time) +TIME_STEP_REAL_SEC = 1.0 # The amount of real time simulated in one emulation step +TIME_STEP_EMUL_SEC = TIME_STEP_REAL_SEC / DISCHARGE_RATE # The actual time the PPK2 samples current + +class BatteryEmulator(tk.Frame): + + # Static data for battery technologies and their discharge limits + BATTERY_PROFILES = { + "Li-Po/Li-Ion (3.7V)": {"V_START": 4.2, "V_STOP": 3.2}, + "LiFePO4 (3.2V)": {"V_START": 3.6, "V_STOP": 2.8}, + "NiMH/NiCd (1.2V)": {"V_START": 1.4, "V_STOP": 1.0}, + "Alkaline (1.5V)": {"V_START": 1.6, "V_STOP": 1.2} + } + + def __init__(self, master=None): + super().__init__(master) + self.master = master + self.master.title(f"PPK2 [{API_MODE}] - Battery Simulator and Current Profiler") + self.ppk2 = None + self.is_running = False + self.data_thread = None + + # State Variables + self.current_voltage = 0.0 + self.current_capacity_mah = 0.0 + self.time_elapsed_real_sec = 0 + self.all_current_samples = [] + self.all_time_real_s = [] + self.all_voltage_v = [] + self.all_logic_data = [] + + # Configuration + self.config = { + "V_STOP": self.BATTERY_PROFILES["Li-Po/Li-Ion (3.7V)"]["V_STOP"], + "SAMPLE_RATE_HZ": 1000 + } + + self.CAPACITY_OPTIONS = [1500, 3000, 5000, 8000] + self.V_START_DEFAULT = self.BATTERY_PROFILES["Li-Po/Li-Ion (3.7V)"]["V_START"] + self.CAPACITY_DEFAULT = 3000 + self.SIM_TIME_DEFAULT_STR = "05:00" + + # GUI Variables + self.battery_type_var = tk.StringVar(value="Li-Po/Li-Ion (3.7V)") + self.v_start_var = tk.DoubleVar(value=self.V_START_DEFAULT) + self.v_stop_var = tk.DoubleVar(value=self.config["V_STOP"]) + self.capacity_var = tk.StringVar(value=str(self.CAPACITY_DEFAULT)) + self.logic_enabled_var = tk.BooleanVar(value=False) + self.sim_discharge_var = tk.BooleanVar(value=False) + self.sim_time_str_var = tk.StringVar(value=self.SIM_TIME_DEFAULT_STR) + + # Bind voltage variable changes to update the config + self.v_start_var.trace_add("write", self.update_config_voltage) + self.v_stop_var.trace_add("write", self.update_config_voltage) + + # Simulation specific variables + self.total_sim_time_sec = 0 + self.required_mah_loss_per_sec = 0 + + self.create_widgets() + self.load_initial_state() + self.print_battery_info() + + def print_battery_info(self): + print("\n--- Common Battery Technology Information ---") + for tech, limits in self.BATTERY_PROFILES.items(): + print(f"| {tech:<20} | V_Start: {limits['V_START']:.1f}V | V_Stop: {limits['V_STOP']:.1f}V | Capacity: 100 mAh to 50,000 mAh+") + print("-------------------------------------------\n") + + def convert_mm_ss_to_sec(self, mm_ss_str): + """Converts MM:SS string to total seconds.""" + match = re.match(r"(\d+):(\d+)", mm_ss_str.strip()) + if match: + minutes = int(match.group(1)) + seconds = int(match.group(2)) + return minutes * 60 + seconds + try: + return float(mm_ss_str) * 60 + except: + return 0 + + def format_seconds_to_hms(self, total_seconds): + """Converts total seconds into HH:MM:SS format.""" + if total_seconds == float('inf'): + return "INF" + + total_seconds = int(total_seconds) + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + seconds = total_seconds % 60 + return f"{hours:02}:{minutes:02}:{seconds:02}" + + def update_config_voltage(self, *args): + """Updates the internal config from the GUI DoubleVars.""" + try: + self.config["V_START"] = self.v_start_var.get() + self.config["V_STOP"] = self.v_stop_var.get() + except tk.TclError: + # Handle case where conversion fails (e.g., user is typing non-float data) + pass + + def update_voltage_from_type(self, *args): + """Updates V_START and V_STOP based on the selected battery type.""" + selected_type = self.battery_type_var.get() + profile = self.BATTERY_PROFILES.get(selected_type) + + if profile: + self.v_start_var.set(profile["V_START"]) + self.v_stop_var.set(profile["V_STOP"]) + print(f"INFO: Set V_START={profile['V_START']}V and V_STOP={profile['V_STOP']}V for {selected_type}.") + else: + print(f"WARNING: Unknown battery type selected: {selected_type}") + + def create_voltage_spinbox(self, parent, variable, row, column, label_text): + """Creates a label and an editable Spinbox with 0.01 increment/decrement.""" + ttk.Label(parent, text=label_text).grid(row=row, column=column, padx=5, pady=2, sticky='w') + + spinbox = tk.Spinbox(parent, + from_=0.0, to_=5.0, + increment=0.01, + textvariable=variable, + format="%.2f", + width=6, + command=self.update_config_voltage) # Update on button press + + spinbox.grid(row=row, column=column+1, padx=5, pady=2, sticky='w') + + # Bind the Enter key to update the config (for manual entry) + spinbox.bind('', lambda event: self.update_config_voltage()) + + return spinbox + + def load_initial_state(self): + # Apply current settings to configuration variables + self.config["V_STOP"] = self.v_stop_var.get() + self.config["V_START"] = self.v_start_var.get() + + try: + capacity = float(self.capacity_var.get()) + except ValueError: + capacity = self.CAPACITY_DEFAULT + self.capacity_var.set(str(self.CAPACITY_DEFAULT)) + + self.config["CAPACITY_NOMINAL_MAH"] = capacity + + self.current_capacity_mah = self.config["CAPACITY_NOMINAL_MAH"] + self.current_voltage = self.config["V_START"] + self.v_label.config(text=f"V_SYS Voltage (V): {self.current_voltage:.3f}") + self.soc_label.config(text="SoC (%): 100.0 (Avg I: 0.0 mA)") + + # Prepare for Simulated Discharge Mode + if self.sim_discharge_var.get(): + self.total_sim_time_sec = self.convert_mm_ss_to_sec(self.sim_time_str_var.get()) + + # --- Calculation for console/summary output (not used for V drop) --- + soc_start = 100.0 + sim_V_stop = max(self.config["V_STOP"], self.config["V_START"] - 0.01) + soc_stop = self.get_soc_percent_from_voltage(sim_V_stop) + + required_soc_drop = soc_start - soc_stop + required_mah_drop = required_soc_drop * (self.config["CAPACITY_NOMINAL_MAH"] / 100.0) + + if self.total_sim_time_sec > 0: + self.required_mah_loss_per_sec = required_mah_drop / self.total_sim_time_sec + else: + self.required_mah_loss_per_sec = 0 + + self.current_capacity_mah = self.config["CAPACITY_NOMINAL_MAH"] + + + def create_widgets(self): + # --- TOP CONTROL FRAME --- + control_frame = ttk.LabelFrame(self.master, text="Control and Configuration") + control_frame.pack(padx=10, pady=5, fill="x") + + # Row 0: Battery Type Selection and Capacity + ttk.Label(control_frame, text="Battery Type:").grid(row=0, column=0, padx=5, pady=2, sticky='w') + battery_menu = ttk.Combobox(control_frame, + textvariable=self.battery_type_var, + values=list(self.BATTERY_PROFILES.keys()), + width=20, + state="readonly") + battery_menu.grid(row=0, column=1, padx=5, pady=2, sticky='w', columnspan=2) + self.battery_type_var.trace_add("write", self.update_voltage_from_type) + + ttk.Label(control_frame, text="Capacity (mAh):").grid(row=0, column=3, padx=5, pady=2, sticky='w') + capacity_menu = ttk.Combobox(control_frame, textvariable=self.capacity_var, values=self.CAPACITY_OPTIONS, width=8) + capacity_menu.grid(row=0, column=4, padx=5, pady=2, sticky='w') + + + # Row 1: Voltage Settings (Using new Spinbox helper) + self.create_voltage_spinbox(control_frame, self.v_start_var, 1, 0, "Start V (V):") + self.create_voltage_spinbox(control_frame, self.v_stop_var, 1, 2, "Stop V (V):") + + # Logic Analyzer and Start Button (Row 2) + ttk.Checkbutton(control_frame, text="Enable Logic Analyzer", variable=self.logic_enabled_var).grid(row=2, column=0, columnspan=2, padx=5, pady=2, sticky='w') + self.start_button = ttk.Button(control_frame, text="START Emulation", command=self.toggle_emulation) + self.start_button.grid(row=2, column=4, padx=10, pady=5) + + # Row 3: Simulated Discharge Controls + sim_frame = ttk.LabelFrame(control_frame, text="Simulated Time Discharge") + sim_frame.grid(row=3, column=0, columnspan=5, padx=5, pady=5, sticky='we') + + ttk.Checkbutton(sim_frame, text="Enable Time Simulation", variable=self.sim_discharge_var).pack(side='left', padx=5, pady=2) + ttk.Label(sim_frame, text="Target Time (MM:SS):").pack(side='left', padx=5, pady=2) + ttk.Entry(sim_frame, textvariable=self.sim_time_str_var, width=8).pack(side='left', padx=5, pady=2) + ttk.Label(sim_frame, text=f"({self.SIM_TIME_DEFAULT_STR} default)").pack(side='left', padx=5, pady=2) + + + # --- STATE DISPLAY FRAME --- + state_frame = ttk.LabelFrame(self.master, text="Simulation State") + state_frame.pack(padx=10, pady=5, fill="x") + self.v_label = ttk.Label(state_frame, text="V_SYS Voltage (V): 0.000", font=('Arial', 12, 'bold')) + self.v_label.grid(row=0, column=0, padx=5, pady=5) + self.soc_label = ttk.Label(state_frame, text="SoC (%): 0.0 (Avg I: 0.0 mA)", font=('Arial', 12, 'bold')) + self.soc_label.grid(row=0, column=1, padx=5, pady=5) + self.runtime_label = ttk.Label(state_frame, text="Real Runtime (h): 0.00") + self.runtime_label.grid(row=0, column=2, padx=5, pady=5) + + # Logic Analyzer Status + logic_status_frame = ttk.Frame(state_frame) + logic_status_frame.grid(row=1, column=0, columnspan=3, pady=5) + ttk.Label(logic_status_frame, text="Logic Pins:").pack(side='left', padx=5) + self.logic_indicators = [] + for i in range(8): + label = ttk.Label(logic_status_frame, text=f"D{i}: LOW", width=6) + label.pack(side='left', padx=2) + self.logic_indicators.append(label) + + # --- PLOT FRAME --- + plot_frame = ttk.LabelFrame(self.master, text="Current and Voltage Profile") + plot_frame.pack(padx=10, pady=5, fill="both", expand=True) + + self.fig, (self.ax_i, self.ax_v) = plt.subplots(2, 1, figsize=(8, 6), sharex=True) + + # Current Plot (Top) + self.ax_i.set_ylabel("Current (mA)") + self.ax_i.grid(True) + self.line_i, = self.ax_i.plot([], [], 'r-') + + # Voltage Plot (Bottom) + self.ax_v.set_xlabel("Relative Time (s)") + self.ax_v.set_ylabel("Voltage (V)") + self.ax_v.grid(True) + self.line_v, = self.ax_v.plot([], [], 'b-') + + self.canvas = FigureCanvasTkAgg(self.fig, master=plot_frame) + self.canvas_widget = self.canvas.get_tk_widget() + self.canvas_widget.pack(fill="both", expand=True) + + # --- STATISTICS FRAME --- + stats_frame = ttk.LabelFrame(self.master, text="Measurement Statistics") + stats_frame.pack(padx=10, pady=5, fill="x") + self.peak_label = ttk.Label(stats_frame, text="Peak Current (mA): 0.0") + self.peak_label.grid(row=0, column=0, padx=5, pady=5) + self.rms_label = ttk.Label(stats_frame, text="RMS Current (mA): 0.0") + self.rms_label.grid(row=0, column=1, padx=5, pady=5) + self.save_button = ttk.Button(stats_frame, text="Save Log (.csv)", command=self.save_data) + self.save_button.grid(row=0, column=2, padx=10, pady=5) + + def toggle_emulation(self): + if not self.is_running: + self.start_emulation() + else: + self.stop_emulation() + + def start_emulation(self): + try: + self.load_initial_state() + + # 1. Initialize PPK2 using the unified function + self.ppk2 = init_ppk2_device( + self.config["V_START"], + self.config["SAMPLE_RATE_HZ"], + self.logic_enabled_var.get() + ) + + # 2. Reset State and Logs + self.all_current_samples = [] + self.all_time_real_s = [] + self.all_voltage_v = [] + self.all_logic_data = [] + self.time_elapsed_real_sec = 0 + + # 3. Start Data Thread + self.is_running = True + self.start_button.config(text="STOP Emulation", state=tk.NORMAL) + self.data_thread = threading.Thread(target=self.emulation_loop) + self.data_thread.daemon = True + self.data_thread.start() + + # Initial console debug message + print("\n--- EMULATION STARTING ---") + print(f"Mode: {'Time Simulation' if self.sim_discharge_var.get() else 'Current Emulation'} | Battery: {self.battery_type_var.get()}") + print(f"Capacity: {self.config['CAPACITY_NOMINAL_MAH']} mAh | V_Start: {self.config['V_START']} V | V_Stop: {self.config['V_STOP']} V") + if self.sim_discharge_var.get(): + print(f"Target Discharge Time: {self.sim_time_str_var.get()} (MM:SS) | Required Loss Rate: {self.required_mah_loss_per_sec * 3600.0:.2f} mA (Theoretical)") + print("--------------------------") + + except Exception as e: + messagebox.showerror("PPK2 Error", f"Failed to initialize PPK2: {e}. Check API connection.") + self.stop_emulation(is_error=True) + + def stop_emulation(self, is_error=False): + if not self.is_running and not is_error: + return + + self.is_running = False + + if self.ppk2: + try: + # We do not need the real stop() for mock, but keep it structured + if API_MODE == "REAL": + # self.ppk2.stop() # Uncomment for real API + pass + else: + self.ppk2.stop() # Mock stop + except Exception as e: + print(f"Error while stopping PPK2 device: {e}") + self.ppk2 = None + + if self.data_thread and self.data_thread.is_alive(): + self.data_thread.join(timeout=0.1) + + self.start_button.config(text="START Emulation", state=tk.NORMAL) + + def get_voltage_from_soc(self, soc_percent): + """Calculates voltage from SoC percentage based on simplified discharge curve.""" + V_start = self.config["V_START"] + V_stop = self.config["V_STOP"] + + if V_start <= V_stop: return V_start # Failsafe + + V_range = V_start - V_stop + + # Simplified S-curve model + if soc_percent > 90: + V_drop_initial = V_range * 0.1 + V_current = V_start - ((100 - soc_percent) * (V_drop_initial / 10)) + return max(V_stop, V_current) + + elif soc_percent > 10: + V_plateau_start = V_start - (V_range * 0.1) + V_plateau_end = V_stop + (V_range * 0.1) + + V_plateau_drop = V_plateau_start - V_plateau_end + + V_current = V_plateau_start - ((90 - soc_percent) * (V_plateau_drop / 80)) + return max(V_stop, V_current) + + else: + V_plateau_end = V_stop + (V_range * 0.1) + + V_current = V_stop + (soc_percent * (V_plateau_end - V_stop) / 10) + return max(V_stop, V_current) + + + def get_soc_percent_from_voltage(self, voltage): + """Reverse calculation: Estimates SoC from voltage (used for setup).""" + V_start = self.config["V_START"] + V_stop = self.config["V_STOP"] + + if V_start <= V_stop: return 100.0 + + V_range = V_start - V_stop + + if voltage >= V_start: + return 100.0 + elif voltage <= V_stop: + return 0.0 + + # Simplified inverse mapping + V_drop_10_percent = V_range * 0.1 + V_plateau_start = V_start - V_drop_10_percent + + if voltage > V_plateau_start: # Top 10% + return 100 - ((V_start - voltage) / V_drop_10_percent) * 10 + + V_plateau_end = V_stop + V_drop_10_percent + + if voltage >= V_plateau_end: # Middle 80% + V_plateau_drop = V_plateau_start - V_plateau_end + if V_plateau_drop <= 0: return 90.0 + return 90 - ((V_plateau_start - voltage) / V_plateau_drop) * 80 + + # Bottom 10% + V_final_range = V_plateau_end - V_stop + if V_final_range <= 0: return 10.0 + return ((voltage - V_stop) / V_final_range) * 10 + + + def emulation_loop(self): + last_emulation_time = time.time() + is_sim_discharge = self.sim_discharge_var.get() + + while self.is_running and self.current_voltage > self.config["V_STOP"]: + + if not self.is_running: + break + + # 1. CURRENT AND LOGIC DATA MEASUREMENT + try: + currents_ma = self.ppk2.get_data() + logic_data = self.ppk2.get_logic_data() if self.logic_enabled_var.get() else [False] * 8 + except Exception as e: + print(f"Error fetching data: {e}") + self.is_running = False + break + + if not currents_ma or len(currents_ma) == 0: + time.sleep(0.01) + continue + + currents_ma = np.array(currents_ma) + + # --- ENERGY CONSUMPTION LOGIC --- + + time_h_simulated = TIME_STEP_REAL_SEC / 3600.0 + I_avg_ma_realtime = np.mean(currents_ma) + + # 2. Update time elapsed + self.time_elapsed_real_sec += TIME_STEP_REAL_SEC + + if is_sim_discharge: + # SIMULATED DISCHARGE MODE: Voltage loss is time-based (FIXED). + + # Calculate elapsed percentage + elapsed_ratio = min(1.0, self.time_elapsed_real_sec / self.total_sim_time_sec) + + # Calculate the target voltage based on time percentage + V_start = self.config["V_START"] + V_stop = self.config["V_STOP"] + + # Linearly scale V_start to V_stop over the total time + self.current_voltage = V_start - (elapsed_ratio * (V_start - V_stop)) + + # Check for end condition + if self.time_elapsed_real_sec >= self.total_sim_time_sec: + self.current_voltage = V_stop # Ensure it hits V_STOP exactly + self.is_running = False + break + + # Calculate SoC based on the new time-based voltage + soc_percent = self.get_soc_percent_from_voltage(self.current_voltage) + self.current_capacity_mah = self.config["CAPACITY_NOMINAL_MAH"] * (soc_percent / 100.0) + + # Time remaining calculation (simple countdown) + time_remaining_sec = max(0, self.total_sim_time_sec - self.time_elapsed_real_sec) + + else: + # REAL EMULATION MODE: Voltage loss is current-based. + + consumed_mah = I_avg_ma_realtime * time_h_simulated + self.current_capacity_mah -= consumed_mah + soc_percent = (self.current_capacity_mah / self.config["CAPACITY_NOMINAL_MAH"]) * 100.0 + self.current_voltage = self.get_voltage_from_soc(soc_percent) + + # Estimated remaining time is based on current consumption rate + mah_remaining = self.current_capacity_mah - self.config["CAPACITY_NOMINAL_MAH"] * self.get_soc_percent_from_voltage(self.config["V_STOP"]) / 100.0 + time_remaining_sec = (mah_remaining / I_avg_ma_realtime) * 3600.0 if I_avg_ma_realtime > 0 else float('inf') + + + # 3. UPDATE VOLTAGE ON PPK2 + self.ppk2.set_voltage(self.current_voltage) + + # 4. Update Logs + + for i, current_sample in enumerate(currents_ma): + # Log the current voltage for every sample in this step + self.all_current_samples.append(current_sample) + self.all_time_real_s.append(self.time_elapsed_real_sec) + self.all_voltage_v.append(self.current_voltage) + + if self.logic_enabled_var.get(): + self.all_logic_data.append(logic_data) + else: + self.all_logic_data.append([False] * 8) + + + # Calculate statistics + peak_current = np.max(currents_ma) + rms_current = np.sqrt(np.mean(currents_ma**2)) + + # --- CONSOLE DEBUG OUTPUT --- + # Print every 5 seconds (50 steps of 0.1s) + if int(self.time_elapsed_real_sec * 10) % 50 == 0 or self.time_elapsed_real_sec == self.total_sim_time_sec: + time_rem_str = self.format_seconds_to_hms(time_remaining_sec) + + print( + f"TIME: {self.time_elapsed_real_sec:.1f} s | " + f"V_OUT: {self.current_voltage:.3f} V | " + f"I_AVG: {I_avg_ma_realtime:.2f} mA | " + f"SoC: {soc_percent:.1f}% | " + f"Rem. Time: {time_rem_str}" + ) + + # Update GUI (Thread-safe way) + self.master.after(0, self.update_gui, soc_percent, I_avg_ma_realtime, peak_current, rms_current, currents_ma, logic_data) + + # Wait for the next measurement step + time.sleep(max(0, TIME_STEP_EMUL_SEC - (time.time() - last_emulation_time))) + last_emulation_time = time.time() + + + # Final cleanup if loop exited + print("--- EMULATION LOOP EXIT ---") + self.master.after(0, self.stop_emulation) + if self.current_voltage <= self.config["V_STOP"] or is_sim_discharge: + self.master.after(0, self.display_final_results) + + + def update_gui(self, soc_percent, I_avg_ma_realtime, peak_current, rms_current, currents_to_plot, logic_data): + # Update Labels + self.v_label.config(text=f"V_SYS Voltage (V): {self.current_voltage:.3f}") + self.soc_label.config(text=f"SoC (%): {soc_percent:.1f} (Avg I: {I_avg_ma_realtime:.1f} mA)") + + # Display runtime in HH:MM:SS format + runtime_hms = self.format_seconds_to_hms(self.time_elapsed_real_sec) + self.runtime_label.config(text=f"Real Runtime (H:M:S): {runtime_hms}") + + self.peak_label.config(text=f"Peak Current (mA): {peak_current:.2f}") + self.rms_label.config(text=f"RMS Current (mA): {rms_current:.2f}") + + mode = "TIME SIMULATION" if self.sim_discharge_var.get() else "CURRENT EMULATION" + self.master.title(f"PPK2 [{API_MODE}] - Battery Simulator ({mode})") + + # Update Logic Indicators + for i, val in enumerate(logic_data): + color = 'green' if val else 'red' + text = 'HIGH' if val else 'LOW' + self.logic_indicators[i].config(text=f"D{i}: {text}", background=color) + + # Update Plots (Oscilloscope) + plot_times = np.arange(0, len(currents_to_plot)) / self.config["SAMPLE_RATE_HZ"] + + # Current Plot + self.line_i.set_data(plot_times, currents_to_plot) + self.ax_i.set_xlim(0, plot_times[-1] if plot_times.size > 0 else 0.01) + self.ax_i.set_ylim(np.min(currents_to_plot) * 0.9, np.max(currents_to_plot) * 1.1 if currents_to_plot.size > 0 else 10) + + # Voltage Plot + voltage_plot = np.full_like(plot_times, self.current_voltage) + self.line_v.set_data(plot_times, voltage_plot) + self.ax_v.set_ylim(self.config["V_STOP"] * 0.9, self.config["V_START"] * 1.1) + + self.fig.tight_layout() + self.canvas.draw() + + def display_final_results(self): + total_time_real_sec = self.time_elapsed_real_sec + total_time_real_h = total_time_real_sec / 3600.0 + + # Format runtime to HH:MM:SS for the final message + runtime_hms = self.format_seconds_to_hms(total_time_real_sec) + + if self.sim_discharge_var.get(): + # In time mode, we calculate the charge loss required to hit V_STOP based on the SoC curve. + final_soc = self.get_soc_percent_from_voltage(self.config["V_STOP"]) + initial_soc = self.get_soc_percent_from_voltage(self.config["V_START"]) + + soc_drop = initial_soc - final_soc + mah_consumed_simulated = self.config["CAPACITY_NOMINAL_MAH"] * (soc_drop / 100.0) + + I_test_avg_ma = mah_consumed_simulated / total_time_real_h if total_time_real_h > 0 else 0 + + final_message = ( + f"!!! SIMULATION FINISHED (Time Mode) !!!\n" + f"Targeted simulation time: {self.sim_time_str_var.get()} (MM:SS).\n" + f"Discharge from {self.config['V_START']:.2f}V to {self.config['V_STOP']:.2f}V achieved.\n" + f"\n--- RESULTS ---\n" + f"Simulated Runtime: {runtime_hms} (HH:MM:SS)\n" + f"Theoretical Average Current drawn: {I_test_avg_ma:.2f} mA (to achieve V drop)" + ) + else: + total_consumed_mah = self.config["CAPACITY_NOMINAL_MAH"] - self.current_capacity_mah + I_test_avg_ma = total_consumed_mah / total_time_real_h if total_time_real_h > 0 else 0 + estimated_runtime_h = self.config["CAPACITY_NOMINAL_MAH"] / I_test_avg_ma if I_test_avg_ma > 0 else float('inf') + estimated_runtime_hms = self.format_seconds_to_hms(estimated_runtime_h * 3600.0) + + final_message = ( + f"!!! EMULATION FINISHED !!!\n" + f"Algorithm reached V_STOP = {self.config['V_STOP']} V.\n" + f"\n--- TEST RESULTS ---\n" + f"Simulated Runtime until V_STOP: {runtime_hms} (HH:MM:SS)\n" + f"Overall Average Current from test: {I_test_avg_ma:.2f} mA\n" + f"*** ESTIMATED TOTAL RUNTIME (Full Capacity): {estimated_runtime_hms} (HH:MM:SS) ***" + ) + + messagebox.showinfo("Simulation Results", final_message) + + def save_data(self): + """Saves collected data to a CSV file and appends summary statistics. + Includes an aggregated Hex value for the Logic Analyzer pins.""" + if not self.all_current_samples: + messagebox.showwarning("Error", "No data to save.") + return + + # --- Generate File Name with Date/Time --- + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + default_filename = f"{timestamp}_ppk2_battery_log.csv" + + filepath = filedialog.asksaveasfilename(defaultextension=".csv", + filetypes=[("CSV files", "*.csv")], + initialfile=default_filename) + if not filepath: + return + + # --- 1. Prepare Time Data in s:ms format --- + def format_time_s_ms(total_seconds): + seconds = int(total_seconds) + milliseconds = int(round((total_seconds - seconds) * 1000)) + if milliseconds == 1000: # Handle rounding up to the next second + seconds += 1 + milliseconds = 0 + return f"{seconds}:{milliseconds:03d}" + + time_s_ms = [format_time_s_ms(t) for t in self.all_time_real_s] + + # --- 2. Bit-to-Hex Conversion --- + def logic_to_hex(logic_list): + """ + Converts a list of 8 booleans (D0 is LSB) to a 0xXX hex string. + Now prepends '0x' for standard hex format. + """ + if not logic_list: + return '0x00' + + logic_list = logic_list[:8] + [False] * (8 - len(logic_list)) + + # Assuming D0 is LSB (2^0) and D7 is MSB (2^7). + decimal_value = 0 + for i, state in enumerate(logic_list): + if state: + decimal_value += (2 ** i) + + # Return with '0x' prefix + return f'0x{decimal_value:02X}' + + + # --- 3. Prepare Data Frame for main log --- + min_len = min(len(time_s_ms), len(self.all_current_samples), len(self.all_voltage_v), len(self.all_logic_data)) + + # Create columns for logic data (D0, D1, ..., D7) + logic_cols = {f"D{i}": [log[i] for log in self.all_logic_data[:min_len]] for i in range(8)} + + # New Hexadecimal column + logic_hex_values = [logic_to_hex(log) for log in self.all_logic_data[:min_len]] + + + data_dict = { + "Time_s:ms": time_s_ms[:min_len], + "Current_mA_Measured": self.all_current_samples[:min_len], + "Voltage_V_Emulated": self.all_voltage_v[:min_len], + **logic_cols, + "Logic_Hex_D0-D7": logic_hex_values + } + df = pd.DataFrame(data_dict) + + # --- 4. Calculate Summary Statistics --- + total_time_real_sec = self.time_elapsed_real_sec + total_time_real_h = total_time_real_sec / 3600.0 + + # Format runtime to HH:MM:SS for the summary + runtime_hms = self.format_seconds_to_hms(total_time_real_sec) + + if self.sim_discharge_var.get(): + # Time Simulation Mode + final_soc = self.get_soc_percent_from_voltage(self.config["V_STOP"]) + initial_soc = self.get_soc_percent_from_voltage(self.config["V_START"]) + soc_drop = initial_soc - final_soc + + mah_consumed_simulated = self.config["CAPACITY_NOMINAL_MAH"] * (soc_drop / 100.0) + I_avg_ma = mah_consumed_simulated / total_time_real_h if total_time_real_h > 0 else 0 + I_peak_ma = np.max(self.all_current_samples) if self.all_current_samples else 0 + + total_coulomb_uAh = mah_consumed_simulated * 1000 + estimated_runtime_hms = self.format_seconds_to_hms(self.total_sim_time_sec) + runtime_status = f"Targeted Runtime (H:M:S): {estimated_runtime_hms}" + + else: + # Current Emulation Mode + total_consumed_mah = self.config["CAPACITY_NOMINAL_MAH"] - self.current_capacity_mah + I_avg_ma = total_consumed_mah / total_time_real_h if total_time_real_h > 0 else 0 + I_peak_ma = np.max(self.all_current_samples) if self.all_current_samples else 0 + + total_coulomb_uAh = total_consumed_mah * 1000 + estimated_runtime_h = self.config["CAPACITY_NOMINAL_MAH"] / I_avg_ma if I_avg_ma > 0 else float('inf') + estimated_runtime_hms = self.format_seconds_to_hms(estimated_runtime_h * 3600.0) + runtime_status = f"Estimated Total Runtime (H:M:S): {estimated_runtime_hms}" + + # --- 5. Create Summary Block --- + summary_lines = [ + f"\n\n--- SIMULATION SUMMARY ---", + f"Mode,{ 'TIME_SIMULATION' if self.sim_discharge_var.get() else 'CURRENT_EMULATION'}", + f"Battery_Type,{self.battery_type_var.get()}", + f"Nominal_Capacity_mAh,{self.config['CAPACITY_NOMINAL_MAH']:.2f}", + f"Time_to_V_STOP_s,{self.time_elapsed_real_sec:.2f}", + f"Total_Charge_Consumed_uAh,{total_coulomb_uAh:.2f}", + f"Average_Current_mA,{I_avg_ma:.2f}", + f"Peak_Current_mA,{I_peak_ma:.2f}", + f"Runtime_to_V_STOP_H:M:S,{runtime_hms}", # Use H:M:S format here + f"Estimated_Total_Runtime_H:M:S,{estimated_runtime_hms}", + f"V_START_V,{self.config['V_START']:.2f}", + f"V_STOP_V,{self.config['V_STOP']:.2f}", + f"--------------------------" + ] + summary_block = "\n".join(summary_lines) + + # --- 6. Save to CSV --- + with open(filepath, 'w') as f: + # Write data frame (main log) + f.write(df.to_csv(index=False)) + # Append summary block + f.write(summary_block) + + messagebox.showinfo("Saved", f"Data and Summary successfully saved to:\n{filepath}") + + +# --- Main application loop --- +if __name__ == "__main__": + # Ensure dependencies are available (though no guarantee) + try: + import numpy as np + import pandas as pd + import matplotlib.pyplot as plt + except ImportError: + print("FATAL ERROR: Missing dependencies (numpy, pandas, matplotlib). Please install them using: pip install numpy pandas matplotlib") + exit() + + root = tk.Tk() + app = BatteryEmulator(master=root) + + def on_closing(): + if messagebox.askokcancel("Quit", "Do you want to quit the application?"): + app.stop_emulation() + root.destroy() + + root.protocol("WM_DELETE_WINDOW", on_closing) + root.mainloop() \ No newline at end of file