import network
import time
import ubinascii
import machine
try:
import urequests as requests
except ImportError:
# Some firmware builds don't bundle urequests
raise SystemExit("Missing urequests. Add urequests.py to your device or use a firmware that includes it.")
try:
import ntptime
except ImportError:
ntptime = None
# ─────────────────────────────────────────────────────────────────────────────
# CONFIG
# ─────────────────────────────────────────────────────────────────────────────
# Credentials passed to wifi_connect() by main().
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"
# Weather Underground station credentials, sent as the ID / PASSWORD
# query fields by wu_upload().
WU_STATION_ID = "KXXXXXXXX" # Your PWS Station ID
WU_STATION_KEY = "YYYYYYYYYYYY" # Your Station Key (case-sensitive)
# Weather Underground PWS upload URL; wu_upload() issues a GET against it
# with the observation encoded as URL query parameters.
WU_ENDPOINT = "https://weatherstation.wunderground.com/weatherstation/updateweatherstation.php"
# Seconds main() sleeps between upload attempts.
UPLOAD_INTERVAL_S = 300 # 5 minutes
# ─────────────────────────────────────────────────────────────────────────────
# WIFI
# ─────────────────────────────────────────────────────────────────────────────
def wifi_connect(ssid: str, password: str, timeout_s: int = 20) -> None:
    """Bring up the station interface and join the given access point.

    Does nothing further when the interface already reports a connection.
    Otherwise starts a connection attempt and polls every 0.2 s until the
    link is up.

    Args:
        ssid: Network name to join.
        password: Passphrase for that network.
        timeout_s: Seconds to wait for the link before giving up.

    Raises:
        RuntimeError: The link did not come up within ``timeout_s``.
    """
    sta = network.WLAN(network.STA_IF)
    sta.active(True)

    if not sta.isconnected():
        sta.connect(ssid, password)
        started = time.time()
        while True:
            if sta.isconnected():
                break
            elapsed = time.time() - started
            if elapsed > timeout_s:
                raise RuntimeError("WiFi connection timed out")
            time.sleep(0.2)

        # Printing IP/netmask/gateway/DNS helps when debugging DNS issues.
        print("WiFi OK:", sta.ifconfig())
# ─────────────────────────────────────────────────────────────────────────────
# TIME (UTC)
# ─────────────────────────────────────────────────────────────────────────────
def sync_time_utc() -> None:
    """Best-effort NTP sync of the device clock.

    WU accepts ``dateutc=now``, so an accurate clock is not required for
    uploads; it only makes logs and debugging easier. When the ``ntptime``
    module is unavailable this is a no-op, and a failed sync is printed
    rather than raised.
    """
    if ntptime is not None:
        try:
            ntptime.settime()  # sets RTC to UTC
            print("NTP time synced")
        except Exception as err:
            print("NTP sync failed:", err)
# ─────────────────────────────────────────────────────────────────────────────
# SENSOR READING (replace with your real sensor code)
# Return:
# temp_c (float), rh_percent (float), pressure_hpa (float)
# ─────────────────────────────────────────────────────────────────────────────
def read_weather_sensors():
    """Return one ``(temp_c, rh_percent, pressure_hpa)`` reading.

    Placeholder implementation: it returns fixed example values.
    """
    # TODO: replace with e.g. BME280/SHT31/DS18B20 etc.
    reading = (
        21.6,    # temperature, degrees Celsius
        48.3,    # relative humidity, percent
        1013.2,  # pressure, hPa
    )
    return reading
# ─────────────────────────────────────────────────────────────────────────────
# UNIT CONVERSIONS REQUIRED BY WU FIELDS (commonly in US units)
# tempf (°F), baromin (inHg)
# ─────────────────────────────────────────────────────────────────────────────
def c_to_f(c: float) -> float:
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    scaled = c * 9.0 / 5.0
    return scaled + 32.0
def hpa_to_inhg(hpa: float) -> float:
    """Convert a pressure from hectopascals to inches of mercury."""
    inhg_per_hpa = 0.0295299830714
    return hpa * inhg_per_hpa
