from machine import Pin, PWM, Timer, ADC
from umqtt.simple import MQTTClient
import ujson
import os
import sys
# Beebotte MQTT Configuration
CHANNEL_TOKEN = 'token_OlQfIeO6MS4QByUz'
CHANNEL_NAME = 'ESP32_MQTT_4'
RESOURCE_NAME1 = 'led_red'
RESOURCE_NAME2 = 'led_yellow'
RESOURCE_NAME3 = 'led_green'
RESOURCE_NAME4 = 'servo'
RESOURCE_NAME5 = 'light_sensor'
MQTT_SERVER = 'mqtt.beebotte.com'
MQTT_USER = 'token:' + CHANNEL_TOKEN
MQTT_TOPIC1 = CHANNEL_NAME + '/' + RESOURCE_NAME1
MQTT_TOPIC2 = CHANNEL_NAME + '/' + RESOURCE_NAME2
MQTT_TOPIC3 = CHANNEL_NAME + '/' + RESOURCE_NAME3
MQTT_TOPIC4 = CHANNEL_NAME + '/' + RESOURCE_NAME4
MQTT_TOPIC5 = CHANNEL_NAME + '/' + RESOURCE_NAME5
# Hardware Setup
led_red_pin = Pin(5, Pin.OUT)
led_yellow_pin = Pin(4, Pin.OUT)
led_green_pin = Pin(2, Pin.OUT)
servo_pin = Pin(23, Pin.OUT)
pwm = PWM(servo_pin)
pwm.freq(50)
adc = ADC(Pin(32)) # ADC1 pin 32 .. 39 !!!
adc.atten(ADC.ATTN_11DB) # 0–3.3V range
# Timers
PING_PERIOD = 120
PUBLISH_PERIOD = 10 # in seconds
timer0 = Timer(0) # ping timer
timer1 = Timer(1) # publish timer
msgNumber = 0
# Ping the server periodically
def handleTimer0Int(timer):
try:
client.ping()
print('Ping')
except Exception as e:
print('Ping error:', e)
# Publish light sensor value
def handleTimer1Int(timer):
global msgNumber
try:
msgNumber = adc.read()
msg = ujson.dumps({"data": msgNumber, "write": True})
client.publish(MQTT_TOPIC5, msg, qos=0)
print("Published:", msg)
except Exception as e:
print("Publish error:", e)
# Set servo angle
def set_servo_angle(angle):
if 0 <= angle <= 180 :
try:
duty = int((angle / 180) * 100 + 25) # Simple mapping
pwm.duty(duty)
print(f"Servo moved to {angle}°")
except Exception as e:
print(f"Servo error: {e}")
else :
print(f"Invalid servo angle: {angle}°")
# MQTT Message Callback
def mqtt_callback(topic, msg):
try:
topic = topic.decode('utf-8')
print(f"{topic}: {msg}")
json_data = ujson.loads(msg)
dt = json_data.get("data")
if dt is None:
print("No data")
return
if topic == MQTT_TOPIC1:
led_red_pin.value(1 if dt else 0)
print("RED LED", "ON" if dt else "OFF")
elif topic == MQTT_TOPIC2:
led_yellow_pin.value(1 if dt else 0)
print("YELLOW LED", "ON" if dt else "OFF")
elif topic == MQTT_TOPIC3:
led_green_pin.value(1 if dt else 0)
print("GREEN LED", "ON" if dt else "OFF")
elif topic == MQTT_TOPIC4:
set_servo_angle(dt)
except Exception as e:
print(f"Callback error: {e}")
# MQTT Client Setup
random_num = int.from_bytes(os.urandom(3), 'little')
mqtt_client_id = bytes('client_' + str(random_num), 'utf-8')
client = MQTTClient(mqtt_client_id,MQTT_SERVER,user=MQTT_USER,password='',keepalive=PING_PERIOD * 2)
try:
client.connect()
print("MQTT connected")
timer0.init(period=PING_PERIOD * 1000, mode=Timer.PERIODIC, callback=handleTimer0Int)
timer1.init(period=PUBLISH_PERIOD * 1000, mode=Timer.PERIODIC, callback=handleTimer1Int)
except Exception as e:
print('Could not connect to MQTT server:', type(e).__name__, e)
sys.exit()
# MQTT Subscription
client.set_callback(mqtt_callback)
client.subscribe(MQTT_TOPIC1)
client.subscribe(MQTT_TOPIC2)
client.subscribe(MQTT_TOPIC3)
client.subscribe(MQTT_TOPIC4)
# Main loop
while True:
try:
client.wait_msg()
except KeyboardInterrupt:
print('Ctrl-C pressed, exiting...')
client.disconnect()
pwm.deinit()
sys.exit()
except Exception as e:
print("MQTT Error:", e)