import time
import network
import urequests
import machine
from machine import Pin, ADC
import dht
import sys
import json
# Configuration
# Single source of settings for the monitor: network credentials, mail
# addressing, sensor pin numbers and the alert thresholds.
CONFIG = {
    # --- WiFi ---
    "WIFI_SSID": "Wokwi-GUEST",
    "WIFI_PASSWORD": "",

    # --- E-mail (sent through the Brevo HTTP API) ---
    # Passed as the `api-key` request header.
    # NOTE(review): this secret is committed in plain text; consider rotating
    # it and loading it from a file that is kept out of version control.
    "API_V3_KEY": "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",
    "SENDER_EMAIL": "[email protected]",
    "RECIPIENT_EMAIL": "[email protected]",
    # Free-text location shown in the body of every alert mail.
    "ADDRESS": "Ambernath east(421501)",

    # --- GPIO numbers of the attached sensors ---
    "PINS": {
        "SOIL_MOISTURE": 34,  # ADC input
        "DHT_PIN": 15,        # DHT22 data line
        "LDR_PIN": 35,        # ADC input
    },

    # Seconds; compared against differences of time.time() values.
    "EMAIL_DELAY": 300,

    # --- Alert limits ---
    "THRESHOLDS": {
        "SOIL_MOISTURE": 1500,    # alert when the raw ADC reading is below this
        "TEMPERATURE": 30,        # alert when the DHT temperature (°C) is above this
        "HUMIDITY": 30,           # alert when the DHT humidity (%) is below this
        "LIGHT_INTENSITY": 1000,  # alert when the raw ADC reading is below this
    },
}
class EnvironmentalMonitor:
    """Poll soil-moisture, DHT22 and LDR sensors and e-mail threshold alerts.

    The monitor reads three sensors, compares the readings with the limits in
    the configuration and posts an alert mail through the Brevo HTTP API when
    a limit is crossed.  Each alert type is rate-limited independently so the
    same alert is not repeated more often than once per ``EMAIL_DELAY``
    seconds.
    """

    def __init__(self, config=None):
        """Create the sensor objects and the e-mail bookkeeping.

        Args:
            config: Settings dictionary with the same layout as the module
                level ``CONFIG``.  When omitted, ``CONFIG`` is used.
        """
        self.config = CONFIG if config is None else config
        pins = self.config['PINS']

        # Initialize sensors
        self.soil_moisture_sensor = self._make_adc(pins['SOIL_MOISTURE'])
        self.dht_sensor = dht.DHT22(Pin(pins['DHT_PIN']))
        self.ldr_sensor = self._make_adc(pins['LDR_PIN'])

        # Email tracking: time/type of the most recent mail of any kind ...
        self.last_email_time = 0
        self.last_email_type = None
        # ... and the time.time() value of the last mail per alert type,
        # which is what the rate limiter actually consults.
        self._last_sent = {}

    @staticmethod
    def _make_adc(pin_number):
        """Return an ADC on ``pin_number`` configured with 11 dB attenuation."""
        adc = ADC(Pin(pin_number))
        adc.atten(ADC.ATTN_11DB)
        return adc

    def connect_wifi(self, max_attempts=5):
        """Connect the station interface to the configured WiFi network.

        Makes up to ``max_attempts`` attempts, waiting about 10 seconds for
        each one and pausing 2 seconds between attempts.

        Args:
            max_attempts: Maximum number of connection attempts.

        Returns:
            True once the interface reports it is connected, False if every
            attempt failed.
        """
        sta_if = network.WLAN(network.STA_IF)
        sta_if.active(True)

        for attempt in range(1, max_attempts + 1):
            try:
                print(f"WiFi Connection Attempt {attempt}")
                # Only ask for a new connection when the interface is not
                # already associated (e.g. after a soft reboot).
                if not sta_if.isconnected():
                    sta_if.connect(
                        self.config['WIFI_SSID'],
                        self.config['WIFI_PASSWORD'],
                    )
                    start_time = time.time()
                    while not sta_if.isconnected():
                        if time.time() - start_time > 10:
                            print(f"Connection attempt {attempt} failed.")
                            break
                        time.sleep(1)
                if sta_if.isconnected():
                    print(f"WiFi Connected. IP: {sta_if.ifconfig()[0]}")
                    return True
            except Exception as e:
                print(f"WiFi Connection Error: {e}")
            time.sleep(2)

        print("Failed to connect to WiFi after multiple attempts.")
        return False

    def send_email_alert(self, subject, message):
        """Post one alert e-mail to the Brevo transactional mail endpoint.

        Args:
            subject: Subject line; converted with ``str()``.
            message: Text inserted into the HTML body of the mail.

        Returns:
            True when the API answers with status 201, False for any other
            status or when an exception occurs while sending.
        """
        url = "https://api.brevo.com/v3/smtp/email"
        headers = {
            'accept': 'application/json',
            'api-key': self.config['API_V3_KEY'],
            'content-type': 'application/json'
        }
        address = self.config['ADDRESS']
        now = time.localtime()
        formatted_time = f"{now[0]}-{now[1]:02d}-{now[2]:02d} {now[3]:02d}:{now[4]:02d}:{now[5]:02d}"
        # The literal is kept flush-left so the generated HTML carries no
        # extra leading whitespace.
        html_content = f"""
<html>
<body>
<h1>Environmental Alert</h1>
<p><strong>Location:</strong> {address}</p>
<p>{message}</p>
<p><em>Sent at: {formatted_time}</em></p>
</body>
</html>
"""
        payload = {
            "sender": {"email": self.config['SENDER_EMAIL'], "name": "Environmental Monitor"},
            "to": [{"email": self.config['RECIPIENT_EMAIL'], "name": "Swapnil Sonawane"}],
            "subject": str(subject),
            "htmlContent": html_content
        }

        response = None
        try:
            json_payload = json.dumps(payload)
            print("JSON Payload:", json_payload)
            print("Attempting to send email...")
            response = urequests.post(url, data=json_payload, headers=headers)
            status = response.status_code
            print(f"Response Status Code: {status}")
            # Not every urequests build sets a `headers` attribute on the
            # response; a missing one must not turn a sent mail into a failure.
            print(f"Response Headers: {getattr(response, 'headers', None)}")
            if status == 201:
                print("Email sent successfully!")
                print(f"Response Text: {response.text}")
                return True
            print(f"Email sending failed. Status code: {status}")
            print(f"Response body: {response.text}")
            return False
        except Exception as e:
            print(f"Critical error in email sending: {e}")
            sys.print_exception(e)
            return False
        finally:
            # Always release the response, including on the error path.
            if response is not None:
                try:
                    response.close()
                except Exception as close_err:
                    print(f"Error closing response: {close_err}")

    def should_send_email(self, email_type):
        """Tell whether an alert of ``email_type`` may be sent now.

        An alert is allowed when no mail of this type has been recorded yet,
        or when at least ``EMAIL_DELAY`` seconds have passed since the last
        recorded mail of this type.  Tracking the time per type keeps two
        simultaneously active alerts from unblocking each other on every
        cycle.

        Args:
            email_type: Key naming the alert, e.g. ``'soil_moisture'``.

        Returns:
            True if the alert may be sent, False if it is still rate-limited.
        """
        last_sent = self._last_sent.get(email_type)
        if last_sent is None:
            return True
        return time.time() - last_sent >= self.config['EMAIL_DELAY']

    def _record_email_sent(self, email_type):
        """Store the current time as the last send time for ``email_type``."""
        now = time.time()
        self._last_sent[email_type] = now
        self.last_email_time = now
        self.last_email_type = email_type

    def _raise_alert(self, email_type, subject, message):
        """Send an alert unless rate-limited; record it only when it was sent.

        Returns:
            True if a mail was sent, False otherwise.
        """
        if not self.should_send_email(email_type):
            return False
        if not self.send_email_alert(subject, message):
            return False
        self._record_email_sent(email_type)
        return True

    def measure_soil_moisture(self):
        """Return the raw soil-moisture ADC reading, or None on error."""
        try:
            value = self.soil_moisture_sensor.read()
            print(f"Soil Moisture Level: {value}")
            return value
        except Exception as e:
            print(f"Soil Moisture Sensor Error: {e}")
            return None

    def measure_temperature_humidity(self):
        """Trigger a DHT22 measurement.

        Returns:
            A ``(temperature, humidity)`` tuple, or ``(None, None)`` when the
            sensor raises.
        """
        try:
            self.dht_sensor.measure()
            temperature = self.dht_sensor.temperature()
            humidity = self.dht_sensor.humidity()
            print(f"Temperature: {temperature}°C, Humidity: {humidity}%")
            return temperature, humidity
        except Exception as e:
            print(f"DHT Sensor Error: {e}")
            sys.print_exception(e)
            return None, None

    def measure_light_intensity(self):
        """Return the raw LDR ADC reading, or None on error."""
        try:
            value = self.ldr_sensor.read()
            print(f"Light Intensity: {value}")
            return value
        except Exception as e:
            print(f"Light Sensor Error: {e}")
            return None

    def monitor_environment(self):
        """Read every sensor once and raise an alert for each crossed limit.

        A reading of None (sensor error) never triggers an alert.
        """
        limits = self.config['THRESHOLDS']
        soil_moisture = self.measure_soil_moisture()
        temperature, humidity = self.measure_temperature_humidity()
        light_intensity = self.measure_light_intensity()

        # Soil Moisture Alert
        if soil_moisture is not None and soil_moisture < limits['SOIL_MOISTURE']:
            self._raise_alert(
                'soil_moisture',
                "Soil Moisture Low",
                "The soil moisture is critically low. Please water the plants immediately.",
            )

        # Temperature Alert
        if temperature is not None and temperature > limits['TEMPERATURE']:
            self._raise_alert(
                'temperature',
                "High Temperature Alert",
                f"Warning: Temperature is {temperature}°C, which is above the safe threshold.",
            )

        # Humidity Alert
        if humidity is not None and humidity < limits['HUMIDITY']:
            self._raise_alert(
                'humidity',
                "Low Humidity Warning",
                f"Humidity has dropped to {humidity}%, which might affect plant health.",
            )

        # Light Intensity Alert
        if light_intensity is not None and light_intensity < limits['LIGHT_INTENSITY']:
            self._raise_alert(
                'light_intensity',
                "Low Light Condition",
                f"Light intensity is {light_intensity}, which might be insufficient for plant growth.",
            )

    def run(self):
        """Connect to WiFi, then monitor forever.

        The board is reset when WiFi cannot be reached, and again whenever a
        monitoring cycle raises an exception.
        """
        # WiFi Connection
        if not self.connect_wifi():
            print("Persistent WiFi connection failure. Rebooting...")
            machine.reset()

        print("Environmental Monitoring Initiated...")
        while True:
            try:
                self.monitor_environment()
                time.sleep(10)
            except Exception as e:
                print("Critical error in monitoring loop:")
                sys.print_exception(e)
                # Short pause before rebooting so the trace can be read.
                time.sleep(5)
                try:
                    machine.reset()
                except Exception as reset_err:
                    # Keep looping if the reset itself fails, but say so.
                    print(f"Reset failed: {reset_err}")
# Main Execution
# Build the monitor and hand control to its loop.  Anything that escapes as
# an Exception is reported with a traceback and the board is then reset.
try:
    monitor = EnvironmentalMonitor()
    monitor.run()
except Exception as err:
    print("Unhandled Exception in Main Execution:")
    sys.print_exception(err)
    machine.reset()