import dht
import machine
import time
import ssd1306
from umqtt.simple import MQTTClient
import network
# 全局配置
DHT_PIN = 14 # DHT22数据引脚连接到GPIO14
I2C_SCL = 5 # I2C时钟线连接到GPIO5
I2C_SDA = 4 # I2C数据线连接到GPIO4
OLED_WIDTH = 128 # OLED宽度
OLED_HEIGHT = 64 # OLED高度
TRIG_PIN = 12 # SR04触发引脚连接到GPIO12
ECHO_PIN = 13 # SR04回声引脚连接到GPIO13
BUZZER_PIN = 2 # 蜂鸣器引脚连接到GPIO2
LED_PIN = 15 # LED引脚连接到GPIO15
DS1307_ADDR = 0x68 # DS1307 I2C地址
# 需要修改的地方
wifiName = "Wokwi-GUEST" # wifi名称
wifiPassword = "" # wifi密码
clientID = "a7da001603df461e8d7ec9f43f217678" # Client ID,巴法云控制台获取
dht22Topic = "DHT22" # DHT22综合主题
distTopic = "SR04Distance" # 距离主题
buzzerTopic = "Buzzer" # 蜂鸣器状态主题
ledTopic = "Led" # LED状态主题
ds1307Topic = "DS1307" # DS1307时间主题(新增)
# 阈值设置
distance_threshold = 150 # 距离阈值
temp_threshold = 35 # 温度阈值(新增)
# 默认设置
serverIP = "bemfa.com" # mqtt服务器地址
port = 9501
keepalive = 60 # MQTT保活时间
# DS1307寄存器地址
DS1307_SECONDS = 0x00
DS1307_MINUTES = 0x01
DS1307_HOURS = 0x02
DS1307_DAY = 0x03
DS1307_DATE = 0x04
DS1307_MONTH = 0x05
DS1307_YEAR = 0x06
DS1307_CONTROL = 0x07
# 定义订阅回调函数
def sub_cb(topic, msg):
print(f"Received message on topic {topic}: {msg}")
# WIFI连接函数
def do_connect():
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('connecting to network...')
sta_if.active(True)
sta_if.connect(wifiName, wifiPassword) # 注意参数顺序:wifiName在前
max_retries = 10
retry_count = 0
while not sta_if.isconnected() and retry_count < max_retries:
time.sleep(1)
retry_count += 1
if not sta_if.isconnected():
print("Failed to connect to WiFi after multiple retries.")
raise OSError("WiFi connection failed")
print('connect WiFi ok')
return sta_if
# 初始化mqtt连接配置
def connect_and_subscribe():
max_retries = 5
retry_count = 0
while retry_count < max_retries:
try:
client = MQTTClient(clientID, serverIP, port, keepalive=keepalive)
client.set_callback(sub_cb)
client.connect()
print("Connected to %s" % serverIP)
# 订阅所有需要的主题
client.subscribe(buzzerTopic)
client.subscribe(ledTopic)
client.subscribe(ds1307Topic)
client.subscribe(dht22Topic)
client.subscribe(distTopic)
print(f"Subscribed to: {buzzerTopic}, {ledTopic}, {ds1307Topic}, {dht22Topic}, {distTopic}")
return client
except OSError as e:
print(f"MQTT connection error (attempt {retry_count + 1}): {e}")
retry_count += 1
time.sleep(2)
print("Failed to connect to MQTT broker after multiple retries.")
raise OSError("MQTT connection failed")
# 初始化OLED显示屏
def init_display():
i2c = machine.I2C(scl=machine.Pin(I2C_SCL), sda=machine.Pin(I2C_SDA))
display = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
return display
# BCD码转十进制
def bcd_to_decimal(bcd):
return ((bcd >> 4) * 10) + (bcd & 0x0F)
# 十进制转BCD码
def decimal_to_bcd(decimal):
return ((decimal // 10) << 4) + (decimal % 10)
# 初始化DS1307 RTC模块
def init_ds1307(i2c):
seconds = i2c.readfrom_mem(DS1307_ADDR, DS1307_SECONDS, 1)[0]
if seconds & 0x80: # 时钟停止时初始化时间
# 设置示例时间:2025-06-15 12:00:00(星期日)
i2c.writeto_mem(DS1307_ADDR, DS1307_SECONDS, bytes([decimal_to_bcd(0)])) # 秒
i2c.writeto_mem(DS1307_ADDR, DS1307_MINUTES, bytes([decimal_to_bcd(0)])) # 分
i2c.writeto_mem(DS1307_ADDR, DS1307_HOURS, bytes([decimal_to_bcd(12)])) # 时(24小时制)
i2c.writeto_mem(DS1307_ADDR, DS1307_DAY, bytes([decimal_to_bcd(1)])) # 星期(1=星期日)
i2c.writeto_mem(DS1307_ADDR, DS1307_DATE, bytes([decimal_to_bcd(15)])) # 日
i2c.writeto_mem(DS1307_ADDR, DS1307_MONTH, bytes([decimal_to_bcd(6)])) # 月
i2c.writeto_mem(DS1307_ADDR, DS1307_YEAR, bytes([decimal_to_bcd(25)])) # 年(2025)
i2c.writeto_mem(DS1307_ADDR, DS1307_CONTROL, bytes([0x00])) # 启用1Hz方波
print("DS1307时间已设置")
else:
print("DS1307时间正常运行")
# 从DS1307获取当前时间
def get_ds1307_time(i2c):
data = i2c.readfrom_mem(DS1307_ADDR, DS1307_SECONDS, 7)
seconds = bcd_to_decimal(data[0] & 0x7F) # 忽略CH位
minutes = bcd_to_decimal(data[1])
# 处理12/24小时制
hours = data[2]
if hours & 0x40: # 12小时制
hours = bcd_to_decimal(hours & 0x1F)
if hours & 0x20: # PM
hours = (hours % 12) + 12
else: # 24小时制
hours = bcd_to_decimal(hours & 0x3F)
day = bcd_to_decimal(data[3])
date = bcd_to_decimal(data[4])
month = bcd_to_decimal(data[5])
year = bcd_to_decimal(data[6]) + 2000
return f"{year}-{month:02d}-{date:02d} {hours:02d}:{minutes:02d}:{seconds:02d}"
# 读取DHT22传感器数据
def read_sensor_data():
sensor = dht.DHT22(machine.Pin(DHT_PIN))
try:
sensor.measure()
return sensor.temperature(), sensor.humidity()
except OSError as e:
print(f"传感器读取错误: {e}")
return None, None
# 测量SR04距离(厘米)
def measure_distance():
trig = machine.Pin(TRIG_PIN, machine.Pin.OUT)
echo = machine.Pin(ECHO_PIN, machine.Pin.IN)
trig.value(0)
time.sleep_us(2)
trig.value(1)
time.sleep_us(10)
trig.value(0)
timeout = time.ticks_us() + 300000 # 300ms超时
while echo.value() == 0:
if time.ticks_us() > timeout:
print("测距超时")
return None
pulse_start = time.ticks_us()
while echo.value() == 1:
if time.ticks_us() > timeout:
print("测距超时")
return None
pulse_end = time.ticks_us()
pulse_duration = pulse_end - pulse_start
return round(pulse_duration * 0.034 / 2, 2) # 声速340m/s
# 初始化蜂鸣器和LED
def init_buzzer_led():
return machine.Pin(BUZZER_PIN, machine.Pin.OUT), machine.Pin(LED_PIN, machine.Pin.OUT)
# 根据温度和距离控制蜂鸣器和LED(修改点)
def control_buzzer_led(buzzer, led, distance, temperature):
# 蜂鸣器控制:距离超过阈值时响起
buzzer_status = "on" if distance is not None and distance < distance_threshold else "off"
buzzer.value(1 if buzzer_status == "on" else 0)
# LED控制:温度超过35℃时亮起(新增逻辑)
led_status = "on" if temperature is not None and temperature > temp_threshold else "off"
led.value(1 if led_status == "on" else 0)
return buzzer_status, led_status
# 在OLED显示数据
def display_data(temperature, humidity, distance, time_str, display):
display.fill(0)
if temperature is not None and humidity is not None and distance is not None:
print(f"时间: {time_str} | 温度: {temperature}°C | 湿度: {humidity}% | 距离: {distance}cm")
display.text(time_str, 0, 0)
display.text(f"Temp: {temperature}°C", 0, 15)
display.text(f"Humi: {humidity}%", 0, 30)
display.text(f"Dist: {distance}cm", 0, 45)
else:
print("无法读取传感器数据")
display.text(time_str, 0, 0)
display.text("Sensor Error", 0, 15)
display.show()
# 上报DS1307时间到MQTT
def publish_ds1307_time(client, i2c):
time_str = get_ds1307_time(i2c)
try:
client.publish(ds1307Topic, time_str)
print(f"DS1307时间已上报: {time_str}")
except Exception as e:
print(f"上报DS1307时间失败: {e}")
# 上报在线状态(新增函数)
def publish_online_status(client):
try:
client.publish(ds1307Topic, "online")
client.publish(dht22Topic, "online")
client.publish(distTopic, "online")
client.publish(buzzerTopic, "online")
client.publish(ledTopic, "online")
print("设备在线状态已上报")
except Exception as e:
print(f"上报在线状态失败: {e}")
# 发布传感器数据到MQTT
def publish_to_mqtt(client, temperature, humidity, distance, buzzer_status, led_status):
if temperature is not None and humidity is not None and distance is not None:
try:
dht22_data = f"{temperature} {humidity}"
client.publish(dht22Topic, dht22_data)
client.publish(distTopic, str(distance))
client.publish(buzzerTopic, buzzer_status)
client.publish(ledTopic, led_status)
print(f"数据已发布: {dht22Topic}={dht22_data}, {distTopic}={distance}")
except Exception as e:
print(f"MQTT发布失败: {e}")
raise
# 主函数
def main():
print("环境监控系统启动...")
last_status_publish = time.time()
# 连接WiFi
try:
sta_if = do_connect()
except OSError as e:
print(f"WiFi连接错误: {e}")
time.sleep(10)
machine.reset()
# 连接MQTT
try:
client = connect_and_subscribe()
publish_online_status(client) # 启动时上报在线状态
except OSError as e:
print(f"MQTT连接错误: {e}")
time.sleep(10)
machine.reset()
# 初始化I2C和显示屏
i2c = machine.I2C(scl=machine.Pin(I2C_SCL), sda=machine.Pin(I2C_SDA))
display = init_display()
display.text("System Starting...", 0, 0)
display.show()
# 初始化DS1307
init_ds1307(i2c)
display.text("RTC Initialized", 0, 15)
display.show()
time.sleep(1)
# 初始化蜂鸣器和LED
buzzer, led = init_buzzer_led()
# MQTT心跳管理
last_heartbeat = time.time()
try:
while True:
client.check_msg() # 检查MQTT消息
# 获取当前时间
time_str = get_ds1307_time(i2c)
# 定时上报状态(每30秒)
current_time = time.time()
if current_time - last_status_publish >= 30:
publish_online_status(client)
publish_ds1307_time(client, i2c)
last_status_publish = current_time
# 读取传感器数据
temperature, humidity = read_sensor_data()
distance = measure_distance()
# 控制蜂鸣器和LED(传递温度参数)
buzzer_status, led_status = control_buzzer_led(buzzer, led, distance, temperature)
# 显示数据
display_data(temperature, humidity, distance, time_str, display)
# 发布数据到MQTT
try:
publish_to_mqtt(client, temperature, humidity, distance, buzzer_status, led_status)
except Exception as e:
print(f"MQTT发布失败,尝试重连: {e}")
client = connect_and_subscribe()
publish_to_mqtt(client, temperature, humidity, distance, buzzer_status, led_status)
# 发送MQTT心跳
if current_time - last_heartbeat >= keepalive / 2:
try:
client.ping()
last_heartbeat = current_time
print("Sent MQTT heartbeat")
except Exception as e:
print(f"心跳包发送失败: {e}")
time.sleep(2) # 主循环间隔
except KeyboardInterrupt:
print("\n程序已停止")
display.fill(0)
display.text("Program Stopped", 0, 0)
display.show()
except Exception as e:
print(f"发生错误: {e}")
display.fill(0)
display.text("System Error", 0, 0)
display.show()
finally:
# 关闭设备
buzzer.value(0)
led.value(0)
try:
client.disconnect()
print("Disconnected from MQTT Broker")
except Exception:
pass
# 程序入口
if __name__ == "__main__":
main()