import time
import board
import analogio
import digitalio
import wifi
import socketpool
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# Configuration
# WiFi credentials (placeholders: replace with the real network values).
WIFI_SSID = "Your_WiFi_SSID"
WIFI_PASS = "Your_WiFi_Password"
# Hostname of the MQTT broker the client connects to.
MQTT_BROKER = "broker.hivemq.com"
# Prefix used for every MQTT topic this script publishes or subscribes to.
GROUP = "WD01"
# Hardware Setup
# Analog inputs: light sensor on GP26, (simulated) temperature input on GP27.
ldr = analogio.AnalogIn(board.GP26)
temp = analogio.AnalogIn(board.GP27)

# Digital input on GP16; its state is published on the "motion" topic.
ir = digitalio.DigitalInOut(board.GP16)
ir.switch_to_input()

# Digital output on GP14 driving the relay.
relay = digitalio.DigitalInOut(board.GP14)
relay.switch_to_output()
# MQTT Callbacks
def connected(client, userdata, flags, rc):
    """Subscribe to the relay control topic once the broker connection is up.

    Installed as the MQTT client's ``on_connect`` callback.

    Args:
        client: The MQTT client that connected; used to issue the subscription.
        userdata: Unused.
        flags: Unused.
        rc: Unused.
    """
    client.subscribe(f"{GROUP}/control/relay")
def message(client, topic, msg):
    """Apply a relay command received over MQTT.

    Installed as the MQTT client's ``on_message`` callback. The payload is
    matched case-insensitively after stripping surrounding whitespace:
    ``"ON"`` or ``"1"`` energises the relay, ``"OFF"`` or ``"0"`` releases it.
    Any other payload is ignored and leaves the relay unchanged.

    Args:
        client: The MQTT client that received the message (unused).
        topic: Topic the message arrived on (unused; not checked here).
        msg: Message payload as text.
    """
    cmd = msg.strip().upper()
    if cmd in ("ON", "1"):
        relay.value = True
    elif cmd in ("OFF", "0"):
        relay.value = False
# Network Initialization
# Join the WiFi network first; the socket pool below is built on the radio.
wifi.radio.connect(WIFI_SSID, WIFI_PASS)
pool = socketpool.SocketPool(wifi.radio)

mqtt_client = MQTT.MQTT(
    broker=MQTT_BROKER,
    client_id="PicoW_SmartHome",
    socket_pool=pool,
)

# Callbacks are attached before connect() is called.
mqtt_client.on_message = message
mqtt_client.on_connect = connected
mqtt_client.connect()

# Reference time for the periodic publish in the main loop.
last_pub = time.monotonic()
# Main loop: service MQTT traffic, publish sensor readings on a fixed
# interval, and try to recover from errors instead of terminating.
PUBLISH_INTERVAL_S = 5.0  # seconds between sensor publications
RETRY_DELAY_S = 5  # pause before a reconnect attempt after an error
LDR_SAMPLES = 10  # readings averaged into one illuminance value
DARK_THRESHOLD = 1500  # 12-bit illuminance below which the relay is forced on

while True:
    try:
        # Process inbound messages (relay commands arrive via `message`).
        mqtt_client.loop(timeout=1.0)

        # Periodic Sensor Sampling & Publishing (every PUBLISH_INTERVAL_S)
        if (time.monotonic() - last_pub) >= PUBLISH_INTERVAL_S:
            # Task 1: average LDR_SAMPLES readings, each shifted right by 4
            # bits to scale it to 12-bit (0-4095).
            ldr_avg = sum(ldr.value >> 4 for _ in range(LDR_SAMPLES)) // LDR_SAMPLES
            ir_val = 1 if ir.value else 0
            # Task 2: simulated temperature, the 12-bit reading mapped
            # linearly onto 0-50 °C and rounded to one decimal.
            temp_val = round(((temp.value >> 4) / 4095) * 50, 1)

            # Publish Data
            mqtt_client.publish(f"{GROUP}/illuminance", str(ldr_avg))
            mqtt_client.publish(f"{GROUP}/motion", str(ir_val))
            mqtt_client.publish(f"{GROUP}/temperature", str(temp_val))

            # Local Automation Override: force the relay on when it is dark.
            # This loop never switches the relay off again.
            if ldr_avg < DARK_THRESHOLD:
                relay.value = True

            # Only advance the timer after a fully successful publish, so a
            # failed cycle is retried as soon as the connection is back.
            last_pub = time.monotonic()
    except Exception as err:
        # Top-level boundary: keep the device running, but report the error.
        print("MQTT loop error:", repr(err))
        time.sleep(RETRY_DELAY_S)
        try:
            mqtt_client.reconnect()
        except Exception as reconnect_err:
            # A failed reconnect must not escape the loop; the next iteration
            # will fail again and trigger another attempt after the delay.
            print("MQTT reconnect failed:", repr(reconnect_err))