import machine
import network
from umqtt.simple import MQTTClient
import time
import dht
import lcd_api
import i2c_lcd
# WiFi credentials
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# Servo setup
servo = machine.PWM(machine.Pin(23), freq=50) # Servo is using Pin 23
# DHT22 setup
dht_sensor = dht.DHT22(machine.Pin(14)) # DHT22 is using Pin 14
# PIR sensor setup
pir_sensor = machine.Pin(18, machine.Pin.IN) # PIR sensor using Pin 18
# Buzzer setup
buzzer = machine.PWM(machine.Pin(19), freq=1000) # Buzzer using Pin 19
buzzer.duty(0)
# I2C LCD setup
I2C_ADDR = 0x27
I2C_SDA_PIN = 21
I2C_SCL_PIN = 22
i2c = machine.I2C(scl=machine.Pin(I2C_SCL_PIN), sda=machine.Pin(I2C_SDA_PIN), freq=400000)
lcd = i2c_lcd.I2cLcd(i2c, I2C_ADDR, 2, 16)
# LED setup
led_temp = machine.Pin(5, machine.Pin.OUT) # LED for high temperature (Pin 15)
led_hum = machine.Pin(12, machine.Pin.OUT) # LED for low humidity (Pin 16)
# Thresholds
TEMP_THRESHOLD = 40.0 # threshold for high temperature in Celsius
HUM_THRESHOLD = 30.0 # threshold for low humidity in percentage
# MQTT Broker setup
CLIENT_ID = ""
BROKER = "broker.emqx.io"
TOPIC_SERVO = "Servo"
TOPIC_DHT = "dht22/data"
TOPIC_MOTION = "Motion"
client = MQTTClient(CLIENT_ID, BROKER)
# Connect to Wi-Fi
def connect_to_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Connecting to WiFi...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
pass
print('WiFi Connected:', wlan.ifconfig())
# Connect to MQTT Broker
def connect_to_broker():
try:
client.connect()
print("Connected to MQTT broker")
client.set_callback(on_message)
client.subscribe(TOPIC_SERVO)
except Exception as e:
print("Error connecting to MQTT broker:", e)
# MQTT message callback
def on_message(topic, msg):
topic_str = topic.decode('utf-8') # Convert bytes to string
if topic_str == TOPIC_SERVO:
process_servo_command(msg)
# Process servo commands
def process_servo_command(msg):
try:
# Decode the message to integer
angle = int(msg.decode('utf-8'))
print("Received angle command:", angle)
# Convert the angle to duty cycle values
duty_cycle = int(((angle / 180) * 1023)) # Mapping 0-180 to duty cycle
print("Calculated duty cycle:", duty_cycle)
servo.duty(duty_cycle)
except ValueError as ve:
print("Invalid integer format for servo command:", ve)
except Exception as e:
print("Error processing servo command:", e)
# Convert the angle to duty cycle
duty = (int(((angle)/180 *2 + 0.5) / 20 * 1023))
# Set duty cycle
servo.duty(duty)
# Publish DHT22 sensor data
def publish_sensor_data():
try:
dht_sensor.measure()
temperature = dht_sensor.temperature()
humidity = dht_sensor.humidity()
payload = '{"temperature": ' + str(temperature) + ', "humidity": ' + str(humidity) + '}'
client.publish(TOPIC_DHT, payload)
print("Published sensor data:", payload)
# Display data on LCD
lcd.clear()
lcd.putstr("Temp: {:.1f}C".format(temperature))
lcd.move_to(0, 1)
lcd.putstr("Hum: {:.1f}%".format(humidity))
# Update LEDs based on thresholds
if temperature > TEMP_THRESHOLD:
led_temp.on()
else:
led_temp.off()
if humidity < HUM_THRESHOLD:
led_hum.on()
else:
led_hum.off()
except Exception as e:
print("Error reading/publishing sensor data:", e)
# Monitor PIR sensor
def monitor_pir_sensor():
if pir_sensor.value() == 1:
print("Motion detected!")
buzzer.freq(1000) # Set frequency to 1 kHz
buzzer.duty(512) # 50% duty cycle (range 0-1023)
publish_motion_detected_data()
time.sleep(5)
else:
buzzer.duty(0) # Turn off buzzer
# # Publish sensor data when motion is detected
# def publish_motion_detected_data():
# try:
# if pir_sensor.value() == 1:
# pir = pir_sensor.pir()
# payload = '{"Motion detected": ' + str(pir) '}'
# client.publish(TOPIC_MOTION, payload)
# print("Published motion detected sensor data:", payload)
# except Exception as e:
# print("Error reading/publishing sensor data:", e)
# Publish sensor data when motion is detected
def publish_motion_detected_data():
try:
# Since you're already calling this function after detecting motion,
# there's no need to check `pir_sensor.value()` again here.
# I'm assuming you want to simply publish a payload indicating motion detection.
payload = '{"Motion detected": true}'
client.publish(TOPIC_MOTION, payload)
print("Published motion detected sensor data:", payload)
except Exception as e:
print("Error reading/publishing sensor data:", e)
# Main loop
def main():
connect_to_wifi()
connect_to_broker()
last_publish = 0
while True:
client.check_msg()
current_time = time.time()
if current_time - last_publish > 10:
publish_sensor_data()
last_publish = current_time
monitor_pir_sensor()
time.sleep(0.1)
if __name__ == "__main__":
main()