import network, time, ubinascii, ujson, machine, os, urequests, utime
from umqtt.simple import MQTTClient
from machine import Pin, SPI, PWM
from xglcd_font import XglcdFont
from ili9341 import Display, color565
# ---- Device identity ----
# Serial (use board id). Hard-coded here; on hardware it can be derived with
# ubinascii.hexlify(machine.unique_id()).decode().upper()
DEVICE_SERIAL = "DEV/PING/001/00001"
FIRMWARE_VERSION = "v1.0.0.0"

# ---- Network / broker settings ----
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
BROKER = "45.33.76.210"
BROKER_PORT = 1386
# NOTE(review): broker credentials are committed in source; consider moving
# them into the provisioned config file.
MQTT_USER = "ping_africa"
MQTT_PASSWORD = "bLeRABsOMpTy"
MQTT_CLIENT = None  # set by mqtt_connect()
# SERVER_URL = "https://api.getping.africa/v1"
SERVER_URL = "http://c7dd5c7d3a33.ngrok-free.app/v1/"

# ---- Files on the local filesystem ----
CFG_FILE = "config.json"
RECORDS_STORE = "transactions.json"
LOGO_FILE = "logo.jpg"
MAX_RECORDS_STORE_CAPACITY = 50
CURRENT_RECORD_INDEX = 0

# ---- Connectivity state ----
INTERNET_CONNECTION = None  # WLAN handle once Wi-Fi has been brought up
IS_INTERNET_CONNECTED = False

# ---- Display colours / geometry ----
COLOR_BLACK = color565(0, 0, 0)
COLOR_WHITE = color565(255, 255, 255)
TEXT_COLOR = color565(255, 255, 255)
TEXT_BG_COLOR = color565(64, 0, 255)
# NOTE: "WIDHT" is a typo, but the name is referenced elsewhere in the file,
# so it is kept as-is.
TFT_WIDHT = 240
TFT_HEIGHT = 135

# ---- Runtime state ----
LOADED_CFG = None
DISPLAY_POWER_OFF_TIME = None

# Intervals are expressed in milliseconds because they are combined with
# utime.ticks_ms(). The previous values (3600000000 / 7200000000) were
# microsecond-sized and produced deadlines beyond the tick counter's period,
# so the heartbeat and the periodic OTA check could never fire.
HEARTBEAT_INTERVAL = 60 * 60 * 1000  # 1 hour
HEARTBEAT_BROADCAST_TIME = utime.ticks_add(utime.ticks_ms(), HEARTBEAT_INTERVAL)
OTA_INTERVAL = 2 * 60 * 60 * 1000  # 2 hours
FIRMWARE_CHECK_TIME = utime.ticks_add(utime.ticks_ms(), OTA_INTERVAL)

# ---- UI modes / menu ----
DEVICE_MODE = "idle"
IDLE_MODE = "idle"
MENU_MODE = "menu"
NOTIFICATION_MODE = "notf"
DEVICE_MENU = ["History", "Balance", "Account", "Exit"]
SELECTED_MENU_ITEM = None
SELECTED_MENU_ITEM_SHOWING = False
RECORDS_IN_MEMORY = []
BUZZER_TIMEOUT = None

# Peripherals
btnPower = Pin(19, Pin.IN, Pin.PULL_UP)
btnNext = Pin(20, Pin.IN, Pin.PULL_UP)
btnPrev = Pin(21, Pin.IN, Pin.PULL_UP)
# The buzzer is driven by the MCU, so its pin is configured as an output.
bzr = PWM(Pin(47, Pin.OUT))
bzr.duty(0)  # silent until a jingle is played
# led = Pin(2, Pin.OUT)
displayFont = XglcdFont("Unispace12x24.c", 12, 24)
spi = SPI(1, baudrate=40000000, sck=Pin(18), mosi=Pin(8))
tftDisplay = Display(
    spi,
    cs=Pin(15),
    dc=Pin(5),
    rst=Pin(4),
    width=240,
    height=320,
    rotation=180,
    mirror=False,
)
# =========== Helper Functions ============
def load_config():
    """Read and parse CFG_FILE.

    Returns the decoded JSON value, or None when the file cannot be opened
    or parsed.
    """
    try:
        with open(CFG_FILE, "r") as cfg_file:
            return ujson.load(cfg_file)
    except Exception:
        return None
def save_config(data):
    """Serialise *data* as JSON into CFG_FILE via a temporary file.

    The JSON is written to ``CFG_FILE + ".tmp"`` first and then renamed over
    the real file. Returns True on success, False on any failure (the error
    is printed).
    """
    tmp_path = CFG_FILE + ".tmp"
    try:
        with open(tmp_path, "w") as out:
            out.write(ujson.dumps(data))
        try:
            os.remove(CFG_FILE)
        except OSError:
            # No previous config to replace (e.g. first save).
            pass
        os.rename(tmp_path, CFG_FILE)
        return True
    except Exception as e:
        print("save_config failed", CFG_FILE, e)
        # Do not leave a half-written temporary file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
def set_device_mode(mode):
    """Store *mode* in the module-level DEVICE_MODE."""
    global DEVICE_MODE
    DEVICE_MODE = mode
def get_logo():
    """Return the raw bytes of LOGO_FILE, or False if it cannot be read."""
    try:
        with open(LOGO_FILE, "rb") as logo_file:
            return logo_file.read()
    except Exception:
        return False
def get_records():
    """Return the JSON content of RECORDS_STORE, or [] when unavailable."""
    try:
        with open(RECORDS_STORE, "r") as store:
            records = ujson.load(store)
    except Exception:
        return []
    return records
def safe_load_json(fn):
    """Parse the JSON file *fn*.

    Returns the decoded value, or False when the file cannot be opened or
    does not contain valid JSON.
    """
    try:
        with open(fn, "r") as source:
            parsed = ujson.load(source)
    except Exception:
        return False
    return parsed
