# === ADC -> FFT -> OLED (SSD1306) ===
# Compatible con: ESP32 / Pi Pico W (RP2040)
# - En RP2040 usa muestreo por espera activa (ticks_us) para precisión de fs
# - FFT con 'ulab' (N=1024) o DFT de respaldo (N=256)
# - Grafica señal (arriba) y espectro en dB (abajo)
# - Muestra picos principales por consola
import sys, math, time, micropython
from machine import Pin, ADC, I2C
# ===== Detección de plataforma =====
PLAT = sys.platform # 'esp32', 'rp2', 'rp2350', etc.
IS_ESP32 = (PLAT == 'esp32')
IS_RP = ('rp2' in PLAT) or ('rp2350' in PLAT)
# ===== Parámetros =====
FS_HZ = 4000 # Frecuencia de muestreo deseada (Hz)
N_ULAB = 1024 # Tamaño FFT con ulab
N_FALLBACK = 256 # Tamaño DFT sin ulab
PEAKS_TO_SHOW = 5 # Número de picos a listar en consola
DB_DYN = 60.0 # Rango dinámico espectro (dB)
VREF = 3.3 # Referencia ADC (aprox. 3.3V RP2040)
# Pines por defecto
ADC_PIN_ESP32 = 34 # GPIO34 (ADC1_CH6)
ADC_PIN_RP = 26 # GP26 (ADC0)
I2C_ID = 0
I2C_SDA_ESP32 = 21
I2C_SCL_ESP32 = 22
I2C_SDA_RP = 4 # Pi Pico W -> GP4 SDA
I2C_SCL_RP = 5 # Pi Pico W -> GP5 SCL
OLED_W, OLED_H = 128, 64
OLED_ADDR = 0x3C
# ===== ulab opcional =====
HAVE_ULAB = False
try:
from ulab import numpy as np
HAVE_ULAB = True
except Exception:
HAVE_ULAB = False
N = N_ULAB if HAVE_ULAB else N_FALLBACK
print("Plataforma:", PLAT, "| FS objetivo:", FS_HZ, "Hz | N:", N, "| ulab:", HAVE_ULAB)
# ===== OLED =====
from ssd1306 import SSD1306_I2C
if IS_ESP32:
i2c = I2C(I2C_ID, scl=Pin(I2C_SCL_ESP32), sda=Pin(I2C_SDA_ESP32), freq=400_000)
else:
i2c = I2C(I2C_ID, scl=Pin(I2C_SCL_RP), sda=Pin(I2C_SDA_RP), freq=400_000)
oled = SSD1306_I2C(OLED_W, OLED_H, i2c, addr=OLED_ADDR)
# ===== ADC =====
if IS_ESP32:
from machine import Timer
adc = ADC(Pin(ADC_PIN_ESP32))
if hasattr(adc, "atten"):
adc.atten(ADC.ATTN_11DB) # ~0..3.6V
if hasattr(adc, "width"):
adc.width(ADC.WIDTH_12BIT) # 0..4095
def adc_read12():
return adc.read() # 0..4095
else:
adc = ADC(Pin(ADC_PIN_RP)) # GP26 -> ADC0
# read_u16() devuelve 0..65535; normalizamos a ~12 bits (0..4095)
def adc_read12():
return (adc.read_u16() >> 4)
# ===== Buffers =====
samples = [0] * N
# ===== Muestreo =====
# ESP32: puede usarse Timer periódico + ISR
# RP2040: mejor usar espera activa (ticks_us) para microsegundos
if IS_ESP32:
from machine import Timer
idx = 0
full = False
tim = Timer(0)
micropython.alloc_emergency_exception_buf(200)
def isr_sample(_t):
global idx, full
if full:
return
samples[idx] = adc_read12()
idx += 1
if idx >= N:
full = True
def capture_frame(fs_hz):
"""Captura N muestras usando Timer ISR (ESP32). Devuelve (lista, fs_efectiva)."""
global idx, full
idx = 0
full = False
# intentar con freq, si no, caer a periodo ms (peor precisión)
try:
tim.init(freq=fs_hz, mode=Timer.PERIODIC, callback=isr_sample)
except Exception:
period_ms = max(1, int(1000 // fs_hz))
tim.init(period=period_ms, mode=Timer.PERIODIC, callback=isr_sample)
t0 = time.ticks_us()
while not full:
time.sleep_ms(1)
t1 = time.ticks_us()
tim.deinit()
elapsed_s = (time.ticks_diff(t1, t0)) / 1_000_000
fs_eff = N / elapsed_s if elapsed_s > 0 else fs_hz
return samples[:], fs_eff
else:
# RP2040: muestreo por espera activa a microsegundos
def capture_frame(fs_hz):
"""Captura N muestras con temporización por ticks_us. Devuelve (lista, fs_efectiva)."""
period_us = max(1, int(1_000_000 // fs_hz))
samps = samples # reutiliza buffer
ticks = time.ticks_us
diff = time.ticks_diff
t_next = ticks()
t0 = t_next
for i in range(N):
# esperar el instante exacto
while diff(ticks(), t_next) < 0:
pass
samps[i] = adc_read12()
t_next += period_us
t1 = ticks()
elapsed_s = (diff(t1, t0)) / 1_000_000
fs_eff = N / elapsed_s if elapsed_s > 0 else fs_hz
return samps[:], fs_eff
# ===== DSP =====
def hann_window(n, N_):
return 0.5 - 0.5 * math.cos(2*math.pi*n/(N_-1))
def compute_fft_ulab(samps, fs):
x = np.array(samps, dtype=np.float)
# Convertir a voltios aprox para escala (opcional)
# 12 bits -> 0..4095 mapea a 0..VREF
x = (x * VREF) / 4095.0
x = x - np.mean(x)
n = np.arange(N)
w = 0.5 - 0.5*np.cos(2*np.pi*n/(N-1))
xw = x * w
X = np.fft.fft(xw)
# Semiespectro
Xh = X[:N//2]
mag = np.sqrt(Xh.real*Xh.real + Xh.imag*Xh.imag)
# Normalización aproximada (ventana)
mag = (2.0 / np.sum(w)) * mag
freqs = (np.arange(N//2) * fs) / N
return freqs, mag
def compute_dft_fallback(samps, fs):
Nloc = len(samps)
mean = sum(samps)/Nloc
w = [hann_window(n, Nloc) for n in range(Nloc)]
# a voltios aprox
xw = [((samps[n]-mean) * w[n]) * (VREF/4095.0) for n in range(Nloc)]
half = Nloc//2
mag = [0.0]*half
for k in range(half):
re = 0.0; im = 0.0
angk = -2*math.pi*k/Nloc
for n in range(Nloc):
ang = angk*n
c = math.cos(ang); s = math.sin(ang)
re += xw[n]*c; im += xw[n]*s
mag[k] = math.sqrt(re*re + im*im) * (2.0/sum(w))
freqs = [k*fs/Nloc for k in range(half)]
return freqs, mag
def top_peaks(freqs, mag, n_peaks=5, fmin=1.0, fmax=None):
if not mag: return []
if fmax is None: fmax = freqs[-1]
idxs = [i for i,f in enumerate(freqs) if i>0 and f>=fmin and f<=fmax]
idxs.sort(key=lambda i: mag[i], reverse=True)
return [(freqs[i], mag[i], i) for i in idxs[:n_peaks]]
# ===== Gráficos en OLED =====
def draw_waveform(oled, x0, y0, w, h, samps):
"""Dibuja señal temporal en rectángulo (x0,y0,w,h)."""
oled.fill_rect(x0, y0, w, h, 0)
Nsrc = len(samps)
if Nsrc < 2: return
step = (Nsrc-1) / max(1, (w-1))
vmin = min(samps); vmax = max(samps)
if vmax == vmin: vmax = vmin + 1
def mapy(v):
return y0 + (h-1) - int((v - vmin) * (h-1) / (vmax - vmin))
x_prev = x0
y_prev = mapy(samps[0])
for xi in range(1, w):
idxf = int(xi * step)
if idxf >= Nsrc: idxf = Nsrc-1
y = mapy(samps[idxf])
oled.line(x_prev, y_prev, x0 + xi, y, 1)
x_prev, y_prev = x0 + xi, y
def draw_spectrum(oled, x0, y0, w, h, freqs, mag, dyn_db=60.0):
"""Dibuja espectro (dB) en rectángulo (x0,y0,w,h)."""
oled.fill_rect(x0, y0, w, h, 0)
if not mag: return
mmax = max(mag)
if mmax <= 0: mmax = 1e-12
def mapy(m):
db = 20*math.log10(m/mmax + 1e-12)
if db < -dyn_db: db = -dyn_db
return y0 + (h-1) - int((db + dyn_db) * (h-1) / dyn_db)
half = len(mag)
for xi in range(w):
k = int(xi * (half-1) / max(1, (w-1)))
y = mapy(mag[k])
oled.vline(x0 + xi, y, y0 + h - y, 1)
def banner(oled, txt, y=0):
oled.fill_rect(0, y, OLED_W, 8, 0)
oled.text(txt, 0, y, 1)
# ===== Main =====
print("Capturando y graficando… Ctrl+C para salir.")
banner(oled, "ADC->FFT->OLED", 0); oled.show()
try:
while True:
# 1) Captura
samps, fs_eff = capture_frame(FS_HZ)
# 2) FFT / DFT
if HAVE_ULAB:
freqs, mag = compute_fft_ulab(samps, fs_eff)
else:
print("(Sin ulab) DFT de respaldo; puede tardar…")
freqs, mag = compute_dft_fallback(samps, fs_eff)
# 3) Picos por consola
peaks = top_peaks(freqs, mag, n_peaks=PEAKS_TO_SHOW, fmin=1.0)
print("\n=== FFT === FS_ef:", int(fs_eff), "Hz | N:", N, "| Resol:", round(fs_eff/N,2), "Hz/bin")
if peaks:
for f, m, k in peaks:
mmax = max(mag) if mag else 1.0
rel_db = 20*math.log10((m/(mmax+1e-12))+1e-12)
print("Pico: {:8.2f} Hz | bin {:4d} | {:6.2f} dB".format(f, k, rel_db))
f0 = peaks[0][0]
else:
print("Sin picos notables.")
f0 = 0.0
# 4) Dibujar en OLED
oled.fill(0)
banner(oled, "FS:{:.0f} N:{} f0:{:.0f}Hz".format(fs_eff, N, f0), 0)
# Señal temporal (arriba, 22 px)
draw_waveform(oled, x0=0, y0=10, w=OLED_W, h=22, samps=samps)
# Espectro (abajo, 28 px)
draw_spectrum(oled, x0=0, y0=36, w=OLED_W, h=28, freqs=freqs, mag=mag, dyn_db=DB_DYN)
oled.show()
time.sleep_ms(20)
except KeyboardInterrupt:
banner(oled, "Detenido", 0); oled.show()
print("\nDetenido por usuario.")