import machine
import time
from hcsr04 import HCSR04 # Assuming you saved the HC-SR04 driver code as `hcsr04.py`
from ssd1306 import SSD1306_I2C # Assuming you added SSD1306 driver
from umqtt.simple import MQTTClient
# Hardware setup (based on updated Wokwi diagram).
# Each value is the pin number handed to machine.Pin / the HC-SR04 driver
# below; the "Dxx" labels are the names used in the wiring diagram.
trigger_pin = 4 # Ultrasonic Trigger (D4)
echo_pin = 15 # Ultrasonic Echo (D15)
oled_scl = 22 # OLED SCL (D22)
oled_sda = 21 # OLED SDA (D21)
button_pin = 13 # Push button (D13)
led_pin = 2 # LED (D2)
# Initialize components
# Ultrasonic range finder, driven through the imported HCSR04 driver.
sensor = HCSR04(trigger_pin=trigger_pin, echo_pin=echo_pin)
# I2C bus 0 on the OLED clock/data pins; handed to the display driver below.
i2c = machine.I2C(0, scl=machine.Pin(oled_scl), sda=machine.Pin(oled_sda))
# NOTE(review): assumes the arguments are (width, height, bus), i.e. a
# 128x64 panel -- confirm against the ssd1306 driver in use.
oled = SSD1306_I2C(128, 64, i2c)
# Input with the internal pull-up enabled, so the pin idles high.
button = machine.Pin(button_pin, machine.Pin.IN, machine.Pin.PULL_UP)
# LED is a plain digital output (switched by update_display()).
led = machine.Pin(led_pin, machine.Pin.OUT)
# MQTT setup (NETPIE 2020 credentials)
# CLIENT_ID and TOKEN are placeholders and must be replaced with the real
# device credentials before the client can connect.
CLIENT_ID = "your-client-id"
TOKEN = "your-token"
HOST = "broker.netpie.io"
# Topic that send_shadow() publishes the distance/alert payload to.
TOPIC_SHADOW = "@shadow/data/update"
# Topic that send_string() publishes the "<ticks> : <name> <id>" text to.
TOPIC_STRING = "esp32/data"
# The token is sent as the MQTT user name; the password is left empty.
mqtt_client = MQTTClient(CLIENT_ID, HOST, user=TOKEN, password="")
# Variables
# Alert threshold: a reading below this value is reported as "Too Close".
mindist = 20 # Default minimum distance in cm
# Most recent reading from sensor.distance_cm(); written by main().
actual_distance = 0
# time.ticks_ms() value recorded after the last shadow publish.
last_sent_time = 0 # Time tracking for MQTT updates
# Identity strings embedded in the message built by send_string().
NAME = "Your Name"
STUDENT_ID = "Your Student ID"
# Connect to NETPIE
def connect_mqtt():
    """Open the MQTT session and report the outcome on the console.

    Calls ``mqtt_client.connect()``.  An exception raised by the connect
    call is caught and printed instead of being propagated, so the function
    always returns ``None`` and the caller is not told about a failure.
    """
    try:
        mqtt_client.connect()
    except Exception as err:
        print("MQTT connection failed:", err)
    else:
        print("Connected to MQTT broker")
# Send shadow update to NETPIE
def send_shadow():
    """Publish the current readings to the shadow topic as a JSON document.

    Reads the module-level ``actual_distance`` and ``mindist`` and publishes
    ``{"data": {"mindist": ..., "actual_distance": ..., "alert": ...}}`` to
    ``TOPIC_SHADOW``.  ``alert`` is ``"Too Close"`` when the measured
    distance is below ``mindist`` and ``"OK"`` otherwise.

    Serialisation and publish errors are caught and printed; nothing is
    raised to the caller and nothing is returned.
    """
    # Imported here so this function does not depend on the file's import
    # header being changed.
    import json

    alert_status = "Too Close" if actual_distance < mindist else "OK"
    payload = {
        "data": {
            "mindist": mindist,
            "actual_distance": actual_distance,
            "alert": alert_status,
        }
    }
    try:
        # str(dict) produces a Python repr with single-quoted keys, which a
        # JSON parser on the receiving side rejects; json.dumps emits real
        # JSON with double quotes.
        mqtt_client.publish(TOPIC_SHADOW, json.dumps(payload))
        print("Shadow updated:", payload)
    except Exception as e:
        print("Failed to send shadow:", e)
