# Wokwi Custom Chip - For docs and examples see:
# https://docs.wokwi.com/chips-api/getting-started
#
# SPDX-License-Identifier: MIT
# Instituto Tecnológico y de Estudios Superiores de Monterrey (ITESM)
# Proyecto final IoT Equipo 7
# Copyright 2025 Angel Pérez, Adrian Morales, Emiliano Palafox
from machine import Pin, I2C, ADC
from soil_sensor import SoilSensor
from npk_sensor import NPKSensor
from time import sleep
import ssd1306
import network
import urequests
import json
# Constants
# Backend endpoints: one receives sensor readings, the other serves (and
# acknowledges) the pending-command queue.
SENSOR_API = "https://apex.oracle.com/pls/apex/pa01797001/garden/sensor"
CMD_QUEUE_API = "https://apex.oracle.com/pls/apex/pa01797001/garden/command"
# Tag sent as `pot_tag` with every telemetry and command-queue request.
POT_TAG = "9e9edd760d"
API_CONTENT = {"Content-Type": "application/json"}
# Display dimensions handed to the SSD1306 driver.
WIDTH, HEIGHT = 128, 64
# Seconds the main loop sleeps at the end of every cycle.
API_SEND_TIMEFRAME = 5

# Sensor state, keyed by sensor type. Each entry carries the sensor id, the
# unit reported to the backend, the latest value and an online/offline status.
SENSORS_DATA = {
    "soil": {
        "id": "760d-01",
        "unit": "%",
        "value": 100,
        "status": "online",
    },
    "ldr": {
        "id": "760d-02",
        "unit": "lux",
        "value": 100,
        "status": "online",
    },
    "npk": {
        "id": "760d-03",
        "unit": "units",
        "value": "N:3,P:3,K:2",
        "status": "online",
    },
}
# Sensor and hardware initialization
# Sensor driver objects from the npk_sensor and soil_sensor modules.
npkSensor = NPKSensor()
soilSensor = SoilSensor()
# Analog input on pin 20; refresh_sensors() scales its 16-bit reading to 0-100
# and stores it under the "ldr" entry.
ldrSensor = ADC(Pin(20))
# Digital output on pin 7 driving the buzzer.
buzzer = Pin(7, Pin.OUT)
# SSD1306 display on I2C bus 0 (SCL on pin 9, SDA on pin 8).
screen = ssd1306.SSD1306_I2C(WIDTH, HEIGHT, I2C(0, scl=Pin(9), sda=Pin(8)))
# RGB LED outputs, indexed in red, green, blue order by led_color().
rgbLED = [Pin(15, Pin.OUT), Pin(16, Pin.OUT), Pin(3, Pin.OUT)]
# While False, show_message() draws a single empty row instead of its input.
screen_enabled = True
# Funciones I/O
def clear_serial_monitor():
    """Print the ANSI erase-display and cursor-home escape sequences."""
    erase_and_home = "\033[2J\033[H"
    print(erase_and_home)
def show_message(rows, ixc=18):
    """Draw text rows on the display, one row every 16 pixels.

    rows -- iterable of strings, drawn top to bottom starting at y = 0.
    ixc  -- x coordinate at which every row starts.

    While the module flag `screen_enabled` is false, a single empty row is
    drawn instead of `rows`, which leaves the display blank.
    """
    lines = rows if screen_enabled else [""]
    screen.fill(0)
    for row_index, line in enumerate(lines):
        screen.text(line, ixc, row_index * 16)
    screen.show()
def buzz_run(activate):
    """Drive the buzzer pin high (truthy) or low (falsy), then wait 1 second."""
    if activate:
        level = 1
    else:
        level = 0
    buzzer.value(level)
    sleep(1)
def led_color(red, green, blue):
    """Write the three channel levels to the RGB LED pins, red first."""
    for channel, level in enumerate((red, green, blue)):
        rgbLED[channel].value(level)
