import network
import time
import machine
import dht
import json
import urequests
from umqtt.simple import MQTTClient
from servo import Servo
# Wi-Fi configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# ThingsBoard configuration
THINGSBOARD_SERVER = "anhtn512.duckdns.org" # or your ThingsBoard server
THINGSBOARD_PORT = 8883
# ACCESS_TOKEN = "o1a86nigmljgfay30m69" # Replace with your device access token
PROVISIONING_URL = f"https://{THINGSBOARD_SERVER}/api/v1/provision"
PROVISIONING_KEY = "wu3u8qqeni57n02alums"
# PROVISIONING_SECRET = "c0glxtaj5z4tesj8ptgf"
DEVICE_NAME = "ESP32_WOKWI_DEVICE"
ENCRYPTED_SECRET = b"\x08\x5d\x06\x07\x15\x15\x0a\x07\x54\x11\x59\x15\x0e\x1e\x0b\x53\x1d\x15\x0c\x0b"
def xor_decrypt(encrypted, key):
return ''.join(chr(c ^ ord(k)) for c, k in zip(encrypted, key * (len(encrypted) // len(key) + 1)))
def get_key():
print("PROVISIONING_SECRET: ")
key = ""
while True:
try:
char = input()
if char == "\r" or char == "\n":
break
key += char
except:
break
print("Done")
return key
PROVISIONING_SECRET = xor_decrypt(ENCRYPTED_SECRET, get_key())
# Pin configuration
LED_RED = machine.Pin(18, machine.Pin.OUT)
LED_GREEN = machine.Pin(5, machine.Pin.OUT)
LED_BLUE = machine.Pin(17, machine.Pin.OUT)
DHT_PIN = machine.Pin(16)
RELAY_PIN = machine.Pin(15, machine.Pin.OUT)
SERVO_PIN = machine.Pin(4)
# Initialize DHT22
dht_sensor = dht.DHT22(DHT_PIN)
# Initialize Servo
servo = Servo(SERVO_PIN)
# MQTT client
client = None
access_token = None
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("Connecting to WiFi...")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
for _ in range(10):
if wlan.isconnected():
break
time.sleep(1)
else:
print("Failed to connect to WiFi")
return False
print("WiFi connected:", wlan.ifconfig())
return True
def provision_device():
global access_token
try:
print("Provisioning device...")
payload = {
"deviceName": DEVICE_NAME,
"provisionDeviceKey": PROVISIONING_KEY,
"provisionDeviceSecret": PROVISIONING_SECRET
}
print("Payload:", payload)
response = urequests.post(PROVISIONING_URL, json=payload, headers={"Content-Type": "application/json"})
print("HTTP response code:", response.status_code)
print("Response:", response.text)
if response.status_code == 200:
data = response.json()
access_token = data.get("credentialsValue")
print("Successfull, access_token:", access_token)
response.close()
return True
else:
print("Fail:", response.status_code)
response.close()
return False
except Exception as e:
print("Exception error:", str(e))
return False
def mqtt_connect():
global client
client = MQTTClient("esp32_wokwi", THINGSBOARD_SERVER, port=THINGSBOARD_PORT, user=access_token, password="", ssl=True)
client.set_callback(mqtt_callback)
try:
client.connect()
print("Connected to ThingsBoard")
# Subscribe to RPC requests
client.subscribe("v1/devices/me/rpc/request/+")
return True
except Exception as e:
print("Failed to connect to ThingsBoard:", e)
return False
def mqtt_callback(topic, msg):
print("Received MQTT message:", topic, msg)
try:
topic_str = topic.decode()
if topic_str.startswith("v1/devices/me/rpc/request/"):
request = json.loads(msg.decode())
method = request.get("method")
params = request.get("params")
if method == "setLedRed":
LED_RED.value(params)
client.publish("v1/devices/me/attributes", json.dumps({"led_red": params}))
elif method == "setLedGreen":
LED_GREEN.value(params)
client.publish("v1/devices/me/attributes", json.dumps({"led_green": params}))
elif method == "setLedBlue":
LED_BLUE.value(params)
client.publish("v1/devices/me/attributes", json.dumps({"led_blue": params}))
elif method == "setRelay":
RELAY_PIN.value(params)
if params:
servo.write(90) # Open (90°) when relay is on
else:
servo.write(0)
client.publish("v1/devices/me/attributes", json.dumps({"relay": params}))
except Exception as e:
print("Error processing RPC:", e)
def read_dht():
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
hum = dht_sensor.humidity()
return temp, hum
except Exception as e:
print("DHT22 read error:", e)
return None, None
def send_telemetry():
temp, hum = read_dht()
if temp is not None and hum is not None:
telemetry = {
"temperature": temp,
"humidity": hum,
"led_red": LED_RED.value(),
"led_green": LED_GREEN.value(),
"led_blue": LED_BLUE.value(),
"relay": RELAY_PIN.value()
}
try:
client.publish("v1/devices/me/telemetry", json.dumps(telemetry))
print("Sent telemetry:", telemetry)
except Exception as e:
print("Failed to send telemetry:", e)
def main():
# Connect to WiFi
if not connect_wifi():
return
if not provision_device():
print("Cung cấp thiết bị thất bại, thoát...")
return
# Connect to ThingsBoard
if not mqtt_connect():
return
# Main loop
while True:
try:
client.check_msg() # Check for incoming MQTT messages
send_telemetry() # Send telemetry data
time.sleep(5) # Wait 5 seconds
except Exception as e:
print("Main loop error:", e)
time.sleep(5)
# Attempt to reconnect
if not mqtt_connect():
break
if __name__ == "__main__":
main()