import network
import time
from machine import Pin, ADC, SoftI2C
from utime import sleep, localtime, mktime
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
import urequests as requests
import ujson as json
# Function to connect to Wi-Fi
def connect_to_wifi(ssid='Wokwi-GUEST', password=''):
    """Activate the station interface and block until it is connected.

    Args:
        ssid: Name of the network to join. Defaults to 'Wokwi-GUEST',
            the value this script previously hard-coded.
        password: Password for the network. Defaults to an empty string.

    Progress is written to the console: one dot every 0.1 s while the
    interface is not yet connected, then " Connected!". There is no
    timeout; the call only returns once ``isconnected()`` is true.
    """
    print("Connecting to WiFi", end="")
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(True)
    sta_if.connect(ssid, password)
    while not sta_if.isconnected():
        print(".", end="")
        time.sleep(0.1)
    print(" Connected!")
# Wi-Fi connection: this call blocks until the station interface is connected
connect_to_wifi()
# Initialize I2C for the LCD (software I2C, SCL on pin 22, SDA on pin 21)
i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=400000)
i2c_addresses = i2c.scan()
print("I2C addresses found:", i2c_addresses)
if not i2c_addresses:
    raise Exception("No I2C devices found")
# The first address returned by the scan is taken to be the LCD
lcd_address = i2c_addresses[0]
# NOTE(review): 2 and 16 are presumably rows and columns - confirm against i2c_lcd
lcd = I2cLcd(i2c, lcd_address, 2, 16)
# Potentiometer on ADC pin 34, configured with the 11 dB attenuation setting
potentiometer = ADC(Pin(34))
potentiometer.atten(ADC.ATTN_11DB)
# Button on pin 25 with the internal pull-up enabled, so a press reads 0
button = Pin(25, Pin.IN, Pin.PULL_UP)
# State variable: becomes True once the user has confirmed the hour count
verification_done = False
# Function to read potentiometer and display value on LCD
def display_potentiometer_value():
    """Let the user choose a number of hours with the potentiometer.

    Loops until the button is pressed. While waiting, the LCD shows the
    hour count that the current potentiometer position maps to; on a
    press that same count is stored in the global ``set_hours``, shown
    for 3 seconds, and the global ``verification_done`` is set to True,
    which ends the loop.

    The raw reading is scaled by 4095.0 onto 0..12 and truncated with
    ``int()``.
    """
    global verification_done, set_hours
    while not verification_done:
        # A single conversion feeds both the preview and the stored value.
        # Previously the preview rounded ("{:.0f}" on a float) while the
        # stored value truncated, so the LCD could offer 12H and save 11.
        hours = int((potentiometer.read() / 4095.0) * 12)
        if button.value() == 0:  # Button pressed (input is pulled up)
            set_hours = hours
            lcd.clear()
            lcd.putstr("Hours set to {:.0f}H".format(set_hours))
            sleep(3)  # Display for 3 seconds
            verification_done = True
        else:
            lcd.clear()
            lcd.putstr("Set hours {:.0f}H".format(hours))
            sleep(0.1)  # Poll roughly ten times per second
# Run the selection loop; it returns once the button has been pressed
display_potentiometer_value()
# Selection is finished: blank the LCD and log the chosen value.
# The script continues below with the price lookup.
lcd.clear()
print("Verification complete. Hours set to {:.0f}H".format(set_hours))
# Function to fetch the next day
def get_next_day():
    """Return the date 24 hours from now as a (year, month, day) tuple.

    The date is derived from ``localtime()`` plus 86400 seconds.
    """
    tomorrow = localtime(mktime(localtime()) + 24 * 60 * 60)
    year, month, day = tomorrow[0], tomorrow[1], tomorrow[2]
    return (year, month, day)
