import network
import time
import ubinascii
import machine
from machine import Pin, ADC, PWM, time_pulse_us
import dht
from umqtt.simple import MQTTClient
# RescueDose Sentinel
# Smart medical cold-chain and anti-tamper box
# WiFi
# Credentials passed to wifi.connect() in connect_wifi(); the password is empty.
SSID = "Wokwi-GUEST"
PASSWORD = ""
# MQTT
# Broker host and port handed to MQTTClient in connect_mqtt().
# NOTE(review): connect_mqtt() passes no username/password and no TLS options,
# so the control topic is presumably open to anyone who knows its name - confirm
# this is acceptable for a device that accepts remote UNLOCK commands.
MQTT_BROKER = "mqtt-dashboard.com"
MQTT_PORT = 1883
# All topics live under one base path:
#   /reading - sensor summary published once per main-loop reading
#   /action  - startup notice and replies to remote commands
#   /control - subscribed topic carrying LOCK / UNLOCK / RESET / STATUS commands
BASE_TOPIC = "ishou/iot/markoh/rescuedose"
TOPIC_READING = BASE_TOPIC + "/reading"
TOPIC_ACTION = BASE_TOPIC + "/action"
TOPIC_CONTROL = BASE_TOPIC + "/control"
# Client id is "rescuedose-" followed by the hex form of machine.unique_id().
CLIENT_ID = "rescuedose-" + ubinascii.hexlify(machine.unique_id()).decode()
# Device label embedded in the messages published on the action topic.
DEVICE_NAME = "rescuedose_sentinel_esp32"
# Sensors and outputs
# DHT22 temperature/humidity sensor on GPIO 15.
dht_sensor = dht.DHT22(Pin(15))
# Light-dependent resistor on ADC pin 34, used by check_status() as the
# lid-open detector. 11 dB attenuation is selected for the ADC input
# (presumably to cover the full supply-voltage range - see the ESP32 ADC docs).
ldr = ADC(Pin(34))
ldr.atten(ADC.ATTN_11DB)
# Ultrasonic ranger: trigger output on GPIO 5, echo input on GPIO 19.
trig = Pin(5, Pin.OUT)
echo = Pin(19, Pin.IN)
# Lock servo on GPIO 18, driven with a 50 Hz PWM signal.
servo = PWM(Pin(18), freq=50)
# Buzzer on GPIO 23 at 1 kHz; duty 0 keeps it silent until buzzer_on() is called.
buzzer = PWM(Pin(23))
buzzer.freq(1000)
buzzer.duty(0)
# Status LEDs, written by set_leds().
red_led = Pin(25, Pin.OUT)
yellow_led = Pin(26, Pin.OUT)
green_led = Pin(27, Pin.OUT)
# Thresholds
# Temperatures are in degrees C and humidity in percent (the units printed by
# make_serial_message). check_status() treats 2..8 C as safe, above 8 C up to
# 12 C as a warning, and below 2 C or above 12 C as critical.
SAFE_TEMP_MIN = 2
SAFE_TEMP_MAX = 8
WARNING_TEMP_MAX = 12
# Humidity at or above this value raises a warning.
HUMIDITY_WARNING = 75
# light_raw is raw ADC value, not lux.
# dark / closed box = high light_raw
# bright / opened box = low light_raw
# A reading at or below this threshold is treated as "lid open".
LIGHT_OPEN_THRESHOLD = 500
# Distance in cm at or beyond which the package is considered missing.
# read_distance_cm() returns 999 on timeout, which also falls in this range.
PACKAGE_MISSING_DISTANCE = 30
# Seconds the main loop sleeps between readings.
READ_DELAY = 3
# Servo angles
# Default state is locked.
# If the servo looks reversed in Wokwi, swap these two values.
LOCKED_ANGLE = 90
UNLOCKED_ANGLE = 0
# Only used for reporting in messages; reset_servo_default() itself calls lock_box().
DEFAULT_SERVO_ANGLE = LOCKED_ANGLE
# Current output states
# String mirrors of the physical outputs, updated by lock_box()/unlock_box(),
# buzzer_on()/buzzer_off() and set_leds(), and embedded in published messages.
servo_state = "LOCKED"
buzzer_state = "OFF"
led_state = "GREEN"
# This keeps remote unlock active until LOCK, RESET, or critical alert
remote_unlocked = False
# LED control
def set_leds(red=False, yellow=False, green=False):
    """Switch the three status LEDs and record which one is reported.

    Each truthy argument turns its LED on and each falsy one turns it off.
    The module-level ``led_state`` is set to the first lit colour in the
    order RED, YELLOW, GREEN, or to "OFF" when none is lit.
    """
    global led_state

    # Pins are written in the fixed order red, yellow, green.
    for pin, lit in ((red_led, red), (yellow_led, yellow), (green_led, green)):
        pin.value(1 if lit else 0)

    # Report only the highest-priority colour that is on.
    label = "OFF"
    for lit, name in ((red, "RED"), (yellow, "YELLOW"), (green, "GREEN")):
        if lit:
            label = name
            break
    led_state = label
