from machine import Pin, PWM, ADC, I2C
import time
import network
from umqtt.simple import MQTTClient
import ssd1306
import json
# ----------------------------
# CONFIG PINES
# ----------------------------
# GPIO numbers; each one is handed to machine.Pin further down in this file.
BTN_PIN = 23  # push-button input (configured with a pull-up)
SERVO_PIN = 13  # servo PWM output
BUZZER_PIN = 2  # buzzer digital output
NTC_PIN = 34  # temperature sensor ADC input
I2C_SCL = 22  # I2C clock line used by the OLED
I2C_SDA = 21  # I2C data line used by the OLED
# Reading at or above which the main loop forces the servo closed and sounds
# the buzzer (same unit as read_temp_ntc(), shown as "C" on the OLED).
TEMP_MAX = 70
# ----------------------------
# WIFI
# ----------------------------
# Credentials passed to wlan.connect() in wifi_connect().
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""  # empty password string
# ----------------------------
# MQTT
# ----------------------------
# Broker parameters used by mqtt_connect() to build the MQTTClient.
MQTT_BROKER = "test.mosquitto.org"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "esp32-ntc-safety"
TOPIC = b"inacap/esp32/safety" # <-- single topic: subscribed for commands and used for status publishes
# Client object returned by mqtt_connect(); stays None until a connection succeeds.
mqtt = None
# True while the MQTT link is considered usable; cleared on any MQTT error.
mqtt_ok = False
# ----------------------------
# OLED
# ----------------------------
# I2C bus 0 on the configured SCL/SDA pins, shared with the display driver.
i2c = I2C(0, scl=Pin(I2C_SCL), sda=Pin(I2C_SDA))
# 128x64 SSD1306 display object used by oled_show().
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
def oled_show(status, temp):
    """Redraw the OLED with three text rows.

    Args:
        status: status string appended to the "Estado:" label.
        temp: temperature value, rendered with one decimal place.

    The third row reflects the module-level ``mqtt_ok`` flag.
    """
    oled.fill(0)  # clear the frame buffer before drawing
    rows = (
        ("Temp:{:.1f}C".format(temp), 10),
        ("Estado:" + status, 30),
        ("MQTT:OK" if mqtt_ok else "MQTT:--", 50),
    )
    for text, y in rows:
        oled.text(text, 0, y)
    oled.show()  # push the buffer to the display
# ----------------------------
# NTC ADC LINEAL
# ----------------------------
# ADC channel on the sensor pin; read by read_temp_ntc().
adc = ADC(Pin(NTC_PIN))
# Select the 11 dB attenuation setting.
# NOTE(review): expected to give the widest input range on ESP32 - confirm against the port docs.
adc.atten(ADC.ATTN_11DB)
def read_temp_ntc():
    """Read the sensor ADC and map it linearly onto a 20..150 range.

    The raw count is scaled to a 0..3.3 voltage assuming a 4095 full-scale
    count, then the fraction of full scale is mapped onto 20..150.

    Returns:
        The computed temperature as a float.
    """
    raw = adc.read()
    volts = raw * 3.3 / 4095
    fraction = volts / 3.3
    return fraction * (150 - 20) + 20
# ----------------------------
# SERVO
# ----------------------------
# 50 Hz PWM output on the servo pin; set_servo() adjusts its duty.
servo = PWM(Pin(SERVO_PIN), freq=50)
def set_servo(angle):
    """Move the servo to ``angle`` by setting the PWM duty.

    The angle is mapped linearly from 0..180 onto a 500..2500 microsecond
    pulse, which is then converted to a duty value on a 1024-step scale
    using the PWM frequency currently configured on ``servo``.

    Args:
        angle: target angle; 0 maps to 500 us and 180 maps to 2500 us.
    """
    pulse_lo = 500
    pulse_hi = 2500
    pulse_us = pulse_lo + (pulse_hi - pulse_lo) * angle / 180
    # pulse width * frequency is the fraction of the period spent high.
    duty_steps = int(pulse_us * servo.freq() * 1024 / 1_000_000)
    servo.duty(duty_steps)
def servo_open():
    """Move the servo to the open position (0 degrees)."""
    open_angle = 0
    set_servo(open_angle)
def servo_close():
    """Move the servo to the closed position (90 degrees)."""
    closed_angle = 90
    set_servo(closed_angle)
