from Keypad import Keypad
from Alarm import Alarm
from Door import Door
from mqtt import MQTT
from Display import Display
from machine import PWM, Pin, I2C
import time, _thread, ssd1306, network, ujson,json
# MQTT connection parameters
MQTT_CLIENT_ID = "gruppo04"
MQTT_BROKER = "test.mosquitto.org"
MQTT_USER = MQTT_PASSWORD = ""

# Topics this script publishes to
MQTT_NOTIFICATION = "FortiVault/Alarm/Notification"
MQTT_ALARM_STATE = "FortiVault/Alarm/State"
MQTT_DOOR_STATE = "FortiVault/Door/State"
MQTT_DAILY_ACCESS = "FortiVault/Door/Access"
MQTT_LIGHTS_STATE = "FortiVault/Lights/State"

# Topics this script subscribes to, keyed by a symbolic name
MQTT_SUBSCRIBE_TOPIC = dict(
    MQTT_SYSTEM_RESET=b'FortiVault/System/State',
    MQTT_LIGHTS_STATE=b'FortiVault/Lights/ManualState',
)

# Sensor/actuator state values to be published on the respective topics
# (None until a value is available)
MQTT_PUBLISH_VALUE = {
    topic: None
    for topic in (
        MQTT_NOTIFICATION,
        MQTT_ALARM_STATE,
        MQTT_DOOR_STATE,
        MQTT_DAILY_ACCESS,
        MQTT_LIGHTS_STATE,
    )
}

# Current values read from the sensors; same topics, kept in a separate dict
MQTT_ACTUAL_VALUE = {topic: None for topic in MQTT_PUBLISH_VALUE}

# Flags that drive the main loop
SYSTEM_PARAMS = dict(
    BOOT=True,
    RESTART_SYSTEM=False,
    MANUAL_LIGHTS_CONTROL=False,
)
# Display parameters
WIDTH, HEIGHT = 128, 64

# Pins and serial id of the display used with the keypad
SCL_PIN_KP, SDA_PIN_KP = 16, 21
SERIAL_OLED_KP = 0

# Buzzer pin
PIN_BUZZER = 25

# Pins of the alarm LEDs
PIN_BUZZ_LEDS = [26, 27]

# Pins of the keypad rows and columns
ROWPINS = [17, 5, 18, 19]
COLPINS = [2, 4, 13, 12]

# Door opening code
OPENING_PIN = "1234"

# Door pin
PIN_DOOR = 32

# Lamp pin (currently disabled)
#PIN_LUX = 33

# Pins and serial id of the display used for state monitoring
SCL_PIN_OLED_STATE, SDA_PIN_OLED_STATE = 15, 0
SERIAL_OLED_STATE = 1

# Motion sensor pin (currently disabled)
#SENSOR_PIN = 14
# Create the object instances used by the rest of the script
# Display handed to the keypad object below
oledKp = Display(SERIAL_OLED_KP, WIDTH, HEIGHT, SCL_PIN_KP, SDA_PIN_KP)
keypad = Keypad(ROWPINS, COLPINS, OPENING_PIN, oledKp)
# One PWM output per alarm LED pin, created with duty 0
leds = [PWM(Pin(led,Pin.OUT),duty = 0) for led in PIN_BUZZ_LEDS]
alarm = Alarm(leds,PIN_BUZZER)
door = Door(PIN_DOOR)
# Start with the door at angle 0
door.set_angle(0)
# Motion sensor is currently disabled
#sensor = Sensor_HW(SENSOR_PIN)
#sensor.setup_interrupt()
# Display used for the system-state screen and the connection messages
oled_state = Display(SERIAL_OLED_STATE, WIDTH, HEIGHT, SCL_PIN_OLED_STATE, SDA_PIN_OLED_STATE)
# Lamp output is currently disabled
#lux = Pin(PIN_LUX,Pin.OUT)
#lux.off()
client = MQTT(MQTT_CLIENT_ID,MQTT_BROKER)
# Temporary section, to be removed later: hardware reset button
RESET_BTN_PIN = 14
# Tick value (ms) that hard_reset compares against for its 200 ms debounce
LAST = 0
# Reset button: input with pull-up enabled
reset_btn = Pin(RESET_BTN_PIN, Pin.IN, Pin.PULL_UP)
def hard_reset(btn_reset):
    """Interrupt handler for the reset button: request a system restart.

    A press arriving less than 200 ms after the last accepted one is
    ignored (debounce). Otherwise SYSTEM_PARAMS["RESTART_SYSTEM"] is set
    to True for the main loop to act on.

    Args:
        btn_reset: value passed to the handler by ``Pin.irq``; not used here.
    """
    global LAST
    current = time.ticks_ms()
    if time.ticks_diff(current, LAST) < 200:
        return
    # Store the accepted press in the module-level LAST. The previous code
    # assigned to a local `last`, so the debounce reference never advanced.
    LAST = current
    SYSTEM_PARAMS["RESTART_SYSTEM"] = True


