from machine import Pin, PWM
import time
import math
import network
import espnow
import ubinascii
import random
import json
def enum(**enums):
    """Create a minimal enum-like class from keyword arguments.

    Every keyword becomes a class attribute holding the given value, so
    ``enum(A=1).A == 1``. The generated class is always named ``Enum`` and
    has no base class other than ``object``.
    """
    members = enums
    enum_cls = type('Enum', (), members)
    return enum_cls
# Phases of a round as used by Controller:
#   OPEN      - recv_cb hands the turn to the first buzzer it hears from.
#   LOCK_WAIT - set by lock_others; the won/lost buttons only act in this
#               phase and incoming buzzer messages are discarded.
#   WAIT      - set by won_button_handler; launch_button_handler moves the
#               game from here back to OPEN.
GAME_STATE = enum(
    OPEN=1,
    LOCK_WAIT=2,
    WAIT=3,
)
class Led:
    """Three-channel (red, green, blue) LED driven through PWM.

    Duty values are handed straight to ``PWM.duty``. Within this file 0 is
    used for "off" and 1023 for "fully on".
    """

    def __init__(self, rgb_pins: list, freq: int = 5000):
        """Create one PWM channel per colour, each starting at duty 0.

        rgb_pins: exactly three pin numbers in red, green, blue order.
        freq: PWM frequency passed to every channel.
        """
        assert len(rgb_pins) == 3
        self.RED = PWM(Pin(rgb_pins[0]), freq=freq, duty=0)
        self.GREEN = PWM(Pin(rgb_pins[1]), freq=freq, duty=0)
        self.BLUE = PWM(Pin(rgb_pins[2]), freq=freq, duty=0)

    def _set_rgb(self, red: int, green: int, blue: int):
        """Write one duty value to each of the three channels."""
        self.RED.duty(red)
        self.GREEN.duty(green)
        self.BLUE.duty(blue)

    def pulse_random(self, duty: int = 511, t: int = 200, iterations: int = 10):
        """Step the three channels along phase-shifted sine curves.

        Despite the name, the sequence is deterministic: each step evaluates
        a sine for every channel, with green and blue offset by 20 and 40
        steps. The sine is scaled and shifted so the written duty lies
        between 0 and ``2 * duty``.

        duty: amplitude and offset of the sine.
        t: pause after each step, in milliseconds.
        iterations: number of steps to play.
        """
        for step in range(iterations):
            self._set_rgb(
                int(math.sin(step / 30 * math.pi) * duty + duty),
                int(math.sin((step + 20) / 30 * math.pi) * duty + duty),
                int(math.sin((step + 40) / 30 * math.pi) * duty + duty),
            )
            time.sleep_ms(t)

    def _blink(self, duties: list, t: int = 200, iterations: int = 10):
        """Blink a fixed colour: on for ``t`` ms, then off for ``t`` ms.

        duties: red, green, blue duty values for the "on" half of a blink.
        t: length of each half, in milliseconds.
        iterations: number of on/off cycles.
        """
        for _ in range(iterations):
            self._set_rgb(duties[0], duties[1], duties[2])
            time.sleep_ms(t)
            self._set_rgb(0, 0, 0)
            time.sleep_ms(t)

    def won(self):
        """Blink green."""
        self._blink(duties=[0, 1023, 0])

    def lost(self):
        """Blink red."""
        self._blink(duties=[1023, 0, 0])

    def locked(self):
        """Blink red + green together."""
        self._blink(duties=[1023, 1023, 0])

    def deinit_pwm(self):
        """Release all three PWM channels.

        This was previously declared without ``self``, so calling it on an
        instance raised TypeError.
        """
        self.RED.deinit()
        self.GREEN.deinit()
        self.BLUE.deinit()
# Score table; starts empty and is not referenced anywhere else in the
# visible code.
SCORES = {}

