# MicroController Remote Server (runs on the µcontroller)

import asyncio, sys, uos, time, network, json, binascii, io
from machine import Pin

# WLAN selection; 'WLAN' is given as is to 'wlanConnect()' by 'main()'.
WLAN = "" # Connects to one of the WLANs defined in 'ucuq.json' (one whose SSID is found by scanning).
# WLAN = "<name>" # Connects to the WLAN <name> as defined in 'ucuq.json'.
# WLAN = ("<ssid>","<key>") # Connects to WLAN <ssid> using <key>.

# The whole configuration is read once, when this file is loaded.
with open("ucuq.json", "r") as config:
  CONFIG = json.load(config)

# Keys of the top-level entries of the configuration.
K_IDENTIFICATION = "Identification"
K_ONBOARD_LED = "OnBoardLed"
K_PROXY = "Proxy"
K_WIFI_POWER = "WifiPower"

# Default values used to complete missing or partial configuration entries (see 'completeParam()').
# (pin, value for which the LED is on); no LED is handled when the pin is None (see 'getCallback()').
DEFAULT_ONBOARD_LED = (None, True)
# The 3 arguments given to 'asyncio.open_connection()' by 'init()'; presumably host, port and SSL flag.
DEFAULT_PROXY = ("ucuq.q37.info", 53800, False)
# The WiFi TX power is not modified when the first item is None (see 'wlanConnect()').
DEFAULT_WIFI_POWER = [None]

# Returns the configuration entry of key 'key', or None when there is no such entry.
getConfig = lambda key: CONFIG[key] if key in CONFIG else None

# The identification entry is mandatory: a KeyError is raised when it is missing.
CONFIG_IDENTIFICATION = CONFIG[K_IDENTIFICATION]
# The following entries are optional (None when missing).
CONFIG_ONBOARD_LED = getConfig(K_ONBOARD_LED)
CONFIG_PROXY = getConfig(K_PROXY)
CONFIG_WIFI_POWER = getConfig(K_WIFI_POWER)

# Name of the WLAN used by 'wlanConnect()' when the requested name is not in the configuration.
WLAN_FALLBACK = "q37"

# Sent, in this order, at the very beginning of 'handshake()'.
PROTOCOL_LABEL = "c37cc83e-079f-448a-9541-5c63ce00d960"
PROTOCOL_VERSION = "0"

# Connection status, given as first argument to the callbacks.
S_FAILURE = 0
S_SEARCHING = 1 # Searching for an available WLAN.
S_WLAN = 2 # Connecting to WLAN. There is a delay of 0.5 second between two calls.
S_UCUQ = 3 # Connecting to UCUq server.
S_SUCCESS = 4
S_DECONNECTION = 5 # Reported by 'main()' when 'serve()' raises an exception.

# Request
R_PING = 0  # Deprecated!
R_EXECUTE = 1 # Executes a script and, optionally, evaluates an expression (see 'serve()').

# Answer; must match in device.h: device::eAnswer.
A_RESULT = 0 # Followed by the JSON-encoded value of the evaluated expression.
A_SENSOR = 1 # Not used in this file.
A_ERROR = 2 # Followed by the text of the exception raised by the script or the expression.
A_PUZZLED = 3 # Answer to an unknown request.
A_DISCONNECTED = 4 # Used by 'main()' when 'serve()' fails.

# Indexes in 'proxy', the value returned by 'asyncio.open_connection()' (see 'init()').
P_READER = 0
P_WRITER = 1

wifi = None # Station interface; set by 'getMacAddress()' and 'wlanConnect()'.
buffer = bytes() # Data received from the proxy and not yet consumed by 'recv()'.


def getMacAddress():
  """Return the MAC address of the station interface as an hexadecimal string.

  When the module-level 'wifi' is not yet set, the station interface is
  created, stored in 'wifi' and activated first.
  """
  global wifi

  if not wifi:
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True) # Otherwise the MAC address is NUL.

  rawMac = network.WLAN(network.STA_IF).config('mac')

  return binascii.hexlify(rawMac).decode()


