from machine import Pin, I2C, PWM
import time, network, machine
import ssd1306
import hx711_gpio
from simple import MQTTClient
# ——— PolledPin wrapper so the HX711 driver falls back to polling ———
class PolledPin:
    """Thin callable adapter around a pin-like object.

    Only ``value()`` and call syntax are exposed; both delegate to the
    wrapped pin's ``value()`` method. NOTE(review): the HX711 driver
    presumably falls back to polling because this wrapper offers no
    ``irq()`` method -- confirm against hx711_gpio.
    """

    def __init__(self, pin):
        """Remember the wrapped pin object as ``self.pin``."""
        self.pin = pin

    def __call__(self):
        """Return the wrapped pin's current level."""
        return self.pin.value()

    def value(self, *args):
        """Optionally write a level, then return the pin's level.

        With no arguments this is a plain read. With at least one argument
        the first one is written to the pin before reading back; any
        further arguments are ignored.
        """
        wrapped = self.pin
        if len(args) > 0:
            wrapped.value(args[0])
        return wrapped.value()
# === Pin Definitions ===
# Servo signal pin: GPIO 13 configured as an output.
SERVO_PIN = Pin(13, Pin.OUT)
# === OLED Setup ===
# I2C bus for the display: SCL on GPIO 22, SDA on GPIO 21.
i2c = I2C(scl=Pin(22), sda=Pin(21))
# SSD1306 display driver on that bus; 128 and 64 are presumably the panel
# width and height in pixels -- confirm against the ssd1306 module.
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
# === HX711 Setup (clock, data, gain) ===
clk = Pin(33, Pin.OUT) # SCK on GPIO 33
dat = PolledPin(Pin(32, Pin.IN)) # DOUT on GPIO 32, wrapped
hx711 = hx711_gpio.HX711(clk, dat, gain=128)
# Tare at import time, so this runs as soon as the module is loaded.
# NOTE(review): this happens before hx711.read is replaced further down, so
# tare() presumably still uses the driver's own read path -- confirm.
print("Taring... place no weight on the load cell")
hx711.tare()
# One-second pause after taring before announcing completion.
time.sleep(1)
print("Tare done.")
# ——— Monkey‑patch read() to skip waiting loops ———
def raw_read():
    """Clock one sample out of the HX711 without waiting for data-ready.

    Pulses the clock line ``24 + hx711.GAIN`` times and shifts in one bit
    from the data line after each pulse. The trailing ``hx711.GAIN`` bits
    are then discarded and the remainder is interpreted as a 24-bit
    two's-complement number.

    NOTE(review): ``hx711.GAIN`` is used here as a bit count; presumably it
    is the driver's number of gain-select pulses rather than the gain factor
    passed to the constructor -- confirm against hx711_gpio.

    Returns:
        int: the signed sample value.
    """
    sample = 0
    for _pulse in range(24 + hx711.GAIN):
        # One clock pulse, then sample the data line.
        hx711.clock(True)
        hx711.clock(False)
        sample = (sample << 1) | hx711.data()

    # Drop the extra bits that were clocked in after the 24 data bits.
    sample >>= hx711.GAIN

    # Values above 0x7FFFFF have the sign bit set: map them to negatives.
    return sample - 0x1000000 if sample > 0x7FFFFF else sample