# Buzzer control
def buzzer_on():
    """Sound the buzzer and mark ``buzzer_state`` as "ON"."""
    global buzzer_state
    alarm_duty = 512  # duty value used whenever the alarm is sounding
    buzzer.duty(alarm_duty)
    buzzer_state = "ON"
def buzzer_off():
    """Silence the buzzer and mark ``buzzer_state`` as "OFF"."""
    global buzzer_state
    silent_duty = 0  # zero duty means no output on the buzzer pin
    buzzer.duty(silent_duty)
    buzzer_state = "OFF"
# Servo control
def servo_angle(angle):
    """Move the lock servo to ``angle`` degrees.

    The angle is mapped linearly onto the PWM duty range 40..115, where
    40 corresponds to 0 degrees and 115 to 180 degrees. Angles outside
    0..180 are clamped so the duty never leaves that calibrated range.
    """
    min_duty = 40
    max_duty = 115
    # Clamp first: an out-of-range angle would otherwise produce a pulse
    # width beyond the calibrated end points.
    angle = max(0, min(180, angle))
    duty = int(min_duty + (angle / 180) * (max_duty - min_duty))
    servo.duty(duty)
def lock_box():
    """Drive the servo to the locked angle and set ``servo_state`` to "LOCKED"."""
    global servo_state
    target = LOCKED_ANGLE
    servo_angle(target)
    servo_state = "LOCKED"
def unlock_box():
    """Drive the servo to the unlocked angle and set ``servo_state`` to "UNLOCKED"."""
    global servo_state
    target = UNLOCKED_ANGLE
    servo_angle(target)
    servo_state = "UNLOCKED"
def reset_servo_default():
    """Return the servo to its default position, which is the locked state."""
    # The default state is "locked", so this simply delegates to lock_box().
    return lock_box()
# Read ultrasonic distance
def read_distance_cm():
    """Trigger the ultrasonic sensor and return the measured distance in cm.

    Returns the echo time divided by 58, rounded to two decimals. Returns
    999 when the echo measurement reports a negative value or raises
    OSError.
    """
    # Trigger sequence: low for 2 us, high for 10 us, then low again.
    for level, hold_us in ((0, 2), (1, 10)):
        trig.value(level)
        time.sleep_us(hold_us)
    trig.value(0)

    try:
        echo_us = time_pulse_us(echo, 1, 30000)
    except OSError:
        return 999

    # Negative results are treated as "no echo".
    if echo_us < 0:
        return 999
    return round(echo_us / 58, 2)
# Connect to WiFi
def connect_wifi():
    """Bring up the station interface and block until it is connected.

    The interface is switched off and on again (one second each) before
    connecting with SSID and PASSWORD. While waiting, the current status
    code is printed once per second. There is no timeout.
    """
    banner = "========================================"
    station = network.WLAN(network.STA_IF)

    # Power-cycle the interface so the connection starts from a clean state.
    for enabled in (False, True):
        station.active(enabled)
        time.sleep(1)

    for line in (
        "",
        "",
        banner,
        "RescueDose Sentinel starting...",
        banner,
        "",
        "Connecting to WiFi",
    ):
        print(line)
    print("SSID:", SSID)
    print()

    station.connect(SSID, PASSWORD)
    while True:
        if station.isconnected():
            break
        print("Still connecting... status =", station.status())
        time.sleep(1)

    print()
    print("WiFi connected!")
    print("IP address:", station.ifconfig()[0])
    print(banner)
    print()
# Send message to MQTT
def publish_message(topic, message):
    """Publish ``message`` on ``topic`` via the global client, then log it.

    The publish happens first, so nothing is printed if it raises.
    """
    client.publish(topic, message)

    rule = "----------------------------------------"
    # One print call per item keeps the output line-for-line as before.
    for item in ("", "MQTT SENT", rule, "Topic:", topic, "", "Message:", message, rule, ""):
        print(item)
