# # from machine import I2C, Pin, ADC, PWM
# # from time import sleep
# # import network
# # import json
# # # from umqtt.simple import MQTTClient
# # from simple import MQTTClient
# # from dht import DHT22
# # from pico_i2c_lcd import I2cLcd
# # import random
# # import utime
# # # WiFi and MQTT settings
# # WIFI_SSID = "Wokwi-GUEST"
# # WIFI_PASSWORD = ""
# # MQTT_SERVER = "demo.thingsboard.io"
# # MQTT_PORT = 1883
# # MQTT_TOPIC = "v1/devices/me/telemetry"
# # MQTT_TOKEN = "5i0yrgch2932q5yh5ep9" # Replace with your ThingsBoard device token
# # # Connect to WiFi
# # def connect_wifi():
# # wlan = network.WLAN(network.STA_IF)
# # wlan.active(True)
# # wlan.connect(WIFI_SSID, WIFI_PASSWORD)
# # # Wait for connection
# # max_wait = 10
# # while max_wait > 0:
# # if wlan.status() < 0 or wlan.status() >= 3:
# # break
# # max_wait -= 1
# # print("Waiting for connection...")
# # sleep(1)
# # if wlan.status() != 3:
# # print("Network connection failed")
# # return False
# # else:
# # status = wlan.ifconfig()
# # print(f"Connected to WiFi. IP: {status[0]}")
# # return True
# # # Connect to MQTT
# # def connect_mqtt():
# # client = MQTTClient("pico_irrigation", MQTT_SERVER, MQTT_PORT, MQTT_TOKEN, "")
# # client.connect()
# # print("Connected to ThingsBoard MQTT broker")
# # return client
# # def scrolling_text(lcd, text, line=3, speed=0.3):
# # """
# # Create a scrolling text effect on the LCD
# # :param lcd: LCD display object
# # :param text: Text to scroll
# # :param line: Line to display scrolling text
# # :param speed: Delay between scrolling (seconds)
# # """
# # # Pad the text to create scrolling effect
# # padded_text = " " + text + " "
# # # Scroll the text
# # for start in range(len(padded_text) - 20):
# # # Clear the specific line
# # lcd.move_to(0, line)
# # lcd.putstr(" " * 20) # Clear the line
# # # Display scrolling segment
# # lcd.move_to(0, line)
# # lcd.putstr(padded_text[start:start+20])
# # # Small delay to control scrolling speed
# # sleep(speed)
# # # Initialize DHT22 Sensor
# # dht = DHT22(Pin(15))
# # # Initialize First I2C LCD (Sensor Data)
# # i2c1 = I2C(0, scl=Pin(5), sda=Pin(4), freq=100000)
# # LCD1_ADDR = i2c1.scan()[0]
# # lcd1 = I2cLcd(i2c1, LCD1_ADDR, 4, 20)
# # # Initialize Second I2C LCD (Weather/Additional Info)
# # i2c2 = I2C(1, scl=Pin(7), sda=Pin(6), freq=100000)
# # LCD2_ADDR = i2c2.scan()[0]
# # lcd2 = I2cLcd(i2c2, LCD2_ADDR, 4, 20)
# # # Initialize Potentiometer (Simulating Soil Moisture) on GP26
# # soil_moisture = ADC(Pin(26))
# # # Initialize Servo Motor on GP14
# # servo = PWM(Pin(14))
# # servo.freq(50) # Standard 50Hz PWM frequency for servos
# # # Initialize LED Indicator on GP13
# # led = Pin(13, Pin.OUT)
# # class WeatherSimulator:
# # def __init__(self):
# # self.conditions = [
# # "Clear Sky", "Partly Cloudy", "Cloudy",
# # "Light Rain", "Moderate Rain", "Thunderstorm"
# # ]
# # self.heavy_rain_alert = False
# # def generate_forecast(self):
# # """
# # Generate detailed weather forecast simulation
# # """
# # forecast = {
# # 'temperature': round(random.uniform(15.0, 30.0), 1),
# # 'humidity': random.randint(30, 90),
# # 'condition': random.choice(self.conditions),
# # 'wind_speed': round(random.uniform(0.0, 10.0), 1),
# # 'precipitation_chance': random.randint(0, 100)
# # }
# # # Set heavy rain alert flag
# # self.heavy_rain_alert = forecast['precipitation_chance'] > 75
# # return forecast
# # def display_forecast(self, lcd):
# # """
# # Display forecast on LCD
# # """
# # forecast = self.generate_forecast()
# # lcd.clear()
# # lcd.putstr("{} Weather".format(forecast['condition']))
# # lcd.move_to(0, 1)
# # lcd.putstr("Humidity: {}%".format(forecast['humidity']))
# # lcd.move_to(0, 2)
# # lcd.putstr("Wind:{:.1f}m/s Precip:{}%".format(forecast['wind_speed'],forecast['precipitation_chance']))
# # if self.heavy_rain_alert:
# # scrolling_text(lcd, f"HEAVY RAIN ALERT! {forecast['precipitation_chance']}%", line=3)
# # else:
# # scrolling_text(lcd, "Normal Weather", line=3)
# # return forecast
# # def log_forecast(self, forecast):
# # """
# # Log forecast data to a file
# # """
# # try:
# # current_time = utime.localtime()
# # filename = 'weather_forecast_{:04d}{:02d}{:02d}.csv'.format(
# # current_time[0], current_time[1], current_time[2]
# # )
# # with open(filename, 'a') as f:
# # if f.tell() == 0:
# # f.write("Timestamp,Temperature,Humidity,Condition,Wind_Speed,Precipitation_Chance\n")
# # timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*current_time[:6])
# # f.write("{},{:.1f},{},{},{:.1f},{}\n".format(
# # timestamp,
# # forecast['temperature'],
# # forecast['humidity'],
# # forecast['condition'],
# # forecast['wind_speed'],
# # forecast['precipitation_chance']
# # ))
# # print("Forecast logged successfully")
# # except Exception as e:
# # print("Error logging forecast:", str(e))
# # def estimate_biomass(temp, moisture, humidity):
# # """
# # Estimate biomass potential based on environmental factors
# # Returns a value between 0-100 (percentage)
# # """
# # # Normalize all values to 0-1 range
# # temp_norm = (temp - 10) / 30 # Linear rescale: 10°C -> 0, 40°C -> 1 (not clamped on this line)
# # moisture_norm = moisture / 100
# # humidity_norm = humidity / 100
# # # Apply weights to each factor (adjust these based on your needs)
# # temp_weight = 0.4
# # moisture_weight = 0.3
# # humidity_weight = 0.3
# # # Calculate biomass score (0-100)
# # biomass = (
# # (temp_norm * temp_weight) +
# # (moisture_norm * moisture_weight) +
# # (humidity_norm * humidity_weight)
# # ) * 100
# # # Constrain to 0-100 range
# # biomass = max(0, min(100, biomass))
# # return biomass
# # def map_value(value, from_min, from_max, to_min, to_max):
# # """Map value from one range to another."""
# # return to_min + (to_max - to_min) * (value - from_min) / (from_max - from_min)
# # def set_servo_angle(angle):
# # """Convert angle (0-180) to PWM duty cycle and set servo position."""
# # duty = int(map_value(angle, 0, 180, 1638, 8192))
# # servo.duty_u16(duty)
# # # Display system init message
# # lcd1.clear()
# # lcd1.putstr("System Initializing")
# # lcd1.move_to(0, 1)
# # lcd1.putstr("Connecting to WiFi...")
# # # Connect to WiFi
# # if not connect_wifi():
# # lcd1.clear()
# # lcd1.putstr("WiFi Failed!")
# # # Continue in offline mode if WiFi fails
# # mqtt_client = None
# # else:
# # lcd1.move_to(0, 2)
# # lcd1.putstr("Connecting to MQTT...")
# # try:
# # mqtt_client = connect_mqtt()
# # lcd1.move_to(0, 3)
# # lcd1.putstr("Connected!")
# # sleep(1)
# # except Exception as e:
# # lcd1.move_to(0, 3)
# # lcd1.putstr("MQTT Failed!")
# # print("MQTT Connection failed:", str(e))
# # mqtt_client = None
# # sleep(1)
# # # Initialize Weather Simulator
# # weather_simulator = WeatherSimulator()
# # # Tracking for weather display and MQTT updates
# # last_weather_update = 0
# # last_mqtt_update = 0
# # WEATHER_UPDATE_INTERVAL = 60 # Update every minute (for demo purposes)
# # MQTT_UPDATE_INTERVAL = 10 # Send data every 10 seconds
# # # Prepare data for first display
# # lcd1.clear()
# # lcd1.putstr("Starting sensors...")
# # sleep(1)
# # while True:
# # try:
# # # Read sensor data
# # dht.measure()
# # temp = dht.temperature()
# # hum = dht.humidity()
# # moisture_raw = soil_moisture.read_u16()
# # moisture_percent = map_value(moisture_raw, 65535, 0, 0, 100)
# # # Estimate biomass
# # biomass = estimate_biomass(temp, moisture_percent, hum)
# # biomass_status = "Low" if biomass < 40 else "Good" if biomass < 70 else "Excellent"
# # # Get weather forecast
# # current_time = utime.time()
# # if current_time - last_weather_update >= WEATHER_UPDATE_INTERVAL:
# # forecast = weather_simulator.display_forecast(lcd2)
# # weather_simulator.log_forecast(forecast)
# # last_weather_update = current_time
# # # Determine irrigation status - don't irrigate if heavy rain is coming
# # if weather_simulator.heavy_rain_alert:
# # irrigation_status = "RAIN COMING"
# # print("Heavy rain alert! Irrigation OFF")
# # set_servo_angle(0)
# # led.value(0)
# # irrigation_active = False
# # elif moisture_percent < 30:
# # irrigation_status = "ACTIVE"
# # print("Soil Dry! Irrigation ON")
# # set_servo_angle(90)
# # led.value(1)
# # irrigation_active = True
# # else:
# # irrigation_status = "IDLE"
# # print("Soil Moist! Irrigation OFF")
# # set_servo_angle(0)
# # led.value(0)
# # irrigation_active = False
# # # Display on LCD1 - Optimized layout
# # lcd1.clear()
# # lcd1.putstr("Temp:{:.1f}C Hum:{:.0f}%".format(temp, hum)) # Line 0: Temp and Humidity
# # lcd1.move_to(0, 1)
# # lcd1.putstr("Moist:{:.0f}% Biomass:{:.0f}%".format(moisture_percent, biomass)) # Line 1: Moisture and Biomass
# # lcd1.move_to(0, 2)
# # lcd1.putstr("Biomass: {}".format(biomass_status)) # Line 2: Biomass status
# # lcd1.move_to(0, 3)
# # # Special display for rain coming
# # if weather_simulator.heavy_rain_alert:
# # scrolling_text(lcd1, "HEAVY RAIN SOON - IRRIGATION OFF", line=3, speed=0.2)
# # else:
# # lcd1.putstr("Irrigation: {}".format(irrigation_status)) # Line 3: Irrigation status
# # # Send data to ThingsBoard via MQTT
# # if mqtt_client and (current_time - last_mqtt_update >= MQTT_UPDATE_INTERVAL):
# # # Create ThingsBoard compatible JSON payload
# # payload = {
# # "temperature": temp,
# # "humidity": hum,
# # "soil_moisture": moisture_percent,
# # "biomass": biomass,
# # "irrigation": "ON" if irrigation_active else "OFF",
# # "weather_condition": forecast['condition'],
# # "precipitation_chance": forecast['precipitation_chance'],
# # "wind_speed": forecast['wind_speed']
# # }
# # # Convert to JSON string
# # json_payload = json.dumps(payload)
# # # Publish to ThingsBoard
# # try:
# # mqtt_client.publish(MQTT_TOPIC, json_payload)
# # print("Data sent to ThingsBoard:", json_payload)
# # last_mqtt_update = current_time
# # except Exception as e:
# # print("MQTT publish error:", str(e))
# # # Try to reconnect
# # try:
# # mqtt_client.disconnect()
# # mqtt_client = connect_mqtt()
# # except:
# # mqtt_client = None
# # print("MQTT reconnection failed")
# # except Exception as e:
# # print("Error in main loop:", str(e))
# # sleep(1)
# # # from machine import I2C, Pin, ADC, PWM
# # # from time import sleep
# # # import network
# # # import json
# # # from simple import MQTTClient
# # # from dht import DHT22
# # # from pico_i2c_lcd import I2cLcd
# # # import random
# # # import utime
# # # # WiFi and MQTT settings
# # # WIFI_SSID = "Wokwi-GUEST"
# # # WIFI_PASSWORD = ""
# # # MQTT_SERVER = "demo.thingsboard.io"
# # # MQTT_PORT = 1883
# # # MQTT_TOPIC = "v1/devices/me/telemetry"
# # # MQTT_TOKEN = "5i0yrgch2932q5yh5ep9" # Replace with your ThingsBoard device token
# # # # Connect to WiFi
# # # def connect_wifi():
# # # wlan = network.WLAN(network.STA_IF)
# # # wlan.active(True)
# # # wlan.connect(WIFI_SSID, WIFI_PASSWORD)
# # # # Wait for connection
# # # max_wait = 10
# # # while max_wait > 0:
# # # if wlan.status() < 0 or wlan.status() >= 3:
# # # break
# # # max_wait -= 1
# # # print("Waiting for connection...")
# # # sleep(1)
# # # if wlan.status() != 3:
# # # print("Network connection failed")
# # # return False
# # # else:
# # # status = wlan.ifconfig()
# # # print(f"Connected to WiFi. IP: {status[0]}")
# # # return True
# # # # Connect to MQTT
# # # def connect_mqtt():
# # # client = MQTTClient("pico_irrigation", MQTT_SERVER, MQTT_PORT, MQTT_TOKEN, "")
# # # client.connect()
# # # print("Connected to ThingsBoard MQTT broker")
# # # return client
# # # def scrolling_text(lcd, text, line=3, speed=0.3):
# # # """
# # # Create a scrolling text effect on the LCD
# # # :param lcd: LCD display object
# # # :param text: Text to scroll
# # # :param line: Line to display scrolling text
# # # :param speed: Delay between scrolling (seconds)
# # # """
# # # # Pad the text to create scrolling effect
# # # padded_text = " " + text + " "
# # # # Scroll the text
# # # for start in range(len(padded_text) - 20):
# # # # Clear the specific line
# # # lcd.move_to(0, line)
# # # lcd.putstr(" " * 20) # Clear the line
# # # # Display scrolling segment
# # # lcd.move_to(0, line)
# # # lcd.putstr(padded_text[start:start+20])
# # # # Small delay to control scrolling speed
# # # sleep(speed)
# # # def map_value(value, from_min, from_max, to_min, to_max):
# # # """Map value from one range to another."""
# # # return to_min + (to_max - to_min) * (value - from_min) / (from_max - from_min)
# # # def set_servo_angle(angle):
# # # """Convert angle (0-180) to PWM duty cycle and set servo position."""
# # # duty = int(map_value(angle, 0, 180, 1638, 8192))
# # # servo.duty_u16(duty)
# # # class WeatherSimulator:
# # # def __init__(self):
# # # self.conditions = [
# # # "Clear Sky", "Partly Cloudy", "Cloudy",
# # # "Light Rain", "Moderate Rain", "Thunderstorm"
# # # ]
# # # self.heavy_rain_alert = False
# # # def generate_forecast(self):
# # # """
# # # Generate detailed weather forecast simulation
# # # """
# # # forecast = {
# # # 'temperature': round(random.uniform(15.0, 30.0), 1),
# # # 'humidity': random.randint(30, 90),
# # # 'condition': random.choice(self.conditions),
# # # 'wind_speed': round(random.uniform(0.0, 10.0), 1),
# # # 'precipitation_chance': random.randint(0, 100)
# # # }
# # # # Set heavy rain alert flag
# # # self.heavy_rain_alert = forecast['precipitation_chance'] > 75
# # # return forecast
# # # def display_forecast(self, lcd):
# # # """
# # # Display forecast on LCD
# # # """
# # # forecast = self.generate_forecast()
# # # lcd.clear()
# # # lcd.putstr("{} Weather".format(forecast['condition']))
# # # lcd.move_to(0, 1)
# # # lcd.putstr("Humidity: {}%".format(forecast['humidity']))
# # # lcd.move_to(0, 2)
# # # lcd.putstr("Wind:{:.1f}m/s Precip:{}%".format(forecast['wind_speed'],forecast['precipitation_chance']))
# # # if self.heavy_rain_alert:
# # # scrolling_text(lcd, f"HEAVY RAIN ALERT! {forecast['precipitation_chance']}%", line=3)
# # # else:
# # # scrolling_text(lcd, "Normal Weather", line=3)
# # # return forecast
# # # def log_forecast(self, forecast):
# # # """
# # # Log forecast data to a file
# # # """
# # # try:
# # # current_time = utime.localtime()
# # # filename = 'weather_forecast_{:04d}{:02d}{:02d}.csv'.format(
# # # current_time[0], current_time[1], current_time[2]
# # # )
# # # with open(filename, 'a') as f:
# # # if f.tell() == 0:
# # # f.write("Timestamp,Temperature,Humidity,Condition,Wind_Speed,Precipitation_Chance\n")
# # # timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*current_time[:6])
# # # f.write("{},{:.1f},{},{},{:.1f},{}\n".format(
# # # timestamp,
# # # forecast['temperature'],
# # # forecast['humidity'],
# # # forecast['condition'],
# # # forecast['wind_speed'],
# # # forecast['precipitation_chance']
# # # ))
# # # print("Forecast logged successfully")
# # # except Exception as e:
# # # print("Error logging forecast:", str(e))
# # # class PlantDiseaseRecognition:
# # # def __init__(self):
# # # self.disease_database = [
# # # {"name": "Early Blight", "confidence": 0.82, "treatment": "Remove infected leaves"},
# # # {"name": "Late Blight", "confidence": 0.79, "treatment": "Copper fungicide"},
# # # {"name": "Bacterial Spot", "confidence": 0.75, "treatment": "Copper spray"},
# # # {"name": "Powdery Mildew", "confidence": 0.88, "treatment": "Increase airflow"},
# # # {"name": "Tomato Yellow Leaf", "confidence": 0.91, "treatment": "Control whiteflies"},
# # # {"name": "Healthy", "confidence": 0.95, "treatment": "No treatment needed"}
# # # ]
# # # self.last_diagnosis = {"name": "Healthy", "confidence": 0.95, "treatment": "No treatment needed"}
# # # self.last_check = 0
# # # self.check_interval = 100 # Minimum seconds between re-analyses (100 s, i.e. under 2 minutes)
# # # def analyze_image(self, temp, humidity, moisture, light=50):
# # # """Simulate analyzing an image for plant diseases based on conditions"""
# # # current_time = utime.time()
# # # if current_time - self.last_check < self.check_interval:
# # # return self.last_diagnosis
# # # self.last_check = current_time
# # # # Environmental factors affect disease likelihood
# # # disease_risk = 0
# # # # Temperature and humidity combinations
# # # if temp > 25 and humidity > 80:
# # # # Warm and humid - high disease risk
# # # disease_risk += 0.7
# # # print("High disease risk: warm & humid conditions")
# # # elif temp > 30:
# # # # Very hot - moderate disease risk
# # # disease_risk += 0.4
# # # print("Moderate disease risk: hot conditions")
# # # elif humidity > 90:
# # # # Very humid - high disease risk
# # # disease_risk += 0.6
# # # print("High disease risk: very humid conditions")
# # # # Soil moisture impact
# # # if moisture > 90:
# # # # Overwatered - moderate disease risk
# # # disease_risk += 0.5
# # # print("Moderate disease risk: overly wet soil")
# # # # Light level impact
# # # if light < 30:
# # # # Low light increases disease risk for some pathogens
# # # disease_risk += 0.3
# # # print("Increased disease risk: low light conditions")
# # # # Base chance for healthy plants
# # # base_healthy_chance = 0.6 - disease_risk
# # # # Determine if plant is diseased based on environmental risk factors
# # # if random.random() < disease_risk:
# # # # Select potential diseases based on conditions
# # # potential_diseases = []
# # # if temp > 25 and humidity > 80:
# # # potential_diseases.append("Late Blight")
# # # if humidity > 85:
# # # potential_diseases.append("Powdery Mildew")
# # # if light < 30 and humidity > 70:
# # # potential_diseases.append("Bacterial Spot")
# # # if temp > 28:
# # # potential_diseases.append("Early Blight")
# # # if temp > 30 and light > 80:
# # # potential_diseases.append("Tomato Yellow Leaf")
# # # # If we have condition-specific diseases, select from them
# # # if potential_diseases:
# # # disease_name = random.choice(potential_diseases)
# # # selected_disease = next((d for d in self.disease_database if d["name"] == disease_name),
# # # self.disease_database[0])
# # # else:
# # # # Otherwise pick a random disease
# # # selected_disease = random.choice(self.disease_database[:-1])
# # # # Adjust confidence slightly for realism
# # # selected_disease = selected_disease.copy() # Create a copy to avoid modifying the original
# # # selected_disease["confidence"] = round(selected_disease["confidence"] * random.uniform(0.95, 1.05), 2)
# # # selected_disease["confidence"] = min(0.98, selected_disease["confidence"]) # Cap at 98%
# # # else:
# # # # Healthy plant
# # # selected_disease = self.disease_database[-1].copy()
# # # selected_disease["confidence"] = round(selected_disease["confidence"] * random.uniform(0.97, 1.0), 2)
# # # # Store and return diagnosis
# # # self.last_diagnosis = selected_disease
# # # return selected_disease
# # # def display_diagnosis(self, lcd):
# # # """Display diagnosis results on LCD"""
# # # lcd.clear()
# # # lcd.putstr("Plant Health Analysis:")
# # # lcd.move_to(0, 1)
# # # lcd.putstr("Status: {}".format(self.last_diagnosis["name"]))
# # # lcd.move_to(0, 2)
# # # lcd.putstr("Conf: {:.0f}%".format(self.last_diagnosis["confidence"] * 100))
# # # lcd.move_to(0, 3)
# # # lcd.putstr("Rx: {}".format(self.last_diagnosis["treatment"][:16]))
# # # def estimate_biomass(temp, moisture, humidity, light=50):
# # # """
# # # Estimate biomass potential based on environmental factors
# # # Returns a value between 0-100 (percentage)
# # # """
# # # # Normalize all values to 0-1 range
# # # temp_norm = (temp - 10) / 30 # Linear rescale: 10°C -> 0, 40°C -> 1 (not clamped on this line)
# # # moisture_norm = moisture / 100
# # # humidity_norm = humidity / 100
# # # light_norm = light / 100
# # # # Apply weights to each factor
# # # temp_weight = 0.3
# # # moisture_weight = 0.25
# # # humidity_weight = 0.2
# # # light_weight = 0.25
# # # # Calculate biomass score (0-100)
# # # biomass = (
# # # (temp_norm * temp_weight) +
# # # (moisture_norm * moisture_weight) +
# # # (humidity_norm * humidity_weight) +
# # # (light_norm * light_weight)
# # # ) * 100
# # # # Constrain to 0-100 range
# # # biomass = max(0, min(100, biomass))
# # # return biomass
# # # # Initialize sensors
# # # dht = DHT22(Pin(15)) # DHT22 Temperature/Humidity Sensor
# # # soil_moisture = ADC(Pin(26)) # Soil Moisture Sensor (Potentiometer)
# # # light_sensor = ADC(Pin(27)) # Light Sensor (Photoresistor)
# # # # Initialize output devices
# # # servo = PWM(Pin(14)) # Servo for water valve control
# # # servo.freq(50) # 50Hz PWM frequency for servos
# # # led = Pin(13, Pin.OUT) # LED indicator
# # # # Initialize I2C LCDs
# # # i2c1 = I2C(0, scl=Pin(5), sda=Pin(4), freq=100000) # First LCD - Sensor Data
# # # LCD1_ADDR = i2c1.scan()[0]
# # # lcd1 = I2cLcd(i2c1, LCD1_ADDR, 4, 20)
# # # i2c2 = I2C(1, scl=Pin(7), sda=Pin(6), freq=100000) # Second LCD - Weather/Plant Health
# # # LCD2_ADDR = i2c2.scan()[0]
# # # lcd2 = I2cLcd(i2c2, LCD2_ADDR, 4, 20)
# # # # Initialize system components
# # # weather_simulator = WeatherSimulator()
# # # plant_analyzer = PlantDiseaseRecognition()
# # # # System timing variables
# # # last_weather_update = 0
# # # last_plant_health_display = 0
# # # last_mqtt_update = 0
# # # # Timing constants
# # # WEATHER_UPDATE_INTERVAL = 60 # Weather update interval (seconds)
# # # PLANT_HEALTH_INTERVAL = 20 # Plant health display interval (seconds)
# # # MQTT_UPDATE_INTERVAL = 10 # MQTT data send interval (seconds)
# # # DISPLAY_TOGGLE_INTERVAL = 10 # Toggle between display modes (seconds)
# # # # System startup sequence
# # # lcd1.clear()
# # # lcd1.putstr("Smart Irrigation v2.0")
# # # lcd1.move_to(0, 1)
# # # lcd1.putstr("Initializing...")
# # # # Connect to WiFi
# # # lcd1.move_to(0, 2)
# # # lcd1.putstr("Connecting to WiFi...")
# # # if not connect_wifi():
# # # lcd1.clear()
# # # lcd1.putstr("WiFi Failed!")
# # # mqtt_client = None
# # # else:
# # # lcd1.move_to(0, 3)
# # # lcd1.putstr("WiFi Connected!")
# # # sleep(1)
# # # # Connect to MQTT
# # # lcd1.move_to(0, 3)
# # # lcd1.putstr("Connecting to MQTT...")
# # # try:
# # # mqtt_client = connect_mqtt()
# # # lcd1.move_to(0, 3)
# # # lcd1.putstr("MQTT Connected!")
# # # except Exception as e:
# # # lcd1.move_to(0, 3)
# # # lcd1.putstr("MQTT Failed!")
# # # print("MQTT Connection failed:", str(e))
# # # mqtt_client = None
# # # sleep(2)
# # # lcd1.clear()
# # # lcd1.putstr("Starting sensors...")
# # # lcd2.clear()
# # # lcd2.putstr("Plant Health Monitoring")
# # # sleep(1)
# # # # Main loop
# # # while True:
# # # try:
# # # # Read sensor data
# # # dht.measure()
# # # temp = dht.temperature()
# # # hum = dht.humidity()
# # # moisture_raw = soil_moisture.read_u16()
# # # moisture_percent = map_value(moisture_raw, 65535, 0, 0, 100)
# # # light_raw = light_sensor.read_u16()
# # # light_percent = map_value(light_raw, 65535, 0, 0, 100)
# # # # Current time
# # # current_time = utime.time()
# # # # Estimate biomass (now includes light)
# # # biomass = estimate_biomass(temp, moisture_percent, hum, light_percent)
# # # biomass_status = "Low" if biomass < 40 else "Good" if biomass < 70 else "Excellent"
# # # # Update weather forecast periodically
# # # if current_time - last_weather_update >= WEATHER_UPDATE_INTERVAL:
# # # forecast = weather_simulator.display_forecast(lcd2)
# # # weather_simulator.log_forecast(forecast)
# # # last_weather_update = current_time
# # # # Update plant health analysis periodically
# # # # Only show when we're not displaying weather (alternate displays)
# # # if current_time - last_plant_health_display >= PLANT_HEALTH_INTERVAL:
# # # # Only update if weather display has been shown for a few seconds
# # # if current_time - last_weather_update >= 5:
# # # plant_diagnosis = plant_analyzer.analyze_image(temp, hum, moisture_percent, light_percent)
# # # plant_analyzer.display_diagnosis(lcd2)
# # # print(f"Plant health: {plant_diagnosis['name']} ({plant_diagnosis['confidence']*100:.0f}% confidence)")
# # # last_plant_health_display = current_time
# # # # Determine irrigation status and control valve
# # # if weather_simulator.heavy_rain_alert:
# # # irrigation_status = "RAIN COMING"
# # # print("Heavy rain alert! Irrigation OFF")
# # # set_servo_angle(0) # Close valve
# # # led.value(0) # Turn off LED
# # # irrigation_active = False
# # # elif moisture_percent < 30:
# # # irrigation_status = "ACTIVE"
# # # print("Soil Dry! Irrigation ON")
# # # set_servo_angle(90) # Open valve
# # # led.value(1) # Turn on LED
# # # irrigation_active = True
# # # else:
# # # irrigation_status = "IDLE"
# # # print("Soil Moist! Irrigation OFF")
# # # set_servo_angle(0) # Close valve
# # # led.value(0) # Turn off LED
# # # irrigation_active = False
# # # # Alternate between two display layouts on LCD1
# # # if (current_time % (DISPLAY_TOGGLE_INTERVAL * 2)) < DISPLAY_TOGGLE_INTERVAL:
# # # # First display layout - Main environmental data
# # # lcd1.clear()
# # # lcd1.putstr("Temp:{:.1f}C Hum:{:.0f}%".format(temp, hum))
# # # lcd1.move_to(0, 1)
# # # lcd1.putstr("Moist:{:.0f}% Light:{:.0f}%".format(moisture_percent, light_percent))
# # # lcd1.move_to(0, 2)
# # # lcd1.putstr("Biomass: {:.0f}% ({})".format(biomass, biomass_status))
# # # lcd1.move_to(0, 3)
# # # lcd1.putstr("Irrigation: {}".format(irrigation_status))
# # # else:
# # # # Second display layout - Plant health and irrigation status
# # # lcd1.clear()
# # # lcd1.putstr("Plant Health Status:")
# # # lcd1.move_to(0, 1)
# # # lcd1.putstr("Status: {}".format(plant_analyzer.last_diagnosis["name"]))
# # # lcd1.move_to(0, 2)
# # # lcd1.putstr("Conf: {:.0f}%".format(plant_analyzer.last_diagnosis["confidence"] * 100))
# # # lcd1.move_to(0, 3)
# # # # Show special alerts when necessary
# # # if weather_simulator.heavy_rain_alert:
# # # scrolling_text(lcd1, "HEAVY RAIN SOON - IRRIGATION OFF", line=3, speed=0.2)
# # # elif plant_analyzer.last_diagnosis["name"] != "Healthy":
# # # scrolling_text(lcd1, "TREATMENT: {}".format(plant_analyzer.last_diagnosis["treatment"]), line=3, speed=0.2)
# # # else:
# # # lcd1.putstr("Status: Healthy")
# # # # Send data to ThingsBoard via MQTT
# # # if mqtt_client and (current_time - last_mqtt_update >= MQTT_UPDATE_INTERVAL):
# # # # Create ThingsBoard compatible JSON payload with all sensor data
# # # payload = {
# # # "temperature": temp,
# # # "humidity": hum,
# # # "soil_moisture": moisture_percent,
# # # "light_level": light_percent,
# # # "biomass": biomass,
# # # "biomass_status": biomass_status,
# # # "irrigation": "ON" if irrigation_active else "OFF",
# # # "weather_condition": forecast['condition'],
# # # "precipitation_chance": forecast['precipitation_chance'],
# # # "wind_speed": forecast['wind_speed'],
# # # "plant_health": plant_analyzer.last_diagnosis["name"],
# # # "plant_health_confidence": plant_analyzer.last_diagnosis["confidence"] * 100,
# # # "treatment_recommendation": plant_analyzer.last_diagnosis["treatment"]
# # # }
# # # # Convert to JSON string
# # # json_payload = json.dumps(payload)
# # # # Publish to ThingsBoard
# # # try:
# # # mqtt_client.publish(MQTT_TOPIC, json_payload)
# # # print("Data sent to ThingsBoard:", json_payload)
# # # last_mqtt_update = current_time
# # # except Exception as e:
# # # print("MQTT publish error:", str(e))
# # # # Try to reconnect
# # # try:
# # # mqtt_client.disconnect()
# # # mqtt_client = connect_mqtt()
# # # except:
# # # mqtt_client = None
# # # print("MQTT reconnection failed")
# # # except Exception as e:
# # # print("Error in main loop:", str(e))
# # # sleep(1)
# from machine import I2C, Pin, ADC, PWM
# from time import sleep
# import network
# import json
# # from umqtt.simple import MQTTClient
# from simple import MQTTClient
# from dht import DHT22
# from pico_i2c_lcd import I2cLcd
# import random
# import utime
# # WiFi and MQTT settings
# WIFI_SSID = "Wokwi-GUEST"
# WIFI_PASSWORD = ""
# MQTT_SERVER = "demo.thingsboard.io"
# MQTT_PORT = 1883
# MQTT_TOPIC = "v1/devices/me/telemetry"
# MQTT_TOKEN = "5i0yrgch2932q5yh5ep9" # Replace with your ThingsBoard device token
# # Connect to WiFi
# def connect_wifi():
# wlan = network.WLAN(network.STA_IF)
# wlan.active(True)
# wlan.connect(WIFI_SSID, WIFI_PASSWORD)
# # Wait for connection
# max_wait = 10
# while max_wait > 0:
# if wlan.status() < 0 or wlan.status() >= 3:
# break
# max_wait -= 1
# print("Waiting for connection...")
# sleep(1)
# if wlan.status() != 3:
# print("Network connection failed")
# return False
# else:
# status = wlan.ifconfig()
# print(f"Connected to WiFi. IP: {status[0]}")
# return True
# # Connect to MQTT
# def connect_mqtt():
# client = MQTTClient("pico_irrigation", MQTT_SERVER, MQTT_PORT, MQTT_TOKEN, "")
# client.connect()
# print("Connected to ThingsBoard MQTT broker")
# return client
# def scrolling_text(lcd, text, line=3, speed=0.3):
# """
# Create a scrolling text effect on the LCD
# :param lcd: LCD display object
# :param text: Text to scroll
# :param line: Line to display scrolling text
# :param speed: Delay between scrolling (seconds)
# """
# # Pad the text to create scrolling effect
# padded_text = " " + text + " "
# # Scroll the text
# for start in range(len(padded_text) - 20):
# # Clear the specific line
# lcd.move_to(0, line)
# lcd.putstr(" " * 20) # Clear the line
# # Display scrolling segment
# lcd.move_to(0, line)
# lcd.putstr(padded_text[start:start+20])
# # Small delay to control scrolling speed
# sleep(speed)
# # Initialize DHT22 Sensor
# dht = DHT22(Pin(15))
# # Initialize First I2C LCD (Sensor Data)
# i2c1 = I2C(0, scl=Pin(5), sda=Pin(4), freq=100000)
# LCD1_ADDR = i2c1.scan()[0]
# lcd1 = I2cLcd(i2c1, LCD1_ADDR, 4, 20)
# # Initialize Second I2C LCD (Weather/Additional Info)
# i2c2 = I2C(1, scl=Pin(7), sda=Pin(6), freq=100000)
# LCD2_ADDR = i2c2.scan()[0]
# lcd2 = I2cLcd(i2c2, LCD2_ADDR, 4, 20)
# # Initialize Potentiometer (Simulating Soil Moisture) on GP26
# soil_moisture = ADC(Pin(26))
# # Initialize LDR (Light Dependent Resistor) on GP27
# light_sensor = ADC(Pin(27))
# print("LDR sensor initialized on GP27")
# # Function to get simulated lux value from wokwi based on pin reading
# def get_simulated_lux(pin_reading):
# """
# In Wokwi, the LDR simulator shows lux values up to 100,000,
# but the actual ADC readings don't match this.
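# This function returns the pin reading unchanged when it is above 1.
# Otherwise it substitutes a stand-in lux value (temperature-based, or
# random if the DHT read fails) so the light logic can still be demonstrated.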
# """
# # For debugging/testing - force a specific lux value
# # Uncomment and modify to test different light levels
# # return 95000 # Force extreme light for testing
# # Return a value based on the pin reading
# # This will be very low if the simulator isn't providing proper values
# raw_val = pin_reading
# # If the simulator is working correctly, we should get meaningful values
# # Otherwise, we'll simulate lux based on other sensors to demonstrate functionality
# # If reading is very low (0-1), we'll use temperature as a stand-in for light
# # (This is just to demonstrate the functionality)
# if raw_val <= 1:
# try:
# # Use DHT temperature (multiplied) to simulate different light levels
# # This is just for demonstration purposes
# dht.measure()
# temp_based_lux = min(dht.temperature() * 3000, 100000)
# print(f"Using temperature-based lux simulation: {temp_based_lux}")
# return temp_based_lux
# except:
# # If DHT fails, use a random value for demonstration
# simulated_lux = random.randint(20000, 100000)
# print(f"Using random lux simulation: {simulated_lux}")
# return simulated_lux
# # If we get here, we have an actual reading from the LDR
# print(f"Using actual LDR reading: {raw_val}")
# return raw_val
# # Initialize Servo Motor on GP14
# servo = PWM(Pin(14))
# servo.freq(50) # Standard 50Hz PWM frequency for servos
# # Initialize LED Indicator on GP13
# led = Pin(13, Pin.OUT)
# # Luminance thresholds - set for direct lux comparison
# # We'll use a separate variable for lux thresholds since they're not the same as ADC values
# LUX_HIGH_THRESHOLD = 70000 # 70,000 lux (bright daylight)
# LUX_EXTREME_THRESHOLD = 90000 # 90,000 lux (very bright direct sunlight)
# class WeatherSimulator:
# def __init__(self):
# self.conditions = [
# "Clear Sky", "Partly Cloudy", "Cloudy",
# "Light Rain", "Moderate Rain", "Thunderstorm"
# ]
# self.heavy_rain_alert = False
# def generate_forecast(self):
# """
# Generate detailed weather forecast simulation
# """
# forecast = {
# 'temperature': round(random.uniform(15.0, 30.0), 1),
# 'humidity': random.randint(30, 90),
# 'condition': random.choice(self.conditions),
# 'wind_speed': round(random.uniform(0.0, 10.0), 1),
# 'precipitation_chance': random.randint(0, 100)
# }
# # Set heavy rain alert flag
# self.heavy_rain_alert = forecast['precipitation_chance'] > 75
# return forecast
# def display_forecast(self, lcd):
# """
# Display forecast on LCD
# """
# forecast = self.generate_forecast()
# lcd.clear()
# lcd.putstr("{} Weather".format(forecast['condition']))
# lcd.move_to(0, 1)
# lcd.putstr("Humidity: {}%".format(forecast['humidity']))
# lcd.move_to(0, 2)
# lcd.putstr("Wind:{:.1f}m/s Precip:{}%".format(forecast['wind_speed'],forecast['precipitation_chance']))
# if self.heavy_rain_alert:
# scrolling_text(lcd, f"HEAVY RAIN ALERT! {forecast['precipitation_chance']}%", line=3)
# else:
# scrolling_text(lcd, "Normal Weather", line=3)
# return forecast
# def log_forecast(self, forecast):
# """
# Log forecast data to a file
# """
# try:
# current_time = utime.localtime()
# filename = 'weather_forecast_{:04d}{:02d}{:02d}.csv'.format(
# current_time[0], current_time[1], current_time[2]
# )
# with open(filename, 'a') as f:
# if f.tell() == 0:
# f.write("Timestamp,Temperature,Humidity,Condition,Wind_Speed,Precipitation_Chance\n")
# timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*current_time[:6])
# f.write("{},{:.1f},{},{},{:.1f},{}\n".format(
# timestamp,
# forecast['temperature'],
# forecast['humidity'],
# forecast['condition'],
# forecast['wind_speed'],
# forecast['precipitation_chance']
# ))
# print("Forecast logged successfully")
# except Exception as e:
# print("Error logging forecast:", str(e))
# def estimate_biomass(temp, moisture, humidity, light_lux):
# """
# Estimate biomass potential based on environmental factors including light
# Returns a value between 0-100 (percentage)
# """
# # Normalize all values to 0-1 range
# temp_norm = (temp - 10) / 30 # Assume optimal range 10-40°C
# moisture_norm = moisture / 100
# humidity_norm = humidity / 100
# # Normalize light (100000 lux is max value for our simulator)
# light_norm = light_lux / 100000
# # Apply weights to each factor (adjust these based on your needs)
# temp_weight = 0.3
# moisture_weight = 0.3
# humidity_weight = 0.2
# light_weight = 0.2
# # Calculate biomass score (0-100)
# biomass = (
# (temp_norm * temp_weight) +
# (moisture_norm * moisture_weight) +
# (humidity_norm * humidity_weight) +
# (light_norm * light_weight)
# ) * 100
# # Constrain to 0-100 range
# biomass = max(0, min(100, biomass))
# return biomass
# def map_value(value, from_min, from_max, to_min, to_max):
# """Map value from one range to another."""
# return to_min + (to_max - to_min) * (value - from_min) / (from_max - from_min)
# def set_servo_angle(angle):
# """Convert angle (0-180) to PWM duty cycle and set servo position."""
# duty = int(map_value(angle, 0, 180, 1638, 8192))
# servo.duty_u16(duty)
# def get_light_status(lux_value):
# """
# Get light status based on lux value
# Returns (status_text, is_high_alert, is_extreme_alert)
# """
# # Debug print to understand the values we're receiving
# print(f"Light status check - Lux value: {lux_value}")
# # Direct lux comparison - no percentage conversion needed
# if lux_value >= LUX_EXTREME_THRESHOLD:
# print("EXTREME light level detected!")
# return "EXTREME", True, True
# elif lux_value >= LUX_HIGH_THRESHOLD:
# print("HIGH light level detected!")
# return "HIGH", True, False
# elif lux_value >= LUX_HIGH_THRESHOLD * 0.7:
# return "Medium-High", False, False
# elif lux_value >= LUX_HIGH_THRESHOLD * 0.4:
# return "Medium", False, False
# else:
# return "Low", False, False
# # Display system init message
# lcd1.clear()
# lcd1.putstr("System Initializing")
# lcd1.move_to(0, 1)
# lcd1.putstr("Connecting to WiFi...")
# # Connect to WiFi
# if not connect_wifi():
# lcd1.clear()
# lcd1.putstr("WiFi Failed!")
# # Continue in offline mode if WiFi fails
# mqtt_client = None
# else:
# lcd1.move_to(0, 2)
# lcd1.putstr("Connecting to MQTT...")
# try:
# mqtt_client = connect_mqtt()
# lcd1.move_to(0, 3)
# lcd1.putstr("Connected!")
# sleep(1)
# except Exception as e:
# lcd1.move_to(0, 3)
# lcd1.putstr("MQTT Failed!")
# print("MQTT Connection failed:", str(e))
# mqtt_client = None
# sleep(1)
# # Initialize Weather Simulator
# weather_simulator = WeatherSimulator()
# # Tracking for weather display and MQTT updates
# last_weather_update = 0
# last_mqtt_update = 0
# WEATHER_UPDATE_INTERVAL = 60 # Update every minute (for demo purposes)
# MQTT_UPDATE_INTERVAL = 10 # Send data every 10 seconds
# # Prepare data for first display
# lcd1.clear()
# lcd1.putstr("Starting sensors...")
# sleep(1)
# while True:
# try:
# # Read sensor data
# dht.measure()
# temp = dht.temperature()
# hum = dht.humidity()
# moisture_raw = soil_moisture.read_u16()
# moisture_percent = map_value(moisture_raw, 65535, 0, 0, 100)
# # Read light sensor - now using lux value instead of raw ADC value
# light_raw = light_sensor.read_u16()
# print("Raw light sensor value:", light_raw)
# # Get the lux value (either actual or simulated)
# lux_value = get_simulated_lux(light_raw)
# print(f"Lux value: {lux_value}")
# # Get light status based on lux value
# light_status, is_high_light, is_extreme_light = get_light_status(lux_value)
# # Map lux to a percentage for displaying (0-100,000 lux range to 0-100%)
# light_percent = map_value(lux_value, 0, 100000, 0, 100)
# print(f"Light percentage for display: {light_percent}%")
# # Estimate biomass (now using lux value)
# biomass = estimate_biomass(temp, moisture_percent, hum, lux_value)
# biomass_status = "Low" if biomass < 40 else "Good" if biomass < 70 else "Excellent"
# # Get weather forecast
# current_time = utime.time()
# if current_time - last_weather_update >= WEATHER_UPDATE_INTERVAL:
# forecast = weather_simulator.display_forecast(lcd2)
# weather_simulator.log_forecast(forecast)
# last_weather_update = current_time
# # Determine irrigation status based on multiple factors
# # Don't irrigate if heavy rain is coming or if light is extreme
# if weather_simulator.heavy_rain_alert:
# irrigation_status = "RAIN COMING"
# print("Heavy rain alert! Irrigation OFF")
# set_servo_angle(0)
# led.value(0)
# irrigation_active = False
# elif is_extreme_light:
# irrigation_status = "SUN TOO BRIGHT"
# print("Extreme sunlight! Irrigation OFF")
# set_servo_angle(0)
# led.value(0)
# irrigation_active = False
# elif moisture_percent < 30 and not is_high_light:
# irrigation_status = "ACTIVE"
# print("Soil Dry! Irrigation ON")
# set_servo_angle(90)
# led.value(1)
# irrigation_active = True
# else:
# irrigation_status = "IDLE"
# print("Irrigation OFF - Soil:{:.0f}% Light:{}".format(moisture_percent, light_status))
# set_servo_angle(0)
# led.value(0)
# irrigation_active = False
# # Display on LCD1 - Optimized layout
# lcd1.clear()
# lcd1.putstr("Temp:{:.1f}C Hum:{:.0f}%".format(temp, hum)) # Line 0: Temp and Humidity
# lcd1.move_to(0, 1)
# lcd1.putstr("Moist:{:.0f}% Light:{:.0f}%".format(moisture_percent, light_percent)) # Line 1: Moisture and Light
# lcd1.move_to(0, 2)
# lcd1.putstr("Biomass:{}% {}".format(int(biomass), biomass_status)) # Line 2: Biomass status
# # Special alerts for line 3
# if is_extreme_light:
# print("Displaying extreme light warning")
# lcd1.move_to(0, 3)
# lcd1.putstr("EXTREME LIGHT ALERT!")
# sleep(1) # Show the static message briefly
# scrolling_text(lcd1, "WARNING: EXTREME SUNLIGHT DETECTED! PROTECT PLANTS", line=3, speed=0.15)
# elif is_high_light:
# print("Displaying high light warning")
# lcd1.move_to(0, 3)
# lcd1.putstr("HIGH LIGHT WARNING!")
# sleep(1) # Show the static message briefly
# scrolling_text(lcd1, "ALERT: HIGH LIGHT LEVEL - MONITOR PLANTS", line=3, speed=0.2)
# elif weather_simulator.heavy_rain_alert:
# scrolling_text(lcd1, "HEAVY RAIN SOON - IRRIGATION OFF", line=3, speed=0.2)
# else:
# lcd1.move_to(0, 3)
# lcd1.putstr("Irrigation: {}".format(irrigation_status)) # Line 3: Irrigation status
# # Send data to ThingsBoard via MQTT
# if mqtt_client and (current_time - last_mqtt_update >= MQTT_UPDATE_INTERVAL):
# # Create ThingsBoard compatible JSON payload
# payload = {
# "temperature": temp,
# "humidity": hum,
# "soil_moisture": moisture_percent,
# "luminance_lux": lux_value,
# "luminance_percent": light_percent,
# "light_status": light_status,
# "biomass": biomass,
# "irrigation": "ON" if irrigation_active else "OFF",
# "weather_condition": forecast['condition'],
# "precipitation_chance": forecast['precipitation_chance'],
# "wind_speed": forecast['wind_speed']
# }
# # Convert to JSON string
# json_payload = json.dumps(payload)
# # Publish to ThingsBoard
# try:
# mqtt_client.publish(MQTT_TOPIC, json_payload)
# print("Data sent to ThingsBoard:", json_payload)
# last_mqtt_update = current_time
# except Exception as e:
# print("MQTT publish error:", str(e))
# # Try to reconnect
# try:
# mqtt_client.disconnect()
# mqtt_client = connect_mqtt()
# except:
# mqtt_client = None
# print("MQTT reconnection failed")
# except Exception as e:
# print("Error in main loop:", str(e))
# sleep(1)
from machine import I2C, Pin, ADC, PWM
from time import sleep
import network
import json
# from umqtt.simple import MQTTClient
from simple import MQTTClient
from dht import DHT22
from pico_i2c_lcd import I2cLcd
import random
import utime
# WiFi and MQTT settings
# Credentials passed to wlan.connect() in connect_wifi()
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# Broker host/port handed to MQTTClient in connect_mqtt()
MQTT_SERVER = "demo.thingsboard.io"
MQTT_PORT = 1883
# Topic the main loop publishes its JSON telemetry payload to
MQTT_TOPIC = "v1/devices/me/telemetry"
# NOTE(review): device credential is hard-coded in source - confirm this is a throwaway demo token
MQTT_TOKEN = "5i0yrgch2932q5yh5ep9" # Replace with your ThingsBoard device token
# Connect to WiFi
def connect_wifi():
    """
    Join the configured WiFi network in station mode.

    After requesting the connection, the interface status is polled once per
    second for at most 10 rounds; polling stops early as soon as the status
    is negative or has reached 3 or more.

    :return: True when the final status is 3 (treated as connected),
             False otherwise
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(WIFI_SSID, WIFI_PASSWORD)
    # Bounded wait: at most 10 one-second polls
    for _ in range(10):
        if wlan.status() < 0 or wlan.status() >= 3:
            break
        print("Waiting for connection...")
        sleep(1)
    # Success path first; anything other than status 3 counts as failure
    if wlan.status() == 3:
        status = wlan.ifconfig()
        print(f"Connected to WiFi. IP: {status[0]}")
        return True
    print("Network connection failed")
    return False
# Connect to MQTT
def connect_mqtt():
    """
    Create an MQTT client for the ThingsBoard broker and connect it.

    The client is built from the module-level MQTT_SERVER, MQTT_PORT and
    MQTT_TOKEN settings. Any exception raised by connect() propagates to
    the caller.

    :return: the connected MQTTClient instance
    """
    # Positional order is kept exactly as the client constructor receives it:
    # client id, server, port, token, empty string
    client_args = ("pico_irrigation", MQTT_SERVER, MQTT_PORT, MQTT_TOKEN, "")
    broker = MQTTClient(*client_args)
    broker.connect()
    print("Connected to ThingsBoard MQTT broker")
    return broker
def scrolling_text(lcd, text, line=3, speed=0.3, width=20):
    """
    Scroll ``text`` across one LCD row as a right-to-left marquee.

    The text is padded with ``width`` blanks on both sides, so it enters from
    the right edge, crosses the row and leaves on the left. Because of that
    padding, text shorter than the row is displayed too (a one-blank padding
    produced an empty frame range for short text and drew nothing).
    The final frame is all blanks, leaving the row cleared.

    :param lcd: LCD display object providing move_to(col, row) and putstr(str)
    :param text: Text to scroll
    :param line: Row on which to display the scrolling text
    :param speed: Delay between frames (seconds)
    :param width: Number of visible columns on the row
    """
    blank = " " * width
    padded_text = blank + text + blank
    # +1 so the last window is the trailing padding only (blank row)
    for start in range(len(padded_text) - width + 1):
        # Every window is exactly `width` characters, so it overwrites the
        # whole row; a separate clear pass is unnecessary
        lcd.move_to(0, line)
        lcd.putstr(padded_text[start:start + width])
        # Small delay to control scrolling speed
        sleep(speed)
# Initialize DHT22 Sensor (temperature/humidity) on GP15
dht = DHT22(Pin(15))
# Initialize First I2C LCD (Sensor Data): I2C bus 0, SCL=GP5, SDA=GP4, 100 kHz
i2c1 = I2C(0, scl=Pin(5), sda=Pin(4), freq=100000)
# Take the first address reported by the bus scan.
# NOTE(review): scan()[0] raises IndexError when no device answers - confirm
# that aborting at start-up is the intended behavior.
LCD1_ADDR = i2c1.scan()[0]
# presumably 4 rows x 20 columns - TODO confirm against pico_i2c_lcd
lcd1 = I2cLcd(i2c1, LCD1_ADDR, 4, 20)
# Initialize Second I2C LCD (Weather/Additional Info): I2C bus 1, SCL=GP7, SDA=GP6, 100 kHz
i2c2 = I2C(1, scl=Pin(7), sda=Pin(6), freq=100000)
# Same first-address assumption as for the first LCD
LCD2_ADDR = i2c2.scan()[0]
lcd2 = I2cLcd(i2c2, LCD2_ADDR, 4, 20)
# Initialize Potentiometer (Simulating Soil Moisture) on GP26
soil_moisture = ADC(Pin(26))
# Initialize LDR (Light Dependent Resistor) on GP27
# The main loop reads this ADC but only logs the raw value; the lux figure
# comes from get_simulated_lux()
light_sensor = ADC(Pin(27))
print("LDR sensor initialized on GP27")
# Function to get simulated lux value from wokwi based on pin reading
def get_simulated_lux(pin_reading):
    """
    Return a simulated lux value that cycles once per minute.

    The ADC reading is not used: the result depends only on the current
    second within the minute (utime.time() % 60), stepping through four
    fixed light levels for demo purposes.

    :param pin_reading: raw LDR reading; accepted but ignored
    :return: 20000, 50000, 75000 or 95000 depending on the quarter of the minute
    """
    current_seconds = utime.time() % 60
    # (exclusive upper bound in seconds, lux) for the first three quarters
    schedule = (
        (15, 20000),  # 0-15 seconds: Low light
        (30, 50000),  # 15-30 seconds: Medium light
        (45, 75000),  # 30-45 seconds: High light
    )
    # 45-60 seconds: Extreme light, unless an earlier slot matches
    simulated_lux = 95000
    for upper_bound, level in schedule:
        if current_seconds < upper_bound:
            simulated_lux = level
            break
    print(f"Using time-based lux simulation: {simulated_lux} lux (based on second: {current_seconds})")
    return simulated_lux
# Initialize Servo Motor on GP14 (positioned through set_servo_angle())
servo = PWM(Pin(14))
servo.freq(50) # Standard 50Hz PWM frequency for servos
# Initialize LED Indicator on GP13 (the main loop sets it to 1 while irrigation is active)
led = Pin(13, Pin.OUT)
# Luminance thresholds - more realistic values for our time-based simulation
# get_light_status() compares lux values directly against these
LUX_HIGH_THRESHOLD = 70000 # 70,000 lux (bright daylight)
LUX_EXTREME_THRESHOLD = 90000 # 90,000 lux (very bright direct sunlight)
# Add a variable to track when we last checked light status
# (holds a utime.time() value once the main loop has run a check)
# NOTE(review): assigned 0 again just before the main loop - one of the two is redundant
last_light_check = 0
LIGHT_CHECK_INTERVAL = 5 # Check light status every 5 seconds
class WeatherSimulator:
    """
    Random weather forecast generator with LCD display and CSV logging.

    Instance attributes:
        conditions: list of condition names a forecast may pick from
        heavy_rain_alert: True when the most recent forecast had a
            precipitation chance above 75
    """

    # Visible columns per LCD row; static rows are clipped to this so they
    # cannot wrap onto the next row
    LCD_COLUMNS = 20

    def __init__(self):
        self.conditions = [
            "Clear Sky", "Partly Cloudy", "Cloudy",
            "Light Rain", "Moderate Rain", "Thunderstorm"
        ]
        self.heavy_rain_alert = False

    def generate_forecast(self):
        """
        Generate detailed weather forecast simulation

        Also updates self.heavy_rain_alert from the new precipitation chance.

        :return: dict with keys 'temperature', 'humidity', 'condition',
                 'wind_speed' and 'precipitation_chance'
        """
        forecast = {
            'temperature': round(random.uniform(15.0, 30.0), 1),
            'humidity': random.randint(30, 90),
            'condition': random.choice(self.conditions),
            'wind_speed': round(random.uniform(0.0, 10.0), 1),
            'precipitation_chance': random.randint(0, 100)
        }
        # Set heavy rain alert flag
        self.heavy_rain_alert = forecast['precipitation_chance'] > 75
        return forecast

    def _forecast_lines(self, forecast):
        """Return the three static LCD rows for ``forecast``, each at most LCD_COLUMNS long."""
        rows = (
            "{} Weather".format(forecast['condition']),
            "Humidity: {}%".format(forecast['humidity']),
            # Compact form: the longer "Precip:" label always exceeded 20
            # columns, so the percentage wrapped onto row 3 and was wiped by
            # the marquee
            "Wind:{:.1f}m/s P:{}%".format(forecast['wind_speed'], forecast['precipitation_chance']),
        )
        return [row[:self.LCD_COLUMNS] for row in rows]

    def display_forecast(self, lcd):
        """
        Generate a new forecast and display it on the LCD

        Rows 0-2 show condition, humidity and wind/precipitation; row 3 gets
        a scrolling alert or "Normal Weather" message.

        :param lcd: LCD object providing clear(), move_to(col, row) and putstr(str)
        :return: the forecast dict that was displayed
        """
        forecast = self.generate_forecast()
        lcd.clear()
        for row_index, row in enumerate(self._forecast_lines(forecast)):
            lcd.move_to(0, row_index)
            lcd.putstr(row)
        if self.heavy_rain_alert:
            scrolling_text(lcd, f"HEAVY RAIN ALERT! {forecast['precipitation_chance']}%", line=3)
        else:
            scrolling_text(lcd, "Normal Weather", line=3)
        return forecast

    def log_forecast(self, forecast):
        """
        Log forecast data to a file

        Appends one CSV row to weather_forecast_YYYYMMDD.csv, writing the
        header first when the file is empty. Best-effort: any error is
        printed and swallowed so logging can never stop the caller.

        :param forecast: dict as returned by generate_forecast()
        """
        try:
            current_time = utime.localtime()
            filename = 'weather_forecast_{:04d}{:02d}{:02d}.csv'.format(
                current_time[0], current_time[1], current_time[2]
            )
            with open(filename, 'a') as f:
                # Move to the end explicitly so the empty-file test does not
                # depend on the offset an append-mode file reports right
                # after opening
                f.seek(0, 2)
                if f.tell() == 0:
                    f.write("Timestamp,Temperature,Humidity,Condition,Wind_Speed,Precipitation_Chance\n")
                timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*current_time[:6])
                f.write("{},{:.1f},{},{},{:.1f},{}\n".format(
                    timestamp,
                    forecast['temperature'],
                    forecast['humidity'],
                    forecast['condition'],
                    forecast['wind_speed'],
                    forecast['precipitation_chance']
                ))
            print("Forecast logged successfully")
        except Exception as e:
            print("Error logging forecast:", str(e))
def estimate_biomass(temp, moisture, humidity, light_lux):
    """
    Score biomass potential from temperature, soil moisture, humidity and light.

    Each input is scaled and multiplied by its weight; the weighted sum is
    expressed as a percentage and limited to the range 0-100.

    :param temp: temperature, scaled as (temp - 10) / 30
    :param moisture: soil moisture, scaled as moisture / 100
    :param humidity: air humidity, scaled as humidity / 100
    :param light_lux: light level, scaled as light_lux / 100000
    :return: score between 0 and 100
    """
    # (scaled factor, weight) pairs; weights add up to 1.0
    weighted_factors = (
        ((temp - 10) / 30, 0.3),
        (moisture / 100, 0.3),
        (humidity / 100, 0.2),
        (light_lux / 100000, 0.2),
    )
    score = sum(factor * weight for factor, weight in weighted_factors) * 100
    # Individual factors are not limited, so the sum can leave 0-100
    if score >= 100:
        return 100
    if score <= 0:
        return 0
    return score
def map_value(value, from_min, from_max, to_min, to_max):
    """Linearly rescale ``value`` from the from_min..from_max range onto to_min..to_max."""
    source_span = from_max - from_min
    target_span = to_max - to_min
    offset = value - from_min
    # Multiply before dividing, matching the order of the plain formula
    return to_min + target_span * offset / source_span
def set_servo_angle(angle):
    """Position the servo: map ``angle`` (0-180) onto duty_u16 values 1638-8192 and apply it."""
    min_duty, max_duty = 1638, 8192
    pulse = map_value(angle, 0, 180, min_duty, max_duty)
    # duty_u16 gets the mapped value truncated to an integer
    servo.duty_u16(int(pulse))
def get_light_status(lux_value):
    """
    Classify a lux value against the module-level lux thresholds.

    :param lux_value: light level in lux
    :return: tuple (status_text, is_high_alert, is_extreme_alert)
    """
    # Debug trace of the value being classified
    print(f"Light status check - Lux value: {lux_value}")
    # Alert levels first, highest threshold wins
    if lux_value >= LUX_EXTREME_THRESHOLD:
        print("EXTREME light level detected!")
        return "EXTREME", True, True
    if lux_value >= LUX_HIGH_THRESHOLD:
        print("HIGH light level detected!")
        return "HIGH", True, False
    # Non-alert tiers are fractions of the high threshold
    for label, fraction in (("Medium-High", 0.7), ("Medium", 0.4)):
        if lux_value >= LUX_HIGH_THRESHOLD * fraction:
            return label, False, False
    return "Low", False, False
# Show start-up progress on the sensor LCD
lcd1.clear()
lcd1.putstr("System Initializing")
lcd1.move_to(0, 1)
lcd1.putstr("Connecting to WiFi...")
# MQTT is only attempted once WiFi is up; mqtt_client ends up None whenever
# either step fails, which puts the main loop in offline mode
if connect_wifi():
    lcd1.move_to(0, 2)
    lcd1.putstr("Connecting to MQTT...")
    try:
        mqtt_client = connect_mqtt()
        lcd1.move_to(0, 3)
        lcd1.putstr("Connected!")
        sleep(1)
    except Exception as e:
        lcd1.move_to(0, 3)
        lcd1.putstr("MQTT Failed!")
        print("MQTT Connection failed:", str(e))
        mqtt_client = None
        sleep(1)
else:
    lcd1.clear()
    lcd1.putstr("WiFi Failed!")
    # Continue in offline mode if WiFi fails
    mqtt_client = None
# Initialize Weather Simulator
weather_simulator = WeatherSimulator()
# Add variables outside the main loop
# Light state persists between iterations because the loop refreshes it only
# every LIGHT_CHECK_INTERVAL seconds; these are the values used until the
# first check runs
lux_value = 0
light_status = "Unknown"
is_high_light = False
is_extreme_light = False
# utime.time() value of the last light check; 0 means "never checked"
last_light_check = 0
# Tracking for weather display and MQTT updates
# (utime.time() values of the last weather refresh / last successful publish)
last_weather_update = 0
last_mqtt_update = 0
WEATHER_UPDATE_INTERVAL = 60 # Update every minute (for demo purposes)
MQTT_UPDATE_INTERVAL = 10 # Send data every 10 seconds
# Main control loop: sample the sensors, refresh the light/weather state,
# decide on irrigation, update both LCDs and publish telemetry. It runs
# forever; an error in one pass is printed and the next pass starts after the
# one-second pause.
while True:
    try:
        # Read sensor data
        dht.measure()
        temp = dht.temperature()
        hum = dht.humidity()
        moisture_raw = soil_moisture.read_u16()
        # Inverted mapping: raw 65535 -> 0 %, raw 0 -> 100 %
        moisture_percent = map_value(moisture_raw, 65535, 0, 0, 100)
        # The raw LDR reading is only logged; the lux value comes from the
        # time-based simulation
        light_raw = light_sensor.read_u16()
        print("Raw light sensor value:", light_raw)
        # Check light levels at regular intervals, not every loop
        current_time = utime.time()
        if current_time - last_light_check >= LIGHT_CHECK_INTERVAL:
            lux_value = get_simulated_lux(light_raw)
            light_status, is_high_light, is_extreme_light = get_light_status(lux_value)
            last_light_check = current_time
            # Print clear status message
            print(f"**** LIGHT STATUS CHECK: {light_status} ({lux_value} lux) ****")
        # Map lux to a percentage for displaying (0-100,000 lux range to 0-100%)
        light_percent = map_value(lux_value, 0, 100000, 0, 100)
        print(f"Light percentage for display: {light_percent}%")
        # Estimate biomass (now using lux value)
        biomass = estimate_biomass(temp, moisture_percent, hum, lux_value)
        biomass_status = "Low" if biomass < 40 else "Good" if biomass < 70 else "Excellent"
        # Get weather forecast
        # NOTE(review): `forecast` is only bound once a weather update has
        # succeeded; the telemetry payload below relies on that
        current_time = utime.time()
        if current_time - last_weather_update >= WEATHER_UPDATE_INTERVAL:
            forecast = weather_simulator.display_forecast(lcd2)
            weather_simulator.log_forecast(forecast)
            last_weather_update = current_time
        # Determine irrigation status based on multiple factors
        # Don't irrigate if heavy rain is coming or if light is extreme
        if weather_simulator.heavy_rain_alert:
            irrigation_status = "RAIN COMING"
            print("Heavy rain alert! Irrigation OFF")
            set_servo_angle(0)
            led.value(0)
            irrigation_active = False
        elif is_extreme_light:
            irrigation_status = "SUN TOO BRIGHT"
            print("Extreme sunlight! Irrigation OFF")
            set_servo_angle(0)
            led.value(0)
            irrigation_active = False
        elif moisture_percent < 30 and not is_high_light:
            irrigation_status = "ACTIVE"
            print("Soil Dry! Irrigation ON")
            set_servo_angle(90)
            led.value(1)
            irrigation_active = True
        else:
            irrigation_status = "IDLE"
            print("Irrigation OFF - Soil:{:.0f}% Light:{}".format(moisture_percent, light_status))
            set_servo_angle(0)
            led.value(0)
            irrigation_active = False
        # Display on LCD1 - Optimized layout
        lcd1.clear()
        lcd1.putstr("Temp:{:.1f}C Hum:{:.0f}%".format(temp, hum))  # Line 0: Temp and Humidity
        lcd1.move_to(0, 1)
        lcd1.putstr("Moist:{:.0f}% Light:{:.0f}%".format(moisture_percent, light_percent))  # Line 1: Moisture and Light
        lcd1.move_to(0, 2)
        lcd1.putstr("Biomass:{}% {}".format(int(biomass), biomass_status))  # Line 2: Biomass status
        # Special alerts for line 3 (light alerts take priority over rain)
        if is_extreme_light:
            print("Displaying extreme light warning")
            lcd1.move_to(0, 3)
            lcd1.putstr("EXTREME LIGHT ALERT!")
            sleep(1)  # Show the static message briefly
            scrolling_text(lcd1, "WARNING: EXTREME SUNLIGHT DETECTED! PROTECT PLANTS", line=3, speed=0.15)
        elif is_high_light:
            print("Displaying high light warning")
            lcd1.move_to(0, 3)
            lcd1.putstr("HIGH LIGHT WARNING!")
            sleep(1)  # Show the static message briefly
            scrolling_text(lcd1, "ALERT: HIGH LIGHT LEVEL - MONITOR PLANTS", line=3, speed=0.2)
        elif weather_simulator.heavy_rain_alert:
            scrolling_text(lcd1, "HEAVY RAIN SOON - IRRIGATION OFF", line=3, speed=0.2)
        else:
            lcd1.move_to(0, 3)
            lcd1.putstr("Irrigation: {}".format(irrigation_status))  # Line 3: Irrigation status
        # Send data to ThingsBoard via MQTT (skipped in offline mode)
        if mqtt_client and (current_time - last_mqtt_update >= MQTT_UPDATE_INTERVAL):
            # Create ThingsBoard compatible JSON payload
            payload = {
                "temperature": temp,
                "humidity": hum,
                "soil_moisture": moisture_percent,
                "luminance_lux": lux_value,
                "luminance_percent": light_percent,
                "light_status": light_status,
                "biomass": biomass,
                "irrigation": "ON" if irrigation_active else "OFF",
                "weather_condition": forecast['condition'],
                "precipitation_chance": forecast['precipitation_chance'],
                "wind_speed": forecast['wind_speed']
            }
            # Convert to JSON string
            json_payload = json.dumps(payload)
            # Publish to ThingsBoard
            try:
                mqtt_client.publish(MQTT_TOPIC, json_payload)
                print("Data sent to ThingsBoard:", json_payload)
                last_mqtt_update = current_time
            except Exception as e:
                print("MQTT publish error:", str(e))
                # Drop the old session best-effort. This is kept separate from
                # the reconnect so that a failing disconnect (e.g. on an
                # already-dead socket) cannot skip the reconnect attempt.
                try:
                    mqtt_client.disconnect()
                except Exception as disconnect_error:
                    print("MQTT disconnect error:", str(disconnect_error))
                # Try to reconnect; on failure fall back to offline mode
                try:
                    mqtt_client = connect_mqtt()
                except Exception:
                    mqtt_client = None
                    print("MQTT reconnection failed")
    except Exception as e:
        print("Error in main loop:", str(e))
    # Pace the loop: one pass per second plus the time spent above
    sleep(1)