import time
# NOTE(review): these two statements run at import time and sit before the
# shebang and module docstring below; they look like a separate device-side
# boot snippet rather than part of the host test script — confirm they belong.
time.sleep(0.1) # Wait for USB to become ready
print("Hello, Pi Pico W!")
#!/usr/bin/env python3
"""
wokwi_gps_test.py — Inject GPS NMEA sentences into the Wokwi simulation.
How it works:
1. Starts `wokwi-cli pico/ --interactive` as a subprocess.
2. Waits for the device to print "Web server started" (boot complete).
3. Pipes 12 valid $GPRMC sentences (+ matching $GPGGA) into the process
stdin, which Wokwi routes to $serialMonitor → pico:GP1 (UART0 RX).
4. Reads stdout looking for "Logged 10 entries" — the firmware's
confirmation that GPS data was parsed and stored.
Usage:
python scripts/wokwi_gps_test.py [--timeout 60]
Requirements:
- wokwi-cli installed and on PATH
- WOKWI_CLI_TOKEN environment variable set
- Run from the repository root directory
"""
import argparse
import subprocess
import sys
import threading
import time
# ---------------------------------------------------------------------------
# NMEA helpers
# ---------------------------------------------------------------------------
def nmea_checksum(body: str) -> str:
    """Frame *body* as a complete NMEA sentence: ``$<body>*<HH>``.

    ``HH`` is the XOR of the code points of every character in ``body``,
    written as uppercase hexadecimal, zero-padded to two digits. ``body`` is
    the text between ``$`` and ``*`` and must not include either delimiter.
    No line terminator is appended.
    """
    xor_acc = 0
    for code_point in map(ord, body):
        xor_acc = xor_acc ^ code_point
    return "${}*{:02X}".format(body, xor_acc)
def build_gprmc(seq: int, speed_knots: float) -> str:
    """Return a CRLF-terminated $GPRMC sentence (active fix) at the given speed.

    The hour and minute are fixed at 12:35; only the seconds field varies,
    as ``(19 + seq) % 60``, so consecutive ``seq`` values yield distinct
    timestamps. Position, course, date and the trailing ``003.1,W`` fields
    are constants.
    """
    seconds = (19 + seq) % 60
    fields = [
        "GPRMC",
        f"1235{seconds:02d}",      # hhmmss with hh=12, mm=35
        "A",                       # status flag: active fix
        "4807.038", "N",
        "01131.000", "E",
        f"{speed_knots:06.3f}",    # speed, zero-padded to width 6, 3 decimals
        "084.4",
        "230394",
        "003.1", "W",
    ]
    sentence = nmea_checksum(",".join(fields))
    return f"{sentence}\r\n"
def build_gpgga(seq: int, satellites: int = 8) -> str:
    """Return a CRLF-terminated $GPGGA sentence carrying the satellite count.

    The timestamp is 12:35 plus a seconds field of ``(19 + seq) % 60``.
    ``satellites`` is zero-padded to two digits. All remaining fields are
    constants, and the sentence body ends with two empty fields.
    """
    seconds = (19 + seq) % 60
    fields = [
        "GPGGA",
        f"1235{seconds:02d}",      # hhmmss with hh=12, mm=35
        "4807.038", "N",
        "01131.000", "E",
        "1",
        f"{satellites:02d}",
        "0.9",
        "545.4", "M",
        "46.9", "M",
        "", "",                    # two trailing empty fields
    ]
    sentence = nmea_checksum(",".join(fields))
    return f"{sentence}\r\n"
