# ======================================== 导入相关模块 =========================================
import uasyncio as asyncio
import dht
from machine import Pin, ADC, I2C, Pin
import time
from time import sleep
import sys
from ssd1306 import SSD1306_I2C
import os
# ======================================== 全局变量 ============================================
# OLED屏幕地址
OLED_ADDRESS = 0
# 传感器数据
temp_data = 0.0
hum_data = 0.0
smoke_status = "Normal" # 初始状态为正常
oled = None
# ======================================== 功能函数 ============================================
async def read_dht22():
"""每1秒读取温湿度"""
global temp,hum
while True:
try:
#读取温度湿度
dht_pin.measure()
temp = dht_pin.temperature()
hum = dht_pin.humidity()
print("Temperature: %.1f C Humidity: %.1f %%" % (temp, hum))
except Exception as e:
print(f"DHT22读取失败: {e}")
await asyncio.sleep(1) # 非阻塞等待1秒
async def read_mq2():
"""每0.5秒读取MQ-2 DO状态"""
global smoke_status
while True:
#读取comp状态
comp_state = comp.value()
# 更新烟雾状态
if comp_state == 1:
smoke_status = "Normal" # 正常状态
if comp_state == 0:
smoke_status = "Excess" # 烟雾过多
print(f"Comp引脚状态: {comp_state} - Smoke: {smoke_status}")
await asyncio.sleep(0.5) # 非阻塞等待0.5秒
def update_oled_display():
"""更新OLED显示内容"""
global oled, temp_data, hum_data, smoke_status
if oled is None:
return
try:
# 清屏
oled.fill(0)
# 显示标题
oled.text("Sensor Monitor", 15, 0)
oled.rect(0, 10, 128, 1, 1) # 分隔线
# 显示温度
temp_line = f"Temp: {temp:.1f} C"
oled.text(temp_line, 5, 20)
# 显示湿度
hum_line = f"Humi: {hum:.1f} % RH"
oled.text(hum_line, 5, 35)
# 显示烟雾状态
smoke_line = f"Smoke: {smoke_status}"
oled.text(smoke_line, 5, 50)
# 更新显示
oled.show()
except Exception as e:
print(f"OLED更新失败: {e}")
async def update_display():
"""每1秒更新OLED显示"""
while True:
update_oled_display() # 调用OLED显示函数
await asyncio.sleep(0.5) # 每1秒更新一次
# ======================================== 自定义类 ============================================
# ======================================== 初始化配置 ===========================================
# 延时等待设备初始化
time.sleep(3)
#=======传感器配置==========
# 打印调试信息
print('FreakStudio : Using OneWire to read DHT22 sensor')
# 假设数据脚接 GPIO13
dht_pin = dht.DHT22(Pin(13))
# 打印调试消息
print("Measuring Gas Concentration with MQ Series Gas Sensor Modules")
# Comparator output (GPIO15, optional)
comp = Pin(15, Pin.IN)
#=======OLED配置=========
# 创建硬件I2C的实例,使用I2C1外设,时钟频率为400KHz,SDA引脚为6,SCL引脚为7
i2c = I2C(id=0, sda=Pin(4), scl=Pin(5), freq=400000)
# 输出当前目录下所有文件
print('START LIST ALL FILES')
for file in os.listdir():
print('file name:',file)
# 开始扫描I2C总线上的设备,返回从机地址的列表
devices_list = i2c.scan()
print('START I2C SCANNER')
# 若devices_list为空,则没有设备连接到I2C总线上
if len(devices_list) == 0:
print("No i2c device !")
# 若非空,则打印从机设备地址
else:
print('i2c devices found:', len(devices_list))
# 遍历从机设备地址列表
for device in devices_list:
print("I2C hexadecimal address: ", hex(device))
if device == 0x3c or device == 0x3d:
OLED_ADDRESS = device
# 创建SSD1306 OLED屏幕的实例,宽度为128像素,高度为64像素,不使用外部电源
oled = SSD1306_I2C(i2c, OLED_ADDRESS, 128, 64,False)
# 打印提示信息
print('OLED init success')
# 首先清除屏幕
oled.fill(0)
oled.show()
# (0,0)原点位置为屏幕左上角,右边为x轴正方向,下边为y轴正方向
# 绘制矩形外框
oled.rect(0, 0, 64, 32, 1)
# 显示文本
oled.text('Freak', 10, 5)
oled.text('Studio', 10, 15)
# 显示图像
oled.show()
# ======================================== 主程序 ============================================
async def main():
"""主任务:并行运行所有任务"""
print("启动非阻塞传感器采集系统")
# 收集所有任务
tasks = [
read_dht22(),
read_mq2(),
]
# 如果OLED初始化成功,添加显示任务
if oled: # 检查oled是否初始化成功
tasks.append(update_display()) # 添加显示任务
print("OLED显示已启用")
else:
print("OLED显示未启用,仅输出到控制台")
# 并行运行所有任务
await asyncio.gather(*tasks)
# 启动程序
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程序结束")