import machine
import time
from machine import Pin, I2C
import ssd1306
import network
from umqtt.simple import MQTTClient
# 硬件配置
TRIG_PIN = 14 # HC-SR04触发引脚
ECHO_PIN = 12 # HC-SR04回声引脚
BUZZER_PIN = 15 # 蜂鸣器引脚
# SSD1306 OLED配置
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_SDA = 4
OLED_SCL = 5
# 系统状态
system_mode = "auto" # 自动/手动模式
distance_threshold = 10.0 # 距离阈值(厘米)
buzzer_status = 0 # 蜂鸣器状态: 0-关闭, 1-开启
# Wokwi WiFi配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云MQTT配置
MQTT_CLIENT_ID = "106786ca35a6454b871d685f23273ef9" # 替换为你的Client ID
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
MQTT_USER = ""
MQTT_SECRET_KEY = "106786ca35a6454b871d685f23273ef9" # 替换为你的巴法云密钥
TOPIC_DISTANCE = "SR04" # 距离数据主题
TOPIC_CONTROL = "SR04CTRL" # 控制命令主题
# 初始化硬件
i2c = I2C(0, sda=Pin(OLED_SDA), scl=Pin(OLED_SCL), freq=400000)
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
buzzer = Pin(BUZZER_PIN, Pin.OUT)
buzzer.value(0) # 初始关闭蜂鸣器
# 初始化WiFi
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
def connect_wifi():
"""连接WiFi网络"""
if not wifi.isconnected():
print("正在连接WiFi...")
oled.fill(0)
oled.text("Connecting WiFi", 0, 20)
oled.show()
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
max_attempts = 20
while not wifi.isconnected() and max_attempts > 0:
max_attempts -= 1
print(f"等待WiFi连接...剩余尝试 {max_attempts}次")
time.sleep(1)
if wifi.isconnected():
print("WiFi连接成功:", wifi.ifconfig())
oled.fill(0)
oled.text("WiFi Connected", 0, 20)
oled.text(wifi.ifconfig()[0], 0, 35)
oled.show()
time.sleep(1)
else:
print("WiFi连接失败")
oled.fill(0)
oled.text("WiFi Failed", 0, 20)
oled.show()
raise RuntimeError("无法连接WiFi")
# 初始化MQTT客户端
mqtt_client = MQTTClient(
client_id=MQTT_CLIENT_ID,
server=MQTT_BROKER,
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_SECRET_KEY,
keepalive=30
)
def mqtt_callback(topic, msg):
"""MQTT消息回调函数"""
global system_mode, distance_threshold, buzzer_status
topic = topic.decode('utf-8')
msg = msg.decode('utf-8')
print(f"收到命令: {topic} -> {msg}")
# 在OLED上显示消息
oled.fill(0)
oled.text("MQTT Command", 0, 0)
oled.text(f"{topic}:{msg}", 0, 20)
oled.show()
time.sleep(1)
# 解析控制命令
if topic == TOPIC_CONTROL:
if msg.startswith("mode:"):
mode = msg.split(":")[1]
if mode in ["auto", "manual"]:
system_mode = mode
print(f"模式切换为: {system_mode}")
if system_mode == "manual":
buzzer.value(0) # 切换到手动模式时关闭蜂鸣器
elif msg.startswith("threshold:"):
try:
distance_threshold = float(msg.split(":")[1])
print(f"阈值设置为: {distance_threshold}cm")
except ValueError:
print("无效的阈值格式")
elif msg.startswith("buzzer:") and system_mode == "manual":
try:
buzzer_status = int(msg.split(":")[1])
buzzer.value(buzzer_status)
print(f"蜂鸣器设置为: {'开启' if buzzer_status else '关闭'}")
except ValueError:
print("无效的蜂鸣器命令")
def connect_mqtt():
"""连接MQTT服务器并订阅主题"""
try:
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
print("MQTT连接成功")
# 订阅控制主题
mqtt_client.subscribe(TOPIC_CONTROL)
print(f"已订阅控制主题: {TOPIC_CONTROL}")
oled.fill(0)
oled.text("MQTT Connected", 0, 20)
oled.text("Subscribed!", 0, 35)
oled.show()
time.sleep(1)
except Exception as e:
print(f"MQTT连接失败: {e}")
oled.fill(0)
oled.text("MQTT Failed", 0, 20)
oled.text(str(e)[:20], 0, 35)
oled.show()
raise
def read_sensor_data():
"""读取HC-SR04传感器数据"""
try:
trigger = Pin(TRIG_PIN, Pin.OUT)
echo = Pin(ECHO_PIN, Pin.IN)
trigger.value(0)
time.sleep_us(2)
trigger.value(1)
time.sleep_us(10)
trigger.value(0)
# 等待回声信号
while echo.value() == 0:
pulse_start = time.ticks_us()
while echo.value() == 1:
pulse_end = time.ticks_us()
# 计算脉冲持续时间
pulse_duration = pulse_end - pulse_start
# 计算距离 (声音速度 = 34300厘米/秒)
distance = pulse_duration * 0.0343 / 2
return distance
except Exception as e:
print(f"传感器读取错误: {e}")
return None
def handle_alarm(distance):
"""处理报警逻辑"""
global buzzer_status
if system_mode == "auto":
if distance is not None and distance < distance_threshold:
# 自动模式下距离低于阈值时蜂鸣器报警
buzzer.value(1)
buzzer_status = 1
else:
buzzer.value(0)
buzzer_status = 0
def publish_data(distance):
"""通过MQTT发布数据到巴法云"""
try:
if distance is not None:
data = f"distance:{distance:.1f},mode:{system_mode},threshold:{distance_threshold},buzzer:{buzzer_status}"
mqtt_client.publish(TOPIC_DISTANCE, data)
print(f"已发布数据: {data}")
except Exception as e:
print(f"MQTT发布失败: {e}")
# 尝试重新连接
connect_mqtt()
def display_data(distance):
"""在OLED和串口上显示数据"""
oled.fill(0)
oled.text("Distance Monitor", 0, 0)
oled.hline(0, 12, OLED_WIDTH, 1) # 分隔线
if distance is not None:
# 显示距离
oled.text(f"Dist: {distance:.1f} cm", 0, 20)
# 显示阈值
oled.text(f"Thresh: {distance_threshold} cm", 0, 35)
# 显示模式
oled.text(f"Mode: {system_mode}", 0, 50)
# 显示蜂鸣器状态
buzzer_text = "Buzzer: ON" if buzzer_status else "Buzzer: OFF"
oled.text(buzzer_text, 0, 58)
print(f"距离: {distance:.1f}cm | 模式: {system_mode} | 蜂鸣器: {'开启' if buzzer_status else '关闭'}")
publish_data(distance)
else:
oled.text("Sensor Error!", 0, 20)
print("无法获取传感器数据")
# 显示连接状态
wifi_status = "WiFi: ON" if wifi.isconnected() else "WiFi: OFF"
mqtt_status = "MQTT: ON" if mqtt_client.sock else "MQTT: OFF"
oled.text(f"{wifi_status} {mqtt_status}", 0, 45)
oled.show()
def main():
"""主函数,程序入口点"""
print("HC-SR04距离监控系统启动...")
# 初始连接
connect_wifi()
connect_mqtt()
try:
while True:
# 检查MQTT消息
try:
mqtt_client.check_msg()
except Exception as e:
print(f"检查消息错误: {e}")
connect_mqtt() # 出错时重新连接
# 读取和显示数据
distance = read_sensor_data()
handle_alarm(distance)
display_data(distance)
# 保持连接
try:
mqtt_client.ping()
except:
connect_mqtt()
time.sleep(1) # 每秒读取一次
except KeyboardInterrupt:
print("\n程序已停止")
mqtt_client.disconnect()
except Exception as e:
print(f"发生错误:{e}")
mqtt_client.disconnect()
if __name__ == "__main__":
main()