import time
import init as pyRTOS
from machine import Pin, PWM, ADC
from message import MessageQueue, MESSAGE_TYPE_ADC_VALUE, Message # เพิ่ม Message ไปยังการนำเข้า
# Shared servo pulse width in microseconds; the controller and servo tasks
# below declare it global and update it, and the servo task prints it.
duty_us = 0
# Define tasks
def task1(self):
    """Task generator that blinks the LED on pin 2.

    The code before the first bare ``yield`` is one-time setup: the pin is
    configured as an output and switched off. Every later pass blocks for
    10 ms, toggles the LED, prints a status line and then yields a
    100000000 ns (0.1 s) timeout as its blocking condition.

    Args:
        self: Task handle passed in by the scheduler (unused here).
    """
    blink_period_ns = 100000000  # 0.1 s between toggles

    # Setup phase: configure the LED and make sure it starts dark.
    led = Pin(2, Pin.OUT)
    led.off()
    yield

    while True:
        time.sleep_ms(10)
        led.toggle()
        print("Task 1 is running")
        # A fresh timeout object is created for every wait.
        yield [pyRTOS.timeout_ns(blink_period_ns)]
def task2(self):
    """Task generator that blinks the LED on pin 7.

    The code before the first bare ``yield`` is one-time setup: the pin is
    configured as an output and switched off. Every later pass blocks for
    10 ms, toggles the LED, prints a status line and then yields a
    300000000 ns (0.3 s) timeout as its blocking condition.

    Args:
        self: Task handle passed in by the scheduler (unused here).
    """
    blink_period_ns = 300000000  # 0.3 s between toggles

    # Setup phase: configure the LED and make sure it starts dark.
    led = Pin(7, Pin.OUT)
    led.off()
    yield

    while True:
        time.sleep_ms(10)
        led.toggle()
        print("Task 2 is running")
        # A fresh timeout object is created for every wait.
        yield [pyRTOS.timeout_ns(blink_period_ns)]
# Servo pulse-width calibration shared by controller() and servo().
MIN_DUTY = 500    # pulse width in microseconds for an ADC reading of 0
MAX_DUTY = 2500   # pulse width in microseconds for a full-scale ADC reading
MAX_U16 = 65535   # full-scale value of a 16-bit ADC reading
dY = MAX_DUTY - MIN_DUTY


def adc_to_duty_us(adc_value):
    """Linearly map a 16-bit ADC reading onto the servo pulse-width range.

    Args:
        adc_value: Raw reading, expected in the range 0..MAX_U16.

    Returns:
        The pulse width in microseconds as a float: MIN_DUTY for a reading
        of 0 and MAX_DUTY for a reading of MAX_U16.
    """
    return (adc_value * dY / MAX_U16) + MIN_DUTY


def controller(self):
    """Task generator that samples the ADC on pin 28 and feeds the servo task.

    Each pass blocks for 10 ms, prints a status line, reads the ADC, stores
    the resulting pulse width in the global ``duty_us`` while holding
    ``duty_mutex``, and sends the raw reading to the task named "servo".
    It then yields a 50000000 ns (50 ms) timeout.

    Args:
        self: The task object running this generator; used to take the
            mutex and to send messages.
    """
    global duty_us
    adc = ADC(Pin(28))
    yield  # end of the setup phase

    while True:
        time.sleep_ms(10)
        print("Task 3 is running")
        adc_value = adc.read_u16()

        # Update the shared pulse width while holding the mutex.
        yield [duty_mutex.lock(self)]
        duty_us = adc_to_duty_us(adc_value)
        duty_mutex.unlock()

        # Queue the raw reading on this task's outgoing messages so it is
        # routed to the mailbox of the task named "servo". A private local
        # MessageQueue would never be read by the servo task, which only
        # checks its mailbox.
        self.send(Message(type=MESSAGE_TYPE_ADC_VALUE, source="controller", target="servo", message=adc_value))
        yield [pyRTOS.timeout_ns(50000000)]


def servo(self):
    """Task generator that drives the servo PWM output on pin 18.

    Setup configures the PWM for 50 Hz with an initial pulse of
    MIN_DUTY microseconds. Each pass blocks for 10 ms, prints the shared
    ``duty_us`` value, drains the mailbox and applies every
    MESSAGE_TYPE_ADC_VALUE message to the PWM output, then yields a
    50000000 ns (50 ms) timeout.

    Args:
        self: The task object running this generator; its mailbox is read
            with ``self.recv()``.
    """
    pwm = PWM(Pin(18))
    pwm.freq(50)
    pwm.duty_ns(MIN_DUTY * 1000)
    yield  # end of the setup phase

    while True:
        time.sleep_ms(10)
        print(duty_us)

        # Fetch whatever has been delivered to this task's mailbox.
        messages = self.recv()
        if messages:
            for message in messages:
                # The mailbox can also hold objects that are not messages
                # (the start-up code delivers the mutex to it), so only
                # act on entries carrying the expected message type.
                if message and hasattr(message, 'type') and message.type == MESSAGE_TYPE_ADC_VALUE:
                    # The payload is a raw ADC reading: scale it exactly
                    # once, then convert microseconds to nanoseconds.
                    pulse_us = adc_to_duty_us(message.message)
                    pwm.duty_ns(int(pulse_us * 1000))
        yield [pyRTOS.timeout_ns(50000000)]
# Create tasks. Only task1 is given an explicit priority; the controller and
# servo tasks are created with a mailbox.
flash1 = pyRTOS.Task(task1, priority=1, name='task1')
flash2 = pyRTOS.Task(task2, name='task2')
crtl = pyRTOS.Task(controller, name='controller', mailbox=True)
pwm_servo = pyRTOS.Task(servo, name='servo', mailbox=True)

# Mutex that controller() locks while it updates duty_us. It is also placed
# in the mailbox of both mailbox-enabled tasks.
duty_mutex = pyRTOS.Mutex()
for _task in (crtl, pwm_servo):
    _task.deliver(duty_mutex)

# Register the tasks with the scheduler, keeping the original order.
for _task in (flash1, flash2, crtl, pwm_servo):
    pyRTOS.add_task(_task)

# Start the scheduler
pyRTOS.start()