from machine import Pin, ADC, I2C
from time import sleep_ms, ticks_ms, ticks_diff
from i2c_lcd import I2cLcd
# LCD connections: I2C bus pins and the display's bus address.
i2c_sda = 21
i2c_scl = 22
lcd_addr = 0x27
i2c = I2C(0, sda=Pin(i2c_sda), scl=Pin(i2c_scl), freq=400000)
# The trailing 2, 16 are presumably rows x columns of the display - confirm
# against the i2c_lcd driver.
lcd = I2cLcd(i2c, lcd_addr, 2, 16)
# manual_icon = bytearray([0x11,0x1B,0x15,0x11,0x11,0x11,0x11,0x11])
# auto_icon = bytearray([0x1F,0x11,0x11,0x1F,0x1F,0x11,0x11,0x11])
# lcd.custom_char(1, auto_icon)
# Mux control pins: four select lines address the 16 channels used by read_mux().
s0_pin = Pin(14, Pin.OUT)
s1_pin = Pin(27, Pin.OUT)
s2_pin = Pin(26, Pin.OUT)
s3_pin = Pin(25, Pin.OUT)
# NOTE(review): en_pin is configured as an output here but is never driven
# anywhere else in this file - confirm the mux enable line ends up at the
# level the hardware needs.
en_pin = Pin(12, Pin.OUT)
# GPIO number of the mux signal line; wrapped in an ADC at the end of this section.
SIG_pin = 33
pump1_pin = Pin(15, Pin.OUT) # Pump1 is in tank1
pump2_pin = Pin(2, Pin.OUT) # Pump2 is in tank2
nfb_pump_pin = Pin(0, Pin.OUT) # nfb_pump is the non-feedback pump, located in nfb_source
# Mode/reset push button, read by buttons(); pulled low when idle.
sys_mode = Pin(32, Pin.IN, Pin.PULL_DOWN)
current_mode = 0 # Current operating mode (0 for auto, 1 for manual)
last_mode = 0 # Mode that was active before the most recent mode_change()
## User defined Constraints ##
tank_capacity = [1000,2000] # in litres
source_capacity = [10000,5000] # in litres
noise_filter = 15 # Raw ADC readings at or below this value are treated as "no water" (filters wire-resistance and voltage related distortions)
sampling_rate = 1500 # in milliseconds; delay at the end of every main-loop pass
sample_sizing_time = 7000 # in milliseconds; minimum time between two evaluations of the probes
tank_selection = [1, 1] # Per-tank enable flags in the order [tank1, tank2]; 1 denotes connected, anything else is reported as 65535 (max. of 2 tanks are supported)
source_selection = [1, 1, 0] # Per-source enable flags (max. of 3 sources are supported). 0 denotes not connected, 1 denotes connected. Order: [source1, source2, nfb_source]
pump_rated_power = [1.1, 1, 0] # in kW
source_priority = [1, 2, 65535] # Source priority for pumping water, in the order [source1_level, source2_level, nfb_source]. With [1,2,3] the first priority goes to source1_level, the next to source2_level and the last to nfb_source. nfb_source is the non-feedback source (e.g. a borewell). [0,0,0] means no priority is set. 65535 means the pump is not available.
source_mapping = [1, 2, 1] # Source-to-tank mapping for the auto cycle, in the order of tank selection.
pump_prn_time = [300,0,0] # in seconds. If the water level has not changed after a pump started and "pump_prn_time" has elapsed, the pump is disabled
# profile = [profile_no, name, source_demand, pumping_time, sleep_time, cycle_duration]
# ADC used to sample whichever mux channel is currently selected.
adc = ADC(Pin(SIG_pin))
def setup():
    """Show the start-up splash and put every output into its idle state.

    The splash stays on screen for two seconds, after which the display is
    replaced with an "initializing" message and the four mux select lines
    and the three pump outputs are driven low.
    """
    splash = (
        (4, 0, "Wayotek"),
        (2, 1, "Gemini Mini"),
    )
    lcd.clear()
    for column, row, text in splash:
        lcd.move_to(column, row)
        lcd.putstr(text)
    sleep_ms(2000)

    lcd.clear()
    lcd.putstr("System Initializing")

    # Select lines first, then the pumps: everything starts switched off.
    idle_outputs = (
        s0_pin, s1_pin, s2_pin, s3_pin,
        pump1_pin, pump2_pin, nfb_pump_pin,
    )
    for output in idle_outputs:
        output.value(0)
