diff --git a/.gitignore b/.gitignore
index 5e08b19..5e01160 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
**/__pycache__/*
src/ppk2_api.egg-info
build
-dist
\ No newline at end of file
+dist!/.gitignore
+.idea
\ No newline at end of file
diff --git a/README.md b/README.md
index ed0e941..12fdfb0 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,6 @@
## Description
The new Nordic Semiconductor's [Power Profiler Kit II (PPK 2)](https://www.nordicsemi.com/Software-and-tools/Development-Tools/Power-Profiler-Kit-2) is very useful for real time measurement of device power consumption. The official [nRF Connect Power Profiler tool](https://github.com/NordicSemiconductor/pc-nrfconnect-ppk) provides a friendly GUI with real-time data display. However there is no support for automated power monitoring. The puropose of this Python API is to enable automated power monitoring and data logging in Python applications.
+Original description comes from https://docs.nordicsemi.com/bundle/ug_ppk2/page/UG/ppk/ppk_downloadable_content.html

diff --git a/doc/PCA63100_Schematic_And_PCB.pdf b/doc/PCA63100_Schematic_And_PCB.pdf
new file mode 100644
index 0000000..d31cb7b
Binary files /dev/null and b/doc/PCA63100_Schematic_And_PCB.pdf differ
diff --git a/doc/pca63100-power-profiler-kit-ii---hw-files---1_0_1.zip b/doc/pca63100-power-profiler-kit-ii---hw-files---1_0_1.zip
new file mode 100644
index 0000000..8b27d42
Binary files /dev/null and b/doc/pca63100-power-profiler-kit-ii---hw-files---1_0_1.zip differ
diff --git a/doc/ppk2_block_diagram.svg b/doc/ppk2_block_diagram.svg
new file mode 100644
index 0000000..9a582e7
--- /dev/null
+++ b/doc/ppk2_block_diagram.svg
@@ -0,0 +1,371 @@
+
\ No newline at end of file
diff --git a/doc/ppk2_pcb1a_t.png b/doc/ppk2_pcb1a_t.png
new file mode 100644
index 0000000..96bdcf5
Binary files /dev/null and b/doc/ppk2_pcb1a_t.png differ
diff --git a/doc/ppk2_pcb2_t.png b/doc/ppk2_pcb2_t.png
new file mode 100644
index 0000000..9bc3015
Binary files /dev/null and b/doc/ppk2_pcb2_t.png differ
diff --git a/doc/ppk2_userguide_1.0.1.pdf b/doc/ppk2_userguide_1.0.1.pdf
new file mode 100644
index 0000000..f92672d
Binary files /dev/null and b/doc/ppk2_userguide_1.0.1.pdf differ
diff --git a/example.py b/example.py
index e89ef7b..ff7cdf2 100644
--- a/example.py
+++ b/example.py
@@ -1,4 +1,3 @@
-
"""
Basic usage of PPK2 Python API.
The basic ampere mode sequence is:
@@ -6,6 +5,7 @@
2. set ampere mode
3. read stream of data
"""
+
import time
from ppk2_api.ppk2_api import PPK2_API
@@ -22,17 +22,20 @@
ppk2_test.get_modifiers()
ppk2_test.set_source_voltage(3300)
-ppk2_test.use_source_meter() # set source meter mode
-ppk2_test.toggle_DUT_power("ON") # enable DUT power
+# set source meter mode
+ppk2_test.use_source_meter()
+# enable DUT power
+ppk2_test.toggle_DUT_power("ON")
+# start measuring
+ppk2_test.start_measuring()
-ppk2_test.start_measuring() # start measuring
# measurements are a constant stream of bytes
# the number of measurements in one sampling period depends on the wait between serial reads
# it appears the maximum number of bytes received is 1024
# the sampling rate of the PPK2 is 100 samples per millisecond
for i in range(0, 1000):
read_data = ppk2_test.get_data()
- if read_data != b'':
+ if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
@@ -46,14 +49,15 @@
print()
time.sleep(0.01)
-ppk2_test.toggle_DUT_power("OFF") # disable DUT power
-
-ppk2_test.use_ampere_meter() # set ampere meter mode
+# disable DUT power
+ppk2_test.toggle_DUT_power("OFF")
+# set ampere meter mode
+ppk2_test.use_ampere_meter()
ppk2_test.start_measuring()
for i in range(0, 1000):
read_data = ppk2_test.get_data()
- if read_data != b'':
+ if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
@@ -65,6 +69,8 @@
# Print last 10 values of each channel
print(ch[-10:])
print()
- time.sleep(0.01) # lower time between sampling -> less samples read in one sampling period
-ppk2_test.stop_measuring()
+ # lower time between sampling -> less samples read in one sampling period
+ time.sleep(0.01)
+
+ppk2_test.stop_measuring()
\ No newline at end of file
diff --git a/example_mp.py b/example_mp.py
index 523dcd9..9de3c1c 100644
--- a/example_mp.py
+++ b/example_mp.py
@@ -1,4 +1,3 @@
-
"""
Basic usage of PPK2 Python API - multiprocessing version.
The basic ampere mode sequence is:
@@ -6,6 +5,7 @@
2. set ampere mode
3. read stream of data
"""
+
import time
from ppk2_api.ppk2_api import PPK2_MP as PPK2_API
@@ -18,24 +18,34 @@
print(f"Too many connected PPK2's: {ppk2s_connected}")
exit()
-ppk2_test = PPK2_API(ppk2_port, buffer_max_size_seconds=1, buffer_chunk_seconds=0.01, timeout=1, write_timeout=1, exclusive=True)
+ppk2_test = PPK2_API(
+ ppk2_port,
+ buffer_max_size_seconds=1,
+ buffer_chunk_seconds=0.01,
+ timeout=1,
+ write_timeout=1,
+ exclusive=True,
+)
ppk2_test.get_modifiers()
ppk2_test.set_source_voltage(3300)
"""
Source mode example
"""
-ppk2_test.use_source_meter() # set source meter mode
-ppk2_test.toggle_DUT_power("ON") # enable DUT power
+# set source meter mode
+ppk2_test.use_source_meter()
+# enable DUT power
+ppk2_test.toggle_DUT_power("ON")
+# start measuring
+ppk2_test.start_measuring()
-ppk2_test.start_measuring() # start measuring
# measurements are a constant stream of bytes
# the number of measurements in one sampling period depends on the wait between serial reads
# it appears the maximum number of bytes received is 1024
# the sampling rate of the PPK2 is 100 samples per millisecond
while True:
read_data = ppk2_test.get_data()
- if read_data != b'':
+ if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
@@ -50,18 +60,21 @@
time.sleep(0.001)
-ppk2_test.toggle_DUT_power("OFF") # disable DUT power
+
+# disable DUT power
+ppk2_test.toggle_DUT_power("OFF")
ppk2_test.stop_measuring()
"""
Ampere mode example
"""
-ppk2_test.use_ampere_meter() # set ampere meter mode
+# set ampere meter mode
+ppk2_test.use_ampere_meter()
ppk2_test.start_measuring()
while True:
read_data = ppk2_test.get_data()
- if read_data != b'':
+ if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
@@ -73,6 +86,8 @@
# Print last 10 values of each channel
print(ch[-10:])
print()
- time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period
-ppk2_test.stop_measuring()
+ # lower time between sampling -> less samples read in one sampling period
+ time.sleep(0.001)
+
+ppk2_test.stop_measuring()
\ No newline at end of file
diff --git a/setup.py b/setup.py
index bab7e24..5c9308a 100644
--- a/setup.py
+++ b/setup.py
@@ -25,7 +25,7 @@ def read(*names, **kwargs):
name="ppk2-api",
version="0.9.2",
description="API for Nordic Semiconductor's Power Profiler Kit II (PPK 2).",
- url="https://github.com/IRNAS/ppk2-api-python",
+ url="https://github.com/RolandWa/ppk2-api-python",
packages=find_packages("src"),
package_dir={"": "src"},
py_modules=[splitext(basename(path))[0] for path in glob("src/*.py")],
diff --git a/src/power_profiler.py b/src/power_profiler.py
index 7af0f6f..6d9b12d 100644
--- a/src/power_profiler.py
+++ b/src/power_profiler.py
@@ -2,19 +2,21 @@
import csv
import datetime
from threading import Thread
+
# import numpy as np
# import matplotlib.pyplot as plt
# import matplotlib
from ppk2_api.ppk2_api import PPK2_MP as PPK2_API
-class PowerProfiler():
+
+class PowerProfiler:
def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
"""Initialize PPK2 power profiler with serial"""
self.measuring = None
self.measurement_thread = None
self.ppk2 = None
- print(f"Initing power profiler")
+ print("Initing power profiler")
# try:
if serial_port:
@@ -26,7 +28,8 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
self.ppk2 = PPK2_API(serial_port)
try:
- ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct
+ # try to read modifiers, if it fails serial port is probably not correct
+ ret = self.ppk2.get_modifiers()
print(f"Initialized ppk2 api: {ret}")
except Exception as e:
print(f"Error initializing power profiler: {e}")
@@ -35,13 +38,16 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
if not ret:
self.ppk2 = None
- raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}")
+ raise Exception(
+ f"Error when initing PowerProfiler with serial port {serial_port}"
+ )
else:
self.ppk2.use_source_meter()
self.source_voltage_mV = source_voltage_mV
- self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V
+ # set to 3.3V
+ self.ppk2.set_source_voltage(self.source_voltage_mV)
print(f"Set power profiler source voltage: {self.source_voltage_mV}")
@@ -62,7 +68,7 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
# write to csv
self.filename = filename
if self.filename is not None:
- with open(self.filename, 'w', newline='') as file:
+ with open(self.filename, "w", newline="") as file:
writer = csv.writer(file)
row = []
for key in ["ts", "avg1000"]:
@@ -71,10 +77,10 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
def write_csv_rows(self, samples):
"""Write csv row"""
- with open(self.filename, 'a', newline='') as file:
+ with open(self.filename, "a", newline="") as file:
writer = csv.writer(file)
for sample in samples:
- row = [datetime.datetime.now().strftime('%d-%m-%Y %H:%M:%S.%f'), sample]
+ row = [datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f"), sample]
writer.writerow(row)
def delete_power_profiler(self):
@@ -85,26 +91,26 @@ def delete_power_profiler(self):
print("Deleting power profiler")
if self.measurement_thread:
- print(f"Joining measurement thread")
+ print("Joining measurement thread")
self.measurement_thread.join()
self.measurement_thread = None
if self.ppk2:
- print(f"Disabling ppk2 power")
+ print("Disabling ppk2 power")
self.disable_power()
del self.ppk2
- print(f"Deleted power profiler")
+ print("Deleted power profiler")
def discover_port(self):
"""Discovers ppk2 serial port"""
ppk2s_connected = PPK2_API.list_devices()
- if(len(ppk2s_connected) == 1):
+ if len(ppk2s_connected) == 1:
ppk2_port = ppk2s_connected[0]
- print(f'Found PPK2 at {ppk2_port}')
+ print(f"Found PPK2 at {ppk2_port}")
return ppk2_port
else:
- print(f'Too many connected PPK2\'s: {ppk2s_connected}')
+ print(f"Too many connected PPK2's: {ppk2s_connected}")
return None
def enable_power(self):
@@ -124,16 +130,21 @@ def disable_power(self):
def measurement_loop(self):
"""Endless measurement loop will run in a thread"""
while True and not self.stop:
- if self.measuring: # read data if currently measuring
+ # read data if currently measuring
+ if self.measuring:
read_data = self.ppk2.get_data()
- if read_data != b'':
+ if read_data != b"":
samples = self.ppk2.get_samples(read_data)
- self.current_measurements += samples # can easily sum lists, will append individual data
- time.sleep(0.001) # TODO figure out correct sleep duration
+ # can easily sum lists, will append individual data
+ self.current_measurements += samples
+ # TODO figure out correct sleep duration
+ time.sleep(0.001)
def _average_samples(self, list, window_size):
"""Average samples based on window size"""
- chunks = [list[val:val + window_size] for val in range(0, len(list), window_size)]
+ chunks = [
+ list[val : val + window_size] for val in range(0, len(list), window_size)
+ ]
avgs = []
for chunk in chunks:
avgs.append(sum(chunk) / len(chunk))
@@ -142,19 +153,24 @@ def _average_samples(self, list, window_size):
def start_measuring(self):
"""Start measuring"""
- if not self.measuring: # toggle measuring flag only if currently not measuring
- self.current_measurements = [] # reset current measurements
- self.measuring = True # set internal flag
- self.ppk2.start_measuring() # send command to ppk2
+ # toggle measuring flag only if currently not measuring
+ if not self.measuring:
+ # reset current measurements
+ self.current_measurements = []
+ # set internal flag
+ self.measuring = True
+ # send command to ppk2
+ self.ppk2.start_measuring()
self.measurement_start_time = time.time()
def stop_measuring(self):
"""Stop measuring and return average of period"""
self.measurement_stop_time = time.time()
self.measuring = False
- self.ppk2.stop_measuring() # send command to ppk2
+ # send command to ppk2
+ self.ppk2.stop_measuring()
- #samples_average = self._average_samples(self.current_measurements, 1000)
+ # samples_average = self._average_samples(self.current_measurements, 1000)
if self.filename is not None:
self.write_csv_rows(self.current_measurements)
@@ -172,24 +188,33 @@ def get_average_current_mA(self):
if len(self.current_measurements) == 0:
return 0
- average_current_mA = (sum(self.current_measurements) / len(self.current_measurements)) / 1000 # measurements are in microamperes, divide by 1000
+ # measurements are in microamperes, divide by 1000
+ average_current_mA = (
+ sum(self.current_measurements) / len(self.current_measurements)
+ ) / 1000
return average_current_mA
def get_average_power_consumption_mWh(self):
"""Return average power consumption of last measurement in mWh"""
average_current_mA = self.get_average_current_mA()
- average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA # divide by 1000 as source voltage is in millivolts - this gives us milliwatts
- measurement_duration_h = self.get_measurement_duration_s() / 3600 # duration in seconds, divide by 3600 to get hours
+ # divide by 1000 as source voltage is in millivolts - this gives us milliwatts
+ average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA
+ # duration in seconds, divide by 3600 to get hours
+ measurement_duration_h = self.get_measurement_duration_s() / 3600
average_consumption_mWh = average_power_mW * measurement_duration_h
return average_consumption_mWh
def get_average_charge_mC(self):
"""Returns average charge in milli coulomb"""
average_current_mA = self.get_average_current_mA()
- measurement_duration_s = self.get_measurement_duration_s() # in seconds
+ # in seconds
+ measurement_duration_s = self.get_measurement_duration_s()
return average_current_mA * measurement_duration_s
def get_measurement_duration_s(self):
"""Returns duration of measurement"""
- measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds
+ # measurement duration in seconds
+ measurement_duration_s = (
+ self.measurement_stop_time - self.measurement_start_time
+ )
return measurement_duration_s
\ No newline at end of file
diff --git a/src/ppk2_api/__init__.py b/src/ppk2_api/__init__.py
index e69de29..afc9510 100644
--- a/src/ppk2_api/__init__.py
+++ b/src/ppk2_api/__init__.py
@@ -0,0 +1 @@
+from .ppk2_api import PPK2_MP
\ No newline at end of file
diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py
index 4b39a69..9c222f7 100644
--- a/src/ppk2_api/ppk2_api.py
+++ b/src/ppk2_api/ppk2_api.py
@@ -12,8 +12,10 @@
import queue
import threading
-class PPK2_Command():
+
+class PPK2_Command:
"""Serial command opcodes"""
+
NO_OP = 0x00
TRIGGER_SET = 0x01
AVG_NUM_SET = 0x02 # no-firmware
@@ -24,11 +26,11 @@ class PPK2_Command():
AVERAGE_STOP = 0x07
RANGE_SET = 0x08
LCD_SET = 0x09
- TRIGGER_STOP = 0x0a
- DEVICE_RUNNING_SET = 0x0c
- REGULATOR_SET = 0x0d
- SWITCH_POINT_DOWN = 0x0e
- SWITCH_POINT_UP = 0x0f
+ TRIGGER_STOP = 0x0A
+ DEVICE_RUNNING_SET = 0x0C
+ REGULATOR_SET = 0x0D
+ SWITCH_POINT_DOWN = 0x0E
+ SWITCH_POINT_UP = 0x0F
TRIGGER_EXT_TOGGLE = 0x11
SET_POWER_MODE = 0x11
RES_USER_SET = 0x12
@@ -39,18 +41,19 @@ class PPK2_Command():
SET_USER_GAINS = 0x25
-class PPK2_Modes():
+class PPK2_Modes:
"""PPK2 measurement modes"""
+
AMPERE_MODE = "AMPERE_MODE"
SOURCE_MODE = "SOURCE_MODE"
-class PPK2_API():
+class PPK2_API:
def __init__(self, port: str, **kwargs):
- '''
+ """
port - port where PPK2 is connected
**kwargs - keyword arguments to pass to the pySerial constructor
- '''
+ """
self.ser = None
self.ser = serial.Serial(port, **kwargs)
@@ -66,7 +69,7 @@ def __init__(self, port: str, **kwargs):
"I": {"0": 0, "1": 0, "2": 0, "3": 0, "4": 0},
"UG": {"0": 1, "1": 1, "2": 1, "3": 1, "4": 1},
"HW": None,
- "IA": None
+ "IA": None,
}
self.vdd_low = 800
@@ -93,7 +96,7 @@ def __init__(self, port: str, **kwargs):
self.after_spike = 0
# adc measurement buffer remainder and len of remainder
- self.remainder = {"sequence": b'', "len": 0}
+ self.remainder = {"sequence": b"", "len": 0}
def __del__(self):
"""Destructor"""
@@ -121,7 +124,8 @@ def _write_serial(self, cmd_tuple):
def _twos_comp(self, val):
"""Compute the 2's complement of int32 value"""
if (val & (1 << (32 - 1))) != 0:
- val = val - (1 << 32) # compute negative value
+ # compute negative value
+ val = val - (1 << 32)
return val
def _convert_source_voltage(self, mV):
@@ -138,11 +142,13 @@ def _convert_source_voltage(self, mV):
# get difference to baseline (the baseline is 800mV but the initial offset is 32)
diff_to_baseline = mV - self.vdd_low + offset
base_b_1 = 3
- base_b_2 = 0 # is actually 32 - compensated with above offset
+ # is actually 32 - compensated with above offset
+ base_b_2 = 0
# get the number of times we have to increase the first byte of the command
ratio = int(diff_to_baseline / 256)
- remainder = diff_to_baseline % 256 # get the remainder for byte 2
+ # get the remainder for byte 2
+ remainder = diff_to_baseline % 256
set_b_1 = base_b_1 + ratio
set_b_2 = base_b_2 + remainder
@@ -158,7 +164,7 @@ def _read_metadata(self):
time.sleep(0.1)
# TODO add a read_until serial read function with a timeout
- if read != b'' and "END" in read.decode("utf-8"):
+ if read != b"" and "END" in read.decode("utf-8"):
return read.decode("utf-8")
def _parse_metadata(self, metadata):
@@ -172,23 +178,22 @@ def _parse_metadata(self, metadata):
if key == data_pair[0]:
self.modifiers[key] = data_pair[1]
for ind in range(0, 5):
- if key+str(ind) == data_pair[0]:
+ if key + str(ind) == data_pair[0]:
if "R" in data_pair[0]:
# problem on some PPK2s with wrong calibration values - this doesn't fix it
if float(data_pair[1]) != 0:
- self.modifiers[key][str(ind)] = float(
- data_pair[1])
+ self.modifiers[key][str(ind)] = float(data_pair[1])
else:
- self.modifiers[key][str(ind)] = float(
- data_pair[1])
+ self.modifiers[key][str(ind)] = float(data_pair[1])
return True
except Exception as e:
# if exception triggers serial port is probably not correct
+ print(f"Failde to parse metadata: {e}")
return None
def _generate_mask(self, bits, pos):
pos = pos
- mask = ((2**bits-1) << pos)
+ mask = (2**bits - 1) << pos
mask = self._twos_comp(mask)
return {"mask": mask, "pos": pos}
@@ -199,15 +204,18 @@ def _get_masked_value(self, value, meas, is_bits=False):
def _handle_raw_data(self, adc_value):
"""Convert raw value to analog value"""
try:
- current_measurement_range = min(self._get_masked_value(
- adc_value, self.MEAS_RANGE), 4) # 5 is the number of parameters
+ # 5 is the number of parameters
+ current_measurement_range = min(
+ self._get_masked_value(adc_value, self.MEAS_RANGE), 4
+ )
adc_result = self._get_masked_value(adc_value, self.MEAS_ADC) * 4
bits = self._get_masked_value(adc_value, self.MEAS_LOGIC)
- analog_value = self.get_adc_result(
- current_measurement_range, adc_result) * 10**6
+ analog_value = (
+ self.get_adc_result(current_measurement_range, adc_result) * 10**6
+ )
return analog_value, bits
except Exception as e:
- print("Measurement outside of range!")
+ print(f"Measurement outside of range! {e}")
return None, None
@staticmethod
@@ -219,7 +227,8 @@ def list_devices():
devices = [
(port.device, port.serial_number[:8])
for port in ports
- if port.description.startswith("nRF Connect USB CDC ACM") and port.location.endswith("1")
+ if port.description.startswith("nRF Connect USB CDC ACM")
+ and port.location.endswith("1")
]
else:
devices = [
@@ -236,7 +245,7 @@ def get_data(self):
def get_modifiers(self):
"""Gets and sets modifiers from device memory"""
- self._write_serial((PPK2_Command.GET_META_DATA, ))
+ self._write_serial((PPK2_Command.GET_META_DATA,))
metadata = self._read_metadata()
ret = self._parse_metadata(metadata)
return ret
@@ -249,14 +258,14 @@ def start_measuring(self):
if self.mode == PPK2_Modes.AMPERE_MODE:
raise Exception("Input voltage not set!")
- self._write_serial((PPK2_Command.AVERAGE_START, ))
+ self._write_serial((PPK2_Command.AVERAGE_START,))
def stop_measuring(self):
"""Stop continuous measurement"""
- self._write_serial((PPK2_Command.AVERAGE_STOP, ))
+ self._write_serial((PPK2_Command.AVERAGE_STOP,))
def set_source_voltage(self, mV):
- """Inits device - based on observation only REGULATOR_SET is the command.
+ """Inits device - based on observation only REGULATOR_SET is the command.
The other two values correspond to the voltage level.
800mV is the lowest setting - [3,32] - the values then increase linearly
@@ -268,32 +277,44 @@ def set_source_voltage(self, mV):
def toggle_DUT_power(self, state):
"""Toggle DUT power based on parameter"""
if state == "ON":
+ # 12,1
self._write_serial(
- (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.TRIGGER_SET)) # 12,1
+ (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.TRIGGER_SET)
+ )
if state == "OFF":
- self._write_serial(
- (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) # 12,0
+ # 12,0
+ self._write_serial((PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP))
def use_ampere_meter(self):
"""Configure device to use ampere meter"""
self.mode = PPK2_Modes.AMPERE_MODE
- self._write_serial((PPK2_Command.SET_POWER_MODE,
- PPK2_Command.TRIGGER_SET)) # 17,1
+ # 17,1
+ self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.TRIGGER_SET))
def use_source_meter(self):
"""Configure device to use source meter"""
self.mode = PPK2_Modes.SOURCE_MODE
- self._write_serial((PPK2_Command.SET_POWER_MODE,
- PPK2_Command.AVG_NUM_SET)) # 17,2
+ # 17,2
+ self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.AVG_NUM_SET))
def get_adc_result(self, current_range, adc_value):
"""Get result of adc conversion"""
current_range = str(current_range)
result_without_gain = (adc_value - self.modifiers["O"][current_range]) * (
- self.adc_mult / self.modifiers["R"][current_range])
- adc = self.modifiers["UG"][current_range] * (result_without_gain * (self.modifiers["GS"][current_range] * result_without_gain + self.modifiers["GI"][current_range]) + (
- self.modifiers["S"][current_range] * (self.current_vdd / 1000) + self.modifiers["I"][current_range]))
+ self.adc_mult / self.modifiers["R"][current_range]
+ )
+ adc = self.modifiers["UG"][current_range] * (
+ result_without_gain
+ * (
+ self.modifiers["GS"][current_range] * result_without_gain
+ + self.modifiers["GI"][current_range]
+ )
+ + (
+ self.modifiers["S"][current_range] * (self.current_vdd / 1000)
+ + self.modifiers["I"][current_range]
+ )
+ )
prev_rolling_avg = self.rolling_avg
prev_rolling_avg4 = self.rolling_avg4
@@ -302,12 +323,18 @@ def get_adc_result(self, current_range, adc_value):
if self.rolling_avg is None:
self.rolling_avg = adc
else:
- self.rolling_avg = self.spike_filter_alpha * adc + (1 - self.spike_filter_alpha) * self.rolling_avg
-
+ self.rolling_avg = (
+ self.spike_filter_alpha * adc
+ + (1 - self.spike_filter_alpha) * self.rolling_avg
+ )
+
if self.rolling_avg4 is None:
self.rolling_avg4 = adc
else:
- self.rolling_avg4 = self.spike_filter_alpha5 * adc + (1 - self.spike_filter_alpha5) * self.rolling_avg4
+ self.rolling_avg4 = (
+ self.spike_filter_alpha5 * adc
+ + (1 - self.spike_filter_alpha5) * self.rolling_avg4
+ )
if self.prev_range is None:
self.prev_range = current_range
@@ -326,7 +353,7 @@ def get_adc_result(self, current_range, adc_value):
adc = self.rolling_avg4
else:
adc = self.rolling_avg
-
+
self.after_spike -= 1
self.prev_range = current_range
@@ -334,7 +361,8 @@ def get_adc_result(self, current_range, adc_value):
def _digital_to_analog(self, adc_value):
"""Convert discrete value to analog value"""
- return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value
+ # convert reading to analog value
+ return int.from_bytes(adc_value, byteorder="little", signed=False)
def digital_channels(self, bits):
"""
@@ -363,14 +391,13 @@ def get_samples(self, buf):
Manipulation of samples is left to the user.
See example for more info.
"""
-
- sample_size = 4 # one analog value is 4 bytes in size
+ # one analog value is 4 bytes in size
+ sample_size = 4
offset = self.remainder["len"]
samples = []
raw_digital_output = []
- first_reading = (
- self.remainder["sequence"] + buf[0:sample_size-offset])[:4]
+ first_reading = (self.remainder["sequence"] + buf[0 : sample_size - offset])[:4]
adc_val = self._digital_to_analog(first_reading)
measurement, bits = self._handle_raw_data(adc_val)
if measurement is not None:
@@ -381,7 +408,7 @@ def get_samples(self, buf):
offset = sample_size - offset
while offset <= len(buf) - sample_size:
- next_val = buf[offset:offset + sample_size]
+ next_val = buf[offset : offset + sample_size]
offset += sample_size
adc_val = self._digital_to_analog(next_val)
measurement, bits = self._handle_raw_data(adc_val)
@@ -390,18 +417,19 @@ def get_samples(self, buf):
if bits is not None:
raw_digital_output.append(bits)
- self.remainder["sequence"] = buf[offset:len(buf)]
- self.remainder["len"] = len(buf)-offset
+ self.remainder["sequence"] = buf[offset : len(buf)]
+ self.remainder["len"] = len(buf) - offset
# return list of samples and raw digital outputs
# handle those lists in PPK2 API wrapper
- return samples, raw_digital_output
+ return samples, raw_digital_output
class PPK_Fetch(threading.Thread):
- '''
+ """
Background process for polling the data in multi-threaded variant
- '''
+ """
+
def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5):
super().__init__()
self._ppk2 = ppk2
@@ -411,8 +439,10 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5):
self._stats = (None, None)
self._last_timestamp = 0
- self._buffer_max_len = int(buffer_len_s * 100000 * 4) # 100k 4-byte samples per second
- self._buffer_chunk = int(buffer_chunk_s * 100000 * 4) # put in the queue in chunks of 0.5s
+ # 100k 4-byte samples per second
+ self._buffer_max_len = int(buffer_len_s * 100000 * 4)
+ # put in the queue in chunks of 0.5s
+ self._buffer_chunk = int(buffer_chunk_s * 100000 * 4)
# round buffers to a whole sample
if self._buffer_max_len % 4 != 0:
@@ -425,17 +455,19 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5):
def run(self):
s = 0
t = time.time()
- local_buffer = b''
+ local_buffer = b""
while not self._quit.is_set():
d = PPK2_API.get_data(self._ppk2)
tm_now = time.time()
local_buffer += d
while len(local_buffer) >= self._buffer_chunk:
# FIXME: check if lock might be needed when discarding old data
- self._buffer_q.put(local_buffer[:self._buffer_chunk])
- while self._buffer_q.qsize()>self._buffer_max_len/self._buffer_chunk:
+ self._buffer_q.put(local_buffer[: self._buffer_chunk])
+ while (
+ self._buffer_q.qsize() > self._buffer_max_len / self._buffer_chunk
+ ):
self._buffer_q.get()
- local_buffer = local_buffer[self._buffer_chunk:]
+ local_buffer = local_buffer[self._buffer_chunk :]
self._last_timestamp = tm_now
# calculate stats
@@ -458,11 +490,12 @@ def run(self):
break
def get_data(self):
- ret = b''
+ ret = b""
count = 0
while True:
try:
- ret += self._buffer_q.get(timeout=0.001) # get_nowait sometimes skips a chunk for some reason
+ # get_nowait sometimes skips a chunk for some reason
+ ret += self._buffer_q.get(timeout=0.001)
count += 1
except queue.Empty:
break
@@ -470,17 +503,20 @@ def get_data(self):
class PPK2_MP(PPK2_API):
- '''
+ """
Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns
a background process on start_measuring()
- '''
- def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs):
- '''
+ """
+
+ def __init__(
+ self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs
+ ):
+ """
port - port where PPK2 is connected
buffer_max_size_seconds - how many seconds of data to keep in the buffer
buffer_chunk_seconds - how many seconds of data to put in the queue at once
**kwargs - keyword arguments to pass to the pySerial constructor
- '''
+ """
super().__init__(port, **kwargs)
self._fetcher = None
@@ -502,27 +538,32 @@ def __del__(self):
def start_measuring(self):
# discard the data in the buffer
self.stop_measuring()
- while self.get_data()!=b'':
+ while self.get_data() != b"":
pass
PPK2_API.start_measuring(self)
self._quit_evt.clear()
if self._fetcher is not None:
return
-
- self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds)
+
+ self._fetcher = PPK_Fetch(
+ self,
+ self._quit_evt,
+ self._buffer_max_size_seconds,
+ self._buffer_chunk_seconds,
+ )
self._fetcher.start()
def stop_measuring(self):
PPK2_API.stop_measuring(self)
- self.get_data() # flush the serial buffer (to prevent unicode error on next command)
+ self.get_data() # flush the serial buffer (to prevent unicode error on next command)
self._quit_evt.set()
if self._fetcher is not None:
- self._fetcher.join() # join() will block if the queue isn't empty
+ self._fetcher.join() # join() will block if the queue isn't empty
self._fetcher = None
def get_data(self):
try:
return self._fetcher.get_data()
except (TypeError, AttributeError):
- return b''
+ return b""
\ No newline at end of file
diff --git a/tools/ppk2_battery_emulator_gui.py b/tools/ppk2_battery_emulator_gui.py
new file mode 100644
index 0000000..a362f40
--- /dev/null
+++ b/tools/ppk2_battery_emulator_gui.py
@@ -0,0 +1,818 @@
+import tkinter as tk
+from tkinter import filedialog, ttk, messagebox
+import threading
+import time
+import math
+import pandas as pd
+import matplotlib.pyplot as plt
+from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
+import numpy as np
+import random
+import re
+from datetime import datetime
+
+# ----------------------------------------------------------------------
+# PPK2 API INTEGRATION (Forced MOCK Mode)
+# ----------------------------------------------------------------------
+
+# --- MOCK DEFINITIONS (Always available for fallback) ---
+BASE_I_QUIET = 10.0 # Static base for quiet current (mA)
+BASE_I_PEAK = 130.0 # Static base for peak current (mA)
+NOISE_RANGE = 0.5 # Random fluctuation range (±0.5 mA)
+
+class PPK2Mock:
+ """Emulates the PPK2_API interface for GUI testing."""
+ def start_source_meter(self, samplerate, logic_enabled=False):
+ self.samplerate = samplerate
+ self.logic_enabled = logic_enabled
+ self.voltage = 0
+ self.start_time = time.time()
+ print(f"MOCK: Started Source Meter @ {samplerate} Hz (Logic: {logic_enabled})")
+
+ def set_voltage(self, voltage_v):
+ self.voltage = voltage_v
+
+ def get_data(self):
+ """Simulate dynamic current draw with random variance around static values."""
+ t = time.time()
+
+ # A common sampling buffer size
+ num_samples = 100
+ data = []
+ for i in range(num_samples):
+ # Simulate bursts of current every 5 seconds
+ if (t + i/self.samplerate) % 5 < 0.2:
+ I_base = BASE_I_PEAK
+ else:
+ I_base = BASE_I_QUIET
+
+ I_noise = random.uniform(-NOISE_RANGE, NOISE_RANGE)
+ data.append(I_base + I_noise)
+ return data
+
+ def get_logic_data(self):
+ """Simulate dynamic logic data."""
+ t = time.time()
+ # D0 toggles every 1s, D1 toggles every 0.5s, D5 toggles every 10s
+ return [
+ (t % 2) > 1, (t % 0.5) > 0.25, random.choice([True, False]), False, True, (t % 10) > 9.5, False, False
+ ]
+
+ def stop(self):
+ print("MOCK: Stopped.")
+
+# --- API LOADING LOGIC (FORCED MOCK MODE) ---
+
+def init_ppk2_device(voltage_v, samplerate_hz, logic_enabled=False):
+ """Initializes the Mock PPK2 device."""
+ mock_device = PPK2Mock()
+ mock_device.start_source_meter(samplerate=samplerate_hz, logic_enabled=logic_enabled)
+ mock_device.set_voltage(voltage_v)
+ return mock_device
+
+API_MODE = "MOCK"
+print("WARNING: PPK2 API not found. Using MOCK mode for simulation.")
+
+# ----------------------------------------------------------------------
+# EMULATION CONSTANTS AND BATTERY PARAMETERS
+# ----------------------------------------------------------------------
+
+DISCHARGE_RATE = 100 # Emulation acceleration factor (100x faster than real time)
+TIME_STEP_REAL_SEC = 1.0 # The amount of real time simulated in one emulation step
+TIME_STEP_EMUL_SEC = TIME_STEP_REAL_SEC / DISCHARGE_RATE # The actual time the PPK2 samples current
+
+class BatteryEmulator(tk.Frame):
+
+ # Static data for battery technologies and their discharge limits
+ BATTERY_PROFILES = {
+ "Li-Po/Li-Ion (3.7V)": {"V_START": 4.2, "V_STOP": 3.2},
+ "LiFePO4 (3.2V)": {"V_START": 3.6, "V_STOP": 2.8},
+ "NiMH/NiCd (1.2V)": {"V_START": 1.4, "V_STOP": 1.0},
+ "Alkaline (1.5V)": {"V_START": 1.6, "V_STOP": 1.2}
+ }
+
+ def __init__(self, master=None):
+ super().__init__(master)
+ self.master = master
+ self.master.title(f"PPK2 [{API_MODE}] - Battery Simulator and Current Profiler")
+ self.ppk2 = None
+ self.is_running = False
+ self.data_thread = None
+
+ # State Variables
+ self.current_voltage = 0.0
+ self.current_capacity_mah = 0.0
+ self.time_elapsed_real_sec = 0
+ self.all_current_samples = []
+ self.all_time_real_s = []
+ self.all_voltage_v = []
+ self.all_logic_data = []
+
+ # Configuration
+ self.config = {
+ "V_STOP": self.BATTERY_PROFILES["Li-Po/Li-Ion (3.7V)"]["V_STOP"],
+ "SAMPLE_RATE_HZ": 1000
+ }
+
+ self.CAPACITY_OPTIONS = [1500, 3000, 5000, 8000]
+ self.V_START_DEFAULT = self.BATTERY_PROFILES["Li-Po/Li-Ion (3.7V)"]["V_START"]
+ self.CAPACITY_DEFAULT = 3000
+ self.SIM_TIME_DEFAULT_STR = "05:00"
+
+ # GUI Variables
+ self.battery_type_var = tk.StringVar(value="Li-Po/Li-Ion (3.7V)")
+ self.v_start_var = tk.DoubleVar(value=self.V_START_DEFAULT)
+ self.v_stop_var = tk.DoubleVar(value=self.config["V_STOP"])
+ self.capacity_var = tk.StringVar(value=str(self.CAPACITY_DEFAULT))
+ self.logic_enabled_var = tk.BooleanVar(value=False)
+ self.sim_discharge_var = tk.BooleanVar(value=False)
+ self.sim_time_str_var = tk.StringVar(value=self.SIM_TIME_DEFAULT_STR)
+
+ # Bind voltage variable changes to update the config
+ self.v_start_var.trace_add("write", self.update_config_voltage)
+ self.v_stop_var.trace_add("write", self.update_config_voltage)
+
+ # Simulation specific variables
+ self.total_sim_time_sec = 0
+ self.required_mah_loss_per_sec = 0
+
+ self.create_widgets()
+ self.load_initial_state()
+ self.print_battery_info()
+
+ def print_battery_info(self):
+ print("\n--- Common Battery Technology Information ---")
+ for tech, limits in self.BATTERY_PROFILES.items():
+ print(f"| {tech:<20} | V_Start: {limits['V_START']:.1f}V | V_Stop: {limits['V_STOP']:.1f}V | Capacity: 100 mAh to 50,000 mAh+")
+ print("-------------------------------------------\n")
+
+ def convert_mm_ss_to_sec(self, mm_ss_str):
+ """Converts MM:SS string to total seconds."""
+ match = re.match(r"(\d+):(\d+)", mm_ss_str.strip())
+ if match:
+ minutes = int(match.group(1))
+ seconds = int(match.group(2))
+ return minutes * 60 + seconds
+ try:
+ return float(mm_ss_str) * 60
+ except:
+ return 0
+
+ def format_seconds_to_hms(self, total_seconds):
+ """Converts total seconds into HH:MM:SS format."""
+ if total_seconds == float('inf'):
+ return "INF"
+
+ total_seconds = int(total_seconds)
+ hours = total_seconds // 3600
+ minutes = (total_seconds % 3600) // 60
+ seconds = total_seconds % 60
+ return f"{hours:02}:{minutes:02}:{seconds:02}"
+
+ def update_config_voltage(self, *args):
+ """Updates the internal config from the GUI DoubleVars."""
+ try:
+ self.config["V_START"] = self.v_start_var.get()
+ self.config["V_STOP"] = self.v_stop_var.get()
+ except tk.TclError:
+ # Handle case where conversion fails (e.g., user is typing non-float data)
+ pass
+
+ def update_voltage_from_type(self, *args):
+ """Updates V_START and V_STOP based on the selected battery type."""
+ selected_type = self.battery_type_var.get()
+ profile = self.BATTERY_PROFILES.get(selected_type)
+
+ if profile:
+ self.v_start_var.set(profile["V_START"])
+ self.v_stop_var.set(profile["V_STOP"])
+ print(f"INFO: Set V_START={profile['V_START']}V and V_STOP={profile['V_STOP']}V for {selected_type}.")
+ else:
+ print(f"WARNING: Unknown battery type selected: {selected_type}")
+
+ def create_voltage_spinbox(self, parent, variable, row, column, label_text):
+ """Creates a label and an editable Spinbox with 0.01 increment/decrement."""
+ ttk.Label(parent, text=label_text).grid(row=row, column=column, padx=5, pady=2, sticky='w')
+
+ spinbox = tk.Spinbox(parent,
+ from_=0.0, to_=5.0,
+ increment=0.01,
+ textvariable=variable,
+ format="%.2f",
+ width=6,
+ command=self.update_config_voltage) # Update on button press
+
+ spinbox.grid(row=row, column=column+1, padx=5, pady=2, sticky='w')
+
+ # Bind the Enter key to update the config (for manual entry)
+ spinbox.bind('', lambda event: self.update_config_voltage())
+
+ return spinbox
+
+ def load_initial_state(self):
+ # Apply current settings to configuration variables
+ self.config["V_STOP"] = self.v_stop_var.get()
+ self.config["V_START"] = self.v_start_var.get()
+
+ try:
+ capacity = float(self.capacity_var.get())
+ except ValueError:
+ capacity = self.CAPACITY_DEFAULT
+ self.capacity_var.set(str(self.CAPACITY_DEFAULT))
+
+ self.config["CAPACITY_NOMINAL_MAH"] = capacity
+
+ self.current_capacity_mah = self.config["CAPACITY_NOMINAL_MAH"]
+ self.current_voltage = self.config["V_START"]
+ self.v_label.config(text=f"V_SYS Voltage (V): {self.current_voltage:.3f}")
+ self.soc_label.config(text="SoC (%): 100.0 (Avg I: 0.0 mA)")
+
+ # Prepare for Simulated Discharge Mode
+ if self.sim_discharge_var.get():
+ self.total_sim_time_sec = self.convert_mm_ss_to_sec(self.sim_time_str_var.get())
+
+ # --- Calculation for console/summary output (not used for V drop) ---
+ soc_start = 100.0
+ sim_V_stop = max(self.config["V_STOP"], self.config["V_START"] - 0.01)
+ soc_stop = self.get_soc_percent_from_voltage(sim_V_stop)
+
+ required_soc_drop = soc_start - soc_stop
+ required_mah_drop = required_soc_drop * (self.config["CAPACITY_NOMINAL_MAH"] / 100.0)
+
+ if self.total_sim_time_sec > 0:
+ self.required_mah_loss_per_sec = required_mah_drop / self.total_sim_time_sec
+ else:
+ self.required_mah_loss_per_sec = 0
+
+ self.current_capacity_mah = self.config["CAPACITY_NOMINAL_MAH"]
+
+
+ def create_widgets(self):
+ # --- TOP CONTROL FRAME ---
+ control_frame = ttk.LabelFrame(self.master, text="Control and Configuration")
+ control_frame.pack(padx=10, pady=5, fill="x")
+
+ # Row 0: Battery Type Selection and Capacity
+ ttk.Label(control_frame, text="Battery Type:").grid(row=0, column=0, padx=5, pady=2, sticky='w')
+ battery_menu = ttk.Combobox(control_frame,
+ textvariable=self.battery_type_var,
+ values=list(self.BATTERY_PROFILES.keys()),
+ width=20,
+ state="readonly")
+ battery_menu.grid(row=0, column=1, padx=5, pady=2, sticky='w', columnspan=2)
+ self.battery_type_var.trace_add("write", self.update_voltage_from_type)
+
+ ttk.Label(control_frame, text="Capacity (mAh):").grid(row=0, column=3, padx=5, pady=2, sticky='w')
+ capacity_menu = ttk.Combobox(control_frame, textvariable=self.capacity_var, values=self.CAPACITY_OPTIONS, width=8)
+ capacity_menu.grid(row=0, column=4, padx=5, pady=2, sticky='w')
+
+
+ # Row 1: Voltage Settings (Using new Spinbox helper)
+ self.create_voltage_spinbox(control_frame, self.v_start_var, 1, 0, "Start V (V):")
+ self.create_voltage_spinbox(control_frame, self.v_stop_var, 1, 2, "Stop V (V):")
+
+ # Logic Analyzer and Start Button (Row 2)
+ ttk.Checkbutton(control_frame, text="Enable Logic Analyzer", variable=self.logic_enabled_var).grid(row=2, column=0, columnspan=2, padx=5, pady=2, sticky='w')
+ self.start_button = ttk.Button(control_frame, text="START Emulation", command=self.toggle_emulation)
+ self.start_button.grid(row=2, column=4, padx=10, pady=5)
+
+ # Row 3: Simulated Discharge Controls
+ sim_frame = ttk.LabelFrame(control_frame, text="Simulated Time Discharge")
+ sim_frame.grid(row=3, column=0, columnspan=5, padx=5, pady=5, sticky='we')
+
+ ttk.Checkbutton(sim_frame, text="Enable Time Simulation", variable=self.sim_discharge_var).pack(side='left', padx=5, pady=2)
+ ttk.Label(sim_frame, text="Target Time (MM:SS):").pack(side='left', padx=5, pady=2)
+ ttk.Entry(sim_frame, textvariable=self.sim_time_str_var, width=8).pack(side='left', padx=5, pady=2)
+ ttk.Label(sim_frame, text=f"({self.SIM_TIME_DEFAULT_STR} default)").pack(side='left', padx=5, pady=2)
+
+
+ # --- STATE DISPLAY FRAME ---
+ state_frame = ttk.LabelFrame(self.master, text="Simulation State")
+ state_frame.pack(padx=10, pady=5, fill="x")
+ self.v_label = ttk.Label(state_frame, text="V_SYS Voltage (V): 0.000", font=('Arial', 12, 'bold'))
+ self.v_label.grid(row=0, column=0, padx=5, pady=5)
+ self.soc_label = ttk.Label(state_frame, text="SoC (%): 0.0 (Avg I: 0.0 mA)", font=('Arial', 12, 'bold'))
+ self.soc_label.grid(row=0, column=1, padx=5, pady=5)
+ self.runtime_label = ttk.Label(state_frame, text="Real Runtime (h): 0.00")
+ self.runtime_label.grid(row=0, column=2, padx=5, pady=5)
+
+ # Logic Analyzer Status
+ logic_status_frame = ttk.Frame(state_frame)
+ logic_status_frame.grid(row=1, column=0, columnspan=3, pady=5)
+ ttk.Label(logic_status_frame, text="Logic Pins:").pack(side='left', padx=5)
+ self.logic_indicators = []
+ for i in range(8):
+ label = ttk.Label(logic_status_frame, text=f"D{i}: LOW", width=6)
+ label.pack(side='left', padx=2)
+ self.logic_indicators.append(label)
+
+ # --- PLOT FRAME ---
+ plot_frame = ttk.LabelFrame(self.master, text="Current and Voltage Profile")
+ plot_frame.pack(padx=10, pady=5, fill="both", expand=True)
+
+ self.fig, (self.ax_i, self.ax_v) = plt.subplots(2, 1, figsize=(8, 6), sharex=True)
+
+ # Current Plot (Top)
+ self.ax_i.set_ylabel("Current (mA)")
+ self.ax_i.grid(True)
+ self.line_i, = self.ax_i.plot([], [], 'r-')
+
+ # Voltage Plot (Bottom)
+ self.ax_v.set_xlabel("Relative Time (s)")
+ self.ax_v.set_ylabel("Voltage (V)")
+ self.ax_v.grid(True)
+ self.line_v, = self.ax_v.plot([], [], 'b-')
+
+ self.canvas = FigureCanvasTkAgg(self.fig, master=plot_frame)
+ self.canvas_widget = self.canvas.get_tk_widget()
+ self.canvas_widget.pack(fill="both", expand=True)
+
+ # --- STATISTICS FRAME ---
+ stats_frame = ttk.LabelFrame(self.master, text="Measurement Statistics")
+ stats_frame.pack(padx=10, pady=5, fill="x")
+ self.peak_label = ttk.Label(stats_frame, text="Peak Current (mA): 0.0")
+ self.peak_label.grid(row=0, column=0, padx=5, pady=5)
+ self.rms_label = ttk.Label(stats_frame, text="RMS Current (mA): 0.0")
+ self.rms_label.grid(row=0, column=1, padx=5, pady=5)
+ self.save_button = ttk.Button(stats_frame, text="Save Log (.csv)", command=self.save_data)
+ self.save_button.grid(row=0, column=2, padx=10, pady=5)
+
+ def toggle_emulation(self):
+ if not self.is_running:
+ self.start_emulation()
+ else:
+ self.stop_emulation()
+
+ def start_emulation(self):
+ try:
+ self.load_initial_state()
+
+ # 1. Initialize PPK2 using the unified function
+ self.ppk2 = init_ppk2_device(
+ self.config["V_START"],
+ self.config["SAMPLE_RATE_HZ"],
+ self.logic_enabled_var.get()
+ )
+
+ # 2. Reset State and Logs
+ self.all_current_samples = []
+ self.all_time_real_s = []
+ self.all_voltage_v = []
+ self.all_logic_data = []
+ self.time_elapsed_real_sec = 0
+
+ # 3. Start Data Thread
+ self.is_running = True
+ self.start_button.config(text="STOP Emulation", state=tk.NORMAL)
+ self.data_thread = threading.Thread(target=self.emulation_loop)
+ self.data_thread.daemon = True
+ self.data_thread.start()
+
+ # Initial console debug message
+ print("\n--- EMULATION STARTING ---")
+ print(f"Mode: {'Time Simulation' if self.sim_discharge_var.get() else 'Current Emulation'} | Battery: {self.battery_type_var.get()}")
+ print(f"Capacity: {self.config['CAPACITY_NOMINAL_MAH']} mAh | V_Start: {self.config['V_START']} V | V_Stop: {self.config['V_STOP']} V")
+ if self.sim_discharge_var.get():
+ print(f"Target Discharge Time: {self.sim_time_str_var.get()} (MM:SS) | Required Loss Rate: {self.required_mah_loss_per_sec * 3600.0:.2f} mA (Theoretical)")
+ print("--------------------------")
+
+ except Exception as e:
+ messagebox.showerror("PPK2 Error", f"Failed to initialize PPK2: {e}. Check API connection.")
+ self.stop_emulation(is_error=True)
+
+ def stop_emulation(self, is_error=False):
+ if not self.is_running and not is_error:
+ return
+
+ self.is_running = False
+
+ if self.ppk2:
+ try:
+ # We do not need the real stop() for mock, but keep it structured
+ if API_MODE == "REAL":
+ # self.ppk2.stop() # Uncomment for real API
+ pass
+ else:
+ self.ppk2.stop() # Mock stop
+ except Exception as e:
+ print(f"Error while stopping PPK2 device: {e}")
+ self.ppk2 = None
+
+ if self.data_thread and self.data_thread.is_alive():
+ self.data_thread.join(timeout=0.1)
+
+ self.start_button.config(text="START Emulation", state=tk.NORMAL)
+
+ def get_voltage_from_soc(self, soc_percent):
+ """Calculates voltage from SoC percentage based on simplified discharge curve."""
+ V_start = self.config["V_START"]
+ V_stop = self.config["V_STOP"]
+
+ if V_start <= V_stop: return V_start # Failsafe
+
+ V_range = V_start - V_stop
+
+ # Simplified S-curve model
+ if soc_percent > 90:
+ V_drop_initial = V_range * 0.1
+ V_current = V_start - ((100 - soc_percent) * (V_drop_initial / 10))
+ return max(V_stop, V_current)
+
+ elif soc_percent > 10:
+ V_plateau_start = V_start - (V_range * 0.1)
+ V_plateau_end = V_stop + (V_range * 0.1)
+
+ V_plateau_drop = V_plateau_start - V_plateau_end
+
+ V_current = V_plateau_start - ((90 - soc_percent) * (V_plateau_drop / 80))
+ return max(V_stop, V_current)
+
+ else:
+ V_plateau_end = V_stop + (V_range * 0.1)
+
+ V_current = V_stop + (soc_percent * (V_plateau_end - V_stop) / 10)
+ return max(V_stop, V_current)
+
+
+ def get_soc_percent_from_voltage(self, voltage):
+ """Reverse calculation: Estimates SoC from voltage (used for setup)."""
+ V_start = self.config["V_START"]
+ V_stop = self.config["V_STOP"]
+
+ if V_start <= V_stop: return 100.0
+
+ V_range = V_start - V_stop
+
+ if voltage >= V_start:
+ return 100.0
+ elif voltage <= V_stop:
+ return 0.0
+
+ # Simplified inverse mapping
+ V_drop_10_percent = V_range * 0.1
+ V_plateau_start = V_start - V_drop_10_percent
+
+ if voltage > V_plateau_start: # Top 10%
+ return 100 - ((V_start - voltage) / V_drop_10_percent) * 10
+
+ V_plateau_end = V_stop + V_drop_10_percent
+
+ if voltage >= V_plateau_end: # Middle 80%
+ V_plateau_drop = V_plateau_start - V_plateau_end
+ if V_plateau_drop <= 0: return 90.0
+ return 90 - ((V_plateau_start - voltage) / V_plateau_drop) * 80
+
+ # Bottom 10%
+ V_final_range = V_plateau_end - V_stop
+ if V_final_range <= 0: return 10.0
+ return ((voltage - V_stop) / V_final_range) * 10
+
+
+ def emulation_loop(self):
+ last_emulation_time = time.time()
+ is_sim_discharge = self.sim_discharge_var.get()
+
+ while self.is_running and self.current_voltage > self.config["V_STOP"]:
+
+ if not self.is_running:
+ break
+
+ # 1. CURRENT AND LOGIC DATA MEASUREMENT
+ try:
+ currents_ma = self.ppk2.get_data()
+ logic_data = self.ppk2.get_logic_data() if self.logic_enabled_var.get() else [False] * 8
+ except Exception as e:
+ print(f"Error fetching data: {e}")
+ self.is_running = False
+ break
+
+ if not currents_ma or len(currents_ma) == 0:
+ time.sleep(0.01)
+ continue
+
+ currents_ma = np.array(currents_ma)
+
+ # --- ENERGY CONSUMPTION LOGIC ---
+
+ time_h_simulated = TIME_STEP_REAL_SEC / 3600.0
+ I_avg_ma_realtime = np.mean(currents_ma)
+
+ # 2. Update time elapsed
+ self.time_elapsed_real_sec += TIME_STEP_REAL_SEC
+
+ if is_sim_discharge:
+ # SIMULATED DISCHARGE MODE: Voltage loss is time-based (FIXED).
+
+ # Calculate elapsed percentage
+ elapsed_ratio = min(1.0, self.time_elapsed_real_sec / self.total_sim_time_sec)
+
+ # Calculate the target voltage based on time percentage
+ V_start = self.config["V_START"]
+ V_stop = self.config["V_STOP"]
+
+ # Linearly scale V_start to V_stop over the total time
+ self.current_voltage = V_start - (elapsed_ratio * (V_start - V_stop))
+
+ # Check for end condition
+ if self.time_elapsed_real_sec >= self.total_sim_time_sec:
+ self.current_voltage = V_stop # Ensure it hits V_STOP exactly
+ self.is_running = False
+ break
+
+ # Calculate SoC based on the new time-based voltage
+ soc_percent = self.get_soc_percent_from_voltage(self.current_voltage)
+ self.current_capacity_mah = self.config["CAPACITY_NOMINAL_MAH"] * (soc_percent / 100.0)
+
+ # Time remaining calculation (simple countdown)
+ time_remaining_sec = max(0, self.total_sim_time_sec - self.time_elapsed_real_sec)
+
+ else:
+ # REAL EMULATION MODE: Voltage loss is current-based.
+
+ consumed_mah = I_avg_ma_realtime * time_h_simulated
+ self.current_capacity_mah -= consumed_mah
+ soc_percent = (self.current_capacity_mah / self.config["CAPACITY_NOMINAL_MAH"]) * 100.0
+ self.current_voltage = self.get_voltage_from_soc(soc_percent)
+
+ # Estimated remaining time is based on current consumption rate
+ mah_remaining = self.current_capacity_mah - self.config["CAPACITY_NOMINAL_MAH"] * self.get_soc_percent_from_voltage(self.config["V_STOP"]) / 100.0
+ time_remaining_sec = (mah_remaining / I_avg_ma_realtime) * 3600.0 if I_avg_ma_realtime > 0 else float('inf')
+
+
+ # 3. UPDATE VOLTAGE ON PPK2
+ self.ppk2.set_voltage(self.current_voltage)
+
+ # 4. Update Logs
+
+ for i, current_sample in enumerate(currents_ma):
+ # Log the current voltage for every sample in this step
+ self.all_current_samples.append(current_sample)
+ self.all_time_real_s.append(self.time_elapsed_real_sec)
+ self.all_voltage_v.append(self.current_voltage)
+
+ if self.logic_enabled_var.get():
+ self.all_logic_data.append(logic_data)
+ else:
+ self.all_logic_data.append([False] * 8)
+
+
+ # Calculate statistics
+ peak_current = np.max(currents_ma)
+ rms_current = np.sqrt(np.mean(currents_ma**2))
+
+ # --- CONSOLE DEBUG OUTPUT ---
+ # Print every 5 seconds (50 steps of 0.1s)
+ if int(self.time_elapsed_real_sec * 10) % 50 == 0 or self.time_elapsed_real_sec == self.total_sim_time_sec:
+ time_rem_str = self.format_seconds_to_hms(time_remaining_sec)
+
+ print(
+ f"TIME: {self.time_elapsed_real_sec:.1f} s | "
+ f"V_OUT: {self.current_voltage:.3f} V | "
+ f"I_AVG: {I_avg_ma_realtime:.2f} mA | "
+ f"SoC: {soc_percent:.1f}% | "
+ f"Rem. Time: {time_rem_str}"
+ )
+
+ # Update GUI (Thread-safe way)
+ self.master.after(0, self.update_gui, soc_percent, I_avg_ma_realtime, peak_current, rms_current, currents_ma, logic_data)
+
+ # Wait for the next measurement step
+ time.sleep(max(0, TIME_STEP_EMUL_SEC - (time.time() - last_emulation_time)))
+ last_emulation_time = time.time()
+
+
+ # Final cleanup if loop exited
+ print("--- EMULATION LOOP EXIT ---")
+ self.master.after(0, self.stop_emulation)
+ if self.current_voltage <= self.config["V_STOP"] or is_sim_discharge:
+ self.master.after(0, self.display_final_results)
+
+
+ def update_gui(self, soc_percent, I_avg_ma_realtime, peak_current, rms_current, currents_to_plot, logic_data):
+ # Update Labels
+ self.v_label.config(text=f"V_SYS Voltage (V): {self.current_voltage:.3f}")
+ self.soc_label.config(text=f"SoC (%): {soc_percent:.1f} (Avg I: {I_avg_ma_realtime:.1f} mA)")
+
+ # Display runtime in HH:MM:SS format
+ runtime_hms = self.format_seconds_to_hms(self.time_elapsed_real_sec)
+ self.runtime_label.config(text=f"Real Runtime (H:M:S): {runtime_hms}")
+
+ self.peak_label.config(text=f"Peak Current (mA): {peak_current:.2f}")
+ self.rms_label.config(text=f"RMS Current (mA): {rms_current:.2f}")
+
+ mode = "TIME SIMULATION" if self.sim_discharge_var.get() else "CURRENT EMULATION"
+ self.master.title(f"PPK2 [{API_MODE}] - Battery Simulator ({mode})")
+
+ # Update Logic Indicators
+ for i, val in enumerate(logic_data):
+ color = 'green' if val else 'red'
+ text = 'HIGH' if val else 'LOW'
+ self.logic_indicators[i].config(text=f"D{i}: {text}", background=color)
+
+ # Update Plots (Oscilloscope)
+ plot_times = np.arange(0, len(currents_to_plot)) / self.config["SAMPLE_RATE_HZ"]
+
+ # Current Plot
+ self.line_i.set_data(plot_times, currents_to_plot)
+ self.ax_i.set_xlim(0, plot_times[-1] if plot_times.size > 0 else 0.01)
+ self.ax_i.set_ylim(np.min(currents_to_plot) * 0.9, np.max(currents_to_plot) * 1.1 if currents_to_plot.size > 0 else 10)
+
+ # Voltage Plot
+ voltage_plot = np.full_like(plot_times, self.current_voltage)
+ self.line_v.set_data(plot_times, voltage_plot)
+ self.ax_v.set_ylim(self.config["V_STOP"] * 0.9, self.config["V_START"] * 1.1)
+
+ self.fig.tight_layout()
+ self.canvas.draw()
+
+ def display_final_results(self):
+ total_time_real_sec = self.time_elapsed_real_sec
+ total_time_real_h = total_time_real_sec / 3600.0
+
+ # Format runtime to HH:MM:SS for the final message
+ runtime_hms = self.format_seconds_to_hms(total_time_real_sec)
+
+ if self.sim_discharge_var.get():
+ # In time mode, we calculate the charge loss required to hit V_STOP based on the SoC curve.
+ final_soc = self.get_soc_percent_from_voltage(self.config["V_STOP"])
+ initial_soc = self.get_soc_percent_from_voltage(self.config["V_START"])
+
+ soc_drop = initial_soc - final_soc
+ mah_consumed_simulated = self.config["CAPACITY_NOMINAL_MAH"] * (soc_drop / 100.0)
+
+ I_test_avg_ma = mah_consumed_simulated / total_time_real_h if total_time_real_h > 0 else 0
+
+ final_message = (
+ f"!!! SIMULATION FINISHED (Time Mode) !!!\n"
+ f"Targeted simulation time: {self.sim_time_str_var.get()} (MM:SS).\n"
+ f"Discharge from {self.config['V_START']:.2f}V to {self.config['V_STOP']:.2f}V achieved.\n"
+ f"\n--- RESULTS ---\n"
+ f"Simulated Runtime: {runtime_hms} (HH:MM:SS)\n"
+ f"Theoretical Average Current drawn: {I_test_avg_ma:.2f} mA (to achieve V drop)"
+ )
+ else:
+ total_consumed_mah = self.config["CAPACITY_NOMINAL_MAH"] - self.current_capacity_mah
+ I_test_avg_ma = total_consumed_mah / total_time_real_h if total_time_real_h > 0 else 0
+ estimated_runtime_h = self.config["CAPACITY_NOMINAL_MAH"] / I_test_avg_ma if I_test_avg_ma > 0 else float('inf')
+ estimated_runtime_hms = self.format_seconds_to_hms(estimated_runtime_h * 3600.0)
+
+ final_message = (
+ f"!!! EMULATION FINISHED !!!\n"
+ f"Algorithm reached V_STOP = {self.config['V_STOP']} V.\n"
+ f"\n--- TEST RESULTS ---\n"
+ f"Simulated Runtime until V_STOP: {runtime_hms} (HH:MM:SS)\n"
+ f"Overall Average Current from test: {I_test_avg_ma:.2f} mA\n"
+ f"*** ESTIMATED TOTAL RUNTIME (Full Capacity): {estimated_runtime_hms} (HH:MM:SS) ***"
+ )
+
+ messagebox.showinfo("Simulation Results", final_message)
+
+ def save_data(self):
+ """Saves collected data to a CSV file and appends summary statistics.
+ Includes an aggregated Hex value for the Logic Analyzer pins."""
+ if not self.all_current_samples:
+ messagebox.showwarning("Error", "No data to save.")
+ return
+
+ # --- Generate File Name with Date/Time ---
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ default_filename = f"{timestamp}_ppk2_battery_log.csv"
+
+ filepath = filedialog.asksaveasfilename(defaultextension=".csv",
+ filetypes=[("CSV files", "*.csv")],
+ initialfile=default_filename)
+ if not filepath:
+ return
+
+ # --- 1. Prepare Time Data in s:ms format ---
+ def format_time_s_ms(total_seconds):
+ seconds = int(total_seconds)
+ milliseconds = int(round((total_seconds - seconds) * 1000))
+ if milliseconds == 1000: # Handle rounding up to the next second
+ seconds += 1
+ milliseconds = 0
+ return f"{seconds}:{milliseconds:03d}"
+
+ time_s_ms = [format_time_s_ms(t) for t in self.all_time_real_s]
+
+ # --- 2. Bit-to-Hex Conversion ---
+ def logic_to_hex(logic_list):
+ """
+ Converts a list of 8 booleans (D0 is LSB) to a 0xXX hex string.
+ Now prepends '0x' for standard hex format.
+ """
+ if not logic_list:
+ return '0x00'
+
+ logic_list = logic_list[:8] + [False] * (8 - len(logic_list))
+
+ # Assuming D0 is LSB (2^0) and D7 is MSB (2^7).
+ decimal_value = 0
+ for i, state in enumerate(logic_list):
+ if state:
+ decimal_value += (2 ** i)
+
+ # Return with '0x' prefix
+ return f'0x{decimal_value:02X}'
+
+
+ # --- 3. Prepare Data Frame for main log ---
+ min_len = min(len(time_s_ms), len(self.all_current_samples), len(self.all_voltage_v), len(self.all_logic_data))
+
+ # Create columns for logic data (D0, D1, ..., D7)
+ logic_cols = {f"D{i}": [log[i] for log in self.all_logic_data[:min_len]] for i in range(8)}
+
+ # New Hexadecimal column
+ logic_hex_values = [logic_to_hex(log) for log in self.all_logic_data[:min_len]]
+
+
+ data_dict = {
+ "Time_s:ms": time_s_ms[:min_len],
+ "Current_mA_Measured": self.all_current_samples[:min_len],
+ "Voltage_V_Emulated": self.all_voltage_v[:min_len],
+ **logic_cols,
+ "Logic_Hex_D0-D7": logic_hex_values
+ }
+ df = pd.DataFrame(data_dict)
+
+ # --- 4. Calculate Summary Statistics ---
+ total_time_real_sec = self.time_elapsed_real_sec
+ total_time_real_h = total_time_real_sec / 3600.0
+
+ # Format runtime to HH:MM:SS for the summary
+ runtime_hms = self.format_seconds_to_hms(total_time_real_sec)
+
+ if self.sim_discharge_var.get():
+ # Time Simulation Mode
+ final_soc = self.get_soc_percent_from_voltage(self.config["V_STOP"])
+ initial_soc = self.get_soc_percent_from_voltage(self.config["V_START"])
+ soc_drop = initial_soc - final_soc
+
+ mah_consumed_simulated = self.config["CAPACITY_NOMINAL_MAH"] * (soc_drop / 100.0)
+ I_avg_ma = mah_consumed_simulated / total_time_real_h if total_time_real_h > 0 else 0
+ I_peak_ma = np.max(self.all_current_samples) if self.all_current_samples else 0
+
+ total_coulomb_uAh = mah_consumed_simulated * 1000
+ estimated_runtime_hms = self.format_seconds_to_hms(self.total_sim_time_sec)
+ runtime_status = f"Targeted Runtime (H:M:S): {estimated_runtime_hms}"
+
+ else:
+ # Current Emulation Mode
+ total_consumed_mah = self.config["CAPACITY_NOMINAL_MAH"] - self.current_capacity_mah
+ I_avg_ma = total_consumed_mah / total_time_real_h if total_time_real_h > 0 else 0
+ I_peak_ma = np.max(self.all_current_samples) if self.all_current_samples else 0
+
+ total_coulomb_uAh = total_consumed_mah * 1000
+ estimated_runtime_h = self.config["CAPACITY_NOMINAL_MAH"] / I_avg_ma if I_avg_ma > 0 else float('inf')
+ estimated_runtime_hms = self.format_seconds_to_hms(estimated_runtime_h * 3600.0)
+ runtime_status = f"Estimated Total Runtime (H:M:S): {estimated_runtime_hms}"
+
+ # --- 5. Create Summary Block ---
+ summary_lines = [
+ f"\n\n--- SIMULATION SUMMARY ---",
+ f"Mode,{ 'TIME_SIMULATION' if self.sim_discharge_var.get() else 'CURRENT_EMULATION'}",
+ f"Battery_Type,{self.battery_type_var.get()}",
+ f"Nominal_Capacity_mAh,{self.config['CAPACITY_NOMINAL_MAH']:.2f}",
+ f"Time_to_V_STOP_s,{self.time_elapsed_real_sec:.2f}",
+ f"Total_Charge_Consumed_uAh,{total_coulomb_uAh:.2f}",
+ f"Average_Current_mA,{I_avg_ma:.2f}",
+ f"Peak_Current_mA,{I_peak_ma:.2f}",
+ f"Runtime_to_V_STOP_H:M:S,{runtime_hms}", # Use H:M:S format here
+ f"Estimated_Total_Runtime_H:M:S,{estimated_runtime_hms}",
+ f"V_START_V,{self.config['V_START']:.2f}",
+ f"V_STOP_V,{self.config['V_STOP']:.2f}",
+ f"--------------------------"
+ ]
+ summary_block = "\n".join(summary_lines)
+
+ # --- 6. Save to CSV ---
+ with open(filepath, 'w') as f:
+ # Write data frame (main log)
+ f.write(df.to_csv(index=False))
+ # Append summary block
+ f.write(summary_block)
+
+ messagebox.showinfo("Saved", f"Data and Summary successfully saved to:\n{filepath}")
+
+
+# --- Main application loop ---
+if __name__ == "__main__":
+ # Ensure dependencies are available (though no guarantee)
+ try:
+ import numpy as np
+ import pandas as pd
+ import matplotlib.pyplot as plt
+ except ImportError:
+ print("FATAL ERROR: Missing dependencies (numpy, pandas, matplotlib). Please install them using: pip install numpy pandas matplotlib")
+ exit()
+
+ root = tk.Tk()
+ app = BatteryEmulator(master=root)
+
+ def on_closing():
+ if messagebox.askokcancel("Quit", "Do you want to quit the application?"):
+ app.stop_emulation()
+ root.destroy()
+
+ root.protocol("WM_DELETE_WINDOW", on_closing)
+ root.mainloop()
\ No newline at end of file