from machine import Pin
from time import sleep, ticks_ms
import time
from dht import DHT22
import network
import ujson
import urequests
# Function to connect to WiFi
def connect_to_wifi():
print("Connecting to WiFi", end="")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect("Wokwi-GUEST", "")
while not wlan.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
print(wlan.ifconfig())
connect_to_wifi()
# Relay setup
Relays = [Pin(i, Pin.OUT) for i in [28, 27, 26, 22]]
# DHT22 sensor setup
dht_sensor = DHT22(Pin(16))
# Button setup
control_switch = Pin(2, Pin.IN, Pin.PULL_UP) # شستی کنترل
manual_switch = Pin(3, Pin.IN, Pin.PULL_UP) # شستی دستی
power_button = Pin(4, Pin.IN, Pin.PULL_UP) # شستی پاور
manual_buttons = [Pin(i, Pin.IN, Pin.PULL_UP) for i in range(5, 8)] # شستیهای دستی
# Button states and debouncing
button_flags = [False] * len(manual_buttons)
button_last_press = [0] * (len(manual_buttons) + 3) # اضافه کردن فضای کافی برای همه دکمهها
# System state
state = {
"power_on": False,
"manual_mode": False,
"last_power_state": False,
"last_action_time": 0,
"cooldown_time": 5000,
"last_temp": None,
"last_auto_message": None,
"messages": []
}
DEBOUNCE_TIME = 50
TEMP_RANGES = {"MIN": 18, "NORMAL_MIN": 24, "NORMAL_MAX": 27}
# Log messages
def log_message(message):
print(message)
state["messages"].append(message)
if len(state["messages"]) > 50:
state["messages"] = state["messages"][-50:]
# Function to read temperature
def temperature():
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
if temp != state["last_temp"]:
log_message(f"Temperature measured: {temp}C")
state["last_temp"] = temp
return temp
except Exception as e:
log_message(f"Sensor Error: {e}")
return None
# Function to read status from URL
def read_status_from_url():
try:
response = urequests.get("https://www.rash32.ir/python/micropython/SmartWaterCooler/data.php", timeout=5)
if response.status_code == 200:
data = ujson.loads(response.text)
if isinstance(data, dict):
return data
elif isinstance(data, list) and len(data) > 0:
log_message("Received a list, extracting the first item.")
return data[0]
else:
log_message(f"Error: Unexpected data format: {type(data)}")
return None
else:
log_message(f"Error reading data from URL: {response.status_code}")
return None
except Exception as e:
log_message(f"Connection Error: {e}")
return None
# Update relay states
def update_relays(status):
if isinstance(status, dict):
log_message(f"Updating relays with status: {status}")
Relays[0].value(int(status.get("Power", 0)))
Relays[1].value(int(status.get("Pump", 0)))
Relays[2].value(int(status.get("Engine", 0)))
speed = status.get("Speed", "0")
Relays[3].value(1 if speed == "1" else 0)
else:
log_message("Error: Status is not a dictionary")
# Automatic control logic
def auto_control():
temp = temperature()
if temp is None:
return
current_time = ticks_ms()
if current_time - state["last_action_time"] < state["cooldown_time"]:
return
relay_status = [0] * 4
new_message = ""
if temp < TEMP_RANGES["MIN"]:
relay_status = [0, 0, 0, 0]
new_message = f"Temperature below {TEMP_RANGES['MIN']}C, all relays off."
elif temp < TEMP_RANGES["NORMAL_MIN"]:
relay_status = [1, 0, 0, 0]
new_message = f"Temperature below {TEMP_RANGES['NORMAL_MIN']}C, power on."
elif temp <= TEMP_RANGES["NORMAL_MAX"]:
relay_status = [1, 1, 1, 1]
new_message = f"Temperature in normal range ({TEMP_RANGES['NORMAL_MIN']}-{TEMP_RANGES['NORMAL_MAX']}C), all relays on."
else:
relay_status = [1, 1, 1, 0]
new_message = f"Temperature above {TEMP_RANGES['NORMAL_MAX']}C, relays 1, 2, 3 on, relay 4 off."
for i in range(len(relay_status)):
Relays[i].value(relay_status[i])
if new_message != state["last_auto_message"]:
log_message(f"Auto control: {new_message}")
state["last_auto_message"] = new_message
state["last_action_time"] = current_time
# Manual control logic
def manual_control():
current_time = ticks_ms()
for i in range(len(manual_buttons)):
if manual_buttons[i].value() == 0 and not button_flags[i] and current_time - button_last_press[i + 3] > DEBOUNCE_TIME:
button_flags[i] = True
Relays[i + 1].value(1) # Turn on the relay
button_last_press[i + 3] = current_time
log_message(f"Manual control: Relay {i + 1} on")
elif manual_buttons[i].value() == 1 and button_flags[i] and current_time - button_last_press[i + 3] > DEBOUNCE_TIME:
button_flags[i] = False
Relays[i + 1].value(0) # Turn off the relay
button_last_press[i + 3] = current_time
log_message(f"Manual control: Relay {i + 1} off")
# Function to update relays based on the current state
def set_relays_based_on_mode():
if not state["power_on"]:
# If power is off, turn off all relays
turn_off_all_relays()
return
# Set relays based on mode
if state["manual_mode"]:
# In manual mode, set relays based on button states
manual_control()
else:
# In automatic mode, use automatic control function
auto_control()
# Function to turn off all relays
def turn_off_all_relays():
for relay in Relays:
relay.value(0)
log_message("All relays turned off.")
# Main loop
while True:
current_time = ticks_ms()
# Power button control
if power_button.value() == 0 and not state["power_on"]:
if current_time - button_last_press[0] > DEBOUNCE_TIME:
state["power_on"] = True
Relays[0].value(1) # Power relay on
log_message("Power on: Relays enabled.")
button_last_press[0] = current_time
elif power_button.value() == 1 and state["power_on"]:
if current_time - button_last_press[0] > DEBOUNCE_TIME:
state["power_on"] = False
turn_off_all_relays() # Turn off all relays
log_message("Power off: All relays disabled.")
button_last_press[0] = current_time
# Control Mode
if state["power_on"]:
if control_switch.value() == 0 and state["manual_mode"]:
# سوئیچ به حالت خودکار (سنسور)
turn_off_all_relays() # خاموش کردن رلهها قبل از تغییر حالت
state["manual_mode"] = False # حالت خودکار
log_message("Switched to sensor mode")
elif control_switch.value() == 1 and not state["manual_mode"]:
# سوئیچ به حالت دستی
state["manual_mode"] = True # حالت دستی
log_message("Switched to manual mode")
# Manual or Automatic control based on mode
set_relays_based_on_mode()
sleep(0.1) # Add a small delay for smoother operation