"""
Circuit (Wokwi ESP32):
  Component        Interface        Purpose
  DHT22            GPIO 15          cargo temperature and humidity
  MPU6050          I2C GPIO 21/22   accelerometer for g-force
  Potentiometer 1  ADC GPIO 34      engine temperature (70–120 °C)
  Potentiometer 2  ADC GPIO 32      tyre pressure (85–115 PSI)
  Potentiometer 3  ADC GPIO 33      fuel level (0–100 %)
  Potentiometer 4  ADC GPIO 35      battery voltage (11.0–14.5 V)
  SSD1306          I2C GPIO 21/22   OLED edge display
  Red LED          GPIO 12          alert indicator
  Buzzer           PWM GPIO 13      audible alert
  Push button 1    GPIO 14          driver SOS
  Push button 2    GPIO 4           cargo tamper seal
  Slide switch 1   GPIO 25          cargo door contact
  Slide switch 2   GPIO 26          seatbelt sensor
  Slide switch 3   GPIO 27          ignition state
"""
import network
import time
from machine import Pin, SoftI2C, ADC, PWM
import dht
import ujson
from umqtt.simple import MQTTClient
import ssd1306
# --- MQTT connection settings ---
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "clearpath_truck_001"
# Telemetry is published and commands are received under one per-vehicle prefix.
_MQTT_TOPIC_ROOT = b"helixpark/clearpath/truck001/"
MQTT_TOPIC_PUB = _MQTT_TOPIC_ROOT + b"telemetry"
MQTT_TOPIC_SUB = _MQTT_TOPIC_ROOT + b"commands"

# --- Cold chain limits (WHO Technical Report 961) ---
TEMP_MIN_PHARMA, TEMP_MAX_PHARMA = 2.0, 8.0  # °C, lower / upper limit
HUM_MIN, HUM_MAX = 30.0, 70.0  # %RH, lower / upper limit

# --- Driver behaviour and vehicle health thresholds ---
G_FORCE_LIMIT = 2.5  # g; above this a reading counts as a harsh driving event
ENGINE_TEMP_MAX = 105.0  # °C; engine overheat threshold

# --- Dead-band filter: minimum change since the last publish that counts as significant ---
DEAD_BAND_TEMP = 0.5  # °C
DEAD_BAND_HUM = 2.0  # %RH
DEAD_BAND_ENG = 2.0  # °C
DEAD_BAND_G = 0.15  # g

# Publish at least once every this many loop cycles, even when nothing changed.
HEARTBEAT_CYCLES = 15
print("ClearPath initialising hardware...")


def _make_adc(gpio):
    """Return an ADC channel on *gpio* configured with 11 dB attenuation."""
    channel = ADC(Pin(gpio))
    channel.atten(ADC.ATTN_11DB)
    return channel


# One software I2C bus is shared by the OLED and the MPU6050.
i2c = SoftI2C(scl=Pin(22), sda=Pin(21))

# SSD1306 OLED: draw a boot splash. has_oled records whether the display
# initialised, so later drawing can be skipped when it did not.
try:
    oled = ssd1306.SSD1306_I2C(128, 64, i2c)
    oled.fill(0)
    oled.text("ClearPath OS", 10, 20)
    oled.text("Booting...", 10, 40)
    oled.show()
except Exception as exc:
    print("OLED init error:", exc)
    has_oled = False
else:
    has_oled = True

# DHT22 cold chain sensor (cargo temperature and humidity).
dht_sensor = dht.DHT22(Pin(15))

# MPU6050 accelerometer (driver behaviour). Writing 0 to register 0x6B is
# presumably the PWR_MGMT_1 wake-up write - confirm against the register map.
# A failure is only logged; reads are attempted later regardless.
MPU6050_ADDR = 0x68
try:
    i2c.writeto_mem(MPU6050_ADDR, 0x6B, b"\x00")
except Exception as exc:
    print("MPU6050 init error:", exc)

# Potentiometers standing in for analogue vehicle sensors.
engine_temp_adc = _make_adc(34)  # engine temperature
tyre_pressure_adc = _make_adc(32)  # tyre pressure
fuel_adc = _make_adc(33)  # fuel level
battery_adc = _make_adc(35)  # battery voltage

