#include <WiFi.h>
#include <PubSubClient.h>
#include <DHTesp.h> // Librería para el sensor DHT22 (instalar DHT sensor library for ESPx)
#include <ArduinoJson.h> // Librería para parsear y crear JSON fácilmente
#include <HTTPClient.h> // Para hacer la petición pull-on-boot
#include <WiFiClientSecure.h> // Para hacer la petición por la capa HTTPS
// --- MACRO PARA DEPURACIÓN ---
// Muestra nombre de tarea, función y línea en los logs por puerto serie
#define DEBUG_STRING "["+String(pcTaskGetName(NULL))+" - "+String(__FUNCTION__)+"():"+String(__LINE__)+"] "
// ================================================================================
// CONSTANTES DE CONFIGURACIÓN (¡LOS ESTUDIANTES DEBEN MODIFICAR ESTO!)
// ================================================================================
// --- WiFi --- (En Wokwi se suele usar Wokwi-GUEST)
const String ssid = "Wokwi-GUEST";
const String password = "";
// --- MQTT Broker ---
const String mqtt_server = "mqtt.iot-uma.es";
const int mqtt_port = 1883;
const String mqtt_user = "micro9"; // USUARIO MQTT ASIGNADO (coincide con el namespace)
const String mqtt_pass = "dP3JySLl"; // CONTRASEÑA MQTT ASIGNADA
// --- Identificadores del Gemelo Digital (Ditto) ---
const String NAMESPACE = mqtt_user;
const String THING_NAME = "ESP32-example";
// --- Topics según la configuración de Eclipse Ditto ---
// Telemetría de entrada (Conexión 1: mqtt-micro-in)
const String topic_TELEMETRIA = "iot/devices/" + NAMESPACE + "/" + THING_NAME;
// Recepción de comandos nativos (Conexión 3: mqtt-micro-cmd, downlink)
const String topic_COMANDOS = NAMESPACE + "/" + THING_NAME + "/mensajes";
// Respuestas a los comandos nativos (Conexión 3: mqtt-micro-cmd, uplink)
const String topic_RESPUESTAS = NAMESPACE + "/" + THING_NAME + "/respuestas";
// Recepción de Desired Properties (Conexión 6: mqtt-desired-out)
const String topic_DESIRED = NAMESPACE + "/" + THING_NAME + "/desiredproperties";
// --- Tiempos y Hardware ---
#define PERIODO_PUBLICACION 30000 // Publicar cada 30 segundos (30000 ms)
#define DHTPIN 1 // Pin digital conectado al DHT22
// ================================================================================
// --- Variables Globales ---
WiFiClient wClient;
PubSubClient mqtt_client(wClient);
DHTesp dht;
// Semáforo para coordinar las tareas FreeRTOS
SemaphoreHandle_t semMqttReady;
//-----------------------------------------------------
// Utilidad: Muestra información de la prioridad de la tarea actual
//-----------------------------------------------------
inline void info_tarea_actual() {
Serial.println(DEBUG_STRING + "Prioridad de tarea " + String(pcTaskGetName(NULL)) + ": " + String(uxTaskPriorityGet(NULL)));
}
//-----------------------------------------------------
// Callback MQTT → Se ejecuta automáticamente al recibir un mensaje suscrito
//-----------------------------------------------------
void procesa_mensaje(char* topic, byte* payload, unsigned int length) {
String mensaje = "";
for(int i = 0; i < length; i++) {
mensaje += (char)payload[i];
}
Serial.println(DEBUG_STRING + "======= MENSAJE RECIBIDO =======");
Serial.println(DEBUG_STRING + "Topic: [" + String(topic) + "]");
Serial.println(DEBUG_STRING + "Payload: " + mensaje);
// Verificamos si el mensaje viene por el topic de comandos
if(String(topic) == topic_COMANDOS) {
// Parseamos el JSON recibido (que viene en formato Ditto Protocol nativo)
// Documentación: https://arduinojson.org/
StaticJsonDocument<512> doc;
DeserializationError error = deserializeJson(doc, mensaje);
if (error) {
Serial.print(DEBUG_STRING + "Error parseando JSON del comando: ");
Serial.println(error.c_str());
return;
}
// Ditto Protocol envía el nombre del mensaje en un campo temporal 'path' o 'topic' interno.
// Ej: "path": "/inbox/messages/reiniciar"
String path = doc["path"].as<String>();
if (path.endsWith("/reiniciar")) {
// Extraemos los parámetros enviados en el campo "value"
int retraso_segundos = doc["value"]["retraso_segundos"] | 5; // Valor por defecto 5 si no viene
String razon = doc["value"]["razon"] | "Desconocida";
Serial.println(DEBUG_STRING + "¡Orden de REINICIO recibida!");
Serial.println(DEBUG_STRING + "Razón: " + razon + " | Retraso: " + String(retraso_segundos) + "s");
// Construir respuesta de confirmación en formato Ditto Protocol
StaticJsonDocument<256> respDoc;
respDoc["topic"] = doc["topic"]; // Mismo topic interno de Ditto
respDoc["path"] = doc["path"];
respDoc["status"] = 200; // 200 = Éxito
// Es VITAL devolver el mismo correlation-id para que Ditto pueda enlazar la respuesta con la petición HTTP original
JsonObject headers = respDoc.createNestedObject("headers");
headers["correlation-id"] = doc["headers"]["correlation-id"];
headers["content-type"] = "application/json";
// Crear objeto value (IMPORTANTE)
JsonObject value = respDoc.createNestedObject("value");
value["status"] = 200;
value["error"] = nullptr;
value["message"] = "Orden de reinicio recibida y aceptada.";
// Construir descripción dinámica
String descripcion = "segundos para ejecucion " + String(retraso_segundos);
value["description"] = descripcion;
String respuestaStr;
serializeJson(respDoc, respuestaStr);
// Enviamos la confirmación por el topic de respuestas (uplink)
mqtt_client.publish(topic_RESPUESTAS.c_str(), respuestaStr.c_str());
Serial.println(DEBUG_STRING + "Respuesta enviada: " + respuestaStr);
Serial.println(DEBUG_STRING + "Reinicio programado en : " + String(retraso_segundos) + " segundos");
// Retardo y reinicio por hardware
delay(retraso_segundos * 1000);
ESP.restart();
}
else {
Serial.println(DEBUG_STRING + "Comando no reconocido: " + path);
}
}
// Verificamos si el mensaje viene por el topic de Desired Properties
else if (String(topic) == topic_DESIRED) {
StaticJsonDocument<512> doc;
DeserializationError error = deserializeJson(doc, mensaje);
if (error) {
Serial.print(DEBUG_STRING + "Error parseando JSON de desiredProperties: ");
Serial.println(error.c_str());
return;
}
// Buscamos si la orden afecta a nuestra válvula de agua
if (doc.containsKey("water_valve")) {
// El formato que entra de nuestra conexión mqtt-desired-out es: {"water_valve": {"value": "ON"}}
String nuevo_estado = doc["water_valve"]["value"].as<String>();
Serial.println(DEBUG_STRING + "=> Actualización DESIRED recibida para water_valve: " + nuevo_estado);
// ==========================================
// AQUÍ IRÍAN LAS ACCIONES FÍSICAS SOBRE EL HARDWARE
// if (nuevo_estado == "ON") { digitalWrite(PIN_VALVULA, HIGH); } else { digitalWrite(PIN_VALVULA, LOW); }
// ==========================================
// Tras ejecutar el accionamiento físico, actualizamos nuestro estado real ("properties")
// a través de la conexión de entrada de telemetría (mqtt-micro-in)
StaticJsonDocument<200> respuestaProg;
// Para la conexión de telemetría de entrada el formato es directo:
respuestaProg["water_valve"] = nuevo_estado;
String jsonOut;
serializeJson(respuestaProg, jsonOut);
// Confirmamos al Gemelo Digital "estado acatado"
mqtt_client.publish(topic_TELEMETRIA.c_str(), jsonOut.c_str());
Serial.println(DEBUG_STRING + "<= Sincronización publicada en telemetría: " + jsonOut);
}
}
}
//-----------------------------------------------------
// Conexión con WiFi
//-----------------------------------------------------
void conecta_wifi() {
Serial.println(DEBUG_STRING + "Conectando a " + ssid);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid.c_str(), password.c_str());
while (WiFi.status() != WL_CONNECTED) {
vTaskDelay(pdMS_TO_TICKS(500));
Serial.print(".");
}
Serial.println();
Serial.println(DEBUG_STRING + "WiFi conectado. IP: " + WiFi.localIP().toString());
}
//-----------------------------------------------------
// Conexión con MQTT
//-----------------------------------------------------
void conecta_mqtt() {
// Bucle hasta que logremos conectar
while (!mqtt_client.connected()) {
Serial.println(DEBUG_STRING + "Intentando conexión MQTT...");
// El Client ID debe ser único, usamos la MAC por seguridad
String clientId = "ESP32Client-" + String(WiFi.macAddress());
if (mqtt_client.connect(clientId.c_str(), mqtt_user.c_str(), mqtt_pass.c_str(),topic_TELEMETRIA.c_str(),1,true,"{\"online\":false}")) {
Serial.println(DEBUG_STRING + "Conectado al broker: " + mqtt_server);
// Nos suscribimos al topic para recibir los mensajes/órdenes de Ditto
mqtt_client.subscribe(topic_COMANDOS.c_str());
Serial.println(DEBUG_STRING + "Suscrito a: " + topic_COMANDOS);
// Nos suscribimos al topic para monitorizar los cambios a desiredProperties
mqtt_client.subscribe(topic_DESIRED.c_str());
Serial.println(DEBUG_STRING + "Suscrito a: " + topic_DESIRED);
// Publicamos en el topic iot/devices/<namespace>/<name>
mqtt_client.publish(topic_TELEMETRIA.c_str(), "{\"online\":true}", true);
} else {
Serial.print(DEBUG_STRING + "Error MQTT state: ");
Serial.print(mqtt_client.state());
Serial.println(". Reintentando en 5 segundos...");
vTaskDelay(pdMS_TO_TICKS(5000));
}
}
}
//-----------------------------------------------------
// Pull on Boot: Recupera el estado actual del Gemelo Digital
//-----------------------------------------------------
void pull_on_boot() {
Serial.println(DEBUG_STRING + "Iniciando Pull-on-Boot (Sincronización inicial)...");
WiFiClientSecure client;
client.setInsecure(); // No validar el certificado SSL para simplificar
HTTPClient http;
// URL apuntando directamente al feature water_valve
String url = "https://ditto.iot-uma.es/api/2/things/" + NAMESPACE + ":" + THING_NAME + "/features/water_valve";
http.begin(client, url);
// La API requiere Basic Auth con las mismas credenciales
http.setAuthorization(mqtt_user.c_str(), mqtt_pass.c_str());
int httpCode = http.GET();
if (httpCode > 0) {
if (httpCode == HTTP_CODE_OK) {
String payload = http.getString();
Serial.println(DEBUG_STRING + "Estado recibido: " + payload);
StaticJsonDocument<1024> doc;
DeserializationError error = deserializeJson(doc, payload);
if (!error) {
// Extraemos properties (reportadas) y desiredProperties (deseadas) si existen
String reported = doc["properties"]["value"] | "UNKNOWN";
String desired = doc["desiredProperties"]["value"] | "UNKNOWN";
Serial.println(DEBUG_STRING + "=> Reported: " + reported + " | Desired: " + desired);
if (desired != "UNKNOWN" && desired != reported) {
Serial.println(DEBUG_STRING + "¡Discrepancia detectada! Aplicando desiredState: " + desired);
// ==========================================
// AQUÍ IRÍAN LAS ACCIONES FÍSICAS FRENTE A DISCREPANCIA
// if (desired == "ON") { digitalWrite(PIN_VALVULA, HIGH); } else { digitalWrite(PIN_VALVULA, LOW); }
// ==========================================
// Sincronizar de vuelta vía MQTT
StaticJsonDocument<200> respuestaProg;
respuestaProg["water_valve"] = desired;
String jsonOut;
serializeJson(respuestaProg, jsonOut);
mqtt_client.publish(topic_TELEMETRIA.c_str(), jsonOut.c_str());
Serial.println(DEBUG_STRING + "<= Discrepancia corregida y publicada: " + jsonOut);
} else {
Serial.println(DEBUG_STRING + "El dispositivo ya está sincronizado con Ditto.");
}
}
} else {
Serial.println(DEBUG_STRING + "Error HTTP en GET: " + String(httpCode));
}
} else {
Serial.println(DEBUG_STRING + "Fallo al conectar con la API de Ditto.");
}
http.end();
}
//================================================================================
// TAREAS FreeRTOS
//================================================================================
//-----------------------------------------------------
// TAREA 1: Mantener conexión WiFi/MQTT y atender mensajes entrantes
//-----------------------------------------------------
void taskMQTTService(void *pvParameters) {
info_tarea_actual();
// 1. Iniciar WiFi
conecta_wifi();
// 2. Configurar cliente MQTT
mqtt_client.setServer(mqtt_server.c_str(), mqtt_port);
mqtt_client.setBufferSize(1024); // Ampliado para soportar JSON pesados del Ditto Protocol
mqtt_client.setCallback(procesa_mensaje);
// 3. Conectar al Broker MQTT
conecta_mqtt();
// 4. Sincronización Pull-on-Boot (Descarga el estado del Gemelo y corrige discrepancias)
pull_on_boot();
Serial.println(DEBUG_STRING + "Conexiones establecidas. Abriendo semáforo para publicar...");
// Señalizar a la tarea de sensores que ya puede empezar a trabajar
xSemaphoreGive(semMqttReady);
// Bucle infinito de la tarea MQTT
while(true) {
if (!mqtt_client.connected()) {
conecta_mqtt(); // Reconectar si se cae
}
mqtt_client.loop(); // Atiende llamadas y suscripciones (Triggera el callback si hay mensajes)
// Pequeño delay para ceder tiempo de CPU al sistema
vTaskDelay(pdMS_TO_TICKS(10));
}
}
//-----------------------------------------------------
// TAREA 2: Leer el sensor DHT22 y publicar los datos periódicamente
//-----------------------------------------------------
void taskPublisher(void *pvParameters) {
info_tarea_actual();
// Inicializar el sensor
dht.setup(DHTPIN, DHTesp::DHT22);
Serial.println(DEBUG_STRING + "Tarea sensora esperando a que MQTT esté listo...");
// Bloquear la tarea hasta que el semáforo sea liberado por la tarea MQTT
xSemaphoreTake(semMqttReady, portMAX_DELAY);
Serial.println(DEBUG_STRING + "Semáforo superado. Iniciando lecturas...");
const TickType_t periodo = pdMS_TO_TICKS(PERIODO_PUBLICACION); // Cada 30 seg
while(true) {
// Leemos humedad y temperatura simultáneamente
TempAndHumidity newValues = dht.getTempAndHumidity();
// Comprobamos si la lectura ha fallado según el código de estado interno de DHTesp
if (dht.getStatus() != 0) {
Serial.println(DEBUG_STRING + "Error al leer del sensor DHT22! Razón: " + String(dht.getStatusString()));
} else {
float t = newValues.temperature;
float h = newValues.humidity;
// Creamos un JSON simple con la estructura requerida por nuestra conexión IN
StaticJsonDocument<200> telemetria;
telemetria["temperature"] = t;
telemetria["humidity"] = h;
String jsonStr;
serializeJson(telemetria, jsonStr);
Serial.println(DEBUG_STRING + "Publicando telemetría:");
Serial.println(" -> Topic: " + topic_TELEMETRIA);
Serial.println(" -> Datos: " + jsonStr);
// Publicamos en el topic iot/devices/<namespace>/<name>
mqtt_client.publish(topic_TELEMETRIA.c_str(), jsonStr.c_str());
}
// Dormimos la tarea hasta el siguiente ciclo (30s) cediendo completamente el procesador
vTaskDelay(periodo);
}
}
//================================================================================
// SETUP Y LOOP PRINCIPAL
//================================================================================
void setup() {
Serial.begin(115200);
Serial.println();
// Creamos el semáforo binario para coordinar el arranque de tareas
semMqttReady = xSemaphoreCreateBinary();
info_tarea_actual();
// Creamos y arrancamos la tarea de Red (Prioridad alta = 2)
xTaskCreate(
taskMQTTService, // Función de la tarea
"MQTT_Service", // Nombre amigable
8192, // Tamaño de pila (stack size) en bytes
NULL, // Parámetros a pasar a la tarea
2, // Prioridad
NULL // Handle de la tarea
);
// Creamos y arrancamos la Tarea Sensora (Prioridad normal = 1)
xTaskCreate(
taskPublisher,
"Sensor_Pub",
4096,
NULL,
1,
NULL
);
Serial.println(DEBUG_STRING + "Setup (core) terminado, el sistema queda en manos de FreeRTOS.");
// Eliminamos la tarea loop() ya que en FreeRTOS no necesitamos este bucle
vTaskDelete(NULL);
}
void loop() {
// El loop de Arduino queda inutilizado y liberado gracias a las tareas FreeRTOS
}