# This program controls a full traffic-light roundabout with 5 entrances (A–E).
# It listens to buttons, controls LEDs, and uses fairness so every entrance
# eventually gets a green light even if others are busy.
from machine import Pin, I2C
import time
# These are the timing rules for how long lights stay green/yellow/red.
MIN_GREEN = 10.0 # Minimum time a green light must stay green
MAX_GREEN = 20.0 # Maximum allowed green time
YELLOW = 2.0 # Time the light stays yellow when changing
CLEARANCE = 1.2 # Time where all lights are red for safety
EMERG_CUTOFF_S = 2 # Emergency button cuts a green within this time
HVY_EXTEND_S = 3.0 # Heavy-traffic button extends green by this amount
EMERG_RED_EXT_S = 5.0 # Emergency button on a red approach extends current green
# These are the five entrances into the roundabout.
APPROACHES = ['A','B','C','D','E']
# The two external MCP chips that control LEDs on the roundabout internals.
i2c = I2C(0, sda=Pin(0), scl=Pin(1), freq=400000)
MCP0, MCP1 = 0x20, 0x21
_mcp_img = {MCP0: 0xFFFF, MCP1: 0xFFFF}
# These functions control the MCP chips to turn LEDs on/off.
def _mcp_flush(addr):
v = _mcp_img[addr]
i2c.writeto(addr, bytes([v & 0xFF, (v >> 8) & 0xFF]))
def mcp_init(addr):
_mcp_img[addr] = 0xFFFF
_mcp_flush(addr)
def mcp_set_bit(addr, pin_index, bit_value):
mask = 1 << pin_index
if bit_value: _mcp_img[addr] |= mask
else: _mcp_img[addr] &= ~mask
_mcp_flush(addr)
def mcp_led(addr, pin_index, on=True):
mcp_set_bit(addr, pin_index, 0 if on else 1)
# Entrance LEDs turn ON when the Raspberry Pi Pico outputs HIGH.
gpio_active_high = True
# These are the pin numbers for all the entrance lights on the Pico.
gpio_out_map = {
"A_Red":27, "A_Orange":26, "A_Green":22,
"B_Red":21, "B_Orange":20, "B_Green":19,
"C_Red":17, "C_Orange":16, "C_Green":18,
}
# These are all the internal-roundabout LEDs on MCP chip #1.
mcp1_out_map = {
"A_RedIn":14, "A_OrangeIn":15, "A_GreenIn":7,
"B_RedIn":3, "B_OrangeIn":2, "B_GreenIn":1,
"C_RedIn":0,
"D_RedIn":6, "D_OrangeIn":5, "D_GreenIn":4,
"D_Red":8, "D_Orange":9, "D_Green":10,
"E_Red":11, "E_Orange":12, "E_Green":13,
}
# Internal lights on MCP chip #2.
mcp2_out_map = {
"C_OrangeIn":0, "C_GreenIn":1,
"E_RedIn":8, "E_OrangeIn":9, "E_GreenIn":10,
}
# These are all the physical buttons (emergency + heavy traffic) for each entrance.
gpio_in_map = {
"A_EmBtn":3, "A_HvTraffic":2, "A_HvTrafficIn":12,
"B_EmBtn":4, "B_HvTraffic":5, "B_HvTrafficIn":13,
"C_EmBtn":6, "C_HvTraffic":7, "C_HvTrafficIn":14,
"D_EmBtn":9, "D_HvTraffic":8, "D_HvTrafficIn":15,
"E_EmBtn":10, "E_HvTraffic":11, "E_HvTrafficIn":28,
}
# Set up the hardware.
mcp_init(MCP0)
mcp_init(MCP1)
gpio_out = {n: Pin(p, Pin.OUT, value=1) for n, p in gpio_out_map.items()}
buttons = {n: Pin(p, Pin.IN, Pin.PULL_UP) for n, p in gpio_in_map.items()}
# Helper to turn entrance LEDs on/off.
def _gpio_set(name, on):
gpio_out[name].value(1 if on else 0)
def set_light(name, on=True):
if name in gpio_out: _gpio_set(name, on)
elif name in mcp1_out_map: mcp_led(MCP0, mcp1_out_map[name], on)
elif name in mcp2_out_map: mcp_led(MCP1, mcp2_out_map[name], on)
def read_btn(name):
return buttons[name].value() == 0
# These dicts make it easy to refer to “A_Red”, “A_Green”, etc. by category.
ENT_RED = {a: f"{a}_Red" for a in APPROACHES}
ENT_GRN = {a: f"{a}_Green" for a in APPROACHES}
ENT_ORG = {a: f"{a}_Orange" for a in APPROACHES}
INT_RED = {'A':"A_RedIn",'B':"B_RedIn",'C':"C_RedIn",'D':"D_RedIn",'E':"E_RedIn"}
INT_GRN = {'A':"A_GreenIn",'B':"B_GreenIn",'C':"C_GreenIn",'D':"D_GreenIn",'E':"E_GreenIn"}
INT_ORG = {'A':"A_OrangeIn",'B':"B_OrangeIn",'C':"C_OrangeIn",'D':"D_OrangeIn",'E':"E_OrangeIn"}
# Turns a light on safely without crashing if the name doesn't exist.
def safe_set(name, on):
try: set_light(name, on)
except: pass
# This updates all lights to match the currently active entrance.
# Active entrance: external GREEN, internal RED.
# Others: external RED, internal GREEN.
def enforce_phase_state(active):
for a in APPROACHES:
safe_set(ENT_ORG.get(a,''), False)
safe_set(ENT_GRN[a], (a == active))
safe_set(ENT_RED[a], (a != active))
for a in APPROACHES:
safe_set(INT_ORG.get(a,''), False)
safe_set(INT_RED[a], (a == active))
safe_set(INT_GRN[a], (a != active))
# This remembers the last time each entrance got a green,
# so the system can give a longer green to approaches that waited a long time.
last_served_ms = {a: time.ticks_ms() for a in APPROACHES}
# This calculates how long the next green should be.
# If an entrance has been red for a long time, it gets a longer green (fairness).
def base_green_time(a):
waited = time.ticks_diff(time.ticks_ms(), last_served_ms[a]) / 1000.0
g = MIN_GREEN + min(waited * 0.2, (MAX_GREEN - MIN_GREEN))
return max(MIN_GREEN, min(MAX_GREEN, g))
# Button-edge detector with debounce (only triggers on real button presses).
_prev = {name: buttons[name].value() for name in gpio_in_map}
def read_edges_once():
pressed = set()
for name, pin in buttons.items():
v = pin.value()
if v == 0 and _prev[name] == 1:
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < 70:
if pin.value() != 0: break
time.sleep_ms(1)
else:
_prev[name] = 0
print(f"[PRESS] {name}")
pressed.add(name)
elif v == 1 and _prev[name] == 0:
_prev[name] = 1
return pressed
# This runs one complete green phase for the active entrance.
# It handles emergency buttons, heavy traffic buttons,
# and internal heavy buttons that send the system backwards.
def serve_phase(active, planned_g, next_index_default):
enforce_phase_state(active)
start_ms = time.ticks_ms()
end_ms = time.ticks_add(start_ms, int(planned_g * 1000))
preempt_target = None
while time.ticks_diff(end_ms, time.ticks_ms()) > 0:
edges = read_edges_once()
# Emergency button on the active (green) entrance → cut green soon.
if f"{active}_EmBtn" in edges:
print(f"[EMERGENCY] {active} GREEN → early cut")
emerg_deadline = time.ticks_add(time.ticks_ms(), int(EMERG_CUTOFF_S * 1000))
if time.ticks_diff(end_ms, emerg_deadline) > 0:
end_ms = emerg_deadline
# Emergency on a red entrance → extend current green.
for a in APPROACHES:
if a != active and f"{a}_EmBtn" in edges:
new_end = min(
time.ticks_add(end_ms, int(EMERG_RED_EXT_S * 1000)),
time.ticks_add(start_ms, int(MAX_GREEN * 1000))
)
added = time.ticks_diff(new_end, end_ms) / 1000.0
end_ms = new_end
print(f"[EMERGENCY] {a} RED → extend current green +{added:.1f}s")
# Heavy-traffic logic for external buttons and internal buttons.
for a in APPROACHES:
hv_ext = (f"{a}_HvTraffic" in edges)
hv_int = (f"{a}_HvTrafficIn" in edges)
if not (hv_ext or hv_int):
continue
# External heavy button (on the road outside).
if hv_ext:
if a == active:
# Active entrance → extend green.
new_end = min(
time.ticks_add(end_ms, int(HVY_EXTEND_S * 1000)),
time.ticks_add(start_ms, int(MAX_GREEN * 1000))
)
added = time.ticks_diff(new_end, end_ms) / 1000.0
end_ms = new_end
print(f"[HVY-EXT] {a} GREEN → extend +{added:.1f}s")
else:
# Red external button → jump to that phase next.
if preempt_target is None:
preempt_target = a
print(f"[HVY-EXT] {a} RED → PREEMPT next")
end_ms = time.ticks_ms()
# Internal heavy button (inside the roundabout).
# Internal is GREEN when a != active; internal is RED when a == active.
if hv_int:
if a != active:
# Internal GREEN → extend green.
new_end = min(
time.ticks_add(end_ms, int(HVY_EXTEND_S * 1000)),
time.ticks_add(start_ms, int(MAX_GREEN * 1000))
)
added = time.ticks_diff(new_end, end_ms) / 1000.0
end_ms = new_end
print(f"[HVY-INT] {a} internal GREEN → extend +{added:.1f}s")
else:
# Internal RED on active → go BACKWARD to the previous phase.
if preempt_target is None:
curr_idx = APPROACHES.index(active)
prev_idx = (curr_idx - 1) % len(APPROACHES)
preempt_target = APPROACHES[prev_idx]
print(f"[HVY-INT] {a} internal RED → RETURN to {preempt_target}")
end_ms = time.ticks_ms()
time.sleep_ms(20)
# End of green → yellow → red.
safe_set(f"{active}_Green", False)
safe_set(f"{active}_Orange", True)
time.sleep(YELLOW)
safe_set(f"{active}_Orange", False)
safe_set(f"{active}_Red", True)
# Safety pause where all entrances are red.
time.sleep(CLEARANCE)
# Use preempt (external heavy or internal-return) or continue forward.
if preempt_target is not None:
return APPROACHES.index(preempt_target)
return next_index_default
# This keeps the roundabout running forever,
# selecting the next approach and supplying fairness.
def run():
idx = 0
while True:
active = APPROACHES[idx]
g = base_green_time(active)
idx_next = (idx + 1) % len(APPROACHES)
idx = serve_phase(active, g, idx_next)
last_served_ms[active] = time.ticks_ms()
if __name__ == "__main__":
run()