import network
import machine
import ujson
from umqtt.simple import MQTTClient
import neopixel
from time import sleep
# WiFi configuration: network name and passphrase for the station interface.
SSID = "Wokwi-GUEST"
PASSWORD = ""  # empty string: no passphrase is configured
# MQTT configuration
MQTT_SERVER = "broker.mqttdashboard.com"  # broker hostname passed to MQTTClient
DEVICE_ID = "aqlanbakhtiar"  # passed to MQTTClient as the client id
LED_TOPIC = b"Neopixel_led"  # JSON payloads with a 'color' key set the strip color
MOTOR_TOPIC = b"Servo_motor"  # JSON payloads with an 'angle' key move the servo
# NeoPixel configuration
# NOTE(review): the value here is 12, but an earlier comment labelled this pin
# "D15" -- confirm which pin the strip's data line is actually wired to.
PIXEL_PIN = 12
PIXEL_COUNT = 10  # number of pixels addressed on the strip
led_strip = neopixel.NeoPixel(machine.Pin(PIXEL_PIN), PIXEL_COUNT)
# Servo configuration
# NOTE(review): the value here is 17, but an earlier comment labelled this pin
# "D12" -- confirm which pin the servo signal line is actually wired to.
SERVO_PIN = 17
servo_motor = machine.PWM(machine.Pin(SERVO_PIN), freq=50)  # 50 Hz PWM output
# Bounds that requested servo angles are clamped to before being applied.
MIN_ANGLE = 0
MAX_ANGLE = 180
def setup_wifi():
    """Bring up the WiFi station interface and block until it is connected.

    If the interface is already connected, only the current interface
    configuration is printed. Otherwise the interface is activated, a
    connection to ``SSID`` is started using ``PASSWORD``, and the function
    polls until the link is reported as up. There is no timeout: the call
    waits indefinitely if the network never becomes available.
    """
    wlan = network.WLAN(network.STA_IF)
    if not wlan.isconnected():
        print("Connecting to network...")
        wlan.active(True)
        # Always hand over the configured passphrase so that pointing SSID at
        # a secured network works; an empty string is used for open networks.
        wlan.connect(SSID, PASSWORD)
        while not wlan.isconnected():
            # Pause between polls instead of spinning the CPU at full speed.
            sleep(0.1)
    print("Connected to network:", wlan.ifconfig())
def _servo_duty_for_angle(angle, min_pulse_us=544, max_pulse_us=2400,
                          period_us=20000, max_duty=1023):
    """Convert a servo angle (0-180) into a PWM duty value.

    A hobby servo is positioned by the *width* of the pulse it receives each
    PWM period, not by the overall duty percentage, so the angle is first
    mapped linearly onto a pulse width and then scaled to the duty range.

    Args:
        angle: Target angle, expected to be within 0..180.
        min_pulse_us: Pulse width in microseconds for 0 (default 544; a
            commonly used value -- tune for the actual servo).
        max_pulse_us: Pulse width in microseconds for 180 (default 2400; a
            commonly used value -- tune for the actual servo).
        period_us: PWM period in microseconds. The default 20000 matches a
            50 Hz PWM frequency and must be changed if the frequency differs.
        max_duty: Duty value that represents 100% (1023 for a 10-bit scale).

    Returns:
        The integer duty value producing the requested pulse width.
    """
    pulse_us = min_pulse_us + (max_pulse_us - min_pulse_us) * angle / 180
    return int(round(pulse_us * max_duty / period_us))


def handle_message(topic, message):
    """MQTT callback: apply a JSON command to the LED strip or the servo.

    Messages on ``LED_TOPIC`` carrying a ``'color'`` key set every pixel to
    that color, interpreted as a packed 0xRRGGBB integer. Messages on
    ``MOTOR_TOPIC`` carrying an ``'angle'`` key move the servo to that angle
    after clamping it to ``MIN_ANGLE``..``MAX_ANGLE``. Anything else is
    ignored.

    Args:
        topic: Topic the message arrived on (compared against the bytes
            topic constants).
        message: Raw payload, expected to be a JSON object.

    Any exception raised while decoding or applying the message is caught and
    printed so that one malformed payload cannot stop the receive loop.
    """
    print("MQTT Message received on topic:", topic)
    try:
        data = ujson.loads(message)
        if topic == LED_TOPIC and 'color' in data:
            color = int(data['color'])
            # Unpack 0xRRGGBB once; the tuple is the same for every pixel.
            rgb = ((color >> 16) & 0xFF, (color >> 8) & 0xFF, color & 0xFF)
            for i in range(PIXEL_COUNT):
                led_strip[i] = rgb
            led_strip.write()
            print("Updated LED color to:", color)
        elif topic == MOTOR_TOPIC and 'angle' in data:
            # Clamp out-of-range requests instead of rejecting them.
            angle = max(MIN_ANGLE, min(MAX_ANGLE, int(data['angle'])))
            # Scaling the angle straight onto 0..1023 would request 0-100%
            # duty, which is not a valid servo pulse; convert via pulse width.
            servo_motor.duty(_servo_duty_for_angle(angle))
            print("Updated servo angle to:", angle)
    except Exception as error:
        print("Failed to process MQTT message:", error)
def connect_to_mqtt():
    """Connect to the MQTT broker, subscribe to both topics and poll forever.

    Creates a client identified by ``DEVICE_ID`` for ``MQTT_SERVER``,
    registers ``handle_message`` as the receive callback, subscribes to
    ``LED_TOPIC`` and ``MOTOR_TOPIC``, then checks for pending messages once
    per second. The loop has no exit condition, so this function only leaves
    by raising.
    """
    client = MQTTClient(DEVICE_ID, MQTT_SERVER)
    client.set_callback(handle_message)
    client.connect()
    # Subscribe in the same order as before: LED topic first, then the servo.
    for subscribed_topic in (LED_TOPIC, MOTOR_TOPIC):
        client.subscribe(subscribed_topic)
    print("Connected to MQTT server")
    while True:
        # Non-blocking poll; the callback runs if a message is waiting.
        client.check_msg()
        sleep(1)
def run():
    """Application entry point: join the WiFi network, then serve MQTT.

    ``connect_to_mqtt`` contains an endless polling loop, so this call does
    not return under normal operation.
    """
    setup_wifi()
    connect_to_mqtt()
# Start the application only when this file is executed as the main script.
if __name__ == "__main__":
    run()