import machine
from HCSR04 import HCSR04 # 從 HCSR04 模組中引入 HCSR04 用於超聲波測距
import network # 引入 network 模組,用於連接 Wi-Fi
import time # 引入 time 模組,用於延遲
import urequests # 引入 urequests 模組,用於發送 HTTP 請求
from umqtt.robust import MQTTClient # 引入MQTTClient模組,用於MQTT通訊
from machine import SoftI2C, Pin
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
# 連接到 Wi-Fi 網絡
sta = network.WLAN(network.STA_IF) # 建立 Wi-Fi station(STA) 物件
sta.active(True) # 啟用 Wi-Fi 介面
sta.connect('Wokwi-GUEST', '') # 連接到指定的 Wi-Fi 網絡,請替換成你的 Wi-Fi SSID 和密碼
print('Linking...')
while not sta.isconnected(): # 等待直到連接成功
pass
print('Link OK')
# ThingSpeak MQTT 帳號和頻道訊息
THINGAPEAK_MQTT_CLIENT_ID = b"JSEOKS8fBAknNBM4PA0xKzY" # ThingSpeak MQTT客戶端ID
THINGAPEAK_MQTT_USERNAME = b"JSEOKS8fBAknNBM4PA0xKzY" # ThingSpeak MQTT用戶名
THINGAPEAK_MQTT_PASSWORD = b"6lYlTrEioZr6lSqCg6tgyMyS" # ThingSpeak MQTT密碼
THINGAPEAK_CHANNEL_ID = b"2574640" # ThingSpeak頻道ID
# 建立MQTT客戶端並連接到服務器
client = MQTTClient(
client_id=THINGAPEAK_MQTT_CLIENT_ID, # 客戶端ID
server=b"mqtt3.thingspeak.com", # MQTT服務器地址
user=THINGAPEAK_MQTT_USERNAME, # 用戶名
password=THINGAPEAK_MQTT_PASSWORD, # 密碼
ssl=False # 是否使用SSL
)
client.connect() # 連接到MQTT服務器
I2C_ADDR = 0x27
totalRows = 2
totalColumns = 16
i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=10000) #initializing the I2C method for ESP32
# i2c = I2C(scl=Pin(5), sda=Pin(4), freq=10000) #initializing the I2C method for ESP8266
lcd = I2cLcd(i2c, I2C_ADDR, totalRows, totalColumns)
# 初始化超聲波傳感器
sr04_L = HCSR04(trigPin=25, echoPin=26,Pin.IN, Pin.PULL_UP)
sr04_R = HCSR04(trigPin=18, echoPin=19,Pin.IN, Pin.PULL_UP)
# 初始化中斷引腳,這裡假設有兩個引腳分別用於檢測左邊和右邊
int_pin_L = Pin(34, Pin.IN) # 根據實際接線選擇適當的引腳
int_pin_R = Pin(35, Pin.IN) # 根據實際接線選擇適當的引腳
# 顯示 "Under Detection" 只顯示一次
lcd.move_to(1,1)
lcd.putstr("Under Detection")
time.sleep(1)
# 中斷處理程序
def handle_interrupt(pin):
try:
if pin == int_pin_L:
L = sr04_L.distance()
if L < 100:
lcd.move_to(0,0)
lcd.putstr("Alert! ")
print('Alert! Left Distance:', L, 'cm')
credentials = bytes("channels/{:s}/publish".format(THINGAPEAK_CHANNEL_ID), 'utf-8')
payload = bytes("field1=ALARM_ON&field2=\n", 'utf-8')
client.publish(credentials, payload)
send_line_notify(f"警告!有人員闖入,左方超音波感測器偵測距離為{L} cm")
else:
lcd.move_to(0,0)
lcd.putstr(f"Left:{L}cm ")
print('Left Distance:', L, 'cm')
credentials = bytes("channels/{:s}/publish".format(THINGAPEAK_CHANNEL_ID), 'utf-8')
payload = bytes("field1={:.1f}&field2=\n".format(L), 'utf-8')
client.publish(credentials, payload)
elif pin == int_pin_R:
R = sr04_R.distance()
if R < 100:
lcd.move_to(0,1)
lcd.putstr("Alert! ")
print('Alert! Right Distance:', R, 'cm')
credentials = bytes("channels/{:s}/publish".format(THINGAPEAK_CHANNEL_ID), 'utf-8')
payload = bytes("field1=&field2=ALARM_ON\n", 'utf-8')
client.publish(credentials, payload)
send_line_notify(f"警告!有人員闖入,右方超音波感測器偵測距離為{R} cm")
else:
lcd.move_to(0,1)
lcd.putstr(f"Right:{R}cm ")
print('Right Distance:', R, 'cm')
credentials = bytes("channels/{:s}/publish".format(THINGAPEAK_CHANNEL_ID), 'utf-8')
payload = bytes("field1=\n&field2={:.1f}".format(R), 'utf-8')
client.publish(credentials, payload)
except Exception as e:
print(e.args[0])
# 設置外部中斷
int_pin_L.irq(trigger=Pin.IRQ_FALLING, handler=handle_interrupt)
int_pin_R.irq(trigger=Pin.IRQ_FALLING, handler=handle_interrupt)
# 發送Line Notify訊息
def send_line_notify(message):
token = '你的Line Notify權杖'
headers = {'Authorization': 'Bearer ' + token}
data = {'message': message}
url = 'https://notify-api.line.me/api/notify'
response = urequests.post(url, headers=headers, data=data)
response.close()
# 主循環
while True:
time.sleep(5)
# 每5秒發佈一次超音波距離
try:
L = sr04_L.distance()
R = sr04_R.distance()
lcd.move_to(0,0)
lcd.putstr(f"Left:{L}cm ")
lcd.move_to(0,1)
lcd.putstr(f"Right:{R}cm ")
print('Left Distance:', L, 'cm')
print('Right Distance:', R, 'cm')
credentials = bytes("channels/{:s}/publish".format(THINGAPEAK_CHANNEL_ID), 'utf-8')
payload = bytes("field1={:.1f}&field2={:.1f}\n".format(L, R), 'utf-8')
client.publish(credentials, payload)
except Exception as e:
print(e.args[0])