import network
import time
from machine import Pin, I2C, PWM
from i2c_lcd import I2cLcd
import dht
import ujson
from umqtt.simple import MQTTClient
import songs
"""
To view the data in web:
1. Go to http://www.hivemq.com/demos/websocket-client/
2. Click "Connect"
3. Under Subscriptions, click "Add New Topic Subscription"
4. In the Topic field, type "Foksha_lab2" then click "Subscribe"
"""
#PINS
Button = Pin(19,Pin.IN, Pin.PULL_UP) # switch on/off
Motion_sensor1 = Pin(34, Pin.IN) # motion 1
Motion_sensor2 = Pin(35, Pin.IN) # motion 2
pin_buzzer = Pin(5, Pin.OUT) # speaker
LCD = None # LCD as object, needed for func access
LCD_SCL = 22 # LCD SCL pin port
LCD_SDA = 23 # LCD SDA pin port
LCD_SIZE = 16 # LCD row length
MQTT_TOPIC = "Foksha_lab2" # hivemq topic connecting to
WIFI_CLIENT = "" # internet-connector object
# Var contains system status
ALARM_ENABLED = False
# API for web-site message
# call server_report(*your text*) to share this with site
def server_report(Text):
# if connected -> send
try:
message = ujson.dumps({
"Message": Text
})
WIFI_CLIENT.publish(MQTT_TOPIC, message)
# if disconnected -> connect and send
except:
init_MQTT()
reset_lcd(Both = True)
LCD.move_to(0,0)
if ALARM_ENABLED:
LCD.putstr("Alarm enabled!")
else: LCD.putstr("Alarm disabled!")
server_report(Text)
# API for speaker to play sound
def speaker_play(Freq, Duration_sec):
if Freq != 0:
buzzer_pwm = PWM(pin_buzzer) # init
buzzer_pwm.freq(Freq) # enable and set tone (freq = sound)
time.sleep(Duration_sec) # wait
buzzer_pwm.deinit() # disable
else:
time.sleep(Duration_sec) # wait
# API for reset LCD display rows
# Level upper or lower, Both for both
def reset_lcd(Level=0, Both = False):
# reset only one row
if not Both:
LCD.move_to(0,Level)
[LCD.putchar(" ") for _ in range(LCD_SIZE)]
# reset both rows
else:
LCD.move_to(0,0)
[LCD.putchar(" ") for _ in range(LCD_SIZE * 2)] # overload first row sets pointer in second row
# init LCD object
def init_lcd():
global LCD
# LCD adress by default
AddressOfLcd = 0x27
# custom library with LCD API from documentations
i2c = I2C(scl=Pin(LCD_SCL), sda=Pin(LCD_SDA), freq=400000)
LCD = I2cLcd(i2c, AddressOfLcd, 2, 16)
# Display my title when loading
# just remove this block to skip intro
if True:
notes = [
[261, 622, 0, 698, 415, 0, 698, 622, 554, 523, 0],
[523, 523, 349, 415, 0, 466, 523, 466, 415, 392]
]
LCD.move_to(0,0)
Titles = ["Foksha K.V ", "KN-M1023b "]
for title in Titles:
counter = 0
for ch in title:
LCD.putstr(ch)
if counter < len(notes[Titles.index(title)]):
speaker_play(notes[Titles.index(title)][counter], 0.3)
counter+=1
LCD.move_to(0,1)
time.sleep(2)
reset_lcd(Both=True)
# init wifi connector
def init_WiFi():
# reset display, show connection info
reset_lcd(Both=True)
LCD.move_to(0,0)
LCD.putstr("Connecting")
LCD.move_to(0,1)
LCD.putstr("to WiFi")
# Connect to wokwi internal web
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
# if wi-fi error it will swich chars in progress bar
counter = 0
char = [".", "x"]
# display loading while not connected
while not sta_if.isconnected():
counter+=1
if counter == LCD_SIZE * 2:
char[0], char[1] = char[1], char[0]
counter = 0
LCD.putstr(char[0])
time.sleep(0.1)
# display connected status
reset_lcd(Both=True)
LCD.move_to(0,0)
LCD.putstr("Wi-Fi Connected!")
speaker_play(2000, 0.2) # sound
time.sleep(1)
def init_MQTT():
# MQTT Server Parameters
global WIFI_CLIENT
# mqtt server website with data
MQTT_CLIENT_ID = "micropython-weather-demo"
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_USER = ""
MQTT_PASSWORD = ""
# display status
reset_lcd(Both=True)
LCD.move_to(0,0)
LCD.putstr("Connecting to")
LCD.move_to(0,1)
LCD.putstr("MQTT server...")
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD)
client.connect()
reset_lcd(Both=True)
LCD.move_to(0,0)
LCD.putstr("MQTT Connected!")
WIFI_CLIENT = client
# server report
server_report("System enabled!")
speaker_play(2000, 0.2) # sound
time.sleep(1)
# function called when button pushed with alarm off
def enable_alarm():
global ALARM_ENABLED
# display enabling process
reset_lcd(Both = True)
LCD.move_to(0,0)
LCD.putstr("Enabling")
for _ in range(5):
LCD.putstr(".")
time.sleep(1)
# enable, display
ALARM_ENABLED = True
reset_lcd(Both = True)
LCD.move_to(0,0)
LCD.putstr("Alarm enabled!")
# report to MQTT
server_report("Alarm enabled!")
speaker_play(4000, 0.2) # sound
# function called when putton pushed with alarm
def disable_alarm():
global ALARM_ENABLED
# display disabling process to LCD
reset_lcd(Both = True)
LCD.move_to(0,0)
LCD.putstr("Disabling")
for _ in range(5):
LCD.putstr(".")
time.sleep(1)
# disable, display
ALARM_ENABLED = False
reset_lcd(Both = True)
LCD.move_to(0,0)
LCD.putstr("Alarm disabled!")
server_report("Alarm disabled!") # report to MQTT
speaker_play(3000, 0.2) # sound
# called when alarm triggered
def triggger_alarm():
# LCD display
reset_lcd(Both = True)
LCD.move_to(4,0)
LCD.putstr("ALARM!!")
LCD.move_to(4,1)
LCD.putstr("ALARM!!")
# to be played
song = [
293, 392, 293, 466, 293, 392, 293, 523, 293, 369, 293, 440, 293, 369, 293,
293, 392, 293, 587, 293, 466, 293, 466, 261, 311, 261, 311, 523, 311,
293, 392, 293, 466, 392, 587, 466, 783, 392, 466, 392, 622, 466, 783, 466,
783, 392, 466, 392, 587, 466, 698, 466, 698, 392, 523, 698, 622, 783, 523, 622, 587
]
[speaker_play(note, 0.3) for note in song]
def loop():
global ALARM_ENABLED
# display system status
reset_lcd(Both = True)
LCD.move_to(0,0)
LCD.putstr("Alarm disabled!")
while True:
# motions values
Motion = Motion_sensor1.value()
Motion += Motion_sensor2.value()
# check if button pressed
button_pushed = bool(1 - Button.value())
if button_pushed and not ALARM_ENABLED:
# Turn on
enable_alarm()
alarm_triggered = False
continue
if button_pushed and ALARM_ENABLED:
# Turn off
disable_alarm()
alarm_triggered = False
ALARM_ENABLED = False
continue
if ALARM_ENABLED and Motion > 0:
server_report("Alarm! Alarm") # report to MQTT
# spam buzzer and LCD
triggger_alarm()
reset_lcd(Both = True)
LCD.move_to(0,0)
LCD.putstr("Alarm enabled!")
continue
if __name__ == "__main__":
init_lcd()
init_WiFi()
init_MQTT()
loop()