"""
This project is intended to develop a robust system that operates completely autonomously,
based on predetermined water levels in the overhead tank.
Outcomes:
1. Reduces the wastage of overflow water from the overhead tank.
2. Avoids manual intervention for operating the pump.
3. Also reduces power consumption by cutting off the power used during overflow time.
Requirements:
1. Completely autonomous.
2. Should run under any conditions.
3. Should be powered completely OFF-GRID.
4. Should use completely wireless communication (PEER-TO-PEER).
5. OTA updates.
6. If required, it should also be able to work over IoT.
7. Try to keep logging.
Use Cases:
1. Domestic.
2. Industrial.
3. Commercial.
"""
# TANK UNIT controller
"""
Requirements:
1. OFF-GRID power source.
2. Wireless Communication. (ESPNOW)
3. Auto-Restart.
4. Auto-Deep sleep.
"""
# variables
tank_height_cm = 100 # tank height in cm; change this to match the installed tank
battery_level = 0 # last known battery level; is_battery() compares it with CRITCAL_BATTERY_LEVEL
is_pump= False # pump-state flag; NOTE(review): this name is rebound by `def is_pump()` further down
# constants
CRITCAL_BATTERY_LEVEL = 3.2 # critical battery level; change this to match the battery in use
# import Required modules
#------------------------
# initializing
#--------------------------
"""
#ultra sonic module
trigger_pin = machine.Pin(5, machine.Pin.OUT)
echo_pin = machine.Pin(18, machine.Pin.IN)
# Calculate the timeout for the ultrasonic sensor based on the tank height (in cm)
# Formula: timeout_us = (2 * tank_height_cm) / speed_of_sound (343 m/s converted to 0.0343 cm/µs)
# Adding a 10% buffer to account for potential delays or inaccuracies in timing
timeout_us = int((2 * tank_height_cm) / 0.0343 * 1.1)
"""
def get_distance():
    """Return the distance reported by the ultrasonic sensor, in centimetres.

    Sends a 10 µs trigger pulse and times the echo with
    ``machine.time_pulse_us``.  Relies on the module-level ``trigger_pin``,
    ``echo_pin`` and ``timeout_us`` names; in this file they are only assigned
    inside the triple-quoted (disabled) ultrasonic setup text, so that setup
    has to be enabled before this function can run.

    Returns:
        The distance rounded to two decimals, or ``None`` when no valid echo
        was timed.
    """
    # Function-scope import: the file only has ``from machine import Pin``,
    # so the bare ``machine`` name used below was never bound.
    import machine

    # Hold the trigger low first so the 10 µs high pulse starts from a clean level.
    trigger_pin.value(0)
    time.sleep_us(2)
    trigger_pin.value(1)
    time.sleep_us(10)
    trigger_pin.value(0)
    try:
        pulse_time = machine.time_pulse_us(echo_pin, 1, timeout_us)
    except OSError:
        # Kept from the original for firmware that raises on a sensor error.
        return None
    # time_pulse_us reports a timeout as a negative value; without this check
    # it would be converted into a small negative "distance".
    if pulse_time < 0:
        return None
    # 0.0343 cm/µs is the speed of sound; halve it because the echo travels
    # to the target and back.
    distance = (pulse_time * 0.0343) / 2
    return round(distance, 2)
def get_tank_level():
    """Return the water level as a whole-number percentage of the tank height.

    The level is derived from the sensor distance returned by
    ``get_distance()`` and the module-level ``tank_height_cm``.

    Returns:
        ``0`` when the sensor gives no reading, a negative reading, or a
        reading larger than the tank height; otherwise
        ``int((tank_height_cm - distance) / tank_height_cm * 100)``.
    """
    # The original called get_distance_cm(), which is not defined anywhere in
    # this file; the sensor helper is named get_distance().
    distance_cm = get_distance()
    # A negative distance is not a real measurement and would otherwise be
    # reported as slightly more than a full tank.
    if distance_cm is None or distance_cm < 0 or distance_cm > tank_height_cm:
        return 0
    # Truncate to a whole-number percentage.
    return int((tank_height_cm - distance_cm) / tank_height_cm * 100)