# Function to get current spot price
def get_current_spot_price():
    """Fetch one price value from the Nord Pool market-data page.

    Returns:
        The value found at ``data['data']['Rows'][0]['Columns'][0]['Value']``
        as a float (the decimal comma is converted to a point), or None
        when the HTTP status is not 200 or any error occurs while
        fetching or parsing.

    NOTE(review): this reads the first row of the page, which is not
    necessarily the row for the current hour - confirm against the API.
    """
    response = None
    try:
        response = requests.get("https://www.nordpoolgroup.com/api/marketdata/page/10?currency=EUR")
        if response.status_code != 200:
            print("Failed to fetch current spot price. Status code:", response.status_code)
            return None
        data = response.json()
        prices = data['data']['Rows'][0]['Columns'][0]['Value']
        return float(prices.replace(',', '.'))
    except Exception as e:
        print("An error occurred while fetching current spot price:", e)
        return None
    finally:
        # The response holds an open socket until close() is called;
        # release it on every path so repeated requests do not leak.
        if response is not None:
            response.close()
# Function to get cheapest spot prices
def get_cheapest_spot_prices(num_cheapest, next_day_only=False):
    """Fetch the price rows and keep the cheapest periods for each date.

    Args:
        num_cheapest: Maximum number of periods to keep per date.
        next_day_only: When true, rows whose start date differs from
            ``get_next_day()`` are skipped.

    Returns:
        A dict mapping ``(year, month, day)`` to a list of at most
        ``num_cheapest`` dicts, ordered by ascending price. Each dict
        has the keys ``'value'`` (float), ``'start'`` and ``'end'``
        (``(hour, minute)`` tuples). Returns None when the HTTP status
        is not 200 or any error occurs while fetching or parsing; a
        single unparsable row therefore discards the whole result.
    """
    response = None
    try:
        response = requests.get("https://www.nordpoolgroup.com/api/marketdata/page/10?currency=EUR")
        if response.status_code != 200:
            print("Failed to fetch cheapest spot prices. Status code:", response.status_code)
            return None
        spot_prices = response.json()['data']['Rows']

        # Tomorrow's date is the same for every row, so resolve it once
        # instead of recomputing it inside the loop.
        wanted_date = get_next_day() if next_day_only else None

        periods_by_date = {}
        for period in spot_prices:
            # Timestamps are sliced as 'YYYY-MM-DDTHH:MM...': date first,
            # then hour and minute from offset 11.
            date = tuple(map(int, period['StartTime'][:10].split('-')))
            if wanted_date is not None and date != wanted_date:
                continue
            value = float(period['Columns'][0]['Value'].replace(',', '.'))
            start_time = tuple(map(int, period['StartTime'][11:16].split(':')))
            end_time = tuple(map(int, period['EndTime'][11:16].split(':')))
            periods_by_date.setdefault(date, []).append(
                {'value': value, 'start': start_time, 'end': end_time}
            )

        # Build a fresh dict rather than reassigning entries of the dict
        # being iterated.
        return {
            date: sorted(entries, key=lambda x: x['value'])[:num_cheapest]
            for date, entries in periods_by_date.items()
        }
    except Exception as e:
        print("An error occurred:", e)
        return None
    finally:
        # The response holds an open socket until close() is called;
        # release it on every path so repeated requests do not leak.
        if response is not None:
            response.close()
