import uasyncio as asyncio
from machine import Pin
import network
import time
from umqtt.simple import MQTTClient
# Wi-Fi 配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT 配置
MQTT_BROKER = "test.mosquitto.org"
MQTT_TOPIC = "stust/traffic/" # 與前端一致
CLIENT_ID = "pico-w-traffic"
# 定義 GPIO 腳位
RED_PINS = [Pin(8, Pin.OUT), Pin(9, Pin.OUT)]
RED_PINS_2 = [Pin(17, Pin.OUT), Pin(18, Pin.OUT)]
YELLOW_PINS = [Pin(10, Pin.OUT), Pin(11, Pin.OUT)]
YELLOW_PINS_2 = [Pin(19, Pin.OUT), Pin(20, Pin.OUT)]
GREEN_PINS = [Pin(12, Pin.OUT), Pin(13, Pin.OUT)]
GREEN_PINS_2 = [Pin(21, Pin.OUT), Pin(22, Pin.OUT)]
# 當前模式
current_mode = 0
current_task = None
# 燈號重置
def reset_pins():
for pin in RED_PINS + RED_PINS_2 + YELLOW_PINS + YELLOW_PINS_2 + GREEN_PINS + GREEN_PINS_2:
pin.off()
# 連接 Wi-Fi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
print("Connecting to WiFi...")
while not wlan.isconnected():
time.sleep(1)
print("Connected to WiFi:", wlan.ifconfig())
return wlan
# MQTT 回調函數
def on_message(topic, msg):
global current_mode, current_task
print(f"Received MQTT message: {msg}")
try:
mode = int(msg.decode("utf-8"))
if 0 <= mode < len(modes):
current_mode = mode
if current_task:
current_task.cancel()
try:
asyncio.run(current_task)
except asyncio.CancelledError:
pass
current_task = asyncio.create_task(modes[current_mode]())
print(f"Switched to mode {current_mode}")
except ValueError:
print("Invalid MQTT message format")
# 初始化 MQTT
def connect_mqtt():
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.set_callback(on_message)
client.connect()
client.subscribe(MQTT_TOPIC)
print(f"Connected to MQTT broker at {MQTT_BROKER}, subscribed to {MQTT_TOPIC}")
return client
# 模式邏輯
async def mode_0(): # 模式 0:所有燈按順序閃爍
reset_pins()
all_pins = RED_PINS + YELLOW_PINS + GREEN_PINS + RED_PINS_2 + YELLOW_PINS_2 + GREEN_PINS_2
while True:
for pin in all_pins:
pin.on()
await asyncio.sleep(0.3)
pin.off()
async def mode_1(): # 模式 1:所有綠燈亮
reset_pins()
for pin in GREEN_PINS + GREEN_PINS_2:
pin.on()
await asyncio.sleep(1)
async def mode_2(): # 模式 2:所有黃燈亮
reset_pins()
for pin in YELLOW_PINS + YELLOW_PINS_2:
pin.on()
await asyncio.sleep(1)
async def mode_3(): # 模式 3:所有紅燈亮
reset_pins()
for pin in RED_PINS + RED_PINS_2:
pin.on()
await asyncio.sleep(1)
async def mode_4(): # 模式 4:黃燈閃爍
reset_pins()
while True:
for pin in YELLOW_PINS + YELLOW_PINS_2:
pin.on()
await asyncio.sleep(0.5)
for pin in YELLOW_PINS + YELLOW_PINS_2:
pin.off()
await asyncio.sleep(0.5)
async def mode_5(): # 模式 5:紅燈閃爍
reset_pins()
while True:
for pin in RED_PINS + RED_PINS_2:
pin.on()
await asyncio.sleep(0.5)
for pin in RED_PINS + RED_PINS_2:
pin.off()
await asyncio.sleep(0.5)
async def mode_8(): # 模式 8:交通燈序列運行
reset_pins()
while True:
for pin in RED_PINS:
pin.on()
for pin in GREEN_PINS_2:
pin.on()
await asyncio.sleep(4)
for pin in GREEN_PINS_2:
pin.off()
for pin in YELLOW_PINS_2:
pin.on()
await asyncio.sleep(1)
for pin in YELLOW_PINS_2:
pin.off()
for pin in RED_PINS:
pin.off()
for pin in RED_PINS_2:
pin.on()
for pin in GREEN_PINS:
pin.on()
await asyncio.sleep(4)
for pin in GREEN_PINS:
pin.off()
for pin in YELLOW_PINS:
pin.on()
await asyncio.sleep(1)
for pin in YELLOW_PINS:
pin.off()
for pin in RED_PINS_2:
pin.off()
# 模式列表
modes = [mode_0, mode_1, mode_2, mode_3, mode_4, mode_5, mode_8]
# 主程序
async def main():
global current_task
wlan = connect_wifi()
mqtt_client = connect_mqtt()
asyncio.create_task(mqtt_loop(mqtt_client))
current_task = asyncio.create_task(modes[current_mode]())
# MQTT 輪詢
async def mqtt_loop(client):
while True:
client.check_msg()
await asyncio.sleep(0.1)
# 啟動程式
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Program interrupted")