import network
from machine import Pin, I2C, unique_id
import time, utime, ntptime
from Menu import *
from encoder import *
import ssd1306, gfx
import uasyncio as asyncio
import secrets
import urequests
import json
import ubinascii
#import AzureIoT
# ESP32 Pin assignment
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
packdone_button = Pin(25, Pin.IN)
nc_button = Pin(26, Pin.IN)
failure_button = Pin(27, Pin.IN)
rotor_btn = Pin(33, Pin.IN)
builtin_led = Pin(2, Pin.OUT)
indicator_red = Pin(16, Pin.OUT)
indicator_orange = Pin(17, Pin.OUT)
indicator_green = Pin(18, Pin.OUT)
encoder_clk = Pin(35, Pin.IN)
encoder_dt = Pin(32, Pin.IN)
buzzer = Pin(14, Pin.OUT)
beepEnable = True
timezone = 1
showMainScreen = True
indicators = [indicator_red, indicator_orange, indicator_green]
counts = {"done": 0, "failure": 0, "nc": 0}
NCs = []
debouncer = {"lastPress": 0, "debounceTime": 1000}
def setUpWiFi(ssid, password):
global wlan
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while not wlan.isconnected(): continue
def setUpNTPTime():
try:
ntptime.settime()
except:
print("Error syncing time")
def setUpOLED():
global oled, graphics
oled_width = 128
oled_height = 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
graphics = gfx.GFX(oled_width, oled_height, oled.pixel)
def getDeviceId():
return ubinascii.hexlify(unique_id()).decode('utf-8').upper()
#Azure Communication:
deviceId = getDeviceId()
endpoint = f'https://reportlyhub.azure-devices.net/devices/{deviceId}/messages/events?api-version=2018-06-30'
iotHubconnectionString = 'SharedAccessSignature sr=reportlyhub.azure-devices.net&sig=IflVJ1FP3cyXlr0yW837GJnaWZQ03HH27BlcWyvOTYU%3D&skn=iothubowner&se=1713516024'
employeeId = None
employeeDisplayName = None
machineId = None
partNumber = None
NonCompliances = None
def getCurrentAssignation():
global employeeId, employeeDisplayName, machineId, partNumber
url = f"https://reportly.azurewebsites.net/api/devicecurrent.php?deviceId={deviceId}"
response = json.loads(urequests.get(url).text)
employeeId = response.get('employeeId')
employeeDisplayName = response.get('displayName')
machineId = response.get('machineId')
partNumber = response.get('partNumber')
return True
def getNonCompliances():
global NonCompliances
tmpPartNumber = partNumber.replace(" ", "%20")
url = f"https://reportly.azurewebsites.net/api/partnoncompliances.php?partNumber={tmpPartNumber}"
NonCompliances = json.loads(urequests.get(url).text)
return True
def printAll():
print(employeeId)
print(employeeDisplayName)
print(partNumber)
print(machineId)
print(NonCompliances)
def sendTelemetry(payload):
headers = { 'Authorization': iotHubconnectionString , 'Content-Encoding': 'utf-8'}
response = urequests.request("POST", endpoint, headers=headers, json=payload)
print(response.status_code)
print(response.content)
return response.status_code
def rotary_changed(change):
directions = {Rotary.ROT_CW: "up", Rotary.ROT_CCW: "down"}
if (change in directions.keys()): menu.navigate(directions[change])
def setUpHandlers():
global rotary
packdone_button.irq(trigger=Pin.IRQ_FALLING,handler=packdone_handler)
nc_button.irq(trigger=Pin.IRQ_FALLING,handler=nc_handler)
failure_button.irq(trigger=Pin.IRQ_FALLING,handler=failure_handler)
rotary = Rotary(32, 35, 33)
rotary.add_handler(rotary_changed)
rotor_btn.irq(handler=rotor_btn_handler, trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING)
def debounce():
global debouncer
current_time = time.ticks_ms()
if time.ticks_diff(current_time, debouncer["lastPress"]) < debouncer["debounceTime"]:
return False
debouncer["lastPress"] = current_time
return True
def Submit_NC(Nc):
global showMainScreen
print("New NC was submitted: {}".format(Nc))
status = sendTelemetry({"event": "nc", "code": Nc})
if (status == 204): StatusScreen("Noncompliance", "was submitted", 1.5, True)
else: StatusScreen("Error Occurred", "Contact Maintenance", False, 1.5, True)
showMainScreen = True
def SetUpMenus():
global menu,menu_NCs
menu_NCs = MENU_OPTIONS(oled)
for nc in NonCompliances: menu_NCs.add_option(nc, Submit_NC, nc)
menu = NAVIGATE_MENU([menu_NCs])
def Open_NC_Menu():
global showMainScreen
showMainScreen = False
menu_NCs.draw()
def packdone_handler(pin):
if (not debounce()): return False
print("Pack Done")
StatusScreen("Submitting", "Please wait")
status = sendTelemetry({"event": "packDone"})
if (status == 204): StatusScreen("Pack", "was submitted")
else: StatusScreen("Error Occurred", "Contact Maintenance")
utime.sleep(1)
showMainScreen = True
def nc_handler(pin):
if (not debounce()): return False
global showMainScreen
if (showMainScreen): Open_NC_Menu()
else: showMainScreen = True
def failure_handler(pin):
if (not debounce()): return False
StatusScreen("Submitting", "Please wait")
status = sendTelemetry({"event": "failure"})
if (status == 204): StatusScreen("Failure", "was submitted")
else: StatusScreen("Error Occurred", "Contact Maintenance")
utime.sleep(1)
showMainScreen = True
def rotor_btn_handler(pin):
if (not debounce()): return False
if (showMainScreen): Open_NC_Menu()
else: menu.select()
def WelcomeScreen():
StatusScreen("Amphenol", False)
utime.sleep(2)
StatusScreen("Amphenol Tunisia", "Auditor", 1.5, True)
def StatusScreen(status, value, sleepTime = 0, goBlack = False):
global showMainScreen
showMainScreen = False
oled.fill(0)
oled.invert(1)
oled.text(status, ((16 - len(status)) * 8) // 2, 30 if value == False else 20)
if (not value == False): oled.text(value, ((16 - len(value)) * 8) // 2, 40)
oled.show()
if (sleepTime > 0): utime.sleep(sleepTime)
if (goBlack):
oled.invert(0)
oled.fill(0)
oled.show()
showMainScreen = True
async def blinkLights(duration = 1):
while True:
for indicator in indicators:
indicator.value(1)
await asyncio.sleep(duration)
indicator.value(0)
await asyncio.sleep(0.1)
async def showDisplay():
while True:
if (not showMainScreen): continue
current_time = utime.localtime()
#Format the current time as "dd/mm/yyyy HH:MM"
formatted_time = "{:02d}/{:02d}/{} {:02d}:{:02d}".format(current_time[2], current_time[1], current_time[0], current_time[3] + timezone, current_time[4])
operator = "{}/{}".format(employeeId, employeeDisplayName.split(" ")[0])
oled.fill(0)
oled.text(formatted_time, 0, 0)
oled.text(operator, ((16 - len(operator)) * 8) // 2, 10)
graphics.line(0, 20, 127, 20, 1)
oled.text(partNumber.replace(" ", ""), ((16 - len(partNumber.replace(" ", ""))) * 8) // 2, 23)
graphics.line(0, 32, 127, 32, 1)
oled.text("Qt: {}".format(counts["done"]), 0, 35)
oled.text("Nc: {}".format(len(NCs)), 0, 45)
oled.text("Dt: {}".format(counts["failure"]), 0, 55)
oled.show()
await asyncio.sleep(.5)
def checkWiFi():
while True:
continue
try:
my_ip = urequests.get("https://api.myip.com/").json()
print(my_ip)
status = True
except:
status = False
builtin_led.value(status)
print(status)
await asyncio.sleep(1)
def setUp(fast = False):
setUpOLED()
if (not fast): WelcomeScreen()
if (not fast): StatusScreen("Device ID", deviceId, 2, True)
if (not fast): StatusScreen("Connecting", secrets.WiFi["ssid"])
setUpWiFi(**secrets.WiFi)
if (not fast): StatusScreen("Connected", wlan.ifconfig()[0], 1.5, True)
if (not fast): StatusScreen("Time Sync", "")
setUpNTPTime()
if (not fast): StatusScreen("Time Sync", "Done", .5, True)
if (not fast): StatusScreen("Azure IoT Hub", "")
#setUpAzureIoTHub()
if (not fast): StatusScreen("Azure IoT Hub", "Connected", 1.5, True)
if (not fast): StatusScreen("Current Assignations", "")
getCurrentAssignation()
if (not fast): StatusScreen("Current Assignations", "Fetched", 1.5, True)
if (not fast): StatusScreen("Non Compliances", "")
getNonCompliances()
if (not fast): StatusScreen("Non Compliances", "Fetched", 1.5, True)
printAll()
if (not fast): StatusScreen("Finishing", False, 1.5)
setUpHandlers()
SetUpMenus()
if (not fast): StatusScreen("Welcome", False, 2, True)
async def main():
functions = [showDisplay(), blinkLights(1)]
tasks = [asyncio.create_task(f) for f in functions]
await asyncio.gather(*tasks)
setUp(False)
asyncio.run(main())