# Rafael Augusto Geraldo Senne - Prova02 -
import network
import ntptime # Biblioteca pra sincronizar o horário via NTP
import time
from machine import RTC, Pin, ADC, I2C # Controle dos hardwares -Relógio, pinos, analógico/digital, comunicação I2C
from umqtt.simple import MQTTClient
import dht
import ssd1306
SSID = "Rafael S20 fe"
PASSWORD = "mkfy4156"
MQTT_BROKER = "test.mosquitto.org" # Onde vai ter a troca de mensagens
CLIENT_ID = "esp32_client"
TOPIC_TEMP = b"sala/temperatura"
TOPIC_UMID = b"sala/umidade"
TOPIC_LUZ = b"sala/luz"
TOPIC_LED_VERMELHO = b"sala/led_vermelho"
TOPIC_LED_VERDE = b"sala/led_verde"
TOPIC_ALERTA = b"sala/alerta"
TIMEZONE_OFFSET = -3
led_vermelho = Pin(25, Pin.OUT)
led_verde = Pin(26, Pin.OUT)
botao_alerta = Pin(35, Pin.IN)
dht_sensor = dht.DHT11(Pin(4))
ldr = ADC(Pin(34))
ldr.atten(ADC.ATTN_11DB) # Regula faixa de leitura entre ~0–3.6V
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Serial Data line é para dados e Serial Clock Line é para sincronização
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
rtc = RTC()
# ---------------- FUNÇÕES ----------------
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
while not wlan.isconnected():
print("Conectando ao Wi-Fi...")
time.sleep(1)
print("Conectado ao Wi-Fi:", wlan.ifconfig())
def setup_time():
try:
ntptime.host = "pool.ntp.org"
ntptime.settime()
print("Hora sincronizada via NTP")
except Exception as e:
print("Falha ao sincronizar tempo:", e)
def hora_local_formatada():
tm = rtc.datetime() # O método datetime do objeto rtc tem uma tupla com a sequência que aparece as informações do relógio
hora_local = (tm[4] + TIMEZONE_OFFSET) % 24
if hora_local < 0: # Evita valores negativos
hora_local += 24
return "{:02d}/{:02d}/{:04d} {:02d}:{:02d}:{:02d}".format(
tm[2], tm[1], tm[0], hora_local, tm[5], tm[6]
) # Substitui os valores da tupla, dia, mês, ano, hora local corrigida, minuto, segundo
def ler_sensores():
try:
dht_sensor.measure()
tempC = dht_sensor.temperature()
umid = dht_sensor.humidity()
except Exception as e:
print("Erro ao ler DHT11:", e)
tempC = None
umid = None
luz_raw = ldr.read()
luz_pct = int((luz_raw / 4095) * 100) # Converte da faixa de 0 ate 4095 pra 0 ate 100
return tempC, umid, luz_raw, luz_pct
def sub_callback(topic, msg):
print("Tópico:", topic, "Mensagem:", msg)
if topic == TOPIC_LED_VERMELHO:
led_vermelho.value(1 if msg == b"1" else 0)
elif topic == TOPIC_LED_VERDE:
led_verde.value(1 if msg == b"1" else 0)
def mqtt_setup():
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.set_callback(sub_callback)
client.connect()
client.subscribe(TOPIC_LED_VERMELHO)
client.subscribe(TOPIC_LED_VERDE)
print("Conectado ao broker MQTT e inscrito nos tópicos de LED")
return client
def atualizar_oled(temp, luz_pct, mensagem=""):
oled.fill(0)
if temp is None:
oled.text("Temp: --.- C", 0, 0)
else:
oled.text("Temp: {:.1f} C".format(temp), 0, 0) # Precisa usar .format?
oled.text("Luz: {}%".format(luz_pct), 0, 15)
oled.text(hora_local_formatada(), 0, 30)
oled.text(mensagem, 0, 50)
oled.show()
def main():
connect_wifi()
setup_time()
client = mqtt_setup()
try:
while True:
tempC, umid, luz_raw, luz_pct = ler_sensores()
if tempC is not None:
client.publish(TOPIC_TEMP, str(round(tempC, 1))) # Não lembrava do Round e precisei de ajuda da IA
if umid is not None:
client.publish(TOPIC_UMID, str(int(umid)))
client.publish(TOPIC_LUZ, str(luz_pct))
client.check_msg() # Verifica mensagens recebidas para controlar os LEDs
if botao_alerta.value() == 0:
client.publish(TOPIC_ALERTA, b"ALERTA!")
atualizar_oled(tempC, luz_pct, "Enviando alerta...")
time.sleep(1)
atualizar_oled(tempC, luz_pct)
time.sleep(2)
except KeyboardInterrupt: # Se aplicar um Ctrl + C ele entende que estou desconectando
print("Desconectando...")
client.disconnect()
if __name__ == "__main__":
main()