import time
import network
import urequests
from machine import Pin, ADC, I2C
from dht import DHT22
# Wi-Fi & ThingSpeak Constants
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
THINGSPEAK_WRITE_API = "OTWLW2CRFDBQ6PFR"
# Initialize Sensors
dht = DHT22(Pin(15)) # DHT22 on GP15
rain_sensor = ADC(27) # Rain sensor on ADC27
i2c = I2C(0, scl=Pin(1), sda=Pin(0), freq=100000) # BMP280 on I2C (GP0, GP1)
# Connect to Wi-Fi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASS)
timeout = 15
while not wlan.isconnected() and timeout > 0:
print("Connecting to Wi-Fi...")
time.sleep(1)
timeout -= 1
if wlan.isconnected():
print("Connected to Wi-Fi, IP:", wlan.ifconfig()[0])
else:
print("Failed to connect to Wi-Fi.")
# Send Telegram Alert
def send_telegram_alert(message):
TELEGRAM_BOT_TOKEN = "7836110854:AAHjruURwDdafFmZ0ZlVtqRMtthNFb80OH8"
TELEGRAM_CHAT_ID = "1353742706"
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage?chat_id={TELEGRAM_CHAT_ID}&text={message}"
try:
response = urequests.get(url)
print("Telegram Alert Sent:", response.text)
response.close()
except Exception as e:
print("Error sending Telegram alert:", e)
# Read BMP280 Pressure & Temperature from I2C
def read_bmp280():
try:
# BMP280 register address for pressure and temperature
bmp280_address = 0x76 # Default address for BMP280
i2c.writeto(bmp280_address, b'\xF7') # Request data from BMP280
# Read the 6-byte data (2 bytes for pressure, 2 bytes for temperature, and 2 bytes for humidity)
data = i2c.readfrom(bmp280_address, 6)
# Process the pressure data
raw_press = (data[0] << 8) | data[1] # Combine high and low byte to get raw pressure
# Convert raw pressure to hPa (simulating a conversion)
pressure = raw_press / 256.0 # Conversion factor for simulation
print(f"Pressure Read: {pressure:.2f} hPa")
return pressure
except Exception as e:
print("Error reading BMP280:", e)
return None
# Simulate Rain Sensor Data (For Testing)
def read_rain_sensor():
rain_value = random.uniform(0, 100) # Simulated rain value between 0 and 100
return rain_value
# Main Loop
connect_wifi()
while True:
try:
# Read DHT22 data
dht.measure()
temperature = dht.temperature()
humidity = dht.humidity()
# Read pressure from BMP280 sensor
pressure = read_bmp280()
# Read rain sensor data (simulated for testing)
rain_level = read_rain_sensor()
# Print data to console
print(f"Temperature: {temperature:.1f} °C")
print(f"Humidity: {humidity:.1f} %")
print(f"Pressure: {pressure:.2f} hPa")
print(f"Rain Level: {rain_level:.2f}%")
# Send data to ThingSpeak
url = f"https://api.thingspeak.com/update?api_key={THINGSPEAK_WRITE_API}&field1={temperature}&field2={humidity}&field3={pressure}&field4={rain_level}"
response = urequests.get(url)
print("ThingSpeak Response:", response.text)
response.close()
# Send Telegram alert if any value goes out of expected range
if temperature > 35:
send_telegram_alert(f"High temperature alert: {temperature} °C")
send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
elif temperature < 0:
send_telegram_alert(f"Low temperature alert: {temperature} °C")
send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
if humidity > 40:
send_telegram_alert(f"High humidity alert: {humidity}%")
# Alert for rain
if rain_level > 70:
print("Heavy rain detected!")
send_telegram_alert(f"Heavy rain detected! Rain Level: {rain_level:.2f}%")
time.sleep(10) # Update every 10 seconds
except Exception as e:
print("Error:", e)
send_telegram_alert(f"Error occurred: {str(e)}") # Send alert on error
time.sleep(10) # Retry after 10 seconds