"""
Smart Garden - LCD Display Controller (ESP32 MicroPython)
Subscribe: garden/sensor/light, garden/sensor/temp_humidity, garden/sensor/soil_moisture, garden/sensor/npk
Publish: garden/status/lcd
Display sensor information and alerts on LCD screen
"""
from machine import Pin, I2C, unique_id, reset
import network
import time
import json
import gc
from umqtt.simple import MQTTClient
# WiFi Configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT Configuration
MQTT_SERVER = "broker.hivemq.com"
MQTT_PORT = 1883
CLIENT_ID = "ESP_CONTROL_LCD_01"
SUBSCRIBE_TOPICS = [
b"garden/sensor/light",
b"garden/sensor/temp_humidity",
b"garden/sensor/soil_moisture",
b"garden/sensor/npk",
b"garden/alerts/#"
]
STATUS_TOPIC = b"garden/status/lcd"
# Hardware Configuration
LCD_SDA_PIN = 21
LCD_SCL_PIN = 22
LCD_I2C_ADDR = 0x27 # Standard LCD address
LED_PIN = 2
# LCD Configuration
LCD_ROWS = 4
LCD_COLS = 20
# Global variables
wifi = None
mqtt_client = None
lcd = None
light = None
mqtt_connected = False
wifi_connected = False
last_activity = time.ticks_ms()
reconnect_attempts = 0
# Sensor data storage
sensor_data = {
"light": {"value": 0, "status": "Unknown", "timestamp": 0},
"temperature": {"value": 0, "humidity": 0, "timestamp": 0},
"soil_moisture": {"value": 0, "percentage": 0, "timestamp": 0},
"npk": {"n": 0, "p": 0, "k": 0, "timestamp": 0}
}
class LCD1602:
"""LCD 1602/2004 driver for I2C"""
def __init__(self, i2c, addr=0x27, cols=16, rows=2):
self.i2c = i2c
self.addr = addr
self.cols = cols
self.rows = rows
self.backlight = True
# LCD commands
self.LCD_CLEARDISPLAY = 0x01
self.LCD_RETURNHOME = 0x02
self.LCD_ENTRYMODESET = 0x04
self.LCD_DISPLAYCONTROL = 0x08
self.LCD_CURSORSHIFT = 0x10
self.LCD_FUNCTIONSET = 0x20
self.LCD_SETCGRAMADDR = 0x40
self.LCD_SETDDRAMADDR = 0x80
# Flags for display entry mode
self.LCD_ENTRYRIGHT = 0x00
self.LCD_ENTRYLEFT = 0x02
self.LCD_ENTRYSHIFTINCREMENT = 0x01
self.LCD_ENTRYSHIFTDECREMENT = 0x00
# Flags for display on/off control
self.LCD_DISPLAYON = 0x04
self.LCD_DISPLAYOFF = 0x00
self.LCD_CURSORON = 0x02
self.LCD_CURSOROFF = 0x00
self.LCD_BLINKON = 0x01
self.LCD_BLINKOFF = 0x00
# Flags for display/cursor shift
self.LCD_DISPLAYMOVE = 0x08
self.LSD_CURSORMOVE = 0x00
self.LCD_MOVERIGHT = 0x04
self.LCD_MOVELEFT = 0x00
# Flags for function set
self.LCD_8BITMODE = 0x10
self.LCD_4BITMODE = 0x00
self.LCD_2LINE = 0x08
self.LCD_1LINE = 0x00
self.LCD_5x10DOTS = 0x04
self.LCD_5x8DOTS = 0x00
self.init()
def init(self):
"""Initialize the LCD"""
try:
# Send initial setup commands
self.write_byte(0x33, 0)
self.write_byte(0x32, 0)
self.write_byte(0x06, 0)
self.write_byte(0x0C, 0)
self.write_byte(0x28, 0)
self.clear()
print("✅ LCD initialized successfully")
except Exception as e:
print("❌ LCD initialization failed:", str(e))
raise
def write_byte(self, data, mode=0):
"""Write a byte to the LCD"""
try:
# Send data with backlight control
if self.backlight:
data |= 0x08 # Set backlight bit
else:
data &= ~0x08 # Clear backlight bit
self.i2c.writeto(self.addr, bytes([data | mode]))
self._pulse_enable(data | mode)
except Exception as e:
print("❌ LCD write byte error:", str(e))
def _pulse_enable(self, data):
"""Pulse the enable pin to write data"""
try:
self.i2c.writeto(self.addr, bytes([data | 0x04])) # Enable high
time.sleep_us(1) # Enable pulse width
self.i2c.writeto(self.addr, bytes([data & ~0x04])) # Enable low
time.sleep_us(50) # Command execution time
except Exception as e:
print("❌ LCD pulse enable error:", str(e))
def clear(self):
"""Clear the display"""
self.write_byte(self.LCD_CLEARDISPLAY)
time.sleep_ms(2)
def home(self):
"""Return cursor to home position"""
self.write_byte(self.LCD_RETURNHOME)
time.sleep_ms(2)
def set_cursor(self, col, row):
"""Set cursor position"""
row_offsets = [0x00, 0x40, 0x14, 0x54]
if row >= self.rows:
row = self.rows - 1
self.write_byte(self.LCD_SETDDRAMADDR | (col + row_offsets[row]))
def display_string(self, string, col=0, row=0):
"""Display a string on the LCD"""
try:
self.set_cursor(col, row)
for char in string:
if char == '\n':
row += 1
if row >= self.rows:
break
self.set_cursor(0, row)
else:
self.write_byte(ord(char), 1)
except Exception as e:
print("❌ LCD display string error:", str(e))
def backlight_on(self):
"""Turn on backlight"""
self.backlight = True
def backlight_off(self):
"""Turn off backlight"""
self.backlight = False
def setup_wifi():
"""Connect to WiFi"""
global wifi, wifi_connected
try:
print("WiFi: Initializing...")
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
print(f"WiFi: Connecting to {WIFI_SSID}")
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
max_attempts = 40
for attempt in range(max_attempts):
if wifi.isconnected():
wifi_connected = True
ip = wifi.ifconfig()[0]
print("✅ WiFi: Connected successfully!")
print("IP:", ip)
return True
if attempt % 4 == 0:
print(f"WiFi: Attempt {attempt+1}/{max_attempts}")
time.sleep(0.5)
wifi_connected = False
print("❌ WiFi: Connection failed")
return False
except Exception as e:
wifi_connected = False
print("❌ WiFi: Setup error:", str(e))
return False
def mqtt_callback(topic, msg):
"""Handle incoming MQTT messages"""
global last_activity
last_activity = time.ticks_ms()
try:
if not topic or not msg:
return
topic_str = topic.decode('utf-8', 'ignore')
message = msg.decode('utf-8', 'ignore')
print(f"[MQTT] Receive: {topic_str}")
# Parse JSON data
try:
data = json.loads(message)
update_sensor_data(topic_str, data)
update_lcd_display()
except json.JSONDecodeError:
print("❌ Invalid JSON data")
except Exception as e:
print("❌ MQTT callback error:", str(e))
def update_sensor_data(topic, data):
"""Update sensor data from MQTT messages"""
try:
if "light" in topic:
sensor_data["light"]["value"] = data.get("l", 0)
sensor_data["light"]["percentage"] = data.get("percentage", 0)
sensor_data["light"]["timestamp"] = time.ticks_ms()
elif "temp_humidity" in topic:
sensor_data["temperature"]["value"] = data.get("t", 0)
sensor_data["temperature"]["humidity"] = data.get("h", 0)
sensor_data["temperature"]["timestamp"] = time.ticks_ms()
elif "soil_moisture" in topic:
sensor_data["soil_moisture"]["value"] = data.get("m", 0)
sensor_data["soil_moisture"]["percentage"] = data.get("percentage", 0)
sensor_data["soil_moisture"]["timestamp"] = time.ticks_ms()
elif "npk" in topic:
sensor_data["npk"]["n"] = data.get("n_mgkg", 0)
sensor_data["npk"]["p"] = data.get("p_mgkg", 0)
sensor_data["npk"]["k"] = data.get("k_mgkg", 0)
sensor_data["npk"]["timestamp"] = time.ticks_ms()
except Exception as e:
print("❌ Update sensor data error:", str(e))
def update_lcd_display():
"""Update LCD display with current sensor data"""
try:
if not lcd:
return
# Clear display
lcd.clear()
# Line 1: Temperature and Humidity
temp = sensor_data["temperature"]["value"]
humidity = sensor_data["temperature"]["humidity"]
lcd.display_string(f"Temp: {temp:.1f}C Hum:{humidity:.0f}%", 0, 0)
# Line 2: Soil Moisture
soil_pct = sensor_data["soil_moisture"]["percentage"]
lcd.display_string(f"Soil: {soil_pct:.0f}%", 0, 1)
# Line 3: Light Level
light_pct = sensor_data["light"]["percentage"]
lcd.display_string(f"Light: {light_pct:.0f}%", 0, 2)
# Line 4: NPK Values
n_val = sensor_data["npk"]["n"]
p_val = sensor_data["npk"]["p"]
k_val = sensor_data["npk"]["k"]
lcd.display_string(f"N:{n_val:.0f} P:{p_val:.0f} K:{k_val:.0f}", 0, 3)
# Flash LED to indicate update
light.on()
time.sleep_ms(100)
light.off()
except Exception as e:
print("❌ LCD display update error:", str(e))
def send_status_report():
"""Send periodic status report"""
global mqtt_connected
try:
if not mqtt_connected:
return
status = {
"device_id": CLIENT_ID,
"status": "ONLINE",
"display_rows": LCD_ROWS,
"display_cols": LCD_COLS,
"uptime": time.ticks_ms(),
"rssi": wifi.signal_strength() if wifi_connected else 0,
"mqtt_connected": mqtt_connected,
"wifi_connected": wifi_connected,
"sensor_data": sensor_data
}
status_str = json.dumps(status)
mqtt_client.publish(STATUS_TOPIC, status_str.encode())
except Exception as e:
print("❌ Status report error:", str(e))
def setup_hardware():
"""Initialize hardware"""
global lcd, light
try:
print("=== Smart Garden LCD Display Controller ===")
print("Client ID:", CLIENT_ID)
print("Device ID:", unique_id())
# Initialize LED
light = Pin(LED_PIN, Pin.OUT)
light.off()
print("✅ LED initialized")
# Initialize I2C
i2c = I2C(0, scl=Pin(LCD_SCL_PIN), sda=Pin(LCD_SDA_PIN), freq=100000)
# Scan I2C bus for LCD
devices = i2c.scan()
if LCD_I2C_ADDR in devices:
print(f"✅ LCD found at address 0x{LCD_I2C_ADDR:02X}")
else:
print(f"❌ LCD not found at address 0x{LCD_I2C_ADDR:02X}")
print("Available devices:", [hex(d) for d in devices])
# Initialize LCD
lcd = LCD1602(i2c, LCD_I2C_ADDR, LCD_COLS, LCD_ROWS)
# Display initial message
lcd.display_string("Smart Garden LCD", 0, 0)
lcd.display_string("Initializing...", 0, 1)
lcd.display_string("Please wait...", 0, 2)
print("✅ Hardware: All components initialized")
except Exception as e:
print("❌ Hardware setup error:", str(e))
raise
def setup_mqtt():
"""Initialize MQTT connection"""
global mqtt_client, mqtt_connected, reconnect_attempts
try:
if not wifi_connected:
return False
print("MQTT: Connecting to broker...")
mqtt_client = MQTTClient(
CLIENT_ID,
MQTT_SERVER,
MQTT_PORT,
keepalive=60
)
mqtt_client.set_callback(mqtt_callback)
max_retries = 3
for attempt in range(max_retries):
try:
mqtt_client.connect(clean_session=True)
mqtt_connected = True
reconnect_attempts = 0
print("✅ MQTT: Connected successfully")
# Subscribe to all sensor topics
for topic in SUBSCRIBE_TOPICS:
mqtt_client.subscribe(topic)
print(f"MQTT: Subscribed to {topic.decode()}")
# Send initial status
mqtt_client.publish(STATUS_TOPIC, b"ONLINE")
# Send device info
info = {
"device_id": CLIENT_ID,
"type": "lcd_display_controller",
"version": "1.0-micropython",
"display": f"{LCD_COLS}x{LCD_ROWS}",
"ip": wifi.ifconfig()[0],
"rssi": wifi.signal_strength(),
"uptime": time.ticks_ms()
}
info_str = json.dumps(info)
mqtt_client.publish(b"garden/system/device_info", info_str.encode())
return True
except Exception as conn_error:
print(f"❌ MQTT connection attempt {attempt+1} failed:", str(conn_error))
reconnect_attempts += 1
if attempt < max_retries - 1:
time.sleep(3)
else:
mqtt_connected = False
return False
except Exception as e:
mqtt_connected = False
print("❌ MQTT setup error:", str(e))
return False
def main():
"""Main function"""
global last_activity, mqtt_connected, wifi_connected
print("🚀 Starting LCD Display Controller...")
# Collect garbage
gc.collect()
# Initialize hardware
setup_hardware()
# Connect WiFi
if not setup_wifi():
print("❌ WiFi: Failed to connect")
for retry in range(3):
time.sleep(10)
if setup_wifi():
break
else:
print("❌ WiFi: Permanent failure. Rebooting...")
reset()
# Connect MQTT
if not setup_mqtt():
print("❌ MQTT: Failed to connect")
print("✅ LCD Controller is ready!")
# Main loop
last_status_report = time.ticks_ms()
while True:
try:
current_time = time.ticks_ms()
# Health monitoring
if time.ticks_diff(current_time, last_activity) > 300000:
print("⚠️ Health: No activity for 5 minutes")
send_status_report()
last_activity = current_time
# MQTT processing
if mqtt_connected and mqtt_client:
try:
mqtt_client.check_msg()
except Exception as mqtt_error:
print(f"❌ MQTT processing error: {str(mqtt_error)}")
mqtt_connected = False
# WiFi health check
if wifi and wifi.isconnected() != wifi_connected:
wifi_connected = wifi.isconnected()
if not wifi_connected:
setup_wifi()
# Status report every 5 minutes
if time.ticks_diff(current_time, last_status_report) > 300000:
send_status_report()
last_status_report = current_time
time.sleep_ms(100)
except KeyboardInterrupt:
print("\n🛑 Program interrupted")
break
except Exception as e:
print("❌ Main loop error:", str(e))
time.sleep(2)
if __name__ == "__main__":
try:
main()
except Exception as e:
print("💥 Fatal error:", str(e))
print("🔄 System will reboot...")
time.sleep(10)
reset()