import machine
from machine import Pin, ADC
import time
import ssd1306
from machine import SoftI2C
from machine import Pin, ADC
from time import sleep
import network
from umqtt.simple import MQTTClient
# 巴法云平台配置
SERVER = "bemfa.com"
PORT = 9501
PRIVATE_KEY = "9940ffb1ef5467aab50734426b9eaef3" # 私钥
TOPIC = "MQ2" # 主题名
# WiFi 配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 传感器配置
gas_pin = 34
gas_ADC = ADC(Pin(gas_pin))
gas_ADC.atten(ADC.ATTN_11DB) # 11dB 衰减,最大输入电压约 3.6V
# WiFi连接函数
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('正在连接WiFi...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
pass
print('WiFi连接成功,网络配置:', wlan.ifconfig())
# MQTT回调函数(处理订阅消息)
def sub_callback(topic, msg):
print('收到来自主题 %s 的消息: %s' % (topic, msg))
# 初始化MQTT客户端
def init_mqtt():
client = MQTTClient(PRIVATE_KEY, SERVER, PORT)
client.set_callback(sub_callback)
try:
client.connect()
client.subscribe(TOPIC.encode())
print(f'已连接到MQTT服务器 {SERVER},订阅主题: {TOPIC}')
return client
except Exception as e:
print(f'MQTT连接失败: {e}')
return None
# ====================== 硬件配置 ======================
I2C_SCL = machine.Pin(18) # OLED屏幕SCL引脚
I2C_SDA = machine.Pin(19) # OLED屏幕SDA引脚
OLED_WIDTH = 128 # OLED屏幕宽度
OLED_HEIGHT = 64 # OLED屏幕高度
# 烟雾传感器引脚
SMOKE_PIN = 34 # ADC复用管脚为GP34
# 初始化I2C和OLED显示屏
i2c = SoftI2C(scl=I2C_SCL, sda=I2C_SDA)
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
# 烟雾警报阈值
SMOKE_THRESHOLD = 2000 # 烟雾浓度阈值,超过此值触发警报
# 在OLED屏幕上显示信息
def display_on_oled(smoke_value):
oled.fill(0) # 清屏
oled.text("MQ2 Smoke Sensor", 0, 0)
oled.text("Value: {}".format(smoke_value), 0, 20)
# 显示烟雾状态
if smoke_value > SMOKE_THRESHOLD:
oled.text("Status: WARNING!", 0, 40)
else:
oled.text("Status: Normal", 0, 40)
oled.show() # 更新显示
# 主程序
def main():
# 初始化烟雾传感器
smoke_adc = ADC(Pin(SMOKE_PIN))
smoke_adc.atten(ADC.ATTN_11DB) # 11dB 衰减, 最大输入电压约3.6v
connect_wifi()
mqtt_client = init_mqtt()
if mqtt_client is None:
print('MQTT连接失败,使用HTTP上传数据')
while True:
gas_value = gas_ADC.read()
print(f'传感器值: {gas_value}')
# 发布数据到MQTT主题
if mqtt_client:
try:
mqtt_client.publish(TOPIC.encode(), str(gas_value).encode())
mqtt_client.check_msg() # 检查订阅消息
except Exception as e:
print(f'MQTT发布失败: {e}')
mqtt_client = init_mqtt() # 尝试重新连接
sleep(2) # 每2秒上传一次数据
try:
# 读取烟雾传感器数据
smoke_value = smoke_adc.read()
# 在控制台显示数据
print("Smoke Value:", smoke_value)
# 在OLED屏幕显示数据
display_on_oled(smoke_value)
# 等待2秒
time.sleep(2)
except Exception as e:
print("Error:", e)
time.sleep(1)
if __name__ == "__main__":
main()