from machine import Pin,ADC
import network
import time
from machine import Pin
from umqtt.simple import MQTTClient
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect('Wokwi-GUEST'," ")
while not wifi.isconnected():
time.sleep(0.5)
print("WiFi Connected")
# -----------------------
# NETPIE CONFIG
# -----------------------
CLIENT_ID = "454bfb18-d372-4f5e-8886-823e26900655"
TOKEN = "ACfKeFqwRNCRJovCAVezYNxh3gjsmLhB"
MQTT_BROKER = "broker.netpie.io"
SECRET = "VrHMH6a323tqs8pC9nhFHrj8EDZtqDH"
STATUS_TOPIC = b"@msg/traffic/status"
EMERGENCY_TOPIC = b"@msg/traffic/emergency"
# -----------------------
# GPIO SETUP
# -----------------------
red = Pin(19, Pin.OUT)
yellow = Pin(18, Pin.OUT)
green = Pin(5, Pin.OUT)
button = Pin(14, Pin.IN, Pin.PULL_UP)
# -----------------------
# GLOBAL STATE
# -----------------------
emergency_mode = False
# -----------------------
# WiFi (Wokwi auto-connect)
# -----------------------
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
while not wlan.isconnected():
time.sleep(0.5)
print("WiFi Connected")
# -----------------------
# MQTT CALLBACK
# -----------------------
def mqtt_callback(topic, msg):
global emergency_mode
print("Received:", topic, msg)
if topic == EMERGENCY_TOPIC:
if msg == b"ON":
emergency_mode = True
else:
emergency_mode = False
# -----------------------
# MQTT CONNECT
# -----------------------
def connect_mqtt():
client = MQTTClient(
CLIENT_ID,
MQTT_BROKER,
user=TOKEN,
password=SECRET
)
client.set_callback(mqtt_callback)
client.connect()
client.subscribe(EMERGENCY_TOPIC)
print("MQTT Connected & Subscribed")
return client
# -----------------------
# LED CONTROL
# -----------------------
def set_light(r, y, g):
red.value(r)
yellow.value(y)
green.value(g)
# -----------------------
# MAIN
# -----------------------
connect_wifi()
mqtt = connect_mqtt()
while True:
mqtt.check_msg()
# ===== EMERGENCY MODE =====
if emergency_mode:
set_light(0, 1, 0)
mqtt.publish(STATUS_TOPIC, b"EMERGENCY")
time.sleep(0.5)
set_light(0, 0, 0)
time.sleep(0.5)
continue
# ===== NORMAL MODE =====
# RED
set_light(1, 0, 0)
mqtt.publish(STATUS_TOPIC, b"RED")
time.sleep(3)
# GREEN
set_light(0, 0, 1)
mqtt.publish(STATUS_TOPIC, b"GREEN")
for _ in range(6):
mqtt.check_msg()
if button.value() == 0:
break
time.sleep(0.5)
# YELLOW
set_light(0, 1, 0)
mqtt.publish(STATUS_TOPIC, b"YELLOW")
time.sleep(2)