import network
import time
import urequests
from machine import Pin, I2C, PWM
import math
from imu import MPU6050 # This relies on vector3d.py internally
# --- 1. Configuration ---
# Wi-Fi credentials passed to wlan.connect() in connect_wifi().
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
# ThingSpeak key sent as the api_key query parameter of every upload request.
# NOTE(review): this secret is hard-coded in source; consider loading it from
# an untracked config file, and rotate the key if this file has been shared.
THINGSPEAK_API_KEY = "WVLBJ663SNWC7HBZ"
# Industrial Safety Thresholds (in G's)
# The main loop compares the acceleration magnitude against these values:
# above G_WARNING a warning is raised, above G_CRITICAL the motor is stopped.
G_WARNING = 1.5
G_CRITICAL = 3.0
# --- 2. Hardware Initialization ---
# I2C bus 0 for the Pico W, with SDA on GP4 and SCL on GP5, clocked at 400 kHz
i2c = I2C(0, sda=Pin(4), scl=Pin(5), freq=400000)
# MPU6050 driver bound to that bus; the main loop reads imu.accel from it
imu = MPU6050(i2c)
# Actuators
# PWM output on GP15 running at 50 Hz; set_motor_state() sets its duty cycle
motor = PWM(Pin(15))
motor.freq(50)
# Alarm LED on GP14, configured as a digital output
alarm_led = Pin(14, Pin.OUT)
def set_motor_state(running):
    """Drive the motor PWM output to its running or stopped duty cycle.

    running: truthy selects duty 4915 (rotate to 90 degrees, simulating a
        running motor); falsy selects duty 0 (simulating an emergency stop).
    """
    # Choose the 16-bit duty value first, then issue a single PWM write.
    duty = 4915 if running else 0
    motor.duty_u16(duty)
def calculate_magnitude(accel):
    """Return the Euclidean length of an acceleration vector.

    accel: any object exposing numeric ``x``, ``y`` and ``z`` attributes
        (the main loop passes ``imu.accel``).
    Returns ``sqrt(x**2 + y**2 + z**2)`` as a float.
    """
    # Squares are accumulated in x, y, z order before taking the root.
    squared_sum = sum(axis ** 2 for axis in (accel.x, accel.y, accel.z))
    return math.sqrt(squared_sum)
def connect_wifi():
    """Join the configured Wi-Fi network, blocking until the link is up.

    Activates the station interface, starts a connection with WIFI_SSID and
    WIFI_PASS, then polls once per second (printing a progress line on each
    poll) until the interface reports it is connected. There is no timeout:
    the call does not return while the network is unreachable.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(WIFI_SSID, WIFI_PASS)
    while True:
        if station.isconnected():
            break
        print("Connecting to Factory Wi-Fi...")
        time.sleep(1)
    print("Connected!")
# --- 3. The Edge Brain ---
# Startup, then an endless ~10 Hz loop that samples vibration, applies the
# local safety rules and periodically reports to ThingSpeak.
UPLOAD_INTERVAL_S = 20  # minimum spacing between cloud upload attempts
last_upload = 0

# Bring the network up BEFORE starting the motor: connect_wifi() blocks until
# the link is up and nothing samples the IMU while it blocks, so the motor
# must not be running during that window.
connect_wifi()
motor_active = True
set_motor_state(True)
print("Smart Factory: Predictive Maintenance System Active...")

while True:
    # EDGE PERCEPTION: Accessing data via the imu property structure
    # imu.accel returns a Vector3d object with x, y, z properties
    vibration = calculate_magnitude(imu.accel)

    # EDGE DECISION (Real-time safety)
    if vibration > G_CRITICAL:
        set_motor_state(False)
        motor_active = False
        alarm_led.value(1)  # Solid Red for critical failure
        print(f"CRITICAL: {vibration:.2f}g detected! Emergency Stop triggered.")
    elif vibration > G_WARNING:
        # Blink only while the motor is running; after an emergency stop the
        # LED stays latched on so the critical alarm is not overwritten.
        if motor_active:
            alarm_led.toggle()
        print(f"WARNING: {vibration:.2f}g detected. Maintenance required soon.")
    elif motor_active:
        alarm_led.value(0)  # All clear (never clears a latched critical alarm)

    # CLOUD ANALYTICS (Data Offloading)
    # Uploading data every 20 seconds to ThingSpeak for historical trend analysis
    if time.time() - last_upload > UPLOAD_INTERVAL_S:
        status_val = 1 if motor_active else 0
        url = f"https://api.thingspeak.com/update?api_key={THINGSPEAK_API_KEY}&field1={vibration}&field2={status_val}"
        print("CLOUD: Sending health report...")
        try:
            # NOTE: this request blocks the sampling loop until it completes.
            res = urequests.get(url)
            res.close()
        except Exception as exc:
            # Catch Exception rather than everything, so KeyboardInterrupt and
            # SystemExit still stop the script during an upload.
            print("CLOUD: Connection lost. Maintaining local Edge control.")
            print("CLOUD: upload error:", repr(exc))
        # Advance the timestamp after every attempt, successful or not;
        # otherwise a failed upload is retried on each 0.1 s iteration and
        # the blocking request stalls the safety loop.
        last_upload = time.time()

    time.sleep(0.1)  # Fast sampling loop (10Hz)