"""
MicroPython IoT Weather Station Example for Wokwi.com
To view the data:
1. Go to http://www.hivemq.com/demos/websocket-client/
2. Click "Connect"
3. Under Subscriptions, click "Add New Topic Subscription"
4. In the Topic field, type "wokwi-weather" then click "Subscribe"
Now click on the DHT22 sensor in the simulation,
change the temperature/humidity, and you should see
the message appear on the MQTT Broker, in the "Messages" pane.
Copyright (C) 2022, Uri Shaked
https://wokwi.com/arduino/projects/322577683855704658
"""
import time
import network
import ubinascii
import machine
from machine import Pin, ADC, I2C
import dht
import ujson
from umqtt.simple import MQTTClient
# =======================
# WIFI / MQTT
# =======================
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
TEAM = "teamX"
SALLE = "salle1"
TOPIC_TELEMETRY = "hackathon/{}/{}/telemetry".format(TEAM, SALLE)
# =======================
# PINS (selon ton câblage)
# =======================
I2C_SDA = 21
I2C_SCL = 22
DOOR_PIN = 27 # switch porte (GPIO <-> GND)
PIR_PIN = 14 # PIR OUT
DHT_PIN = 15 # DHT22
LDR_ADC_PIN = 34 # LDR AO
NOISE_ADC_PIN = 32 # pot wiper
GAS_ADC_PIN = 33 # gaz AOUT
PAGE_BTN_PIN = 26 # bouton page (GPIO <-> GND)
# =======================
# OLED
# =======================
oled = None
try:
import ssd1306
i2c = I2C(0, scl=Pin(I2C_SCL), sda=Pin(I2C_SDA), freq=400000)
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
except Exception as e:
print("OLED non detecte / erreur I2C:", e)
# =======================
# CAPTEURS
# =======================
dht_sensor = dht.DHT22(Pin(DHT_PIN))
# ✅ FIX STABILITÉ (MAIS TOUJOURS 0/1 BRUT)
# Porte: switch entre GPIO et GND => pull-up pour éviter flottement
door = Pin(DOOR_PIN, Pin.IN, Pin.PULL_UP)
# PIR: pull-down pour que repos soit 0 stable (et mouvement => 1)
pir = Pin(PIR_PIN, Pin.IN, Pin.PULL_DOWN)
# bouton page
page_btn = Pin(PAGE_BTN_PIN, Pin.IN, Pin.PULL_UP)
def adc_setup(pin_num):
adc = ADC(Pin(pin_num))
adc.atten(ADC.ATTN_11DB)
adc.width(ADC.WIDTH_12BIT)
return adc
adc_ldr = adc_setup(LDR_ADC_PIN)
adc_noise = adc_setup(NOISE_ADC_PIN)
adc_gas = adc_setup(GAS_ADC_PIN)
# =======================
# UTILS
# =======================
def pct_from_adc(v):
return int((v * 100) / 4095)
def gas_level(gas_pct):
if gas_pct >= 70:
return "DANGER"
if gas_pct >= 40:
return "MOYEN"
return "OK"
def draw_hline(y, step=2):
if oled is None:
return
for x in range(0, 128, step):
oled.pixel(x, y, 1)
def bar(x, y, w, pct):
if oled is None:
return
if pct < 0:
pct = 0
if pct > 100:
pct = 100
fill = int(w * (pct / 100))
for i in range(w):
oled.pixel(x + i, y, 1)
for i in range(fill):
oled.pixel(x + i, y - 1, 1)
# =======================
# OLED (2 pages)
# =======================
def header(page_idx):
oled.text("SmartClass", 0, 0)
oled.text("{}/{}".format(TEAM, SALLE), 72, 0)
draw_hline(10)
# indicateur page
x = 118
y = 12
if page_idx == 0:
oled.pixel(x, y, 1); oled.pixel(x+1, y, 1)
oled.pixel(x, y+1, 1); oled.pixel(x+1, y+1, 1)
oled.pixel(x+4, y, 1)
else:
oled.pixel(x, y, 1)
oled.pixel(x+4, y, 1); oled.pixel(x+5, y, 1)
oled.pixel(x+4, y+1, 1); oled.pixel(x+5, y+1, 1)
# ✅ PAGE 1 lisible + gaz OK/MOYEN/DANGER
def oled_page_1(data):
header(0)
y = 14
dy = 10
# 0/1 BRUT (pas de OUV/FER ici)
oled.text("Porte : {}".format(data["door"]), 0, y); y += dy
oled.text("Motion: {}".format(data["motion"]), 0, y); y += dy
oled.text("Temp : {:.1f}C".format(data["temperature"]), 0, y); y += dy
oled.text("Hum : {:.0f}%".format(data["humidite"]), 0, y); y += dy
oled.text("Gaz : {}".format(data["gas_state"]), 0, y)
def oled_page_2(data):
header(1)
oled.text("Lux {:3d}%".format(data["lux_pct"]), 0, 16)
bar(70, 22, 55, data["lux_pct"])
oled.text("Bruit{:3d}%".format(data["noise_pct"]), 0, 32)
bar(70, 38, 55, data["noise_pct"])
oled.text("Gaz {:3d}%".format(data["gas_pct"]), 0, 48)
bar(70, 54, 55, data["gas_pct"])
def oled_render(data, page_idx):
if oled is None:
return
oled.fill(0)
if page_idx == 0:
oled_page_1(data)
else:
oled_page_2(data)
oled.show()
# =======================
# WIFI / MQTT
# =======================
def wifi_connect():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
wlan.connect(WIFI_SSID, WIFI_PASS)
t0 = time.time()
while not wlan.isconnected():
time.sleep(0.2)
if time.time() - t0 > 15:
raise RuntimeError("Timeout WiFi")
return wlan
def mqtt_connect():
client_id = b"esp32-" + ubinascii.hexlify(machine.unique_id())
c = MQTTClient(client_id, MQTT_BROKER, port=MQTT_PORT, keepalive=30)
c.connect()
return c
# =======================
# LECTURE CAPTEURS (door & motion = 0/1 BRUT)
# =======================
def read_sensors():
try:
dht_sensor.measure()
t = dht_sensor.temperature()
h = dht_sensor.humidity()
except Exception:
t = -99.9
h = -1.0
lux_raw = adc_ldr.read()
noise_raw = adc_noise.read()
gas_raw = adc_gas.read()
lux_pct = pct_from_adc(lux_raw)
noise_pct = pct_from_adc(noise_raw)
gas_pct = pct_from_adc(gas_raw)
data = {
"ts": int(time.time()),
"team": TEAM,
"salle": SALLE,
# ✅ 0/1 BRUT (stable grâce aux pulls)
"door": door.value(),
"motion": pir.value(),
"temperature": t,
"humidite": h,
"lux_pct": lux_pct,
"noise_pct": noise_pct,
"gas_pct": gas_pct,
"gas_state": gas_level(gas_pct),
}
return data
def publish(mqtt, data):
mqtt.publish(TOPIC_TELEMETRY, ujson.dumps(data))
# =======================
# BOUTON PAGE (anti-rebond)
# =======================
def button_edge(btn, last_state, last_ms, debounce_ms=150):
now = time.ticks_ms()
state = btn.value()
pressed = False
if state != last_state:
if time.ticks_diff(now, last_ms) > debounce_ms:
last_ms = now
if last_state == 1 and state == 0:
pressed = True
last_state = state
return pressed, last_state, last_ms
# =======================
# MAIN
# =======================
def main():
wifi_connect()
mqtt = mqtt_connect()
page = 0
last_btn_state = page_btn.value()
last_btn_ms = time.ticks_ms()
while True:
pressed, last_btn_state, last_btn_ms = button_edge(page_btn, last_btn_state, last_btn_ms)
if pressed:
page = 1 - page
data = read_sensors()
# debug console
print("door=", data["door"], "motion=", data["motion"],
"gaz%=", data["gas_pct"], "gaz_state=", data["gas_state"])
oled_render(data, page)
try:
publish(mqtt, data)
except Exception as e:
print("MQTT error:", e)
try:
mqtt = mqtt_connect()
except:
pass
time.sleep(0.2)
main()