import network
import time
from machine import Pin, PWM
import ujson
from umqtt.simple import MQTTClient
import urandom
import utime
# CONFIGURATION
# ===================================================================
def random_id(n=6):
    """Return a random identifier made of *n* lowercase letters and digits.

    Characters are drawn with rejection sampling: a 6-bit random value
    (0..63) is only accepted when it indexes the 36-character alphabet.
    Reducing the value with ``%`` instead would make the first 28 characters
    twice as likely as the last 8.

    A non-positive *n* yields an empty string.
    """
    chars = "abcdefghijklmnopqrstuvwxyz0123456789"
    picked = []
    while len(picked) < n:
        index = urandom.getrandbits(6)
        if index < len(chars):  # discard 36..63 to keep the draw uniform
            picked.append(chars[index])
    return "".join(picked)
# MQTT client id: the fixed prefix "esp32-" followed by the suffix returned
# by random_id().
CLIENT_ID = "esp32-" + random_id()
# Broker host; also passed as the TLS server_hostname when the client is built.
SERVER = "5232a6122a3a401aaeaa51b71372284c.s1.eu.hivemq.cloud"
# NOTE(review): broker credentials are hard-coded in source — consider loading
# them from a configuration file that is kept out of version control.
USER = "Wokwi123"
PASSWORD = "Wokwi123"
# Topic that send_message() publishes status text to.
TOPIC = "home/sensor/movement"
# Topic subscribed for remote commands; sub_cb() compares incoming topics to it.
TOPIC_ACTION = "home/action/"
# WIFI
# ===================================================================
def _connect_wifi(ssid, password):
    """Bring up the station interface and block until it has joined *ssid*.

    Prints one dot every 0.1 s while waiting and returns the connected
    WLAN object.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, password)
    while True:
        if wlan.isconnected():
            return wlan
        print(".", end="")
        time.sleep(0.1)


print("Connecting to WiFi", end="")
sta_if = _connect_wifi('Wokwi-GUEST', '')
print(" Connected!")
# MQTT
# ===================================================================
def sub_cb(topic, msg):
    """Handle one incoming MQTT message (registered via client.set_callback).

    Both arguments are decoded to text and logged. Messages on TOPIC_ACTION
    are then interpreted as commands:

    * "turn-on" / "turn-off" set state["is_on"].
    * "alarm.off" sets state["command_alarm_off"], but only while
      state["mode"] is "alarm".

    Anything else is ignored after being logged.

    NOTE(review): ``state`` is defined further down the file than the
    connect/subscribe calls; this callback must not run before that
    definition has executed.
    """
    topic_text = topic.decode()
    payload = msg.decode()
    print(f"[MQTT] {topic_text}: {payload}")

    # Only the action topic carries commands.
    if topic_text != TOPIC_ACTION:
        return

    if payload == "turn-on":
        print("* turning system on")
        state["is_on"] = True
    elif payload == "turn-off":
        print("* turning system off")
        state["is_on"] = False
    elif payload == "alarm.off" and state["mode"] == "alarm":
        print("* turning alarm off")
        state["command_alarm_off"] = True
def _open_mqtt_session():
    """Build the MQTT client, connect over TLS and subscribe to TOPIC_ACTION.

    sub_cb is registered as the message callback before connecting.
    Returns the connected client.
    """
    tls_options = {"server_hostname": SERVER}
    session = MQTTClient(
        client_id=CLIENT_ID,
        server=SERVER,
        port=8883,
        user=USER,
        password=PASSWORD,
        ssl=True,
        ssl_params=tls_options,
    )
    session.set_callback(sub_cb)
    session.connect()
    session.subscribe(TOPIC_ACTION.encode())
    return session


client = _open_mqtt_session()
print("Connected to MQTT!")
def send_message(msg: str):
    """Log *msg* to the console and publish it on TOPIC.

    Topic and payload are sent as UTF-8 bytes, the same way the subscribe
    call encodes its topic. The MQTT client sizes the packet from the length
    of what it is given, so handing it text containing non-ASCII characters
    would declare a length in characters while more bytes go on the wire.

    msg: text to publish; a bytes-like payload is passed through unchanged.
    """
    print(f"submitting message to {TOPIC}: {msg}")
    payload = msg.encode() if isinstance(msg, str) else msg
    client.publish(TOPIC.encode(), payload)
# HARDWARE
# ==================================================================
# Alarm indicator LED (GPIO 12): switched by alarm() and turned off by the
# main loop whenever the alarm is stopped.
led = Pin(12, Pin.OUT)
# Power indicator LED (GPIO 14): the main loop switches it together with
# state["is_on"] and reads it back to detect on/off transitions.
led_on = Pin(14, Pin.OUT)
btn_stop_alarm = Pin(22, Pin.IN, Pin.PULL_UP)  # active low; acknowledges a ringing alarm
btn_turn_on = Pin(16, Pin.IN, Pin.PULL_UP)  # active low; toggles the system on/off
# Motion sensor input (GPIO 23): the main loop treats 0 as "no movement".
motion_sensor = Pin(23, Pin.IN)
# Buzzer driven by PWM on GPIO 15; duty 0 is what the rest of the script
# uses to keep it quiet, so it starts silent.
buzzer_pin = Pin(15, Pin.OUT)
buzzer = PWM(buzzer_pin)
buzzer.duty(0)
def play_tone(frequency, duty=512):
    """Sound the buzzer at *frequency*.

    frequency: value passed to buzzer.freq().
    duty: value passed to buzzer.duty(); defaults to 512, the level that was
        previously hard-coded, so existing callers behave as before.
    """
    buzzer.freq(frequency)
    buzzer.duty(duty)
def alarm(pattern: int):
    """Perform one step of the alternating alarm pattern.

    pattern 0: buzzer silent, alarm LED on.
    any other pattern: buzzer plays a 100 tone, alarm LED off.

    Returns the pattern to use for the next step (1 after step 0,
    otherwise 0), so feeding the result back in alternates the two phases.
    """
    quiet_phase = pattern == 0
    if quiet_phase:
        buzzer.duty(0)
        led.on()
    else:
        play_tone(100)
        led.off()
    return 1 if quiet_phase else 0
# SYSTEM STATE
# =================================================================
# Shared mutable state, read and written by the main loop and by sub_cb().
state = dict(
    # Set when the stop button is pressed during an alarm.
    btn_pressed=False,
    # Set by sub_cb() when an "alarm.off" command arrives during an alarm.
    command_alarm_off=False,
    # Current mode of the main loop: "stopped" or "alarm".
    mode="stopped",
    # Phase handed to alarm(); replaced by the value alarm() returns.
    alarm_pattern=0,
    # utime.ticks_ms() reading taken at the last alarm() step.
    last_alarm=0,
    # True once "Movimento Detectado" has been published for the current alarm.
    alarm_has_rang=False,
    # Last status line printed by the main loop, used to skip repeats.
    last_message="",
    # Whether the system is switched on (button toggle or MQTT command).
    is_on=False,
)
# MAIN LOOP
# =================================================================
# Milliseconds between two steps of the buzzer/LED alarm pattern.
ALARM_STEP_MS = 500


def _log_status(message):
    """Print *message* unless it is empty or repeats the last printed line."""
    if message and state["last_message"] != message:
        print(message)
        state["last_message"] = message


def _silence_alarm():
    """Stop the buzzer and switch the alarm LED off."""
    buzzer.duty(0)
    led.off()


# Main loop: poll MQTT commands, the two buttons and the motion sensor, then
# drive the buzzer/LED and publish status changes.
while True:
    client.check_msg()  # dispatches any pending MQTT command to sub_cb()

    # Green button (active low) toggles the system on/off.
    # NOTE(review): holding the button longer than the debounce delay toggles
    # again on the next pass.
    if btn_turn_on.value() == 0:
        state["is_on"] = not state["is_on"]
        _log_status("* toggle power (on/off)")
        time.sleep(0.2)  # simple debounce

    if not state["is_on"]:
        if led_on.value() == 1:  # power LED still lit: we have just been switched off
            _log_status("* turning off (loop)")
            send_message("Alarme desligado")
            led_on.off()
            # Switching off must also end a ringing alarm; otherwise the
            # buzzer would keep sounding while the system is off.
            _silence_alarm()
            state["mode"] = "stopped"
            state["alarm_has_rang"] = False
            state["btn_pressed"] = False
            state["command_alarm_off"] = False
        continue

    if led_on.value() == 0:  # power LED still dark: we have just been switched on
        send_message("Alarme ligado")
        led_on.on()

    # Motion selects the mode. An acknowledgement (stop button or MQTT
    # "alarm.off") keeps the alarm stopped until the motion itself ends,
    # at which point both acknowledgements are cleared.
    if motion_sensor.value() == 0:
        state["btn_pressed"] = False
        state["command_alarm_off"] = False
        state["mode"] = "stopped"
    elif state["btn_pressed"] or state["command_alarm_off"]:
        state["mode"] = "stopped"
    else:
        state["mode"] = "alarm"

    if state["mode"] == "alarm":
        if btn_stop_alarm.value() == 0:  # blue button (active low)
            print("* stop button pressed")
            state["btn_pressed"] = True
            time.sleep(0.2)  # simple debounce
            continue

        now = utime.ticks_ms()
        # ticks_ms() wraps around, so elapsed time is taken with ticks_diff()
        # rather than plain subtraction. A new alarm starts immediately so a
        # stale last_alarm timestamp can never delay it.
        first_step = not state["alarm_has_rang"]
        elapsed = utime.ticks_diff(now, state["last_alarm"])
        if first_step or elapsed >= ALARM_STEP_MS:
            if first_step:
                send_message("Movimento Detectado")
                state["alarm_has_rang"] = True
            state["alarm_pattern"] = alarm(state["alarm_pattern"])
            state["last_alarm"] = now
        _log_status("movement detected")
    else:
        if state["alarm_has_rang"]:
            send_message("Alarme parou")
            state["alarm_has_rang"] = False
        _silence_alarm()
        _log_status("no movement detected")