import bluetooth
import random
import struct
import time
from micropython import const
from ble_advertising import advertising_payload
from machine import Pin
# ----------------------------------------------------------
# GPIO2 = LED (relay)
# ----------------------------------------------------------
# GPIO2 configured as an output; handle_relay_command() switches it
# on and off in response to the ON/OFF commands.
LED = Pin(2, Pin.OUT)
# Switch the output off at start-up.
LED.off()
# ----------------------------------------------------------
# IRQ events
# ----------------------------------------------------------
# Event codes that irq_handler() compares against its `event` argument.
# data is unpacked as (conn_handle, _, _)
_IRQ_CENTRAL_CONNECT = const(1)
# data is unpacked as (conn_handle, _, _)
_IRQ_CENTRAL_DISCONNECT = const(2)
# data is unpacked as (conn_handle, value_handle)
_IRQ_GATTS_WRITE = const(3)
# data is unpacked as (conn_handle, value_handle, status)
_IRQ_GATTS_INDICATE_DONE = const(20)
# ----------------------------------------------------------
# GATT flags
# ----------------------------------------------------------
# Bit flags OR-ed together in the characteristic definitions below.
_FLAG_READ = const(0x0002)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
# ==========================================================
# SERVICE 1 : Environmental Sensing (standard)
# ==========================================================
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# Each characteristic is a (UUID, flags) pair; temperature and humidity
# are both declared READ + NOTIFY + INDICATE.
_TEMP_CHAR = (bluetooth.UUID(0x2A6E), _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE)
_HUM_CHAR = (bluetooth.UUID(0x2A6F), _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE)
# Service definition: (service UUID, tuple of characteristics). The
# characteristic order here is the order main() unpacks the handles in.
_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR, _HUM_CHAR,),)
# ==========================================================
# SERVICE 2 : Relay (custom, separate service)
# ==========================================================
_RELAY_SERVICE_UUID = bluetooth.UUID("F5A128FE-227D-4C9E-AD2C-11D0FD6ED640")
_RELAY_CHAR_UUID = bluetooth.UUID("F5A128EE-227D-4C9E-AD2C-11D0FD6ED640")
# Relay characteristic: READ + WRITE + INDICATE
_RELAY_CHAR = (_RELAY_CHAR_UUID, _FLAG_READ | _FLAG_WRITE | _FLAG_INDICATE)
# Service definition: (service UUID, tuple of characteristics).
_RELAY_SERVICE = (_RELAY_SERVICE_UUID, (_RELAY_CHAR,),)
# ----------------------------------------------------------
# Periods
# ----------------------------------------------------------
# Passed to time.sleep() on every pass of the main loop, and the amount
# by which the loop's elapsed counter grows each pass.
UPDATE_PERIOD = 1
# Indications are requested on passes where the elapsed counter is a
# multiple of this value.
SEND_PERIOD = 10
# ----------------------------------------------------------
# Globals
# ----------------------------------------------------------
ble = bluetooth.BLE()
# Connection handles of the currently connected centrals; maintained by
# irq_handler() on connect/disconnect events.
connections = set()
# Characteristic value handles; assigned in main() from the result of
# gatts_register_services().
handle_temp = None
handle_hum = None
handle_relay = None
# Advertising payload; built in main() and reused by start_advertising().
payload = None
relay_state = b"OFF" # value stored in the relay characteristic
# ----------------------------------------------------------
# Advertising
# ----------------------------------------------------------
def start_advertising(interval_us=500000):
    """Begin BLE advertising using the module-level ``payload``.

    interval_us: advertising interval forwarded to ``ble.gap_advertise``.
    """
    print(">> Advertising BLE...")
    ble.gap_advertise(
        interval_us,
        adv_data=payload,
    )
# ----------------------------------------------------------
# Pack temp/hum (2 octets chacun)
# ----------------------------------------------------------
def pack_temperature(temp_deg_c):
    """Encode a temperature as a little-endian signed 16-bit integer.

    The value is expressed in hundredths of a degree Celsius.

    temp_deg_c: temperature in degrees Celsius (int or float).
    Returns: 2 bytes, ``struct`` format ``"<h"``.
    """
    # round() rather than int(): binary floating point makes products such
    # as 4.35 * 100 come out as 434.99999999999994, which int() would
    # truncate to 434 and lose a hundredth.
    hundredths = round(temp_deg_c * 100)
    # Keep the value inside the signed 16-bit range so an out-of-range
    # reading saturates instead of wrapping around or failing to pack.
    hundredths = max(-32768, min(32767, hundredths))
    return struct.pack("<h", hundredths)
def pack_humidity(hum_percent):
    """Encode a relative humidity as a little-endian unsigned 16-bit integer.

    The value is expressed in hundredths of a percent.

    hum_percent: humidity in percent (int or float).
    Returns: 2 bytes, ``struct`` format ``"<H"``.
    """
    # round() rather than int(): binary floating point makes products such
    # as 0.29 * 100 come out as 28.999999999999996, which int() would
    # truncate to 28 and lose a hundredth.
    hundredths = round(hum_percent * 100)
    # Keep the value inside the unsigned 16-bit range so a slightly negative
    # or oversized reading saturates instead of wrapping or failing to pack.
    hundredths = max(0, min(65535, hundredths))
    return struct.pack("<H", hundredths)
# ----------------------------------------------------------
# Diffuser l'état relais via INDICATE (ACK)
# ----------------------------------------------------------
def indicate_relay_to_all():
    """Send an INDICATE for the relay characteristic to every connection.

    No payload is passed to ``gatts_indicate``, so the value sent is the one
    currently stored under ``handle_relay``.
    """
    for conn_handle in connections:
        ble.gatts_indicate(conn_handle, handle_relay)
