# ======================
# CONFIGURATION SETTINGS
# ======================
STUDENT_NAME = "Alejandra Gonzalez"
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
SIMULATE_DHT = True # True for Wokwi simulation
# ======================
# HARDWARE CONFIGURATION
# ======================
import machine
import network
import time
import socket
import dht
import uasyncio as asyncio
import json
from machine import I2C, Pin
import sys
import urandom # For random value generation
# HC-SR04 Ultrasonic Distance Sensor
ultrasonic_trig = machine.Pin(0, machine.Pin.OUT)
ultrasonic_echo = machine.Pin(1, machine.Pin.IN)
# LED Configuration
tank_led = machine.Pin(13, machine.Pin.OUT) # Red LED for tank level
pump_led = machine.Pin(14, machine.Pin.OUT) # Green LED for pump simulation
# RGB LED (Common Cathode)
rgb_red = machine.PWM(machine.Pin(10))
rgb_green = machine.PWM(machine.Pin(11))
rgb_blue = machine.PWM(machine.Pin(12))
rgb_red.freq(1000)
rgb_green.freq(1000)
rgb_blue.freq(1000)
# LCD Configuration (I2C)
i2c = I2C(0, sda=Pin(16), scl=Pin(17), freq=400000)
LCD_ADDR = 0x27
LCD_ROWS = 2
LCD_COLS = 16
# ==================
# GLOBAL VARIABLES
# ==================
temperature = 0.0
humidity = 0.0
distance = 0.0
pump_status = False
tank_status = "OK"
clients = []
# ==================
# LCD FUNCTIONS
# ==================
def lcd_init():
"""Initialize the LCD display"""
try:
lcd_byte(0x03)
lcd_byte(0x03)
lcd_byte(0x03)
lcd_byte(0x02)
lcd_byte(0x28)
lcd_byte(0x0C)
lcd_byte(0x06)
lcd_byte(0x01)
time.sleep_ms(5)
return True
except:
return False
def lcd_byte(bits, char_mode=False):
"""Send byte to LCD"""
try:
bits_high = char_mode | (bits & 0xF0) | 0x08
bits_low = char_mode | ((bits << 4) & 0xF0) | 0x08
i2c.writeto(LCD_ADDR, bytes([bits_high]))
time.sleep_us(1)
i2c.writeto(LCD_ADDR, bytes([bits_high | 0x04]))
time.sleep_us(1)
i2c.writeto(LCD_ADDR, bytes([bits_high & ~0x04]))
time.sleep_us(100)
i2c.writeto(LCD_ADDR, bytes([bits_low]))
time.sleep_us(1)
i2c.writeto(LCD_ADDR, bytes([bits_low | 0x04]))
time.sleep_us(1)
i2c.writeto(LCD_ADDR, bytes([bits_low & ~0x04]))
time.sleep_us(100)
return True
except:
return False
def lcd_string(message, line):
"""Write string to LCD"""
try:
message = message.ljust(LCD_COLS, " ")
lcd_byte(line)
for char in message:
lcd_byte(ord(char), True)
return True
except:
return False
def update_lcd():
"""Update LCD display with sensor data"""
line1 = f"T:{temperature:.1f}C H:{humidity:.1f}%"
pump_symbol = "P:ON!" if pump_status else "P:OFF"
status_symbol = "LOW!" if tank_status == "LOW" else "OK "
line2 = f"D:{distance:.1f}cm {status_symbol} {pump_symbol}"
lcd_string(line1, 0x80)
lcd_string(line2, 0xC0)
# ==================
# DISPLAY FUNCTIONS
# ==================
def print_status():
"""Display system status on serial console"""
print("\n===== SYSTEM STATUS =====")
print(f"Temperature: {temperature:.1f}°C")
print(f"Humidity: {humidity:.1f}%")
print(f"Distance: {distance:.1f}cm")
print(f"Tank Status: {tank_status}")
print(f"Pump Status: {'ON' if pump_status else 'OFF'}")
print("RGB Color: ", end="")
if temperature < 20:
print("Blue (Cold)")
elif 20 <= temperature <= 30:
print("Green (Moderate)")
else:
print("Red (Hot)")
print("========================")
# ==================
# SENSOR FUNCTIONS
# ==================
def read_dht_sensor():
"""Read temperature and humidity from DHT22 or simulate values"""
global temperature, humidity
if SIMULATE_DHT:
# Generate random values for testing
temperature = urandom.uniform(15, 40) # 15-40°C range
humidity = urandom.uniform(20, 90) # 20-90% range
return True
try:
dht_sensor.measure()
temperature = dht_sensor.temperature()
humidity = dht_sensor.humidity()
return True
except Exception as e:
print("DHT Error:", e)
return False
def read_distance():
"""Read distance from ultrasonic sensor"""
global distance, tank_status
if SIMULATE_DHT:
# Generate random distance with 30% chance of low level
distance = urandom.uniform(5, 50)
if urandom.getrandbits(1) and urandom.getrandbits(1): # 30% probability
distance = urandom.uniform(2, 9) # Low level
return True
try:
# Send trigger pulse
ultrasonic_trig.low()
time.sleep_us(2)
ultrasonic_trig.high()
time.sleep_us(10)
ultrasonic_trig.low()
# Measure echo pulse duration
pulse_time = machine.time_pulse_us(ultrasonic_echo, 1, 30000)
# Calculate distance (speed of sound = 343 m/s = 0.0343 cm/us)
distance = (pulse_time * 0.0343) / 2
# Handle invalid readings
if distance <= 0:
distance = 0.0
return True
except Exception as e:
print("Ultrasonic Error:", e)
distance = 0.0
return False
def update_tank_status():
"""Update tank status based on distance"""
global tank_status, pump_status
if distance < 10:
tank_status = "LOW"
tank_led.on()
# Automatically activate pump if level is low
if not pump_status:
pump_status = True
pump_led.on()
print("Pump activated automatically!")
else:
tank_status = "OK"
tank_led.off()
def set_rgb_color():
"""Set RGB LED color based on temperature"""
if temperature < 20: # Cold: Blue
rgb_red.duty_u16(0)
rgb_green.duty_u16(0)
rgb_blue.duty_u16(65535)
elif 20 <= temperature <= 30: # Moderate: Green
rgb_red.duty_u16(0)
rgb_green.duty_u16(65535)
rgb_blue.duty_u16(0)
else: # Hot: Red
rgb_red.duty_u16(65535)
rgb_green.duty_u16(0)
rgb_blue.duty_u16(0)
# ==================
# NETWORK FUNCTIONS
# ==================
def connect_wifi():
"""Connect to WiFi network"""
print("Connecting to WiFi...")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 10
while max_wait > 0:
if wlan.status() < 0 or wlan.status() >= 3:
break
max_wait -= 1
print('Connecting...')
time.sleep(1)
if wlan.status() != 3:
print("WiFi Failed")
raise RuntimeError('Network connection failed')
else:
ip_address = wlan.ifconfig()[0]
print('Connected. IP:', ip_address)
time.sleep(1)
return ip_address, wlan
# ==================
# WEB SERVER
# ==================
html = """<!DOCTYPE html>
<html>
<head>
<title>IoT Tank Monitoring</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
/* (Keep the same CSS styles from your original code) */
</style>
<script>
/* (Keep the same JavaScript from your original code) */
</script>
</head>
<body>
<div class="container">
<h1>IoT Tank Monitoring System</h1>
<div class="student-name">Student: """ + STUDENT_NAME + """</div>
<!-- (Keep the same HTML structure from your original code) -->
</div>
</body>
</html>
"""
async def handle_client(reader, writer):
"""Handle incoming web client requests"""
global clients, pump_status
clients.append(writer)
request = await reader.read(1024)
request = request.decode()
try:
if "GET / " in request or "GET /index.html" in request:
writer.write("HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n")
writer.write(html)
elif "GET /sensor-data" in request:
data = {
"temperature": temperature,
"humidity": humidity,
"distance": distance,
"pump_status": pump_status
}
writer.write("HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n")
writer.write(json.dumps(data))
elif "GET /pump-status" in request:
writer.write("HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n")
writer.write("ON" if pump_status else "OFF")
elif "GET /toggle" in request:
pump_status = not pump_status
pump_led.value(pump_status)
print("Pump " + ("activated" if pump_status else "deactivated") + " manually")
writer.write("HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n")
writer.write("Pump toggled")
else:
writer.write("HTTP/1.0 404 Not Found\r\n\r\n")
except Exception as e:
print("Client error:", e)
finally:
try:
writer.close()
except:
pass
if writer in clients:
clients.remove(writer)
async def sensor_loop():
"""Main sensor reading and control loop"""
# Initialize LCD
if lcd_init():
lcd_string("System Starting", 0x80)
lcd_string(STUDENT_NAME, 0xC0)
time.sleep(2)
while True:
# Read all sensors
read_dht_sensor()
read_distance()
# Update system status
update_tank_status()
set_rgb_color()
# Update LCD display
if LCD_ADDR in i2c.scan():
update_lcd()
# Print status to console
print_status()
# Sleep before next reading
await asyncio.sleep(2)
async def main():
"""Main application function"""
# Initialization sequence
print(f"\n{'='*40}")
print(f"IoT Tank Monitor by {STUDENT_NAME}")
print(f"{'='*40}")
# Initial pump state
pump_led.off()
# Connect to WiFi
try:
ip_address, wlan = connect_wifi()
print("Web server IP:", ip_address)
except Exception as e:
print("WiFi Error:", e)
wlan = None
print("Using offline mode")
# Start sensor loop
asyncio.create_task(sensor_loop())
# Start web server
server = asyncio.start_server(handle_client, "0.0.0.0", 80)
asyncio.create_task(server)
print("Web server started")
# System ready
print("System Ready! Monitoring...")
# Keep running
while True:
await asyncio.sleep(1)
# ==================
# APPLICATION START
# ==================
try:
# Run the main function
asyncio.run(main())
except KeyboardInterrupt:
print("Program stopped by user")
print("System Offline")
lcd_string("System Offline ", 0x80)
lcd_string(" Bye! ", 0xC0)
except Exception as e:
print("Critical error:", e)
print("System Error - Restarting...")
time.sleep(5)
machine.reset()