from mpu6050 import MPU6050
from machine import Pin,I2C
import time
import math
import network
import ujson
import random
from umqtt import MQTTClient
window_size = None
step = None
CLIENT_ID = "xjdd" + str(random.getrandbits(16))
TOPIC_TEMP = "wokwi-class/xjdd/sensor/data"
MQTT_BROKER = "broker.emqx.io"
SAMPLING_RATE = 10 # MPU6050采样频率100Hz
# WiFi配置(Wokwi模拟网络)
SSID = "Wokwi-GUEST"
PASSWORD = ""
# 初始化WiFi
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
# 连接WiFi
print("Connecting to WiFi...")
wifi.connect(SSID, PASSWORD)
# 等待连接
while not wifi.isconnected():
time.sleep(1)
print("Still connecting...")
# 连接成功
print("WiFi Connected!")
print("IP Address:", wifi.ifconfig()[0])
timeout = 10
while not wifi.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
print("wifi",wifi.status())
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.connect()
print("Publisher Ready!")
# I2C初始化
i2c=I2C(
0,
scl=Pin(22),
sda=Pin(23)
)
mpu=MPU6050(i2c)
# 按钮
button=Pin(
16,
Pin.IN,
Pin.PULL_UP
)
running = False
last = 1
base_pitch = 0
base_roll = 0
def get_motion():
acc = mpu.get_accel()
gyro = mpu.get_gyro()
ax = acc['x']
ay = acc['y']
az = acc['z']
gx = gyro['x']
gy = gyro['y']
gz = gyro['z']
return ax, ay, az, gx, gy, gz
def get_angle():
acc = mpu.get_accel()
ax = acc['x']
ay = acc['y']
az = acc['z']
roll = math.degrees(math.atan2(ay, az))
pitch = math.degrees(
math.atan2(-ax, math.sqrt(ay * ay + az * az))
)
return pitch, roll
def get_base_angle(n=30):
p_sum = 0
r_sum = 0
for _ in range(n):
p, r = get_angle()
p_sum += p
r_sum += r
time.sleep_ms(10)
return p_sum / n, r_sum / n
def angle_diff(a, b):
d = a - b
if d > 180:
d -= 360
elif d < -180:
d += 360
return d
data_list = []
label = []
while True:
current = button.value()
# 按键触发
if last == 1 and current == 0:
window_start = time.ticks_ms()
print("按下时间:", window_start)
while button.value() == 0: # 等待松开,防止重复记录
pass
time.sleep_ms(30)
if not running:
base_pitch, base_roll = get_base_angle()
running = True
print("开始训练")
print("初始:", base_pitch, base_roll)
else:
running = False
print("结束训练")
while button.value() == 0:
pass
last = current
if running:
ax,ay,az,gx,gy,gz = get_motion()
data = [ax,ay,az,gx,gy,gz]
data_list.append(data)
pitch, roll = get_angle()
relative_pitch = angle_diff(pitch, base_pitch)
relative_roll = angle_diff(roll, base_roll)
if len(data_list)>= 50:
msg = ujson.dumps(data_list)
client.publish(TOPIC_TEMP, msg)
print("发送完成")
print(len(data_list),len(data_list[0]))
data_list = []
time.sleep_ms(SAMPLING_RATE)