# Replace the driver's read() on this instance with the non-waiting version.
hx711.read = raw_read
# === Servo Setup ===
# PWM output on the servo pin, running at 50 Hz.
servo = PWM(SERVO_PIN, freq=50)
# === Wi‑Fi / MQTT Setup ===
# Network credentials used by connect_wifi(); the password is empty.
SSID = "Wokwi-GUEST"
PASSWORD = ""
# Broker host and the fixed client id handed to MQTTClient below.
MQTT_SERVER = "broker.hivemq.com"
MQTT_CLIENT_ID = "esp32-petfeeder"
# Topics (bytes): weight readings and feeder status messages.
MQTT_TOPIC_WT = b"petfeeder/weight"
MQTT_TOPIC_ST = b"petfeeder/status"
# The client is only constructed here; connect_mqtt() performs the connect.
client = MQTTClient(MQTT_CLIENT_ID, MQTT_SERVER)
def connect_wifi():
    """Bring up the station interface and block until it is connected.

    Activates the STA interface and, unless it is already connected, asks it
    to join the network named by ``SSID`` using ``PASSWORD``, printing a dot
    every 0.3 s while waiting. There is no timeout: the function returns only
    once ``wlan.isconnected()`` is true.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    print("Connecting to Wi‑Fi", end="")
    # Skip the join request when the link is already up (for example after a
    # soft reset); only issue connect() on an interface that needs it.
    if not wlan.isconnected():
        wlan.connect(SSID, PASSWORD)
        while not wlan.isconnected():
            print(".", end="")
            time.sleep(0.3)
    print(" ✓")
def connect_mqtt():
    """Connect the module-level MQTT client, resetting the board on failure.

    On success a confirmation line is printed. If anything in the attempt
    raises, the error is printed, the function waits 2 seconds and then
    calls ``machine.reset()``.
    """
    try:
        client.connect()
        print("Connected to MQTT!")
    except Exception as err:
        # Report the failure, pause briefly, then restart the board.
        print("MQTT connect failed:", err)
        time.sleep(2)
        machine.reset()
# === Helper Functions ===
def read_weight():
    """Return one reading obtained from ``hx711.read()``."""
    reading = hx711.read()
    return reading
def control_servo(weight):
    """Drive the servo from the current weight reading.

    A reading that is not ``None`` and is below 1000 sets duty 40 (open,
    dispensing); anything else, including ``None``, sets duty 115 (closed).
    """
    dispensing = weight is not None and weight < 1000
    servo.duty(40 if dispensing else 115)
def update_display(w, status):
    """Redraw the OLED with the food reading and the status text.

    Args:
        w: the reading to show, or ``None`` to show ``---`` instead.
        status: text placed after ``Status: `` on the second line.
    """
    shown = "---" if w is None else w
    food_line = "Food: {} g".format(shown)
    status_line = "Status: {}".format(status)

    oled.fill(0)  # clear the frame before drawing
    oled.text(food_line, 0, 0)
    oled.text(status_line, 0, 20)
    oled.show()
# === Main Loop ===
def main():
    """Connect to Wi-Fi and MQTT, then run the feeder loop forever.

    After connecting, "Online" is published (retained) on the status topic.
    Each cycle then:

    1. reads the weight (a failed read yields ``None``),
    2. derives the status: "Sensor Error" for a failed read, "Refill!" for a
       reading below 1000, otherwise "Idle",
    3. publishes the weight (when available) and the status, both retained,
    4. drives the servo and refreshes the display,
    5. sleeps for 5 seconds.

    The function never returns.
    """
    connect_wifi()
    connect_mqtt()
    client.publish(MQTT_TOPIC_ST, b"Online", retain=True)
    while True:
        # Guard only the sensor read, so that a broker problem is never
        # reported as a sensor fault.
        try:
            w = read_weight()
        except Exception as e:
            print("Weight read failed:", e)
            w = None

        # Derive the status once, so a sensor fault stays visible for the
        # whole cycle instead of being masked by "Idle".
        if w is None:
            status = "Sensor Error"
        elif w < 1000:
            status = "Refill!"
        else:
            status = "Idle"

        # Publishing is best-effort: a failure is logged, and the servo and
        # display are still updated from the local reading.
        try:
            if w is not None:
                client.publish(MQTT_TOPIC_WT, str(w), retain=True)
            client.publish(MQTT_TOPIC_ST, status, retain=True)
        except Exception as e:
            print("MQTT publish failed:", e)

        control_servo(w)  # open or close the servo for this reading
        update_display(w, status)
        time.sleep(5)


if __name__ == "__main__":
    main()