import machine
import network
import ujson
import utime
from machine import Pin, I2C, PWM
from utime import sleep
from time import sleep

import neopixel
from hx711 import HX711
from ssd1306 import SSD1306_I2C
from umqtt.simple import MQTTClient
# OLED configuration: 128x64 SSD1306 on I2C bus 0 (SCL = pin 22, SDA = pin 21, 200 kHz)
WIDTH = 128
HEIGHT = 64
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=200000)
oled = SSD1306_I2C(WIDTH, HEIGHT, i2c)
# NeoPixel configuration
NEOPIXEL_PIN = 14 # Pin on ESP32
# Number of NeoPixels; the main loop also treats this as the full chicken count ("ALL PRESENT")
NUM_PIXELS = 16 # Number of NeoPixels
np = neopixel.NeoPixel(Pin(NEOPIXEL_PIN), NUM_PIXELS)
# Pressure sensor configuration
# NOTE(review): presumably HX711(data pin, clock pin, gain/channel) - confirm against the hx711 driver in use
capteur_hx711 = HX711(4, 5, 1)
# Servo motor configuration: PWM output on pin 15 at 50 Hz
SERVO_PIN = 15
servo = PWM(Pin(SERVO_PIN), freq=50)
def set_servo_angle(angle):
    """Move the servo to ``angle`` degrees.

    The angle is clamped to 0..180 and mapped linearly onto the PWM duty
    values 40..155 before being written to the servo output.

    Args:
        angle: Requested position in degrees; values outside 0..180 are
            clamped so the duty never leaves the mapped range.
    """
    angle = max(0, min(180, angle))
    duty = 40 + int((angle / 180.0) * 115)  # Duty cycle range for 0 to 180 degrees
    servo.duty(duty)
# Button configuration
BUTTON_PIN = 19
button = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_UP)  # Assuming active-low button

# Servo state: last commanded angle in degrees (0 degrees initially)
servo_state = 0

# Hardware edges arriving closer together than this are treated as contact bounce
BUTTON_DEBOUNCE_MS = 200
_last_button_ms = None  # ticks_ms() of the last accepted hardware press


# Button press handling
def handle_button_press(pin):
    """Toggle the servo between 0 and 90 degrees.

    Used both as the falling-edge IRQ handler for ``button`` and as a
    software trigger (the MQTT callback calls it with ``None``).

    Args:
        pin: The Pin that raised the interrupt, or ``None`` for a software
            trigger. Only hardware edges are debounced; software triggers
            always toggle.
    """
    global servo_state, _last_button_ms
    if pin is not None:
        now = utime.ticks_ms()
        # ticks_diff copes with the tick counter wrapping around
        if (_last_button_ms is not None
                and utime.ticks_diff(now, _last_button_ms) < BUTTON_DEBOUNCE_MS):
            return
        _last_button_ms = now
    if servo_state == 0:
        set_servo_angle(90)
        servo_state = 90
        print("Button pressed, set servo to 90 degrees")
    else:
        set_servo_angle(0)
        servo_state = 0
        print("Button pressed, set servo to 0 degrees")


