#include <WiFi.h>
#include <PubSubClient.h>
#include "SimpleJson.h"
#define MIRROR_LED 2
// -- WiFi ------------------------------------------------------------
const char* WIFI_SSID = "Wokwi-GUEST";
const char* WIFI_PASS = "";
// -- MQTT ------------------------------------------------------------
const char* MQTT_BROKER = "broker.hivemq.com";
const int MQTT_PORT = 1883;
const char* TOPIC_DATA = "iem/task3/arushi/data";
const char* TOPIC_CMD = "iem/task3/arushi/cmd";
// -- Robustness ------------------------------------------------------
const char* SHARED_TOKEN = "iem2026";
const char* EXPECTED_SOURCE = "pico";
const unsigned long WATCHDOG_TIMEOUT = 10000;
// -- Remote Pico state -----------------------------------------------
int remotePotValue = 0;
int remoteInterval = 0;
int remoteUptime = 0;
bool remoteBlink = false;
bool remoteLedState = false;
bool remoteSafeState = false;
// -- Robustness state ------------------------------------------------
unsigned long lastValidData = 0;
bool picoTimeout = false;
int expectedSeqNr = -1;
unsigned long seqOutgoing = 0;
// -- Statistics ------------------------------------------------------
unsigned long msgAccepted = 0;
unsigned long msgRejectedJson = 0;
unsigned long msgRejectedAuth = 0;
unsigned long msgSeqGaps = 0;
WiFiClient wifiClient;
PubSubClient mqtt(wifiClient);
SimpleJson jsonOut, jsonIn;
// ── WiFi setup ──────────────────────────────────────────────────────
void setupWiFi() {
Serial.print("Connecting to WiFi");
WiFi.begin(WIFI_SSID, WIFI_PASS);
int attempts = 0;
while (WiFi.status() != WL_CONNECTED && attempts < 30) {
delay(500);
Serial.print(".");
attempts++;
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("\nWiFi connected!");
} else {
Serial.println("\nWiFi FAILED");
}
}
// ── Message validation ───────────────────────────────────────────────
// Rejects messages with wrong token or wrong source
bool validateMessage(const SimpleJson& msg) {
if (strcmp(msg.getString("token"), SHARED_TOKEN) != 0) {
msgRejectedAuth++;
return false;
}
if (strcmp(msg.getString("source"), EXPECTED_SOURCE) != 0) {
msgRejectedAuth++;
return false;
}
return true;
}
// ── Sequence number check ────────────────────────────────────────────
bool checkSequence(const SimpleJson& msg) {
int seq = msg.getInt("seq", -1);
if (seq < 0) return true;
if (seq == 0) { expectedSeqNr = 1; return true; }
if (expectedSeqNr < 0) { expectedSeqNr = seq + 1; return true; }
if (seq < expectedSeqNr) {
Serial.println("SEQ: replay discarded");
return false;
}
if (seq > expectedSeqNr) {
Serial.println("SEQ: gap detected");
msgSeqGaps++;
}
expectedSeqNr = seq + 1;
return true;
}
// ── MQTT callback ────────────────────────────────────────────────────
// Receives sensor data published by Pico
void mqttCallback(char* topic, byte* payload, unsigned int length) {
char buf[256];
unsigned int len = min(length, (unsigned int)255);
memcpy(buf, payload, len);
buf[len] = 0;
jsonIn.parse(buf);
if (jsonIn.count == 0) { msgRejectedJson++; return; }
if (!validateMessage(jsonIn)) return;
if (!checkSequence(jsonIn)) return;
// Store remote Pico state
remotePotValue = jsonIn.getInt ("potValue");
remoteBlink = jsonIn.getBool("blinkEnabled");
remoteInterval = jsonIn.getInt ("interval");
remoteLedState = jsonIn.getBool("ledState");
remoteSafeState = jsonIn.getBool("safeState");
remoteUptime = jsonIn.getInt ("uptime");
// Mirror LED follows Pico LED state
digitalWrite(MIRROR_LED, remoteLedState ? HIGH : LOW);
lastValidData = millis();
picoTimeout = false;
msgAccepted++;
}
// ── Send command to Pico ─────────────────────────────────────────────
void sendCommand(SimpleJson& cmd) {
cmd.setString("token", SHARED_TOKEN);
cmd.setString("source", "nano");
cmd.setInt ("seq", (int)seqOutgoing++);
char buf[256];
cmd.toCharArray(buf, sizeof(buf));
mqtt.publish(TOPIC_CMD, buf);
Serial.print("TX: "); Serial.println(buf);
}
// ── MQTT reconnect ───────────────────────────────────────────────────
void mqttReconnect() {
while (!mqtt.connected()) {
Serial.print("Connecting to MQTT...");
String clientId = "esp32-" + String(random(0xffff), HEX);
if (mqtt.connect(clientId.c_str())) {
Serial.println("connected");
mqtt.subscribe(TOPIC_DATA);
} else {
Serial.print("failed rc=");
Serial.print(mqtt.state());
Serial.println(" retrying in 3s");
delay(3000);
}
}
}
// ── Serial command processor ─────────────────────────────────────────
void processSerialInput() {
if (!Serial.available()) return;
String input = Serial.readStringUntil('\n');
input.trim();
input.toUpperCase();
Serial.print("> "); Serial.println(input);
jsonOut.clear();
if (input == "ON") {
jsonOut.setBool("blinkEnabled", false);
jsonOut.setBool("ledOn", true);
sendCommand(jsonOut);
Serial.println("ACK:ON");
}
else if (input == "OFF") {
jsonOut.setBool("blinkEnabled", false);
jsonOut.setBool("ledOn", false);
sendCommand(jsonOut);
Serial.println("ACK:OFF");
}
else if (input == "BLINK") {
jsonOut.setBool("blinkEnabled", true);
sendCommand(jsonOut);
Serial.println("ACK:BLINK");
}
else if (input == "NOBLINK") {
jsonOut.setBool("blinkEnabled", false);
sendCommand(jsonOut);
Serial.println("ACK:NOBLINK");
}
else if (input.startsWith("INTERVAL ")) {
int val = input.substring(9).toInt();
if (val >= 50 && val <= 2000) {
jsonOut.setInt("interval", val);
sendCommand(jsonOut);
Serial.print("ACK:INTERVAL:"); Serial.println(val);
} else {
Serial.println("ERROR: interval must be 50-2000 ms");
}
}
else if (input == "POT") {
jsonOut.setBool("useLocalPot", true);
sendCommand(jsonOut);
Serial.println("ACK:POT");
}
else if (input == "STATUS") {
Serial.println("── Remote Pico Status ──────────");
Serial.print("LED: "); Serial.println(remoteLedState ? "ON" : "OFF");
Serial.print("Blink: "); Serial.println(remoteBlink ? "ON" : "OFF");
Serial.print("Interval: "); Serial.print(remoteInterval); Serial.println(" ms");
Serial.print("Poti: "); Serial.println(remotePotValue);
Serial.print("SafeState:"); Serial.println(remoteSafeState ? "YES" : "NO");
Serial.print("Uptime: "); Serial.print(remoteUptime); Serial.println(" s");
Serial.println("────────────────────────────────");
}
else if (input == "STATS") {
Serial.println("── Statistics ──────────────────");
Serial.print("Accepted: "); Serial.println(msgAccepted);
Serial.print("Rejected auth: "); Serial.println(msgRejectedAuth);
Serial.print("Rejected JSON: "); Serial.println(msgRejectedJson);
Serial.print("Seq gaps: "); Serial.println(msgSeqGaps);
Serial.println("────────────────────────────────");
}
else if (input == "HELP") {
Serial.println("── Commands ────────────────────");
Serial.println("ON LED on");
Serial.println("OFF LED off");
Serial.println("BLINK Start blinking");
Serial.println("NOBLINK Stop blinking");
Serial.println("INTERVAL <ms> Set interval (50-2000)");
Serial.println("POT Restore poti control");
Serial.println("STATUS Show Pico state");
Serial.println("STATS Show statistics");
Serial.println("HELP Show this help");
Serial.println("────────────────────────────────");
}
else {
Serial.print("ERROR: Unknown command '");
Serial.print(input); Serial.println("'");
}
}
// ── setup ────────────────────────────────────────────────────────────
void setup() {
pinMode(MIRROR_LED, OUTPUT);
Serial.begin(115200);
delay(1000);
Serial.println("=== ESP32 MQTT Gateway ===");
Serial.println("Type HELP for commands");
setupWiFi();
mqtt.setServer(MQTT_BROKER, MQTT_PORT);
mqtt.setCallback(mqttCallback);
lastValidData = millis();
}
// ── loop ─────────────────────────────────────────────────────────────
void loop() {
unsigned long now = millis();
// MQTT connection + processing
if (!mqtt.connected()) {
picoTimeout = false;
mqttReconnect();
}
mqtt.loop();
// Watchdog – warn once when Pico stops responding
if (!picoTimeout && (now - lastValidData > WATCHDOG_TIMEOUT)) {
picoTimeout = true;
digitalWrite(MIRROR_LED, LOW);
Serial.println("WATCHDOG: Pico not responding!");
}
// Reset watchdog when Pico comes back online
if (picoTimeout && (now - lastValidData < WATCHDOG_TIMEOUT)) {
picoTimeout = false;
Serial.println("WATCHDOG: Pico back online!");
}
processSerialInput();
}