from machine import UART, Pin, SoftI2C, ADC
import network
import time
from umqtt.simple import MQTTClient
from ssd1306 import SSD1306_I2C
# ------------------ 配置参数 ------------------ #
# WiFi 配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云 MQTT 配置
BAFA_CLIENT_ID = "bd6ff39ba76075f7d2dfbc26e14658c7"
BAFA_TOPIC = "MQ2" # 主题改为 MQ2
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
# OLED 屏幕配置
i2c = SoftI2C(sda=Pin(23), scl=Pin(22))
oled_width = 128
oled_height = 64
oled = SSD1306_I2C(oled_width, oled_height, i2c)
# 声音传感器 ADC 配置:声音传感器接 ADC8(GPIO36)
adc = ADC(Pin(34))
adc.atten(ADC.ATTN_11DB) # 11dB 衰减,支持 0~3.3V 输入
adc.width(ADC.WIDTH_12BIT) # 12 位分辨率,数值范围 0~4095
# ------------------ 硬件初始化 ------------------ #
uart = UART(1, baudrate=115200, tx=1, rx=3)
# MQTT 客户端对象(全局)
mqtt_client = None
# ------------------ 函数定义 ------------------ #
# 连接 WiFi
def connect_wifi():
uart.write(b"\rConnecting to WiFi...\n")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 20
while max_wait > 0:
if wlan.isconnected():
break
max_wait -= 1
time.sleep(1)
uart.write(b".")
if wlan.isconnected():
uart.write(b"\rWiFi connected! IP: " + wlan.ifconfig()[0] + b"\n")
return True
else:
uart.write(b"\rWiFi failed!\n")
return False
# 连接 MQTT 服务器
def connect_mqtt():
global mqtt_client
uart.write(b"\rConnecting to MQTT Broker...\n")
mqtt_client = MQTTClient(
client_id=BAFA_CLIENT_ID,
server=MQTT_BROKER,
port=MQTT_PORT,
keepalive=60
)
try:
mqtt_client.connect()
uart.write(b"\rMQTT connected! Ready to send/receive.\n")
# 连接成功后 OLED 显示“在线”
oled.fill(0)
oled.text("MQTT Online", 0, 0)
oled.show()
return True
except Exception as e:
uart.write(b"\rMQTT connect failed: " + str(e).encode() + b"\n")
return False
# MQTT 消息回调(此处仅做简单打印,可根据需求扩展)
def on_mqtt_message(topic, msg):
uart.write(b"\rReceived MQTT message: " + topic + b" -> " + msg + b"\n")
# 主程序逻辑
def main():
# 1. 连接 WiFi
if not connect_wifi():
return
# 2. 连接 MQTT 服务器
if not connect_mqtt():
return
# 3. 订阅主题
mqtt_client.set_callback(on_mqtt_message)
mqtt_client.subscribe(BAFA_TOPIC)
uart.write(b"\rSubscribed to topic: " + BAFA_TOPIC + b"\n")
# 4. 初始化 OLED 显示(先清屏提示)
oled.fill(0)
oled.text("System Ready", 0, 0)
oled.show()
# 5. 主循环
uart.write(b"\rSystem ready! Will show sound ADC data.\n")
while True:
# 读取声音传感器 ADC 值
adc_value = adc.read()
uart.write(b"Sound ADC Value: " + str(adc_value).encode() + b"\n")
# OLED 显示处理
oled.fill(0)
oled.text("Sound ADC: ", 0, 0)
oled.text(str(adc_value), 0, 16)
oled.show()
# 发布声音 ADC 数据到巴法云(按需开启,若只需接收可注释)
mqtt_client.publish(BAFA_TOPIC, str(adc_value))
# 检查 MQTT 消息
if mqtt_client is not None:
mqtt_client.check_msg()
time.sleep(0.5) # 延迟 0.5 秒,可调整采集频率
# ------------------ 启动程序 ------------------ #
if __name__ == "__main__":
main()