import time
import network
import urequests
from machine import Pin,ADC
from dht import DHT22
import random
# Constants
# NOTE(review): API keys and Wi-Fi credentials are hard-coded in this file;
# consider loading them from a separate, untracked config module.
THINGSPEAK_WRITE_API = "OTWLW2CRFDBQ6PFR"  # ThingSpeak write API key
WIFI_SSID = "Wokwi-GUEST"  # Wi-Fi SSID
WIFI_PASS = ""  # Wi-Fi password

# Pressure sensor settings
# NOTE(review): PRESSURE_THRESHOLD (500) is lower than MIN_PRESSURE (950), so
# every reading mapped into MIN_PRESSURE..MAX_PRESSURE exceeds it and the
# high-pressure alert fires on each measurement -- confirm the intended value.
PRESSURE_THRESHOLD = 500  # Alert when the computed pressure (hPa) exceeds this
# Sensor voltage range in volts. These two values are not referenced by the
# visible code. The original notes mapped them to 300 hPa and 1100 hPa, which
# disagrees with MIN_PRESSURE/MAX_PRESSURE below -- TODO confirm which is right.
MIN_VOLTAGE = 0.0
MAX_VOLTAGE = 3.3
MIN_PRESSURE = 950  # hPa reported for the lowest ADC reading
MAX_PRESSURE = 1050  # hPa reported for the highest ADC reading

# Analog pressure input on GPIO 26 (created once; the duplicate construction
# of the same ADC object was redundant).
pressurePin = ADC(Pin(26))
# ESP32-specific ADC tuning, currently disabled:
# pressurePin.atten(ADC.ATTN_0DB)  # 0 dB input attenuation
# pressurePin.width(ADC.WIDTH_12BIT)  # 12-bit resolution (0-4095)

# Sensor setup
dht = DHT22(Pin(15))  # DHT22 connected to pin 15
buzzer = Pin(14, Pin.OUT)  # Buzzer connected to pin 14
# Telegram alert helpers

# Characters that may appear unescaped in a URL query value
# (the RFC 3986 "unreserved" set).
_URL_UNRESERVED = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789-._~"
)


def _url_encode(text):
    """Percent-encode *text* (as UTF-8) so it is safe inside a URL query."""
    encoded = []
    for byte in str(text).encode("utf-8"):
        char = chr(byte)
        if char in _URL_UNRESERVED:
            encoded.append(char)
        else:
            # Everything else (spaces, '%', '&', ':', non-ASCII bytes such as
            # the degree sign) is emitted as %XX.
            encoded.append("%{:02X}".format(byte))
    return "".join(encoded)


def send_telegram_alert(message):
    """Send *message* to the configured Telegram chat (best effort).

    The message is percent-encoded before being placed in the query string;
    without that, spaces and characters such as '%', '&' or the degree sign
    produce a malformed request or a truncated message.

    Any failure is printed and swallowed, so this function never raises.
    Callers use it from inside their own error handlers.

    Args:
        message: Text to send; converted with ``str()`` before encoding.
    """
    # NOTE(review): the bot token is a secret and should not live in source.
    TELEGRAM_BOT_TOKEN = "7836110854:AAHjruURwDdafFmZ0ZlVtqRMtthNFb80OH8"  # Your Telegram bot token
    TELEGRAM_CHAT_ID = "1353742706"  # Your Telegram chat ID
    url = (
        "https://api.telegram.org/bot" + TELEGRAM_BOT_TOKEN
        + "/sendMessage?chat_id=" + TELEGRAM_CHAT_ID
        + "&text=" + _url_encode(message)
    )
    try:
        response = urequests.get(url)
        try:
            print("Telegram Alert Sent:", response.text)
        finally:
            # Always release the socket, even if reading the body failed.
            response.close()
    except Exception as e:
        print("Error sending Telegram alert:", e)
