import machine
import time
from machine import Pin, ADC, PWM, I2C
from pico_i2c_lcd import I2cLcd
# ==========================================
# 1. HARDWARE CONFIGURATION & INITIALIZATION
# ==========================================
# --- HC-SR04 Ultrasonic Sensor Pins ---
TRIG = Pin(2, Pin.OUT)  # trigger pulse output
ECHO = Pin(3, Pin.IN)   # echo pulse input

# --- Actuators (SG90 Servo Motors @ 50Hz) ---
servo_harvest = PWM(Pin(4))
servo_harvest.freq(50)
servo_drain = PWM(Pin(5))
servo_drain.freq(50)

# --- Analog Inputs ---
LDR = ADC(Pin(26))

# --- Telemetry Status LED ---
telemetry_led = Pin(15, Pin.OUT)

# --- Global State Variable for Blynk Override ---
blynk_override_state = 0  # 0 = Auto, 1 = Remote Override Active

# --- LCD Setup ---
LCD_I2C_ADDR = 0x27  # address passed to the I2cLcd driver
LCD_ROWS = 2
LCD_COLS = 16

i2c = I2C(0, sda=Pin(16), scl=Pin(17), freq=400000)

# The scan result is only printed as a wiring diagnostic; the LCD is always
# attempted at LCD_I2C_ADDR regardless of what was found.
print("Scanning I2C...")
devices = i2c.scan()
print("Found:", [hex(d) for d in devices])

# The display is optional: `lcd` stays None whenever it cannot be used, and
# code that runs later tests `lcd` for truthiness before writing to it.
lcd = None
try:
    lcd = I2cLcd(i2c, LCD_I2C_ADDR, LCD_ROWS, LCD_COLS)
    lcd.clear()
    lcd.putstr("System Booting\nPlease Wait...")
    print("LCD Initialized Successfully")
    time.sleep(2)
except Exception as e:
    # The constructor may succeed and a later call still fail; drop the
    # handle so a half-initialised display is never written to afterwards.
    lcd = None
    print("LCD Error:", e)
# ==========================================
# 2. HARDWARE CONTROL FUNCTIONS
# ==========================================
def get_distance():
    """Return the HC-SR04 range reading in centimetres.

    A 10 us pulse is sent on TRIG, then the time ECHO spends high is
    measured and converted to a one-way distance.

    Returns:
        The distance in cm, or 999 when ECHO fails to go high, or fails to
        go low again, within 30 ms.
    """
    echo_timeout_us = 30000
    no_reading = 999
    cm_per_us = 0.0343  # factor applied to the round-trip echo time

    # Settle the trigger line low, then emit the 10 us trigger pulse.
    TRIG.low()
    time.sleep_us(2)
    TRIG.high()
    time.sleep_us(10)
    TRIG.low()

    # Phase 1: wait for the echo line to rise.
    wait_began = time.ticks_us()
    while not ECHO.value():
        if time.ticks_diff(time.ticks_us(), wait_began) > echo_timeout_us:
            return no_reading

    # Phase 2: time how long the echo line stays high.
    pulse_began = time.ticks_us()
    while ECHO.value():
        if time.ticks_diff(time.ticks_us(), pulse_began) > echo_timeout_us:
            return no_reading
    pulse_ended = time.ticks_us()

    pulse_us = time.ticks_diff(pulse_ended, pulse_began)
    # Halved because the pulse covers the trip out and back.
    return (pulse_us * cm_per_us) / 2
def set_servo_angle(servo, angle):
    """Move *servo* to *angle* degrees by setting its 16-bit PWM duty.

    The angle is mapped linearly onto the duty window 1800..8000
    (0 degrees -> 1800, 180 degrees -> 8000). Angles outside 0..180 are
    clamped first, so the duty written can never leave that window.

    Args:
        servo: object exposing ``duty_u16(value)``.
        angle: requested position in degrees.
    """
    min_duty = 1800
    max_duty = 8000
    # Clamp so an out-of-range request cannot produce a negative duty or
    # one beyond the window.
    angle = max(0, min(180, angle))
    duty = int(min_duty + (angle / 180) * (max_duty - min_duty))
    servo.duty_u16(duty)
