import network
import time
import json
import ntptime
import dht
from machine import Pin, I2C, ADC
from i2c_lcd import I2cLcd
from umqtt.simple import MQTTClient
# Network and MQTT broker settings
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""  # empty string: no password is supplied when connecting
DEVICE_ID = "ALARM_3498572398457_HA"  # also used as the MQTT client id
MQTT_BROKER = "broker.hivemq.com"
# Prefix shared by every topic this device subscribes or publishes to.
BASE_TOPIC = DEVICE_ID + "/home"

# Local-time setting: seconds added to time.time() in get_time() (one hour).
UTC_OFFSET = 3600

# Known users, keyed by PIN (PIN -> user name)
USERS = {
    "1234": "Robert",
    "4321": "Adam",
    "1111": "Aleksandra",
    "0000": "Patryk",
}
# LCD display on I2C bus 0 (SDA=GPIO21, SCL=GPIO22), device address 0x27
i2c = I2C(0, sda=Pin(21), scl=Pin(22), freq=400000)
lcd = I2cLcd(i2c, 0x27, 2, 16)

# Keypad matrix: rows are driven as outputs, columns are read as pulled-up
# inputs. List order matters - read_keypad() indexes its key table with it.
rows = [Pin(gpio, Pin.OUT) for gpio in (18, 5, 17, 16)]
cols = [Pin(gpio, Pin.IN, Pin.PULL_UP) for gpio in (4, 0, 2, 15)]

# Sensors
dht_sensor = dht.DHT22(Pin(32))             # temperature / humidity
gas_sensor = ADC(Pin(34))                   # gas level, read as a raw ADC value
pir_sensor = Pin(27, Pin.IN)                # motion: 1 is treated as "motion"
door_sensor = Pin(25, Pin.IN, Pin.PULL_UP)  # door: 1 is treated as "open"

# Outputs: sounder, status LEDs and the three switchable lights
buzzer = Pin(14, Pin.OUT)
led_red = Pin(13, Pin.OUT)
led_green = Pin(12, Pin.OUT)
light_office = Pin(19, Pin.OUT)
light_bathroom = Pin(23, Pin.OUT)
light_kitchen = Pin(26, Pin.OUT)
# Alarm states. The string values are published as "state" in status messages.
ST_DISARMED = "DISARMED"
ST_ARMING = "ARMING"
ST_ARMED_AWAY = "ARMED_AWAY"
ST_ARMED_HOME = "ARMED_HOME"
ST_ALARM = "ALARM"

# System-wide mutable state
mqtt_client = None            # assigned by connect_to_broker()
last_update_time = 0          # time.time() of the last periodic status report
pin_buffer = ""               # digits typed on the keypad so far
current_user = "Brak"         # user recorded with the last state change
alarm_mode = "AWAY"           # "AWAY" or "HOME": armed state that arming leads to
current_state = ST_DISARMED   # one of the ST_* values above
def connect_wifi():
    """Bring up the Wi-Fi station interface and block until it is connected.

    Shows "WiFi..." on the LCD while waiting and prints "WiFi OK" once the
    interface reports a connection. There is no timeout: the function only
    returns after the network has been joined.
    """
    lcd.clear()
    lcd.putstr("WiFi...")
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(True)
    # The interface can still be associated from before a soft reset; only
    # start a new connection attempt when it is actually needed.
    if not sta_if.isconnected():
        sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
        while not sta_if.isconnected():
            time.sleep(0.5)
    print("WiFi OK")
def sync_time():
    """Synchronise the clock via ntptime.settime(), best effort.

    A failure is reported on the console together with its cause and is
    otherwise ignored, so start-up continues without a synchronised clock.
    """
    try:
        ntptime.settime()
    except Exception as exc:
        # `Exception` rather than a bare except: Ctrl-C must not be swallowed.
        print("Błąd czasu", exc)
    else:
        print("Czas OK")
def get_time():
    """Return the current time, shifted by UTC_OFFSET, as "YYYY-MM-DD HH:MM:SS"."""
    shifted = time.time() + UTC_OFFSET
    year, month, day, hour, minute, second = time.localtime(shifted)[:6]
    stamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}"
    return stamp.format(year, month, day, hour, minute, second)
