import uasyncio as asyncio
from machine import Pin
import network
import ubinascii
from umqtt.simple import MQTTClient
# WiFi 配置
WIFI_SSID = "李昆霖的iPhone" # 輸入您的 WiFi SSID
WIFI_PASSWORD = "qwer1234" # 輸入您的 WiFi 密碼
# MQTT 配置
MQTT_BROKER = "mqtt-dashboard.com" # MQTT 代理伺服器地址
MQTT_PORT = 1883 # MQTT 代理伺服器埠號
MQTT_TOPIC = "stust/traffic/" # 訂閱的主題
CLIENT_ID = ubinascii.hexlify(network.WLAN().config('mac')).decode() # 使用裝置的唯一 ID 作為 MQTT 客戶端 ID
# 定義燈號的 GPIO 腳位
RED_PINS = Pin(8, Pin.OUT) # 水平紅燈
YELLOW_PINS = Pin(10, Pin.OUT) # 水平黃燈
GREEN_PINS = Pin(12, Pin.OUT) # 水平綠燈
RED_PINS_2 = Pin(9, Pin.OUT) # 垂直紅燈
YELLOW_PINS_2 = Pin(11, Pin.OUT) # 垂直黃燈
GREEN_PINS_2 = Pin(13, Pin.OUT) # 垂直綠燈
BUTTON_MODE = Pin(16, Pin.IN, Pin.PULL_UP) # 模式切換按鈕
# 當前模式與任務
current_mode = 0 # 初始模式為 0
current_task = None # 當前執行的任務
mqtt_client = None # MQTT 客戶端
# WiFi 連接邏輯
async def connect_wifi():
print("連接 WiFi 中...")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
await asyncio.sleep(1)
print("等待 WiFi 連接...")
print("WiFi 已連接!")
print("IP 位址:", wlan.ifconfig()[0])
# MQTT 訊息回呼函數
def mqtt_callback(topic, msg):
global current_mode
try:
message = msg.decode("utf-8")
print(f"收到 MQTT 訊息: {message}")
if message.isdigit():
new_mode = int(message)
if 0 <= new_mode < len(modes):
current_mode = new_mode
asyncio.create_task(switch_mode()) # 切換模式
except Exception as e:
print("MQTT 回呼錯誤:", e)
# 切換模式函數
async def switch_mode():
global current_task
if current_task:
current_task.cancel()
try:
await current_task
except asyncio.CancelledError:
print("當前模式任務已取消")
current_task = asyncio.create_task(modes[current_mode]())
print(f"切換至模式: {current_mode}")
# MQTT 連接邏輯
async def connect_mqtt():
global mqtt_client
print("連接 MQTT 代理伺服器中...")
mqtt_client = MQTTClient(CLIENT_ID, MQTT_BROKER, MQTT_PORT)
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
mqtt_client.subscribe(MQTT_TOPIC)
print(f"已訂閱 MQTT 主題: {MQTT_TOPIC}")
# 重置所有燈號狀態
def reset_pins():
for pin in [RED_PINS, YELLOW_PINS, GREEN_PINS, RED_PINS_2, YELLOW_PINS_2, GREEN_PINS_2]:
pin.off()
# 模式 0:聖誕節燈效
async def mode_0():
reset_pins()
all_pins = [RED_PINS, YELLOW_PINS, GREEN_PINS, RED_PINS_2, YELLOW_PINS_2, GREEN_PINS_2]
while True:
for pin in all_pins:
pin.on()
await asyncio.sleep(0.3)
pin.off()
async def mode_1():
reset_pins() # 重置燈號
for pin in [GREEN_PINS, GREEN_PINS_2]:
pin.on()
await asyncio.sleep(1)
async def mode_2():
reset_pins() # 重置燈號
for pin in [YELLOW_PINS, YELLOW_PINS_2]:
pin.on()
await asyncio.sleep(1)
async def mode_3():
reset_pins() # 重置燈號
for pin in [RED_PINS, RED_PINS_2]:
pin.on()
await asyncio.sleep(1)
# 將模式與函數對應
modes = [mode_0, mode_1, mode_2, mode_3]
# 按鈕處理邏輯
async def button_handler():
global current_mode, current_task
while True:
if not BUTTON_MODE.value(): # 如果按鈕被按下
current_mode = (current_mode + 1) % len(modes) # 切換模式
print(f"切換至模式: {current_mode}")
if current_task:
current_task.cancel()
try:
await current_task
except asyncio.CancelledError:
pass
current_task = asyncio.create_task(modes[current_mode]())
await asyncio.sleep(0.5) # 防止按鈕抖動
await asyncio.sleep(0.1)
# 主程式邏輯
async def main():
global current_task
await connect_wifi() # 連接 WiFi
await connect_mqtt() # 連接 MQTT
current_task = asyncio.create_task(modes[current_mode]()) # 啟動初始模式
asyncio.create_task(button_handler()) # 啟動按鈕處理邏輯
while True:
mqtt_client.check_msg() # 檢查 MQTT 訊息
await asyncio.sleep(0.1)
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式中斷")