import time
import network
from machine import Pin
from umqtt.simple import MQTTClient
# WiFi settings
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT settings
MQTT_BROKER = "broker.mqttdashboard.com"
CLIENT_ID = "esp32_client"
MQTT_TOPIC = "LAMP"
MQTT_USER = ""
MQTT_PASSWORD = ""
# Define MQTT callbacks
def sub_cb(topic, msg):
print("Received message on topic {topic.decode()}: {msg.decode()}")
if msg.decode().lower() == "on":
led_pin.on()
print("LED turned ON via MQTT")
elif msg.decode().lower() == "off":
led_pin.off()
print("LED turned OFF via MQTT")
def connect_to_wifi():
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
print("Connecting to Wi-Fi...")
while not wifi.isconnected():
pass
print("Connected to Wi-Fi")
# Initialize MQTT client
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.set_callback(sub_cb)
# Initialize LED and PIR motion sensor
led_pin = Pin(33, Pin.OUT)
motion_pin = Pin(14, Pin.IN)
# Blink LED function
def blink_led(times, delay=0.5):
"""Blink LED a specified number of times."""
for _ in range(times):
led_pin.on()
time.sleep(delay)
led_pin.off()
time.sleep(delay)
def main():
connect_to_wifi()
client.connect()
print("Connected to MQTT broker")
client.subscribe(MQTT_TOPIC)
print(f"Subscribed to topic: {MQTT_TOPIC}")
while True:
try:
# Check for MQTT messages
client.check_msg()
# Control LED based on motion detection
if motion_pin.value():
print("Motion detected!")
blink_led(3)
client.publish(MQTT_TOPIC, "Motion detected!")
else:
print("No motion detected.")
client.publish(MQTT_TOPIC, "No motion detected!")
# Delay
time.sleep(5)
except OSError as e:
print('Failed to read sensor or MQTT. Reconnecting...')
client.disconnect()
connect_to_wifi()
client.connect()
client.subscribe(MQTT_TOPIC)
if __name__ == '__main__':
main()