"""
MicroPython IoT Weather Station Example for Wokwi.com
To view the data:
1. Go to http://www.hivemq.com/demos/websocket-client/
2. Click "Connect"
3. Under Subscriptions, click "Add New Topic Subscription"
4. In the Topic field, type "wokwi-weather" then click "Subscribe"
Now click on the DHT22 sensor in the simulation,
change the temperature/humidity, and you should see
the message appear on the MQTT Broker, in the "Messages" pane.
Copyright (C) 2022, Uri Shaked
https://wokwi.com/arduino/projects/322577683855704658
"""
import os
import network
import time
from machine import Pin, reset
import dht
import ujson
from umqtt.simple import MQTTClient
import ntptime
from utime import localtime
from Suntime import Sun
# MQTT Server Parameters
MQTT_CLIENT_ID = "micropython-weather-demo"
MQTT_BROKER = "broker.mqttdashboard.com"
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_TOPIC = "wokwi-weather"
# ******************
def failsafe_dump(message: str):
msg = str(localtime()) + ": " + str(message) + "\n"
try:
with open("./error.log.txt", "a+") as fh:
fh.write(msg)
except Exception as ex:
print(str(ex).encode())
def failsafe_read() -> str:
try:
with open("./error.log.txt", "r") as fh:
content = fh.read()
os.remove("./error.log.txt")
return content
except Exception as ex:
print(str(ex).encode())
return None
# ******************
sensor = dht.DHT22(Pin(15))
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
print("Connecting to MQTT server... ", end="")
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD)
client.connect()
print("Connected!")
ntptime.host="de.pool.ntp.org"
ntptime.settime()
print("NTP localtime: ", localtime())
sun = Sun(46.86713366376123, 14.477590245083029, 0)
# *** VOR BEGIN DER SCHLEIFE, WENN INHALT DA, PUBLISH
content = failsafe_read()
if content:
print(f"mqtt publish: {content}")
# ************
while True:
try:
print("NTP daylight:", sun.daylight(), sta_if.isconnected())
time.sleep(5)
# zum test - exception werfen
raise AssertionError("jetzt ist schon wieder was passiert")
except Exception as ex:
# exception ins file schreiben und reset, beim näcshte start siehst den inhalt
failsafe_dump(str(ex).encode())
reset()