import network
import time
from simple import MQTTClient
import machine
from machine import ADC, Pin, PWM
import ujson
# 网络配置(第一个修改的地方)
WIFI_SSID = 'Wokwi-GUEST'
WIFI_PASSWORD = ''
# MQTT配置
BROKER = "broker.hivemq.com"
PORT = 1883
#(第2个修改的地方)
CLIENT_ID = "smart_light_18850659845"
#(第3个修改的地方)
TOPIC_BASE = "greenhouse/18850659845"
TOPIC_LIGHT_DATA = f"{TOPIC_BASE}/sensors/light"
TOPIC_DEVICE_STATUS = f"{TOPIC_BASE}/devices/status"
TOPIC_CONTROL = f"{TOPIC_BASE}/control/#"
# 硬件引脚配置
LIGHT_SENSOR_PIN = 39 # 光照传感器
LED_MAIN_PIN = 2 # 主大棚遮阳板LED
LED_ROOM1_PIN = 26 # 大棚1遮阳板LED
LED_ROOM2_PIN = 27 # 大棚2遮阳板LED
BUZZER_PIN = 15 # 蜂鸣器
BUTTON_PIN = 14 # 按钮
# 光照阈值设置
LIGHT_THRESHOLD_LOW = 1000 # 低光照阈值 - 低于此值自动开灯
LIGHT_THRESHOLD_HIGH = 3000 # 高光照阈值 - 高于此值自动关灯
# 定时控制设置
SCHEDULE_ENABLED = False # 是否启用定时控制
SCHEDULE_ON_HOUR = 18 # 定时开灯时间(小时)
SCHEDULE_OFF_HOUR = 6 # 定时关灯时间(小时)
# 全局变量
client = None
leds = {}
buzzer = None
light_sensor = None
last_button_state = 1
auto_mode = False
last_light_value = 0
last_auto_action_time = 0
auto_action_cooldown = 5000 # 5秒冷却时间,防止频繁切换
last_schedule_check = 0 # 上次定时检查时间
# 报警相关变量
last_alarm_time = 0
alarm_interval = 10000 # 10秒报警间隔
low_light_alarm_enabled = True # 低光照报警开关
def connect_wifi():
"""连接WiFi网络"""
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
print("正在连接WiFi...")
timeout = 20
while not wlan.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
print(".", end="")
if wlan.isconnected():
print(f"\nWiFi连接成功: {wlan.ifconfig()}")
return True
else:
print("\nWiFi连接失败")
return False
def init_hardware():
"""初始化硬件设备"""
global leds, buzzer, light_sensor
# 初始化LED
leds = {
'main': Pin(LED_MAIN_PIN, Pin.OUT),
'room1': Pin(LED_ROOM1_PIN, Pin.OUT),
'room2': Pin(LED_ROOM2_PIN, Pin.OUT)
}
# 关闭所有LED
for led in leds.values():
led.off()
# 初始化光照传感器
light_sensor = ADC(Pin(LIGHT_SENSOR_PIN))
light_sensor.atten(ADC.ATTN_11DB) # 设置衰减,支持0-3.3V
# 初始化蜂鸣器
buzzer = PWM(Pin(BUZZER_PIN))
buzzer.duty(0) # 初始关闭
# 初始化按钮
Pin(BUTTON_PIN, Pin.IN, Pin.PULL_UP)
print("硬件初始化完成")
def read_light_sensor():
"""读取光照传感器数据"""
try:
raw_value = light_sensor.read()
# 将ADC值转换为光照强度 (0-4095 -> 0-4095 lux)
light_intensity = raw_value
return light_intensity
except Exception as e:
print(f"光照传感器读取错误: {e}")
return None
def control_led(led_name, state):
"""控制LED状态"""
if led_name in leds:
if state:
leds[led_name].on()
else:
leds[led_name].off()
print(f"LED {led_name}: {'开启' if state else '关闭'}")
return True
return False
def auto_light_control(light_value):
"""自动光照控制逻辑(基于光照强度)"""
global last_auto_action_time, last_alarm_time
if not auto_mode:
return
current_time = time.ticks_ms()
action_taken = False
if light_value < LIGHT_THRESHOLD_LOW:
# 检查是否所有LED都已开启
all_on = all(led.value() for led in leds.values())
# LED控制逻辑(带防抖)
if not all_on and time.ticks_diff(current_time, last_auto_action_time) >= auto_action_cooldown:
# 光照不足,开启所有灯
for led_name in leds:
control_led(led_name, True)
play_notification_sound()
action_taken = True
print(f"光照自动开灯: 当前{light_value} < 阈值{LIGHT_THRESHOLD_LOW}")
last_auto_action_time = current_time
# 持续低光照报警逻辑
if low_light_alarm_enabled and time.ticks_diff(current_time, last_alarm_time) >= alarm_interval:
play_low_light_alarm()
last_alarm_time = current_time
print(f"低光照报警: 当前{light_value} < 阈值{LIGHT_THRESHOLD_LOW}")
elif light_value > LIGHT_THRESHOLD_HIGH:
# 检查是否所有LED都已关闭
all_off = all(not led.value() for led in leds.values())
if not all_off and time.ticks_diff(current_time, last_auto_action_time) >= auto_action_cooldown:
# 光照充足,关闭所有灯
for led_name in leds:
control_led(led_name, False)
action_taken = True
print(f"光照自动关灯: 当前{light_value} > 阈值{LIGHT_THRESHOLD_HIGH}")
last_auto_action_time = current_time
def schedule_light_control():
"""定时光照控制逻辑(基于时间)"""
global last_schedule_check
if not SCHEDULE_ENABLED or not auto_mode:
return
current_time = time.ticks_ms()
# 每分钟检查一次定时任务
if time.ticks_diff(current_time, last_schedule_check) < 60000:
return
last_schedule_check = current_time
# 获取当前时间(这里简化处理,实际应该使用RTC)
# 注意:ESP32需要配置NTP时间同步才能获取准确时间
try:
import ntptime
ntptime.settime()
current_hour = time.localtime()[3]
if current_hour == SCHEDULE_ON_HOUR:
# 定时开灯
all_off = all(not led.value() for led in leds.values())
if all_off:
for led_name in leds:
control_led(led_name, True)
print(f"定时开灯: {current_hour}:00")
elif current_hour == SCHEDULE_OFF_HOUR:
# 定时关灯
all_on = all(led.value() for led in leds.values())
if all_on:
for led_name in leds:
control_led(led_name, False)
print(f"定时关灯: {current_hour}:00")
except:
# NTP同步失败,跳过定时控制
pass
def play_notification_sound():
"""播放通知音"""
try:
buzzer.freq(1000) # 1kHz频率
buzzer.duty(512) # 50%占空比
time.sleep(0.2)
buzzer.duty(0) # 关闭
except Exception as e:
print(f"蜂鸣器错误: {e}")
def play_low_light_alarm():
"""播放低光照报警音"""
try:
# 播放三声短促的报警音
for i in range(3):
buzzer.freq(800) # 800Hz频率
buzzer.duty(256) # 25%占空比
time.sleep(0.1)
buzzer.duty(0)
time.sleep(0.1)
except Exception as e:
print(f"报警蜂鸣器错误: {e}")
def check_button():
"""检查按钮状态"""
global last_button_state, auto_mode
current_state = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_UP).value()
if current_state != last_button_state:
if current_state == 0: # 按下
auto_mode = not auto_mode
print(f"自动模式: {'开启' if auto_mode else '关闭'}")
# 发布状态更新
if client:
status_data = {
'auto_mode': auto_mode,
'timestamp': time.time()
}
client.publish(TOPIC_DEVICE_STATUS, ujson.dumps(status_data))
play_notification_sound()
last_button_state = current_state
time.sleep(0.1) # 消抖
def publish_sensor_data(light_value):
"""发布传感器数据"""
if not client:
return
try:
# 获取当前LED状态
led_status = {}
for name, led in leds.items():
led_status[name] = led.value()
# 构建数据包
sensor_data = {
'light_intensity': light_value,
'threshold_low': LIGHT_THRESHOLD_LOW,
'threshold_high': LIGHT_THRESHOLD_HIGH,
'auto_mode': auto_mode,
'led_status': led_status,
'timestamp': time.time(),
'device_id': CLIENT_ID
}
# 发布数据
client.publish(TOPIC_LIGHT_DATA, ujson.dumps(sensor_data))
print(f"发布数据: 光照={light_value}, 自动模式={auto_mode}")
except Exception as e:
print(f"数据发布错误: {e}")
def on_mqtt_message(topic, msg):
"""处理MQTT消息"""
try:
topic_str = topic.decode()
msg_str = msg.decode()
print(f"收到消息: {topic_str} -> {msg_str}")
# 解析控制命令
if topic_str.startswith(f"{TOPIC_BASE}/control/"):
command_type = topic_str.split('/')[-1]
if command_type == 'led':
# LED控制命令 - 手动控制时暂时切换到手动模式
try:
cmd_data = ujson.loads(msg_str)
led_name = cmd_data.get('led')
state = cmd_data.get('state')
# 临时切换到手动模式以避免自动控制干扰
global auto_mode
original_auto_mode = auto_mode
auto_mode = False
if led_name == 'all':
# 控制所有LED
for name in leds:
control_led(name, state)
else:
# 控制单个LED
control_led(led_name, state)
# 延迟一段时间后恢复原模式
time.sleep(0.5)
auto_mode = original_auto_mode
except Exception as e:
print(f"LED控制命令解析错误: {e}")
elif command_type == 'mode':
# 模式切换命令
global auto_mode
if msg_str == 'auto':
auto_mode = True
elif msg_str == 'manual':
auto_mode = False
print(f"模式切换为: {'自动' if auto_mode else '手动'}")
elif command_type == 'threshold':
# 光照阈值设置命令
try:
threshold_data = ujson.loads(msg_str)
global LIGHT_THRESHOLD_LOW, LIGHT_THRESHOLD_HIGH
LIGHT_THRESHOLD_LOW = threshold_data.get('low', LIGHT_THRESHOLD_LOW)
LIGHT_THRESHOLD_HIGH = threshold_data.get('high', LIGHT_THRESHOLD_HIGH)
print(f"光照阈值更新: 低={LIGHT_THRESHOLD_LOW}, 高={LIGHT_THRESHOLD_HIGH}")
except Exception as e:
print(f"光照阈值设置错误: {e}")
elif command_type == 'schedule':
# 定时设置命令
try:
schedule_data = ujson.loads(msg_str)
global SCHEDULE_ENABLED, SCHEDULE_ON_HOUR, SCHEDULE_OFF_HOUR
SCHEDULE_ENABLED = schedule_data.get('enabled', SCHEDULE_ENABLED)
SCHEDULE_ON_HOUR = schedule_data.get('on_hour', SCHEDULE_ON_HOUR)
SCHEDULE_OFF_HOUR = schedule_data.get('off_hour', SCHEDULE_OFF_HOUR)
print(f"定时设置更新: 启用={SCHEDULE_ENABLED}, 开灯={SCHEDULE_ON_HOUR}:00, 关灯={SCHEDULE_OFF_HOUR}:00")
except Exception as e:
print(f"定时设置错误: {e}")
elif command_type == 'alarm':
# 报警设置命令
try:
alarm_data = ujson.loads(msg_str)
global low_light_alarm_enabled, alarm_interval
low_light_alarm_enabled = alarm_data.get('enabled', low_light_alarm_enabled)
alarm_interval = alarm_data.get('interval', alarm_interval) * 1000 # 转换为毫秒
print(f"报警设置更新: 启用={low_light_alarm_enabled}, 间隔={alarm_interval//1000}秒")
except Exception as e:
print(f"报警设置错误: {e}")
except Exception as e:
print(f"消息处理错误: {e}")
def connect_mqtt():
"""连接MQTT服务器"""
global client
try:
print("正在连接MQTT服务器...")
client = MQTTClient(CLIENT_ID, BROKER, port=PORT)
client.set_callback(on_mqtt_message)
client.connect()
client.subscribe(TOPIC_CONTROL)
print(f"MQTT连接成功: {BROKER}:{PORT}")
return True
except Exception as e:
print(f"MQTT连接失败: {e}")
return False
def main():
"""主程序"""
global last_light_value
try:
print("=== 大棚遮阳板智能调控系统启动 ===")
# 初始化硬件
init_hardware()
# 连接WiFi
if not connect_wifi():
print("WiFi连接失败,程序退出")
return
# 连接MQTT
if not connect_mqtt():
print("MQTT连接失败,程序退出")
return
print("系统初始化完成,开始监控...")
# 主循环
last_publish_time = 0
publish_interval = 2000 # 2秒发布一次数据
while True:
try:
# 检查MQTT消息
client.check_msg()
# 检查按钮
check_button()
# 读取光照传感器
light_value = read_light_sensor()
if light_value is not None:
# 基于光照强度的自动控制
auto_light_control(light_value)
# 基于时间的定时控制
schedule_light_control()
# 定期发布数据
current_time = time.ticks_ms()
if time.ticks_diff(current_time, last_publish_time) >= publish_interval:
publish_sensor_data(light_value)
last_publish_time = current_time
last_light_value = light_value
time.sleep(0.1)
except Exception as e:
print(f"主循环错误: {e}")
# 尝试重新连接MQTT
try:
client.disconnect()
except:
pass
time.sleep(5)
if connect_mqtt():
print("MQTT重连成功")
else:
print("MQTT重连失败")
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"程序异常: {e}")
finally:
# 清理资源
try:
if client:
client.disconnect()
# 关闭所有LED
for led in leds.values():
led.off()
if buzzer:
buzzer.duty(0)
buzzer.deinit()
except:
pass
print("程序结束")
if __name__ == "__main__":
main()