def process_command(command):
    """Execute one queued command and acknowledge it to the backend.

    command -- dict with the keys 'command' (command name), 'params'
               (command argument) and 'command_id' (id used for the
               acknowledgement).

    Supported commands:
      deactivate_sensor / activate_sensor -- params is a SENSORS_DATA key.
      turn_off_screen / turn_on_screen    -- no params used.
      blink_routine                       -- params is "r,g,b" integer levels.
      make_buzz                           -- no params used.

    Unknown commands and commands with unusable params are reported on the
    serial output and still acknowledged, so a single bad entry cannot stay
    in the queue and block every later poll.
    """
    # Without this declaration the assignments below would create a local
    # variable and show_message() would never observe the screen being disabled.
    global screen_enabled

    cmd = command['command']
    args = command['params']
    if cmd in ("deactivate_sensor", "activate_sensor"):
        if args in SENSORS_DATA:
            new_status = 'offline' if cmd == "deactivate_sensor" else 'online'
            SENSORS_DATA[args]['status'] = new_status
        else:
            print("Sensor desconocido: {}".format(args))
    elif cmd == 'turn_off_screen':
        # Blank the display while it is still enabled, then switch it off.
        show_message([""])
        screen_enabled = False
        screen.poweroff()
    elif cmd == "turn_on_screen":
        screen_enabled = True
        screen.poweron()
        show_message(["Smart Pot", "Online"])
    elif cmd == 'blink_routine':
        try:
            led_colors = [int(x) for x in args.split(",")]
        except (AttributeError, ValueError):
            # params was not a string, or held a non-integer component.
            led_colors = []
        if len(led_colors) < 3:
            print("Parametros invalidos para blink_routine: {}".format(args))
        else:
            for _ in range(5):
                led_color(led_colors[0], led_colors[1], led_colors[2])
                sleep(0.3)
                led_color(0, 0, 0)
                sleep(0.3)
    elif cmd == 'make_buzz':
        buzzer.value(1)
        sleep(2)
        buzzer.value(0)
    else:
        print("Comando no reconocido: {} con parametros {}".format(cmd,args))
    set_command_executed(command["command_id"])
# Funciones de red
def send_telemetry(sensor_type):
    """Send the current reading of one sensor to the backend.

    sensor_type -- key into SENSORS_DATA ("soil", "ldr" or "npk").

    The reading is sent as query parameters of a GET request to SENSOR_API,
    followed by a 2 second pause. Sensors whose status is not 'online' are
    skipped with a message on the serial output.

    Raises KeyError if sensor_type is not a SENSORS_DATA key; errors raised by
    urequests propagate to the caller.
    """
    sensor = SENSORS_DATA[sensor_type]
    if sensor['status'] != 'online':
        print("No se envia datos, habilita el sensor")
        return

    # NOTE(review): values are inserted without URL-encoding (the soil unit is
    # a bare "%"); confirm the backend accepts this before relying on it.
    data = 'pot_tag={}&sensor_id={}&sensor_type={}&sensor_value={}&sensor_status={}&unit={}'.format(POT_TAG,sensor["id"],sensor_type,sensor["value"],sensor["status"],sensor["unit"])
    url = '{}?{}'.format(SENSOR_API, data)
    response = urequests.get(url, headers=API_CONTENT)
    # The response owns an open socket; release it so repeated calls do not
    # exhaust the device's sockets or memory.
    response.close()
    sleep(2)
def process_command_queue():
    """Fetch this pot's pending commands and execute them in order.

    Issues a GET to CMD_QUEUE_API filtered by POT_TAG. On a 200 response the
    "items" list of the JSON body is passed item by item to process_command(),
    pausing 1 second after each command, and the number of processed commands
    is printed. Any other status is ignored. The function always ends with a
    1 second pause.
    """
    url = "{}?pot_tag={}".format(CMD_QUEUE_API,POT_TAG)
    response = urequests.get(url)
    try:
        if response.status_code == 200:
            cmd_list = json.loads(response.text)["items"]
        else:
            cmd_list = None
    finally:
        # Release the socket before the commands issue their own requests.
        response.close()

    if cmd_list is not None:
        for cmd in cmd_list:
            process_command(cmd)
            sleep(1)
        print ("{} comandos procesados".format(len(cmd_list)))
    sleep(1)
