#include <WiFi.h>
#include "secrets.h"
#include <WiFiClientSecure.h>
#define MQTT_MAX_PACKET_SIZE 4096
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include "DHTesp.h"
// --- Pin Definitions ---
const int DHT_PIN = 15;
const int LED1_PIN = 17;
const int LED2_PIN = 18;
const int SWITCH1_PIN = 14;
const int SWITCH2_PIN = 16;
const int AIRPUMP1_RELAY_PIN = 12;
const int AIRPUMP2_RELAY_PIN = 13;
const int FILTER_FEED_PUMP1_RELAY_PIN = 20;
const int FILTER_FEED_PUMP2_RELAY_PIN = 19;
const int TRANSFER_PUMP1_RELAY_PIN = 21;
const int TRANSFER_PUMP2_RELAY_PIN = 22;
// --- MQTT Topics ---
// ESP32 publishes status HERE → IoT Rule listens HERE → DynamoDB writes
#define AWS_IOT_PUBLISH_TOPIC "stp0001/status"
// AWS/Dashboard sends commands HERE → ESP32 listens HERE
// IoT Rule does NOT listen to this topic
#define AWS_IOT_SUBSCRIBE_TOPIC "stp0001/commands"
// --- Global State ---
DHTesp dhtSensor;
bool led1State = false;
bool led2State = false;
bool airPump1OnStatus = false;
bool airPump2OnStatus = false;
bool filterfeedPump1OnStatus = false;
bool filterfeedPump2OnStatus = false;
bool transferPump1OnStatus = false;
bool transferPump2OnStatus = false;
bool airPump1MotorStatus = false;
bool airPump2MotorStatus = false;
bool filterfeedPump1MotorStatus = false;
bool filterfeedPump2MotorStatus = false;
bool transferPump1MotorStatus = false;
bool transferPump2MotorStatus = false;
bool bmsOnStatus = false;
int respInFloat1 = 0;
int respInFloat2 = 0;
int respInFloat3 = 0;
unsigned long lastPublishTime = 0;
unsigned long lastReconnectAttempt = 0;
const unsigned long PUBLISH_INTERVAL = 10000;
const unsigned long RECONNECT_INTERVAL = 5000;
WiFiClientSecure net;
PubSubClient client(net);
// -----------------------------------------------------------
// Forward Declaration
// -----------------------------------------------------------
void messageHandler(char* topic, byte* payload, unsigned int length);
// -----------------------------------------------------------
// WiFi Connect
// -----------------------------------------------------------
void connectWiFi() {
if (WiFi.status() == WL_CONNECTED) return;
Serial.print("Connecting to WiFi");
WiFi.mode(WIFI_STA);
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
int attempts = 0;
while (WiFi.status() != WL_CONNECTED && attempts < 40) {
delay(500);
Serial.print(".");
attempts++;
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println(" WiFi Connected! IP: " + WiFi.localIP().toString());
} else {
Serial.println(" WiFi Failed! Will retry...");
}
}
// -----------------------------------------------------------
// MQTT Connect
// -----------------------------------------------------------
bool connectMQTT() {
if (client.connected()) return true;
net.setCACert(AWS_CERT_CA);
net.setCertificate(AWS_CERT_CRT);
net.setPrivateKey(AWS_CERT_PRIVATE);
client.setBufferSize(4096);
client.setKeepAlive(60);
client.setServer(AWS_IOT_ENDPOINT, 8883);
client.setCallback(messageHandler);
Serial.print("Connecting to AWS IoT...");
if (client.connect(THINGNAME, "", "", 0, 0, 0, 0, true)) {
// Subscribe to COMMANDS topic only
// IoT Rule does NOT watch this topic so no feedback loop
client.subscribe(AWS_IOT_SUBSCRIBE_TOPIC);
Serial.println(" Connected!");
Serial.println("Subscribed to: " + String(AWS_IOT_SUBSCRIBE_TOPIC));
return true;
} else {
Serial.print(" Failed! MQTT state: ");
Serial.println(client.state());
return false;
}
}
// -----------------------------------------------------------
// Helper: Parse Motor Command
// -----------------------------------------------------------
bool parseMotorCommand(JsonVariant val) {
if (val.is<bool>()) return val.as<bool>();
if (val.is<int>()) return (val.as<int>() != 0);
if (val.is<String>()) {
String cmd = val.as<String>();
cmd.toLowerCase();
return (cmd == "start" || cmd == "true" || cmd == "on");
}
return false;
}
// -----------------------------------------------------------
// Publish Status to AWS IoT → triggers IoT Rule → DynamoDB
// -----------------------------------------------------------
void publishMessage() {
if (!client.connected()) {
Serial.println("Cannot publish: MQTT not connected");
return;
}
StaticJsonDocument<4096> doc;
doc["deviceType"] = "devicestp";
doc["deviceId"] = "esp32-001";
doc["VOLTAGE_R"] = 234;
doc["VOLTAGE_Y"] = 230;
doc["VOLTAGE_B"] = 233;
doc["LINEERROR_MESSAGE"] = 0;
doc["AUTO_OR_MANUAL_MODE"] = 0;
doc["AIRPUMP1_ON_STATUS"] = airPump1OnStatus ? 1 : 0;
doc["AIRPUMP2_ON_STATUS"] = airPump2OnStatus ? 1 : 0;
doc["FILTER_FEED_PUMP1_ON_STATUS"] = filterfeedPump1OnStatus ? 1 : 0;
doc["FILTER_FEED_PUMP2_ON_STATUS"] = filterfeedPump2OnStatus ? 1 : 0;
doc["TRANSFER_PUMP1_ON_STATUS"] = transferPump1OnStatus ? 1 : 0;
doc["TRANSFER_PUMP2_ON_STATUS"] = transferPump2OnStatus ? 1 : 0;
doc["AIRPUMP1_MOTOR_STATUS"] = airPump1MotorStatus ? 1 : 0;
doc["AIRPUMP2_MOTOR_STATUS"] = airPump2MotorStatus ? 1 : 0;
doc["FILTER_FEED_PUMP1_MOTOR_STATUS"] = filterfeedPump1MotorStatus ? 1 : 0;
doc["FILTER_FEED_PUMP2_MOTOR_STATUS"] = filterfeedPump2MotorStatus ? 1 : 0;
doc["TRANSFER_PUMP1_MOTOR_STATUS"] = transferPump1MotorStatus ? 1 : 0;
doc["TRANSFER_PUMP2_MOTOR_STATUS"] = transferPump2MotorStatus ? 1 : 0;
doc["BMS_ON_STATUS"] = bmsOnStatus ? 1 : 0;
doc["LEVEL_SWITCH_AIR"] = respInFloat1 ? 1 : 0;
doc["LEVEL_SWITCH_FILTER_FEED"] = respInFloat2 ? 1 : 0;
doc["LEVEL_SWITCH_TRANSFER"] = respInFloat3 ? 1 : 0;
doc["AIRPUMP1_CURRENT"] = 3.7;
doc["AIRPUMP2_CURRENT"] = 3.7;
doc["FF_PUMP1_CURRENT"] = 3.7;
doc["FF_PUMP2_CURRENT"] = 3.7;
doc["TF_PUMP1_CURRENT"] = 3.7;
doc["TF_PUMP2_CURRENT"] = 3.7;
doc["AIRPUMP1_LOWCURRENT_STATUS"] = 0;
doc["AIRPUMP1_HIGHCURRENT_STATUS"] = 0;
doc["AIRPUMP2_LOWCURRENT_STATUS"] = 0;
doc["AIRPUMP2_HIGHCURRENT_STATUS"] = 0;
doc["FF_PUMP1_LOWCURRENT_STATUS"] = 0;
doc["FF_PUMP1_HIGHCURRENT_STATUS"] = 0;
doc["FF_PUMP2_LOWCURRENT_STATUS"] = 0;
doc["FF_PUMP2_HIGHCURRENT_STATUS"] = 0;
doc["TF_PUMP1_LOWCURRENT_STATUS"] = 0;
doc["TF_PUMP1_HIGHCURRENT_STATUS"] = 0;
doc["TF_PUMP2_LOWCURRENT_STATUS"] = 0;
doc["TF_PUMP2_HIGHCURRENT_STATUS"] = 0;
doc["TOTAL_POWER_CONSUMPTION"] = 10.0;
doc["TOTAL_FLOW"] = 10.0;
doc["TOTAL_UNIT"] = 3.0;
doc["FLOW_RATE"] = 4.0;
size_t jsonSize = measureJson(doc);
Serial.print("JSON size: ");
Serial.print(jsonSize);
Serial.println(" bytes");
if (jsonSize > 3900) {
Serial.println("ERROR: JSON too large, aborting!");
return;
}
char jsonBuffer[4096];
serializeJson(doc, jsonBuffer, sizeof(jsonBuffer));
Serial.println("--- Publishing to stp/status ---");
Serial.println(jsonBuffer);
Serial.println("--------------------------------");
if (client.publish(AWS_IOT_PUBLISH_TOPIC, jsonBuffer)) {
Serial.println("Published successfully!");
lastPublishTime = millis();
} else {
Serial.print("Publish FAILED! MQTT state: ");
Serial.println(client.state());
}
}
// -----------------------------------------------------------
// Message Handler — Commands arrive on stp/commands
// IoT Rule never sees this topic so no loop/overwrite possible
// -----------------------------------------------------------
void messageHandler(char* topic, byte* payload, unsigned int length) {
Serial.print("Command received on topic: ");
Serial.println(topic);
StaticJsonDocument<512> doc;
DeserializationError error = deserializeJson(doc, payload, length);
if (error) {
Serial.print("JSON parse failed: ");
Serial.println(error.f_str());
return;
}
serializeJsonPretty(doc, Serial);
const char* targetType = doc["deviceType"];
if (!targetType || String(targetType) != "devicestp") {
Serial.println("Ignored: not for devicestp");
return;
}
bool stateChanged = false;
// --- LEDs ---
if (doc.containsKey("led1")) {
bool newState = doc["led1"];
if (newState != led1State) {
led1State = newState;
digitalWrite(LED1_PIN, led1State ? HIGH : LOW);
Serial.println("LED1: " + String(led1State ? "ON" : "OFF"));
stateChanged = true;
}
}
if (doc.containsKey("led2")) {
bool newState = doc["led2"];
if (newState != led2State) {
led2State = newState;
digitalWrite(LED2_PIN, led2State ? HIGH : LOW);
Serial.println("LED2: " + String(led2State ? "ON" : "OFF"));
stateChanged = true;
}
}
// --- Air Pumps ---
if (doc.containsKey("AIRPUMP1_ON_STATUS")) {
bool newStatus = parseMotorCommand(doc["AIRPUMP1_ON_STATUS"]);
if (newStatus != airPump1OnStatus) {
airPump1OnStatus = newStatus;
digitalWrite(AIRPUMP1_RELAY_PIN, newStatus ? HIGH : LOW);
Serial.println("AIRPUMP1: " + String(newStatus ? "ON" : "OFF"));
stateChanged = true;
}
}
if (doc.containsKey("AIRPUMP2_ON_STATUS")) {
bool newStatus = parseMotorCommand(doc["AIRPUMP2_ON_STATUS"]);
if (newStatus != airPump2OnStatus) {
airPump2OnStatus = newStatus;
digitalWrite(AIRPUMP2_RELAY_PIN, newStatus ? HIGH : LOW);
Serial.println("AIRPUMP2: " + String(newStatus ? "ON" : "OFF"));
stateChanged = true;
}
}
// --- Filter Feed Pumps ---
if (doc.containsKey("FILTER_FEED_PUMP1_ON_STATUS")) {
bool newStatus = parseMotorCommand(doc["FILTER_FEED_PUMP1_ON_STATUS"]);
if (newStatus != filterfeedPump1OnStatus) {
filterfeedPump1OnStatus = newStatus;
digitalWrite(FILTER_FEED_PUMP1_RELAY_PIN, newStatus ? HIGH : LOW);
Serial.println("FF_PUMP1: " + String(newStatus ? "ON" : "OFF"));
stateChanged = true;
}
}
if (doc.containsKey("FILTER_FEED_PUMP2_ON_STATUS")) {
bool newStatus = parseMotorCommand(doc["FILTER_FEED_PUMP2_ON_STATUS"]);
if (newStatus != filterfeedPump2OnStatus) {
filterfeedPump2OnStatus = newStatus;
digitalWrite(FILTER_FEED_PUMP2_RELAY_PIN, newStatus ? HIGH : LOW);
Serial.println("FF_PUMP2: " + String(newStatus ? "ON" : "OFF"));
stateChanged = true;
}
}
// --- Transfer Pumps ---
if (doc.containsKey("TRANSFER_PUMP1_ON_STATUS")) {
bool newStatus = parseMotorCommand(doc["TRANSFER_PUMP1_ON_STATUS"]);
if (newStatus != transferPump1OnStatus) {
transferPump1OnStatus = newStatus;
digitalWrite(TRANSFER_PUMP1_RELAY_PIN, newStatus ? HIGH : LOW);
Serial.println("TF_PUMP1: " + String(newStatus ? "ON" : "OFF"));
stateChanged = true;
}
}
if (doc.containsKey("TRANSFER_PUMP2_ON_STATUS")) {
bool newStatus = parseMotorCommand(doc["TRANSFER_PUMP2_ON_STATUS"]);
if (newStatus != transferPump2OnStatus) {
transferPump2OnStatus = newStatus;
digitalWrite(TRANSFER_PUMP2_RELAY_PIN, newStatus ? HIGH : LOW);
Serial.println("TF_PUMP2: " + String(newStatus ? "ON" : "OFF"));
stateChanged = true;
}
}
// --- BMS via BMS_ON_STATUS ---
if (doc.containsKey("BMS_ON_STATUS")) {
String bmsCommand = doc["BMS_ON_STATUS"].as<String>();
bmsCommand.toUpperCase();
if (bmsCommand == "START" || bmsCommand == "STOP") {
bool newStatus = (bmsCommand == "START");
if (newStatus != bmsOnStatus) {
bmsOnStatus = newStatus;
Serial.println("BMS: " + String(bmsOnStatus ? "ON" : "OFF"));
stateChanged = true;
}
} else {
Serial.println("Invalid BMS_ON_STATUS: " + bmsCommand);
}
}
// --- BMS via BMS_STATUS ---
if (doc.containsKey("BMS_STATUS")) {
bool newStatus = false;
if (doc["BMS_STATUS"].is<bool>())
newStatus = doc["BMS_STATUS"].as<bool>();
else if (doc["BMS_STATUS"].is<int>())
newStatus = (doc["BMS_STATUS"].as<int>() != 0);
else if (doc["BMS_STATUS"].is<String>()) {
String s = doc["BMS_STATUS"].as<String>();
s.toLowerCase();
newStatus = (s == "start" || s == "true" || s == "on");
}
if (newStatus != bmsOnStatus) {
bmsOnStatus = newStatus;
Serial.println("BMS_STATUS: " + String(bmsOnStatus ? "ON" : "OFF"));
stateChanged = true;
}
}
// Publish updated state to stp/status → DynamoDB updates
if (stateChanged) {
publishMessage();
}
}
// -----------------------------------------------------------
// Setup
// -----------------------------------------------------------
void setup() {
Serial.begin(115200);
delay(500);
dhtSensor.setup(DHT_PIN, DHTesp::DHT22);
pinMode(LED1_PIN, OUTPUT);
pinMode(LED2_PIN, OUTPUT);
pinMode(SWITCH1_PIN, INPUT_PULLUP);
pinMode(SWITCH2_PIN, INPUT_PULLUP);
pinMode(AIRPUMP1_RELAY_PIN, OUTPUT);
pinMode(AIRPUMP2_RELAY_PIN, OUTPUT);
pinMode(FILTER_FEED_PUMP1_RELAY_PIN, OUTPUT);
pinMode(FILTER_FEED_PUMP2_RELAY_PIN, OUTPUT);
pinMode(TRANSFER_PUMP1_RELAY_PIN, OUTPUT);
pinMode(TRANSFER_PUMP2_RELAY_PIN, OUTPUT);
digitalWrite(LED1_PIN, LOW);
digitalWrite(LED2_PIN, LOW);
digitalWrite(AIRPUMP1_RELAY_PIN, LOW);
digitalWrite(AIRPUMP2_RELAY_PIN, LOW);
digitalWrite(FILTER_FEED_PUMP1_RELAY_PIN, LOW);
digitalWrite(FILTER_FEED_PUMP2_RELAY_PIN, LOW);
digitalWrite(TRANSFER_PUMP1_RELAY_PIN, LOW);
digitalWrite(TRANSFER_PUMP2_RELAY_PIN, LOW);
connectWiFi();
connectMQTT();
publishMessage(); // send initial state on boot
}
// -----------------------------------------------------------
// Main Loop
// -----------------------------------------------------------
void loop() {
if (WiFi.status() != WL_CONNECTED) {
Serial.println("WiFi lost, reconnecting...");
connectWiFi();
}
if (!client.connected()) {
unsigned long now = millis();
if (now - lastReconnectAttempt >= RECONNECT_INTERVAL) {
lastReconnectAttempt = now;
Serial.println("MQTT disconnected, reconnecting...");
if (connectMQTT()) {
lastReconnectAttempt = 0;
publishMessage();
}
}
} else {
unsigned long now = millis();
if (now - lastPublishTime >= PUBLISH_INTERVAL) {
publishMessage();
}
client.loop();
}
delay(100);
}BMS
AUTO/Manual
BMS 1 0 or 1 1
AUTO 0 1
Manual 0 0