import sys
import time
import machine
import uselect
from machine import Pin, I2C, PWM
# ---------- Pins ----------
PWM_PIN = 15 # PWM output to the motor driver
LED_PIN = "LED" # Onboard LED
I2C_ID = 0 # I2C0 on GP4 (SDA), GP5 (SCL)
SDA_PIN = 4
SCL_PIN = 5
# ---------- MPU6050 (address and registers) ----------
MPU_ADDR = 0x68
REG_PWR_MGMT_1 = 0x6B
REG_SMPLRT_DIV = 0x19
REG_CONFIG = 0x1A
REG_GYRO_CFG = 0x1B
REG_ACCEL_CFG = 0x1C
REG_ACCEL_XOUT_H = 0x3B
# Accelerometer scale for the ±2g range => 16384 LSB/g
ACC_LSB_PER_G = 16384.0
# ---------- Hardware objects ----------
# PWM output: 20 kHz carrier, 0 duty until a PWM command changes it.
pwm = PWM(Pin(PWM_PIN))
pwm.freq(20000) # 20 kHz
pwm.duty_u16(0)
# Onboard LED, initially off; handle_command() turns it on while streaming.
led = Pin(LED_PIN, Pin.OUT)
led.off()
# I2C bus used to talk to the MPU6050, clocked at 400 kHz.
i2c = I2C(I2C_ID, scl=Pin(SCL_PIN), sda=Pin(SDA_PIN), freq=400_000)
# ---------- State/config ----------
# Mutable runtime state shared by the command handlers and the main loop.
state = {
    "pwm_freq": 20000, # Hz
    "pwm_duty": 0, # %
    "sr_hz": 500, # readings per second (50..1000 recommended)
    "stream": False,
    "mpu_ok": False
}
# ---------- Utilidades de MPU6050 ----------
def _i2c_write(reg, val):
    """Write the single byte *val* to MPU6050 register *reg* over I2C."""
    payload = bytes((val,))
    i2c.writeto_mem(MPU_ADDR, reg, payload)
def _i2c_read(reg, n=1):
    """Read *n* bytes from the MPU6050 starting at register *reg*."""
    data = i2c.readfrom_mem(MPU_ADDR, reg, n)
    return data
def _twos_comp16(msb, lsb):
    """Combine two bytes (high, low) into a signed 16-bit integer."""
    word = (msb << 8) | lsb
    # Bit 15 set means the value is negative in two's complement.
    return word - 0x10000 if word & 0x8000 else word
def mpu_init():
    """Wake and configure the MPU6050, then probe it with a test read.

    Returns:
        True if every configuration write and the probe read succeeded,
        False otherwise. The function never raises on a bus error, so it is
        safe to call at start-up even when the sensor is missing.
    """
    try:
        # Wake up: PWR_MGMT_1 = 0 clears SLEEP and selects the internal clock.
        _i2c_write(REG_PWR_MGMT_1, 0x00)
        time.sleep_ms(50)
        # DLPF_CFG = 3: bandwidth about 44 Hz (accelerometer) / 42 Hz (gyro).
        _i2c_write(REG_CONFIG, 0x03)
        # Gyroscope full-scale range: ±250 dps.
        _i2c_write(REG_GYRO_CFG, 0x00)
        # Accelerometer full-scale range: ±2g (matches ACC_LSB_PER_G).
        _i2c_write(REG_ACCEL_CFG, 0x00)
        # SMPLRT_DIV = 0: no internal rate division; the effective sample
        # rate is set by how often the host polls.
        _i2c_write(REG_SMPLRT_DIV, 0x00)
        # Probe: the six accelerometer output bytes must be readable.
        raw = _i2c_read(REG_ACCEL_XOUT_H, 6)
    except Exception:
        # Deliberately broad: any failure here means "sensor unavailable".
        return False
    return len(raw) == 6
def _raw_to_mg(msb, lsb):
    """Convert one big-endian accelerometer sample to integer milli-g."""
    return int((_twos_comp16(msb, lsb) / ACC_LSB_PER_G) * 1000)


def mpu_read_acc_mg():
    """Read the accelerometer and return (ax, ay, az) in integer milli-g.

    One 6-byte burst starting at ACCEL_XOUT_H holds X, Y and Z, each as a
    big-endian signed 16-bit value. Integers keep the text output compact.
    """
    buf = _i2c_read(REG_ACCEL_XOUT_H, 6)
    return (
        _raw_to_mg(buf[0], buf[1]),
        _raw_to_mg(buf[2], buf[3]),
        _raw_to_mg(buf[4], buf[5]),
    )
# ---------- Utilidades generales ----------
def set_pwm_freq(hz):
    """Set the PWM frequency, clamped to 50..200000 Hz.

    The value is truncated to an integer, applied to the PWM output and
    stored in state["pwm_freq"]. Returns the frequency actually applied.
    """
    applied = int(hz)
    if applied > 200000:
        applied = 200000
    if applied < 50:
        applied = 50
    pwm.freq(applied)
    state["pwm_freq"] = applied
    return applied
def set_pwm_duty(percent):
    """Set the PWM duty cycle, clamped to 0..100 percent.

    The value is truncated to an integer, scaled to the 16-bit duty range
    and stored in state["pwm_duty"]. Returns the percentage actually applied.
    """
    applied = int(percent)
    if applied > 100:
        applied = 100
    if applied < 0:
        applied = 0
    # 100 % maps to 65535, the full-scale value of duty_u16().
    counts = (applied * 65535) // 100
    pwm.duty_u16(counts)
    state["pwm_duty"] = applied
    return applied
