# HaemoHelper — iBX020 Fault Monitor
# Main firmware for Raspberry Pi Pico 2W
# MicroPython
import time
import network
import machine
from machine import I2C, Pin
from config import (
WIFI_SSID, WIFI_PASSWORD,
I2C_SDA, I2C_SCL, I2C_FREQ,
ADS1115_U3, ADS1115_U4,
GPIO_DI, GPIO_T,
DIVIDER_RATIO,
CURRENT_ZERO_VOLTAGE, CURRENT_SENSITIVITY,
SAMPLE_INTERVAL_MS
)
from ads1115 import ADS1115
from di_decoder import DIDecoder
from logger import Logger
from webserver import WebServer
# ─── Boot indicator ───────────────────────────────────────────────
# Status LED, addressed by the board's named "LED" pin and driven as an output.
led = Pin("LED", Pin.OUT)


def blink(n, ms=100):
    """Flash the status LED ``n`` times.

    Each flash switches the LED on for ``ms`` milliseconds and then off for
    ``ms`` milliseconds. The call blocks until all flashes are done and
    leaves the LED off.
    """
    for _ in range(n):
        # One flash = "on" phase followed by "off" phase, each held for ms.
        for switch in (led.on, led.off):
            switch()
            time.sleep_ms(ms)
# Start-up banner on the serial console, followed by three default-length
# flashes of the status LED.
for banner_line in (
    "\n╔══════════════════════════════╗",
    "║ HaemoHelper v1.0 ║",
    "║ iBX020 Fault Monitor ║",
    "╚══════════════════════════════╝\n",
):
    print(banner_line)
blink(3)
# ─── WiFi ─────────────────────────────────────────────────────────
# Bring up the station interface and start connecting with the credentials
# from config.
print(f"Connecting to WiFi: {WIFI_SSID}")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)

# Poll the link at most 20 times, 0.5 s apart (about 10 s in total). Every
# poll toggles the LED and prints a progress dot.
for _ in range(20):
    if wlan.isconnected():
        break
    led.toggle()
    time.sleep(0.5)
    print(".", end="")
print()  # terminate the row of progress dots

if not wlan.isconnected():
    # Not fatal: the rest of the start-up sequence runs without a network.
    print("WiFi failed — continuing without network")
    blink(10, 50)
else:
    # First element of ifconfig() is the interface's IP address.
    ip = wlan.ifconfig()[0]
    print(f"Connected. IP: {ip}")
    print(f"Dashboard: http://{ip}")
    blink(5, 50)
    led.on()  # LED stays lit once the network is up
# ─── I2C and ADC setup ────────────────────────────────────────────
print("\nInitialising I2C and ADC modules...")
i2c = I2C(0, sda=Pin(I2C_SDA), scl=Pin(I2C_SCL), freq=I2C_FREQ)

# Scan the bus and report every responding address in hex.
devices = i2c.scan()
print(f"I2C devices found: {[hex(addr) for addr in devices]}")

# Warn about each expected ADS1115 that did not answer the scan. This is a
# warning only: the driver objects below are created regardless.
for _label, _address in (("U3", ADS1115_U3), ("U4", ADS1115_U4)):
    if _address not in devices:
        print(f"WARNING: ADS1115 {_label} (0x{_address:02x}) not found on I2C bus")

