import time
import dht
import network
import urequests
import ujson
import ssd1306
import ussl
import socket
import ubinascii
from machine import Pin, ADC, I2C
from umqtt.simple import MQTTClient
try:
import ujson as json
except:
import json
try:
from urllib.parse import urlencode
except:
urlencode = None
# WiFi
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
# OpenWeatherMap
OWM_API_KEY = "69354f94d933ec6152f12add3f3fbef4"
OWM_CIDADE = "Ribeirao%20Pires,BR"
OWM_URL = "http://api.openweathermap.org/data/2.5/weather?q={}&appid={}&units=metric".format(OWM_CIDADE, OWM_API_KEY)
# HiveMQ
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORTA = 1883
MQTT_USUARIO = None
MQTT_SENHA = None
MQTT_TOPICO = "thaina/estacao/dados"
MQTT_CLIENT = "esp32_" + str(time.time())
# Excel Sheets
EXCEL_URL = "https://script.google.com/macros/s/AKfycbxXtOJR_KA3JUBsMIx7LMl8CKWba56EqmrrpU_DgBIQDJl3fQyyggVliAWbFak4TBW5/exec"
# Intervalos
INTERVALO_MQTT = 5
INTERVALO_API = 60
INTERVALO_SHEETS = 5
INTERVALO_EMAIL = 60
# Pinos
botao = Pin(18, Pin.IN, Pin.PULL_UP)
dht_sensor = dht.DHT22(Pin(27))
ldr = ADC(Pin(34))
ldr.atten(ADC.ATTN_11DB)
mq2 = ADC(Pin(35))
mq2.atten(ADC.ATTN_11DB)
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
# Controle
tela_atual = 0
botao_p = True
pressao_atual = None
pressao_anterior = None
cliente_mqtt = None
# WiFi
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect("Wokwi-GUEST", "")
print("Conectando ao WiFi...")
timeout = 10
inicio = time.time()
while not wifi.isconnected():
if time.time() - inicio > timeout:
print("Erro: timeout WiFi")
break
time.sleep(1)
if wifi.isconnected():
print("WiFi conectado!")
# MQTT
def conectar_mqtt():
global cliente_mqtt
try:
cliente_mqtt = MQTTClient(
client_id = MQTT_CLIENT,
server = MQTT_BROKER,
port = MQTT_PORTA,
user = MQTT_USUARIO,
password = MQTT_SENHA,
ssl = False
)
cliente_mqtt.connect()
print("MQTT conectado!")
return True
except Exception as e:
print("Erro MQTT:", e)
return False
def publicar_mqtt(payload):
global cliente_mqtt
try:
if cliente_mqtt:
cliente_mqtt.publish(MQTT_TOPICO, ujson.dumps(payload))
except Exception as e:
print("Erro ao publicar:", e)
conectar_mqtt()
# API
def buscar_pressao():
resp = None
try:
resp = urequests.get(OWM_URL, timeout=10)
if resp.status_code == 200:
dados = resp.json()
print ("Atualizando API")
return dados["main"]["pressure"]
else:
print("Erro API:", resp.status_code)
return None
except Exception as e:
print("Erro API:", e)
return None
finally:
if resp:
resp.close()
# Previsão
def calcular_previsao(pressao_atual, pressao_ant, temp):
if pressao_atual is None:
return "Sem dados"
variacao = 0
if pressao_ant is not None:
variacao = pressao_atual - pressao_ant
if pressao_atual < 1005:
return "Tempestade" if variacao < -2 else "Chuva"
elif pressao_atual < 1013:
return "Nublado"
elif pressao_atual <= 1020:
return "Ensolarado" if temp > 30 else "Bom tempo"
else:
return "Muito seco" if variacao > 2 else "Ensolarado"
# Excel Sheets
def enviar_sheets(temp, umid):
data = {'temp': str(temp), 'umid': str(umid)}
try:
if urlencode:
payload = urlencode(data)
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
response = urequests.post(EXCEL_URL, data=payload, headers=headers)
else:
payload = json.dumps(data)
headers = {'Content-Type': 'application/json'}
response = urequests.post(EXCEL_URL, data=payload, headers=headers)
print('Medidas enviadas com sucesso!!')
response.close()
except Exception as e:
print('Erro Sheel:', e)
# SMTP Gmail
def enviar_email(media_temp, media_umid, num_leituras):
resp = None
try:
t = time.localtime()
data = "{:02d}/{:02d}/{:04d}".format(t[2], t[1], t[0])
url = EXCEL_URL + "?email=true&media_temp={}&media_umid={}&leituras={}&data={}".format(
media_temp, media_umid, num_leituras, data)
resp = urequests.get(url, timeout=10)
print( resp.text[:50])
except Exception as e:
print("Erro Email:", e)
finally:
if resp:
resp.close()
# Inicialização
conectar_mqtt()
pressao_atual = buscar_pressao()
pressao_anterior = None
previsao = calcular_previsao(pressao_atual, None, 25)
ultimo_mqtt = time.time()
ultima_api = time.time()
ultimo_sheets = time.time()
ultimo_email = time.time()
soma_temperatura = 0
soma_umidade = 0
quantidade_leituras = 0
# Sensores
def ler_sensores():
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
umid = dht_sensor.humidity()
except:
temp, umid = 0, 0
ldr_val = ldr.read()
mq2_val = mq2.read()
return temp, umid, ldr_val, mq2_val
def calcular_conforto(temp, umid):
hi = -8.78469475556
hi += 1.61139411 * temp
hi += 2.33854883889 * umid
hi += -0.14611605 * temp * umid
hi += -0.012308094 * temp ** 2
hi += -0.0164248277778 * umid ** 2
hi += 0.002211732 * temp ** 2 * umid
hi += 0.00072546 * temp * umid ** 2
hi += -0.000003582 * temp ** 2 * umid ** 2
return round(hi, 1)
def classificar_luz(val):
if val < 1000: return "Escuro"
elif val < 2000: return "Baixa"
elif val < 3000: return "Media"
else:
return "Alta"
def classificar_ar(val):
if val < 1000: return "Bom"
elif val < 2500: return "Moderado"
else:
return "Ruim"
# Telas OLED
def tela_temp_umid(temp, umid):
oled.fill(0)
oled.text("CLIMATIZACAO", 0, 0)
oled.text("TEMP: {}C".format(temp), 0, 20)
oled.text("UMID: {}%".format(umid), 0, 35)
if temp >= 35:
oled.text("!TEMP ALTA!", 0, 52)
oled.show()
def tela_conforto(temp, umid):
oled.fill(0)
oled.text("CONFORTO", 0, 0)
hi = calcular_conforto(temp, umid)
oled.text("INDICE: {}C".format(hi), 0, 18)
if hi < 27: status = "Confortavel"
elif hi < 32: status = "Atencao"
elif hi < 41: status = "Perigo"
else: status = "Extremo!"
oled.text("STATUS:", 0, 35)
oled.text(status, 0, 48)
oled.show()
def tela_luz(ldr_val):
oled.fill(0)
oled.text("LUMINOSIDADE", 0, 0)
oled.text("VAL: {}".format(ldr_val), 0, 18)
oled.text("NIVEL:", 0, 35)
oled.text(classificar_luz(ldr_val), 0, 48)
oled.show()
def tela_ar(mq2_val):
oled.fill(0)
oled.text("QUALIDADE AR", 0, 0)
oled.text("VAL: {}".format(mq2_val), 0, 18)
qualidade = classificar_ar(mq2_val)
oled.text("STATUS:", 0, 35)
oled.text(qualidade, 0, 48)
if qualidade == "Ruim":
oled.text("!Ar Poluido!", 0, 52)
oled.show()
def tela_previsao(previsao, pressao):
oled.fill(0)
oled.text("Previsao", 0, 0)
oled.text(previsao, 0, 20)
if pressao:
oled.text("{}hPa".format(pressao), 0, 38)
else:
oled.text("Press: --", 0, 38)
oled.show()
# Loop de telas
telas = [tela_temp_umid, tela_conforto, tela_luz, tela_ar, tela_previsao]
while True:
temp, umid, ldr_val, mq2_val = ler_sensores()
agora = time.time()
#API
if (agora - ultima_api) >= INTERVALO_API:
pressao_anterior = pressao_atual
pressao_atual = buscar_pressao()
previsao = calcular_previsao(pressao_atual, pressao_anterior, temp)
ultima_api = agora
#MQTT
if (agora - ultimo_mqtt) >= INTERVALO_MQTT:
payload = {
"temperatura" : temp,
"umidade" : umid,
"luminosidade": ldr_val,
"qualidade_ar": classificar_ar(mq2_val),
"conforto" : calcular_conforto(temp, umid),
"pressao" : pressao_atual,
"previsao" : previsao
}
publicar_mqtt(payload)
ultimo_mqtt = agora
print(payload)
# Acumula para média
soma_temperatura += temp
soma_umidade += umid
quantidade_leituras += 1
# Sheets
if (agora - ultimo_sheets) >= INTERVALO_SHEETS:
enviar_sheets(temp, umid)
ultimo_sheets = agora
# Email
if (agora - ultimo_email) >= INTERVALO_EMAIL:
if quantidade_leituras > 0:
media_temperatura = round(soma_temperatura / quantidade_leituras, 1)
media_umidade = round(soma_umidade / quantidade_leituras, 1)
else:
media_temperatura = 0
media_umidade = 0
print("Leituras:", quantidade_leituras, "Media temp:", media_temperatura, "Media umid:", media_umidade)
enviar_email(media_temperatura, media_umidade, quantidade_leituras)
ultimo_email = agora
soma_temperatura = 0
soma_umidade = 0
quantidade_leituras = 0
# Botão
botao_atual = botao.value()
if botao_atual == 0 and botao_p == 1:
tela_atual = (tela_atual + 1) % len(telas)
time.sleep(0.2)
botao_p = botao_atual
args = [
(temp, umid),
(temp, umid),
(ldr_val,),
(mq2_val,),
(previsao, pressao_atual)
]
telas[tela_atual](*args[tela_atual])
time.sleep(60)