# Smart Food Container Bigyan
# Raspberry Pi Pico W MicroPython
# HX711Data = GPIO16, Clock = GPIO17
# OLED SDA = GP14, SCL = GP15
# Buttons GP7, GP8, GP9
# LEDs GP20, GP21, GP22
# Servo PWM = GP18
# Sunlight sensor AO -> GP26 (ADC0)
import time
import network
import ujson
import ubinascii
import machine
from machine import Pin, I2C, PWM, ADC
import ssd1306
from umqtt_simple import MQTTClient
# Buttons
button1 = Pin(7, Pin.IN, Pin.PULL_UP) # Tare
button2 = Pin(8, Pin.IN, Pin.PULL_UP) # Manual refill event
button3 = Pin(9, Pin.IN, Pin.PULL_UP) # Mix motor
# LEDs
led1 = Pin(20, Pin.OUT) # OK
led2 = Pin(21, Pin.OUT) # Low
led3 = Pin(22, Pin.OUT) # Empty
# OLED
i2c = I2C(1, sda=Pin(14), scl=Pin(15), freq=400000)
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
# Servo Motor
SERVO_PIN = 18
servo = PWM(Pin(SERVO_PIN))
servo.freq(50)
SERVO_MIN = 1800
SERVO_MAX = 7800
# Sunlight sensor (photoresistor) on GP26 / ADC0
ldr = ADC(26)
SUNLIGHT_THRESHOLD = 5000
sun_alert_active = False
# Wi-Fi
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
# MQTT
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASS = ""
# Container settings
CAPACITY = 5000.0
LOW_PERCENT = 20.0
DAILY_USE = 150.0
# Calibration
CAL_FACTOR = 0.42
#WaitTimer
PULSE_US = 25
CHANGE_THRESHOLD_G = 2.0
HEARTBEAT_MS = 3000 # 3 seconds for faster MQTT updates
# MQTT topics
BASE = "home/foodcontainer/g08-smartfoodcontainer"
T_WEIGHT = BASE + "/weight"
T_STATUS = BASE + "/status"
T_EVENT = BASE + "/event"
T_DAYS = BASE + "/days_remaining"
T_AVAIL = BASE + "/availability"
T_STATE = BASE + "/state"
T_CMD = BASE + "/command"
T_SUN = BASE + "/sunlight"
T_DISC = "homeassistant/device/group08_food_container/config"
# HX711 / HX710 class
class HX711:
def __init__(self, data_pin, clock_pin):
self.data = Pin(data_pin, Pin.IN)
self.clock = Pin(clock_pin, Pin.OUT)
self.clock.value(0)
self.offset = 0
self.scale = 1.0
self._last_avg = 0
def wake(self):
self.clock.value(1)
time.sleep_ms(2)
self.clock.value(0)
# Give HX711 enough time to settle
time.sleep_ms(1000)
def _pulse(self):
self.clock.value(1)
time.sleep_us(PULSE_US)
self.clock.value(0)
time.sleep_us(PULSE_US)
def _wait_ready(self, timeout_ms=3000):
self.clock.value(0)
start = time.ticks_ms()
while self.data.value() == 1:
if time.ticks_diff(time.ticks_ms(), start) > timeout_ms:
return False
time.sleep_ms(1)
return True
def read_raw(self):
self.clock.value(0)
if not self._wait_ready(3000):
raise OSError("HX711 timeout: sensor not ready")
raw = 0
for _ in range(24):
self.clock.value(1)
time.sleep_us(PULSE_US)
raw = (raw << 1) | self.data.value()
self.clock.value(0)
time.sleep_us(PULSE_US)
# 25th pulse: gain 128
self._pulse()
# Convert 24-bit two's complement
if raw & 0x800000:
raw -= 0x1000000
# StableDelay
time.sleep_ms(50)
return raw
def read_average(self, times=1):
total = 0
count = 0
for _ in range(times):
try:
total += self.read_raw()
count += 1
except OSError as e:
print("read skipped:", e)
if count > 0:
self._last_avg = total / count
return self._last_avg
def tare(self, times=5):
print("taring... please wait")
total = 0
count = 0
for i in range(times):
try:
val = self.read_raw()
total += val
count += 1
print("tare sample", i + 1, "raw:", val)
except OSError as e:
print("tare sample failed:", e)
self.offset = total / count if count > 0 else 0
print("tare done, offset =", self.offset)
def set_scale(self, factor):
if factor == 0:
factor = 1.0
self.scale = factor
def get_grams(self, times=1):
val = (self.read_average(times) - self.offset) / self.scale
if val < 0:
val = 0.0
return val
# Global state
scale = HX711(16, 17)
buf = []
last_w = None
stable_w = None
last_st = "unknown"
last_evt = "none"
sun_status = "OK"
last_pub = 0
move_time = 0
is_moving = False
mqtt_connected = False
uid = ubinascii.hexlify(machine.unique_id()).decode()
mqtt = MQTTClient(
"grp08-" + uid,
MQTT_BROKER,
port=MQTT_PORT,
user=MQTT_USER,
password=MQTT_PASS,
keepalive=60
)
# Servo helpers
def servo_angle(angle):
if angle < 0:
angle = 0
if angle > 180:
angle = 180
duty = SERVO_MIN + int((SERVO_MAX - SERVO_MIN) * angle / 180)
servo.duty_u16(duty)
def mix_contents(cycles=3, delay=0.4):
global last_evt
print("mixing contents...")
last_evt = "mixing"
for _ in range(cycles):
servo_angle(30)
time.sleep(delay)
servo_angle(150)
time.sleep(delay)
servo_angle(90)
last_evt = "mixed"
print("mixing complete")
# Sunlight helpers
def read_sunlight():
raw = ldr.read_u16()
direct_sun = raw < SUNLIGHT_THRESHOLD
return raw, direct_sun
def publish_sunlight(sun_raw, warning):
if not mqtt_connected:
return
try:
mqtt.publish(
T_SUN,
ujson.dumps({
"sunlight_raw": sun_raw,
"warning": warning
}),
retain=True
)
except Exception as e:
print("sunlight publish error:", e)
# Helper functions
def smooth(value):
buf.append(value)
if len(buf) > 3:
buf.pop(0)
return sum(buf) / len(buf)
def get_status(w):
if w <= 5:
return "Empty"
if w <= CAPACITY * (LOW_PERCENT / 100.0):
return "Low"
return "OK"
def get_days(w):
if DAILY_USE <= 0:
return 0
return int(w / DAILY_USE + 0.5)
def show_oled(w, status, days, event):
kg = round(w / 1000, 2)
pct = min(100, int(w / CAPACITY * 100))
oled.fill(0)
oled.text("Food Container", 0, 0)
oled.text("W: " + str(kg) + " kg", 0, 14)
oled.text(str(pct) + "%", 96, 14)
oled.text("Status: " + status, 0, 26)
oled.text("Days: " + str(days), 0, 38)
oled.text("Event: " + event[:8], 0, 50)
oled.show()
def update_leds(status):
led1.value(status == "OK")
led2.value(status == "Low")
led3.value(status == "Empty")
def do_tare():
global stable_w, last_w, last_evt
scale.tare()
buf.clear()
stable_w = None
last_w = None
last_evt = "tare"
print("tare complete, smoothing reset")
# Wi-Fi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
time.sleep(0.5)
wlan.active(True)
time.sleep(0.5)
if wlan.isconnected():
return True
print("connecting to wifi...")
wlan.connect(WIFI_SSID, WIFI_PASS)
timeout = 30
while not wlan.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
print(".", end="")
print()
if wlan.isconnected():
print("wifi connected:", wlan.ifconfig())
time.sleep(2)
return True
print("wifi failed - running without network")
return False
# MQTT
def on_message(topic, msg):
try:
cmd = msg.decode().strip().lower()
print("command:", cmd)
if cmd == "tare":
do_tare()
elif cmd == "ping":
if mqtt_connected:
mqtt.publish(T_EVENT, b'{"event":"pong"}')
elif cmd == "mix":
mix_contents()
except Exception as e:
print("command error:", e)
def connect_mqtt():
global mqtt_connected
wlan = network.WLAN(network.STA_IF)
if not wlan.isconnected():
print("skipping mqtt - no wifi")
return
print("connecting to mqtt...")
try:
mqtt.set_callback(on_message)
mqtt.set_last_will(T_AVAIL, b"offline", retain=True)
mqtt.connect()
mqtt.subscribe(T_CMD)
mqtt.publish(T_AVAIL, b"online", retain=True)
mqtt_connected = True
print("mqtt connected")
send_discovery()
except Exception as e:
print("mqtt failed:", e)
mqtt_connected = False
def send_discovery():
if not mqtt_connected:
return
config = {
"dev": {
"ids": "group08_food_container",
"name": "Smart Food Container",
"mf": "Group 08",
"mdl": "Pico W HX711 OLED Servo Sun"
},
"state_topic": T_STATE,
"availability": [
{
"topic": T_AVAIL
}
],
"cmps": {
"weight": {
"p": "sensor",
"name": "Weight",
"unique_id": "grp08_weight",
"device_class": "weight",
"unit_of_measurement": "g",
"state_class": "measurement",
"value_template": "{{ value_json.weight }}"
},
"status": {
"p": "sensor",
"name": "Status",
"unique_id": "grp08_status",
"icon": "mdi:food",
"value_template": "{{ value_json.status }}"
},
"days_remaining": {
"p": "sensor",
"name": "Days Remaining",
"unique_id": "grp08_days",
"unit_of_measurement": "d",
"icon": "mdi:calendar-clock",
"value_template": "{{ value_json.days_remaining }}"
}
}
}
try:
mqtt.publish(T_DISC, ujson.dumps(config), retain=True)
print("discovery sent")
except Exception as e:
print("discovery failed:", e)
def publish_state(w):
if not mqtt_connected:
return
status = get_status(w)
days = get_days(w)
payload = ujson.dumps({
"weight": round(w, 1),
"status": status,
"days_remaining": days,
"last_event": last_evt
})
try:
mqtt.publish(T_STATE, payload, retain=True)
mqtt.publish(T_WEIGHT, str(round(w, 1)), retain=True)
mqtt.publish(T_STATUS, status, retain=True)
mqtt.publish(T_DAYS, str(days), retain=True)
except Exception as e:
print("publish error:", e)
def publish_event(name, diff, w):
global last_evt
last_evt = name
print("event:", name, round(diff, 1), "g")
if not mqtt_connected:
return
try:
mqtt.publish(T_EVENT, ujson.dumps({
"event": name,
"change": round(diff, 1),
"weight": round(w, 1)
}))
except Exception as e:
print("event publish error:", e)
# Startup
oled.fill(0)
oled.text("Group 08", 0, 0)
oled.text("Food Container", 0, 14)
oled.text("Starting...", 0, 28)
oled.show()
time.sleep(1)
servo_angle(90)
connect_wifi()
scale.set_scale(CAL_FACTOR)
scale.wake()
time.sleep(1)
# Keep the Wokwi HX710 slider at 0 kg during startup
do_tare()
connect_mqtt()
# Main loop
while True:
try:
if mqtt_connected:
try:
mqtt.check_msg()
except OSError:
pass
except Exception as e:
print("mqtt check error:", e)
mqtt_connected = False
raw = scale.get_grams(1)
w = smooth(raw)
st = get_status(w)
days = get_days(w)
if stable_w is None:
stable_w = w
last_w = w
if abs(w - stable_w) >= CHANGE_THRESHOLD_G:
is_moving = True
move_time = time.ticks_ms()
if is_moving and time.ticks_diff(time.ticks_ms(), move_time) > 3000:
diff = w - stable_w
if diff >= 20.0:
publish_event("refill", diff, w)
elif diff <= -10.0:
publish_event("consumption", diff, w)
stable_w = w
is_moving = False
now = time.ticks_ms()
time_up = time.ticks_diff(now, last_pub) > HEARTBEAT_MS
changed = last_w is None or abs(w - last_w) >= CHANGE_THRESHOLD_G
st_diff = st != last_st
if time_up or changed or st_diff:
sun_raw, direct_sun = read_sunlight()
sun_status = "NOK" if direct_sun else "OK"
if direct_sun and not sun_alert_active:
print("sunlight warning: direct sunlight detected")
last_evt = "sunlight"
sun_alert_active = True
elif not direct_sun and sun_alert_active:
print("sunlight normal again")
sun_alert_active = False
publish_state(w)
publish_sunlight(sun_raw, sun_status)
last_pub = now
last_w = w
last_st = st
print(
"w:",
round(w / 1000, 2),
"kg | Status:",
st,
"| Estimated to last for:",
days,
"days | Sun:",
sun_status,
"| MQTT:",
mqtt_connected
)
update_leds(st)
show_oled(w, st, days, last_evt)
# Button 1: tare
if not button1.value():
time.sleep_ms(50)
if not button1.value():
print("button1 pressed - tare")
do_tare()
show_oled(w, st, days, "tare")
time.sleep(0.3)
# Button 2: manual refill event
if not button2.value():
time.sleep_ms(50)
if not button2.value():
print("button2 pressed - refill event")
publish_event("refill", 200, w)
time.sleep(0.3)
# Button 3: mix motor
if not button3.value():
time.sleep_ms(50)
if not button3.value():
print("button3 pressed - mix motor")
mix_contents()
time.sleep(0.3)
time.sleep(1)
except Exception as e:
print("error:", e)
if mqtt_connected:
try:
mqtt.publish(T_AVAIL, b"offline", retain=True)
except:
pass
mqtt_connected = False
time.sleep(3)
try:
connect_wifi()
connect_mqtt()
except:
passLoading
pi-pico-w
pi-pico-w
Loading
ssd1306
ssd1306