from machine import Pin, time_pulse_us
import time
from umqtt.simple import MQTTClient
import network
# Pin definitions
trig_pin = Pin(15, Pin.OUT) # Trig connected to GPIO15
echo_pin = Pin(4, Pin.IN) # Echo connected to GPIO4
led_pin = Pin(2, Pin.OUT) # LED connected to GPIO2
# MQTT configurations
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "sensor_hyd62nd63is"
MQTT_TOPIC = "NganjogSpirit/AuliaNur/sensor_data"
MQTT_CONTROL_TOPIC = "NganjogSpirit/AuliaNur/control_led" # Topic to control LED
# WiFi configurations
SSID = "Wokwi-GUEST"
PASSWORD = ""
# Distance threshold in centimeters
DISTANCE_THRESHOLD = 20
RECONNECT_DELAY = 5 # Delay between reconnection attempts in seconds
def connect_wifi():
"""Connect to WiFi network"""
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
if not wifi.isconnected():
print("Connecting to WiFi...")
wifi.connect(SSID, PASSWORD)
# Wait for connection with timeout
max_wait = 10
while max_wait > 0 and not wifi.isconnected():
max_wait -= 1
print(".", end="")
time.sleep(1)
if wifi.isconnected():
print("\nConnected to WiFi:", wifi.ifconfig())
return True
else:
print("\nFailed to connect to WiFi")
return False
def connect_mqtt():
"""Connect to MQTT broker with retry mechanism"""
while True:
try:
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT, keepalive=60)
client.connect()
print("Connected to MQTT Broker!")
return client
except Exception as e:
print("Failed to connect to MQTT broker:", e)
print(f"Retrying in {RECONNECT_DELAY} seconds...")
time.sleep(RECONNECT_DELAY)
def measure_distance():
"""Measure distance using ultrasonic sensor"""
try:
# Send a 10-microsecond pulse to the trig pin
trig_pin.off()
time.sleep_us(2)
trig_pin.on()
time.sleep_us(10)
trig_pin.off()
# Measure the duration of the echo pulse
duration = time_pulse_us(echo_pin, 1, 30000) # Reduced timeout to 30ms
if duration < 0:
return None
# Calculate distance in cm (duration * speed of sound / 2)
distance = (duration * 0.0343) / 2
return distance
except Exception as e:
print("Error measuring distance:", e)
return None
def handle_control_message(topic, msg):
"""Handle the received control message for LED"""
if topic == MQTT_CONTROL_TOPIC:
if msg == b"lampu_nyala":
led_pin.on()
print("LED ON")
elif msg == b"lampu_mati":
led_pin.off()
print("LED OFF")
def main():
# Initial connections
if not connect_wifi():
raise SystemExit
mqtt_client = connect_mqtt()
# Subscribe to the control topic to control LED
mqtt_client.subscribe(MQTT_CONTROL_TOPIC)
print(f"Subscribed to topic: {MQTT_CONTROL_TOPIC}")
while True:
try:
# Check for new messages
mqtt_client.check_msg() # This will call handle_control_message when a message is received
distance = measure_distance()
if distance is None:
print("No echo received!")
payload = '{"distance": "None"}'
else:
print(f"Distance: {distance:.2f} cm")
payload = f'{{"distance": {distance:.2f}}}'
# Turn LED on/off based on distance
if distance <= DISTANCE_THRESHOLD:
led_pin.on()
print("LED ON")
else:
led_pin.off()
print("LED OFF")
# Publish distance data to MQTT broker
try:
# Check if MQTT client is connected
mqtt_client.ping()
mqtt_client.publish(MQTT_TOPIC, payload)
print("Published to MQTT:", payload)
except Exception as e:
print("MQTT publish failed:", e)
print("Reconnecting to MQTT...")
mqtt_client = connect_mqtt() # Reconnect to MQTT
except Exception as e:
print("Main loop error:", e)
time.sleep(0.5)
# Start the program
try:
main()
except KeyboardInterrupt:
print("\nProgram terminated by user")
except Exception as e:
print("Fatal error:", e)
finally:
led_pin.off() # Ensure LED is off when program exits