"""
MicroPython MQTT NeoPixels Example for Wokwi.com
To control the pixels:
1. Go to http://www.hivemq.com/demos/websocket-client/
2. Click "Connect"
3. Under Public, set the Topic to "wokwi"
4. In the Message field, type [255, 0, 0] (or any other RGB color value)
5. Click "Publish"
Copyright (C) 2022, Uri Shaked
https://wokwi.com/arduino/projects/398371738722866177
"""
from machine import Pin, I2C, PWM,ADC
import time
from hcsr04 import HCSR04
from I2C_lcd import I2cLcd
import dht
from umqtt.simple import MQTTClient
import network
# WiFi access point credentials (the password is an empty string).
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""

# MQTT broker connection parameters.
MQTT_BROKER = "mqtt-dashboard.com"
MQTT_CLIENT_ID = "clientId-unique12345"
MQTT_USERNAME = ""
MQTT_PASSWORD = ""

# MQTT topics the readings are published to.
MQTT_TOPIC1 = "temp_bella"  # temperature
MQTT_TOPIC2 = "humi_bella"  # humidity

# GPIO assignments.
BUZZER_PIN = 2
POTENTIOMETER_PIN = 34  # analog input for the potentiometer

# Buzzer tone frequency, in Hz.
TONE_FREQUENCY = 1000

# Alarm thresholds: the buzzer fires when any reading exceeds its limit.
TEMP_THRESHOLD = 30  # degrees Celsius
POTENTIOMETER_THRESHOLD = 2048  # raw ADC counts, mid-range of 0-4095
HUMIDITY_GAS_LEAKAGE_THRESHOLD = 70  # humidity treated as a gas-leak sign
def init_lcd():
    """Open the I2C bus and return the LCD driver attached to it.

    The bus is created on I2C peripheral 0 with SCL on GPIO 22 and SDA on
    GPIO 21 at 400 kHz. ``machine.I2C`` documents the bus ``id`` as a
    required first argument; calling it with only ``scl``/``sda`` depended
    on a legacy software-I2C fallback, so the id is now passed explicitly.

    Returns:
        I2cLcd: driver for the device at I2C address 0x27, constructed with
        the arguments ``4, 20`` (presumably 4 rows by 20 columns -- confirm
        against the I2C_lcd module).
    """
    i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000)
    return I2cLcd(i2c, 0x27, 4, 20)
