from machine import Pin, SoftI2C # 改用 SoftI2C 避免弃用警告
from ssd1306 import SSD1306_I2C
import time
from hcsr04 import HCSR04
import network
from umqtt.simple import MQTTClient
import ubinascii
# 硬件配置
trigger_pin = Pin(4, Pin.OUT) # 超声波触发引脚
echo_pin = Pin(27, Pin.IN) # 超声波接收引脚
i2c_sda = Pin(21) # I2C 数据线
i2c_scl = Pin(22) # I2C 时钟线
oled_width = 128
oled_height = 64
# 网络配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云 MQTT 配置
MQTT_SERVER = "bemfa.com"
MQTT_PORT = 9501
MQTT_CLIENT_ID = "e9328d5400b64c579cbb62a014f878a7"
MQTT_USER = "+8613380444725"
MQTT_PASSWORD = "e9328d5400b64c579cbb62a014f878a7"
MQTT_TOPIC_PUB = "distance123456" # 发布距离信息的主题
MQTT_TOPIC_SUB = "control" # 订阅控制指令的主题
# 距离状态阈值(单位:厘米)
NEAR_THRESHOLD = 20 # 近距离阈值
FAR_THRESHOLD = 50 # 远距离阈值
# 全局变量
mqtt_client = None
is_mqtt_connected = False
wlan = None # WLAN 对象,用于网络操作
# 初始化 HC-SR04、OLED、网络
def setup():
global hcsr04, oled, wlan
# 初始化 HC-SR04 传感器
hcsr04 = HCSR04(trigger_pin=trigger_pin, echo_pin=echo_pin)
# 初始化 OLED 显示屏(使用 SoftI2C 避免弃用警告)
i2c = SoftI2C(scl=i2c_scl, sda=i2c_sda)
oled = SSD1306_I2C(oled_width, oled_height, i2c)
oled.fill(0)
oled.text('System Starting', 0, 0)
oled.show()
# 连接 WiFi
connect_wifi()
# 若 WiFi 已连接,尝试初始化 MQTT
if wlan and wlan.isconnected():
connect_mqtt()
else:
print("WiFi 未连接,暂不初始化 MQTT")
# 显示初始信息
update_oled(0, "INIT", "Connecting...")
# 连接 WiFi 函数(含详细调试)
def connect_wifi():
global wlan, is_mqtt_connected
oled.fill(0)
oled.text('Connecting WiFi', 0, 0)
oled.show()
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('开始连接 WiFi 网络...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 15 # 延长等待时间,适配网络协商
while max_wait > 0:
status_code = wlan.status()
print(f'当前 WiFi 状态码: {status_code}') # 打印状态码调试
# 状态码 3 = 已连接,0/1/2 = 连接中,其他 = 错误
if status_code == 3:
break
elif status_code in [ -1, 4, 5 ]: # 常见错误状态码(-1=超时,4=认证失败,5=关联失败)
print(f'WiFi 连接出错,状态码 {status_code}(可能密码错误/网络不可达)')
break
max_wait -= 1
print(f'等待连接... 剩余重试次数: {max_wait}')
time.sleep(1)
# 连接结果判断
if wlan.isconnected():
ip_info = wlan.ifconfig()
print(f'WiFi 连接成功!IP: {ip_info[0]} 子网掩码: {ip_info[1]}')
oled.fill(0)
oled.text('WiFi Connected', 0, 0)
oled.text(ip_info[0], 0, 15)
oled.show()
time.sleep(2)
is_mqtt_connected = True
else:
print('WiFi 连接失败,请检查网络配置')
oled.fill(0)
oled.text('WiFi Failed', 0, 0)
oled.show()
is_mqtt_connected = False
# 连接 MQTT 服务器(含重连逻辑)
def connect_mqtt():
global mqtt_client, is_mqtt_connected
try:
oled.fill(0)
oled.text('Connecting MQTT', 0, 0)
oled.show()
# 初始化 MQTT 客户端
mqtt_client = MQTTClient(
client_id=MQTT_CLIENT_ID,
server=MQTT_SERVER,
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_PASSWORD
)
mqtt_client.set_callback(mqtt_callback) # 配置消息回调
mqtt_client.connect() # 建立连接
mqtt_client.subscribe(MQTT_TOPIC_SUB) # 订阅控制主题
print('MQTT 连接成功!已订阅主题:', MQTT_TOPIC_SUB)
oled.fill(0)
oled.text('MQTT Connected', 0, 0)
oled.text(MQTT_SERVER, 0, 15)
oled.show()
time.sleep(2)
is_mqtt_connected = True
# 发布在线状态
publish_message("ONLINE")
except Exception as e:
print(f'MQTT 连接失败: {str(e)}')
oled.fill(0)
oled.text('MQTT Failed', 0, 0)
oled.text(str(e)[:15], 0, 15) # 截断长错误信息适配 OLED
oled.show()
is_mqtt_connected = False
# MQTT 消息回调函数
def mqtt_callback(topic, msg):
print(f'收到 MQTT 消息:主题={topic.decode()} 内容={msg.decode()}')
# 可扩展逻辑:根据 msg 控制硬件(如 LED、电机等)
if topic.decode() == MQTT_TOPIC_SUB:
try:
command = msg.decode()
print(f'执行指令: {command}')
# 示例:if command == "LED_ON": led.value(1)
except Exception as e:
print(f'处理消息失败: {e}')
# 发布 MQTT 消息(自动重连)
def publish_message(message):
global mqtt_client, is_mqtt_connected
if not is_mqtt_connected:
print('MQTT 未连接,尝试重连...')
connect_mqtt() # 触发重连逻辑
if not is_mqtt_connected:
return False
try:
mqtt_client.publish(MQTT_TOPIC_PUB, message)
print(f'已发布: {message} 到主题 {MQTT_TOPIC_PUB}')
return True
except Exception as e:
print(f'发布消息失败: {e}')
is_mqtt_connected = False
return False
# 获取距离状态描述
def get_distance_status(distance):
if distance < 0:
return "ERROR"
elif distance < NEAR_THRESHOLD:
return "NEAR"
elif distance > FAR_THRESHOLD:
return "FAR"
else:
return "OK"
# 更新 OLED 显示
def update_oled(distance, status, mqtt_status):
oled.fill(0) # 清屏
oled.text('HC-SR04 Distance', 0, 0)
oled.hline(0, 10, oled_width, 1) # 绘制水平线
# 显示距离值
oled.text(f'Dist: {distance:.2f} CM', 0, 25)
# 显示距离状态(带背景高亮)
oled.text('Status:', 0, 40)
if status == "NEAR":
oled.fill_rect(60, 40, 50, 10, 1)
oled.text("NEAR", 62, 40, 0)
elif status == "FAR":
oled.fill_rect(60, 40, 40, 10, 1)
oled.text("FAR", 62, 40, 0)
elif status == "ERROR":
oled.fill_rect(60, 40, 60, 10, 1)
oled.text("ERROR", 62, 40, 0)
else:
oled.text("OK", 60, 40)
# 显示 MQTT 状态
oled.hline(0, 55, oled_width, 1)
oled.text(f'MQTT: {mqtt_status}', 0, 57)
oled.show()
# 检查 MQTT 连接(含 WiFi 重连逻辑)
def check_mqtt_connection():
global is_mqtt_connected
try:
if not is_mqtt_connected:
# 先检查 WiFi 状态,若断开则重连
if wlan and not wlan.isconnected():
connect_wifi()
# WiFi 恢复后尝试重连 MQTT
if wlan and wlan.isconnected():
connect_mqtt()
else:
# 发送 PING 包维持连接
mqtt_client.ping()
except Exception as e:
print(f'MQTT 连接检查失败: {e}')
is_mqtt_connected = False
# 主循环(持续测距 + 状态上报)
def loop():
check_interval = 10 # 每 10 次循环检查一次 MQTT
counter = 0
while True:
try:
# 读取超声波距离
distance = hcsr04.distance_cm()
# 判断距离状态
status = get_distance_status(distance)
# 控制台打印信息
mqtt_status = "CONNECTED" if is_mqtt_connected else "DISCONNECTED"
print(f"Distance: {distance:.2f} CM, Status: {status}, MQTT: {mqtt_status}")
# 更新 OLED 显示
update_oled(distance, status, mqtt_status)
# 发布数据到 MQTT(JSON 格式)
if is_mqtt_connected:
message = f'{{"distance":{distance:.2f},"status":"{status}"}}'
publish_message(message)
# 定期检查 MQTT 连接
counter += 1
if counter >= check_interval:
check_mqtt_connection()
counter = 0
# 检查 MQTT 待处理消息
if is_mqtt_connected:
mqtt_client.check_msg()
except Exception as e:
print(f"传感器读取失败: {e}")
update_oled(-1, "ERROR", "ERROR")
if is_mqtt_connected:
publish_message('{"error":"sensor_failed"}')
time.sleep(0.5) # 控制测距频率
# 程序入口
if __name__ == "__main__":
setup() # 初始化硬件和网络
loop() # 进入主循环