import network
import time
import random
from machine import Pin, PWM, SoftI2C
from hcsr04 import HCSR04
import ssd1306
from umqtt.simple import MQTTClient
# NETPIE CONFIG
# Wi-Fi network joined by connect_wifi(); the password is empty (open network).
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# Broker endpoint used by netpie_connect().
NETPIE_BROKER = "mqtt.netpie.io"
NETPIE_PORT = 1883
# Device credentials, passed to MQTTClient as client_id / user / password.
# NOTE(review): TOKEN and SECRET are committed in plain text here - consider
# loading them from an untracked config file and rotating these values.
CLIENT_ID = "6f7152cf-8e71-46b1-8ccc-e2119d49f6ff"
TOKEN = "AsTBiQXmsedwVLTHkg2x8QhNPqp27GKL"
SECRET = "QRgrjTUaTQihDdqUnkFgnvyRRGD9akvU"
# NETPIE shadow & feed topics
# publish_shadow() writes to SHADOW_DATA_TOPIC; pub_feed() writes to FEED_TOPIC.
SHADOW_DATA_TOPIC = "@shadow/data/update"
FEED_TOPIC = "@msg/data"
# 1. HARDWARE PIN ASSIGNMENTS
# GPIO numbers consumed by the hardware initialisation section of this file.
TRIG_PIN = 5 # GPIO5 -> ultrasonic sensor TRIG
ECHO_PIN = 18 # GPIO18 -> ultrasonic sensor ECHO
I2C_SCL = 22 # GPIO22 -> OLED SCL (software I2C)
I2C_SDA = 21 # GPIO21 -> OLED SDA (software I2C)
LED_FWD_PIN = 16 # GPIO16 -> PWM-driven LED for the "forward" direction
LED_REV_PIN = 17 # GPIO17 -> PWM-driven LED for the "reverse" direction
# 2. CONSTANTS
# Accepted range for 'target = XXX' commands; enforced by parse_target_command().
TARGET_MIN = 50.0 # minimum allowed target distance (cm)
TARGET_MAX = 300.0 # maximum allowed target distance (cm)
# Maximum possible speed: full range in 1 second -> (300-50)/100 m/s
# set_leds() uses this value as the speed that maps to full PWM duty.
MAX_SPEED = (TARGET_MAX - TARGET_MIN) / 100.0 # 2.5 m/s
# Identity strings reported by handle_name_command().
MY_NAME = "Punyakorn Juichaloen"
MY_ID = "6710546489"
# 3. INITIALISE HARDWARE
# Ultrasonic sensor
# An HC-SR04 echo pulse lasts roughly 58 us per cm (round trip), so covering the
# 400 cm that read_distance() accepts needs about 23.2 ms. A 10 ms timeout would
# cap usable readings near 172 cm, below TARGET_MAX. 30 ms leaves headroom.
# NOTE(review): assumes the HCSR04 driver hands this value to
# machine.time_pulse_us as its timeout - confirm against the driver in use.
ECHO_TIMEOUT_US = 30000
sensor = HCSR04(trigger_pin=TRIG_PIN, echo_pin=ECHO_PIN, echo_timeout_us=ECHO_TIMEOUT_US)
# OLED display
i2c = SoftI2C(scl=Pin(I2C_SCL), sda=Pin(I2C_SDA)) # software I2C
oled = ssd1306.SSD1306_I2C(128, 64, i2c) # 128×64 pixel OLED
# PWM LEDs
pwm_fwd = PWM(Pin(LED_FWD_PIN), freq=1000) # forward LED PWM, 1 kHz carrier
pwm_rev = PWM(Pin(LED_REV_PIN), freq=1000) # reverse LED PWM, 1 kHz carrier
pwm_fwd.duty(0) # start both LEDs off
pwm_rev.duty(0)
# 4. HELPER FUNCTIONS
def read_distance():
    """
    Take one measurement from the ultrasonic sensor.

    Returns the distance in cm, or None when the driver raises or the
    value falls outside the accepted window (greater than 0, at most 400).
    """
    try:
        reading = sensor.distance_cm()
        implausible = reading <= 0 or reading > 400  # reject obviously bad readings
        return None if implausible else reading
    except Exception:
        # Any driver failure is reported to the caller as "no reading".
        return None
