from machine import Pin
from umqtt.simple import MQTTClient
import network
import time
# Traffic light pin configuration
green_light = Pin(4, Pin.OUT)
yellow_light = Pin(16, Pin.OUT)
red_light = Pin(17, Pin.OUT)
# Network settings
ssid = "Wokwi-GUEST"
password = ""
# MQTT settings
mqtt_broker = "test.mosquitto.org"
mqtt_port = 1883
client_id = "esp32_door"
mqtt_topic = "traffic"
# Traffic light control variables
lights_on = True # Flag to track whether lights are ON or OFF
# Connect to WiFi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while not wlan.isconnected():
time.sleep(1)
print("Connected to WiFi")
# Connect to MQTT
def connect_mqtt():
client = MQTTClient(client_id, mqtt_broker)
client.connect()
print("Connected to MQTT Broker")
return client
# Traffic light states
def set_light_state(green, yellow, red):
green_light.value(green)
yellow_light.value(yellow)
red_light.value(red)
# Turn off all lights
def turn_off_lights():
set_light_state(0, 0, 0)
print("All lights OFF")
# Handle state transitions
def transition_lights():
if green_light.value() == 1: # Green to Yellow to Red
set_light_state(0, 1, 0)
print("Yellow light ON")
time.sleep(3)
set_light_state(0, 0, 1)
print("Red light ON")
elif red_light.value() == 1: # Red to Green directly
print("Red light ON for 3 seconds")
time.sleep(3)
set_light_state(1, 0, 0)
print("Green light ON")
# Handle incoming MQTT messages
def on_message(topic, msg):
global lights_on
if msg == b"on":
lights_on = True
print("Traffic lights ON")
transition_lights()
elif msg == b"off":
lights_on = False
turn_off_lights()
elif msg == b"change_states":
if lights_on:
transition_lights()
# Main setup
connect_wifi()
client = connect_mqtt()
client.set_callback(on_message)
client.subscribe(mqtt_topic)
# Initialize with Green light ON
set_light_state(1, 0, 0)
# Loop to listen for messages
while True:
client.check_msg()
time.sleep(1)