import machine
import time
import ssd1306
from umqtt.simple import MQTTClient
import network
# 全局配置
I2C_SCL = 5 # I2C时钟线连接到GPIO5
I2C_SDA = 4 # I2C数据线连接到GPIO4
OLED_WIDTH = 128 # OLED宽度
OLED_HEIGHT = 64 # OLED高度
# 需要修改的地方
wifiName = "Wokwi-GUEST" # wifi名称,不支持5G wifi
wifiPassword = "" # wifi密码
clientID = "d2bc31d11dbd41a08255a2d5a60a1dee" # Client ID,密钥,巴法云控制台获取
sdcardTopic = "sdcard" # 要连接的主题,巴法MQTT控制台创建
statusTopic = "status" # 在线状态主题
# 默认设置
serverIP = "bemfa.com" # mqtt服务器地址
port = 9501
# 在线状态消息
ONLINE_MSG = "online"
OFFLINE_MSG = "offline"
# MQTT回调函数(处理收到的消息,可按需扩展)
def mqtt_callback(topic, msg):
print("Received message on topic:", topic.decode(), "message:", msg.decode())
# 若需要在OLED显示收到的消息,可在此处调用 display_text 等逻辑
# WIFI连接函数(复用原有逻辑)
def do_connect():
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('connecting to network...')
sta_if.active(True)
sta_if.connect(wifiName, wifiPassword)
max_retries = 10
retry_count = 0
while not sta_if.isconnected() and retry_count < max_retries:
time.sleep(1)
retry_count += 1
if not sta_if.isconnected():
print("Failed to connect to WiFi after multiple retries.")
raise OSError("WiFi connection failed")
print('connect WiFi ok')
# 初始化mqtt连接配置(调整订阅主题为 sdcard 相关,保留状态主题)
def connect_and_subscribe():
max_retries = 5
retry_count = 0
while retry_count < max_retries:
try:
# 创建MQTT客户端并设置遗嘱消息
client = MQTTClient(clientID, serverIP, port)
client.set_callback(mqtt_callback) # 设置回调函数
client.set_last_will(statusTopic, OFFLINE_MSG, retain=True)
# 连接MQTT服务器
client.connect()
print("Connected to %s" % serverIP)
# 订阅主题(订阅 sdcard 主题,让服务器知道我们关注该主题)
client.subscribe(sdcardTopic)
print(f"Subscribed to topic: {sdcardTopic}")
# 发布在线状态
client.publish(statusTopic, ONLINE_MSG, retain=True)
print(f"Published ONLINE status to {statusTopic}")
return client
except OSError as e:
print(f"MQTT connection error (attempt {retry_count + 1}): {e}")
retry_count += 1
time.sleep(2)
print("Failed to connect to MQTT broker after multiple retries.")
raise OSError("MQTT connection failed")
def init_display():
"""初始化OLED显示屏"""
i2c = machine.I2C(scl=machine.Pin(I2C_SCL), sda=machine.Pin(I2C_SDA))
display = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
return display
def display_text(text, display, y_pos=0):
"""在OLED指定位置显示文本,自动清屏后显示"""
display.fill(0) # 清屏
display.text(text, 0, y_pos)
display.show() # 更新显示
def main():
"""主函数,程序入口点"""
print("Connect to sdcard Topic on bemfa.com...")
# 连接WiFi
try:
do_connect()
except OSError as e:
print(f"WiFi连接错误: {e}")
time.sleep(10)
machine.reset()
# 连接MQTT
try:
client = connect_and_subscribe()
except OSError as e:
print(f"MQTT连接错误: {e}")
time.sleep(10)
machine.reset()
# 初始化显示屏
display = init_display()
display.text("Connecting...", 0, 0)
display.show()
time.sleep(2)
last_ping = time.time()
example_msg = "Shell Terminal Info" # 模拟终端提示信息,实际可替换为真实数据
try:
while True:
# 检查MQTT消息(监听 sdcard 主题收到的内容)
client.check_msg()
# 每30秒发送一次PING保持连接
if time.time() - last_ping > 30:
try:
client.ping()
last_ping = time.time()
print("MQTT ping sent")
except:
try:
client = connect_and_subscribe() # 重连
except:
pass
# 模拟获取终端提示信息(这里写死示例,实际需替换为真实数据来源)
terminal_info = example_msg
print(f"Terminal Info: {terminal_info}") # 终端打印
display_text(f"Info: {terminal_info}", display, y_pos=10) # OLED显示
# 发布终端信息到 sdcard 主题
try:
client.publish(sdcardTopic, terminal_info)
print("Data published to sdcard Topic!")
except Exception as e:
print(f"Error publishing data to MQTT: {e}")
# 尝试重连并再次发布
client = connect_and_subscribe()
client.publish(sdcardTopic, terminal_info)
# 等待2秒(可按需调整间隔)
time.sleep(2)
except KeyboardInterrupt:
print("\n程序已停止")
display.fill(0)
display.text("Program Stopped", 0, 0)
display.show()
except Exception as e:
print(f"发生错误: {e}")
display.fill(0)
display.text("System Error", 0, 0)
display.show()
finally:
# 断开MQTT连接,发布离线状态
try:
client.publish(statusTopic, OFFLINE_MSG, retain=True)
print(f"Published OFFLINE status to {statusTopic}")
client.disconnect()
print("Disconnected from MQTT Broker")
except Exception:
pass
# 程序入口
if __name__ == "__main__":
main()