import sys
from machine import Pin, PWM
import dht
import ujson
from umqtt.simple import MQTTClient
from _thread import start_new_thread
import ntptime
import utime
print("starting...")
# Global variables
# Connection state shared across threads: written by connect_to_mqtt() and
# measure_and_publish(), read by breathing_lights() to pick the LED pattern.
is_mqtt_connected = False
def localtime():
    """Return the current time as a time tuple shifted to UTC+8.

    The system clock is read with ``utime.time()`` and is assumed to hold
    UTC; eight hours' worth of seconds are added before the timestamp is
    converted with ``utime.localtime()``.
    """
    utc_plus_8_seconds = 8 * 3600
    return utime.localtime(utime.time() + utc_plus_8_seconds)
def measure_and_publish(client, sensor, topic):
    """Take one sensor reading and publish it to MQTT as a JSON message.

    The payload has the keys "time" (UTC+8 time formatted as
    YYYY-MM-DD_HH:MM:SS), "temp" and "humidity". The global
    ``is_mqtt_connected`` flag is set to True once the reading has been
    published and to False when publishing fails.

    Args:
        client: MQTT client exposing ``publish(topic, msg)`` and ``connect()``.
        sensor: Sensor exposing ``measure()``, ``temperature()`` and
            ``humidity()``.
        topic: MQTT topic to publish to.

    Returns:
        The JSON string that was built (whether or not publishing
        succeeded), or None when the sensor could not be read.
    """
    global is_mqtt_connected
    print("Measuring weather conditions... ")
    try:
        sensor.measure()
    except Exception as read_err:
        # One bad read must not take down the caller's endless loop.
        # Caught broadly on purpose: the stock MicroPython dht driver is
        # understood to report timeouts as OSError but checksum failures as
        # a plain Exception -- confirm against the firmware in use.
        print("Failed to read sensor:", read_err)
        return None

    # Timestamp in China time (UTC+8); only the first six fields
    # (year .. second) of the time tuple are used.
    now = localtime()
    time_string = "{:04}-{:02}-{:02}_{:02}:{:02}:{:02}".format(*now[:6])
    message = ujson.dumps({
        "time": time_string,
        "temp": sensor.temperature(),
        "humidity": sensor.humidity(),
    })

    try:
        client.publish(topic, message)
    except OSError as publish_err:
        print("Failed to publish message:", publish_err)
        is_mqtt_connected = False  # publish failed: treat the link as down
        try:
            client.connect()
        except OSError as reconnect_err:
            print("Failed to reconnect to MQTT server:", reconnect_err)
            return message
        # The link is back: resend this reading once instead of dropping it,
        # so the status flag reflects the restored connection right away.
        try:
            client.publish(topic, message)
        except OSError as retry_err:
            print("Failed to publish message:", retry_err)
            return message

    print("Reporting to MQTT topic {}: {}".format(topic, message))
    is_mqtt_connected = True  # publish succeeded: the link is up
    return message
def breathing_lights():
    """Drive the LED on pin 2 forever to show the MQTT connection state.

    While ``is_mqtt_connected`` is true the PWM duty ramps 0 -> 1023 -> 0 in
    1 ms steps (a "breathing" effect). Otherwise the duty is held at 1023
    and the flag is re-checked once per second. This function never returns.
    """
    led = PWM(Pin(2))
    led.freq(1000)
    ramp_up = range(0, 1024)
    ramp_down = range(1023, -1, -1)
    while True:
        if not is_mqtt_connected:
            # No connection: hold the LED steadily at duty 1023, then wait
            # one second before checking the flag again.
            led.duty(1023)
            utime.sleep(1)
            continue
        # Connected: one full breathing cycle, fade in then fade out.
        for ramp in (ramp_up, ramp_down):
            for level in ramp:
                led.duty(level)
                utime.sleep_ms(1)
def connect_to_mqtt(client_id, broker, user, password):
    """Create an MQTT client and connect it to the broker.

    Sets the global ``is_mqtt_connected`` flag to the outcome. If the
    connection attempt raises OSError, the error is printed and the program
    is terminated through ``sys.exit()``.

    Args:
        client_id: Client identifier passed to ``MQTTClient``.
        broker: Broker address passed to ``MQTTClient``.
        user: User name for authentication.
        password: Password for authentication.

    Returns:
        The connected ``MQTTClient`` instance.
    """
    global is_mqtt_connected
    print("Connecting to MQTT server... ", end="")
    mqtt = MQTTClient(client_id, broker, user=user, password=password)
    try:
        mqtt.connect()
        print("Connected!")
    except OSError as err:
        print("Failed to connect to MQTT server:", err)
        is_mqtt_connected = False
        sys.exit()
    else:
        is_mqtt_connected = True
    return mqtt
def main(sta_if, mqtt_client_id, mqtt_broker, mqtt_user, mqtt_password, mqtt_topic):
    """Run the weather station: sync time, connect, then report forever.

    Args:
        sta_if: Accepted for the caller's convenience; not used inside this
            function.
        mqtt_client_id: Client identifier for the MQTT connection.
        mqtt_broker: Address of the MQTT broker.
        mqtt_user: MQTT user name.
        mqtt_password: MQTT password.
        mqtt_topic: Topic the readings are published to.

    This function does not return; it publishes a reading every 10 seconds.
    """
    # Set the clock from NTP so published timestamps are meaningful.
    ntptime.settime()

    # DHT22 sensor on pin 15.
    dht_sensor = dht.DHT22(Pin(15))

    mqtt = connect_to_mqtt(
        client_id=mqtt_client_id,
        broker=mqtt_broker,
        user=mqtt_user,
        password=mqtt_password,
    )

    # The status LED runs in its own thread so it keeps animating while this
    # thread sleeps between readings.
    start_new_thread(breathing_lights, ())

    # Reporting loop: one reading every 10 seconds.
    while True:
        measure_and_publish(mqtt, dht_sensor, mqtt_topic)
        utime.sleep(10)