# 匯入所需的函式庫
from machine import Pin # 從machine模組引入Pin類別
import network # 引入network模組,用於連接Wi-Fi
import time # 引入time模組,用於時間延遲
from umqtt.simple import MQTTClient # 從umqtt.simple模組,引入MQTTClient類別,用於MQTT通訊
import dht # 引入dht模組,用於操作DHT22溫濕度傳感器
# asynchronous processing library
import uasyncio as asyncio # 引入uasyncio模組,給定別名asyncio
# 連接到 Wi-Fi 網絡
sta = network.WLAN(network.STA_IF) # 建立Wi-Fi station(STA)物件
sta.active(True) # 啟用Wi-Fi介面
sta.connect('Wokwi-GUEST','') # 連接到指定的Wi-Fi網絡,請替換成你的Wi-Fi SSID和密碼
print('Linking...') # shell顯示Linking...
while not sta.isconnected(): # 等待直到連接成功
pass
print('Link OK') # shell顯示Link OK
# 初始化 DHT22 溫濕度傳感器和連接的引腳
p0 = Pin(13, Pin.IN) # 創建Pin物件,將GPIO13設為資料腳,給定別名為p0,用於連接DHT22溫濕度傳感器
dht22 = dht.DHT22(p0) # 創建DHT22物件,並指定使用p0物件來連接DHT22溫溼度傳感器
# 初始化LED引腳及狀態
LED_Temp = Pin(18, Pin.OUT) # 創建 Pin 物件,指定引腳 18 為輸出引腳,並給定別名為LED_Temp
LED_Hum = Pin(19, Pin.OUT) # 創建 Pin 物件,指定引腳 19 為輸出引腳,並給定別名為LED_Hum
LED_Temp.value(0) # 關閉溫度 LED
LED_Hum.value(0) # 關閉濕度 LED
# 處理回調函式
def handle_callback(topic, msg):
m = msg.decode("utf-8") # 解碼收到的訊息
print(m, '我是m')
if "field2" in topic: # 如果主題包含 "field2",表示溫度資料
temp = float(m) # 將訊息轉換為浮點數
if temp > 35: # 如果溫度大於35度,則顯示溫度過高,並點亮溫度 LED
print("溫度過高,目前為 " + str(temp) + "°C")
LED_Temp.value(1)
else:
LED_Temp.value(0)
elif "field3" in topic: # 如果主題包含 "field3",表示濕度資料
hum = float(m) # 將訊息轉換為浮點數
if hum > 80: # 如果濕度大於80%,則顯示濕度過高,並點亮濕度 LED
print("濕度過高,目前為 " + str(hum) + "%")
LED_Hum.value(1)
else:
LED_Hum.value(0)
# key noi den broker
client_id = "JSEOKS8fBAknNBM4PA0xKzY" # 客戶端ID
user_name = "JSEOKS8fBAknNBM4PA0xKzY" # 使用者名稱
password = "6lYlTrEioZr6lSqCg6tgyMyS" # 密碼
server = "mqtt3.thingspeak.com" # 伺服器地址
client = MQTTClient(client_id = client_id, # 建立MQTT客戶端
server = server,
user = user_name,
password = password,)
client.set_callback(handle_callback) # 設定回調函式
client.connect() # 連接到MQTT Broker
client.subscribe(b"channels/2493946/subscribe/fields/+") # 訂閱特定主題
publish_topic = b"channels/2493946/publish" # 設定發布主題
# Task1:Publish data to MQTT broker each 5s
async def task1():
while True:
print("Update temperature and humidity!") # 更新溫度和濕度資料
total_temp = 0 # 初始化total_temp變數
total_hum = 0 # 初始化total_hum變數
for i in range(8): # 重複8次以取得平均值
dht22.measure() # 讀取 DHT22 溫濕度傳感器數據
t = dht22.temperature() # 讀取溫度值
h = dht22.humidity() # 讀取濕度值
total_temp += t # 累加溫度值
total_hum += h # 累加濕度值
await asyncio.sleep_ms(1000) # 等待1秒再取下一筆資料
avg_temp = total_temp / 8 # 計算平均溫度
avg_hum = total_hum / 8 # 計算平均濕度
payload = f"field2={avg_temp}&field3={avg_hum}&status=MQTTPUBLISH" # 組合發布資料
client.publish(publish_topic, payload.encode("utf-8")) # 發布資料到MQTT Broker
await asyncio.sleep_ms(5000) # 等待5秒再發布下一筆資料
# Task2:0.5s check if any data is sent from MQTT Broker?
async def task2():
while True:
client.check_msg() # 檢查是否有來自MQTT Broker的訊息
await asyncio.sleep_ms(500) # 等待0.5秒再檢查一次
# 如果這個程式是直接運行的,則執行以下代碼
if __name__=="__main__":
loop = asyncio.get_event_loop() # 獲取事件循環
loop.create_task(task1()) # 創建Task1任務
loop.create_task(task2()) # 創建Task2任務
loop.run_forever() # 開始執行任務循環