"""
Para visualizar os dados e controlar a lixeira:
1. Acesse: http://www.hivemq.com/demos/websocket-client/
2. Clique em "Connect"
3. Em "Subscriptions", clique em "Add New Topic Subscription" e adicione os tópicos:
- "lixeira/status" (Para receber o status da lixeira)
- "lixeira/comandos" (Para enviar comandos para a lixeira, ex: "abrir_tampa")
"""
import network
import time
from machine import Pin, PWM
from umqtt.simple import MQTTClient
import ujson
import dht #lib pro DHT22
MQTT_CLIENT_ID = "andreylindao123lixeira"
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_USER = ""
MQTT_PASSWORD = ""
TOPICO_STATUS_LIXEIRA = "lixeira/status"
TOPICO_COMANDO_LIXEIRA = "lixeira/comandos"
PINO_TRIG = 18
PINO_ECHO = 19
PINO_DHT = 22
PINO_SERVO = 13
PINO_BUZZER = 23
sensor_dht = dht.DHT22(Pin(PINO_DHT))
buzzer = Pin(PINO_BUZZER, Pin.OUT)
# Frequência padrão para servo é 50 Hz
servo_pwm = PWM(Pin(PINO_SERVO))
servo_pwm.freq(50)
#Calibração da Lixeira (ajustar conforme tamanho da lixeira na vida real)
# Distância do sensor ao fundo da lixeira quando VAZIA (cm)
DISTANCIA_LIXEIRA_VAZIA_CM = 20.0
# Distância do sensor ao topo do lixo quando CHEIA (cm)
DISTANCIA_LIXEIRA_CHEIA_CM = 5.0
#Posições do Servo (ajuste se a tampa(lid) não abrir/fechar totalmente) ---
LID_OPEN_DUTY = 90
LID_CLOSED_DUTY = 40
status_tampa = "fechada" # Estado inicial da tampa
status_buzzer = "desligado" # Estado inicial do buzzer
def ler_distancia_cm():
trig_pin = Pin(PINO_TRIG, Pin.OUT)
echo_pin = Pin(PINO_ECHO, Pin.IN)
trig_pin.value(0)
time.sleep_us(2)
trig_pin.value(1)
time.sleep_us(10)
trig_pin.value(0)
timeout_us = 30000
t_start = time.ticks_us()
while echo_pin.value() == 0:
if time.ticks_diff(time.ticks_us(), t_start) > timeout_us:
return -1
t_pulse_start = time.ticks_us()
while echo_pin.value() == 1:
if time.ticks_diff(time.ticks_us(), t_pulse_start) > timeout_us:
return -1
duracao = time.ticks_diff(time.ticks_us(), t_pulse_start)
if duracao == -1:
return -1
distancia_cm = (duracao * 0.0343) / 2
return distancia_cm
def calcular_porcentagem_enchimento(distancia):
if distancia == -1: # Se a leitura da distância falhou
return -1 # Indica erro ou leitura inválida
if distancia >= DISTANCIA_LIXEIRA_VAZIA_CM:
return 0 # Lixeira vazia
elif distancia <= DISTANCIA_LIXEIRA_CHEIA_CM:
return 100 # Lixeira cheia
else:
# Mapeia a distância para uma porcentagem de 0 a 100
# Inverte a lógica: menor distância = mais cheio
range_total = DISTANCIA_LIXEIRA_VAZIA_CM - DISTANCIA_LIXEIRA_CHEIA_CM
if range_total <= 0: # Evita divisão por zero ou range inválido
return 100 # Assume cheia se a calibração tiver inválida
distancia_atual_no_range = distancia - DISTANCIA_LIXEIRA_CHEIA_CM
porcentagem = (1 - (distancia_atual_no_range / range_total)) * 100
return max(0, min(100, int(porcentagem))) # Garante entre 0 e 100
def ler_dht():
temperatura = -1.0
umidade_ar = -1.0
try:
sensor_dht.measure() # Recebendo as medições do sensor
temperatura = sensor_dht.temperature()
umidade_ar = sensor_dht.humidity()
except OSError as e: # Erro de comunicação com o DHT
print(f"Erro ao ler DHT22: {e}")
return temperatura, umidade_ar
def abrir_tampa():
global status_tampa
servo_pwm.duty(LID_OPEN_DUTY)
status_tampa = "aberta"
def fechar_tampa():
global status_tampa
servo_pwm.duty(LID_CLOSED_DUTY)
status_tampa = "fechada"
def tocar_buzzer(duracao_ms=200):
global status_buzzer
if status_buzzer == "desligado": # Evita ligar o buzzer se já estiver ligado
buzzer.value(1) # Liga o buzzer
status_buzzer = "ligado"
time.sleep_ms(duracao_ms)
buzzer.value(0) # Desliga o buzzer
status_buzzer = "desligado"
# pra receber comando
def mqtt_callback(topic, msg):
print(f"Comando MQTT recebido: {topic.decode()} -> {msg.decode()}")
comando = msg.decode()
if comando == "abrir_tampa":
status_tampa = "aberta"
abrir_tampa()
elif comando == "fechar_tampa":
status_tampa = "fechada"
fechar_tampa()
elif comando == "alarme_on":
buzzer.value(1)
global status_buzzer
status_buzzer = "ligado"
elif comando == "alarme_off":
buzzer.value(0)
global status_buzzer
status_buzzer = "desligado"
else:
print("Comando desconhecido.")
#Conexão Wi-Fi
print("Conectando ao WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Conectado!")
#Conexão MQTT
print("Conectando ao servidor MQTT... ", end="")
try:
cliente_mqtt = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD)
cliente_mqtt.set_callback(mqtt_callback) # Define a função de callback para mensagens recebidas
cliente_mqtt.connect()
cliente_mqtt.subscribe(TOPICO_COMANDO_LIXEIRA) # A lixeira assina este tópico para receber comandos
print("Conectado!")
except Exception as e:
print(f"Falha na conexão MQTT: {e}")
time.sleep(5)
machine.reset() # Reinicia o ESP32
#Loop Principal da Lixeira
last_publish_time = 0
PUBLISH_INTERVAL_MS = 5000 # Publica o status a cada 5 segundos
while True:
#Verifica Mensagens MQTT Recebidas (Comandos)
cliente_mqtt.check_msg()
#Leitura e Processamento dos Sensores
distancia = ler_distancia_cm()
porcentagem_enchimento = calcular_porcentagem_enchimento(distancia)
temperatura, umidade_ar = ler_dht()
#Atuadores
#Notificação de Umidade / Líquido:
#Se a umidade estiver muito alta, consideramos que há líquido ou excesso de umidade.
liquido_detectado = False
if umidade_ar != -1.0 and umidade_ar > 85.0: #>85% de umidade no ar indica problema/líquido/o dht22 ta começando no -1 por algum motivo
liquido_detectado = True
# Controle da Tampa (Motor Servo)
if liquido_detectado:
fechar_tampa() # Se líquido, mantenha a tampa fechada
elif porcentagem_enchimento >= 95:
fechar_tampa() # Se lixeira cheia, mantenha a tampa fechada
elif distancia != -1 and distancia < DISTANCIA_LIXEIRA_VAZIA_CM and distancia > (DISTANCIA_LIXEIRA_CHEIA_CM + 3.0):
# Se um objeto é detectado perto do sensor (simulando alguém se aproximando para jogar lixo), parte mais experimental[ideia]
abrir_tampa()
else:
fechar_tampa() # Condição padrão: tampa fechada
# Controle do Buzzer
if liquido_detectado:
tocar_buzzer(500) # Alerta mais longo para detecção de líquido
elif porcentagem_enchimento >= 90:
tocar_buzzer(200) # Alerta curto para lixeira quase cheia
# Publicar Status via MQTT
if (time.ticks_ms() - last_publish_time) > PUBLISH_INTERVAL_MS:
dados_lixeira = {
"distancia": f"{distancia:.1f}cm",
"preenchimento": f"{porcentagem_enchimento}%",
"temperatura": f"{temperatura:.1f}C",
"umidade_ar": f"{umidade_ar:.1f}%",
"tampa": status_tampa,
"alarme": status_buzzer
}
# Converte o dicionário para uma string JSON
mensagem_mqtt_json = ujson.dumps(dados_lixeira)
print(f"Publicando MQTT: {mensagem_mqtt_json}")
cliente_mqtt.publish(TOPICO_STATUS_LIXEIRA, mensagem_mqtt_json.encode('utf-8'))#alfabeto
last_publish_time = time.ticks_ms()
time.sleep(1)