import network
import time
import ujson
from machine import Pin, time_pulse_us
from umqtt.simple import MQTTClient
# Wi-Fi and MQTT broker settings.
# NOTE(review): network name, broker host and credentials are hard-coded here;
# confirm they are meant for a simulator/public test broker only.
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""  # empty password string is passed to wlan.connect()
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
# Topic that main() publishes the JSON distance readings to.
MQTT_TOPIC_DISTANCE = "/UNI559/alex/data_sensor"
# Topic that connect_mqtt() subscribes to; on_message() reads LED commands from it.
MQTT_TOPIC_LED = "/UNI559/alex/aktuasi_led"
MQTT_USERNAME = "emqx"
MQTT_PASSWORD = "public"
# Pin configuration (created once at import time and shared by all functions).
LED_PIN = Pin(23, Pin.OUT)  # Output switched on/off by on_message()
TRIGGER_PIN = Pin(5, Pin.OUT) # Trigger pin for ultrasonic sensor
ECHO_PIN = Pin(18, Pin.IN) # Echo pin for ultrasonic sensor
def connect_wifi(timeout_s=None):
    """Connect the station interface to the configured WiFi network.

    Activates the STA interface and, unless it is already connected, starts a
    connection to WIFI_SSID and polls every 0.5 s until the link is up. The
    assigned IP address is printed in both cases.

    Args:
        timeout_s: Maximum number of seconds to wait for the connection.
            ``None`` (the default) waits indefinitely, which is the original
            behaviour.

    Raises:
        OSError: If ``timeout_s`` is given and elapses before the interface
            reports that it is connected.
    """
    poll_interval_s = 0.5
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if wlan.isconnected():
        print("WiFi already connected.")
    else:
        print("Connecting to WiFi...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        waited_s = 0
        while not wlan.isconnected():
            # Give callers a way out instead of blocking forever when the
            # access point is unreachable.
            if timeout_s is not None and waited_s >= timeout_s:
                raise OSError("WiFi connection timed out")
            print(".", end="")
            time.sleep(poll_interval_s)
            waited_s += poll_interval_s
        print("\nWiFi connected!")
    # First element of ifconfig() is the interface's IP address.
    print("IP Address:", wlan.ifconfig()[0])
def connect_mqtt():
    """Create an MQTT client, connect it and subscribe to the LED topic.

    The client is built from the module-level broker settings, on_message is
    registered as its callback, and MQTT_TOPIC_LED is subscribed.

    Returns:
        The connected MQTTClient instance.

    Raises:
        Exception: Whatever the client raised while registering the callback,
            connecting or subscribing; it is logged and then re-raised.
    """
    print("Connecting to MQTT broker...")
    mqtt = MQTTClient(
        "WokwiESP32Client",
        MQTT_BROKER,
        MQTT_PORT,
        MQTT_USERNAME,
        MQTT_PASSWORD,
    )
    try:
        # The callback must be set before subscribing so that incoming
        # messages have a handler.
        mqtt.set_callback(on_message)
        mqtt.connect()
        mqtt.subscribe(MQTT_TOPIC_LED)
    except Exception as err:
        print("Error connecting to MQTT broker:", str(err))
        raise
    else:
        print("Connected to MQTT broker.")
        print(f"Subscribed to topic: {MQTT_TOPIC_LED}")
    return mqtt
def measure_distance(timeout_us=30000):
    """Measure distance using the ultrasonic sensor.

    Sends a 10 us pulse on TRIGGER_PIN, then times the following high pulse
    on ECHO_PIN and converts the round-trip time to centimetres.

    Args:
        timeout_us: Timeout in microseconds passed to ``time_pulse_us``
            (default 30000, i.e. 30 ms).

    Returns:
        The distance in centimetres rounded to two decimals, or ``None`` when
        no valid echo was measured (timeout or error).
    """
    us_per_cm = 29.1  # divisor used to convert one-way travel time (us) to cm

    # Start from a known low level, then emit the 10 us trigger pulse.
    TRIGGER_PIN.off()
    time.sleep_us(2)
    TRIGGER_PIN.on()
    time.sleep_us(10)
    TRIGGER_PIN.off()

    try:
        duration = time_pulse_us(ECHO_PIN, 1, timeout_us)
    except Exception as e:
        print("Error measuring distance:", str(e))
        return None

    # time_pulse_us reports a timeout by returning a negative value (-2 while
    # waiting for the pulse to start, -1 while timing it) rather than raising.
    # Without this check a timeout would be converted into a bogus negative
    # distance and published.
    if duration < 0:
        print("Error measuring distance: echo timed out")
        return None

    # Halve the duration because the sound travels to the object and back.
    return round((duration / 2) / us_per_cm, 2)
def on_message(topic, msg):
    """MQTT callback: log the message and apply LED commands.

    Messages on MQTT_TOPIC_LED are parsed as JSON; the value under the "msg"
    key (case-insensitive) switches LED_PIN on for "on" and off for "off".
    Any other value is reported as unknown. Messages on other topics are only
    logged. All errors are caught and printed so the callback never raises.

    Args:
        topic: Topic the message arrived on, as bytes.
        msg: Raw message payload, as bytes.
    """
    try:
        text = msg.decode()
        topic_name = topic.decode()
        print(f"Received message: {text} on topic: {topic_name}")

        # Only the LED control topic carries commands.
        if topic_name != MQTT_TOPIC_LED:
            return

        command = ujson.loads(msg).get("msg", "").lower()
        if command not in ("on", "off"):
            print("Unknown command received for LED.")
        elif command == "on":
            LED_PIN.on()
            print("LED turned ON")
        else:
            LED_PIN.off()
            print("LED turned OFF")
    except Exception as err:
        print("Error processing message:", str(err))
def _reconnect(client):
    """Replace a failed MQTT client; return the client to use from now on."""
    try:
        # Best effort: release the old socket. It is usually already broken,
        # so a failure here is expected and only logged.
        client.disconnect()
    except Exception as e:
        print("Ignoring error while closing old MQTT connection:", str(e))
    try:
        connect_wifi()
        return connect_mqtt()
    except Exception as e:
        # Keep the old client; the next failing iteration triggers a retry.
        print("Reconnect failed, will retry:", str(e))
        return client


def main():
    """Main loop to measure distance and check for MQTT messages.

    Connects to WiFi and the MQTT broker, then repeats forever: measure the
    distance, publish it as JSON ({"distance": <cm>}) to MQTT_TOPIC_DISTANCE
    when a reading is available, poll once for an incoming message, and sleep
    for two seconds.

    If an iteration fails, the error is printed and, after the usual delay,
    the WiFi/MQTT connection is re-established so the loop can recover instead
    of failing on the same dead connection forever.
    """
    loop_delay_s = 2
    connect_wifi()
    client = connect_mqtt()
    while True:
        try:
            # Measure distance
            distance = measure_distance()
            if distance is not None:
                payload = ujson.dumps({"distance": distance})
                client.publish(MQTT_TOPIC_DISTANCE, payload)
                print("Distance sent to MQTT broker:", payload)
            # Check for incoming MQTT messages
            client.check_msg()
            time.sleep(loop_delay_s)
        except Exception as e:
            print("Error in main loop:", str(e))
            time.sleep(loop_delay_s)
            # The client state is unknown after a failure, so start fresh.
            client = _reconnect(client)
# Start the application only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()