import machine
import time
import network
from hcsr04 import HCSR04
from umqtt.simple import MQTTClient
import ssd1306
import ubinascii
import sys
# ====================== 配置参数 ======================
CONFIG = {
# 巴法云MQTT配置
"mqtt": {
"server": "mqtt.bemfa.com",
"port": 9501,
"user": "a019001c4fee4049a0e55087d01bfc91",
"password": "",
"topic_distance": "mydistance",
"topic_status": "mystatus"
},
# WiFi配置
"wifi": {
"ssid": "Wokwi-GUEST",
"password": ""
},
# 硬件配置
"hardware": {
"ultrasonic": {
"trigger_pin": 4,
"echo_pin": 27
},
"oled": {
"scl_pin": 18,
"sda_pin": 19,
"width": 128,
"height": 64
},
"led_pin": 2
},
# 系统参数
"system": {
"publish_interval": 3,
"max_retries": 5,
"retry_delay": 5
}
}
# ====================== 系统初始化 ======================
class DistanceMonitor:
def __init__(self, config):
self.config = config
self.distance = 0
self.mqtt_client = None
self.initialize_hardware()
def initialize_hardware(self):
"""初始化硬件设备"""
# 超声波传感器
self.sensor = HCSR04(
trigger_pin=self.config["hardware"]["ultrasonic"]["trigger_pin"],
echo_pin=self.config["hardware"]["ultrasonic"]["echo_pin"]
)
# OLED显示屏
self.i2c = machine.SoftI2C(
scl=machine.Pin(self.config["hardware"]["oled"]["scl_pin"]),
sda=machine.Pin(self.config["hardware"]["oled"]["sda_pin"])
)
self.oled = ssd1306.SSD1306_I2C(
self.config["hardware"]["oled"]["width"],
self.config["hardware"]["oled"]["height"],
self.i2c
)
# LED指示灯
self.led = machine.Pin(self.config["hardware"]["led_pin"], machine.Pin.OUT)
self.led.off()
def connect_wifi(self):
"""连接WiFi网络"""
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print(f"Connecting to WiFi: {self.config['wifi']['ssid']}")
wlan.connect(self.config["wifi"]["ssid"], self.config["wifi"]["password"])
for _ in range(10): # 等待10秒
if wlan.isconnected():
break
time.sleep(1)
print(".", end="")
if wlan.isconnected():
print("\nWiFi connected:", wlan.ifconfig())
return True
print("\nWiFi connection failed")
return False
def connect_mqtt(self):
"""连接MQTT服务器"""
try:
# 生成唯一客户端ID
client_id = f"{self.config['mqtt']['user']}_{ubinascii.hexlify(machine.unique_id()).decode()}"
# 创建MQTT客户端
self.mqtt_client = MQTTClient(
client_id=client_id,
server=self.config["mqtt"]["server"],
port=self.config["mqtt"]["port"],
user=self.config["mqtt"]["user"],
password=self.config["mqtt"]["password"],
keepalive=60
)
# 设置最后遗嘱
self.mqtt_client.set_last_will(
f"{client_id}/{self.config['mqtt']['topic_status']}",
"offline",
retain=True
)
# 建立连接
self.mqtt_client.connect()
print(f"MQTT connected as {client_id}")
# 发布在线状态
self.mqtt_client.publish(
f"{client_id}/{self.config['mqtt']['topic_status']}",
"online",
retain=True
)
return True
except Exception as e:
print("MQTT connection failed:", e)
self.mqtt_client = None
return False
def measure_distance(self):
"""测量距离并返回厘米值"""
try:
dist = self.sensor.distance_cm()
return max(2, min(500, dist)) # 限制在2-500cm范围内
except Exception as e:
print("Distance measurement error:", e)
return -1
def publish_data(self, distance):
"""发布数据到MQTT"""
if not self.mqtt_client or distance < 0:
return False
try:
payload = str(distance)
self.mqtt_client.publish(
self.config["mqtt"]["topic_distance"],
payload
)
print(f"Published: {payload}cm")
# LED闪烁指示
self.led.on()
time.sleep(0.1)
self.led.off()
return True
except Exception as e:
print("Publish failed:", e)
self.mqtt_client = None
return False
def display_status(self, distance, wifi_ok, mqtt_ok):
"""在OLED上显示系统状态"""
self.oled.fill(0)
self.oled.text("Distance Monitor", 0, 0)
self.oled.text(f"Dist: {distance:.1f}cm", 0, 20)
self.oled.text(f"WiFi: {'OK' if wifi_ok else 'NO'}", 0, 35)
self.oled.text(f"MQTT: {'OK' if mqtt_ok else 'NO'}", 0, 50)
self.oled.show()
def run(self):
"""主运行循环"""
# 初始连接
wifi_ok = self.connect_wifi()
mqtt_ok = wifi_ok and self.connect_mqtt()
last_publish = 0
while True:
try:
# 测量距离
self.distance = self.measure_distance()
print(f"Distance: {self.distance:.1f}cm")
# 更新显示
self.display_status(
self.distance,
network.WLAN(network.STA_IF).isconnected(),
mqtt_ok and self.mqtt_client is not None
)
# 发布数据
current_time = time.time()
if (mqtt_ok and
current_time - last_publish >= self.config["system"]["publish_interval"]):
if self.publish_data(self.distance):
last_publish = current_time
else:
mqtt_ok = False
# 检查并恢复连接
if not network.WLAN(network.STA_IF).isconnected():
wifi_ok = self.connect_wifi()
mqtt_ok = False
if not mqtt_ok and wifi_ok:
mqtt_ok = self.connect_mqtt()
time.sleep(1)
except Exception as e:
print("System error:", e)
time.sleep(5)
# ====================== 主程序 ======================
if __name__ == "__main__":
monitor = DistanceMonitor(CONFIG)
monitor.run()