from machine import UART, Pin, I2C
import time
from umqtt.simple import MQTTClient
from ssd1306 import SSD1306_I2C
# ==================== 配置参数 ====================
# WiFi 配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云 MQTT 配置
MQTT_CLIENT_ID = "8854b385911048edaa12808fb1f199e6"
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
MQTT_TOPIC_LED = "myled" # LED状态主题
MQTT_TOPIC_OLED = "mysta" # OLED状态主题
MQTT_USER = "+8618840167598"
MQTT_PASSWORD = "8854b385911048edaa12808fb1f199e6"
# 硬件配置
LED_PIN = 15
UART_ID = 1
UART_BAUDRATE = 115200
UART_TX = 1
UART_RX = 3
# OLED 屏幕 I2C 配置
I2C_SCL = Pin(18)
I2C_SDA = Pin(23)
i2c = I2C(0, scl=I2C_SCL, sda=I2C_SDA, freq=400000)
oled = SSD1306_I2C(128, 64, i2c)
# 初始化硬件
led = Pin(LED_PIN, Pin.OUT)
uart = UART(UART_ID, baudrate=UART_BAUDRATE, tx=UART_TX, rx=UART_RX)
# ==================== 函数定义 ====================
def connect_wifi():
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Connecting to network...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
time.sleep(1)
print('WiFi connected. Network config:', wlan.ifconfig())
def connect_mqtt():
client = MQTTClient(
client_id=MQTT_CLIENT_ID,
server=MQTT_BROKER,
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_PASSWORD,
keepalive=30
)
try:
client.connect()
print('MQTT connected to', MQTT_BROKER)
return client
except Exception as e:
print('MQTT connect failed:', e)
return None
def oled_show_status(status):
oled.fill(0)
oled.text("LED Status:", 10, 10)
oled.text(f"{status.upper()}", 50, 30)
oled.show()
def main():
connect_wifi()
mqtt_client = connect_mqtt()
if not mqtt_client:
print("MQTT 连接失败,程序退出")
return
# 串口输出提示信息
uart.write("请输入1或者0(1对应灯亮,0对应灯灭)\n".encode())
while True:
if uart.any():
data = uart.readline().decode().strip()
print(f"串口接收: '{data}'")
if data == "1":
led.value(1)
status = "on"
elif data == "0":
led.value(0)
status = "off"
else:
uart.write("无效输入!请输入1或者0(1对应灯亮,0对应灯灭)\n".encode())
continue
# OLED 显示状态
oled_show_status(status)
print(f"OLED 更新: {status.upper()}")
# MQTT 分别上报LED和OLED状态
mqtt_client.publish(MQTT_TOPIC_LED, status)
mqtt_client.publish(MQTT_TOPIC_OLED, status)
print(f"MQTT 已发布: {MQTT_TOPIC_LED}={status}, {MQTT_TOPIC_OLED}={status}")
# 刷新提示信息
uart.write("请输入1或者0(1对应灯亮,0对应灯灭)\n".encode())
time.sleep(0.1)
# ==================== 程序入口 ====================
if __name__ == "__main__":
main()