def read_mux(channel):
    """Select a mux channel, sample it and threshold the reading.

    Args:
        channel: index (0-15) into the select-line table below.

    Returns:
        A ``(channel, wet)`` tuple where ``wet`` is True when the raw ADC
        reading is strictly greater than ``noise_filter``.
    """
    # One row per logical channel, columns are the levels for (S0, S1, S2, S3).
    # NOTE(review): row i encodes the select value (i + 1) mod 16 rather than
    # i itself - confirm this offset matches the probe wiring.
    select_levels = (
        (1, 0, 0, 0),  # channel 0
        (0, 1, 0, 0),  # channel 1
        (1, 1, 0, 0),  # channel 2
        (0, 0, 1, 0),  # channel 3
        (1, 0, 1, 0),  # channel 4
        (0, 1, 1, 0),  # channel 5
        (1, 1, 1, 0),  # channel 6
        (0, 0, 0, 1),  # channel 7
        (1, 0, 0, 1),  # channel 8
        (0, 1, 0, 1),  # channel 9
        (1, 1, 0, 1),  # channel 10
        (0, 0, 1, 1),  # channel 11
        (1, 0, 1, 1),  # channel 12
        (0, 1, 1, 1),  # channel 13
        (1, 1, 1, 1),  # channel 14
        (0, 0, 0, 0),  # channel 15
    )
    levels = select_levels[channel]
    for select_pin, level in zip((s0_pin, s1_pin, s2_pin, s3_pin), levels):
        select_pin.value(level)

    raw = adc.read()
    return channel, raw > noise_filter
def calculate_water_level(averages):
    """Return the fill level of one probe group as a whole percentage.

    Args:
        averages: sequence of per-probe results; an entry equal to 1 (or
            True) means the probe is wet, anything else (0, False, None)
            counts as dry.

    Returns:
        int: wet probes as a percentage of all probes, rounded down.
        An empty sequence yields 0 instead of raising ZeroDivisionError.
    """
    total = len(averages)
    if total == 0:
        # No probes to evaluate: report "empty", the same result a group of
        # all-dry or all-missing (None) readings produces.
        return 0
    filled = averages.count(1)
    # Integer arithmetic avoids float rounding such as 29/100*100 -> 28.99...
    return filled * 100 // total
def print_lcd(message):
    """Replace whatever is on the LCD with *message*."""
    lcd.clear()
    lcd.putstr(message)
def tank_values(averages):
    """Turn the 16 per-channel averages into four levels and act on them.

    Channels are consumed in groups of four: 0-3 tank 1, 4-7 tank 2,
    8-11 source 1, 12-15 source 2. A group whose selection flag is not 1 is
    reported as 65535. The resulting list is printed and handed to
    ``pump_demand``.

    Args:
        averages: list of per-channel results, indexed by channel number.
    """
    def group_level(selected, first_channel):
        # Only slice and evaluate the channels of groups that are connected.
        if selected == 1:
            return calculate_water_level(averages[first_channel:first_channel + 4])
        return 65535

    water_level = [
        group_level(tank_selection[0], 0),      # tank 1
        group_level(tank_selection[1], 4),      # tank 2
        group_level(source_selection[0], 8),    # source 1
        group_level(source_selection[1], 12),   # source 2
    ]
    print(water_level)
    # Decide what the pumps should do with these levels.
    pump_demand(water_level)
def buttons():
    """Handle one press of the mode button, if it is currently held down.

    Returns immediately when the button is not pressed. Otherwise blocks
    (polling every 20 ms) until the button is released, shows the press
    duration on the LCD and dispatches on it:

    * 500-2000 ms  -> ``mode_change()``
    * 2001-4000 ms -> ``reset()``
    * anything else is ignored.
    """
    if sys_mode.value() != 1:
        return

    # Time-stamp the press right away. Waiting for a 0 -> 1 edge inside the
    # polling loop would block until the *next* press if the button happened
    # to be released between the check above and the first poll.
    pressed_time = ticks_ms()
    lcd.clear()
    lcd.putstr("pressed")

    while sys_mode.value() == 1:
        sleep_ms(20)

    # ticks_diff() keeps the duration correct across a tick-counter wrap.
    time_pressed = ticks_diff(ticks_ms(), pressed_time)
    lcd.clear()
    # The duration is in milliseconds, so label it as such.
    lcd.putstr("{}ms Pressed".format(time_pressed))

    if 500 <= time_pressed <= 2000:
        mode_change()
        sleep_ms(50)
    elif 2001 <= time_pressed <= 4000:
        reset()
        sleep_ms(50)

    # Short settle time after the release before returning to the main loop.
    sleep_ms(20)
def reset():
    """Announce a reset on the LCD and switch every output off.

    The message is shown for 950 ms, the display is cleared, and then the
    mux select lines and all three pump outputs are driven low.
    """
    lcd.clear()
    lcd.putstr("Reset Pressed")
    sleep_ms(950)
    lcd.clear()

    outputs = (
        s0_pin, s1_pin, s2_pin, s3_pin,
        pump1_pin, pump2_pin, nfb_pump_pin,
    )
    for output in outputs:
        output.value(0)