def on_message(topic, msg):
    """MQTT callback: log the message, then act on light and alarm commands.

    ``topic`` and ``msg`` are decoded with ``.decode()`` before use.

    * Topic contains "/light/": the payload "ON" (case-insensitive,
      surrounding whitespace ignored) switches the light on, any other
      payload switches it off. The light is the first of "office",
      "kitchen", "bathroom" whose name occurs in the topic. A "LightChange"
      status is published afterwards, whether or not a light matched.
    * Topic ends with "/alarm/set": the payload is "<action>,<pin>" with
      action ``arm_away``, ``arm_home`` or ``disarm``. A PIN that is not in
      USERS publishes a "WrongPIN" status carrying the submitted PIN.

    Any other topic is only logged.
    """
    global alarm_mode
    topic_text = topic.decode()
    payload = msg.decode()

    separator = "-" * 40
    print(separator)
    print("ODEBRANO WIADOMOŚĆ MQTT")
    print("Topic :", topic_text)
    print("Payload:", payload)
    print(separator)

    # Light commands
    if "/light/" in topic_text:
        level = 1 if payload.strip().upper() == "ON" else 0
        # Order matters: the first room name found in the topic wins.
        room_lights = (
            ("office", light_office),
            ("kitchen", light_kitchen),
            ("bathroom", light_bathroom),
        )
        for room, lamp in room_lights:
            if room in topic_text:
                lamp.value(level)
                break
        send_status_to_ha("LightChange")
        return

    # Alarm commands
    if not topic_text.endswith("/alarm/set"):
        return

    fields = payload.split(',')
    action = fields[0].strip().lower()
    pin = fields[1].strip() if len(fields) > 1 else ""

    if pin not in USERS:
        print("ZŁY PIN")
        send_status_to_ha("WrongPIN", pin)
        return

    user = USERS[pin]
    if action == "disarm":
        change_state(ST_DISARMED, user)
    elif action in ("arm_away", "arm_home"):
        alarm_mode = "AWAY" if action == "arm_away" else "HOME"
        change_state(ST_ARMING, user)
    else:
        print("Nieznana komenda alarmu:", action)
def connect_to_broker():
    """Create the MQTT client, connect, and subscribe to "<BASE_TOPIC>/#".

    On success the global ``mqtt_client`` holds the connected client and
    "MQTT OK" is printed. On failure the error is printed and ``mqtt_client``
    is reset to None, so the ``if mqtt_client:`` checks elsewhere in this
    file skip MQTT work instead of calling into a client that never
    connected.
    """
    global mqtt_client
    user = "ha_mqtt_user"
    password = "Twoje_Haslo_MQTT"
    client = MQTTClient(DEVICE_ID, MQTT_BROKER, 1883, user=user, password=password)
    client.set_callback(on_message)
    try:
        client.connect()
        # NOTE(review): this wildcard also matches the "/status" and "/event"
        # topics this device publishes to, so its own messages presumably
        # come back to on_message (where they are only logged).
        client.subscribe(f"{BASE_TOPIC}/#")
    except Exception as exc:
        mqtt_client = None
        print("MQTT Błąd", exc)
    else:
        mqtt_client = client
        print("MQTT OK")
