import machine
import time
# --- Pin Definitions ---
RED_LED = machine.Pin(25, machine.Pin.OUT)
ORANGE_LED = machine.Pin(33, machine.Pin.OUT)
GREEN_LED = machine.Pin(32, machine.Pin.OUT)
TRIG_PIN = machine.Pin(12, machine.Pin.OUT)
ECHO_PIN = machine.Pin(13, machine.Pin.IN)
# LoRa Control Pins
LORA_CS = machine.Pin(5, machine.Pin.OUT)
LORA_RST = machine.Pin(14, machine.Pin.OUT)
LORA_IRQ = machine.Pin(26, machine.Pin.IN)
# --- Timer Settings (in milliseconds) ---
GREEN_TIME = 30000 # 30 seconds
ORANGE_TIME = 5000 # 5 seconds
RED_TIME = 30000 # 30 seconds
# --- State Variables ---
STATE_GREEN = 0
STATE_ORANGE = 1
STATE_RED = 2
current_state = STATE_GREEN
previous_light_millis = time.ticks_ms()
# --- Sensor & Simulation Settings ---
DISTANCE_THRESHOLD = 100 # cm
last_broadcast_millis = time.ticks_ms()
COOLDOWN = 2000 # 2 seconds cooldown
simulated_trigger_done = False # Tracks if our 8-second auto-trigger fired
def setup():
print("Master Node Started (MicroPython).")
set_light(STATE_GREEN)
# ---------------------------------------------------------
# 1. Core Logic: Traffic Light Loop & Broadcasting
# ---------------------------------------------------------
def set_light(state):
global current_state, simulated_trigger_done
current_state = state
# Turn all LEDs off first
GREEN_LED.value(0)
ORANGE_LED.value(0)
RED_LED.value(0)
# Turn on correct LED, print messages, and broadcast
if state == STATE_GREEN:
GREEN_LED.value(1)
print("\n=== GREEN LIGHT ===")
print("Green light (safe to cross) ignore if pedestrians crossing.")
print("[LoRa] Sending message to both SLAVE A and SLAVE B: Light is GREEN")
broadcast("STATE:GREEN")
elif state == STATE_ORANGE:
ORANGE_LED.value(1)
print("\n=== ORANGE LIGHT ===")
print("[LoRa] Sending message to both SLAVE A and SLAVE B: Light is ORANGE")
broadcast("STATE:ORANGE")
elif state == STATE_RED:
RED_LED.value(1)
simulated_trigger_done = False # Reset the 8-second timer for the new red phase
print("\n=== RED LIGHT ===")
print("[LoRa] Sending message to both SLAVE A and SLAVE B: Light is RED")
broadcast("STATE:RED")
def handle_traffic_light():
global previous_light_millis
current_millis = time.ticks_ms()
interval = 0
if current_state == STATE_GREEN:
interval = GREEN_TIME
elif current_state == STATE_ORANGE:
interval = ORANGE_TIME
elif current_state == STATE_RED:
interval = RED_TIME
# Check if it's time to change the light
if time.ticks_diff(current_millis, previous_light_millis) >= interval:
previous_light_millis = current_millis
if current_state == STATE_GREEN:
set_light(STATE_ORANGE)
elif current_state == STATE_ORANGE:
set_light(STATE_RED)
elif current_state == STATE_RED:
set_light(STATE_GREEN)
# ---------------------------------------------------------
# 2. Automated Simulation (8 seconds into Red Light)
# ---------------------------------------------------------
def handle_8_second_simulation():
global simulated_trigger_done
if current_state == STATE_RED and not simulated_trigger_done:
current_millis = time.ticks_ms()
# If 8000 milliseconds (8 seconds) have passed since the light turned red
if time.ticks_diff(current_millis, previous_light_millis) >= 8000:
print("\n[8-SEC SIMULATION] ALERT! PEDESTRIAN CROSSING!")
print("[8-SEC SIMULATION] Lora sent message to SLAVE (Turn on the air curtain)")
broadcast("CMD:AIR_ON")
simulated_trigger_done = True # Mark as done so it doesn't spam
# ---------------------------------------------------------
# 3. Real Sensor Check (Still active if you move the slider)
# ---------------------------------------------------------
def handle_ultrasonic():
global last_broadcast_millis
if current_state != STATE_RED:
return
current_millis = time.ticks_ms()
if time.ticks_diff(current_millis, last_broadcast_millis) < COOLDOWN:
return
TRIG_PIN.value(0)
time.sleep_us(2)
TRIG_PIN.value(1)
time.sleep_us(10)
TRIG_PIN.value(0)
try:
pulse_time = machine.time_pulse_us(ECHO_PIN, 1, 30000)
if pulse_time > 0:
distance = (pulse_time / 2) / 29.1
if 0 < distance < DISTANCE_THRESHOLD:
print("\n[MANUAL SENSOR] ALERT! PEDESTRIAN CROSSING!")
print("[MANUAL SENSOR] Lora sent message to SLAVE (Turn on the air curtain)")
broadcast("CMD:AIR_ON")
last_broadcast_millis = current_millis
except OSError:
pass
# ---------------------------------------------------------
# 4. Listening: Check for messages from Slave B
# ---------------------------------------------------------
def handle_lora_receive():
incoming_message = "" # Placeholder for actual LoRa SPI read
if incoming_message == "MSG:B_TRIGGERED":
print("Slave B triggered! Relaying CMD:AIR_ON to everyone.")
broadcast("CMD:AIR_ON")
# ---------------------------------------------------------
# Helper: Send a LoRa Message
# ---------------------------------------------------------
def broadcast(msg):
# Printing to console to simulate the network activity in Wokwi
print("LoRa TX -> " + msg)
# =========================================================
# MAIN LOOP
# =========================================================
setup()
while True:
handle_traffic_light()
handle_8_second_simulation() # Checks for the 8-second mark
handle_ultrasonic() # Checks actual physical Wokwi sensor
handle_lora_receive()
time.sleep_ms(10)