# =============================================================================
# UNIVERSIDAD INTERNACIONAL DE LA RIOJA (UNIR) - MAESTRÍA INDUSTRIA 4.0
# Autor: [Henry] - Versión: 10.1 Final - Fecha: [28/04/2025]
# =============================================================================
# ========== IMPORTACIÓN DE LIBRERÍAS ==========
from machine import Pin, PWM, Timer, reset # Control de pines GPIO y PWM
import dht # Para el sensor DHT22
import network # Gestión de conexión WiFi
from umqtt.simple import MQTTClient # Cliente MQTT para IoT
import time # Control de tiempos y delays
import ujson # Serialización de datos JSON
# ========== CONFIGURACIÓN HARDWARE ==========
SERVO_PIN = 15 # GPIO conectado al servo motor (PWM)
DHT_PIN = 16 # GPIO conectado al sensor DHT22 (1-Wire)
PWM_FREQ = 50 # Frecuencia PWM estándar para servos (50Hz)
MIN_PULSE_US = 500 # Ancho de pulso mínimo para 0° (500μs)
MAX_PULSE_US = 2400 # Ancho de pulso máximo para 180° (2400μs)
PWM_PERIOD_US = 20000 # Período completo PWM (20,000μs = 20ms)
# ========== INICIALIZACIÓN HARDWARE ==========
servo = PWM(Pin(SERVO_PIN), freq=PWM_FREQ) # Configurar PWM para servo
sensor = dht.DHT22(Pin(DHT_PIN)) # Inicializar sensor ambiental
# ========== CONFIGURACIÓN MQTT ==========
MQTT_CLIENT_ID = "unir-industria4.0" # Identificador único del dispositivo
MQTT_BROKER = "broker.mqttdashboard.com" # Servidor MQTT público
MQTT_TOPIC = "unir/iot/clima" # Tópico para publicación de datos
# ========== PARÁMETROS DE CONTROL ==========
UMBRAL_TEMPERATURA = 25.0 # Temperatura base para activación (°C)
HISTERESIS = 1 # Margen de histéresis (±0.5°C)
VELOCIDAD_SUBIDA = 20 # Tiempo por grado al subir (ms/°)
VELOCIDAD_BAJADA = 1 # Tiempo por grado al bajar (ms/°)
TIEMPO_MUESTREO = 0.1 # Intervalo entre lecturas (100ms)
# ========== VARIABLES GLOBALES ==========
servo_activo = False # Estado de operación del servo (True/False)
angulo_actual = 0 # Posición angular actual del servo (0-180)
direccion_movimiento = 1 # Dirección de movimiento actual (1/-1)
ultimo_mensaje = "" # Último mensaje MQTT enviado (para detección de cambios)
ultima_actualizacion = time.ticks_ms() # Marca temporal de última actualización
# ========== FUNCIONES AUXILIARES ==========
def calcular_duty(angulo):
"""Calcula el ciclo de trabajo PWM para una posición angular específica"""
pulso_us = MIN_PULSE_US + (angulo * (MAX_PULSE_US - MIN_PULSE_US) / 180) # Mapeo lineal angular a μs
return int((pulso_us * 65535) / PWM_PERIOD_US) # Conversión a valor de 16 bits
def conectar_wifi():
"""Maneja la conexión WiFi con timeout y reintentos automáticos"""
print("⌛ Conectando a WiFi...", end="") # Mensaje inicial
wifi = network.WLAN(network.STA_IF) # Configurar interfaz en modo estación
wifi.active(True) # Activar la interfaz WiFi
try:
wifi.connect('Wokwi-GUEST', '') # Intentar conexión (SSID/contraseña)
inicio = time.ticks_ms() # Marca temporal inicial
while not wifi.isconnected(): # Bucle de espera de conexión
if time.ticks_diff(time.ticks_ms(), inicio) > 5000: # Timeout de 5 segundos
raise RuntimeError("Timeout de conexión") # Lanzar excepción
print(".", end="") # Indicador de progreso
time.sleep(0.1) # Pequeña pausa entre intentos
print(f"\n✅ Conexión exitosa | IP: {wifi.ifconfig()[0]}") # Mensaje éxito
return True # Retorno exitoso
except Exception as e:
print(f"\n❌ Error: {str(e)}") # Mostrar error
return False # Retorno fallido
# ========== FUNCIONES DE CONTROL ==========
def actualizar_servo(timer):
"""Actualiza la posición del servo de forma no bloqueante"""
global angulo_actual, direccion_movimiento, ultima_actualizacion
if not servo_activo: return # Salir si el servo está inactivo
delta_t = time.ticks_diff(time.ticks_ms(), ultima_actualizacion) # Calcular tiempo transcurrido
velocidad = VELOCIDAD_BAJADA if direccion_movimiento == -1 else VELOCIDAD_SUBIDA # Seleccionar velocidad
if delta_t >= velocidad: # Verificar si debe actualizarse
angulo_actual += direccion_movimiento # Actualizar posición
angulo_actual = max(0, min(180, angulo_actual)) # Limitar rango angular
if angulo_actual >= 180: direccion_movimiento = -1 # Invertir dirección al máximo
elif angulo_actual <= 0: direccion_movimiento = 1 # Invertir dirección al mínimo
servo.duty_u16(calcular_duty(angulo_actual)) # Aplicar nueva posición
ultima_actualizacion = time.ticks_ms() # Actualizar timestamp
def controlar_servo(temp):
"""Controla la activación/desactivación del servo basado en temperatura"""
global servo_activo, ultima_actualizacion
umbral_alto = UMBRAL_TEMPERATURA + (HISTERESIS / 2) # Calcular límite superior
umbral_bajo = UMBRAL_TEMPERATURA - (HISTERESIS / 2) # Calcular límite inferior
if temp > umbral_alto and not servo_activo: # Condición de activación
print(f"🔥 ACTIVACIÓN: {temp:.1f}°C") # Notificar activación
servo_activo = True # Cambiar estado
ultima_actualizacion = time.ticks_ms() # Reiniciar temporización
elif temp < umbral_bajo and servo_activo: # Condición de desactivación
print(f"❄️ DESACTIVACIÓN: {temp:.1f}°C") # Notificar desactivación
servo_activo = False # Cambiar estado
# ========== FUNCIÓN PRINCIPAL ==========
def main():
"""Bucle principal del sistema con gestión de comunicaciones"""
global ultimo_mensaje
if not conectar_wifi(): # Establecer conexión WiFi
print("⚡ Reinicio programado en 5s...") # Mensaje de reinicio
time.sleep(5) # Espera antes de reiniciar
reset() # Reinicio del dispositivo
cliente = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER) # Crear cliente MQTT
cliente.connect() # Conectar al broker
timer = Timer(0) # Usar temporizador hardware 0
timer.init(period=10, mode=Timer.PERIODIC, callback=actualizar_servo) # Configurar cada 10ms
while True: # Bucle principal infinito
try:
# ===== RUTINA DE LECTURA Y PUBLICACIÓN =====
sensor.measure() # Tomar nueva medición del sensor
temp = sensor.temperature() # Obtener temperatura actual
hum = sensor.humidity() # Obtener humedad actual
controlar_servo(temp) # Ejecutar lógica de control
payload = ujson.dumps({ # Crear payload JSON
"temp": temp, # Temperatura actual
"hum": hum, # Humedad relativa
"servo_activo": servo_activo, # Estado del actuador
"angulo": angulo_actual, # Posición angular
"ts": time.ticks_ms() # Marca temporal
})
if payload != ultimo_mensaje: # Detectar cambios en los datos
cliente.publish(MQTT_TOPIC, payload) # Publicar en MQTT
ultimo_mensaje = payload # Actualizar último mensaje
print(f"📤 TX: {payload}") # Confirmar envío
time.sleep(TIEMPO_MUESTREO) # Mantener intervalo de muestreo
except OSError as e: # Manejar errores de comunicación
print(f"⚠️ Error: {str(e)} - Reconectando...") # Notificar error
cliente.connect() # Intentar reconexión
# ========== INICIO DEL PROGRAMA ==========
if __name__ == "__main__":
print("\n⚙️ SISTEMA INDUSTRIAL 4.0 - INICIADO")
print("========================================")
print(" Copyright © 2024 UNIR - Industria 4.0 ")
print("========================================")
main()