# Devices known to the controller, keyed by the 6-byte address that is
# registered as an ESP-NOW peer. Each entry carries a display name and a
# running score.
BUZZERS = {
    b'$\n\xc4\x00\x01\x10': {"name": "controller", "score": 0},
    b'$\n\xc4\x00\x01\xc5': {"name": "red", "score": 0},
}
class Controller:
    """Quiz-game controller that arbitrates buzzers over ESP-NOW.

    While the game is OPEN, the first known buzzer heard from takes the turn
    and every other buzzer is told to wait. Two push buttons ("won" and
    "lost") then judge the answer and notify the buzzers.
    """

    def __init__(self):
        """Bring up Wi-Fi/ESP-NOW, the status LED and the judge buttons."""
        sta = network.WLAN(network.STA_IF)
        sta.active(True)
        self.mac = sta.config('mac')

        # Set up every attribute the callbacks touch BEFORE any callback is
        # registered; otherwise an early message or button press could run
        # against a half-built object (or have its result overwritten here).
        self.game_state = GAME_STATE.OPEN
        self.current_buzzer = None
        # Buzzers that answered wrongly. Cleared on a win; nothing in the
        # visible code reads it yet.
        self.retry_locked_buzzers = []
        self.led = Led([23, 22, 21])

        self.esp = espnow.ESPNow()
        self.esp.active(True)
        self.add_peers()

        self.won_button = Pin(33, Pin.IN)
        self.lost_button = Pin(25, Pin.IN)

        # NOTE(review): launch_button_handler is not attached to any pin.
        self.esp.irq(self.recv_cb)
        self.won_button.irq(trigger=Pin.IRQ_FALLING, handler=self.won_button_handler)
        self.lost_button.irq(trigger=Pin.IRQ_FALLING, handler=self.lost_button_handler)

    def add_peers(self):
        """Register every address in BUZZERS as an ESP-NOW peer."""
        #self.esp.add_peer(self.mac)
        for peer in BUZZERS:
            self.esp.add_peer(peer)

    def send(self, peers: list, msg):
        """Send ``msg``, JSON-encoded, to each address in ``peers``."""
        payload = json.dumps(msg)
        for peer in peers:
            self.esp.send(peer, payload)
            #print(f"SENT {msg} TO {BUZZERS[p]['name']}")

    def _other_buzzers(self, buzzer):
        """Return every address in BUZZERS except ``buzzer``."""
        return [mac for mac in BUZZERS if mac != buzzer]

    def _finish_turn(self, result_event):
        """Tell the other buzzers the turn ended, then send the current
        buzzer its result (``result_event``)."""
        self.send(peers=self._other_buzzers(self.current_buzzer),
                  msg={"event": "end_turn"})
        self.send(peers=[self.current_buzzer], msg={"event": result_event})

    def unlock_others(self):
        """Send "end_turn" to the other buzzers and "won" to the current one."""
        self._finish_turn("won")

    def lock_others(self, buzzer):
        """Give the turn to ``buzzer`` and put everyone else on hold.

        Moves the game to LOCK_WAIT, sends "lock_wait" to all other buzzers
        and "lock_check" to ``buzzer``.
        """
        self.game_state = GAME_STATE.LOCK_WAIT
        self.current_buzzer = buzzer
        self.send(peers=self._other_buzzers(buzzer), msg={"event": "lock_wait"})
        self.send(peers=[buzzer], msg={"event": "lock_check"})

    def won_button_handler(self, pin):
        """Pin IRQ handler: the current buzzer answered correctly.

        Only acts in LOCK_WAIT. Clears the retry list, moves the game to
        WAIT, notifies the buzzers and blinks the LED green.
        """
        if self.game_state != GAME_STATE.LOCK_WAIT:
            return
        self.retry_locked_buzzers = []
        self.game_state = GAME_STATE.WAIT
        self._finish_turn("won")
        self.led.won()

    def lost_button_handler(self, pin):
        """Pin IRQ handler: the current buzzer answered wrongly.

        Only acts in LOCK_WAIT. Records the buzzer in the retry list,
        notifies the buzzers and blinks the LED red. The game state is left
        at LOCK_WAIT.
        """
        if self.game_state != GAME_STATE.LOCK_WAIT:
            return
        # append(), not +=: "list += bytes" would add the MAC's individual
        # byte values instead of the MAC itself.
        self.retry_locked_buzzers.append(self.current_buzzer)
        # NOTE(review): this used to send "won" to the losing buzzer, which
        # looks like a copy-paste slip from won_button_handler. Confirm the
        # buzzer firmware handles a "lost" event.
        self._finish_turn("lost")
        self.led.lost()

    def launch_button_handler(self, pin):
        """Pin IRQ handler: re-open the game after a finished round."""
        if self.game_state == GAME_STATE.WAIT:
            self.game_state = GAME_STATE.OPEN

    def recv_cb(self, e):
        """ESP-NOW receive callback: drain and handle all queued messages.

        e: object exposing ``irecv``; it is polled without blocking until
        no message is left. Only the sender address is used, the payload
        is ignored.
        """
        while True:  # Read out all messages waiting in the buffer
            mac, _msg = e.irecv(0)  # Don't wait if no messages left
            if mac is None:
                return
            # Copy the address: it is stored in current_buzzer and used as a
            # dict key, and irecv is documented to reuse its buffers on some
            # ports.
            sender = bytes(mac)
            if sender not in BUZZERS:
                # Unknown devices must not be able to take the turn; sending
                # to an unregistered peer would fail anyway.
                continue
            #print(f"RECV: {json.loads(msg)} FROM {BUZZERS[mac]['name']}")
            if self.game_state == GAME_STATE.OPEN:
                self.lock_others(sender)
            elif self.game_state == GAME_STATE.LOCK_WAIT:
                print("LOCK_WAIT - Discarding message")

    def __str__(self):
        """Return the controller's MAC address as colon-separated hex.

        This must return the text: the previous version printed it and
        returned None, which makes ``str()``/``print()`` raise TypeError.
        """
        return "MAC Address: " + ":".join("{:02X}".format(byte) for byte in self.mac)
def led():
    """Manual LED check: play the locked, won and lost blink patterns in
    that order on pins 23, 22 and 21."""
    demo = Led([23, 22, 21])
    for play in (demo.locked, demo.won, demo.lost):
        play()
    # The PWM channels are left initialised afterwards.
    #led.deinit_pwm()
# Script entry point: build the Controller, then loop forever sending a test
# message with a random "player" number (1 to 5) to the controller's own
# MAC address.
# NOTE(review): this file's indentation has been lost, so it is not clear
# whether the 5-second sleep belongs to the inner `for` loop or to the
# outer `while` loop. Confirm against the original before re-indenting.
if __name__ == "__main__":
controller = Controller()
# controller.add_peer(controller.mac)
# controller.esp.irq(recv_cb)
#print(controller)
while True:
# `p` is never used; the loop just repeats the send four times per pass.
for p in range(1,5):
controller.send([controller.mac],{"player": random.randint(1,5)})
time.sleep(5)