from machine import Pin, SoftI2C, Timer
import time
import network
try:
from ssd1306 import SSD1306_I2C
except ImportError:
from machine.ssd1306 import SSD1306_I2C
from umqtt.simple import MQTTClient
from hcsr04 import HCSR04
# ------------------- 硬件配置 ------------------- #
# OLED 引脚
SDA_PIN = 21
SCL_PIN = 22
# 超声波传感器引脚
TRIG_PIN = 4
ECHO_PIN = 27
# ------------------- 巴法云 MQTT 配置 ------------------- #
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
MQTT_CLIENT_ID = "8d1004f1c025bd633b94302f5a92e15c" # 替换为你的 ClientID
MQTT_DISTANCE_TOPIC = "sr004" # 上传距离的主题
MQTT_CONTROL_TOPIC = "CMD004" # 订阅命令的主题
MQTT_STATUS_TOPIC = "status" # 在线状态主题
ONLINE_STATUS = "online"
OFFLINE_STATUS = "offline"
# ------------------- 全局变量 ------------------- #
hcsr04 = HCSR04(trigger_pin=TRIG_PIN, echo_pin=ECHO_PIN)
# OLED 初始化
i2c = SoftI2C(sda=Pin(SDA_PIN), scl=Pin(SCL_PIN))
display = SSD1306_I2C(128, 64, i2c)
mqtt_client = None
wifi_connected = False
mqtt_connected = False
subscribed = False # 标记是否已订阅主题
last_upload_time = 0
upload_interval = 5 # 数据上传间隔(秒)
last_online_publish = 0
online_publish_interval = 30 # 在线状态发布间隔(秒)
# ------------------- 显示函数 ------------------- #
def show_welcome():
display.fill(0)
display.text("PRECHIN TECHNOLOGY", 0, 0)
display.text("HC-SR04 to Bemfa", 0, 16)
display.text("Distance Monitor", 0, 32)
display.text("Connecting...", 0, 48)
display.show()
time.sleep(2)
def show_status(distance, wifi_status, mqtt_status, sub_status):
display.fill(0)
# 标题 + 水平线
display.text("HC-SR04 Distance", 0, 0)
display.hline(0, 12, 128, 1)
# 距离显示
dist_text = f"{distance:.2f} CM"
x_pos = (128 - len(dist_text) * 8) // 2
display.text(dist_text, x_pos, 24)
# 连接状态
wifi_text = "WiFi: " + ("Connected" if wifi_status else "Disconnected")
display.text(wifi_text, 0, 40)
mqtt_text = "MQTT: " + ("Connected" if mqtt_status else "Disconnected")
display.text(mqtt_text, 0, 50)
# 订阅状态
sub_text = "Sub: " + ("Online" if sub_status else "Offline")
display.text(sub_text, 0, 60)
display.show()
# ------------------- 网络与 MQTT 连接 ------------------- #
def connect_wifi():
global wifi_connected
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("Connecting to WiFi...")
wlan.connect("Wokwi-GUEST", "")
timeout = 10
while timeout > 0:
if wlan.isconnected():
print(f"WiFi Connected: {wlan.ifconfig()}")
wifi_connected = True
return True
timeout -= 1
time.sleep(1)
print("WiFi Connection Failed")
wifi_connected = False
return False
else:
print(f"WiFi Already Connected: {wlan.ifconfig()}")
wifi_connected = True
return True
def connect_mqtt():
global mqtt_client, mqtt_connected, subscribed
try:
# 断开旧连接
if mqtt_client:
mqtt_client.disconnect()
# 初始化 MQTT 客户端
mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT)
mqtt_client.set_callback(mqtt_callback) # 消息回调
mqtt_client.connect()
print("MQTT Broker Connected")
mqtt_connected = True
subscribed = False # 重置订阅标记
# 发布在线状态
publish_online_status(force=True)
# 订阅主题(仅订阅一次)
subscribe_topic()
return True
except Exception as e:
print(f"MQTT Connection Failed: {e}")
mqtt_connected = False
subscribed = False
return False
def subscribe_topic():
global subscribed
if mqtt_connected and not subscribed:
try:
mqtt_client.subscribe(MQTT_CONTROL_TOPIC)
print(f"Subscribed to {MQTT_CONTROL_TOPIC}")
subscribed = True
return True
except Exception as e:
print(f"Subscribe Failed: {e}")
subscribed = False
return False
return subscribed
# ------------------- MQTT 消息与状态发布 ------------------- #
def mqtt_callback(topic, msg):
topic_str = topic.decode('utf-8')
msg_str = msg.decode('utf-8')
print(f"Received: {topic_str} -> {msg_str}")
# 可在此扩展命令处理逻辑(如根据 msg_str 执行操作)
def publish_online_status(force=False):
global last_online_publish
current_time = time.time()
if not force and (current_time - last_online_publish < online_publish_interval):
return
last_online_publish = current_time
if mqtt_connected:
try:
status_topic = f"{MQTT_CLIENT_ID}/{MQTT_STATUS_TOPIC}"
mqtt_client.publish(status_topic, ONLINE_STATUS, retain=True)
print("Online Status Published")
except Exception as e:
print(f"Publish Online Status Failed: {e}")
def publish_offline_status():
if mqtt_connected:
try:
status_topic = f"{MQTT_CLIENT_ID}/{MQTT_STATUS_TOPIC}"
mqtt_client.publish(status_topic, OFFLINE_STATUS, retain=True)
print("Offline Status Published")
except Exception as e:
print(f"Publish Offline Status Failed: {e}")
def publish_distance(distance):
global last_upload_time
current_time = time.time()
if current_time - last_upload_time < upload_interval:
return
last_upload_time = current_time
if mqtt_connected:
try:
mqtt_client.publish(MQTT_DISTANCE_TOPIC, f"{distance:.2f}")
print(f"Distance Published: {distance:.2f} CM")
except Exception as e:
print(f"Publish Distance Failed: {e}")
# ------------------- 主程序 ------------------- #
if __name__ == "__main__":
show_welcome()
# 连接 WiFi
if not connect_wifi():
print("WiFi Failed, Running Offline")
# 连接 MQTT(依赖 WiFi)
if wifi_connected:
connect_mqtt()
# 主循环
while True:
try:
# 1. 读取距离
distance = hcsr04.distance_cm()
print(f"Distance: {distance:.2f} CM")
# 2. 显示状态(含订阅状态)
show_status(distance, wifi_connected, mqtt_connected, subscribed)
# 3. 维护 MQTT 连接
if wifi_connected and not mqtt_connected:
connect_mqtt()
# 4. 处理 MQTT 消息(检查新消息)
if mqtt_connected:
try:
mqtt_client.check_msg()
except Exception as e:
print(f"MQTT Check Msg Failed: {e}")
mqtt_connected = False
subscribed = False
# 5. 发布距离数据
if wifi_connected and mqtt_connected:
publish_distance(distance)
# 6. 定期发布在线状态
publish_online_status()
except Exception as e:
print(f"Error: {e}")
display.fill(0)
display.text("ERROR OCCURRED", 0, 24)
display.text(f"{str(e)}", 0, 40)
display.show()
time.sleep(2)
time.sleep(0.5)