# import machine
# import utime
# from machine import ADC, Pin, lightsleep
# # Configure ADC and pins
# adc = ADC(Pin(26))
# led = Pin("LED", Pin.OUT)
# # Threshold for impact detection (adjust based on testing)
# IMPACT_THRESHOLD = 80
# def calculate_bias():
# """Calculate the bias (DC offset) of the microphone signal"""
# # Take a quicker bias reading to reduce blocking time
# bias = int(sum([adc.read_u16() >> 4 for _ in range(100)]) / 100)
# print(f"Bias: {bias}")
# return bias
# def detect_impact(bias):
# """Detect if an impact has occurred based on signal variance"""
# # Sample the signal multiple times to detect sudden changes
# samples = []
# for _ in range(50): # Reduced samples for faster detection
# raw = adc.read_u16() >> 4
# diff = abs(raw - bias)
# samples.append(diff)
# utime.sleep_us(50) # Small delay between samples
# # Calculate average deviation from bias
# avg_deviation = sum(samples) / len(samples)
# # If average deviation exceeds threshold, we have an impact
# return avg_deviation > IMPACT_THRESHOLD
# def record_audio(bias):
# """Record 4 seconds of audio after impact detection"""
# filename = f"impact_{utime.ticks_ms()}.wav"
# print(f"Recording impact to {filename}")
# f = open(filename, 'wb')
# # Write WAV header (44 bytes)
# f.write(b'RIFF\xFF\xFF\xFF\xFFWAVEfmt \x10\x00\x00\x00\x01\x00\x01\x00\x80\x3E\x00\x00\x10\xf3\x00\x00\x02\x00\x10\x00data\xFF\xFF\xFF\xFF')
# print("Recording 4s of audio...")
# led.on() # Turn on LED during recording
# # Record for 4 seconds instead of 2 (64000 samples instead of 32000)
# for i in range(64000): # 4s @16kHz
# raw = adc.read_u16() >> 4
# diff = raw - bias
# val = diff * 256 # 256x gain
# if val > 32767: val = 32767
# elif val < -32768: val = -32768
# lo = val & 255
# hi = (val >> 8) & 255
# f.write(bytes([lo, hi]))
# # Periodically flush the buffer to ensure data is written
# if i % 1000 == 0:
# f.flush()
# f.close()
# led.off() # Turn off LED after recording
# print(f"✓ Recording saved to {filename}")
# print("Recording completed!")
# def main():
# """Main loop - sleep until impact detected"""
# print("Impact detection system starting...")
# # Calculate initial bias
# bias = calculate_bias()
# print("Entering sleep mode. System will wake on impact.")
# print("Press Ctrl+C to exit.")
# try:
# while True:
# # Put the Pico W into light sleep mode
# lightsleep(100) # Sleep for 100ms
# # Wake up and check for impact
# if detect_impact(bias):
# print("Impact detected!")
# record_audio(bias)
# # Quick bias recalculation (doesn't block as long)
# print("Quick recalibration...")
# bias = calculate_bias()
# print("Ready for next impact!")
# except KeyboardInterrupt:
# print("\nExiting program...")
# if __name__ == "__main__":
# main()
import struct
import machine
import utime
import network
import urequests
import gc
import os
from machine import ADC, Pin, lightsleep
# ========= CONFIG =========
# Wi-Fi credentials used by connect_wifi(). The values look like placeholders
# ("YOUR_WIFI" / "YOUR_PASSWORD" in Portuguese) -- replace before deploying.
WIFI_SSID = "SEU_WIFI"
WIFI_PASS = "SUA_SENHA"
# Sent with every upload in the "X-Device-ID" request header.
DEVICE_ID = "pico-rack-01"
# URL that upload_audio() POSTs the recorded WAV file to.
API_ENDPOINT = "https://noise-monitor-api.onrender.com/api/pico-alert"
# Pin number handed to ADC(Pin(...)) for the audio input.
ADC_PIN = 26
SAMPLE_RATE = 8000 # samples per second written to the WAV header; lowered to reduce RAM use
RECORD_SECONDS = 1 # length of each recording; lowered to 1 second
# detect_impact() reports an impact when the mean absolute deviation from the
# bias (in ADC counts after the >> 4 shift) exceeds this value.
IMPACT_THRESHOLD = 80
# Hardware objects shared by the functions below.
adc = ADC(Pin(ADC_PIN))
led = Pin("LED", Pin.OUT)
# ========= WIFI =========
def connect_wifi():
    """Bring up the station interface and join the configured network.

    If the interface is not already connected, a join is started with
    WIFI_SSID / WIFI_PASS and the link state is polled once per second for
    up to 15 seconds.

    Returns:
        The connected ``network.WLAN`` object, or ``None`` if the link could
        not be established. On failure the interface is deactivated again,
        because callers that receive ``None`` have no handle to shut it down.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("[WIFI] Connecting...")
        wlan.connect(WIFI_SSID, WIFI_PASS)
        for _ in range(15):
            if wlan.isconnected():
                break
            utime.sleep(1)
    if wlan.isconnected():
        print("[WIFI] OK:", wlan.ifconfig())
        return wlan
    print("[WIFI] FAIL")
    # Cancel the pending join and switch the interface off; otherwise every
    # failed attempt would leave the radio active until the next success.
    wlan.disconnect()
    wlan.active(False)
    return None
def disconnect_wifi(wlan):
    """Drop the Wi-Fi link (if any) and deactivate the interface.

    Args:
        wlan: A ``network.WLAN``-like object, or ``None``/falsy, in which
            case nothing is done.

    The interface is deactivated even when the link has already dropped
    (e.g. the connection was lost during an upload), so the radio is never
    left active after this call.
    """
    if not wlan:
        return
    if wlan.isconnected():
        wlan.disconnect()
    wlan.active(False)
# ========= AUDIO =========
def calculate_bias():
    """Estimate the resting level of the ADC signal.

    Takes 80 back-to-back readings (each shifted right by 4 bits), averages
    them, prints the result and returns it truncated to an int.
    """
    sample_count = 80
    accumulated = 0
    for _ in range(sample_count):
        accumulated += adc.read_u16() >> 4
    level = int(accumulated / sample_count)
    print("[AUDIO] Bias:", level)
    return level
def detect_impact(bias):
    """Report whether the signal currently deviates strongly from ``bias``.

    Reads 30 samples 50 microseconds apart, averages their absolute distance
    from ``bias`` and returns True when that mean exceeds IMPACT_THRESHOLD.
    """
    window = 30
    deviation_sum = 0
    remaining = window
    while remaining:
        deviation_sum += abs((adc.read_u16() >> 4) - bias)
        utime.sleep_us(50)
        remaining -= 1
    return deviation_sum / window > IMPACT_THRESHOLD
def record_audio(bias):
    """Record RECORD_SECONDS of audio from the ADC into a 16-bit mono WAV file.

    Args:
        bias: Resting ADC level (same scale as ``adc.read_u16() >> 4``) that
            is subtracted from every sample before the gain is applied.

    Returns:
        The name of the WAV file that was written.

    The LED is on for the duration of the recording and is switched off even
    if recording fails.
    """
    filename = f"impact_{utime.ticks_ms()}.wav"
    total_samples = SAMPLE_RATE * RECORD_SECONDS
    period_us = 1_000_000 // SAMPLE_RATE
    # Samples are packed into a small reusable buffer and flushed in chunks,
    # avoiding one allocation and one 2-byte file write per sample.
    chunk = bytearray(256 * 2)
    chunk_len = len(chunk)
    print("[AUDIO] Recording:", filename)
    gc.collect()  # start the sampling loop with a tidy heap
    led.on()
    try:
        with open(filename, "wb") as f:
            # WAV header; the two size fields are patched once the data is written.
            f.write(b"RIFF")
            f.write(b"\x00\x00\x00\x00")
            f.write(b"WAVEfmt ")
            f.write(struct.pack(
                "<IHHIIHH",
                16,               # fmt chunk size
                1,                # PCM
                1,                # mono
                SAMPLE_RATE,
                SAMPLE_RATE * 2,  # byte rate
                2,                # block align
                16,               # bits per sample
            ))
            f.write(b"data")
            f.write(b"\x00\x00\x00\x00")

            # Actual recording. Each sample is taken at an absolute deadline
            # so time spent reading, packing and writing does not stretch the
            # sample period; sleeping a full period after each sample would
            # record slower than the rate declared in the header.
            filled = 0
            next_due = utime.ticks_us()
            for _ in range(total_samples):
                wait = utime.ticks_diff(next_due, utime.ticks_us())
                if wait > 0:
                    utime.sleep_us(wait)
                next_due = utime.ticks_add(next_due, period_us)

                raw = adc.read_u16() >> 4
                val = (raw - bias) * 128  # 128x gain
                # Clamp to the signed 16-bit range.
                if val > 32767:
                    val = 32767
                elif val < -32768:
                    val = -32768
                struct.pack_into("<h", chunk, filled, val)
                filled += 2
                if filled == chunk_len:
                    f.write(chunk)
                    filled = 0
            if filled:
                f.write(memoryview(chunk)[:filled])

            # Patch the RIFF and data chunk sizes in the header.
            size = f.tell()
            data_size = size - 44
            f.seek(4)
            f.write(struct.pack("<I", size - 8))
            f.seek(40)
            f.write(struct.pack("<I", data_size))
    finally:
        led.off()
    print("[AUDIO] Done")
    return filename
# ========= UPLOAD =========
def upload_audio(filename):
    """POST a recorded WAV file to API_ENDPOINT, then delete it.

    Args:
        filename: Path of the WAV file to send.

    Wi-Fi is brought up for the upload and shut down afterwards. Upload
    errors are printed, not raised. The file is removed on every path --
    success, upload error, or Wi-Fi unavailable -- so recordings that could
    not be sent do not pile up on the filesystem.
    """
    try:
        wlan = connect_wifi()
        if not wlan:
            return
        try:
            gc.collect()
            with open(filename, "rb") as f:
                audio_bytes = f.read()
            print("[UPLOAD] Size:", len(audio_bytes))
            response = urequests.post(
                API_ENDPOINT,
                data=audio_bytes,
                headers={
                    "Content-Type": "audio/wav",
                    "X-Device-ID": DEVICE_ID,
                },
            )
            try:
                print("[UPLOAD] Status:", response.status_code)
            finally:
                # Always release the response, even if reporting fails.
                response.close()
        except Exception as e:
            # Best-effort upload: report and carry on monitoring.
            print("[UPLOAD ERROR]", e)
        finally:
            disconnect_wifi(wlan)
    finally:
        try:
            os.remove(filename)
        except OSError:
            # File already gone or not removable; nothing more to do.
            pass
# ========= MAIN =========
def main():
    """Run the monitor loop forever.

    Measures the initial bias, then repeatedly light-sleeps for 150 ms and
    checks for an impact. When one is detected, a clip is recorded and
    uploaded, and the bias is measured again before monitoring resumes.
    """
    print("=== Impact Monitor Started ===")
    bias = calculate_bias()
    while True:
        lightsleep(150)
        if not detect_impact(bias):
            continue
        print("[EVENT] Impact!")
        recording = record_audio(bias)
        upload_audio(recording)
        bias = calculate_bias()
        print("[READY]")
# Start the monitor as soon as this file is executed (there is no __main__ guard).
main()