# -------------------- 1. Imports --------------------
import network
import time
import urequests
from machine import Pin, I2C, PWM
import math
from imu import MPU6050 # Uses vector3d.py internally
# -------------------- 2. Configuration --------------------
# Wi-Fi credentials handed to wlan.connect() by connect_wifi().
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
# Key interpolated into the ThingSpeak update URL built in the main loop.
# The value below is a placeholder and must be replaced with a real key.
THINGSPEAK_API_KEY = "YOUR_API_KEY"
# Industrial Safety Thresholds (in G's)
# The main loop compares the acceleration magnitude against these values:
# above G_WARNING the alarm LED is toggled each iteration, above G_CRITICAL
# the motor is stopped and the LED is switched on.
G_WARNING = 1.5
G_CRITICAL = 3.5
# -------------------- 3. Hardware Initialization --------------------
# I2C for Raspberry Pi Pico W (GP0=SDA, GP1=SCL)
# Bus 0 is clocked at 400 kHz; the MPU6050 driver is bound to this bus.
i2c = I2C(0, sda=Pin(0), scl=Pin(1), freq=400000)
imu = MPU6050(i2c)
# Actuators
# Motor drive signal: PWM on pin 15 at 50 Hz (20 ms period). The duty cycle
# is set later by set_motor_state().
motor = PWM(Pin(15))
motor.freq(50)
# Alarm indicator on pin 14, configured as a digital output and driven by
# the main loop.
alarm_led = Pin(14, Pin.OUT)
# -------------------- 4. Helper Functions --------------------
def set_motor_state(running):
    """Switch the motor PWM output between its running and stopped duty.

    Args:
        running: Truthy to drive the motor, falsy to stop it.
    """
    if running:
        # 4915 / 65535 is a 7.5 % duty cycle, i.e. a 1.5 ms pulse at the
        # 50 Hz PWM frequency configured for the motor.
        motor.duty_u16(4915)  # Rotate (Running)
    else:
        # Zero duty holds the output low.
        motor.duty_u16(0)  # Stop (Emergency Stop)
def calculate_magnitude(accel):
    """Calculate absolute G-force magnitude.

    Args:
        accel: Object exposing numeric ``x``, ``y`` and ``z`` attributes.

    Returns:
        The Euclidean norm ``sqrt(x^2 + y^2 + z^2)`` as a float, in the same
        units as the components.
    """
    # NOTE(review): if the attributes of ``accel`` are live sensor
    # properties, the three reads may come from different samples --
    # confirm against the IMU driver.
    return math.sqrt(accel.x**2 + accel.y**2 + accel.z**2)
def connect_wifi(timeout_s=30):
    """Bring up the station interface and wait for the Wi-Fi link.

    Args:
        timeout_s: Maximum number of one-second polls to wait for the
            connection. Pass ``None`` to wait indefinitely.

    Returns:
        True once the interface reports a connection, False if the timeout
        expired first. Callers that ignore the return value simply carry on
        without connectivity.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(WIFI_SSID, WIFI_PASS)
    waited = 0
    while not wlan.isconnected():
        # Bound the wait: an unreachable access point must not keep the
        # caller from ever reaching its own control logic.
        if timeout_s is not None and waited >= timeout_s:
            print("Wi-Fi not available. Continuing offline.")
            return False
        print("Connecting to Factory Wi-Fi...")
        time.sleep(1)
        waited += 1
    print("Connected!")
    return True
# -------------------- 5. Edge Brain Initialization --------------------
# Timestamp (seconds, from time.time()) of the most recent upload attempt.
last_upload = 0
# Mirrors the commanded motor state; reported to the cloud as field2.
motor_active = True
set_motor_state(True)
# The motor is already running from here on, so everything below is wrapped
# in try/finally: any unhandled error (sensor/I2C failure, Ctrl-C, ...) must
# leave the motor stopped rather than spinning unsupervised.
try:
    connect_wifi()
    print("Smart Factory: Predictive Maintenance System Active...")
    # -------------------- 6. Main Loop --------------------
    while True:
        # ----- EDGE PERCEPTION -----
        vibration = calculate_magnitude(imu.accel)
        # ----- EDGE DECISION -----
        if vibration > G_CRITICAL:
            set_motor_state(False)
            motor_active = False
            alarm_led.value(1)
            print(f"CRITICAL: {vibration:.2f}g detected! Emergency Stop triggered.")
        elif vibration > G_WARNING:
            alarm_led.toggle()
            print(f"WARNING: {vibration:.2f}g detected. Maintenance required soon.")
        else:
            # NOTE(review): this also clears the LED after an emergency stop
            # once the reading drops, while the motor stays stopped --
            # confirm whether the alarm should latch.
            alarm_led.value(0)
        # ----- CLOUD ANALYTICS (Every 20 sec) -----
        if time.time() - last_upload > 20:
            # Stamp the attempt up front so a failed upload waits for the
            # next 20 s window instead of being retried on every 0.1 s
            # iteration and starving the vibration check above.
            last_upload = time.time()
            status_val = 1 if motor_active else 0
            url = f"https://api.thingspeak.com/update?api_key={THINGSPEAK_API_KEY}&field1={vibration}&field2={status_val}"
            print("CLOUD: Sending health report...")
            try:
                res = urequests.get(url)
                res.close()
            except Exception:
                # Best-effort upload: report and keep running locally.
                # Catching Exception (not a bare except) lets
                # KeyboardInterrupt propagate to the outer finally.
                print("CLOUD: Connection lost. Maintaining local edge control.")
        time.sleep(0.1)  # 10Hz loop
finally:
    # Fail safe: never exit the script with the motor still driven.
    set_motor_state(False)