# Handle remote control from HiveMQ
def _remote_action_message(command, tail, angle=None):
    """Build the " | "-separated report published after a remote command.

    ``tail`` is the final field (for example "Message: ..."); ``angle``,
    when given, adds a "ServoAngle" field right after the servo state.
    """
    fields = [
        "Remote Action",
        "Device: " + DEVICE_NAME,
        "Command: " + command,
        "Servo: " + servo_state,
    ]
    if angle is not None:
        fields.append("ServoAngle: " + str(angle))
    fields.append("Buzzer: " + buzzer_state)
    fields.append("LED: " + led_state)
    fields.append(tail)
    return " | ".join(fields)


def mqtt_callback(topic, msg):
    """Handle one message from the control topic.

    The payload is decoded, stripped and upper-cased, then reduced to the
    first keyword it contains out of UNLOCK, LOCK, RESET, STATUS (so both
    ``UNLOCK`` and ``{"command":"UNLOCK"}`` work). LOCK, UNLOCK and RESET
    change the servo, buzzer, LEDs and ``remote_unlocked``; every
    recognised command publishes a report on the action topic. Anything
    else only prints a hint.

    NOTE(review): commands are matched by substring and the client
    connects without credentials, so any payload containing "UNLOCK"
    from any publisher opens the box - confirm this is acceptable.
    """
    global remote_unlocked
    print()
    print()
    print("MQTT COMMAND RECEIVED")
    print("----------------------------------------")
    print("Topic:", topic)
    print("Message:", msg)
    print()

    try:
        command = msg.decode().strip().upper()
    except Exception as err:
        # Narrower than a bare except: KeyboardInterrupt/SystemExit still
        # propagate, and the reason for the failure is shown.
        print("Could not decode command payload:", err)
        command = ""
    else:
        # Accept simple command or JSON-looking command.
        # UNLOCK must be tested before LOCK because it contains "LOCK".
        for keyword in ("UNLOCK", "LOCK", "RESET", "STATUS"):
            if keyword in command:
                command = keyword
                break

    print("Command:", command)
    print()

    if command == "LOCK":
        remote_unlocked = False
        lock_box()
        buzzer_off()
        set_leds(yellow=True)
        publish_message(
            TOPIC_ACTION,
            _remote_action_message(
                "LOCK",
                "Message: Box locked by remote MQTT command",
                LOCKED_ANGLE,
            ),
        )
    elif command == "UNLOCK":
        remote_unlocked = True
        unlock_box()
        buzzer_off()
        set_leds(green=True)
        publish_message(
            TOPIC_ACTION,
            _remote_action_message(
                "UNLOCK",
                "Message: Box unlocked by remote MQTT command",
                UNLOCKED_ANGLE,
            ),
        )
    elif command == "RESET":
        remote_unlocked = False
        reset_servo_default()
        buzzer_off()
        set_leds(green=True)
        publish_message(
            TOPIC_ACTION,
            _remote_action_message(
                "RESET",
                "Message: System reset to default locked state",
                DEFAULT_SERVO_ANGLE,
            ),
        )
    elif command == "STATUS":
        # STATUS changes nothing; it only reports the current state.
        publish_message(
            TOPIC_ACTION,
            _remote_action_message(
                "STATUS",
                "RemoteUnlocked: " + str(remote_unlocked),
            ),
        )
    else:
        print("Unknown command.")
        print("Use LOCK, UNLOCK, RESET, or STATUS.")

    print("----------------------------------------")
    print()
