# main.py — ESP32 Smart Parking (4 Slots) + Blynk IoT (Push + Dashboard)
# Matches the wiring diagram: TRIG=[5,18,19,23], ECHO=[32,33,34,35], LED_G=[2,12,14,27], LED_R=[4,13,26,25]
from machine import Pin, time_pulse_us
import time, network
# ===================== USER CONFIG =====================
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
THRESH_CM = 100.0 # a distance below this threshold (cm) counts as "car present"
STABLE_N = 3 # consecutive differing reads required before a slot's state flips
READ_INTERVAL_MS = 400 # period of the main sensing loop
BETWEEN_SENSOR_MS = 60 # pause between reading consecutive sensors
# NOTE(review): the auth token is hard-coded in this file; consider loading it
# from a separate, untracked config so it is not shared along with the source.
BLYNK_TOKEN = "dO2XWKU2jpSzxG3QiS0SrBRl3HI0J-6B"
# For Wokwi (simulation) the regional HTTP endpoint is recommended (more stable):
BLYNK_API = "http://sgp1.blynk.cloud/external/api"
# On a real board, use HTTPS instead:
# BLYNK_API = "https://blynk.cloud/external/api"
EVENT_CODE = "slot_change"
EVENT_GAP_MS = 800 # minimum spacing between slot-change events (rate limit)
BATCH_PUSH_GAP_MS = 20000 # push V0–V5 as one batch every ~20 s (the original note said ~2 s, but the value is 20000 ms)
# =======================================================
# ---------- Wi-Fi ----------
def wifi_connect():
    """Activate the station interface and join ``WIFI_SSID``.

    If the interface is not already connected, a connection attempt is
    started and polled every 250 ms for up to 15 s, printing a dot per poll.

    Returns:
        True if the interface reports connected when the wait ends,
        False if the 15 s wait elapsed without a connection.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        wlan.connect(WIFI_SSID, WIFI_PASS)
        print("Connecting Wi-Fi", end="")
        t0 = time.ticks_ms()
        while not wlan.isconnected() and time.ticks_diff(time.ticks_ms(), t0) < 15000:
            print(".", end="")
            time.sleep(0.25)
    connected = wlan.isconnected()
    if connected:
        print("\n✅ WiFi:", wlan.ifconfig())
    else:
        # Report the timeout explicitly instead of printing the success
        # banner unconditionally.
        print("\n❌ WiFi: not connected after 15 s")
    return connected
# ---------- HTTP helpers (retry + close the connection promptly) ----------
def _http_get(url, tries=3, delay_ms=250):
    """GET *url* with simple retries and return ``(status_code, text)``.

    Args:
        url: Full request URL.
        tries: Maximum number of attempts; values below 1 are treated as 1
            so the function always either returns or raises a real error.
        delay_ms: Pause between failed attempts, in milliseconds.

    Returns:
        Tuple ``(status_code, text)`` from the first attempt that completes.

    Raises:
        Exception: the error from the last attempt when every attempt failed.
    """
    import urequests  # imported at call time rather than at module load

    attempts = max(1, tries)
    last_err = None
    for attempt in range(attempts):
        try:
            r = urequests.get(url)
            try:
                return r.status_code, r.text
            finally:
                # Always release the response, even if reading it raised.
                r.close()
        except Exception as e:
            last_err = e
            # No point in waiting after the final attempt.
            if attempt < attempts - 1:
                time.sleep_ms(delay_ms)
    raise last_err
def urlencode_utf8(s: str) -> str:
    """Percent-encode *s* as UTF-8 so non-ASCII text (e.g. Thai) is URL-safe.

    Non-string input is converted with ``str()`` first. ASCII letters,
    digits and ``-_.~`` pass through unchanged; every other byte becomes
    ``%XX`` with uppercase hex digits.
    """
    text = s if isinstance(s, str) else str(s)
    unreserved = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~"
    return "".join(
        chr(byte) if byte in unreserved else "%{:02X}".format(byte)
        for byte in text.encode("utf-8")
    )
def blynk_log_event(code, description=""):
    """Send a ``logEvent`` request to the Blynk HTTP API.

    Args:
        code: Event code placed in the ``code`` query parameter.
        description: Optional text; percent-encoded and appended as
            ``description`` when non-empty.

    Returns:
        True when the request completed with HTTP 200, otherwise False.
        Any exception is printed and reported as False.
    """
    try:
        query = "{}/logEvent?token={}&code={}".format(BLYNK_API, BLYNK_TOKEN, code)
        if description:
            query = query + "&description=" + urlencode_utf8(description)
        status, body = _http_get(query, tries=3, delay_ms=250)
        print("logEvent:", status, body)
        return status == 200
    except Exception as err:
        print("logEvent error:", err)
        return False
def blynk_update_v(pin, value):
    """Write *value* to virtual pin ``V<pin>`` through the Blynk HTTP API.

    String values are percent-encoded; other values are formatted as-is.

    Returns:
        True when the request completed with HTTP 200, otherwise False.
        Any exception is printed and reported as False.
    """
    try:
        encoded = urlencode_utf8(value) if isinstance(value, str) else value
        target = "{}/update?token={}&V{}={}".format(BLYNK_API, BLYNK_TOKEN, pin, encoded)
        status, _ = _http_get(target, tries=2, delay_ms=200)
        return status == 200
    except Exception as err:
        print("update V{} error:".format(pin), err)
        return False
def blynk_batch_update(map_v):
    """Write several virtual pins in a single ``batch/update`` request.

    Args:
        map_v: Mapping of virtual-pin number to value. String values are
            percent-encoded; other values are formatted as-is.

    Returns:
        True when the request completed with HTTP 200, otherwise False.
        Any exception is printed and reported as False.
    """
    try:
        assignments = "&".join(
            "V{}={}".format(vpin, urlencode_utf8(val) if isinstance(val, str) else val)
            for vpin, val in map_v.items()
        )
        url = "{}/batch/update?token={}&".format(BLYNK_API, BLYNK_TOKEN) + assignments
        status, _ = _http_get(url, tries=2, delay_ms=200)
        return status == 200
    except Exception as err:
        print("batch update error:", err)
        return False
def blynk_get_v(pin, default=1):
    """Read virtual pin ``V<pin>`` from the Blynk HTTP API as an integer.

    The response body is stripped; a ``["..."]`` wrapper is removed when
    present, and the remainder is parsed via ``int(float(...))``.

    Returns:
        The parsed integer, or *default* when the status is not 200, the
        body is empty, or any exception occurs (the problem is printed).
    """
    try:
        status, body = _http_get(
            "{}/get?token={}&V{}".format(BLYNK_API, BLYNK_TOKEN, pin),
            tries=2,
            delay_ms=200,
        )
        if status == 200 and body:
            text = body.strip()
            # Per the original note, Blynk normally answers ["0"] or ["1"].
            if text.startswith('["') and text.endswith('"]'):
                text = text[2:-2]
            result = int(float(text))
            print("get V{} -> {}".format(pin, result))  # debug output on serial
            return result
        print("get V{} -> HTTP {}, txt='{}'".format(pin, status, body))
        return default
    except Exception as err:
        print("get V{} error:".format(pin), err)
        return default
# ---------- GPIO mapping ----------
# Index i in each list below belongs to the same parking slot (slot i+1).
TRIG_PINS = [5, 18, 19, 23]
ECHO_PINS = [32, 33, 34, 35] # original note: input-only pins, well suited as ECHO — TODO confirm this holds for all four
LED_GREEN_PINS = [2, 12, 14, 27] # lit when the slot is free
LED_RED_PINS = [4, 13, 26, 25] # lit when a car is present
# Pin objects are created at import time; every output starts at level 0.
trigs = [Pin(p, Pin.OUT, value=0) for p in TRIG_PINS]
echos = [Pin(p, Pin.IN) for p in ECHO_PINS]
led_g = [Pin(p, Pin.OUT, value=0) for p in LED_GREEN_PINS]
led_r = [Pin(p, Pin.OUT, value=0) for p in LED_RED_PINS]
# ---------- HC-SR04 ----------
def raw_cm(i):
    """Take one distance reading (cm) from the sensor of slot index *i*.

    Drives the trigger pin low for 2 µs, high for 10 µs, then low again,
    and times the high pulse on the echo pin with a 30000 µs timeout.

    Returns:
        ``pulse_us * 0.0343 / 2`` as a float (0.0343 is presumably the
        speed of sound in cm/µs, halved for the round trip), or 9999.0
        when ``time_pulse_us`` reports a non-positive duration.
    """
    trigger = trigs[i]
    echo = echos[i]
    trigger.value(0)
    time.sleep_us(2)
    trigger.value(1)
    time.sleep_us(10)
    trigger.value(0)
    pulse_us = time_pulse_us(echo, 1, 30000)  # microseconds
    if pulse_us > 0:
        return (pulse_us * 0.0343) / 2.0
    return 9999.0
def median3(a, b, c):
    """Return the middle value of the three arguments."""
    return sorted((a, b, c))[1]
def read_cm(i):
    """Return the median of three raw readings for slot index *i*.

    The three readings are taken 5 ms apart.
    """
    samples = []
    for attempt in range(3):
        if attempt:
            time.sleep_ms(5)  # short gap between consecutive readings
        samples.append(raw_cm(i))
    return median3(samples[0], samples[1], samples[2])
def set_led(i, occupied):
    """Show slot *i*'s state on its LED pair: red when occupied, green when free."""
    red_level, green_level = (1, 0) if occupied else (0, 1)
    led_r[i].value(red_level)
    led_g[i].value(green_level)
