import network
import time
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
# -*- coding: utf-8 -*-
#
# Copyright 2024 Kevin Lindemark
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
"""
links and resources used:
https://thingsboard.io/docs/user-guide/rpc/
https://github.com/manuargue/thingsboard-micropython
https://www.youtube.com/watch?v=EWJAZA_44ro
"""
from client import TBDeviceMqttClient
from time import sleep
from random import randint, uniform, choice
from machine import Pin, reset
from neopixel import NeoPixel
import gc
import secrets
def handler(req_id, method, params):
"""handler callback to recieve RPC from server """
# handler signature is callback(req_id, method, params)
print('Response {id}: {method}, params {params})'.format(id=req_id, method=method, params=params))
print(params, "params type:", type(params))
# NEOPIXEL COLOR CONTROL WITH SLIDERS FROM DASHBOARD
try:
if method == "red_RGB":
neopixel_colors.update({"red" : int(params)})
update_pixels()
if method == "green_RGB":
neopixel_colors.update({"green" : int(params)})
update_pixels()
if method == "blue_RGB":
neopixel_colors.update({"blue" : int(params)})
update_pixels()
except TypeError as e:
print(e)
# See examples for more authentication options
client = TBDeviceMqttClient(secrets.SERVER_IP_ADDRESS, access_token = secrets.ACCESS_TOKEN)
# Connecting to ThingsBoard
client.connect()
print("connected to thingsboard, starting to send and receive data")
while True:
try:
print(f"free memory: {gc.mem_free()}")
# monitor and free memory
if gc.mem_free() < 2000:
print("Garbage collected!")
gc.collect()
# GPS data needs to be string og list
latitude = f"{uniform(54, 56):.5f}"
longitude = f"{uniform(8, 12):.5f}"
# Sending telemetry
telemetry = {'temperature': randint(10,33), 'latitude': str(latitude), 'longitude': str(longitude),
'speed': randint(0, 85)}
client.send_telemetry(telemetry)
client.set_server_side_rpc_request_handler(handler) #callback to get server RPC requests
# Checking for incoming subscriptions or RPC call requests (non-blocking)
client.check_msg()
print(".",end="")
sleep(3)
except KeyboardInterrupt:
print("Disconnected!")
# Disconnecting from ThingsBoard
client.disconnect()
reset()