from machine import Pin, I2C
from time import sleep
import time
import ntptime
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
from umqtt.simple import MQTTClient
import network
import ussl #import ssl module
# Pin Configuration (GPIO numbers handed to Pin(...) in the setup section below)
PIR_PIN = 2 # GPIO pin for PIR motion sensor
TRIG_PIN = 4 # GPIO pin for ultrasonic sensor trigger
ECHO_PIN = 15 # GPIO pin for ultrasonic sensor echo
LED_PIN = 5 # GPIO pin for LED
# I2C Configuration for LCD
I2C_ADDR = 0x27 # I2C address for LCD
I2C_SDA = 21 # GPIO pin for I2C SDA
I2C_SCL = 22 # GPIO pin for I2C SCL
# Distance Threshold (in cm)
# main() turns the light on only when motion is detected and the measured
# distance is at or below this value.
DISTANCE_THRESHOLD = 300 # 3 meters
# WiFi Configuration
SSID = 'Wokwi-GUEST'
PASSWORD = '' # empty password is passed to wlan.connect() as-is
# MQTT Configuration
# NOTE(review): broker credentials are hard-coded in this file; consider
# loading them from a separate, untracked config module.
MQTT_BROKER = "b6971126a04640a0a67fc15d53a03070.s1.eu.hivemq.cloud"
MQTT_PORT = 8883 # used together with ssl=True in connect_to_mqtt()
MQTT_CLIENT_ID = "ESP32_SmartLighting"
MQTT_USERNAME = "papermate9" # Replace with your MQTT username
MQTT_PASSWORD = "#############" # Replace with your MQTT password
MQTT_TOPIC_PUBLISH = "smartlight/status" # main() publishes light status here
MQTT_TOPIC_SUBSCRIBE = "smartlight/control" # mqtt_callback() handles messages from here
# CA certificate handed to the TLS layer as "cadata" in connect_to_mqtt().
# The value below is a redacted placeholder and must be replaced with the
# real certificate text before certificate verification can succeed.
ca_cert = """#######################
####################################
####################################
####################################
####################################
####################################
####################################
"""
# Setup GPIO Pins (module-level objects shared by the functions below)
pir = Pin(PIR_PIN, Pin.IN) # read with pir.value() in main()
led = Pin(LED_PIN, Pin.OUT) # driven by main() and mqtt_callback()
trig = Pin(TRIG_PIN, Pin.OUT) # pulsed by measure_distance()
echo = Pin(ECHO_PIN, Pin.IN) # polled by measure_distance()
# Setup I2C and LCD
# I2C bus id 1 at 400 kHz on the SDA/SCL pins configured above.
i2c = I2C(1, scl=Pin(I2C_SCL), sda=Pin(I2C_SDA), freq=400000)
lcd = I2cLcd(i2c, I2C_ADDR, 4, 20) # 4x20 LCD
def sync_time():
    """Set the system clock from NTP, falling back to a fixed manual time.

    Calls ``ntptime.settime()``, waits two seconds and prints the resulting
    ``time.localtime()``. If any step raises, the reason is printed and
    ``set_manual_time()`` is used instead, so the caller always continues.

    main() calls this before connect_to_mqtt(); presumably so that TLS
    certificate validity checks see a plausible date (TODO confirm).
    """
    try:
        print("Syncing time with NTP...")
        ntptime.settime()
        time.sleep(2)
        print("Updated system time:", time.localtime())
    except Exception as e:
        # Report why the sync failed instead of silently discarding the error.
        print(f"NTP sync failed ({e}), setting manual time...")
        set_manual_time()
def set_manual_time():
    """Set the RTC to a fixed fallback date/time and print the result.

    Used by sync_time() when NTP synchronisation fails.
    """
    # The visible file header only does `from machine import Pin, I2C`, which
    # does not bind the name `machine`; import the module here so that
    # `machine.RTC` resolves instead of raising NameError.
    import machine

    rtc = machine.RTC()
    # RTC.datetime tuple layout:
    # (year, month, day, weekday, hours, minutes, seconds, subseconds)
    rtc.datetime((2025, 2, 1, 0, 12, 30, 0, 0))
    print("Manually set time:", time.localtime())