# --- Blynk Virtual Pin Callback Layout ---
# When incorporating your Blynk connection library setup, link Virtual Pin V1
# to this state flag changer to capture remote dashboard clicks:
#
# @blynk.on("V1")
# def v1_write_handler(value):
#     global blynk_override_state
#     blynk_override_state = int(value[0])
# Put both servos at 0 degrees and switch the telemetry LED off before the
# main loop takes over.
for _servo in (servo_harvest, servo_drain):
    set_servo_angle(_servo, 0)
telemetry_led.low()
print("Hardware System Fully Online")
# ==========================================
# 3. MAIN SYSTEM LOOP
# ==========================================
# Thresholds used by the control loop.
TANK_DEPTH_CM = 400          # distance reading that maps to 0 % full
TANK_FULL_DISTANCE_CM = 5    # readings closer than this count as "full"
CLEAR_WATER_LDR_MIN = 30000  # LDR readings above this count as clear water


def _apply_outputs(harvest_angle, drain_angle, led_on, lcd_text):
    """Drive both servos, the telemetry LED and, if present, the LCD.

    Args:
        harvest_angle: angle in degrees for the harvest servo.
        drain_angle: angle in degrees for the drain servo.
        led_on: True to set the telemetry LED high, False to set it low.
        lcd_text: text for the LCD; every line must fit the 16-column display.
    """
    set_servo_angle(servo_harvest, harvest_angle)
    set_servo_angle(servo_drain, drain_angle)
    if led_on:
        telemetry_led.high()
    else:
        telemetry_led.low()
    if lcd:
        try:
            lcd.clear()
            lcd.putstr(lcd_text)
        except OSError as e:
            # The display is optional; a bus error on it must not stop the
            # servo control loop.
            print("LCD Error:", e)


while True:
    # ---------- 1. INPUTS ----------
    distance_cm = get_distance()
    ldr_value = LDR.read_u16()

    # ---------- 2. PROCESSING ----------
    # NOTE(review): get_distance() returns 999 on timeout, which clamps to
    # 0 % here and is not treated as "tank full" - confirm that is the
    # intended behaviour when the sensor gives no echo.
    fill_fraction = (TANK_DEPTH_CM - distance_cm) / TANK_DEPTH_CM
    tank_percentage = max(0, min(100, int(fill_fraction * 100)))
    tank_full = distance_cm < TANK_FULL_DISTANCE_CM
    water_is_clear = ldr_value > CLEAR_WATER_LDR_MIN

    # ---------- 3. CONTROL LOGIC & OUTPUTS ----------
    if blynk_override_state == 1:
        # PRIORITY 1 - BLYNK CLOUD OVERRIDE
        print("ALERT: Blynk Dashboard Override Activated")
        # First LCD line shortened from 17 characters to fit 16 columns.
        _apply_outputs(0, 180, True, "BLYNK REMOTE\nSAFE DRAIN")
    elif tank_full:
        # PRIORITY 2 - TANK FULL
        print("ALERT: Tank Full " + str(round(distance_cm, 1)) + "cm")
        _apply_outputs(0, 180, True, "TANK FULL 100%\nOVERFLOW MODE")
    elif water_is_clear:
        # PRIORITY 3 - NORMAL AUTOMATED OPERATION (clear water: harvest)
        print("LOG: Clear Water Detected")
        _apply_outputs(
            180, 0, False,
            "Tank:" + str(tank_percentage) + "%\n" + "Water:CLEAR",
        )
    else:
        # PRIORITY 3 - NORMAL AUTOMATED OPERATION (turbid water: drain)
        print("LOG: Turbid Water Detected")
        _apply_outputs(
            0, 180, True,
            "Tank:" + str(tank_percentage) + "%\n" + "Water:TURBID",
        )

    time.sleep(1)