from utime import sleep
from hx711 import HX711
import network
import machine
import ujson
from umqtt.simple import MQTTClient
import neopixel
from machine import Pin, I2C, PWM
from ssd1306 import SSD1306_I2C
import utime
# OLED configuration
WIDTH = 128
HEIGHT = 64
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=200000)
oled = SSD1306_I2C(WIDTH, HEIGHT, i2c)
# NeoPixel configuration
NEOPIXEL_PIN = 14 # Pin on ESP32
NUM_PIXELS = 16 # Number of NeoPixels
np = neopixel.NeoPixel(Pin(NEOPIXEL_PIN), NUM_PIXELS)
# Pressure sensor configuration
capteur_hx711 = HX711(4, 5, 1)
# Servo motor configuration
SERVO_PIN = 15
servo = PWM(Pin(SERVO_PIN), freq=50)
def set_servo_angle(angle):
duty = 40 + int((angle / 180.0) * 115) # Duty cycle range for 0 to 180 degrees
servo.duty(duty)
# Button configuration
BUTTON_PIN = 19
button = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_UP)
# WiFi configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT configuration
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "ESP32_Chickens"
MQTT_TOPIC_MASS = "home/chickens/mass"
MQTT_TOPIC_PIXELS = "home/chickens/pixels"
MQTT_TOPIC_MISSING = "home/chickens/missing"
MQTT_CONTROL_TOPIC = "home/chickens/control"
# Connect to WiFi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
pass
print("Connected to WiFi")
connect_wifi()
# Connect to MQTT broker
def connect_mqtt():
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
client.set_callback(on_message)
client.connect()
client.subscribe(MQTT_CONTROL_TOPIC)
print("Connected to MQTT Broker and subscribed to control topic")
return client
def on_message(topic, msg):
print(f"Received message: {msg} on topic: {topic}")
try:
payload = int(msg)
if payload == 1:
set_servo_angle(90)
print("Set servo to 90 degrees")
elif payload == 0:
set_servo_angle(0)
print("Set servo to 0 degrees")
except Exception as e:
print(f"Failed to process message: {e}")
mqtt_client = connect_mqtt()
# Calibration factor for converting sensor readings to kilograms
CALIBRATION_FACTOR = 420
# Function to convert sensor reading to kilograms
def convert_to_kg(raw_reading):
return raw_reading / CALIBRATION_FACTOR
# Global variable to track button state
button_pressed = False
# Interrupt service routine for button press
def handle_button_interrupt(pin):
global button_pressed
button_pressed = not button_pressed
if button_pressed:
set_servo_angle(90)
print("Button pressed: Set servo to 90 degrees")
else:
set_servo_angle(0)
print("Button released: Set servo to 0 degrees")
# Attach interrupt to button
button.irq(trigger=Pin.IRQ_FALLING, handler=handle_button_interrupt)
while True:
capteur_hx711.power_on()
# Wait until the sensor is ready
while not capteur_hx711.is_ready():
pass
# Read the sensor value
raw_reading = capteur_hx711.read()
# Convert the raw sensor reading to kilograms
mass_kg = convert_to_kg(raw_reading)
# Calculate the number of pixels to light up based on mass
pixels_to_light = min(int(mass_kg / 3), NUM_PIXELS) # 4kg per pixel, cap at NUM_PIXELS
# Clear the OLED display
oled.fill(0)
# Display the mass in kilograms on the OLED display
oled.text("Mass (kg): {:.2f}".format(mass_kg), 10, 10)
# Display the number of chickens (same as the number of pixels lit up)
oled.text("Chickens: {}".format(pixels_to_light), 10, 30)
# Check if all chickens are present
if pixels_to_light == NUM_PIXELS:
oled.text("ALL PRESENT", 10, 50)
else:
# Display how many chickens are missing
chickens_missing = NUM_PIXELS - pixels_to_light
oled.text("{} MISSING!".format(chickens_missing), 10, 50)
# Update the OLED display
oled.show()
# Light up NeoPixels based on mass
for i in range(NUM_PIXELS):
if i < pixels_to_light:
np[i] = (255, 255, 255) # White color
else:
np[i] = (0, 0, 0) # Turn off NeoPixel
np.write()
# Publish data to separate MQTT topics
mqtt_client.publish(MQTT_TOPIC_MASS, ujson.dumps(mass_kg))
mqtt_client.publish(MQTT_TOPIC_PIXELS, ujson.dumps(pixels_to_light))
mqtt_client.publish(MQTT_TOPIC_MISSING, ujson.dumps(NUM_PIXELS - pixels_to_light))
# Check for incoming MQTT messages
mqtt_client.check_msg()
# Wait for a short period before taking the next reading
sleep(1)