import socket
import time

import machine
import network
import ubinascii
import ujson
from machine import Pin, PWM
from umqtt.simple import MQTTClient

# WiFi configuration (placeholder values; replace before deploying).
SSID = "your_wifi_ssid"
PASSWORD = "your_wifi_password"

# Blynk MQTT broker details.
# NOTE(review): confirm the broker hostname and the "<token>/update/V0" topic
# layout against the current Blynk MQTT documentation before relying on them.
MQTT_BROKER = "blynk-cloud.com"
BLYNK_AUTH_TOKEN = "your_blynk_auth_token"
# The client ID is the fixed prefix plus the hex-encoded board ID, so each
# board presents a distinct ID. This needs the module name `machine` bound
# (a plain `import machine`), not only `from machine import Pin, PWM`.
MQTT_CLIENT_ID = b"pico-watchpi-" + ubinascii.hexlify(machine.unique_id())
# Virtual pin V0 carries the PIR status.
MQTT_TOPIC_PUB = bytes(BLYNK_AUTH_TOKEN + "/update/V0", 'utf-8')

# PIR sensor input on GPIO 16 (example pin), with the internal pull-down.
pir = Pin(16, Pin.IN, Pin.PULL_DOWN)

# Buzzer driven by PWM on GPIO 15; released immediately so it starts silent.
buzzer = PWM(Pin(15))
buzzer.deinit()

# WiFi connection routine
def connect_wifi(ssid, password, timeout_s=None):
    """Bring up the station interface and block until it is connected.

    Args:
        ssid: Name of the WiFi network to join.
        password: Passphrase for that network.
        timeout_s: Maximum number of seconds to wait for the link. ``None``
            (the default) waits indefinitely, as the routine always has.

    Returns:
        The active ``network.WLAN`` station object.

    Raises:
        OSError: If ``timeout_s`` is given and the link is not up in time.
    """
    poll_s = 0.5
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting to WiFi...")
        wlan.connect(ssid, password)
        waited_s = 0.0
        while not wlan.isconnected():
            # Only enforce a deadline when the caller asked for one.
            if timeout_s is not None and waited_s >= timeout_s:
                raise OSError("WiFi connection timed out")
            time.sleep(poll_s)
            waited_s += poll_s
    print("WiFi connected, IP=", wlan.ifconfig()[0])
    return wlan


# Buzzer beep function
def beep(duration_ms, freq_hz=2000, duty_u16=32768):
    """Sound the buzzer for ``duration_ms`` milliseconds, then silence it.

    Args:
        duration_ms: How long to hold the tone, in milliseconds.
        freq_hz: Tone frequency in hertz (default 2000, i.e. 2 kHz).
        duty_u16: PWM duty as a 16-bit value (default 32768, roughly 50%).
    """
    buzzer.init()
    try:
        buzzer.freq(freq_hz)
        buzzer.duty_u16(duty_u16)
        time.sleep_ms(duration_ms)
    finally:
        # Always release the PWM output, so an interrupted sleep (for
        # example Ctrl-C at the REPL) cannot leave the buzzer sounding.
        buzzer.deinit()


# MQTT message publish function
def publish_motion(client, motion_detected):
    """Publish the motion flag to ``MQTT_TOPIC_PUB`` on a best-effort basis.

    Sends "1" when ``motion_detected`` is truthy and "0" otherwise. Any
    exception raised while publishing is printed and not re-raised.

    Args:
        client: Object exposing ``publish(topic, payload)``.
        motion_detected: Truthy when motion is currently detected.
    """
    payload = "0"
    if motion_detected:
        payload = "1"
    try:
        client.publish(MQTT_TOPIC_PUB, payload)
        print(f"Published motion status: {payload}")
    except Exception as err:
        print("Publish error", err)


# Main function
def main():
    """Run the motion-alarm loop: poll the PIR, beep, and publish changes.

    Connects to WiFi and the MQTT broker, then polls the PIR input about
    every 0.1 s. A 0 -> 1 transition beeps the buzzer and publishes "1";
    a 1 -> 0 transition publishes "0". The loop runs until interrupted.
    """
    keepalive_s = 60
    # Ping at half the keepalive so the broker always sees traffic in time.
    ping_interval_ms = keepalive_s * 1000 // 2

    connect_wifi(SSID, PASSWORD)
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, keepalive=keepalive_s)
    client.connect()
    motion_state = 0
    last_ping_ms = time.ticks_ms()
    print("WatchPi Smart Security is running...")
    try:
        while True:
            pir_val = pir.value()
            if pir_val == 1 and motion_state == 0:
                print("Motion Detected!")
                beep(500)  # Beep buzzer 0.5 seconds
                publish_motion(client, True)
                motion_state = 1
            elif pir_val == 0 and motion_state == 1:
                print("Motion Stopped")
                publish_motion(client, False)
                motion_state = 0

            # umqtt.simple never pings by itself; without this the broker
            # drops the session once the keepalive lapses in a quiet period.
            if time.ticks_diff(time.ticks_ms(), last_ping_ms) >= ping_interval_ms:
                try:
                    client.check_msg()  # drain the previous ping response
                    client.ping()
                except OSError as e:
                    print("MQTT keepalive error", e)
                last_ping_ms = time.ticks_ms()

            time.sleep(0.1)
    finally:
        # Close the broker connection on the way out (e.g. on Ctrl-C).
        try:
            client.disconnect()
        except OSError as e:
            print("MQTT disconnect error", e)


if __name__ == "__main__":
    main()