from machine import Pin, I2C, PWM
import ssd1306
import json
import network
import time
from umqtt.simple import MQTTClient
# --- Display: SSD1306 OLED on I2C bus 0 (SCL on GPIO 22, SDA on GPIO 21) ---
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled_width, oled_height = 128, 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)

# --- MQTT: client identity, broker host and the topic carrying buzz requests ---
MQTT_CLIENT_ID = "buzzer-module"
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_BUZZER_TOPIC = "elsys/12/a/buzzer/buzz"
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)

# --- Buzzer outputs, looked up by the "button" number of an incoming message.
# Every pin is configured as an output and starts low.
pins = [Pin(gpio, Pin.OUT, value=0) for gpio in (5, 17, 16, 4)]
def connect_to_wifi(ssid='Wokwi-GUEST', password=''):
    """Activate the Wi-Fi station interface and block until it is connected.

    Progress is printed to the console: a dot every 0.1 s while waiting,
    then " Connected!" once the link is up.

    Args:
        ssid: Name of the network to join. Defaults to the previously
            hard-coded 'Wokwi-GUEST'.
        password: Network password. Defaults to the previously hard-coded
            empty string.

    Note:
        There is no timeout; the call waits for as long as the interface
        reports that it is not connected.
    """
    print("Connecting to Wi-Fi", end="")
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(True)
    # Only start a new association when the link is actually down. An
    # interface that is already connected is reused as-is instead of being
    # asked to connect again (same guard as the MicroPython network recipe).
    if not sta_if.isconnected():
        sta_if.connect(ssid, password)
        while not sta_if.isconnected():
            print(".", end="")
            time.sleep(0.1)
    print(" Connected!")
def connect_to_mqtt_broker():
    """Join Wi-Fi, connect the MQTT client and subscribe to the buzzer topic.

    Uses the module-level ``client`` and ``MQTT_BUZZER_TOPIC`` and reports
    each step on the console.
    """
    # The network link has to be up before the MQTT connection is attempted.
    connect_to_wifi()

    print("Connecting to MQTT server... ", end="")
    client.connect()
    print("Connected!")

    buzzer_topic = MQTT_BUZZER_TOPIC
    client.subscribe(buzzer_topic)
    print("Subscribed to {}".format(buzzer_topic))
def buzz(topic, message):
    """MQTT callback: sound one buzzer for a 5-second OLED countdown.

    The payload is expected to be a JSON object whose "button" entry is an
    index into the module-level ``pins`` list, e.g. ``{"button": 2}``.
    The selected pin is driven with PWM while the OLED counts down from
    5 to 1, one step per second.

    Payloads that cannot be decoded, are not valid JSON, lack a "button"
    entry, or name an index outside ``0 .. len(pins) - 1`` are reported on
    the console and ignored, so one bad message cannot stop the caller's
    receive loop.

    Args:
        topic: Topic the message arrived on, as bytes.
        message: Raw message payload, as bytes.
    """
    try:
        topic = topic.decode("utf-8")
        message = message.decode("utf-8")
    except ValueError:
        # UnicodeError is a ValueError subclass.
        print("Ignoring buzz message that is not valid UTF-8")
        return
    print(message)
    print("Received from MQTT topic: {} - message: {}".format(topic, message))

    # Anyone can publish to the topic, so validate the payload before it is
    # used as a list index.
    try:
        button = json.loads(message)["button"]
    except (ValueError, KeyError, TypeError):
        print("Ignoring buzz message without a usable \"button\" field")
        return
    # Negative values are rejected too: they would silently wrap around to
    # the end of the pin list.
    if not isinstance(button, int) or not 0 <= button < len(pins):
        print("Ignoring buzz message with invalid button: {}".format(button))
        return

    buzzer = PWM(pins[button], 100)
    try:
        buzzer.duty(100)
        for i in range(5, 0, -1):
            oled.text(str(i), 10, 10)
            oled.show()
            time.sleep(1)
            # Clear the buffer so the next digit is not drawn over this one.
            oled.fill(0)
    finally:
        # Always silence the buzzer and release the PWM, even when a
        # display call fails in the middle of the countdown.
        buzzer.duty(0)
        buzzer.deinit()
def _show_waiting():
    """Draw the idle prompt into the OLED buffer and push it to the panel."""
    oled.text("Waiting...", 10, 10)
    oled.show()


# Register the message handler first, then bring up Wi-Fi/MQTT and subscribe.
client.set_callback(buzz)
connect_to_mqtt_broker()
_show_waiting()

try:
    while True:
        # Clear only the frame buffer; nothing is pushed to the panel here,
        # so the handler starts drawing on an empty buffer.
        oled.fill(0)
        # Wait for the next MQTT message; it is handed to buzz().
        client.wait_msg()
        _show_waiting()
        time.sleep(5)
finally:
    # Close the MQTT session however the loop ends.
    client.disconnect()