import network
import ntptime
import time
from machine import Pin, I2C, ADC, RTC
import ssd1306
from dht import DHT22
# 需确保 remote.py 已正确实现 REMOTE_IR 类及 remote_scan 方法,此处根据实际模块调整
from remote import REMOTE_IR
# ====================== 配置区 ======================
# WiFi 配置(替换为实际的 WiFi 名称和密码)
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 基础信息
NAME = "WangWenjun"
STUDENT_ID = "2023103030083"
SCREEN_UPDATE_INTERVAL = 1 # 屏幕刷新时间间隔(秒)
# OLED 初始化(I2C 引脚根据硬件连接调整,这里用 ESP32 的 22、21 引脚示例)
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000)
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
# 传感器初始化
# DHT22 接在 GPIO12
dht_sensor = DHT22(Pin(12))
# MQ2 模拟信号接在 GPIO34,设置衰减为 11dB(支持 0-3.3V 输入)
mq2_adc = ADC(Pin(34))
mq2_adc.atten(ADC.ATTN_11DB)
# PIR 人体红外传感器接在 GPIO4,输入模式
pir_sensor = Pin(4, Pin.IN)
# 红外接收模块(示例接在 GPIO14,需与硬件实际连接一致)
ir = REMOTE_IR(gpio_num=14)
# 输出设备
# 继电器接在 GPIO2,初始值设为 1(高电平,根据继电器类型确认有效电平)
relay = Pin(2, Pin.OUT, value=1)
# 蜂鸣器接在 GPIO13,初始值设为 0(低电平,根据驱动电路确认有效电平)
buzzer = Pin(13, Pin.OUT, value=0)
# RTC 初始化(联网后会通过 NTP 同步,这里先设一个初始值)
rtc = RTC()
# 阈值设置(可根据实际场景校准调整)
thresholds = {
# DHT22 温度阈值,超过触发报警
"dht_temp": 30.0,
# MQ2 气体传感器阈值,数值需实际校准
"mq2": 2000
}
# 标记当前是否处于阈值设置模式,None 表示正常显示模式
current_setting = None
# ====================== WiFi 连接与时间同步函数 ======================
def connect_wifi():
"""连接 WiFi 网络,增加超时重试和状态判断"""
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
if not sta_if.isconnected():
print("正在连接 WiFi...")
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
retry_count = 0
# 重试 20 次,每次间隔 0.5 秒,约 10 秒超时
while not sta_if.isconnected():
time.sleep(0.5)
retry_count += 1
if retry_count > 20:
print("WiFi 连接超时!")
break
if sta_if.isconnected():
print("WiFi 已连接,IP:", sta_if.ifconfig()[0])
else:
print("WiFi 连接失败,请检查配置和网络。")
def sync_ntp_time():
"""通过 NTP 同步时间,适配 MicroPython,手动转换为东八区(UTC+8)"""
try:
ntptime.host = "ntp.aliyun.com" # 使用国内 NTP 服务器
ntptime.settime() # 同步 UTC 时间
beijing_time = time.localtime(time.time() + 8 * 3600) # 更简洁的时区转换
year, month, mday, hour, minute, second, _, _ = beijing_time
rtc.datetime((year, month, mday, 0, hour, minute, second, 0)) # 周几参数(第 4 位)可设为 0(不影响显示)
print("时间同步成功")
except Exception as e:
print("NTP 同步失败:", str(e))
# 可选:设置默认时间(如当前日期)
# rtc.datetime(time.localtime())
# ====================== OLED 显示函数 ======================
def show_first_screen():
"""第一屏:显示姓名、学号、完整的年月日时分秒,持续 3 秒"""
start_ms = time.ticks_ms()
# 持续显示 3 秒
while time.ticks_diff(time.ticks_ms(), start_ms) < 3000:
# 获取 RTC 时间的年、月、日、时、分、秒
year, month, mday, _, hour, minute, second, _ = rtc.datetime()
# 格式化时间字符串,如:2025-06-10 14:30:05
time_info = f"Time: {year:04d}-{month:02d}-{mday:02d} {hour:02d}:{minute:02d}:{second:02d}"
oled.fill(0) # 清屏
oled.text(f"Name: {NAME}", 0, 0, 1) # 显示姓名
oled.text(f"ID: {STUDENT_ID}", 0, 10, 1) # 显示学号
oled.text(time_info, 0, 20, 1) # 显示完整时间
oled.show() # 刷新显示
time.sleep(SCREEN_UPDATE_INTERVAL)
def show_second_screen():
"""第二屏:显示传感器数据及阈值,持续 2 秒"""
oled.fill(0) # 清屏
# 读取 DHT22 温湿度,异常时赋默认值 -1
temp, humi = -1, -1
try:
dht_sensor.measure()
temp, humi = dht_sensor.temperature(), dht_sensor.humidity()
except Exception as e:
print("DHT22 读取失败:", str(e))
# 读取 MQ2 模拟值
mq2_value = mq2_adc.read()
# 判断 PIR 状态,有人为 YES,否则为 NO
pir_status = "YES" if pir_sensor.value() else "NO"
# 显示各类传感器数据
oled.text("Sensors Data", 0, 0, 1)
oled.text(f"Temp: {temp:.1f}℃ Th:{thresholds['dht_temp']}", 0, 10, 1)
oled.text(f"Hum: {humi:.1f}%", 0, 25, 1)
oled.text(f"MQ2: {mq2_value} Th:{thresholds['mq2']}", 0, 40, 1)
oled.text(f"PIR: {pir_status}", 0, 55, 1)
oled.show() # 刷新显示
time.sleep(2) # 显示停留 2 秒
# ====================== 红外遥控处理函数 ======================
def handle_remote():
"""处理红外遥控指令,需根据 remote.py 实际映射调整键值"""
global current_setting
# 获取红外指令(需确保 remote_scan 逻辑正确)
cmd, key = ir.remote_scan()
# 数字键 1:进入温度阈值设置模式
if key == "1":
current_setting = "dht_temp"
show_setting_screen()
# 数字键 2:进入 MQ2 阈值设置模式
elif key == "2":
current_setting = "mq2"
show_setting_screen()
# VOL+:增加当前设置的阈值(区分温度和 MQ2 调整步长)
elif key == "VOL+" and current_setting:
if current_setting == "dht_temp":
# 温度每次加 0.5,最大不超过 100℃
thresholds[current_setting] = min(thresholds[current_setting] + 0.5, 100)
else:
# MQ2 每次加 100,最大不超过 ADC 最大值 4095
thresholds[current_setting] = min(thresholds[current_setting] + 100, 4095)
# VOL-:减少当前设置的阈值(区分温度和 MQ2 调整步长)
elif key == "VOL-" and current_setting:
if current_setting == "dht_temp":
# 温度每次减 0.5,最小不低于 0℃
thresholds[current_setting] = max(thresholds[current_setting] - 0.5, 0)
else:
# MQ2 每次减 100,最小不低于 0
thresholds[current_setting] = max(thresholds[current_setting] - 100, 0)
# PLAY:保存设置并退出设置模式
elif key == "PLAY" and current_setting:
current_setting = None
oled.fill(0)
oled.text("Saved!", 0, 30, 1)
oled.show()
time.sleep(1)
def show_setting_screen():
"""阈值设置界面,显示当前设置的阈值及操作提示"""
oled.fill(0)
oled.text(f"SET {current_setting.upper()}", 0, 0, 1)
oled.text(f"Current: {thresholds[current_setting]}", 0, 10, 1)
oled.text("VOL+/- to adjust", 0, 25, 1)
oled.text("PLAY to save", 0, 40, 1)
oled.show()
# ====================== 主程序逻辑 ======================
def main():
# 初始化:先连 WiFi,再同步时间
connect_wifi()
sync_ntp_time()
while True:
if current_setting:
# 如果处于设置模式,优先处理红外遥控
handle_remote()
else:
# 正常模式:显示第一屏 → 第二屏 → 处理遥控(防止切换时漏按键)
show_first_screen()
show_second_screen()
handle_remote()
# 阈值触发逻辑(以温度过高为例,可扩展其他传感器)
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
# 温度超过阈值,触发继电器和蜂鸣器
if temp > thresholds["dht_temp"]:
relay.value(0)
buzzer.value(1)
time.sleep(0.5)
buzzer.value(0)
else:
# 温度正常,继电器断开
relay.value(1)
except Exception as e:
print("阈值检测异常:", str(e))
pass
if __name__ == "__main__":
main()