import network
import time
import urequests
from machine import Pin
import dht
# ==============================
# WOKWI WIFI
# ==============================
# Wi-Fi credentials passed to wifi.connect(); the password is an empty string.
SSID = "Wokwi-GUEST"
PASSWORD = ""
# ==============================
# PYTHONANYWHERE ENDPOINTS
# ==============================
# URL that receives each sensor reading as a JSON HTTP POST.
ENDPOINT = "https://andersonsenai.pythonanywhere.com/api/dht22"
# URL queried with HTTP GET for the cluster centres ("centros" in the reply).
CLUSTER_ENDPOINT = "https://andersonsenai.pythonanywhere.com/api/cluster"
# Value sent as the api_key query parameter of the ThingSpeak update URL.
# NOTE(review): this key is hard-coded in source — consider keeping it out of
# version control.
THINGSPEAK_KEY = "OK0E4E6RHLUX4KQC"
# ==============================
# WIFI CONNECTION
# ==============================
# Bring up the station interface and block until it is associated.
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
# Only start a new association when the interface is not already connected;
# this mirrors the connection pattern shown in the MicroPython documentation.
if not wifi.isconnected():
    wifi.connect(SSID, PASSWORD)
print("Conectando ao WiFi", end="")
# Poll every 0.5 s, printing a progress dot; there is no timeout, so the
# script waits here for as long as the network is unavailable.
while not wifi.isconnected():
    print(".", end="")
    time.sleep(0.5)
print("\nConectado!")
# ifconfig()[0] is the first field of the interface configuration tuple,
# printed here as the device IP.
print("IP:", wifi.ifconfig()[0])
# ==============================
# DHT22
# ==============================
# DHT22 driver instance on Pin(15); shared by the main loop and
# enviar_thingspeak().
sensor = dht.DHT22(Pin(15))
def interpretar_cluster(temp, umi):
    """Return a textual label for a temperature/humidity pair.

    Args:
        temp: Temperature value, compared against the threshold 25.
        umi: Humidity value, compared against the threshold 60.

    Returns:
        "quente e seco" when temp >= 25 and umi < 60,
        "quente e úmido" when temp >= 25 and umi >= 60,
        "frio e úmido" in every other case (in particular for any
        temp < 25, whatever the humidity is).
    """
    if temp >= 25 and umi < 60:
        return "quente e seco"
    if temp >= 25 and umi >= 60:
        return "quente e úmido"
    # Fallback label: there is no separate "cold and dry" category.
    return "frio e úmido"
def buscar_clusters():
    """Fetch the cluster centres from CLUSTER_ENDPOINT and label each one.

    Sends an HTTP GET to CLUSTER_ENDPOINT, reads the "centros" list from the
    JSON reply, labels every centre with interpretar_cluster() and prints the
    label. The first two items of each centre are used as temperature and
    humidity, in that order.

    Returns:
        A list of dicts with the keys "cluster" (index in the reply),
        "descricao", "temperatura" and "umidade"; or None when the HTTP
        status is not 200, the reply has no centres, or any exception is
        raised while fetching or processing the reply.
    """
    try:
        print("\nConsultando clusters...")
        resposta = urequests.get(CLUSTER_ENDPOINT)
        try:
            if resposta.status_code != 200:
                print("Erro HTTP:", resposta.status_code)
                return None
            dados = resposta.json()
        finally:
            # Release the response on every path, including a JSON decoding
            # failure, so repeated calls do not leak connections.
            resposta.close()

        centros = dados.get("centros", [])
        if not centros:
            print("Nenhum cluster encontrado.")
            return None

        lista_clusters = []
        for i, centro in enumerate(centros):
            temp = centro[0]
            umi = centro[1]
            descricao = interpretar_cluster(temp, umi)
            print("Cluster", i, "→", descricao)
            lista_clusters.append({
                "cluster": i,
                "descricao": descricao,
                "temperatura": temp,
                "umidade": umi,
            })
        return lista_clusters
    except Exception as e:
        # Best-effort call: report the problem and let the caller carry on.
        print("Erro ao buscar cluster:", e)
        return None
def interpretar_cluster_ret_num(temp, umi):
    """Return a numeric cluster code for a temperature/humidity pair.

    Uses the same thresholds as interpretar_cluster() (25 for temperature,
    60 for humidity) but returns an integer instead of a text label.

    Args:
        temp: Temperature value.
        umi: Humidity value.

    Returns:
        0 for "quente e seco" (temp >= 25 and umi < 60),
        1 for "quente e úmido" (temp >= 25 and umi >= 60),
        2 for "frio e úmido" (every other case).
    """
    if temp >= 25 and umi < 60:
        return 0  # quente e seco
    if temp >= 25 and umi >= 60:
        return 1  # quente e úmido
    return 2  # frio e úmido
def enviar_thingspeak():
    """Read the sensor and publish the reading to ThingSpeak.

    Takes a fresh measurement from the module-level `sensor`, derives the
    numeric cluster code with interpretar_cluster_ret_num() and sends
    temperature, humidity and cluster code as field1, field2 and field3 of
    an HTTP GET to the ThingSpeak update URL. The values, the HTTP status and
    the response body are printed.

    Returns:
        None. Any exception is caught and printed, so a failed reading or
        request does not interrupt the caller.
    """
    try:
        sensor.measure()
        temperatura = sensor.temperature()
        umidade = sensor.humidity()
        cluster = interpretar_cluster_ret_num(temperatura, umidade)
        print("Temp:", temperatura)
        print("Umidade:", umidade)
        print("Cluster:", cluster)
        url = "https://api.thingspeak.com/update?api_key={}&field1={}&field2={}&field3={}".format(
            THINGSPEAK_KEY, temperatura, umidade, cluster
        )
        resposta = urequests.get(url)
        try:
            print("Status:", resposta.status_code)
            print("Resposta:", resposta.text)
        finally:
            # Close even if reading the body fails, so the connection is
            # always released.
            resposta.close()
    except Exception as e:
        print("Erro:", e)
# ==============================
# MAIN LOOP
# ==============================
# Each cycle: POST the current reading to ENDPOINT, wait 5 s, query the
# clusters, wait 2 s, publish to ThingSpeak, then wait 10 s.
while True:
    try:
        sensor.measure()
        temperatura = sensor.temperature()
        umidade = sensor.humidity()
        payload = {
            "temperatura": temperatura,
            "umidade": umidade
        }
        print("Enviando:", payload)
        resposta = urequests.post(
            ENDPOINT,
            json=payload,
            headers={"Content-Type": "application/json"}
        )
        try:
            print("Status HTTP:", resposta.status_code)
        finally:
            # Always release the response, whatever happens while reporting.
            resposta.close()
    except Exception as e:
        # A failed reading or request is reported and the cycle continues.
        print("Erro:", e)
    time.sleep(5)
    buscar_clusters()
    time.sleep(2)
    enviar_thingspeak()
    time.sleep(10)