import network
import time
from umqtt.simple import MQTTClient
import ubinascii
import machine
import json
from machine import Timer
import esp32
# Configuración
CONFIG = {
'WIFI': {
'NETWORKS': [
{
'SSID': "Wokwi-GUEST",
'PASSWORD': "",
'IS_WOKWI': True
},
{
'SSID': "HEISENBERG",
'PASSWORD': "c0l0n14560",
'IS_WOKWI': False
}
],
'RETRY_LIMIT': 3,
'RETRY_DELAY': 5,
'CONNECTION_TIMEOUT': 10
},
'MQTT': {
'BROKER': "test.mosquitto.org",
'PORT': 1883,
'KEEP_ALIVE': 30,
'PREFIX': "fenix",
'RECONNECT_DELAY': 5
},
'PINS': {
'LED': 14,
'BUZZER': 12,
'PIR': 13
},
'INTERVALS': {
'PIR_CHECK': 2000, # ms
'TEMP_CHECK': 30000 # ms
}
}
class DeviceController:
def __init__(self):
# Inicialización de pines
self.led = machine.Pin(CONFIG['PINS']['LED'], machine.Pin.OUT)
self.buzzer = machine.Pin(CONFIG['PINS']['BUZZER'], machine.Pin.OUT)
self.pir = machine.Pin(CONFIG['PINS']['PIR'], machine.Pin.IN)
# Estado del dispositivo
self.device_state = {
'led': False,
'buzzer': False,
'movement': False,
'temperature': 0,
'last_movement': 0,
'wifi_strength': 0,
'is_wokwi': False
}
# Temporizadores
self.init_timers()
# Inicialización de topics MQTT
self.topics = {
'led': f"{CONFIG['MQTT']['PREFIX']}/led",
'buzzer': f"{CONFIG['MQTT']['PREFIX']}/buzzer",
'movement': f"{CONFIG['MQTT']['PREFIX']}/movement",
'status': f"{CONFIG['MQTT']['PREFIX']}/status",
'temperature': f"{CONFIG['MQTT']['PREFIX']}/temperature"
}
self.mqtt_client = None
self.wifi = network.WLAN(network.STA_IF)
def init_timers(self):
# Timer para revisar el sensor PIR
self.pir_timer = Timer(0)
self.pir_timer.init(period=CONFIG['INTERVALS']['PIR_CHECK'],
mode=Timer.PERIODIC,
callback=lambda t: self.check_movement())
# Timer para revisar la temperatura
self.temp_timer = Timer(1)
self.temp_timer.init(period=CONFIG['INTERVALS']['TEMP_CHECK'],
mode=Timer.PERIODIC,
callback=lambda t: self.check_temperature())
def connect_wifi(self):
print("Iniciando conexión WiFi...")
if not self.wifi.active():
self.wifi.active(True)
# Intentar conectar a cada red configurada
for network_config in CONFIG['WIFI']['NETWORKS']:
try:
print(f"Intentando conectar a {network_config['SSID']}", end="")
self.wifi.connect(network_config['SSID'], network_config['PASSWORD'])
# Contador para timeout
timeout_counter = 0
while not self.wifi.isconnected():
print(".", end="")
time.sleep(0.1)
timeout_counter += 0.1
if timeout_counter >= CONFIG['WIFI']['CONNECTION_TIMEOUT']:
print("\nTimeout de conexión")
break
if self.wifi.isconnected():
print("\n¡Conectado!")
self.device_state['is_wokwi'] = network_config['IS_WOKWI']
print(f"Conexión {'Wokwi' if network_config['IS_WOKWI'] else 'Real'} establecida")
print('Configuración IP:', self.wifi.ifconfig())
self.update_wifi_strength()
return True
except Exception as e:
print(f"\nError conectando a {network_config['SSID']}: {e}")
continue
raise Exception('No se pudo conectar a ninguna red WiFi configurada')
def update_wifi_strength(self):
if self.wifi.isconnected():
self.device_state['wifi_strength'] = self.wifi.status('rssi')
def mqtt_connect(self):
# Usar un ID único basado en si estamos en Wokwi o hardware real
prefix = "wokwi" if self.device_state['is_wokwi'] else "hw"
client_id = f"{prefix}-{ubinascii.hexlify(machine.unique_id()).decode()}"
self.mqtt_client = MQTTClient(
client_id=client_id,
server=CONFIG['MQTT']['BROKER'],
port=CONFIG['MQTT']['PORT'],
keepalive=CONFIG['MQTT']['KEEP_ALIVE']
)
self.mqtt_client.set_callback(self.on_mqtt_message)
self.mqtt_client.connect()
# Suscribirse a todos los topics relevantes
for topic in [self.topics['led'], self.topics['buzzer']]:
self.mqtt_client.subscribe(topic.encode())
# Publicar mensaje de conexión exitosa
self.publish_status("online")
print(f"Conectado a MQTT como {client_id}")
def on_mqtt_message(self, topic, msg):
try:
topic = topic.decode()
msg = msg.decode()
print(f"Mensaje recibido: {topic} -> {msg}")
if topic == self.topics['led']:
self.handle_led_command(msg)
elif topic == self.topics['buzzer']:
self.handle_buzzer_command(msg)
# Publicar estado actualizado
self.publish_status()
except Exception as e:
print(f"Error procesando mensaje: {e}")
def handle_led_command(self, msg):
if msg == "on":
self.led.on()
self.device_state['led'] = True
print("LED encendido")
elif msg == "off":
self.led.off()
self.device_state['led'] = False
print("LED apagado")
def handle_buzzer_command(self, msg):
if msg == "on":
self.buzzer.on()
self.device_state['buzzer'] = True
print("Buzzer encendido")
elif msg == "off":
self.buzzer.off()
self.device_state['buzzer'] = False
print("Buzzer apagado")
def check_movement(self):
movement = bool(self.pir.value())
if movement != self.device_state['movement']:
self.device_state['movement'] = movement
self.device_state['last_movement'] = time.time()
self.publish_movement()
print(f"{'Movimiento detectado' if movement else 'No hay movimiento'}")
def check_temperature(self):
temp = esp32.raw_temperature()
self.device_state['temperature'] = temp
self.publish_temperature()
print(f"Temperatura: {temp}°C")
def publish_movement(self):
if self.mqtt_client:
message = json.dumps({
'movement': self.device_state['movement'],
'timestamp': self.device_state['last_movement']
})
self.mqtt_client.publish(self.topics['movement'].encode(), message.encode())
def publish_temperature(self):
if self.mqtt_client:
message = json.dumps({
'temperature': self.device_state['temperature'],
'timestamp': time.time()
})
self.mqtt_client.publish(self.topics['temperature'].encode(), message.encode())
def publish_status(self, status="running"):
if self.mqtt_client:
self.update_wifi_strength()
message = json.dumps({
'status': status,
'state': self.device_state,
'timestamp': time.time()
})
self.mqtt_client.publish(self.topics['status'].encode(), message.encode())
def run(self):
while True:
try:
self.connect_wifi()
self.mqtt_connect()
while True:
if not self.wifi.isconnected():
raise Exception("Conexión WiFi perdida")
self.mqtt_client.check_msg()
time.sleep(0.1)
except Exception as e:
print(f"Error en ejecución: {e}")
self.publish_status("error")
# Intenta reconectarse
time.sleep(CONFIG['MQTT']['RECONNECT_DELAY'])
continue
def main():
controller = DeviceController()
controller.run()
if __name__ == "__main__":
main()