# Système Intégré: Arrosage Automatique et Contrôle d'Accès par Sonnette (Corrigé)
import network
import urequests
import time
import json
import ssd1306
from machine import Pin, I2C, PWM
import dht
from pico_i2c_lcd import I2CLcd
'''------------------------------ CONFIGURATIONS ------------------------------------'''
# --- System timers (compared against time.time() deltas in the main loop) ---
TIEMPO_ACTIVACION_HIDROCAM = 180  # gap between weather refreshes / irrigation decisions
COMMAND_CHECK_INTERVAL = 5        # gap between Telegram command polls
OLED_UPDATE_INTERVAL = 10         # gap between OLED dashboard redraws

# --- API & bot endpoints ---
# NOTE(review): the tunnel URL, the weather API key and the bot token are
# hard-coded in source; replace them with your own values and consider
# loading them from a file that is not committed.
# Base URL of the face-recognition server ("/recognize" is appended to it).
DOOR_SERVER_URL = "https://pq7nmnhf-5000.uks1.devtunnels.ms"
# Weather endpoint; the access key and the queried city are part of the URL.
API_CLIMA_URL = "http://api.weatherstack.com/current?access_key=5f9685320c910465401de4e462183c00&query=rabat"
TELEGRAM_CONFIG = {
    "bot_token": "8311573110:AAFA2aJmLpZ6cXW8mGzrOFqZ4Y_ru72QKoI",  # token of the bot that sends/receives messages
    "chat_id": "5455039697",  # chat that receives alerts and status reports
}

# --- Pin numbers: watering side ---
LED_PUMP_PIN = 21     # output switched by activate_pump()
DHT_PIN = 22          # DHT22 data line
SERVO_VALVE_PIN = 23  # PWM output for the valve servo
PIR_PIN = 25          # PIR motion sensor input
ECHO_PIN = 26         # ultrasonic sensor echo input
TRIG_PIN = 27         # ultrasonic sensor trigger output

# --- Pin numbers: door side ---
DOORBELL_PIN = 32     # doorbell button input (configured with pull-down)
SERVO_DOOR_PIN = 2    # PWM output for the door servo
LED_GREEN_PIN = 13    # "access granted" indicator
LED_RED_PIN = 12      # "access denied" indicator

# --- Pin numbers: I2C bus shared by the OLED and the LCD ---
SCL_PIN = 18
SDA_PIN = 19
'''------------------------------ HARDWARE INITIALIZATION ------------------------------------'''
# One I2C bus (bus 0) is shared by both displays.
i2c = I2C(0, scl=Pin(SCL_PIN), sda=Pin(SDA_PIN))
oled = ssd1306.SSD1306_I2C(128, 64, i2c)  # 128x64 OLED
# presumably: I2C address 0x27, 2 rows x 16 columns -- confirm against pico_i2c_lcd
lcd = I2CLcd(i2c, 0x27, 2, 16)

# Watering-side peripherals.
led_pump = Pin(LED_PUMP_PIN, Pin.OUT)
dht_sensor = dht.DHT22(Pin(DHT_PIN))
trigger = Pin(TRIG_PIN, Pin.OUT)
echo = Pin(ECHO_PIN, Pin.IN)
servo_valve = PWM(Pin(SERVO_VALVE_PIN), freq=50)
pir = Pin(PIR_PIN, Pin.IN)

# Door-side peripherals.
led_green = Pin(LED_GREEN_PIN, Pin.OUT)
led_red = Pin(LED_RED_PIN, Pin.OUT)
servo_door = PWM(Pin(SERVO_DOOR_PIN), freq=50)
doorbell = Pin(DOORBELL_PIN, Pin.IN, Pin.PULL_DOWN)

# Highest Telegram update_id handled so far; check_telegram_commands() polls
# with offset = last_update_id + 1.
last_update_id = 0
'''------------------------------ CORE FUNCTIONS ------------------------------------'''
def conecte_wifi():
    """Join the open "Wokwi-GUEST" network and block until connected.

    Progress is reported on the console, the LCD and the OLED. There is no
    timeout: the station interface is polled every 0.5 s until it reports a
    connection, then the success banner is held for 2 s.
    """

    def announce(console_text, display_text):
        # Same message on every output: console first, then LCD, then OLED.
        print(console_text)
        lcd.clear()
        lcd.putstr(display_text)
        oled.fill(0)
        oled.text(display_text, 0, 20)
        oled.show()

    announce("Connecting to WiFi...", "Connecting WiFi")

    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect("Wokwi-GUEST", "")
    while not station.isconnected():
        time.sleep(0.5)
        print(".", end="")

    announce("\nWiFi Connected!", "WiFi Connected!")
    time.sleep(2)
