from machine import Pin, time_pulse_us, Timer
import urequests
import utime
import network
import ujson
import _thread
import socket
import machine
# Пины для компонентов
TRIG_PIN = 26
ECHO_PIN = 12
RELAY_PIN = 27
RED_LED = 4 # Красный светодиод (100 см) и индикатор Wi-Fi
ORANGE_LED = 17 # Оранжевый светодиод (75 см)
YELLOW_LED = 2 # Желтый светодиод (50 см)
GREEN_LED = 16 # Зеленый светодиод (10 см)
MAX_DISTANCE_CM = 39 # Максимальная высота бака в см
SSID = "Home130"
PASSWORD = "rooter2017"
TELEGRAM_BOT_TOKEN = "7991237697:AAHRjliS3nf2JoDmUQ0Zyd4G778HzOntqvI"
CHAT_ID = "1285166140"
# Инициализация Wi-Fi
wlan = network.WLAN(network.STA_IF)
wifi_connected = False # Флаг состояния Wi-Fi
# Инициализация компонентов
trig = Pin(TRIG_PIN, Pin.OUT)
echo = Pin(ECHO_PIN, Pin.IN)
relay = Pin(RELAY_PIN, Pin.OUT, value=1)
red_led = Pin(RED_LED, Pin.OUT)
orange_led = Pin(ORANGE_LED, Pin.OUT)
yellow_led = Pin(YELLOW_LED, Pin.OUT)
green_led = Pin(GREEN_LED, Pin.OUT)
last_water_level = -1
led_blink_state = False # Состояние мигания светодиодов
# Пытаемся подключиться к Wi-Fi
def connect_wifi():
global wifi_connected
if not wlan.isconnected():
print("Подключение к Wi-Fi...")
wlan.active(True)
wlan.connect(SSID, PASSWORD)
# Ожидание подключения с таймаутом
timeout = 20 # Максимум 20 секунд
start_time = utime.time()
while not wlan.isconnected():
if utime.time() - start_time > timeout:
print("Не удалось подключиться к Wi-Fi")
wifi_connected = False
return False
utime.sleep(1)
print("Wi-Fi подключен")
print("IP:", wlan.ifconfig()[0])
wifi_connected = True
return True
return True
# Поток для поддержания подключения к Wi-Fi
def wifi_connection_thread():
global wifi_connected
while True:
if not wlan.isconnected():
wifi_connected = False
print("Потеряно соединение с Wi-Fi. Повторное подключение...")
connect_wifi()
utime.sleep(10) # Проверяем подключение каждые 10 секунд
# Измерение уровня воды
def get_water_level():
try:
trig.off()
utime.sleep_us(2)
trig.on()
utime.sleep_us(10)
trig.off()
duration = time_pulse_us(echo, 1, 30000) # Измеряем время сигнала
if duration < 0:
print("Ошибка измерения расстояния")
return -1
distance = (duration * 0.0343) / 2
water_level_percent = int((MAX_DISTANCE_CM - distance) * 100 / MAX_DISTANCE_CM)
return max(0, min(water_level_percent, 100))
except Exception as e:
print("Ошибка в get_water_level:", e)
return -1
# Обновление состояния светодиодов в зависимости от уровня воды
def update_leds(water_level_percent):
global led_blink_state
# Красный светодиод
if 15 <= water_level_percent <= 25:
red_led.value(1)
elif 5 <= water_level_percent <= 14:
red_led.value(led_blink_state)
else:
red_led.value(0)
# Оранжевый светодиод
if 36 <= water_level_percent <= 50:
orange_led.value(1)
elif 26 <= water_level_percent <= 35:
orange_led.value(led_blink_state)
else:
orange_led.value(0)
# Желтый светодиод
if 61 <= water_level_percent <= 75:
yellow_led.value(1)
elif 51 <= water_level_percent <= 60:
yellow_led.value(led_blink_state)
else:
yellow_led.value(0)
# Зеленый светодиод
if water_level_percent >= 86:
green_led.value(1)
elif 76 <= water_level_percent <= 85:
green_led.value(led_blink_state)
else:
green_led.value(0)
# Управление насосом
def control_relay(water_level_percent):
relay.value(1 if water_level_percent >= 5 else 0)
# Отправка уведомления в Telegram
def send_telegram_message(water_level_percent):
if not wifi_connected:
print("Wi-Fi не подключен. Сообщение не отправлено.")
return
message = f"Уровень воды в баке: {water_level_percent}%"
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
try:
response = urequests.post(url, json={"chat_id": CHAT_ID, "text": message})
response.close()
except Exception as e:
print("Ошибка отправки сообщения в Telegram:", e)
# HTTP-сервер для дашборда
def http_server(ip):
addr = socket.getaddrinfo(ip, 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
print("Сервер запущен по адресу", addr)
while True:
cl, addr = s.accept()
print("Клиент подключен с адреса", addr)
request = cl.recv(1024).decode('utf-8')
print("Запрос:", request)
# Определяем маршрут
if "GET /dashboard" in request:
# Возвращаем HTML-страницу
response = """HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n"""
with open("dashboard_1.html", "r") as f:
response += f.read()
cl.send(response.encode('utf-8'))
elif "GET /water-level" in request:
# Возвращаем JSON с уровнем воды и статусом насоса
water_level_json = ujson.dumps({
"water_level": last_water_level,
"pump_status": "on" if relay.value() == 1 else "off"
})
response = f"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{water_level_json}"
cl.send(response.encode('utf-8'))
elif "POST /toggle-pump" in request:
# Переключаем насос
relay.value(1 if relay.value() == 0 else 0)
response = ujson.dumps({"pump_status": "on" if relay.value() == 1 else "off"})
cl.send(f"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{response}".encode('utf-8'))
elif "POST /start-pump" in request:
# Включаем насос
relay.value(1)
response = ujson.dumps({"pump_status": "on"})
cl.send(f"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{response}".encode('utf-8'))
elif "POST /stop-pump" in request:
# Выключаем насос
relay.value(0)
response = ujson.dumps({"pump_status": "off"})
cl.send(f"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{response}".encode('utf-8'))
elif "POST /restart" in request:
# Перезагрузка ESP32
response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nПерезагрузка ESP32..."
cl.send(response.encode('utf-8'))
machine.reset()
else:
# Возвращаем 404 для неизвестных маршрутов
response = "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\n\r\n404 Not Found"
cl.send(response.encode('utf-8'))
cl.close()
# Основная логика
def main():
global last_water_level, led_blink_state
# Запуск потока для Wi-Fi
_thread.start_new_thread(wifi_connection_thread, ())
# Запуск HTTP-сервера
ip = wlan.ifconfig()[0]
_thread.start_new_thread(http_server, (ip,))
# Таймер для мигания светодиодов
def blink_timer_callback(timer):
global led_blink_state
led_blink_state = not led_blink_state
blink_timer = Timer(-1)
blink_timer.init(period=500, mode=Timer.PERIODIC, callback=blink_timer_callback)
while True:
try:
water_level = get_water_level()
if water_level == -1:
print("Ошибка измерения уровня воды")
utime.sleep(5)
continue
# Проверка изменения уровня воды
if water_level != last_water_level:
last_water_level = water_level
print(f"Уровень воды: {water_level}%")
# Отправка уведомления в Telegram
if water_level in [10, 30, 50, 70]:
send_telegram_message(water_level)
# Обновляем состояние светодиодов и управление реле
update_leds(water_level)
control_relay(water_level)
utime.sleep(5) # Основная задержка между измерениями
except Exception as e:
print("Ошибка в основном цикле:", e)
utime.sleep(5)
# Запуск основной функции
main()