from machine import Pin, ADC
import network
import time
import ujson
import usocket as socket
import ustruct as struct
from ubinascii import b2a_base64
class MQTTException(Exception):
    """Raised when the broker refuses a request or sends an unexpected packet."""
class MQTTClient:
    """Minimal blocking MQTT 3.1.1 client built on a stream-style socket.

    Supports CONNECT (optional user name / password, optional TLS), PUBLISH
    and SUBSCRIBE at QoS 0 and 1, and reception of PUBLISH packets, which are
    handed to the callback registered with ``set_callback``.
    """

    def __init__(self, client_id, server, port=1883, user=None, password=None,
                 keepalive=0, ssl=False, ssl_params=None):
        """Store the connection settings; no network traffic happens here.

        ``ssl_params`` is passed as keyword arguments to ``ussl.wrap_socket``
        when ``ssl`` is true. ``password`` may be left as ``None`` to send a
        user name only.
        """
        self.client_id = client_id
        self.server = server
        self.port = port
        self.sock = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.ssl = ssl
        # A fresh dict per instance: a ``{}`` default would be one object
        # shared by every client.
        self.ssl_params = {} if ssl_params is None else ssl_params
        self.cb = None
        self.rcv_pids = set()
        # Last packet identifier used for SUBSCRIBE / QoS 1 PUBLISH.
        self.pid = 0

    @staticmethod
    def _as_bytes(value):
        """Return ``value`` UTF-8 encoded if it is a ``str``, else unchanged."""
        return value.encode("utf-8") if isinstance(value, str) else value

    @staticmethod
    def _encode_len(n):
        """Encode ``n`` as an MQTT variable-length "Remaining Length" field."""
        if n < 0 or n > 268435455:
            raise ValueError("MQTT packet too large")
        out = bytearray()
        while True:
            digit = n & 0x7F
            n >>= 7
            if n:
                digit |= 0x80  # continuation bit: more length bytes follow
            out.append(digit)
            if not n:
                return bytes(out)

    def _recv_len(self):
        """Read and decode a variable-length "Remaining Length" field."""
        value = 0
        shift = 0
        while True:
            raw = self.sock.read(1)
            if not raw:
                raise OSError(-1)
            value |= (raw[0] & 0x7F) << shift
            if not raw[0] & 0x80:
                return value
            shift += 7
            if shift > 21:  # the field is at most four bytes long
                raise MQTTException("Malformed remaining length")

    def _read_exact(self, n):
        """Read exactly ``n`` bytes; raise ``OSError`` if the stream ends early."""
        data = b""
        while len(data) < n:
            chunk = self.sock.read(n - len(data))
            if not chunk:
                raise OSError(-1)
            data += chunk
        return data

    def _next_pid(self):
        """Return the next packet identifier (1..65535) as two big-endian bytes."""
        self.pid = self.pid % 0xFFFF + 1
        return struct.pack("!H", self.pid)

    def _send_str(self, s):
        """Write ``s`` as an MQTT string: 2-byte big-endian length, then bytes."""
        s = self._as_bytes(s)
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)

    def set_callback(self, f):
        """Register ``f(topic, msg)`` to be called for each received PUBLISH."""
        self.cb = f

    def _handshake(self, clean_session):
        """Send CONNECT on the open socket and validate the CONNACK reply."""
        flags = 0x02 if clean_session else 0x00
        fields = [self._as_bytes(self.client_id)]
        if self.user:
            flags |= 0x80
            fields.append(self._as_bytes(self.user))
            # The password field is optional; it is only announced when given.
            if self.pswd is not None:
                flags |= 0x40
                fields.append(self._as_bytes(self.pswd))
        # 10 = protocol name (2 + 4) + level (1) + flags (1) + keepalive (2).
        size = 10 + sum(2 + len(field) for field in fields)
        self.sock.write(b"\x10" + self._encode_len(size))
        self.sock.write(b"\x00\x04MQTT\x04" + bytes([flags]))
        self.sock.write(struct.pack("!H", self.keepalive))
        for field in fields:
            self._send_str(field)
        resp = self.sock.read(4)
        if not resp or len(resp) != 4 or resp[0] != 0x20 or resp[1] != 0x02:
            raise MQTTException("Invalid CONNACK")
        if resp[3] != 0:
            raise MQTTException("Connection refused")

    def connect(self, clean_session=True):
        """Open the socket (optionally wrapped in TLS) and perform CONNECT.

        Raises ``MQTTException`` if the broker refuses the connection or
        answers with something other than a CONNACK, and ``OSError`` on
        network failure. The socket is closed again if any step fails.
        """
        addr = socket.getaddrinfo(self.server, self.port)[0][-1]
        self.sock = socket.socket()
        try:
            self.sock.connect(addr)
            if self.ssl:
                import ussl
                self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
            self._handshake(clean_session)
        except Exception:
            # Do not leak the socket when the connection attempt fails.
            self.sock.close()
            raise

    def disconnect(self):
        """Send DISCONNECT and close the socket, even if the write fails."""
        try:
            self.sock.write(b"\xe0\0")
        finally:
            self.sock.close()

    def publish(self, topic, msg, retain=False, qos=0):
        """Publish ``msg`` to ``topic``.

        With ``qos=1`` the call blocks until the matching PUBACK arrives;
        PUBLISH packets received meanwhile are delivered to the callback.
        Raises ``ValueError`` for a QoS other than 0 or 1.
        """
        if qos not in (0, 1):
            raise ValueError("QoS must be 0 or 1")
        topic = self._as_bytes(topic)
        msg = self._as_bytes(msg)
        size = 2 + len(topic) + len(msg)
        pid = b""
        if qos:
            pid = self._next_pid()
            size += 2
        first = 0x30 | (qos << 1) | (1 if retain else 0)
        self.sock.write(bytes([first]) + self._encode_len(size))
        self._send_str(topic)
        if pid:
            self.sock.write(pid)
        self.sock.write(msg)
        if qos:
            self._await_ack(0x40, pid)

    def subscribe(self, topic, qos=0):
        """Subscribe to ``topic`` and block until the broker's SUBACK arrives.

        Raises ``ValueError`` for a QoS other than 0 or 1 and
        ``MQTTException`` if the broker rejects the subscription.
        """
        if qos not in (0, 1):
            raise ValueError("QoS must be 0 or 1")
        topic = self._as_bytes(topic)
        pid = self._next_pid()
        # Remaining length: packet id (2) + topic string (2 + n) + QoS byte (1).
        self.sock.write(b"\x82" + self._encode_len(2 + 2 + len(topic) + 1) + pid)
        self._send_str(topic)
        self.sock.write(bytes([qos]))
        ack = self._await_ack(0x90, pid)
        if len(ack) < 3 or ack[2] == 0x80:
            raise MQTTException("Subscription refused")

    def _await_ack(self, ack_type, pid):
        """Block until a packet of ``ack_type`` carrying ``pid`` arrives.

        Other packets received in the meantime are dispatched normally.
        Returns the body of the acknowledgement packet.
        """
        while True:
            first = self.sock.read(1)
            if not first:
                raise OSError(-1)
            body = self._read_exact(self._recv_len())
            if first[0] & 0xF0 == ack_type and body[:2] == pid:
                return body
            self._dispatch(first[0], body)

    def _on_publish(self, op, body):
        """Decode a PUBLISH body, run the callback and acknowledge QoS 1."""
        qos = (op >> 1) & 0x03
        if qos > 1:
            raise MQTTException("QoS 2 is not supported")
        if len(body) < 2:
            raise MQTTException("Malformed PUBLISH")
        topic_end = 2 + ((body[0] << 8) | body[1])
        # QoS 1 packets carry a 2-byte packet id between topic and payload.
        payload_start = topic_end + (2 if qos else 0)
        if payload_start > len(body):
            raise MQTTException("Malformed PUBLISH")
        topic = body[2:topic_end]
        msg = body[payload_start:]
        if self.cb:
            self.cb(topic, msg)
        if qos == 1:
            self.sock.write(b"\x40\x02" + body[topic_end:payload_start])
        return topic, msg

    def _dispatch(self, op, body):
        """Handle one received packet; return ``(topic, msg)`` for PUBLISH."""
        kind = op & 0xF0
        if kind == 0x30:
            return self._on_publish(op, body)
        if kind in (0x40, 0x90, 0xB0, 0xD0):
            # PUBACK / SUBACK / UNSUBACK / PINGRESP: nothing to deliver.
            return None
        raise MQTTException("Unknown message type")

    def _handle(self, first):
        """Finish reading the packet whose first byte is ``first``."""
        if first is None:
            return None
        if first == b"":
            # An empty read on a stream socket means the peer closed it.
            raise OSError(-1)
        body = self._read_exact(self._recv_len())
        return self._dispatch(first[0], body)

    def check_msg(self):
        """Process one pending packet without waiting.

        Returns ``(topic, msg)`` if a PUBLISH was received, otherwise ``None``.
        """
        self.sock.setblocking(False)
        try:
            first = self.sock.read(1)
        except OSError:
            # Best effort, as before: a failed poll is treated as "no data".
            first = None
        finally:
            # The rest of the packet must be read in blocking mode.
            self.sock.setblocking(True)
        return self._handle(first)

    def wait_msg(self):
        """Block until one packet arrives and process it.

        Returns ``(topic, msg)`` for a PUBLISH and ``None`` for other handled
        packets. Raises ``OSError`` if the connection was closed and
        ``MQTTException`` for an unsupported packet type.
        """
        self.sock.setblocking(True)
        return self._handle(self.sock.read(1))