def _percent_encode(text):
    """Return *text* percent-encoded as UTF-8 for use as a URL query value."""
    unreserved = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~"
    pieces = []
    for byte in str(text).encode("utf-8"):
        char = chr(byte)
        # Bytes >= 0x80 map to characters that are never in `unreserved`,
        # so every non-ASCII byte is escaped.
        pieces.append(char if char in unreserved else "%{:02X}".format(byte))
    return "".join(pieces)


def send_telegram_message(message):
    """Send *message* to the configured Telegram chat via the Bot API.

    The text is fully percent-encoded. Escaping only spaces and newlines is
    not enough: the messages this program sends contain accented characters,
    '%', ':' and '!', and a literal '%' directly before an escaped newline
    yields the malformed sequence '%%0A'.

    Errors are reported on the console and never raised, so a failed
    notification cannot stop the caller.

    Args:
        message: Text to send (converted with str()).
    """
    token, chat_id = TELEGRAM_CONFIG["bot_token"], TELEGRAM_CONFIG["chat_id"]
    msg = _percent_encode(message)
    url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={msg}"
    try:
        r = urequests.get(url)
        status = r.status_code
        r.close()
        # Only claim success when the API actually accepted the request.
        if status == 200:
            print("Telegram message sent.")
        else:
            print(f"Telegram API error: HTTP {status}")
    except Exception as e:
        print(f"Telegram API error: {e}")
'''------------------------------ DOOR LOCK SYSTEM (Button Triggered) ------------------------------------'''
def set_servo_position(servo_pwm, angle):
    """Write the PWM duty corresponding to *angle* to *servo_pwm*.

    The angle is mapped linearly so that 0 gives duty 25 and 180 gives
    duty 125; the result is truncated to an int. No clamping is applied.

    Args:
        servo_pwm: Object exposing a ``duty(value)`` method.
        angle: Target angle in degrees.
    """
    travel_fraction = angle / 180
    servo_pwm.duty(int(25 + travel_fraction * 100))
def check_face_recognition():
    """Ask the door server whether the visitor's face is recognised.

    Performs a GET on ``DOOR_SERVER_URL + "/recognize"`` (10 s timeout) and
    reads the ``recognized`` and ``name`` fields of the JSON reply. Progress
    and errors are shown on the LCD and the console.

    Returns:
        A ``(recognized, name)`` tuple. On any error the result is
        ``(False, "Erreur")`` so that the caller keeps the door locked.
    """
    endpoint = DOOR_SERVER_URL + "/recognize"
    print("\n🔍 Checking face recognition...")
    lcd.clear()
    lcd.putstr("Verification...")
    lcd.move_to(0, 1)
    lcd.putstr("Analyse visage")
    try:
        r = urequests.get(endpoint, headers={"ngrok-skip-browser-warning": "true"}, timeout=10)
        try:
            data = r.json()
        finally:
            # Release the socket even when the body is not valid JSON.
            r.close()
        recognized, name = data.get("recognized", False), data.get("name", "Inconnu")
        print(f"-> Recognized: {recognized}, Name: {name}")
        return recognized, name
    except Exception as e:
        print(f"✗ HTTP Error: {e}")
        lcd.clear()
        lcd.putstr("Erreur Serveur")
        return False, "Erreur"
def open_door(name):
    """Greet *name*, swing the door servo open for 4 s, then close it.

    The green LED is lit for the whole sequence and the red LED is switched
    off at the start. The servo is moved to 90 (open) and back to 0
    (closed), with a 1 s pause before the green LED goes off.

    Args:
        name: Visitor name shown on the LCD after "Bienvenue ".
    """
    print("🔓 Opening door...")
    led_green.on()
    led_red.off()
    lcd.clear()
    lcd.putstr(f"Bienvenue {name}")

    set_servo_position(servo_door, 90)
    time.sleep(4)

    print("🔒 Closing door...")
    set_servo_position(servo_door, 0)
    time.sleep(1)
    led_green.off()
def keep_locked():
    """Signal a refused entry and alert the owner over Telegram.

    Lights the red LED (green off), shows "ACCES REFUSE!" on the LCD, sends
    the alert message, then keeps the red LED on for 2 more seconds.
    """
    print("🚫 Access Denied.")
    led_red.on()
    led_green.off()
    lcd.clear()
    lcd.putstr("ACCES REFUSE!")

    send_telegram_message("ALERTE: Personne inconnue détectée à la porte ! Accès refusé.")

    time.sleep(2)
    led_red.off()
def check_doorbell():
    """Handle one doorbell press, if the button is currently down.

    When the doorbell input reads 1, face recognition is run and the door is
    either opened for the recognised visitor or kept locked. Afterwards the
    LCD returns to the "Systeme Pret" idle message. Does nothing when the
    button is not pressed.
    """
    if doorbell.value() != 1:
        return

    print("🔔 Doorbell pressed!")
    recognized, name = check_face_recognition()
    if recognized:
        open_door(name)
    else:
        keep_locked()

    # Short pause, then restore the idle screen.
    time.sleep(1)
    lcd.clear()
    lcd.putstr("Systeme Pret")
