import machine
import dht
import time
from machine import Pin, ADC, I2C
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
# --- Pin Setup ---
dht_sensor = dht.DHT22(Pin(4))
pir = Pin(27, Pin.IN)
mq135 = ADC(Pin(34))
mq135.atten(ADC.ATTN_11DB) # Full 0-3.3V range
red_led = Pin(26, Pin.OUT)
green_led = Pin(25, Pin.OUT)
# --- LCD Setup ---
i2c = I2C(0, sda=Pin(21), scl=Pin(22), freq=400000)
lcd = I2cLcd(i2c, 0x27, 2, 16)
# --- Thresholds ---
TEMP_MAX = 26.0 # Celsius — CIBSE comfort upper limit
HUMIDITY_MAX = 65.0 # % RH
CO2_THRESHOLD = 1000 # ppm — ASHRAE standard
def read_co2_ppm(raw_adc):
"""Convert raw ADC to approximate CO2 ppm (simplified linear mapping)"""
return int((raw_adc / 4095) * 2000) + 400 # Range: 400–2400 ppm
def check_alerts(temp, humidity, co2, occupied):
"""Edge-level threshold checking — runs without cloud connectivity"""
alert = False
if temp > TEMP_MAX:
alert = True
if humidity > HUMIDITY_MAX:
alert = True
if co2 > CO2_THRESHOLD:
alert = True
return alert
# --- Main Loop ---
while True:
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
humidity = dht_sensor.humidity()
co2 = read_co2_ppm(mq135.read())
occupied = pir.value()
# Edge processing — alert decision made locally
alert = check_alerts(temp, humidity, co2, occupied)
# Actuator control
red_led.value(1 if alert else 0)
green_led.value(0 if alert else 1)
# Local display
lcd.clear()
lcd.putstr(f"T:{temp:.1f}C H:{humidity:.0f}%")
lcd.move_to(0, 1)
lcd.putstr(f"CO2:{co2}ppm {'!' if alert else 'OK'}")
# Serial output (simulates MQTT publish payload)
print(f'{{"temp":{temp},"humidity":{humidity},"co2":{co2},"occupied":{occupied},"alert":{int(alert)}}}')
except OSError as e:
print(f"Sensor error: {e}")
time.sleep(5)