import time
from umqttsimple import MQTTClient
import ubinascii
import machine
import micropython
import network
import esp
from machine import Pin, I2C
import dht
import ssd1306
# Disable debug logs
esp.osdebug(None)
# Enable garbage collection
import gc
gc.collect()
# Wi-Fi credentials
ssid = 'Wokwi-GUEST'
password = ''
# MQTT broker settings
mqtt_server = 'broker.hivemq.com'
mqtt_port = 1883 # Default MQTT port
mqtt_user = None # Add username if needed
mqtt_password = None # Add password if needed
client_id = ubinascii.hexlify(machine.unique_id())
topic_pub_temp = b'esp/dht/temperature'
topic_pub_hum = b'esp/dht/humidity'
topic_pub_status = b'esp/dht/status' # New topic for status
last_message = 0
message_interval = 5 # Send data every 5 seconds
# Temperature thresholds
COLD_THRESHOLD = 20 # Temperature below this is considered cold
HOT_THRESHOLD = 30 # Temperature above this is considered hot
# LED pins
led_cold = Pin(14, Pin.OUT) # Blue LED
led_normal = Pin(12, Pin.OUT) # Green LED
led_hot = Pin(13, Pin.OUT) # Red LED
# Initialize all LEDs to off
led_cold.off()
led_normal.off()
led_hot.off()
# Wi-Fi connection
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)
print("Connecting to Wi-Fi...")
while not station.isconnected():
time.sleep(1)
print('Connection successful')
print(station.ifconfig())
# DHT sensor setup (GPIO 4)
sensor = dht.DHT22(Pin(4))
# SSD1306 OLED setup (I2C)
i2c = I2C(scl=Pin(22), sda=Pin(21))
oled_width = 128
oled_height = 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
# Connect to MQTT broker
def connect_mqtt():
global client_id, mqtt_server, mqtt_port, mqtt_user, mqtt_password
try:
client = MQTTClient(client_id, mqtt_server, port=mqtt_port, user=mqtt_user, password=mqtt_password)
client.connect()
print('Connected to %s MQTT broker' % (mqtt_server))
return client
except Exception as e:
print(f"Failed to connect to MQTT broker: {e}")
raise e
# Reconnect on failure
def restart_and_reconnect():
print('Failed to connect to MQTT broker. Reconnecting...')
time.sleep(10)
machine.reset()
# Read DHT22 sensor
def read_sensor():
try:
sensor.measure()
temp = sensor.temperature()
hum = sensor.humidity()
if (isinstance(temp, (float, int)) and isinstance(hum, (float, int))):
return temp, hum
else:
print("Invalid sensor readings.")
return None, None
except OSError as e:
print('Failed to read sensor:', e)
return None, None
# Update OLED display and LEDs, and return status
def update_display_and_leds(temp, hum):
oled.fill(0) # Clear the display
oled.text(f"Temp: {temp:.1f}C", 0, 0)
oled.text(f"Hum: {hum:.1f}%", 0, 10)
if temp < COLD_THRESHOLD:
status = "Cold"
oled.text(f"Status: {status}", 0, 20)
led_cold.on()
led_normal.off()
led_hot.off()
elif temp > HOT_THRESHOLD:
status = "Hot"
oled.text(f"Status: {status}", 0, 20)
led_cold.off()
led_normal.off()
led_hot.on()
else:
status = "Normal"
oled.text(f"Status: {status}", 0, 20)
led_cold.off()
led_normal.on()
led_hot.off()
oled.show()
return status
try:
client = connect_mqtt()
except Exception as e:
restart_and_reconnect()
# Main loop
while True:
try:
if (time.time() - last_message) > message_interval:
temp, hum = read_sensor()
if temp is not None and hum is not None:
print(f"Temperature: {temp:.1f}C, Humidity: {hum:.1f}%")
# Publish temperature and humidity
client.publish(topic_pub_temp, b'{:.1f}'.format(temp))
client.publish(topic_pub_hum, b'{:.1f}'.format(hum))
# Update display and LEDs, and determine status
status = update_display_and_leds(temp, hum)
# Publish status
client.publish(topic_pub_status, status.encode('utf-8'))
print(f"Status: {status}")
last_message = time.time()
except OSError as e:
print(f"Error: {e}")
restart_and_reconnect()