def send_status_to_ha(reason="Update", info=""):
    """Publish a JSON status snapshot to "<BASE_TOPIC>/status".

    Args:
        reason: short tag saying why the report is sent (e.g. "Periodic",
            "StateChange").
        info: optional free-form detail, included as "info".

    The payload carries the timestamp, alarm state, current user, the
    temperature / humidity / gas readings and the ON/OFF state of the three
    lights. When the DHT sensor cannot be read, temperature and humidity are
    reported as 0. Nothing is measured or sent while there is no MQTT
    client. Sensor and publish failures are reported on the console and
    never raised to the caller.
    """
    if not mqtt_client:
        return

    try:
        dht_sensor.measure()
        temp, hum = dht_sensor.temperature(), dht_sensor.humidity()
    except Exception as exc:
        print("Błąd odczytu DHT:", exc)
        temp, hum = 0, 0

    status_packet = {
        "time": get_time(),
        "reason": reason,
        "state": current_state,
        "user": current_user,
        "sensors": {"temp": temp, "hum": hum, "gas": gas_sensor.read()},
        "lights": {
            "office": "ON" if light_office.value() else "OFF",
            "bathroom": "ON" if light_bathroom.value() else "OFF",
            "kitchen": "ON" if light_kitchen.value() else "OFF",
        },
        "info": info,
    }
    message = json.dumps(status_packet)

    try:
        mqtt_client.publish(f"{BASE_TOPIC}/status", message)
    except Exception as exc:
        print("Błąd publikacji MQTT:", exc)
def send_event_to_ha(action, user):
    """Publish a JSON event to "<BASE_TOPIC>/event".

    Args:
        action: event name, e.g. "DISARM" or "ALARM_TRIGGERED".
        user: who or what caused the event.

    Does nothing while there is no MQTT client. A publish failure is
    reported on the console and not raised to the caller.
    """
    if not mqtt_client:
        return
    event = {
        "action": action,
        "user": user,
        "time": get_time(),
    }
    try:
        mqtt_client.publish(f"{BASE_TOPIC}/event", json.dumps(event))
    except Exception as exc:
        print("Błąd publikacji MQTT:", exc)
def _apply_state_outputs():
    """Drive the LEDs, buzzer and LCD to match ``current_state``."""
    if current_state == ST_DISARMED:
        led_green.on()
        led_red.off()
        buzzer.off()
        lcd.clear()
        lcd.putstr("GOTOWY: " + alarm_mode)
    elif current_state == ST_ALARM:
        led_green.off()
        lcd.clear()
        lcd.putstr("!!! ALARM !!!")
    # NOTE(review): the ARMING and ARMED_* states have no screen defined
    # here, so the LCD keeps whatever was shown last in those states.


def change_state(new_state, user="System"):
    """Switch the alarm to ``new_state`` and bring the outputs in line.

    Args:
        new_state: one of the ST_* state values.
        user: who or what requested the change; recorded as the current
            user and included in the published event.

    On a real transition the new state and user are stored, a "StateChange"
    status is published and, for every state except ARMING, a matching
    event is published as well.

    Calling with the state that is already active publishes nothing and
    leaves the recorded user alone, but still redraws the LEDs, buzzer and
    LCD. The start-up call and the ``change_state(current_state)`` calls
    after a wrong PIN or the '*' key rely on this redraw.

    The keypad PIN buffer is cleared on every call.
    """
    global current_state, current_user, pin_buffer
    pin_buffer = ""

    if new_state != current_state:
        current_state = new_state
        current_user = user
        send_status_to_ha("StateChange")
        if new_state == ST_DISARMED:
            send_event_to_ha("DISARM", user)
        elif new_state == ST_ARMED_AWAY:
            send_event_to_ha("ARM_AWAY", user)
        elif new_state == ST_ARMED_HOME:
            send_event_to_ha("ARM_HOME", user)
        elif new_state == ST_ALARM:
            send_event_to_ha("ALARM_TRIGGERED", user)

    _apply_state_outputs()
# Key labels of the 4x4 keypad: one string per row, one character per column.
_KEYPAD_LAYOUT = ("123A", "456B", "789C", "*0#D")


def read_keypad():
    """Scan the keypad matrix once and return the pressed key, or None.

    Each row is driven low in turn; a column that reads 0 while a row is low
    identifies the pressed key. The call then blocks until that key is
    released, so one press produces exactly one return value.

    Returns:
        The key label as a one-character string, or None when no key is
        pressed during the scan.
    """
    for row_pin, row_keys in zip(rows, _KEYPAD_LAYOUT):
        row_pin.value(0)
        try:
            for col_pin, key in zip(cols, row_keys):
                if col_pin.value() == 0:
                    # Wait for release before reporting the key.
                    while col_pin.value() == 0:
                        pass
                    return key
        finally:
            # Release the row on every exit path, including the early
            # return. A row left low would make a later press in that row
            # show up while a different row is being scanned.
            row_pin.value(1)
    return None
