from machine import Pin, PWM, I2C
from umqtt.simple import MQTTClient
import ujson
import network
import utime as time
import dht
from led_pwm import LED
import ssd1306
from device_traits import *
# Device Setup
DEVICE_ID = "wokwi001"
# WiFi Setup
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT Setup
MQTT_BROKER = "7706f65c1d3b40428155631e41b1cd5f.s1.eu.hivemq.cloud"
MQTT_CLIENT = DEVICE_ID
MQTT_TELEMETRY_TOPIC = 'iot/device/{0}/telemetry'.format(DEVICE_ID)
MQTT_CONTROL_TOPIC = 'iot/device/{0}/control'.format(DEVICE_ID)
MQTT_MASTER_TELEMETRY_TOPIC = 'iot/telemetry'.format(DEVICE_ID)
MQTT_MASTER_CONTROL_TOPIC = 'iot/control'.format(DEVICE_ID)
# DHT Sensor Setup
DHT_PIN = Pin(15)
dht_sensor = dht.DHT22(DHT_PIN)
# LED/LAMP Setup
RED_LED = LED(12)
BLUE_LED = LED(13)
FLASH_LED = Pin(2, Pin.OUT)
# OLED Pins
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled_width = 128
oled_height = 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
# Turn On LED
RED_LED.on()
BLUE_LED.on()
# Methods
def did_recieve_callback(topic, message):
print('\n\nData Recieved! \ntopic = {0}, message = {1}'.format(topic, message))
if topic == MQTT_CONTROL_TOPIC.encode():
#Get the command message from json command.
command_message = ujson.loads(message.decode())["command"]
if command_message == "lamp/red/on":
RED_LED.on()
send_led_status()
elif command_message == "lamp/red/off":
RED_LED.off()
send_led_status()
elif command_message == "lamp/blue/on":
BLUE_LED.on()
send_led_status()
elif command_message == "lamp/blue/off":
BLUE_LED.off()
send_led_status()
elif command_message == "lamp/on":
RED_LED.on()
BLUE_LED.on()
send_led_status()
elif command_message == "lamp/off":
RED_LED.off()
BLUE_LED.off()
send_led_status()
elif command_message == "status":
mqtt_client_publish(MQTT_TELEMETRY_TOPIC, get_sensor_json_data())
send_led_status()
elif len(command_message.split('/')) == 4 and command_message.split('/')[2] == "brightness":
# "lamp/red/brightness/34"
brightness_commands = command_message.split('/')
brightness_value = float(brightness_commands[3])
if(brightness_commands[1] == "red"):
RED_LED.set_brightness(brightness_value)
if(brightness_commands[1] == "blue"):
BLUE_LED.set_brightness(brightness_value)
send_led_status()
else:
return
# MQTT_MASTER_CONTROL_TOPIC is used for Google Home Integration.
if topic == MQTT_MASTER_CONTROL_TOPIC.encode():
#Get the command message from json command.
received_message = ujson.loads(message.decode())
should_acknowledge = {}
command_data = {}
if 'type' in received_message:
if received_message['type'] == "command":
if "data" in received_message and DEVICE_ID in received_message["data"]:
print(received_message)
command_data = received_message["data"]
should_acknowledge = received_message["acknowledge"]
process_commands(command_data[DEVICE_ID], should_acknowledge)
else:
print("Message is not for this device")
elif received_message['type'] == "ping" and received_message['id'] == DEVICE_ID:
print("PING Message Received")
else:
print("Message not of type COMMAND or PING")
else:
print("Message type invalid, do not process.")
def mqtt_connect():
print("Connecting to MQTT broker ...", end="")
mqtt_client = MQTTClient(MQTT_CLIENT, MQTT_BROKER, user="iotsmarthome", password="Smarthome1", ssl=True, ssl_params={'server_hostname':MQTT_BROKER})
mqtt_client.set_callback(did_recieve_callback)
mqtt_client.connect()
print("Connected.")
mqtt_client.subscribe(MQTT_CONTROL_TOPIC)
# subscribe to master topics for Google Home Control
mqtt_client.subscribe(MQTT_MASTER_CONTROL_TOPIC)
return mqtt_client
def create_control_json_data(command, command_id):
#import ujson
data = ujson.dumps({
"device_id": DEVICE_ID,
"command_id": command_id,
"command": command
})
return data
def get_sensor_json_data():
data = ujson.dumps({
"device_id": DEVICE_ID,
"temp": dht_sensor.temperature(),
"humidity": dht_sensor.humidity(),
"type": "sensor"
})
return data
def get_all_parts_settings():
data = {
"device_id": DEVICE_ID,
"temp": dht_sensor.temperature(),
"humidity": dht_sensor.humidity(),
"humidity_ison": humiditySetpointPercentOn,
"set_temp": thermostatTemperatureSetpoint,
"set_humidity": humiditySetpointPercent,
"red_led": True if RED_LED.value() == 1 else False,
"blue_led": True if BLUE_LED.value() == 1 else False,
"red_led_brightness": RED_LED.get_brightness(),
"blue_led_brightness": BLUE_LED.get_brightness()
}
global parts_settings
parts_settings = data
return data
def create_master_json_data():
data = ujson.dumps(get_all_parts_settings())
return data
def mqtt_client_publish(topic, data):
try:
print("\nUpdating MQTT Broker...")
mqtt_client.publish(topic, data)
print(data)
except:
print("MQTT client may not be initialized.")
def send_led_status():
data = ujson.dumps({
"device_id": DEVICE_ID,
"red_led": "ON" if RED_LED.value() == 1 else "OFF",
"blue_led": "ON" if BLUE_LED.value() == 1 else "OFF",
"red_led_brightness": RED_LED.get_brightness(),
"blue_led_brightness": BLUE_LED.get_brightness(),
"type": "lamp"
})
mqtt_client_publish(MQTT_TELEMETRY_TOPIC, data)
def get_part_by_name(name):
if name == "red_led":
return RED_LED
if name == "blue_led":
return BLUE_LED
def send_ack_data(data):
mqtt_client_publish(MQTT_MASTER_TELEMETRY_TOPIC, data)
def process_commands(commands, acknowledge):
if acknowledge:
data = ujson.dumps({
"gatewayId": DEVICE_ID,
"data": commands,
})
send_ack_data( data)
for command in commands:
part = command["deviceId"].split("::")[1]
command_actions = command["commands"]
if part == "red_led" or part == "blue_led":
if 'on' in command_actions:
get_part_by_name(part).set_value(0 if command_actions["on"] == False else 1)
if 'brightness' in command_actions:
get_part_by_name(part).set_brightness(command_actions["brightness"])
if part == "sensor_temp" and "thermostatTemperatureSetpoint" in command_actions:
global thermostatTemperatureSetpoint
thermostatTemperatureSetpoint = command_actions["thermostatTemperatureSetpoint"]
print("\n\nTEMPERATURE SENSOR = " + str(command_actions["thermostatTemperatureSetpoint"]) )
if part == "sensor_humidity":
if 'on' in command_actions:
global humiditySetpointPercentOn
humiditySetpointPercentOn = command_actions["on"]
if "thermostatTemperatureSetpoint" in command_actions:
global humiditySetpointPercent
humiditySetpointPercent = command_actions["thermostatTemperatureSetpoint"]
print("\n\nHUMIDITY SENSOR = " + str(command_actions["thermostatTemperatureSetpoint"]))
oled_print()
def oled_print():
oled.fill(0)
oled.show()
oled.text('CUR TMP: '+ str(dht_sensor.temperature()), 0, 0)
oled.text('SET TMP: '+ str(thermostatTemperatureSetpoint), 0, 10)
oled.text('CUR HUM: '+ str(dht_sensor.humidity()), 0, 20)
oled.text('SET HUM: '+ str(humiditySetpointPercent if humiditySetpointPercentOn == True else "OFF"), 0, 30)
oled.text('RED: '+ str(RED_LED.get_brightness()) + '%', 0, 40)
oled.text('BLUE: ' + str(BLUE_LED.get_brightness()) + '%', 0, 50)
oled.show()
def mqtt_ping():
data = ujson.dumps({
"device_id": DEVICE_ID,
"id": DEVICE_ID,
"type": "ping",
"devices": []
})
mqtt_client_publish(MQTT_MASTER_CONTROL_TOPIC, data)
# Application Logic
# Connect to WiFi
wifi_client = network.WLAN(network.STA_IF)
wifi_client.active(True)
print("Connecting device to WiFi")
wifi_client.connect(WIFI_SSID, WIFI_PASSWORD)
# Wait until WiFi is Connected
while not wifi_client.isconnected():
print("Connecting")
time.sleep(0.1)
print("WiFi Connected!")
print(wifi_client.ifconfig())
# Connect to MQTT
mqtt_client = mqtt_connect()
# RED_LED.off()
# BLUE_LED.off()
mqtt_client_publish(MQTT_CONTROL_TOPIC, create_control_json_data('lamp/off', 'DEVICE-RESET-00'))
# read dht_sensor and register device.
dht_sensor.measure()
time.sleep(0.2)
# Set default settings
parts_settings = get_all_parts_settings()
register_settings = ujson.dumps(
get_settings(DEVICE_ID, "register_settings", parts_settings)
)
mqtt_client_publish(MQTT_MASTER_CONTROL_TOPIC, register_settings)
master_data_old = ""
oled_print()
pause_time = 1500000
start_time = time.ticks_ms()
sleep_time = pause_time
while True:
mqtt_client.check_msg()
print(". ", end="")
try:
FLASH_LED.on()
dht_sensor.measure()
time.sleep(0.2)
FLASH_LED.off()
except:
pass
master_data_new = create_master_json_data()
if master_data_new != master_data_old:
mqtt_client_publish(MQTT_TELEMETRY_TOPIC, get_sensor_json_data())
send_led_status()
master_data_old = master_data_new
oled_print() #print out to OLED
# update device settings in cloud.
all_settings = ujson.dumps(
get_settings(DEVICE_ID, "update_settings", get_all_parts_settings())
)
mqtt_client_publish(MQTT_MASTER_CONTROL_TOPIC, all_settings)
time.sleep(0.1)
sleep_time = sleep_time - (time.ticks_ms() - start_time)
if sleep_time < 1:
start_time = time.ticks_ms()
sleep_time = pause_time
mqtt_ping()