import time
import network
import urequests
from machine import Pin,ADC,I2C
from dht import DHT22
import random
import ssd1306
# Constants
# NOTE(review): the ThingSpeak key below is a credential committed in source;
# consider loading it from a file that is kept out of version control.
THINGSPEAK_WRITE_API = "OTWLW2CRFDBQ6PFR" # ThingSpeak write API key, used in the update URL of the main loop
WIFI_SSID = "Wokwi-GUEST" # Wi-Fi SSID passed to wlan.connect()
WIFI_PASS = "" # Wi-Fi password passed to wlan.connect() (empty string)
# Pressure sensor configuration
# NOTE(review): get_pressure() maps every reading into MIN_PRESSURE..MAX_PRESSURE
# (950..1050), so `pressure > PRESSURE_THRESHOLD` is true for every reading while
# the threshold is 500 - confirm the intended value.
PRESSURE_THRESHOLD = 500 # Alert threshold compared against the computed pressure (simulated)
# NOTE(review): MIN_VOLTAGE / MAX_VOLTAGE are not referenced anywhere in the
# visible code. Earlier comments tied them to 300 hPa and 1100 hPa, which does
# not match MIN_PRESSURE / MAX_PRESSURE below - confirm which range is intended.
MIN_VOLTAGE = 0.0 # Presumably the low end of the sensor output in volts - TODO confirm
MAX_VOLTAGE = 3.3 # Presumably the high end of the sensor output in volts - TODO confirm
MIN_PRESSURE = 950 # Pressure in hPa that get_pressure() reports for a zero ADC reading
MAX_PRESSURE = 1050 # Pressure in hPa that get_pressure() reports for a full-scale ADC reading
# Pressure sensor input: ADC on GPIO 26
pressurePin = ADC(Pin(26))
# Sensor Setup
dht = DHT22(Pin(15)) # DHT22 connected to pin 15
buzzer = Pin(14, Pin.OUT) # Buzzer connected to pin 14, driven as a digital output
# OLED Setup (I2C bus 0, SDA on pin 8, SCL on pin 9, 400 kHz)
i2c = I2C(0, scl=Pin(9), sda=Pin(8), freq=400000)
oled = ssd1306.SSD1306_I2C(128, 64, i2c) # 128x64 pixel display
# Telegram Alert Function
def _url_quote(text):
    """Percent-encode *text* (as UTF-8) for use as a URL query value.

    Hand-rolled because ``urllib.parse`` is not guaranteed to exist on
    MicroPython builds. Only RFC 3986 "unreserved" characters are left as-is.
    """
    unreserved = (
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz"
        "0123456789-_.~"
    )
    pieces = []
    for byte in str(text).encode("utf-8"):
        char = chr(byte)
        if char in unreserved:
            pieces.append(char)
        else:
            pieces.append("%{:02X}".format(byte))
    return "".join(pieces)


def send_telegram_alert(message):
    """Send *message* to the configured Telegram chat via the Bot API.

    This is best-effort: any failure is printed and swallowed so that an
    alerting problem never takes down the caller.

    Args:
        message: Text of the alert. It is percent-encoded before being placed
            in the query string, so spaces, "%", "&", "°" etc. are safe.
    """
    # NOTE(review): bot token and chat id are credentials hard-coded in source;
    # consider moving them to a config file kept out of version control.
    TELEGRAM_BOT_TOKEN = "7836110854:AAHjruURwDdafFmZ0ZlVtqRMtthNFb80OH8" # Your Telegram bot token
    TELEGRAM_CHAT_ID = "1353742706" # Your Telegram chat ID
    url = (
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        f"?chat_id={TELEGRAM_CHAT_ID}&text={_url_quote(message)}"
    )
    response = None
    try:
        response = urequests.get(url)
        print("Telegram Alert Sent:", response.text)
    except Exception as e:
        print("Error sending Telegram alert:", e)
    finally:
        # Always release the socket, even if reading the body failed.
        if response is not None:
            response.close()
