import machine
import dht
import time
from umqtt.simple import MQTTClient
# ==========================================
# CẤU HÌNH KẾT NỐI WIFI & MQTT BROKER
# ==========================================
# Wokwi cung cấp mạng WiFi giả lập miễn phí có tên là "Wokwi-GUEST"
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
MQTT_BROKER = "broker.hivemq.com"
MQTT_CLIENT_ID = "esp32_kitchen_dashboard"
# ==========================================
# KHỞI TẠO CẤU HÌNH CHÂN PHẦN CỨNG (GPIO)
# ==========================================
# Cảm biến môi trường
dht_sensor = dht.DHT22(machine.Pin(15))
# 3 Rơ-le điều khiển thiết bị gia dụng (Mặc định TẮT)
relay_dryer = machine.Pin(12, machine.Pin.OUT, value=0)
relay_wm = machine.Pin(13, machine.Pin.OUT, value=0)
relay_dw = machine.Pin(14, machine.Pin.OUT, value=0)
# 3 Đèn LED báo trạng thái hệ thống chiếu sáng (Mặc định TẮT)
led_poppy = machine.Pin(21, machine.Pin.OUT, value=0)
led_cooker = machine.Pin(22, machine.Pin.OUT, value=0)
led_sink = machine.Pin(23, machine.Pin.OUT, value=0)
# Ánh xạ Topic MQTT tới các chân GPIO tương ứng để xử lý tự động
hardware_map = {
"home/appliances/dryer/cmnd": {"pin": relay_dryer, "stat_topic": "home/appliances/dryer/stat", "power_topic": "home/appliances/dryer/power", "is_appliance": True, "base_power": 1800},
"home/appliances/washing_machine/cmnd": {"pin": relay_wm, "stat_topic": "home/appliances/washing_machine/stat", "power_topic": "home/appliances/washing_machine/power", "is_appliance": True, "base_power": 1200},
"home/appliances/dishwasher/cmnd": {"pin": relay_dw, "stat_topic": "home/appliances/dishwasher/stat", "power_topic": "home/appliances/dishwasher/power", "is_appliance": True, "base_power": 1500},
"home/kitchen/poppy_light/cmnd": {"pin": led_poppy, "stat_topic": "home/kitchen/poppy_light/stat", "is_appliance": False},
"home/kitchen/cooker_light/cmnd": {"pin": led_cooker, "stat_topic": "home/kitchen/cooker_light/stat", "is_appliance": False},
"home/kitchen/sink_light/cmnd": {"pin": led_sink, "stat_topic": "home/kitchen/sink_light/stat", "is_appliance": False}
}
# ==========================================
# HÀM KẾT NỐI WIFI GIẢ LẬP
# ==========================================
def connect_wifi():
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("🌐 Đang kết nối WiFi...")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
time.sleep_ms(500)
print("📶 Đã kết nối WiFi thành công! IP:", wlan.ifconfig()[0])
# ==========================================
# HÀM XỬ LÝ LỆNH TỪ HOME ASSISTANT (CALLBACK)
# ==========================================
def mqtt_callback(topic, msg):
topic_str = topic.decode("utf-8")
payload_str = msg.decode("utf-8")
print("📥 Nhận lệnh từ HA -> Topic:", topic_str, " | Payload:", payload_str)
if topic_str in hardware_map:
device = hardware_map[topic_str]
# Cập nhật trạng thái linh kiện phần cứng (ON = 1, OFF = 0)
if payload_str == "ON":
device["pin"].value(1)
else:
device["pin"].value(0)
# Phản hồi ngược lại kênh trạng thái (stat) để công tắc trên HA thay đổi theo
client.publish(device["stat_topic"], payload_str)
# Nếu là thiết bị gia dụng và bị TẮT, ép xung công suất tiêu thụ về 0W ngay lập tức
if device["is_appliance"] and payload_str == "OFF":
client.publish(device["power_topic"], "0")
# ==========================================
# CHƯƠNG TRÌNH CHÍNH (MAIN LOOP)
# ==========================================
connect_wifi()
# Khởi tạo và kết nối MQTT Client
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
client.set_callback(mqtt_callback)
client.connect()
print("🔌 Đã kết nối thành công tới MQTT Broker!")
# Đăng ký nhận toàn bộ các lệnh điều khiển (cmnd)
for topic in hardware_map.keys():
client.subscribe(topic)
print("📡 Đã subscribe topic:", topic)
last_sensor_update = 0
sensor_interval = 5000 # Cập nhật trạng thái cảm biến mỗi 5 giây (5000ms)
try:
while True:
# Kiểm tra xem có lệnh mới nào từ Home Assistant gửi xuống không
client.check_msg()
current_time = time.ticks_ms()
if time.ticks_diff(current_time, last_sensor_update) > sensor_interval:
# 1. Đọc dữ liệu từ cảm biến DHT22 thực tế trên Wokwi
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
client.publish("home/kitchen/temperature", str(temp))
print("🌡️ Đã cập nhật nhiệt độ nhà bếp:", temp, "°C")
except Exception as e:
print("❌ Lỗi không đọc được cảm biến DHT22")
# 2. Giả lập phần trăm Pin mặt trời nhảy ngẫu nhiên quanh ngưỡng 71%
import random
fake_soc = random.randint(70, 75)
client.publish("home/energy/system/soc", str(fake_soc))
# 3. Tính toán công suất tiêu thụ (Power) dựa trên việc Relay đang bật hay tắt
for topic, device in hardware_map.items():
if device["is_appliance"]:
if device["pin"].value() == 1:
# Nếu thiết bị đang BẬT, tạo công suất ngẫu nhiên biến thiên nhẹ quanh công suất nền
simulated_power = device["base_power"] + random.randint(-50, 150)
client.publish(device["power_topic"], str(simulated_power))
else:
client.publish(device["power_topic"], "0")
last_sensor_update = current_time
time.sleep_ms(100) # Nghỉ ngắn để ổn định mạch điều khiển
except KeyboardInterrupt:
print("🛑 Đã dừng chương trình mô phỏng phần cứng.")