# Call hard_reset on the falling edge of the reset button
reset_btn.irq(trigger=Pin.IRQ_FALLING, handler=hard_reset)
# End of the temporary reset-button section
def wifi_connect():
    """Connect to the 'Wokwi-GUEST' WiFi network, showing progress on oled_state.

    Loops until the station interface reports a connection, drawing one more
    dot every 0.1 s, then shows "Connected". If an OSError is raised along
    the way it is caught and "Not Connected!" is shown instead.
    """
    try:
        dot_offset = 0
        oled_state.text_fill("Connecting", 0, 10)
        oled_state.text_no_fill("to Wifi", 0, 20)
        station = network.WLAN(network.STA_IF)
        station.active(True)
        station.connect('Wokwi-GUEST', '')
        while True:
            if station.isconnected():
                break
            # Each dot is drawn 6 units further along than the previous one
            oled_state.text_no_fill(".", 53 + dot_offset, 20)
            time.sleep(0.1)
            dot_offset += 6
        oled_state.text_no_fill("Connected", 0, 30)
    except OSError:
        oled_state.text_no_fill("Not Connected!", 0, 30)
def mqtt_connection():
    """Connect the MQTT client and start the keypad input thread.

    Shows progress on oled_state, calls client.connect with the subscribe
    topics and subCallback, then checks client.get_conn_state(). On success
    a secondary thread running keypad.read_input is started.

    Raises:
        OSError: if client.get_conn_state() is falsy after the connect
            attempt. (The previous code raised the undefined name
            ``OSException``, which would have surfaced as a NameError.)
    """
    oled_state.text_fill("Connecting to", 0, 10)
    oled_state.text_no_fill("MQTT server", 0, 20)
    client.connect(MQTT_SUBSCRIBE_TOPIC, subCallback)
    # Short progress animation: five dots, one every 0.1 s
    for x in range(0, 28, 6):
        oled_state.text_no_fill(".", x + 85, 20)
        time.sleep(0.1)
    if not client.get_conn_state():
        oled_state.text_no_fill("Not Connected!", 0, 30)
        oled_state.fill(0)
        raise OSError("MQTT connection failed")
    oled_state.text_no_fill("Connected!", 0, 30)
    oled_state.fill(0)
    # Start the secondary thread that reads the keypad input
    _thread.start_new_thread(keypad.read_input, ())
def print_state():
    """Draw the fixed parts of the system-state screen on oled_state.

    Writes the title, the four field labels and the empty circle drawn at
    (112, 54).
    """
    oled_state.text_fill("SYSTEM STATE: ", 0, 1)
    labels = (
        ("Porta:", 20),
        ("Luce:", 30),
        ("N. accessi:", 40),
        ("A. state:", 51),
    )
    for label, y in labels:
        oled_state.text_no_fill(label, 1, y)
    oled_state.circle_no_fill(112, 54, 4, 1)