# Connect to Wi-Fi
def connect_wifi():
    """Bring up the station interface and join WIFI_SSID.

    Polls the link once per second for up to 15 seconds. On success the
    assigned IP address is printed. On failure a Telegram alert is attempted
    and an Exception is raised.

    Raises:
        Exception: if the interface is still not connected after the timeout.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(WIFI_SSID, WIFI_PASS)

    # 15 one-second polls give a 15 second connection timeout.
    for _ in range(15):
        if station.isconnected():
            break
        print("Connecting to Wi-Fi...")
        time.sleep(1)

    if not station.isconnected():
        print("Failed to connect to Wi-Fi.")
        send_telegram_alert("Wi-Fi connection failed") # Send alert if Wi-Fi fails
        raise Exception("Wi-Fi connection failed")

    print("Connected to Wi-Fi")
    ip_address = station.ifconfig()[0]
    print("IP Address:", ip_address)
def get_pressure():
    """Read the pressure sensor and return the pressure in hPa.

    The 16-bit ADC reading is mapped linearly onto
    MIN_PRESSURE..MAX_PRESSURE. If the result exceeds PRESSURE_THRESHOLD a
    Telegram alert is sent and the buzzer is sounded.

    Returns:
        The pressure in hPa as a float, or None if reading or alerting
        raised an exception (the error is printed and reported via Telegram).
    """
    try:
        # read_u16() spans 0..65535, so dividing by 65535 maps a full-scale
        # reading exactly onto MAX_PRESSURE.
        fraction = pressurePin.read_u16() / 65535
        pressure = fraction * (MAX_PRESSURE - MIN_PRESSURE) + MIN_PRESSURE
        # Log the converted value; the raw ADC count is not in hPa.
        print(f"Pressure: {pressure:.2f} hPa")
        # Detect if pressure exceeds threshold
        # NOTE(review): confirm PRESSURE_THRESHOLD lies inside
        # MIN_PRESSURE..MAX_PRESSURE, otherwise this branch fires on every
        # reading (or never).
        if pressure > PRESSURE_THRESHOLD:
            send_telegram_alert(f"Warning! High pressure detected: {pressure:.2f} hPa")
            activate_buzzer()
    except Exception as e:
        print("Error reading pressure:", e)
        send_telegram_alert(f"Error occurred while reading pressure: {str(e)}")
        return None
    return pressure
def update_oled(temperature, humidity, pressure):
    """Redraw the OLED with the latest readings.

    Args:
        temperature: Temperature in degrees Celsius, or None if unavailable.
        humidity: Relative humidity in percent, or None if unavailable.
        pressure: Pressure in hPa, or None if unavailable (get_pressure()
            returns None on error and the main loop passes it straight in).
    """
    def _fmt(value):
        # "--" marks a missing reading instead of printing the text "None".
        # One decimal keeps the line short enough for the display.
        if value is None:
            return "--"
        return "{:.1f}".format(value)

    oled.fill(0) # Clear screen
    oled.text("Env Monitor", 25, 0)
    oled.text("Temp: " + _fmt(temperature) + " C", 5, 16)
    oled.text("Humidity: " + _fmt(humidity) + "%", 5, 32)
    # Short label: with the 8-pixel-wide built-in font, "Pressure: 1000.0 hPa"
    # would run past the 128-pixel display width.
    oled.text("P: " + _fmt(pressure) + " hPa", 5, 48)
    oled.show()
# Function to activate the buzzer for a short period
def activate_buzzer(duration=1):
    """Sound the buzzer for *duration* seconds, then switch it off.

    Args:
        duration: How long to keep the buzzer on, in seconds. Defaults to 1,
            which matches the previous fixed behaviour.
    """
    buzzer.on()
    try:
        time.sleep(duration)
    finally:
        # Make sure the buzzer is silenced even if the sleep is interrupted.
        buzzer.off()
# Main loop: connect once, then sample, display, upload and alert every 10 s.
connect_wifi()
while True:
    try:
        # Read DHT22 data
        dht.measure()
        temperature = dht.temperature()
        humidity = dht.humidity()
        # Read the pressure sensor through the ADC (None on failure)
        pressure = get_pressure()
        update_oled(temperature, humidity, pressure)
        if pressure is not None:
            # Print data to console
            print(f"Temperature: {temperature:.1f} °C")
            print(f"Humidity: {humidity:.1f} %")
            print(f"Pressure: {pressure:.2f} hPa")
            # Send data to ThingSpeak
            url = f"https://api.thingspeak.com/update?api_key={THINGSPEAK_WRITE_API}&field1={temperature}&field2={humidity}&field3={pressure}"
            response = urequests.get(url)
            try:
                print("ThingSpeak Response:", response.text)
            finally:
                # Release the socket even if reading the body fails, so
                # repeated errors do not leak connections.
                response.close()
            # Send Telegram alert if any value goes out of expected range
            if temperature > 35:
                send_telegram_alert(f"High temperature alert: {temperature} °C")
                send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
                activate_buzzer() # Activate the buzzer for high temperature
            elif temperature < 0:
                send_telegram_alert(f"Low temperature alert: {temperature} °C")
                send_telegram_alert(f"Atmospheric Pressure is: {pressure} hPa")
                activate_buzzer() # Activate the buzzer for low temperature
            if humidity > 40:
                send_telegram_alert(f"High humidity alert: {humidity}%")
                activate_buzzer() # Activate the buzzer for high humidity
        time.sleep(10) # Update every 10 seconds
    except Exception as e:
        # Top-level boundary: report the failure and keep the monitor running.
        print("Error:", e)
        send_telegram_alert(f"Error occurred: {str(e)}") # Send alert on error
        time.sleep(10) # Retry after 10 seconds