import network
import time
import json
from machine import Pin, ADC, PWM
import dht
from umqtt.simple import MQTTClient
# =========================
# WIFI CONFIG
# =========================
# Network name and password handed to wlan.connect() inside connect_wifi().
SSID = "Wokwi-GUEST"
PASSWORD = ""  # empty string: no passphrase is supplied
# =========================
# MQTT CONFIG — HiveMQ Cloud
# =========================
# Note: HiveMQ Cloud requires port 8883 and SSL.
MQTT_BROKER = "dda993b5c95f4c76bf23f590d5db30d5.s1.eu.hivemq.cloud"  # also used as the TLS server_hostname
MQTT_PORT = 8883
MQTT_USER = "neuropace_user"
# NOTE(review): credential is hard-coded in the source file; consider loading
# it from a separate, untracked configuration module.
MQTT_PASSWORD = "Neuropace123"
TOPIC = "sport/neuropace/data"  # topic the main loop publishes its JSON payload to
# =========================
# SENSORS
# =========================
# DHT22 on GPIO 4; the main loop reads its temperature every cycle.
dht_sensor = dht.DHT22(Pin(4))
# Analog input on GPIO 34; the main loop scales its reading to an HRV
# percentage (0-100).
hrv_adc = ADC(Pin(34))
# NOTE(review): 11 dB attenuation presumably selects the widest input voltage
# range so a full sweep maps onto 0-4095 — confirm against the ESP32 ADC docs.
hrv_adc.atten(ADC.ATTN_11DB)
# Analog input on GPIO 35; the main loop scales its reading to a tremor
# level between 0 and 1.
tremb_adc = ADC(Pin(35))
tremb_adc.atten(ADC.ATTN_11DB)
# =========================
# ACTUATORS
# =========================
# Three digital outputs (GPIO 13 / 12 / 14) driven together by set_color().
led_r = Pin(13, Pin.OUT)
led_g = Pin(12, Pin.OUT)
led_b = Pin(14, Pin.OUT)
# PWM output on GPIO 32 for the buzzer. Duty is forced to 0 at start-up, the
# same value buzzer_off() writes, so nothing sounds until buzzer_on() runs.
buzzer = PWM(Pin(32))
buzzer.duty(0)
# =========================
# WIFI CONNECTION
# =========================
def connect_wifi():
    """Ensure the station interface is associated with the configured network.

    Activates the STA interface and, if it is not already connected, starts a
    connection with SSID / PASSWORD and polls once per second until the
    interface reports that it is connected. There is no timeout: the call
    blocks for as long as the network stays unreachable.

    Always prints the confirmation line and the assigned IP address before
    returning, whether or not a new connection had to be made.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)

    needs_connection = not station.isconnected()
    if needs_connection:
        print("Connexion WiFi...")
        station.connect(SSID, PASSWORD)
        # Poll until associated; the status is checked before each sleep.
        while True:
            if station.isconnected():
                break
            time.sleep(1)

    print("WiFi connecté ✔")
    ip_address = station.ifconfig()[0]
    print("IP:", ip_address)
# =========================
# MQTT CONNECTION
# =========================
def connect_mqtt():
    """Create an MQTT client for the configured broker, connect it, return it.

    The client uses the fixed client id "esp32_neuropace", the module-level
    broker / port / credentials, a 60 second keepalive, and SSL with the
    broker name passed as ``server_hostname``.

    Returns:
        The connected MQTTClient instance.

    Any exception raised while constructing or connecting the client is left
    to propagate to the caller.
    """
    options = {
        "port": MQTT_PORT,
        "user": MQTT_USER,
        "password": MQTT_PASSWORD,
        "keepalive": 60,
        "ssl": True,
        "ssl_params": {"server_hostname": MQTT_BROKER},
    }
    mqtt = MQTTClient("esp32_neuropace", MQTT_BROKER, **options)
    mqtt.connect()
    print("MQTT connecté ✔")
    return mqtt
# =========================
# LED CONTROL
# =========================
def set_color(r, g, b):
    """Write one level to each of the red, green and blue LED pins.

    Args:
        r: value written to the red LED pin.
        g: value written to the green LED pin.
        b: value written to the blue LED pin.
    """
    # Pins are updated in red, green, blue order.
    for pin, level in ((led_r, r), (led_g, g), (led_b, b)):
        pin.value(level)
# =========================
# BUZZER CONTROL
# =========================
def buzzer_on(freq=1000, duty=512):
    """Start the buzzer tone.

    Args:
        freq: value passed to the buzzer PWM's ``freq()``. Defaults to 1000,
            the tone previously hard-coded here.
        duty: value passed to the buzzer PWM's ``duty()``. Defaults to 512,
            the level previously hard-coded here.

    Calling with no arguments behaves exactly as before; the parameters let
    callers choose a different tone or loudness without touching the PWM
    object directly.
    """
    # Frequency is set before the duty cycle, as in the original sequence.
    buzzer.freq(freq)
    buzzer.duty(duty)
def buzzer_off():
    """Silence the buzzer by setting its PWM duty to 0."""
    silent_duty = 0
    buzzer.duty(silent_duty)
# =========================
# NPI CALCULATION
# =========================
def calculate_npi(temp, hrv, tremb):
    """Combine temperature, HRV and tremor readings into one weighted index.

    Args:
        temp: temperature; its absolute distance from 36.5 is divided by 3.5
            and scaled by 100 to form the temperature score.
        hrv: HRV as a percentage (the main loop supplies 0-100); a lower
            value produces a higher score.
        tremb: tremor level (the main loop supplies 0-1), scaled by 100.

    Returns:
        The index rounded to two decimals, weighted 0.4 for temperature,
        0.35 for HRV and 0.25 for tremor. The result is not clamped, so a
        temperature more than 3.5 away from 36.5 can push it above 100.
    """
    temp_deviation = abs(temp - 36.5)
    temp_term = temp_deviation / 3.5 * 100 * 0.4

    hrv_risk = (1 - (hrv / 100)) * 100  # low HRV = high risk
    hrv_term = hrv_risk * 0.35

    tremor_pct = tremb * 100  # 0-1 -> 0-100
    tremor_term = tremor_pct * 0.25

    return round(temp_term + hrv_term + tremor_term, 2)
# =========================
# START
# =========================
# Bring the WiFi link up first; the MQTT connection is attempted over it.
connect_wifi()
try:
    client = connect_mqtt()
except Exception as e:
    print("Erreur de connexion MQTT initiale:", e)
    # Keep the name bound even when the first connection fails. Otherwise the
    # first publish attempt hits an unbound name, and recovery only happens
    # because that NameError is caught by a broad handler further down.
    client = None
# =========================
# LOOP
# =========================
while True:
    # 1. Sensor read, isolated so a DHT failure does not disturb MQTT.
    try:
        dht_sensor.measure()
        temperature = dht_sensor.temperature()
    except OSError as e:
        print("Erreur de lecture DHT22 (ignorée pour ce cycle):", e)
        time.sleep(2)
        continue  # skip this cycle; LEDs and buzzer keep their last state

    # Potentiometer reads (0 to 4095 on ESP32), scaled to working units.
    hrv = hrv_adc.read() / 4095 * 100  # 0-100 %
    tremb = tremb_adc.read() / 4095  # 0-1

    # 2. Classification and actuators.
    npi = calculate_npi(temperature, hrv, tremb)
    if npi < 40:
        statut = "OK"
        set_color(0, 1, 0)  # green
        buzzer_off()
    elif npi < 75:
        statut = "ATTENTION"
        set_color(1, 1, 0)  # yellow
        buzzer_off()
    else:
        statut = "DANGER"
        set_color(1, 0, 0)  # red
        buzzer_on()

    # 3. Build and send the MQTT payload.
    data = {
        "temperature": temperature,
        "hrv": round(hrv, 2),
        "tremblements": round(tremb, 2),
        "npi": npi,
        "statut": statut,
    }
    msg = json.dumps(data)
    try:
        print("Envoi MQTT:", msg)
        client.publish(TOPIC, msg)
    except Exception as e:
        print("Erreur MQTT/Réseau:", type(e).__name__, e)
        print("Tentative de reconnexion dans 2s...")
        # Release the failed connection's socket before opening a new one;
        # otherwise every reconnect leaves one more socket open. This is
        # best-effort: the client may be missing, None, or already closed,
        # and none of those cases should stop the reconnect attempt.
        try:
            client.sock.close()
        except Exception:
            pass
        time.sleep(2)
        try:
            connect_wifi()  # checks the WiFi link and reconnects if needed
            client = connect_mqtt()
        except Exception as e2:
            print("Reconnexion échouée:", e2)

    # The DHT22 needs at least 2 seconds between reads.
    time.sleep(2)