# 匯入必要的模組
from machine import Pin # 從 machine 模組匯入 Pin 類別
import network # 匯入 network 模組,用於設置網路連接
import time # 匯入 time 模組,用於時間相關功能
from umqtt.simple import MQTTClient # 從 umqtt.simple 模組匯入 MQTTClient 類別
import dht # 匯入 dht 模組,用於控制 DHT22 感測器
# 匯入異步處理庫
import uasyncio as asyncio # 匯入 uasyncio 模組,用於異步處理
# 初始化 WiFi 連接
sta = network.WLAN(network.STA_IF) # 創建 STA_IF 類別的實例,用於設置 Station 模式
sta.active(True) # 啟用 WiFi 連接
sta.connect('Wokwi-GUEST', '') # 連接到 WiFi 網絡,請更換成您自己的 SSID 和密碼
print('Linking...')
# 等待連接到 WiFi
while not sta.isconnected():
pass
print('Link OK')
# 初始化紅色和藍色 LED 引腳
LedRed = Pin(4, Pin.OUT) # 初始化紅色 LED,設置為輸出
LedRed.value(0) # 設置紅色 LED 初始狀態為關閉
LedBlue = Pin(7, Pin.OUT) # 初始化藍色 LED,設置為輸出
LedBlue.value(0) # 設置藍色 LED 初始狀態為關閉
# 初始化 DHT22 感測器引腳
dht22 = dht.DHT22(Pin(20, Pin.IN))
# 定義 MQTT 訂閱回調函數
def handle_callback(topic, msg):
m = msg.decode("utf-8") # 解碼消息為 UTF-8 格式
print("current topic is %s value is %s" % (topic, m)) # 打印當前主題和消息值
# 連接至 MQTT Broker
client_id = "GiEdMC0rNAg0LAExCjoxBC0" # 客戶端 ID
user_name = "GiEdMC0rNAg0LAExCjoxBC0" # 用戶名
password = "WbzSsFrcC8pBUXqRo2qfOON2" # 密碼
server = "mqtt3.thingspeak.com" # MQTT 伺服器地址
client = MQTTClient(client_id=client_id, server=server, user=user_name, password=password) # 創建 MQTT 客戶端實例
client.set_callback(handle_callback) # 設置 MQTT 客戶端回調函數
client.connect() # 連接至 MQTT 伺服器
client.subscribe(b"channels/2501612/subscribe/fields/+") # 訂閱指定主題
publish_topic = b"channels/2501612/publish" # 定義發布主題
# Task 1: 每5秒將數據發布到 MQTT Broker
async def task1():
while True:
print("Update temperature and humidity!") # 打印更新溫度和濕度信息
dht22.measure() # 測量溫度和濕度
t = dht22.temperature() # 讀取溫度
h = dht22.humidity() # 讀取濕度
payload = f"field1={t}&field2={h}&status=MQTTPUBLISH" # 構建發布消息內容
client.publish(publish_topic, payload.encode("utf-8")) # 發布消息到指定主題
await asyncio.sleep_ms(5000) # 等待5000毫秒(5秒)
# 如果溫度超過35度,則打印警告信息並點亮紅色 LED
if t > 35:
print("warning")
LedRed.value(1)
# 如果濕度超過80%,則打印警告信息並點亮藍色 LED
if h > 80:
print("warning")
LedBlue.value(1)
# Task 2: 每0.5秒檢查是否有從 MQTT Broker 接收到數據
async def task2():
while True:
client.check_msg() # 檢查是否有消息
await asyncio.sleep_ms(500) # 等待500毫秒(0.5秒)
# 如果是主程序
if __name__ == "__main__":
loop = asyncio.get_event_loop() # 獲取異步處理事件循環
loop.create_task(task1()) # 創建 Task 1
loop.create_task(task2()) # 創建 Task 2
loop.run_forever() # 開始異步處理事件循環