import network
import time
import urequests
from machine import Pin, I2C, PWM
import math
from imu import MPU6050 # This will work if imu.py is in same folder
# --- 1. Configuration ---
# Wi-Fi credentials handed to wlan.connect() in connect_wifi().
# NOTE(review): "Wokwi-GUEST" with an empty password looks like the Wokwi
# simulator's open network - confirm and replace before running on real hardware.
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
# ThingSpeak key sent as the `api_key` query parameter on every upload.
# NOTE(review): this secret is hard-coded in source; consider loading it from
# an untracked config file and rotating the key.
THINGSPEAK_API_KEY = "JFXM58T0W2FSWIBC"
# Industrial Safety Thresholds (in G's)
# Compared against the acceleration magnitude in the main loop:
# above G_WARNING the alarm LED is toggled, above G_CRITICAL the motor is stopped.
G_WARNING = 1.5
G_CRITICAL = 3.0
# --- 2. Hardware Initialization ---
# I2C bus 0 on GP4 (SDA) / GP5 (SCL) at 400 kHz, shared with the MPU6050.
i2c = I2C(0, sda=Pin(4), scl=Pin(5), freq=400000)

# IMU init
imu = MPU6050(i2c)

# Actuators
# The motor is driven through freq()/duty_u16(), which exist only on PWM
# objects, so GP15 has to be wrapped in PWM rather than used as a bare Pin.
motor = PWM(Pin(15))
motor.freq(150)
motor.duty_u16(0)  # known-safe state: output off until explicitly started

# The alarm LED must be a real Pin object (not a tuple) so that
# value()/toggle() work in the main loop.
alarm_led = Pin(14, Pin.OUT)
alarm_led.value(0)  # alarm off at boot
def set_motor_state(running: bool):
    """Switch the motor PWM output on or off.

    Args:
        running: True drives the output at the fixed running duty cycle,
            False sets the duty cycle to zero.
    """
    if running:
        # 4915 / 65535 is roughly a 7.5 % duty cycle.
        motor.duty_u16(4915)  # Running
    else:
        motor.duty_u16(0)  # Stop
def calculate_magnitude(accel):
    """Calculate overall vibration magnitude (G-force).

    Args:
        accel: Object exposing numeric ``x``, ``y`` and ``z`` attributes
            (here the value of ``imu.accel``).

    Returns:
        The Euclidean norm ``sqrt(x**2 + y**2 + z**2)`` as a float, in the
        same unit as the three components.
    """
    return math.sqrt(accel.x**2 + accel.y**2 + accel.z**2)
def connect_wifi(timeout_s=None):
    """Bring up the station interface and join the configured Wi-Fi network.

    Polls once per second until the interface reports a connection.

    Args:
        timeout_s: Maximum number of seconds to wait. ``None`` (the default)
            waits indefinitely, which is the original behaviour.

    Returns:
        True once connected, False if ``timeout_s`` elapsed first.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(WIFI_SSID, WIFI_PASS)

    waited_s = 0
    while not wlan.isconnected():
        # Bounded wait lets a caller carry on offline instead of hanging here.
        if timeout_s is not None and waited_s >= timeout_s:
            print("WiFi connection timed out after", waited_s, "s")
            return False
        print("Connecting to Factory Wi-Fi...")
        time.sleep(1)
        waited_s += 1

    print("WiFi Connected:", wlan.ifconfig())
    return True
# --- Start ---
# Minimum spacing between ThingSpeak upload attempts, in seconds.
UPLOAD_INTERVAL_S = 20

# time.time() of the last upload attempt; 0 forces an upload on the first pass.
last_upload = 0
# False once an emergency stop has been triggered; never set back to True here.
motor_active = True

# Join Wi-Fi before starting the motor: connect_wifi() blocks until the link
# is up, and the vibration check below does not run during that wait.
connect_wifi()
set_motor_state(True)
print("Smart Factory: Predictive Maintenance System Active...")

while True:
    try:
        vibration = calculate_magnitude(imu.accel)

        # EDGE DECISION
        if vibration > G_CRITICAL:
            set_motor_state(False)
            motor_active = False
            alarm_led.value(1)
            print("CRITICAL:", vibration, "g -> Emergency Stop")
        elif not motor_active:
            # Latch the alarm after an emergency stop. Vibration falls as soon
            # as the motor halts, and without this branch the LED would be
            # cleared on the very next pass.
            alarm_led.value(1)
        elif vibration > G_WARNING:
            alarm_led.toggle()
            print("WARNING:", vibration, "g -> Maintenance Required")
        else:
            alarm_led.value(0)

        # CLOUD UPLOAD every 20 seconds
        if time.time() - last_upload > UPLOAD_INTERVAL_S:
            try:
                status_val = 1 if motor_active else 0
                url = (
                    "https://api.thingspeak.com/update?"
                    f"api_key={THINGSPEAK_API_KEY}"
                    f"&field1={vibration:.3f}"
                    f"&field2={status_val}"
                )
                print("CLOUD: Sending health report...")
                r = urequests.get(url)
                r.close()
            except Exception as e:
                # Upload is best-effort; a network failure must not stop monitoring.
                print("CLOUD ERROR:", e)
            finally:
                # Stamp failed attempts too, otherwise the blocking request is
                # retried on every loop pass and starves the vibration check.
                last_upload = time.time()

        time.sleep(0.1)
    except Exception as e:
        # Top-level boundary: report, back off for a second, keep the loop alive.
        print("SENSOR ERROR:", e)
        time.sleep(1)