"""
MicroPython IoT Weather Station Example for Wokwi.com
To view the data:
1. Go to http://www.hivemq.com/demos/websocket-client/
2. Click "Connect"
3. Under Subscriptions, click "Add New Topic Subscription"
4. In the Topic field, type "wokwi-weather" then click "Subscribe"
Now click on the DHT22 sensor in the simulation,
change the temperature/humidity, and you should see
the message appear on the MQTT Broker, in the "Messages" pane.
Copyright (C) 2022, Uri Shaked
https://wokwi.com/arduino/projects/322577683855704658
"""
import network
import time
import ujson
from machine import Pin, I2C, PWM
from hcsr04 import HCSR04
from i2c_lcd import I2cLcd
from umqtt.simple import MQTTClient
# WiFi and MQTT parameters
WIFI_SSID = "lotsocute_5G"
WIFI_PASSWORD = "Lotsocute01"
MQTT_CLIENT_ID = "clientId-cTksCRTUKQ"
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_TOPIC = "Feed The Chicken"
# Initialize I2C
i2c = I2C(scl=Pin(22), sda=Pin(21), freq=400000)
# Initialize LCD
lcd = I2cLcd(i2c, 0x27, 2, 16)
# Initialize Ultrasonic Sensor
sensor = HCSR04(trigger_pin=25, echo_pin=26)
# Initialize Servo Motor
servo_pin = Pin(12)
servo = PWM(servo_pin, freq=50)
servo.duty(77) # Adjust initial duty cycle
# Distance threshold for servo activation
DISTANCE_THRESHOLD = 10.0 # Adjust as needed
def connect_wifi(ssid, password):
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
def connect_mqtt(client_id, broker, user, password):
print("Connecting to MQTT server... ", end="")
client = MQTTClient(client_id, broker, user=user, password=password)
try:
client.connect()
print("Connected!")
except Exception as e:
print("Failed to connect to MQTT broker:", e)
return client
def measure_distance():
try:
# Wait for a short time for the sensor to settle
time.sleep(0.1)
distance = sensor.distance_cm()
return distance
except Exception as e:
print("Error:", e)
return None
def display_distance(distance):
# Clear LCD and display distance without decimal places
lcd.clear()
lcd.putstr("Feed The Chicken:\n{:.2f} cm".format(distance))
print('Distance:', distance, 'cm')
def main():
connect_wifi(WIFI_SSID, WIFI_PASSWORD)
client = connect_mqtt(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_USER, MQTT_PASSWORD)
prev_distance = None
while True:
distance = measure_distance()
if distance is not None:
display_distance(distance)
# Control servo based on distance
if distance < DISTANCE_THRESHOLD:
servo.duty(77) # Adjust duty cycle for the servo position
else:
servo.duty(30) # Adjust duty cycle for the servo position
print("Feed The Chicken: {:.2f} cm".format(distance))
# Report to MQTT if distance has changed
if distance != prev_distance:
message = ujson.dumps({"distance_cm": distance})
print("Reporting to MQTT topic {}: {}".format(MQTT_TOPIC, message))
try:
client.publish(MQTT_TOPIC, message)
print("Message published successfully")
except Exception as e:
print("Failed to publish message:", e)
prev_distance = distance
else:
print("No change")
# Wait for a moment before the next measurement
time.sleep(1)
if __name__ == "__main__":
main()