from machine import Pin, time_pulse_us, SoftI2C # type: ignore
from umqtt.simple import MQTTClient # type: ignore
from lcd_i2c import I2cLcd
import sys
import network # type: ignore
import ujson # type: ignore
# Only for IDE (Neovim) purposes.
if sys.implementation.name != 'micropython':
from typing import Callable
def sleep_us(_: int) -> None: ...
def sleep_ms(_: int) -> None: ...
def ticks_ms() -> int: ...
from time import sleep
else:
from time import sleep_us, sleep, ticks_ms # type: ignore
def connect_wifi(ssid: str, password: str) -> None:
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(ssid, password)
print("Connecting to WiFi...", end="")
while not wifi.isconnected():
print(".", end="")
sleep(0.5)
print("\nWiFi connected:", wifi.ifconfig())
class UltrasonicSensor:
def __init__(self, trig: int, echo: int) -> None:
self._trig = Pin(trig, Pin.OUT)
self._echo = Pin(echo, Pin.IN)
self._trig.value(0)
def get_distance(self) -> float:
self._trig.value(0)
sleep_us(5)
self._trig.value(1)
sleep_us(50)
self._trig.value(0)
time_pulse = time_pulse_us(self._echo, 1, 500*2*30)
return time_pulse / 58.2 if time_pulse > 0.0 else -1.0
class Button:
def __init__(self, pin: int) -> None:
self._button = Pin(pin, Pin.IN)
self._button.irq(trigger=Pin.IRQ_RISING, handler=self._handle_interrupt)
self._state = False
def _handle_interrupt(self, _) -> None:
self._state = True
def is_press(self) -> bool:
if self._state:
self._state = False
return True
return False
class TimedLed:
def __init__(self, pin: int) -> None:
self._led = Pin(pin, Pin.OUT)
self._state = False
self._on_at: int | None = None
self._on_time: int | None = None
def on_for(self, on_time_ms: int) -> None:
if self._state:
return
self._led.value(1)
self._state = True
self._on_at = ticks_ms()
self._on_time = on_time_ms
def reset(self) -> None:
self._on_at = None
self._on_time = None
self._state = False
self._led.value(0)
def update(self) -> None:
if self._on_at == None or self._on_time == None:
return
if ticks_ms() - self._on_at >= self._on_time:
self.reset()
class RateLimiter:
def __init__(self, rate: int) -> None:
self._rate = rate
self._last_ensure = ticks_ms()
def ensure(self) -> bool:
result = False
current_tick = ticks_ms()
if current_tick - self._last_ensure > self._rate:
result = True
self._last_ensure = current_tick
return result
class ItemsCounter:
OBJECT_DISTANCE_THRESHOLD = 10
def __init__(self, ultrasonic: UltrasonicSensor) -> None:
self._ultrasonic = ultrasonic
self.counter = 0
self._object_passed = True
self._last_check_time: int | None = None
def check(self) -> None:
distance = self._ultrasonic.get_distance()
if not self._object_passed:
if distance > self.OBJECT_DISTANCE_THRESHOLD:
self._object_passed = True
if distance < self.OBJECT_DISTANCE_THRESHOLD and self._object_passed:
self.counter += 1
self._object_passed = False
self._last_check_time = ticks_ms()
def reset(self) -> None:
self.counter = 0
class MqttSystem:
MQTT_BROKER = "broker.netpie.io"
TOPIC_SHADOW = "@shadow/data/update"
TOPIC_CONTROL = "@msg/operator"
def __init__(self, client: str, user: str, token: str) -> None:
self._rate_limiter = RateLimiter(5000)
self._client = self._create_client(client, user, token)
self._handler: dict[str, Callable[[bytes, bytes], None]] = {}
self._update_item_count(0)
Pin(14, Pin.OUT).value(1)
def _create_client(self, client: str, user: str, token: str) -> MQTTClient:
c = MQTTClient(client, MqttSystem.MQTT_BROKER, user=user, password=token)
c.set_callback(self._callback)
c.connect()
c.subscribe(MqttSystem.TOPIC_CONTROL)
return c
def new_handler(self, msg: str, callback: Callable[[bytes, bytes], None]) -> None:
self._handler[msg] = callback
def _callback(self, topic: bytes, msg: bytes) -> None:
print(f"{topic}: {msg}")
msg_decode = msg.decode()
if msg_decode not in self._handler:
raise Exception(f"Handler does not exist for {msg}")
self._handler[msg_decode](topic, msg)
def _update_item_count(self, count: int) -> None:
self._client.publish(MqttSystem.TOPIC_SHADOW, ujson.dumps({
"data": {
"counter": count
}
}))
def update(self, item_counter: ItemsCounter) -> None:
if not self._rate_limiter.ensure():
return
self._update_item_count(item_counter.counter)
self._client.check_msg()
class StatusDisplay:
_lcd = I2cLcd(SoftI2C(sda=Pin(21), scl=Pin(22), freq=400000), 0x27, 2, 16)
_rate_limiter = RateLimiter(500)
@classmethod
def _display_counter(cls, n: int) -> None:
cls._lcd.putstr(f"{n}")
@classmethod
def update(cls, item_counter: ItemsCounter) -> None:
if not cls._rate_limiter.ensure():
return
cls._lcd.clear()
cls._display_counter(item_counter.counter)
class ResetCounterSystem:
def __init__(self, reset_button: Button, timed_led: TimedLed):
self._reset_button = reset_button
self._timed_led = timed_led
def update(self, item_counter: ItemsCounter) -> None:
self._timed_led.update()
if self._reset_button.is_press():
item_counter.reset()
self._timed_led.on_for(300)
if __name__ == "__main__":
connect_wifi("Wokwi-GUEST", "")
mqtt = MqttSystem(
client="31f8de60-3225-4e30-8594-00f72d258996",
user="N7C8ZFnAXkbNTQkcjZRuZqhKZngY3CcX",
token="79bye9WQDnJurqctxcEnVZp1NJ49udCt")
item_counter = ItemsCounter(UltrasonicSensor(17, 16))
reset_counter = ResetCounterSystem(Button(18), TimedLed(27))
temp_led = Pin(25, Pin.OUT)
mqtt.new_handler("RESET_COUNTER", lambda *_: item_counter.reset())
mqtt.new_handler("BELT_ON", lambda *_: temp_led.value(1))
mqtt.new_handler("BELT_OFF", lambda *_: temp_led.value(0))
while True:
item_counter.check()
StatusDisplay.update(item_counter)
reset_counter.update(item_counter)
mqtt.update(item_counter)