# Alert actuators: LED plus a 1 kHz PWM buzzer that starts silent (duty 0).
alert_led = Pin(12, Pin.OUT)
alert_buzzer = PWM(Pin(13), freq=1000, duty=0)

# Push buttons with pull-ups. The *_active flags are latched by the IRQ
# handlers; the _last_*_ms values hold the tick of the last accepted press
# and are used for debouncing.
sos_button = Pin(14, Pin.IN, Pin.PULL_UP)
tamper_button = Pin(4, Pin.IN, Pin.PULL_UP)
sos_active = tamper_active = False
_last_sos_ms = _last_tamp_ms = 0

# Brake wear figure (percent); the main loop lowers it by 0.5 on every
# harsh-driving event.
brake_wear_pct = 75.0

# Slide switches with pull-ups: cargo door contact, seatbelt, ignition.
cargo_door_sw = Pin(25, Pin.IN, Pin.PULL_UP)
seatbelt_sw = Pin(26, Pin.IN, Pin.PULL_UP)
ignition_sw = Pin(27, Pin.IN, Pin.PULL_UP)
def _handle_sos(pin):
    """IRQ handler for the driver SOS button: latch sos_active, debounced.

    Presses arriving 500 ms or less after the last accepted press are ignored.

    Args:
        pin: The Pin that raised the interrupt (unused).
    """
    global sos_active, _last_sos_ms
    stamp = time.ticks_ms()
    if time.ticks_diff(stamp, _last_sos_ms) <= 500:
        return  # still inside the debounce window
    _last_sos_ms = stamp
    sos_active = True
    print("SOS BUTTON PRESSED , alerting cloud!")
def _handle_tamper(pin):
    """IRQ handler for the cargo tamper-seal button: latch tamper_active, debounced.

    Presses arriving 500 ms or less after the last accepted press are ignored.

    Args:
        pin: The Pin that raised the interrupt (unused).
    """
    global tamper_active, _last_tamp_ms
    stamp = time.ticks_ms()
    if time.ticks_diff(stamp, _last_tamp_ms) <= 500:
        return  # still inside the debounce window
    _last_tamp_ms = stamp
    tamper_active = True
    print("TAMPER SEAL TRIGGERED - cargo breach!")
# Both buttons use pull-ups, so a press is seen as a falling edge.
sos_button.irq(handler=_handle_sos, trigger=Pin.IRQ_FALLING)
tamper_button.irq(handler=_handle_tamper, trigger=Pin.IRQ_FALLING)
def show_oled(l1: str, l2: str, l3: str, l4: str) -> None:
    """Redraw the OLED with four text rows; does nothing when has_oled is False.

    Each row is cut to its first 16 characters and drawn at x=0, with rows
    spaced 16 pixels apart (y = 0, 16, 32, 48).
    """
    if has_oled:
        oled.fill(0)
        for row, text in enumerate((l1, l2, l3, l4)):
            oled.text(text[:16], 0, row * 16)
        oled.show()
def check_cold_chain_local(temp: float, hum: float, door_open: bool) -> list:
    """Evaluate the cold-chain rules on the device and return the tripped alert codes.

    Args:
        temp: Cargo temperature, compared with TEMP_MIN_PHARMA / TEMP_MAX_PHARMA.
        hum: Cargo relative humidity, compared with HUM_MIN / HUM_MAX.
        door_open: Truthy when the cargo door contact reports open.

    Returns:
        A list holding any of "TEMP_BREACH", "HUM_BREACH" and "DOOR_OPEN",
        in that order; empty when every check passes.
    """
    checks = (
        ("TEMP_BREACH", temp < TEMP_MIN_PHARMA or temp > TEMP_MAX_PHARMA),
        ("HUM_BREACH", hum < HUM_MIN or hum > HUM_MAX),
        ("DOOR_OPEN", door_open),
    )
    return [code for code, tripped in checks if tripped]
