import network
import urequests
import json
import ussl
import usocket
import ubinascii
from machine import Pin, ADC, SoftI2C
from time import sleep, ticks_ms, ticks_diff
import framebuf
import dht
from umqtt.simple import MQTTClient
# ══════════════════════════════════════════════
# SETTINGS — edit only this section
# ══════════════════════════════════════════════
# NOTE(review): API keys and the mail password below are hard-coded in the
# source; consider loading them from a file kept out of version control.
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
API_KEY = "09507b780e315df66ee6a1b0f14ffa71"
CIDADE = "Mogi das Cruzes,BR"
MQTT_HOST = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASS = ""
CLIENT_ID = "esp32-estacao-met"
# ── GOOGLE SHEETS ──────────────────────────────
SHEETS_URL = "https://script.google.com/macros/s/AKfycbzr1qTGoXM-7iDTC_AfanpSpcSoFEJg73RBFlJuDROGCN92l96j-8C_nHJ2aJzh3qbyYw/exec"
# ── GMAIL — edit here ──────────────────────────
GMAIL_USER = "[email protected]" # <- your Gmail address
GMAIL_PASS = "nmvi fxgb cbwx frkb" # <- app password (16 chars, with spaces)
EMAIL_DEST = "[email protected]" # <- recipient
HORA_RELATORIO = 23 # hour of day (0-23) for sending the report
# MQTT topics
T_JSON = "estacao/dados"
T_TEMP = "estacao/temperatura"
T_UMID = "estacao/umidade"
T_LUZ = "estacao/luminosidade"
T_GAS = "estacao/qualidade_ar"
T_PREV = "estacao/previsao"
T_PRESSAO = "estacao/pressao"
# ══════════════════════════════════════════════
# PINS
# ══════════════════════════════════════════════
DHT_PIN = 4
LDR_PIN = 34
MQ2_PIN = 35
BTN_PIN = 14
I2C_SDA = 21
I2C_SCL = 22
# ══════════════════════════════════════════════
# OLED — hand-written driver on top of framebuf
# ══════════════════════════════════════════════
# Software I2C bus shared by every display transfer.
i2c = SoftI2C(scl=Pin(I2C_SCL), sda=Pin(I2C_SDA))
# I2C address the display is written to.
ADDR = 0x3C
# 128x64 monochrome pixels, one bit each -> 1024 bytes.
buf = bytearray(128 * 64 // 8)
# Drawing surface backed by `buf`; oled_show() sends `buf` to the display.
fb = framebuf.FrameBuffer(buf, 128, 64, framebuf.MONO_VLSB)
def cmd(c):
    """Send the single command byte `c` to the display over I2C."""
    # A leading 0x00 byte precedes every command written to the display.
    pacote = bytes([0x00, c])
    i2c.writeto(ADDR, pacote)
def oled_show():
    """Copy the whole frame buffer to the display, one 128-byte page at a time."""
    for pagina in range(8):
        inicio = pagina * 128
        cmd(0xB0 + pagina)  # select page `pagina`
        cmd(0x00)           # column address, low part
        cmd(0x10)           # column address, high part
        # A leading 0x40 byte precedes the pixel data of the page.
        i2c.writeto(ADDR, b'\x40' + buf[inicio:inicio + 128])
def oled_init():
    """Send the power-up command sequence to the display, in order."""
    # NOTE(review): these bytes look like the usual SSD1306 128x64 init
    # sequence; the controller model is not stated in this file — confirm.
    sequencia = (
        0xAE, 0xA8, 0x3F, 0xD3, 0x00, 0x40, 0xA1, 0xC8,
        0xDA, 0x12, 0x81, 0x7F, 0xA4, 0xA6, 0xD5, 0x80,
        0x8D, 0x14, 0xAF,
    )
    for comando in sequencia:
        cmd(comando)
def cls():
    """Clear the frame buffer (all pixels off); call oled_show() to display it."""
    fb.fill(0)
def txt(s, x, y):
    """Draw `s` (converted with str()) in the frame buffer at pixel (x, y)."""
    texto = str(s)
    fb.text(texto, x, y, 1)
def linha(x1, y1, x2, y2):
    """Draw a lit line in the frame buffer from (x1, y1) to (x2, y2)."""
    fb.line(x1, y1, x2, y2, 1)
def barra(x, y, w, h, pct):
    """Draw a bar outline of w x h at (x, y), filled to `pct` percent.

    The fill sits one pixel inside the outline; nothing is filled when the
    computed width is zero or negative.
    """
    fb.rect(x, y, w, h, 1)
    preenchido = int((pct / 100) * (w - 2))
    if preenchido > 0:
        fb.fill_rect(x + 1, y + 1, preenchido, h - 2, 1)
# ══════════════════════════════════════════════
# SENSORS
# ══════════════════════════════════════════════
# Temperature / humidity sensor.
sensor_dht = dht.DHT22(Pin(DHT_PIN))

# Light sensor on an ADC input, configured with 11 dB attenuation.
ldr = ADC(Pin(LDR_PIN))
ldr.atten(ADC.ATTN_11DB)

# Gas sensor on an ADC input, same attenuation.
mq2 = ADC(Pin(MQ2_PIN))
mq2.atten(ADC.ATTN_11DB)

# Push button with internal pull-up: reads 0 while pressed.
botao = Pin(BTN_PIN, Pin.IN, Pin.PULL_UP)
# ══════════════════════════════════════════════
# GLOBAL VARIABLES
# ══════════════════════════════════════════════
# Index of the screen currently shown (0-5, see exibir()).
tela_atual = 0
# ticks_ms() timestamps of the last run of each periodic task.
ultima_dht = 0
ultima_api = 0
ultimo_mqtt = 0
ultimo_btn = 0
ultima_sheets = 0
# Last sensor readings; these defaults are used until the first DHT read.
temp = 25.0
umid = 60.0
# Forecast label and pressure (hPa), updated by buscar_previsao().
previsao = "Aguardando..."
pressao = 1013
# MQTT client instance, or None while disconnected.
mqtt_client = None
wifi_ok = False
# Accumulators for the daily average
leituras_temp = []
leituras_umid = []
# Hour and date (dd/mm/yy), updated by obter_hora().
hora_atual = 0
data_atual = "00/00/00"
# NOTE(review): ultimo_email is not read anywhere in the visible code.
ultimo_email = -1
email_enviado = False # test-email control flag
# ══════════════════════════════════════════════
# FUNÇÕES DE CÁLCULO
# ══════════════════════════════════════════════
def calcular_conforto(t, h):
    """Return the apparent ("feels like") temperature, rounded to 0.1.

    Uses the polynomial heat-index regression with Celsius coefficients.

    Args:
        t: air temperature (displayed by the callers as degrees C).
        h: relative humidity (displayed by the callers as percent).

    Returns:
        The heat index for warm air, or the measured temperature itself
        when `t` is below the range where the regression is meaningful.
    """
    # The regression is fitted for warm air only. Below about 27 C it
    # diverges badly (10 C at 60 % would come out near 37.8 C), so the
    # measured temperature is the best "feels like" value there.
    if t < 27:
        return round(float(t), 1)
    hi = (-8.784 + 1.611*t + 2.338*h
          - 0.146*t*h - 0.0123*t**2
          - 0.0164*h**2 + 0.00221*t**2*h
          + 0.000725*t*h**2 - 0.00000358*t**2*h**2)
    return round(hi, 1)
def ler_luz():
    """Return the light level as an integer percentage of the ADC full scale."""
    bruto = ldr.read()
    # 4095 is the largest raw value used as 100 %.
    return round((bruto / 4095) * 100)
def ler_gas():
    """Return the gas-sensor level as an integer percentage of the ADC full scale."""
    bruto = mq2.read()
    # 4095 is the largest raw value used as 100 %.
    return round((bruto / 4095) * 100)
def desc_luz(p):
    """Map a light percentage to a short label for the display."""
    # Checked from brightest to darkest; each bound is exclusive.
    faixas = (
        (80, "Muito sol"),
        (50, "Ensolarado"),
        (20, "Nublado"),
    )
    for limite, rotulo in faixas:
        if p > limite:
            return rotulo
    return "Escuro"
def desc_gas(p):
    """Map a gas percentage to a short air-quality label for the display."""
    # Checked from cleanest to worst; each bound is exclusive.
    faixas = (
        (20, "Otimo"),
        (40, "Bom"),
        (60, "Medio"),
        (80, "Ruim"),
    )
    for limite, rotulo in faixas:
        if p < limite:
            return rotulo
    return "PERIGO!"
# ══════════════════════════════════════════════
# WI-FI
# ══════════════════════════════════════════════
def conectar_wifi():
    """Connect to the configured Wi-Fi network and report the outcome.

    Shows a "connecting" message on the display, waits up to 20 x 0.5 s for
    the link, stores the result in the global `wifi_ok` and returns it.
    """
    global wifi_ok
    cls()
    txt("Conectando", 16, 20)
    txt("Wi-Fi...", 24, 35)
    oled_show()

    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        wlan.connect(WIFI_SSID, WIFI_PASS)
        # Poll until connected or the attempts run out.
        restantes = 20
        while restantes > 0 and not wlan.isconnected():
            sleep(0.5)
            restantes -= 1

    wifi_ok = wlan.isconnected()
    if not wifi_ok:
        print("Wi-Fi FALHOU")
    else:
        print("Wi-Fi OK:", wlan.ifconfig()[0])
    return wifi_ok
# ══════════════════════════════════════════════
# HORA ATUAL via API
# ══════════════════════════════════════════════
def obter_hora():
    """Fetch the current hour and date for America/Sao_Paulo over HTTP.

    On success updates the globals `hora_atual` (int hour) and `data_atual`
    ("dd/mm/yy"). On any failure the globals keep their previous values and
    the error is printed.
    """
    global hora_atual, data_atual
    r = None
    try:
        r = urequests.get(
            "http://worldtimeapi.org/api/timezone/America/Sao_Paulo",
            timeout=8
        )
        d = r.json()
        # The fields are sliced out of the "datetime" string by position:
        # [2:4] year, [5:7] month, [8:10] day, [11:13] hour.
        dt = d.get("datetime", "")
        hora_atual = int(dt[11:13])
        data_atual = dt[8:10] + "/" + dt[5:7] + "/" + dt[2:4]
        print("Hora: {}h Data: {}".format(hora_atual, data_atual))
    except Exception as e:
        print("Erro hora:", e)
    finally:
        # Always release the socket, even when decoding or parsing fails;
        # otherwise every failed refresh leaks one connection.
        if r is not None:
            r.close()
# ══════════════════════════════════════════════
# OPENWEATHERMAP
# ══════════════════════════════════════════════
def buscar_previsao(t_sensor):
    """Query OpenWeatherMap and derive a short forecast label.

    Updates the globals `pressao` (from the API) and `previsao` (label
    derived from pressure, API humidity and the local temperature). On any
    failure `previsao` becomes "Sem sinal".

    Args:
        t_sensor: temperature measured locally, used for the "Ensolarado"
            rule.
    """
    global previsao, pressao
    r = None
    try:
        # The city name contains spaces; they must be percent-encoded or
        # the HTTP request line is malformed.
        cidade = CIDADE.replace(" ", "%20")
        url = ("http://api.openweathermap.org/data/2.5/weather"
               "?q={}&appid={}&units=metric&lang=pt_br".format(cidade, API_KEY))
        r = urequests.get(url, timeout=10)
        d = r.json()
        pressao = d["main"]["pressure"]
        umid_api = d["main"]["humidity"]
        descr = d["weather"][0]["description"]
        if pressao < 1000:
            previsao = "Chuva"
        elif pressao < 1008 and umid_api > 75:
            previsao = "Nublado"
        elif pressao < 1013:
            previsao = "Varia"
        elif t_sensor > 32:
            # pressao >= 1013 is already guaranteed by the branches above.
            previsao = "Ensolarado"
        elif pressao >= 1020:
            previsao = "Tempo bom"
        else:
            # Fall back to the API's own description, cut to fit the screen.
            previsao = descr[:10].capitalize()
        print("API OK | Pressao:{}hPa | {}".format(pressao, previsao))
    except Exception as e:
        print("Erro API:", e)
        previsao = "Sem sinal"
    finally:
        # Release the socket on every path to avoid leaking connections.
        if r is not None:
            r.close()
# ══════════════════════════════════════════════
# MQTT
# ══════════════════════════════════════════════
def conectar_mqtt():
    """Create the MQTT client and connect it to the configured broker.

    Stores the client in the global `mqtt_client` and shows the result on
    the display. Returns True on success; on failure shows the first 32
    characters of the error, resets `mqtt_client` to None and returns False.
    """
    global mqtt_client
    # Empty credentials are passed to the client as None.
    usuario = MQTT_USER or None
    senha = MQTT_PASS or None
    try:
        mqtt_client = MQTTClient(
            CLIENT_ID,
            MQTT_HOST,
            port=MQTT_PORT,
            user=usuario,
            password=senha,
            ssl=False,
            keepalive=60,
        )
        mqtt_client.connect()
    except Exception as e:
        err = str(e)
        print("Erro MQTT:", err)
        cls()
        txt("MQTT ERRO:", 0, 0)
        # Split the message over two 16-character display rows.
        txt(err[:16], 0, 12)
        txt(err[16:32], 0, 24)
        oled_show()
        sleep(3)
        mqtt_client = None
        return False

    print("MQTT conectado!")
    cls()
    txt("MQTT OK!", 20, 28)
    oled_show()
    sleep(2)
    return True
def publicar(t, h, luz, gas):
    """Publish the current readings to the MQTT topics.

    Sends one JSON document with every value plus one message per individual
    topic. Does nothing while disconnected; on a publish error the global
    `mqtt_client` is reset to None.
    """
    global mqtt_client
    if mqtt_client is None:
        return
    try:
        temp_1 = round(t, 1)
        umid_1 = round(h, 1)
        resumo = json.dumps({
            "temperatura": temp_1,
            "umidade": umid_1,
            "luminosidade": luz,
            "qualidade_ar": gas,
            "previsao": previsao,
            "pressao": pressao,
            "conforto": calcular_conforto(t, h)
        })
        # (topic, text) pairs, sent in this order.
        mensagens = (
            (T_JSON, resumo),
            (T_TEMP, str(temp_1)),
            (T_UMID, str(umid_1)),
            (T_LUZ, str(luz)),
            (T_GAS, str(gas)),
            (T_PREV, previsao),
            (T_PRESSAO, str(pressao)),
        )
        for topico, texto in mensagens:
            mqtt_client.publish(topico, texto.encode())
        print("MQTT publicado!")
    except Exception as e:
        print("Erro publish:", e)
        mqtt_client = None
# ══════════════════════════════════════════════
# GOOGLE SHEETS
# ══════════════════════════════════════════════
def gravar_sheets(t, h):
    """POST the current reading to the Google Sheets web app.

    Args:
        t: temperature, rounded to one decimal in the payload.
        h: humidity, rounded to one decimal in the payload.

    Returns:
        True when the request completed and the reply parsed as JSON,
        False on any error (which is printed).
    """
    r = None
    try:
        datetime_str = "{} {}h".format(data_atual, hora_atual)
        payload = json.dumps({
            "datetime": datetime_str,
            "temperatura": round(t, 1),
            "umidade": round(h, 1)
        })
        r = urequests.post(
            SHEETS_URL,
            data=payload,
            headers={"Content-Type": "application/json"},
            timeout=15
        )
        resp = r.json()
        print("Sheets OK:", resp)
        return True
    except Exception as e:
        print("Erro Sheets:", e)
        return False
    finally:
        # Close the response on every path; a non-JSON reply used to skip
        # close() and leak the socket.
        if r is not None:
            r.close()
# ══════════════════════════════════════════════
# EMAIL SMTP GMAIL
# ══════════════════════════════════════════════
def _smtp_ler_resposta(stream, esperado):
    """Read one SMTP reply, including continuation lines, and check its code.

    Raises OSError when the connection closes early or when the 3-digit
    status differs from `esperado`.
    """
    while True:
        resp = stream.readline()
        if not resp:
            raise OSError("SMTP: conexao fechada")
        # "250-..." is a continuation line; "250 ..." ends the reply.
        if resp[3:4] != b"-":
            break
    if resp[:3] != esperado.encode():
        raise OSError("SMTP inesperado: " + resp.decode().strip())


def _smtp_comando(stream, texto, esperado):
    """Send one CRLF-terminated SMTP line and validate the server's reply."""
    stream.write((texto + "\r\n").encode())
    _smtp_ler_resposta(stream, esperado)


def _smtp_enviar(msg):
    """Run a full STARTTLS + AUTH LOGIN session to Gmail delivering `msg`.

    Raises on any network error or unexpected SMTP status; the socket is
    always closed before returning.
    """
    addr = usocket.getaddrinfo("smtp.gmail.com", 587)[0][-1]
    sock = usocket.socket()
    try:
        sock.connect(addr)
        sock.settimeout(10)
        _smtp_ler_resposta(sock, "220")
        _smtp_comando(sock, "EHLO esp32", "250")
        _smtp_comando(sock, "STARTTLS", "220")
        # The wrapped object is a stream (read/write/readline) without
        # recv()/send(), so only stream methods are used on both sides
        # of the TLS upgrade.
        sock = ussl.wrap_socket(sock, server_hostname="smtp.gmail.com")
        _smtp_comando(sock, "EHLO esp32", "250")
        _smtp_comando(sock, "AUTH LOGIN", "334")
        usuario_b64 = ubinascii.b2a_base64(GMAIL_USER.encode()).decode().strip()
        _smtp_comando(sock, usuario_b64, "334")
        # The app password is stored with spaces for readability; strip them.
        senha_b64 = ubinascii.b2a_base64(
            GMAIL_PASS.replace(" ", "").encode()).decode().strip()
        _smtp_comando(sock, senha_b64, "235")
        _smtp_comando(sock, "MAIL FROM:<{}>".format(GMAIL_USER), "250")
        _smtp_comando(sock, "RCPT TO:<{}>".format(EMAIL_DEST), "250")
        _smtp_comando(sock, "DATA", "354")
        # A line holding a single "." ends the message data.
        _smtp_comando(sock, msg + "\r\n.", "250")
        sock.write(b"QUIT\r\n")
    finally:
        sock.close()


def enviar_email(media_t, media_u, qtd):
    """Send the report email through Gmail SMTP and show the result.

    Args:
        media_t: average temperature to report.
        media_u: average humidity to report.
        qtd: number of readings the averages are based on.

    Returns:
        True when the server accepted the message, False on any failure
        (network error, rejected login, unexpected SMTP status).
    """
    try:
        cls(); txt("Enviando", 20, 20); txt("email...", 20, 35); oled_show()
        assunto = "Relatorio Estacao - {}".format(data_atual)
        corpo = (
            "Relatorio Diario - Estacao Meteorologica\r\n"
            "=========================================\r\n"
            "Data: {}\r\n"
            "Leituras: {}\r\n"
            "Media Temperatura: {} C\r\n"
            "Media Umidade: {} %\r\n"
            "=========================================\r\n"
            "Enviado automaticamente pelo ESP32\r\n"
        ).format(data_atual, qtd, round(media_t, 1), round(media_u, 1))
        msg = "From: {}\r\nTo: {}\r\nSubject: {}\r\n\r\n{}".format(
            GMAIL_USER, EMAIL_DEST, assunto, corpo)
        _smtp_enviar(msg)
        print("Email enviado!")
        cls(); txt("Email OK!", 16, 28); oled_show(); sleep(2)
        return True
    except Exception as e:
        print("Erro email:", e)
        cls(); txt("Email ERRO", 8, 28); oled_show(); sleep(2)
        return False
# ══════════════════════════════════════════════
# TELAS OLED
# ══════════════════════════════════════════════
def tela_clima(t, h):
    """Draw the climate screen: temperature, humidity, feels-like and a status line."""
    txt("=== CLIMA ===", 8, 0)
    linha(0, 10, 127, 10)
    txt("Temp: {}C".format(round(t, 1)), 0, 14)
    txt("Umid: {}%".format(round(h, 1)), 0, 26)
    txt("Sens: {}C".format(calcular_conforto(t, h)), 0, 38)
    linha(0, 50, 127, 50)
    # Footer: warn on extreme temperatures, otherwise report comfort.
    if t > 35:
        rodape = "! TEMP ALTA !"
    elif t < 10:
        rodape = "! TEMP BAIXA!"
    else:
        rodape = "Conforto: OK"
    txt(rodape, 8, 54)
def tela_luz(luz):
    """Draw the light screen: percentage, label and a level bar."""
    txt("=== LUZ ===", 16, 0)
    linha(0, 10, 127, 10)
    nivel = "Nivel: {}%".format(luz)
    txt(nivel, 0, 14)
    descricao = desc_luz(luz)
    txt(descricao, 0, 26)
    txt("Intensidade:", 0, 42)
    barra(0, 53, 120, 9, luz)
def tela_gas(gas):
    """Draw the air-quality screen: percentage, label, alert and a level bar.

    Above 60 % the alert banner replaces the "Nivel:" caption. Drawing both
    (alert at y=38, caption at y=42) made the 8-pixel-high glyphs overlap.
    """
    txt("=== AR ===", 20, 0)
    linha(0, 10, 127, 10)
    txt("Gas: {}%".format(gas), 0, 14)
    txt(desc_gas(gas), 0, 26)
    if gas > 60:
        txt("!! ALERTA !!", 8, 40)
    else:
        txt("Nivel:", 0, 42)
    barra(0, 53, 120, 9, gas)
def tela_previsao():
    """Draw the forecast screen: label, pressure and Wi-Fi / MQTT status."""
    txt("=== PREVISAO ===", 0, 0)
    linha(0, 10, 127, 10)
    txt(previsao, 0, 18)
    txt("Press:{}hPa".format(pressao), 0, 32)
    linha(0, 44, 127, 44)
    # Connectivity status, side by side on one row.
    if wifi_ok:
        estado_wifi = "WiFi:OK"
    else:
        estado_wifi = "WiFi:OFF"
    if mqtt_client:
        estado_mqtt = "MQTT:OK"
    else:
        estado_mqtt = "MQTT:OFF"
    txt(estado_wifi, 0, 48)
    txt(estado_mqtt, 64, 48)
    txt("Mogi das Cruzes", 0, 56)
def tela_dados():
    """Draw the statistics screen: reading count, averages and date/hour."""
    txt("=== DADOS ===", 8, 0)
    linha(0, 10, 127, 10)
    total = len(leituras_temp)
    txt("Leituras: {}".format(total), 0, 14)
    if total == 0:
        txt("Sem dados", 0, 26)
    else:
        # Both averages divide by the temperature-reading count.
        media_temp = round(sum(leituras_temp) / total, 1)
        media_umid = round(sum(leituras_umid) / total, 1)
        txt("Med T: {}C".format(media_temp), 0, 26)
        txt("Med U: {}%".format(media_umid), 0, 38)
    linha(0, 50, 127, 50)
    txt("{} {}h".format(data_atual, hora_atual), 0, 54)
def tela_resumo(t, h, luz, gas):
    """Draw the summary screen with every reading in compact form."""
    txt("=== RESUMO ===", 4, 0)
    linha(0, 10, 127, 10)
    linha_clima = "T:{}C U:{}%".format(round(t), round(h))
    # Labels are abbreviated to keep each row short.
    linha_luz = "Luz:{}% {}".format(luz, desc_luz(luz)[:3])
    linha_gas = "Gas:{}% {}".format(gas, desc_gas(gas)[:4])
    txt(linha_clima, 0, 14)
    txt(linha_luz, 0, 26)
    txt(linha_gas, 0, 38)
    linha(0, 50, 127, 50)
    txt("BTN p/ trocar tela", 0, 54)
def exibir(tela, t, h, luz, gas):
    """Clear the display, draw screen number `tela` and push it to the OLED.

    Screens: 0 climate, 1 light, 2 air quality, 3 forecast, 4 statistics,
    5 summary. Any other number leaves the display blank.
    """
    cls()
    if tela == 0:
        tela_clima(t, h)
    elif tela == 1:
        tela_luz(luz)
    elif tela == 2:
        tela_gas(gas)
    elif tela == 3:
        tela_previsao()
    elif tela == 4:
        tela_dados()
    elif tela == 5:
        tela_resumo(t, h, luz, gas)
    oled_show()
# ══════════════════════════════════════════════
# BOTÃO COM DEBOUNCE
# ══════════════════════════════════════════════
def checar_botao():
    """Advance to the next screen when the button is pressed (300 ms debounce)."""
    global tela_atual, ultimo_btn
    # The input uses a pull-up, so a non-zero value means "not pressed".
    if botao.value():
        return
    agora = ticks_ms()
    # Ignore presses that come too soon after the last accepted one.
    if ticks_diff(agora, ultimo_btn) <= 300:
        return
    tela_atual = (tela_atual + 1) % 6
    ultimo_btn = agora
    print(">> Tela:", tela_atual)
# ══════════════════════════════════════════════
# BOOT
# ══════════════════════════════════════════════
oled_init()
cls()
# Splash screen: (text, x, y) for each row.
for texto_boot, x_boot, y_boot in (
    (" Estacao", 8, 8),
    ("Meteorologica", 4, 22),
    ("Mogi das Cruzes", 0, 36),
    ("Iniciando...", 16, 50),
):
    txt(texto_boot, x_boot, y_boot)
oled_show()
sleep(2)

# Network-dependent start-up only runs when Wi-Fi came up.
conectar_wifi()
if wifi_ok:
    obter_hora()
    buscar_previsao(temp)
    conectar_mqtt()
print("Sistema pronto!")
# ══════════════════════════════════════════════
# MAIN LOOP
# ══════════════════════════════════════════════
# Upper bound on stored readings. Without it the lists grow by one entry
# every 2 s until the heap is exhausted; the averages therefore cover the
# most recent MAX_LEITURAS samples.
MAX_LEITURAS = 300
# Minimum time between email attempts, so a failing SMTP session is not
# retried (and the display blocked) on every loop pass.
INTERVALO_RETRY_EMAIL = 60000
ultima_tentativa_email = 0

while True:
    checar_botao()
    agora = ticks_ms()

    # Read the DHT22 every 2 s and accumulate for the averages.
    if ticks_diff(agora, ultima_dht) > 2000:
        try:
            sensor_dht.measure()
            temp = sensor_dht.temperature()
            umid = sensor_dht.humidity()
            leituras_temp.append(temp)
            leituras_umid.append(umid)
            # Drop the oldest sample once the cap is exceeded.
            if len(leituras_temp) > MAX_LEITURAS:
                leituras_temp.pop(0)
            if len(leituras_umid) > MAX_LEITURAS:
                leituras_umid.pop(0)
            print("T={:.1f}C U={:.1f}%".format(temp, umid))
        except Exception as e:
            print("Erro DHT:", e)
        ultima_dht = agora

    # Refresh forecast and clock every 5 minutes.
    if wifi_ok and ticks_diff(agora, ultima_api) > 300000:
        buscar_previsao(temp)
        obter_hora()
        ultima_api = agora

    # Publish over MQTT every 30 s, reconnecting first when needed.
    if wifi_ok and ticks_diff(agora, ultimo_mqtt) > 30000:
        if mqtt_client is None:
            conectar_mqtt()
        luz = ler_luz()
        gas = ler_gas()
        publicar(temp, umid, luz, gas)
        ultimo_mqtt = agora

    # Write to Google Sheets every 30 s (test interval).
    if wifi_ok and ticks_diff(agora, ultima_sheets) > 30000:
        gravar_sheets(temp, umid)
        obter_hora()
        ultima_sheets = agora

    # Send the test email once, starting 2 minutes (120000 ms) after boot.
    # Failed attempts are retried at most once per INTERVALO_RETRY_EMAIL.
    if (wifi_ok and not email_enviado
            and ticks_diff(agora, 0) > 120000
            and ticks_diff(agora, ultima_tentativa_email) > INTERVALO_RETRY_EMAIL):
        if leituras_temp:
            ultima_tentativa_email = agora
            media_t = sum(leituras_temp) / len(leituras_temp)
            media_u = sum(leituras_umid) / len(leituras_umid)
            if enviar_email(media_t, media_u, len(leituras_temp)):
                email_enviado = True

    # Refresh the display with fresh light and gas readings.
    luz = ler_luz()
    gas = ler_gas()
    exibir(tela_atual, temp, umid, luz, gas)
    sleep(0.1)