# Importação das bibliotecas necessárias
import machine
import network
import time
import dht
import uos
from machine import ADC, Pin, I2C, RTC
from ssd1306 import SSD1306_I2C
import uasyncio as asyncio
# --- CONFIGURAÇÃO INICIAL DOS PINOS E VARIÁVEIS ---
# Pinos dos sensores e atuadores (CONFEREM COM SEU DIAGRAMA.JSON)
DHT_PIN = 12
LDR_PIN = 28
RELE_LUZ = 20
RELE_VENT = 21
RELE_UMID = 22
I2C_SDA = 10
I2C_SCL = 11
# Limites para acionamento dos atuadores (ajuste conforme necessidade)
LIMITE_TEMP = 28.0 # Em graus Celsius
LIMITE_UMID = 45.0 # Em porcentagem
LIMITE_LUZ = 40.0 # Em porcentagem
# Intervalos de tempo em milissegundos
INTERVALO_CONTROLE = 3 * 1000 # AJUSTE DINÂMICO: Reduzido de 30s para 3s
INTERVALO_HISTORICO = 15 * 1000 # AJUSTE DINÂMICO: Reduzido de 5min para 15s
INTERVALO_INTERFACE = 100 # 100 ms para atualização da interface
# --- INICIALIZAÇÃO DOS COMPONENTES DE HARDWARE ---
# Sensores
sensor_dht = dht.DHT22(Pin(DHT_PIN))
ldr = ADC(Pin(LDR_PIN))
# Atuadores (relés)
rele_luz = Pin(RELE_LUZ, Pin.OUT, value=0)
rele_vent = Pin(RELE_VENT, Pin.OUT, value=0)
rele_umid = Pin(RELE_UMID, Pin.OUT, value=0)
# Display OLED
try:
i2c = I2C(1, scl=Pin(I2C_SCL), sda=Pin(I2C_SDA))
oled = SSD1306_I2C(128, 64, i2c)
oled.fill(0)
oled.text("Iniciando...", 0, 0)
oled.show()
display_ok = True
except Exception as e:
print(f"Erro ao iniciar display: {e}")
display_ok = False
# Relógio de Tempo Real (RTC) para histórico
rtc = RTC()
# AJUSTE PARA WOKWI: Defina uma data/hora inicial para os testes
# Formato: (ano, mes, dia, dia_da_semana, hora, minuto, segundo, microssegundo)
# Dia da semana: 0=Segunda, 6=Domingo
rtc.datetime((2025, 7, 24, 3, 20, 30, 0, 0))
# Variáveis globais para armazenar os dados dos sensores
temperatura_atual = 0.0
umidade_atual = 0.0
luminosidade_atual = 0.0
# --- LÓGICA DE CONTROLE E MONITORAMENTO (sem alterações) ---
async def controle_ambiente():
global temperatura_atual, umidade_atual, luminosidade_atual
last_run = time.ticks_ms()
while True:
# A lógica de controle é baseada em tempo, não em loop fixo
if time.ticks_diff(time.ticks_ms(), last_run) >= INTERVALO_CONTROLE:
try:
sensor_dht.measure()
temperatura_atual = sensor_dht.temperature()
umidade_atual = sensor_dht.humidity()
except Exception as e:
print(f"Erro ao ler DHT22: {e}")
luminosidade_atual = (ldr.read_u16() / 100000.0) * 100
rele_luz.value(1 if luminosidade_atual < LIMITE_LUZ else 0)
rele_vent.value(1 if temperatura_atual > LIMITE_TEMP else 0)
rele_umid.value(1 if umidade_atual < LIMITE_UMID else 0)
print(f"Controle executado: Temp={temperatura_atual:.1f}C, Umid={umidade_atual:.1f}%, Lux={luminosidade_atual:.1f}%")
last_run = time.ticks_ms()
await asyncio.sleep_ms(1000)
async def salvar_historico():
last_run = time.ticks_ms()
if 'historico.csv' not in uos.listdir():
with open('historico.csv', 'w') as f:
f.write('Data e Hora,Temperatura (C),Umidade (%)\n')
while True:
if time.ticks_diff(time.ticks_ms(), last_run) >= INTERVALO_HISTORICO:
ano, mes, dia, _, hora, minuto, segundo, _ = rtc.datetime()
timestamp = f"{dia:02d}/{mes:02d}/{ano} {hora:02d}:{minuto:02d}:{segundo:02d}"
with open('historico.csv', 'a') as f:
f.write(f'"{timestamp}",{temperatura_atual:.1f},{umidade_atual:.1f}\n')
print(f"Dados salvos no histórico: {timestamp}")
last_run = time.ticks_ms()
await asyncio.sleep_ms(1000)
async def atualizar_display():
if not display_ok: return
while True:
oled.fill(0)
oled.text(f"Temp: {temperatura_atual:.1f} C", 0, 0)
oled.text(f"Umid: {umidade_atual:.1f} %", 0, 12)
oled.text(f"Luz: {luminosidade_atual:.1f} %", 0, 24)
oled.text(f"Vent: {'ON' if rele_vent.value() else 'OFF'}", 0, 40)
oled.text(f"Umidif: {'ON' if rele_umid.value() else 'OFF'}", 64, 40)
oled.text(f"Luzes: {'ON' if rele_luz.value() else 'OFF'}", 0, 52)
oled.show()
await asyncio.sleep_ms(INTERVALO_INTERFACE)
# --- CONFIGURAÇÃO E LÓGICA DO SERVIDOR WEB ---
async def start_wifi_and_server():
# AJUSTE PARA WOKWI: Conectar à rede virtual do simulador
ssid = 'Wokwi-GUEST'
password = ''
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)
while not station.isconnected():
print("Aguardando conexão...")
await asyncio.sleep(1)
print(f"Conectado! IP: {station.ifconfig()[0]}")
server = await asyncio.start_server(handle_client, "0.0.0.0", 80)
# Mantém o servidor rodando para sempre na simulação
while True:
await asyncio.sleep(5)
# O restante do código do servidor web (handle_client, send_web_page, etc.) permanece o mesmo.
async def handle_client(reader, writer):
try:
request_line = await reader.readline()
request = str(request_line)
if not "GET" in request:
writer.close()
await writer.wait_closed()
return
if "GET /historico" in request:
await send_file(writer, 'historico.csv', 'text/csv')
else:
await send_web_page(writer)
except Exception as e:
print(f"Erro no handle_client: {e}")
finally:
writer.close()
await writer.wait_closed()
async def send_web_page(writer):
html = f"""
<!DOCTYPE html><html lang="pt-br"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>Painel de Controle</title><meta http-equiv="refresh" content="5">
<style>body{{font-family:sans-serif;margin:2em}}h1,h2{{color:#333}}.status-grid,.atuador-grid{{display:grid;grid-template-columns:1fr 1fr;gap:10px;max-width:400px;margin-bottom:20px}}.card{{background-color:#f4f4f4;border:1px solid #ddd;padding:15px;border-radius:5px}}.label{{font-weight:bold}}.value{{font-size:1.2em}}.on{{color:green}}.off{{color:red}}a{{text-decoration:none;background-color:#007bff;color:white;padding:10px 15px;border-radius:5px}}</style>
</head><body><h1>Painel de Controle - Pecuária</h1><h2>Status Atual</h2><div class="status-grid"><div class="card"><span class="label">Temperatura:</span> <span class="value">{temperatura_atual:.1f} °C</span></div><div class="card"><span class="label">Umidade:</span> <span class="value">{umidade_atual:.1f} %</span></div><div class="card"><span class="label">Luminosidade:</span> <span class="value">{luminosidade_atual:.1f} %</span></div></div>
<h2>Atuadores</h2><div class="atuador-grid"><div class="card"><span class="label">Ventilador:</span> <span class="value {'on' if rele_vent.value() else 'off'}">{'LIGADO' if rele_vent.value() else 'DESLIGADO'}</span></div><div class="card"><span class="label">Umidificador:</span> <span class="value {'on' if rele_umid.value() else 'off'}">{'LIGADO' if rele_umid.value() else 'DESLIGADO'}</span></div><div class="card"><span class="label">Iluminação:</span> <span class="value {'on' if rele_luz.value() else 'off'}">{'LIGADA' if rele_luz.value() else 'DESLIGADA'}</span></div></div>
<h2>Histórico</h2><a href="/historico" download="historico.csv">Baixar Histórico (CSV)</a></body></html>"""
writer.write('HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n')
await writer.drain()
writer.write(html)
await writer.drain()
async def send_file(writer, filename, content_type):
try:
with open(filename, 'r') as f:
writer.write(f'HTTP/1.1 200 OK\r\nContent-Type: {content_type}\r\n')
writer.write(f'Content-Disposition: attachment; filename="{filename}"\r\n\r\n')
await writer.drain()
while True:
chunk = f.read(256)
if not chunk: break
writer.write(chunk)
await writer.drain()
except Exception as e:
writer.write('HTTP/1.1 404 Not Found\r\n\r\n')
await writer.drain()
# --- EXECUÇÃO PRINCIPAL (LOOP DE EVENTOS ASSÍNCRONO) ---
async def main():
asyncio.create_task(controle_ambiente())
asyncio.create_task(salvar_historico())
asyncio.create_task(atualizar_display())
asyncio.create_task(start_wifi_and_server())
while True:
await asyncio.sleep(10)
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Programa interrompido")
finally:
asyncio.new_event_loop()Sensor de umidade
e temperatura
Sensor de luminosidade
Relé da luz (GP20)
Relé do ventilador (GP21)
Relé do sprinkler (GP22)