from machine import Pin, ADC
import network
import time
import json
from umqtt.simple import MQTTClient
import urandom # Para simular fluctuaciones en el pH
class PHMonitor:
def __init__(self):
# Configuración del hardware
self.potenciometro = ADC(Pin(34)) # Conectar potenciómetro a GPIO34
self.potenciometro.atten(ADC.ATTN_11DB) # Rango completo 0-3.3V
self.potenciometro.width(ADC.WIDTH_12BIT) # 12 bits de resolución (0-4095)
# LED de estado
self.led = Pin(2, Pin.OUT)
# Configuración WiFi
self.wifi_ssid = "Juan pablo's Galaxy S10+"
self.wifi_password = "Domelite1134"
# Configuración HiveMQ
self.mqtt_broker = "broker.hivemq.com"
self.mqtt_port = 1883
self.client_id = "ph_monitor_" + str(urandom.getrandbits(20))
self.topic = "ph/monitoring"
# Variables de calibración (simuladas)
self.calibracion = {
'min_val': 0, # Valor mínimo del ADC
'max_val': 4095, # Valor máximo del ADC
'min_ph': 0.0, # Valor de pH mínimo correspondiente
'max_ph': 14.0 # Valor de pH máximo correspondiente
}
# Conectar a WiFi y MQTT
self.conectar_wifi()
self.conectar_mqtt()
def conectar_wifi(self):
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("Conectando a WiFi...")
sta_if.active(True)
sta_if.connect(self.wifi_ssid, self.wifi_password)
while not sta_if.isconnected():
time.sleep(1)
print("WiFi conectado:", sta_if.ifconfig())
def conectar_mqtt(self):
try:
self.client = MQTTClient(self.client_id, self.mqtt_broker, port=self.mqtt_port)
self.client.connect()
print("Conectado a HiveMQ")
self.led.on() # Indicador de conexión
except Exception as e:
print("Error conectando a MQTT:", e)
self.led.off()
def leer_potenciometro(self):
"""Lee el valor del potenciómetro y agrega ruido simulado"""
valor = self.potenciometro.read()
# Simular fluctuaciones pequeñas (ruido)
valor += urandom.randint(-50, 50)
return max(0, min(4095, valor)) # Mantener dentro del rango
def calcular_ph(self, valor_adc):
"""Convierte el valor del ADC a un valor de pH simulado"""
# Mapeo lineal del valor del ADC al rango de pH
ph = ((valor_adc - self.calibracion['min_val']) /
(self.calibracion['max_val'] - self.calibracion['min_val'])) * \
(self.calibracion['max_ph'] - self.calibracion['min_ph']) + \
self.calibracion['min_ph']
# Simular comportamiento no lineal típico de un sensor de pH real
ph = ph + 0.1 * urandom.random() # Pequeña variación aleatoria
return round(ph, 2) # Redondear a 2 decimales
def publicar_datos(self, ph):
"""Publica los datos del pH a HiveMQ"""
datos = {
"ph": ph,
"unidad": "pH",
"timestamp": time.time(),
"estado": "normal" if 6.5 <= ph <= 7.5 else "alerta"
}
try:
self.client.publish(self.topic, json.dumps(datos))
print(f"Datos publicados: {datos}")
except Exception as e:
print("Error publicando datos:", e)
# Intentar reconectar
self.conectar_mqtt()
def ejecutar(self, intervalo=5):
"""Bucle principal del sistema"""
print("Iniciando monitoreo de pH...")
try:
while True:
# Leer y procesar datos
valor_adc = self.leer_potenciometro()
ph = self.calular_ph(valor_adc)
# Mostrar en consola
print(f"Valor ADC: {valor_adc} -> pH: {ph}")
# Publicar datos
self.publicar_datos(ph)
# Esperar para la próxima lectura
time.sleep(intervalo)
except KeyboardInterrupt:
print("Deteniendo monitor...")
self.client.disconnect()
self.led.off()
# Iniciar el sistema
if __name__ == "__main__":
monitor = PHMonitor()
monitor.ejecutar(intervalo=10) # Lectura cada 10 segundos