button.irq(trigger=Pin.IRQ_FALLING, handler=handle_button_press)
# WiFi configuration (open network: the password is empty)
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT configuration: broker endpoint and the client id used when connecting
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "ESP32_Chickens"
# Topics the main loop publishes to: mass in kg, lit-pixel count, missing count
MQTT_TOPIC_MASS = "home/chickens/mass"
MQTT_TOPIC_PIXELS = "home/chickens/pixels"
MQTT_TOPIC_MISSING = "home/chickens/missing"
# Topics connect_mqtt() subscribes to: servo control commands and remote button presses
MQTT_CONTROL_TOPIC = "home/chickens/control"
MQTT_BUTTON_TOPIC = "home/chickens/button"
# Connect to WiFi
def connect_wifi(timeout_s=None):
    """Activate the station interface and block until it is connected.

    Args:
        timeout_s: Optional maximum number of seconds to wait for the
            connection. ``None`` (the default) waits indefinitely.

    Raises:
        OSError: If ``timeout_s`` is given and elapses before the interface
            reports a connection.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Only start a new association when there is not one already
    if not wlan.isconnected():
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        waited_s = 0.0
        while not wlan.isconnected():
            if timeout_s is not None and waited_s >= timeout_s:
                raise OSError("WiFi connection timed out")
            # Sleep between polls instead of spinning in a tight loop
            sleep(0.1)
            waited_s += 0.1
    print("Connected to WiFi")


connect_wifi()
# Connect to MQTT broker
def connect_mqtt():
    """Create the MQTT client, connect it and subscribe to the inbound topics.

    Registers ``on_message`` as the receive callback before connecting, then
    subscribes to the control and button topics.

    Returns:
        The connected ``MQTTClient`` instance.
    """
    broker = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
    broker.set_callback(on_message)
    broker.connect()
    for inbound_topic in (MQTT_CONTROL_TOPIC, MQTT_BUTTON_TOPIC):
        broker.subscribe(inbound_topic)
    print("Connected to MQTT Broker and subscribed to control and button topics")
    return broker
def on_message(topic, msg):
    """Handle an incoming MQTT message.

    On the control topic, payload ``1`` moves the servo to 90 degrees and
    payload ``0`` moves it to 0 degrees. On the button topic, any integer
    payload toggles the servo exactly like a physical button press.

    Args:
        topic: Topic the message arrived on (bytes from the MQTT client, or str).
        msg: Message payload; must parse as an integer.
    """
    global servo_state
    print(f"Received message: {msg} on topic: {topic}")
    try:
        # The MQTT client hands the topic over as bytes, while the topic
        # constants are str; compare like with like or no branch ever matches.
        if isinstance(topic, bytes):
            topic = topic.decode()
        payload = int(msg)
        if topic == MQTT_CONTROL_TOPIC:
            if payload == 1:
                set_servo_angle(90)
                servo_state = 90
                print("Set servo to 90 degrees")
            elif payload == 0:
                set_servo_angle(0)
                servo_state = 0
                print("Set servo to 0 degrees")
        elif topic == MQTT_BUTTON_TOPIC:
            handle_button_press(None)
    except Exception as e:
        # Never let a malformed message take down the main loop
        print(f"Failed to process message: {e}")


mqtt_client = connect_mqtt()
# Calibration factor for converting sensor readings to kilograms
CALIBRATION_FACTOR = 420


# Function to convert sensor reading to kilograms
def convert_to_kg(raw_reading, calibration_factor=None):
    """Convert a raw sensor reading to kilograms.

    Args:
        raw_reading: Raw value read from the load-cell amplifier.
        calibration_factor: Divisor applied to the raw reading. Defaults to
            the module-level ``CALIBRATION_FACTOR``, looked up at call time.

    Returns:
        ``raw_reading`` divided by the calibration factor.
    """
    if calibration_factor is None:
        calibration_factor = CALIBRATION_FACTOR
    return raw_reading / calibration_factor
# Main loop: weigh, update the display and LEDs, publish, poll MQTT, repeat every second
while True:
    capteur_hx711.power_on()
    # Wait until the sensor is ready
    while not capteur_hx711.is_ready():
        pass
    # Read the sensor value and convert it to kilograms
    raw_reading = capteur_hx711.read()
    mass_kg = convert_to_kg(raw_reading)
    # One pixel (one chicken) per 3 kg, clamped to 0..NUM_PIXELS so that a
    # negative reading cannot produce a negative count or a "missing" figure
    # larger than the flock.
    # NOTE(review): the earlier comment said "4kg per pixel" while the code
    # divides by 3 - confirm which value is intended.
    pixels_to_light = max(0, min(int(mass_kg / 3), NUM_PIXELS))
    chickens_missing = NUM_PIXELS - pixels_to_light
    # Redraw the OLED: mass, chicken count, then presence status
    oled.fill(0)
    oled.text("Mass (kg): {:.2f}".format(mass_kg), 10, 10)
    oled.text("Chickens: {}".format(pixels_to_light), 10, 30)
    if chickens_missing == 0:
        oled.text("ALL PRESENT", 10, 50)
    else:
        oled.text("{} MISSING!".format(chickens_missing), 10, 50)
    oled.show()
    # Light one white NeoPixel per chicken and switch the rest off
    for i in range(NUM_PIXELS):
        np[i] = (255, 255, 255) if i < pixels_to_light else (0, 0, 0)
    np.write()
    # Publish data to separate MQTT topics
    mqtt_client.publish(MQTT_TOPIC_MASS, ujson.dumps(mass_kg))
    mqtt_client.publish(MQTT_TOPIC_PIXELS, ujson.dumps(pixels_to_light))
    mqtt_client.publish(MQTT_TOPIC_MISSING, ujson.dumps(chickens_missing))
    # Check for incoming MQTT messages
    mqtt_client.check_msg()
    # Wait for a short period before taking the next reading
    sleep(1)
import utime
# OLED configuration: 128x64 SSD1306 on I2C bus 0 (SCL = pin 22, SDA = pin 21, 200 kHz)
WIDTH = 128
HEIGHT = 64
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=200000)
oled = SSD1306_I2C(WIDTH, HEIGHT, i2c)
# NeoPixel configuration
NEOPIXEL_PIN = 14 # Pin on ESP32
# Number of NeoPixels; the main loop also treats this as the full chicken count ("ALL PRESENT")
NUM_PIXELS = 16 # Number of NeoPixels
np = neopixel.NeoPixel(Pin(NEOPIXEL_PIN), NUM_PIXELS)
# Pressure sensor configuration
# NOTE(review): presumably HX711(data pin, clock pin, gain/channel) - confirm against the hx711 driver in use
capteur_hx711 = HX711(4, 5, 1)
# Servo motor configuration: PWM output on pin 15 at 50 Hz
SERVO_PIN = 15
servo = PWM(Pin(SERVO_PIN), freq=50)
def set_servo_angle(angle):
    """Move the servo to ``angle`` degrees.

    The angle is clamped to 0..180 and mapped linearly onto the PWM duty
    values 40..155 before being written to the servo output.

    Args:
        angle: Requested position in degrees; values outside 0..180 are
            clamped so the duty never leaves the mapped range.
    """
    angle = max(0, min(180, angle))
    duty = 40 + int((angle / 180.0) * 115)  # Duty cycle range for 0 to 180 degrees
    servo.duty(duty)
# Button configuration
BUTTON_PIN = 19
button = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_UP)  # Assuming active-low button


# Button press handling
def handle_button_press(pin):
    """Toggle the servo between 0 and 90 degrees on a button interrupt.

    The decision is based on the PWM duty currently set on the servo output.
    Note that this is the raw duty value, not an angle: set_servo_angle()
    writes 40 for 0 degrees and 97 for 90 degrees, so the ``> 90`` test
    separates those two positions.

    Args:
        pin: The Pin that raised the interrupt (unused).
    """
    duty_now = servo.duty()
    if not duty_now > 90:
        set_servo_angle(90)
        print("Button pressed, set servo to 90 degrees")
        return
    set_servo_angle(0)
    print("Button pressed, set servo to 0 degrees")


button.irq(trigger=Pin.IRQ_FALLING, handler=handle_button_press)
# WiFi configuration (open network: the password is empty)
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT configuration: broker endpoint and the client id used when connecting
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "ESP32_Chickens"
# Topics the main loop publishes to: mass in kg, lit-pixel count, missing count
MQTT_TOPIC_MASS = "home/chickens/mass"
MQTT_TOPIC_PIXELS = "home/chickens/pixels"
MQTT_TOPIC_MISSING = "home/chickens/missing"
# NOTE(review): not referenced by the visible code below - connect_mqtt()
# subscribes to the literal "shai/button" instead; confirm which is intended.
MQTT_CONTROL_TOPIC = "home/chickens/control"
# Connect to WiFi
def connect_wifi(timeout_s=None):
    """Activate the station interface and block until it is connected.

    Args:
        timeout_s: Optional maximum number of seconds to wait for the
            connection. ``None`` (the default) waits indefinitely.

    Raises:
        OSError: If ``timeout_s`` is given and elapses before the interface
            reports a connection.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Only start a new association when there is not one already
    if not wlan.isconnected():
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        waited_s = 0.0
        while not wlan.isconnected():
            if timeout_s is not None and waited_s >= timeout_s:
                raise OSError("WiFi connection timed out")
            # Sleep between polls instead of spinning in a tight loop
            sleep(0.1)
            waited_s += 0.1
    print("Connected to WiFi")


