import time
import network
import urequests
from machine import Pin, ADC
from dht import DHT22
# Constants
THINGSPEAK_WRITE_API = "OTWLW2CRFDBQ6PFR" # Your ThingSpeak API Key
WIFI_SSID = "Wokwi-GUEST" # Wi-Fi SSID
WIFI_PASS = ""
# Sensor Setup
dht = DHT22(Pin(15)) # DHT22 connected to pin 15
pressure_sensor = ADC(Pin(28)) # Custom Pressure Sensor connected to pin 26
buzzer = Pin(14, Pin.OUT) # Buzzer connected to pin 14
# Telegram Alert Function
def send_telegram_alert(message):
TELEGRAM_BOT_TOKEN = "7836110854:AAHjruURwDdafFmZ0ZlVtqRMtthNFb80OH8" # Your Telegram bot token
TELEGRAM_CHAT_ID = "1353742706" # Your Telegram chat ID
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage?chat_id={TELEGRAM_CHAT_ID}&text={message}"
try:
response = urequests.get(url)
print("Telegram Alert Sent:", response.text)
response.close()
except Exception as e:
print("Error sending Telegram alert:", e)
# Connect to Wi-Fi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASS)
timeout = 15 # 15 seconds timeout for Wi-Fi connection
while not wlan.isconnected() and timeout > 0:
print("Connecting to Wi-Fi...")
time.sleep(1)
timeout -= 1
if wlan.isconnected():
print("Connected to Wi-Fi")
print("IP Address:", wlan.ifconfig()[0])
else:
print("Failed to connect to Wi-Fi.")
send_telegram_alert("Wi-Fi connection failed") # Send alert if Wi-Fi fails
raise Exception("Wi-Fi connection failed")
def read_pressure():
# Read raw ADC value (0-65535)
raw_value = pressure_sensor.read_u16()
print(f"Raw ADC Value: {raw_value}")
# Ensure ADC is not reading 0 (which might indicate sensor issue)
if raw_value == 0:
print("Warning: Raw ADC value is 0. Check sensor connection or configuration.")
return None
# Convert the raw value to voltage (assuming a 3.3V system)
voltage = (raw_value / 65535) * 3.3
print(f"Voltage: {voltage:.2f}V")
# Convert the voltage to pressure (assuming 0V = 300 hPa, 3.3V = 1100 hPa)
min_voltage = 0.0 # 0V corresponds to 300 hPa
max_voltage = 3.3 # 3.3V corresponds to 1100 hPa
min_pressure = 950 # Minimum pressure value in hPa
max_pressure = 1050 # Maximum pressure value in hPa
# Linear mapping from voltage to pressure
pressure = ((voltage - min_voltage) / (max_voltage - min_voltage)) * (max_pressure - min_pressure) + min_pressure
return pressure
# Function to activate the buzzer for a short period
def activate_buzzer():
buzzer.on() # Turn on the buzzer
time.sleep(1) # Keep the buzzer on for 1 second
buzzer.off() # Turn off the buzzer
# Main loop
connect_wifi()
while True:
try:
# Read DHT22 data
dht.measure()
temperature = dht.temperature()
humidity = dht.humidity()
# Read pressure from the sensor
pressure = read_pressure()
# Print data to console
print(f"Temperature: {temperature:.1f} °C")
print(f"Humidity: {humidity:.1f} %")
print(f"Pressure: {pressure:.2f} hPa")
# Send data to ThingSpeak
url = f"https://api.thingspeak.com/update?api_key={THINGSPEAK_WRITE_API}&field1={temperature}&field2={humidity}&field3={pressure}"
response = urequests.get(url)
print("ThingSpeak Response:", response.text)
response.close()
# Send Telegram alert if any value goes out of expected range
if temperature > 35:
send_telegram_alert(f"High temperature alert: {temperature} °C")
send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
activate_buzzer() # Activate the buzzer for high temperature
elif temperature < 0:
send_telegram_alert(f"Low temperature alert: {temperature} °C")
send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
activate_buzzer() # Activate the buzzer for low temperature
if humidity > 40:
send_telegram_alert(f"High humidity alert: {humidity}%")
activate_buzzer() # Activate the buzzer for high humidity
time.sleep(10) # Update every 10 seconds
except Exception as e:
print("Error:", e)
send_telegram_alert(f"Error occurred: {str(e)}") # Send alert on error
time.sleep(10) # Retry after 10 seconds