# System start-up
connect_wifi()
sync_time()
connect_to_broker()
change_state(ST_DISARMED)

# Raw ADC reading above which the gas alarm trips.
# NOTE(review): with 0, any non-zero reading raises the alarm - confirm the
# intended threshold for the sensor in use.
GAS_ALARM_THRESHOLD = 0
# Seconds between periodic status reports.
STATUS_PERIOD_S = 30
# Text of the latest check_msg() failure, so that a persistent fault is
# logged once instead of on every pass of the loop.
last_mqtt_error = None

# Main loop
while True:
    # Poll for incoming MQTT messages (handled by on_message).
    if mqtt_client:
        try:
            mqtt_client.check_msg()
        except Exception as exc:
            # `Exception` rather than a bare except: Ctrl-C must still stop
            # the program.
            error_text = repr(exc)
            if error_text != last_mqtt_error:
                print("Błąd MQTT:", error_text)
                last_mqtt_error = error_text
        else:
            last_mqtt_error = None

    # Periodic status report
    if time.time() - last_update_time > STATUS_PERIOD_S:
        send_status_to_ha("Periodic")
        last_update_time = time.time()

    # Gas sensor check (active in every state)
    if gas_sensor.read() > GAS_ALARM_THRESHOLD and current_state != ST_ALARM:
        change_state(ST_ALARM, "Sensor Gazu")

    # Keypad handling
    key = read_keypad()
    if key:
        if key.isdigit():
            pin_buffer += key
            lcd.move_to(0, 1)
            lcd.putstr("PIN: " + "*" * len(pin_buffer))
        elif key == 'A':
            alarm_mode = "AWAY"
            lcd.move_to(0, 1)
            lcd.putstr("Mode: AWAY ")
        elif key == 'B':
            alarm_mode = "HOME"
            lcd.move_to(0, 1)
            lcd.putstr("Mode: HOME ")
        elif key == '#':
            # '#' submits the PIN: a valid PIN arms when disarmed and
            # disarms from any other state.
            if pin_buffer in USERS:
                user_name = USERS[pin_buffer]
                if current_state == ST_DISARMED:
                    change_state(ST_ARMING, user_name)
                else:
                    change_state(ST_DISARMED, user_name)
            else:
                lcd.clear()
                lcd.putstr("BLEDNY PIN")
                # NOTE(review): the rejected PIN is published as "info" in
                # the status message - confirm this is intended.
                send_status_to_ha("WrongPin", pin_buffer)
                time.sleep(1)
                change_state(current_state)
            pin_buffer = ""
        elif key == '*':
            # '*' discards the digits typed so far.
            pin_buffer = ""
            change_state(current_state)

    # Exit-delay countdown before arming. This blocks the loop for five
    # seconds: no MQTT, keypad or sensor handling happens meanwhile.
    if current_state == ST_ARMING:
        for i in range(5, 0, -1):
            lcd.clear()
            lcd.putstr(f"UZBRAJANIE {alarm_mode}\nWyjdz: {i}s")
            time.sleep(1)
        if alarm_mode == "AWAY":
            change_state(ST_ARMED_AWAY, current_user)
        else:
            change_state(ST_ARMED_HOME, current_user)

    # Intrusion detection: AWAY watches motion and the door, HOME only the door.
    if current_state == ST_ARMED_AWAY:
        if pir_sensor.value() == 1 or door_sensor.value() == 1:
            change_state(ST_ALARM, "Intruz")
    elif current_state == ST_ARMED_HOME:
        if door_sensor.value() == 1:
            change_state(ST_ALARM, "Drzwi")

    # While in ALARM, toggle the buzzer and the red LED on every pass.
    if current_state == ST_ALARM:
        buzzer.value(not buzzer.value())
        led_red.value(not led_red.value())

    # Loop pacing; this also sets the blink rate above.
    time.sleep(0.1)