from machine import Pin, time_pulse_us, I2C
import time
import network
from simple import MQTTClient
import ssd1306
# Pin Definitions
TRIG_PIN = Pin(5, Pin.OUT)
ECHO_PIN = Pin(18, Pin.IN)
BUZZER_PIN = Pin(14, Pin.OUT)
GREEN_LED = Pin(26, Pin.OUT)
RED_LED = Pin(25, Pin.OUT)
# WiFi credentials
SSID = "Wokwi-GUEST"
PASSWORD = ""
# MQTT settings
MQTT_SERVER = "broker.hivemq.com"
MQTT_CLIENT_ID = "esp32-bus-alert"
MQTT_TOPIC_DISTANCE = b"teddy/distance"
MQTT_TOPIC_STATUS = b"teddy/status"
# Threshold
ARRIVAL_THRESHOLD = 30.0
bus_previously_arrived = False
# WiFi Connection
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
print("Connecting to WiFi", end="")
while not wlan.isconnected():
print(".", end="")
time.sleep(0.3)
print(" connected!")
# MQTT Setup
client = MQTTClient(MQTT_CLIENT_ID, MQTT_SERVER)
def connect_mqtt():
try:
client.connect()
print("Connected to MQTT!")
except Exception as e:
print("Failed to connect to MQTT:", e)
time.sleep(2)
machine.reset()
# I2C & OLED Display Setup
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled = ssd1306(128, 64, i2c)
# Function to update OLED display
def update_oled(message):
oled.fill(0) # Clear display
oled.text("Bus Status:", 0, 0)
oled.text(message, 0, 20)
oled.show()
# Read distance using ultrasonic sensor
def read_distance_cm():
TRIG_PIN.off()
time.sleep_us(2)
TRIG_PIN.on()
time.sleep_us(10)
TRIG_PIN.off()
try:
duration = time_pulse_us(ECHO_PIN, 1, 30000) # 30ms timeout
distance = duration * 0.034 / 2
return distance
except OSError:
return 999 # timeout or error
# Main logic
def main():
global bus_previously_arrived
connect_wifi()
connect_mqtt()
while True:
distance = read_distance_cm()
print("Distance:", distance, "cm")
client.publish(MQTT_TOPIC_DISTANCE, str(round(distance, 2)))
if distance <= ARRIVAL_THRESHOLD:
if not bus_previously_arrived:
print("Bus is arriving!")
BUZZER_PIN.on()
GREEN_LED.on()
RED_LED.off()
client.publish(MQTT_TOPIC_STATUS, b"Arriving")
update_oled("Arriving")
bus_previously_arrived = True
else:
if bus_previously_arrived:
print("Bus left / Not near.")
BUZZER_PIN.off()
GREEN_LED.off()
RED_LED.on()
client.publish(MQTT_TOPIC_STATUS, b"Not Yet")
update_oled("Not Yet")
bus_previously_arrived = False
time.sleep(1)
# Run the main program
main()