import network
import urequests
import time
import json
from machine import RTC
import sys
# WiFi credentials passed to wlan.connect() in connect_wifi()
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# API endpoint requested by fetch_config()
API_URL = 'https://api-bpi.k3p.in/api/config'
# Retry settings used by fetch_config()
MAX_RETRIES = 3
RETRY_DELAY = 5 # seconds to wait between failed attempts
API_CHECK_INTERVAL = 30 # seconds between API polls in main_loop()
# Global state shared between apply_beepi_config() and main_loop()
# Name of the expression last requested from the API (None until one is set)
current_expression = None
# Seconds between time pop-ups in expression mode; 0 selects time-only mode
current_refresh_time = 0
# Format key understood by format_time()
current_time_format = 'dd-mmm-yyyy'
# time.time() value of the most recent time display (set in main_loop())
last_time_display = 0
time_only_mode = False # True while only the time (no expression) is shown
# Shared real-time clock handle, written by sync_time_from_api()
rtc = RTC()
def connect_wifi():
    """Activate the station interface and join the configured network.

    If the interface is not already connected, a connection is started
    and polled once per second for at most 20 seconds, printing one dot
    per poll.

    Returns:
        True when the interface reports a connection, False otherwise.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)

    if not station.isconnected():
        print('Connecting to WiFi...')
        station.connect(WIFI_SSID, WIFI_PASSWORD)
        # Bounded wait: 20 polls, one second apart.
        for _ in range(20):
            if station.isconnected():
                break
            print('.', end='')
            time.sleep(1)
        # Terminate the line of progress dots.
        print()

    if not station.isconnected():
        print('WiFi connection failed!')
        return False

    print('WiFi Connected!')
    print('IP:', station.ifconfig()[0])
    return True
def fetch_config(retries=MAX_RETRIES):
    """Fetch the JSON configuration from API_URL, retrying on failure.

    Args:
        retries: Maximum number of attempts. RETRY_DELAY seconds are
            waited between attempts (not after the last one).

    Returns:
        The decoded JSON body of the first HTTP 200 response, or None
        when every attempt failed.
    """
    for attempt in range(1, retries + 1):
        response = None
        try:
            response = urequests.get(API_URL, timeout=10)
            if response.status_code == 200:
                return response.json()
            print(f'Config fetch {attempt}/{retries} failed: HTTP {response.status_code}')
        except Exception as e:
            # Network errors (OSError) and malformed JSON are both treated
            # as a failed attempt; report them instead of hiding them.
            print(f'Config fetch {attempt}/{retries} failed: {e}')
        finally:
            # Always release the socket, including when json() raises or
            # when returning from inside the try block.
            if response is not None:
                response.close()
        if attempt < retries:
            time.sleep(RETRY_DELAY)
    return None
def format_time(time_format='dd-mmm-yyyy', now=None):
    """Format a timestamp according to a named layout.

    Args:
        time_format: One of 'dd-mmm-yyyy', 'hh:mm:ss', 'hh:mm',
            'dd/mm/yyyy' or 'mm/dd/yyyy'. Any other value produces the
            combined "dd-mmm hh:mm" layout.
        now: Optional 8-tuple (year, month, day, weekday, hour, minute,
            second, subsecond). When omitted, the module-level RTC is
            read.

    Returns:
        The formatted string, or "Time N/A" when the clock cannot be
        read or the tuple is malformed.
    """
    try:
        if now is None:
            now = rtc.datetime()
        # Weekday sits at index 3, matching the tuple that
        # sync_time_from_api() writes to the RTC; hour/minute/second
        # follow it.
        year, month, day, _weekday, hour, minute, second, _subsecond = now
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                  'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        if time_format == 'dd-mmm-yyyy':
            return f"{day:02d}-{months[month-1]}-{year}"
        if time_format == 'hh:mm:ss':
            return f"{hour:02d}:{minute:02d}:{second:02d}"
        if time_format == 'hh:mm':
            return f"{hour:02d}:{minute:02d}"
        if time_format == 'dd/mm/yyyy':
            return f"{day:02d}/{month:02d}/{year}"
        if time_format == 'mm/dd/yyyy':
            return f"{month:02d}/{day:02d}/{year}"
        # Default: date and time
        return f"{day:02d}-{months[month-1]} {hour:02d}:{minute:02d}"
    except Exception:
        # Display code must never crash on a bad clock reading.
        return "Time N/A"
def display_time_on_screen():
    """Draw the current time, horizontally centred, on the OLED.

    Does nothing when the expression module reports no usable display.
    Any error is printed rather than raised.
    """
    try:
        import expression as exp

        if not (exp.DISPLAY_AVAILABLE and exp.oled):
            return

        stamp = format_time(current_time_format)

        # Start from a blank frame.
        exp.oled.fill(0)

        # Glyphs are 8 pixels wide; centre within 128 pixels
        # (presumably the panel width - confirm against the hardware).
        left = max(0, (128 - 8 * len(stamp)) // 2)
        top = 28
        exp.oled.text(stamp, left, top, 1)
        exp.oled.show()

        print(f"Time displayed: {stamp}")
    except Exception as err:
        print(f"Error displaying time: {err}")
def show_loading_screen(stage="Loading"):
    """Draw the boot screen for the given start-up stage.

    Args:
        stage: "wifi", "time", "config" or "error". Any other value
            shows the generic "Initializing..." status.

    The error stage shows a static message. Every other stage also draws
    one to three dots chosen from the current second. Errors are printed
    rather than raised.
    """
    try:
        import expression as exp

        if not (exp.DISPLAY_AVAILABLE and exp.oled):
            return

        exp.oled.fill(0)
        exp.oled.text("BeePI", 42, 8, 1)

        if stage == "error":
            # Static screen: no progress dots for the error stage.
            exp.oled.text("Error", 44, 24, 1)
            exp.oled.text("WiFi Failed!", 24, 40, 1)
            exp.oled.text("Retrying...", 28, 56, 1)
            exp.oled.show()
            return

        # (stage key, status text, x offset of the status text)
        known_stages = (
            ("wifi", "Connecting WiFi", 8),
            ("time", "Syncing time...", 16),
            ("config", "Fetching config", 14),
        )
        status, status_x = "Initializing...", 16
        for key, text, x in known_stages:
            if stage == key:
                status, status_x = text, x
                break

        exp.oled.text("Loading", 38, 24, 1)
        exp.oled.text(status, status_x, 40, 1)

        # One to three dots, cycling with the wall-clock second.
        dots = "." * (int(time.time() % 3) + 1)
        exp.oled.text(dots, 58, 56, 1)
        exp.oled.show()
    except Exception as err:
        print(f"Error showing loading screen: {err}")
def clear_loading_screen():
    """Blank the OLED if a display is available; print any error."""
    try:
        import expression as exp

        if not (exp.DISPLAY_AVAILABLE and exp.oled):
            return
        exp.oled.fill(0)
        exp.oled.show()
    except Exception as err:
        print(f"Error clearing loading screen: {err}")
def stop_current_expression():
    """Stop whatever the expression module is animating and blank the display.

    The expression module's API is probed at runtime: each of
    stop_expression(), stop_animation() and clear_display() is called if
    present, the OLED is cleared if available, and the state attributes
    current_animation / animation_running are reset if they exist.

    Errors are printed; as a last resort the display is cleared again.
    """
    try:
        import expression as exp

        print("DEBUG: Stopping previous expression...")
        stopped = False

        # Call every stop/clear hook the module happens to provide,
        # in this fixed order.
        for hook in ('stop_expression', 'stop_animation', 'clear_display'):
            if hasattr(exp, hook):
                print(f"DEBUG: Calling {hook} function...")
                getattr(exp, hook)()
                stopped = True

        # Force clear the display regardless of the hooks above.
        if exp.DISPLAY_AVAILABLE and exp.oled:
            print("DEBUG: Force clearing OLED display...")
            exp.oled.fill(0)
            exp.oled.show()
            stopped = True

        # Reset expression state variables if they exist.
        if hasattr(exp, 'current_animation'):
            print("DEBUG: Resetting animation state...")
            exp.current_animation = None
            stopped = True
        if hasattr(exp, 'animation_running'):
            print("DEBUG: Setting animation_running to False...")
            exp.animation_running = False
            stopped = True

        if stopped:
            print("DEBUG: Previous expression successfully stopped")
        else:
            # Only reachable when no hook, no state attribute and no
            # display were found, so nothing was actually cleared.
            print("DEBUG: No stop methods and no display found, nothing to stop")

        # Add a small delay to ensure everything stops.
        time.sleep(0.2)
    except Exception as e:
        print(f"Error stopping expression: {e}")
        # Try to clear the display as a last resort.
        try:
            import expression as exp
            if exp.DISPLAY_AVAILABLE and exp.oled:
                exp.oled.fill(0)
                exp.oled.show()
        except Exception as fallback_error:
            # Best effort only, but do not swallow KeyboardInterrupt.
            print(f"Error clearing display after failed stop: {fallback_error}")
def apply_beepi_config(config):
    """Apply the 'BeePI' section of a configuration mapping.

    Reads the keys 'expression' (default 'Flat'), 'refreshTime'
    (default 0) and 'timeFormat' (default 'dd-mmm-yyyy') and updates the
    module-level state accordingly. A refresh time of 0 selects
    time-only mode; any other value selects expression mode.

    Args:
        config: Mapping that contains a 'BeePI' sub-mapping, or a falsy
            value.

    Returns:
        True when any setting or the display mode changed, False
        otherwise (including when config or its 'BeePI' section is
        missing or empty).
    """
    global current_expression, current_refresh_time, current_time_format, time_only_mode

    if not config:
        return False
    beepi_config = config.get('BeePI', {})
    if not beepi_config:
        return False

    new_expression = beepi_config.get('expression', 'Flat')
    new_time_format = beepi_config.get('timeFormat', 'dd-mmm-yyyy')
    raw_refresh_time = beepi_config.get('refreshTime', 0)
    try:
        new_refresh_time = int(raw_refresh_time)
    except (TypeError, ValueError):
        # A null or non-numeric value must not crash the caller; keep
        # the refresh time that is already in effect.
        print(f'⚠️ Invalid refreshTime {raw_refresh_time}, keeping {current_refresh_time}s')
        new_refresh_time = current_refresh_time

    changes_made = False
    was_time_only = time_only_mode
    wants_time_only = new_refresh_time == 0

    # IMPORTANT: handle the refresh time FIRST, it decides the mode.
    refresh_changed = new_refresh_time != current_refresh_time
    if refresh_changed:
        print(f'⏱️ Refresh time changed: {current_refresh_time}s → {new_refresh_time}s')
        current_refresh_time = new_refresh_time
        changes_made = True

    # Also switch when the mode flag disagrees with the refresh time
    # (e.g. the very first config already carries refreshTime 0).
    if refresh_changed or wants_time_only != was_time_only:
        if wants_time_only:
            print("🕐 Entering TIME-ONLY mode - stopping all expressions")
            time_only_mode = True
            stop_current_expression()
        else:
            print(f"🎭 Entering EXPRESSION mode with {new_refresh_time}s refresh")
            time_only_mode = False
        changes_made = True

    leaving_time_only = was_time_only and not time_only_mode

    if time_only_mode:
        # Remember the preference only; it is applied on leaving
        # time-only mode.
        if new_expression != current_expression:
            print(f'📝 Expression preference updated to: {new_expression} (will apply when exiting time-only mode)')
            current_expression = new_expression
            changes_made = True
    elif new_expression != current_expression or leaving_time_only:
        if new_expression != current_expression:
            print(f'\n📝 Expression changed: {current_expression} → {new_expression}')
        else:
            print(f'\n📝 Restoring expression: {new_expression}')

        # Nothing is animating right after time-only mode, so only stop
        # when an expression was actually running.
        if current_expression is not None and not leaving_time_only:
            print("DEBUG: Stopping previous expression before setting new one...")
            stop_current_expression()
            time.sleep(0.5)

        current_expression = new_expression
        changes_made = True

        try:
            print(f"DEBUG: Setting expression to: {new_expression}")
            import expression as exp
            # Clear display once more before setting the new expression.
            if exp.DISPLAY_AVAILABLE and exp.oled:
                exp.oled.fill(0)
                exp.oled.show()
            exp.set_expression(new_expression)
            print(f"✓ Expression set to: {new_expression}")
        except Exception as e:
            print(f'❌ Error setting expression: {e}')
            # MicroPython has no traceback module; use sys.print_exception.
            sys.print_exception(e)

    if new_time_format != current_time_format:
        print(f'📅 Time format changed: {current_time_format} → {new_time_format}')
        current_time_format = new_time_format
        changes_made = True

    return changes_made
def sync_time_from_api():
    """Set the RTC from worldtimeapi.org (timezone Asia/Jakarta).

    Returns:
        True when the RTC was updated, False on any HTTP, network or
        parsing failure (the failure is printed).
    """
    response = None
    try:
        print("Syncing time...")
        response = urequests.get("http://worldtimeapi.org/api/timezone/Asia/Jakarta", timeout=10)
        if response.status_code != 200:
            print(f"Time sync failed: HTTP {response.status_code}")
            return False

        datetime_str = response.json()['datetime']
        # Expected shape: 2024-12-09T15:30:45.123456+07:00
        date_part, time_part = datetime_str.split('T')
        year, month, day = [int(field) for field in date_part.split('-')]
        # Only the leading "hh:mm:ss" is needed; slicing ignores the
        # fractional seconds and any "+hh:mm", "-hh:mm" or "Z" suffix.
        hour, minute, second = [int(field) for field in time_part[:8].split(':')]

        # Set RTC (year, month, day, weekday, hour, minute, second, subsecond);
        # weekday is passed as 0 here.
        rtc.datetime((year, month, day, 0, hour, minute, second, 0))
        print(f"Time synced: {format_time()}")
        return True
    except Exception as e:
        print(f"Time sync failed: {e}")
        return False
    finally:
        # Release the socket on every path, including parse errors.
        if response is not None:
            response.close()
def main_loop():
    """Run start-up (WiFi, time sync, first config) and then loop forever.

    Start-up retries the WiFi connection up to MAX_RETRIES times and
    returns if it never succeeds. The loop then:

    * polls the API every API_CHECK_INTERVAL seconds and applies changes;
    * in time-only mode redraws the time once per second;
    * in expression mode shows the time for 3 seconds every
      current_refresh_time seconds, then blanks the display.

    The loop ends on KeyboardInterrupt; any other exception is printed
    and the loop continues after a one-second pause.
    """
    global current_expression, current_refresh_time, last_time_display, time_only_mode

    # Seconds the time stays visible in expression mode.
    time_visible_seconds = 3

    print('=' * 50)
    print('ESP32 HTTP API Client - BeePI')
    print(f'Auto-refresh enabled ({API_CHECK_INTERVAL} sec interval)')
    print('=' * 50)

    # Show initial loading screen
    show_loading_screen("init")
    time.sleep(1)

    # Connect to WiFi. The error screen announces "Retrying...", so
    # retry up to MAX_RETRIES times instead of giving up immediately.
    for attempt in range(1, MAX_RETRIES + 1):
        show_loading_screen("wifi")
        if connect_wifi():
            break
        print(f'WiFi attempt {attempt}/{MAX_RETRIES} failed')
        show_loading_screen("error")
        time.sleep(3)
    else:
        print('Failed to connect to WiFi. Exiting.')
        return
    time.sleep(1)

    # Sync time (best effort; the result is not needed to continue)
    show_loading_screen("time")
    sync_time_from_api()

    # Initial configuration fetch
    show_loading_screen("config")
    print('\n🔄 Initial configuration fetch...')
    config_data = fetch_config()
    if config_data:
        configurations = config_data.get('configurations', {})
        apply_beepi_config(configurations)
    else:
        print('Failed to fetch initial configuration')
        print('Running with default settings...')
        current_expression = 'Flat'
        current_refresh_time = 0
        time_only_mode = True  # Default is time-only

    # Clear loading screen before starting main loop
    clear_loading_screen()
    time.sleep(0.5)

    print(f'\n✓ System started')
    print(f'Mode: {"TIME-ONLY" if time_only_mode else "EXPRESSION"}')
    print(f'Current expression: {current_expression}')
    print(f'Refresh time: {current_refresh_time}s')
    print(f'Time format: {current_time_format}')
    print('\nMonitoring for changes...\n')

    last_api_check = time.time()
    last_time_display = time.time()
    # Set while the time is on screen in expression mode, else None.
    time_display_start = None

    while True:
        try:
            current_time = time.time()

            # Poll the API on its own interval
            if current_time - last_api_check >= API_CHECK_INTERVAL:
                print(f"\n🔍 Checking API (interval: {API_CHECK_INTERVAL}s)")
                config_data = fetch_config(retries=1)
                if config_data:
                    configurations = config_data.get('configurations', {})
                    changes = apply_beepi_config(configurations)
                    if changes:
                        print('✅ Configuration updated!')
                        # Reset timers after configuration change
                        last_time_display = current_time
                        time_display_start = None
                last_api_check = current_time

            if time_only_mode:
                # TIME-ONLY MODE: always show time, update every second
                if current_time - last_time_display >= 1:
                    display_time_on_screen()
                    last_time_display = current_time
            elif current_refresh_time > 0:
                # EXPRESSION MODE: blink the time every refresh interval
                if current_time - last_time_display >= current_refresh_time:
                    # Start new cycle - show time
                    display_time_on_screen()
                    last_time_display = current_time
                    time_display_start = current_time
                elif (time_display_start is not None
                        and current_time - time_display_start >= time_visible_seconds):
                    # Blank the screen so the expression can show again;
                    # the helper prints errors instead of hiding them.
                    clear_loading_screen()
                    time_display_start = None

            # Small delay to prevent tight loop
            time.sleep(0.1)
        except KeyboardInterrupt:
            print('\n\n🛑 Program stopped by user')
            break
        except Exception as e:
            print(f'❌ ERROR in main loop: {e}')
            sys.print_exception(e)
            time.sleep(1)
# Start the main loop only when this file is executed as a script
if __name__ == '__main__':
    main_loop()