from machine import Pin, SPI
import network
import socket
import json
import time
from neopixel import NeoPixel
# Network Configuration
ATEM_HOST = '10.0.20.186'
ATEM_PORT = 9977
# NeoPixel Configuration
LED_PIN = 28
GRID_SIZE = 8
NUM_PIXELS = GRID_SIZE * GRID_SIZE
BRIGHTNESS = 50
# Initialize NeoPixel grid
grid = NeoPixel(Pin(LED_PIN, Pin.OUT), NUM_PIXELS)
# Initialize W5500 Ethernet (SPI1)
spi_eth = SPI(1,
sck=Pin(10, Pin.OUT),
mosi=Pin(11, Pin.OUT),
miso=Pin(12, Pin.IN))
nic_cs = Pin(13, Pin.OUT)
nic_rst = Pin(14, Pin.OUT)
class TallyLight:
def __init__(self):
self.is_program = False
self.socket = None
self.connected = False
self.demo_mode = False
self.demo_state = 0
self.demo_timer = 0
self.clear_grid()
def clear_grid(self):
"""Clear the LED grid"""
for i in range(NUM_PIXELS):
grid[i] = (0, 0, 0)
grid.write()
def show_program(self):
"""Show Program state (RED)"""
for i in range(NUM_PIXELS):
grid[i] = (BRIGHTNESS, 0, 0)
grid.write()
def show_preview(self):
"""Show Preview state (GREEN)"""
for i in range(NUM_PIXELS):
grid[i] = (0, BRIGHTNESS, 0)
grid.write()
def run_demo(self):
"""Run demo mode sequence"""
current_time = time.ticks_ms()
# Change state every 2 seconds
if time.ticks_diff(current_time, self.demo_timer) >= 2000:
self.demo_timer = current_time
self.demo_state = (self.demo_state + 1) % 3
if self.demo_state == 0:
print("Demo: Program (RED)")
self.show_program()
elif self.demo_state == 1:
print("Demo: Preview (GREEN)")
self.show_preview()
else:
print("Demo: Off")
self.clear_grid()
def connect_ethernet(self):
"""Initialize and connect Ethernet"""
try:
print("Initializing Ethernet...")
eth = network.WIZNET5K(spi_eth, nic_cs, nic_rst)
eth.active(True)
eth.ifconfig('dhcp') # Use DHCP to get an IP address
print("Ethernet initialized. IP address:", eth.ifconfig()[0])
return True
except Exception as e:
print(f"Ethernet connection error: {e}")
return False
def process_tally_data(self, data):
"""Process received tally data"""
try:
tally_state = json.loads(data)
print(f"Received tally state: {tally_state}")
program_state = tally_state.get('program', False)
if program_state:
self.is_program = True
self.show_program()
else:
self.is_program = False
self.show_preview()
except json.JSONDecodeError:
print("Invalid tally data received")
except Exception as e:
print(f"Error processing tally data: {e}")
def run(self):
"""Main run loop"""
print("Starting Tally Light System...")
if not self.connect_ethernet():
print("Network connection failed. Entering demo mode...")
self.demo_mode = True
while True:
if self.demo_mode:
self.run_demo()
continue
try:
if not self.connected:
if not self.connect_atem():
print("ATEM connection failed. Retrying in 5 seconds...")
self.clear_grid()
time.sleep(5)
continue
data = self.socket.recv(1024)
if data:
self.process_tally_data(data.decode())
time.sleep(0.1)
except socket.timeout:
print("Connection timed out. Reconnecting...")
self.connected = False
except Exception as e:
print(f"Error in main loop: {e}")
self.connected = False
if self.socket:
self.socket.close()
self.socket = None
time.sleep(5)
def connect_atem(self):
"""Connect to ATEM Switcher"""
try:
print(f"Connecting to ATEM at {ATEM_HOST}:{ATEM_PORT}...")
self.socket = socket.socket()
self.socket.settimeout(5)
self.socket.connect((ATEM_HOST, ATEM_PORT))
print("Connected to ATEM Tally Server")
self.connected = True
return True
except Exception as e:
print(f"ATEM connection error: {e}")
self.connected = False
if self.socket:
self.socket.close()
self.socket = None
return False
def main():
try:
tally = TallyLight()
tally.run()
except KeyboardInterrupt:
print("\nProgram stopped by user")
grid.fill((0, 0, 0))
grid.write()
except Exception as e:
print(f"Fatal error: {e}")
grid.fill((0, 0, 0))
grid.write()
if __name__ == "__main__":
main()