# main.py
# =============================================================================
# ESP32 + Botones + LDR + Potenciómetro + PWM + DHT + Terminal ANSI
# + LED RGB térmico + Buzzer pasivo + WiFi + NTP + Provisioning + MQTT
# + ThingsBoard
#
# Versión definitiva sin OLED
# -----------------------------------------------------------------------------
# Archivos requeridos en la placa / proyecto:
# - main.py
# - ansi_terminal_dashboard.py
# - passive_buzzer.py
# - rgb_led.py
#
# Librerías MicroPython requeridas:
# - dht
# - network
# - ntptime
# - umqtt.simple
# - ujson
# - ubinascii
# =============================================================================
# =============================================================================
# IMPORTS
# =============================================================================
import network
import time
import math
import dht
import ujson
import ubinascii
import machine
import ntptime
import os
import ansi_terminal_dashboard as atd
import passive_buzzer as buz
import rgb_led as rgb
from machine import Pin, ADC, PWM
from umqtt.simple import MQTTClient
# =============================================================================
# 1) CONFIGURACIÓN GENERAL
# =============================================================================
MODE = "WOKWI" # "WOKWI" | "PHYSICAL"
if MODE == "WOKWI":
SSID = "Wokwi-GUEST"
PASSWORD = ""
else:
SSID = "WILLEDTECH"
PASSWORD = "Aula3186"
STATION_PREFIX = "WS-BGA-"
START_ID = "WS-BGA-000" # Reemplaza XXX por
MAX_TRIES = 10000
CITY = "Bucaramanga"
LATITUDE = 7.116 # Revisa la latitud y la longitud de la ubicación que quieres que aparezca.
LONGITUDE = -73.110
TZ_OFFSET = -5 * 3600
NTP_RESYNC_MS = 6 * 60 * 60 * 1000
# En True borra token e ID guardados y fuerza un nuevo provisioning.
# Úsalo solo cuando quieras registrar la placa como un nuevo dispositivo.
FORCE_REPROVISION = False
# Para Wokwi normalmente funciona con False si el RGB es cátodo común.
# Si en el montaje físico el RGB es ánodo común, cambiar a True.
RGB_COMMON_ANODE = True
# Jitter opcional para evitar que muchas placas publiquen al mismo tiempo.
STARTUP_JITTER_MAX_MS = 3000
# =============================================================================
# 2) CONFIGURACIÓN THINGSBOARD / MQTT
# =============================================================================
TB_HOST = "thingsboard.apps.willedtech.io"
TB_MQTT_PORT = 1883
PROVISION_DEVICE_KEY = "3ku06ikwiwhyh2djz0ga"
PROVISION_DEVICE_SECRET = "yluue4yaypmvyitu7c26"
TOKEN_FILE = "tb_token.txt"
STATION_FILE = "tb_station_id.txt"
TOPIC_TELEMETRY = b"v1/devices/me/telemetry"
TOPIC_ATTRS = b"v1/devices/me/attributes"
TOPIC_RPC_REQ = b"v1/devices/me/rpc/request/+"
PROVISION_REQUEST_TOPIC = b"/provision/request"
PROVISION_RESPONSE_TOPIC = b"/provision/response"
# =============================================================================
# 3) UMBRALES TÉRMICOS
# =============================================================================
TEMP_TOO_LOW_MAX = 18.0
TEMP_NORMAL_MIN = 24.0
TEMP_NORMAL_MAX = 28.0
# Interpretación:
# temp < TEMP_TOO_LOW_MAX -> VERY_LOW -> RGB magenta + alarma grave
# TEMP_TOO_LOW_MAX <= temp < NORMAL_MIN -> LOW -> RGB verde + alerta suave
# TEMP_NORMAL_MIN <= temp <= NORMAL_MAX -> NORMAL -> RGB amarillo+ melodía suave
# temp > TEMP_NORMAL_MAX -> HIGH -> RGB rojo + alerta aguda
# sin dato -> NO_DATA -> RGB azul + silencio
# =============================================================================
# 4) MAPA DE PINES
# =============================================================================
BTN_PULLUP_GPIO = 14 # Botón A -> activo en 0
BTN_PULLDOWN_GPIO = 13 # Botón B -> activo en 1
LED_ROJO_GPIO = 23
LED_VERDE_GPIO = 22
LED_PWM_GPIO = 21
LDR_ADC_GPIO = 34
POT_ADC_GPIO = 35
DHT_GPIO = 15
BUZZER_GPIO = 18
RGB_R_GPIO = 25
RGB_G_GPIO = 26
RGB_B_GPIO = 27
# =============================================================================
# 5) INTERVALOS
# =============================================================================
PRINT_INTERVAL_MS = 500
DHT_INTERVAL_MS = 2000
LDR_INTERVAL_MS = 1500
PUBLISH_INTERVAL_MS = 2000
LDR_AVG_SAMPLES = 10
# =============================================================================
# 6) HARDWARE
# =============================================================================
btn_pullup_pin = Pin(BTN_PULLUP_GPIO, Pin.IN, Pin.PULL_UP)
btn_pulldn_pin = Pin(BTN_PULLDOWN_GPIO, Pin.IN, Pin.PULL_DOWN)
ldr = ADC(Pin(LDR_ADC_GPIO))
pot = ADC(Pin(POT_ADC_GPIO))
ldr.atten(ADC.ATTN_11DB)
pot.atten(ADC.ATTN_11DB)
ldr.width(ADC.WIDTH_12BIT)
pot.width(ADC.WIDTH_12BIT)
led_rojo = Pin(LED_ROJO_GPIO, Pin.OUT)
led_verde = Pin(LED_VERDE_GPIO, Pin.OUT)
led_pwm = PWM(Pin(LED_PWM_GPIO), freq=1000)
led_pwm.duty(0)
if MODE == "PHYSICAL":
sensor_dht = dht.DHT11(Pin(DHT_GPIO))
else:
sensor_dht = dht.DHT22(Pin(DHT_GPIO))
buzzer = buz.PassiveBuzzer(BUZZER_GPIO)
thermal_rgb = rgb.ThermalRgbLed(
r_gpio=RGB_R_GPIO,
g_gpio=RGB_G_GPIO,
b_gpio=RGB_B_GPIO,
common_anode=RGB_COMMON_ANODE
)
terminal_dash = atd.AnsiTerminalDashboard(
width=70,
ldr_avg_samples=LDR_AVG_SAMPLES
)
# =============================================================================
# 7) VARIABLES DE ESTADO
# =============================================================================
temperatura = None
humedad = None
v_up = 1
v_dn = 0
ldr_raw = 0
pot_raw = 0
ldr_pct = 0
pot_pct = 0
ldr_pct_avg = 0
duty_pwm = 0
lux = 0.0
led_rojo_state = 0
led_verde_state = 0
station_id = "WS-BOOT"
wifi_ip = "-"
wifi_ok = False
mqtt_ok = False
page_index = 0
page_count = 1
last_publish_reason = "boot"
startup_jitter_ms = 0
temp_state = rgb.STATE_NO_DATA
rgb_color_name = "AZUL"
sound_profile_name = "OFF"
# =============================================================================
# 8) TIEMPO / NTP
# =============================================================================
def sync_time_ntp(max_tries=3):
for _ in range(max_tries):
try:
ntptime.host = "pool.ntp.org"
ntptime.settime()
return True
except Exception as e:
print("NTP error:", e)
time.sleep(1)
return False
def year_is_valid():
try:
return time.localtime()[0] >= 2022
except Exception:
return False
def get_local_time_str():
try:
if year_is_valid():
lt = time.localtime(time.time() + TZ_OFFSET)
return "{:02d}:{:02d}".format(lt[3], lt[4])
except Exception:
pass
return "--:--"
# =============================================================================
# 9) UTILIDADES GENERALES
# =============================================================================
def adc_to_pct(raw_value):
return int((raw_value * 100) / 4095)
def adc_to_pwm(raw_value):
return int((raw_value * 1023) / 4095)
def pwm_to_pct(duty_value):
return int((duty_value * 100) / 1023)
def classify_temperature_state(temp_value):
"""
Clasifica la temperatura usando los umbrales definidos en main.py.
Esta función evita depender de umbrales dentro de rgb_led.py.
"""
try:
t = float(temp_value)
except Exception:
return rgb.STATE_NO_DATA
if t < TEMP_TOO_LOW_MAX:
return rgb.STATE_VERY_LOW
if t < TEMP_NORMAL_MIN:
return rgb.STATE_LOW
if t <= TEMP_NORMAL_MAX:
return rgb.STATE_NORMAL
return rgb.STATE_HIGH
# =============================================================================
# 10) LUX DESDE LDR
# =============================================================================
R_FIXED = 10_000
rl10 = 50e3
gamma = 0.7
points = [
(0.1, 1.25e6),
(1, 250e3),
(10, 50e3),
(50, 16.2e3),
(100, 9.98e3),
(400, 3.78e3),
(1000, 1.99e3),
(10000, 397),
(100000, 79),
]
def calculate_resistance_from_raw(raw_value):
value = raw_value
if value <= 0:
value = 1
if value >= 4095:
value = 4094
return R_FIXED * (value / (4095 - value))
def calculate_lux(resistance):
return 10 * math.pow(rl10 / resistance, 1 / gamma)
def linear_interpolation(resistance):
if resistance >= 1.25e6:
return 0.1
if resistance <= 79:
return 100000
for i in range(len(points) - 1):
lux1, r1 = points[i]
lux2, r2 = points[i + 1]
if r2 <= resistance <= r1:
return lux1 + ((resistance - r1) / (r2 - r1)) * (lux2 - lux1)
return calculate_lux(resistance)
def calculate_lux_from_raw(raw_value):
r = calculate_resistance_from_raw(raw_value)
return linear_interpolation(r)
# =============================================================================
# 11) DHT SEGURO
# =============================================================================
def read_dht_safe(max_tries=3):
for _ in range(max_tries):
try:
sensor_dht.measure()
t = float(sensor_dht.temperature())
h = float(sensor_dht.humidity())
if not (0.0 <= t <= 80.0):
raise ValueError("Temp fuera de rango")
if not (0.0 <= h <= 100.0):
raise ValueError("Hum fuera de rango")
return t, h
except Exception:
time.sleep_ms(250)
return None, None
# =============================================================================
# 12) SALIDAS TÉRMICAS: RGB + BUZZER
# =============================================================================
def update_temperature_outputs():
global temp_state, rgb_color_name, sound_profile_name
new_state = classify_temperature_state(temperatura)
temp_state = new_state
thermal_rgb.apply_state(temp_state)
rgb_color_name = thermal_rgb.get_color_name()
# Los nombres de estado coinciden entre rgb_led.py y passive_buzzer.py.
buzzer.set_profile(temp_state)
sound_profile_name = buzzer.get_profile_name()
# =============================================================================
# 13) DASHBOARD ANSI
# =============================================================================
def render_terminal_dashboard(now_ms, last_publish_ms):
heartbeat_s = terminal_dash.heartbeat_remaining_s(
now_ms,
last_publish_ms,
PUBLISH_INTERVAL_MS
)
terminal_dash.render({
"station_id": station_id,
"hora": get_local_time_str(),
"wifi_ok": wifi_ok,
"wifi_ip": wifi_ip,
"mqtt_ok": mqtt_ok,
"page_index": page_index,
"page_count": page_count,
"last_publish_reason": last_publish_reason,
"temperatura": temperatura,
"humedad": humedad,
"ldr_raw": ldr_raw,
"ldr_pct_avg": ldr_pct_avg,
"lux": lux,
"pot_pct": pot_pct,
"duty_pwm_pct": pwm_to_pct(duty_pwm),
"heartbeat_s": heartbeat_s,
"btn_a": (v_up == 0),
"btn_b": (v_dn == 1),
"led_verde": led_verde_state,
"led_rojo": led_rojo_state,
"startup_jitter_ms": startup_jitter_ms
})
print(
"\nEstado temp:", temp_state,
"| RGB:", rgb_color_name,
"| Sonido:", sound_profile_name,
"| Umbrales:",
"TL<{}".format(TEMP_TOO_LOW_MAX),
"N:{}-{}".format(TEMP_NORMAL_MIN, TEMP_NORMAL_MAX),
end=""
)
# =============================================================================
# 14) WIFI
# =============================================================================
def connect_wifi(timeout_s=20):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("Conectando WiFi:", SSID)
wlan.connect(SSID, PASSWORD)
t0 = time.time()
while not wlan.isconnected():
if time.time() - t0 > timeout_s:
raise RuntimeError("WiFi timeout")
time.sleep(0.5)
ip = wlan.ifconfig()[0]
print("WiFi OK. IP:", ip)
return ip
def ensure_wifi():
global wifi_ok, wifi_ip
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if wlan.isconnected():
wifi_ok = True
wifi_ip = wlan.ifconfig()[0]
return True
wifi_ip = connect_wifi()
wifi_ok = True
return True
except Exception as e:
wifi_ok = False
wifi_ip = "-"
print("WiFi error:", e)
return False
# =============================================================================
# 15) PERSISTENCIA
# =============================================================================
def load_token():
try:
with open(TOKEN_FILE, "r") as f:
t = f.read().strip()
return t if t else None
except Exception:
return None
def save_token(token):
with open(TOKEN_FILE, "w") as f:
f.write(token)
def load_station_id():
try:
with open(STATION_FILE, "r") as f:
s = f.read().strip()
return s if s else None
except Exception:
return None
def save_station_id(sid):
with open(STATION_FILE, "w") as f:
f.write(sid)
def clear_saved_credentials():
try:
os.remove(TOKEN_FILE)
except Exception:
pass
try:
os.remove(STATION_FILE)
except Exception:
pass
# =============================================================================
# 16) PROVISIONING THINGSBOARD
# =============================================================================
def parse_suffix(sid):
try:
return int(str(sid).split("-")[-1])
except Exception:
return 0
def make_station_id(prefix, n):
return "{}{:03d}".format(prefix, n)
def provision_get_token(device_name, timeout_s=15):
result = {"status": None, "token": None, "error": None}
def cb(topic, msg):
try:
data = ujson.loads(msg)
status = data.get("status") or data.get("provisionDeviceStatus")
result["status"] = status
token = (
data.get("accessToken")
or data.get("credentialsValue")
or data.get("credentials")
)
if status == "SUCCESS" and token:
result["token"] = token
result["error"] = (
data.get("errorMsg")
or data.get("error")
or data.get("message")
)
except Exception as e:
result["status"] = "FAILURE"
result["error"] = "Invalid provisioning response: {}".format(e)
c = None
try:
c = MQTTClient(
client_id=b"prov-" + ubinascii.hexlify(machine.unique_id()),
server=TB_HOST,
port=TB_MQTT_PORT,
user="provision",
password="",
keepalive=30
)
c.set_callback(cb)
c.connect()
c.subscribe(PROVISION_RESPONSE_TOPIC)
req = {
"deviceName": device_name,
"provisionDeviceKey": PROVISION_DEVICE_KEY,
"provisionDeviceSecret": PROVISION_DEVICE_SECRET
}
c.publish(PROVISION_REQUEST_TOPIC, ujson.dumps(req).encode())
t0 = time.time()
while result["status"] is None and (time.time() - t0) < timeout_s:
c.check_msg()
time.sleep(0.2)
except Exception as e:
result["status"] = "FAILURE"
result["error"] = str(e)
finally:
try:
if c is not None:
c.disconnect()
except Exception:
pass
if result["status"] is None:
result["status"] = "FAILURE"
result["error"] = "Provision timeout"
return result
def is_fatal_provision_error(err):
"""
Solo errores realmente fatales deben detener el barrido de IDs.
Errores como ID ocupado o dispositivo ya existente permiten probar el siguiente.
"""
if not err:
return False
e = str(err).lower()
non_fatal_hints = [
"already exists",
"exist",
"duplicate",
"already registered",
"device name",
"name"
]
for h in non_fatal_hints:
if h in e:
return False
fatal_hints = [
"secret",
"key",
"credential",
"unauthorized",
"forbidden",
"bad request",
"not allowed",
"dns",
"network",
"refused",
"unreachable"
]
for h in fatal_hints:
if h in e:
return True
return False
def provision_find_first_available(prefix, start_id, max_tries=50):
"""
Busca el primer ID disponible.
Si un intento falla y no es fatal, continúa con el siguiente ID.
"""
start_n = parse_suffix(start_id)
for n in range(start_n, start_n + max_tries):
candidate = make_station_id(prefix, n)
print("Provision attempt:", candidate)
prov = provision_get_token(candidate)
if prov.get("status") == "SUCCESS" and prov.get("token"):
print("Provision SUCCESS:", candidate)
return candidate, prov["token"]
err = prov.get("error") or prov.get("status") or "FAILURE"
print("Provision FAIL:", candidate, err)
if is_fatal_provision_error(err):
print("Provision fatal error:", err)
return None, None
time.sleep_ms(350)
return None, None
# =============================================================================
# 17) MQTT / THINGSBOARD
# =============================================================================
def mqtt_callback(topic, msg):
print("MQTT RX:", topic, msg)
def connect_tb(access_token, sid):
client_id = (sid + "-").encode() + ubinascii.hexlify(machine.unique_id())
client = MQTTClient(
client_id=client_id,
server=TB_HOST,
port=TB_MQTT_PORT,
user=access_token,
password="",
keepalive=60
)
client.set_callback(mqtt_callback)
client.connect()
client.subscribe(TOPIC_RPC_REQ)
print("Conectado a TB MQTT:", TB_HOST, TB_MQTT_PORT)
return client
def publish_attrs(client, sid):
attrs = {
"station_id": sid,
"city": CITY,
"mode": MODE,
"latitude": LATITUDE,
"longitude": LONGITUDE,
"temp_too_low_max": TEMP_TOO_LOW_MAX,
"temp_normal_min": TEMP_NORMAL_MIN,
"temp_normal_max": TEMP_NORMAL_MAX,
"rgb_common_anode": RGB_COMMON_ANODE,
"btn_pullup_gpio": BTN_PULLUP_GPIO,
"btn_pulldown_gpio": BTN_PULLDOWN_GPIO,
"led_rojo_gpio": LED_ROJO_GPIO,
"led_verde_gpio": LED_VERDE_GPIO,
"led_pwm_gpio": LED_PWM_GPIO,
"ldr_gpio": LDR_ADC_GPIO,
"pot_gpio": POT_ADC_GPIO,
"dht_gpio": DHT_GPIO,
"buzzer_gpio": BUZZER_GPIO,
"rgb_r_gpio": RGB_R_GPIO,
"rgb_g_gpio": RGB_G_GPIO,
"rgb_b_gpio": RGB_B_GPIO
}
client.publish(TOPIC_ATTRS, ujson.dumps(attrs).encode())
def publish_telemetry(client):
telemetry = {
"station_id": station_id,
"local_time": get_local_time_str(),
"btn_up_raw": v_up,
"btn_down_raw": v_dn,
"btn_a_pressed": (v_up == 0),
"btn_b_pressed": (v_dn == 1),
"led_green": led_verde_state,
"led_red": led_rojo_state,
"ldr_raw": ldr_raw,
"ldr_pct": ldr_pct,
"ldr_pct_avg": ldr_pct_avg,
"pot_raw": pot_raw,
"pot_pct": pot_pct,
"pwm": duty_pwm,
"pwm_pct": pwm_to_pct(duty_pwm),
"lux": round(float(lux), 2),
"temperature_state": temp_state,
"rgb_color": rgb_color_name,
"sound_profile": sound_profile_name,
"wifi_ok": wifi_ok,
"mqtt_ok": mqtt_ok
}
if temperatura is not None:
telemetry["temperature"] = round(float(temperatura), 2)
if humedad is not None:
telemetry["humidity"] = round(float(humedad), 2)
client.publish(TOPIC_TELEMETRY, ujson.dumps(telemetry).encode())
# =============================================================================
# 18) MAIN
# =============================================================================
def main():
global temperatura, humedad
global v_up, v_dn
global ldr_raw, pot_raw, ldr_pct, pot_pct, ldr_pct_avg, duty_pwm, lux
global led_rojo_state, led_verde_state
global station_id, wifi_ip, wifi_ok, mqtt_ok
global last_publish_reason, startup_jitter_ms
terminal_dash.init()
print("Iniciando estación ambiental IoT...")
print("Modo actual:", MODE)
print("Dashboard: Terminal ANSI")
print("OLED: No usado")
print("RGB_COMMON_ANODE:", RGB_COMMON_ANODE)
thermal_rgb.apply_state(rgb.STATE_NO_DATA)
buzzer.play_boot_melody()
startup_jitter_ms = terminal_dash.get_jitter_ms(STARTUP_JITTER_MAX_MS)
if startup_jitter_ms > 0:
print("Startup jitter:", startup_jitter_ms, "ms")
time.sleep_ms(startup_jitter_ms)
if FORCE_REPROVISION:
print("FORCE_REPROVISION activo: borrando credenciales guardadas")
clear_saved_credentials()
# -------------------------------------------------------------------------
# WiFi inicial
# -------------------------------------------------------------------------
while not ensure_wifi():
now = time.ticks_ms()
render_terminal_dashboard(now, 0)
time.sleep(3)
# -------------------------------------------------------------------------
# NTP inicial
# -------------------------------------------------------------------------
if sync_time_ntp():
print("Hora OK: NTP sincronizado")
else:
print("Hora NO: se continúa sin NTP válido")
# -------------------------------------------------------------------------
# Token / provisioning
# -------------------------------------------------------------------------
token = load_token()
loaded_station_id = load_station_id()
if token and loaded_station_id:
station_id = loaded_station_id
print("Credenciales existentes:", station_id)
else:
print("Sin token guardado. Buscando primer ID disponible...")
sid, new_token = provision_find_first_available(
prefix=STATION_PREFIX,
start_id=START_ID,
max_tries=MAX_TRIES
)
if not new_token:
raise RuntimeError("No se pudo provisionar: sin token")
token = new_token
station_id = sid
save_token(token)
save_station_id(station_id)
print("Guardado:", station_id, "(token + station_id)")
client = None
last_print = time.ticks_ms()
last_dht = time.ticks_ms()
last_ldr = time.ticks_ms()
last_pub = 0
last_ntp = time.ticks_ms()
print("Iniciando lectura local + nube + terminal ANSI...")
while True:
try:
now = time.ticks_ms()
# -----------------------------------------------------------------
# WiFi
# -----------------------------------------------------------------
if not ensure_wifi():
mqtt_ok = False
time.sleep(3)
continue
# -----------------------------------------------------------------
# MQTT / ThingsBoard
# -----------------------------------------------------------------
if client is None:
client = connect_tb(token, station_id)
mqtt_ok = True
publish_attrs(client, station_id)
last_publish_reason = "attrs"
client.check_msg()
# -----------------------------------------------------------------
# LECTURA DE BOTONES
# -----------------------------------------------------------------
v_up = btn_pullup_pin.value()
v_dn = btn_pulldn_pin.value()
# -----------------------------------------------------------------
# ENTRADAS ANALÓGICAS
# -----------------------------------------------------------------
ldr_raw = ldr.read()
pot_raw = pot.read()
ldr_pct = adc_to_pct(ldr_raw)
pot_pct = adc_to_pct(pot_raw)
ldr_pct_avg = terminal_dash.update_ldr_average(ldr_pct)
# -----------------------------------------------------------------
# SALIDAS DIGITALES DESDE BOTONES
# -----------------------------------------------------------------
if v_up == 0:
led_verde.value(1)
led_verde_state = 1
else:
led_verde.value(0)
led_verde_state = 0
if v_dn == 1:
led_rojo.value(1)
led_rojo_state = 1
else:
led_rojo.value(0)
led_rojo_state = 0
# -----------------------------------------------------------------
# CONTROL PWM DESDE POTENCIÓMETRO
# -----------------------------------------------------------------
duty_pwm = adc_to_pwm(pot_raw)
led_pwm.duty(duty_pwm)
# -----------------------------------------------------------------
# RESINCRONIZACIÓN NTP
# -----------------------------------------------------------------
if time.ticks_diff(now, last_ntp) >= NTP_RESYNC_MS:
if sync_time_ntp():
last_ntp = now
# -----------------------------------------------------------------
# LUX
# -----------------------------------------------------------------
if time.ticks_diff(now, last_ldr) >= LDR_INTERVAL_MS:
last_ldr = now
lux = calculate_lux_from_raw(ldr_raw)
# -----------------------------------------------------------------
# DHT
# -----------------------------------------------------------------
if time.ticks_diff(now, last_dht) >= DHT_INTERVAL_MS:
last_dht = now
t, h = read_dht_safe()
if t is None:
print("DHT inválido: revisa cableado/sensor/pin")
else:
temperatura = t
humedad = h
# -----------------------------------------------------------------
# RGB + BUZZER SEGÚN TEMPERATURA
# -----------------------------------------------------------------
update_temperature_outputs()
buzzer.update(now)
# -----------------------------------------------------------------
# PUBLICACIÓN MQTT
# -----------------------------------------------------------------
if time.ticks_diff(now, last_pub) >= PUBLISH_INTERVAL_MS:
publish_telemetry(client)
last_pub = now
last_publish_reason = "telemetry"
# -----------------------------------------------------------------
# DASHBOARD ANSI
# -----------------------------------------------------------------
if time.ticks_diff(now, last_print) >= PRINT_INTERVAL_MS:
last_print = now
render_terminal_dashboard(now, last_pub)
time.sleep_ms(20)
except Exception as e:
print("\nLoop error:", e)
mqtt_ok = False
last_publish_reason = "error"
# Señal temporal de error. En la siguiente vuelta se restaurará
# el estado térmico según temperatura.
try:
thermal_rgb.apply_state(rgb.STATE_HIGH)
buzzer.set_profile(buz.STATE_HIGH, restart=True)
except Exception:
pass
try:
if client:
client.disconnect()
except Exception:
pass
client = None
time.sleep(3)
# =============================================================================
# 19) ARRANQUE
# =============================================================================
main()