def print_parameters():
    """Refresh the variable fields of the system-state screen.

    For every topic in MQTT_PUBLISH_VALUE whose payload is not None, the
    payload is decoded as JSON and the matching field (door, access count,
    alarm state) is cleared and redrawn on oled_state. Finally, if the
    notification payload says the alarm is not ringing, the filled circle
    at (112, 54) is cleared.
    """
    for topic, payload in MQTT_PUBLISH_VALUE.items():
        if payload is None:
            continue
        if topic == MQTT_DOOR_STATE:
            oled_state.fill_rect(48, 20, 127, 10)
            oled_state.text_no_fill(str(json.loads(payload)["porta"]), 48, 20)
        elif topic == MQTT_DAILY_ACCESS:
            oled_state.fill_rect(90, 40, 127, 10)
            oled_state.text_no_fill(str(json.loads(payload)["n_accessi"]), 90, 40)
        elif topic == MQTT_ALARM_STATE:
            oled_state.fill_rect(72, 51, 35, 10)
            oled_state.text_no_fill(str(json.loads(payload)["allarme_attivo"]), 72, 51)
        # The lights field is currently disabled:
        # elif topic == MQTT_LIGHTS_STATE:
        #     oled_state.fill_rect(50,30,127,10)
        #     oled_state.text_no_fill(str(json.loads(payload)["lux"]),50,30)

    # Guard against a notification payload that is still None: json.loads
    # would raise on it.
    # NOTE(review): the source's indentation was lost, so it is assumed this
    # check depends only on the notification value - confirm.
    notification = MQTT_PUBLISH_VALUE[MQTT_NOTIFICATION]
    if notification is not None:
        message = str(json.loads(notification)["allarme_sta_suonando"])
        if message == "False":
            oled_state.circle_fill(112, 54, 3, 0)
def print_alarm_ringing():
    """Draw the filled circle at (112, 54) on oled_state while the alarm rings."""
    ringing_marker = (112, 54, 3, 1)
    oled_state.circle_fill(*ringing_marker)
def get_ujson_lux_state():
    """Return a JSON string with the key "luce" holding ``lux.value()``.

    NOTE(review): the creation of ``lux`` is commented out in the visible
    part of this file, so calling this function there would raise a
    NameError - confirm before re-enabling its caller.
    """
    lamp_state = {"luce": lux.value()}
    return ujson.dumps(lamp_state)
def read_sensors_value():
    """Redraw the state-screen title and refresh MQTT_ACTUAL_VALUE.

    The alarm-ringing and alarm-active payloads come from the keypad, the
    door-state and access-count payloads from the door. Entries are stored
    one at a time, in that order.
    """
    oled_state.fill_rect(1, 1, 127, 7, 0)
    oled_state.text_no_fill("System state: ", 0, 1)
    readers = (
        (MQTT_NOTIFICATION, keypad.get_ujson_alarm_is_ringing),
        (MQTT_ALARM_STATE, keypad.get_ujson_active_alarm),
        (MQTT_DOOR_STATE, door.get_ujson_door_state),
        (MQTT_DAILY_ACCESS, door.get_ujson_n_access),
    )
    for topic, read in readers:
        MQTT_ACTUAL_VALUE[topic] = read()
    # The lights entry is currently disabled:
    #MQTT_ACTUAL_VALUE[MQTT_LIGHTS_STATE] = get_ujson_lux_state()
def subCallback(topic, msg):
    """Handle a message received on one of the subscribed topics.

    A payload of b'true' or b'false' on the system-reset topic sets
    SYSTEM_PARAMS["RESTART_SYSTEM"]; on the manual-lights topic it sets
    SYSTEM_PARAMS["MANUAL_LIGHTS_CONTROL"]. Any other payload or topic
    leaves SYSTEM_PARAMS untouched.

    Args:
        topic: topic the message arrived on, compared with the bytes
            values stored in MQTT_SUBSCRIBE_TOPIC.
        msg: message payload, compared with b'true' / b'false'.
    """
    flag_by_topic = (
        (MQTT_SUBSCRIBE_TOPIC["MQTT_SYSTEM_RESET"], "RESTART_SYSTEM"),
        (MQTT_SUBSCRIBE_TOPIC["MQTT_LIGHTS_STATE"], "MANUAL_LIGHTS_CONTROL"),
    )
    if msg == b'true':
        new_value = True
    elif msg == b'false':
        new_value = False
    else:
        return
    for subscribed_topic, flag in flag_by_topic:
        if topic == subscribed_topic:
            SYSTEM_PARAMS[flag] = new_value
