# MicroController Remote Server (runs on the µcontroller)
import asyncio, sys, uos, time, network, json, binascii, io
from machine import Pin
WLAN = "" # Connects to one of the WLAN defined in 'ucuq.json'.
# WLAN = "<name>" # Connects to the WLAN <name> as defined in 'ucuq.json'.
# WLAN = ("<ssid>","<key>") # Connects to WLAN <ssid> using <key>.
with open("ucuq.json", "r") as config:
CONFIG = json.load(config)
K_IDENTIFICATION = "Identification"
K_ONBOARD_LED = "OnBoardLed"
K_PROXY = "Proxy"
K_WIFI_POWER = "WifiPower"
DEFAULT_ONBOARD_LED = (None, True)
DEFAULT_PROXY = ("ucuq.q37.info", 53800, False)
DEFAULT_WIFI_POWER = [None]
getConfig = lambda key: CONFIG[key] if key in CONFIG else None
CONFIG_IDENTIFICATION = CONFIG[K_IDENTIFICATION]
CONFIG_ONBOARD_LED = getConfig(K_ONBOARD_LED)
CONFIG_PROXY = getConfig(K_PROXY)
CONFIG_WIFI_POWER = getConfig(K_WIFI_POWER)
WLAN_FALLBACK = "q37"
PROTOCOL_LABEL = "c37cc83e-079f-448a-9541-5c63ce00d960"
PROTOCOL_VERSION = "0"
# Connection status.
S_FAILURE = 0
S_SEARCHING = 1 # Search an available WLAN.
S_WLAN = 2 # Connecting to WLAN. There is a delay of 0.5 second between two calls.
S_UCUQ = 3 # Connecting to UCUq server.
S_SUCCESS = 4
S_DECONNECTION = 5
# Request
R_PING = 0 # Deprecated!
R_EXECUTE = 1
# Answer; must match in device.h: device::eAnswer.
A_RESULT = 0
A_SENSOR = 1
A_ERROR = 2
A_PUZZLED = 3
A_DISCONNECTED = 4
P_READER = 0
P_WRITER = 1
wifi = None
buffer = bytes()
def getMacAddress():
global wifi
if not wifi:
wifi = network.WLAN(network.STA_IF)
wifi.active(True) # Otherwise the MAC address is NUL.
return binascii.hexlify(network.WLAN(network.STA_IF).config('mac')).decode()
# NOTA: also used in the script for 'getInfos()'…
def getIdentificationId(identification):
if isinstance(identification[1], str):
return identification[1]
else:
mac = getMacAddress()
if mac in identification[1]:
return identification[1][mac]
else:
raise Exception("Unable to get an id for this device.")
WLANS = CONFIG["WLAN"]
def wlanIsShortcut(wlan):
if isinstance(wlan, str):
return True
elif isinstance(wlan, (list, tuple)) and len(wlan) == 2:
return False
else:
raise TypeError("'wlan' parameter can only be a string (shortcut), a list or a tuple of 2 strings (SSID and key)")
def wlanGetKnownStation(wifi, callback):
known = ""
tries = 0
wifi.active(True)
while not known:
if not callback(S_SEARCHING, tries):
callback(S_FAILURE, 0)
exit()
for station in wifi.scan():
if known:
break
for name in WLANS:
if known:
break
if station[0].decode("utf-8") == WLANS[name][0]:
known = name
tries += 1
time.sleep(0.5)
return known
def wlanDisconnect():
wifi = network.WLAN(network.STA_IF)
wifi.disconnect()
while wifi.status() != network.STAT_IDLE:
pass
def wlanConnect(wlan, callback):
global wifi
wifi = network.WLAN(network.STA_IF)
if not wifi.isconnected():
if wlanIsShortcut(wlan):
if wlan == "":
wlan = WLANS[wlanGetKnownStation(wifi, callback)]
else:
try:
wlan = WLANS[wlan]
except KeyError:
wlan = WLANS[WLAN_FALLBACK]
tries = 0
wifi.active(True)
id = getIdentificationId(CONFIG_IDENTIFICATION)
# An ESP32-C3 supermini does not connect to WiFi with default WiFi power when plugged in a breadboard.
# See https://www.reddit.com/r/arduino/comments/1dl6atc/esp32c3_boards_cant_connect_to_wifi_when_plugged/
# RPi Pico does not support a float.
wifiPowerParams = getParams(CONFIG_WIFI_POWER, getIdentificationId(CONFIG_IDENTIFICATION), DEFAULT_WIFI_POWER)
if wifiPowerParams[0]:
wifi.config(txpower=wifiPowerParams)
wifi.connect(wlan[0], wlan[1])
while not wifi.isconnected():
time.sleep(0.5)
if not callback(S_WLAN, tries):
return False
tries += 1
return True
async def recv(size):
global buffer
while len(buffer) < size:
# With ESP8266 and SSL, returns always an empty buffer.
buffer += await proxy[P_READER].read(4096)
result = buffer[:size]
buffer = buffer[size:]
return result
async def send(data):
totalAmount = len(data)
amountSent = 0
while amountSent < totalAmount:
amount = totalAmount - amountSent
if amount > 4096:
amount = 4096
proxy[P_WRITER].write(data[amountSent:amountSent + amount])
await proxy[P_WRITER].drain()
amountSent += amount
async def writeUInt(value):
result = bytes([value & 0x7f])
value >>= 7
while value != 0:
result = bytes([(value & 0x7f) | 0x80]) + result
value >>= 7
await send(result)
async def writeString(string):
bString = bytes(string, "utf-8")
await writeUInt(len(bString))
await send(bString)
def blockingWriteString(string):
return asyncio.run(writeString(string))
async def readByte():
return ord(await recv(1))
async def readUInt():
byte = await readByte()
value = byte & 0x7f
while byte & 0x80:
byte = await readByte()
value = (value << 7) + (byte & 0x7f)
return value
async def readString():
size = await readUInt()
if size:
return (await recv(size)).decode("utf-8")
else:
return ""
def exit(message=None):
if (message):
print(message, file=sys.stderr)
sys.exit(-1)
def init(callback):
global proxy
proxyParam = getParams(CONFIG_PROXY, getIdentificationId(CONFIG_IDENTIFICATION), getParams(CONFIG_PROXY, "_default", DEFAULT_PROXY))
callback(S_UCUQ, 0)
try:
proxy = asyncio.run(asyncio.open_connection(proxyParam[0], proxyParam[1], proxyParam[2]))
except:
return False
else:
return True
def getDeviceLabel():
return uos.uname().sysname
def blockingReadString():
return asyncio.run(readString())
def handshake():
blockingWriteString(PROTOCOL_LABEL)
blockingWriteString(PROTOCOL_VERSION)
blockingWriteString("Device")
blockingWriteString(getDeviceLabel())
error = blockingReadString()
if error:
sys.exit(error)
notification = blockingReadString()
if notification:
print(notification)
def ignition():
blockingWriteString(CONFIG_IDENTIFICATION[0])
blockingWriteString(getIdentificationId(CONFIG_IDENTIFICATION))
error = blockingReadString()
if error:
sys.exit(error)
async def serve():
while True:
request = await readUInt()
if request == R_EXECUTE:
script = await readString()
expression = await readString()
returned = ""
try:
exec(script)
if expression:
returned = json.dumps(eval(expression))
except Exception as exception:
with io.StringIO() as stream:
sys.print_exception(exception, stream)
error = stream.getvalue()
print("Error: ", error)
await writeUInt(A_ERROR)
await writeString(error)
else:
if expression:
await writeUInt(A_RESULT)
await writeString(returned)
else:
await writeUInt(A_PUZZLED)
await writeString("") # For future use
def defaultCallback(status, tries):
if tries == 0:
if status != S_FAILURE:
print("\r \r", end="")
if status == S_SEARCHING:
print("Searching for available WLAN...", end="")
elif status == S_WLAN:
print("Connecting to WLAN...", end="")
elif status == S_UCUQ:
print("Connecting to UCUq server...", end="")
elif status == S_SUCCESS:
print("", end="") # Erase line and go to the beginning of the line.
else:
print(".", end="")
if status == S_FAILURE:
print("FAILURE!!!")
elif status == S_DECONNECTION:
print("Deconnection!")
return True if tries <= 200 else False
def handleLed(pin, state, onValue):
Pin(pin, Pin.OUT).value(1 if state == onValue else 0)
def ledBlink(pin, count, onValue):
for _ in range(count):
handleLed(pin, True, onValue)
time.sleep(0.1)
handleLed(pin, False, onValue)
time.sleep(0.1)
def ledCallback(status, tries, pin, onValue):
if status == S_SEARCHING:
ledBlink(pin, 1, onValue)
elif status == S_WLAN:
handleLed(pin, not( tries % 2), onValue )
elif status == S_UCUQ:
handleLed(pin, False, onValue)
elif status == S_FAILURE:
handleLed(pin, True, onValue)
elif status == S_DECONNECTION:
handleLed(pin, True, onValue)
elif status == S_SUCCESS:
ledBlink(pin, 3, onValue)
return defaultCallback(status, tries) and not ( ( status == S_UCUQ) and ( tries > 5 ) )
def completeParam(params, default):
if params is None:
return default
elif isinstance(params, (int, str, float)):
return [params] + default[1:]
elif isinstance(params, (list, tuple)):
return params + list(default[len(params):])
else:
return default
def getParams(paramSet, device, default):
if not isinstance(paramSet, (list, tuple)) or not all(isinstance(item, (list, tuple)) for item in paramSet):
return completeParam(paramSet, default)
else:
for params in paramSet:
if device in params[1]:
return completeParam(params[0], default)
return default
def getCallback():
onBoardLed = getParams(CONFIG_ONBOARD_LED, getIdentificationId(CONFIG_IDENTIFICATION), getParams(CONFIG_ONBOARD_LED, "_default", DEFAULT_ONBOARD_LED))
if onBoardLed[0]:
return lambda status, tries: ledCallback(status, tries, onBoardLed[0], onBoardLed[1])
return defaultCallback
def main():
callback = getCallback()
if not wlanConnect(WLAN, callback):
callback(S_FAILURE, 0)
exit()
if not init(callback):
if ( WLAN != "" ):
callback(S_FAILURE, 0)
exit()
wlanDisconnect()
if not wlanConnect(WLAN, callback):
callback(S_FAILURE, 0)
exit()
if not init(callback):
callback(S_FAILURE, 0)
exit()
callback(S_SUCCESS, 0)
handshake()
ignition()
try:
asyncio.run(serve())
except Exception as exception:
try:
writeUInt(A_DISCONNECTED)
except:
pass
getCallback()(S_DECONNECTION, 0)
raise exception
main()
Loading
xiao-esp32-c3
xiao-esp32-c3