"""
ESP32 IoT Project with MQTT
- publishes data from 2 DHT22 sensors and an ultrasonic sensor
- controls 3 LEDs (orange, pink, blue) via MQTT subscription
- uses a public MQTT broker for communication (set in MQTT['broker'], currently broker.hivemq.com)
- designed for Wokwi ESP32 simulation
created by: bhagaskara rafael w/ ❤
"""
from machine import Pin, Timer, time_pulse_us
import network
import time
import dht
import ujson
from umqtt.simple import MQTTClient
# pin numbers handed to machine.Pin for every sensor and led
PINS = dict(
    # ultrasonic sensor
    trig=18,
    echo=19,
    # the two DHT22 sensors
    dht1=32,
    dht2=33,
    # indicator leds
    led_orange=25,
    led_pink=26,
    led_blue=27,
)
# mqtt connection settings and topic names
MQTT = dict(
    client_id="glitchhunters-esp32",
    broker="broker.hivemq.com",
    # empty credentials are passed through to MQTTClient unchanged
    user="",
    password="",
    # sensor readings are published here
    topic_sensor="/glitchhunters/bhagaskara/data_sensor",
    # led commands are received here
    topic_led="/glitchhunters/bhagaskara/aktuasi_led",
)
# build the hardware objects from the PINS table
# ultrasonic sensor: trigger pin is an output, echo pin is an input
trig, echo = Pin(PINS['trig'], Pin.OUT), Pin(PINS['echo'], Pin.IN)
# two DHT22 sensors, each on its own data pin
dht1, dht2 = (dht.DHT22(Pin(PINS[key])) for key in ('dht1', 'dht2'))
# the three leds are all plain output pins
led_orange, led_pink, led_blue = (
    Pin(PINS[key], Pin.OUT)
    for key in ('led_orange', 'led_pink', 'led_blue')
)
def connect_wifi():
    """Connect the ESP32 to the open 'Wokwi-GUEST' wifi network.

    Activates the station interface and, unless it already reports being
    connected, starts a connection and blocks until the link is up,
    printing one progress dot every 100 ms while waiting.
    """
    print("connecting to wifi", end="")
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(True)
    # only start a new association when the interface is not already up
    # (it can still be connected, e.g. after a soft reset); calling
    # connect() again on a live link is unnecessary
    if not sta_if.isconnected():
        sta_if.connect('Wokwi-GUEST', '')
        # wait until connected
        while not sta_if.isconnected():
            print(".", end="")
            time.sleep(0.1)
    print(" connected!")
def measure_distance():
    """Get a distance reading from the ultrasonic sensor.

    Sends a 10 us trigger pulse on ``trig`` and times the high pulse that
    follows on ``echo``.

    Returns:
        The distance in centimeters (echo time in microseconds / 58), or
        ``None`` when no valid echo pulse was measured.
    """
    # start from a clean low level, then emit the 10 us trigger pulse
    trig.value(0)
    time.sleep_us(2)
    trig.value(1)
    time.sleep_us(10)
    trig.value(0)
    # measure the return pulse
    duration = time_pulse_us(echo, 1)
    # time_pulse_us reports a timeout as a negative value (-1 / -2);
    # converting that would produce a meaningless negative distance
    if duration < 0:
        return None
    return duration / 58.0  # convert to centimeters
def handle_led_message(topic, msg):
    """Process an incoming mqtt message that controls the leds.

    The payload is expected to be a json object whose optional keys
    "orange", "pink" and "blue" carry the new value for the matching led.
    Keys that are missing leave their led untouched.

    Args:
        topic: topic the message arrived on, as bytes.
        msg: raw message payload, as bytes.

    Malformed payloads (invalid utf-8, invalid json, or json that is not
    an object) are reported on the console and otherwise ignored.
    """
    try:
        # decoding is inside the try so a non-utf-8 payload is handled
        # like any other malformed message
        topic = topic.decode('utf-8')
        msg = msg.decode('utf-8')
        print(f"got message on {topic}: {msg}")
        # parse the json message
        data = ujson.loads(msg)
        if not isinstance(data, dict):
            raise ValueError("payload is not a json object")
        # update led states
        for color, led in (("orange", led_orange),
                           ("pink", led_pink),
                           ("blue", led_blue)):
            if color in data:
                led.value(data[color])
    except (ValueError, TypeError):
        # a bare except here would also swallow KeyboardInterrupt
        print("couldn't process the message")
def read_sensors():
    """Get one set of readings from all sensors.

    Triggers a measurement on both DHT22 sensors, then collects their
    temperature and humidity values together with the ultrasonic distance.

    Returns:
        A dict with the keys "temp1", "humidity1", "temp2", "humidity2"
        and "distance", or ``None`` if any sensor raised an error.
    """
    try:
        dht1.measure()
        dht2.measure()
        return {
            "temp1": dht1.temperature(),
            "humidity1": dht1.humidity(),
            "temp2": dht2.temperature(),
            "humidity2": dht2.humidity(),
            "distance": measure_distance(),
        }
    except Exception:
        # Exception instead of a bare except, so KeyboardInterrupt and
        # SystemExit still stop the program while sensor errors are
        # reported and skipped
        print("sensor reading failed")
        return None
def _connect_mqtt(client):
    """Connect ``client`` to the broker and subscribe to the led topic."""
    print("connecting to mqtt broker...", end="")
    client.connect()
    print(" connected!")
    # subscribe to led control topic; done on every (re)connect because a
    # fresh connection does not carry the previous subscription over
    # NOTE(review): relies on umqtt.simple's default clean session - confirm
    client.subscribe(MQTT['topic_led'])


def main():
    """Run the program: connect, then publish readings and poll for commands.

    Connects to wifi and the mqtt broker, subscribes to the led control
    topic and then loops forever. Each pass reads the sensors, publishes
    the readings as json when they differ from the last published message,
    checks for pending led messages and sleeps for one second.

    Errors inside the loop are printed and the loop carries on. An
    ``OSError`` additionally triggers an attempt to reconnect to the
    broker, so a dropped connection does not leave the loop failing on a
    dead socket forever.
    """
    # setup wifi
    connect_wifi()
    # connect to mqtt broker
    client = MQTTClient(MQTT['client_id'], MQTT['broker'],
                        user=MQTT['user'], password=MQTT['password'])
    client.set_callback(handle_led_message)
    _connect_mqtt(client)
    # keep track of last readings to avoid duplicate publishing
    prev_data = ""
    # main loop
    while True:
        try:
            # get sensor readings
            sensor_data = read_sensors()
            if sensor_data:
                message = ujson.dumps(sensor_data)
                # publish only if readings changed
                if message != prev_data:
                    print(f"publishing: {message}")
                    client.publish(MQTT['topic_sensor'], message)
                    prev_data = message
            # check for new led control messages
            client.check_msg()
            # wait a bit before next reading
            time.sleep(1)
        except OSError as e:
            # socket-level failure from publish/check_msg: the connection
            # is most likely gone, so try to bring it back
            print(f"oops! something went wrong: {e}")
            time.sleep(1)
            try:
                _connect_mqtt(client)
            except Exception as err:
                # still unreachable; the next pass will try again
                print(f"reconnect failed: {err}")
        except Exception as e:
            print(f"oops! something went wrong: {e}")
            time.sleep(1)
# kick things off: start the main loop only when this file is run as a
# script, not when it is imported as a module
if __name__ == "__main__":
    main()