import machine
import time
from machine import Pin, I2C
import ssd1306
import network
from umqtt.simple import MQTTClient
# 全局配置
TRIG_PIN = 14 # HC-SR04触发引脚连接到GPIO14
ECHO_PIN = 12 # HC-SR04回声引脚连接到GPIO12
BUZZER_PIN = 15 # 蜂鸣器连接到GPIO15
# 报警阈值配置(单位:厘米)
DISTANCE_THRESHOLD = 30.0 # 距离小于此值时触发报警
# SSD1306 OLED配置
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_SDA = 4 # GPIO4
OLED_SCL = 5 # GPIO5
# Wokwi WiFi配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云MQTT配置
MQTT_CLIENT_ID = "c34f996c831c59799e2b0b5ef4c0aac9" # 替换为你的Client ID
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
MQTT_USER = "" # 如果不需要用户名,可以留空
MQTT_SECRET_KEY = "c34f996c831c59799e2b0b5ef4c0aac9" # 替换为你的巴法云密钥
TOPIC_DISTANCE = "Distance" # 距离主题
# 初始化I2C和OLED
i2c = I2C(0, sda=Pin(OLED_SDA), scl=Pin(OLED_SCL), freq=400000)
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
# 初始化蜂鸣器
buzzer = Pin(BUZZER_PIN, Pin.OUT)
buzzer.value(0) # 初始状态关闭蜂鸣器
# 初始化WiFi
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
def connect_wifi():
"""连接WiFi网络"""
if not wifi.isconnected():
print("正在连接WiFi...")
oled.fill(0)
oled.text("Connecting WiFi", 0, 20)
oled.show()
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
max_attempts = 20 # 增加尝试次数
while not wifi.isconnected() and max_attempts > 0:
max_attempts -= 1
print(f"等待WiFi连接...剩余尝试 {max_attempts}次")
time.sleep(1)
if wifi.isconnected():
print("WiFi连接成功:", wifi.ifconfig())
oled.fill(0)
oled.text("WiFi Connected", 0, 20)
oled.text(wifi.ifconfig()[0], 0, 35)
oled.show()
time.sleep(1)
else:
print("WiFi连接失败")
oled.fill(0)
oled.text("WiFi Failed", 0, 20)
oled.show()
raise RuntimeError("无法连接WiFi")
# 初始化MQTT客户端
mqtt_client = MQTTClient(
client_id=MQTT_CLIENT_ID,
server=MQTT_BROKER,
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_SECRET_KEY, # 使用密钥而不是密码
keepalive=30 # 缩短心跳间隔
)
def mqtt_callback(topic, msg):
"""MQTT消息回调函数"""
print(f"收到消息: {topic.decode()} -> {msg.decode()}")
# 可以在OLED上显示收到的消息
oled.fill(0)
oled.text("MQTT Msg:", 0, 0)
oled.text(f"{topic.decode()}:{msg.decode()}", 0, 20)
oled.show()
time.sleep(2)
def connect_mqtt():
"""连接MQTT服务器并订阅主题"""
try:
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
print("MQTT连接成功")
# 订阅主题(使状态显示为在线)
mqtt_client.subscribe(TOPIC_DISTANCE)
print(f"已订阅主题: {TOPIC_DISTANCE}")
oled.fill(0)
oled.text("MQTT Connected", 0, 20)
oled.text("Subscribed!", 0, 35)
oled.show()
time.sleep(1)
except Exception as e:
print(f"MQTT连接失败: {e}")
oled.fill(0)
oled.text("MQTT Failed", 0, 20)
oled.text(str(e)[:20], 0, 35) # 显示部分错误信息
oled.show()
raise
def read_sensor_data():
"""读取HC-SR04传感器数据"""
try:
# 触发HC-SR04传感器
trigger = Pin(TRIG_PIN, Pin.OUT)
echo = Pin(ECHO_PIN, Pin.IN)
trigger.value(0)
time.sleep_us(2)
trigger.value(1)
time.sleep_us(10)
trigger.value(0)
# 等待回声信号
while echo.value() == 0:
pulse_start = time.ticks_us()
while echo.value() == 1:
pulse_end = time.ticks_us()
# 计算脉冲持续时间
pulse_duration = pulse_end - pulse_start
# 计算距离 (声音速度 = 34300厘米/秒)
distance = pulse_duration * 0.0343 / 2
return distance
except Exception as e:
print(f"传感器读取错误: {e}")
return None
def publish_data(distance, alarm_status=False):
"""通过MQTT发布数据到巴法云"""
try:
if distance is not None:
# 包含距离和报警状态
data = f"dist:{distance:.1f},alarm:{1 if alarm_status else 0}"
mqtt_client.publish(TOPIC_DISTANCE, data)
print(f"已发布数据: {data}")
except Exception as e:
print(f"MQTT发布失败: {e}")
# 尝试重新连接
connect_mqtt()
def control_buzzer(alarm_active):
"""控制蜂鸣器"""
if alarm_active:
# 距离小于阈值,触发报警 - 蜂鸣器发出警报声
buzzer.value(1)
time.sleep(0.2) # 蜂鸣器响200ms
buzzer.value(0)
time.sleep(0.1) # 暂停100ms
else:
# 距离正常,关闭蜂鸣器
buzzer.value(0)
def display_data(distance):
"""在OLED和串口上显示数据"""
oled.fill(0)
oled.text("Distance Monitor", 0, 0)
for x in range(OLED_WIDTH):
oled.pixel(x, 12, 1)
alarm_status = False
if distance is not None:
oled.text(f"Dist: {distance:.1f} cm", 0, 20)
# 根据距离显示状态和控制报警
if distance < 10:
oled.text("Very Close!", 0, 35)
alarm_status = True
elif distance < 30:
oled.text("Close", 0, 35)
alarm_status = True
elif distance < 100:
oled.text("Medium", 0, 35)
else:
oled.text("Far", 0, 35)
print(f"距离: {distance}厘米 - 报警状态: {'开启' if alarm_status else '关闭'}")
# 控制蜂鸣器
control_buzzer(alarm_status)
# 发布数据
publish_data(distance, alarm_status)
else:
oled.text("Sensor Error!", 0, 20)
print("无法获取传感器数据")
# 显示连接状态
status = f"WiFi:{'On' if wifi.isconnected() else 'Off'}"
status += f" MQTT:{'On' if mqtt_client.sock else 'Off'}"
oled.text(status, 0, 50)
# 显示报警状态
if alarm_status:
oled.text("ALARM!", 90, 50)
oled.show()
def main():
"""主函数,程序入口点"""
print("HC-SR04距离监控系统启动...")
print(f"距离报警阈值: {DISTANCE_THRESHOLD}厘米")
# 初始连接
connect_wifi()
connect_mqtt()
try:
while True:
# 检查MQTT消息
try:
mqtt_client.check_msg() # 检查新消息
except Exception as e:
print(f"检查消息错误: {e}")
connect_mqtt() # 出错时重新连接
# 读取和显示数据
distance = read_sensor_data()
display_data(distance)
# 保持连接
try:
mqtt_client.ping()
except:
connect_mqtt()
# 短暂延迟
time.sleep(0.1)
except KeyboardInterrupt:
print("\n程序已停止")
buzzer.value(0) # 确保关闭蜂鸣器
mqtt_client.disconnect()
except Exception as e:
print(f"发生错误:{e}")
buzzer.value(0) # 确保关闭蜂鸣器
mqtt_client.disconnect()
if __name__ == "__main__":
main()