'''------------------------------ WATERING SYSTEM ------------------------------------'''
def fetch_weather_data():
    """Fetch the current weather report from ``API_CLIMA_URL``.

    Returns:
        The decoded JSON payload, or ``None`` when the request or the JSON
        decoding fails. The failure reason is printed rather than silently
        discarded.
    """
    try:
        r = urequests.get(API_CLIMA_URL)
        try:
            return r.json()
        finally:
            # Close the socket whether or not the body parsed.
            r.close()
    except Exception as e:
        print(f"Weather API error: {e}")
        return None
def measure_distance():
    """Take one reading from the ultrasonic sensor.

    Emits a 10 us trigger pulse, waits for the echo line to go high, then
    times how long it stays high.

    Returns:
        The distance as ``pulse_us * 0.0343 / 2`` (callers treat it as
        centimetres), or ``None`` if either wait exceeds 100000 us.
    """
    timeout_us = 100000
    scale_per_us = 0.0343

    # Trigger pulse: settle low, 10 us high, back low.
    trigger.off()
    time.sleep_us(2)
    trigger.on()
    time.sleep_us(10)
    trigger.off()

    # Wait for the echo line to rise.
    wait_began = time.ticks_us()
    while echo.value() == 0:
        if time.ticks_diff(time.ticks_us(), wait_began) > timeout_us:
            return None

    # Time the high phase of the echo line.
    pulse_began = time.ticks_us()
    while echo.value() == 1:
        if time.ticks_diff(time.ticks_us(), pulse_began) > timeout_us:
            return None

    pulse_us = time.ticks_diff(time.ticks_us(), pulse_began)
    # Halved because the pulse covers the round trip.
    return (pulse_us * scale_per_us) / 2
def manage_irrigation(weather_data_current):
    """Decide from the current weather whether to water, and do so if needed.

    Args:
        weather_data_current: Mapping with optional ``humidity`` and
            ``temperature`` entries; may be empty or ``None``.

    Returns:
        A short status string: "Meteo N/A" when no data is available,
        "Humidite elevee" when humidity is above 75 (missing humidity counts
        as 100), "Arrosage intensif" after running the pump for 10 s because
        temperature is above 28 (missing temperature counts as 0), otherwise
        "Conditions OK".
    """
    if not weather_data_current:
        return "Meteo N/A"

    humidity = weather_data_current.get('humidity', 100)
    if humidity > 75:
        return "Humidite elevee"

    temperature = weather_data_current.get('temperature', 0)
    if temperature > 28:
        verdict = "Arrosage intensif"
        activate_pump(10, verdict)
        return verdict

    return "Conditions OK"
def activate_pump(duration, reason):
    """Run the pump output for *duration* seconds and notify via Telegram.

    The notification is sent after the output is switched on, so the output
    stays on for *duration* plus the time the notification takes.

    Args:
        duration: Seconds to wait before switching the output off.
        reason: Text included in the Telegram notification.
    """
    led_pump.on()
    notice = f"Pompe activée: {duration}s.\nRaison: {reason}"
    send_telegram_message(notice)
    time.sleep(duration)
    led_pump.off()
def detect_motion():
    """Send a Telegram alert when the PIR input reads 1.

    After an alert the function blocks for 10 s, which spaces out repeated
    alerts; nothing happens when no motion is reported.
    """
    if pir.value() != 1:
        return
    send_telegram_message("ALERTE: Mouvement détecté dans le jardin!")
    time.sleep(10)
def _format_status_report(full_weather_data, temp_local, local_humidity, distance, tank_height=100):
    """Build the text of the ``/status`` reply from weather, DHT and tank data."""
    weather = full_weather_data or {}
    current_data = weather.get('current', {})
    location_data = weather.get('location', {})

    ville = location_data.get('name', 'N/A')
    temp = current_data.get('temperature', 'N/A')
    humedad = current_data.get('humidity', 'N/A')
    # The description arrives as a list; fall back when it is missing or empty.
    descripcion_list = current_data.get('weather_descriptions') or ['N/A']
    descripcion = descripcion_list[0]

    # `is not None` rather than truthiness: a reading of 0.0 is a valid
    # measurement, not a failed one.
    if distance is not None:
        water_level_cm = float(tank_height - distance)
        water_percentage = (water_level_cm / tank_height) * 100
        level_text = f"{water_level_cm:.1f}"
        percent_text = f"{water_percentage:.0f}"
    else:
        level_text, percent_text = "N/A", "N/A"

    return "".join([
        "SYSTÈME D'ARROSAGE\n\n",
        "**Informations météo de la ville:**\n",
        "Ville: " + str(ville) + "\n",
        "Température: " + str(temp) + "°C\n",
        "Humidité: " + str(humedad) + "%\n",
        "Conditions: " + str(descripcion) + "\n\n",
        "------------------------------------\n\n",
        "**Informations locales:**\n",
        "Température locale: " + str(temp_local) + "°C\n",
        "Humidité sol: " + str(local_humidity) + "%\n\n",
        "------------------------------------\n\n",
        "**Informations réservoir d'eau:**\n",
        "Hauteur réservoir: " + str(tank_height) + " cm\n",
        "Niveau eau: " + level_text + " cm\n",
        "Niveau eau: " + percent_text + "%\n",
    ])