def read_mpu6050() -> tuple:
    """Read three acceleration axes from the MPU6050 over I2C.

    Six bytes are read starting at register 0x3B and decoded as three
    big-endian signed 16-bit words, each divided by 16384.0 (presumably the
    LSB-per-g scale of the +/-2 g range - confirm against the register map).

    Returns:
        (ax, ay, az) as floats, or (0.0, 0.0, 0.0) if anything raises.
    """
    try:
        buf = i2c.readfrom_mem(MPU6050_ADDR, 0x3B, 6)
        axes = []
        for offset in (0, 2, 4):
            word = (buf[offset] << 8) | buf[offset + 1]
            if word > 32767:
                word -= 65536  # two's-complement sign correction
            axes.append(word / 16384.0)
        return axes[0], axes[1], axes[2]
    except Exception:
        return 0.0, 0.0, 0.0
def connect_wifi() -> None:
    """Join the "Wokwi-GUEST" network and block until the link is up.

    Progress and the assigned IP address are shown on the OLED. There is no
    timeout: the function polls every 0.2 s until the station reports connected.
    """
    show_oled("Connecting WiFi", "Please wait...", "", "")
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect("Wokwi-GUEST", "")
    while True:
        if wlan.isconnected():
            break
        time.sleep(0.2)
    net_cfg = wlan.ifconfig()
    print("WiFi connected:", net_cfg)
    # First element of the ifconfig tuple is shown as the address line.
    show_oled("WiFi OK", net_cfg[0], "", "")
def mqtt_command_callback(topic: bytes, msg: bytes) -> None:
    """Handle a command message: switch the LED and buzzer per its "alert" field.

    The payload is parsed as JSON. "alert": true turns the LED on and sets
    the buzzer duty to 512; "alert": false turns both off; any other value
    is ignored. Parse or lookup errors are printed, never raised.

    Args:
        topic: Topic the message arrived on (unused).
        msg: Raw JSON payload.
    """
    try:
        requested = ujson.loads(msg).get("alert")
        # Only the literal booleans are acted on; anything else is a no-op.
        if requested is not True and requested is not False:
            return
        alert_led.value(1 if requested else 0)
        alert_buzzer.duty(512 if requested else 0)
        print("[CMD] Cloud alert ON" if requested else "[CMD] Cloud alert cleared")
    except Exception as exc:
        print("Command parse error:", exc)
def connect_mqtt() -> MQTTClient:
    """Create an MQTT client, connect it, and subscribe to the command topic.

    The client uses a 60 s keepalive and routes incoming messages to
    mqtt_command_callback. Connection errors propagate to the caller.

    Returns:
        The connected, subscribed MQTTClient.
    """
    show_oled("Connecting MQTT", MQTT_BROKER[:16], "", "")
    broker_link = MQTTClient(
        MQTT_CLIENT_ID,
        MQTT_BROKER,
        port=MQTT_PORT,
        keepalive=60,
    )
    # The callback must be registered before subscribing so no message is dropped.
    broker_link.set_callback(mqtt_command_callback)
    broker_link.connect()
    broker_link.subscribe(MQTT_TOPIC_SUB)
    print("MQTT connected to", MQTT_BROKER)
    return broker_link
# Simulated GPS position; the main loop nudges both coordinates each cycle.
loc_lat, loc_lon = 53.4808, -2.2426
# Running emissions figure, published as "emissions_co2_kg".
emissions = 1420.5

# Dead-band filter state: values sent in the last publish (None until the
# first publish) and the number of cycles since that publish.
_last_pub_temp = _last_pub_hum = _last_pub_eng = _last_pub_g = None
_cycles_since = 0

# Start-up sequence: short pause, then network, then broker.
time.sleep(1)
connect_wifi()
mqtt_client = connect_mqtt()
show_oled("System Active", "Reading sensors", "", "")
time.sleep(0.5)
# Last successful DHT22 reading. It is reused when a measurement fails so a
# sensor glitch is not published as a genuine 0.0 °C / 0 %RH excursion.
_last_good_temp = None
_last_good_hum = None