# NOTA: also used in the script for 'getInfos()'… 
def getIdentificationId(identification):
  """Return the device id described by the second item of 'identification'.

  identification[1] is either the id itself (a string) or a container
  indexed by MAC address (as returned by 'getMacAddress()') giving the id.

  Raises Exception when there is no entry for the MAC address of this device.
  """
  ids = identification[1]

  if isinstance(ids, str):
    return ids

  mac = getMacAddress()

  if mac not in ids:
    raise Exception("Unable to get an id for this device.")

  return ids[mac]

    
# Known WLANs from the configuration, indexed by name; for each entry, item 0 is the SSID
# and item 1 the key (see 'wlanGetKnownStation()' and 'wlanConnect()').
WLANS = CONFIG["WLAN"]

def wlanIsShortcut(wlan):
  """Tell whether 'wlan' is a shortcut (a string) rather than an explicit WLAN.

  Returns True for a string, False for a list or tuple of exactly 2 items.
  Raises TypeError for anything else.
  """
  if isinstance(wlan, str):
    return True

  if not isinstance(wlan, (list, tuple)) or len(wlan) != 2:
    raise TypeError("'wlan' parameter can only be a string (shortcut), a list or a tuple of 2 strings (SSID and key)")

  return False


def wlanGetKnownStation(wifi, callback):
  """Scan until a WLAN defined in 'WLANS' is in range and return its name.

  'wifi' is the station interface used for scanning. Before each scan,
  'callback(S_SEARCHING, tries)' is called; when it returns a false value,
  'callback(S_FAILURE, 0)' is called and the program is terminated with 'exit()'.
  There is a delay of 0.5 second between two scans.

  The first scanned station matching an entry of 'WLANS' wins; the returned
  value is the key of this entry in 'WLANS'.
  """
  # SSIDs are compared as raw bytes (as reported by the scan), so a station
  # whose SSID is not valid UTF-8 can not abort the search with a decoding error.
  candidates = [(name, WLANS[name][0].encode("utf-8")) for name in WLANS]

  wifi.active(True)

  tries = 0

  while True:
    if not callback(S_SEARCHING, tries):
      callback(S_FAILURE, 0)
      exit()

    for station in wifi.scan():
      for name, ssid in candidates:
        if station[0] == ssid:
          return name # No need to wait any longer once a known WLAN is found.

    tries += 1
    time.sleep(0.5)


def wlanDisconnect():
  """Disconnect the station interface and wait until its status is 'network.STAT_IDLE'."""
  station = network.WLAN(network.STA_IF)
  station.disconnect()

  # Active wait (no delay between two checks) until the interface is idle.
  while True:
    if station.status() == network.STAT_IDLE:
      break


def wlanConnect(wlan, callback):
  """Connect the station interface to a WLAN, unless it is already connected.

  'wlan' is either:
  - "": the first known WLAN found by scanning is used (see 'wlanGetKnownStation()'),
  - a name: the corresponding entry of 'WLANS' is used, or the 'WLAN_FALLBACK'
    entry when there is no entry of this name,
  - a list or tuple of 2 items: the SSID and the key.

  While waiting for the connection, 'callback(S_WLAN, tries)' is called every
  0.5 second; the attempt is abandoned as soon as it returns a false value.

  Returns True when connected (or already connected), False when abandoned.
  Raises TypeError when 'wlan' has none of the above forms.
  """
  global wifi

  wifi = network.WLAN(network.STA_IF)

  if wifi.isconnected():
    return True

  if wlanIsShortcut(wlan):
    if wlan == "":
      wlan = WLANS[wlanGetKnownStation(wifi, callback)]
    else:
      try:
        wlan = WLANS[wlan]
      except KeyError:
        wlan = WLANS[WLAN_FALLBACK]

  wifi.active(True)

  # An ESP32-C3 supermini does not connect to WiFi with default WiFi power when plugged in a breadboard.
  # See https://www.reddit.com/r/arduino/comments/1dl6atc/esp32c3_boards_cant_connect_to_wifi_when_plugged/
  # RPi Pico does not support a float.
  wifiPowerParams = getParams(CONFIG_WIFI_POWER, getIdentificationId(CONFIG_IDENTIFICATION), DEFAULT_WIFI_POWER)

  if wifiPowerParams[0]:
    # 'txpower' takes the power value itself, not the list of parameters.
    wifi.config(txpower=wifiPowerParams[0])

  wifi.connect(wlan[0], wlan[1])

  tries = 0

  while not wifi.isconnected():
    time.sleep(0.5)
    if not callback(S_WLAN, tries):
      return False
    tries += 1

  return True


