"""
This program detects potential earthquakes using an MPU6050 accelerometer
connected to a Raspberry Pi Pico W. The accelerometer data is read and mapped
to a specific range (-4 to 4), and the resultant acceleration is calculated.
If the resultant exceeds a predefined threshold (4), an emergency alert is sent
to a Blynk application. The system connects to WiFi for real-time communication
with Blynk, enabling live monitoring of the accelerometer readings and alerts.
"""
import machine
import time
from machine import I2C, Pin
import network
import BlynkLib
# Initialize I2C for MPU6050 (using default I2C pins for Raspberry Pi Pico)
i2c = I2C(0, scl=Pin(17), sda=Pin(16))
# Connecting to the WiFi
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect("Wokwi-GUEST", "")
max_wait = 10
print('Waiting for connection.')
while max_wait > 0:
if wlan.status() < 0 or wlan.status() >= 3:
break
max_wait -= 1
time.sleep(1)
if wlan.status() != 3: # Handling connection error
raise RuntimeError('network connection failed')
else:
print('connected')
status = wlan.ifconfig()
# Initializing Blynk
BLYNK_AUTH = "AM8A-pVFFJmJFq5hQgA6k0PlaxTWyRZl"
blynk = BlynkLib.Blynk(BLYNK_AUTH) # Correctly initializing Blynk
# Virtual Pins
@blynk.on("V0")
@blynk.on("V1")
@blynk.on("V2")
@blynk.on("V3")
def write_to_blynk():
global mapped_x, mapped_y, mapped_z
# Send mapped accelerometer values to Blynk
blynk.virtual_write(0, mapped_x)
blynk.virtual_write(1, mapped_y)
blynk.virtual_write(2, mapped_z)
blynk.virtual_write(3, ((mapped_x**2 + mapped_y**2 + mapped_z**2)**0.5)) # Resultant value
if (((mapped_x**2 + mapped_y**2 + mapped_z**2)**0.5 ) > 4):
blynk.log_event("emergency_alert", "Earthquake detected! Magnitude exceeds safe limits.") # Trigger SMS/notification
# MPU6050 class definition
class MPU6050:
def __init__(self, i2c):
self.i2c = i2c
self.addr = 0x68
self.i2c.writeto_mem(self.addr, 0x6B, b'\x00') # Wake up MPU6050
def get_accel(self):
accel_data = self.i2c.readfrom_mem(self.addr, 0x3B, 6)
x = int.from_bytes(accel_data[0:2], 'big') if accel_data[0] < 0x80 else int.from_bytes(accel_data[0:2], 'big') - 65536
y = int.from_bytes(accel_data[2:4], 'big') if accel_data[2] < 0x80 else int.from_bytes(accel_data[2:4], 'big') - 65536
z = int.from_bytes(accel_data[4:6], 'big') if accel_data[4] < 0x80 else int.from_bytes(accel_data[4:6], 'big') - 65536
return {'x': x, 'y': y, 'z': z}
# Map function to map the raw data to a smaller range
def map_value(value, in_min, in_max, out_min, out_max):
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
# Main Loop to continuously send mapped values to Blynk
mpu = MPU6050(i2c)
while True:
accel = mpu.get_accel()
x = accel['x']
y = accel['y']
z = accel['z']
# Map the values from the range -32768 to 32767 to -1 to 1 (or any other desired range)
mapped_x = map_value(x, -32768, 32767, -4, 4)
mapped_y = map_value(y, -32768, 32767, -4, 4)
mapped_z = map_value(z, -32768, 32767, -4, 4)
# Call the function to send data to Blynk
write_to_blynk()
# Run Blynk communication loop
blynk.run()
# Debug: Print mapped values
print(f"Accelerometer Values: x = {mapped_x:.2f}, y = {mapped_y:.2f}, z = {mapped_z:.2f}")
print(f"Resultant = {(mapped_x**2 + mapped_y**2 + mapped_z**2)**0.5:.2f}")
time.sleep(1) # Adjust the delay as needed