import machine
import dht
import time
import math
# Rolling-window settings: the statistics are computed over at most this many
# of the most recent temperature readings.
num_measurements = 10
# Buffer of recent readings; new samples are appended at the end.
temperature_data = []

# DHT22 data line on GPIO15, configured as an input with the pull-up enabled.
dht22_pin = machine.Pin(15, machine.Pin.IN, machine.Pin.PULL_UP)
# Sensor driver bound to that pin.
dht22 = dht.DHT22(dht22_pin)
def running_welford(data):
    """Return ``(std_dev, mean)`` of *data* computed with Welford's recurrence.

    The standard deviation is the sample standard deviation (the sum of
    squared deviations is divided by ``n - 1``).

    Args:
        data: Indexable sequence of numbers supporting ``len()``.

    Returns:
        A 2-tuple ``(std_dev, mean)``. An empty sequence yields ``(0, 0)``;
        a single-element sequence yields ``(0, data[0])``.
    """
    count = len(data)

    # Fewer than two samples: no spread can be estimated, so report zero.
    if count < 2:
        return (0, data[0]) if count else (0, 0)

    running_mean = data[0]
    sum_sq_dev = 0
    # `seen` is the number of samples folded in once the current one is added.
    for seen in range(2, count + 1):
        sample = data[seen - 1]
        offset_before = sample - running_mean
        running_mean = running_mean + offset_before / seen
        # Multiply the deviation from the old mean by the deviation from the
        # new mean; this is the core Welford update for the squared-deviation sum.
        sum_sq_dev += offset_before * (sample - running_mean)

    return math.sqrt(sum_sq_dev / (count - 1)), running_mean
# Output line layout: timestamp fields first, then the latest reading and the
# rolling statistics.
line_template = (
    "Timestamp: {:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d} | "
    "Temperature: {:.2f} °C | Running Std Dev: {:.2f} | Running Avg: {:.2f}"
)

# Poll the sensor forever; any failure in one cycle is reported and the loop
# carries on with the next cycle.
while True:
    try:
        # Trigger a measurement, then fetch the temperature it produced.
        dht22.measure()
        temperature = dht22.temperature()

        # Record the sample and discard the oldest one once the window is
        # over capacity.
        temperature_data.append(temperature)
        if len(temperature_data) > num_measurements:
            del temperature_data[0]

        # Only the first six fields (date and time of day) are printed.
        timestamp = time.localtime()
        year, month, day, hour, minute, second = timestamp[0:6]

        # Statistics are recomputed over the whole current window.
        std_dev, avg = running_welford(temperature_data)

        print(line_template.format(
            year, month, day, hour, minute, second,
            temperature, std_dev, avg))
    except Exception as e:
        print("Error:", e)

    # Pause 2 seconds before the next reading, whether or not this one succeeded.
    time.sleep(2)