# #############################################################################
# ** Proyecto : 601 Programación Concurrente
# ** Plataforma : ESP32 / WROOM
# ** Objetivo : Demostración didáctica de programación concurrente
# ** Para alumnos : Principiantes en programación de sistemas embebidos
# **
# ** Conceptos : Concurrencia, Hilos, Sincronización, LEDs
# **
# ** By : Jorge Anzaldo
# ** contact : [email protected]
# #############################################################################
"""
=======================================================
INTRODUCCIÓN A LA CONCURRENCIA
=======================================================
¿Qué es la concurrencia?
Es la capacidad de un programa para ejecutar múltiples tareas
APARENTEMENTE al mismo tiempo. En sistemas con un solo núcleo
(como el ESP32), en realidad se intercalan las tareas rápidamente.
IMPORTANTE: No es lo mismo que paralelismo (ejecución real
simultánea en múltiples núcleos).
"""
# ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
# : MÓDULOS NECESARIOS :
# ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
import _thread # Para crear hilos (tareas concurrentes)
import time # Para controlar tiempos de espera
from machine import Pin # Para controlar pines GPIO del ESP32
# =============================================================================
# CLASE LED
# =============================================================================
class Led:
def __init__(self, pin=2, nombre="LED"):
"""
Constructor: Inicializa el LED
Args:
pin (int): Número del pin GPIO (pin 2 = LED interno del ESP32)
nombre (str): Nombre descriptivo para identificar el LED
"""
self.pin = Pin(pin, Pin.OUT) # Configuramos el pin como SALIDA
self.nombre = nombre
self.estado = False # Comienza apagado
print(f"✅ LED '{nombre}' inicializado en pin {pin}")
def encender(self):
self.pin.value(1)
self.estado = True
print(f"💡 {self.nombre}: ENCENDIDO")
def apagar(self):
self.pin.value(0)
self.estado = False
print(f"🌑 {self.nombre}: APAGADO")
def toggle(self):
"""
Alterna el estado del LED
Si estaba encendido → lo apaga
Si estaba apagado → lo enciende
"""
if self.estado:
self.apagar()
else:
self.encender()
def parpadeo(self, tiempo_encendido=1, tiempo_apagado=1):
self.encender()
time.sleep(tiempo_encendido)
self.apagar()
time.sleep(tiempo_apagado)
# =============================================================================
# CLASE RGB LED
# =============================================================================
class RGBLed:
"""
Controla un LED RGB (tres colores en uno)
ANALOGÍA: PENSEMOS EN UN SEMÁFORO
- Rojo, Verde, Azul como las luces del semáforo
- Combinando intensidades creamos diferentes colores
"""
def __init__(self, pin_r=16, pin_g=17, pin_b=18):
"""Inicializa los tres LEDs de colores"""
self.rojo = Pin(pin_r, Pin.OUT)
self.verde = Pin(pin_g, Pin.OUT)
self.azul = Pin(pin_b, Pin.OUT)
# Diccionario de colores predefinidos (R, G, B)
self.colores = {
0: (0, 0, 0), # Apagado
1: (1, 0, 0), # Rojo
2: (0, 1, 0), # Verde
3: (0, 0, 1), # Azul
4: (1, 1, 0), # Amarillo
5: (1, 0, 1), # Magenta
6: (0, 1, 1), # Cian
7: (1, 1, 1), # Blanco
8: (1, 0.5, 0) # Naranja
}
def establecer_color(self, r, g, b):
"""Establece el color con intensidades específicas (0-1)"""
self.rojo.value(int(r))
self.verde.value(int(g))
self.azul.value(int(b))
def ciclo_colores(self, delay=0.5):
"""
Recorre todos los colores disponibles
EJEMPLO DE CONCURRENCIA:
Esta función podría estar en un hilo mientras otros
hilos hacen otras tareas con diferentes LEDs
"""
for i in range(9):
print(f"🎨 Cambiando a color {i}: {self.colores[i]}")
self.establecer_color(*self.colores[i])
time.sleep(delay)
# =============================================================================
# FUNCIONES DE HILOS (TAREAS)
# =============================================================================
def tarea_led_interno():
"""
HILO 1: Controla el LED INTERNO del ESP32
CARACTERÍSTICAS DE UN HILO:
- Se ejecuta INDEPENDIENTEMENTE de otros hilos
- Tiene su propio flujo de ejecución
- Comparte recursos (memoria) con otros hilos
"""
print("🟢 HILO 1 INICIADO: LED Interno")
led = Led(2, "LED Interno") # Pin 2 = LED interno del ESP32
contador = 0
while True:
contador += 1
print(f"📊 Hilo1 - Ciclo: {contador}")
led.parpadeo(0.2, 0.2) # Parpadeo rápido
def tarea_led_externo():
"""
HILO 2: Controla un LED EXTERNO conectado al pin 4
IMAGINEMOS QUE:
Este hilo representa una alarma que parpadea lentamente
mientras el sistema hace otras cosas
"""
print("🔵 HILO 2 INICIADO: LED Externo")
led = Led(4, "LED Externo (Alarma)")
contador = 0
while True:
contador += 1
print(f"📊 Hilo2 - Ciclo: {contador}")
led.parpadeo(0.5, 1.5) # Parpadeo lento (como alarma)
def tarea_led_rgb():
"""
HILO 3: Controla un LED RGB
EJEMPLO DE APLICACIÓN REAL:
Podría ser un indicador de estado del sistema
que cambia de color según condiciones
"""
print("🌈 HILO 3 INICIADO: LED RGB")
rgb = RGBLed()
contador = 0
while True:
contador += 1
print(f"📊 Hilo3 - Ciclo: {contador}")
rgb.ciclo_colores(0.3) # Ciclo rápido de colores
# =============================================================================
# DEMOSTRACIÓN DE PROBLEMAS DE CONCURRENCIA
# =============================================================================
def demostracion_problema_concurrencia():
"""
MUESTRA UN PROBLEMA COMÚN: Condición de carrera
¿QUÉ ES UNA CONDICIÓN DE CARRERA?
Ocurre cuando dos hilos acceden al mismo recurso
simultáneamente y el resultado depende del orden
de ejecución (que no podemos controlar).
"""
print("\n" + "="*60)
print("DEMOSTRACIÓN: Problema de condición de carrera")
print("="*60)
recurso_compartido = {"contador": 0}
def hilo_a():
for _ in range(1000):
valor_actual = recurso_compartido["contador"]
# Simulamos una interrupción aquí
time.sleep(0.001) # Pequeña pausa
recurso_compartido["contador"] = valor_actual + 1
def hilo_b():
for _ in range(1000):
valor_actual = recurso_compartido["contador"]
# Otra posible interrupción
time.sleep(0.001)
recurso_compartido["contador"] = valor_actual + 1
# Ejecutamos ambos hilos
_thread.start_new_thread(hilo_a, ())
_thread.start_new_thread(hilo_b, ())
time.sleep(3)
print(f"\n🔍 Resultado esperado: 2000")
print(f"🔍 Resultado obtenido: {recurso_compartido['contador']}")
print("⚠️ ¡El resultado es IMPREDECIBLE! (condición de carrera)")
return recurso_compartido["contador"]
# =============================================================================
# PROGRAMA PRINCIPAL
# =============================================================================
if __name__ == '__main__':
"""
PUNTO DE ENTRADA DEL PROGRAMA
Aquí es donde todo comienza. Observa cómo:
1. Creamos múltiples hilos
2. Cada hilo ejecuta su función independientemente
3. Todos parecen funcionar "al mismo tiempo"
"""
print("\n" + "="*60)
print("INICIANDO DEMOSTRACIÓN DE CONCURRENCIA")
print("="*60)
print("\n🎯 OBJETIVO: Mostrar cómo múltiples tareas")
print(" pueden ejecutarse concurrentemente")
print("\n" + "-"*40)
print("CREANDO HILOS (TAREAS CONCURRENTES)...")
print("-"*40)
# Crear hilos (tareas concurrentes)
try:
_thread.start_new_thread(tarea_led_interno, ())
print("✅ Hilo 1 creado")
time.sleep(0.5) # Pequeña pausa para ver los mensajes
_thread.start_new_thread(tarea_led_externo, ())
print("✅ Hilo 2 creado")
time.sleep(0.5)
_thread.start_new_thread(tarea_led_rgb, ())
print("✅ Hilo 3 creado")
except Exception as e:
print(f"❌ Error al crear hilos: {e}")
print("\n" + "-"*40)
print("🎭 LOS 3 HILOS ESTÁN EJECUTÁNDOSE CONCURRENTEMENTE")
print("-"*40)
print("""
╔══════════════════════════════════════════════╗
║ EJECUCIÓN CONCURRENTE ║
║ ║
║ Hilo 1: LED Interno ║
║ Hilo 2: LED Externo ║
║ Hilo 3: LED RGB ║
║ ║
║ Todos parpadean a DIFERENTES ritmos ║
║ pero APARENTEMENTE al mismo tiempo ║
╚══════════════════════════════════════════════╝
""")
# Mantener el programa principal activo
print("\n⏳ Programa principal en ejecución...")
print(" (Presiona Ctrl+C en Thonny para detener)")
print("\n📈 Observa en la consola cómo se intercalan")
print(" los mensajes de los diferentes hilos")
try:
# El programa principal también puede hacer algo
contador_principal = 0
while True:
contador_principal += 1
print(f"\n🏠 PROGRAMA PRINCIPAL - Ciclo: {contador_principal}")
print(" (Mientras tanto, los hilos siguen trabajando...)")
time.sleep(5) # Espera más larga para no saturar la consola
except KeyboardInterrupt:
print("\n\n🛑 Programa interrumpido por el usuario")
print("✨ Fin de la demostración de concurrencia")