import machine
import network
import time
import math
import uasyncio as asyncio
import urequests
import json
import ntptime
import gc
from neopixel import NeoPixel
import ssd1306
# --- HARDWARE SETUP ---
# Refresh button on GP16 with the internal pull-down enabled; button_monitor()
# treats a reading of 1 as "pressed".
BUTTON_PIN = machine.Pin(16, machine.Pin.IN, machine.Pin.PULL_DOWN)

# Two strips are chained on one data pin (GP15). run_led_animations() draws
# "south" trains on the first strip and "north" trains on the second.
NUM_LEDS_PER_STRIP = 17
pixels = NeoPixel(machine.Pin(15), 2 * NUM_LEDS_PER_STRIP)

# OLED setup (simulated I2C for Wokwi).
# The I2C timeout is deliberately very large so slow simulations do not abort
# transfers.
i2c = machine.I2C(
    0,
    sda=machine.Pin(4),
    scl=machine.Pin(5),
    timeout=100000,
)
display = ssd1306.SSD1306_I2C(128, 64, i2c)

# --- GLOBAL STATE ---
# Station name -> LED index along a strip. Stations sit on even indices, so the
# odd indices in between represent track between two stations.
STATIONS = {
    "Earl's Court": 0,
    "West Brompton": 2,
    "Fulham Broadway": 4,
    "Parsons Green": 6,
    "Putney Bridge": 8,
    "East Putney": 10,
    "Southfields": 12,
    "Wimbledon Park": 14,
    "Wimbledon": 16,
}

# Arrival records shown on the OLED departure board (filled by update_tfl_api).
tfl_departures = []

# LED markers per direction; each entry is {"idx": int, "state": "pulse"|"solid"}.
train_positions = {
    "north": [],
    "south": [],
}

# Ticker content shown on the top line of the display.
weather_temp = "--"
line_status = "Loading..."

# Current x position of the scrolling ticker (starts off the right edge).
ticker_x = 128

# Set by button_monitor() to cut the TfL refresh wait short.
force_refresh_event = asyncio.Event()
# --- WIFI & SMART TIME SYNC ---
async def sync_time_smart():
    """Set the RTC to London local time, falling back to NTP on failure.

    Asks timeapi.io for the current Europe/London wall-clock time and writes
    its components straight into machine.RTC. If any step of that path raises
    (network error, bad JSON, missing field), ntptime.settime() is tried
    instead. If the fallback fails as well, the error is printed and the RTC
    is left untouched.

    Returns:
        None. Network and parse errors are reported with print(), not raised.
    """
    print("Syncing local London time...")
    # We use TimeAPI.io's zone endpoint, which already returns local time.
    url = "https://timeapi.io/api/Time/current/zone?timeZone=Europe/London"
    try:
        # urequests blocks the whole event loop, so bound the wait with the
        # same 15 s timeout used for the TfL arrivals requests.
        res = urequests.get(url, timeout=15)
        try:
            data = res.json()
        finally:
            # Always release the socket, even when the body is not valid JSON.
            res.close()

        # Use the pre-split components to avoid 32-bit epoch overflow math.
        # RTC tuple: (year, month, day, weekday, hours, minutes, seconds,
        # subseconds). Weekday and subseconds are written as 0 placeholders;
        # the display only reads hours, minutes and seconds.
        machine.RTC().datetime((
            data['year'],
            data['month'],
            data['day'],
            0,
            data['hour'],
            data['minute'],
            data['seconds'],
            0,
        ))
        print("Time synced via TimeAPI (London Local)")
    except Exception as e:
        print(f"Smart sync failed ({e}), using NTP fallback...")
        # NOTE(review): ntptime is expected to set UTC rather than London
        # local time, so the clock may be an hour off during BST -- confirm.
        try:
            ntptime.settime()
        except Exception as ntp_err:
            # Best effort: keep running with whatever time the RTC has.
            print(f"NTP fallback failed ({ntp_err})")
async def connect_wifi():
    """Bring the station interface up, wait for a link, then re-sync the clock.

    Updates the global ``line_status`` so the display ticker reflects the
    connection progress. Waits indefinitely (yielding every 0.5 s) until the
    interface reports a connection.
    """
    global line_status

    station = network.WLAN(network.STA_IF)
    station.active(True)

    if not station.isconnected():
        line_status = "Connecting..."
        station.connect("Wokwi-GUEST", "")
        # Poll cooperatively so the display and LED tasks keep running.
        while True:
            if station.isconnected():
                break
            await asyncio.sleep(0.5)

    print("WiFi Connected!", station.ifconfig())
    line_status = "Connected"
    await sync_time_smart()
