import machine
import time
import network
from umqtt.simple import MQTTClient
import ssd1306
# ====================== 巴法云配置 ======================
SECRET_KEY = "03de05b5472ec6bc0d993ae846f83e4d" # 你的巴法云秘钥
MQTT_SERVER = "mqtt.bemfa.com" # 巴法云 MQTT 服务器
MQTT_PORT = 9501 # 端口号
TOPIC_LED = "ledstatus" # LED状态主题名
# ====================== WiFi 配置 ======================
WIFI_SSID = "Wokwi-GUEST" # WiFi 名称
WIFI_PASSWORD = "" # WiFi 密码
# ====================== OLED 配置 ======================
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_ADDR = 0x3C # 明确指定OLED的I2C地址
# 初始化串口
uart1 = machine.UART(1, tx=1, rx=3, baudrate=115200)
# 初始化LED引脚
led = machine.Pin(2, machine.Pin.OUT)
# 初始化I2C和OLED
# 调整I2C引脚,ESP32在Wokwi中常用的I2C引脚
i2c = machine.I2C(0, scl=machine.Pin(18), sda=machine.Pin(19), freq=400000)
# 扫描I2C设备,帮助调试
def scan_i2c():
devices = i2c.scan()
if len(devices) == 0:
print("没有找到I2C设备")
return False
else:
print(f"找到 {len(devices)} 个I2C设备,地址为: {[hex(d) for d in devices]}")
return True
# 更新OLED显示
def update_oled(status):
try:
oled.fill(0) # 清屏
oled.text("LED Control", 20, 10)
oled.text("Status: " + status, 20, 30)
# 添加一些额外的测试信息
oled.text("Time: " + str(time.time()), 10, 50)
oled.show()
print(f"OLED已更新,状态: {status}")
except Exception as e:
print(f"OLED更新失败: {e}")
# 初始化WiFi连接
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('连接到WiFi...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
# 等待连接或失败
max_wait = 10
while max_wait > 0:
if wlan.isconnected():
break
max_wait -= 1
print('等待连接...')
time.sleep(1)
if wlan.isconnected():
print('WiFi连接成功')
print('网络配置:', wlan.ifconfig())
return True
else:
print('WiFi连接失败')
return False
# 初始化MQTT客户端
def init_mqtt():
client_id = SECRET_KEY # 使用SECRET_KEY作为客户端ID
client = MQTTClient(client_id, MQTT_SERVER, MQTT_PORT)
try:
client.connect()
print('MQTT连接成功')
return client
except Exception as e:
print(f'MQTT连接失败: {e}')
return None
# 发送LED状态到巴法云
def send_led_status(client, status):
try:
client.publish(TOPIC_LED, status)
print(f'已发送LED状态: {status}')
except Exception as e:
print(f'发送LED状态失败: {e}')
# 显示启动信息
def display_startup_info():
try:
oled.fill(0)
oled.text("System Starting", 10, 20)
oled.text("I2C Scan...", 10, 40)
oled.show()
time.sleep(1)
except Exception as e:
print(f"显示启动信息失败: {e}")
# 主程序
def main():
# 显示启动信息
display_startup_info()
# 扫描I2C设备
i2c_devices_found = scan_i2c()
global oled
try:
# 初始化OLED,明确指定地址
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=OLED_ADDR)
print(f"OLED初始化成功,地址: {hex(OLED_ADDR)}")
except Exception as e:
print(f"OLED初始化失败: {e}")
if i2c_devices_found:
print("提示: I2C设备已找到,但OLED初始化失败,可能地址不正确")
# 尝试其他可能的地址
try:
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=0x3D)
print(f"OLED初始化成功,地址: 0x3D")
except:
print("尝试地址0x3D也失败了")
else:
print("提示: 未找到I2C设备,请检查连接")
# 显示初始信息
update_oled("OFF")
print("串口LED控制程序已启动")
print("发送 '1' 打开LED,发送 '0' 关闭LED")
# 连接WiFi和MQTT
wifi_connected = connect_wifi()
mqtt_client = None
if wifi_connected:
mqtt_client = init_mqtt()
# 初始LED状态
led_status = "OFF"
led.value(0) # 确保LED初始状态为关闭
# 主循环
last_update_time = 0
while True:
current_time = time.time()
# 每5秒更新一次OLED时间显示,确保OLED正常工作
if current_time - last_update_time > 5:
update_oled(led_status)
last_update_time = current_time
# 检查串口是否有数据
if uart1.any():
# 读取串口数据
data = uart1.read().decode('utf-8').strip()
print(f"收到命令: {data}")
# 根据接收到的命令控制LED
if data == '1':
led.value(1) # 打开LED
led_status = "ON"
uart1.write("LED已打开\r\n")
update_oled(led_status) # 更新OLED显示
if mqtt_client:
send_led_status(mqtt_client, led_status) # 发送到巴法云
elif data == '0':
led.value(0) # 关闭LED
led_status = "OFF"
uart1.write("LED已关闭\r\n")
update_oled(led_status) # 更新OLED显示
if mqtt_client:
send_led_status(mqtt_client, led_status) # 发送到巴法云
else:
uart1.write("未知命令,请发送 '1' 或 '0'\r\n")
time.sleep(0.1) # 短暂延时,避免CPU占用过高
# 运行主程序
if __name__ == "__main__":
main()