def pretty_state(state):
    """Render slot flags as text, e.g. ``[X O O X] (X=CAR,O=FREE)``.

    Truthy entries are shown as ``X`` and falsy entries as ``O``.
    """
    marks = []
    for flag in state:
        marks.append("X" if flag else "O")
    return "[{}] (X=CAR,O=FREE)".format(" ".join(marks))
# ---------- Main ----------
def _slot_payload(occupied, free_count, summary):
    """Build the batch payload: V0–V3 slot flags, V4 free count, V5 summary text."""
    payload = {idx: (1 if taken else 0) for idx, taken in enumerate(occupied)}
    payload[4] = free_count
    payload[5] = summary
    return payload


def main():
    """Run the 4-slot parking monitor loop; never returns.

    Flow:
      1. Connect Wi-Fi and send a start-up event.
      2. Every ``READ_INTERVAL_MS``: refresh the on/off switch (V20) about
         every 2 s, then either idle in "System OFF" mode or read all four
         sensors.
      3. A slot's confirmed state flips only after ``STABLE_N`` consecutive
         reads that differ from it; a flip updates the LEDs, pushes the
         slot's virtual pin, and logs an event (rate-limited by
         ``EVENT_GAP_MS``).
      4. Free count, summary text and the "lot full / space available"
         events are derived from the confirmed (debounced) state, so they
         always agree with the LEDs and V0–V3.
      5. V0–V5 are pushed as a batch on the first pass and then every
         ``BATCH_PUSH_GAP_MS``.
    """
    wifi_connect()
    # First notification, to confirm the event path works end to end.
    blynk_log_event(EVENT_CODE, "⚙️ ระบบเริ่มทำงานแล้ว")

    occupied = [False] * 4      # confirmed (debounced) state per slot
    last_state = [None] * 4     # None until the slot has been read once
    stable_cnt = [0] * 4        # consecutive reads disagreeing with `occupied`
    for i in range(4):
        set_led(i, False)       # every slot starts out shown as free

    last_loop = time.ticks_ms()
    last_batch_push = time.ticks_ms()
    last_ctrl_pull = time.ticks_ms()
    last_event_ms = 0
    prev_free_count = 4
    first_summary = False

    # Read the system on/off switch from the app (V20).
    system_enabled = blynk_get_v(20, default=1)
    print("Slots=4 | THRESH={:.1f}cm | STABLE_N={} | INTERVAL={}ms".format(
        THRESH_CM, STABLE_N, READ_INTERVAL_MS))

    while True:
        now = time.ticks_ms()
        if time.ticks_diff(now, last_loop) < READ_INTERVAL_MS:
            time.sleep_ms(5)
            continue
        last_loop = now

        # Refresh the switch state roughly every 2 seconds.
        if time.ticks_diff(now, last_ctrl_pull) >= 2000:
            system_enabled = blynk_get_v(20, default=1)
            print("system_enabled =", system_enabled)  # debug
            last_ctrl_pull = now

        if not system_enabled:
            # OFF: show every slot as free and publish "System OFF".
            for i in range(4):
                set_led(i, False)
                occupied[i] = False
                stable_cnt[i] = 0
                if last_state[i] is None:
                    last_state[i] = False
            # Everything is reported as free while OFF; keep the transition
            # tracker in step so resuming does not emit a spurious
            # "space available" event.
            prev_free_count = 4
            if time.ticks_diff(now, last_batch_push) >= BATCH_PUSH_GAP_MS:
                blynk_batch_update({0:0,1:0,2:0,3:0,4:4,5:"System OFF"})
                last_batch_push = now
            print("System OFF | State:", pretty_state(occupied), "| FREE = 4")
            time.sleep_ms(BETWEEN_SENSOR_MS)
            continue

        # -------- System ON: read the sensors --------
        for i in range(4):
            d = read_cm(i)
            cur = (d < THRESH_CM)  # True = car present
            if last_state[i] is None:
                # First read of this slot: adopt it without debouncing.
                last_state[i] = cur
                occupied[i] = cur
                set_led(i, cur)
            elif cur != occupied[i]:
                stable_cnt[i] += 1
                if stable_cnt[i] >= STABLE_N:
                    occupied[i] = cur
                    set_led(i, cur)
                    stable_cnt[i] = 0
                    # Rate-limit events with EVENT_GAP_MS.
                    if time.ticks_diff(now, last_event_ms) >= EVENT_GAP_MS:
                        msg = "ช่อง {}: {}".format(i+1, ("มีรถเข้าจอด" if cur else "ว่างแล้ว"))
                        blynk_log_event(EVENT_CODE, msg)
                        last_event_ms = now
                    # Push this slot's pin immediately.
                    blynk_update_v(i, 1 if cur else 0)
            else:
                stable_cnt[i] = 0
            time.sleep_ms(BETWEEN_SENSOR_MS)

        # Summarise from the confirmed state, not the raw readings, so one
        # noisy read cannot flip the count or trigger a lot-full event.
        free_slots = [str(i + 1) for i in range(4) if not occupied[i]]
        free_count = len(free_slots)
        summary = ("ว่าง {}/4 ช่อง ({})".format(free_count, ", ".join(free_slots))
                   if free_count > 0 else "เต็มทั้งหมด")

        # Log an event when the free count crosses zero in either direction.
        if free_count == 0 and prev_free_count > 0:
            blynk_log_event(EVENT_CODE, "ลานจอดเต็มทั้งหมด")
        elif free_count > 0 and prev_free_count == 0:
            blynk_log_event(EVENT_CODE, "มีที่ว่างแล้ว {} ช่อง".format(free_count))
        prev_free_count = free_count

        # Push once right after boot, then periodically; a single push
        # covers the pass where both conditions hold.
        batch_due = time.ticks_diff(now, last_batch_push) >= BATCH_PUSH_GAP_MS
        if not first_summary or batch_due:
            blynk_batch_update(_slot_payload(occupied, free_count, summary))
            first_summary = True
            if batch_due:
                last_batch_push = now

        print("System ON |", summary)
# Start the parking monitor only when this file is executed as the main script.
if __name__ == "__main__":
    main()