from umqtt.simple import MQTTClient
from hcsr04 import HCSR04
import utime
import machine
from machine import Pin, PWM
import network
import micropython
import time
from utime import sleep
class Dispenser:
def __init__(self):
# Configuración de WiFi
self.ssid = 'Wokwi-GUEST'
self.wifipassword = ''
# Configuración de MQTT
self.mqtt_server = 'io.adafruit.com'
self.port = 1883
self.user = 'micacorvera'
self.password = 'aio_jYyt96XcLo12W9mDpFRH3Pgr1D34'
self.client_id = 'Dispenser'
self.topic_1 = 'micacorvera/Feeds/sensor HC-SR04'
self.topic_2 = 'micacorvera/Feeds/servo'
# Inicializar componentes
self.configurar_componentes()
def configurar_componentes(self):
# Configuración de botones
self.boton0 = Pin(0, Pin.IN, Pin.PULL_UP)
self.boton1 = Pin(1, Pin.IN, Pin.PULL_UP)
self.boton2 = Pin(3, Pin.IN, Pin.PULL_UP)
# Configuración del servo
self.servo_pin = machine.PWM(machine.Pin(2))
self.servo_pin.freq(50)
# Configuración del sensor
self.sensor = HCSR04(trigger_pin=4, echo_pin=5)
# Estados iniciales de los botones
self.ultimo_estado_boton0 = self.boton0.value()
self.ultimo_estado_boton1 = self.boton1.value()
self.ultimo_estado_boton2 = self.boton2.value()
# Conectar a WiFi y MQTT
self.configurar_wifi()
self.configurar_mqtt()
def configurar_wifi(self):
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect(self.ssid, self.wifipassword)
print("Conectando a WiFi...")
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print("\nConectado a WiFi")
print(sta_if.ifconfig())
def configurar_mqtt(self):
try:
self.conexionMQTT = MQTTClient(
self.client_id,
self.mqtt_server,
user=self.user,
password=self.password,
port=int(self.port)
)
self.conexionMQTT.set_callback(self.callback_mqtt)
self.conexionMQTT.connect()
self.conexionMQTT.subscribe(self.topic_1)
print("Conectado al broker MQTT")
except OSError as e:
print(f"Error en conexión MQTT: {e}")
time.sleep(5)
machine.reset()
def callback_mqtt(self, topic, msg):
print(f"Mensaje recibido en {topic}: {msg.decode('utf-8')}")
def servo(self, grados):
if grados > 180:
grados = 180
if grados < 0:
grados = 0
maxDuty = 9000
minDuty = 1000
newDuty = minDuty + (maxDuty - minDuty) * (grados / 180)
self.servo_pin.duty_u16(int(newDuty))
def girar_servo(self, tiempo_espera):
for grado in range(0, 181, 1):
self.servo(grado)
sleep(0.01)
sleep(tiempo_espera)
for grado in range(180, -1, -1):
self.servo(grado)
sleep(0.01)
def leer_sensor(self):
print(str(self.sensor.distance_cm()))
return self.sensor.distance_cm()
def publicar_estado(self, boton, accion):
try:
mensaje = {
"boton": boton,
"accion": accion,
"tiempo": time.time()
}
self.conexionMQTT.publish(self.topic_2, json.dumps(mensaje))
except OSError as e:
print(f"Error al publicar estado: {e}")
time.sleep(5)
machine.reset()
def ejecutar_accion(self, boton, tiempo_espera):
# Primero mover el servo
self.girar_servo(tiempo_espera)
# Luego publicar el estado (no bloqueante)
try:
self.publicar_estado(boton, "ejecutado")
except OSError as e:
print(f"Error al publicar estado: {e}")
# Continuar ejecución aunque falle la publicación
def iniciar(self):
print("Sistema iniciado. Presiona Ctrl+C para detener.")
try:
while True:
# Leer estados actuales de los botones
estado_actual_boton0 = self.boton0.value()
estado_actual_boton1 = self.boton1.value()
estado_actual_boton2 = self.boton2.value()
# Detectar cambios en los botones
if estado_actual_boton0 != self.ultimo_estado_boton0:
if estado_actual_boton0 == 0: # Botón presionado
self.ejecutar_accion("boton0", 1)
if estado_actual_boton1 != self.ultimo_estado_boton1:
if estado_actual_boton1 == 0: # Botón presionado
self.ejecutar_accion("boton1", 2)
if estado_actual_boton2 != self.ultimo_estado_boton2:
if estado_actual_boton2 == 0: # Botón presionado
self.ejecutar_accion("boton2", 3)
# Actualizar estados anteriores
self.ultimo_estado_boton0 = estado_actual_boton0
self.ultimo_estado_boton1 = estado_actual_boton1
self.ultimo_estado_boton2 = estado_actual_boton2
# Leer sensor y publicar datos
distancia = self.leer_sensor()
self.conexionMQTT.publish(self.topic_1, f"{distancia}")
# Verificar mensajes MQTT
try:
self.conexionMQTT.check_msg()
except OSError as e:
print(f"Error en check_msg: {e}")
time.sleep(1)
self.configurar_mqtt()
sleep(0.1)
except KeyboardInterrupt:
print("\nSistema detenido por el usuario")
except Exception as e:
print(f"Error inesperado: {e}")
machine.reset()
# Iniciar el sistema
dispenser = Dispenser()
dispenser.iniciar()