"""
Smart Catfish Tank IoT Model (MicroPython Pico W / Wokwi)
- ThingsBoard telemetry via HTTP POST
- Monitors: Temperature, pH, Water Level, Turbidity, DO, NH3
- Controls: Heater, Aerator, Fish Feeder (auto)
"""
import network
import time
import urandom
import ujson
import urequests
from machine import Pin, PWM, I2C
from pico_i2c_lcd import I2cLcd
# ---------------- WiFi ----------------
SSID = "Wokwi-GUEST"
PASSWORD = ""

# Bring up the station interface and start joining the network.
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(SSID, PASSWORD)

print("Connecting to WiFi...", end="")

# Poll the link every 0.2 s and give up after 200 polls (about 40 s).
retry = 0
while not wifi.isconnected() and retry < 200:
    print(".", end="")
    time.sleep(0.2)
    retry += 1

# Abort the whole script if the link never came up.
if not wifi.isconnected():
    raise SystemExit("Failed to connect WiFi")

# First element of ifconfig() is printed as the connection confirmation.
print("\nWiFi connected:", wifi.ifconfig()[0])
# ---------------- ThingsBoard ----------------
# Telemetry URL that send_to_thingsboard() posts to.
# NOTE(review): the path segment after /api/v1/ looks like a device access
# token committed in source - confirm, and consider loading it from a
# separate config file instead.
THINGSBOARD_HTTP = (
    "https://eu.thingsboard.cloud"
    "/api/v1/1kx2y89ft7t0rgjuo57q/telemetry"
)
def send_to_thingsboard(temperature, ph_value, water_level, turbidity, do, nh3):
    """POST one set of readings to ThingsBoard as a JSON object.

    The six arguments are serialized under the keys "temperature", "pH",
    "water_level", "turbidity", "DO" and "NH3" and sent to THINGSBOARD_HTTP.
    The HTTP status code is printed when a response is received.

    Sending is best-effort: any exception raised while posting, reading the
    status or closing the response is caught and printed, never propagated.
    """
    telemetry = {
        "temperature": temperature,
        "pH": ph_value,
        "water_level": water_level,
        "turbidity": turbidity,
        "DO": do,
        "NH3": nh3
    }
    try:
        response = urequests.post(
            THINGSBOARD_HTTP,
            data=ujson.dumps(telemetry),
            headers={"Content-Type": "application/json"}
        )
        try:
            print("ThingsBoard status:", response.status_code)
        finally:
            # Release the response even if handling it fails, so repeated
            # calls from the main loop do not accumulate open responses.
            response.close()
    except Exception as e:
        print("Error sending telemetry:", e)
# ---------------- Actuators ----------------
# Both PWM outputs are configured with the same frequency.
PWM_FREQ_HZ = 50

# Feeder output on pin 17.
fish_feeder = PWM(Pin(17))
fish_feeder.freq(PWM_FREQ_HZ)

# Heater output on pin 14.
heater_pump = PWM(Pin(14))
heater_pump.freq(PWM_FREQ_HZ)

# Aerator is a plain digital output on pin 13.
aerator = Pin(13, Pin.OUT)

# ---------------- LCD ----------------
# I2C bus 0 with SDA on pin 4 and SCL on pin 5; LCD driver at address 0x27.
i2c = I2C(0, sda=Pin(4), scl=Pin(5))
lcd = I2cLcd(i2c, 0x27, 2, 20)

# State carried between main-loop iterations.
previous_water_level = None   # last level seen by detect_leakage(), None until first call
last_feed_time = time.time()  # timestamp of the most recent feeding
# ---------------- Utility Functions ----------------
def get_random_float(min_val, max_val):
    """Return a random float between min_val and max_val (both inclusive).

    A 16-bit random integer (0..65535) is scaled to a fraction in [0, 1]
    and mapped linearly onto the requested range.
    """
    fraction = urandom.getrandbits(16) / 65535
    return min_val + fraction * (max_val - min_val)
