import network
import time
import math
import ujson
from machine import Pin, ADC
from umqtt.simple import MQTTClient
# Initialize the LED as a digital output on GPIO 15
led = Pin(15, Pin.OUT)
# WiFi configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT configuration
MQTT_BROKER = "broker.emqx.io" # Public MQTT broker
PUBLISH_TOPIC = "/UNI068/Rayhan/data_sensor" # Topic for publishing sensor readings
SUBSCRIBE_TOPIC = "/UNI068/Rayhan/aktuasi_led" # Topic for LED control messages
CLIENT_ID = "ESP32_" + str(time.ticks_ms()) # Client ID built from the tick counter at import time; NOTE(review): intended to be unique per device, but two boards can produce the same tick value
# Light sensor parameters used by the lux conversion in read_light_sensor()
GAMMA = 0.7  # exponent of the resistance-to-lux power law
RL10 = 50  # multiplied by 1e3 in the conversion; presumably the LDR resistance at 10 lux in kilo-ohms (TODO confirm)
# Analog input for the light sensor on GPIO 34, with 11 dB input attenuation
sensor = ADC(Pin(34))
sensor.atten(ADC.ATTN_11DB)
# Connect to the WiFi network
def connect_wifi():
    """Bring up the station interface and block until WiFi is connected.

    Joins the network given by WIFI_SSID / WIFI_PASSWORD, printing a dot
    every half second while waiting, then prints the interface
    configuration once the link is up. There is no timeout: the function
    only returns after the interface reports a connection.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(WIFI_SSID, WIFI_PASSWORD)
    print("Connecting to WiFi...", end="")
    while True:
        if station.isconnected():
            break
        # Progress indicator while the association is still pending
        print(".", end="")
        time.sleep(0.5)
    print("\nConnected to WiFi:", station.ifconfig())
# Read the light sensor and convert the reading to lux
def read_light_sensor():
    """Return the current light level, in lux, derived from the ADC reading.

    The raw ADC value is divided by 4, converted to an equivalent voltage on
    a 5 V / 1024-step scale, turned into the sensor resistance with the
    voltage-divider formula, and finally mapped to lux with the
    GAMMA / RL10 power-law model.

    Returns:
        float: The estimated illuminance. The scaled reading is clamped to
        1..1023, so the result is always finite.
    """
    # Clamp the scaled reading: 0 would give zero resistance and 1024 would
    # zero the divider denominator, and either case raises ZeroDivisionError.
    sensor_value = min(1023, max(1, int(sensor.read() / 4)))
    voltage = sensor_value / 1024.0 * 5.0
    # Voltage-divider relation with a 2000-ohm term, as in the original formula
    resistance = 2000.0 * voltage / (1 - voltage / 5.0)
    lux = math.pow(RL10 * 1e3 * math.pow(10, GAMMA) / resistance, 1 / GAMMA)
    return lux
# Callback for incoming MQTT messages
def on_message(topic, msg):
    """Handle an incoming MQTT message and switch the LED accordingly.

    The payload is parsed as JSON. When its "msg" field is "on" the LED is
    switched on, when it is "off" the LED is switched off; any other value
    is ignored.

    Args:
        topic: Topic the message arrived on, as bytes.
        msg: Raw message payload, as bytes.

    Any error while decoding or parsing the message is printed and
    swallowed, so a malformed payload cannot propagate out of the callback.
    """
    try:
        # Decode inside the try: a payload that is not valid UTF-8 must be
        # reported like any other bad message instead of raising out of
        # the callback and stopping the caller's loop.
        print(f"Message received on topic {topic.decode()}: {msg.decode()}")
        # Parse the JSON payload
        data = ujson.loads(msg)
        command = data.get("msg")
        if command == "on":
            led.on()
            print("LED is ON")
        elif command == "off":
            led.off()
            print("LED is OFF")
    except Exception as e:
        print("Error processing message:", e)
# Main entry point
def main():
    """Connect to WiFi and the MQTT broker, then publish light readings forever.

    After connecting, the client subscribes to SUBSCRIBE_TOPIC so that
    on_message() can control the LED. The loop then polls for incoming
    messages, reads the light sensor, and publishes a JSON document with
    the "lux" value and an "isLight" flag (lux >= 50) to PUBLISH_TOPIC
    every 2 seconds.

    An OSError inside the loop triggers a reconnect. The subscription is
    re-established after reconnecting, and a failed reconnect is reported
    and retried after a short pause instead of terminating the program.
    A KeyboardInterrupt ends the loop and disconnects from the broker.
    """
    connect_wifi()
    # Connect to the MQTT broker
    client = MQTTClient(CLIENT_ID, MQTT_BROKER, keepalive=60)
    client.set_callback(on_message)
    client.connect()
    print("Connected to MQTT broker")
    # Subscribe to the LED control topic
    client.subscribe(SUBSCRIBE_TOPIC)
    print(f"Subscribed to topic: {SUBSCRIBE_TOPIC}")
    try:
        while True:
            try:
                # Check for pending MQTT messages (LED control)
                client.check_msg()
                # Read the light sensor and publish the reading
                lux = read_light_sensor()
                message = ujson.dumps({
                    "lux": lux,
                    "isLight": lux >= 50
                })
                client.publish(PUBLISH_TOPIC, message)
                print(f"Published to {PUBLISH_TOPIC}: {message}")
                time.sleep(2)
            except OSError as e:
                print(f"MQTT error: {e}")
                print("Reconnecting to MQTT broker...")
                try:
                    client.connect()
                    # A fresh connection does not carry the previous
                    # subscription, so subscribe again or LED control
                    # silently stops working.
                    client.subscribe(SUBSCRIBE_TOPIC)
                except OSError as reconnect_error:
                    # The broker or network is still unavailable: report it
                    # and pause so the retry loop does not spin.
                    print(f"Reconnect failed: {reconnect_error}")
                    time.sleep(2)
    except KeyboardInterrupt:
        print("Disconnected")
        client.disconnect()
# Run the program when this file is executed as a script
if __name__ == "__main__":
    main()