# Function to measure distance using ultrasonic sensor
def measure_distance(timeout_us=100000):
    """Measure distance in centimetres with the ultrasonic sensor.

    Sends a 10 us trigger pulse on ``trig`` and times how long ``echo``
    stays high.

    Args:
        timeout_us: Maximum time in microseconds to wait for the echo to
            start, and separately for it to end. Without a limit a
            disconnected or stuck echo line would block the caller forever.

    Returns:
        The distance in cm as a float, or -1 if the measurement timed out
        or raised an error.
    """
    try:
        # Trigger a pulse
        trig.value(0)
        time.sleep_us(2)
        trig.value(1)
        time.sleep_us(10)
        trig.value(0)

        # Wait for the echo line to go high. `start` is initialised up front
        # so it is always bound, even if the line is already high.
        wait_began = time.ticks_us()
        start = wait_began
        while echo.value() == 0:
            start = time.ticks_us()
            if time.ticks_diff(start, wait_began) > timeout_us:
                print("Error measuring distance: timed out waiting for echo")
                return -1

        # Time how long the echo line stays high.
        end = start
        while echo.value() == 1:
            end = time.ticks_us()
            if time.ticks_diff(end, start) > timeout_us:
                print("Error measuring distance: echo pulse too long")
                return -1

        # Calculate distance: 0.0343 cm/us (343 m/s), halved because the
        # pulse travels to the object and back.
        duration = time.ticks_diff(end, start)
        distance = (duration * 0.0343) / 2
        return distance
    except Exception as e:
        print(f"Error measuring distance: {e}")
        return -1  # Return -1 to indicate an error
# MQTT Callback
def mqtt_callback(topic, msg):
    """Handle a message received on a subscribed MQTT topic.

    Prints the decoded topic and payload. The payloads ``b"turn on"`` and
    ``b"turn off"`` switch the LED and write its state on LCD row 3; any
    other payload is only printed.
    """
    print(f"Received message on {topic.decode()}: {msg.decode()}")

    # Map the command to an LED level and the text shown on the display.
    if msg == b"turn on":
        level, label = 1, "LED: ON "
    elif msg == b"turn off":
        level, label = 0, "LED: OFF"
    else:
        return  # unrecognised command: nothing to change

    led.value(level)
    lcd.move_to(0, 3)
    lcd.putstr(label)
