"""
ESP32 MQTT Subscribe with Micropython
Library: servo.py
1. Go to https://www.hivemq.com/demos/websocket-client/
2. Start connection
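3. Set the topic in the publish section to "neopixel1" or "servo"
   (these are the topics passed to client.subscribe() near the end of this file)
4. Message for topic "neopixel1": "[255, 0, 0]" (a JSON list with the RGB value)
5. Message for topic "servo": "45" (any angle, as a number)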
"""
from machine import Pin, PWM
import time
import network
from umqtt.simple import MQTTClient
from servo import Servo
import ujson
from neopixel import NeoPixel
# Define neopixel: driver object built on Pin(13) with 16 pixels.
pixels = NeoPixel(Pin(13), 16)
# Set every pixel to (0, 0, 0) and push the buffer out so the LEDs start off.
pixels.fill((0, 0, 0))
pixels.write()
# WiFi configuration
# NOTE(review): "Wokwi-GUEST" with an empty password looks like the open
# network of the Wokwi simulator -- confirm before running on real hardware.
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT configuration
# Only MQTT_CLIENT_ID and MQTT_BROKER are passed to MQTTClient in the code
# visible in this file; MQTT_PORT, MQTT_USER, MQTT_PASSWORD and MQTT_TOPIC are
# defined here but not referenced by it.
MQTT_CLIENT_ID = ""
MQTT_BROKER = "broker.mqttdashboard.com"
# NOTE(review): MQTT_PORT is an empty string, not a number -- it would need a
# numeric value before it could be handed to the client as a port.
MQTT_PORT = ""
MQTT_USER = ""
MQTT_PASSWORD = ""
# NOTE(review): this value differs from the topics actually subscribed to
# ("neopixel1" and "servo") -- confirm whether it is still needed.
MQTT_TOPIC = "servo-neopixel1"
# Callback function to handle messages from MQTT broker
def mqtt_message(topic, msg):
    """Handle one message delivered by the MQTT client.

    Dispatches on the topic name:

    * ``neopixel1`` -- the payload is a JSON list ``[r, g, b]``; every pixel
      is filled with that colour.
    * ``servo`` -- the payload is a number; the servo is moved to that angle.

    Any other topic is reported on the console and ignored.

    Malformed payloads are reported on the console instead of being raised:
    this function runs as the client's callback, so an exception escaping it
    would propagate out of ``client.wait_msg()`` and stop the main loop.

    Args:
        topic: Topic name as bytes, as handed over by the MQTT client.
        msg: Raw message payload as bytes.
    """
    topic = topic.decode('utf-8')
    msg_str = msg.decode('utf-8')
    print("Incoming message for", topic, ":", msg_str)

    if topic == 'neopixel1':  # To change neopixel led light colours
        try:
            rgb = ujson.loads(msg_str)
            pixels.fill((rgb[0], rgb[1], rgb[2]))
            pixels.write()
        except Exception as e:
            print("Error:", e)
    elif topic == 'servo':  # To control the angle of servo
        # Guarded like the neopixel branch: a non-numeric payload such as
        # "abc" makes float() raise, which previously killed the main loop.
        try:
            angle = float(msg_str)
            motor.move(angle)
        except Exception as e:
            print("Error:", e)
    else:  # When topic is not recognized
        print("Not recognized please check your topic")
# Servo instance created on pin 19; mqtt_message moves it when a message
# arrives on the "servo" topic.
motor = Servo(pin=19)
# Connect to WiFi in station mode, using the credentials from the
# "WiFi configuration" constants so that editing them actually takes effect.
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
# Poll until the interface reports a connection, printing one progress dot
# per 0.1 s wait.
while not sta_if.isconnected():
    print(".", end="")
    time.sleep(0.1)
print(" Connected!")
# Open the MQTT session and register mqtt_message as the message handler.
print("Connecting to MQTT server... ", end="")
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
client.set_callback(mqtt_message)
client.connect()
# Subscribe to each topic that mqtt_message handles; add further topic names
# to this tuple to receive them as well.
for _topic in ("neopixel1", "servo"):
    client.subscribe(_topic)
print("Connected")
# Main loop: wait for the next message from the broker, forever. Incoming
# messages are handed to the callback registered with set_callback above.
while True:
    client.wait_msg()