"""
ESP32-S3 MicroPython: QR payment display + poll Django API.
Pin map (same as Wokwi diagram — edit PIN_* constants below if you rewire):
Component Signal ESP32-S3 GPIO
---------- -------- -------------
OLED (I2C) SDA 18
OLED (I2C) SCL 17
OLED VCC 3V3
OLED GND GND
Buzzer + 4
Buzzer - GND
Relay module IN 19
Relay VCC 3V3
Relay GND GND
I2C: SoftI2C @ I2C_FREQ Hz. OLED is usually 0x3C; jumpers can make 0x3D.
If auto picks the wrong address, set OLED_I2C_ADDR_FORCE = 0x3C (or 0x3D).
SSD1306 I2C driver: micropython-lib / MIT (inlined).
"""
import framebuf
import json
import socket
import time
import network
from machine import I2C, Pin, PWM, SoftI2C
from micropython import const
def http_get_json(url, timeout=30):
"""Minimal HTTP/1.0 GET; HTTP only (no TLS)."""
if url.startswith("https://"):
raise OSError("HTTPS not implemented; use HTTP or add ussl.")
if not url.startswith("http://"):
raise ValueError("url must start with http://")
rest = url[7:]
slash = rest.find("/")
if slash < 0:
host_port, path = rest, "/"
else:
host_port, path = rest[:slash], rest[slash:]
if ":" in host_port:
host, port_s = host_port.rsplit(":", 1)
port = int(port_s)
else:
host, port = host_port, 80
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0]
s = socket.socket(ai[0], ai[1], ai[2])
s.settimeout(timeout)
s.connect(ai[-1])
req = ("GET %s HTTP/1.0\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host_port)).encode()
s.send(req)
chunks = []
while True:
try:
b = s.recv(4096)
except OSError:
break
if not b:
break
chunks.append(b)
s.close()
raw = b"".join(chunks)
i = raw.find(b"\r\n\r\n")
if i < 0:
raise OSError("bad HTTP response")
body = raw[i + 4 :]
return json.loads(body)
# --- SSD1306 128x64 (I2C) ---
_SET_CONTRAST = const(0x81)
_SET_ENTIRE_ON = const(0xA4)
_SET_NORM_INV = const(0xA6)
_SET_DISP = const(0xAE)
_SET_MEM_ADDR = const(0x20)
_SET_COL_ADDR = const(0x21)
_SET_PAGE_ADDR = const(0x22)
_SET_DISP_START_LINE = const(0x40)
_SET_SEG_REMAP = const(0xA0)
_SET_MUX_RATIO = const(0xA8)
_SET_IREF_SELECT = const(0xAD)
_SET_COM_OUT_DIR = const(0xC0)
_SET_DISP_OFFSET = const(0xD3)
_SET_COM_PIN_CFG = const(0xDA)
_SET_DISP_CLK_DIV = const(0xD5)
_SET_PRECHARGE = const(0xD9)
_SET_VCOM_DESEL = const(0xDB)
_SET_CHARGE_PUMP = const(0x8D)
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
_SET_DISP,
_SET_MEM_ADDR,
0x00,
_SET_DISP_START_LINE,
_SET_SEG_REMAP | 0x01,
_SET_MUX_RATIO,
self.height - 1,
_SET_COM_OUT_DIR | 0x08,
_SET_DISP_OFFSET,
0x00,
_SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
_SET_DISP_CLK_DIV,
0x80,
_SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
_SET_VCOM_DESEL,
0x30,
_SET_CONTRAST,
0xFF,
_SET_ENTIRE_ON,
_SET_NORM_INV,
_SET_IREF_SELECT,
0x30,
_SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
_SET_DISP | 0x01,
):
self.write_cmd(cmd)
self.fill(0)
self.show()
def show(self):
x0 = 0
x1 = self.width - 1
if self.width != 128:
col_offset = (128 - self.width) // 2
x0 += col_offset
x1 += col_offset
self.write_cmd(_SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(_SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
# Smaller chunks + retries reduce ENODEV on SoftI2C after RF / long wires.
_I2C_DATA_CHUNK = 16
def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80
self.temp[1] = cmd
_i2c_writeto(self.i2c, self.addr, self.temp)
def write_data(self, buf):
mv = memoryview(buf)
c = self._I2C_DATA_CHUNK
for i in range(0, len(mv), c):
part = mv[i : i + c]
chunk = bytearray(1 + len(part))
chunk[0] = 0x40
chunk[1:] = part
_i2c_writeto(self.i2c, self.addr, chunk)
# --- WiFi / API (edit for your network and Django host) ---
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# Base URL, no trailing slash (HTTP only; see http_get_json for TLS)
API_BASE = "http://115.69.213.102:4044"
DEVICE_ID = "esp32-wash-1"
# --- GPIO (must match your wiring / Wokwi) ---
PIN_I2C_SDA = 18
PIN_I2C_SCL = 17
PIN_BUZZER = 4
PIN_RELAY = 19
# None = auto-detect 0x3C / 0x3D. Set to 0x3C or 0x3D if display glitches or ENODEV after init.
OLED_I2C_ADDR_FORCE = None
# If scan finds nothing, last-chance probe uses this address first.
OLED_I2C_ADDR_FALLBACK = 0x3C
# Marginal wiring / RF noise: start at 50 kHz; raise to 100_000–400_000 when stable.
I2C_FREQ = 50_000
# Retries for transient ENODEV (19) after WiFi activity or capacitive bus.
I2C_WRITE_RETRIES = 6
I2C_RETRY_DELAY_MS = 3
# Songle-style opto relay boards often turn ON when IN is LOW.
RELAY_ACTIVE_LOW = True
POLL_MS = 1500
BUZZER_FREQ_HZ = 3500
BUZZER_MS = 3000
PAYMENT_SUCCESS_ON_MS = 10_000
# True: black modules on white field (typical for phone scanners). False: inverted.
QR_BLACK_ON_WHITE = True
# Extra border in pixels around the QR (0 = use full panel; encoder should include quiet zone in matrix).
QR_PIXEL_MARGIN = 0
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
for _ in range(60):
if wlan.isconnected():
break
time.sleep(1)
if not wlan.isconnected():
raise OSError("WiFi connect failed")
print("WiFi OK", wlan.ifconfig())
def _is_enodev(err):
return getattr(err, "errno", None) == 19 or (err.args and err.args[0] == 19)
def _i2c_writeto(bus, addr, buf):
for attempt in range(I2C_WRITE_RETRIES):
try:
bus.writeto(addr, buf)
return
except OSError as e:
if not _is_enodev(e) or attempt + 1 >= I2C_WRITE_RETRIES:
raise
time.sleep_ms(I2C_RETRY_DELAY_MS)
def _probe_oled_ack(bus):
"""SSD1306 ACK check — do not trust scan() alone (floating SDA can ghost many addrs)."""
probe = b"\x80\xAE" # command stream, display off
for addr in (0x3C, 0x3D):
try:
_i2c_writeto(bus, addr, probe)
return addr
except OSError:
pass
return None
def _i2c_gpio_pins():
"""Weak internal pull-ups help some breakout wires; module usually has 4k7 too."""
return (
Pin(PIN_I2C_SCL, Pin.IN, Pin.PULL_UP),
Pin(PIN_I2C_SDA, Pin.IN, Pin.PULL_UP),
)
def print_pin_config():
print(
"PIN_CONFIG OLED SDA=%s SCL=%s buzzer=%s relay_IN=%s I2C=%sHz force_addr=%s"
% (
PIN_I2C_SDA,
PIN_I2C_SCL,
PIN_BUZZER,
PIN_RELAY,
I2C_FREQ,
OLED_I2C_ADDR_FORCE,
)
)
def open_i2c_for_oled():
"""
ENODEV on I2C = no ACK (wrong pins/bus/address). ESP32-S3 + Wokwi often need
SoftI2C on arbitrary GPIO; hardware I2C(0) may ignore SDA/SCL or use wrong pins.
"""
scl, sda = _i2c_gpio_pins()
if OLED_I2C_ADDR_FORCE is not None:
bus = SoftI2C(scl=scl, sda=sda, freq=I2C_FREQ)
_i2c_writeto(bus, OLED_I2C_ADDR_FORCE, b"\x80\xAE")
return bus, OLED_I2C_ADDR_FORCE, "SoftI2C(forced)"
variants = [
("SoftI2C", lambda: SoftI2C(scl=scl, sda=sda, freq=I2C_FREQ)),
("I2C(0)", lambda: I2C(0, scl=scl, sda=sda, freq=I2C_FREQ)),
("I2C(1)", lambda: I2C(1, scl=scl, sda=sda, freq=I2C_FREQ)),
]
last_err = None
for name, factory in variants:
try:
bus = factory()
found = bus.scan()
print(name, "scan:", [hex(a) for a in found])
# Scan often lists bogus addresses; only accept addr after writeto ACK.
addr = _probe_oled_ack(bus)
if addr is not None:
return bus, addr, name
last_err = OSError("%s: no SSD1306 (0x3C/0x3D) on bus" % name)
except OSError as e:
last_err = e
print(name, "fail:", e)
except Exception as e:
last_err = e
print(name, "fail:", e)
# Last resort: use configured address (probe with a command)
try:
bus = SoftI2C(scl=scl, sda=sda, freq=I2C_FREQ)
_i2c_writeto(bus, OLED_I2C_ADDR_FALLBACK, b"\x80\xAE")
return bus, OLED_I2C_ADDR_FALLBACK, "SoftI2C(fallback addr)"
except Exception as e:
last_err = e
raise OSError(
"OLED I2C ENODEV: no ACK on SDA=%s SCL=%s. Swap wires, check 3V3/GND, "
"or set PIN_I2C_SDA / PIN_I2C_SCL. Last error: %s"
% (PIN_I2C_SDA, PIN_I2C_SCL, last_err)
)
def make_display():
i2c, addr, bus_name = open_i2c_for_oled()
print("OLED using", bus_name, "addr", hex(addr))
return SSD1306_I2C(128, 64, i2c, addr=addr)
def draw_qr_matrix(disp, matrix):
"""
Draw QR for phone cameras: white/light background, black modules (ISO 18004),
integer cells from JSON (0/1 or true/false). Scales to max size in panel (see QR_PIXEL_MARGIN).
"""
rows = [list(r) for r in matrix]
h = len(rows)
if h == 0:
return
w = len(rows[0])
for ri in range(1, h):
if len(rows[ri]) != w:
print("qr warn: ragged row", ri, len(rows[ri]), "!=", w)
pw = getattr(disp, "width", 128)
ph = getattr(disp, "height", 64)
m = max(0, int(QR_PIXEL_MARGIN))
inner_w = pw - 2 * m
inner_h = ph - 2 * m
if inner_w <= 0 or inner_h <= 0:
m = 0
inner_w, inner_h = pw, ph
scale = min(inner_w // w, inner_h // h)
scale = max(1, scale)
pix_w = w * scale
pix_h = h * scale
ox = m + (inner_w - pix_w) // 2
oy = m + (inner_h - pix_h) // 2
if QR_BLACK_ON_WHITE:
disp.fill(1)
dark = 0
else:
disp.fill(0)
dark = 1
for y in range(h):
row = rows[y]
for x in range(w):
if int(row[x]) != 0:
disp.fill_rect(ox + x * scale, oy + y * scale, scale, scale, dark)
disp.show()
def relay_set(on: bool):
pin = Pin(PIN_RELAY, Pin.OUT)
if RELAY_ACTIVE_LOW:
pin.value(0 if on else 1)
else:
pin.value(1 if on else 0)
def buzzer_beep(duration_ms=BUZZER_MS, freq=BUZZER_FREQ_HZ):
pwm = PWM(Pin(PIN_BUZZER), freq=freq, duty_u16=32768)
time.sleep_ms(duration_ms)
pwm.deinit()
def on_payment_success():
"""
Turn machine output ON for a fixed window so relay output (and load LED)
stays active long enough to be clearly visible after a successful payment.
"""
relay_set(True)
buzzer_beep()
remaining_ms = PAYMENT_SUCCESS_ON_MS - BUZZER_MS
if remaining_ms > 0:
time.sleep_ms(remaining_ms)
relay_set(False)
def fetch_qr_session():
url = "%s/api/qr/%s/" % (API_BASE, DEVICE_ID)
data = http_get_json(url)
return data["uid"], data["qr"], int(data.get("expires_in_seconds", 300))
def poll_paid():
url = "%s/api/check/%s/" % (API_BASE, DEVICE_ID)
data = http_get_json(url)
return bool(data.get("paid")), data
def run_session(disp):
uid, qr, exp_s = fetch_qr_session()
print("session", uid, "exp_s", exp_s)
draw_qr_matrix(disp, qr)
deadline = time.ticks_add(time.ticks_ms(), (exp_s + 30) * 1000)
while True:
paid, info = poll_paid()
print("poll", paid, info)
if paid:
on_payment_success()
return
if time.ticks_diff(deadline, time.ticks_ms()) <= 0:
print("session window ended, refreshing QR")
return
time.sleep_ms(POLL_MS)
def main():
print_pin_config()
connect_wifi()
time.sleep_ms(50)
relay_set(False)
disp = make_display()
while True:
try:
run_session(disp)
except Exception as e:
print("error", e)
time.sleep(5)
main()