def mode_change():
    """Toggle between automatic (0) and manual (1) mode.

    Updates the module-level ``current_mode`` and remembers the previous
    value in ``last_mode``; any previous value other than 0 switches back
    to 0.
    """
    global current_mode, last_mode

    lcd.clear()
    lcd.putstr("Mode Changed")

    last_mode = current_mode
    if last_mode == 0:
        current_mode = 1
    else:
        current_mode = 0
    sleep_ms(50)
def pump_demand(water_level):
    """Show the active mode on the LCD and run the matching pump cycle.

    Args:
        water_level: four-item sequence
            ``[tank1_level, tank2_level, source1_level, source2_level]``.

    Nothing happens when ``current_mode`` is neither 0 nor 1.
    """
    tank1_level, tank2_level, source1_level, source2_level = water_level

    if current_mode == 0:
        banner, run_cycle = "Automatic Mode", auto_cycle
    elif current_mode == 1:
        banner, run_cycle = "Manual Mode", manual_cycle
    else:
        return

    lcd.clear()
    lcd.putstr(banner)
    run_cycle(tank1_level, tank2_level, source1_level, source2_level)
def auto_cycle(tank1_level, tank2_level, source1_level, source2_level):
    """Switch the pumps on/off from the tank levels (automatic mode).

    A pump is started when the watched tank is at or below 25 % and stopped
    when it is at or above 100 %; in between, the pump keeps its state.
    Which tank is watched and which pump is driven depends on which of
    source 1 / source 2 are connected in ``source_selection``.

    Args:
        tank1_level, tank2_level: tank fill levels in percent (or 65535).
        source1_level, source2_level: accepted for interface symmetry; not
            used by the current rules.
    """
    # NOTE(review): this fallback is taken to belong to the priority check
    # (not to the last level test) - confirm against the intended logic.
    if source_priority[0] != 1:
        pump1_pin.value(0)
        return

    src1 = source_selection[0]
    src2 = source_selection[1]

    if src1 == 1 and src2 == 1:
        # Both sources connected: tank 1 is fed by pump 1.
        if tank1_level <= 25:
            print_lcd("1")
            pump1_pin.value(1)
        if tank1_level >= 100:
            pump1_pin.value(0)
    elif src1 == 0 and src2 == 1:
        # Only source 2 connected: pump 2 serves tank 1, then tank 2.
        if tank1_level <= 25:
            print("3")
            pump2_pin.value(1)
        if tank1_level >= 100:
            print("4")
            pump2_pin.value(0)
        if tank2_level <= 25:
            print("7")
            pump2_pin.value(1)
        if tank2_level >= 100:
            print("8")
            pump2_pin.value(0)
    elif src1 == 1 and src2 == 0:
        # Only source 1 connected: tank 2 is fed by pump 1.
        if tank2_level <= 25:
            print("5")
            pump1_pin.value(1)
        if tank2_level >= 100:
            print("6")
            pump1_pin.value(0)
def manual_cycle(tank1_level, tank2_level, source1_level, source2_level):
    """Manual mode: keep pump 1 switched off.

    The four level arguments mirror ``auto_cycle`` and are currently unused.
    """
    pump1_pin.value(0)
def main():
    """Run the controller: initialise, then poll the button and probes forever.

    Each pass of the loop services the mode button and, once at least
    ``sample_sizing_time`` milliseconds have passed since the last
    evaluation, samples all 16 mux channels, averages the samples per
    channel and hands the result to ``tank_values``. Every pass ends with a
    ``sampling_rate`` millisecond sleep.
    """
    setup()
    # One list of samples per mux channel.
    channel_values = [[] for _ in range(16)]
    start_time = ticks_ms()

    while True:
        buttons()

        # ticks_ms() wraps around; a plain subtraction goes negative after
        # the wrap and would stop sampling (and pump control) for good.
        elapsed_time = ticks_diff(ticks_ms(), start_time)
        if elapsed_time >= sample_sizing_time:
            # Read all channels and accumulate values.
            for i in range(16):
                channel, value = read_mux(i)
                sleep_ms(10)
                channel_values[channel].append(value)

            # Average each channel; a channel without samples yields None.
            averages = []
            for samples in channel_values:
                if samples:
                    averages.append(int(round(sum(samples) / len(samples))))
                else:
                    averages.append(None)

            # Derive tank/source levels and drive the outputs.
            tank_values(averages)

            # Start a fresh sampling window.
            channel_values = [[] for _ in range(16)]
            start_time = ticks_ms()

        sleep_ms(sampling_rate)


if __name__ == "__main__":
    main()
# Reference: CD74HC4067 (16-channel analog multiplexer).