import network
import time
import urequests as requests
from machine import SoftI2C, Pin, reset
import ssd1306
import json
import usocket as socket
# File path to store preferences and data
CONFIG_FILE = 'config.json'
# Default configuration
config = {
"WIFI_SSID": 'Wokwi-GUEST',
"WIFI_PASSWORD": '',
"AUTH0_DOMAIN": 'greenhousemon.eu.auth0.com',
"AUTH0_CLIENT_ID": 'VmalPmJs55oiyQ8fF9FB628ZiNAqRZTs',
"AUTH0_AUDIENCE": 'https://greenhousemon.eu.auth0.com/api/v2/',
"ACCESS_TOKEN": None
}
# OLED display settings
i2c = SoftI2C(scl=Pin(22), sda=Pin(21))
oled_width = 128
oled_height = 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
def load_config():
global config
try:
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
print('Configuration loaded:', config)
except Exception as e:
print('Could not load config, using defaults:', e)
def save_config():
try:
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f)
print('Configuration saved:', config)
except Exception as e:
print('Could not save config:', e)
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(config['WIFI_SSID'], config['WIFI_PASSWORD'])
timeout = 10
while not wlan.isconnected() and timeout > 0:
print('Connecting to WiFi...')
time.sleep(1)
timeout -= 1
return wlan.isconnected()
def display_code(user_code):
oled.fill(0)
oled.text('User Code:', 0, 0)
oled.text(user_code, 0, 16)
oled.show()
def get_device_code():
url = f'https://{config["AUTH0_DOMAIN"]}/oauth/device/code'
headers = {'content-type': 'application/x-www-form-urlencoded'}
data = f'client_id={config["AUTH0_CLIENT_ID"]}&scope=openid profile offline_access&audience={config["AUTH0_AUDIENCE"]}'
try:
response = requests.post(url, data=data, headers=headers)
response_json = response.json()
return response_json['device_code'], response_json['user_code'], response_json['verification_uri_complete'], response_json['interval']
except Exception as e:
print('Failed to get device code:', e)
return None, None, None, None
def poll_for_token(device_code, interval):
url = f'https://{config["AUTH0_DOMAIN"]}/oauth/token'
headers = {'content-type': 'application/x-www-form-urlencoded'}
data = f'grant_type=urn:ietf:params:oauth:grant-type:device_code&device_code={device_code}&client_id={config["AUTH0_CLIENT_ID"]}'
while True:
try:
response = requests.post(url, data=data, headers=headers)
if response.status_code == 200:
response_json = response.json()
return response_json['access_token']
elif response.status_code == 403:
error_response = response.json()
if error_response.get('error') == 'authorization_pending':
print('Authorization pending...')
elif error_response.get('error') == 'slow_down':
print('Slow down...')
interval += 5 # Adjusting interval to slow down
elif error_response.get('error') == 'expired_token':
print('Device code expired. Please restart the process.')
return None
elif error_response.get('error') == 'access_denied':
print('Access denied. Please restart the process.')
return None
time.sleep(interval)
except Exception as e:
print('Failed to poll token:', e)
return None
def start_ap_mode():
wlan_ap = network.WLAN(network.AP_IF)
wlan_ap.active(True)
wlan_ap.config(essid='ESP32-AP', authmode=network.AUTH_WPA_WPA2_PSK, password='12345678')
print('AP mode started, SSID: ESP32-AP')
def captive_portal():
# Set up a basic web server for WiFi configuration
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
print('Listening on', addr)
while True:
cl, addr = s.accept()
print('Client connected from', addr)
request = cl.recv(1024)
request = str(request)
if 'wifi_ssid' in request and 'wifi_password' in request:
ssid = request.split('wifi_ssid=')[1].split('&')[0]
password = request.split('wifi_password=')[1].split(' ')[0]
ssid = ssid.replace('%20', ' ')
password = password.replace('%20', ' ')
config['WIFI_SSID'] = ssid
config['WIFI_PASSWORD'] = password
save_config()
cl.send('HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n')
cl.send('<html><body><h1>WiFi Credentials Saved! Please restart the device.</h1></body></html>')
cl.close()
time.sleep(3)
reset()
response = """
<html>
<body>
<form action="/" method="POST">
<label for="ssid">SSID:</label><br>
<input type="text" id="ssid" name="wifi_ssid"><br>
<label for="password">Password:</label><br>
<input type="text" id="password" name="wifi_password"><br><br>
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
cl.send('HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n' + response)
cl.close()
def check_access_token():
if config["ACCESS_TOKEN"]:
url = 'https://your_api_endpoint' # Replace with your API endpoint
headers = {
'Authorization': f'Bearer {config["ACCESS_TOKEN"]}'
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
print('Access token is valid')
return True
else:
print('Access token is invalid')
config["ACCESS_TOKEN"] = None
save_config()
return False
except Exception as e:
print('Failed to validate access token:', e)
return False
return False
def device_auth_flow():
device_code, user_code, verification_uri_complete, interval = get_device_code()
if device_code and user_code and verification_uri_complete and interval:
print(f'User Code: {user_code}')
print(f'Verification URI: {verification_uri_complete}')
display_code(user_code)
print(f'Please visit: {verification_uri_complete} to authorize the device.')
access_token = poll_for_token(device_code, interval)
if access_token:
print(f'Access Token: {access_token}')
config["ACCESS_TOKEN"] = access_token
save_config()
return True
else:
print('Failed to get access token.')
else:
print('Failed to initiate device authorization flow.')
return False
def main():
load_config()
if not config['WIFI_SSID']:
print('WiFi configuration not set. Starting AP mode.')
start_ap_mode()
captive_portal()
if connect_wifi():
if not check_access_token():
if not device_auth_flow():
print('Device auth flow failed.')
return
print('Starting main loop...')
while True:
# Main loop code here
time.sleep(10)
else:
print('Failed to connect to WiFi. Starting AP mode.')
start_ap_mode()
captive_portal()
if __name__ == '__main__':
main()