import network
import time
import urequests
from machine import Pin, ADC
# --- Configuration ---
# Wi-Fi credentials passed to wlan.connect(); the password is an empty string.
SSID = "Wokwi-GUEST"
PASSWORD = ""
# ThingSpeak Write API key. This is a placeholder: replace it with the actual
# Write API Key of your channel before uploading.
API_KEY = "YOUR API KEY"
# Endpoint that send_to_cloud() appends its query string to.
BASE_URL = "https://api.thingspeak.com/update"
# --- Hardware Setup (Edge) ---
# Distance sensor pins: `trigger` is pulsed and `echo` is polled by
# measure_distance().
# NOTE(review): looks like an HC-SR04-style ultrasonic ranger - confirm wiring.
trigger = Pin(3, Pin.OUT)
echo = Pin(2, Pin.IN)
# Light-dependent resistor on ADC input 26; sampled with read_u16() in the
# main loop.
ldr = ADC(26)
# Output driven to 1 while an obstacle is closer than CRITICAL_DISTANCE.
brake_light = Pin(15, Pin.OUT)
# Thresholds (The "Brain" logic)
CRITICAL_DISTANCE = 50 # cm; readings below this trigger an emergency stop
# NOTE(review): NIGHT_THRESHOLD is not referenced anywhere in the visible code.
NIGHT_THRESHOLD = 30000 # LDR value (Turn on lights)
def measure_distance(timeout_us=30000):
    """Trigger the distance sensor once and return the measured range in cm.

    A 10 us pulse is sent on ``trigger``; the width of the resulting high
    pulse on ``echo`` is then timed and converted to a distance using the
    speed of sound (0.0343 cm/us), halved for the round trip.

    Args:
        timeout_us: Maximum time, in microseconds, to wait for the echo pulse
            to start and, separately, for it to end. Prevents the caller from
            hanging forever when the sensor does not respond.

    Returns:
        The distance in centimetres as a float. If no echo pulse starts within
        ``timeout_us`` the range equivalent of the timeout is returned (i.e.
        "nothing detected in range"); an echo pulse that outlasts the timeout
        is reported at the range reached when the wait was abandoned.
    """
    # Emit the trigger pulse: settle low, then hold high for 10 us.
    trigger.low()
    time.sleep_us(2)
    trigger.high()
    time.sleep_us(10)
    trigger.low()

    # Wait for the echo line to go high. The timestamp is initialised up front
    # so it is defined even if the line is already high on the first check.
    wait_start = time.ticks_us()
    signal_off = wait_start
    while echo.value() == 0:
        signal_off = time.ticks_us()
        if time.ticks_diff(signal_off, wait_start) > timeout_us:
            # The echo never started: report the maximum measurable range.
            return (timeout_us * 0.0343) / 2

    # Time how long the echo line stays high.
    signal_on = signal_off
    while echo.value() == 1:
        signal_on = time.ticks_us()
        if time.ticks_diff(signal_on, signal_off) > timeout_us:
            break

    # ticks_diff() stays correct when the microsecond counter wraps around,
    # which a plain subtraction does not.
    time_passed = time.ticks_diff(signal_on, signal_off)
    distance = (time_passed * 0.0343) / 2
    return distance
def send_to_cloud(dist, light_level, event_type):
    """Upload one reading to ThingSpeak with an HTTP GET request.

    Only event metadata is sent: the distance (field1), the light level
    (field2) and a flag (field3) that is 1 for a "BRAKE" event and 0 for any
    other event type.

    Args:
        dist: Measured distance, written to field1.
        light_level: Raw light-sensor reading, written to field2.
        event_type: Event label; only the value "BRAKE" sets field3 to 1.

    Returns:
        None. Failures are reported on the console and never raised, so a
        network problem cannot interrupt the caller.
    """
    print(f"CLOUD: Uploading Event - {event_type}...")
    brake_flag = 1 if event_type == "BRAKE" else 0
    url = f"{BASE_URL}?api_key={API_KEY}&field1={dist}&field2={light_level}&field3={brake_flag}"
    response = None
    try:
        response = urequests.get(url)
        # Only claim success when the server actually accepted the request.
        if response.status_code == 200:
            print("CLOUD: Upload Success.")
        else:
            print(f"CLOUD: Upload Failed. (HTTP {response.status_code})")
    except Exception as err:
        # Best effort: report why the upload failed and carry on.
        print(f"CLOUD: Upload Failed. ({err})")
    finally:
        # Responses hold an open socket until closed; leaking one per upload
        # eventually exhausts the device's sockets/memory.
        if response is not None:
            response.close()
# --- Network Setup ---
# Bring up the station (client) interface and join the configured network.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)

# Block until the link is up, polling once per second.
while True:
    if wlan.isconnected():
        break
    time.sleep(1)

print("Connected to Internet.")
print("System Active: Tesla Autopilot Simulation")
# --- Main Loop ---
HEARTBEAT_INTERVAL_S = 20  # seconds between routine "I am alive" uploads
LOOP_DELAY_S = 0.1  # pause per iteration, i.e. at most ~10 sensing passes/s

last_cloud_update = 0
last_state = "DRIVING"

while True:
    # 1. PERCEPTION: sample the distance sensor and the light sensor.
    dist = measure_distance()
    light = ldr.read_u16()

    # 2. EDGE DECISION: decided locally on the device, no network involved.
    if dist < CRITICAL_DISTANCE:
        brake_light.value(1)  # HARD BRAKE
        current_state = "EMERGENCY_STOP"
        print(f"EDGE: CRITICAL OBSTACLE! Braking at {dist:.1f}cm")
    else:
        brake_light.value(0)  # Drive normally
        current_state = "DRIVING"

    # 3. CLOUD LOGIC (bandwidth optimised). Upload only when:
    #    A) the state just changed from DRIVING to EMERGENCY_STOP, or
    #    B) more than HEARTBEAT_INTERVAL_S seconds passed since the last upload.
    current_time = time.time()
    if current_state == "EMERGENCY_STOP" and last_state == "DRIVING":
        # Event-driven upload: report the reading that caused the brake event.
        send_to_cloud(dist, light, "BRAKE")
        last_cloud_update = current_time
    elif (current_time - last_cloud_update) > HEARTBEAT_INTERVAL_S:
        # Routine heartbeat (just to say "I am alive").
        send_to_cloud(dist, light, "HEARTBEAT")
        last_cloud_update = current_time

    # Remember this pass's state so the next pass can detect the transition.
    last_state = current_state

    # A single short delay keeps the loop at the intended ~10 Hz rate. The
    # former extra 0.5 s sleep was either unreachable after the infinite loop
    # or contradicted that rate, so it is gone.
    time.sleep(LOOP_DELAY_S)