from machine import Pin
import asyncio
import time
from neopixel import NeoPixel
import config
import blynk_mqtt
time.sleep(0.1) # Wait for USB to become ready
print("Welcome to Skill Disk")
# Neo Pixel Config
neo = NeoPixel( Pin(19), 16)
red_color_value = 0
green_color_value = 0
blue_color_value = 0
# Blynk / MQTT Config
mqtt = blynk_mqtt.mqtt
def device_connected():
mqtt.publish("get/ds", "red color,green color,blue color")
def blynk_connected():
print("Blynk MQTT Connected..")
device_connected()
def blynk_disconnected():
print("*** Blynk MQTT disconnected.. ***")
def blynk_callback(topic, payload):
print(f"Topic: {topic}")
print(f"Payload: {payload}")
global red_color_value, green_color_value, blue_color_value
if topic == "downlink/ds/red color":
red_color_value = int(payload)
if topic == "downlink/ds/green color":
green_color_value = int(payload)
if topic == "downlink/ds/blue color":
blue_color_value = int(payload)
neo.fill((red_color_value, green_color_value, blue_color_value))
neo.write()
print("Color filled")
blynk_mqtt.on_connected = blynk_connected
blynk_mqtt.on_disconnected = blynk_disconnected
blynk_mqtt.on_message = blynk_callback
# Wifi Connection
def connect_wifi():
print("Connecting to WiFi..", end="")
import network
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
sta_if.active(True)
sta_if.connect(config.WIFI_SSID, config.WIFI_PASS)
while not sta_if.isconnected():
print(".", end="")
time.sleep_ms(250)
print()
print("Wifi Connected")
def red_es_update(color_value):
mqtt.publish("ds/red_es", color_value)
def green_es_update(color_value):
mqtt.publish("ds/green_es", color_value)
def blue_es_update(color_value):
mqtt.publish("ds/blue_es", color_value)
def device_update():
red_es_update(red_color_value)
green_es_update(green_color_value)
blue_es_update(blue_color_value)
async def publisher_task():
while True:
try:
device_update()
except:
pass
await asyncio.sleep_ms(1000)
connect_wifi()
try:
asyncio.run( asyncio.gather(
blynk_mqtt.task(),
publisher_task()
))
except KeyboardInterrupt:
print("Interrupted")
finally:
asyncio.new_event_loop()