adc_u3 = ADS1115(i2c, ADS1115_U3)  # Voltage dividers
adc_u4 = ADS1115(i2c, ADS1115_U4)  # Current sensor
# ─── GPIO setup ───────────────────────────────────────────────────
# The D/I and T signal lines are both configured as inputs with the internal
# pull-down enabled.
pin_di = Pin(GPIO_DI, Pin.IN, Pin.PULL_DOWN)
pin_t = Pin(GPIO_T, Pin.IN, Pin.PULL_DOWN)
print("GPIO ready — D/I on GP{}, T on GP{}".format(GPIO_DI, GPIO_T))
# ─── Modules ──────────────────────────────────────────────────────
# Create the logger and the D/I decoder first: the web server is constructed
# with references to both, and the main loop uses the same two objects.
logger = Logger()
di_decoder = DIDecoder()
server = WebServer(logger, di_decoder)
# NOTE(review): presumably opens the listening socket; start() is also called
# when WiFi failed to connect — confirm WebServer tolerates that.
server.start()
print("\nAll systems ready. Logging started.\n")
# ─── Timestamp helper ─────────────────────────────────────────────
def timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``.

    NOTE(review): nothing visible in this file sets the clock, so the value
    is whatever ``time.localtime()`` reports on the board — confirm the RTC
    is synchronised elsewhere if wall-clock accuracy matters.
    """
    year, month, day, hour, minute, second = time.localtime()[:6]
    return "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
        year, month, day, hour, minute, second
    )
# ─── Voltage scaling ──────────────────────────────────────────────
def adc_to_fridge_voltage(adc_voltage):
    """Undo the input divider: map a voltage seen at the ADC back to the
    voltage on the fridge circuit.

    The result is ``adc_voltage / DIVIDER_RATIO``. A configured ratio of zero
    yields ``0.0`` instead of raising ``ZeroDivisionError``.
    """
    return adc_voltage / DIVIDER_RATIO if DIVIDER_RATIO != 0 else 0.0
def adc_to_current(adc_voltage):
    """Convert HSTS016L output voltage to amps.

    Subtracts the sensor's zero-current output (``CURRENT_ZERO_VOLTAGE``) and
    divides by ``CURRENT_SENSITIVITY`` (presumably volts per amp — confirm in
    config). The result is signed: readings below the zero voltage give a
    negative current.

    Returns ``0.0`` when ``CURRENT_SENSITIVITY`` is configured as zero, in
    line with how ``adc_to_fridge_voltage`` treats a zero divider ratio, so a
    bad config value cannot raise ``ZeroDivisionError`` in the sampling loop.
    """
    if CURRENT_SENSITIVITY == 0:
        return 0.0
    return (adc_voltage - CURRENT_ZERO_VOLTAGE) / CURRENT_SENSITIVITY
# ─── Main loop ────────────────────────────────────────────────────
# Cooperative loop: take one sample per SAMPLE_INTERVAL_MS and give the web
# server a chance to run on every pass.

# ticks_ms() counts from an arbitrary origin and wraps around, so the
# scheduler has to be seeded from the tick counter itself; a literal 0 is not
# a valid reference for ticks_diff(). Back-dating by one interval makes the
# first sample fire on the first pass.
last_sample_ms = time.ticks_add(time.ticks_ms(), -SAMPLE_INTERVAL_MS)
last_di_event_code = None
adc_fault = False  # True while the most recent ADC read attempt failed

# CSV header for the per-sample serial output below
print("timestamp,psu_v,fan_v,ssr_v,ctrl_v,current_a,t_state,di_state,di_code")

while True:
    now_ms = time.ticks_ms()

    # ── Sample all channels once per SAMPLE_INTERVAL_MS (nominally 1Hz) ──
    if time.ticks_diff(now_ms, last_sample_ms) >= SAMPLE_INTERVAL_MS:
        last_sample_ms = now_ms
        ts = timestamp()

        # Read the four voltage channels from U3 and the current channel from
        # U4. An I2C failure (ADC absent, unplugged, bus glitch) surfaces as
        # OSError; it must not take the whole monitor down, so the analog
        # part of this sample is skipped and retried on the next interval.
        analog = None
        try:
            analog = (
                adc_to_fridge_voltage(adc_u3.read_voltage(0)),
                adc_to_fridge_voltage(adc_u3.read_voltage(1)),
                adc_to_fridge_voltage(adc_u3.read_voltage(2)),
                adc_to_fridge_voltage(adc_u3.read_voltage(3)),
                adc_to_current(adc_u4.read_voltage(0)),
            )
            adc_fault = False
        except OSError as exc:
            # Report only the first failure of an outage so a missing ADC
            # does not add an event to the log every interval.
            if not adc_fault:
                fault_msg = f"ADC read failed: {exc}"
                print(fault_msg)
                logger.log_event(ts, fault_msg)
            adc_fault = True

        # Read digital inputs
        t_state = pin_t.value()
        di_state = pin_di.value()

        # Update D/I decoder
        # NOTE(review): the decoder is only fed once per sample interval; if
        # it decodes a pulse train from the D/I line it may need the pin
        # polled on every loop pass — confirm against DIDecoder.
        di_code = di_decoder.update(di_state, now_ms)

        # Log a D/I fault event only when the decoded code changes, so a
        # persistent code is recorded once rather than on every sample.
        if di_code is not None and di_code != last_di_event_code:
            desc = di_decoder.describe(di_code)
            logger.log_event(ts, f"D/I fault code {di_code}: {desc}")
            last_di_event_code = di_code

        if analog is not None:
            psu_v, fan_v, ssr_v, ctrl_v, current_a = analog

            # Write to log
            logger.log_reading(ts, psu_v, fan_v, ssr_v, ctrl_v,
                               current_a, t_state, di_state, di_code)

            # Print to serial for debugging (one CSV row; empty di_code
            # column when no code is currently decoded)
            print(f"{ts},{psu_v:.2f},{fan_v:.2f},{ssr_v:.2f},{ctrl_v:.2f},"
                  f"{current_a:.3f},{t_state},{di_state},"
                  f"{di_code if di_code is not None else ''}")

    # ── Serve web requests (non-blocking) ───────────────────────
    server.serve()

    # Small sleep to avoid busy loop hammering the CPU
    time.sleep_ms(10)