import machine
import neopixel
import network
from umqtt.simple import MQTTClient
import time
# WiFi credentials
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# NeoPixel setup
NUM_PIXELS = 16
pin = machine.Pin(18, machine.Pin.OUT) # Pin 18
np = neopixel.NeoPixel(pin, NUM_PIXELS)
# Servo setup
servo = machine.PWM(machine.Pin(22), freq=50) # Pin 22
# MQTT Broker setup
CLIENT_ID = "WokwiAf"
BROKER = "mqtt-dashboard.com"
TOPIC_PIXEL = "PixelAf"
TOPIC_SERVO = "ServoAf"
client = MQTTClient(CLIENT_ID, BROKER)
# Connect to Wi-Fi
def connect_to_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Connecting to WiFi...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
pass
print('WiFi Connected:', wlan.ifconfig())
# Connect to MQTT Broker
def connect_to_broker():
try:
client.connect()
print("Connected to MQTT broker")
client.set_callback(on_message)
client.subscribe(TOPIC_PIXEL)
client.subscribe(TOPIC_SERVO)
except Exception as e:
print("Error connecting to MQTT broker:", e)
# Handle incoming MQTT messages
def on_message(topic, msg):
topic_str = topic.decode('utf-8') # Decode bytes to string
if topic_str == TOPIC_PIXEL:
process_pixel_command(msg)
elif topic_str == TOPIC_SERVO:
process_servo_command(msg)
# Process NeoPixel commands
def process_pixel_command(msg):
try:
data = eval(msg.decode('utf-8')) # Using eval to parse the list from string to list
if len(data) == 3: # Ensure that we have 3 elements in the list (for RGB)
r = int(data[0])
g = int(data[1])
b = int(data[2])
for i in range(NUM_PIXELS):
np[i] = (r, g, b) # Set all NeoPixels to the received RGB values
np.write()
else:
print("Invalid pixel command format:", data)
except ValueError as ve:
print("ValueError: Could not convert RGB values to integers:", ve)
except Exception as e:
print("Error processing pixel command:", e)
# Process servo commands
def process_servo_command(msg):
try:
angle = int(msg.decode('utf-8')) # Decode the message to integer
print("Received angle command:", angle)
# Map the angle to duty cycle values
duty_cycle = int((angle / 180) * 1023) # Assuming 0 to 180 degree mapping
print("Calculated duty cycle:", duty_cycle)
servo.duty(duty_cycle)
except ValueError as ve:
print("Invalid integer format for servo command:", ve)
except Exception as e:
print("Error processing servo command:", e)
# Main loop
def main():
connect_to_wifi()
connect_to_broker()
while True:
client.check_msg()
time.sleep(0.1)
if __name__ == "__main__":
main()