# ---------------------------------------------------------------------------
# Serial output reader (background thread)
# ---------------------------------------------------------------------------
class SerialReader(threading.Thread):
    """Daemon thread that echoes and records every line read from a stream.

    Each line is printed with a ``[SIM]`` prefix and appended to ``lines``.
    ``wait_for`` lets another thread block until some captured line contains
    a given substring, the stream reaches end-of-file, or a timeout expires.
    """

    def __init__(self, stream):
        """Store *stream* (an iterable of lines); call ``start()`` to begin reading."""
        super().__init__(daemon=True)
        self.stream = stream
        # Captured lines, in arrival order, without trailing CR/LF.
        self.lines: list[str] = []
        self._lock = threading.Lock()
        # Condition sharing the lock above: signalled on every new line and
        # once more when the stream is exhausted.
        self._cond = threading.Condition(self._lock)
        # Set when iteration over the stream has ended (EOF or error).
        self._eof = False

    def run(self):
        """Read the stream to exhaustion, echoing and recording each line."""
        try:
            for raw in self.stream:
                if isinstance(raw, (bytes, bytearray)):
                    text = raw.decode("utf-8", "replace")
                else:
                    # Text-mode streams already yield str.
                    text = str(raw)
                line = text.rstrip("\r\n")
                print(f"[SIM] {line}")
                with self._cond:
                    self.lines.append(line)
                    self._cond.notify_all()
        finally:
            # Wake any waiter so it stops waiting for output that can no
            # longer arrive.
            with self._cond:
                self._eof = True
                self._cond.notify_all()

    def wait_for(self, text: str, timeout: float = 30.0) -> bool:
        """Block until `text` appears in captured output or timeout expires.

        Returns True as soon as any captured line contains ``text``. Returns
        False when the timeout expires, or earlier if the stream has ended
        without a match. Lines captured before the call are always checked,
        even when ``timeout`` is zero or negative.
        """
        deadline = time.monotonic() + timeout
        scanned = 0  # lines[:scanned] have already been checked by this call
        with self._cond:
            while True:
                if any(text in line for line in self.lines[scanned:]):
                    return True
                scanned = len(self.lines)
                if self._eof:
                    return False
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self._cond.wait(remaining)
# ---------------------------------------------------------------------------
# Main test
# ---------------------------------------------------------------------------
def _stop_process(proc, grace_s: float = 5.0) -> None:
    """Shut down the simulator process, escalating from terminate to kill."""
    if proc.stdin is not None:
        try:
            proc.stdin.close()
        except OSError:
            # Best effort: closing flushes, which fails if the pipe is
            # already broken because the process has gone away.
            pass
    if proc.poll() is not None:
        return  # already exited and reaped
    proc.terminate()
    try:
        proc.wait(timeout=grace_s)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def _inject_and_verify(proc) -> bool:
    """Wait for boot, inject NMEA sentences, and check the firmware output.

    Returns True when "Logged 10 entries" was seen, False on any failure.
    Does not stop ``proc``; the caller owns process cleanup.
    """
    reader = SerialReader(proc.stdout)
    reader.start()

    # ── Step 1: Wait for boot complete ─────────────────────────────────
    print("[TEST] Waiting for boot to complete...")
    if not reader.wait_for("Web server started", timeout=45.0):
        print("FAIL: Device did not complete boot within 45 s")
        return False
    print("[TEST] Boot complete ✓")
    time.sleep(0.5)  # Let main loop settle

    # ── Step 2: Inject 12 GPRMC + GPGGA pairs ─────────────────────────
    # 22.4 knots = 41.5 km/h (well above the 0.1 km/h threshold)
    SPEED_KNOTS = 22.4
    NUM_SENTENCES = 12
    print(f"[TEST] Injecting {NUM_SENTENCES} GPRMC sentences at {SPEED_KNOTS} knots "
          f"({SPEED_KNOTS * 1.852:.1f} km/h)...")
    try:
        for i in range(NUM_SENTENCES):
            rmc = build_gprmc(i, SPEED_KNOTS)
            gga = build_gpgga(i, satellites=8)
            proc.stdin.write(rmc.encode())
            proc.stdin.write(gga.encode())
            proc.stdin.flush()
            if i == 0:
                print(f" Sample sentence: {rmc.strip()}")
            time.sleep(0.15)  # 150 ms between sentences; main loop ~10 ms/iter
    except OSError:
        # BrokenPipeError is a subclass of OSError; other pipe failures are
        # treated the same way.
        print("FAIL: Simulation process closed unexpectedly")
        return False

    # ── Step 3: Wait for firmware to confirm logging ───────────────────
    print("[TEST] Waiting for 'Logged 10 entries'...")
    if not reader.wait_for("Logged 10 entries", timeout=15.0):
        print("FAIL: Firmware did not log 10 GPS entries within 15 s")
        print(" Possible causes:")
        print(" - NMEA checksum rejected (bad sentence)")
        print(" - GPS fix status is V (void) not A (active)")
        print(" - UART baud rate mismatch (expected 9600)")
        return False
    print("[TEST] GPS data logged ✓")

    # ── Step 4: Verify speed value is in output ────────────────────────
    # A missing speed only produces a warning; it does not fail the test.
    expected_speed = f"{SPEED_KNOTS * 1.852:.1f}"
    if not reader.wait_for(expected_speed, timeout=5.0):
        print(f"WARN: Speed '{expected_speed} km/h' not found in output (might be rounding)")
    else:
        print(f"[TEST] Correct speed {expected_speed} km/h confirmed ✓")
    return True