async def recv(size):
  """Return exactly 'size' bytes received from the proxy.

  Data is read from the proxy reader by chunks of at most 4096 bytes and
  accumulated in the module-level 'buffer'; what exceeds 'size' stays in
  'buffer' for the next calls.

  Raises EOFError when the reader returns no data (end of stream) before
  'size' bytes are available, instead of looping forever on empty reads.
  """
  global buffer

  while len(buffer) < size:
    # With ESP8266 and SSL, returns always an empty buffer.
    chunk = await proxy[P_READER].read(4096)

    if not chunk:
      # An empty read means that no more data will ever come; reporting it
      # allows the caller to handle the disconnection.
      raise EOFError("Connection closed by peer.")

    buffer += chunk

  result = buffer[:size]

  buffer = buffer[size:]

  return result


async def send(data):
  """Write 'data' to the proxy by chunks of at most 4096 bytes.

  The writer is drained after each chunk. Nothing is written when 'data' is empty.
  """
  chunkSize = 4096

  for offset in range(0, len(data), chunkSize):
    proxy[P_WRITER].write(data[offset:offset + chunkSize])
    await proxy[P_WRITER].drain()


async def writeUInt(value):
  """Send the unsigned integer 'value' as groups of 7 bits, most significant group first.

  Every byte but the last one has its high bit (0x80) set.
  """
  # Groups are collected from the least to the most significant one…
  groups = [value & 0x7f]
  value >>= 7

  while value:
    groups.append((value & 0x7f) | 0x80)
    value >>= 7

  # … and sent in the reverse order.
  groups.reverse()

  await send(bytes(groups))


async def writeString(string):
  """Send 'string': the size in bytes of its UTF-8 encoding (see 'writeUInt()'), then the encoded bytes."""
  encoded = bytes(string, "utf-8")

  await writeUInt(len(encoded))
  await send(encoded)


def blockingWriteString(string):
  """Synchronous version of 'writeString()': run it to completion and return its result."""
  pending = writeString(string)

  return asyncio.run(pending)


async def readByte():
  """Return the next byte received from the proxy, as an integer."""
  received = await recv(1)

  return received[0]


async def readUInt():
  """Read an unsigned integer made of groups of 7 bits, most significant group first.

  Bytes are consumed until one with its high bit (0x80) cleared is read.
  """
  value = 0

  while True:
    byte = await readByte()
    value = (value << 7) | (byte & 0x7f)

    if not byte & 0x80:
      return value


async def readString():
  """Read a string: its size in bytes (see 'readUInt()'), then as many UTF-8 encoded bytes."""
  size = await readUInt()

  if not size:
    return ""

  raw = await recv(size)

  return raw.decode("utf-8")
  

def exit(message=None):
  """Terminate the program with the exit value -1.

  'message', when given and not empty, is printed on the standard error output first.
  """
  if message:
    print(message, file=sys.stderr)

  sys.exit(-1)


def init(callback):
  """Open the connection to the proxy and store it in the module-level 'proxy'.

  The connection parameters come from the proxy entry of the configuration
  for this device, completed with the '_default' ones and 'DEFAULT_PROXY'.
  'callback(S_UCUQ, 0)' is called before the connection attempt.

  Returns True on success, False when the connection can not be established.
  """
  global proxy

  proxyParam = getParams(CONFIG_PROXY, getIdentificationId(CONFIG_IDENTIFICATION), getParams(CONFIG_PROXY, "_default", DEFAULT_PROXY))

  callback(S_UCUQ, 0)

  try:
    proxy = asyncio.run(asyncio.open_connection(proxyParam[0], proxyParam[1], proxyParam[2]))
  except Exception:
    # Only real errors are reported as a failure; KeyboardInterrupt and
    # SystemExit are no more swallowed.
    return False

  return True
  

def getDeviceLabel():
  """Return the 'sysname' field of 'uos.uname()'."""
  systemInfo = uos.uname()

  return systemInfo.sysname


