from utime import sleep
import network
import machine
from umqtt.simple import MQTTClient
import neopixel
from machine import Pin, I2C, PWM, ADC
from ssd1306 import SSD1306_I2C
import dht
import math
import time
# --- Network credentials -------------------------------------------------
WIFI_SSID = 'Wokwi-GUEST'
WIFI_PASSWORD = ''

# --- MQTT broker ---------------------------------------------------------
MQTT_BROKER = 'mqtt-dashboard.com'
MQTT_CLIENT_ID = 'seedlings'

# Logical name -> MQTT topic string.
MQTT_TOPICS = {
    # Sensor readings published by the main loop.
    'luminosity': 'gp/lux',
    'temp': 'gp/temp',
    'humidity': 'gp/humidity',
    # Control topics subscribed to in connect_mqtt().
    # NOTE: this key is the topic string itself, unlike its siblings;
    # connect_mqtt() and on_message() look it up under this exact key.
    'gp/servo': 'gp/servo',
    'neopixel': 'gp/neopixel',        # 'on' / 'off' commands
    'servo_left': 'gp/servo_left',    # step the servo 10 degrees down
    'servo_right': 'gp/servo_right',  # step the servo 10 degrees up
    'buzzer': 'gp/buzzer',            # trigger a short beep
}

# --- Mutable state shared between the MQTT callback and the main loop ----
# True while an MQTT 'on' command is forcing the NeoPixels to stay lit.
manual_override = False
# Last angle (degrees) tracked for relative left/right servo moves.
servo_angle = 90
def connect_wifi(ssid, password):
    """Bring up the station interface and block until it is connected.

    Args:
        ssid: Name of the WiFi network to join.
        password: Passphrase for the network ('' for an open network).

    Returns:
        None. The function only returns once ``wlan.isconnected()`` is true.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('Connecting to WiFi...')
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            # Pause between polls instead of spinning on `pass`, so the CPU
            # is not pegged while the connection is being established.
            sleep(0.1)
    print('WiFi connected!')
def connect_mqtt():
    """Create an MQTT client, connect it and subscribe to the control topics.

    Returns:
        The connected ``MQTTClient``, or ``None`` if any step raised.
    """
    # Keys into MQTT_TOPICS, in the order they are subscribed.
    control_keys = ('gp/servo', 'neopixel', 'servo_left', 'servo_right', 'buzzer')
    try:
        broker = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
        # The callback must be registered before subscribing so that no
        # incoming message arrives without a handler.
        broker.set_callback(on_message)
        broker.connect()
        for key in control_keys:
            broker.subscribe(MQTT_TOPICS[key])
        print('MQTT connected and subscribed to all topics!')
        return broker
    except Exception as err:
        print('Error connecting to MQTT:', err)
        return None
def publish_mqtt(client, topic, message):
    """Publish ``message`` on ``topic``, reconnecting if the publish fails.

    Args:
        client: MQTT client to publish with.
        topic: Topic string to publish on.
        message: Payload to send.

    Returns:
        ``client`` itself on success. On failure, whatever ``connect_mqtt()``
        returns (a fresh client or ``None``); the failed message is not
        re-sent.
    """
    try:
        client.publish(topic, message)
    except Exception as err:
        print('Error publishing to MQTT:', err)
        return connect_mqtt()
    return client
# OLED configuration: SSD1306 display on I2C bus 0 (SCL=GPIO22, SDA=GPIO21, 200 kHz).
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=200000)
# 128 and 64 are presumably the panel width and height in pixels -- confirm
# against the ssd1306 driver signature.
oled = SSD1306_I2C(128, 64, i2c)
# NeoPixel configuration
NEOPIXEL_PIN = 14 # GPIO driving the NeoPixel data line
NUM_PIXELS = 16 # Number of NeoPixels addressed by light_up/turn_off
np = neopixel.NeoPixel(Pin(NEOPIXEL_PIN), NUM_PIXELS)
# DHT22 configuration: temperature/humidity sensor on GPIO4
dht22 = dht.DHT22(Pin(4))
# LDR configuration
ldr = ADC(Pin(34)) # ADC input the LDR is read from (see read_ldr)
ldr.atten(ADC.ATTN_11DB) # Set the attenuation for full range (0-3.3V)
# Servo configuration
SERVO_PIN = 33
# PWM duty values that set_servo_angle() maps 0 and 180 degrees onto.
SERVO_MIN_DUTY = 40
SERVO_MAX_DUTY = 115
# Buzzer configuration
BUZZER_PIN = 32
buzzer = PWM(Pin(BUZZER_PIN), freq=1000, duty=0) # 1 kHz PWM, duty 0 so it starts silent
def on_message(topic, msg):
    """Dispatch an incoming MQTT message to the matching actuator.

    Registered as the MQTT client callback in connect_mqtt().

    Args:
        topic: Topic the message arrived on, as bytes.
        msg: Raw payload, as bytes.

    Side effects:
        Updates the module globals ``servo_angle`` (after an absolute servo
        command) and ``manual_override`` (after a NeoPixel 'on'/'off').
    """
    global manual_override, servo_angle
    print('Received message on topic {}: {}'.format(topic, msg))
    if topic == MQTT_TOPICS['gp/servo'].encode():
        # Absolute positioning: payload is the target angle in degrees.
        try:
            angle = int(msg)
        except ValueError:
            print('Invalid message:', msg)
            return
        if 0 <= angle <= 180:
            set_servo_angle(angle)
            # Remember the new position so that later servo_left /
            # servo_right steps start from here, not from a stale angle.
            servo_angle = angle
        else:
            print('Received angle out of range:', angle)
    elif topic == MQTT_TOPICS['neopixel'].encode():  # Handle NeoPixel control message
        command = msg.decode('utf-8')
        control_neopixels(command)
        # 'on' pins the LEDs on regardless of ambient light; 'off' hands
        # control back to the light-threshold logic in main().
        if command == 'on':
            manual_override = True
        elif command == 'off':
            manual_override = False
    elif topic == MQTT_TOPICS['servo_left'].encode():  # Handle servo left control message
        move_servo_left()
    elif topic == MQTT_TOPICS['servo_right'].encode():  # Handle servo right control message
        move_servo_right()
    elif topic == MQTT_TOPICS['buzzer'].encode():  # Handle buzzer control message
        beep_buzzer()
def set_servo_angle(angle):
    """Drive the servo to ``angle`` degrees.

    The angle is mapped linearly onto the PWM duty range
    SERVO_MIN_DUTY..SERVO_MAX_DUTY, with 180 degrees as full scale and
    integer (floor) division. The angle is not range-checked here.
    """
    duty_span = SERVO_MAX_DUTY - SERVO_MIN_DUTY
    pulse = SERVO_MIN_DUTY + duty_span * angle // 180
    servo.duty(pulse)
    print('Servo angle set to:', angle)
def move_servo_left():
    """Step the tracked servo angle down by 10 degrees, stopping at 0."""
    global servo_angle
    target = servo_angle - 10
    servo_angle = target if target > 0 else 0
    set_servo_angle(servo_angle)
def move_servo_right():
    """Step the tracked servo angle up by 10 degrees, stopping at 180."""
    global servo_angle
    target = servo_angle + 10
    servo_angle = target if target < 180 else 180
    set_servo_angle(servo_angle)
def beep_buzzer(duration=0.1, duty=512):
    """Sound the buzzer for a short, blocking beep.

    Args:
        duration: Beep length in seconds (default 0.1).
        duty: PWM duty while sounding (default 512, the value the original
            code used for a 50% duty cycle).

    The call blocks for ``duration`` seconds.
    """
    buzzer.duty(duty)
    try:
        sleep(duration)
    finally:
        # Always silence the buzzer, even if the sleep is interrupted,
        # so it can never be left sounding.
        buzzer.duty(0)
    print('Buzzer beeped')
# Initialize servo motor: PWM on SERVO_PIN at 50 Hz.
servo = PWM(Pin(SERVO_PIN), freq=50)
# Drive the servo to the module-level starting angle so the physical
# position matches the tracked servo_angle value.
set_servo_angle(servo_angle) # Initial position
def read_dht22():
    """Take one DHT22 measurement.

    Returns:
        A ``(temperature, humidity)`` tuple as reported by the sensor
        driver, or ``(None, None)`` if the read raised.
    """
    try:
        dht22.measure()
        temperature = dht22.temperature()
        rel_humidity = dht22.humidity()
    except Exception as err:
        print('Error reading DHT22:', err)
        return None, None
    return temperature, rel_humidity
def read_ldr():
    """Read the LDR and convert the raw ADC value to a brightness figure.

    The raw reading is clamped to 1..4094 before conversion. Without the
    clamp, a saturated reading (0 or 4095) divides by zero, which used to
    be reported as a sensor failure, so full darkness produced ``None``
    instead of a brightness near 0.

    Returns:
        The brightness rounded to two decimals, or ``None`` if the ADC read
        itself failed.
    """
    adc_max = 4095         # full-scale raw value used by the original formula
    series_ohms = 10000    # presumably the fixed divider resistor -- confirm against the circuit
    try:
        raw = ldr.read()
        # Keep both the numerator and the denominator non-zero.
        raw = min(max(raw, 1), adc_max - 1)
        voltage_ratio = raw / (adc_max - raw)
        resistance = series_ohms * voltage_ratio
        brightness = 10 * math.pow(50000 / resistance, 1 / 0.7)
        return round(brightness, 2)
    except Exception as e:
        print('Error reading LDR:', e)
        return None
def light_up_neopixels():
    """Set every NeoPixel to full white and push the change to the strip."""
    # fill() sets all pixels in the driver's buffer in one call, replacing
    # the per-index loop over NUM_PIXELS.
    np.fill((255, 255, 255))
    np.write()
def turn_off_neopixels():
    """Set every NeoPixel to black (off) and push the change to the strip."""
    # fill() sets all pixels in the driver's buffer in one call, replacing
    # the per-index loop over NUM_PIXELS.
    np.fill((0, 0, 0))
    np.write()
def control_neopixels(command):
    """Apply a textual NeoPixel command.

    Args:
        command: 'on' lights all pixels, 'off' clears them; anything else
            is reported as invalid and otherwise ignored.
    """
    if command == 'on':
        light_up_neopixels()
        return
    if command == 'off':
        turn_off_neopixels()
        return
    print('Invalid NeoPixel command:', command)
def _service_mqtt(client):
    """Process pending MQTT messages; return a usable client (or None)."""
    if client is None:
        # A previous connect attempt failed; try again rather than crash.
        return connect_mqtt()
    try:
        client.check_msg()
    except OSError as e:
        # A socket-level failure means the connection is gone: reconnect.
        print('Error checking MQTT messages:', e)
        return connect_mqtt()
    return client


def main():
    """Run the control loop: poll MQTT, read sensors, publish, update outputs.

    Each iteration, roughly every two seconds:
      1. Incoming MQTT commands are processed, even if a sensor read fails,
         and the client is re-created if it is missing or has dropped.
      2. The DHT22 and LDR are read. If all readings are valid they are
         printed, published and shown on the OLED, and the NeoPixels are
         lit when the brightness is below the threshold or the manual
         override is active.

    Never returns.
    """
    connect_wifi(WIFI_SSID, WIFI_PASSWORD)
    mqtt_client = connect_mqtt()
    LDR_THRESHOLD = 50  # Brightness below this counts as low light

    while True:
        mqtt_client = _service_mqtt(mqtt_client)

        temp, humidity = read_dht22()
        luminosity = read_ldr()

        if temp is not None and humidity is not None and luminosity is not None:
            print(f'LDR value: {luminosity}')
            print(f'temp: {temp}°C, Humidity: {humidity}%')

            # publish_mqtt returns the (possibly re-created) client.
            mqtt_client = publish_mqtt(mqtt_client, MQTT_TOPICS['luminosity'], str(luminosity))
            mqtt_client = publish_mqtt(mqtt_client, MQTT_TOPICS['temp'], str(temp))
            mqtt_client = publish_mqtt(mqtt_client, MQTT_TOPICS['humidity'], str(humidity))

            # Display temp, humidity, and LDR value on OLED
            oled.fill(0)  # Clear the display
            oled.text(f'temp: {temp}C', 0, 0)
            oled.text(f'Hum: {humidity}%', 0, 10)
            oled.text(f'LDR: {luminosity}', 0, 20)
            oled.show()

            # Light up NeoPixels if low light is detected or manual override is active
            if luminosity < LDR_THRESHOLD or manual_override:
                light_up_neopixels()
            else:
                turn_off_neopixels()

        time.sleep(2)
# Start the control loop only when this file is executed as the main
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()