connect_wifi()
# Connect to MQTT broker
def connect_mqtt():
    """Create the MQTT client, connect it and subscribe to "shai/button".

    Registers ``on_message`` as the receive callback before connecting.

    Returns:
        The connected ``MQTTClient`` instance.
    """
    broker = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
    broker.set_callback(on_message)
    broker.connect()
    # NOTE(review): the log line below says "control topic" but the
    # subscription is to "shai/button", not MQTT_CONTROL_TOPIC - confirm.
    broker.subscribe("shai/button")
    print("Connected to MQTT Broker and subscribed to control topic")
    return broker
def on_message(topic, msg):
    """Handle an incoming MQTT message by positioning the servo.

    Payload ``1`` moves the servo to 90 degrees and payload ``0`` moves it
    to 0 degrees; any other integer is ignored. The topic is only logged.

    Args:
        topic: Topic the message arrived on.
        msg: Message payload; must parse as an integer.
    """
    print(f"Received message: {msg} on topic: {topic}")
    # Command value -> (servo angle, confirmation text)
    commands = {
        1: (90, "Set servo to 90 degrees"),
        0: (0, "Set servo to 0 degrees"),
    }
    try:
        selected = commands.get(int(msg))
        if selected is not None:
            target_angle, confirmation = selected
            set_servo_angle(target_angle)
            print(confirmation)
    except Exception as e:
        print(f"Failed to process message: {e}")


