# Wokwi ESP32 Motion Detector / Intrusion Alert System
# main.py
import machine
import time
import network
from umqtt.simple import MQTTClient
import ubinascii # For unique ID
import gc
# --- Configuration ---
# All tunables for the motion detector live in this section; the functions
# and the main loop below read them as module-level names.
# PIR Motion Sensor Configuration
PIR_PIN_NUM = 27 # GPIO pin connected to the PIR Motion Sensor's OUT pin
# MQTT Broker Configuration
MQTT_BROKER = "broker.hivemq.com" # Or "test.mosquitto.org"
MQTT_PORT = 1883 # NOTE(review): 1883 is conventionally plain (unencrypted) MQTT - confirm this is intended
# --- UNIQUE MQTT Settings for THIS FRIEND ---
# Hex-encoded (bytes) form of machine.unique_id(). Its last 6 hex characters
# are appended to the client ID and topic below so they differ between boards.
mac = ubinascii.hexlify(machine.unique_id())
# FRIEND 3: YOU MUST CHANGE "MyMotionDetector" TO SOMETHING UNIQUE FOR YOUR PROJECT!
# Example: If your name is Chris, use "ChrisPIRAlert"
FRIEND3_PERSONALIZE_PREFIX = "MyMotionDetector" # FRIEND 3: CHANGE THIS!
# Client ID handed to MQTTClient in connect_mqtt().
MQTT_CLIENT_ID = f"wokwi_esp32_{FRIEND3_PERSONALIZE_PREFIX}_{mac[-6:].decode()}"
# Topic on which the main loop publishes "MOTION DETECTED" / "CLEAR".
MQTT_TOPIC_MOTION_ALERT = f"wokwi/{FRIEND3_PERSONALIZE_PREFIX}_{mac[-6:].decode()}/esp32/motion_status"
# --- END UNIQUE MQTT Settings ---
# Wi-Fi Configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = "" # Empty string: no password is passed for this network
# Other settings
READ_INTERVAL_SECONDS = 1 # Sleep between PIR polls in the main loop (check PIR state frequently)
MQTT_KEEPALIVE = 60 # Keepalive value (seconds) passed to MQTTClient in connect_mqtt()
MAX_RETRY_ATTEMPTS = 3 # Connection attempts made by one connect_mqtt() call
MOTION_ACTIVE_STATE = 1 # PIR output is HIGH when motion is detected
NO_MOTION_TIMEOUT_SECONDS = 10 # How long after last motion to consider it "CLEAR"
# --- Global Variables ---
pir_sensor = machine.Pin(PIR_PIN_NUM, machine.Pin.IN) # PIR OUT pin configured as a digital input
last_motion_state_str = "CLEAR" # Initial assumed state; afterwards "CLEAR" or "MOTION DETECTED", updated by the main loop
last_motion_time = 0 # time.time() value when motion was last detected (written by read_motion_sensor)
wifi_station = network.WLAN(network.STA_IF) # Station-mode (client) Wi-Fi interface
mqtt_client = None # Connected MQTTClient, or None while disconnected (managed by connect_mqtt / publish_data)
# --- Functions: Wi-Fi / MQTT connection helpers, MQTT publishing, and PIR sensor polling ---
def connect_wifi():
    """Bring up the station interface and join WIFI_SSID.

    If the interface is already associated nothing is reconnected.
    Otherwise a connection is started and polled once per second for up
    to 10 seconds, printing a dot for every second waited.

    Returns:
        True when the interface reports it is connected, False when the
        10-second wait expires without a connection.
    """
    print("Connecting to Wi-Fi...")
    wifi_station.active(True)

    # Guard clause: nothing to do when we are already on the network.
    if wifi_station.isconnected():
        print(f"Already connected. IP: {wifi_station.ifconfig()[0]}")
        return True

    print(f"Connecting to {WIFI_SSID}...")
    wifi_station.connect(WIFI_SSID, WIFI_PASSWORD)

    # Poll the link state once per second until connected or out of time.
    seconds_left = 10
    while not wifi_station.isconnected() and seconds_left > 0:
        print(".", end="")
        time.sleep(1)
        seconds_left -= 1

    if not wifi_station.isconnected():
        print("\nWi-Fi connection failed!")
        return False

    print(f"\nWi-Fi connected! IP: {wifi_station.ifconfig()[0]}")
    return True
