import time
import math
from umqtt.simple import MQTTClient
import ubinascii
import machine
import micropython
import network
import esp
from machine import Pin, ADC, PWM, SPI
import dht
import sys
esp.osdebug(None)
import gc
gc.collect()
from time import sleep_ms
from mfrc522 import MFRC522
from machine import Pin
from machine import SPI
spi = SPI(2, baudrate=2500000, polarity=0, phase=0)
spi.init()
rdr = MFRC522(spi=spi, gpioRst=4, gpioCs=5)
print("Place card")
mc38_pin = 4
mc38 = machine.Pin(mc38_pin, machine.Pin.IN)
relay = Pin(15, Pin.OUT)
servo = machine.PWM(machine.Pin(26))
pir1 = 33
pir_sensor1 = machine.Pin(pir1, machine.Pin.IN, machine.Pin.PULL_DOWN)
pir2 = 32
pir_sensor2 = machine.Pin(pir2, machine.Pin.IN, machine.Pin.PULL_DOWN)
sensor = dht.DHT22(Pin(14))
LIGHT_SENSOR_PIN = 34
adc = ADC(Pin(LIGHT_SENSOR_PIN))
adc.atten(ADC.ATTN_11DB)
buzzer= PWM(Pin(16, Pin.OUT), freq=1000)
buzzer.duty(0)
frequency = 5000
led1 = PWM(Pin(13), frequency)
led2 = PWM(Pin(12), frequency)
led1.duty(0)
led2.duty(0)
led=Pin(5,Pin.OUT)
ssid = 'Wokwi-GUEST'
password = ''
mqtt_server = 'test.mosquitto.org'
last_message = 0
message_interval = 1
client_id = ubinascii.hexlify(machine.unique_id())
topic_sub = b'esp/led/slider'
topic_sub1 = b'esp/servo/angle'
topic_sub2 = b'esp/relay/lock'
topic_sub3 = b'esp/pir/pir2OffOn'
topic_pub_temp = b'esp/dht/temperature'
topic_pub_hum = b'esp/dht/humidity'
topic_pub_light = b'esp/ldr/light'
topic_pub_pirstatus = b'esp/pir/status'
topic_pub_pir2status= b'esp/pir/pir2status'
topic_pub_ledstatus = b'esp/led/status'
topic_pub_servostatus = b'esp/servo/status'
topic_pub_lockstatus = b'esp/relay/lockstatus'
topic_pub_notificationLDR = b'esp/notif/ldr'
topic_pub_notificationTempHum = b'esp/notif/dht'
topic_pub_notifVlamanie = b'esp/mc38/okno'
def connect_wifi():
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.disconnect()
wifi.connect(ssid,password)
if not wifi.isconnected():
print('connecting..')
timeout = 0
while (not wifi.isconnected() and timeout < 10):
print(10 - timeout)
timeout = timeout + 1
time.sleep(1)
if(wifi.isconnected()):
print('connected')
else:
print('not connected')
sys.exit()
connect_wifi()
print('Connection successful')
def connect_and_subscribe():
global client_id, mqtt_server, topic_sub, topic_sub1, topic_sub2, topic_sub3
client = MQTTClient(client_id, mqtt_server)
client.set_callback(sub_sub)
client.connect()
client.subscribe(topic_sub)
client.subscribe(topic_sub1)
client.subscribe(topic_sub2)
client.subscribe(topic_sub3)
print('Connected to %s MQTT broker, subscribed to %s topic' % (mqtt_server, topic_sub))
print('Connected to %s MQTT broker, subscribed to %s topic' % (mqtt_server, topic_sub1))
print('Connected to %s MQTT broker, subscribed to %s topic' % (mqtt_server, topic_sub2))
print('Connected to %s MQTT broker, subscribed to %s topic' % (mqtt_server, topic_sub3))
return client
def restart_and_reconnect():
print('Failed to connect to MQTT broker. Reconnecting...')
time.sleep(10)
machine.reset()
def rfid():
(stat, tag_type) = rdr.request(rdr.REQIDL)
if stat == rdr.OK:
(stat, raw_uid) = rdr.anticoll()
if stat == rdr.OK:
card_id = "uid: 0x%02x%02x%02x%02x" % (raw_uid[0], raw_uid[1], raw_uid[2], raw_uid[3])
print(card_id)
motion_detected_flag = False
def motion_detected(pir1):
global motion_detected_flag
motion_detected_flag = True
motion_detected_flag1 = False
def motion_detected1(pir2):
global motion_detected_flag1
motion_detected_flag1 = True
pir_sensor1.irq(trigger=machine.Pin.IRQ_RISING, handler=motion_detected)
pir_sensor2.irq(trigger=machine.Pin.IRQ_RISING, handler=motion_detected1)
servo.freq(50)
def interval_mapping(x, in_min, in_max, out_min, out_max):
return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
def servo_write(pin, angle):
pulse_width = interval_mapping(angle, 0, 180, 0.5, 2.5)
duty = int(interval_mapping(pulse_width, 0, 20, 0, 1023))
pin.duty(duty)
rl10 = 50e3
gamma = 0.7
def calculate_resistance():
value = adc.read()
voltage_ratio = value / (4095 - value)
return 10e3 * voltage_ratio
def calculate_lux(resistance):
return 10 * math.pow(rl10/resistance, 1/gamma)
for i in range(len(points) - 1):
if points[i+1][1] <= resistance <= points[i][1]:
x1, y1 = points[i]
x2, y2 = points[i + 1]
light_up_to(i + 2)
return x1 + ((resistance - y1) / (y2 - y1)) * (x2 - x1)
mc38_status = mc38.value()
detekovanyPohybVlamanie = False
detekovanyPohyb = False
otvoreneDvere = False
zatvoreneDvere = False
zapnuteSvetlo = False
vypnutesvetlo = False
otvoreneOkno = False
zatvoreneOkno = False
PIR2OffOn = False
angle = 0
servo_write(servo , 0)
relay.value(1)
buzzer.duty(0)
def sub_sub(topic, msg):
print ('%s from topic %s' %(msg, topic))
if topic == b'esp/led/slider':
ledVal = int(msg)
print(ledVal)
ledPWM = int(interval_mapping(ledVal, 0, 100, 0, 1023))
led1.duty(ledPWM)
led2.duty(ledPWM)
if topic == b'esp/servo/angle':
global angle
angle = int(msg)
servo_write(servo, angle)
if topic == b'esp/relay/lock':
mess = msg
if mess == b'TRUE':
relay.value(0)
elif mess == b'FALSE':
relay.value(1)
global PIR2OffOn
if topic == b'esp/pir/pir2OffOn':
PIR2msg = msg
global PIR2OffOn
if PIR2msg == b'TRUE':
PIR2OffOn = True
elif PIR2msg == b'FALSE':
PIR2OffOn = False
def read_sensor(): #Načítavanie hodnôt z DHT22 a BH1750
try:
resistance = calculate_resistance()
fotorezistor = round(calculate_lux(resistance),1)
sensor.measure()
temp = sensor.temperature()
hum = sensor.humidity()
if (isinstance(temp, float) and isinstance(hum, float)) or (isinstance(temp, int) and isinstance(hum, int)):
temp = (b'{0:3.1f},'.format(temp))
hum = (b'{0:3.1f},'.format(hum))
light = (b'{0:3.1f},'.format(fotorezistor))
return temp, hum, fotorezistor
else:
return('Invalid sensor readings.')
except OSError as e:
return('Failed to read sensor.')
def lock_status(): #Status dverí
global zatvoreneDvere, otvoreneDvere
try:
if relay.value() == 1:
zatvoreneDvere = True
otvoreneDvere = False
msg = "ZATVORENE"
client.publish(topic_pub_lockstatus, msg)
print('Lock status: %s on topic %s' % (msg, topic_pub_lockstatus))
elif relay.value() == 0:
zatvoreneDvere = False
otvoreneDvere = True
msg = "OTVORENE"
client.publish(topic_pub_lockstatus, msg)
print('Lock status: %s on topic %s' % (msg, topic_pub_lockstatus))
except OSError as e:
print("Nepodarilo sa načítať stav Lock")
def servo_status(): #Status Okna
global zatvoreneOkno, otvoreneOkno
try:
if mc38_status == 1 or angle == 0:
zatvoreneOkno = True
otvoreneOkno = False
msg = "ZATVORENE"
client.publish(topic_pub_servostatus, msg)
print('Okno status: %s on topic %s' % (msg, topic_pub_servostatus))
elif mc38_status == 0 or angle == 90:
zatvoreneOkno = False
otvoreneOkno = True
msg = "OTVORENE"
client.publish(topic_pub_servostatus, msg)
print('Okno status: %s on topic %s' % (msg, topic_pub_servostatus))
except OSError as e:
print("Nepodarilo sa načítať stav Servo")
def led_status(): #Status LED
global vypnutesvetlo, zapnuteSvetlo
try:
if (led1.duty() > 0 and led1.duty() < 1024) or (led2.duty() > 0 and led2.duty() < 1024):
vypnutesvetlo = False
zapnuteSvetlo = True
msg = "ON"
client.publish(topic_pub_ledstatus, msg)
print('LED status: %s on topic %s' % (msg, topic_pub_ledstatus))
elif led1.duty() == 0 or led2.duty() == 0:
zapnuteSvetlo = False
vypnutesvetlo = True
msg = "OFF"
client.publish(topic_pub_ledstatus, msg)
print('LED status: %s on topic %s' % (msg, topic_pub_ledstatus))
except OSerror as e:
return("Nepodarilo sa poslať stav LED")
def pir_status(): #Status prvého PIR senzora
global motion_detected_flag
global detekovanyPohyb
try:
if motion_detected_flag == True:
detekovanyPohyb = True
msg = "DETECTED"
client.publish(topic_pub_pirstatus, msg)
print('PIR status: %s on topic %s' % (msg, topic_pub_pirstatus))
motion_detected_flag = False
elif motion_detected_flag == False:
detekovanyPohyb = False
msg = "NO MOTION"
client.publish(topic_pub_pirstatus, msg)
print('PIR status: %s on topic %s' % (msg, topic_pub_pirstatus))
except OSError as e:
print("Nepodarilo sa načítať PIR status")
def pir_status1(): #Status druhého PIR senzora
global motion_detected_flag1
global detekovanyPohybVlamanie
try:
if motion_detected_flag1 == True:
detekovanyPohybVlamanie = True
msg = "DETECTED"
client.publish(topic_pub_pir2status, msg)
print('PIR status: %s on topic %s' % (msg, topic_pub_pir2status))
motion_detected_flag1 = False
elif motion_detected_flag1 == False:
detekovanyPohybVlamanie = False
msg = "NO MOTION"
client.publish(topic_pub_pir2status, msg)
print('PIR status: %s on topic %s' % (msg, topic_pub_pir2status))
except OSError as e:
print("Nepodarilo sa načítať PIR status")
Status = False
def temp_hum_check(Status): #Detekovanie prípadného požiaru
teplota = sensor.temperature()
vlhkost = sensor.humidity()
if teplota > 40 and vlhkost > 70:
Status = True
print("Nameraná vysoká teplota a vlhkosť, potenciálny požiar")
buzzer.duty(300)
msg = "TRUE"
client.publish(topic_pub_notificationTempHum, msg)
for aa in range(5):
if (angle == 0 and relay.value() == 1) or (angle == 0 or relay.value() == 1) or (angle == 90 or relay.value() == 0):
aa += 1
print("Kontrolujem stav dverí a okna %s s" % aa)
print("-------------------------")
time.sleep(1)
servo_status()
lock_status()
led_status()
time.sleep(1)
if aa == 5:
print("Automaticky otváram dvere a okná a vypínam svetlá")
time.sleep(1)
servo_write(servo, 90)
relay.value(0)
led1.duty(0)
led2.duty(0)
time.sleep(1)
print("Uspešne vypnutie dverí, okna a svetla")
time.sleep(2)
buzzer.value(0)
return Status
break
elif (angle == 90 and relay.value() == 0 and led.duty(0)):
buzzer.duty(0)
time.sleep(1)
return Status
break
def ldr_check(): #Detekovanie vysokej intenzity osvetlenia
if fotorezistor > 4060:
print("Vyskotá intezita svetla zistená")
buzzer.duty(1)
time.sleep(0.5)
buzzer.duty(0)
msg = "TRUE"
client.publish(topic_pub_notificationLDR, msg)
def zapSvetlo(): #Automatické zap. svetla podľa pohybu a intenzity svetla
if detekovanyPohyb == True:
if fotorezistor > 4000:
return
if vypnutesvetlo == True and fotorezistor < 1000:
led1.duty(500)
led2.duty(500)
def protiVlamaniu():
if PIR2OffOn == True:
global motion_detected_flag1
global detekovanyPohybVlamanie
try:
if motion_detected_flag1 == True:
detekovanyPohybVlamanie = True
buzzer.duty(200)
msg = "DETECTED"
client.publish(topic_pub_pir2status, msg)
print('PIR status: %s on topic %s' % (msg, topic_pub_pir2status))
motion_detected_flag1 = False
buzzer.duty(0)
elif motion_detected_flag1 == False:
detekovanyPohybVlamanie = False
msg = "NO MOTION"
client.publish(topic_pub_pir2status, msg)
print('PIR status: %s on topic %s' % (msg, topic_pub_pir2status))
except OSError as e:
print("Nepodarilo sa načítať PIR status")
if mc38_status == 0:
print("Kontakt otvorený!")
buzzer.duty(200)
msg = "TRUE"
client.publish(topic_pub_notifVlamanie, msg)
buzzer.duty(0)
elif mc38_status == 1:
msg = "FALSE"
client.publish(topic_pub_notifVlamanie, msg)
def ModeStatusLED():
if pir2OffOn == True:
led.value(1)
elif pir2OffOn == False:
led.value(0)
try:
client = connect_and_subscribe()
except OSError as e:
restart_and_reconnect()
while True: #Hlavná slučka
try:
new_msg = client.check_msg()
if (time.time() - last_message) > message_interval:
#do_read()
print("-------------------------")
print("Kontrolujem stav senzorov")
print("-------------------------")
temp, hum, fotorezistor = read_sensor()
print("Teplota : %s" % (temp))
print("Vlhkost : %s" % (hum))
print("Svetlo : %s" % (fotorezistor))
client.publish(topic_pub_temp, temp)
client.publish(topic_pub_hum, hum)
client.publish(topic_pub_light, str(fotorezistor))
led_status()
pir_status()
pir_status1()
servo_status()
lock_status()
ldr_check()
zapSvetlo()
print("Place card")
if temp_hum_check(Status):
print("Vypínam systém z dôvodu zistenia požiaru")
break
temp_hum_check(Status)
if card_id == 0xe3e30035:
relay.value (0)
elif card_id == 0xe3e30035 and relay.value(0):
relay.value(1)
last_message = time.time()
except OSError as e:
restart_and_reconnect()