# Connect to Wi-Fi
def connect_to_wifi():
    """Bring up the station interface and block until Wi-Fi is connected.

    Uses the module-level SSID and PASSWORD. There is no timeout: the
    function polls once per second until the interface reports a connection,
    then prints the interface configuration.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Only start a new association when the interface is not already
    # connected, so an existing connection is reused rather than requested
    # again.
    if not wlan.isconnected():
        wlan.connect(SSID, PASSWORD)
        while not wlan.isconnected():
            print("Connecting to Wi-Fi...")
            sleep(1)
    print("Connected to Wi-Fi:", wlan.ifconfig())
# Connect to MQTT Broker
def connect_to_mqtt():
    """Open a TLS connection to the MQTT broker and subscribe to the control topic.

    Builds the SSL parameters (hostname, required certificate verification,
    CA data from ``ca_cert``), creates the client, registers
    ``mqtt_callback``, connects and subscribes to MQTT_TOPIC_SUBSCRIBE.

    Returns:
        The connected MQTTClient, or None if any step raised (the error is
        printed).
    """
    try:
        # TLS options forwarded to the client as ssl_params.
        tls_options = {
            "server_hostname": MQTT_BROKER,  # Required for hostname validation
            "cert_reqs": ussl.CERT_REQUIRED,  # Require broker certificate verification
            "cadata": ca_cert,  # Use the CA certificate
        }
        broker = MQTTClient(
            client_id=MQTT_CLIENT_ID,
            server=MQTT_BROKER,
            port=MQTT_PORT,
            user=MQTT_USERNAME,
            password=MQTT_PASSWORD,
            ssl=True,
            ssl_params=tls_options,
        )
        broker.set_callback(mqtt_callback)
        broker.connect()
        broker.subscribe(MQTT_TOPIC_SUBSCRIBE)
        print(f"Connected to MQTT broker at {MQTT_BROKER} with TLS/SSL and Certificate Verification")
    except Exception as err:
        print(f"Error connecting to MQTT broker with TLS/SSL: {err}")
        return None
    else:
        return broker
# Main loop
def main():
    """Run the smart-lighting loop until interrupted.

    Connects to Wi-Fi, syncs the clock and connects to the MQTT broker
    (returning early if that fails). Then, every 0.5 s, it reads the PIR
    sensor and the ultrasonic distance, updates the LCD, switches the LED on
    when motion is detected within DISTANCE_THRESHOLD (off otherwise),
    publishes the status and polls for incoming MQTT messages.

    Errors inside one iteration are printed, shown on the LCD and followed by
    a 2 s pause; the loop then continues. KeyboardInterrupt ends the loop and
    triggers cleanup.

    NOTE(review): every iteration with a valid distance rewrites the LED
    state and clears the LCD, so an LED change made by mqtt_callback() only
    lasts until the next iteration - confirm whether that is intended.
    """
    connect_to_wifi()
    sync_time()
    client = connect_to_mqtt()
    if not client:
        print("Failed to connect to MQTT broker. Exiting...")
        return
    try:
        lcd.putstr("Smart Lighting\nInitializing...")
        sleep(2)
        lcd.clear()
        lcd.putstr("Waiting for motion...")
        while True:
            try:
                motion_detected = pir.value()  # Read PIR sensor
                distance = measure_distance()  # Measure distance
                lcd.clear()
                lcd.putstr(f"Motion: {'Yes' if motion_detected else 'No'}")
                if distance >= 0:  # Ensure distance measurement is valid
                    lcd.move_to(0, 1)
                    lcd.putstr(f"Distance: {distance:.1f} cm")
                    if motion_detected and distance <= DISTANCE_THRESHOLD:
                        # Turn on the light and publish status
                        led.value(1)
                        lcd.move_to(0, 2)
                        lcd.putstr("Light: ON ")
                        client.publish(MQTT_TOPIC_PUBLISH, "Motion detected, light ON")
                    else:
                        # Turn off the light and publish status
                        led.value(0)
                        lcd.move_to(0, 2)
                        lcd.putstr("Light: OFF")
                        client.publish(MQTT_TOPIC_PUBLISH, "No motion, light OFF")
                # Check for MQTT messages
                client.check_msg()
                sleep(0.5)
            except Exception as loop_error:
                print(f"Error in loop: {loop_error}")
                lcd.clear()
                lcd.putstr("Error occurred")
                sleep(2)
    except KeyboardInterrupt:
        print("Program interrupted by user.")
    finally:
        print("Cleaning up...")
        # Switch the LED off first so it cannot be left on if a later
        # cleanup step fails.
        led.value(0)
        # The remaining steps are best-effort: the LCD bus or the broker
        # connection may already be broken, and a failure here should not
        # mask the reason the loop ended.
        try:
            lcd.clear()
            lcd.putstr("System stopped")
        except Exception as lcd_error:
            print(f"Error updating LCD during cleanup: {lcd_error}")
        try:
            client.disconnect()
        except Exception as mqtt_error:
            print(f"Error disconnecting from MQTT broker: {mqtt_error}")
# Run the program: main() is called unconditionally whenever this file is
# executed or imported.
main()