def blockingReadString():
  """Synchronous version of 'readString()': run it to completion and return the string read."""
  pending = readString()

  return asyncio.run(pending)


def handshake():
  """Perform the protocol handshake with the proxy.

  Sends the protocol label, the protocol version, "Device" and the device
  label, then reads two strings: an error message, with which the program
  is terminated when not empty, and a notification, printed when not empty.
  """
  for item in (PROTOCOL_LABEL, PROTOCOL_VERSION, "Device"):
    blockingWriteString(item)

  blockingWriteString(getDeviceLabel())

  error = blockingReadString()
  if error:
    sys.exit(error)

  notification = blockingReadString()
  if notification:
    print(notification)


def ignition():
  """Identify this device to the proxy.

  Sends the first item of the identification entry of the configuration and
  the device id, then reads an error message, with which the program is
  terminated when not empty.
  """
  head = CONFIG_IDENTIFICATION[0]
  blockingWriteString(head)

  deviceId = getIdentificationId(CONFIG_IDENTIFICATION)
  blockingWriteString(deviceId)

  error = blockingReadString()
  if error:
    sys.exit(error)


async def serve():
  """Handle the requests received from the proxy, forever.

  For a 'R_EXECUTE' request, a script and an expression are read; the script
  is executed, then the expression, when not empty, is evaluated:
  - on success, 'A_RESULT' followed by the JSON-encoded value of the
    expression is sent, but only when there is an expression,
  - when an exception is raised, 'A_ERROR' followed by the text of the
    exception is sent.

  For any other request, 'A_PUZZLED' followed by an empty string is sent.
  """
  while True:
    request = await readUInt()

    if request == R_EXECUTE:
      script = await readString()
      expression = await readString()
      returned = ""
      try:
        # SECURITY: the script and the expression come from the network and are
        # executed as is; this is the purpose of this program, but it means
        # that whoever can send requests has full control over the device.
        exec(script)
        if expression:
          returned = json.dumps(eval(expression))
      except Exception as exception:
        # The text of the exception is both printed locally and sent back.
        with io.StringIO() as stream:
          sys.print_exception(exception, stream)
          error = stream.getvalue()
          print("Error: ", error)
          await writeUInt(A_ERROR)
          await writeString(error)
      else:
        # Nothing is sent back for a script without expression.
        if expression:
          await writeUInt(A_RESULT)
          await writeString(returned)
    else:
      await writeUInt(A_PUZZLED)
      await writeString("")  # For future use


def defaultCallback(status, tries):
  """Report the connection progress on the standard output.

  On the first call for a status ('tries' is 0), the current line is erased
  (except for 'S_FAILURE') and the message of the status, if any, is printed;
  on the following calls, a dot is printed.

  Returns True as long as 'tries' does not exceed 200, False otherwise.
  """
  if tries != 0:
    print(".", end="")
  else:
    if status != S_FAILURE:
      # Erase line and go to the beginning of the line.
      print("\r                                                                                \r", end="")

    messages = {
      S_SEARCHING: "Searching for available WLAN...",
      S_WLAN: "Connecting to WLAN...",
      S_UCUQ: "Connecting to UCUq server...",
      S_SUCCESS: "",
    }

    if status in messages:
      print(messages[status], end="")

  if status == S_FAILURE:
    print("FAILURE!!!")
  elif status == S_DECONNECTION:
    print("Deconnection!")

  return tries <= 200


def handleLed(pin, state, onValue):
  """Set the output 'pin' to 1 when 'state' equals 'onValue', to 0 otherwise."""
  led = Pin(pin, Pin.OUT)
  level = 1 if state == onValue else 0

  led.value(level)


def ledBlink(pin, count, onValue):
  """Blink the LED on 'pin' 'count' times: 0.1 second with state True, then 0.1 second with state False."""
  for _ in range(count):
    for state in (True, False):
      handleLed(pin, state, onValue)
      time.sleep(0.1)