def connect_mqtt():
    """(Re)connect the module-level MQTT client to MQTT_BROKER.

    Any existing client is disconnected first (best effort) and dropped.
    Up to MAX_RETRY_ATTEMPTS connection attempts are then made, pausing
    3 seconds after each failed attempt except the last.

    Returns:
        True if a connection was established; ``mqtt_client`` then holds
        the connected client. False if every attempt failed; ``mqtt_client``
        is None in that case.
    """
    global mqtt_client

    if mqtt_client:
        try:
            mqtt_client.disconnect()
        except Exception:
            # Best effort: the old connection may already be dead.
            # `except Exception` rather than a bare `except` so that
            # KeyboardInterrupt (Ctrl-C at the REPL) is not swallowed.
            pass
    mqtt_client = None
    gc.collect()  # Free the old client/socket before allocating a new one.

    for attempt in range(MAX_RETRY_ATTEMPTS):
        try:
            print(f"MQTT attempt {attempt+1}/{MAX_RETRY_ATTEMPTS} to {MQTT_BROKER} with ClientID: {MQTT_CLIENT_ID}")
            # Build the client in a local so the global only ever refers
            # to a client whose connect() has succeeded.
            client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT, keepalive=MQTT_KEEPALIVE)
            # NOTE(review): stock umqtt.simple does not appear to read
            # `sock_timeout`; kept in case the deployed build does - confirm.
            client.sock_timeout = 5
            client.connect()
        except OSError as e:
            print(f"✗ MQTT Error (attempt {attempt+1}): {e}")
        except Exception as e:
            print(f"✗ Unexpected MQTT error: {e}")
        else:
            mqtt_client = client
            print(f"✓ MQTT Connected: {MQTT_BROKER}")
            return True

        # Only reached after a failed attempt: back off before retrying,
        # but do not delay the caller after the final attempt.
        if attempt < MAX_RETRY_ATTEMPTS - 1:
            time.sleep(3)

    print("✗ All MQTT connection attempts failed")
    return False
def publish_data(topic, value_str):
    """Publish ``value_str`` to ``topic`` through the module-level MQTT client.

    Args:
        topic: Topic name as a str; it is encoded to bytes before sending.
        value_str: Message payload as a str; it is encoded to bytes before
            sending.

    Returns:
        True if the client accepted the publish call, False if there is no
        client or the publish raised. On OSError the client is disconnected
        (best effort) and ``mqtt_client`` is reset to None so that callers
        can detect the broken link and reconnect.
    """
    global mqtt_client

    if not mqtt_client:
        print("⚠️ MQTT client not connected for publish")
        return False

    try:
        mqtt_client.publish(topic.encode(), value_str.encode())  # Ensure topic and message are bytes
        print(f"📤 Published to {topic}: {value_str}")
        return True
    except OSError as e:
        # Socket-level failure: treat the connection as dead.
        print(f"⚠️ MQTT publish error: {e}")
        try:
            mqtt_client.disconnect()
        except Exception:
            # Best effort only. `except Exception` rather than a bare
            # `except` so that KeyboardInterrupt is not swallowed.
            pass
        mqtt_client = None
        return False
    except Exception as e:
        # Anything else (e.g. a non-str argument without .encode()): report
        # it but keep the client, since the connection itself is not implicated.
        print(f"⚠️ Unexpected publish error: {e}")
        return False
def read_motion_sensor():
    """Poll the PIR pin and derive the logical motion state.

    While the pin reads MOTION_ACTIVE_STATE the state is "MOTION DETECTED"
    and ``last_motion_time`` is refreshed. Once the pin is inactive, the
    state only falls back to "CLEAR" after more than
    NO_MOTION_TIMEOUT_SECONDS have passed since the last motion; until then
    the previously recorded state (``last_motion_state_str``) is reported.

    In Wokwi, clicking the PIR sensor toggles its output; it does not
    automatically go low after a period like a real PIR. The timeout logic
    simulates the "CLEAR" state after a period with no new motion. For a
    simple toggle the raw pin state could be used instead.

    Returns:
        A ``(status, publish)`` tuple: ``status`` is the logical state
        string and ``publish`` is True only when this poll produced a state
        transition that should be published.
    """
    global last_motion_time  # Allow modification

    previously_in_motion = last_motion_state_str == "MOTION DETECTED"

    if pir_sensor.value() == MOTION_ACTIVE_STATE:
        current_status_str = "MOTION DETECTED"
        last_motion_time = time.time()  # Restart the no-motion timeout
        if previously_in_motion:
            # Motion is ongoing; it was already announced.
            return current_status_str, False
        print(f"✨ Event: {current_status_str}")
        return current_status_str, True

    # Pin inactive: announce "CLEAR" only after the quiet period has elapsed.
    if previously_in_motion and time.time() - last_motion_time > NO_MOTION_TIMEOUT_SECONDS:
        current_status_str = "CLEAR"
        print(f"✨ Event: Area {current_status_str} after timeout")
        return current_status_str, True

    # Still "CLEAR", or still waiting out the timeout after motion:
    # keep reporting the last known state without publishing.
    return last_motion_state_str, False