# ----------------------------
# BOTÓN + BUZZER
# ----------------------------
# Input with internal pull-up; the main loop treats a 1 -> 0 transition as a press.
button = Pin(BTN_PIN, Pin.IN, Pin.PULL_UP)
# Buzzer output, driven directly as a digital pin.
buzzer = Pin(BUZZER_PIN, Pin.OUT)
buzzer.value(0)  # start silent
# ----------------------------
# VARIABLES
# ----------------------------
# Shared state, mutated by the main loop and by mqtt_callback().
angle = 0  # last commanded servo angle (0 = open, 90 = closed)
status = "ABIERTO"  # status text shown on the OLED and published over MQTT
last_btn = 1  # previous button reading, used for press-edge detection
safety_lock = False  # True while the over-temperature alarm is active
last_beep = 0  # ticks_ms() timestamp of the last buzzer toggle
# Latest temperature reading. Defined up front because mqtt_publish() reads it
# and can be triggered by a RESET message before the loop takes its first
# reading; without this the lookup fails and MQTT gets flagged as down.
temp = 0.0
# ----------------------------
# MQTT CALLBACK
# ----------------------------
def mqtt_callback(topic, msg):
    """Handle a message received on the subscribed topic.

    Only the command ``RESET`` (case-insensitive, surrounding whitespace
    ignored) is acted on: it clears the safety lock, silences the buzzer,
    reopens the servo and publishes the new state. Every other payload is
    ignored, including the status JSON this device publishes on the same
    topic.

    Args:
        topic: topic the message arrived on (unused).
        msg: raw payload bytes.
    """
    global safety_lock, angle, status
    try:
        cmd = msg.decode().upper().strip()
    except UnicodeError:
        # Undecodable payloads cannot be a command; ignore them so a stray
        # binary message does not raise through check_msg() and get the
        # MQTT link flagged as down.
        return
    if cmd != "RESET":
        return
    safety_lock = False
    buzzer.value(0)
    angle = 0
    status = "ABIERTO"
    servo_open()
    mqtt_publish()  # publish the new state
    print(">> Reset remoto ejecutado")
# ----------------------------
# WIFI + MQTT FUNCIONES
# ----------------------------
def wifi_connect(timeout_s=15):
    """Bring up the station interface and wait for the WiFi link.

    Args:
        timeout_s: maximum number of seconds to wait for the connection.
            Pass ``None`` to wait indefinitely.

    Returns:
        True once connected, False if the timeout elapsed first. Returning
        instead of blocking forever lets the caller carry on offline, so the
        temperature cutoff still runs when no network is available.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(WIFI_SSID, WIFI_PASS)
    started = time.ticks_ms()
    while not wlan.isconnected():
        # ticks_diff keeps the elapsed-time check correct across wraparound.
        if (timeout_s is not None
                and time.ticks_diff(time.ticks_ms(), started) > timeout_s * 1000):
            print("WiFi timeout")
            return False
        time.sleep(0.2)
    print("WiFi OK:", wlan.ifconfig())
    return True
def mqtt_connect():
    """Connect to the MQTT broker and subscribe to ``TOPIC``.

    Registers ``mqtt_callback`` before subscribing, then sets the
    module-level ``mqtt_ok`` flag once connect and subscribe have succeeded.

    Returns:
        The connected MQTTClient instance.

    Exceptions raised while connecting or subscribing propagate to the
    caller; ``mqtt_ok`` is left untouched in that case.
    """
    global mqtt_ok
    broker = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
    broker.set_callback(mqtt_callback)
    broker.connect()
    broker.subscribe(TOPIC)
    mqtt_ok = True
    print("MQTT conectado")
    return broker
def mqtt_publish():
    """Publish the current temperature and status as JSON on ``TOPIC``.

    Reads the module-level ``temp`` and ``status`` values. Any exception
    raised while building or sending the payload is printed and clears the
    module-level ``mqtt_ok`` flag; nothing is re-raised.
    """
    global mqtt_ok
    try:
        state = {"temp": temp, "estado": status}
        mqtt.publish(TOPIC, json.dumps(state))
    except Exception as err:
        print("MQTT error:", err)
        mqtt_ok = False
# ----------------------------
# ARRANQUE
# ----------------------------
# Startup: bring up the network, then put the hardware in its initial state.
wifi_connect()
try:
    mqtt = mqtt_connect()
except Exception as e:
    # MQTT is optional: report why the connection failed and run offline.
    # Catching Exception (not a bare except) leaves Ctrl-C working.
    print("MQTT error:", e)
    mqtt_ok = False
servo_open()
oled_show(status, 0)  # no reading taken yet, so show 0 as a placeholder
# ----------------------------
# LOOP PRINCIPAL
# ----------------------------
# Main loop: service MQTT, read the temperature, enforce the over-temperature
# cutoff, handle the button, then publish and redraw.
while True:
    # Process any pending MQTT message (this may run mqtt_callback).
    if mqtt_ok:
        try:
            mqtt.check_msg()
        except Exception as e:
            # Catching Exception (not a bare except) leaves Ctrl-C working.
            print("MQTT error:", e)
            mqtt_ok = False

    temp = read_temp_ntc()

    # Alarm
    if temp >= TEMP_MAX:
        alarm_starting = not safety_lock
        safety_lock = True
        status = "CORTE"
        angle = 90
        servo_close()
        # Toggle the buzzer right away when the alarm starts, then every
        # 500 ms. ticks_diff keeps the comparison correct when the
        # millisecond counter wraps around.
        now = time.ticks_ms()
        if alarm_starting or time.ticks_diff(now, last_beep) > 500:
            buzzer.value(not buzzer.value())
            last_beep = now
    else:
        buzzer.value(0)
        safety_lock = False

    # The button toggles the servo only while there is NO alarm.
    b = button.value()
    if not safety_lock and last_btn == 1 and b == 0:
        # Falling edge: the pull-up input went from released (1) to pressed (0).
        if angle == 0:
            angle = 90
            status = "CERRADO"
            servo_close()
        else:
            angle = 0
            status = "ABIERTO"
            servo_open()
        time.sleep_ms(200)  # crude debounce
    last_btn = b

    if mqtt_ok:
        mqtt_publish()
    oled_show(status, temp)
    time.sleep(0.4)