import network
import machine
import time
import uasyncio as asyncio
import blynklib_mp # Make sure this library file is added to your Wokwi project
# ===== Blynk and WiFi Credentials =====
# NOTE(review): the auth token below is committed in plain text; if it is a
# real token, consider rotating it and loading it from a non-versioned file.
BLYNK_AUTH = "ipeKuBioKKvWjh1gWA3e9cUY6Ld3uCoT" # Replace with your Blynk Auth Token
WIFI_SSID = "Wokwi-GUEST" # Replace with your WiFi SSID
WIFI_PASS = "" # Replace with your WiFi password (empty string as configured here)
# Initialize the Blynk client used by every notification/e-mail/virtual-pin call below
blynk = blynklib_mp.Blynk(BLYNK_AUTH)
# ===== Global Flags and Constants =====
gasLeakActive = False # True while the sensor reading is above GAS_THRESHOLD
manualAlertTriggered = False # Set once a manual or auto distress alert has been sent for the current leak
GAS_THRESHOLD = 600 # Raw ADC reading above which a leak is reported (max for ESP32 ADC ~4095)
DISTRESS_DELAY = 90 # Seconds to wait before the auto distress alert (90 s = 1.5 minutes, not 5 minutes)
HOME_GPS = "28.6139° N, 77.2090° E" # Replace with your actual GPS coordinates
# ===== WiFi Connection Setup =====
def connect_wifi():
    """Bring up the station interface and block until it is connected.

    Uses WIFI_SSID / WIFI_PASS. If the interface is already connected no
    connection attempt is made. There is no timeout: the function polls once
    per second until the interface reports a connection, then prints the
    interface configuration.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    if not station.isconnected():
        print("Connecting to WiFi...")
        station.connect(WIFI_SSID, WIFI_PASS)
        # Poll once per second until the link is up.
        while True:
            if station.isconnected():
                break
            time.sleep(1)
    print("Connected to WiFi:", station.ifconfig())
# ===== Hardware Setup =====
# Gas sensor: simulated analog input read through the ADC on GPIO34
gas_sensor_adc = machine.ADC(machine.Pin(34))
# 11 dB attenuation setting for the ADC input
gas_sensor_adc.atten(machine.ADC.ATTN_11DB)
# Buzzer: simulated with a buzzer (or LED with resistor) on GPIO25, driven as a digital output
buzzer = machine.Pin(25, machine.Pin.OUT)
# Start with the buzzer off
buzzer.value(0)
# (Optional) Physical manual alert button on GPIO15 (for additional manual trigger).
# Configured as an input with the internal pull-up enabled; check_manual_alert() treats a reading of 0 as "pressed".
manual_alert_button = machine.Pin(15, machine.Pin.IN, machine.Pin.PULL_UP)
def check_manual_alert():
    """Return True while the physical alert button is pressed.

    The button input is active LOW, so a pin reading of 0 means "pressed".
    """
    level = manual_alert_button.value()
    return level == 0
# ===== Asynchronous Functions =====
# Auto distress alert if no manual alert within DISTRESS_DELAY seconds
async def auto_alert():
    """Send the automatic distress alert for an unacknowledged leak.

    Sleeps for DISTRESS_DELAY seconds. If a leak is still flagged and no
    distress alert has been recorded by then, it pushes a Blynk notification,
    an e-mail and a message on virtual pin 2 (each carrying HOME_GPS where the
    original did), and marks the alert as sent via manualAlertTriggered.
    """
    global manualAlertTriggered

    await asyncio.sleep(DISTRESS_DELAY)

    # Nothing to do if the leak cleared or somebody already raised the alert.
    if not gasLeakActive or manualAlertTriggered:
        return

    print("Auto distress alert triggered!")
    blynk.notify("Auto Alert: No response to gas leak. Authorities alerted with GPS coordinates.")
    email_body = "Auto Distress Alert: Gas leak detected with no response. GPS Coordinates: " + HOME_GPS
    blynk.email("[email protected]", "Auto Distress Alert", email_body)
    # Virtual pin V2 carries an extra message (could be used by a Telegram integration).
    blynk.virtual_write(2, "Auto Alert: Gas leak detected. GPS: " + HOME_GPS)
    manualAlertTriggered = True
# Blynk event handler for manual distress alert via virtual button on V1
@blynk.handle_event("write V1")
def v1_write_handler(pin, values):
    """Handle a write to virtual pin V1 as a manual distress request.

    Args:
        pin: Virtual pin passed by the Blynk event dispatcher (unused here).
        values: List of values written to the pin; the first entry is
            converted with int() and compared to 1.

    The alert is sent only while a leak is flagged and no distress alert has
    been recorded yet; in that case manualAlertTriggered is set afterwards.
    """
    global manualAlertTriggered

    # Ignore the button unless a leak is active and still unacknowledged.
    if not gasLeakActive or manualAlertTriggered:
        return
    # Only a written value of 1 (button pressed) triggers the alert.
    if int(values[0]) != 1:
        return

    print("Manual distress alert triggered via Blynk!")
    blynk.notify("Manual Alert: Distress alert triggered. Sending GPS coordinates to authorities.")
    email_body = "Manual Distress Alert: Gas leak detected. GPS Coordinates: " + HOME_GPS
    blynk.email("[email protected]", "Manual Distress Alert", email_body)
    blynk.virtual_write(2, "Manual Alert: Gas leak detected. GPS: " + HOME_GPS)
    manualAlertTriggered = True
# Continuously monitor the gas sensor
async def monitor_gas_sensor():
    """Poll the gas sensor once per second and manage the leak state.

    When the ADC reading rises above GAS_THRESHOLD and no leak is flagged yet,
    the function flags the leak, clears the distress flag, switches the buzzer
    on, sends the initial notification / e-mail / V2 message and schedules
    auto_alert() as a background task.

    When the reading falls back to or below the threshold while a leak is
    flagged, it resets both flags, switches the buzzer off and cancels the
    pending auto_alert() task. Without the cancellation, a timer started for an
    earlier leak would wake up during a later leak and raise the distress
    alert before DISTRESS_DELAY had elapsed for that later leak.

    This coroutine never returns.
    """
    global gasLeakActive, manualAlertTriggered

    # Handle of the auto_alert() timer belonging to the current leak, if any.
    pending_alert = None

    while True:
        sensor_value = gas_sensor_adc.read()
        if sensor_value > GAS_THRESHOLD:
            if not gasLeakActive:
                gasLeakActive = True
                manualAlertTriggered = False
                print("Gas leak detected! Activating buzzer and sending notifications.")
                buzzer.value(1)  # Turn on buzzer
                blynk.notify("Alert: Gas leak detected! Please take immediate action.")
                blynk.email("[email protected]", "Gas Leak Alert",
                            "Gas leak detected at your home. Please take action immediately.")
                blynk.virtual_write(2, "Alert: Gas leak detected! Check your home immediately.")
                pending_alert = asyncio.create_task(auto_alert())
        elif gasLeakActive:
            print("Gas leak cleared. Resetting system.")
            gasLeakActive = False
            manualAlertTriggered = False
            buzzer.value(0)  # Turn off buzzer
            # Drop the escalation timer of the leak that just cleared so it
            # cannot fire against a future leak.
            if pending_alert is not None:
                pending_alert.cancel()
                pending_alert = None
        await asyncio.sleep(1)
# (Optional) Monitor physical button for manual distress alert (if desired)
async def manual_alert_monitor():
    """Watch the physical button and raise a manual distress alert.

    Every 0.1 s, while a leak is flagged and no distress alert has been
    recorded, the button is sampled through check_manual_alert(). On a press
    the distress flag is set first, then the notification, e-mail and V2
    message are sent, followed by a 2 s pause that serves as debounce.

    This coroutine never returns.
    """
    global manualAlertTriggered

    while True:
        # The button is only sampled while an unacknowledged leak is active.
        armed = gasLeakActive and not manualAlertTriggered
        if armed and check_manual_alert():
            manualAlertTriggered = True
            print("Manual distress alert triggered via physical button!")
            blynk.notify("Manual Alert (Button): Distress alert triggered. Sending GPS coordinates to authorities.")
            email_body = "Manual Distress Alert: Gas leak detected. GPS Coordinates: " + HOME_GPS
            blynk.email("[email protected]", "Manual Distress Alert", email_body)
            blynk.virtual_write(2, "Manual Alert (Button): Gas leak detected. GPS: " + HOME_GPS)
            await asyncio.sleep(2)  # Debounce delay
        await asyncio.sleep(0.1)
# Task to run Blynk continuously in uasyncio loop
async def blynk_runner():
    """Call blynk.run() forever, yielding to other tasks between calls.

    blynk.run() processes incoming Blynk commands and maintains the
    connection; the 0.1 s sleep after each call lets the other coroutines run.
    """
    poll_interval = 0.1  # seconds between successive blynk.run() calls
    while True:
        blynk.run()
        await asyncio.sleep(poll_interval)
# Main async loop: Connect WiFi and run all tasks concurrently
async def main():
    """Connect to WiFi, then run the three worker coroutines concurrently.

    connect_wifi() is called synchronously first, so no worker starts until
    the network is up. The Blynk runner, the gas-sensor monitor and the
    physical-button monitor are then awaited together.
    """
    connect_wifi()
    workers = (
        blynk_runner(),
        monitor_gas_sensor(),
        manual_alert_monitor(),
    )
    await asyncio.gather(*workers)
# Run the async main loop. This call is the script's entry point; the worker
# coroutines gathered by main() are endless loops, so it is not expected to return.
asyncio.run(main())