import dht
from machine import Pin, I2C, ADC
import machine
import time
from ssd1306 import SSD1306_I2C
import network
from umqtt.simple import MQTTClient
import ubinascii
import neopixel
import time
# 传感器引脚配置
DHT_PIN = 14 # DHT22温湿度传感器
TRIG_PIN = 15 # HC-SR04触发引脚
ECHO_PIN = 4 # HC-SR04回声引脚
IR_RECEIVE_PIN = 5 # 红外接收引脚
IR_SEND_PIN = 16 # 红外发送引脚
MQ2_PIN = 36 # MQ2气体传感器(ESP32 ADC引脚)
# 如果是ESP8266,MQ2_PIN = 0(GPIO0)
# OLED配置
OLED_WIDTH = 128
OLED_HEIGHT = 64
I2C_SCL_PIN = 22
I2C_SDA_PIN = 21
# 网络配置
WIFI_SSID = 'Wokwi-GUEST'
WIFI_PASSWORD = ''
# 巴法云配置
MQTT_SERVER = 'bemfa.com'
MQTT_PORT = 1883
MQTT_USER = '3ebe0dd53c374379b354b58b854aded4'
MQTT_TOPIC_TEMP = '111' # 温度主题
MQTT_TOPIC_HUMI = '222' # 湿度主题
MQTT_TOPIC_DIST = '333' # 距离主题
MQTT_TOPIC_GAS = '444' # 气体浓度主题
MQTT_TOPIC_IR = '555' # 红外信号主题
# 初始化I2C和OLED
i2c = I2C(scl=Pin(I2C_SCL_PIN), sda=Pin(I2C_SDA_PIN), freq=400000)
oled = SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
# 初始化传感器引脚
dht_sensor = dht.DHT22(Pin(DHT_PIN))
trig = Pin(TRIG_PIN, Pin.OUT)
echo = Pin(ECHO_PIN, Pin.IN)
ir_receive = Pin(IR_RECEIVE_PIN, Pin.IN)
ir_send = Pin(IR_SEND_PIN, Pin.OUT)
mq2_adc = ADC(Pin(MQ2_PIN))
mq2_adc.atten(ADC.ATTN_11DB) # 配置ADC衰减,测量范围0-3.3V
# 红外编码映射表(示例,可根据实际遥控器修改)
ir_code_map = {
0x20DF10EF: "POWER",
0x20DF30CF: "VOL+",
0x20DF708F: "VOL-",
0x20DF906F: "CHANNEL+",
0x20DFB04F: "CHANNEL-",
0x20DF00FF: "MUTE"
}
# 生成唯一客户端ID
def generate_client_id():
device_id = ubinascii.hexlify(machine.unique_id()).decode()
timestamp = int(time.time())
return f"{MQTT_USER}.multi_sensor_{device_id}_{timestamp}"
# WiFi连接函数
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if wlan.isconnected():
ip = wlan.ifconfig()[0]
print(f"已连接WiFi: IP={ip}")
return wlan
print(f"连接WiFi: {WIFI_SSID}...", end='')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
timeout = 20
while not wlan.isconnected() and timeout > 0:
time.sleep(0.5)
print('.', end='')
timeout -= 1
if wlan.isconnected():
ip = wlan.ifconfig()[0]
print(f"\nWiFi连接成功!IP: {ip}")
return wlan
else:
print("\nWiFi连接失败,5秒后重试...")
time.sleep(5)
return None
# MQTT连接函数
def connect_mqtt():
client_id = generate_client_id()
print(f"连接MQTT: {MQTT_SERVER}:{MQTT_PORT}, 客户端ID: {client_id}")
client = MQTTClient(
client_id=client_id,
server=MQTT_SERVER,
port=MQTT_PORT,
user=MQTT_USER,
password='',
keepalive=60,
ssl=False
)
try:
client.set_last_will(MQTT_TOPIC_TEMP, "offline")
client.connect()
print("MQTT连接成功!正在订阅主题...")
client.subscribe(MQTT_TOPIC_TEMP)
client.subscribe(MQTT_TOPIC_HUMI)
client.subscribe(MQTT_TOPIC_DIST)
client.subscribe(MQTT_TOPIC_GAS)
client.subscribe(MQTT_TOPIC_IR)
print(f"已订阅所有主题")
return client
except Exception as e:
print(f"MQTT连接异常: {str(e)}")
return None
# 读取DHT22温湿度数据
def read_dht_data():
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
humi = dht_sensor.humidity()
return temp, humi
except Exception as e:
print(f"读取温湿度异常: {str(e)}")
return None, None
# 读取HC-SR04距离数据
def read_distance():
try:
# 触发HC-SR04
trig.value(0)
time.sleep_us(2)
trig.value(1)
time.sleep_us(10)
trig.value(0)
# 测量回声时间
while echo.value() == 0:
signal_off = time.ticks_us()
while echo.value() == 1:
signal_on = time.ticks_us()
# 计算距离(单位:cm)
time_passed = signal_on - signal_off
distance = (time_passed * 0.0343) / 2
return round(distance, 1)
except Exception as e:
print(f"读取距离异常: {str(e)}")
return None
# 读取MQ2气体浓度
def read_mq2_gas():
try:
# 读取ADC值
adc_value = mq2_adc.read()
# 将ADC值转换为电压值(0-3.3V)
voltage = adc_value * (3.3 / 4095)
# 简单计算气体浓度(实际应用中需要校准)
# 浓度值与电压值成反比
gas_concentration = 100 - (voltage / 3.3 * 100)
return round(gas_concentration, 1)
except Exception as e:
print(f"读取气体浓度异常: {str(e)}")
return None
# 读取红外接收数据
def read_ir_receive():
try:
# 简单示例:检测红外信号电平变化
# 实际应用中需要实现完整的红外解码
if ir_receive.value() == 0:
# 检测到低电平(红外信号)
# 这里可以添加完整的红外解码逻辑
# 示例:假设检测到一个已知编码
return "IR DETECTED"
return None
except Exception as e:
print(f"读取红外信号异常: {str(e)}")
return None
# 在OLED上显示所有传感器数据
def display_all_data(temp, humi, distance, gas, ir_signal):
oled.fill(0)
# 显示WiFi和MQTT状态(需要在主循环中更新)
oled.text("WiFi: --", 0, 0)
oled.text("MQTT: --", 0, 10)
# 显示温湿度
if temp is not None and humi is not None:
oled.text(f"Temp: {temp:.1f}C", 0, 20)
oled.text(f"Humi: {humi:.1f}%", 0, 30)
else:
oled.text("Temp/Humi: ERR", 0, 20)
# 显示距离
if distance is not None:
oled.text(f"Dist: {distance}cm", 0, 40)
else:
oled.text("Dist: ERR", 0, 40)
# 显示气体浓度
if gas is not None:
oled.text(f"Gas: {gas:.1f}%", 0, 50)
else:
oled.text("Gas: ERR", 0, 50)
# 显示红外信号
if ir_signal:
oled.text("IR: Detected", 64, 20)
else:
oled.text("IR: --", 64, 20)
oled.show()
# 发送数据到巴法云
def send_data_to_bemfa(client, temp, humi, distance, gas, ir_signal):
if not client:
return
try:
if temp is not None:
client.publish(MQTT_TOPIC_TEMP, f"{temp}")
if humi is not None:
client.publish(MQTT_TOPIC_HUMI, f"{humi}")
if distance is not None:
client.publish(MQTT_TOPIC_DIST, f"{distance}")
if gas is not None:
client.publish(MQTT_TOPIC_GAS, f"{gas}")
if ir_signal:
client.publish(MQTT_TOPIC_IR, "1") # 简单发送1表示检测到红外信号
except Exception as e:
print(f"发送数据到巴法云异常: {str(e)}")
# 主函数
def main():
print("=== 多传感器监控系统启动 ===")
wifi_connected = False
mqtt_connected = False
last_send_time = 0
send_interval = 10 # 数据发送间隔(秒)
mqtt_client = None
try:
while True:
# 维护WiFi连接
if not wifi_connected:
wifi = connect_wifi()
wifi_connected = True if wifi else False
# 维护MQTT连接
if wifi_connected and not mqtt_connected:
mqtt_client = connect_mqtt()
mqtt_connected = True if mqtt_client else False
# 读取传感器数据
temp, humi = read_dht_data()
distance = read_distance()
gas = read_mq2_gas()
ir_signal = read_ir_receive()
# 定时发送数据到巴法云
current_time = time.time()
if mqtt_connected and (current_time - last_send_time) >= send_interval:
send_data_to_bemfa(mqtt_client, temp, humi, distance, gas, ir_signal)
last_send_time = current_time
# 显示所有数据
display_all_data(temp, humi, distance, gas, ir_signal)
time.sleep(1) # 主循环延时
except KeyboardInterrupt:
print("\n=== 程序停止 ===")
except Exception as e:
print(f"系统错误: {str(e)},5秒后重启...")
time.sleep(5)
machine.reset()
if __name__ == "__main__":
main()