async def wifi_watchdog():
    """Check the WiFi link every 30 seconds and reconnect when it has dropped."""
    station = network.WLAN(network.STA_IF)
    while True:
        link_is_down = not station.isconnected()
        if link_is_down:
            await connect_wifi()
        await asyncio.sleep(30)
# --- DATA TASKS ---
def _fetch_district_arrivals(stop_id):
    """Return the District-line arrival records for one TfL stop point.

    Performs a blocking GET against the TfL Arrivals endpoint. The HTTP
    response is closed even when the body cannot be parsed, so a bad reply
    does not leak a socket.

    Args:
        stop_id: TfL StopPoint id, e.g. "940GZZLUSFS".

    Returns:
        A list of the raw arrival dicts whose "lineName" is "District";
        an empty list when the payload is not a JSON list.

    Raises:
        Whatever urequests or JSON decoding raises; the caller handles it.
    """
    url = f"https://api.tfl.gov.uk/StopPoint/{stop_id}/Arrivals"
    response = urequests.get(url, timeout=15)
    try:
        raw_data = response.json()
    finally:
        response.close()
    if not isinstance(raw_data, list):
        return []
    return [t for t in raw_data if t.get("lineName") == "District"]


def _plot_trains(trains):
    """Convert trimmed train records into LED markers for each direction.

    Args:
        trains: list of {"s": station name, "d": direction, "t": seconds}.

    Returns:
        (north, south): two lists of {"idx": int, "state": "pulse"|"solid"}.
        "outbound" trains go to ``south``; every other direction to ``north``.
    """
    north, south = [], []
    for train in trains:
        idx = STATIONS.get(train["s"])
        if idx is None:
            # Station is not on the mapped section of the line.
            continue
        direction, time_to = train["d"], train["t"]
        # More than a minute away: draw the train on a neighbouring LED
        # instead of on the station itself.
        # NOTE(review): outbound trains are shifted to idx + 1 and inbound to
        # idx - 1; confirm this matches the physical direction of travel.
        if time_to > 60:
            if direction == "outbound" and idx < 16:
                idx += 1
            elif direction == "inbound" and idx > 0:
                idx -= 1
        marker = {"idx": idx, "state": "pulse" if time_to < 30 else "solid"}
        if direction == "outbound":
            south.append(marker)
        else:
            north.append(marker)
    return north, south


async def update_tfl_api():
    """Poll TfL arrivals for every mapped station and publish the results.

    Runs forever. On each pass (only while WiFi is connected) it fetches
    the arrivals for each stop point, then replaces the global
    ``tfl_departures`` (Southfields board, soonest first) and the two lists
    inside ``train_positions``. Between passes it waits up to 120 seconds, or
    less if ``force_refresh_event`` is set.
    """
    global tfl_departures
    # Stop points in line order, Earl's Court down to Wimbledon.
    STOP_POINTS = [
        "940GZZLUECT", "940GZZLUWBT", "940GZZLUFBY", "940GZZLUPGN",
        "940GZZLUPYB", "940GZZLUEPY", "940GZZLUSFS", "940GZZLUWMP", "940GZZLUWIM"
    ]
    while True:
        if network.WLAN(network.STA_IF).isconnected():
            new_valid_trains = []
            new_southfields_board = []
            for stop_id in STOP_POINTS:
                # Give the simulator time to breathe between stations
                await asyncio.sleep(2)
                try:
                    for t in _fetch_district_arrivals(stop_id):
                        s_name = t.get("stationName", "").replace(" Underground Station", "")
                        direction = t.get("direction", "")
                        # At Earl's Court only outbound trains are kept.
                        if s_name == "Earl's Court" and direction != "outbound":
                            continue
                        time_to = t.get("timeToStation", 0)
                        new_valid_trains.append({
                            "s": s_name,
                            "d": direction,
                            "t": time_to
                        })
                        if "Southfields" in s_name:
                            # Keep only the two fields the display reads so
                            # the full arrival dict can be garbage-collected.
                            new_southfields_board.append({
                                "destinationName": t.get("destinationName", ""),
                                "timeToStation": time_to
                            })
                    print(f"Synced station: {stop_id}")
                except Exception as e:
                    print(f"Station {stop_id} error: {e}")
                # Cleanup memory after every single request
                gc.collect()
                await asyncio.sleep(0.5)

            # --- Process Results ---
            # Every board record carries "timeToStation", so the sort key
            # cannot raise KeyError outside the per-station try block.
            tfl_departures = sorted(new_southfields_board, key=lambda x: x["timeToStation"])
            pn, ps = _plot_trains(new_valid_trains)
            train_positions["north"], train_positions["south"] = pn, ps
            print("--- Map Update Complete ---")

        # Wait 2 minutes for the next update (Wokwi is slow!), or until the
        # button forces a refresh. This sits outside the WiFi check so the
        # loop always yields to other tasks, even while disconnected.
        try:
            await asyncio.wait_for(force_refresh_event.wait(), 120)
            force_refresh_event.clear()
        except asyncio.TimeoutError:
            pass
