from machine import Pin, time_pulse_us
import urequests
import utime
import network
import ujson
import _thread
# Конфигурация бака
tank_config = {
"tank_height": 40, # Общая высота бака (см)
"max_fill_level": 35, # Максимальный уровень заполнения (см)
"sensor_position": 40 # Высота установки датчика от дна (см)
}
# Пины для компонентов
TRIG_PIN = 26
ECHO_PIN = 12
RELAY_PIN = 27
RED_LED = 4 # Красный светодиод (100 см) и индикатор Wi-Fi
ORANGE_LED = 17 # Оранжевый светодиод (75 см)
YELLOW_LED = 2 # Желтый светодиод (50 см)
GREEN_LED = 16 # Зеленый светодиод (10 см)
# Настройки Wi-Fi и Telegram
SSID = "Wokwi-GUEST"
PASSWORD = ""
TELEGRAM_BOT_TOKEN = "7991237697:AAHRjliS3nf2JoDmUQ0Zyd4G778HzOntqvI"
CHAT_ID = "1285166140"
# Инициализация Wi-Fi
wlan = network.WLAN(network.STA_IF)
wifi_connected = False # Флаг состояния Wi-Fi
# Подключение к Wi-Fi
def connect_wifi():
global wifi_connected
wlan.active(True)
wlan.connect(SSID, PASSWORD)
while not wlan.isconnected():
print("Подключение к Wi-Fi...")
utime.sleep(20)
wifi_connected = True
print("Wi-Fi подключен")
print("IP:", wlan.ifconfig()[0])
# Поток для поддержания подключения к Wi-Fi
def wifi_connection_thread():
global wifi_connected
while True:
if not wlan.isconnected():
wifi_connected = False
print("Потеряно соединение с Wi-Fi. Повторное подключение...")
connect_wifi()
utime.sleep(60) # Проверяем подключение каждые 60 секунд
# Инициализация компонентов
trig = Pin(TRIG_PIN, Pin.OUT)
echo = Pin(ECHO_PIN, Pin.IN)
relay = Pin(RELAY_PIN, Pin.OUT, value=1)
red_led = Pin(RED_LED, Pin.OUT)
orange_led = Pin(ORANGE_LED, Pin.OUT)
yellow_led = Pin(YELLOW_LED, Pin.OUT)
green_led = Pin(GREEN_LED, Pin.OUT)
last_water_level = -1
# Измерение уровня воды
def get_water_level():
trig.off()
utime.sleep_us(2)
trig.on()
utime.sleep_us(10)
trig.off()
duration = time_pulse_us(echo, 1, 30000) # Измеряем время сигнала
if duration < 0:
return 0 # Ошибка измерения
distance = (duration * 0.0343) / 2 # Расстояние до воды в см
# Рассчитываем уровень воды
water_level_cm = tank_config["sensor_position"] - distance
water_level_cm = max(0, min(water_level_cm, tank_config["max_fill_level"])) # Ограничиваем диапазон
water_level_percent = int((water_level_cm / tank_config["max_fill_level"]) * 100)
return water_level_percent
# Обновление состояния светодиодов
def update_leds(water_level_percent):
# Выключаем все светодиоды
green_led.value(0)
yellow_led.value(0)
orange_led.value(0)
red_led.value(0)
# Красный светодиод (низкий уровень)
if water_level_percent <= 25:
red_led.value(1)
elif 5 <= water_level_percent <= 14:
while 5 <= water_level_percent <= 14: # Мигает непрерывно
red_led.value(1)
utime.sleep(0.5)
red_led.value(0)
utime.sleep(0.5)
water_level_percent = get_water_level()
# Оранжевый светодиод (средний уровень)
if 26 <= water_level_percent <= 50:
orange_led.value(1)
elif 26 <= water_level_percent <= 35:
while 26 <= water_level_percent <= 35: # Мигает непрерывно
orange_led.value(1)
utime.sleep(0.5)
orange_led.value(0)
utime.sleep(0.5)
water_level_percent = get_water_level()
# Желтый светодиод (высокий уровень)
if 51 <= water_level_percent <= 75:
yellow_led.value(1)
elif 51 <= water_level_percent <= 60:
while 51 <= water_level_percent <= 60: # Мигает непрерывно
yellow_led.value(1)
utime.sleep(0.5)
yellow_led.value(0)
utime.sleep(0.5)
water_level_percent = get_water_level()
# Зеленый светодиод (полный уровень)
if water_level_percent >= 76:
green_led.value(1)
elif 76 <= water_level_percent <= 85:
while 76 <= water_level_percent <= 85: # Мигает непрерывно
green_led.value(1)
utime.sleep(0.5)
green_led.value(0)
utime.sleep(0.5)
water_level_percent = get_water_level()
# Управление насосом
def control_relay(water_level_percent):
relay.value(1 if water_level_percent >= 5 else 0)
# Отправка уведомлений в Telegram
def send_telegram_message(water_level_percent):
water_level_cm = int(tank_config["max_fill_level"] * (water_level_percent / 100))
message = f"Уровень воды: {water_level_percent}% ({water_level_cm} см)"
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
try:
response = urequests.post(url, json={"chat_id": CHAT_ID, "text": message})
response.close()
except Exception as e:
print("Ошибка отправки сообщения в Telegram:", e)
# HTTP-сервер для Android-приложения
def http_server(ip):
import socket
addr = socket.getaddrinfo(ip, 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
print("Сервер запущен по адресу", addr)
while True:
cl, addr = s.accept()
print("Клиент подключен с адреса", addr)
request = cl.recv(1024)
print("Запрос:", request)
# Отправка JSON с уровнем воды
water_level_json = ujson.dumps({"water_level": last_water_level})
response = f"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{water_level_json}"
cl.send(response)
cl.close()
# Основная логика
def main():
global last_water_level
# Запуск потока для Wi-Fi
_thread.start_new_thread(wifi_connection_thread, ())
# Стартуем HTTP-сервер в отдельном потоке
ip = wlan.ifconfig()[0]
_thread.start_new_thread(http_server, (ip,))
while True:
water_level = get_water_level()
# Проверка изменения уровня воды
if water_level != last_water_level:
last_water_level = water_level
print(f"Уровень воды: {water_level}%")
# Отправка уведомления в Telegram
send_telegram_message(water_level)
# Обновляем состояние светодиодов и управление реле
update_leds(water_level)
control_relay(water_level)
utime.sleep(5) # Основная задержка между измерениями
# Запуск основной функции
main()