def update_oled(target, actual, direction, speed):
    """
    Redraw the OLED with the four status lines.

    target / actual are shown in cm with one decimal, direction as given,
    and speed in m/s with three decimals. Lines are spaced 16 pixels apart.
    """
    rows = (
        "Target: %.1f cm" % target,
        "Actual: %.1f cm" % actual,
        "Dir : %s" % direction,
        "Speed : %.3f m/s" % speed,
    )
    oled.fill(0)  # wipe the frame buffer
    for index, text in enumerate(rows):
        oled.text(text, 0, index * 16)  # y = 0, 16, 32, 48
    oled.show()  # flush the buffer to the panel
def compute_speed_and_direction(target, actual):
    """
    Derive the commanded speed and travel direction from the distance error.

    Speed = |target - actual| cm, converted to m/s assuming 1-second travel time.
    Direction: 'forward' if actual > target (car too far, move toward wall),
               'reverse' otherwise (car too close, move away from wall; also
               returned when exactly on target, where the speed is 0).

    Args:
        target: desired distance to the wall, in cm.
        actual: measured distance to the wall, in cm.

    Returns:
        (speed_ms, direction_str)
    """
    excess = actual - target  # positive -> farther than wanted -> drive forward
    speed_ms = abs(excess) / 100.0  # cm -> m, assume 1 s travel time
    direction = "forward" if excess > 0 else "reverse"
    return speed_ms, direction
def set_leds(speed_ms, direction):
    """
    Show the speed on the LED that matches the travel direction.

    speed_ms is scaled linearly against MAX_SPEED onto the 0-1023 PWM duty
    range and clamped. The LED for the other direction is switched off.
    Any direction other than "forward" is treated as reverse.
    """
    level = min(1023, max(0, int(speed_ms / MAX_SPEED * 1023)))
    if direction == "forward":
        lit, dark = pwm_fwd, pwm_rev
    else:
        lit, dark = pwm_rev, pwm_fwd
    lit.duty(level)  # brightness proportional to speed
    dark.duty(0)  # opposite LED stays off
def parse_target_command(cmd):
    """
    Parse 'target = XXX' in a case-insensitive, whitespace-tolerant way.

    All whitespace (spaces, tabs, CR/LF) is removed before matching, so
    'Target\t=\t120' and ' TARGET=120\r\n' are both accepted.

    Args:
        cmd: raw command text.

    Returns:
        The target distance as a float, within [TARGET_MIN, TARGET_MAX].

    Raises:
        ValueError: the text is not a target command, the value is not a
            number, or the value is out of range. The message says which.
    """
    # split() with no argument breaks on any run of whitespace, so joining the
    # pieces drops tabs and line endings as well as plain spaces.
    cmd_clean = "".join(cmd.split()).lower()
    prefix = "target="
    if not cmd_clean.startswith(prefix):
        raise ValueError("Unknown command")
    val_str = cmd_clean[len(prefix):]  # everything after 'target='
    try:
        val = float(val_str)
    except ValueError:
        raise ValueError("Value '{}' is not a number".format(val_str))
    # NaN fails this chained comparison too, so it is rejected as out of range.
    if not (TARGET_MIN <= val <= TARGET_MAX):
        raise ValueError("Target {:.1f} out of range [{:.0f}-{:.0f}]".format(
            val, TARGET_MIN, TARGET_MAX))
    return val
