import time
import network
import ssd1306
from machine import Pin, PWM, I2C
from umqtt.simple import MQTTClient
# MQTT settings.
# Replace every <...> placeholder below with your own values before running.
# Host name handed to MQTTClient as the broker address.
MQTT_BROKER = "broker.hivemq.com"
# Client identifier handed to MQTTClient.
MQTT_CLIENT_ID = "buzzer-<class>-<number in class>-<name>"
# Topic the client subscribes to; payloads select which buzzer to sound.
MQTT_TOPIC = "elsys/<class>/<number in class>/<name>"
# Buzzer constants
# Pin numbers passed to machine.Pin, one per buzzer.
BUZZER_PINS = [26, 25, 33, 32]
# Filled at start-up with one PWM object per buzzer pin; on_message indexes
# this list with the integer received over MQTT.
BUZZERS = []
# Display constants
# Pins used to create the I2C bus the display is attached to.
DISPLAY_SCL_PIN = 21
DISPLAY_SDA_PIN = 22
# Dimensions passed to ssd1306.SSD1306_I2C.
DISPLAY_WIDTH = 128
DISPLAY_HEIGHT = 64
# Connect to Wi-Fi
def connect_to_wifi():
    """Join the 'Wokwi-GUEST' network and block until the link is up.

    Progress is printed to the console: a dot for every 0.1 s spent waiting,
    followed by " Connected!" once the interface reports a connection.
    """
    print("Connecting to Wi-Fi", end="")

    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect("Wokwi-GUEST", "")  # empty password

    # Poll the interface until it reports that it is connected.
    while True:
        if wlan.isconnected():
            break
        print(".", end="")
        time.sleep(0.1)

    print(" Connected!")
# Connect to MQTT Broker
def connect_to_mqtt_broker():
    """Bring up Wi-Fi, connect the MQTT client and subscribe to MQTT_TOPIC.

    Operates on the module-level ``client`` and reports each step on the
    console.
    """
    # The network link has to exist before the broker can be reached.
    connect_to_wifi()

    print("Connecting to MQTT server... ", end="")
    client.connect()
    print("Connected!")

    topic = MQTT_TOPIC
    client.subscribe(topic)
    print("Subscribed to %s" % (topic,))
# Handle MQTT messages
def on_message(topic, message):
    """Sound the buzzer selected by an MQTT message for five seconds.

    Used as the MQTT client's message callback. The payload must be the
    decimal index of an entry in ``BUZZERS``. While the buzzer is active the
    remaining time is shown on the display; afterwards the display is cleared
    and the buzzer is silenced.

    Payloads that are not a valid buzzer index are reported on the console
    and ignored, so a stray message on the topic cannot crash the main loop.

    Args:
        topic: Topic the message arrived on (unused).
        message: Raw payload bytes.
    """
    try:
        # ValueError covers both undecodable bytes (UnicodeError is a
        # subclass) and text that is not an integer.
        index = int(message.decode("utf-8"))
    except ValueError:
        print("Ignoring non-numeric payload: {}".format(message))
        return

    # Reject negative values explicitly: they would otherwise wrap around
    # and silently select a buzzer from the end of the list.
    if not 0 <= index < len(BUZZERS):
        print("Ignoring unknown buzzer index: {}".format(index))
        return

    buzzer = BUZZERS[index]
    duration_ms = 5000
    # time.time() only has whole-second resolution on MicroPython, so the
    # countdown is driven by the millisecond tick counter instead.
    deadline = time.ticks_add(time.ticks_ms(), duration_ms)

    try:
        while True:
            remaining_ms = time.ticks_diff(deadline, time.ticks_ms())
            if remaining_ms <= 0:
                break
            fill_display(remaining_ms / 1000)
            buzzer.duty_u16(512)
            time.sleep(0.1)
            buzzer.duty_u16(0)
        clear_display()
    finally:
        # Never leave the buzzer sounding, even if a display update failed.
        buzzer.duty_u16(0)
# Fill display
def fill_display(time):
    """Show the remaining time on the display.

    Args:
        time: Remaining time as a number; rendered with two decimals.
            Note that this parameter shadows the ``time`` module inside
            this function.
    """
    display.fill(0)

    # (text, y) pairs, drawn top to bottom at x = 0.
    rows = (
        ("Time left:", 0),
        ("{:.2f}".format(time), 30),
    )
    for text, y in rows:
        display.text(text, 0, y)

    display.show()
# Clear display
def clear_display():
    """Blank the display by filling its buffer with 0 and showing it."""
    display.fill(0)  # reset every pixel in the buffer
    display.show()  # send the buffer to the display
# Init MQTT client
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
client.set_callback(on_message)

# Init buzzers
# One PWM output per configured pin, at 400 Hz and starting silent (duty 0).
BUZZERS.extend(
    [PWM(Pin(pin, Pin.OUT), freq=400, duty_u16=0) for pin in BUZZER_PINS]
)

# Init OLED display
i2c = I2C(0, scl=Pin(DISPLAY_SCL_PIN), sda=Pin(DISPLAY_SDA_PIN))
display = ssd1306.SSD1306_I2C(DISPLAY_WIDTH, DISPLAY_HEIGHT, i2c)

# Connect and subscribe only once the buzzers and the display exist:
# on_message uses both, and a message may be delivered to it as soon as the
# subscription is active.
connect_to_mqtt_broker()

# Loop
try:
    while True:
        # Blocks until a message arrives, then dispatches it to on_message.
        client.wait_msg()
finally:
    # Leave the hardware quiet and close the broker connection on exit.
    for buzzer in BUZZERS:
        buzzer.duty_u16(0)
    client.disconnect()