from machine import Pin, time_pulse_us
from utime import sleep
import ujson
from umqtt.simple import MQTTClient
import network
import time
# --- MQTT settings ---
# Broker host and the topic the readings are published to.
MQTT_BROKER = "broker.emqx.io"
MQTT_TOPIC = "/adhybobo/test/data"
# Identifier this device presents to the broker.
MQTT_CLIENT_ID = "coba123421234"
# Both credentials are empty strings.
MQTT_USER = MQTT_PASSWORD = ""
# WiFi connection: bring up the station interface and block until it is
# associated, printing a progress dot every 100 ms while waiting.
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while True:
    if sta_if.isconnected():
        break
    print(".", end="")
    time.sleep(0.1)
print(" Connected!")
# The first element of ifconfig() is printed as the IP address.
print("IP Address:", sta_if.ifconfig()[0])
# MQTT connection: build a client and keep retrying until connect() succeeds.
while True:
    try:
        print("Connecting to MQTT server... ", end="")
        client = MQTTClient(
            MQTT_CLIENT_ID,
            MQTT_BROKER,
            user=MQTT_USER,
            password=MQTT_PASSWORD,
        )
        client.connect()
        print("Connected!")
    except Exception as e:
        print(f"Failed to connect to MQTT server: {e}")
        time.sleep(5)  # Retry every 5 seconds
    else:
        # Only reached when nothing above raised: the client is ready to use.
        break
# Ultrasonic sensor wiring. The pin numbers should be adjusted to match the
# Wokwi simulation.
TRIG_PIN, ECHO_PIN = 5, 18
trig = Pin(TRIG_PIN, Pin.OUT)  # output: driven by measure_distance()
echo = Pin(ECHO_PIN, Pin.IN)   # input: timed with time_pulse_us()
def measure_distance():
    """Take one ultrasonic reading and return the distance in centimetres.

    Pulses the module-level ``trig`` pin and times the resulting high pulse
    on the module-level ``echo`` pin, waiting at most 30 ms (30000 us).

    Returns:
        The distance in cm as a float, or ``None`` when no complete echo
        pulse was seen within the timeout.
    """
    # Start from a low level so the trigger pulse has a clean rising edge.
    trig.value(0)
    time.sleep_us(2)
    # 10 us high pulse on the trigger pin (presumably the HC-SR04-style
    # start-of-measurement signal; confirm against the sensor in use).
    trig.value(1)
    time.sleep_us(10)
    trig.value(0)

    duration = time_pulse_us(echo, 1, 30000)
    # time_pulse_us() signals a timeout with a negative value: -2 when the
    # pulse never started and -1 when it never ended. Both mean "no reading";
    # checking only -1 would turn -2 into a negative distance.
    if duration < 0:
        return None

    # 0.0343 cm/us is the speed of sound; the echo time covers the trip to
    # the object and back, hence the division by two.
    return (duration * 0.0343) / 2
# LED setup
led = Pin(15, Pin.OUT)

# Main loop: measure the distance, publish it as JSON over MQTT, then blink
# the LED once (0.5 s on, 0.5 s off) before the next reading.
while True:
    distance = measure_distance()
    if distance is None:
        print("No object detected")
        # Pause before re-triggering. Without this the loop skips every
        # delay below, hammering the sensor and flooding the console.
        time.sleep(0.1)
        continue

    print(f"Distance: {distance:.2f} cm")

    # Prepare MQTT message
    message = ujson.dumps({"distance": distance})

    # Publish MQTT message
    try:
        client.publish(MQTT_TOPIC, message)
        print("Message published")
    except Exception as e:
        print(f"Failed to publish message: {e}")
        # Best-effort reconnect so a dropped connection does not make every
        # later publish fail; a failure here is reported and retried on the
        # next failed publish.
        try:
            client.connect()
            print("Reconnected to MQTT server")
        except Exception as reconnect_error:
            print(f"Failed to reconnect to MQTT server: {reconnect_error}")

    # Blink LED
    led.on()
    time.sleep(0.5)
    led.off()
    time.sleep(0.5)