#__verion__ = '1.2.0'
#Release_Date = 'Sep-2024'
from machine import Pin, SPI
from utime import sleep_ms
from utility_actuators import SERVO, MOSFET
from utility_display import DISPLAY
from utility_aux import PICO_NETWORK, SD, RTC
from utility_menu import MENU
from gc import mem_free
class POLISHER:
"""
A class to control the polishing process of the polishing machine.
"""
def __init__(self):
#Creating the Servos
self.data_file_path = '/sd/MacConfig.dat'
#self.servo_1 = SERVO(servo_pin=18, min_duty=1500, max_duty=8150)
#self.servo_2= SERVO(servo_pin=19, min_duty=1500, max_duty=8150)
#self.servo_3 = SERVO(servo_pin=20, min_duty=1500, max_duty=8150)
display_mem = mem_free()
self.servos = (
SERVO(servo_pin=13, min_duty=1500, max_duty=8150),
SERVO(servo_pin=14, min_duty=1500, max_duty=8150),
SERVO(servo_pin=15, min_duty=1500, max_duty=8150)
)
#Creating the Mosfets
self.mosfets = (
MOSFET(mosfet_pin=18, frequancy=50),
MOSFET(mosfet_pin=19, frequancy=50)
)
print(f"Actuators {(display_mem - mem_free())/1024} Kb")
#self.mosfet_1 = MOSFET(mosfet_pin=21, frequancy=50)
#self.mosfet_2 = MOSFET(mosfet_pin=22, frequancy=50)
shared_baudrate = 40000000
spi_bus = SPI(
1,
baudrate=shared_baudrate,
polarity=0,
phase=0,
bits=8,
firstbit=SPI.MSB,
sck=Pin(10),
mosi=Pin(11),
miso=Pin(8)
)
#SD
self.sd_card = SD(
spi_bus=spi_bus, #overwrite the init
spi_id=1,
spi_sck_pin=10,
spi_mosi_pin=11,
spi_cs_pin=12,
spi_dc_pin=8
)
#RTC
self.rtc= RTC(
i2c_id=0,
i2c_scl_pin=17,
i2c_sda_pin=16
)
sleep_ms(50)
#Menu
display_mem = mem_free()
self.menu = MENU(program_name='Polisher',program_version='1.1',program_update='28-08-2024')
print(f"Menu_memory {(display_mem - mem_free())/1024} Kb")
default_plain_text = '''
Servo #1 Volume:5.0 cc/cycle
Servo #1 Cycles:5
Servo #1-2 Delay:1 sec
Servo #2 Cycles:1
Servo #2-3 Delay:5 sec
Servo #3 Cycles:1
Servo #3-1 Delay:5 sec
Servo #1 Start:0.0 %
Servo #1 End:50.0 %
Servo #2 Start:0.0 %
Servo #2 End:50.0 %
Servo #3 Start:0.0 %
Servo #3 End:50.0 %
Data Point Every:5 cycles
Data Point:Enable
WiFi Network Name:SSID
Password:SSID_pass
Mosfet #1 Duty Cycle:50 %
Mosfet #1 Delay: 9.9 sec
Mosfet #2 Duty Cycle:50 %
Mosfet #2 Delay: 9.9 sec
'''
#Read the data file from SD card and update the menu
self.data_from_file = ''
self.data_from_file = self.sd_card.read(file_path=self.data_file_path)
if self.data_from_file != '':
self.menu.import_text(plain_text=self.data_from_file)
print('successfully got the data from the SD card')
else:
print(f"creating {self.data_file_path}, with the default plain text")
self.sd_card.write(file_path=self.data_file_path, content=default_plain_text)
self.menu.import_text(plain_text=default_plain_text)
self.data_from_file = default_plain_text
#creating pico network handler
self.pico_network = PICO_NETWORK()
_ = self.menu.get_menu_data()
#data_dict_keys =>
#['Servo #1 Volume', 'Servo #1 Cycles', 'Servo #1-2 Delay', 'Servo #2 Cycles', 'Servo #2-3 Delay', 'Servo #3 Cycles', 'Servo #3-1 Delay', 'Servo #1 Start', 'Servo #1 End', 'Servo #2 Start', 'Servo #2 End', 'Servo #3 Start', 'Servo #3 End', 'Data Point Every', 'Data Point', 'WiFi Network Name', 'Password', 'Mosfet #1 Duty Cycle', 'Mosfet #1 Delay', 'Mosfet #2 Duty Cycle', 'Mosfet #2 Delay']
#Define LCD display
display_mem = mem_free()
self.spi_display = DISPLAY(
spi_bus=spi_bus,
rtc=self.rtc,
menu=self.menu,
pico_network=self.pico_network,
# spi_id=1,
# spi_sck_pin=10,
# spi_mosi_pin=11,
spi_cs_pin=9,
spi_dc_pin=8,
spi_rst_pin=6,
spi_bl_pin=7,
display_width=240,
display_height=320,
nav_encoder_clk_pin = 0,
nav_encoder_dt_pin = 1,
nav_encoder_sw_pin = 2,
sel_encoder_clk_pin = 3,
sel_encoder_dt_pin = 4,
sel_encoder_sw_pin = 5,
display_rotation = 1
)
print(f"Display_memory {(display_mem - mem_free())/1024} Kb")
#print(f"Connecting to {data_dict['WiFi Network Name']}:{data_dict['Password']}")
self.execution_data = {
"run": 0,
"servo #1 cycles" : 0,
"servo #1 delay" : 0,
"servo #1 status": False,
"servo #1 status_delay":False,
"servo #2 cycles" : 0,
"servo #2 delay" : 0,
"servo #2 status": False,
"servo #3 cycles" : 0,
"servo #3 delay" : 0,
"servo #3 status": False,
"mosfet #1 status": False,
"mosfet #2 status": False,
}
self.reset = False
self.working_logfile = ''
def update_execution_screen(self):
"""
update the execution screen on display
"""
self.execution_data['mosfet #1 status'] = self.mosfets[0].status
self.execution_data['mosfet #2 status'] = self.mosfets[1].status
if self.spi_display.curr_screen == 'Return To Execution':
options = {
'Sequence #': [self.execution_data['run'], 'run'],
'Volume/Run': [self.menu.data['Servo #1 Volume'] * self.menu.data['Servo #1 Cycles'], 'cc/run'],
'Total volume': [self.menu.data['Servo #1 Volume'] * self.menu.data['Servo #1 Cycles']*self.execution_data['run'], 'cc'],
'Servo 1 Cycle': [
[self.execution_data['servo #1 cycles'], self.execution_data['servo #1 status']],
[self.execution_data['servo #1 delay'], self.execution_data['servo #1 status'] or self.execution_data['servo #1 status_delay']]
],
'Servo 2 Cycle': [
[self.execution_data['servo #2 cycles'], self.execution_data['servo #2 status']],
[self.execution_data['servo #2 delay'], self.execution_data['servo #2 status']]
],
'Servo 3 Cycle': [
[self.execution_data['servo #3 cycles'], self.execution_data['servo #3 status']],
[self.execution_data['servo #3 delay'], self.execution_data['servo #3 status']]
],
'Output': [
['Loading',self.execution_data['mosfet #1 status']],
['Discharge', self.execution_data['mosfet #2 status']]
],
'Return to Main Menu': None
}
self.spi_display.display_execution(options=options)
#gc.collect()
def check_execution_condition(self)->str:
"""
check the execution condition RUN-Pause-STOP/RESET
Returns:
str: execution condition value, returns '' if not in RUN-Pause-STOP/RESET
"""
execution_condition = self.menu.get_execution_condition()
if execution_condition == 'Pause':
while True:
sleep_ms(100)
execution_condition = self.menu.get_execution_condition()
if execution_condition in ['RUN','STOP/RESET']:
self.menu.data = self.menu.get_menu_data()
break
if execution_condition =='STOP/RESET':
self.execution_data = {
"run": 0,
"servo #1 cycles" : 0,
"servo #1 delay" : 0,
"servo #1 status": False,
"servo #1 status_delay":False,
"servo #2 cycles" : 0,
"servo #2 delay" : 0,
"servo #2 status": False,
"servo #3 cycles" : 0,
"servo #3 delay" : 0,
"servo #3 status": False,
"mosfet #1 status": False,
"mosfet #2 status": False,
}
#gc.collect()
return 'STOP/RESET'
elif execution_condition == 'RUN':
#gc.collect()
return 'RUN'
else:
return ''
def reset_func(self):
"""
reset the execution data and the menu data
"""
print('Reseting ...')
self.execution_data = {
"run": 0,
"servo #1 cycles" : 0,
"servo #1 delay" : 0,
"servo #1 status": False,
"servo #1 status_delay":False,
"servo #2 cycles" : 0,
"servo #2 delay" : 0,
"servo #2 status": False,
"servo #3 cycles" : 0,
"servo #3 delay" : 0,
"servo #3 status": False,
"mosfet #1 status": False,
"mosfet #2 status": False,
}
for i,servo in enumerate(self.servos):
servo.set_servo_duty_percentage(percentage=self.menu.data[f'Servo #{i+1} End'])
for mosfet in self.mosfets:
mosfet.disable()
self.working_logfile = ''
#gc.collect()
def servo_toggle(self,servo:object, servo_number:str, delay_value:float)->bool:
"""
Toggle the servo angle between start/stop
Args:
servo (object): servo object to be controlled
servo_number (str): servo number
delay_value (float): delay of the servo
Returns:
bool: reset command captured (True/False)
"""
self.reset = False
self.execution_data[f'servo #{servo_number} status'] = True
for cycle in range(int(self.menu.data[f'Servo #{servo_number} Cycles'])):
if cycle > self.menu.data[f'Servo #{servo_number} Cycles']:
break #incease of updated the cycle within a pause command
self.execution_data[f'servo #{servo_number} cycles'] +=1
servo.set_servo_duty_percentage(percentage=self.menu.data[f'Servo #{servo_number} End'])
#servo moves to end position then wait for delay_value
#print(f"delay for {delay_value} sec")
self.execution_data[f'servo #{servo_number} delay'] = delay_value
if delay_value < 1:
self.update_execution_screen()
sleep_ms(int(delay_value*1000))
else:
for _ in range(int(delay_value)):
#self.execution_data[f'servo #{servo_number} delay'] +=1
self.update_execution_screen()
self.execution_data[f'servo #{servo_number} delay'] -=1
sleep_ms(1000)
if self.execution_data[f'servo #{servo_number} delay']>int(delay_value):
self.update_execution_screen()
sleep_ms(int(delay_value-self.execution_data[f'servo #{servo_number} delay']))
self.execution_data[f'servo #{servo_number} delay'] = 0
execution_condition = self.check_execution_condition()
if execution_condition == 'STOP/RESET':
self.reset = True
break
servo.set_servo_duty_percentage(percentage=self.menu.data[f'Servo #{servo_number} Start'])
#servo moves to start position then wait for delay_value
#print(f"delay for {delay_value} sec")
self.execution_data[f'servo #{servo_number} delay'] = delay_value
if delay_value < 1:
self.update_execution_screen()
sleep_ms(int(delay_value*1000))
self.execution_data[f'servo #{servo_number} delay'] = 0
else:
for _ in range(int(delay_value)):
#self.execution_data[f'servo #{servo_number} delay'] +=1
self.update_execution_screen()
self.execution_data[f'servo #{servo_number} delay'] -=1
sleep_ms(1000)
if self.execution_data[f'servo #{servo_number} delay']>int(delay_value):
self.update_execution_screen()
sleep_ms(delay_value-self.execution_data[f'servo #{servo_number} delay'])
self.execution_data[f'servo #{servo_number} delay'] =0
execution_condition = self.check_execution_condition()
if execution_condition == 'STOP/RESET':
self.reset = True
break
self.execution_data[f'servo #{servo_number} status'] = False
if self.reset:
self.reset = False
self.reset_func()
return True
servo.set_servo_duty_percentage(percentage=self.menu.data[f'Servo #{servo_number} Start'])
#self.execution_data[f'servo #{servo_number} cycles'] =0
self.execution_data[f'servo #{servo_number} Delay'] =0
print(f'servo #{servo_number} Done... ')
return False
def generate_logfile_name(self)->str:
"""
generate the logfile name in formate of DDMMYYYHHMM.dat
Returns:
str: DDMMYYYHHMM.dat
"""
date_time = self.rtc.read_time() #'YYYY/MM/DD hh:mm:ss dayname'
date_time_split = date_time.split(' ')
file_name_list = [
"/sd/",
date_time_split[0].split('/')[2],
date_time_split[0].split('/')[1],
date_time_split[0].split('/')[0],
date_time_split[1].split(':')[0],
date_time_split[1].split(':')[1],
".dat"]
return "".join(file_name_list)
def log_snap(self)->str:
"""
generate log time snap DDMMHHMM
Returns:
str: DDMMHHMM
"""
date_time = self.rtc.read_time() #'YYYY/MM/DD hh:mm:ss dayname'
date = date_time.split(' ')[0]
time_value = date_time.split(' ')[1]
#DDMMHHMM
#return f"{date.split('/')[2]}{date.split('/')[1]}{time_value.split(':')[0]}{time_value.split(':')[1]}"
snap_time = ''
snap_time += date.split('/')[2]
snap_time += date.split('/')[1]
snap_time += time_value.split(':')[0]
snap_time += time_value.split(':')[1]
return snap_time
def update_log(self):
"""
update logfile
"""
if self.menu.data['Data Point'] in ['Enable','enable'] and self.menu.data['Data Point Every'] >0:
if self.execution_data['run']%self.menu.data['Data Point Every']==0:
log_data = ''
#C volume
#D servo 1 cycles
#E servo 2 cycles
#F total volume
snap_time = self.log_snap()
#C = data_dict['Servo #1 Volume']
#D = data_dict['Servo #1 Cycles']
#E = data_dict['Servo #2 Cycles']
#F = data_dict['Servo #1 Volume'] * data_dict['Servo #1 Cycles']*self.execution_data['run']
log_data += f'{snap_time}\n'
log_data += f'{self.menu.data['Servo #1 Volume']}\n'
log_data += f'{self.menu.data['Servo #1 Cycles']}\n'
log_data += f'{self.menu.data['Servo #2 Cycles']}\n'
log_data += f'{self.menu.data['Servo #1 Volume'] * self.menu.data['Servo #1 Cycles']*self.execution_data['run']}\n'
if self.working_logfile == '':
self.working_logfile = self.generate_logfile_name()
print(f'loggin into {self.working_logfile}')
print(log_data)
self.sd_card.write(file_path=self.working_logfile, content=log_data, overwrite=True)
else:
self.sd_card.write(file_path=self.working_logfile, content=log_data, overwrite=False)
# part_memory = gc.mem_alloc()- init_allocated
# print('=======Before Main=======')
# print(f"Allocated Memory ==> {gc.mem_alloc()} Bytes")
# print(f"Free Memory ==> {gc.mem_free()} Bytes")
# print(f"Allocated Memory for the part ==> {part_memory} Bytes ({round(part_memory/1024,2)} Kb)")
# print('==========================')
if __name__ == '__main__':
polisher = POLISHER()
if polisher.menu.data['WiFi Network Name'] != '':
polisher.pico_network.connect_to_wifi(
ssid=polisher.menu.data['WiFi Network Name'],
password=polisher.menu.data['Password'], timeout=30)
while True:
plain_text = polisher.menu.export_text()
if plain_text != polisher.data_from_file:
print('update the settings file')
print(plain_text) #to test the update of the settings file
polisher.sd_card.write(file_path=polisher.data_file_path, content=f'{plain_text}', overwrite=True)
polisher.data_from_file =plain_text
#polisher.update_execution_screen()
execution_condition = polisher.check_execution_condition()
if execution_condition == 'RUN' :#or True:
print('Start Polishing...')
#uncomment the next line to force move to execution screen on RUN
#polisher.spi_display.curr_screen = 'Return To Execution'
polisher.execution_data['run'] += 1
polisher.update_log()
polisher.execution_data['servo #1 cycles'] = 0
polisher.execution_data['servo #1 delay'] = 0
polisher.execution_data['servo #2 cycles'] = 0
polisher.execution_data['servo #2 delay'] = 0
polisher.execution_data['servo #3 cycles'] = 0
polisher.execution_data['servo #3 delay'] = 0
#Mosfet #1 Enabled
polisher.mosfets[0].enable(
percentage=polisher.menu.data['Mosfet #1 Duty Cycle'],
period_sec=polisher.menu.data['Mosfet #1 Delay']
)
#Servo #1
#servo #1 moves to end position then wait for the delay 250ms
#servo #1 moves to start position then wait for the delay 250ms
reset_status = polisher.servo_toggle(
servo=polisher.servos[0],
servo_number='1',
delay_value= 0.25)
if reset_status:
continue
#delay of servo 1-2
#print(f"sleeping for {polisher.menu.data['Servo #1-2 Delay']} sec")
polisher.execution_data["servo #1 status_delay"] = True
delay_value = polisher.menu.data['Servo #1-2 Delay']
polisher.execution_data['servo #1 delay'] = polisher.menu.data['Servo #1-2 Delay']
if delay_value < 1:
polisher.update_execution_screen()
sleep_ms(int(delay_value*1000))
polisher.execution_data['servo #1 delay'] = 0
else:
for _ in range(int(delay_value)):
polisher.execution_data['servo #1 delay'] -=1 #or +=1 for counting up with changing the polisher.execution_data[f'servo #1-2 delay'] int value to 0
polisher.update_execution_screen()
sleep_ms(1000)
if delay_value >int(delay_value):
polisher.update_execution_screen()
sleep_ms(int((delay_value - int(delay_value))*1000))
polisher.execution_data["servo #1 status_delay"] = False
polisher.execution_data['servo #1 delay'] = 0
#sleep_ms(int(polisher.menu.data['Servo #1-2 Delay']*1000))
#ensures the mosfet #1 is disable before moving forward
while polisher.mosfets[0].status:
sleep_ms(1)
polisher.update_execution_screen()
execution_condition = polisher.check_execution_condition()
if execution_condition == 'STOP/RESET':
polisher.reset_func()
reset = False
continue
#Servo #2
#servo #2 moves to end position then wait for the delay defined into Servo #2-3 Delay
#servo #2 moves to start position then wait for the delay defined into Servo #2-3 Delay
reset_status = polisher.servo_toggle(
servo=polisher.servos[1],
servo_number='2',
delay_value= polisher.menu.data[f'Servo #2-3 Delay'])
if reset_status:
continue
#Mosfet #2
polisher.mosfets[1].enable(
percentage=polisher.menu.data['Mosfet #2 Duty Cycle'],
period_sec=polisher.menu.data['Mosfet #2 Delay']
)
polisher.update_execution_screen()
execution_condition = polisher.check_execution_condition()
if execution_condition == 'STOP/RESET':
polisher.reset_func()
reset = False
continue
#Servo #3
#servo #3 moves to end position then wait for the delay defined into Servo #3-1 Delay
#servo #3 moves to start position then wait for the delay defined into Servo #3-1 Delay
reset_status = polisher.servo_toggle(
servo=polisher.servos[2],
servo_number='3',
delay_value= polisher.menu.data[f'Servo #3-1 Delay'])
if reset_status:
continue
#ensures the mosfet #1 is disable before moving forward
while polisher.mosfets[1].status:
sleep_ms(1)
elif execution_condition =='STOP/RESET':
polisher.reset_func()
reset = False
continue
sleep_ms(250)