# IF LUX == NIGHT TIME, ROOF CANNOT OPEN, FAN CANNOT TURN ON
import uasyncio as asyncio
import ntptime
from actuators import actuator_logic
from alerts import high_temp_alert
from goodnight import check_goodnight
from logging import log
from time import sleep, localtime
from screen import screen
from sensors import sensor
from weather import get_weather_data, get_sunrise_hour, get_temperature_at_hour, weather_message
"""Global Constants"""
record_interval = 1
weather_message_interval = 60 * 60 * 24 #24hrs
roof_open = False
fan_on = False
irrigation_on = False
temp_celc_current = None
rh_current = None
temp_celc_outside_current = None
rh_outside_current = None
async def main():
sleep(0.1) # Wait for USB to become ready
"""Set the time"""
#ntptime.settime() # Uncomment this code for real device
global record_interval, weather_message_interval, roof_open, fan_on, irrigation_on
"""weather message calculations"""
current_time = localtime()
current_minutes = current_time[3] * 60 + current_time[4] # hour * 60 + minutes
target_time = 00 #hour in 24hr format RESET THIS VALUE
target_minutes = target_time * 60
minutes_until_target = target_minutes - current_minutes
print("Hello, Pi Pico W!")
screen()
# Set asyncio events to produce relevant signals
csv_complete = asyncio.Event()
actuator_update = asyncio.Event()
temp_alert = asyncio.Event()
goodnight = asyncio.Event()
# Start all tasks concurrently
await asyncio.gather(
sensor_log(record_interval,csv_complete,actuator_update),
# refresh_screen()
cloud_upload(csv_complete, actuator_update),
actuators(actuator_update, temp_alert),
weather_check(weather_message_interval, minutes_until_target),
temperature_alert(temp_alert, goodnight),
goodnight_routine(goodnight)
)
async def sensor_log(record_interval,csv_complete, actuator_update):
"""Upload CSV to the cloud"""
while True:
for i in range(10): # Records 10 data points, uploads to cloud, clears cache and repeats
"""Record sensor values"""
temp_celc, rh, temp_celc_outside, rh_outside, lux = sensor()
"""Log values"""
log(temp_celc, rh, temp_celc_outside, rh_outside, lux)
"""Wait until next interval"""
await asyncio.sleep(record_interval)
csv_complete.set() # Signal to cloud_upload
async def refresh_screen(temp_celc, rh, lux):
pass
async def cloud_upload(csv_complete, actuator_update): # Button press switches screens between inside current, inside history, outside current, outside history
"""Upload CSV data to the cloud"""
while True:
await csv_complete.wait() # Wait for signal from sense_log
print("Uploading CSV to cloud...")
# TODO: Add your cloud upload code here
csv_complete.clear() # Reset the signal
actuator_update.set() # Signal to actuators
async def actuators(actuator_update, temp_alert):
global temp_celc_current, rh_current, temp_celc_outside_current, rh_outside_current, roof_open, fan_on
# Target Setpoints
temp_setpoint_low = 15
temp_setpoint_high = 25
rh_setpoint_low = 40
rh_setpoint_high = 70
# Previous State Memory
prev_temp = None
prev_rh = None
prev_roof = 0
last_change_time = None
hold_time = 1 # 10 minutes
# Set Current Actuator States
roof_open = 0 # 0 to 100%. Steps of 25%
fan_on = False
heat_pad_on = False
while True:
await actuator_update.wait() # Signal from cloud upload
actuator_update.clear()
temp_celc_current, rh_current, temp_celc_outside_current, rh_outside_current, lux_current = sensor()
prev_temp, prev_rh, prev_roof, last_change_time, roof_open, fan_on, heat_pad_on,temp_celc_current, rh_current = actuator_logic(temp_setpoint_low, temp_setpoint_high, rh_setpoint_low, rh_setpoint_high, prev_temp, prev_rh, prev_roof, last_change_time, roof_open, fan_on, heat_pad_on,temp_celc_current, rh_current)
print(f"roof open: {roof_open}, fan on: {fan_on}, heat pad on: {heat_pad_on}")
temp_alert.set() # Signal to temperature_alert
await asyncio.sleep(hold_time)
async def weather_check(weather_message_interval, minutes_until_target):
# Wait until target time
if minutes_until_target > 0:
print(f"Waiting {minutes_until_target} minutes until target time...")
await asyncio.sleep(minutes_until_target * 60)
# Enter the regular weather checking loop
while True:
data = get_weather_data()
if data:
sunrise_hour = get_sunrise_hour(data)
temp_at_sunrise = get_temperature_at_hour(data, sunrise_hour)
if temp_at_sunrise is not None:
print(f"Temperature at sunrise ({sunrise_hour}:00) is {temp_at_sunrise}°C")
else:
print("Could not find temperature at sunrise hour")
message_temp_threshold = 12
weather_comms = weather_message(message_temp_threshold,temp_at_sunrise)
await asyncio.sleep(weather_message_interval)
async def temperature_alert(temp_alert, goodnight):
while True:
await temp_alert.wait() # Signal from weather check
temp_alert.clear()
global temp_celc_current, temp_celc_outside_current, roof_open, fan_on
high_temp_alert(temp_celc_current, temp_celc_outside_current, roof_open, fan_on)
goodnight.set() # Signal to goodnight
async def goodnight_routine(goodnight):
global roof_open, fan_on
while True:
await goodnight.wait() # Signal from temperature_alert
goodnight.clear()
prev_roof = roof_open
prev_fan = fan_on
is_goodnight = check_goodnight()
if is_goodnight:
roof_open = 0
# ACTUATE HERE
fan_on = False
# ACTUATE HERE
asyncio.run(main())
## Once data is being recorded _> Cloud Infrastructure:
# AWS IoT Core (optional but robust for secure device messaging via MQTT or HTTPS)
# AWS Lambda (process/transform data)
# DynamoDB (store time-series data in NoSQL format)
# Amazon S3 (for long-term backup or CSV export)Inside Temperature Sensor
Outside Temperature Sensor
Light Sensor