import network
import time
from machine import Pin, ADC, PWM
import dht
import ujson
from umqtt.simple import MQTTClient
# MQTT Server Parameters
MQTT_CLIENT_ID = "ow29hvgkjoar2wrokesl"
MQTT_BROKER = "thingsboard.cloud"
MQTT_PORT = 8883
MQTT_USER = "0pnfhl8zl2ayrmuh9suh"
MQTT_PASSWORD = "8wcvywyu0etxfb16rsnp"
MQTT_SUB_TOPIC = "v1/devices/me/rpc/request/+"
MQTT_PUB_TOPIC = "v1/devices/me/telemetry"
# 传感器阈值设置(使用百分比)
MOISTURE_THRESHOLD = 30 # 土壤湿度阈值(百分比)
LDR_THRESHOLD = 1000 # 光线传感器阈值(原始ADC值)
# 初始化硬件
# 土壤湿度传感器
soil_moisture = ADC(Pin(34))
soil_moisture.atten(ADC.ATTN_11DB) # 0-3.3V
soil_moisture.width(ADC.WIDTH_10BIT) # 0-1023
# 舵机 (浇水控制)
servo_pin = Pin(18)
servo = PWM(servo_pin, freq=50)
# 温湿度传感器
dht_sensor = dht.DHT22(Pin(15))
# 光线传感器
ldr = ADC(Pin(36))
ldr.atten(ADC.ATTN_11DB)
# LED指示灯
led = Pin(2, Pin.OUT)
def set_servo_angle(angle):
"""控制舵机角度(0-180度)"""
duty = int(40 + (angle / 180) * 115)
servo.duty(duty)
time.sleep(0.5)
def read_soil_moisture():
"""读取土壤湿度并转换为百分比"""
raw_value = soil_moisture.read()
# ADC值越高越干燥,所以反转百分比 (0=最湿,100=最干)
percentage = 100 - int((raw_value / 1023) * 100)
# 限制在0-100范围内
return max(0, min(100, percentage)), raw_value
def connect_wifi():
"""连接WiFi网络"""
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
def connect_mqtt():
"""连接MQTT服务器"""
print("Connecting to MQTT server... ", end="")
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER,
user=MQTT_USER, password=MQTT_PASSWORD)
client.connect()
print("Connected!")
return client
def main():
"""主程序"""
connect_wifi()
client = connect_mqtt()
servo_triggered = False
try:
while True:
print("\nReading sensors...")
# 读取土壤湿度(百分比和原始值)
moisture_percent, moisture_raw = read_soil_moisture()
# 控制浇水系统
if moisture_percent < MOISTURE_THRESHOLD and not servo_triggered:
print(f"干燥土壤检测 ({moisture_percent}%)! 启动浇水...")
set_servo_angle(90)
servo_triggered = True
elif moisture_percent >= MOISTURE_THRESHOLD and servo_triggered:
print(f"湿度恢复 ({moisture_percent}%). 停止浇水...")
set_servo_angle(0)
servo_triggered = False
# 读取温湿度
dht_sensor.measure()
temp = dht_sensor.temperature()
humidity = dht_sensor.humidity()
# 读取光线强度
light_level = ldr.read()
# 控制LED
if light_level < LDR_THRESHOLD:
led.on()
led_status = "on"
else:
led.off()
led_status = "off"
# 准备MQTT消息
message = ujson.dumps({
"soil_moisture": moisture_percent,
"soil_moisture_raw": moisture_raw,
"servo_status": "active" if servo_triggered else "inactive",
"temperature": temp,
"humidity": humidity,
"light_level": light_level,
"led_status": led_status
})
print(f"上报数据: {message}")
client.publish(MQTT_PUB_TOPIC, message)
time.sleep(5) # 5秒间隔
except KeyboardInterrupt:
print("\n程序停止")
set_servo_angle(0)
led.off()
client.disconnect()
if __name__ == "__main__":
main()