import network
import time
import ubinascii
import machine
from machine import Pin, PWM, ADC
import dht
from umqttsimple import MQTTClient
import ujson
# Wi-Fi credentials for the Wokwi simulated network (open network, empty password)
SSID = 'Wokwi-GUEST'
PASSWORD = ''
# HiveMQ Cloud MQTT broker details (SSL/TLS enabled for secure communication)
# NOTE(review): the broker user name and password are hard-coded in this file;
# consider loading them from a separate, untracked configuration module.
MQTT_BROKER = 'b895136301624cc185f3f43c626c0890.s1.eu.hivemq.cloud'
MQTT_PORT = 8883 # TLS port (for secure SSL communication)
MQTT_USER = 'Oshani'
MQTT_PASSWORD = 'PoiuLkj@1234'
# MQTT client ID (hex string of the board's unique ID) and topics.
# TOPIC_PUB carries both the door-status and the sensor-data messages;
# TOPIC_SUB is the topic on which commands are received.
CLIENT_ID = ubinascii.hexlify(machine.unique_id()).decode()
TOPIC_PUB = 'garage/door/status'
TOPIC_SUB = 'garage/door/command'
# Pin setup for HC-SR04 ultrasonic sensor (for distance measurement):
# TRIG is driven by the board, ECHO is read back
TRIG = Pin(5, Pin.OUT)
ECHO = Pin(18, Pin.IN)
# DHT22 sensor for temperature and humidity monitoring
dht22 = dht.DHT22(Pin(15))
# Servo motor to control the garage door, driven by a 50 Hz PWM signal
servo = PWM(Pin(16))
servo.freq(50)
# Potentiometer setup (for simulating atmospheric pressure)
potentiometer = ADC(Pin(34)) # GPIO 34 as an ADC pin
potentiometer.atten(ADC.ATTN_11DB) # Set attenuation for full 0-3.3V range
# Constants for servo positions (garage door open and closed angles),
# used by open_garage_door() and close_garage_door()
SERVO_OPEN_ANGLE = 130 # Open position
SERVO_CLOSE_ANGLE = 60 # Closed position
# Variables to store previous sensor data and door state
prev_data = None # Last sensor JSON string that was published (None until the first publish)
door_state = None # 'open' or 'closed' once the door has been moved; None before that
# SSL/TLS parameters for secure MQTT communication (server_hostname is the broker host name)
ssl_params = {'server_hostname': MQTT_BROKER}
# Shared MQTT client used by every publish/subscribe call in this file
client = MQTTClient(CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD, port=MQTT_PORT, ssl=True, ssl_params=ssl_params)
# ------------------------------
# Function: Connect to Wi-Fi
# Description: Connects the ESP32 to the specified Wi-Fi network.
# ------------------------------
def connect_wifi(max_attempts=10, retry_delay_s=2):
    """Connect the station interface to the configured Wi-Fi network.

    Args:
        max_attempts: Number of times the link state is polled before giving up.
        retry_delay_s: Seconds to wait between two polls.

    Returns:
        True when the interface reports a connection, False otherwise.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Only start a new association when the interface is not already
    # connected (e.g. after a soft reset that kept the link up).
    if not wlan.isconnected():
        wlan.connect(SSID, PASSWORD)
    for attempt in range(max_attempts):
        if wlan.isconnected():
            break
        print(f"Attempting to connect to Wi-Fi... {attempt+1}")
        time.sleep(retry_delay_s)
    # Check once more after the loop so a link that came up during the
    # final wait is not reported as a failure.
    if wlan.isconnected():
        print(f"Connected to Wi-Fi! IP address: {wlan.ifconfig()}")
        return True
    print("Failed to connect to Wi-Fi after several attempts")
    return False
# ------------------------------
# Function: MQTT Callback
# Description: Handles incoming MQTT messages for controlling the garage door.
# Receives commands to open or close the door.
# ------------------------------
def mqtt_callback(topic, msg):
    """Handle a message received on a subscribed MQTT topic.

    Args:
        topic: Topic name as bytes.
        msg: Payload as bytes; b"open" opens the door, b"close" closes it,
            anything else is only logged.
    """
    print(f"Received message on topic {topic.decode()}: {msg.decode()}")
    # Look the payload up in a command table instead of an if/elif chain
    commands = {
        b"open": open_garage_door,
        b"close": close_garage_door,
    }
    action = commands.get(msg)
    if action is not None:
        action()
    # NOTE(review): the source indentation was lost; this pause is taken to
    # apply after every message, not only after "close" - confirm.
    time.sleep(20)
# ------------------------------
# Function: Connect to MQTT Broker
# Description: Connects to the HiveMQ Cloud MQTT broker using SSL/TLS.
# ------------------------------
def connect_mqtt():
    """Open the MQTT session and subscribe to the command topic.

    Registers mqtt_callback as the message handler, connects the shared
    client and subscribes to TOPIC_SUB. Any failure is printed to the
    console and not re-raised.
    """
    try:
        print("Connecting to MQTT Broker (SSL)...")
        # The handler must be registered before messages can be dispatched
        client.set_callback(mqtt_callback)
        client.connect()
        client.subscribe(TOPIC_SUB)
        print(f"Subscribed to {TOPIC_SUB}")
    except Exception as err:
        print(f"Error connecting to MQTT Broker: {err}")
# ------------------------------
# Function: Measure Distance using HC-SR04
# Description: Measures the distance to an object using the ultrasonic sensor.
# The garage door opens if a vehicle is detected within a certain range.
# ------------------------------
def measure_distance(timeout_us=30000):
    """Measure the distance to the nearest object with the HC-SR04.

    Sends a 10 us trigger pulse on TRIG and times the high pulse that
    follows on ECHO.

    Args:
        timeout_us: Maximum time, in microseconds, to wait for the echo
            pulse to start and, separately, for it to end.

    Returns:
        The distance in centimetres, or float("inf") when no complete echo
        pulse was seen within the timeout, so that a caller's
        ``distance < threshold`` test is False.
    """
    # Send a pulse to trigger the ultrasonic sensor
    TRIG.value(0)
    time.sleep_us(2)
    TRIG.value(1)
    time.sleep_us(10)
    TRIG.value(0)
    # Time the echo pulse with a bounded wait; the previous busy-wait loops
    # never returned if ECHO stayed low (or high) indefinitely.
    try:
        pulse_us = machine.time_pulse_us(ECHO, 1, timeout_us)
    except OSError:
        # Some firmware versions raise on timeout instead of returning < 0
        return float("inf")
    if pulse_us < 0:
        return float("inf")
    # Sound travels about 0.0343 cm/us; halve for the round trip
    return (pulse_us * 0.0343) / 2
# ------------------------------
# Function: Get Simulated Pressure
# Description: Simulates atmospheric pressure using a potentiometer.
# Maps the ADC value (0-4095) to a range of 950 hPa to 1050 hPa.
# ------------------------------
def get_pressure(raw_value=None, min_hpa=950, max_hpa=1050, adc_max=4095):
    """Return a simulated atmospheric pressure in hPa.

    Args:
        raw_value: ADC reading to convert. When None (the default), the
            potentiometer is read.
        min_hpa: Pressure reported for a raw value of 0.
        max_hpa: Pressure reported for a raw value of ``adc_max``.
        adc_max: Largest raw ADC value (full scale).

    Returns:
        The raw value mapped linearly onto [min_hpa, max_hpa].
    """
    if raw_value is None:
        raw_value = potentiometer.read()  # Raw potentiometer value (0-4095)
    # Clamp so an out-of-range input cannot yield a pressure outside the range
    raw_value = min(max(raw_value, 0), adc_max)
    return raw_value * (max_hpa - min_hpa) / adc_max + min_hpa
# ------------------------------
# Function: Publish Door Status
# Description: Publishes the current door status (open or closed) to the MQTT broker.
# ------------------------------
def publish_door_status(status):
    """Publish the door status as a JSON object on TOPIC_PUB.

    Args:
        status: Value sent under the "door_status" key.

    Errors raised while encoding or publishing are printed, not re-raised.
    """
    try:
        payload = ujson.dumps({"door_status": status})
        client.publish(TOPIC_PUB, payload)
        print(f"Published door status: {status}")
    except Exception as err:
        print(f"Error publishing door status: {err}")
# ------------------------------
# Function: Open Garage Door
# Description: Controls the servo motor to open the garage door.
# The door state is published to the MQTT broker when the door is opened.
# ------------------------------
def angle_to_duty_u16(angle, min_pulse_us=544, max_pulse_us=2400, period_us=20000):
    """Convert a servo angle in degrees to a 16-bit PWM duty value.

    A hobby servo is positioned by the width of the pulse, not by a duty
    cycle proportional to the angle, so the angle is first mapped onto a
    pulse width and then expressed as a fraction of the PWM period.

    Args:
        angle: Target angle; values outside 0-180 are clamped.
        min_pulse_us: Pulse width for 0 degrees.
        max_pulse_us: Pulse width for 180 degrees.
        period_us: PWM period; 20000 us matches the 50 Hz servo frequency.

    Returns:
        The integer duty value to pass to ``PWM.duty_u16``.

    NOTE(review): the default pulse range follows the Arduino Servo library
    convention - confirm against the servo actually used.
    """
    angle = min(max(angle, 0), 180)
    pulse_us = min_pulse_us + (max_pulse_us - min_pulse_us) * angle / 180
    return int(pulse_us * 65535 / period_us)


def open_garage_door():
    """Move the servo to the open position and publish the new state.

    Does nothing when the door is already recorded as open.
    """
    global door_state
    if door_state != 'open':
        servo.duty_u16(angle_to_duty_u16(SERVO_OPEN_ANGLE))
        door_state = 'open'
        print("Garage door opened")
        publish_door_status(door_state)
# ------------------------------
# Function: Close Garage Door
# Description: Controls the servo motor to close the garage door.
# The door state is published to the MQTT broker when the door is closed.
# ------------------------------
def close_garage_door():
    """Move the servo to the closed position and publish the new state.

    Does nothing when the door is already recorded as closed.
    """
    global door_state
    if door_state != 'closed':
        servo.duty_u16(angle_to_duty_u16(SERVO_CLOSE_ANGLE))
        door_state = 'closed'
        print("Garage door closed")
        publish_door_status(door_state)
# ------------------------------
# Function: Publish Sensor Data
# Description: Publishes temperature, humidity, and simulated pressure data to the MQTT broker.
# A delay is added after dht22.measure() to ensure stable readings.
# ------------------------------
def publish_sensor_data():
    """Read temperature, humidity and simulated pressure and publish them.

    The readings are encoded as a JSON object with the keys "temp",
    "humidity" and "pressure" and sent to TOPIC_PUB, but only when the
    encoded text differs from the last one published. Errors are printed
    and not re-raised.
    """
    global prev_data
    try:
        dht22.measure()
        time.sleep(2)  # Pause between triggering the measurement and reading it
        reading = {
            "temp": dht22.temperature(),
            "humidity": dht22.humidity(),
            "pressure": get_pressure(),
        }
        payload = ujson.dumps(reading)
        # Nothing to do when the payload is unchanged since the last publish
        if payload == prev_data:
            return
        client.publish(TOPIC_PUB, payload)
        prev_data = payload
        print(f"Published sensor data: {payload}")
    except Exception as err:
        print(f"Error publishing sensor data: {err}")
# ------------------------------
# Function: Main Program Loop
# Description: Runs the main logic of the system, including Wi-Fi/MQTT connection,
# distance measurement, and door control based on vehicle detection.
# ------------------------------
def main(open_threshold_cm=50, loop_delay_s=5, error_delay_s=5):
    """Run the garage-door controller; this function does not return.

    Connects to Wi-Fi and the MQTT broker, then repeatedly processes
    incoming commands, opens or closes the door from the measured distance
    and publishes the sensor readings.

    Args:
        open_threshold_cm: The door is opened when the measured distance is
            below this value and closed otherwise.
        loop_delay_s: Pause between two normal iterations.
        error_delay_s: Pause after a failed iteration before reconnecting.
    """
    connect_wifi()  # Connect to Wi-Fi
    connect_mqtt()  # Connect to MQTT broker
    while True:
        try:
            client.check_msg()  # Check for incoming MQTT messages
            client.ping()  # Keep MQTT connection alive
            # Measure distance and control the garage door based on the proximity of a vehicle
            distance = measure_distance()
            if distance < open_threshold_cm:  # Vehicle within the threshold distance
                open_garage_door()
            else:
                close_garage_door()
            # Publish sensor data for temperature, humidity, and pressure
            publish_sensor_data()
            time.sleep(loop_delay_s)  # Pause between iterations to reduce system load
        except Exception as e:
            print(f"Error in main loop: {e}")
            # Back off instead of retrying immediately in a tight loop, then
            # rebuild the broker session (exceptions reaching this point
            # come mostly from the MQTT client calls above).
            time.sleep(error_delay_s)
            try:
                client.disconnect()
            except Exception:
                # Best effort: the old socket may be missing or already dead
                pass
            connect_mqtt()
# Start the program: main() sets up Wi-Fi/MQTT and then loops forever
main()