From b6d5cf3f9102c1753d8b254ec09604ebea046667 Mon Sep 17 00:00:00 2001 From: leogao Date: Mon, 30 Mar 2026 18:54:24 -0700 Subject: [PATCH 1/4] Improve package usability --- README.md | 11 +++ ppk2Prelude.py | 205 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 216 insertions(+) create mode 100644 ppk2Prelude.py diff --git a/README.md b/README.md index ed0e941..78774a6 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,14 @@ +## Embedder Fork + +This is a fork of [IRNAS/ppk2-api-python](https://github.com/IRNAS/ppk2-api-python) maintained by [Embedder](https://embedder.com). + +**Added in this fork:** +- `ppk2Prelude.py` -- GPL-2.0-licensed bridge script used by the Embedder CLI to interface with PPK2 devices. This file is authored by Embedder Pty Ltd and licensed under GPL-2.0-only. + +The Embedder CLI downloads this file at runtime during the PPK2 setup flow. It is never embedded in the proprietary Embedder binary. + +--- + ## Description The new Nordic Semiconductor's [Power Profiler Kit II (PPK 2)](https://www.nordicsemi.com/Software-and-tools/Development-Tools/Power-Profiler-Kit-2) is very useful for real time measurement of device power consumption. The official [nRF Connect Power Profiler tool](https://github.com/NordicSemiconductor/pc-nrfconnect-ppk) provides a friendly GUI with real-time data display. However there is no support for automated power monitoring. The puropose of this Python API is to enable automated power monitoring and data logging in Python applications. diff --git a/ppk2Prelude.py b/ppk2Prelude.py new file mode 100644 index 0000000..2815317 --- /dev/null +++ b/ppk2Prelude.py @@ -0,0 +1,205 @@ +# ppk2Prelude.py — PPK2 hardware helper bridge +# Copyright (c) 2026 Embedder Pty Ltd +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is licensed under the GNU General Public License v2.0. +# It serves to improve the usability of the following package: +# ppk2-api Python package (https://github.com/IRNAS/ppk2-api-python). +# You may redistribute and/or modify it under the terms of the GPL v2. +# See https://www.gnu.org/licenses/old-licenses/gpl-2.0.html + +import json +import time + +from ppk2_api.ppk2_api import PPK2_MP + +_ppk2_connections = {} + +def ppk2_list_devices(): + raw = PPK2_MP.list_devices() + if not raw: + return [] + result = [] + for d in raw: + port, sn, *_ = d + result.append({"port": port, "serial_number": sn}) + return result + +def ppk2_connect(port=None, use_buffered_reader=True): + global _ppk2_connections + if port is None: + devices = ppk2_list_devices() + if not devices: + raise RuntimeError("No PPK2 device found") + port = devices[0]["port"] + if port in _ppk2_connections: + return _ppk2_connections[port] + if use_buffered_reader: + ppk2 = PPK2_MP(port) + else: + from ppk2_api.ppk2_api import PPK2_API + ppk2 = PPK2_API(port) + ppk2.get_modifiers() + _ppk2_connections[port] = ppk2 + return ppk2 + +def ppk2_use_source_meter(port=None): + ppk2 = ppk2_connect(port) + ppk2.use_source_meter() + return {"success": True, "mode": "source_meter"} + +def ppk2_use_ampere_meter(port=None): + ppk2 = ppk2_connect(port) + ppk2.use_ampere_meter() + return {"success": True, "mode": "ampere_meter"} + +def ppk2_set_source_voltage(mv, port=None): + mv = int(mv) + if mv < 800 or mv > 5000: + raise ValueError("Voltage must be between 800 and 5000 mV") + ppk2 = ppk2_connect(port) + ppk2.set_source_voltage(mv) + return {"success": True, "voltage_mv": mv} + +def ppk2_power_on_dut(port=None): + ppk2 = ppk2_connect(port) + ppk2.toggle_DUT_power("ON") + return {"success": True, "power": "on"} + +def ppk2_power_off_dut(port=None): + ppk2 = ppk2_connect(port) + ppk2.toggle_DUT_power("OFF") + return {"success": True, "power": "off"} + +def ppk2_start_measuring(port=None): + ppk2 = ppk2_connect(port) + ppk2.start_measuring() + return {"success": True, "measuring": True} + +def ppk2_read_samples(duration_ms=1000, port=None, include_digital=False): + ppk2 = ppk2_connect(port) + duration_s = float(duration_ms) / 1000.0 + all_samples = [] + end_time = time.time() + duration_s + while time.time() < end_time: + read_data = ppk2.get_data() + if read_data is not None: + samples, raw_digital = ppk2.get_samples(read_data) + if samples is not None: + all_samples.extend(samples) + time.sleep(0.001) + if not all_samples: + return {"success": False, "summary": "No samples collected", "sample_count": 0} + avg_ua = sum(all_samples) / len(all_samples) + min_ua = min(all_samples) + max_ua = max(all_samples) + result = { + "success": True, + "sample_count": len(all_samples), + "avg_ua": round(avg_ua, 3), + "min_ua": round(min_ua, 3), + "max_ua": round(max_ua, 3), + "duration_ms": duration_ms, + } + if include_digital: + result["summary"] = f"{len(all_samples)} samples, avg={avg_ua:.1f}uA (digital channels included)" + else: + result["summary"] = f"{len(all_samples)} samples, avg={avg_ua:.1f}uA" + return result + +def ppk2_stop_measuring(port=None): + ppk2 = ppk2_connect(port) + ppk2.stop_measuring() + return {"success": True, "measuring": False} + +def ppk2_disconnect(port=None): + global _ppk2_connections + if port is None: + for p, ppk2 in list(_ppk2_connections.items()): + try: + ppk2.stop_measuring() + except Exception: + pass + _ppk2_connections.clear() + return {"success": True, "disconnected": "all"} + ppk2 = _ppk2_connections.pop(port, None) + if ppk2 is not None: + try: + ppk2.stop_measuring() + except Exception: + pass + return {"success": True, "disconnected": port or "none"} + +def ppk2_measure(duration_ms=1000, source_voltage_mv=None, port=None, + settle_ms=100, sample_interval_ms=1, include_samples=False, + use_buffered_reader=True): + ppk2 = ppk2_connect(port, use_buffered_reader=use_buffered_reader) + actual_port = port + if actual_port is None: + for p in _ppk2_connections: + if _ppk2_connections[p] is ppk2: + actual_port = p + break + powered_on = False + if source_voltage_mv is not None: + mv = int(source_voltage_mv) + if mv < 800 or mv > 5000: + raise ValueError("source_voltage_mv must be between 800 and 5000") + ppk2.use_source_meter() + ppk2.set_source_voltage(mv) + ppk2.toggle_DUT_power("ON") + powered_on = True + time.sleep(float(settle_ms) / 1000.0) + try: + ppk2.start_measuring() + duration_s = float(duration_ms) / 1000.0 + all_samples = [] + end_time = time.time() + duration_s + while time.time() < end_time: + read_data = ppk2.get_data() + if read_data is not None: + samples, raw_digital = ppk2.get_samples(read_data) + if samples is not None: + all_samples.extend(samples) + time.sleep(float(sample_interval_ms) / 1000.0) + finally: + ppk2.stop_measuring() + if powered_on: + ppk2.toggle_DUT_power("OFF") + if not all_samples: + return { + "success": False, + "summary": "No samples collected", + "port": actual_port, + "sample_count": 0, + "avg_ua": 0, "min_ua": 0, "max_ua": 0, + "duration_ms": duration_ms, + } + avg_ua = sum(all_samples) / len(all_samples) + min_ua = min(all_samples) + max_ua = max(all_samples) + result = { + "success": True, + "summary": f"Measured {len(all_samples)} samples over {duration_ms}ms: avg={avg_ua:.1f}uA", + "port": actual_port, + "sample_count": len(all_samples), + "avg_ua": round(avg_ua, 3), + "min_ua": round(min_ua, 3), + "max_ua": round(max_ua, 3), + "duration_ms": duration_ms, + } + if include_samples: + result["samples_ua"] = [round(s, 3) for s in all_samples] + return result + +def _ppk2_cleanup(): + for _ppk2_port, _ppk2_dev in list(_ppk2_connections.items()): + try: + _ppk2_dev.stop_measuring() + except Exception: + pass + try: + _ppk2_dev.toggle_DUT_power("OFF") + except Exception: + pass + _ppk2_connections.clear() From f3c855a517c169feae47ec1fadb1a85e722aa5a4 Mon Sep 17 00:00:00 2001 From: leogao Date: Wed, 1 Apr 2026 18:36:25 -0700 Subject: [PATCH 2/4] Fix: set ampere meter mode when no source voltage is provided When source_voltage_mv is not specified, the PPK2 should be configured in ampere meter mode via use_ampere_meter() so measurements work correctly without sourcing voltage. --- ppk2Prelude.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ppk2Prelude.py b/ppk2Prelude.py index 2815317..fff05c5 100644 --- a/ppk2Prelude.py +++ b/ppk2Prelude.py @@ -150,6 +150,8 @@ def ppk2_measure(duration_ms=1000, source_voltage_mv=None, port=None, ppk2.toggle_DUT_power("ON") powered_on = True time.sleep(float(settle_ms) / 1000.0) + else: + ppk2.use_ampere_meter() try: ppk2.start_measuring() duration_s = float(duration_ms) / 1000.0 From 9cba18c037babf3c22fcc0fc833a7668335aaabd Mon Sep 17 00:00:00 2001 From: Bob Wei Date: Tue, 21 Apr 2026 22:33:05 -0500 Subject: [PATCH 3/4] Add capture state management and CSV export for PPK2 measurements This update introduces a new capture state management system to track measurement samples, sample rate, and publication status. It includes functionality to publish captured data as a CSV file, enhancing data handling capabilities. The cleanup process now ensures that captures are published upon termination. Additionally, the measuring functions have been updated to utilize this new state management. --- ppk2Prelude.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/ppk2Prelude.py b/ppk2Prelude.py index fff05c5..aca8dc6 100644 --- a/ppk2Prelude.py +++ b/ppk2Prelude.py @@ -8,13 +8,28 @@ # You may redistribute and/or modify it under the terms of the GPL v2. # See https://www.gnu.org/licenses/old-licenses/gpl-2.0.html +import atexit +import csv import json +import os +import sys +import tempfile import time +import uuid from ppk2_api.ppk2_api import PPK2_MP +_PPK2_SAMPLE_RATE_HZ = 100_000 + _ppk2_connections = {} +_ppk2_capture_state = { + "samples": None, + "sample_rate": None, + "started_at": None, + "published": False, +} + def ppk2_list_devices(): raw = PPK2_MP.list_devices() if not raw: @@ -74,6 +89,12 @@ def ppk2_power_off_dut(port=None): def ppk2_start_measuring(port=None): ppk2 = ppk2_connect(port) ppk2.start_measuring() + _ppk2_capture_state.update({ + "samples": [], + "sample_rate": _PPK2_SAMPLE_RATE_HZ, + "started_at": time.monotonic(), + "published": False, + }) return {"success": True, "measuring": True} def ppk2_read_samples(duration_ms=1000, port=None, include_digital=False): @@ -87,6 +108,8 @@ def ppk2_read_samples(duration_ms=1000, port=None, include_digital=False): samples, raw_digital = ppk2.get_samples(read_data) if samples is not None: all_samples.extend(samples) + if _ppk2_capture_state["samples"] is not None: + _ppk2_capture_state["samples"].extend(samples) time.sleep(0.001) if not all_samples: return {"success": False, "summary": "No samples collected", "sample_count": 0} @@ -110,6 +133,13 @@ def ppk2_read_samples(duration_ms=1000, port=None, include_digital=False): def ppk2_stop_measuring(port=None): ppk2 = ppk2_connect(port) ppk2.stop_measuring() + try: + ppk2_publish_capture() + finally: + _ppk2_capture_state["samples"] = None + _ppk2_capture_state["sample_rate"] = None + _ppk2_capture_state["started_at"] = None + _ppk2_capture_state["published"] = False return {"success": True, "measuring": False} def ppk2_disconnect(port=None): @@ -152,6 +182,12 @@ def ppk2_measure(duration_ms=1000, source_voltage_mv=None, port=None, time.sleep(float(settle_ms) / 1000.0) else: ppk2.use_ampere_meter() + _ppk2_capture_state.update({ + "samples": [], + "sample_rate": _PPK2_SAMPLE_RATE_HZ, + "started_at": time.monotonic(), + "published": False, + }) try: ppk2.start_measuring() duration_s = float(duration_ms) / 1000.0 @@ -163,11 +199,20 @@ def ppk2_measure(duration_ms=1000, source_voltage_mv=None, port=None, samples, raw_digital = ppk2.get_samples(read_data) if samples is not None: all_samples.extend(samples) + if _ppk2_capture_state["samples"] is not None: + _ppk2_capture_state["samples"].extend(samples) time.sleep(float(sample_interval_ms) / 1000.0) finally: ppk2.stop_measuring() if powered_on: ppk2.toggle_DUT_power("OFF") + try: + ppk2_publish_capture(name=f"PPK2 measure {duration_ms}ms") + finally: + _ppk2_capture_state["samples"] = None + _ppk2_capture_state["sample_rate"] = None + _ppk2_capture_state["started_at"] = None + _ppk2_capture_state["published"] = False if not all_samples: return { "success": False, @@ -194,7 +239,55 @@ def ppk2_measure(duration_ms=1000, source_voltage_mv=None, port=None, result["samples_ua"] = [round(s, 3) for s in all_samples] return result +def _ppk2_bridge_publish_capture(payload): + try: + _hardware_bridge_request({"cmd": "capture_published", "payload": payload}) + except NameError: + return + except Exception as exc: + sys.stderr.write(f"[embedder] ppk2 capture_published failed: {exc}\n") + sys.stderr.flush() + +def _ppk2_write_analog_csv(samples, sample_rate): + tmpdir = tempfile.mkdtemp(prefix="embedder_ppk2_") + path = os.path.join(tmpdir, "analog.csv") + dt = 1.0 / sample_rate if sample_rate else 0.0 + with open(path, "w", newline="") as fh: + w = csv.writer(fh) + w.writerow(["time_sec", "current_ua"]) + for i, v in enumerate(samples): + w.writerow([f"{i * dt:.9f}", f"{v:.6f}"]) + return tmpdir + +def ppk2_publish_capture(name=None): + state = _ppk2_capture_state + if state["samples"] is None or state["published"]: + return None + samples = state["samples"] + sample_rate = state["sample_rate"] or 0 + duration = (len(samples) / sample_rate) if sample_rate else 0.0 + tmpdir = _ppk2_write_analog_csv(samples, sample_rate) if samples else None + payload = { + "capture_id": uuid.uuid4().hex, + "name": name or f"PPK2 {time.strftime('%H:%M:%S')}", + "sample_rate": int(sample_rate), + "duration_sec": float(duration), + "sample_count": len(samples), + "digital_channels": [], + "analog_channels": [{"name": "current_ua", "index": 0}], + "analog_csv_dir": tmpdir, + "sal_path": None, + "analyzers": [], + } + _ppk2_bridge_publish_capture(payload) + state["published"] = True + return payload["capture_id"] + def _ppk2_cleanup(): + try: + ppk2_publish_capture() + except Exception: + pass for _ppk2_port, _ppk2_dev in list(_ppk2_connections.items()): try: _ppk2_dev.stop_measuring() @@ -205,3 +298,5 @@ def _ppk2_cleanup(): except Exception: pass _ppk2_connections.clear() + +atexit.register(_ppk2_cleanup) From ceb942276e6663b9d850fb50370dc3fdc48e1d11 Mon Sep 17 00:00:00 2001 From: Bob Wei Date: Mon, 27 Apr 2026 11:03:41 -0700 Subject: [PATCH 4/4] Refactor payload construction in ppk2_publish_capture to conditionally include analog_csv_dir This change updates the ppk2_publish_capture function to only add the analog_csv_dir to the payload if tmpdir is not None, improving the clarity and efficiency of the payload creation process. --- ppk2Prelude.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ppk2Prelude.py b/ppk2Prelude.py index aca8dc6..184cda9 100644 --- a/ppk2Prelude.py +++ b/ppk2Prelude.py @@ -275,10 +275,10 @@ def ppk2_publish_capture(name=None): "sample_count": len(samples), "digital_channels": [], "analog_channels": [{"name": "current_ua", "index": 0}], - "analog_csv_dir": tmpdir, - "sal_path": None, "analyzers": [], } + if tmpdir is not None: + payload["analog_csv_dir"] = tmpdir _ppk2_bridge_publish_capture(payload) state["published"] = True return payload["capture_id"]