# --- Main Program ---
# Start-up: print the configuration, join Wi-Fi, connect to the broker, then
# poll the PIR sensor forever and publish every state transition.
print("🚨 Starting Wokwi ESP32 Motion Detector...")
print("=" * 50)
print(f"MQTT Client ID: {MQTT_CLIENT_ID}")
print(f"Motion Alert Topic: {MQTT_TOPIC_MOTION_ALERT}")
print(f"Broker: {MQTT_BROKER}")
print("=" * 50)

if not connect_wifi():
    print("❌ Critical: Wi-Fi failed! MQTT will not work.")
else:
    connect_mqtt()

# umqtt.simple does not send keepalive pings on its own, and alerts are only
# published on state changes. Without periodic traffic the broker drops the
# idle connection (after roughly 1.5 x keepalive) and the next alert can be
# lost, so ping at half the keepalive period.
MQTT_PING_INTERVAL_SECONDS = max(1, MQTT_KEEPALIVE // 2)
# connect_mqtt() blocks for several seconds when the broker is unreachable;
# space reconnect attempts out so PIR polling is not starved while offline.
MQTT_RECONNECT_BACKOFF_SECONDS = 15

cycle_count = 0
pending_status = None          # Latest state transition not yet delivered to the broker
last_mqtt_traffic = time.time()  # When we last sent something on the MQTT connection
next_reconnect_time = 0        # Earliest time.time() at which a reconnect may be attempted

while True:
    cycle_count += 1
    # print(f"\n🔄 Cycle {cycle_count}") # Optional

    current_status, should_publish = read_motion_sensor()

    if current_status != last_motion_state_str:
        last_motion_state_str = current_status  # Update last known state
        if should_publish:
            # Remember the alert until it is actually delivered; a newer
            # transition simply replaces an undelivered older one.
            pending_status = current_status

    if pending_status is not None:
        if not mqtt_client and time.time() >= next_reconnect_time:
            print("🔄 MQTT client not valid. Attempting MQTT reconnection...")
            # MQTT cannot come back without Wi-Fi, so restore that first.
            if wifi_station.isconnected() or connect_wifi():
                connect_mqtt()
            next_reconnect_time = time.time() + MQTT_RECONNECT_BACKOFF_SECONDS
            if mqtt_client:
                last_mqtt_traffic = time.time()
            else:
                print("⚠️ MQTT offline - motion alert not published this cycle")

        # publish_data() resets mqtt_client to None on a socket error, which
        # makes a later cycle reconnect and retry the still-pending alert.
        if mqtt_client and publish_data(MQTT_TOPIC_MOTION_ALERT, pending_status):
            pending_status = None
            last_mqtt_traffic = time.time()

    if (mqtt_client and MQTT_KEEPALIVE > 0
            and time.time() - last_mqtt_traffic >= MQTT_PING_INTERVAL_SECONDS):
        try:
            mqtt_client.ping()
            # Drain a pending PINGRESP (non-blocking); this also raises if the
            # broker has closed the connection, so dead links are noticed early.
            mqtt_client.check_msg()
            last_mqtt_traffic = time.time()
        except Exception as e:
            # Top-level loop of the device: log and recover instead of halting.
            print(f"⚠️ MQTT keepalive failed: {e}")
            try:
                mqtt_client.disconnect()
            except Exception:
                pass  # Best effort; the socket is most likely already dead.
            mqtt_client = None

    # Print current logical status regardless of publishing, for debugging
    # print(f"ℹ️ Current logical state: {last_motion_state_str}")
    gc.collect()
    time.sleep(READ_INTERVAL_SECONDS)