import network
import time
from machine import Pin, PWM
import dht
import ujson
from umqtt.simple import MQTTClient
# NOTE: Wi-Fi configuration
# Credentials passed to WLAN.connect() by conecta_wifi().
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""  # empty password (presumably an open network; confirm for real hardware)
def conecta_wifi():
    """Bring up the station interface and block until it is connected.

    Uses WIFI_SSID / WIFI_PASSWORD and prints one progress dot per second
    while waiting. There is no timeout: the function only returns once the
    interface reports that it is connected.
    """
    print("Conectando dispositivo ao Wi-Fi", end="")
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(WIFI_SSID, WIFI_PASSWORD)
    # Poll once per second until the link is up.
    while True:
        if wlan.isconnected():
            break
        print(".", end="")
        time.sleep(1)
    print("Conectado à rede Wi-Fi")
# Join the Wi-Fi network at import time; this call blocks until connected.
conecta_wifi()
# NOTE: MQTT configuration
MQTT_CLIENT_ID = "wokwi001"
MQTT_BROKER = "broker.mqttdashboard.com"
# MQTT_BROKER = "test.mosquitto.org" # NOTE: alternative broker in case the one above does not work
MQTT_USER = ""
MQTT_PASSWORD = ""
# Topic the main loop publishes the JSON telemetry to.
MQTT_TEMPERATURA_TOPIC = "topico/temperatura"
# Topic subscribed to right after the first broker connection (LED commands).
MQTT_COMANDOS_TOPIC = "topico/comandos"
# Topic subscribed to only by mqtt_connect().
MQTT_CONTROL_TOPIC = "topico/control"
# NOTE: Helper that opens the MQTT session
def conecta_mqtt():
    """Create an MQTT client from the module settings and try to connect.

    Returns the connected client, or None when the connection attempt
    raises (the error is printed instead of being propagated).
    """
    mqtt = MQTTClient(
        MQTT_CLIENT_ID,
        MQTT_BROKER,
        user=MQTT_USER,
        password=MQTT_PASSWORD,
    )
    try:
        mqtt.connect()
    except Exception as erro:
        print("Falha na conexão com o broker MQTT:", erro)
        return None
    print("Conectado ao broker MQTT")
    return mqtt
# NOTE: MQTT message callback
def mqtt_callback(topic, msg):
    """Handle a message delivered by the MQTT client.

    The payload is decoded to text and interpreted as a command:
    "ligar_led" switches the LED on and "desligar_led" switches it off.
    Any other payload is only logged.

    :param topic: Topic the message arrived on, as passed by the client.
    :param msg: Raw message payload; must support ``.decode()``.
    """
    comando = msg.decode()
    # The log line used to reference an undefined name `message`, which
    # raised NameError on every delivery before the command was evaluated.
    print(f"\n\nDados Recebidos! \ntópico = {topic}, mensagem = {msg}, comando = {comando}")
    if comando == "ligar_led":
        # The pin object is created at module level as `led` (lowercase).
        led.on()
        print("🟢 LED LIGADO")
    elif comando == "desligar_led":
        led.off()
        print("🔴 LED DESLIGADO")
# NOTE: Open the MQTT session; when it succeeds, register the message
# handler and subscribe to the command topic.
client = conecta_mqtt()
if client:
    client.set_callback(mqtt_callback)
    client.subscribe(MQTT_COMANDOS_TOPIC)
# NOTE: Sensor and actuator configuration
DHT_PIN = 18 # GPIO 18: DHT22 data line
LED_PIN = 19 # GPIO 19: LED
BUZZER_PIN = 4 # GPIO 4 for the buzzer
# NOTE: Initialise components
dht_sensor = dht.DHT22(Pin(DHT_PIN))
led = Pin(LED_PIN, Pin.OUT)
buzzer = PWM(Pin(BUZZER_PIN)) # Initialize PWM for the buzzer
# NOTE: Initial states
led.off()
buzzer.duty(0) # Make sure the buzzer starts silent
# NOTE: Connect to the MQTT broker and set up the callback and subscription
def mqtt_connect():
    """Open an MQTT session subscribed to MQTT_CONTROL_TOPIC.

    The message callback is registered before connecting and the
    subscription is made right after. Unlike conecta_mqtt(), errors raised
    while connecting or subscribing are not caught here and propagate to
    the caller.

    :return: The connected MQTTClient instance.
    """
    print("Connecting to MQTT broker ...", end="")
    # This function used to reference MQTT_CLIENT and did_receive_callback,
    # which are not defined in the visible script; the names the rest of the
    # script defines are MQTT_CLIENT_ID and mqtt_callback.
    # NOTE(review): this reuses the client id of conecta_mqtt(); brokers
    # normally keep only one session per client id - confirm before using
    # both connections at the same time.
    mqtt_client = MQTTClient(
        MQTT_CLIENT_ID,
        MQTT_BROKER,
        user=MQTT_USER,
        password=MQTT_PASSWORD,
    )
    mqtt_client.set_callback(mqtt_callback)
    mqtt_client.connect()
    print("Connected.")
    mqtt_client.subscribe(MQTT_CONTROL_TOPIC)
    return mqtt_client
# NOTE: Publish data to the MQTT broker
def mqtt_client_publish(topic, data, mqtt_client=None):
    """Publish ``data`` on ``topic`` and echo the payload to the console.

    :param topic: Topic to publish to.
    :param data: Payload handed to the client's ``publish()``.
    :param mqtt_client: Client to publish with. When omitted, the
        module-level ``client`` is used (the function previously read a
        global named ``mqtt_client`` that the script never defines).
    :return: None. Nothing is published when no client is available.
    """
    if mqtt_client is None:
        mqtt_client = client
    if mqtt_client is None:
        # conecta_mqtt() leaves the client as None when the broker is
        # unreachable; skip instead of failing with AttributeError.
        print("\nMQTT client unavailable; skipping publish.")
        return
    print("\nUpdating MQTT Broker...")
    mqtt_client.publish(topic, data)
    print(data)
# NOTE: Play a tone on the buzzer
def activate_buzzer_tone(frequency=1000, duration=1, volume=512):
    """
    Play a tone on the buzzer, blocking for the whole duration.

    :param frequency: Frequency of the tone in Hz (default: 1000 Hz)
    :param duration: Duration of the tone in seconds (default: 1 second)
    :param volume: Duty value passed to ``buzzer.duty()`` (default: 512,
        i.e. 50% of the 0-1023 range used on the ESP32)
    """
    buzzer.freq(frequency)  # Set the frequency
    buzzer.duty(volume)  # Start the tone at the requested duty cycle
    try:
        time.sleep(duration)  # Keep the tone active for the duration
    finally:
        # Always silence the buzzer, even if the sleep is interrupted
        # (e.g. Ctrl-C from the REPL); otherwise the tone keeps playing.
        buzzer.duty(0)
# NOTE: Make sure there is an MQTT session before entering the main loop.
# The client created earlier in the script already has the callback and the
# command subscription, so it is reused. Reconnecting unconditionally (as this
# section used to do, twice) replaced it with a session that had neither.
# A new connection is attempted only when the first one failed.
if not client:
    client = conecta_mqtt()
    if client:
        client.set_callback(mqtt_callback)
        client.subscribe(MQTT_COMANDOS_TOPIC)

# NOTE: Main loop - read the sensor, drive the actuators, publish telemetry
while True:
    mensagem = None
    try:
        # NOTE: Read the DHT22 sensor
        dht_sensor.measure()
        dht_temperature = dht_sensor.temperature()  # DHT22 temperature
        dht_humidity = dht_sensor.humidity()  # DHT22 humidity
        print(f"Temperatura: {dht_temperature}°C, Umidade: {dht_humidity}%")

        # NOTE: LED control based on the DHT22 temperature
        if dht_temperature >= 40:
            led_status = "🟢LED ON"
            led.on()
        else:
            led_status = "🔴LED OFF"
            led.off()

        # NOTE: Buzzer control based on the DHT22 humidity.
        # machine.PWM has no on()/off() methods; calling them raised
        # AttributeError on every iteration, so nothing was ever published.
        # The tone helper and duty(0) are all that is needed.
        if dht_humidity > 80 and dht_temperature < 25:
            buzzer_status = "🟢BUZZER ON"
            activate_buzzer_tone(frequency=2500, duration=1)  # Play tone
            time.sleep(5)
        else:
            buzzer_status = "🔴BUZZER OFF"
            buzzer.duty(0)  # Make sure the buzzer is silent
            time.sleep(1)

        # NOTE: Build the telemetry payload
        mensagem = ujson.dumps({
            "temperatura": dht_temperature,
            "umidade": dht_humidity,
            "buzzer_status": buzzer_status,
            "led_status": led_status,
        })
    except Exception as e:
        print("Error reading sensor:", e)

    # NOTE: MQTT traffic is handled separately so broker failures are not
    # reported as sensor errors.
    if client:
        try:
            if mensagem is not None:
                client.publish(MQTT_TEMPERATURA_TOPIC, mensagem)
                print("Dados de temperatura enviados ao servidor MQTT.")
            # Poll for incoming messages; without this the registered
            # callback is never invoked and LED commands are ignored.
            # Commands are therefore handled once per loop iteration.
            client.check_msg()
        except Exception as e:
            print("MQTT error:", e)

    time.sleep(5)  # Wait 5 seconds before the next reading
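# CHANGED: everything below is commented out and never executed; it appears to
# be an earlier version of this script kept for reference only.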
# from machine import Pin
# from umqtt.simple import MQTTClient
# import ujson
# import network
# import utime as time
# import dht
# # Configuração do dispositivo
# DEVICE_ID = "wokwi001"
# # Configuração do WiFi
# WIFI_SSID = "Wokwi-GUEST"
# WIFI_PASSWORD = ""
# # Configuração MQTT
# # MQTT_BROKER = "broker.mqttdashboard.com"
# MQTT_BROKER = "test.mosquitto.org"
# MQTT_CLIENT = DEVICE_ID
# MQTT_TELEMETRY_TOPIC = "iot/telemetry" # Tópico para envio de telemetria
# MQTT_CONTROL_TOPIC = "iot/control" # Tópico para controle do dispositivo
# # Configuração dos sensores e atuadores
# DHT_PIN = Pin(18)
# BUZZER = Pin(5, Pin.OUT)
# RED_LED = Pin(19, Pin.OUT)
# dht_sensor = dht.DHT22(DHT_PIN)
# # Estado inicial dos atuadores
# led_status = "OFF"
# buzzer_status = "OFF"
# # Conecte-se ao WiFi
# wifi_client = network.WLAN(network.STA_IF)
# wifi_client.active(True)
# print("Connecting device to WiFi...")
# wifi_client.connect(WIFI_SSID, WIFI_PASSWORD)
# while not wifi_client.isconnected():
# print("Connecting")
# time.sleep(0.1)
# print("WiFi Connected!")
# # Função de callback para recebimento de mensagens MQTT
# def did_receive_callback(topic, message):
# print(f'\n\nData Received! \ntopic = {topic}, message = {message}')
# # Controle do LED via MQTT
# if topic == MQTT_CONTROL_TOPIC.encode():
# if message == ('{0}/lamp/on'.format(DEVICE_ID)).encode() or message == b'lamp/on':
# RED_LED.on()
# global led_status
# led_status = "ON"
# elif message == ('{0}/lamp/off'.format(DEVICE_ID)).encode() or message == b'lamp/off':
# RED_LED.off()
# led_status = "OFF"
# elif message == ('{0}/status'.format(DEVICE_ID)).encode() or message == b'status':
# mqtt_client_publish(MQTT_TELEMETRY_TOPIC, telemetry_data_old)
# # Conecte-se ao broker MQTT e configure a subscrição e callback
# def mqtt_connect():
# print("Connecting to MQTT broker ...", end="")
# mqtt_client = MQTTClient(MQTT_CLIENT, MQTT_BROKER, user="", password="")
# mqtt_client.set_callback(did_receive_callback)
# mqtt_client.connect()
# print("Connected.")
# mqtt_client.subscribe(MQTT_CONTROL_TOPIC)
# return mqtt_client
# # Função para criar o JSON de telemetria
# def create_json_data(dht_temperature, dht_humidity, led_status, buzzer_status):
# data = ujson.dumps({
# "device_id": DEVICE_ID,
# "dht_temperature": dht_temperature,
# "dht_humidity": dht_humidity,
# "led_status": led_status,
# "buzzer_status": buzzer_status
# })
# return data
# # Função para publicar dados no MQTT
# def mqtt_client_publish(topic, data):
# print("\nUpdating MQTT Broker...")
# mqtt_client.publish(topic, data)
# print(data)
# # Conectar ao MQTT
# mqtt_client = mqtt_connect()
# RED_LED.off() # Inicializa o LED desligado
# BUZZER.off() # Inicializa o Buzzer desligado
# mqtt_client_publish(MQTT_CONTROL_TOPIC, 'lamp/off') # Publica estado inicial do LED
# # Variáveis de telemetria
# telemetry_data_old = None
# # Loop principal
# while True:
# try:
# # Leitura do sensor DHT22
# dht_sensor.measure()
# dht_temperature = dht_sensor.temperature()
# dht_humidity = dht_sensor.humidity()
# # Controle do LED com base na temperatura do DHT22
# if dht_temperature > 40:
# RED_LED.on()
# led_status = "ON"
# else:
# RED_LED.off()
# led_status = "OFF"
# # Controle do Buzzer com base na umidade do DHT22
# if dht_humidity > 80 and dht_temperature < 25:
# time.sleep(1)
# BUZZER.on()
# buzzer_status = "ON"
# else:
# BUZZER.off()
# buzzer_status = "OFF"
# # Criação e envio dos dados de telemetria
# telemetry_data_new = create_json_data(dht_temperature, dht_humidity, led_status, buzzer_status)
# # Envia telemetria se houver atualização
# if telemetry_data_new != telemetry_data_old:
# mqtt_client_publish(MQTT_TELEMETRY_TOPIC, telemetry_data_new)
# telemetry_data_old = telemetry_data_new
# # Checa mensagens MQTT
# mqtt_client.check_msg()
# except Exception as e:
# print("Error reading sensor:", e)
# time.sleep(5) # Aguarde 5 segundos antes da próxima leitura