def safe_save_json(fn, obj):
    """Serialise *obj* as JSON into the file *fn* via a temporary file.

    The JSON is written to ``fn + ".tmp"`` and then renamed to *fn*.

    Returns:
        True on success, False on any failure (the error is printed and the
        temporary file, if any, is removed).
    """
    tmp_path = fn + ".tmp"
    try:
        with open(tmp_path, "w") as out:
            out.write(ujson.dumps(obj))
        try:
            os.remove(fn)
        except OSError:
            # Nothing to replace yet.
            pass
        os.rename(tmp_path, fn)
        return True
    except Exception as e:
        print("save_json failed", fn, e)
        # Do not leave a half-written temporary file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
def displayOn():
    """Clear the TFT and then switch the panel on."""
    panel = tftDisplay
    panel.clear()
    panel.display_on()
def displayOff():
    """Blank and switch off the TFT, reset the menu state and go idle."""
    global SELECTED_MENU_ITEM, SELECTED_MENU_ITEM_SHOWING
    tftDisplay.clear()
    tftDisplay.display_off()
    # Next time the menu opens it starts on the first entry, with no
    # sub-screen showing.
    SELECTED_MENU_ITEM, SELECTED_MENU_ITEM_SHOWING = 0, False
    set_device_mode(mode=IDLE_MODE)
# Note frequencies (Hz)
# Maps a note name (letter + octave number) to the frequency passed to the
# buzzer PWM by play_melody(). Notes not listed here fall back to 440 Hz there.
MELODIES = {
    "C4": 262,
    "D4": 294,
    "E4": 330,
    "F4": 349,
    "G4": 392,
    "A4": 440,
    "B4": 494,
    "C5": 523,
    "D5": 587,
    "E5": 659,
    "F5": 698,
    "G5": 784,
    "A5": 880,
    "B5": 988,
    "C6": 1047,
}
def play_melody(note, duration=0.15, pause=0.05):
    """Sound a single note on the buzzer, then stay silent for *pause*.

    Args:
        note: key into MELODIES; unknown names play 440 Hz.
        duration: seconds the tone is held.
        pause: seconds of silence after the tone.
    """
    bzr.freq(MELODIES.get(note, 440))
    # NOTE(review): an earlier comment called this "50% duty cycle"; confirm
    # the duty scale of the target port before relying on that.
    bzr.duty(70)
    time.sleep(duration)
    bzr.duty(0)  # tone off
    time.sleep(pause)
def jingle_bootup():
    """Play the start-up jingle: C4, E4, G4, C5, each held for 0.2 s."""
    for pitch in ("C4", "E4", "G4", "C5"):
        play_melody(pitch, 0.2)
def jingle_alert():
    """Play the six-note alert jingle used for incoming transactions."""
    sequence = (
        ("C5", 0.12),
        ("E5", 0.12),
        ("G5", 0.15),
        ("C6", 0.2),
        ("A5", 0.15),
        ("G5", 0.25),
    )
    for pitch, seconds in sequence:
        play_melody(pitch, seconds)
def jingle_cheerful():
    """Cheerful rising jingle (default new record)."""
    pitches = ("C5", "E5", "G5", "C6", "A5", "G5")
    lengths = (0.12, 0.12, 0.15, 0.2, 0.15, 0.25)
    for pitch, seconds in zip(pitches, lengths):
        play_melody(pitch, seconds)
def jingle_urgent():
    """Urgent alert jingle (faster, sharp): short notes with a 0.02 s gap."""
    sequence = (
        ("C6", 0.1),
        ("C6", 0.1),
        ("C6", 0.1),
        ("A5", 0.1),
        ("C6", 0.15),
        ("A5", 0.15),
    )
    for pitch, seconds in sequence:
        play_melody(pitch, seconds, pause=0.02)
def jingle_calm():
    """Soft downward jingle (acknowledgement): E5, D5, C5 with 0.1 s gaps."""
    sequence = (("E5", 0.2), ("D5", 0.2), ("C5", 0.3))
    for pitch, seconds in sequence:
        play_melody(pitch, seconds, pause=0.1)
# ======== Operational Functions ==========
def connect_wifi():
    """Bring up the station interface and wait up to ~10 s for a connection.

    Side effects:
        INTERNET_CONNECTION is always set to the WLAN handle, so callers can
        query it even after a failed attempt. IS_INTERNET_CONNECTED reflects
        the outcome of this call (it is reset to False on failure instead of
        keeping a stale True).

    Returns:
        True if the interface reports being connected, otherwise False.
    """
    global INTERNET_CONNECTION
    global IS_INTERNET_CONNECTED
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    INTERNET_CONNECTION = wlan
    if not wlan.isconnected():
        print("Connecting to Wi-Fi...")
        wlan.connect(WIFI_SSID, WIFI_PASS)
        t0 = time.time()
        while not wlan.isconnected():
            time.sleep(0.5)
            if time.time() - t0 > 10:
                break
    connected = wlan.isconnected()
    IS_INTERNET_CONNECTED = connected
    if connected:
        print("Wi-Fi config:", wlan.ifconfig())
    return connected
def mqtt_connect():
    """Create the MQTT client, connect and subscribe to the configured topics.

    The topics are read from LOADED_CFG["mqtt"]["sub"]. On any error the
    module-level MQTT_CLIENT is reset to None.

    Returns:
        True when connected and subscribed, False otherwise.
    """
    global MQTT_CLIENT
    MQTT_CLIENT = MQTTClient(
        f"sim-{DEVICE_SERIAL}",
        BROKER,
        port=BROKER_PORT,
        user=MQTT_USER,
        password=MQTT_PASSWORD,
    )
    MQTT_CLIENT.set_callback(mqtt_callback)
    try:
        MQTT_CLIENT.connect()
        subscriptions = LOADED_CFG.get("mqtt", {}).get("sub", [])
        for topic in subscriptions:
            MQTT_CLIENT.subscribe(topic)
            print("Connected:", topic)
    except Exception as e:
        print("MQTT connect error:", e)
        MQTT_CLIENT = None
        return False
    return True
