# microPython v1.22 Raspberry Pi Pico
import array
import math
import rp2
import gc
import time
from machine import Pin, Timer
GPIO1 = Pin(1, Pin.OUT)
GPIO2 = Pin(2, Pin.OUT)
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW,
out_shiftdir=rp2.PIO.SHIFT_LEFT,
in_shiftdir=rp2.PIO.SHIFT_LEFT,
autopull=True
)
def custom_program():
# Initialize ISR
set(pins, 0b1) # Set a different pin to high for error
set(x, 0b1) # Set x to 1
mov(isr, x) # Copy x into the ISR
in_(null, 10) # Shift in half the window
label("start_cycle")
mov(x, isr)
out(y, 32)
jmp(not_y, "start_cycle")
label("lead_cycle")
jmp(x_dec, "test_lead_cycle")
label("test_lead_cycle")
jmp(x_not_y, "lead_cycle")
label("rise")
set(pins, 1)
mov(y, invert(y))
label("lag_cycle")
jmp(x_dec, "test_lag_cycle")
label("test_lag_cycle")
jmp(x_not_y, "lag_cycle")
label("fall")
set(pins, 0)
mov(y, invert(isr))
label("end_cycle")
jmp(x_dec, "test_end_cycle")
label("test_end_cycle")
jmp(x_not_y, "end_cycle")
jmp("start_cycle") # Loop back to the start
set_pin = GPIO1
frequency = 4000
timer_count = 0 # global variable
data_queue = [] # global variable
machines = [] # global variable
QUEUE_THRESHOLD = 12
# Initialize the PIO state machine on a specified pin, for example, GP15
sm = rp2.StateMachine(0, custom_program, freq=frequency, set_base=GPIO1)
machines.append(sm)
# Start the state machine
sm.active(1)
def produce_sinus(frames, frames_per_cycle, min_val, max_val, offset=0):
# Generates sinusoidal values in the range [min_val, max_val]
sinus = array.array('I') # Use 'I' for unsigned int if the PIO uses 32-bit; adjust as necessary
for i in range(frames):
angle = 2 * math.pi / frames_per_cycle + (math.pi * offset / 180)
value = int((max_val - min_val) * (1 + math.sin(angle * i)) / 2 + min_val)
sinus.append(value)
return sinus
def interruption_handler(pin):
global timer_count
timer_count += 1
send_to_pio()
def add_to_queue(frames, frames_per_cycle, min_duty, max_duty):
global data_queue
gc.collect()
for_pin1 = produce_sinus(frames, frames_per_cycle, min_duty, max_duty, 0)
for_pin2 = produce_sinus(frames, frames_per_cycle, min_duty, max_duty, 90)
for i in range(10):
data_queue.append([0, for_pin1[i]])
## data_queue.append([1, for_pin2[i]])
gc.collect()
def send_to_pio():
waiting = False
while len(data_queue) > 0 and not waiting:
## peek the first item
item = data_queue[0]
machine = machines[item[0]]
# Check if there is space in the TX FIFO
# Assuming each item in the data_queue is a single word (32 bits) to be sent
print(f"ff{machine.tx_fifo()} d{item[1]}")
if machine.tx_fifo() >= 4: # If there's less space than needed for one item
waiting = True
else:
# There's enough space in the FIFO, so send the data
# Here, converting item[1] to the format expected by the FIFO, if necessary
machine.put(item[1])
data_queue.pop(0) # Remove the item from the queue after sending
if __name__ == "__main__":
frames = 10
timer_count = 0
window = 2000.0
win_msecs = math.floor(2 * 800 * window / frequency )
frame_load_sec = frames * win_msecs / 1000
add_to_queue(10, 30, 10, 90)
soft_timer = Timer(mode=Timer.PERIODIC, period=win_msecs, callback=interruption_handler)
while True:
print(f"xxx{frame_load_sec}")
t = time.ticks_us()
if timer_count > 3 and QUEUE_THRESHOLD>len(data_queue):
timer_count = 0
print(f"addtoqueue {t}")
add_to_queue(10, 30, 10, 90)
time.sleep(frame_load_sec)
print(f"cycle {t} {win_msecs}")