def feed_fish():
    """Announce a feeding on the console and LCD and pulse the feeder output.

    The feeder PWM is set to duty 4915 for 0.5 s and then to duty 1638;
    the LCD is cleared again once the pulse is finished.
    """
    print("Feeding Fish!")
    lcd.clear()
    lcd.putstr("Feeding Fish!")
    # 4915/65535 is about 7.5 % duty and 1638/65535 about 2.5 % duty.
    # NOTE(review): presumably two servo positions (dispense / rest) - confirm.
    fish_feeder.duty_u16(4915)
    time.sleep(0.5)
    fish_feeder.duty_u16(1638)
    lcd.clear()
def control_heater(temp):
    """Drive the heater output from the temperature and flash its state on the LCD.

    When temp is below 26 the heater PWM is set to duty 4915 and
    "Heater On" is shown; otherwise duty 1638 and "Heater Off". The message
    stays on the LCD for 0.2 s and is then cleared.
    """
    heating = temp < 26
    heater_pump.duty_u16(4915 if heating else 1638)
    lcd.clear()
    lcd.putstr("Heater On" if heating else "Heater Off")
    # NOTE(review): the source lost its indentation; the pause and clear are
    # assumed to follow both branches rather than only the "off" one - confirm.
    time.sleep(0.2)
    lcd.clear()
def control_aerator(do_level):
    """Set the aerator pin to 1 when do_level is below 5.0, otherwise to 0."""
    aerator.value(1 if do_level < 5.0 else 0)
def detect_leakage(water_level):
    """Report a leak when water_level is lower than the previously seen level.

    The previous level is kept in the module-level previous_water_level,
    which is overwritten with water_level on every call. The first call
    (previous level still None) only records the baseline.
    """
    global previous_water_level
    if previous_water_level is not None and water_level < previous_water_level:
        print("Water Leakage Detected!")
        lcd.clear()
        lcd.putstr("Leakage Detected!")
        # NOTE(review): the source lost its indentation; the pause and clear
        # are assumed to belong to the alert only - confirm.
        time.sleep(0.5)
        lcd.clear()
    # Must run on every call: if it were inside the branch above the
    # baseline would stay None and no leak could ever be reported.
    previous_water_level = water_level
# ---------------- Main Loop ----------------
# Each iteration: generate readings, upload them, show two LCD pages,
# then run leak detection, timed feeding and heater/aerator control.
while True:
    # Simulated sensor readings
    # NOTE(review): with these ranges temperature is never below 26 and DO is
    # never below 5.0, so the "Heater On" and aerator-on paths are not
    # exercised by the simulation - confirm this is intended.
    temperature = get_random_float(26.0, 30.0)
    ph_value = get_random_float(6.5, 8.5)
    turbidity = get_random_float(0.0, 50.0)
    water_level = get_random_float(0.0, 150.0)
    do = get_random_float(5.0, 15.0)
    nh3_ppm = get_random_float(0.0, 0.2)

    # Send telemetry to ThingsBoard
    send_to_thingsboard(temperature, ph_value, water_level, turbidity, do, nh3_ppm)

    # LCD display page 1
    lcd.clear()
    lcd.putstr(f"T:{temperature:.1f}C pH:{ph_value:.1f}")
    lcd.move_to(0, 1)
    lcd.putstr(f"H2O:{water_level:.1f} DO:{do:.1f}")
    time.sleep(0.5)

    # LCD display page 2
    lcd.clear()
    lcd.putstr(f"Turb:{turbidity:.1f} NH3:{nh3_ppm:.2f}")
    time.sleep(0.5)

    # Leak detection
    # NOTE(review): detect_leakage() is only called for levels below 80, so
    # its stored baseline is only refreshed by such readings - confirm.
    if water_level < 80:
        detect_leakage(water_level)

    # Auto feeding every 60 seconds
    current_time = time.time()
    if current_time - last_feed_time >= 60:
        feed_fish()
        last_feed_time = current_time

    # Heater and aerator control
    control_heater(temperature)
    control_aerator(do)

    # Main loop delay (safe for cloud)
    time.sleep(3)