def mqtt_callback(topic, msg):
    """Dispatch an incoming MQTT message according to its topic.

    The third "/"-separated segment of the topic selects the handler:
    "trx" -> handleIncomingTrxs, "otp" -> handleIncomingOtp,
    "ota" -> handleFirmwareUpdate. Anything else is ignored.

    Args:
        topic: topic name as bytes.
        msg: message payload as bytes.
    """
    topic_text = topic.decode()
    payload = msg.decode()
    print("MQTT MSG:", topic_text, payload)
    parts = topic_text.split("/")
    # The handler key lives at index 2, so at least three segments are needed
    # (the previous `<= 1` check let two-segment topics raise IndexError).
    if len(parts) < 3:
        return
    kind = parts[2]
    if kind == "trx":
        handleIncomingTrxs(payload)
    elif kind == "otp":
        handleIncomingOtp(payload)
    elif kind == "ota":
        handleFirmwareUpdate(payload)
def check_internet_connection():
    """Probe for real internet access with a small HTTP request.

    Requests http://clients3.google.com/generate_204, an endpoint that
    normally answers 204 No Content.

    Returns:
        True for a 200/202/204 answer, False for any other status or error.
    """
    response = None
    try:
        response = urequests.get("http://clients3.google.com/generate_204", timeout=5)
        return response.status_code in [200, 202, 204]
    except Exception:
        # Any failure (DNS, socket, timeout) is reported as "no internet".
        return False
    finally:
        # Release the underlying socket; it is not freed otherwise.
        if response is not None:
            try:
                response.close()
            except Exception:
                pass
def get_device_linking():
    """Ask the server whether this device has been linked to an account.

    Sends the serial in the "x-serial-id" header to
    ``{SERVER_URL}device/fetch.json``.

    Returns:
        The "config" object from the response when the server reports
        status and linked as truthy, otherwise False (including on HTTP or
        network errors and when no config is supplied).
    """
    print("==== Provision lookup ====")
    headers = {"Content-Type": "application/json", "x-serial-id": DEVICE_SERIAL}
    endpoint = f"{SERVER_URL}device/fetch.json"
    res = None
    try:
        res = urequests.get(endpoint, headers=headers)
        if res.status_code != 200:
            return False
        body = res.json()
        if body.get("status", False) is False:
            return False
        info = body.get("data") or {}
        if info.get("linked", False) is False:
            return False
        # Never hand back None: the caller only treats False as "not linked"
        # and would otherwise persist an empty configuration.
        return info.get("config") or False
    except Exception as e:
        print("fetch error:", e)
        return False
    finally:
        # Release the socket held by the response.
        if res is not None:
            try:
                res.close()
            except Exception:
                pass
def save_bank_logo(logoUrl):
    """Download the image at *logoUrl* and store it as LOGO_FILE.

    Failures are printed and otherwise ignored; nothing is written unless
    the server answers with HTTP 200.
    """
    res = None
    try:
        res = urequests.get(logoUrl, headers={"ngrok-skip-browser-warning": "69420"})
        if res.status_code != 200:
            # Do not store an error page as the logo.
            print("ERROR downloading bank logo")
            return
        img_data = res.content
        with open(LOGO_FILE, "wb") as handler:
            handler.write(img_data)
    except Exception as e:
        print("ERROR downloading bank logo")
        print(e)
    finally:
        # Release the socket held by the response.
        if res is not None:
            try:
                res.close()
            except Exception:
                pass
def qr_make(data):
    """Encode *data* as a QR code and return its module matrix.

    Uses ``uqrcode.QRCode`` (imported lazily). Returns None if the import
    or the encoding fails; the error is printed.
    """
    try:
        from uqrcode import QRCode as MicroQRCode

        encoder = MicroQRCode()
        encoder.add_data(data)
        return encoder.get_matrix()
    except Exception as e:
        print("show_qr err:", e)
    return None
def showQR(url):
    """Render *url* as a QR code under a title on the TFT.

    The title is "Device Setup" while no configuration is loaded and
    "Device Portal" otherwise. Each QR module is drawn as a 5x5 px square,
    black for set modules and white for clear ones, horizontally centred
    using TFT_WIDHT. Errors are printed, never raised.
    """
    if not tftDisplay:
        print("QR:", url)
        return
    try:
        tftDisplay.clear(TEXT_BG_COLOR)
        if not LOADED_CFG:
            tftDisplay.draw_text(
                45, 30, "Device Setup", displayFont, COLOR_WHITE, TEXT_BG_COLOR
            )
        else:
            tftDisplay.draw_text(
                43, 30, "Device Portal", displayFont, COLOR_WHITE, TEXT_BG_COLOR
            )
        matrix = qr_make(url)
        if not matrix:
            # qr_make() returns None on failure; stop instead of indexing it.
            print("show_qr err:", "no QR matrix available")
            return
        scale = 5
        px = (TFT_WIDHT - len(matrix[0]) * scale) // 2
        py = 65
        for y, row in enumerate(matrix):
            for x, module in enumerate(row):
                tftDisplay.fill_rectangle(
                    px + x * scale,
                    py + y * scale,
                    scale,
                    scale,
                    COLOR_BLACK if module else COLOR_WHITE,
                )
    except Exception as e:
        print("show_qr err:", e)