def set_command_executed(cmd_id):
    """Acknowledge a command by POSTing its id to CMD_QUEUE_API.

    cmd_id -- value of the command's 'command_id' field.

    The response status is not inspected; errors raised by urequests
    propagate to the caller.
    """
    url = "{}?command_id={}".format(CMD_QUEUE_API,cmd_id)
    response = urequests.post(url)
    # Release the underlying socket; one acknowledgement is sent per command.
    response.close()
# Funciones del Main Loop
def send_pot_telemetry():
    """Send telemetry for every sensor in SENSORS_DATA, then log completion."""
    for sensor_type in SENSORS_DATA:
        send_telemetry(sensor_type)
    print("Telemetria enviada")
def refresh_sensors():
    """Read all sensors and store the new values in SENSORS_DATA.

    The soil and NPK sensors are read first, then the analog light input,
    whose 16-bit reading is scaled to a 0-100 range.

    Returns the dict produced by npkSensor.read_all(); its "N", "P" and "K"
    entries are also stored as a comma-separated string.
    """
    soil_reading = soilSensor.soil_readings()
    nutrients = npkSensor.read_all()
    raw_light = ldrSensor.read_u16()

    SENSORS_DATA['ldr']['value'] = (raw_light / 65535) * 100
    SENSORS_DATA['soil']['value'] = soil_reading['moisture']
    SENSORS_DATA['npk']['value'] = "{},{},{}".format(
        *[nutrients[key] for key in ("N", "P", "K")]
    )
    return nutrients
def refresh_local_ui(npk_dict):
    """Update the RGB LED and the display from the current sensor state.

    npk_dict -- nutrient readings, passed to npkSensor.verify_nutrients().

    LED channels: red is on when the soil is dry or the nutrient check fails,
    green is on when the nutrient check passes, blue is on when the soil is
    not dry. The display shows the pot tag, the soil value and the NPK value.
    """
    nutrients_ok = npkSensor.verify_nutrients(npk_dict)
    soil_is_dry = soilSensor.is_dry()
    status_npk = "Ok" if nutrients_ok else "Check"
    status_moisture = "Dry" if soil_is_dry else "Ok"

    lines = [
        "Pot: {}".format(POT_TAG),
        "Humidity: {} %".format(SENSORS_DATA['soil']['value']),
        "NPK: {}".format(SENSORS_DATA['npk']['value']),
    ]

    # RGB LED reflects the combined moisture / nutrient state.
    needs_attention = status_moisture == "Dry" or status_npk != "Ok"
    led_color(
        1 if needs_attention else 0,
        1 if status_npk == "Ok" else 0,
        1 if status_moisture == "Ok" else 0,
    )
    show_message(lines, ixc=8)
# WiFi initialization: bring up the station interface and block until the
# connection to the open "Wokwi-GUEST" network is established.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect("Wokwi-GUEST", "")
while True:
    if wlan.isconnected():
        break

# Startup message: show each splash screen for one second.
for splash_rows in (
    ["Final Project", "Smart Gardens", "Team 7"],
    ["Smart Pot", "Online"],
):
    show_message(splash_rows)
    sleep(1)
# Main loop: one read / display / upload / command cycle per iteration.
while True:
    try:
        clear_serial_monitor()
        # Read the sensors first so the UI and the upload use fresh values.
        npk_data = refresh_sensors()
        refresh_local_ui(npk_data)
        send_pot_telemetry()
        process_command_queue()
    except Exception as error:
        # Top-level boundary: report the failure and keep the device running.
        print("Error:", error)
    # Pause between cycles whether or not the cycle succeeded.
    sleep(API_SEND_TIMEFRAME)