from machine import Pin
from umqtt.simple import MQTTClient
import time
import json
import ujson
import network
import dht
# ============= Objects =============
# DHT22 temperature/humidity sensor wired to GPIO 2.
dht_sensor = dht.DHT22(Pin(2))
# LED on GPIO 4, configured as an output and driven to level 0 at start-up.
led = Pin(4, Pin.OUT)
led.value(0)
# ============= WiFi =============
# Bring up the station interface and block until it has joined the network.
sta = network.WLAN(network.STA_IF)
if not sta.isconnected():
    print('connecting to network...')
    sta.active(True)
    sta.connect('Wokwi-GUEST', '')
    while not sta.isconnected():
        # Poll with a short sleep instead of a tight `pass` spin so the
        # wait does not monopolise the CPU while the link comes up.
        time.sleep(0.1)
print('network config:', sta.ifconfig())
# ============= MQTT Configuration =============
# Connection parameters passed to MQTTClient further down in this file.
# NOTE(review): the client id, username and password are committed in plain
# text — consider loading them from a separate, untracked config module.
MQTT_CLIENT_ID = "ku3hqodm4qc6yhmbh88m"
USERNAME = "hlnp06zv5mnon3mvrzy8"
PASSWORD = "bgq5av4a66j4n88glc3q"
BROKER = "thingsboard.cloud"
# Topic the DHT readings are published to.
PUB_TOPIC = "v1/devices/me/telemetry"
# Topic subscribed to for attribute updates (carries the `led_state` key).
SUB_TOPIC = "v1/devices/me/attributes"
# RPC request subscription; `+` is the MQTT single-level wildcard, so the
# last topic segment of each incoming request is its request id.
RPC_TOPIC = "v1/devices/me/rpc/request/+"
# ============= Callback: MQTT receive message =============
def on_message(topic, msg):
    """Handle incoming shared-attribute updates and RPC requests.

    Args:
        topic: Topic the message arrived on. umqtt passes this as ``bytes``;
            a ``str`` is accepted as well.
        msg: Raw JSON payload of the message.

    Attribute messages containing ``led_state`` switch the LED on or off.
    RPC requests whose method is ``publish_data`` or ``forcePubData``
    trigger an immediate sensor publish and an RPC response on the matching
    response topic. Parse/handling errors are caught and printed so a bad
    message never propagates out of the callback.
    """
    # umqtt delivers the topic as bytes. Decode it once so the substring
    # checks, the split on '/', and the response-topic formatting below all
    # operate on str (otherwise split('/') raises TypeError and the request
    # id would be rendered as "b'..'").
    if isinstance(topic, bytes):
        topic = topic.decode('utf-8')
    print(f"[SUB] Topic: {topic}, Message: {msg}")

    # ===== Handle Shared Attributes =====
    if SUB_TOPIC in topic:
        try:
            msg_dict = ujson.loads(msg)
            if 'led_state' in msg_dict:
                led_state = msg_dict['led_state']
                if led_state:
                    led.on()
                    print("[LED] ON")
                else:
                    led.off()
                    print("[LED] OFF")
        except Exception as e:
            print(f"[ERROR] Parse attribute: {e}")
    # ===== Handle RPC (Force Publish) =====
    elif 'rpc/request' in topic:
        try:
            msg_dict = ujson.loads(msg)
            # RPC format: {"method": "publish_data", "params": {...}}
            method = msg_dict.get('method', '')
            # The request id is the last topic segment; the response must be
            # published under the same id.
            request_id = topic.split('/')[-1]
            if method == 'publish_data' or method == 'forcePubData':
                print("[RPC] Force publish requested!")
                force_publish_data()
                # Send RPC response
                response_topic = f"v1/devices/me/rpc/response/{request_id}"
                response = ujson.dumps({"status": "success"})
                client.publish(response_topic, response)
                print("[RPC] Response sent")
            else:
                print(f"[RPC] Unknown method: {method}")
        except Exception as e:
            print(f"[ERROR] RPC parse: {e}")
# ============= Force Publish Function =============
def force_publish_data():
    """Take a fresh DHT reading and publish it as telemetry right away.

    Used by the RPC handler. Any exception raised while measuring or
    publishing is caught and printed rather than propagated.
    """
    try:
        dht_sensor.measure()
        temp, hum = dht_sensor.temperature(), dht_sensor.humidity()
        payload = ujson.dumps({
            "temperature": temp,
            "humidity": hum
        })
        client.publish(PUB_TOPIC, payload)
        print(f"[FORCE] Temp: {temp:.2f}°C, Humidity: {hum:.2f}% - Published Immediately!")
    except Exception as e:
        print(f"[ERROR] Force publish: {e}")
# ============= MQTT Connect =============
print("MQTT Connecting...")
client = MQTTClient(
    MQTT_CLIENT_ID,
    BROKER,
    user=USERNAME,
    password=PASSWORD,
    keepalive=10000,
)
# Register the message handler before connecting and subscribing.
client.set_callback(on_message)
client.connect()
print("MQTT Connected!")
# Subscribe to shared attributes + RPC
for topic_filter in (SUB_TOPIC, RPC_TOPIC):
    client.subscribe(topic_filter)
print(f"Subscribed to {SUB_TOPIC} and RPC")
# ============= Main Loop =============
# Poll for incoming MQTT messages roughly every 100 ms and publish a DHT
# reading every `publish_interval` seconds until interrupted from the REPL.
print("\nStarting Praktikum 14 - Publish DHT + Subscribe LED + Force Publish RPC")
print("="*70)
last_publish = time.time()
publish_interval = 2  # seconds
while True:
    try:
        # ===== 1. CHECK INCOMING MESSAGES =====
        # Non-blocking; dispatches any pending message to on_message.
        client.check_msg()
        # ===== 2. PUBLISH DHT DATA (periodically) =====
        current_time = time.time()
        if current_time - last_publish >= publish_interval:
            # Record the attempt before touching the sensor. If the read or
            # publish fails, the next try then waits a full interval
            # instead of hammering the DHT22 every 100 ms (it cannot be
            # sampled more often than once every 2 seconds).
            last_publish = current_time
            try:
                dht_sensor.measure()
                temp = dht_sensor.temperature()
                hum = dht_sensor.humidity()
                data = {
                    "temperature": temp,
                    "humidity": hum
                }
                data_json = ujson.dumps(data)
                client.publish(PUB_TOPIC, data_json)
                print(f"[PUB] Temp: {temp:.2f}°C, Humidity: {hum:.2f}% - Published (periodic)")
            except Exception as e:
                print(f"[ERROR] DHT read/publish: {e}")
        time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n[BOOT] User interrupted")
        break
    except Exception as e:
        # Keep running after unexpected errors, but back off for a second
        # so a persistent failure does not spin the loop.
        print(f"[ERROR] Main loop: {e}")
        time.sleep(1)
print("[BOOT] Program ended")