def is_sensor():
    """Placeholder for the check of whether the sensor is working.

    Not implemented yet: performs no check and returns ``None``.
    """
    return None
#-------------------------
#power source module
def get_battery_level():
    """Placeholder for reading the battery level from the battery.

    Not implemented yet: reads nothing and returns ``None``.
    """
    return None
def is_solar():
    """Placeholder for the check of whether solar power is available.

    Not implemented yet: performs no check and returns ``None``.
    """
    return None
def is_battery():
    """Tell whether the battery is above the critical level.

    Compares the module-level ``battery_level`` with
    ``CRITCAL_BATTERY_LEVEL`` and returns the result of that comparison.
    """
    above_critical = battery_level > CRITCAL_BATTERY_LEVEL
    return above_critical
#--------------------------
# ESP NOW module
"""
data model
100,3.3
two comma-separated values: water level, then battery level
"""
# required functions for esp now
import network # Import networking module for Wi-Fi functionality
import time # Import time module for delay functions
import espnow # Import ESP-NOW communication module
from machine import Pin # Import machine module for Pin control
# Bring up the radio for ESP-NOW.  The setup stays inside a try block so that a
# Wi-Fi / ESP-NOW failure is reported instead of stopping the script here.
# The peer address is assigned before the try so that peer_mac exists for the
# send functions even when the setup below fails part-way.
peer_mac = b'\x00\x00\x00\x00\x00\x00'  # Replace with the actual MAC address of the pump unit
try:
    sta_if = network.WLAN(network.STA_IF)  # WLAN interface object in station mode
    sta_if.active(True)  # Activate the Wi-Fi interface
    sta_if.scan()  # Perform a network scan to initialize the Wi-Fi interface
    # NOTE(review): other parts of this file call methods on an ESP-NOW object
    # named `e`, while this setup uses module-level espnow functions; confirm
    # which API the target firmware provides.
    espnow.init()  # Initialize ESP-NOW communication
    espnow.add_peer(peer_mac)  # Register the pump unit as the ESP-NOW peer
except OSError as err:
    # Print the caught exception; the original printed the OSError class
    # itself, which says nothing about what went wrong.
    print('error', err)
"""
# Main loop to send a pulse and receive data from Tank Unit
while True:
#send_pulse() # Send a pulse to the Tank Unit to request data
print("Pulse sent to Tank Unit") # Print message for debugging
# Wait for a response (water level and battery level data) from the Tank Unit
message = espnow.recv() # Receive incoming data
if message:
sender_mac, received_data = message # Extract MAC address and received data
print("Received data:", received_data.decode()) # Print the received data (water level, battery level)
time.sleep(5) # Wait for 5 seconds before sending another pulse
"""
def _recover_espnow(err):
    """Try to repair the ESP-NOW state named in an ``OSError`` from a send."""
    reason = err.args[1] if len(err.args) >= 2 else None
    # NOTE(review): these recovery calls mirror the module-level setup
    # (espnow.init / espnow.add_peer / WLAN station mode); the original used an
    # undefined `e` / `peer` pair here - confirm against the firmware's API.
    if reason == 'ESP_ERR_ESPNOW_NOT_INIT':
        espnow.init()
    elif reason == 'ESP_ERR_ESPNOW_NOT_FOUND':
        espnow.add_peer(peer_mac)
    elif reason == 'ESP_ERR_ESPNOW_IF':
        network.WLAN(network.STA_IF).active(True)


