import network
from machine import Pin
import time
from umqtt.simple import MQTTClient
# Wi-Fi 配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云MQTT服务器地址和端口
SERVER = "bemfa.com"
PORT = 9501
# 你的巴法云私钥
CLIENT_ID = "67e997a13fc144b1ab966f5168835b5b"
# 外轴电机状态主题
TOPIC_EXTERNAL = "external"
# 内轴电机状态主题
TOPIC_INTERNAL = "internal"
# 设备在线状态主题
TOPIC_STATUS = "status"
# 定义外轴电机引脚
A1_neg = Pin(2, Pin.OUT) # 连接到ESP32 GPIO2
A1_pos = Pin(16, Pin.OUT) # 连接到ESP32 GPIO16
B1_pos = Pin(4, Pin.OUT) # 连接到ESP32 GPIO4
# 定义内轴电机引脚
A2_neg = Pin(5, Pin.OUT) # 连接到ESP32 GPIO5
A2_pos = Pin(0, Pin.OUT) # 连接到ESP32 GPIO0 (注意: GPIO0可能与启动模式冲突)
B2_pos = Pin(15, Pin.OUT) # 连接到ESP32 GPIO15
# 定义步进电机步数和延迟时间
STEPS = 200 # 步进电机每圈步数
DELAY = 0.005 # 步进延迟,控制速度
# 角度发布间隔 (步数)
ANGLE_PUBLISH_INTERVAL = 10 # 每10步发布一次角度
# 全局MQTT客户端
mqtt_client = None
def step_external_motor(step):
"""控制外轴电机步进"""
if step == 0:
A1_neg.value(0)
A1_pos.value(1)
B1_pos.value(0)
elif step == 1:
A1_neg.value(0)
A1_pos.value(0)
B1_pos.value(1)
elif step == 2:
A1_neg.value(1)
A1_pos.value(0)
B1_pos.value(0)
elif step == 3:
A1_neg.value(0)
A1_pos.value(0)
B1_pos.value(0)
def step_internal_motor(step):
"""控制内轴电机步进"""
if step == 0:
A2_neg.value(0)
A2_pos.value(1)
B2_pos.value(0)
elif step == 1:
A2_neg.value(0)
A2_pos.value(0)
B2_pos.value(1)
elif step == 2:
A2_neg.value(1)
A2_pos.value(0)
B2_pos.value(0)
elif step == 3:
A2_neg.value(0)
A2_pos.value(0)
B2_pos.value(0)
def rotate_external(degrees, clockwise=True):
"""外轴电机旋转指定角度,实时发布角度"""
steps = int(degrees * STEPS / 360)
direction = 1 if clockwise else -1
current_step = 0
status = f"外轴电机开始旋转 {degrees} 度,方向: {'顺时针' if clockwise else '逆时针'}"
print(status)
publish_status(TOPIC_EXTERNAL, status)
# 计算每步对应的角度
angle_per_step = 360.0 / STEPS
for i in range(steps):
step_external_motor(current_step)
current_step = (current_step + direction) % 4
# 计算当前角度
current_angle = i * angle_per_step * (1 if clockwise else -1)
# 每间隔一定步数发布一次角度
if i % ANGLE_PUBLISH_INTERVAL == 0 or i == steps - 1:
angle_status = f"外轴当前角度: {current_angle:.1f}°"
print(angle_status)
publish_status(TOPIC_EXTERNAL, angle_status)
time.sleep(DELAY)
# 确保电机停止
step_external_motor(3)
status = f"最终角度: {current_angle:.1f}°"
print(status)
publish_status(TOPIC_EXTERNAL, status)
def rotate_internal(degrees, clockwise=True):
"""内轴电机旋转指定角度,实时发布角度"""
steps = int(degrees * STEPS / 360)
direction = 1 if clockwise else -1
current_step = 0
status = f"内轴电机开始旋转 {degrees} 度,方向: {'顺时针' if clockwise else '逆时针'}"
print(status)
publish_status(TOPIC_INTERNAL, status)
# 计算每步对应的角度
angle_per_step = 360.0 / STEPS
for i in range(steps):
step_internal_motor(current_step)
current_step = (current_step + direction) % 4
# 计算当前角度
current_angle = i * angle_per_step * (1 if clockwise else -1)
# 每间隔一定步数发布一次角度
if i % ANGLE_PUBLISH_INTERVAL == 0 or i == steps - 1:
angle_status = f"内轴当前角度: {current_angle:.1f}°"
print(angle_status)
publish_status(TOPIC_INTERNAL, angle_status)
time.sleep(DELAY)
# 确保电机停止
step_internal_motor(3)
status = f"最终角度: {current_angle:.1f}°"
print(status)
publish_status(TOPIC_INTERNAL, status)
def process_command(command):
"""处理命令"""
try:
parts = command.strip().split()
if len(parts) != 3:
print("错误: 命令格式不正确")
return
motor = parts[0].upper()
angle = float(parts[1])
direction = parts[2].upper()
if angle < 0:
print("错误: 角度必须为正数")
return
clockwise = direction == "CW"
if motor == "E": # 外轴电机
rotate_external(angle, clockwise)
elif motor == "I": # 内轴电机
rotate_internal(angle, clockwise)
else:
print("错误: 未知电机类型,使用 'E' 或 'I'")
except Exception as e:
print(f"错误: 处理命令时发生异常 - {str(e)}")
def connect_wifi():
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('正在连接Wi-Fi...')
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
while not sta_if.isconnected():
time.sleep(1)
print('Wi-Fi 连接成功', sta_if.ifconfig())
return sta_if
def connect_mqtt():
"""连接MQTT服务器并保持连接"""
global mqtt_client
try:
if mqtt_client is None:
mqtt_client = MQTTClient(CLIENT_ID, SERVER, PORT)
mqtt_client.connect()
# 发布在线状态
publish_status(TOPIC_STATUS, "online")
print("MQTT连接成功")
return mqtt_client
except Exception as e:
print(f"MQTT连接失败: {e}")
mqtt_client = None
return None
def publish_status(topic, message):
"""发布状态信息到MQTT服务器"""
global mqtt_client
try:
# 确保MQTT连接
if mqtt_client is None:
connect_mqtt()
if mqtt_client:
# 编码消息为UTF-8
encoded_message = message.encode('utf-8')
mqtt_client.publish(topic, encoded_message)
else:
print("无法发布状态: MQTT未连接")
except Exception as e:
print(f"发布状态失败: {e}")
mqtt_client = None
def disconnect_mqtt():
"""断开MQTT连接并发布离线状态"""
global mqtt_client
try:
if mqtt_client:
publish_status(TOPIC_STATUS, "offline")
mqtt_client.disconnect()
mqtt_client = None
print("MQTT连接已断开")
except Exception as e:
print(f"断开MQTT连接失败: {e}")
# 主程序
if __name__ == "__main__":
try:
connect_wifi()
# 连接MQTT服务器
connect_mqtt()
# 初始化电机
step_external_motor(3)
step_internal_motor(3)
print("\r\n=== 双轴步进电机控制器 ===\r\n")
print("命令格式: [MOTOR] [ANGLE] [DIRECTION]\r\n")
print("示例: 'E 90 CW' 或 'I 45 CCW'\r\n")
print("MOTOR: E=外轴, I=内轴\r\n")
print("DIRECTION: CW=顺时针, CCW=逆时针\r\n")
print("==============================\r\n")
while True:
command = input("请输入命令: ")
print(f"收到命令: {command}")
process_command(command)
except KeyboardInterrupt:
print("程序被用户中断")
finally:
# 确保电机停止
step_external_motor(3)
step_internal_motor(3)
# 断开MQTT连接
disconnect_mqtt()
print("电机已停止")