# Connect to Wi-Fi
def connect_wifi(timeout=15):
    """Bring up the station interface and join WIFI_SSID.

    Polls the connection state once per second until connected or until
    *timeout* seconds have elapsed.

    Args:
        timeout: Maximum number of one-second polls to wait (default 15).

    Raises:
        RuntimeError: If the interface is still not connected after the
            timeout. RuntimeError is a subclass of Exception, so existing
            ``except Exception`` handlers keep working.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    # Only start a new association when the interface is not already up.
    if not wlan.isconnected():
        wlan.connect(WIFI_SSID, WIFI_PASS)
        remaining = timeout
        while not wlan.isconnected() and remaining > 0:
            print("Connecting to Wi-Fi...")
            time.sleep(1)
            remaining -= 1

    if not wlan.isconnected():
        print("Failed to connect to Wi-Fi.")
        # No Telegram alert here: the alert is an HTTP request over the very
        # link that just failed to come up, so it could only fail as well.
        raise RuntimeError("Wi-Fi connection failed")

    print("Connected to Wi-Fi")
    print("IP Address:", wlan.ifconfig()[0])
def get_pressure():
    """Read the analog pressure sensor and return the pressure in hPa.

    The raw ADC reading is scaled linearly from its full range onto
    MIN_PRESSURE..MAX_PRESSURE. When the result exceeds PRESSURE_THRESHOLD a
    Telegram alert is sent and the buzzer is sounded.

    Returns:
        The pressure in hPa as a float, or None if the reading failed (the
        error is printed and reported via Telegram).
    """
    try:
        # read_u16() is the portable MicroPython ADC call and always spans
        # 0-65535. The port-specific read() is missing on some boards and is
        # not 0-1023 on others, which made the old "/ 1023" scaling overshoot
        # MAX_PRESSURE. Fall back to it only when read_u16() is unavailable.
        if hasattr(pressurePin, "read_u16"):
            raw_value = pressurePin.read_u16()
            full_scale = 65535
        else:
            raw_value = pressurePin.read()
            full_scale = 1023

        # Linear map: lowest reading -> MIN_PRESSURE, highest -> MAX_PRESSURE.
        pressure = (raw_value / full_scale) * (MAX_PRESSURE - MIN_PRESSURE) + MIN_PRESSURE
        print(f"Pressure: {pressure:.2f} hPa")

        # Detect if pressure exceeds threshold
        if pressure > PRESSURE_THRESHOLD:
            send_telegram_alert(f"Warning! High pressure detected: {pressure:.2f} hPa")
            activate_buzzer()
    except Exception as e:
        print("Error reading pressure:", e)
        send_telegram_alert(f"Error occurred while reading pressure: {str(e)}")
        return None
    return pressure
# Function to activate the buzzer for a short period
def activate_buzzer(duration=1):
    """Sound the buzzer for *duration* seconds, then switch it off.

    Blocks the caller while the buzzer is on.

    Args:
        duration: How long to keep the buzzer on, in seconds (default 1).
    """
    buzzer.on()
    try:
        time.sleep(duration)
    finally:
        # Make sure the buzzer is silenced even if the sleep is interrupted
        # (e.g. by Ctrl-C at the REPL).
        buzzer.off()
# Main loop: connect once, then measure, upload and alert every 10 seconds.
connect_wifi()
while True:
    try:
        # Read DHT22 data
        dht.measure()
        temperature = dht.temperature()
        humidity = dht.humidity()

        # Read the analog pressure sensor; None means the reading failed
        # (get_pressure() has already reported the error).
        pressure = get_pressure()

        if pressure is not None:
            # Print data to console
            print(f"Temperature: {temperature:.1f} °C")
            print(f"Humidity: {humidity:.1f} %")
            print(f"Pressure: {pressure:.2f} hPa")

            # Send data to ThingSpeak
            # NOTE(review): free ThingSpeak channels reportedly accept at most
            # one update every 15 s, so a 10 s cadence may see some updates
            # rejected -- confirm against the account's limits.
            url = f"https://api.thingspeak.com/update?api_key={THINGSPEAK_WRITE_API}&field1={temperature}&field2={humidity}&field3={pressure}"
            response = urequests.get(url)
            try:
                print("ThingSpeak Response:", response.text)
            finally:
                # Always release the socket, even if reading the body failed.
                response.close()

            # Send Telegram alert if any value goes out of expected range
            if temperature > 35:
                send_telegram_alert(f"High temperature alert: {temperature} °C")
                send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
                activate_buzzer()  # Activate the buzzer for high temperature
            elif temperature < 0:
                send_telegram_alert(f"Low temperature alert: {temperature} °C")
                send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
                activate_buzzer()  # Activate the buzzer for low temperature

            if humidity > 40:
                send_telegram_alert(f"High humidity alert: {humidity}%")
                activate_buzzer()  # Activate the buzzer for high humidity

        time.sleep(10)  # Update every 10 seconds
    except Exception as e:
        # Keep the station running: report the failure and try again later.
        print("Error:", e)
        send_telegram_alert(f"Error occurred: {str(e)}")  # Send alert on error
        time.sleep(10)  # Retry after 10 seconds