# Main edge loop: poll commands, read every sensor, drive the local alert
# outputs, refresh the OLED, and publish telemetry through the dead-band filter.
while True:
    try:
        # Deliver any pending cloud command to mqtt_command_callback.
        mqtt_client.check_msg()

        # Snapshot the IRQ-latched flags once per cycle. Everything below uses
        # the snapshot, and only snapshotted flags are cleared after a publish,
        # so a press that arrives mid-cycle is reported next cycle, not lost.
        sos_now = sos_active
        tamper_now = tamper_active

        # --- Cold chain: DHT22 ---
        dht_fault = False
        try:
            dht_sensor.measure()
            cargo_temp = dht_sensor.temperature()
            cargo_hum = dht_sensor.humidity()
            _last_good_temp, _last_good_hum = cargo_temp, cargo_hum
        except Exception as exc:
            print("DHT22 read error:", exc)
            dht_fault = True
            if _last_good_temp is None:
                # No valid reading yet: keep the historical 0.0 placeholders.
                cargo_temp, cargo_hum = 0.0, 0.0
            else:
                cargo_temp, cargo_hum = _last_good_temp, _last_good_hum

        # --- Driver behaviour: MPU6050 ---
        # Only the X/Y components contribute to the g-force figure.
        ax, ay, _ = read_mpu6050()
        g_force = (ax ** 2 + ay ** 2) ** 0.5
        harsh_event = g_force > G_FORCE_LIMIT
        if harsh_event:
            brake_wear_pct = max(0.0, round(brake_wear_pct - 0.5, 1))

        # --- Vehicle health and switches ---
        pot_raw = engine_temp_adc.read()
        engine_temp = 70 + (pot_raw / 4095.0) * 50  # 70-120 °C
        door_open = cargo_door_sw.value() == 1
        seatbelt_unfastened = seatbelt_sw.value() == 1
        ignition_on = ignition_sw.value() == 0
        tyre_raw = tyre_pressure_adc.read()
        tyre_pressure = 85 + (tyre_raw / 4095.0) * 30  # 85-115 PSI
        fuel_raw = fuel_adc.read()
        fuel_pct = round((fuel_raw / 4095.0) * 100, 1)
        bat_raw = battery_adc.read()
        battery_v = round(11.0 + (bat_raw / 4095.0) * 3.5, 2)  # 11.0-14.5 V

        # --- Simulated GPS movement and emissions ---
        loc_lat += 0.0001
        loc_lon += 0.0001
        emissions += 0.05
        # Speed is simulated from the engine-temperature potentiometer (0-90 km/h).
        speed_kmh = round((pot_raw / 4095.0) * 90, 1)

        # --- Edge cold chain thresholds ---
        local_alerts = check_cold_chain_local(cargo_temp, cargo_hum, door_open)
        if dht_fault:
            # Flag that the cold-chain values above are not a fresh measurement.
            local_alerts.append("SENSOR_FAULT")

        # --- Local alert outputs ---
        # NOTE(review): the "off" branch also clears an alert the cloud switched
        # on through the command topic earlier in this same cycle; keeping it
        # latched would need a flag shared with mqtt_command_callback.
        if local_alerts or harsh_event or tamper_now:
            alert_led.value(1)
            alert_buzzer.duty(512)
        elif not sos_now:
            alert_led.value(0)
            alert_buzzer.duty(0)

        # --- Build telemetry payload ---
        telemetry = {
            "vehicle_id": "TRUCK-001",
            "cold_chain": {
                "cargo_temp_c": round(cargo_temp, 2),
                "cargo_humidity_pct": round(cargo_hum, 2),
                "door_contact_open": door_open,
                "local_alerts": local_alerts,
            },
            "driver_behaviour": {
                "accel_x": round(ax, 2),
                "accel_y": round(ay, 2),
                "g_force": round(g_force, 2),
                "seatbelt_engaged": not seatbelt_unfastened,
                "speed_kmh": speed_kmh,
            },
            "vehicle_health": {
                "engine_temp_c": round(engine_temp, 2),
                "battery_voltage": battery_v,
                "tyre_pressure_psi": round(tyre_pressure, 1),
                "ignition_on": ignition_on,
                "brake_wear_pct": brake_wear_pct,
            },
            "fleet_tracking": {
                "latitude": round(loc_lat, 5),
                "longitude": round(loc_lon, 5),
                "fuel_level_pct": fuel_pct,
            },
            "charter_compliance": {
                "emissions_co2_kg": round(emissions, 2),
            },
            "safety": {
                "sos_triggered": sos_now,
                "tamper_alert": tamper_now,
            },
        }

        # --- Update OLED edge display ---
        temp_str = f"T:{cargo_temp:.1f}C H:{cargo_hum:.0f}%"
        eng_str = f"Eng:{engine_temp:.0f}C {speed_kmh:.0f}kph"
        g_str = f"G:{g_force:.2f}g F:{fuel_pct:.0f}%"
        if sos_now:
            status_str = "!! SOS ALERT !!"
        elif local_alerts:
            status_str = "COLD CHAIN BRK"
        elif alert_led.value() == 1:
            # Kept within the 16-character row limit applied by show_oled.
            status_str = "!!CLOUD ALARM!!"
        else:
            status_str = "COMPLIANT OK"
        show_oled(temp_str, eng_str, g_str, status_str)

        # --- Dead-band filter ---
        # Publish on any alert, on a significant change since the last publish,
        # or as a heartbeat after HEARTBEAT_CYCLES quiet cycles.
        _cycles_since += 1
        alert_condition = bool(local_alerts) or harsh_event or sos_now or tamper_now
        temp_changed = _last_pub_temp is None or abs(cargo_temp - _last_pub_temp) > DEAD_BAND_TEMP
        hum_changed = _last_pub_hum is None or abs(cargo_hum - _last_pub_hum) > DEAD_BAND_HUM
        eng_changed = _last_pub_eng is None or abs(engine_temp - _last_pub_eng) > DEAD_BAND_ENG
        g_changed = _last_pub_g is None or abs(g_force - _last_pub_g) > DEAD_BAND_G
        significant_change = temp_changed or hum_changed or eng_changed or g_changed
        should_publish = alert_condition or significant_change or (_cycles_since >= HEARTBEAT_CYCLES)

        if should_publish:
            payload_str = ujson.dumps(telemetry)
            mqtt_client.publish(MQTT_TOPIC_PUB, payload_str)
            reason = "ALERT" if alert_condition else ("CHANGE" if significant_change else "HEARTBEAT")
            print(f"Published [{reason}]:", payload_str[:60], "...")
            _last_pub_temp = cargo_temp
            _last_pub_hum = cargo_hum
            _last_pub_eng = engine_temp
            _last_pub_g = g_force
            _cycles_since = 0
            # Clear only the latched flags that were part of this payload.
            # A failed publish raises before this point, so they stay set
            # and are retried.
            if sos_now:
                sos_active = False
            if tamper_now:
                tamper_active = False
        else:
            print(f"Filtered [no change] cycle {_cycles_since}/{HEARTBEAT_CYCLES}")

        # --- Adaptive sampling ---
        # Re-reading the live flags lets a press that arrived during this cycle
        # be picked up on the fast path.
        if alert_condition or sos_active or tamper_active:
            time.sleep(1)
        elif should_publish:
            time.sleep(2)
        else:
            time.sleep(4)

    except OSError as exc:
        print("Network error:", exc)
        # The display shares the I2C bus and may itself be the failing part;
        # do not let the status message abort recovery.
        try:
            show_oled("Network Error", "Reconnecting...", "", "")
        except Exception as oled_exc:
            print("OLED update failed:", oled_exc)
        time.sleep(3)
        try:
            mqtt_client = connect_mqtt()
        except Exception as reconnect_exc:
            # Keep the old client; the next cycle fails again and retries.
            print("MQTT reconnect failed:", reconnect_exc)
    except Exception as exc:
        print("Loop error:", exc)
        time.sleep(1)
# NOTE(review): the lines below appear to be wiring-diagram labels that were
# captured with the script; they are kept as comments so the file still parses.
# ESP32 DEVKIT - EDGE NODE
# EDGE DISPLAY
# DOMAIN 1 - COLD CHAIN
# DOMAIN 2 - DRIVER BEHAVIOUR
# SOS Alert Button
# GPIO 14
# Seatbelt Sensor
# GPIO 26
# DOMAIN 3 - VEHICLE HEALTH
# Engine Temperature
# GPIO 34 | 70-120 C
# Battery Voltage
# GPIO 35 | 11.0-14.5 V
# Tyre Pressure
# GPIO 32 | 85-115 PSI
# Ignition Switch
# GPIO 27
# DOMAIN 4 - FLEET TRACKING
# CARGO SAFETY SENSORS
# Cargo Door Sensor
# GPIO 25
# Tamper Seal Button
# GPIO 4
# ALERT ACTUATOR
# Buzzer
# Fuel Level Sensor GPIO 33