# Connect to MQTT
def connect_mqtt():
    """Create an MQTT client, connect, subscribe to the control topic, return it.

    ``mqtt_callback`` is registered before connecting, so it receives every
    message delivered on the control topic. Exceptions raised while
    connecting or subscribing are not caught here.
    """
    banner = "========================================"

    broker_client = MQTTClient(CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
    broker_client.set_callback(mqtt_callback)

    print("Connecting to MQTT broker")
    print("Broker:", MQTT_BROKER)
    print("Port:", MQTT_PORT)
    print()

    broker_client.connect()
    for line in ("MQTT connected!", ""):
        print(line)

    broker_client.subscribe(TOPIC_CONTROL)
    for line in ("Subscribed to control topic:", TOPIC_CONTROL, banner, ""):
        print(line)

    return broker_client
# Decide system status
def check_status(temp, humidity, light_raw, distance_cm):
    """Classify the current readings and drive the outputs accordingly.

    Checks run in priority order: lid open, package missing, temperature
    outside the critical limits, then temperature/humidity warning, else
    safe. Any critical condition clears ``remote_unlocked``, locks the
    box, sounds the buzzer and lights the red LED. Warning and safe
    states silence the buzzer, light yellow or green, and keep the box
    unlocked only while ``remote_unlocked`` is set.

    Returns a dict with keys "status", "severity", "action", "message",
    "lid_open" and "package_missing".
    """
    global remote_unlocked

    lid_open = light_raw <= LIGHT_OPEN_THRESHOLD
    package_missing = distance_cm >= PACKAGE_MISSING_DISTANCE

    # Work out which critical condition applies, if any (first match wins).
    breach = None
    if lid_open:
        breach = (
            "TAMPER_ALERT",
            "Box may have been opened. Bright light detected by LDR.",
        )
    elif package_missing:
        breach = (
            "PACKAGE_REMOVED",
            "Medicine package may have been removed.",
        )
    elif temp < SAFE_TEMP_MIN or temp > WARNING_TEMP_MAX:
        breach = (
            "COLD_CHAIN_BREACH",
            "Temperature is outside the safe medical range.",
        )

    if breach is not None:
        # Every critical condition gets the same response.
        status, message = breach
        severity = "CRITICAL"
        action = "LOCK_AND_ALARM"
        remote_unlocked = False
        lock_box()
        buzzer_on()
        set_leds(red=True)
    else:
        near_limit = temp > SAFE_TEMP_MAX or humidity >= HUMIDITY_WARNING
        if near_limit:
            status = "TEMP_WARNING"
            severity = "WARNING"
            when_unlocked = (
                "REMOTE_UNLOCKED_WARNING",
                "Warning detected, but box is currently unlocked by remote command.",
            )
            when_locked = (
                "WARNING_MODE",
                "Temperature or humidity is near unsafe level.",
            )
        else:
            status = "SAFE"
            severity = "NORMAL"
            when_unlocked = (
                "REMOTE_UNLOCKED_MONITORING",
                "Medical box is safe, and box is unlocked by remote command.",
            )
            when_locked = (
                "NORMAL_MONITORING",
                "Medical box condition is safe.",
            )

        # A remote unlock is honoured only outside critical conditions.
        if remote_unlocked:
            action, message = when_unlocked
            unlock_box()
        else:
            action, message = when_locked
            lock_box()

        buzzer_off()
        if near_limit:
            set_leds(yellow=True)
        else:
            set_leds(green=True)

    return {
        "status": status,
        "severity": severity,
        "action": action,
        "message": message,
        "lid_open": lid_open,
        "package_missing": package_missing
    }
# Detailed Serial Monitor message
def make_serial_message(reading_number, temp, humidity, light_raw, distance_cm, result):
    """Return the multi-line report shown on the serial monitor.

    The report lists the sensor values, the detection flags and system
    result taken from ``result`` (the dict returned by check_status), the
    current output states, and the configured thresholds. Lines are
    separated by newlines with no trailing newline.
    """
    banner = "========================================"
    rule = "----------------------------------------"

    lines = [
        banner,
        "RescueDose Sentinel - Reading #" + str(reading_number),
        banner,
        "",
        "Sensor values",
        rule,
        "Temperature : " + str(temp) + " C",
        "Humidity : " + str(humidity) + " %",
        "Light raw : " + str(light_raw),
        "Distance : " + str(distance_cm) + " cm",
        "",
        "Detection",
        rule,
        "Lid open : " + str(result["lid_open"]),
        "Package missing : " + str(result["package_missing"]),
        "",
        "System result",
        rule,
        "Status : " + result["status"],
        "Severity : " + result["severity"],
        "Action : " + result["action"],
        "Message : " + result["message"],
        "",
        "Outputs",
        rule,
        "Servo : " + servo_state,
        "Buzzer : " + buzzer_state,
        "LED : " + led_state,
        "Remote unlocked : " + str(remote_unlocked),
        "",
        "Thresholds",
        rule,
        "Safe temp : " + str(SAFE_TEMP_MIN) + " C to " + str(SAFE_TEMP_MAX) + " C",
        "Critical temp > : " + str(WARNING_TEMP_MAX) + " C",
        "Humidity warn : " + str(HUMIDITY_WARNING) + " %",
        "Lid open raw <= : " + str(LIGHT_OPEN_THRESHOLD),
        "Package miss >= : " + str(PACKAGE_MISSING_DISTANCE) + " cm",
        "Servo locked : " + str(LOCKED_ANGLE) + " degrees",
        "Servo unlocked : " + str(UNLOCKED_ANGLE) + " degrees",
        "Default servo : LOCKED",
        banner,
    ]
    return "\n".join(lines)
# Short HiveMQ message
def make_mqtt_message(reading_number, temp, humidity, light_raw, distance_cm, result):
    """Return the one-line summary published on the reading topic.

    Fields are separated by " | " and cover the reading number, the
    status and severity from ``result``, the raw sensor values, the lid
    and package state, the current output states, and the chosen action.
    """
    lid_state = "OPEN" if result["lid_open"] else "CLOSED"
    package_state = "MISSING" if result["package_missing"] else "PRESENT"

    fields = [
        "Reading #" + str(reading_number),
        "Status: " + result["status"],
        "Severity: " + result["severity"],
        "Temp: " + str(temp) + "C",
        "Humidity: " + str(humidity) + "%",
        "LightRaw: " + str(light_raw),
        "Distance: " + str(distance_cm) + "cm",
        "Lid: " + lid_state,
        "Package: " + package_state,
        "Servo: " + servo_state,
        "Buzzer: " + buzzer_state,
        "LED: " + led_state,
        "Action: " + result["action"],
    ]
    return " | ".join(fields)
# Print detailed reading
def print_reading(message):
    """Print ``message`` with two blank lines before it and one after."""
    for chunk in ("", "", message, ""):
        print(chunk)
# Start program
# Startup: bring up the network, then force the outputs into the default
# locked / silent / green state and announce it on the action topic.
connect_wifi()
client = connect_mqtt()
reset_servo_default()
buzzer_off()
set_leds(green=True)
start_msg = (
    "System Started | "
    "Device: " + DEVICE_NAME +
    " | Servo: " + servo_state +
    " | ServoAngle: " + str(DEFAULT_SERVO_ANGLE) +
    " | Buzzer: " + buzzer_state +
    " | LED: " + led_state +
    " | Message: RescueDose Sentinel started in default locked state"
)
publish_message(TOPIC_ACTION, start_msg)


def _report_error(err):
    """Print the standard error banner for a failed step of the main loop."""
    print()
    print()
    print("ERROR")
    print("----------------------------------------")
    print("Something went wrong:")
    print(err)
    print("----------------------------------------")
    print()


reading_number = 1
# False after any MQTT failure; the next iteration then reconnects.
mqtt_online = True

while True:
    # Step 1: MQTT inbound. Kept in its own try so that a dropped broker
    # connection can never prevent the sensors from being read and the
    # local lock/alarm logic from running.
    try:
        if not mqtt_online:
            try:
                # Best effort: release the previous socket before reconnecting.
                # Assumes the umqtt.simple client keeps it in ``sock``.
                client.sock.close()
            except Exception:
                pass
            client = connect_mqtt()
            mqtt_online = True
        # Check for remote MQTT commands
        client.check_msg()
    except Exception as e:
        mqtt_online = False
        _report_error(e)

    # Step 2: read sensors and apply the local safety response.
    try:
        dht_sensor.measure()
        temperature = dht_sensor.temperature()
        humidity = dht_sensor.humidity()
        light_raw = ldr.read()
        distance_cm = read_distance_cm()
        # Check system status
        result = check_status(temperature, humidity, light_raw, distance_cm)
        # Make separate messages
        serial_message = make_serial_message(
            reading_number,
            temperature,
            humidity,
            light_raw,
            distance_cm,
            result
        )
        mqtt_message = make_mqtt_message(
            reading_number,
            temperature,
            humidity,
            light_raw,
            distance_cm,
            result
        )
        # Show detailed message in Serial Monitor
        print_reading(serial_message)
    except Exception as e:
        _report_error(e)
        time.sleep(3)
        continue

    # Step 3: MQTT outbound. Skipped while offline; step 1 of the next
    # iteration will attempt the reconnect.
    if mqtt_online:
        try:
            # Send short message to HiveMQ
            publish_message(TOPIC_READING, mqtt_message)
        except Exception as e:
            mqtt_online = False
            _report_error(e)

    reading_number += 1
    time.sleep(READ_DELAY)