import machine
import neopixel
import network
from umqtt.simple import MQTTClient
import time
# WiFi credentials: SSID and password passed to wlan.connect() in connect_to_wifi().
# The password is an empty string (presumably an open network - confirm).
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# NeoPixel setup: a strip/ring of NUM_PIXELS pixels driven from GPIO 4.
NUM_PIXELS = 16
pin = machine.Pin(4, machine.Pin.OUT)
np = neopixel.NeoPixel(pin, NUM_PIXELS)
# Servo setup: PWM output on GPIO 2 at 50 Hz, i.e. a 20 ms period.
servo = machine.PWM(machine.Pin(2),freq=50)
# MQTT broker setup: a single shared client plus the two topics this script
# subscribes to (one for NeoPixel colour commands, one for servo angle commands).
# NOTE(review): CLIENT_ID is empty - presumably the broker is expected to
# accept or assign an ID for such clients; confirm against the broker in use.
CLIENT_ID = ""
BROKER = "mqtt-dashboard.com"
TOPIC_PIXEL = "neopixeltasha"
TOPIC_SERVO = "servotasha"
client = MQTTClient(CLIENT_ID, BROKER)
# Connect to Wi-Fi
def connect_to_wifi():
    """Activate the station interface and block until it is connected.

    Uses the module-level WIFI_SSID / WIFI_PASSWORD. When the interface
    already reports a connection, no new connection attempt is started.
    The interface configuration is printed once the link is up.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    if not station.isconnected():
        print('Connecting to WiFi...')
        station.connect(WIFI_SSID, WIFI_PASSWORD)
        # Poll until the interface reports a connection; there is no timeout.
        while True:
            if station.isconnected():
                break
    print('WiFi Connected:', station.ifconfig())
# Connect to MQTT Broker
def connect_to_broker():
    """Connect the shared MQTT client and subscribe to both command topics.

    Any exception raised while connecting, registering the callback or
    subscribing is caught and printed; nothing is re-raised, so the caller
    is not informed of a failure.
    """
    try:
        client.connect()
        print("Connected to MQTT broker")
        # Register the message handler, then subscribe to each command topic.
        client.set_callback(on_message)
        for command_topic in (TOPIC_PIXEL, TOPIC_SERVO):
            client.subscribe(command_topic)
    except Exception as err:
        print("Error connecting to MQTT broker:", err)
# Handle incoming MQTT messages
def on_message(topic, msg):
    """Route an incoming MQTT message to the handler for its topic.

    topic: topic name as bytes; decoded as UTF-8 before it is compared.
    msg:   raw payload, handed unchanged to the selected handler.

    Messages on any topic other than TOPIC_PIXEL / TOPIC_SERVO are ignored.
    """
    topic_name = topic.decode('utf-8')
    if topic_name == TOPIC_PIXEL:
        process_pixel_command(msg)
        return
    if topic_name == TOPIC_SERVO:
        process_servo_command(msg)
# Process NeoPixel commands
def process_pixel_command(msg):
    """Set every NeoPixel to the colour carried in an "R,G,B" payload.

    msg: bytes holding three comma-separated integers, e.g. b"255,0,64".

    The payload is rejected (with a printed message, strip left unchanged)
    when it does not have exactly three fields or when any channel is
    outside 0-255. Any other error, including a non-integer field, is
    caught and printed so that a bad message cannot stop the main loop.
    """
    try:
        data = msg.decode('utf-8').split(",")
        if len(data) != 3:
            print("Invalid pixel command format:", data)
            return
        color = tuple(int(field) for field in data)
        # Validate before touching the strip so an out-of-range channel
        # is never written into the pixel buffer.
        if not all(0 <= channel <= 255 for channel in color):
            print("Pixel values must be in the range 0-255:", color)
            return
        for i in range(NUM_PIXELS):
            np[i] = color
        # Push the buffer to the LEDs once, after every pixel has been set.
        np.write()
    except Exception as e:
        print("Error processing pixel command:", e)
# Process servo commands
def process_servo_command(msg):
    """Move the servo to the angle carried in an MQTT payload.

    msg: bytes holding an integer angle in degrees as text, e.g. b"90".

    The angle (0-180) is mapped linearly onto a 0.5-2.5 unit pulse within a
    20 unit period and scaled to the 0-1023 value passed to servo.duty().
    With the PWM running at 50 Hz those units are milliseconds
    (NOTE(review): 0.5-2.5 ms is the usual hobby-servo range - confirm for
    the servo actually fitted).

    Non-integer or out-of-range payloads are reported and leave the servo
    untouched; any other error is caught and printed.
    """
    try:
        angle = int(msg.decode('utf-8'))
        print("Received angle command:", angle)
        if not 0 <= angle <= 180:
            print("Servo angle must be in the range 0-180:", angle)
            return
        # Single source of truth for the duty value: pulse width over the
        # PWM period, scaled to the duty range. 0 deg -> 25, 180 deg -> 127.
        duty_cycle = int((angle / 180 * 2 + 0.5) / 20 * 1023)
        print("Calculated duty cycle:", duty_cycle)
        servo.duty(duty_cycle)
    except ValueError as ve:
        print("Invalid integer format for servo command:", ve)
    except Exception as e:
        print("Error processing servo command:", e)
# Main loop
def main():
    """Connect to WiFi and the MQTT broker, then poll for messages forever.

    Each pass performs one check for a pending message on the shared client
    and then sleeps briefly before the next check.
    """
    connect_to_wifi()
    connect_to_broker()
    poll_interval = 0.1  # seconds to sleep between message checks
    while True:
        client.check_msg()
        time.sleep(poll_interval)


# Run only when executed as the main script, not when imported.
if __name__ == "__main__":
    main()