# py4sup_ch8_dcm_serial_v1a
# dew.ninja Apr 2024
# supplement of dcm_1joint
# connect to serial port of computer to exchange date
# update list:
# Apr 2024:
# - add $ to command and response so that it can be removed on the other side
# - use virtual serial port COM7 <-> COM8
# originally dcm1_controllers.py
# dew.ninja Sep 2023
# wokwi version using PWM/DIR control
# Controller implementation for dcm1 plant on Wokwi
# modified from sfic_ofc_pid_lag3.py
import time
from utime import sleep_ms
from machine import Pin,Timer, PWM, ADC, DAC, I2C #,UART
from neopixel import NeoPixel
#import sys
# ---- code for SSD1306 OLED. Comment out if it is not available
import ssd1306
# --- uart initialization -----
#uart = UART(1, 115200)
# ---- 4-point moving average
# ---- class implementation seems to have problem -------
# from ma4 import MA4
# mafilter = 1 # set to 1 to use average filter
# ma4=MA4()
# 4-point moving average implemented as function
# has 2 channels for y and x2
streaming = 0 # streaming mode flag
stream_downsamp = 10 # downsampling data stream
datacnts = 0
T = 0.01 # sampling period
T_ms = int(1000*T)
mafilter = 0
import array
ma_y = array.array('f',0.0 for _ in range (8))
ma_i = array.array('i',0 for _ in range (2))
ma_ysum = array.array('f',0.0 for _ in range (2))
ma_yavg = array.array('f',0.0 for _ in range (2))
def ma4_fn(ch, y): # ch = 0/1 for y/x2
global ma_y, ma_i, ma_ysum, ma_yavg
i = 4*ch+ma_i[ch]
ma_ysum[ch] -= ma_y[i]
ma_y[i] = y
ma_ysum[ch] += ma_y[i]
ma_yavg[ch] = ma_ysum[ch]/4
ma_i[ch]+=1
if ma_i[ch]>3:
ma_i[ch] = 0
return ma_yavg[ch]
# ESP32 Pin assignment
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled_width = 128
oled_height = 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
def oled_showryu(oled,r,y,u,ulim):
oled.fill(0)
rtxt = "r : "+str(round(r*100,2)) +" deg."
ytxt = "y : "+str(round(y*100,2))+" deg."
utxt = "u : "+str(round(u,2))
ulimtxt = "ulim : "+str(round(ulim,2))
oled.text(rtxt,0,0)
oled.text(ytxt,0,15)
oled.text(utxt,0,30)
oled.text(ulimtxt,0,45)
oled.show()
oled_tprev = 0
#oled_showryu(oled,1,2,3)
# --------------------------------
# ----------neopixels representing motor movement ------------
NeoPixel_Pin = 15
NUMPIXELS = 24
pixels = NeoPixel(Pin(NeoPixel_Pin),NUMPIXELS)
#pixels[23]=(255,0,0) # test
#pixels.write()
def pixels_display():
angle_per_pixel = 360/NUMPIXELS # actually 360/24 but can adjust a bit
pixels.fill((0,0,0))
rpixel = int(r*100/angle_per_pixel) # pixel of command r
if rpixel<0:
rpixel = 0
if rpixel>NUMPIXELS-1:
rpixel = NUMPIXELS-1
ypixel = int(y*100/angle_per_pixel) # pixel of plant output y
if ypixel<0:
ypixel = 0
if ypixel>NUMPIXELS-1:
ypixel = NUMPIXELS-1
if rpixel == ypixel: # r match y exactly
pixels[rpixel] = (255,255,0) # red + green
else:
pixels[rpixel] = (0,255,0) # command in green
pixels[ypixel] = (255,0,0) # plant output in red
pixels.write()
# ------------------------------------------------
# -------------- adjust commad using VR ------------------
RCMD_PIN = 34
r_from_VR = False # change to true by r = -1
r_in = ADC(Pin(RCMD_PIN))
r_in.atten(ADC.ATTN_11DB) # range 0 - 3.6 V
# ---------------------------------------------
prompt = False # set prompt off/on so that it won't get in the way.
# range variables
PWMMAX = 1023 # maximum PWM value
PWMMID = 511
PWMMIN = 0
DACMAX = 255
DACMID = 127
DACMIN = 0
ADCMAX = 4095
ADCMID = 2047
ADCMIN = 0
UMAX = 3.3 # limit max controller output to 3.2 vols
UMID = 1.65
UMIN = 0.1 # min controller output
VEL_OFFSET = 2.0 # adjust this
pwm_out = PWM(Pin(5)) # output to lag3 board. Change from 16 to 5 for Wokwi
pwm_out.freq(1000) # set PWM frequency to 1 KHz (lower from 5KHz)
dac_out = DAC(Pin(25)) # this doesn't work on Wokwi
dir_out = Pin(19,Pin.OUT) # direction 0 = CW, 1 = CCW
y_in = ADC(Pin(39)) # ADC1_CH3. Shown as VN on Wokwi
y_in.atten(ADC.ATTN_11DB) # range 0 - 3.6 V
adc2v = 3.6/ADCMAX # max of ADC is set to 3.6 v by the package
v2deg = 100 # convert volt to degree
v2pwm = int(PWMMAX/3.3)
v2dac = int(DACMAX/3.3)
led = Pin(2, Pin.OUT)
t_data = 0 # time data output
# now can choose between PID and output-feedback controller (OFC)
# controller = 0, 1, 2 (PID, OFC, SFC)
controller = 0
plantsim = 0 # plant simulation mode. Plantsim is not available in this program.
# global variable for LSID
lsid = False # flag
PRBSVal = 1 # PRBS value, unit in volt
feedin = 0
bvec =[0]*13 # unit delay output
bvec[1] = bvec[2] = bvec[6] = bvec[7] = bvec[10] = bvec[12] = 1
# values for these PID coefficients are computed in PID_update
bi = 0
ad = 0
bd = 0
bt = 0
# controller states
ep = 0 # error for proportional term
e1 = 0 # true error (for integral term)
e0 = 0
eus1 = 0 # error for back calculation term
eus0 = 0
ed1 = 0 # error for derivative term
ed0 = 0
ui1 = 0 # integral-term outputs
ui0 = 0
ud1 = 0 # derivative-term outputs
ud0 = 0
# PID parameters
kp = 300 # PID gains
ki = 1.0
kd = 500
kt = 0.1 # back calculaiton gain
wp = 1 # proportional weight
wd = 1 # derivative weight
N = 50 # derivative filter coefficient
# --- global variables for output-feedback controller ----
# integrator (this may be used later for integrator + sfb shcieme
def integrator_coeff_compute():
return 0.5*T
a_int = integrator_coeff_compute()
# integrator states
u0_int = 0.0 # output
u1_int = 0.0 # past-output
e0_int = 0.0 # input
e1_int = 0.0 # past-input
def integrator(inp): # integrator function. Used for SFBI or custom control
global u0_int, u1_int, e0_int, e1_int
e1_int = e0_int
e0_int = inp
u1_int = u0_int
u0_int = u1_int + a_int*(e0_int + e1_int)
return u0_int
def leadlag_coeff_compute(a,b): # a = zero, b = pole
alpha1 = 2+a*T
alpha2 = a*T - 2
beta1 = 2+b*T
beta2 = b*T-2
gamma1 = -beta2/beta1
gamma2 = alpha1/beta1
gamma3 = alpha2/beta1
return gamma1, gamma2, gamma3
K_ofc = 8000 # output-feedback controller gain
ll_z = 5 # lead-lag zero
ll_p = 40 # lead-lag pole
g_1, g_2, g_3 = leadlag_coeff_compute(ll_z, ll_p)
# lead-lag states
u0_ll = 0.0 # output
u1_ll = 0.0 # past-output
e0_ll = 0.0 # input
e1_ll = 0.0 # past-input
def leadlag(inp): # lead-lag compute
global u0_ll, u1_ll, e0_ll, e1_ll
e1_ll = e0_ll
e0_ll = inp
u1_ll = u0_ll
u0_ll = g_1*u1_ll + g_2*e0_ll + g_3*e1_ll
return u0_ll
# ----- state feedback controller--------
# ---- state measurement -----
# pins used for state feedback is
# x2 = 32, x1(y) = 39
x_2 = ADC(Pin(32))
x_2.atten(ADC.ATTN_11DB)
# ---- variables for autotuning ----
autotune = False # turn autotune off/on
#relay_a = 0.0 # amplitude of output oscillation
relay_d = 1.0 # relay output level
cycle_cnts = 0 # count oscillation cycles before measuring Tu
at_ready2measure = False # flag to starts measuring osc magnitude and period
at_time = 0.0 # time since autotuning starts
Tu = 0.0 # ultimate period
Ku = 0.0 # ultimate gain
at_x_old = at_x_new = 0.0 # time output crosses zero (or UMID)
y_max = 0.0 # keep maximum y to compute at_a
r = 1.0 # ref. cmd
u = r # plant input
ulim = r
u_pwm = u*v2pwm
y = 0.0 # plant output
y_old = 0.0 # previous plant output
datasize = 200 # number of points in output capture
capture = 0 # capture mode 0 = off, 1 = on
capture_flag = False # indicates start/stop of data transfer
data_idx = 0 # data index
# ---- controller -----
feedback = 1 # 0 = open-loop, 1 = close-loop
timer=Timer(1)
# --- Functions for autotuning mode ----
# function to limit output between UMIN, UMAX and add offset UMID
def limit_and_offset(u):
u_lim = u
if u > UMID:
u_lim = UMID # limit u to UMID
elif u < -UMID:
u_lim = -UMID # limit u to -UMID
u_lim += UMID
return u_lim
# ------ function to generate pwm/dir outputs from controller output u
def convert2pwmdir(u):
u_lim = u
if u > UMAX:
u_lim = UMAX # limit u to UMAX
elif u < -UMAX:
u_lim = -UMAX # limit u to -UMAX
if u_lim >= 0: # positive u_lim = CW direction
u_dir = 0
u_pos = u_lim
else:
u_dir = 1
u_pos = -u_lim
return u_lim,u_pos, u_dir
# ---------------------------------------------------
def relay(y,d): # implement relay device
if plantsim:
if y>0:
rout = -d
else:
rout = d
else:
if y>UMID:
rout = -d
else:
rout = d
return rout
# measure oscillation amplitude period after 3 cycles
def measure_amp_period():
global at_ready2measure,cycle_cnts,at_time,at_x_old,at_x_new,\
Ku,Tu,y,y_max, autotune, prompt
at_time+= T # increase time during autotuning
if cycle_cnts>2:
#print('y = '+str(y))
#print('y_max = '+str(y_max))
if y > y_max: # keep maximum value
y_max = y
if plantsim:
if y_old < 0.0 and y >= 0.0: # crossing zero detected
at_x_old = at_x_new
at_x_new = at_time
cycle_cnts += 1 # cound osc cycle
if not capture_flag:
print("$Oscillation cycle : "+str(cycle_cnts))
else:
if y_old < UMID and y >= UMID: # crossing UMID detected
at_x_old = at_x_new
at_x_new = at_time
cycle_cnts += 1 # cound osc cycle
if not capture_flag:
print("$Oscillation cycle : "+str(cycle_cnts))
if cycle_cnts == 4:
at_ready2measure = True
if at_ready2measure:
Tu = at_x_new - at_x_old
if not capture_flag:
print("$Tu = "+str(Tu))
if plantsim:
at_a = y_max
else:
at_a = y_max - UMID
Ku = 7*(4*relay_d)/(22*at_a)
if not capture_flag:
# print("a = "+str(at_a))
print("$Ku = "+ str(Ku))
print("$Autotuning ends.")
autotune = False
at_pid_adjust()
#prompt=True
if not capture_flag:
#sys.stdout.write('\nEnter command : ') # show prompt
print("\n$Enter command : ")
def at_pid_adjust():
global kp, ki, kd, Ku, Tu
if not capture_flag:
print("$Updating PID parameters with Ku and Tu ")
kp = 0.6*Ku
ki = 1.2*Ku/Tu
kd = 0.075*Ku*Tu
if not capture_flag:
print("$kp = " + str(kp))
print("$ki = " + str(ki))
print("$kd = " + str(kd))
PID_update()
# compute PID coefficients, also update freeboard
def PID_update():
global ad, bd, bi, bt
bi = 0.5*T*ki
bt = 0.5*T*kt
ad1 = 1+0.5*N*T
ad2 = 0.5*N*T - 1
ad = -ad2/ad1
bd = kd*N/ad1
#update_dashboard()
# PID controller function
def PID_controller(r,y):
global e1,e0,ed1,ed0, eus1,eus0, ui1, ui0, ud1, ud0, u
# state transfer
e1 = e0
ed1 = ed0
eus1 = eus0
ui1 = ui0
ud1 = ud0
# compute errors for each term
e0 = r - y
ep0 = wp*r - y # weighted proportional error
ed0 = wd*r - y # weighted derivative error
up0 = kp*ep0 # output of P term
ui0 = ui1 +bi*(e0+e1) + bt*(eus0+eus1) # output of I term
ud0 = ad*ud1 +bd*(ed0 - ed1) # output of D term
u = up0 + ui0 + ud0
#u_lim = limit_and_offset(u)
return u
# output-feedback controller function
def OFC_controller(r,y):
u1 = leadlag(r-y)
#u2 = integrator(u1)
return K_ofc*u1
# state-feedback controller
K_sfic = [10,13.9] # state feedback gain
#ff_gain = 324 # feedforward gain
Ki_sfic = 5
def SFIC_controller(r,x2,y):
u = Ki_sfic*integrator(r-y) -K_sfic[0]*y -K_sfic[1]*x2
return u
# PRBS generator function
def PRBS_generator():
global bvec, feedin, PRBSVal
feedin = bvec[0] ^ (bvec[2]^(bvec[3]^ bvec[12]))
for j in range(12,1,-1):
bvec[j] = bvec[j-1]
#print(bvec[j])
bvec[1] = feedin
u_out = PRBSVal*(2*bvec[12] - 1)
return u_out
# ------------------- timer routine -------------------------------
# ----------------feedback is executed here ---------------
def timer_isr(event):
global a,b,T,u,ulim, u_states, y_states, capture_flag,t_data, \
data_idx,datasize, prompt, y, y_old, lsid, oled_tprev, \
oled,r, datacnts
led.value(not led.value())
if r_from_VR:
r=r_in.read()*adc2v
#y_adc = y_in.read() # read from ADC
#y1 = y_adc*adc2v # convert to volt
y_old = y
# if plantsim:
# y,x2,x1 = lag3(a,b,T,u, u_states, y_states) # simulation. in volt
# else:
# y = y_in.read_uv()*1e-6 # from electronics
# x1 = x_1.read_uv()*1e-6
# x2 = x_2.read_uv()*1e-6
yr = y_in.read()*adc2v # works better on Wokwi
x2r = x_2.read()*adc2v # need to offset this properly
#print(yr)
if mafilter: # perform 4-point average
# y = ma4.compute(yr)
# x2 = ma4.compute(x2r)
y = ma4_fn(0,yr)
x2 = ma4_fn(1,x2r)
else:
y = yr
x2 = x2r
#y2rgb() # just lid RGB LED. Not discussed in book.
if feedback:
if controller==0: # PID selected
if autotune:
u = relay(y,relay_d)
#ulim = limit_and_offset(u)
ulim, upos, udir = convert2pwmdir(u)
measure_amp_period()
else:
u = PID_controller(r,y)
ulim, upos, udir = convert2pwmdir(u)
elif controller==1: # output-feedback controller
u = OFC_controller(r,y)
ulim, upos, udir = convert2pwmdir(u)
#ulim = limit_and_offset(u)
elif controller==2: # state-feedback controller
u = SFIC_controller(r,x2,y)
ulim, upos, udir = convert2pwmdir(u)
else:
if lsid: # LSID mode
u = PRBS_generator()
ulim, upos, udir = convert2pwmdir(u)
else: # open-loop
u = r
ulim, upos, udir = convert2pwmdir(u)
# print("ulim={},upos={},udir={}".format(ulim,upos,udir))
u_pwm = int(upos*v2pwm) # v2pwm = int(PWMMAX/3.3)
#print(u_pwm)
pwm_out.duty(u_pwm) # send output via PWM pin 16
dir_out.value(udir) # direction 0 = CW, 1 = CCW
# pwm_out.duty(PWMMAX)
#u_dac = int(ulim*v2dac) # v2dac = int(DACMAX/3.3)
#dac_out.write(u_dac) # also send output to DAC1
# --- remove these lines if SSD1306 is not available ---
oled_tcurrent = time.ticks_ms()
if (oled_tcurrent - oled_tprev)>300: # display every 300 ms
oled_tprev = oled_tcurrent
oled_showryu(oled,r,y,u,ulim)
pixels_display() # display angle on NeoPixels
# ---------------------------------------------
if streaming: # send confinuous data
if datacnts == stream_downsamp:
print("{},{},{},{},{},{}".format(round(t_data,2),100*r,100*y,ulim,u,100*x2))
t_data+=(T*datacnts)
datacnts = 0
else:
datacnts += 1
elif capture_flag:
print("[{},{},{},{},{},{}],".format(round(t_data,2),100*r,100*y,ulim,u,100*x2))
t_data+=T
data_idx+=1
if data_idx==datasize:
data_idx = 0
t_data = 0.0
capture_flag = False # stop capture data
print("])") # end of np.array([ command
#prompt=True # prompt on
if lsid:
print("$LSID ends.")
lsid = False # LSID mode ends
# helper function for cmdInt()
# separate command and parameter
# at "="
def splitCmd(userstr):
result = userstr.find("=")
if result == -1:
noparm = 1
cmdstr = userstr.strip()
parmstr = '' # not used
else:
noparm = 0
splitstr = userstr.split("=")
cmdstr = splitstr[0].strip()
parmstr = splitstr[1].strip()
return cmdstr, parmstr, noparm
#command interpreter function
def cmdInt(userstr):
global r,datasize,capture,capture_flag,feedback,T,T_ms,a,b, prompt, mafilter,\
kp,ki,kd,kt,wp,wd,N, autotune, y, y_max, at_ready2measure, at_time,\
at_x_old, at_x_new, cycle_cnts, lsid,a_int,g_1,g_2,g_3, \
controller,r_from_VR, streaming, t_data, datacnts
cmdstr, parmstr, noparm = splitCmd(userstr)
if cmdstr.lower() == "$r" or cmdstr.lower() == "$step":
# toggle_mode = 0 # turn off toggle mode
# for this DCM model, r is specified in degrees
if noparm==1: # retrieve current r
print("$"+str(r))
else:
r_tmp = float(parmstr)
if r_tmp==-1: # change to cmd from VR mode
r_from_VR = True
print("$Ref. comd is now from VR")
else:
r_from_VR = False
if r_tmp > 330: # limit range
r_tmp = 330
elif r_tmp < 0.0:
r_tmp = 0.0
r = r_tmp/100
if capture: # capture mode on
capture_flag = True
print("datamat = np.array([") # head of data matrix
prompt=False # turn prompt off
# update_dashboard()
# elif cmdstr.lower() == "psim":
# if noparm==1:
# print(plantsim)
# else:
# plantsim = int(parmstr)
# if plantsim > 1:
# plantsim = 1
# elif plantsim < 0:
# plantsim = 0
# if plantsim:
# reset_lag3_states()
elif cmdstr.lower() == "$capture":
if noparm==1:
print("$"+str(capture))
else:
capture = int(parmstr)
if capture > 1:
capture = 1
elif capture < 0:
capture = 0
elif cmdstr.lower() == "$streaming":
if noparm==1:
print("$"+str(streaming))
else:
t_data = 0.0
datacnts = 0
streaming = int(parmstr)
if streaming > 1:
streaming = 1
elif streaming < 0:
streaming = 0
elif cmdstr.lower() == "$datasize":
if noparm==1:
print("$"+str(datasize))
else:
datasize = int(parmstr)
if datasize > 2000: # set maximum datasize
datasize = 2000
elif datasize < 10: # minimum datasize
datasize = 10
elif cmdstr.lower() == "$feedback":
if noparm==1:
print("$"+str(feedback))
else:
feedback = int(parmstr)
if feedback > 1:
feedback = 1
elif feedback < 0:
feedback = 0
elif cmdstr.lower() == "$mafilter": # turn MA4 filter off/on
if noparm==1:
print("$"+str(mafilter))
else:
mafilter = int(parmstr)
if mafilter > 1:
mafilter = 1
elif mafilter < 0:
mafilter = 0
# addeed in ofc_pid_lag3.py June 2023
elif cmdstr.lower() == "$controller":
if noparm==1:
print("$"+str(controller))
else:
parm_tmp = int(parmstr)
if parm_tmp > 2 or parm_tmp < 0:
if not capture_flag:
print("$Bad controller choice")
else:
controller = parm_tmp
# if plantsim:
# reset_lag3_states()
elif cmdstr.lower() == "$t":
if noparm==1:
print("$"+str(T))
else:
T = float(parmstr)
if T > 1: # set maximum T
T = 1
elif T < 0.001: # minimum T
T = 0.001
T_ms = int(1000*T)
#a,b = lag3_init(T)
PID_update()
# recompute coefficients for output feedback controller
a_int = integrator_coeff_compute()
g_1, g_2, g_3 = leadlag_coeff_compute(ll_z, ll_p)
timer.init(period=T_ms, mode=Timer.PERIODIC,
callback=timer_isr)
elif cmdstr.lower() == "$kp":
if noparm==1:
print("$"+str(kp))
else:
kp = float(parmstr)
if kp > 100: # set maximum kp
kp = 100
elif kp < 0: # minimum kp
kp = 0
PID_update()
# update_dashboard()
elif cmdstr.lower() == "$ki":
if noparm==1:
print("$"+str(ki))
else:
ki = float(parmstr)
if ki > 100: # set maximum ki
ki = 100
elif ki < 0: # minimum ki
ki = 0
PID_update()
#update_dashboard()
elif cmdstr.lower() == "$kd":
if noparm==1:
print("$"+str(kd))
else:
kd = float(parmstr)
if kd > 100: # set maximum kd
kd = 100
elif kd < 0: # minimum kd
kd = 0
PID_update()
#update_dashboard()
elif cmdstr.lower() == "$kt":
if noparm==1:
print("$"+str(kt))
else:
kt = float(parmstr)
if kt > 100: # set maximum kt
kt = 100
elif kt < 0: # minimum kt
kt = 0
PID_update()
#update_dashboard()
elif cmdstr.lower() == "$wp":
if noparm==1:
print("$"+str(wp))
else:
wp = float(parmstr)
if wp > 10: # set maximum wp
wp = 10
elif wp < 0: # minimum wp
wp = 0
PID_update()
#update_dashboard()
elif cmdstr.lower() == "$wd":
if noparm==1:
print("$"+str(wd))
else:
wd = float(parmstr)
if wd > 10: # set maximum wd
wd = 10
elif wd < 0: # minimum wd
wd = 0
PID_update()
#update_dashboard()
elif cmdstr.lower() == "$n":
if noparm==1:
print("$"+str(N))
else:
N = float(parmstr)
if N > 200: # set maximum N
N = 200
elif N < 2: # minimum N
N = 2
PID_update()
#update_dashboard()
elif cmdstr.lower() == "$autotune":
prompt=False
if not capture:
print("$Autotuning starts ...")
autotune = True
at_ready2measure = False # clear flag
at_time = 0.0
at_x_old = at_x_new = 0.0
y_max = 0.0
cycle_cnts = 0
if capture:
print("datamat = np.array([") # head of data matrix
prompt=False # turn prompt off
capture_flag = True
elif cmdstr.lower() == "$y":
print("$"+str(y))
elif cmdstr.lower() == "$ku": # ultimate gain from autotune
print("$"+str(Ku))
elif cmdstr.lower() == "$tu": # ultimate period from autotune
print("$"+str(Tu))
elif cmdstr.lower() == "$lsid":
feedback = 0 # in open-loop mode
lsid = True
# outputting data
print("datamat = np.array([") # head of data matrix
prompt=False # turn prompt off
capture_flag = True # output data
elif cmdstr.lower() == "$update":
print("${},{},{},{},{},{},{},{},{},{},{},{},{}".format(T,
streaming,capture,datasize,feedback,controller,kp,ki,kd,
kt,wp,wd,N))
else:
print("$Invalid command")
def user_input():
if prompt:
#sys.stdout.write('\nEnter command : ')
print("\n$Enter command : ")
#user_str = sys.stdin.readline()
#newline_char = sys.stdin.readline()
user_str = input()
cmdInt(user_str)
timer.init(period=T_ms, mode=Timer.PERIODIC,
callback=timer_isr)
PID_update()
while True:
user_input()
Loading
ssd1306
ssd1306