# ----------------------------------------------------------
# Traitement commande relais (ON/OFF)
# ----------------------------------------------------------
def handle_relay_command(cmd_bytes):
    """Apply an ON/OFF command written to the relay characteristic.

    The command is decoded as UTF-8, stripped of surrounding whitespace and
    compared case-insensitively. "ON" / "OFF" switch the LED, update
    ``relay_state`` and trigger an INDICATE to the connected centrals. Any
    other content is reported and leaves the LED and ``relay_state`` as they
    were.

    cmd_bytes: raw bytes read back from the relay characteristic.
    """
    global relay_state
    try:
        cmd = cmd_bytes.decode("utf-8").strip().upper()
    except UnicodeError:
        # Not valid UTF-8: fall through to the "unknown command" branch.
        cmd = ""
    recognised = True
    if cmd == "ON":
        LED.on()
        relay_state = b"ON"
        print("💡 LED = ON")
    elif cmd == "OFF":
        LED.off()
        relay_state = b"OFF"
        print("💡 LED = OFF")
    else:
        recognised = False
        print("⚠️ Commande inconnue (attendu ON/OFF) :", cmd_bytes)
    # The client's write has already replaced the characteristic value (the
    # IRQ handler obtains cmd_bytes by reading it back). Always rewrite the
    # authoritative state, so that a rejected command does not leave the
    # rejected bytes to be returned by later reads.
    ble.gatts_write(handle_relay, relay_state)
    # Only an accepted command is announced to the centrals (INDICATE, ACKed).
    if recognised and connections:
        indicate_relay_to_all()
# ----------------------------------------------------------
# IRQ handler
# ----------------------------------------------------------
def irq_handler(event, data):
    """Dispatch BLE events registered through ``ble.irq``.

    event: one of the ``_IRQ_*`` codes; other codes are ignored.
    data: event-specific tuple, unpacked per event below.
    """
    if event == _IRQ_CENTRAL_CONNECT:
        conn_handle, _addr_type, _addr = data
        print("✅ Connecté :", conn_handle)
        connections.add(conn_handle)
        # Make sure the characteristic holds the current relay state as soon
        # as a central connects.
        ble.gatts_write(handle_relay, relay_state)
        return

    if event == _IRQ_CENTRAL_DISCONNECT:
        conn_handle, _addr_type, _addr = data
        print("❌ Déconnecté :", conn_handle)
        # discard() is a no-op when the handle is not in the set.
        connections.discard(conn_handle)
        start_advertising()
        return

    if event == _IRQ_GATTS_WRITE:
        _conn_handle, attr_handle = data
        received = ble.gatts_read(attr_handle)
        # Only writes addressed to the relay characteristic are acted on.
        if attr_handle == handle_relay:
            print("📩 Reçu sur RELAIS :", received)
            handle_relay_command(received)
        return

    if event == _IRQ_GATTS_INDICATE_DONE:
        # Nothing to do; the tuple is unpacked only to document its shape
        # (the original note states that status == 0 means OK).
        _conn_handle, _attr_handle, _status = data
        return
# ----------------------------------------------------------
# Update capteurs + envoi périodique (temp/hum)
# ----------------------------------------------------------
def update_and_maybe_send(temp_c, hum_pct, do_send=False):
    """Store fresh sensor values in the GATT table and optionally indicate them.

    temp_c: temperature, encoded with ``pack_temperature``.
    hum_pct: humidity, encoded with ``pack_humidity``.
    do_send: when true, an INDICATE for both characteristics is sent to
        every connected central.
    """
    ble.gatts_write(handle_temp, pack_temperature(temp_c))
    ble.gatts_write(handle_hum, pack_humidity(hum_pct))
    if not do_send:
        return
    # Iterate over a snapshot: irq_handler() adds and removes handles in
    # `connections` on connect/disconnect events, which may arrive while
    # this loop is running.
    for ch in tuple(connections):
        try:
            ble.gatts_indicate(ch, handle_temp)
            ble.gatts_indicate(ch, handle_hum)
        except OSError as err:
            # The stack refused the indication (for instance the link has
            # just dropped). Skip this connection so that the caller's
            # periodic loop keeps running; the handle itself is removed by
            # the disconnect event.
            print("⚠️ INDICATE impossible pour", ch, ":", err)
# ----------------------------------------------------------
# Main
# ----------------------------------------------------------
def main():
    """Bring up BLE, register both services, advertise, then loop forever.

    Each pass of the loop draws random temperature/humidity values, stores
    them in the GATT table, and requests an indication on passes where the
    elapsed counter is a multiple of ``SEND_PERIOD``.
    """
    global handle_temp, handle_hum, handle_relay, payload

    ble.active(True)
    ble.irq(irq_handler)

    # Two services are registered at once; the result holds one group of
    # value handles per service, in registration order:
    #   environmental sensing -> (handle_temp, handle_hum)
    #   relay                 -> (handle_relay,)
    services = (_ENV_SENSE_SERVICE, _RELAY_SERVICE)
    (handle_temp, handle_hum), (handle_relay,) = ble.gatts_register_services(services)
    print("Handles: temp =", handle_temp, "| hum =", handle_hum, "| relay =", handle_relay)

    # The advertising payload carries only the device name; no service UUID
    # is passed to advertising_payload() here.
    payload = advertising_payload(name="ESP32-Env-Relay")
    start_advertising()

    ticks = 0
    while True:
        update_and_maybe_send(
            random.uniform(20, 30),
            random.uniform(40, 60),
            do_send=(ticks % SEND_PERIOD == 0),
        )
        time.sleep(UPDATE_PERIOD)
        ticks += UPDATE_PERIOD
# Run the application only when this file is executed as the main script.
if __name__ == "__main__":
    main()