"""
MicroPython MQTT LED light control Example for Wokwi.com
To control the led:
1. Open MQTTX application, New connection
2. Name: test, Server Address: broker.emqx.io
3. Click "Connect"
4. Add subscription, set the topic to "wokwi-message"
5. Under "JSON" option, set the Topic to "wokwi-light"
6. Turn on the light: In the Message field type (json format): {"method": "setValue", "params": true}
7. click "Publish"
8. Turn off the light: In the Message field type (json format): {"method": "setValue", "params": false}
9. click "Publish"
Copyright (C) 2022, Uri Shaked
Reference:
https://wokwi.com/arduino/projects/315787266233467457
umqtt.simple reference:
https://mpython.readthedocs.io/en/master/library/mPython/umqtt.simple.html
https://pypi.org/project/micropython-umqtt.simple/
"""
import network
import machine
from machine import Pin
from time import sleep
import ujson
from umqtt.simple import MQTTClient
# WiFi Hotspot Parameters
# SSID and password handed to wifi.connect(); the password is empty.
WIFI_AP = "Wokwi-GUEST"
WIFI_PASS = ""
# MQTT Server Parameters
# Client id, broker host and credentials passed to MQTTClient(); the user
# name and password are left empty.
MQTT_CLIENT_ID = "mqtt-explorer-8ef67865"
MQTT_BROKER = "broker.emqx.io"
MQTT_USER = ""
MQTT_PASSWORD = ""
# Topic the device subscribes to in order to receive LED commands.
MQTT_SUB_TOPIC = "wokwi-light"
# Topic the device publishes its periodic "hello" message to.
MQTT_PUB_TOPIC = "wokwi-message"
# The LED is wired to pin IO13 of the ESP32, configured as an output.
led = Pin(13, Pin.OUT)
# MQTT callback: executed whenever a message arrives on the subscribed topic.
def mqtt_message(topic, msg):
    """Handle one incoming MQTT message by switching the LED.

    The payload is expected to be a JSON object of the form
    ``{"method": "setValue", "params": <value>}``. A truthy ``params``
    turns the LED on and a falsy one turns it off; any other ``method``
    is reported as an unknown command. Decoding or key-lookup failures
    are printed rather than raised, so a malformed message does not
    propagate an exception to whoever invoked the callback.

    Args:
        topic: Topic the message arrived on (not used).
        msg: Raw JSON payload of the message.
    """
    print("Incoming message:", msg)
    try:
        command = ujson.loads(msg)
        print(command)
        if command['method'] != 'setValue':
            print('Unknow command')
        elif command['params']:
            led.on()
        else:
            led.off()
    except Exception as err:
        # Best-effort handler: report the problem and keep running.
        print("Error:", err)
def blink(num, period):
    """Flash the LED ``num`` times.

    Args:
        num: Number of on/off cycles to perform.
        period: Seconds the LED stays on, and then off, within each cycle
            (one full cycle therefore lasts ``2 * period`` seconds).
    """
    for _ in range(num):
        # Each cycle is "on, wait, off, wait".
        for switch in (led.on, led.off):
            switch()
            sleep(period)
def reset():
    """Announce a restart, wait one second, then reset the board."""
    print("Resetting...")
    # Short pause before the reset is triggered.
    sleep(1)
    machine.reset()
def mainroutine():
    """Connect to WiFi and MQTT, then poll and publish forever.

    Sequence:
      1. Blink the LED three times as a start-up indicator.
      2. Join the WiFi network ``WIFI_AP``, waiting (without timeout)
         until the interface reports it is connected.
      3. Connect to the MQTT broker, register ``mqtt_message`` as the
         message callback and subscribe to ``MQTT_SUB_TOPIC``.
      4. Loop forever: check for incoming MQTT messages on every pass and
         periodically publish a JSON "hello" message to ``MQTT_PUB_TOPIC``.

    The function never returns normally; exceptions raised by the network
    or MQTT calls propagate to the caller.
    """
    blink(3, 0.25)

    # --- WiFi: print a dot every 0.5 s until the station is connected ---
    print("Connecting to WiFi...", end="")
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(WIFI_AP, WIFI_PASS)
    while not station.isconnected():
        sleep(0.5)
        print(".", end="")
    print("Wifi Connection Done")
    blink(2, 0.5)

    # --- MQTT: connect, install the callback, subscribe ---
    print("Connecting to MQTT...")
    client = MQTTClient(
        MQTT_CLIENT_ID,
        MQTT_BROKER,
        port=1883,
        ssl=False,
        user=MQTT_USER,
        password=MQTT_PASSWORD,
    )
    client.set_callback(mqtt_message)
    client.connect()
    client.subscribe(MQTT_SUB_TOPIC)
    print("MQTT Connected!")
    blink(2, 1)

    # --- Main loop ---
    # Every pass sleeps 1 ms and then calls check_msg() to look for a
    # message from the broker. `ticks` counts passes; once it has reached
    # 10000 the next pass resets it and publishes the hello message, so
    # publishes are at least ~10 s apart (10000 x 1 ms plus the time spent
    # in check_msg()).
    ticks = 0
    while True:
        if ticks >= 10000:
            ticks = 0
            # NOTE: the payload is JSON on purpose. The original author
            # recorded that publishing the plain string "hello from esp32"
            # failed with an OSError in umqtt and that switching to a JSON
            # payload succeeded.
            payload = ujson.dumps({"msg": "hello from esp32"})
            client.publish(MQTT_PUB_TOPIC, payload)
        else:
            ticks += 1
        sleep(0.001)
        client.check_msg()
def main():
    """Run ``mainroutine``; on any ``Exception`` print it and reset the board."""
    try:
        mainroutine()
    except Exception as err:
        # mainroutine() only comes back here by raising: report the error
        # and restart the board.
        print("Error: " + str(err))
        reset()
# Script entry point: runs unconditionally whenever this file is executed.
main()