def send_data_to_pump_unit(water_level,battery_level):
    """Send the readings to the pump unit over ESP-NOW, retrying on failure.

    The payload is built with ``to_byte`` and sent to ``peer_mac``.  Up to
    three attempts are made.  The original first sent a stray ``'Hello'``
    through the names ``e`` and ``peer``; ``e`` was made function-local by the
    ``except ... as e`` clause, so that line always raised before any data was
    sent.  It has been removed and its error recovery folded into the retry.

    Args:
        water_level: Water level value placed first in the payload.
        battery_level: Battery level value placed second in the payload.

    Returns:
        ``True`` as soon as one attempt reports success, ``False`` when all
        three attempts fail.
    """
    payload = to_byte(water_level, battery_level)
    for _attempt in range(3):
        try:
            status = espnow.send(peer_mac, payload)
        except OSError as err:
            print(f"Error while sending data: {err}")
            try:
                _recover_espnow(err)
            except OSError as recover_err:
                # Recovery is best-effort; report it and let the retry decide.
                print(f"Error while sending data: {recover_err}")
            continue
        except Exception as err:
            print(f"Error while sending data: {err}")
            continue
        # NOTE(review): success is `status == 0` here, while is_pump_active()
        # tests `status == True`; confirm the return convention of send().
        if status == 0:
            print("Data sent successfully")
            return True
        print("Failed to send data")
    return False
def to_byte(water_level, battery_level):
    """Build the byte payload for one reading pair.

    Both values are converted with ``str`` and joined with a comma, water
    level first, then the text is encoded to bytes.
    """
    fields = (str(water_level), str(battery_level))
    return ",".join(fields).encode()
#-------------------------
""" decoding logic
def decode(string):
water,battery=string.split(",")
return [int(water),float(battery)]
"""
# controller module
# ensures that the module keeps working without crashing or other failures
# implement internal and external WATCHDOG TIMER
#------------------------
# status led module
# indicating it based on the blinking n
# main logic ____
def is_level_changed():
    """Report whether the tank level differs from the last remembered level.

    Reads the current level with ``get_tank_level()`` and compares it with the
    module-level ``tank_level``.  When they differ, ``tank_level`` is updated
    to the new reading.

    Returns:
        ``True`` if the level changed (or no level was remembered yet),
        ``False`` otherwise.
    """
    # Without this declaration the assignment below made tank_level a local
    # name, so the comparison raised UnboundLocalError on every call.
    global tank_level
    current_level = get_tank_level()
    try:
        previous_level = tank_level
    except NameError:
        # No module-level tank_level has been assigned yet: first reading.
        previous_level = None
    if previous_level == current_level:
        return False
    tank_level = current_level
    return True


def if_level_changed_send_data():
    """Send the readings to the pump unit when the water level has changed."""
    if is_level_changed():
        # send_data_to_pump_unit requires both readings; is_level_changed()
        # has just stored the fresh level in tank_level.
        send_data_to_pump_unit(tank_level, battery_level)
def is_listener_active():
    """Placeholder for the check of whether the pump unit is active.

    The pump-state check (is_pump) is meant to be folded into this function
    as well.  Not implemented yet: returns ``None``.
    """
    return None
def is_pump():
    """Placeholder for the check of whether the pump is active.

    The check is meant to be implemented against the controller unit.  Not
    implemented yet: returns ``None``.
    """
    # The original def had only a comment and no body, which is a syntax
    # error; `pass` matches the other stubs in this file.
    pass
def main():
    """Run one pass of the tank-unit logic.

    While the pump is reported active, level changes are pushed to the pump
    unit.  Otherwise level changes are only sent when the listener is active,
    followed by a pump check.
    """
    # Call the predicate: the original tested the function object itself,
    # which is always truthy.
    if is_pump():
        # The original also called is_level_changed() and
        # send_data_to_pump_unit() directly here; the send had no arguments
        # and the extra check consumed the change before the call below.
        if_level_changed_send_data()
        # TODO: check whether the pump was switched off or the listener went offline.
        # TODO: the pump unit should maintain a handshake protocol; on failure
        # the pump state should be set to false.
        # TODO: while the pump is on, the device must never go into deep sleep.
    elif is_listener_active():
        if_level_changed_send_data()
        is_pump()
    else:
        # TODO: sleep for 5 minutes, then initialize whatever is required after sleep.
        pass
