from machine import UART, Pin, Timer, SoftI2C
import time
import network
from umqtt.simple import MQTTClient
from ssd1306 import SSD1306_I2C # OLED驱动库
# 网络与巴法云配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
BAFAYUN_USER_KEY = "ff052c9e9dc236744787ff0862ab8902" # 巴法云私钥
BAFAYUN_TOPIC = "light002" # 主题名称
BAFAYUN_SERVER = "bemfa.com" # 服务器地址
BAFAYUN_PORT = 9501 # 端口号
# 初始化硬件
uart = UART(1, baudrate=115200, tx=1, rx=3) # 串口1
led = Pin(15, Pin.OUT) # LED控制引脚
# OLED初始化
i2c = SoftI2C(sda=Pin(23), scl=Pin(18)) # I2C接口引脚
oled = SSD1306_I2C(128, 64, i2c) # 128x64 OLED显示屏
# 全局变量
mqtt_client = None
last_heartbeat = 0
heartbeat_interval = 10 # 心跳包间隔(秒)
led_status = 0 # LED当前状态(0=关,1=开)
wifi_connected = False # WiFi连接状态
mqtt_connected = False # MQTT连接状态
remote_control_enabled = False # 远程控制开关
# OLED显示函数 - 刷新LED状态
def update_oled():
oled.fill(0) # 清屏
# 显示标题
oled.text("Bemfa LED Control", 0, 0)
# 显示WiFi状态
wifi_text = "WiFi: Connected" if wifi_connected else "WiFi: Disconnected"
oled.text(wifi_text, 0, 16)
# 显示MQTT状态
mqtt_text = "MQTT: Connected" if mqtt_connected else "MQTT: Disconnected"
oled.text(mqtt_text, 0, 28)
# 显示LED状态(大字体)
led_text = "LED: ON" if led_status else "LED: OFF"
oled.text(led_text, 0, 44)
# 显示控制模式
mode_text = "Mode: Local" if not remote_control_enabled else "Mode: Remote"
oled.text(mode_text, 0, 56)
oled.show() # 更新显示
# WiFi连接函数
def connect_wifi():
global wifi_connected
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# 更新OLED显示连接中
oled.fill(0)
oled.text("Connecting WiFi...", 0, 0)
oled.show()
if not wlan.isconnected():
print("连接WiFi...")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 15
while max_wait > 0:
if wlan.isconnected():
break
max_wait -= 1
time.sleep(1)
# 更新连接进度
oled.fill(0)
oled.text(f"WiFi Connecting...", 0, 0)
oled.text(f"Wait: {max_wait}s", 0, 16)
oled.show()
if wlan.isconnected():
print(f"WiFi连接成功: {wlan.ifconfig()}")
wifi_connected = True
# 更新OLED显示IP信息
oled.fill(0)
oled.text("WiFi Connected", 0, 0)
oled.text(f"IP: {wlan.ifconfig()[0]}", 0, 16)
oled.show()
time.sleep(1)
return True
else:
print("WiFi连接失败")
wifi_connected = False
# 更新OLED显示失败信息
oled.fill(0)
oled.text("WiFi Failed", 0, 0)
oled.show()
time.sleep(1)
return False
# 初始化MQTT客户端
def init_mqtt():
global mqtt_client, mqtt_connected
try:
# 更新OLED显示连接中
oled.fill(0)
oled.text("Connecting MQTT...", 0, 0)
oled.show()
client_id = BAFAYUN_USER_KEY
mqtt_client = MQTTClient(client_id, BAFAYUN_SERVER, BAFAYUN_PORT)
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
mqtt_client.subscribe(BAFAYUN_TOPIC)
print("MQTT连接成功,已订阅主题")
mqtt_connected = True
# 更新OLED显示连接成功
oled.fill(0)
oled.text("MQTT Connected", 0, 0)
oled.text(f"Topic: {BAFAYUN_TOPIC}", 0, 16)
oled.show()
time.sleep(1)
return True
except Exception as e:
print(f"MQTT连接失败: {e}")
mqtt_client = None
mqtt_connected = False
# 更新OLED显示失败信息
oled.fill(0)
oled.text("MQTT Failed", 0, 0)
oled.show()
time.sleep(1)
return False
# MQTT消息回调
def mqtt_callback(topic, msg):
global remote_control_enabled
print(f"收到消息: {topic.decode()} - {msg.decode()}")
# 更新OLED显示收到消息
oled.fill(0)
oled.text("Message Received", 0, 0)
oled.text(f"Topic: {topic.decode()}", 0, 16)
oled.text(f"Msg: {msg.decode()}", 0, 28)
oled.show()
time.sleep(1) # 显示1秒
# 仅当远程控制启用时才响应MQTT消息
if remote_control_enabled:
if msg.decode() == "on":
if led_status != 1: # 仅在状态不同时改变
led.value(1)
publish_led_status(1)
elif msg.decode() == "off":
if led_status != 0: # 仅在状态不同时改变
led.value(0)
publish_led_status(0)
elif msg.decode() == "toggle": # 新增切换命令
new_status = 0 if led_status else 1
led.value(new_status)
publish_led_status(new_status)
# 发布LED状态到巴法云
def publish_led_status(status):
global led_status
led_status = status
# 更新OLED显示LED状态变化
oled.fill(0)
oled.text("Publishing Status...", 0, 0)
oled.text(f"LED: {'ON' if status else 'OFF'}", 0, 16)
oled.show()
if not mqtt_client:
return False
try:
state_str = "on" if status else "off"
mqtt_client.publish(BAFAYUN_TOPIC, state_str)
print(f"已发布状态: {state_str}")
return True
except Exception as e:
print(f"发布失败: {e}")
return False
# 发布状态信息
def publish_status(status):
if not mqtt_client:
return False
try:
# 使用主题本身作为状态发布,避免使用额外的/status主题
mqtt_client.publish(BAFAYUN_TOPIC, status)
print(f"已发布状态: {status}")
return True
except Exception as e:
print(f"发布状态失败: {e}")
return False
# 检查MQTT连接并发送心跳
def check_mqtt_connection():
global mqtt_client, last_heartbeat, mqtt_connected
if not mqtt_client:
return init_mqtt()
current_time = time.time()
if current_time - last_heartbeat >= heartbeat_interval:
try:
mqtt_client.ping() # 发送心跳包
last_heartbeat = current_time
mqtt_connected = True
return True
except:
print("心跳包发送失败,尝试重连...")
mqtt_client = None
mqtt_connected = False
return init_mqtt()
return True
# 程序初始化
oled.fill(0)
oled.text("System Initializing...", 0, 0)
oled.show()
time.sleep(1)
uart.write("\r请输入1或0控制LED(1开/0关)\n")
uart.write("\r输入r切换远程控制模式(默认本地模式)\n")
if connect_wifi():
init_mqtt()
else:
print("未连接WiFi,仅运行本地功能")
# 初始化发布当前状态
if mqtt_client:
publish_led_status(led.value())
# 显示初始状态
update_oled()
# 主循环
try:
while True:
uart.write("\r") # 光标归位
# 处理串口输入
if uart.any():
data = uart.read()
print(f"收到输入: {data}")
if data == b'1':
led.value(1)
uart.write("\rinput 1, LED is ON.\n")
publish_led_status(1)
elif data == b'0':
led.value(0)
uart.write("\rinput 0, LED is OFF.\n")
publish_led_status(0)
elif data == b'r' or data == b'R': # 切换远程控制模式
remote_control_enabled = not remote_control_enabled
mode_text = "远程控制已启用" if remote_control_enabled else "远程控制已禁用"
uart.write(f"\r{mode_text}\n")
# 更新OLED显示模式变化
oled.fill(0)
oled.text("Mode Changed", 0, 0)
oled.text(mode_text, 0, 16)
oled.show()
time.sleep(1)
else:
uart.write("\rinvalid input! Please enter 1, 0 or r.\n")
# 维护MQTT连接状态
if mqtt_client:
try:
mqtt_client.check_msg() # 检查消息
check_mqtt_connection() # 发送心跳
except Exception as e:
print(f"MQTT异常: {e}")
# 增加更详细的错误处理
print("尝试重新连接MQTT...")
mqtt_client = None
mqtt_connected = False
# 增加重连前的短暂延迟
time.sleep(2)
init_mqtt()
# 每秒更新一次OLED显示
update_oled()
time.sleep(0.1)
except KeyboardInterrupt:
print("程序被用户中断")
finally:
# 确保程序退出时正确断开连接
if mqtt_client:
try:
# 使用主主题发布离线状态
publish_status("offline")
# 增加断开前的短暂延迟,确保消息发送成功
time.sleep(0.5)
mqtt_client.disconnect()
print("MQTT连接已断开")
except Exception as e:
print(f"断开MQTT连接时出错: {e}")
print("程序已安全退出")