# Automatic lights control function: TO BE DEFINED.
# The draft below is kept inside a string literal, so it is not executed.
"""
def auto_lights_control():
if door.get_door_state():
luce.on()
MQTT_PUBLISH_VALUE[MQTT_LIGHTS_STATE] = ujson.dumps({"light" : 1})
client.publish(MQTT_LIGHTS_STATE,MQTT_PUBLISH_VALUE[MQTT_LIGHTS_STATE])
else:
luce.off()
MQTT_PUBLISH_VALUE[MQTT_LIGHTS_STATE] = ujson.dumps({"light" : 0})
client.publish(MQTT_LIGHTS_STATE,MQTT_PUBLISH_VALUE[MQTT_LIGHTS_STATE])
"""
def restart_procedure():
    """Bring the system back to its start-up state.

    Sets keypad.restart, shows a restart message, sets the door angle to 0,
    resets the keypad flags and counter, stops the alarm, and rebinds the
    module-level MQTT_PUBLISH_VALUE, MQTT_ACTUAL_VALUE and SYSTEM_PARAMS to
    fresh dictionaries (with BOOT set to True again).
    """
    global MQTT_PUBLISH_VALUE, MQTT_ACTUAL_VALUE, SYSTEM_PARAMS

    keypad.restart = True
    oled_state.text_fill("Restarting the", 1, 1)
    oled_state.text_no_fill("system", 1, 10)

    door.set_angle(0)
    keypad.set_alarm_sounds(False)
    keypad.set_alarm_is_ringing(False)
    keypad.set_active_alarm(True)
    keypad.set_open_door(False)
    keypad.set_count(0)
    alarm.alarm_stop()
    # Sensor reset is currently disabled:
    # sensor.set_alarm_sensor(False)
    # sensor.set_door_sensor(False)
    # sensor.set_thief_sensor(False)

    topics = (
        MQTT_NOTIFICATION,
        MQTT_ALARM_STATE,
        MQTT_DOOR_STATE,
        MQTT_DAILY_ACCESS,
        MQTT_LIGHTS_STATE,
    )
    # Values to publish, all unknown again
    MQTT_PUBLISH_VALUE = {topic: None for topic in topics}
    # Current values read from the sensors, all unknown again
    MQTT_ACTUAL_VALUE = {topic: None for topic in topics}
    SYSTEM_PARAMS = dict(
        BOOT=True,
        RESTART_SYSTEM=False,
        MANUAL_LIGHTS_CONTROL=False,
    )
# Main loop: (re)connect when BOOT is set, then read the sensors, publish the
# changes, poll the broker and drive the alarm and the door. When a restart
# has been requested, run the restart procedure instead.
while True:
    if SYSTEM_PARAMS["BOOT"]:
        wifi_connect()
        mqtt_connection()
        print_state()
        SYSTEM_PARAMS["BOOT"] = False

    if not SYSTEM_PARAMS["RESTART_SYSTEM"]:
        try:
            read_sensors_value()
        except OSError as e:
            print("Errore mentre leggo i valori del sensore", e)

        try:
            MQTT_PUBLISH_VALUE = client.publish_new_value(MQTT_PUBLISH_VALUE, MQTT_ACTUAL_VALUE)
            print_parameters()
        except OSError as e:
            print("Errore durante la pubblicazione dei dati", e)

        try:
            client.check_msg()
        except OSError as e:
            print("Il check dei messaggi dal broker ha fallito", e)

        # Keep sounding for as long as the keypad asks for the alarm
        while keypad.get_alarm_sounds():
            keypad.set_alarm_is_ringing(True)
            MQTT_ACTUAL_VALUE[MQTT_NOTIFICATION] = keypad.get_ujson_alarm_is_ringing()
            # Guarded like the publish above: a network error must not abort
            # the program while the alarm is ringing.
            try:
                MQTT_PUBLISH_VALUE = client.publish_new_value(MQTT_PUBLISH_VALUE, MQTT_ACTUAL_VALUE)
            except OSError as e:
                print("Errore durante la pubblicazione dei dati", e)
            print_alarm_ringing()
            #sensor.set_alarm_sensor(True)
            alarm.alarm_sound()

        if alarm.get_alarm_status() and not keypad.get_alarm_is_ringing():
            alarm.alarm_stop()
            #sensor.set_alarm_sensor(False)
        # Motion-sensor branch is currently disabled:
        # elif sensor.get_thief_sensor() and not keypad.get_open_door():
        #     keypad.set_thief(True)
        #     #sensor.set_alarm_sensor(True)
        #     #sensor.set_thief_sensor(False)
        elif keypad.get_open_door() and not door.get_door_state():
            door.set_angle(90)
            #sensor.set_door_sensor(True)
        elif not keypad.get_open_door() and door.get_door_state():
            door.set_angle(0)
            #sensor.set_door_sensor(False)
    else:
        restart_procedure()

    time.sleep_ms(1)