def showLogo():
    """Show the bank logo, downloading it first if it is not stored yet.

    The download URL is read from LOADED_CFG["bank"]["logo"]. When no logo
    can be obtained, the text "AleroCube" is drawn instead.
    """
    logo = get_logo()
    if not logo:
        # A missing or incomplete config must not crash the boot sequence.
        bank = (LOADED_CFG or {}).get("bank") or {}
        logo_url = bank.get("logo")
        if logo_url:
            save_bank_logo(logoUrl=logo_url)
            logo = get_logo()
    tftDisplay.clear(TEXT_BG_COLOR)
    if logo:
        tftDisplay.draw_image(path=LOGO_FILE, x=45, y=80, w=150, h=150)
    else:
        tftDisplay.draw_text(70, 155, "AleroCube", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
# def show_record(index):
# try:
# if tftDisplay.DISPLAY_OFF:
# displayOn()
# tftDisplay.clear(TEXT_BG_COLOR)
# time.sleep_ms(1000)
# records = safe_load_json(RECORDS_STORE)
# if len(records) <= 0:
# tftDisplay.draw_text(4, 36, "No records found.", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
# return
# record = records[index]
# if record is None:
# tftDisplay.draw_text(4, 36, "Record not found.", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
# return
# tftDisplay.draw_text(20, 36, "== Credit Alert ==", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
# # Line1 amount
# amount = str(record.get("amount", ""))
# tftDisplay.draw_text(4, 90, amount, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
# # Line2 name (scroll if long)
# name = record.get("name", "")
# tftDisplay.draw_text(4, 126, name, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
# # Line3 datetime (static)
# tTime = str(record.get("time", ""))
# tftDisplay.draw_text(4, 162, tTime, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
# global DISPLAY_POWER_OFF_TIME
# DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 60000
# except Exception as e:
# print("Display draw failed:", e)
def show_record(record):
    """Draw a "Credit Alert" screen for *record* and arm the display timeout.

    Reads the "amount", "name" and "time" keys of *record* (missing keys
    render as empty text) and schedules the display to power off 60 s from
    now. Drawing errors are printed, not raised.
    """
    global DISPLAY_POWER_OFF_TIME
    try:
        # NOTE(review): DISPLAY_OFF is read as a state flag here; confirm the
        # display driver exposes it that way.
        if tftDisplay.DISPLAY_OFF:
            displayOn()
        tftDisplay.clear(TEXT_BG_COLOR)
        time.sleep_ms(1000)
        tftDisplay.draw_text(
            20, 36, "== Credit Alert ==", displayFont, TEXT_COLOR, TEXT_BG_COLOR
        )
        # One (row, text) pair per line: amount, payer name, timestamp.
        rows = (
            (90, str(record.get("amount", ""))),
            (126, record.get("name", "")),
            (162, str(record.get("time", ""))),
        )
        for y, text in rows:
            tftDisplay.draw_text(4, y, text, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
        DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 60000
    except Exception as e:
        print("Display draw failed:", e)
def fetch_records_online():
    """Download the transaction history into RECORDS_IN_MEMORY.

    Requests ``{server url}device/transactions`` with the device serial in
    the "x-serial-id" header.

    Returns:
        True when the server answered 200 with a truthy status, otherwise
        False (including network errors, which are printed).
    """
    global RECORDS_IN_MEMORY
    res = None
    try:
        headers = {"x-serial-id": DEVICE_SERIAL}
        base_url = LOADED_CFG.get("server").get("url")
        url = f"{base_url}device/transactions"
        res = urequests.get(url, headers=headers)
        if res.status_code != 200:
            return False
        data = res.json()
        if data.get("status", False) is False:
            return False
        # Keep RECORDS_IN_MEMORY a list even if the field is absent or null,
        # so later len()/indexing cannot fail.
        RECORDS_IN_MEMORY = (data.get("data") or {}).get("transactions") or []
        return True
    except Exception as e:
        print("fetch error:", e)
        return False
    finally:
        # Release the socket held by the response.
        if res is not None:
            try:
                res.close()
            except Exception:
                pass
def show_record_v2():
    """Draw the history record selected by CURRENT_RECORD_INDEX.

    The index wraps around both ends of RECORDS_IN_MEMORY. Reads the
    "amount", "name" and "time" keys of the record and re-arms the display
    timeout to 60 s. Drawing errors are printed, not raised.
    """
    global CURRENT_RECORD_INDEX
    global DISPLAY_POWER_OFF_TIME
    try:
        total = len(RECORDS_IN_MEMORY)
        if total <= 0:
            CURRENT_RECORD_INDEX = 0
            tftDisplay.draw_text(
                4, 36, "No records available.", displayFont, TEXT_COLOR, TEXT_BG_COLOR
            )
            return
        # Wrap around. Valid indices are 0..total-1, so `>=` is required
        # (the previous `>` let index == total through and raised IndexError).
        if CURRENT_RECORD_INDEX < 0:
            CURRENT_RECORD_INDEX = total - 1
        elif CURRENT_RECORD_INDEX >= total:
            CURRENT_RECORD_INDEX = 0
        record = RECORDS_IN_MEMORY[CURRENT_RECORD_INDEX]
        if record is None:
            tftDisplay.draw_text(
                4, 36, "Record not found.", displayFont, TEXT_COLOR, TEXT_BG_COLOR
            )
            return
        index = CURRENT_RECORD_INDEX
        tftDisplay.clear(TEXT_BG_COLOR)
        tftDisplay.draw_text(
            4, 36, f"Record {(index + 1)} >>", displayFont, TEXT_COLOR, TEXT_BG_COLOR
        )
        # Line1 amount
        amount = str(record.get("amount", ""))
        tftDisplay.draw_text(4, 90, amount, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
        # Line2 name
        name = record.get("name", "")
        tftDisplay.draw_text(4, 126, name, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
        # Line3 datetime
        tTime = str(record.get("time", ""))
        tftDisplay.draw_text(4, 162, tTime, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
        DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 60000
    except Exception as e:
        print("Display draw failed:", e)
def handleIncomingTrxs(payload):
    """Handle a transaction notification received over MQTT.

    *payload* is a JSON string. The message is ignored unless it is
    addressed to this device and carries "reference", "amount", "name" and
    "time". Otherwise the alert jingle is played, the record is shown in
    notification mode and, after one second, the reference is acknowledged.
    """
    payload = ujson.loads(payload)
    # A missing "device" yields None, which never equals the serial.
    if payload.get("device") != DEVICE_SERIAL:
        return
    required = ("reference", "amount", "name", "time")
    for key in required:
        if payload.get(key, None) is None:
            return
    # Sound the buzzer
    jingle_alert()
    record = {key: payload.get(key, None) for key in required}
    set_device_mode(mode=NOTIFICATION_MODE)
    show_record(record=record)
    time.sleep_ms(1000)
    ack_trx(payload.get("reference", None))
def handleIncomingOtp(payload):
    """Handle a one-time-password notification received over MQTT.

    *payload* is a JSON string. The message is ignored unless it is
    addressed to this device and carries "reference" and "otp". Otherwise
    the urgent jingle is played, the code is shown for up to 60 s in
    notification mode and the reference is acknowledged.
    """
    global DISPLAY_POWER_OFF_TIME
    payload = ujson.loads(payload)
    if payload.get("device", None) is None:
        return
    if payload.get("device") != DEVICE_SERIAL:
        return
    if payload.get("reference", None) is None:
        return
    if payload.get("otp", None) is None:
        return
    # Sound the buzzer
    jingle_urgent()
    # Display OTP
    # NOTE(review): DISPLAY_OFF is read as a state flag here; confirm the
    # display driver exposes it that way.
    if tftDisplay.DISPLAY_OFF:
        displayOn()
    tftDisplay.clear(TEXT_BG_COLOR)
    time.sleep_ms(1000)
    set_device_mode(mode=NOTIFICATION_MODE)
    tftDisplay.draw_text(
        46, 36, "== OTP Code ==", displayFont, TEXT_COLOR, TEXT_BG_COLOR
    )
    # JSON may deliver the code as a number; draw_text needs text, and the
    # transaction screen converts its numeric fields the same way.
    otp = str(payload.get("otp", None))
    tftDisplay.draw_text(85, 90, otp, displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 60000
    time.sleep_ms(1000)
    ack_otp(reference=payload.get("reference", None))
def handleFirmwareUpdate(payload):
    """React to an "ota" MQTT message by running an OTA check.

    *payload* is currently not inspected; every message triggers
    runOTAUpdate().
    """
    runOTAUpdate()
def ack_trx(reference):
    """Publish a transaction acknowledgement for *reference*.

    The topic comes from LOADED_CFG["mqtt"]["pub"]["trx"]. Nothing is sent
    when there is no MQTT client or no topic configured; publish errors are
    printed instead of being silently dropped.
    """
    topic = LOADED_CFG.get("mqtt", {}).get("pub", {}).get("trx")
    if not MQTT_CLIENT or not topic:
        return
    try:
        MQTT_CLIENT.publish(topic, ujson.dumps({"reference": reference}))
    except Exception as e:
        print("[MQTT] trx ack failed:", e)
def ack_otp(reference):
    """Publish an OTP acknowledgement for *reference*.

    The topic comes from LOADED_CFG["mqtt"]["pub"]["otp"]. Nothing is sent
    when there is no MQTT client or no topic configured; publish errors are
    printed instead of being silently dropped.
    """
    topic = LOADED_CFG.get("mqtt", {}).get("pub", {}).get("otp")
    if not MQTT_CLIENT or not topic:
        return
    try:
        MQTT_CLIENT.publish(topic, ujson.dumps({"reference": reference}))
    except Exception as e:
        print("[MQTT] otp ack failed:", e)
def send_heartbeat():
    """Publish a heartbeat carrying the device serial.

    The topic comes from LOADED_CFG["mqtt"]["pub"]["pulse"]. Nothing is sent
    when there is no MQTT client or no topic configured; publish errors are
    printed instead of being silently dropped.
    """
    topic = LOADED_CFG.get("mqtt", {}).get("pub", {}).get("pulse")
    if not MQTT_CLIENT or not topic:
        return
    try:
        MQTT_CLIENT.publish(topic, ujson.dumps({"device": DEVICE_SERIAL}))
    except Exception as e:
        print("[MQTT] heartbeat failed:", e)
def _ota_discard_staged(paths):
    """Best-effort removal of temporary files written by an aborted update."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            pass


def runOTAUpdate():
    """Fetch and apply a firmware update, then reboot.

    Requests ``{server url}device/ota.json`` with the serial and current
    version as headers. Every file in the answer is first written to
    ``<name>.tmp``; only when all files were staged successfully are they
    renamed into place and the board reset. On any failure the staged files
    are removed and "update error" is shown.

    NOTE(review): downloaded files are applied without any integrity or
    signature check; consider verifying them before renaming.
    """
    res = None
    staged = []  # temporary files written so far
    try:
        cfg_version = LOADED_CFG.get("version", None)
        version = cfg_version if cfg_version is not None else FIRMWARE_VERSION
        headers = {"x-serial-id": DEVICE_SERIAL, "version": version}
        base_url = LOADED_CFG.get("server").get("url")
        url = f"{base_url}device/ota.json"
        res = urequests.get(url, headers=headers)
        if res.status_code != 200:
            print(f"[OTA] Status error: {res.status_code}")
            return
        data = res.json()
        if data.get("status", False) is False:
            print(data.get("data").get("message"))
            return
        files = data.get("data").get("files", [])
        if len(files) <= 0:
            print("[OTA] No files")
            return
        if tftDisplay.DISPLAY_OFF:
            tftDisplay.display_on()
        tftDisplay.clear(TEXT_BG_COLOR)
        tftDisplay.draw_text(
            65, 155, "updating...", displayFont, TEXT_COLOR, TEXT_BG_COLOR
        )
        error = False
        for f in files:
            name = f.get("name", None)
            contents = f.get("contents", None)
            if not name or not contents:
                print("[OTA] File seems invalid, aborting")
                error = True
                break
            # Write to temporary file
            tmpName = f"{name}.tmp"
            with open(tmpName, "w") as fh:
                fh.write(contents)
            staged.append(tmpName)
            # Basic sanity check: file size
            if os.stat(tmpName)[6] < 100:  # too small
                print("[OTA] File seems invalid, aborting")
                error = True
                break
        if error is True:
            _ota_discard_staged(staged)
            tftDisplay.clear(TEXT_BG_COLOR)
            tftDisplay.draw_text(
                65, 155, "update error", displayFont, TEXT_COLOR, TEXT_BG_COLOR
            )
            return
        print("[OTA] Download complete, verifying...")
        for f in files:
            name = f"{f.get('name')}"
            # Replace the live file with its staged copy.
            os.rename(f"{name}.tmp", name)
        print("[OTA] Update applied, rebooting...")
        tftDisplay.clear(TEXT_BG_COLOR)
        tftDisplay.draw_text(
            60, 155, "rebooting...", displayFont, TEXT_COLOR, TEXT_BG_COLOR
        )
        time.sleep(0.5)
        machine.reset()
    except Exception as e:
        print("[OTA] Fetch error:", e)
        # Files already renamed are gone from disk; removing them is a no-op.
        _ota_discard_staged(staged)
        tftDisplay.clear(TEXT_BG_COLOR)
        tftDisplay.draw_text(
            65, 155, "update error", displayFont, TEXT_COLOR, TEXT_BG_COLOR
        )
    finally:
        # Release the socket held by the response.
        if res is not None:
            try:
                res.close()
            except Exception:
                pass
# ============= Menu Navigation ============= #
def showMenu():
    """Draw the device menu with a "> " marker on the selected entry.

    Initialises SELECTED_MENU_ITEM to 0 if it is still None and turns the
    display on when needed. Entries are drawn 46 px apart starting at y=90.
    """
    global SELECTED_MENU_ITEM
    if SELECTED_MENU_ITEM is None:
        SELECTED_MENU_ITEM = 0
    if tftDisplay.DISPLAY_OFF:
        tftDisplay.display_on()
    tftDisplay.clear(TEXT_BG_COLOR)
    tftDisplay.draw_text(
        20, 36, "== Device Menu ==", displayFont, TEXT_COLOR, TEXT_BG_COLOR
    )
    first_row_y = 90
    row_pitch = 46
    for row, label in enumerate(DEVICE_MENU):
        text = f"> {label}" if SELECTED_MENU_ITEM == row else f" {label}"
        tftDisplay.draw_text(
            10,
            first_row_y + row * row_pitch,
            text,
            displayFont,
            TEXT_COLOR,
            TEXT_BG_COLOR,
        )
def showHistory():
    """Fetch the transaction history and show the current record.

    On a failed fetch an error is shown for one second and the menu is
    redrawn; on success SELECTED_MENU_ITEM_SHOWING is set to True.
    """
    global SELECTED_MENU_ITEM_SHOWING
    tftDisplay.clear(TEXT_BG_COLOR)
    tftDisplay.draw_text(10, 36, "fetching ...", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    if fetch_records_online() is False:
        tftDisplay.clear(TEXT_BG_COLOR)
        tftDisplay.draw_text(10, 36, "Fetch error!!", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
        time.sleep(1)
        showMenu()
    else:
        tftDisplay.clear(TEXT_BG_COLOR)
        show_record_v2()
        SELECTED_MENU_ITEM_SHOWING = True
def showBalance():
    """Fetch the account balance from the server and display it.

    Requests ``{server url}device/balance`` with the device serial in the
    "x-serial-id" header. When no balance can be obtained an error is shown
    for one second and the menu is redrawn; otherwise the balance screen is
    drawn and SELECTED_MENU_ITEM_SHOWING is set to True.
    """
    global SELECTED_MENU_ITEM_SHOWING
    tftDisplay.clear(TEXT_BG_COLOR)
    tftDisplay.draw_text(10, 36, "fetching ...", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    balance = None
    res = None
    try:
        headers = {"x-serial-id": DEVICE_SERIAL}
        base_url = LOADED_CFG.get("server").get("url")
        url = f"{base_url}device/balance"
        res = urequests.get(url, headers=headers)
        if res.status_code == 200:
            data = res.json()
            if data.get("status", False) is True:
                balance = (data.get("data") or {}).get("balance", None)
    except Exception as e:
        print("fetch error:", e)
    finally:
        # Release the socket held by the response.
        if res is not None:
            try:
                res.close()
            except Exception:
                pass
    if balance is None:
        tftDisplay.clear(TEXT_BG_COLOR)
        tftDisplay.draw_text(10, 36, "Fetch error!!", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
        time.sleep(1)
        showMenu()
        return
    tftDisplay.clear(TEXT_BG_COLOR)
    tftDisplay.draw_text(
        35, 36, "== Balance ==", displayFont, TEXT_COLOR, TEXT_BG_COLOR
    )
    # The server may send the balance as a number; draw_text needs text.
    tftDisplay.draw_text(10, 90, str(balance), displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    tftDisplay.draw_text(4, 162, "> Back", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    SELECTED_MENU_ITEM_SHOWING = True
def showAccountInfo():
    """Show bank title, account name and account number from LOADED_CFG.

    Sets SELECTED_MENU_ITEM_SHOWING to True once the screen is drawn.
    """
    global SELECTED_MENU_ITEM_SHOWING
    tftDisplay.clear(TEXT_BG_COLOR)
    tftDisplay.draw_text(
        15, 36, "== Account Info ==", displayFont, TEXT_COLOR, TEXT_BG_COLOR
    )
    # (row y, config section, field) for each line, drawn top to bottom.
    rows = (
        (90, "bank", "title"),
        (126, "account", "account_name"),
        (162, "account", "account_number"),
    )
    for y, section, field in rows:
        value = LOADED_CFG.get(section).get(field)
        tftDisplay.draw_text(4, y, f"{value}", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    tftDisplay.draw_text(4, 234, "> Back", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    SELECTED_MENU_ITEM_SHOWING = True
def showPortalQRCode():
    """Show the device portal URL as a QR code and mark a sub-screen as open."""
    global SELECTED_MENU_ITEM_SHOWING
    showQR(url=f"{SERVER_URL}d/{DEVICE_SERIAL}/init.json")
    SELECTED_MENU_ITEM_SHOWING = True
def menuGoBack():
    """Leave an open sub-screen and redraw the menu; no-op otherwise."""
    global SELECTED_MENU_ITEM_SHOWING
    if SELECTED_MENU_ITEM_SHOWING is not True:
        return
    SELECTED_MENU_ITEM_SHOWING = False
    showMenu()
# ========== Running Threads ============ #
def provisionLoop():
    """Idle on the provisioning screen until the power button is long-pressed.

    Polls the power button (active low) every 100 ms. Holding it for more
    than 3 s puts the board into deep sleep; releasing it earlier cancels
    the press. This function does not return.
    """
    while True:
        if btnPower.value() == 0:
            pressed_at = time.ticks_ms()
            # Only count time while the button is still held; previously a
            # short tap was enough to reach the 3 s mark and trigger sleep.
            while btnPower.value() == 0:
                if time.ticks_diff(time.ticks_ms(), pressed_at) > 3000:
                    machine.deepsleep()
                    break
                time.sleep_ms(20)
        time.sleep_ms(100)
def provisionThread():
    """Provision an unconfigured device.

    When online and the server reports the device as linked, the returned
    configuration is saved and the board resets. Otherwise the setup QR code
    is shown and provisionLoop() takes over.
    """
    url = f"{SERVER_URL}d/{DEVICE_SERIAL}/init.json"
    if IS_INTERNET_CONNECTED is False:
        serverResponse = False
    else:
        serverResponse = get_device_linking()
    if serverResponse is False:
        showQR(url=url)
        time.sleep(0.5)
        provisionLoop()
        return
    save_config(serverResponse)
    time.sleep(0.2)
    machine.reset()
def connectionThread(callback=None):
    """Bring up Wi-Fi, waiting up to 12 x 15 s for the link to come up.

    Args:
        callback: accepted for interface compatibility; not used.

    Side effects:
        INTERNET_CONNECTION is always set to the WLAN handle and
        IS_INTERNET_CONNECTED reflects the outcome of this call (it is reset
        to False on failure instead of keeping a stale True).
    """
    global INTERNET_CONNECTION
    global IS_INTERNET_CONNECTED
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    INTERNET_CONNECTION = wlan
    if not wlan.isconnected():
        print("Connecting to Wi-Fi...")
        wlan.connect(WIFI_SSID, WIFI_PASS)
        max_attempts = 12
        attempts = 0
        while not wlan.isconnected():
            if attempts >= max_attempts:
                break
            time.sleep_ms(15000)
            attempts += 1
            print("Retrying Wi-Fi connection...")
    connected = wlan.isconnected()
    IS_INTERNET_CONNECTED = connected
    if connected:
        print("Wi-Fi config:", wlan.ifconfig())
# ========= Peripherals Actions ============
def powerBtnOnClick():
    """Handle a press of the power/select button.

    idle         -> open the menu
    notification -> dismiss it (display off)
    menu         -> open the highlighted entry, or go back when a sub-screen
                    is already showing
    Afterwards the display timeout is re-armed to 30 s and a 0.2 s pause is
    taken.
    """
    global DEVICE_MODE
    global SELECTED_MENU_ITEM
    global BUZZER_TIMEOUT
    global DISPLAY_POWER_OFF_TIME
    print("PWR BTN")
    mode = DEVICE_MODE
    if mode == IDLE_MODE:
        set_device_mode(mode=MENU_MODE)
        showMenu()
    elif mode == NOTIFICATION_MODE:
        BUZZER_TIMEOUT = None
        displayOff()
    elif mode == MENU_MODE:
        if SELECTED_MENU_ITEM_SHOWING is not False:
            menuGoBack()
        else:
            choice = SELECTED_MENU_ITEM
            if choice == 0:
                showHistory()
            elif choice == 1:
                showBalance()
            elif choice == 2:
                showAccountInfo()
            elif choice == 3:
                displayOff()
    DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 30000
    time.sleep(0.2)
def nextBtnOnClick():
    """Handle a press of the "next" button.

    In menu mode it moves the highlight to the next entry (wrapping to the
    first), or, while the history is showing, advances to the next record.
    Outside menu mode it only clears BUZZER_TIMEOUT. Afterwards the display
    timeout is re-armed to 30 s and a 0.2 s pause is taken.
    """
    global DEVICE_MODE
    global SELECTED_MENU_ITEM
    global BUZZER_TIMEOUT
    global CURRENT_RECORD_INDEX
    global DISPLAY_POWER_OFF_TIME
    print("NXT BTN")
    if DEVICE_MODE == MENU_MODE:
        if SELECTED_MENU_ITEM_SHOWING is False:
            SELECTED_MENU_ITEM += 1
            # Valid indices are 0..len-1, so wrap as soon as len is reached
            # (the previous `>` allowed one step past the last entry).
            if SELECTED_MENU_ITEM >= len(DEVICE_MENU):
                SELECTED_MENU_ITEM = 0
            showMenu()
        else:
            if SELECTED_MENU_ITEM == 0:
                CURRENT_RECORD_INDEX += 1
                show_record_v2()
    else:
        BUZZER_TIMEOUT = None
    DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 30000
    time.sleep(0.2)
def prevBtnOnClick():
    """Handle a press of the "previous" button.

    In menu mode it moves the highlight to the previous entry (wrapping to
    the last); while the history is showing it steps back one record, and
    on any other sub-screen it returns to the menu. Outside menu mode it
    only clears BUZZER_TIMEOUT. Afterwards the display timeout is re-armed
    to 30 s and a 0.2 s pause is taken.
    """
    global DEVICE_MODE
    global SELECTED_MENU_ITEM
    global BUZZER_TIMEOUT
    global CURRENT_RECORD_INDEX
    global DISPLAY_POWER_OFF_TIME
    print("PREV BTN")
    if DEVICE_MODE != MENU_MODE:
        BUZZER_TIMEOUT = None
    elif SELECTED_MENU_ITEM_SHOWING is False:
        SELECTED_MENU_ITEM -= 1
        if SELECTED_MENU_ITEM < 0:
            SELECTED_MENU_ITEM = len(DEVICE_MENU) - 1
        showMenu()
    elif SELECTED_MENU_ITEM == 0:
        CURRENT_RECORD_INDEX -= 1
        show_record_v2()
    else:
        menuGoBack()
    DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 30000
    time.sleep(0.2)
def mainThread():
    """Run the provisioned device: boot sequence, then the main loop.

    Boot: load the config if needed, show the logo, connect to MQTT (with a
    jingle on success), run an OTA check and show "Device Ready".

    Loop (roughly every 0.3 s): keep Wi-Fi and MQTT connected, process
    incoming MQTT messages, power the display off when its deadline passes,
    send the periodic heartbeat, run the periodic OTA check and poll the
    three buttons.
    """
    global LOADED_CFG
    global DISPLAY_POWER_OFF_TIME
    global HEARTBEAT_BROADCAST_TIME
    global FIRMWARE_CHECK_TIME
    if LOADED_CFG is None:
        LOADED_CFG = load_config()
    if LOADED_CFG is None:
        # Everything below reads the config; bail out instead of crashing.
        print("[CFG] No configuration available")
        return
    showLogo()
    connected = mqtt_connect()
    if connected is True:
        jingle_bootup()
    # Check for updates
    runOTAUpdate()
    tftDisplay.clear(TEXT_BG_COLOR)
    tftDisplay.draw_text(50, 135, "Device Ready", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    # Set display off time
    DISPLAY_POWER_OFF_TIME = utime.ticks_ms() + 15000
    # NOTE(review): the deadline checks below compare raw ticks_ms() values
    # and do not account for tick-counter wrap-around; moving every deadline
    # in the file to utime.ticks_add()/ticks_diff() would fix that.
    while True:
        # Check Wi-Fi and reconnect if needed. The handle may still be None
        # when Wi-Fi could not be brought up at boot.
        if INTERNET_CONNECTION is None or not INTERNET_CONNECTION.isconnected():
            print("[WIFI] Network connection lost. Reconnecting...")
            connect_wifi()
            continue
        # MQTT Check connection
        try:
            if MQTT_CLIENT is None:
                raise OSError("client not connected")
            MQTT_CLIENT.ping()
        except Exception as e:
            print(f"[MQTT] Connection lost: {e}. Reconnecting...")
            if MQTT_CLIENT is not None:
                try:
                    MQTT_CLIENT.disconnect()  # Disconnect gracefully if possible
                except Exception:
                    pass
            connect_wifi()
            mqtt_connect()
            continue
        # MQTT check
        try:
            if MQTT_CLIENT:
                MQTT_CLIENT.check_msg()
        except Exception as e:
            print("[MQTT] Check msg error:", e)
        # Turn off display if recently active
        if DISPLAY_POWER_OFF_TIME is not None:
            if utime.ticks_ms() >= DISPLAY_POWER_OFF_TIME:
                tftDisplay.clear(TEXT_BG_COLOR)
                if tftDisplay.DISPLAY_ON:
                    displayOff()
                DISPLAY_POWER_OFF_TIME = None
                set_device_mode(mode=IDLE_MODE)
        # Send heart beat
        if utime.ticks_ms() >= HEARTBEAT_BROADCAST_TIME:
            tftDisplay.clear(TEXT_BG_COLOR)
            tftDisplay.draw_text(
                70, 155, "syncing...", displayFont, TEXT_COLOR, TEXT_BG_COLOR
            )
            send_heartbeat()
            time.sleep_ms(1000)
            tftDisplay.clear(TEXT_BG_COLOR)
            HEARTBEAT_BROADCAST_TIME = utime.ticks_ms() + HEARTBEAT_INTERVAL
        # Check for firmware updates
        if utime.ticks_ms() >= FIRMWARE_CHECK_TIME:
            runOTAUpdate()
            FIRMWARE_CHECK_TIME = utime.ticks_ms() + OTA_INTERVAL
        # Buttons (active low)
        if btnNext.value() == 0:
            nextBtnOnClick()
        if btnPrev.value() == 0:
            prevBtnOnClick()
        if btnPower.value() == 0:
            powerBtnOnClick()
        time.sleep(0.3)
# ------ Main ------ #
def main():
    """Entry point: show the boot screen, connect Wi-Fi and pick the run mode.

    With a readable config file the device enters mainThread(); without one
    it goes through provisioning.
    """
    global LOADED_CFG
    tftDisplay.clear(TEXT_BG_COLOR)
    tftDisplay.draw_text(70, 155, "booting...", displayFont, TEXT_COLOR, TEXT_BG_COLOR)
    connect_wifi()
    LOADED_CFG = load_config()
    run = provisionThread if LOADED_CFG is None else mainThread
    run()


if __name__ == "__main__":
    main()