from machine import ADC, Pin, I2C
from time import sleep_us, sleep_ms, ticks_us, ticks_diff
from math import sin, pi
import dht
from ds1307 import DS1307
from ssd1306 import SSD1306_I2C
import network
import time
import urequests
import json
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
# ---------- CAPTEUR COURANT ----------
class ACS712:
def __init__(self, pin, vref=3.3, adc_res=4095, sensitivity=185):
self.adc = ADC(Pin(pin))
self.adc.atten(ADC.ATTN_11DB)
self.vref = vref
self.adc_res = adc_res
self.sensitivity = sensitivity
self.mid_point = 0
self.noise_mv = 0
def read_raw(self):
return self.adc.read()
def voltage(self, raw):
return raw * self.vref / self.adc_res
def auto_midpoint(self, samples=100):
values = [self.read_raw() for _ in range(samples)]
self.mid_point = sum(values) // len(values)
noise = max(values) - min(values)
self.noise_mv = (noise * self.vref * 1000) / self.adc_res
def get_midpoint(self):
return self.mid_point
def get_noise_mv(self):
return self.noise_mv
def mA_AC_sampling(self, cycles=1, freq=60):
period_us = int(1e6 / freq)
total_samples = []
t0 = ticks_us()
while ticks_diff(ticks_us(), t0) < cycles * period_us:
raw = self.read_raw()
centered = raw - self.mid_point
total_samples.append(centered)
sleep_us(200)
squared = [x**2 for x in total_samples]
rms_raw = (sum(squared) / len(squared))**0.5
mv_rms = rms_raw * self.vref * 1000 / self.adc_res
current_mA = mv_rms / self.sensitivity * 1000
self.form_factor = rms_raw / max(1, max([abs(x) for x in total_samples]))
return int(current_mA)
def get_form_factor(self):
return self.form_factor
# ---------- INITIALISATION ----------
# ACS712 sur pin 34
acs = ACS712(pin=34)
# Capteur de tension sur GPIO 35 (ADC1_CH7)
adc_voltage = ADC(Pin(35))
adc_voltage.atten(ADC.ATTN_11DB) # Pour mesurer jusqu'à ~3.3V
adc_voltage.width(ADC.WIDTH_12BIT) # Résolution 12 bits (0-4095)
val = adc_voltage.read()
R1 = 68000 # 68k ohms
R2 = 1000 # 1k ohms
# Calcul du vrai voltage
voltage = (val / 4095) * 3.3 # Tension à l'entrée ADC
real_voltage = voltage * (R1 + R2) / R2 # Tension réelle mesurée
# RTC DS1307 : SDA = GPIO16, SCL = GPIO15
i2c_rtc = I2C(0, scl=Pin(15), sda=Pin(16))
rtc = DS1307(i2c_rtc)
# DHT22 sur pin 5
dht_sensor = dht.DHT22(Pin(5))
# OLED SSD1306 : SDA = GPIO21, SCL = GPIO22
i2c_oled = I2C(1, scl=Pin(22), sda=Pin(21))
oled = SSD1306_I2C(128, 64, i2c_oled)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
mac_bytes = wlan.config('mac')
mac_addr = ':'.join('{:02X}'.format(b) for b in mac_bytes)
print("Adresse MAC :", mac_addr)
# Calibrage ACS712
print("Initialisation...")
acs.auto_midpoint()
print("MidPoint:", acs.get_midpoint())
print("Noise mV:", acs.get_noise_mv())
###########
def is_device_linked(mac_address):
try:
url = "https://memoire-c397c-default-rtdb.firebaseio.com/linked_devices/{}.json".format(mac_address.replace(":", ""))
print("Vérification à l’URL :", url)
response = urequests.get(url)
data = response.json()
print("Réponse Firebase:", data)
response.close()
if data and data.get("linked") and data.get("user_id"):
return data.get("user_id")
else:
return None
except Exception as e:
print("Erreur de vérification:", e)
return None
def send_to_firebase(data, user_id, mac_address):
try:
# Formatage dynamique du chemin vers /symptoms/user_id/mac_address/
path = "SensorData/{}/{}/data.json".format(user_id, mac_address.replace(":", ""))
url = "https://memoire-c397c-default-rtdb.firebaseio.com/{}".format(path)
response = urequests.post(url, data=json.dumps(data))
print("Firebase:", response.text)
response.close()
except Exception as e:
print("Erreur Firebase:", e)
###########
# ---------- BOUCLE PRINCIPALE ----------
while True:
sleep_ms(100)
start = ticks_us()
# Lecture des capteurs
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
hum = dht_sensor.humidity()
date_time = rtc.datetime()
# Lecture de la tension et conversion en tension réelle
raw_voltage = adc_voltage.read()
voltage = (raw_voltage / 4095) * 3.3
real_voltage = voltage * (R1 + R2) / R2
except OSError as e:
print("Erreur capteur :", e)
temp, hum = None, None
real_voltage = 0.0
date_time = (0, 0, 0, 0, 0, 0, 0)
current_mA = acs.mA_AC_sampling()
current_A = current_mA / 1000 # Conversion en ampères
puissance = real_voltage * current_A # Puissance en watts
duration = ticks_diff(ticks_us(), start)
# Affichage console
print("RTC: {:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
date_time[0], date_time[1], date_time[2],
date_time[4], date_time[5], date_time[6]))
print("Température: {:.1f}°C | Humidité: {:.1f}%".format(temp, hum))
print("Courant (mA):", current_mA)
print("Form factor:", round(acs.get_form_factor(), 2))
print("Durée mesure (us):", duration)
print("Tension:", real_voltage)
print("Puissance: {:.2f} W".format(puissance))
print("MAC:", mac_addr)
print("-" * 30)
# Affichage OLED
oled.fill(0)
# Ligne 1 : Heure centrée
heure_str = "{:02d}:{:02d}:{:02d}".format(date_time[4], date_time[5], date_time[6])
puissance_str = "{:.1f}W".format(puissance)
# Concaténer les deux avec un espace
ligne1 = "{} {}".format(heure_str, puissance_str)
oled.text(ligne1, 0, 0) # Afficher à gauche ligne 1
# Ligne 2 : Température et Humidité (côte à côte)
oled.text("Temp:", 0, 15)
oled.text("{:.1f}C ".format(temp), 40, 15)
oled.text("Hum:", 80, 15)
oled.text("{:.1f}%".format(hum), 110, 15)
# Ligne 3 : Courant et Tension
oled.text("I:{}mA ".format(current_mA), 0, 30)
oled.text("U:{:.2f}V".format(real_voltage), 64, 30)
# Ligne 4 : Adresse MAC, tronquée ou complète
mac_affiche = "{}".format(mac_addr)
oled.text(mac_affiche, 0, 45)
oled.show()
user_id = is_device_linked(mac_addr.replace(":", ""))
if user_id:
payload = {
"temperature": temp,
"humidity": hum,
"current_mA": current_mA,
"voltage": real_voltage,
"puissance": puissance_str,
"heure": heure_str,
}
send_to_firebase(payload, user_id, mac_addr)
else:
print("Appareil non associé. Pas d'envoi.")
sleep_ms(5000)
ERC Warnings
gnd1:GND: Short circuit