from machine import Pin, ADC, I2C, PWM, time_pulse_us
import time
import dht
import math
from ssd1306 import SSD1306_I2C
from mpu6050 import MPU6050
# ── Pins ─────────────────────────────────────
TRIG = Pin(5, Pin.OUT)
ECHO = Pin(18, Pin.IN)
pot = ADC(Pin(34))
pot.atten(ADC.ATTN_11DB)
dht_sensor = dht.DHT22(Pin(4))
button = Pin(12, Pin.IN, Pin.PULL_UP)
red_led = Pin(26, Pin.OUT)
green_led = Pin(27, Pin.OUT)
# ── Buzzer via PWM (replaces Pin) ────────────
buzzer = PWM(Pin(25))
buzzer.duty(0) # start silent
# ── Servo ────────────────────────────────────
servo = PWM(Pin(14), freq=50)
# ── I2C (OLED + MPU6050) ─────────────────────
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled = SSD1306_I2C(128, 64, i2c)
mpu = MPU6050(i2c)
# ── Thresholds ───────────────────────────────
LEVEL_SAFE = 80
LEVEL_WATCH = 50
LEVEL_DANGER = 30
RAIN_WATCH = 1500
RAIN_HEAVY = 3000
# ── Vibration detection ───────────────────────
WINDOW_SIZE = 5
VIB_THRESHOLD = 1.5
vibration_avg = 0.0
def get_vibration():
ax = mpu.accel.x
ay = mpu.accel.y
az = mpu.accel.z - 1.0 # remove gravity
return math.sqrt(ax*ax + ay*ay + az*az)
def moving_average(new, old):
return (old * (WINDOW_SIZE - 1) + new) / WINDOW_SIZE
def vibration_detected():
global vibration_avg
current = get_vibration()
vibration_avg = moving_average(current, vibration_avg)
return vibration_avg > VIB_THRESHOLD
# ── Servo helper ──────────────────────────────
def set_servo(angle):
duty = int((angle / 180) * 75 + 40)
servo.duty(duty)
def set_outputs(red, green, angle):
red_led.value(1 if red else 0)
green_led.value(1 if green else 0)
set_servo(angle)
# ── OLED helper ───────────────────────────────
def show_oled(line1, line2, line3=""):
oled.fill(0)
oled.text(line1, 0, 0)
oled.text(line2, 0, 20)
oled.text(line3, 0, 40)
oled.show()
# ── Buzzer patterns (PWM-based, graduated) ────
#
# SAFE → silent
# LIGHT → soft low-pitch single beep (500 Hz, low duty)
# WATCH → medium-pitch double beep (1000 Hz, mid duty)
# DANGER → high-pitch rapid triple (2000 Hz, high duty)
# SOS/VIB → full alarm continuous (3000 Hz, max duty)
#
def buzzer_beep(freq, duty, on_ms, off_ms, repeats=1):
"""Generic beep: sets freq+duty, waits on_ms, silences, waits off_ms."""
buzzer.freq(freq)
for _ in range(repeats):
buzzer.duty(duty)
time.sleep_ms(on_ms)
buzzer.duty(0)
time.sleep_ms(off_ms)
def buzzer_pattern(level):
if level == "LIGHT":
# Soft single beep — barely audible
buzzer_beep(freq=500, duty=128, on_ms=120, off_ms=0, repeats=1)
elif level == "WATCH":
# Medium double beep — noticeable warning
buzzer_beep(freq=1000, duty=300, on_ms=150, off_ms=100, repeats=2)
elif level == "DANGER":
# Loud rapid triple burst — urgent
buzzer_beep(freq=2000, duty=600, on_ms=100, off_ms=70, repeats=3)
elif level in ("SOS", "VIBRATION"):
# Full alarm — max volume continuous burst
buzzer_beep(freq=3000, duty=900, on_ms=100, off_ms=50, repeats=3)
else:
# Silent
buzzer.duty(0)
# ── Boot sequence ────────────────────────────
show_oled("Flood Monitor", "Starting...", "")
time.sleep(0.5)
set_outputs(True, False, 0)
time.sleep(0.3)
set_outputs(False, True, 0)
time.sleep(0.5)
show_oled("System Ready", "Monitoring...", "")
# ── Distance ─────────────────────────────────
def get_distance():
TRIG.off()
time.sleep_us(2)
TRIG.on()
time.sleep_us(10)
TRIG.off()
duration = time_pulse_us(ECHO, 1, 30000)
if duration < 0:
return 999 # no signal / safe default
distance = (duration * 0.034) / 2
return distance
# ── Button Toggle System ───────────────────
last_button_state = 1
btnSOS = False
# ── Main loop ─────────────────────────────────
show_weather = False
while True:
try:
dist = get_distance()
pot_val = pot.read()
dht_sensor.measure()
temp = dht_sensor.temperature()
hum = dht_sensor.humidity()
tilt = vibration_detected()
current_state = button.value()
if last_button_state == 1 and current_state == 0:
btnSOS = not btnSOS
time.sleep_ms(20)
last_button_state = current_state
rain_pct = int((pot_val / 4095) * 100)
print("Dist:{:.1f} Rain:{}% T:{}C H:{}% Vib:{}".format(
dist, rain_pct, temp, hum, tilt))
# ── Priority 1: Manual SOS ────────────
if btnSOS:
set_outputs(True, False, 90)
show_oled(" !!! SOS !!!", " EVACUATE NOW", "")
buzzer_pattern("SOS")
time.sleep(0.2)
continue
# ── Priority 2: Ground vibration ──────
if tilt:
set_outputs(True, False, 90)
show_oled(" LANDSLIDE!", " Vibration!", " Evacuate!")
buzzer_pattern("VIBRATION")
time.sleep(0.3)
continue
# ── Priority 3: Flood danger ──────────
if 0 < dist < LEVEL_DANGER:
set_outputs(True, False, 90)
show_oled("FLOOD DANGER!", "Water:{:.1f}cm".format(dist), "Evacuate!")
buzzer_pattern("DANGER")
time.sleep(0.3)
continue
rising = (0 < dist < LEVEL_SAFE)
heavy_rain = pot_val > RAIN_HEAVY
light_rain = pot_val > RAIN_WATCH
# ── Priority 4: Watch / heavy rain ────
if rising or heavy_rain:
set_outputs(False, False, 45)
msg = " Heavy Rain" if heavy_rain else "Water rising"
show_oled(" WARNING", msg, " Water:{:.1f}cm".format(dist))
buzzer_pattern("WATCH")
time.sleep(0.7)
continue
# ── Priority 5: Light rain ────────────
if light_rain:
set_outputs(False, False, 20)
show_oled(
" Light Rain",
"rain:{}% ".format(rain_pct),
"T:{}C H:{}%".format(temp, hum)
)
buzzer_pattern("LIGHT")
time.sleep(0.8)
continue
# ── All clear ─────────────────────────
set_outputs(False, True, 0)
buzzer.duty(0)
if show_weather:
show_oled("Temp:{}C".format(temp),
"Humidity:{}%".format(hum),
"All Clear")
else:
show_oled("All Clear",
"Water:{:.1f}cm".format(dist), "")
show_weather = not show_weather
time.sleep(2)
except Exception as e:
print("Error:", e)
show_oled("SYSTEM ERROR", str(e)[:16], "")
buzzer.duty(0)
time.sleep(1)
Loading
esp32-devkit-c-v4
esp32-devkit-c-v4
Loading
ssd1306
ssd1306