def _fetch_json(url):
    """GET ``url`` and return the decoded JSON body, always closing the response.

    Raises whatever urequests or JSON decoding raises.
    """
    # urequests blocks the event loop, so bound the wait (same 15 s budget as
    # the TfL arrivals requests).
    res = urequests.get(url, timeout=15)
    try:
        return res.json()
    finally:
        res.close()


async def update_slow_data():
    """Refresh the ticker data (temperature and District line status) forever.

    Updates the globals ``weather_temp`` and ``line_status``. The two sources
    are fetched independently, so a failure of one does not block the other.
    After a pass in which both succeeded the task sleeps 10 minutes;
    otherwise (including when WiFi is down) it retries after 30 seconds.
    """
    global weather_temp, line_status
    while True:
        all_ok = False
        if network.WLAN(network.STA_IF).isconnected():
            print("Fetching Weather & Status...")

            weather_ok = False
            try:
                data = _fetch_json(
                    "https://api.open-meteo.com/v1/forecast"
                    "?latitude=51.50&longitude=-0.12&current_weather=true"
                )
                if "current_weather" in data:
                    weather_temp = str(round(data["current_weather"]["temperature"]))
                    print(f"Weather Update: {weather_temp}C")
                    weather_ok = True
            except Exception as e:
                print(f"Slow Data Error: {e}")
            gc.collect()

            # District Line Status
            status_ok = False
            try:
                payload = _fetch_json("https://api.tfl.gov.uk/Line/district/Status")
                line_status = payload[0]["lineStatuses"][0]["statusSeverityDescription"]
                print(f"Line Status: {line_status}")
                status_ok = True
            except Exception as e:
                print(f"Slow Data Error: {e}")
            gc.collect()

            all_ok = weather_ok and status_ok

        # If it fails, wait 30 seconds to try again; if success, wait 10 mins.
        # Decided from this pass's outcome, not from a previously cached value.
        await asyncio.sleep(600 if all_ok else 30)
# --- VISUAL TASKS ---
async def run_led_animations():
    """Redraw both LED strips from ``train_positions`` five times a second.

    "south" trains are drawn on the first strip and "north" trains on the
    second, which starts NUM_LEDS_PER_STRIP pixels later. Trains whose state
    is "pulse" breathe with a sine wave; all others are drawn at fixed
    brightness. A dim blue pixel marks the Thames on both strips and is drawn
    last, so it wins over any train on that pixel.
    """
    strip_len = NUM_LEDS_PER_STRIP
    # LED between Putney Bridge (8) and East Putney (10) in STATIONS.
    thames_idx = 9
    # (pixel offset of the strip, key into train_positions)
    strips = ((0, "south"), (strip_len, "north"))

    while True:
        ms = time.ticks_ms()
        # Sine wave remapped from [-1, 1] to [0.1, 1.0] so a pulsing LED
        # never goes fully dark.
        pulse = (math.sin(ms / 300) + 1) / 2 * 0.9 + 0.1

        pixels.fill((0, 0, 0))
        for offset, key in strips:
            for t in train_positions[key]:
                idx = t["idx"]
                # Ignore anything that would fall outside its own strip.
                if 0 <= idx < strip_len:
                    b = int(50 * pulse) if t["state"] == "pulse" else 50
                    pixels[idx + offset] = (0, b, 0)

        if thames_idx < strip_len:
            pixels[thames_idx] = pixels[thames_idx + strip_len] = (0, 0, 5)  # Thames
        pixels.write()
        await asyncio.sleep(0.2)