def connect_wifi():
    """Join the configured WiFi network and block until the link is up.

    Activates the station interface, starts a connection with WIFI_SSID and
    WIFI_PASSWORD, then polls once per second until the interface reports
    that it is connected. There is no timeout. The interface configuration
    is printed once the connection is established.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(WIFI_SSID, WIFI_PASSWORD)
    while True:
        if station.isconnected():
            break
        time.sleep(1)
    print('WiFi connected:', station.ifconfig())
def measure_distance(sensor):
    """Return the distance reading, in centimetres, taken from ``sensor``.

    Args:
        sensor: object exposing a ``distance_cm()`` method.

    Returns:
        Whatever ``sensor.distance_cm()`` returns, unmodified.
    """
    return sensor.distance_cm()
def measure_temp_humidity(sensor):
    """Trigger a measurement on ``sensor`` and return its readings.

    Args:
        sensor: object exposing ``measure()``, ``temperature()`` and
            ``humidity()`` methods.

    Returns:
        tuple: ``(temperature, humidity)`` as reported by the sensor after
        the measurement has been triggered.
    """
    sensor.measure()
    return sensor.temperature(), sensor.humidity()
def display_readings(lcd, temperature, humidity, distance, gas_leakage):
    """Show temperature, humidity and distance on the LCD and the console.

    The display is cleared and then written row by row: temperature,
    humidity, distance, and (only when ``gas_leakage`` is true) an alert.
    Previously ``distance`` was accepted but never shown, leaving the third
    row blank; the alert still lands on the fourth row as before.

    Args:
        lcd: display object exposing ``clear()`` and ``putstr(text)``.
        temperature: temperature value, rendered with two decimals and "C".
        humidity: humidity value, rendered with two decimals and "%".
        distance: distance in centimetres, rendered with two decimals, or
            ``None`` to leave the distance row blank.
        gas_leakage: truthy to append the "Gas Leak Detected!" alert row.
    """
    temp_text = "{:.2f}".format(temperature)
    humid_text = "{:.2f}".format(humidity)
    dist_text = None if distance is None else "{:.2f}".format(distance)

    lcd.clear()
    lcd.putstr("Temp: {} C\n".format(temp_text))
    lcd.putstr("Humid: {} %\n".format(humid_text))
    if dist_text is None:
        # Keep the row layout stable so the alert stays on the last row.
        lcd.putstr("\n")
    else:
        lcd.putstr("Dist: {} cm\n".format(dist_text))
    if gas_leakage:
        lcd.putstr("Gas Leak Detected!")

    print('Temp:', temp_text, 'C')
    print('Humid:', humid_text, '%')
    if dist_text is not None:
        print('Dist:', dist_text, 'cm')
def send_mqtt(client, topic, message):
    """Publish ``message`` to ``topic`` and report the outcome on stdout.

    The message is converted with ``str()`` before publishing. Any exception
    raised while converting or publishing is caught and printed, so this
    function never propagates a publish failure to its caller.

    Args:
        client: object exposing ``publish(topic, payload)``.
        topic: topic to publish to.
        message: value to publish; stringified before sending.
    """
    try:
        payload = str(message)
        client.publish(topic, payload)
        print(f'Sent {message} to topic {topic}')
    except Exception as err:
        print(f"Failed to send {message} to topic {topic}. Error: {err}")
def activate_buzzer(temperature, potentiometer_value, humidity):
    """Sound the buzzer for one second if any reading exceeds its threshold.

    The alarm condition is met when ``temperature`` exceeds TEMP_THRESHOLD,
    ``potentiometer_value`` exceeds POTENTIOMETER_THRESHOLD, or ``humidity``
    exceeds HUMIDITY_GAS_LEAKAGE_THRESHOLD.

    The PWM channel is only allocated when the alarm actually fires and is
    always released afterwards. Previously a PWM object was created on every
    call and released only in the alarm branch, leaving the buzzer pin
    attached to a live PWM output whenever no alarm was raised.

    Args:
        temperature: temperature reading, compared against TEMP_THRESHOLD.
        potentiometer_value: raw potentiometer reading.
        humidity: humidity reading.

    Returns:
        bool: True if the alarm fired (buzzer sounded), False otherwise.
    """
    alarm = (
        temperature > TEMP_THRESHOLD
        or potentiometer_value > POTENTIOMETER_THRESHOLD
        or humidity > HUMIDITY_GAS_LEAKAGE_THRESHOLD
    )
    if not alarm:
        print("No gas leakage detected.")
        return False

    # Configure tone and 50% duty (512 of 1023) in one step so the pin never
    # emits the driver's default frequency first.
    buzzer_pwm = PWM(Pin(BUZZER_PIN), freq=TONE_FREQUENCY, duty=512)
    try:
        time.sleep(1)  # Buzz for 1 second
    finally:
        # Release the channel even if the sleep is interrupted.
        buzzer_pwm.deinit()
    print("Gas Leakage Detected! Buzzer Activated.")
    return True
def main():
    """Run the monitoring loop: read sensors, alarm, display, publish.

    Connects to WiFi and the MQTT broker, initializes the LCD, the DHT22
    (GPIO 14), the ultrasonic sensor (trigger GPIO 12, echo GPIO 13) and the
    potentiometer ADC, then loops forever. Each cycle reads all sensors,
    runs the buzzer check, refreshes the LCD and publishes temperature and
    humidity, then waits 5 seconds.

    A failed sensor read no longer terminates the program: the error is
    printed and the cycle is retried after the usual delay.
    """
    # Connect to WiFi
    connect_wifi()

    # Initialize MQTT client
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
    client.connect()

    # Initialize LCD and sensors
    lcd = init_lcd()
    dht_sensor = dht.DHT22(Pin(14))
    ultrasonic_sensor = HCSR04(trigger_pin=12, echo_pin=13)

    potentiometer = ADC(Pin(POTENTIOMETER_PIN))
    # NOTE(review): assumes an ESP32 target with the potentiometer wired
    # across 3.3 V -- confirm. At the default attenuation the ADC saturates
    # at about 1 V, so the mid-range threshold would not match mid-travel.
    potentiometer.atten(ADC.ATTN_11DB)

    while True:
        try:
            temperature, humidity = measure_temp_humidity(dht_sensor)
            distance = measure_distance(ultrasonic_sensor)
        except Exception as e:
            # Sensor drivers report read failures by raising; treat them as
            # transient so one bad sample does not stop the monitor.
            print(f"Sensor read failed, retrying. Error: {e}")
            time.sleep(5)
            continue

        # Read the potentiometer value (0-4095)
        potentiometer_value = potentiometer.read()

        # Check for gas leakage and activate buzzer if needed
        gas_leakage = activate_buzzer(temperature, potentiometer_value, humidity)

        # Display all readings on the LCD
        display_readings(lcd, temperature, humidity, distance, gas_leakage)

        # Publish readings; send_mqtt stringifies the payload itself.
        send_mqtt(client, MQTT_TOPIC1, temperature)
        send_mqtt(client, MQTT_TOPIC2, humidity)

        time.sleep(5)  # Wait for 5 seconds


if __name__ == "__main__":
    main()