# Function to print cheapest spot prices
def print_cheapest_spot_prices(cheapest_data, current_spot_price):
    """Print a console report of the next day's cheapest price periods.

    Args:
        cheapest_data: Dict mapping ``(year, month, day)`` to a list of
            dicts with the keys ``'value'``, ``'start'`` and ``'end'``,
            or None when fetching failed.
        current_spot_price: Value printed as the current spot price.

    When ``cheapest_data`` is None only an error message is printed.
    Otherwise the report shows the request time (clock shifted by +2
    hours), the current price, and the next day's periods in
    chronological order.

    NOTE(review): the listed periods belong to the next day, but the
    "current time is within this period" check compares time of day
    only - confirm that this is the intended meaning.
    """
    if cheapest_data is None:
        print("Failed to fetch spot prices. Please check your internet connection or try again later.")
        return

    # Apply the +2 h offset through mktime/localtime so the date rolls
    # over correctly; adding 2 to the hour field alone produced hours of
    # 24 and 25 late in the day.
    now = localtime(mktime(localtime()) + 2 * 60 * 60)
    now_hm = (now[3], now[4])
    in_cheapest_period = False

    print("Request made {}/{}/{} {}:{}:{}".format(now[0], now[1], now[2], now[3], now[4], now[5]))
    print("Current spot price when the request was made: {} EUR/MWh".format(current_spot_price))

    next_day = get_next_day()
    if next_day in cheapest_data:
        print("\nCheapest spot prices for {}/{}/{}:".format(next_day[0], next_day[1], next_day[2]))
        # The entries arrive ranked by price; list them chronologically.
        by_time = sorted(cheapest_data[next_day], key=lambda x: x['start'])
        for idx, entry in enumerate(by_time, start=1):
            start_time = entry['start']
            end_time = entry['end']
            # Pass the tuple items as separate fields: indexed replacement
            # fields such as "{start[0]}" are not implemented by
            # MicroPython's str.format.
            print("{idx}. Price: {value} EUR/MWh, Period: from {sh}:{sm} to {eh}:{em}".format(
                idx=idx, value=entry['value'],
                sh=start_time[0], sm=start_time[1],
                eh=end_time[0], em=end_time[1]))
            # A period that ends at or before its start (e.g. 23:00 to
            # 00:00) wraps past midnight; treat its end as 24:00.
            period_end = end_time if end_time > start_time else (24, 0)
            # Half-open interval, so a boundary minute belongs to exactly
            # one of two adjacent periods.
            if start_time <= now_hm < period_end:
                print(" - Current time is within this period and has the cheapest spot price!")
                in_cheapest_period = True
    else:
        print("No data available for {}/{}/{}.".format(next_day[0], next_day[1], next_day[2]))

    if not in_cheapest_period:
        print("\nCurrent time is not within any cheapest spot price period for today.")
# Get the current spot price (None if the request failed)
current_spot_price = get_current_spot_price()
# Get the cheapest periods for the next day only, keeping at most set_hours of them
cheapest_data = get_cheapest_spot_prices(set_hours, next_day_only=True)
# Print the cheapest spot prices to the console
print_cheapest_spot_prices(cheapest_data, current_spot_price)
# Display the cheapest spot prices on the LCD
def display_cheapest_prices_on_lcd(cheapest_data, current_spot_price):
    """Show the current price, then each cheapest period, on the LCD.

    Args:
        cheapest_data: Dict mapping ``(year, month, day)`` to a list of
            dicts with the keys ``'value'``, ``'start'`` and ``'end'``,
            or None when fetching failed.
        current_spot_price: Value shown on the "Cur:" line.

    The current price is always written first. If data exists for
    ``get_next_day()``, its periods are then shown one at a time in
    chronological order, 2 seconds each. With no data the current price
    simply stays on the display.
    """
    lcd.clear()
    lcd.putstr("Cur: {} EUR/MWh".format(current_spot_price))

    # A failed fetch yields None; without this guard the membership test
    # below raises TypeError.
    if cheapest_data is None:
        return
    next_day = get_next_day()
    if next_day not in cheapest_data:
        return

    entries = sorted(cheapest_data[next_day], key=lambda x: x['start'])
    if entries:
        # Keep the current price readable for a moment; the loop clears
        # the display as soon as it starts.
        sleep(2)
    for entry in entries:
        start_str = "{:02d}:{:02d}".format(entry['start'][0], entry['start'][1])
        end_str = "{:02d}:{:02d}".format(entry['end'][0], entry['end'][1])
        lcd.clear()
        lcd.putstr("{} to {}\n{} EUR/MWh".format(start_str, end_str, entry['value']))
        sleep(2)  # Display each entry for 2 seconds
# Show the same results on the LCD, one period at a time
display_cheapest_prices_on_lcd(cheapest_data, current_spot_price)