async def run_display():
    """Render the OLED forever: scrolling ticker, departure board and clock.

    Layout (128x64): ticker text on the top row, a rule at y=12, up to three
    departures starting at y=15, a rule at y=50 and the HH:MM:SS clock at
    y=54. The global ``ticker_x`` is moved 2 px left per frame and wraps back
    to the right edge once the text has scrolled fully off screen.
    """
    global ticker_x
    while True:
        try:
            display.fill(0)

            # Ticker
            banner = f"{weather_temp}C | {line_status} "
            display.text(banner, ticker_x, 2, 1)
            ticker_x -= 2
            banner_width = len(banner) * 8  # 8 px per character
            if ticker_x < -banner_width:
                ticker_x = 128
            display.hline(0, 12, 128, 1)

            # Departures
            if tfl_departures:
                for row, train in enumerate(tfl_departures[:3]):
                    y = 15 + (row * 11)
                    dest = train.get("destinationName", "")[:12]
                    m = train.get("timeToStation", 0) // 60
                    display.text(f"{dest}", 2, y, 1)
                    display.text(f"{m}m", 100, y, 1)
            else:
                display.text("Updating TFL...", 10, 25, 1)

            # Clock
            display.hline(0, 50, 128, 1)
            now = time.localtime()
            hh, mm, ss = now[3], now[4], now[5]
            display.text(f"{hh:02d}:{mm:02d}:{ss:02d}", 30, 54, 1)

            display.show()
            # CRITICAL: the 0.1 s pause gives Wokwi enough time to process
            # I2C traffic without ETIMEDOUT.
            await asyncio.sleep(0.1)
        except OSError:
            print("Display Timeout - Simulation Lag")
            # Back off so the simulation can catch up before the next frame.
            await asyncio.sleep(0.5)
async def button_monitor():
    """Poll the refresh button and trigger an immediate TfL refresh on press.

    The pin is sampled every 0.1 s. After a press is seen the task pauses an
    extra second, so a held or bouncing button does not fire repeatedly.
    """
    while True:
        pressed = BUTTON_PIN.value() == 1
        if pressed:
            print("Force Refresh...")
            force_refresh_event.set()
            await asyncio.sleep(1)
        await asyncio.sleep(0.1)
# --- BOOT ---
async def boot_sequence():
    """Show the splash screen, connect WiFi, sync time, then start all tasks.

    Each stage updates the OLED so progress is visible. The background tasks
    are scheduled only after WiFi is up and the time sync has been attempted.
    """
    # Splash frame with the initial WiFi status line.
    display.fill(0)
    display.rect(0, 0, 128, 64, 1)
    display.text("TUBE MAP V1.1", 15, 15, 1)
    display.text("WIFI: Connecting", 5, 35, 1)
    display.show()

    # 1. Connect WiFi
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect("Wokwi-GUEST", "")

    frame = 0
    while not station.isconnected():
        # Cycle through 0-3 trailing dots so it is clear we are not frozen.
        trail = "." * (frame % 4)
        display.fill_rect(5, 35, 120, 10, 0)  # Clear the status line
        display.text(f"WIFI: Connecting{trail}", 5, 35, 1)
        display.show()
        frame += 1
        await asyncio.sleep(0.5)

    # 2. Update splash for the time sync
    display.fill_rect(5, 35, 120, 25, 0)  # Clear bottom half
    display.text("WIFI: Online", 5, 35, 1)
    display.text("TIME: Syncing...", 5, 45, 1)
    display.show()

    # 3. Sync time
    await sync_time_smart()

    # 4. Final splash update
    display.fill_rect(5, 45, 120, 10, 0)
    display.text("TIME: Synced!", 5, 45, 1)
    display.show()
    await asyncio.sleep(1)  # Brief pause to show the result

    # 5. Start all background tasks, in this order
    print("Starting background tasks...")
    loop = asyncio.get_event_loop()
    background_tasks = (
        wifi_watchdog,
        update_slow_data,
        update_tfl_api,
        run_led_animations,
        run_display,
        button_monitor,
    )
    for start_task in background_tasks:
        loop.create_task(start_task())
# --- Revised Boot Execution ---
def boot():
    """Run the boot sequence to completion, then keep the event loop running.

    boot_sequence() is driven with run_until_complete() so the splash screen,
    WiFi connection and time sync finish before the background tasks it
    schedules take over. run_forever() then keeps those tasks alive.
    """
    loop = asyncio.get_event_loop()
    loop.run_until_complete(boot_sequence())
    loop.run_forever()


# Script entry point.
boot()