import network
import time
from machine import Pin, I2C, Timer, ADC, reset
import dht
import ujson
from umqtt.simple import MQTTClient
import ntptime
import machine
# Implementação do módulo i2c_lcd
class I2cLcd:
# LCD Address and other constants
LCD_ADDRESS = 0x27
LCD_CHR = 1
LCD_CMD = 0
LCD_BACKLIGHT = 0x08
ENABLE = 0b00000100
# Timing constants
E_PULSE = 0.0005
E_DELAY = 0.0005
def __init__(self, i2c, addr, num_rows, num_cols):
self.i2c = i2c
self.addr = addr
self.num_rows = num_rows
self.num_cols = num_cols
self.backlight = self.LCD_BACKLIGHT
self.init_lcd()
def init_lcd(self):
self.lcd_byte(0x33, self.LCD_CMD)
self.lcd_byte(0x32, self.LCD_CMD)
self.lcd_byte(0x06, self.LCD_CMD)
self.lcd_byte(0x0C, self.LCD_CMD)
self.lcd_byte(0x28, self.LCD_CMD)
self.lcd_byte(0x01, self.LCD_CMD)
time.sleep(self.E_DELAY)
def lcd_byte(self, bits, mode):
bits_high = mode | (bits & 0xF0) | self.backlight
bits_low = mode | ((bits << 4) & 0xF0) | self.backlight
self.i2c.writeto(self.addr, bytes([bits_high]))
self.lcd_toggle_enable(bits_high)
self.i2c.writeto(self.addr, bytes([bits_low]))
self.lcd_toggle_enable(bits_low)
def lcd_toggle_enable(self, bits):
time.sleep(self.E_DELAY)
self.i2c.writeto(self.addr, bytes([bits | self.ENABLE]))
time.sleep(self.E_PULSE)
self.i2c.writeto(self.addr, bytes([bits & ~self.ENABLE]))
time.sleep(self.E_DELAY)
def clear(self):
self.lcd_byte(0x01, self.LCD_CMD)
def putstr(self, string):
for char in string:
self.lcd_byte(ord(char), self.LCD_CHR)
def move_to(self, col, row):
row_offsets = [0x00, 0x40, 0x14, 0x54]
self.lcd_byte(0x80 | (col + row_offsets[row]), self.LCD_CMD)
# Parâmetros do Servidor MQTT
MQTT_CLIENT_ID = "micropython-weather-demo"
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_TOPIC = "wokwi-weather"
# Configuração do sensor DHT22
dht_sensor = dht.DHT22(Pin(15))
# Configuração do sensor de umidade do solo
soil_moisture_sensor = ADC(Pin(32))
soil_moisture_sensor.atten(ADC.ATTN_11DB) # Para medir até 3.3V
# Configuração do relé, LEDs e botões
rele = Pin(4, Pin.OUT)
led_red = Pin(5, Pin.OUT)
led_green = Pin(16, Pin.OUT)
button1 = Pin(14, Pin.IN, Pin.PULL_UP)
button2 = Pin(12, Pin.IN, Pin.PULL_UP)
button3 = Pin(13, Pin.IN, Pin.PULL_UP)
# Configuração do LCD
I2C_ADDR = 0x27
I2C_NUM_ROWS = 4
I2C_NUM_COLS = 20
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000)
lcd = I2cLcd(i2c, I2C_ADDR, I2C_NUM_ROWS, I2C_NUM_COLS)
# Timer para desligar a bomba
pump_timer = Timer(-1)
# Variáveis de estado
pump_on = False
pump_start_time = 0
min_humidity = 30
max_humidity = 70
# Função para conectar ao WiFi
def connect_wifi():
print("Conectando ao WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Conectado!")
print("Endereço IP: ", sta_if.ifconfig()[0])
# Função para conectar ao servidor MQTT
def connect_mqtt():
print("Conectando ao servidor MQTT... ", end="")
try:
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD)
client.connect()
print("Conectado ao servidor MQTT!")
return client
except Exception as e:
print("Falha na conexão MQTT: ", e)
return None
# Função para sincronizar o tempo via NTP e ajustar o fuso horário
def sync_time():
print("Sincronizando tempo via NTP...")
try:
ntptime.settime()
tm = time.localtime(time.time() - 10800)
rtc = machine.RTC()
rtc.datetime((tm[0], tm[1], tm[2], tm[6], tm[3], tm[4], tm[5], 0))
print("Tempo sincronizado e ajustado!")
except Exception as e:
print("Falha na sincronização de tempo: ", e)
# Função para obter o timestamp atual em um formato simples
def get_timestamp():
tm = time.localtime()
return "{:02d}/{:02d}/{:04d} {:02d}:{:02d}".format(tm[2], tm[1], tm[0], tm[3], tm[4])
# Função para verificar a conexão MQTT
def check_mqtt(client):
try:
client.ping()
return True
except Exception as e:
print("Perda de conexão MQTT: ", e)
return False
# Função para atualizar o LCD
def update_lcd(temp, hum, soil_moisture, rele_state, rele_time):
lcd.clear()
lcd.putstr("T:{:.1f}C H:{:.1f}%".format(temp, hum))
lcd.move_to(0, 1)
lcd.putstr("Umid. Solo: {:.1f}%".format(soil_moisture))
lcd.move_to(0, 2)
lcd.putstr("Bomba: {} {}".format(rele_state, rele_time))
lcd.move_to(0, 3)
lcd.putstr("{}".format(get_timestamp()[:10]))
# Função para acionar a bomba
def start_pump():
global pump_on, pump_start_time
pump_on = True
pump_start_time = time.time()
rele.value(1)
led_red.value(1)
led_green.value(0)
print("Bomba acionada por 2 minutos")
# Função para desligar a bomba
def stop_pump():
global pump_on
pump_on = False
rele.value(0)
led_red.value(0)
led_green.value(1)
print("Bomba desligada")
# Função para exibir o menu
def show_menu():
lcd.clear()
lcd.putstr("1. Set Humidity")
lcd.move_to(0, 1)
lcd.putstr("2. Restart")
lcd.move_to(0, 2)
lcd.putstr("3. Exit")
# Função para ajustar os limites de umidade
def set_humidity():
global min_humidity, max_humidity
lcd.clear()
lcd.putstr("Min Humidity: ")
lcd.move_to(0, 1)
min_humidity = adjust_value(min_humidity)
lcd.clear()
lcd.putstr("Max Humidity: ")
lcd.move_to(0, 1)
max_humidity = adjust_value(max_humidity)
# Função para ajustar um valor usando os botões
def adjust_value(current_value):
while True:
lcd.move_to(0, 1)
lcd.putstr("{:>3}".format(current_value))
if button1.value() == 0:
current_value += 1
time.sleep(0.2)
if button2.value() == 0:
current_value -= 1
time.sleep(0.2)
if button3.value() == 0:
return current_value
time.sleep(0.1)
# Conectando ao WiFi
connect_wifi()
# Sincronizando o tempo via NTP
sync_time()
# Conectando ao servidor MQTT
client = connect_mqtt()
if client is None:
raise Exception("Não foi possível conectar ao servidor MQTT")
# Loop principal para medir e enviar dados
prev_temp = None
prev_hum = None
prev_soil_moisture = None
prev_rele_state = None
rele_time = ""
led_green.value(1) # LED verde inicializado como ligado
while True:
try:
if not network.WLAN(network.STA_IF).isconnected():
print("WiFi desconectado. Tentando reconectar...")
connect_wifi()
if not check_mqtt(client):
print("Tentando reconectar ao MQTT...")
client = connect_mqtt()
if client is None:
print("Falha na reconexão. Tentando novamente em 5 segundos...")
time.sleep(5)
continue
print("Medindo condições meteorológicas... ", end="")
dht_sensor.measure()
temp = dht_sensor.temperature()
hum = dht_sensor.humidity()
soil_moisture_value = soil_moisture_sensor.read()
soil_moisture_percent = (soil_moisture_value / 4095.0) * 100
# Controle do relé e LED com base na umidade do solo
if soil_moisture_percent < min_humidity and not pump_on:
start_pump()
# Verifica se o botão 1 foi pressionado para exibir o menu
if button1.value() == 0:
show_menu()
while True:
if button1.value() == 0:
set_humidity()
break
if button2.value() == 0:
reset()
if button3.value() == 0:
lcd.clear()
break
time.sleep(0.1)
# Verifica se 2 minutos se passaram para desligar a bomba
if pump_on and (time.time() - pump_start_time >= 120):
stop_pump()
rele_state = "Ligado" if pump_on else "Deslig."
rele_time = get_timestamp()[11:16] if pump_on else ""
if temp != prev_temp or hum != prev_hum or soil_moisture_percent != prev_soil_moisture or rele_state != prev_rele_state:
message = {
"Data e Hora": get_timestamp(),
"Temperatura": "{:.2f} C".format(temp),
"Umidade": "{:.2f} %".format(hum),
"Umidade do Solo": "{:.2f} %".format(soil_moisture_percent),
"Bomba": rele_state
}
print("Atualizado! Reportando ao tópico MQTT {}: {}".format(MQTT_TOPIC, message))
try:
client.publish(MQTT_TOPIC, ujson.dumps(message))
print("Mensagem publicada com sucesso!")
except Exception as e:
print("Falha ao publicar a mensagem: ", e)
prev_temp = temp
prev_hum = hum
prev_soil_moisture = soil_moisture_percent
prev_rele_state = rele_state
# Atualiza o LCD
update_lcd(temp, hum, soil_moisture_percent, rele_state, rele_time)
else:
print("Sem mudanças")
time.sleep(1)
except OSError as e:
if e.args[0] == 128:
print("Conexão MQTT perdida. Tentando reconectar...")
client = connect_mqtt()
if client is None:
print("Falha na reconexão. Tentando novamente em 5 segundos...")
time.sleep(5)
except Exception as e:
print("Erro ao medir/enviar dados: ", e)
time.sleep(5)