import esp32, onewire, ds18x20, network, ntptime
from umqtt.robust import MQTTClient
from machine import RTC, Timer, Pin
from time import sleep_ms
from secrets import *
#--- Reloj de Tiempo Real ---#
def set_rtc():
rtc = RTC()
ntptime.settime()
timestamp = rtc.datetime()
year, month, day, weekday, hours, minutes, seconds, subseconds = timestamp
print(f"UTC: {year}-{month}-{day}T{hours}:{minutes}:{seconds}Z")
#--- Obtener la dirección MAC ---#
def read_mac_address():
mac = wlan.config('mac')
mac_address = ':'.join('{:02x}'.format(b) for b in mac)
print("MAC address:", mac_address)
return mac_address
#--- Obtener la temperatura ambiente ---#
def read_temperature():
ds_sensor.convert_temp()
temperature = ds_sensor.read_temp(roms[0])
print("Temperature", temperature, "°C")
return temperature
#--- Obtener la temperatura del CPU ---#
def read_cpu_temperature():
cpu_temp_fahrenheit = esp32.raw_temperature()
cpu_temperature = (cpu_temp_fahrenheit - 32) / 1.8
print("CPU", cpu_temperature, "°C")
return cpu_temperature
#--- Callback MQTT ---#
def callback(topic, msg):
topic = topic.decode()
msg = msg.decode()
print(f"Payload: {msg}, from {topic}")
# De acuerdo al mensaje recibido, encender o apagar el pin 2
if topic == MQTT_TOPIC_1 and msg == "ON":
led_builtin.on()
elif topic == MQTT_TOPIC_1 and msg == "OFF":
led_builtin.off()
#--- Callback Timer ---#
def publisher(timer):
# Enviar la temperatura ambiente
temperature = read_temperature()
cliente.publish(MQTT_TOPIC_2, str(temperature))
# Enviar la temperatura del SoC
cpu_temperature = read_cpu_temperature()
cliente.publish(MQTT_TOPIC_3, str(cpu_temperature))
#--- Setup ---#
led_builtin = Pin(2, Pin.OUT, value=0)
ds_pin = Pin(22)
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))
roms = ds_sensor.scan()
print('Found DS devices: ', roms)
set_rtc()
#--- Conexión a WiFi ---#
wlan = network.WLAN(network.STA_IF)
if not wlan.active():
wlan.active(True)
if not wlan.isconnected():
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
print("Connecting...")
while not wlan.isconnected():
status_led = not led_builtin.value()
led_builtin.value(status_led)
print(".", end="")
sleep_ms(500)
led_builtin.value(1)
config = wlan.ifconfig()
print(f"Connected with IP {config[0]}")
#--- Conexión a MQTT ---#
MQTT_CLIENT_ID = read_mac_address()
cliente = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER,
password=MQTT_PASSWORD, keepalive=30)
print("Connecting to", MQTT_BROKER)
cliente.set_callback(callback)
cliente.connect(clean_session=False)
print("Connection successful")
cliente.subscribe(MQTT_TOPIC_1)
cliente.publish(MQTT_TOPIC_0, MQTT_WELCOME_MSG)
led_builtin.value(0)
#--- Timer 0 ---#
tim = Timer(0)
tim.init(period=30000, mode=Timer.PERIODIC, callback=publisher)
#--- Rutina Principal ---#
while True:
cliente.check_msg()
sleep_ms(500)