"""
MicroPython IoT REST Server for Wokwi.com
Copyright (C) 2024, Dr.Druck
"""
import network
from machine import Pin
import usocket as socket
import ujson as json
import ubinascii
import time
import ntptime
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
while not sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
print('network config:', sta_if.ifconfig())
# Sample user and password for base authentication
USERNAME = "user1"
PASSWORD = "password"
def timeStamp():
t = time.gmtime()
tt = '{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}'.format(t[0], t[1], t[2], t[3], t[4], t[5])
return tt
def parse_query_string(query_string):
params = {}
pairs = query_string.split('&')
for pair in pairs:
key, value = pair.split('=')
params[key] = value
return params
def _send_response(client, code, message):
response = "HTTP/1.1 {} OK\r\nContent-type: application/json\r\n\r\n{}".format(code, json.dumps(message))
client.write(response.encode('utf-8'))
def _authenticate(headers):
auth_header = [header for header in headers if 'Authorization' in header]
print(auth_header)
if auth_header:
# Check for Basic Authentication
_, _, encoded_credentials = auth_header[0].split(' ')
print(encoded_credentials)
decoded_credentials = ubinascii.a2b_base64(encoded_credentials).decode("utf-8")
username, password = decoded_credentials.split(":")
print(username, password)
return str(username) == USERNAME and str(password) == PASSWORD
return False
def handle_request(client, request):
method, path, _ = request.split('\r\n')[0].split(' ')
print("Received {} request for {}".format(method, path))
_path,_params = path.split('?')
headers_end = request.find("\r\n\r\n")
headers = request[:headers_end].split("\r\n")[1:]
# Check for authentication before processing the request
if not _authenticate(headers):
error_message = {'error': 'Authentication failed'}
print(error_message)
_send_response(client, 401, error_message)
return
print(_path,_params)
if _path == '/api':
# Your API logic here
# For example, assuming you expect 'param1' and 'param2' with clauses
print(_params)
if 'param1' in _params and 'param2' in _params:
print(2)
print('....')
query_params = parse_query_string(_params)
print(query_params)
param1_value = query_params.get('param1', None)
param2_value = query_params.get('param2', None)
# Your processing logic here to get the current values for param1 and param2
value1, value2 = get_current_values(param1_value, param2_value)
response_message = {'value1': value1, 'value2': value2, 'localtime': timeStamp() }
print(response_message)
_send_response(client, 200, response_message)
else:
error_message = {'error': 'Missing parameters'}
_send_response(client, 400, error_message)
else:
error_message = {'error': 'Invalid endpoint'}
_send_response(client, 404, error_message)
# Sample functiom
def get_current_values(param1, param2):
# Here implement your own function
return 72, 35
def run(port=8599):
addr = socket.getaddrinfo('0.0.0.0', port)[0][-1]
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
print("Starting server on port {}".format(port))
while True:
client, addr = s.accept()
request = client.recv(4096)
request = request.decode('utf-8')
handle_request(client, request)
client.close()
if __name__ == '__main__':
run()