def set_sr(hz):
    """Set the streaming sample rate, clamped to 50..1000 Hz.

    The value is truncated to an integer and stored in state["sr_hz"].
    Returns the rate actually stored.
    """
    # 50..1000 Hz is a reasonable span for I2C at 400 kHz plus text output.
    rate = int(hz)
    if rate > 1000:
        rate = 1000
    if rate < 50:
        rate = 50
    state["sr_hz"] = rate
    return rate
def info():
    """Return the one-line INFO status report built from the current state.

    Boolean fields (stream, mpu_ok) are rendered as 0/1.
    """
    fields = (
        state["pwm_freq"],
        state["pwm_duty"],
        state["sr_hz"],
        int(state["stream"]),
        int(state["mpu_ok"]),
    )
    template = "INFO,pwm_freq={},pwm_duty={},sr_hz={},stream={},mpu_ok={}\n"
    return template.format(*fields)
# Poller and partial-line buffer kept across read_command() calls, so the
# main loop neither allocates a new poll object per iteration nor blocks on
# a half-typed command.
_stdin_poll = None
_rx_chars = []


def read_command(nonblock=True):
    """Return the next command line from stdin.

    Args:
        nonblock: When True (default), never wait for input. Characters that
            are already available are collected; a complete line (including
            its trailing newline) is returned once the newline has arrived,
            otherwise None. When False, block until a full line is read.

    Returns:
        The line as a string, or None if no complete line is available yet
        in non-blocking mode.
    """
    global _stdin_poll

    if not nonblock:
        # Include anything an earlier non-blocking call already consumed.
        line = "".join(_rx_chars) + sys.stdin.readline()
        _rx_chars.clear()
        return line

    if _stdin_poll is None:
        _stdin_poll = uselect.poll()
        _stdin_poll.register(sys.stdin, uselect.POLLIN)

    # Read one character at a time, only while data is pending, so a
    # partially received command cannot stall the caller.
    while _stdin_poll.poll(0):
        ch = sys.stdin.read(1)
        if not ch:
            # End of stream: nothing more to read right now.
            break
        _rx_chars.append(ch)
        if ch == "\n":
            line = "".join(_rx_chars)
            _rx_chars.clear()
            return line
    return None
def handle_command(line):
    """Parse one text command and execute it, replying on stdout.

    Commands (case-insensitive): PWM <percent>, PFREQ <hz>, SR <hz>, START,
    STOP, INFO. Blank lines are ignored. Unknown commands, or a setter
    without its argument, answer "ERR CMD". Any exception raised while
    handling the command is reported as "ERR <message>" instead of
    propagating.
    """
    try:
        text = line.strip()
        if not text:
            return
        tokens = text.split()
        name = tokens[0].upper()
        reply = sys.stdout.write

        # Setter commands need a numeric argument.
        if len(tokens) >= 2:
            if name == "PWM":
                applied = set_pwm_duty(float(tokens[1]))
                reply("OK PWM {}\n".format(applied))
                return
            if name == "PFREQ":
                applied = set_pwm_freq(float(tokens[1]))
                reply("OK PFREQ {}\n".format(applied))
                return
            if name == "SR":
                applied = set_sr(float(tokens[1]))
                reply("OK SR {}\n".format(applied))
                return

        if name == "START":
            if not state["mpu_ok"]:
                # Retry initialisation in case it failed at start-up.
                state["mpu_ok"] = mpu_init()
            streaming = state["mpu_ok"]
            state["stream"] = streaming
            led.value(1 if streaming else 0)
            reply("OK START\n" if streaming else "ERR MPU\n")
            return

        if name == "STOP":
            state["stream"] = False
            led.off()
            reply("OK STOP\n")
            return

        if name == "INFO":
            reply(info())
            return

        reply("ERR CMD\n")
    except Exception as e:
        sys.stdout.write("ERR {}\n".format(e))
# ---------- Startup ----------
# Try to configure the sensor once and print the initial INFO line on stdout.
state["mpu_ok"] = mpu_init()
sys.stdout.write(info())
# Sampling schedule: tick (in microseconds) at which the last sample was due.
last_us = time.ticks_us()
# ---------- Main loop ----------
while True:
    # Commands
    line = read_command(nonblock=True)
    if line is not None:
        handle_command(line)

    if state["stream"]:
        period_us = 1_000_000 // state["sr_hz"]
        now = time.ticks_us()
        elapsed_us = time.ticks_diff(now, last_us)
        if elapsed_us < 0 or elapsed_us >= 2 * period_us:
            # The schedule is out of range: either we fell behind by two or
            # more periods, or the tick counter wrapped past the range in
            # which ticks_diff() is meaningful. Drop the missed samples and
            # restart from now, rather than emitting a catch-up burst or
            # stalling on a negative difference.
            last_us = time.ticks_add(now, -period_us)
            elapsed_us = period_us
        if elapsed_us >= period_us:
            # Advance by exactly one period so small jitter does not build up.
            last_us = time.ticks_add(last_us, period_us)
            try:
                ax_mg, ay_mg, az_mg = mpu_read_acc_mg()
                t_ms = time.ticks_ms() & 0xFFFFFFFF
                # Compact CSV output
                # Example: ACC,123456,12,-34,1012
                sys.stdout.write("ACC,{},{},{},{}\n".format(t_ms, ax_mg, ay_mg, az_mg))
            except Exception as e:
                # An I2C error must not kill the loop; report it and carry on.
                sys.stdout.write("ERR I2C {}\n".format(e))
                # Short pause so a persistent fault does not flood the output.
                time.sleep_ms(2)
    else:
        time.sleep_ms(10)
        # Keep the schedule fresh while idle so streaming starts cleanly.
        last_us = time.ticks_us()