# 5. WI-FI CONNECTION
def connect_wifi():
    """
    Bring up the station interface and join WIFI_SSID.

    Waits up to 15 seconds for the link, then prints either the assigned
    IP address or a failure notice. Never raises on timeout; the caller
    carries on without a network.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting to Wi-Fi ...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        # Poll once per second, giving up after 15 attempts.
        for _ in range(15):
            if wlan.isconnected():
                break
            time.sleep(1)
    if wlan.isconnected():
        print("Wi-Fi connected:", wlan.ifconfig()[0])
    else:
        print("Wi-Fi FAILED – continuing without network")
# 6. NETPIE MQTT SETUP
mqtt_client = None  # connected MQTTClient, or None while offline


def _drop_mqtt_session(client):
    """Best-effort teardown of an MQTT client so its socket is released."""
    try:
        client.disconnect()
    except Exception:
        # The link is probably already dead; still try to free the socket.
        # NOTE(review): relies on umqtt.simple exposing the socket as `.sock`.
        try:
            client.sock.close()
        except Exception:
            pass


def netpie_connect():
    """
    Create (or re-create) the MQTT connection to the NETPIE broker.

    Any previous session is torn down first, so repeated reconnects do not
    pile up open sockets. On success the module-level mqtt_client holds a
    client subscribed to "@msg/cmd" with on_netpie_message as its callback.
    On failure the error is printed and mqtt_client is left as None.
    """
    global mqtt_client
    if mqtt_client is not None:
        _drop_mqtt_session(mqtt_client)
        mqtt_client = None
    try:
        mqtt_client = MQTTClient(
            client_id=CLIENT_ID,
            server=NETPIE_BROKER,
            port=NETPIE_PORT,
            user=TOKEN,
            password=SECRET,
            keepalive=60
        )
        mqtt_client.set_callback(on_netpie_message)
        mqtt_client.connect()
        # Subscribe to the incoming command topic from dashboard
        mqtt_client.subscribe("@msg/cmd")
        print("NETPIE connected")
    except Exception as e:
        print("NETPIE connect error:", e)
        # connect() may have opened a socket before failing; release it.
        if mqtt_client is not None:
            _drop_mqtt_session(mqtt_client)
        mqtt_client = None
def on_netpie_message(topic, msg):
    """
    MQTT callback for messages arriving on subscribed NETPIE topics.

    The payload is decoded as UTF-8 text. 'name' (any letter case) triggers
    handle_name_command(); anything else is parsed as a 'target = XXX'
    command. A parse failure is sent back through pub_feed() under the
    "error" key and also printed.
    """
    global target_distance
    text = msg.decode("utf-8").strip()
    print("NETPIE cmd received:", text)
    # The 'name' command takes priority over target parsing.
    if text.lower() == "name":
        handle_name_command()
        return
    try:
        parsed = parse_target_command(text)
    except ValueError as err:
        # Surface the problem on the feed so it is visible on the dashboard side.
        pub_feed({"error": str(err)})
        print("Command error:", err)
    else:
        target_distance = parsed
        print("Target updated to {:.1f} cm".format(target_distance))
def publish_shadow(target, actual, speed):
    """
    Push target, actual and speed to the NETPIE device shadow.

    Does nothing while there is no MQTT client. Publish failures are
    printed, not raised.
    """
    if mqtt_client is None:
        return
    # Shadow payload shape: {"data": {...}}; two decimals for the distances,
    # four for the speed.
    body = '{"data":{"target":%.2f,"actual":%.2f,"speed":%.4f}}' % (
        target, actual, speed)
    try:
        mqtt_client.publish(SHADOW_DATA_TOPIC, body)
    except Exception as err:
        print("Shadow publish error:", err)
def pub_feed(data_dict):
    """
    Publish a flat dict to FEED_TOPIC as a JSON object of string values.

    Keys and values are both converted with str() and JSON-escaped, so text
    containing quotes or backslashes (e.g. an echoed bad command) still
    produces valid JSON. Does nothing while there is no MQTT client.
    Publish failures are printed, not raised.

    Args:
        data_dict: mapping of field name to value.
    """
    if mqtt_client is None:
        return
    import json  # function-scope import keeps this block self-contained

    pairs = ",".join(
        "{}:{}".format(json.dumps(str(k)), json.dumps(str(v)))
        for k, v in data_dict.items()
    )
    payload = "{" + pairs + "}"
    try:
        mqtt_client.publish(FEED_TOPIC, payload)
    except Exception as e:
        print("Feed publish error:", e)
def handle_name_command():
    """
    Answer the 'name' command.

    Builds "<MY_NAME> | <MY_ID> | <random 1000-9999>", prints it, and
    publishes it through pub_feed() under the "name_info" key.
    """
    suffix = random.randint(1000, 9999)  # fresh 4-digit number per request
    message = " | ".join((MY_NAME, MY_ID, str(suffix)))
    print("NAME CMD ->", message)
    # Forward to the dashboard via the feed topic
    pub_feed({"name_info": message})
# 7. MAIN LOOP
def main():
    """
    Run the distance-keeping loop forever.

    Each pass: service incoming NETPIE messages (or retry the broker
    connection when offline), read the sensor, compute speed and direction,
    drive the LEDs, refresh the OLED, poll stdin for a command, and every
    PUBLISH_INTERVAL_MS publish the state to NETPIE. Never returns.
    """
    global target_distance
    # Resolve the stdin-polling modules once instead of on every pass. Serial
    # commands are best-effort: without `select` they are simply skipped.
    try:
        import select
        import sys
    except ImportError:
        select = None
    # Initial state
    target_distance = 150.0 # default target: 150 cm from wall
    # Connect Wi-Fi & NETPIE
    connect_wifi()
    netpie_connect()
    # OLED splash screen
    oled.fill(0)
    oled.text("IAI & IoT HW5", 10, 20)
    oled.text("Initialising...", 5, 40)
    oled.show()
    time.sleep(2)
    PUBLISH_INTERVAL_MS = 2000 # publish every 2 seconds
    RECONNECT_INTERVAL_MS = 10000 # retry the broker every 10 seconds while offline
    last_publish = time.ticks_ms() # timestamp of last NETPIE publish
    last_reconnect = last_publish # timestamp of last connection attempt
    print("System ready. Default target:", target_distance, "cm")
    while True:
        # (a) Check for incoming NETPIE messages, or retry while offline
        if mqtt_client is not None:
            try:
                mqtt_client.check_msg()
            except Exception as e:
                print("MQTT check error:", e)
                netpie_connect()
                last_reconnect = time.ticks_ms()
        elif time.ticks_diff(time.ticks_ms(), last_reconnect) >= RECONNECT_INTERVAL_MS:
            # A failed connect leaves mqtt_client as None; without this retry
            # the device would stay offline until reset.
            netpie_connect()
            last_reconnect = time.ticks_ms()
        # (b) Read sensor
        actual = read_distance()
        if actual is None:
            actual = target_distance # fallback, pretend at target on bad read
            print("Sensor error – using target as fallback")
        # (c) Compute speed & direction
        speed_ms, direction = compute_speed_and_direction(target_distance, actual)
        # (d) Drive LEDs
        set_leds(speed_ms, direction)
        # (e) Update OLED
        update_oled(target_distance, actual, direction, speed_ms)
        # (f) Handle serial commands
        line = ""
        if select is not None:
            try:
                # Wait at most 50 ms for a line so the loop keeps its cadence.
                ready, _, _ = select.select([sys.stdin], [], [], 0.05)
                if ready:
                    line = sys.stdin.readline().strip()
            except Exception:
                # stdin polling is best-effort; a failure must not stop the loop.
                line = ""
        if line:
            # Handle 'name' command from serial
            if line.lower() == "name":
                handle_name_command()
            else:
                try:
                    new_t = parse_target_command(line)
                    target_distance = new_t
                    print("Target set to {:.1f} cm".format(target_distance))
                except ValueError as e:
                    print("Error:", e)
        # (g) Periodic NETPIE shadow publish
        now = time.ticks_ms()
        if time.ticks_diff(now, last_publish) >= PUBLISH_INTERVAL_MS:
            publish_shadow(target_distance, actual, speed_ms)
            # Send direction directly to feed
            pub_feed({"direction": direction})
            last_publish = now
        time.sleep_ms(100) # 100 ms main loop cadence
# 8. ENTRY POINT
# Called unconditionally when this module is loaded. main() contains an
# endless loop, so no statement placed after this call would ever run.
main() # run forever