# State flags; main2() tests both of these names.
# NOTE(review): both names are rebound by the `def is_listener_active` and
# `def is_pump_active` statements that follow, so these False values do not
# survive module load.
is_listener_active=False
is_pump_active=False
def is_listener_active():
    """Placeholder for the listener check; replaces the earlier stub of the same name.

    Planned behaviour:
      * send data to the pump unit;
      * if true is received, wait for the handshake protocol and update the
        two state variables;
      * if false, return false or update the data.

    Not implemented yet: returns ``None``.
    """
    # The original def had only comments and no body, which is a syntax error.
    pass
def handshake():
    """Wait for the ``b'PACT'`` acknowledgement right after a send.

    Calls ``e.recv`` up to three times with a one-second timeout each.

    NOTE(review): ``e`` is expected to be a module-level ESP-NOW object; no
    assignment to it is visible in this file - confirm where it is created.

    Returns:
        ``True`` if a ``b'PACT'`` message was received, ``False`` otherwise,
        including when receiving raised an exception.
    """
    try:
        for _attempt in range(3):
            _sender, msg = e.recv(timeout_ms=1000)
            if msg == b'PACT':
                return True
    # The exception is bound to `err`, not `e`: binding it to `e` made `e`
    # local to the function, so e.recv above could never run.
    except Exception as err:
        # restart or do other thing
        print(f"An error occurred during receiving: {err}")
    # The result is returned; the original only stored it in a local variable
    # that was discarded on return.
    return False
def is_pump_active():
    """Send the current readings and ask the pump unit for a handshake.

    The payload is built from ``get_tank_level()`` and the module-level
    ``battery_level``.  The original passed an undefined ``water_level`` name.

    Returns:
        The truth value of ``handshake()`` when the send reported success,
        ``False`` when the send failed or any step raised.
    """
    try:
        payload = to_byte(get_tank_level(), battery_level)
        status = espnow.send(peer_mac, payload)
        # NOTE(review): send_data_to_pump_unit() treats `status == 0` as
        # success; confirm the return convention of send().
        if status == True:
            acknowledged = bool(handshake())
            print("Data sent successfully")
            return acknowledged
        # The original assigned False to is_listener_active / is_pump_active
        # here, but those were function locals with no effect; the failure is
        # reported through the return value.
        print("Failed to send data")
    except Exception as err:
        print(f"Error while sending data: {err}")
    return False
def main2():
    """Alternative main pass built on is_listener_active() / is_pump_active().

    The predicates are called here.  The original tested the names themselves,
    which are bound to functions and therefore always truthy.
    """
    if not is_listener_active():
        # TODO: keep checking whether the listener is active; if not, go to
        # deep sleep for 5 minutes.
        return
    # is_pump_active() sends the current data and checks whether the pump is active.
    if is_pump_active():
        # TODO: while the pump is active, never go to sleep, keep giving data
        # to the pump unit, and check whether the motor is switched off.
        pass
    else:
        # TODO: check whether the listener is active at a 10-minute frequency,
        # and check for 1 minute whether the motor is switched on.
        pass
def suspend():
    """Placeholder for suspending the device until the battery level increases.

    Planned: send a critical-battery notification and deep sleep for some
    time.  Not implemented yet: returns ``None``.
    """
    return None
# Top-level loop: run the main logic while the battery is above the critical
# level, otherwise suspend the device.
while True:
    if is_battery():
        main()
        continue
    suspend()
    if is_battery():
        # TODO: initialize the things that are required after deep sleep.
        pass
# watch dog
# garbage collector
#___________________