# ─────────────────────────────────────────────────────────────────────────────
# WU UPLOAD
# ─────────────────────────────────────────────────────────────────────────────
def _wu_quote(value) -> str:
    """Percent-encode ``value`` for safe use inside a URL query string.

    Hand-rolled because MicroPython firmware does not generally ship
    ``urllib.parse``. Only RFC 3986 "unreserved" characters pass through
    unchanged; every other UTF-8 byte becomes ``%XX``.
    """
    unreserved = (
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz"
        "0123456789-_.~"
    )
    pieces = []
    for byte in str(value).encode("utf-8"):
        ch = chr(byte)
        pieces.append(ch if ch in unreserved else "%{:02X}".format(byte))
    return "".join(pieces)


def wu_upload(temp_c: float, rh: float, pressure_hpa: float) -> str:
    """Send one observation to Weather Underground and return its reply.

    Follows the WU PWS upload protocol: a GET request whose query string
    carries the required fields (ID, PASSWORD, dateutc, action=updateraw)
    plus the measurements converted to the units WU expects
    (tempf in degrees F, humidity in percent, baromin in inHg).

    Args:
        temp_c: Air temperature in degrees Celsius.
        rh: Relative humidity in percent.
        pressure_hpa: Barometric pressure in hPa.

    Returns:
        The stripped response body -- typically "success" or an error string.

    Raises:
        Whatever ``requests.get`` raises on network/TLS failure; the
        response (if one was obtained) is always closed first.
    """
    fields = (
        ("ID", WU_STATION_ID),
        ("PASSWORD", WU_STATION_KEY),
        ("dateutc", "now"),
        ("action", "updateraw"),
        # Measurements:
        ("tempf", "{:.1f}".format(c_to_f(temp_c))),
        ("humidity", "{:.0f}".format(rh)),
        ("baromin", "{:.2f}".format(hpa_to_inhg(pressure_hpa))),
        # Optional but nice:
        ("softwaretype", "micropython-esp32"),
    )

    # MicroPython's urequests has no `params=` keyword (unlike CPython's
    # requests), so the query string has to be assembled and appended here.
    query = "&".join(
        "{}={}".format(_wu_quote(key), _wu_quote(val)) for key, val in fields
    )
    url = WU_ENDPOINT + "?" + query

    r = None
    try:
        r = requests.get(url, timeout=10)
        return r.text.strip()
    finally:
        # Always release the socket; leaked responses exhaust the small
        # connection/heap budget on the device.
        if r is not None:
            r.close()
# ─────────────────────────────────────────────────────────────────────────────
# MAIN LOOP
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """Connect to Wi-Fi, sync the clock, then upload readings forever.

    The initial connection failure propagates (RuntimeError from
    ``wifi_connect``). Once the loop is running, any error in a cycle --
    including a failed reconnect -- is printed and the loop carries on
    after sleeping ``UPLOAD_INTERVAL_S`` seconds.
    """
    wifi_connect(WIFI_SSID, WIFI_PASSWORD)
    sync_time_utc()

    # Optional: print a device id to help track which unit is sending data
    mac = ubinascii.hexlify(network.WLAN(network.STA_IF).config("mac"), b":").decode()
    print("Device MAC:", mac)

    while True:
        try:
            # Re-join the network if the link dropped since the last cycle;
            # wifi_connect() returns immediately while still connected.
            wifi_connect(WIFI_SSID, WIFI_PASSWORD)

            temp_c, rh, pressure_hpa = read_weather_sensors()
            resp = wu_upload(temp_c, rh, pressure_hpa)
            # WU usually returns "success" when it ingests the observation.
            print("Uploaded:",
                  "T(C)=", temp_c,
                  "RH(%)=", rh,
                  "P(hPa)=", pressure_hpa,
                  "WU resp=", resp)
        except Exception as e:
            # Top-level boundary: log and keep the station running.
            print("Upload error:", e)
        time.sleep(UPLOAD_INTERVAL_S)


if __name__ == "__main__":
    main()