def run_test(project_dir: str = "pico", timeout_s: int = 60) -> int:
    """
    Launch the simulation, inject NMEA data, and verify logging output.

    ``project_dir`` is passed to ``wokwi-cli`` as the project directory and
    ``timeout_s`` is forwarded as ``--timeout`` in milliseconds. The child
    process is always stopped before returning, including when an
    unexpected exception propagates.

    Returns 0 on PASS, 1 on FAIL (including when ``wokwi-cli`` is not found).
    """
    cmd = ["wokwi-cli", project_dir, "--interactive", "--timeout", str(timeout_s * 1000)]
    print(f"\n{'='*60}")
    print("Wokwi GPS Injection Test")
    print(f"{'='*60}")
    print(f"Command: {' '.join(cmd)}\n")
    try:
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # interleave stderr into the captured output
        )
    except FileNotFoundError:
        print("ERROR: wokwi-cli not found. Install with:")
        print(" curl -L https://wokwi.com/ci/install.sh | sh")
        return 1

    # ── Run, then always clean up ──────────────────────────────────────
    try:
        passed = _inject_and_verify(proc)
    finally:
        _stop_process(proc)
    if not passed:
        return 1

    print(f"\n{'='*60}")
    print("GPS INJECTION TEST: PASS")
    print(f"{'='*60}\n")
    return 0
# ---------------------------------------------------------------------------
# Wokwi MCP usage (reference for AI agent)
# ---------------------------------------------------------------------------
# Help text printed verbatim by main() when --mcp-guide is passed. It is never
# executed; the tool calls inside are illustrative steps for an agent.
MCP_GUIDE = """
Using Wokwi MCP instead of this script
=======================================
If the Wokwi MCP server is connected, the AI agent can drive the simulation
directly without this script:
1. Start simulation:
wokwi_start_simulation(project_dir="pico")
2. Wait for boot:
wokwi_wait_serial(text="Web server started", timeout=45000)
3. Inject NMEA sentences (one at a time):
from scripts.wokwi_gps_test import build_gprmc, build_gpgga
for i in range(12):
wokwi_send_serial(data=build_gprmc(i, 22.4))
wokwi_send_serial(data=build_gpgga(i))
4. Assert logging:
wokwi_wait_serial(text="Logged 10 entries", timeout=15000)
5. (Optional) Take screenshot:
wokwi_screenshot(part_id="pico", file="gps_test.png")
6. Stop simulation:
wokwi_stop_simulation()
"""
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Parse command-line options and either print the MCP guide or run the test.

    Returns the value to use as the process exit status: 0 after printing
    the guide, otherwise whatever ``run_test`` returns.
    """
    cli = argparse.ArgumentParser(description="Wokwi GPS NMEA injection test")
    option_table = (
        ("--timeout", {"type": int, "default": 60, "help": "Max simulation time in seconds"}),
        ("--project-dir", {"default": "pico", "help": "Wokwi project directory"}),
        ("--mcp-guide", {"action": "store_true", "help": "Print MCP usage guide and exit"}),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    if not opts.mcp_guide:
        return run_test(project_dir=opts.project_dir, timeout_s=opts.timeout)
    print(MCP_GUIDE)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
# Loading
# pi-pico-w
# pi-pico-w