# --- Identity: group code and member name, reused as the MQTT topic prefix ---
KODE_KELOMPOK = "UNI449"
NAMA = "ibka"

# --- MQTT connection settings ---
MQTT_CLIENT_ID = "UNI449-ENIGMATICS-IBKA"
MQTT_BROKER = "broker.emqx.io"
MQTT_USER = MQTT_PASSWORD = ""  # both credentials are empty strings

# Topic this script subscribes to, and topic it publishes on.
MQTT_TOPIC = f"{KODE_KELOMPOK}/{NAMA}/aktuasi_led"
TOPIC_TO_PUBLISH = f"{KODE_KELOMPOK}/{NAMA}/data_sensor"

# --- Pin numbers for the LED and the two DHT22 sensors ---
LED_PIN, DHT_PIN_1, DHT_PIN_2 = 32, 4, 16

# --- Hardware objects (created in the order: LED, sensor 1, sensor 2) ---
led = Pin(LED_PIN, Pin.OUT)
sensor_1, sensor_2 = dht.DHT22(Pin(DHT_PIN_1)), dht.DHT22(Pin(DHT_PIN_2))
def manipulate_light(state: bool) -> None:
    """Switch the LED to match ``state`` and publish a status message.

    Args:
        state: truthy switches the LED on, falsy switches it off.
    """
    if state:
        led.on()
        state_msg = "LED Turned On!"
    else:
        led.off()
        state_msg = "LED Turned Off!"

    # Report the new state as a JSON object with a single "message" key.
    payload = json.dumps({"message": state_msg})
    send_data(payload)
def parse_led_data(data: str) -> bool:
    """Convert the textual 'led' value into a boolean.

    The comparison is case-insensitive.

    Args:
        data: text expected to spell "true" or "false" in any letter case.

    Returns:
        True for "true", False for "false".

    Raises:
        ValueError: if ``data`` is neither "true" nor "false".
    """
    normalized = data.lower()
    if normalized == "true":
        return True
    if normalized == "false":
        return False
    raise ValueError("'led' must be 'true' or 'false'")
def on_data_received(topic, message: bytes) -> None:
    """Callback to process an incoming message and apply the requested LED state.

    The payload is decoded as UTF-8 and parsed as JSON. If the resulting
    object has a non-null 'led' entry, it is validated by ``parse_led_data``
    and passed to ``manipulate_light``. Payloads without 'led' are ignored.
    Malformed payloads are reported on the console instead of raising.

    Args:
        topic: topic the message arrived on (not used here).
        message: raw payload bytes.
    """
    try:
        msg_dict = json.loads(str(message, 'UTF-8'))
        if not isinstance(msg_dict, dict):
            # Valid JSON that is not an object (e.g. a list or number) has no
            # 'led' key to read; report it through the same error path.
            raise ValueError("payload must be a JSON object")
        led_data = msg_dict.get('led')
        if led_data is not None:
            # str() lets a JSON boolean (true/false) be validated the same
            # way as the strings "true"/"false".
            led_state = parse_led_data(str(led_data))
            manipulate_light(led_state)
    except (ValueError, KeyError) as e:
        # The exception must be bound to a name so it can be included in the report.
        print(f"Error while processing message: {e}")
def send_data(msg="") -> None:
    """Log ``msg`` to the console, then publish it on ``TOPIC_TO_PUBLISH``.

    Args:
        msg: payload to publish; defaults to an empty string.
    """
    report = f"Reporting to {TOPIC_TO_PUBLISH}: {msg}"
    print(report)
    client.publish(TOPIC_TO_PUBLISH, msg)
# def measure_and_publish() -> None:
# """Measure temperature and humidity from both `dht22` sensors and publish the reading whenever it changes"""
# prev_msg = ""
# while True:
# sensor_1.measure()
# sensor_2.measure()
# message = json.dumps({
# "temp_1": sensor_1.temperature(),
# "humidity_1": sensor_1.humidity(),
# "temp_2": sensor_2.temperature(),
# "humidity_2": sensor_2.humidity(),
# })
# if message != prev_msg:
# print("Weather conditions changed!")
# send_data(message)
# prev_msg = message
# else:
# print("No Change on weather condition")
# time.sleep(1)
# Bring up the MQTT session: connect, register the message handler, subscribe.
print("Connecting to MQTT server...")
client = MQTTClient(
    MQTT_CLIENT_ID,
    MQTT_BROKER,
    user=MQTT_USER,
    password=MQTT_PASSWORD,
)
client.connect()
client.set_callback(on_data_received)
print("Connected!")

client.subscribe(MQTT_TOPIC)
print(f"Subscribed to topic: {MQTT_TOPIC}")

# Main loop: keep asking the client to check for incoming messages.
# NOTE(review): presumably messages are handed to on_data_received via the
# callback registered above — confirm against the MQTT client library.
while True:
    client.check_msg()
# measure_and_publish()