from machine import Pin, I2C, ADC
from i2c_lcd import I2cLcd
from mpu6050 import MPU6050
import time
# ====================== PARAMÈTRES ======================
FS = 2 # Fréquence d'échantillonnage choisie (Hz)
DT = 1 / FS # Période entre chaque mesure
DUREE = 5 * 60 # Durée d'acquisition 5 minutes
N = int(DUREE * FS)
# Seuils de stress (en volts)
EDA_HIGH = 1.8
EDA_LOW = 1.6
ECG_HIGH = 1.8
ECG_LOW = 1.6
# ====================== INITIALISATION ======================
# LED
led = Pin(2, Pin.OUT)
# I2C LCD
i2c = I2C(0, scl=Pin(18), sda=Pin(19))
lcd = I2cLcd(i2c, 0x27, 2, 16)
# ADC Capteurs
adc_ecg = ADC(Pin(32))
adc_eda = ADC(Pin(33))
adc_pot = ADC(Pin(36))
for adc in (adc_ecg, adc_eda, adc_pot):
adc.atten(ADC.ATTN_11DB)
adc.width(ADC.WIDTH_12BIT)
# MPU6050
i2c_mpu = I2C(1, scl=Pin(22), sda=Pin(21))
devices = i2c_mpu.scan()
if 104 in devices:
try:
mpu = MPU6050(i2c_mpu)
print("MPU6050 détecté")
except OSError:
mpu = None
print("⚠ Impossible d'initialiser MPU6050")
else:
mpu = None
print("⚠ MPU6050 non détecté, valeurs simulées")
# ====================== LISTES POUR STOCKAGE ======================
timestamps = []
ecg_list = []
eda_list = []
pot_list = []
accel_list = []
gyro_list = []
stress_list = []
etat_stress = False
compteur_stress = 0
print("=== DÉMARRAGE ACQUISITION ===")
# ====================== FONCTION MOYENNE ======================
def moyenne(liste, n=5):
"""Renvoie la moyenne glissante des n dernières valeurs"""
if len(liste) < n:
return sum(liste)/len(liste)
return sum(liste[-n:])/n
# ====================== ACQUISITION ======================
start_time = time.time()
while time.time() - start_time < DUREE:
# Timestamp
t = time.localtime()
ts = "{:02d}:{:02d}:{:02d}".format(t[3], t[4], t[5])
# Lecture ADC
v_ecg = adc_ecg.read() * 3.3 / 4095
v_eda = adc_eda.read() * 3.3 / 4095
v_pot = adc_pot.read() * 3.3 / 4095
# Lecture MPU6050
if mpu:
try:
data_mpu = mpu.get_values()
accel = (data_mpu["AcX"], data_mpu["AcY"], data_mpu["AcZ"])
gyro = (data_mpu["GyX"], data_mpu["GyY"], data_mpu["GyZ"])
except OSError:
accel = (0,0,16384)
gyro = (0,0,0)
else:
accel = (0,0,16384)
gyro = (0,0,0)
# RPM simulé
rpm = 5 + (v_pot / 3.3) * 20
# Détection stress
if v_ecg >= ECG_HIGH or v_eda >= EDA_HIGH:
stress = True
elif v_ecg <= ECG_LOW and v_eda <= EDA_LOW:
stress = False
else:
stress = etat_stress
# Compteur et LED
if stress and not etat_stress:
compteur_stress += 1
if stress:
led.on()
else:
led.off()
etat_stress = stress
# Stockage des mesures dans les listes
timestamps.append(ts)
ecg_list.append(v_ecg)
eda_list.append(v_eda)
pot_list.append(v_pot)
accel_list.append(accel)
gyro_list.append(gyro)
stress_list.append(1 if stress else 0)
# Affichage LCD
lcd.move_to(0,0)
lcd.putstr("ECG:{:.2f} EDA:{:.2f}".format(v_ecg, v_eda))
lcd.move_to(0,1)
lcd.putstr(("Stress" if stress else "Normal") + f" {compteur_stress:02d}")
# Affichage console
print(f"{ts} | ECG:{v_ecg:.2f}V | EDA:{v_eda:.2f}V | POT:{v_pot:.2f}V | RPM:{rpm:.2f}")
print(f"ACCEL:{accel} | GYRO:{gyro} | Compteur Stress:{compteur_stress}\n")
# Attente jusqu'à la prochaine mesure
time.sleep(DT)
# ====================== ENREGISTREMENT CSV ======================
with open("data.txt", "w") as f:
f.write("Timestamp,ECG,EDA,POT,AcX,AcY,AcZ,GyX,GyY,GyZ,Stress\n")
for i in range(len(timestamps)):
ecg_moy = moyenne(ecg_list[:i+1])
eda_moy = moyenne(eda_list[:i+1])
pot_val = pot_list[i]
accel_val = accel_list[i]
gyro_val = gyro_list[i]
stress_val = stress_list[i]
ligne = "{},{:.2f},{:.2f},{:.2f},{},{},{},{},{},{},{}\n".format(
timestamps[i],
ecg_moy,
eda_moy,
pot_val,
accel_val[0], accel_val[1], accel_val[2],
gyro_val[0], gyro_val[1], gyro_val[2],
stress_val
)
f.write(ligne)
print("=== ACQUISITION TERMINÉE ===")
print(f"Compteur stress: {compteur_stress}")
print("Fichier 'data.txt' enregistré avec succès")