print("Hello, ESP32!")
# main.py - MicroPython Code for ESP32 on Wokwi
# This script simulates a renewable energy microgrid monitor and publishes to MQTT.
import time
import json
from machine import Pin, ADC
import network
from umqttsimple import MQTTClient
# --- Configuration (Changed to test.mosquitto.org for better Wokwi reliability) ---
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
MQTT_BROKER = "test.mosquitto.org"
MQTT_PORT = 1883
MQTT_CLIENT_ID = b"esp32_microgrid_client_" + str(time.ticks_ms()).encode()
# *** TOPICS MATCHING FLUTTER APPLICATION ***
DATA_TOPIC = b"odisha/microgrid/data"
ALERTS_TOPIC = b"odisha/microgrid/alert"
# Define ADC Pins for Potentiometers (Simulated Sensors)
SOLAR_V_PIN = 32
BATTERY_V_PIN = 33
LOAD_I_PIN = 34
# --- Scaling and Alert Thresholds ---
ADC_MAX = 4095
# Using 4.0V max and 2.0A max for a more realistic simulation range
VOLTAGE_SCALE = 4.0 / ADC_MAX
CURRENT_SCALE = 2.0 / ADC_MAX
ALERT_LOW_SOLAR_V = 1.5 # < 1.5V
ALERT_BATTERY_FULL_V = 3.2 # > 3.2V
ALERT_LOW_CURRENT_I = 0.05 # < 0.05A
# --- Initialization ---
adc_solar = ADC(Pin(SOLAR_V_PIN))
adc_battery = ADC(Pin(BATTERY_V_PIN))
adc_load = ADC(Pin(LOAD_I_PIN))
# Set attenuation for full 0-3.3V range reading (best practice)
adc_solar.atten(ADC.ATTN_11DB)
adc_battery.atten(ADC.ATTN_11DB)
adc_load.atten(ADC.ATTN_11DB)
def connect_wifi():
"""Connect to Wi-Fi network."""
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
if not sta_if.isconnected():
print("Connecting to network...")
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
while not sta_if.isconnected():
time.sleep(0.5)
print(".", end="")
print(f"\nConnected! IP Address: {sta_if.ifconfig()[0]}")
def connect_mqtt():
"""Connect to the MQTT broker."""
try:
# Use umqttsimple from machine
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
client.connect()
print(f"Connected to MQTT broker: {MQTT_BROKER}")
return client
except Exception as e:
print(f"Could not connect to MQTT: {e}")
time.sleep(5)
return None
def read_and_publish_data(mqtt_client):
"""Reads sensor data, performs checks, and publishes to MQTT."""
# Read raw ADC values and scale to simulated units (V and A)
# The read() function handles the attenuation setting automatically
solar_voltage = adc_solar.read() * VOLTAGE_SCALE
battery_voltage = adc_battery.read() * VOLTAGE_SCALE
load_current = adc_load.read() * CURRENT_SCALE
# --- Calculate derived metrics (Power in kW, Storage in %) ---
# 1. Generation (kW): Solar Power = Solar_V * 5A (simulated max current) / 1000
solar_power_kw = (solar_voltage * 5.0) / 1000.0
# 2. Storage Level (%): Based on voltage (1.5V = 0%, 4.0V = 100%)
battery_level_percent = max(0, min(100, (battery_voltage - 1.5) / 2.5 * 100))
# 3. Consumption (kW): Load Power = Load_I * 220V (simulated AC load voltage) / 1000
community_consumption_kw = (load_current * 220.0) / 1000.0
# Prepare Data Payload with keys REQUIRED BY FLUTTER APP
data = {
"timestamp": time.time(),
"solar_V": round(solar_voltage, 2), # Raw V
"battery_V": round(battery_voltage, 2), # Raw V
"load_A": round(load_current, 2), # Raw A
"generation_kW": round(solar_power_kw, 3), # Derived kW
"storage_pct": round(battery_level_percent, 1), # Derived %
"consumption_kW": round(community_consumption_kw, 3) # Derived kW
}
# Publish data
payload = json.dumps(data)
try:
mqtt_client.publish(DATA_TOPIC, payload.encode('utf-8'))
print(f"Published Data: {payload}")
except Exception as e:
print(f"MQTT Publish Data Error: {e}")
# --- Check Alerts and Publish ---
alerts = []
if solar_voltage < ALERT_LOW_SOLAR_V:
alerts.append(f"CRITICAL: Low Solar V ({solar_voltage:.2f}V). Check panel maintenance.")
if battery_voltage > ALERT_BATTERY_FULL_V:
alerts.append(f"WARNING: Battery Full ({battery_voltage:.2f}V). Optimize load shifting.")
if load_current < ALERT_LOW_CURRENT_I:
alerts.append(f"INFO: Low Load Current ({load_current:.2f}A). System underutilized.")
# Publish Alerts if any (sends the first one as a string, as expected by Flutter)
if alerts:
alert_payload = alerts[0]
try:
mqtt_client.publish(ALERTS_TOPIC, alert_payload.encode('utf-8'))
print(f"Published Alert: {alert_payload}")
except Exception as e:
print(f"MQTT Publish Alert Error: {e}")
# --- Main Loop ---
connect_wifi()
mqtt_client = connect_mqtt()
if mqtt_client:
while True:
try:
read_and_publish_data(mqtt_client)
except Exception as e:
print(f"Main loop error: {e}. Attempting reconnect.")
# Note: For Wokwi, you often need to stop and restart the simulation after a major error
mqtt_client = connect_mqtt()
time.sleep(2) # Publish every 2 seconds