def check_telegram_commands(full_weather_data):
    """Poll Telegram for new bot updates and answer ``/status`` commands.

    Requests updates newer than the module-level ``last_update_id`` and
    advances that counter for every update seen, so each one is handled at
    most once. A ``/status`` message triggers a report combining the cached
    weather data, a fresh DHT reading and a fresh tank-level measurement.

    Any error while polling or handling updates is printed and swallowed so
    that the main loop keeps running.

    Args:
        full_weather_data: Last weather payload fetched by the main loop
            (may be empty).
    """
    global last_update_id
    url = f"https://api.telegram.org/bot{TELEGRAM_CONFIG['bot_token']}/getUpdates?offset={last_update_id + 1}"
    try:
        r = urequests.get(url)
        try:
            data = r.json()
        finally:
            # Release the socket even when the body is not valid JSON.
            r.close()

        for update in data["result"]:
            last_update_id = update["update_id"]
            if "message" not in update or "text" not in update["message"]:
                continue
            command = update["message"]["text"]
            if command != "/status":
                continue

            print("Action: Sending detailed status report to Telegram.")
            try:
                dht_sensor.measure()
                temp_local, local_humidity = dht_sensor.temperature(), dht_sensor.humidity()
            except OSError as e:
                # A failed sensor read should not suppress the whole report.
                print(f"DHT read error: {e}")
                temp_local, local_humidity = "N/A", "N/A"

            distance = measure_distance()
            message = _format_status_report(
                full_weather_data, temp_local, local_humidity, distance, tank_height=100
            )
            send_telegram_message(message)
    except Exception as e:
        print(f"Error checking Telegram: {e}")
'''------------------------------ MAIN LOOP ------------------------------------'''
if __name__ == "__main__":
    # Entry point: bring up WiFi, park the door servo closed, announce the
    # start, then run the cooperative polling loop forever.
    # NOTE(review): the source had lost its indentation; the nesting below is
    # reconstructed -- in particular the weather timestamp is advanced even
    # when the fetch fails, so a failing API is retried once per interval
    # rather than on every loop pass. Confirm against the intended behavior.
    conecte_wifi()
    set_servo_position(servo_door, 0)

    last_cmd_check, last_weather_update, last_oled_update = 0, 0, 0
    riego_realizado = True  # True once the latest weather data has been acted on
    full_weather_data = {}

    send_telegram_message("Système Intégré Démarré.")
    lcd.clear()
    lcd.putstr("Systeme Pret")
    oled.fill(0)
    oled.text("System Ready", 0, 20)
    oled.show()

    while True:
        current_time = time.time()

        # Checked on every pass.
        check_doorbell()
        detect_motion()

        # Telegram command polling.
        if current_time - last_cmd_check > COMMAND_CHECK_INTERVAL:
            check_telegram_commands(full_weather_data)
            last_cmd_check = current_time

        # Weather refresh; fresh data arms one irrigation decision.
        if current_time - last_weather_update > TIEMPO_ACTIVACION_HIDROCAM:
            print("\nUpdating weather and watering status...")
            clima = fetch_weather_data()
            if clima:
                full_weather_data = clima
                riego_realizado = False
            last_weather_update = current_time

        if not riego_realizado:
            manage_irrigation(full_weather_data.get('current', {}))
            riego_realizado = True

        # OLED dashboard.
        if current_time - last_oled_update > OLED_UPDATE_INTERVAL:
            oled.fill(0)
            oled.text("--- JARDIN ---", 0, 0)

            dist = measure_distance()
            # `is not None`: a 0.0 reading is valid and must still be shown.
            if dist is not None:
                oled.text(f"Reservoir: {((100-dist)/100)*100:.0f}%", 0, 20)

            try:
                dht_sensor.measure()
                oled.text(f"T:{dht_sensor.temperature()}C H:{dht_sensor.humidity()}%", 0, 40)
            except OSError as e:
                # A failed sensor read must not take down the control loop;
                # skip the line and retry on the next refresh.
                print(f"DHT read error: {e}")

            oled.show()
            last_oled_update = current_time

        time.sleep(0.1)