# Send string to NETPIE and Node-RED
def send_string():
    """Publish ``"<ticks_ms> : <NAME> <STUDENT_ID>"`` to ``TOPIC_STRING``.

    The timestamp is the current ``time.ticks_ms()`` value, not a wall-clock
    time.  Publish errors are caught and printed; nothing is raised to the
    caller and nothing is returned.
    """
    timestamp = time.ticks_ms()
    message = f"{timestamp} : {NAME} {STUDENT_ID}"
    try:
        # Send bytes, not str: umqtt.simple computes the packet length with
        # len(msg), which for a str counts characters rather than UTF-8
        # bytes, so a non-ASCII name would yield a malformed MQTT packet.
        mqtt_client.publish(TOPIC_STRING, message.encode("utf-8"))
        print("String sent:", message)
    except Exception as e:
        print("Failed to send string:", e)
# Button press handler
_DEBOUNCE_MS = 200  # Edges arriving sooner than this after a press are ignored
_last_press_ms = None  # ticks_ms() of the last accepted press; None until the first one


def button_handler(pin):
    """Handle a button interrupt by publishing the identification string.

    A mechanical push button produces several falling edges per press
    (contact bounce), and each edge invokes this handler.  Edges that arrive
    within ``_DEBOUNCE_MS`` of the last accepted press are dropped so that
    one press results in one call to ``send_string()``.

    Args:
        pin: The ``machine.Pin`` that raised the interrupt (unused).

    NOTE(review): send_string() performs network I/O from the interrupt
    callback and could interleave with a publish in the main loop; consider
    setting a flag here and publishing from the main loop instead.
    """
    global _last_press_ms
    now = time.ticks_ms()
    if _last_press_ms is not None:
        elapsed = time.ticks_diff(now, _last_press_ms)
        # A negative value means the tick counter wrapped since the last
        # press, i.e. a long time has passed -- treat that as a new press.
        if 0 <= elapsed < _DEBOUNCE_MS:
            return
    _last_press_ms = now
    send_string()
# Display information on OLED
def update_display():
    """Redraw the OLED from the current readings and drive the alert LED.

    Clears the frame buffer, writes the measured distance and the threshold
    on the first two text rows and the alert state on the third, then pushes
    the buffer to the panel.  The LED is switched on while the module-level
    ``actual_distance`` is below ``mindist`` and off otherwise.
    """
    too_close = actual_distance < mindist

    oled.fill(0)
    oled.text(f"Dist: {actual_distance} cm", 0, 0)
    oled.text(f"MinDist: {mindist} cm", 0, 10)
    oled.text("Alert: Too Close!" if too_close else "Alert: OK", 0, 20)

    if too_close:
        led.on()
    else:
        led.off()

    oled.show()
# Initialize button interrupt: button_handler is called on every falling
# edge of the button pin.  The pin is configured with a pull-up, so a
# falling edge presumably corresponds to a press (button wired to GND --
# confirm against the wiring diagram).
button.irq(trigger=machine.Pin.IRQ_FALLING, handler=button_handler)
# Main loop
def main():
    """Run the measure / display / publish loop forever.

    Connects to the broker once, then on every iteration:

    1. reads the distance from the ultrasonic sensor into the module-level
       ``actual_distance``,
    2. refreshes the OLED and alert LED,
    3. publishes a shadow update if more than 2 seconds have passed since
       the previous one,
    4. waits 300 ms.

    ``OSError`` (reported as a sensor error) and any other ``Exception`` are
    printed and the loop continues; the function never returns.
    """
    global actual_distance, last_sent_time
    connect_mqtt()
    while True:
        try:
            # Measure distance
            actual_distance = sensor.distance_cm()
            update_display()
            # Send shadow every 2 seconds.  ticks_ms() wraps around, so the
            # elapsed time must be computed with ticks_diff(); a plain
            # subtraction goes negative after the wrap and the update would
            # never fire again.
            if time.ticks_diff(time.ticks_ms(), last_sent_time) > 2000:
                send_shadow()
                last_sent_time = time.ticks_ms()
        except OSError as e:
            print("Sensor error:", e)
        except Exception as e:
            print("Error:", e)
        # Pace the loop outside the try block so that a failing iteration
        # still waits instead of spinning and flooding the console.
        time.sleep(0.3)  # Measure every 300 ms
# Start the measurement loop only when this file is run as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()