mqtt_client = connect_mqtt()
# Calibration factor for converting sensor readings to kilograms
CALIBRATION_FACTOR = 420


# Function to convert sensor reading to kilograms
def convert_to_kg(raw_reading, calibration_factor=None):
    """Convert a raw sensor reading to kilograms.

    Args:
        raw_reading: Raw value read from the load-cell amplifier.
        calibration_factor: Divisor applied to the raw reading. Defaults to
            the module-level ``CALIBRATION_FACTOR``, looked up at call time.

    Returns:
        ``raw_reading`` divided by the calibration factor.
    """
    if calibration_factor is None:
        calibration_factor = CALIBRATION_FACTOR
    return raw_reading / calibration_factor
# Main loop: weigh, update the display and LEDs, publish, poll MQTT, repeat every second
while True:
    capteur_hx711.power_on()
    # Wait until the sensor is ready
    while not capteur_hx711.is_ready():
        pass
    # Read the sensor value and convert it to kilograms
    raw_reading = capteur_hx711.read()
    mass_kg = convert_to_kg(raw_reading)
    # One pixel (one chicken) per 3 kg, clamped to 0..NUM_PIXELS so that a
    # negative reading cannot produce a negative count or a "missing" figure
    # larger than the flock.
    # NOTE(review): the earlier comment said "4kg per pixel" while the code
    # divides by 3 - confirm which value is intended.
    pixels_to_light = max(0, min(int(mass_kg / 3), NUM_PIXELS))
    chickens_missing = NUM_PIXELS - pixels_to_light
    # Redraw the OLED: mass, chicken count, then presence status
    oled.fill(0)
    oled.text("Mass (kg): {:.2f}".format(mass_kg), 10, 10)
    oled.text("Chickens: {}".format(pixels_to_light), 10, 30)
    if chickens_missing == 0:
        oled.text("ALL PRESENT", 10, 50)
    else:
        oled.text("{} MISSING!".format(chickens_missing), 10, 50)
    oled.show()
    # Light one white NeoPixel per chicken and switch the rest off
    for i in range(NUM_PIXELS):
        np[i] = (255, 255, 255) if i < pixels_to_light else (0, 0, 0)
    np.write()
    # Publish data to separate MQTT topics
    mqtt_client.publish(MQTT_TOPIC_MASS, ujson.dumps(mass_kg))
    mqtt_client.publish(MQTT_TOPIC_PIXELS, ujson.dumps(pixels_to_light))
    mqtt_client.publish(MQTT_TOPIC_MISSING, ujson.dumps(chickens_missing))
    # Check for incoming MQTT messages
    mqtt_client.check_msg()
    # Wait for a short period before taking the next reading
    sleep(1)