from machine import Pin, time_pulse_us, reset
import utime
import network
import ujson
import os
import _thread
import gc # Для управления сборщиком мусора
from http_server import start_http_server # Импортируем HTTP-сервер
# Пины для компонентов
TRIG_PIN = 26 # Триггер ультразвукового датчика
ECHO_PIN = 12 # Эхо ультразвукового датчика
RELAY_PIN = 27 # Реле для управления насосом
RED_LED = 4 # Красный светодиод (100 см)
ORANGE_LED = 17 # Оранжевый светодиод (75 см)
YELLOW_LED = 2 # Желтый светодиод (50 см)
GREEN_LED = 16 # Зеленый светодиод (10 см)
# Константы файлов конфигурации
WIFI_CONFIG_FILE = "wifi_config.json"
TANK_CONFIG_FILE = "tank_config.json"
LOG_FILE = "log.txt"
# Глобальные переменные
wlan = network.WLAN(network.STA_IF) # Клиентский режим Wi-Fi
ap = network.WLAN(network.AP_IF) # Точка доступа
last_water_level = -1
current_water_level = -1
auto_mode = True # Автоматический режим включен по умолчанию
# Глобальные переменные для управления миганием светодиодов
red_blink_start = 0
orange_blink_start = 0
yellow_blink_start = 0
green_blink_start = 0
# Логирование
def log(message):
try:
with open(LOG_FILE, 'a') as f:
f.write(f"{utime.time()}: {message}\n")
except Exception as e:
print(f"Ошибка записи в лог: {e}")
print(message)
# Инициализация компонентов
trig = Pin(TRIG_PIN, Pin.OUT)
echo = Pin(ECHO_PIN, Pin.IN)
relay = Pin(RELAY_PIN, Pin.OUT, value=0) # Реле для насоса
red_led = Pin(RED_LED, Pin.OUT, value=0)
orange_led = Pin(ORANGE_LED, Pin.OUT, value=0)
yellow_led = Pin(YELLOW_LED, Pin.OUT, value=0)
green_led = Pin(GREEN_LED, Pin.OUT, value=0)
log("Компоненты инициализированы:")
log(f"TRIG_PIN={TRIG_PIN}, ECHO_PIN={ECHO_PIN}, RELAY_PIN={RELAY_PIN}")
log(f"LED Pins: RED={RED_LED}, ORANGE={ORANGE_LED}, YELLOW={YELLOW_LED}, GREEN={GREEN_LED}")
# Функция для загрузки настроек Wi-Fi и бака
def load_configs():
wifi_config = {"ssid": "", "password": ""}
tank_config = {"tank_height": 40, "max_fill_level": 35, "sensor_position": 38}
# Загрузка настроек Wi-Fi
if WIFI_CONFIG_FILE in os.listdir():
try:
with open(WIFI_CONFIG_FILE, 'r') as f:
wifi_config = ujson.load(f)
log(f"Настройки Wi-Fi загружены: SSID={wifi_config.get('ssid', '')}, Password={'*' * len(wifi_config.get('password', '')) if wifi_config.get('password') else 'пусто'}")
except Exception as e:
log(f"Ошибка при чтении wifi_config.json: {e}")
else:
log("Файл wifi_config.json отсутствует.")
save_wifi_config("", "")
# Загрузка настроек бака
if TANK_CONFIG_FILE in os.listdir():
try:
with open(TANK_CONFIG_FILE, 'r') as f:
tank_config = ujson.load(f)
log(f"Настройки бака загружены: {tank_config}")
except Exception as e:
log(f"Ошибка при чтении tank_config.json: {e}")
else:
log("Файл tank_config.json отсутствует.")
save_tank_config({"tank_height": 40, "max_fill_level": 35, "sensor_position": 38})
return wifi_config, tank_config
# Сохранение настроек Wi-Fi
def save_wifi_config(ssid, password):
config = {"ssid": ssid, "password": password}
try:
with open(WIFI_CONFIG_FILE, 'w') as f:
ujson.dump(config, f)
log("Настройки Wi-Fi сохранены")
except Exception as e:
log(f"Ошибка при сохранении настроек Wi-Fi: {e}")
# Сохранение настроек бака
def save_tank_config(config):
try:
with open(TANK_CONFIG_FILE, 'w') as f:
ujson.dump(config, f)
log("Настройки бака сохранены")
except Exception as e:
log(f"Ошибка при сохранении настроек бака: {e}")
# Перезагрузка Wi-Fi модуля
def reset_wifi():
try:
wlan.active(False)
utime.sleep(1)
wlan.active(True)
utime.sleep(2)
log("Wi-Fi модуль перезагружен.")
except Exception as e:
log(f"Ошибка при перезагрузке Wi-Fi: {e}")
# Подключение к Wi-Fi
def connect_wifi(ssid, password):
try:
reset_wifi()
log(f"Попытка подключения к Wi-Fi ({ssid})...")
if not password:
log("Подключение к открытой сети.")
wlan.connect(ssid)
else:
wlan.connect(ssid, password)
attempts = 0
while not wlan.isconnected() and attempts < 2:
log(f"Попытка {attempts + 1}/2... Ожидание подключения.")
utime.sleep(5)
attempts += 1
if wlan.isconnected():
log(f"Успешно подключено к Wi-Fi ({ssid}). IP: {wlan.ifconfig()[0]}")
return True
else:
log(f"Не удалось подключиться к Wi-Fi ({ssid}) после {attempts} попыток.")
return False
except Exception as e:
log(f"Ошибка подключения к Wi-Fi: {e}")
return False
# Активация режима точки доступа (AP)
def start_ap():
try:
ap.active(True)
ap.config(essid="WaterTank_AP", authmode=network.AUTH_WPA_WPA2_PSK, password="water1234")
log("Режим AP активирован")
log(f"Подключитесь к сети Wi-Fi: WaterTank_AP (пароль: water1234)")
except Exception as e:
log(f"Ошибка при активации точки доступа: {e}")
# Измерение уровня воды
def get_water_level(tank_config):
try:
trig.off()
utime.sleep_us(2)
trig.on()
utime.sleep_us(10)
trig.off()
duration = time_pulse_us(echo, 1, 30000) # Измеряем время сигнала
distance = (duration * 0.0343) / 2
max_distance_cm = tank_config["tank_height"] # Максимальная высота бака
max_fill_level = tank_config["max_fill_level"] # Максимальный уровень заполнения
sensor_position = tank_config["sensor_position"] # Расположение датчика
water_level_cm = sensor_position - distance
water_level_percent = int((water_level_cm / max_fill_level) * 100)
return max(0, min(water_level_percent, 100))
except Exception as e:
log(f"Ошибка измерения уровня воды: {e}")
return None # Возвращаем None при ошибке
# Управление автоматическим режимом
def set_auto_mode(value):
global auto_mode, last_water_level, current_water_level, tank_config
auto_mode = value
if auto_mode: # Если включен автоматический режим
water_level = get_water_level(tank_config)
if water_level is not None:
control_relay(water_level, auto_mode) # Немедленно проверяем уровень воды и управляем насосом
log(f"Автоматический режим активирован. Текущий уровень воды: {water_level}%")
# Управление насосом
def control_relay(water_level_percent, auto_mode):
if auto_mode and water_level_percent is not None:
# Насос выключается, если вода ≤ 3%
if water_level_percent <= 3:
relay.value(0) # Выключаем насос
log(f"Насос выключен (уровень воды: {water_level_percent}%).")
# Насос включается, если вода ≥ 10%
elif water_level_percent >= 10:
relay.value(1) # Включаем насос
log(f"Насос включен (уровень воды: {water_level_percent}%).")
# Если уровень воды между 3% и 10%, ничего не делаем
else:
log(f"Уровень воды: {water_level_percent}%. Насос находится в промежуточном состоянии.")
# Обновление состояния светодиодов
def update_leds():
global red_blink_start, orange_blink_start, yellow_blink_start, green_blink_start, current_water_level
while True:
try:
water_level_percent = current_water_level
# Выключаем все светодиоды
green_led.value(0)
yellow_led.value(0)
orange_led.value(0)
red_led.value(0)
current_time = utime.ticks_ms()
# Красный светодиод (100 см)
if water_level_percent >= 6 and water_level_percent < 25:
if utime.ticks_diff(current_time, red_blink_start) > 600:
red_led.value(not red_led.value())
red_blink_start = current_time
elif water_level_percent < 5:
red_led.value(0)
# Оранжевый светодиод (75 см)
if water_level_percent >= 36 and water_level_percent < 50:
if utime.ticks_diff(current_time, orange_blink_start) > 1000:
orange_led.value(not orange_led.value())
orange_blink_start = current_time
elif water_level_percent < 25:
orange_led.value(0)
# Желтый светодиод (50 см)
if water_level_percent >= 61 and water_level_percent < 75:
if utime.ticks_diff(current_time, yellow_blink_start) > 1000:
yellow_led.value(not yellow_led.value())
yellow_blink_start = current_time
elif water_level_percent < 50:
yellow_led.value(0)
# Зеленый светодиод (10 см)
if water_level_percent >= 86:
green_led.value(1)
elif water_level_percent >= 76 and water_level_percent < 86:
if utime.ticks_diff(current_time, green_blink_start) > 1000:
green_led.value(not green_led.value())
green_blink_start = current_time
elif water_level_percent < 75:
green_led.value(0)
utime.sleep_ms(100)
except Exception as e:
log(f"Ошибка управления светодиодами: {e}")
# Закрытие всех портов перед перезагрузкой
def close_all_ports():
log("Закрытие всех портов перед перезагрузкой...")
try:
wlan.disconnect()
wlan.active(False)
log("Wi-Fi отключен.")
except Exception as e:
log(f"Ошибка отключения Wi-Fi: {e}")
try:
ap.active(False)
log("Точка доступа отключена.")
except Exception as e:
log(f"Ошибка отключения точки доступа: {e}")
# Основной цикл программы
def main():
global last_water_level, auto_mode, current_water_level
try:
wifi_config, tank_config = load_configs()
ssid = wifi_config.get("ssid", "")
password = wifi_config.get("password", "")
log(f"Загружены настройки Wi-Fi: SSID={ssid}, Password={'*' * len(password) if password else 'пусто'}")
log(f"Загружены настройки бака: {tank_config}")
if ssid:
if not connect_wifi(ssid, password):
start_ap()
else:
log("Настройки Wi-Fi отсутствуют. Переход в режим AP.")
start_ap()
ip = wlan.ifconfig()[0] if wlan.isconnected() else ap.ifconfig()[0]
log(f"HTTP-сервер будет работать по адресу {ip}")
# Запуск HTTP-сервера
start_http_server(
ip,
ap,
wlan,
relay,
lambda: get_water_level(tank_config),
set_auto_mode,
lambda: globals().get('auto_mode', True),
close_all_ports,
save_wifi_config,
save_tank_config,
tank_config
)
# Запуск управления светодиодами
_thread.start_new_thread(update_leds, ())
while True:
try:
water_level = get_water_level(tank_config)
if water_level != last_water_level:
last_water_level = water_level
current_water_level = water_level
log(f"Уровень воды: {water_level}%")
control_relay(water_level, auto_mode)
# Очистка памяти каждую итерацию цикла
gc.collect()
log(f"Свободная память: {gc.mem_free()} байт")
utime.sleep(7)
except Exception as e:
log(f"Ошибка в основном цикле: {e}")
except Exception as e:
log(f"Критическая ошибка: {e}")
reset()
main()