import network
import time
from machine import Pin, PWM
import ujson
from umqtt.simple import MQTTClient
import urandom
import utime
def random_id(n=6):
chars = "abcdefghijklmnopqrstuvwxyz0123456789"
return "".join(chars[urandom.getrandbits(6) % len(chars)] for _ in range(n))
CLIENT_ID = "esp32-" + random_id()
SERVER = "5232a6122a3a401aaeaa51b71372284c.s1.eu.hivemq.cloud"
USER = "Wokwi123"
PASSWORD = "Wokwi123"
topic = "home/sensor/movement" # topico do HiveMQ Cloud
# setup
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
print("Connecting to MQTT server... ", end="")
client = MQTTClient(
client_id=CLIENT_ID,
server=SERVER,
port=8883,
user=USER,
password=PASSWORD,
ssl=True,
ssl_params={"server_hostname": SERVER}
)
btn_pressed = False
command_alarm_off = False
state = 'stoped'
alarm_pattern = 0
last_alarm = 0
first_alarm_message = True
alarm_has_rang = False
last_message = ''
def sub_cb(topic, msg):
print(f"Received message on topic '{topic.decode()}': {msg.decode()}")
if topic.decode() == "home/action/" and msg.decode() == "alarm.off":
if (state == "alarm"):
print("* turning alarm off")
global command_alarm_off
command_alarm_off = True
client.set_callback(sub_cb)
client.connect()
print("Connected!")
client.subscribe(b"home/action/") # actions to the ESP32 do
# end setup
led = Pin(12, Pin.OUT)
btn_stop_alarm = Pin(22, Pin.IN)
motion_sensor = Pin(23, Pin.IN)
buzzer_pin = Pin(15, Pin.OUT)
buzzer = PWM(buzzer_pin)
buzzer.duty(0)
def play_tone(frequency):
buzzer.freq(frequency)
buzzer.duty(512)
# the alarm function
def alarm(pattern: int):
if pattern == 0:
buzzer.duty(0)
led.on()
return 1
play_tone(100)
led.off()
return 0
btn_null_value = btn_stop_alarm.value()
while True: # loop
client.check_msg();
message = ''
# print(motion_sensor.value())
# verify the state of the motion sensor
if motion_sensor.value() == 0:
btn_pressed = False
command_alarm_off = False
state = 'stoped'
else: # if the button was pressed change the state to 'stoped'
if btn_pressed or command_alarm_off:
state = 'stoped'
else:
btn_pressed = False
state = 'alarm'
# = STATE CONTROL ============================================================
# 1. alarm: when the alarm is on; movement was detected by the motion sensor
if state == 'alarm':
message = 'movement detected'
# avoid spam of the same message
if last_message != message:
print(message)
last_message = message
if btn_stop_alarm.value() != btn_null_value:
print('* button pressed')
btn_pressed = True
now = utime.ticks_ms()
# do this code every 500ms
if now - last_alarm >= 500:
if alarm_has_rang == False:
print(f'submitting message to {topic}: Movimento Detectado')
client.publish(topic, "Movimento Detectado")
alarm_has_rang = True
# activate the alarm actions then update the alarm_pattern
alarm_pattern = alarm(alarm_pattern)
last_alarm = now
# 2. stoped: when the alarm is off; motion sensor don't detected any movement
elif state == 'stoped':
message = 'no movement detected'
if last_message != message:
print(message)
last_message = message
if alarm_has_rang == True:
print(f'submitting message to {topic}: Alarme parou')
client.publish(topic, 'Alarme parou')
alarm_has_rang = False
buzzer.duty(0)
led.off()
# time.sleep(1)