def ledCallback(status, tries, pin, onValue):
  """Report the connection progress with the LED on 'pin', then with 'defaultCallback()'.

  Returns a false value when 'defaultCallback()' does, or when the status
  is 'S_UCUQ' and 'tries' exceeds 5; returns True otherwise.
  """
  if status == S_SEARCHING:
    ledBlink(pin, 1, onValue)
  elif status == S_SUCCESS:
    ledBlink(pin, 3, onValue)
  elif status == S_WLAN:
    # The state alternates at each try.
    handleLed(pin, tries % 2 == 0, onValue)
  elif status == S_UCUQ:
    handleLed(pin, False, onValue)
  elif status in (S_FAILURE, S_DECONNECTION):
    handleLed(pin, True, onValue)

  proceed = defaultCallback(status, tries)

  if not proceed:
    return proceed

  return status != S_UCUQ or tries <= 5


def completeParam(params, default):
  """Complete 'params' with the trailing items of 'default'.

  - None, or a value of an unhandled type: 'default' is returned as is,
  - an int, a str or a float: it replaces the first item of 'default',
  - a list or a tuple: its items replace the first items of 'default'.

  In the last two cases, the result is always a new list, whether 'params'
  and 'default' are lists or tuples.
  """
  if params is None:
    return default

  if isinstance(params, (int, str, float)):
    params = [params]
  elif not isinstance(params, (list, tuple)):
    return default

  # Both operands are converted, as a list and a tuple can not be concatenated.
  return list(params) + list(default[len(params):])


def getParams(paramSet, device, default):
  """Return the parameters of 'paramSet' applying to 'device', completed with 'default'.

  When 'paramSet' is a list or tuple containing only lists or tuples, each
  item is handled as (parameters, devices): the parameters of the first item
  whose devices contain 'device' are used, and 'default' is returned when
  there is no such item. Otherwise, 'paramSet' itself is the parameters.
  """
  perDevice = isinstance(paramSet, (list, tuple)) and all(isinstance(entry, (list, tuple)) for entry in paramSet)

  if not perDevice:
    return completeParam(paramSet, default)

  for entry in paramSet:
    if device in entry[1]:
      return completeParam(entry[0], default)

  return default


def getCallback():
  """Return the callback used to report the connection progress.

  When the on-board LED parameters for this device have a true first item
  (the pin), the callback drives the LED through 'ledCallback()'; otherwise
  'defaultCallback' is returned.
  """
  deviceId = getIdentificationId(CONFIG_IDENTIFICATION)
  fallback = getParams(CONFIG_ONBOARD_LED, "_default", DEFAULT_ONBOARD_LED)
  onBoardLed = getParams(CONFIG_ONBOARD_LED, deviceId, fallback)

  if not onBoardLed[0]:
    return defaultCallback

  def callback(status, tries):
    return ledCallback(status, tries, onBoardLed[0], onBoardLed[1])

  return callback


def main():
  """Connect to the WLAN and to the proxy, then serve the requests.

  When the connection to the proxy fails and 'WLAN' is "" (automatic WLAN
  selection), the WLAN is disconnected and both connections are tried once
  again. On any definitive failure, 'S_FAILURE' is reported through the
  callback and the program is terminated.

  When 'serve()' raises an exception, 'A_DISCONNECTED' is sent on a
  best-effort basis, 'S_DECONNECTION' is reported through the callback and
  the exception is raised again.
  """
  callback = getCallback()

  if not wlanConnect(WLAN, callback):
    callback(S_FAILURE, 0)
    exit()

  if not init(callback):
    if WLAN != "":
      callback(S_FAILURE, 0)
      exit()

    wlanDisconnect()

    if not wlanConnect(WLAN, callback):
      callback(S_FAILURE, 0)
      exit()

    if not init(callback):
      callback(S_FAILURE, 0)
      exit()

  callback(S_SUCCESS, 0)

  handshake()

  ignition()

  try:
    asyncio.run(serve())
  except Exception as exception:
    try:
      # 'writeUInt()' is a coroutine: it has to be run to actually send something.
      asyncio.run(writeUInt(A_DISCONNECTED))
    except Exception:
      # Best effort only: the connection is most probably already broken.
      pass

    callback(S_DECONNECTION, 0)
    raise exception


# Launched as soon as this file is loaded: connects to the WLAN and the proxy, then serves the requests.
main()
# (Stray text found after the end of the script: "Loading", "xiao-esp32-c3".)