import machine
import time
from machine import Pin, I2C, ADC, PWM
from umqtt.robust import MQTTClient
import ujson
import network
from dht import DHT22
import ssd1306
import math
from neopixel import NeoPixel
from random import randrange
online = True # Change to True to connect to NETPIE
cmdmode = False # Use push button to toggle this flag
# --- Wi-Fi Configuration ---
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
# --- MQTT Configuration ---
MQTT_BROKER = "broker.netpie.io"
MQTT_CLIENT = "8838de57-33a4-4204-9412-10fde142c547"
MQTT_USER = "x5fnPcdW2Cy2do7GtscC1T8Z6KieeP81"
MQTT_PWD = "6dPaJMGmBQPwKCQgVUQRjaJ5t3qtbsFB"
PUBLISH_PERIOD = 10000 # milliseconds
fan = Pin(2, Pin.OUT) # Fan LED
heater = Pin(5, Pin.OUT) # Heater LED
pump = Pin(18, Pin.OUT) # Pump LED
red = Pin(14, Pin.OUT)
green = Pin(12, Pin.OUT)
blue = Pin(13, Pin.OUT)
pixels = NeoPixel(Pin(27), 16)
button = Pin(4, Pin.IN, Pin.PULL_UP)
# Sensor and Display initialization
sensor = DHT22(Pin(15))
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
ldr1 = ADC(Pin(32, Pin.IN))
# Global variables
settemp = 20 # Setpoint temperature
temp = 0 # Current temperature
sethumi = 50 # Setpoint humidity
humidity = 0 # Current humidity
setenergy = 25 # Setpoint energy
error = 0
# PID Control Variables
Kp = 1.0 # Proportional gain
Ki = 0.1 # Integral gain
Kd = 0.1 # Derivative gain
# PID state variables
previous_error = 0
integral = 0
# Data for MQTT Publishing
sensor_data = {
"set_temp": 0.0,
"temperature": 0.0,
"set_humi": 0.0,
"humidity": 0.0,
"set_energy": 0.0,
"fan": 0,
"heater": 0,
"pump": 0,
"Kp": 0,
"Ki": 0,
"Kd": 0,
"error": 0,
"fan_speed": 0,
"setpoint": 0
}
last_time = 0
TIMER_INTERVAL = 5000
def christmas():
global last_time
# Read the light level from the LDR sensor
lux = read_ldr()
I = lux_to_irradiance(lux, angle_of_incidence=0)
solar_potential = calculate_solar_energy_potential(I, A, eta, alpha, temp, humidity)
# Check if solar potential is too low to activate lights
if solar_potential < setenergy:
red.value(0)
green.value(0)
blue.value(0)
pixels.fill((0, 0, 0))
pixels.write()
#print("Sun parameter is low, turning off Christmas lights.")
time.sleep(1)
return # Exit the function early if the sun parameter is low
# Check if TIMER_INTERVAL has elapsed
current_time = time.ticks_ms()
if time.ticks_diff(current_time, last_time) >= TIMER_INTERVAL:
# Static light patterns
red.value(0)
green.value(1)
blue.value(1)
time.sleep(1)
red.value(1)
green.value(0)
blue.value(1)
time.sleep(1)
red.value(1)
green.value(1)
blue.value(0)
time.sleep(1)
for i in range(16):
r = randrange(0, 256, 10)
g = randrange(0, 256, 16)
b = randrange(0, 256, 16)
if i < len(pixels):
pixels[i] = (r, g, b) # Set color for each pixel
pixels.write() # Apply the updates to the NeoPixel strip
time.sleep(0.1) # Short delay for animation effect
# Update the last time the Christmas lights effect was run
last_time = current_time
GAMMA = 0.7
RL10 = 50
A = 20 # Area of the solar panel in m²
eta = 0.18 # Panel efficiency (18%)
alpha = -0.005 # Temperature coefficient in % per °C (e.g., -0.5%/°C)
def read_ldr():
analog_value = ldr1.read()
voltage = analog_value * 5 / 4095.0
resistance = 2000 * voltage / (1 - voltage / 5)
lux = pow(RL10 * 1e3 * pow(10, GAMMA) / resistance, (1 / GAMMA))
return lux
def calculate_solar_energy_potential(I, A, eta, alpha, T, RH, k=0.2, T_ref=25):
"""
Calculate Solar Energy Potential using irradiance.
Parameters:
I (float): Solar irradiance in W/m²
A (float): Area of the solar panel in m²
eta (float): Panel efficiency (0 < eta <= 1)
alpha (float): Temperature coefficient (% per °C, e.g., -0.005 for -0.5%/°C)
T (float): Ambient temperature in °C
RH (float): Relative humidity in %
k (float): Atmospheric transmissivity constant (default=0.2)
T_ref (float): Reference temperature in °C (default=25)
Returns:
float: Solar Energy Potential in Watts
"""
# Calculate Adjusted Efficiency
adjusted_efficiency = eta * (1 + alpha * (T_ref - T))
# Calculate Atmospheric Transmissivity Factor
tau = 1 - k * (RH / 100)
# Calculate Solar Energy Potential (P)
P = I * A * adjusted_efficiency * tau
return P
# --- Convert Lux to Irradiance (W/m²) ---
def lux_to_irradiance(lux, angle_of_incidence=0):
"""
Convert Lux value to Irradiance in W/m².
Parameters:
lux (float): Lux value from LDR sensor
angle_of_incidence (float): Angle of light incidence in degrees (0 for direct sunlight)
Returns:
float: Irradiance in W/m²
"""
# Convert angle of incidence from degrees to radians
angle_radians = math.radians(angle_of_incidence)
cos_theta = math.cos(angle_radians)
# Assuming the light is visible and has a peak luminous efficacy of 683 lumens per watt
irradiance = lux / 683.0 / cos_theta # Converting lux to W/m²
return irradiance
# OLED Display function
def oled_show():
oled.fill(0)
oled.text("Temp:{:.1f} [{:.1f}]".format(temp, settemp), 0, 0)
oled.text("Humi:{:.1f} [{:.1f}]".format(humidity, sethumi), 0, 12)
oled.text("Kp:{:.1f}".format(Kp), 0, 24)
oled.text("Ki:{:.1f}".format(Ki), 0, 36)
oled.text("Kd:{:.1f}".format(Kd), 0, 48)
oled.show()
# Connect to Wi-Fi
def wifi_connect():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Connecting to network...')
wlan.connect(WIFI_SSID, WIFI_PASS)
while not wlan.isconnected():
pass
print('Network config:', wlan.ifconfig())
# Initialize MQTT client
def init_client():
global client
print("Trying to connect to MQTT broker.")
try:
client = MQTTClient(MQTT_CLIENT, MQTT_BROKER, port=1883, user=MQTT_USER, password=MQTT_PWD)
client.connect()
print("Connected to", MQTT_BROKER)
topic_sub = b"@msg/cmd"
print("Subscribed to", topic_sub)
client.set_callback(sub_cb)
client.subscribe(topic_sub)
except Exception as e:
print("Error initializing MQTT:", e)
# MQTT Subscription callback
def sub_cb(topic, msg):
print((topic, msg))
if topic == b'@msg/cmd':
cmdInt(msg.decode())
# Function to print fan speed when parameters change
def print_fan_speed():
print("Fan Speed:", fan_speed)
# Command interpreter
def cmdInt(userstr):
global settemp, sethumi, setenergy, Kp, Ki, Kd
userstr = userstr.strip()
if "=" not in userstr:
print("Invalid command format. Use <command>=<value>")
return
cmdstr, parmstr = userstr.split("=")
cmdstr = cmdstr.strip().lower()
parmstr = parmstr.strip()
try:
if cmdstr == "settemp":
settemp = float(parmstr)
settemp = max(0.0, min(40.0, settemp))
oled_show()
print_fan_speed()
elif cmdstr == "sethumi":
sethumi = float(parmstr)
sethumi = max(40.0, min(60.0, sethumi))
oled_show()
elif cmdstr == "setenergy":
setenergy = float(parmstr)
setenergy = max(0.0, min(30.0, setenergy))
# oled_show()
elif cmdstr == "kp":
Kp = float(parmstr)
print_fan_speed()
oled_show()
elif cmdstr == "ki":
Ki = float(parmstr)
print_fan_speed()
oled_show()
elif cmdstr == "kd":
Kd = float(parmstr)
print_fan_speed()
oled_show()
else:
print("Invalid command:", cmdstr)
except ValueError:
print(f"Invalid value for {cmdstr}. Please enter a numeric value.")
def send_message_on_button_press():
from time import localtime
current_time = localtime()
timestamp = "{:02}:{:02}:{:02}".format(current_time[3], current_time[4], current_time[5])
message = "Time-stamp : {} Temp : {} °C Humidity : {} %, Irradiance : {:.2f} W/m² Solar Potential : {:.2f} W".format(timestamp,temp,humidity,I,solar_potential)
if online and client:
print("Publishing to NETPIE:", message)
client.publish("@msg/update", message)
# Fan Speed Calculation Function
def calculate_fan_speed(settemp, temp):
global previous_error, integral
max_rpm = 5000 # Maximum fan speed in RPM
max_pwm = 1023 # Maximum PWM value (0-1023 for ESP32)
# Calculate PID terms
error = abs(settemp - temp)
integral += error
derivative = error - previous_error
previous_error = error
# PID Output
pid_output = Kp * error + Ki * integral + Kd * derivative
# Map PID Output to Fan Speed (0-1023 for PWM duty)
setpoint = (error / 80) * max_pwm # Scaled to PWM range (0-1023)
fan_speed = min(setpoint, max(0, int(pid_output))) # Fan duty cycle as PWM value
# Fan Duty Cycle in Percentage (0% to 100%)
fan_duty_cycle = (fan_speed / max_pwm) * 100 # Convert to duty cycle in percentage
# Calculate Actual Speed (RPM)
actual_speed = (fan_speed / max_pwm) * max_rpm # Scale PWM to actual RPM
return fan_duty_cycle, actual_speed, error, setpoint, fan_speed # Return all necessary values
# Initialization
time_prev = 0
fan_speed = 0 # Fan duty cycle (0-100%)
actual_speed = 0 # Fan speed in RPM
pwm_fan = PWM(fan, freq=500) # PWM for fan speed control
pwm_fan.duty(0) # Start with fan off
button_last_state = button.value()
button_last_time = time.ticks_ms()
DEBOUNCE_DELAY = 200 # milliseconds
# Main loop
if online:
wifi_connect()
init_client()
while True: # Main loop
try:
christmas()
button_current_state = button.value()
current_time = time.ticks_ms()
if button_current_state != button_last_state:
if time.ticks_diff(current_time, button_last_time) > DEBOUNCE_DELAY:
button_last_time = current_time
if not button_current_state: # Button pressed (active low)
send_message_on_button_press()
button_last_state = button_current_state
# Measure temperature and humidity
sensor.measure()
temp = sensor.temperature()
humidity = sensor.humidity()
except OSError as e:
print("Failed to read from DHT22:", e)
continue
# Fan and Heater Control
if temp > settemp:
max_pwm = 1023
fan_duty_cycle, actual_speed, error, setpoint, fan_speed = calculate_fan_speed(settemp, temp)
pwm_fan.duty(int(fan_duty_cycle * max_pwm / 100)) # Set fan PWM duty cycle (converted to 0-1023)
heater.off()
fan_status = 1
heater_status = 0
print(f"Error: {error:.2f} | Set Point: {setpoint:.2f} | Fan Speed: {fan_speed:.2f} | Actual RPM: {actual_speed:.2f}")
else:
pwm_fan.duty(0)
heater.on()
fan_status = 0
heater_status = 1
# Humidity Control
if humidity < sethumi:
pump.on()
pump_status = 1
else:
pump.off()
pump_status = 0
# Display updated status
oled_show()
lux = read_ldr()
I = lux_to_irradiance(lux, angle_of_incidence=0)
solar_potential = calculate_solar_energy_potential(I, A, eta, alpha, temp, humidity)
# Publish data via MQTT
if time.ticks_ms() - time_prev > PUBLISH_PERIOD:
time_prev = time.ticks_ms()
client.check_msg()
# Update data dictionary
sensor_data.update({
"set_temp": settemp,
"temperature": temp,
"set_humi": sethumi,
"humidity": humidity,
"set_energy": setenergy,
"fan": fan_status,
"heater": heater_status,
"pump": pump_status,
"Kp": Kp,
"Ki": Ki,
"Kd": Kd,
"error": error,
"fan_speed": fan_speed,
"fan_duty_cycle": fan_duty_cycle, # Fan duty cycle in percentage
"actual_speed": actual_speed, # Actual fan speed in RPM
"setpoint": setpoint
})
publish_str = ujson.dumps({"data": sensor_data})
client.publish("@shadow/data/update", publish_str)
if online:
client.disconnect()