import dht
import onewire
import ds18x20
# ==========================
# WIFI
# ==========================
SSID = "Wokwi-GUEST"
PASSWORD = ""
# ==========================
# THINGSBOARD
# ==========================
TB_HOST = "thingsboard.cloud"
# NOTE(review): this device credential is committed in source; consider
# loading it from a file kept out of version control and rotating the token.
ACCESS_TOKEN = "z62dUhQEVlLZZ0Cl9plh"
# ==========================
# DHT22
# ==========================
dht_sensor = dht.DHT22(Pin(15))
# ==========================
# DS18B20
# ==========================
ow = onewire.OneWire(Pin(16))
ds = ds18x20.DS18X20(ow)
roms = ds.scan()
if not roms:
    # Keep running without the box temperature instead of failing on roms[0].
    print("No DS18B20 found; temperature_box will be omitted")
# ==========================
# ADC
# ==========================
current_adc = ADC(26)
voltage_adc = ADC(27)
# ==========================
# WIFI
# ==========================
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
print("Conectare WiFi...")
wifi.connect(SSID, PASSWORD)
# Waits indefinitely until the access point accepts the connection.
while not wifi.isconnected():
    time.sleep(1)
print("WiFi conectat")
# ==========================
# MQTT
# ==========================
# The access token is sent as the MQTT user name; no password is supplied.
client = MQTTClient(
    "PicoW",
    TB_HOST,
    user=ACCESS_TOKEN
)
client.connect()
print("MQTT conectat")
# ==========================
# LOOP
# ==========================
while True:
    payload = {}

    # Air temperature / humidity. A failed read skips these fields for this
    # cycle instead of stopping the loop.
    try:
        dht_sensor.measure()
        payload["temperature_air"] = dht_sensor.temperature()
        payload["humidity"] = dht_sensor.humidity()
    except OSError as err:
        print("DHT22 read failed:", err)

    # Box temperature from the first DS18B20 found at start-up.
    if roms:
        try:
            ds.convert_temp()
            # Wait for the conversion to finish before reading the result.
            time.sleep_ms(750)
            payload["temperature_box"] = ds.read_temp(roms[0])
        except Exception as err:
            # NOTE(review): broad on purpose - the 1-Wire drivers appear to
            # raise plain Exception subclasses on bus/CRC errors; confirm.
            print("DS18B20 read failed:", err)

    # Scale the 16-bit ADC readings; presumably full scale corresponds to
    # 100 (current) and 250 (voltage) - confirm against the sensor hardware.
    current_raw = current_adc.read_u16()
    voltage_raw = voltage_adc.read_u16()
    current = round(current_raw / 65535 * 100, 2)
    voltage = round(voltage_raw / 65535 * 250, 2)
    payload["current"] = current
    payload["voltage"] = voltage
    payload["power"] = round(voltage * current, 2)

    try:
        client.publish(
            "v1/devices/me/telemetry",
            ujson.dumps(payload)
        )
        print(payload)
    except OSError as err:
        print("MQTT publish failed:", err)
        # Drop the broken socket and try to re-establish the session so the
        # next cycle can publish again.
        try:
            client.sock.close()
        except OSError:
            pass
        try:
            client.connect()
            print("MQTT conectat")
        except (OSError, MQTTException) as reconnect_err:
            print("MQTT reconnect failed:", reconnect_err)

    time.sleep(10)