import machine
import network
from umqtt.simple import MQTTClient
import time
import dht
import lcd_api
import i2c_lcd
import ujson
# WiFi credentials
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# Servo setup
servo = machine.PWM(machine.Pin(23), freq=50) # Servo is using Pin 23
# DHT22 setup
dht_sensor = dht.DHT22(machine.Pin(14)) # DHT22 is using Pin 14
# PIR sensor setup
pir_sensor = machine.Pin(18, machine.Pin.IN) # PIR sensor using Pin 18
# Buzzer setup
buzzer = machine.PWM(machine.Pin(19), freq=1000) # Buzzer using Pin 19
buzzer.duty(0)
# I2C LCD setup
I2C_ADDR = 0x27
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
i2c = machine.I2C(scl=machine.Pin(I2C_SCL_PIN), sda=machine.Pin(I2C_SDA_PIN), freq=400000)
lcd = i2c_lcd.I2cLcd(i2c, I2C_ADDR, 4, 20)
# LED setup
led_temp = machine.Pin(5, machine.Pin.OUT) # LED for high temperature (Pin 15)
led_hum = machine.Pin(12, machine.Pin.OUT) # LED for low humidity (Pin 16)
# Ultrasonic sensor setup
trigger_pin = machine.Pin(32, machine.Pin.OUT) # Trigger pin using Pin 32
echo_pin = machine.Pin(33, machine.Pin.IN) # Echo pin using Pin 33
# Water level constants (in centimeters)
EMPTY_LEVEL = 50.0 # Distance from sensor to water surface when tank is empty
FULL_LEVEL = 10.0 # Distance from sensor to water surface when tank is full
# Thresholds
TEMP_THRESHOLD = 40.0 # threshold for high temperature in Celsius
HUM_THRESHOLD = 30.0 # threshold for low humidity in percentage
# MQTT Broker setup
CLIENT_ID = ""
BROKER = "mqtt-dashboard.com"
TOPIC_SERVO = "AfiqServo"
TOPIC_DHT = "dht22/data"
TOPIC_MOTION = "AfiqMotion"
TOPIC_TEMP = "temp"
TOPIC_HUMI = "humi"
TOPIC_DISTANCE = "ultrasonic/distance"
TOPIC_WATER_LEVEL = "water/level"
client = MQTTClient(CLIENT_ID, BROKER)
# Connect to Wi-Fi
def connect_to_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Connecting to WiFi...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
pass
print('WiFi Connected:', wlan.ifconfig())
# Connect to MQTT Broker
def connect_to_broker():
try:
client.connect()
print("Connected to MQTT broker")
client.set_callback(on_message)
client.subscribe(TOPIC_SERVO)
except Exception as e:
print("Error connecting to MQTT broker:", e)
# MQTT message callback
def on_message(topic, msg):
topic_str = topic.decode('utf-8') # Convert bytes to string
if topic_str == TOPIC_SERVO:
process_servo_command(msg)
# Process servo commands
def process_servo_command(msg):
try:
# Decode the message to integer
angle = int(msg.decode('utf-8'))
print("Received angle command:", angle)
# Convert the angle to duty cycle values
duty_cycle = int(((angle / 180) * 1023)) # Mapping 0-180 to duty cycle
print("Calculated duty cycle:", duty_cycle)
servo.duty(duty_cycle)
except ValueError as ve:
print("Invalid integer format for servo command:", ve)
except Exception as e:
print("Error processing servo command:", e)
# Convert the angle to duty cycle
duty = (int(((angle)/180 *2 + 0.5) / 20 * 1023))
# Set duty cycle
servo.duty(duty)
# Measure distance using ultrasonic sensor
def measure_distance():
# Send a 10us pulse to trigger the sensor
trigger_pin.off()
time.sleep_us(2)
trigger_pin.on()
time.sleep_us(10)
trigger_pin.off()
# Measure the time for the echo to be received
while echo_pin.value() == 0:
pulse_start = time.ticks_us()
while echo_pin.value() == 1:
pulse_end = time.ticks_us()
pulse_duration = time.ticks_diff(pulse_end, pulse_start)
distance = (pulse_duration * 0.0343) / 2 # Speed of sound is 343 m/s
return distance
# Calculate water level percentage
def calculate_water_level_percentage(distance):
print("Measured distance (cm):", distance) # Debug print
if distance > EMPTY_LEVEL:
distance = EMPTY_LEVEL
elif distance < FULL_LEVEL:
distance = FULL_LEVEL
water_level_percentage = ((EMPTY_LEVEL - distance) / (EMPTY_LEVEL - FULL_LEVEL)) * 100
print("Calculated water level percentage:", water_level_percentage) # Debug print
return water_level_percentage
# Publish DHT22 sensor data and ultrasonic distance
def publish_sensor_data():
try:
dht_sensor.measure()
temperature = dht_sensor.temperature()
humidity = dht_sensor.humidity()
# Debug prints for temperature and humidity
print("Measured temperature:", temperature)
print("Measured humidity:", humidity)
distance = measure_distance()
water_level_percentage = calculate_water_level_percentage(distance)
payload = {
"temperature": temperature,
"humidity": humidity,
"distance": distance,
"water_level_percentage": water_level_percentage
}
client.publish(TOPIC_DHT, ujson.dumps(payload))
client.publish(TOPIC_TEMP, ujson.dumps(temperature))
client.publish(TOPIC_HUMI, ujson.dumps(humidity))
client.publish(TOPIC_DISTANCE, ujson.dumps(distance))
client.publish(TOPIC_WATER_LEVEL, ujson.dumps(water_level_percentage))
# Display data on LCD
lcd.clear()
lcd.putstr("Temp: {:.1f}C".format(temperature))
lcd.move_to(0, 1)
lcd.putstr("Hum: {:.1f}%".format(humidity))
lcd.move_to(0, 2)
lcd.putstr("Water: {:.1f}%".format(water_level_percentage))
# Update LEDs based on thresholds
if temperature > TEMP_THRESHOLD:
led_temp.on()
else:
led_temp.off()
if humidity < HUM_THRESHOLD:
led_hum.on()
else:
led_hum.off()
except Exception as e:
print("Error reading/publishing sensor data:", e)
# Monitor PIR sensor
def monitor_pir_sensor():
if pir_sensor.value() == 1:
print("Motion detected!")
buzzer.freq(1000) # Set frequency to 1 kHz
buzzer.duty(512) # 50% duty cycle (range 0-1023)
publish_motion_detected_data()
time.sleep(5)
else:
buzzer.duty(0) # Turn off buzzer
# Publish sensor data when motion is detected
def publish_motion_detected_data():
try:
payload = 'true'
client.publish(TOPIC_MOTION, payload)
print("Published motion detected sensor data:", payload)
except Exception as e:
print("Error reading/publishing sensor data:", e)
# Main loop
def main():
connect_to_wifi()
connect_to_broker()
last_publish = 0
while True:
client.check_msg()
current_time = time.time()
if current_time - last_publish > 10:
publish_sensor_data()
last_publish = current_time
monitor_pir_sensor()
time.sleep(0.1)
if __name__ == "__main__":
main()