from machine import Pin, I2C, ADC
from umqtt.simple import MQTTClient
import network
import time
import json
import pinno as P
import config as C
# -------------------------
# WiFi
# -------------------------
def wifi_connect():
    """Bring up the station interface and join the configured network.

    Returns the ``network.WLAN`` station object.  If the interface is
    already connected it is returned immediately without reconnecting.
    Otherwise the link state is polled every 0.25 s, at most 80 times
    (roughly 20 s in total).

    Raises:
        RuntimeError: the link did not come up within the polling budget.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    if not wlan.isconnected():
        wlan.connect(C.SSID, C.SSID_PASS)
        polls = 0
        while not wlan.isconnected():
            time.sleep(0.25)
            polls += 1
            if polls == 80:
                raise RuntimeError("WiFi connection failed")
        print("WiFi connected:", wlan.ifconfig())

    return wlan
# -------------------------
# OLED
# -------------------------
from ssd1306 import SSD1306_I2C

# I2C bus 0 at 400 kHz on the pins named in the pinno module.
i2c = I2C(0, scl=Pin(P.SCL), sda=Pin(P.SDA), freq=400000)

# The first address answering the bus scan is taken to be the display.
addr_list = i2c.scan()
if addr_list:
    OLED_ADDR = addr_list[0]
else:
    raise RuntimeError("OLED not found on I2C. Check SDA/SCL.")

# The original sketch used WIDTH=64, HEIGHT=48, so that geometry is kept.
oled = SSD1306_I2C(64, 48, i2c, addr=OLED_ADDR)
# -------------------------
# IO / Sensors
# -------------------------
# PIR input: a plain digital input that main() polls for level changes.
pir_pin = Pin(P.PIR_IN, Pin.IN)
led_1_pin = Pin(P.LED_1_IN, Pin.OUT) # LED_1
# Drive the LED pin low at start-up, matching led_1_state = 0 ("off").
led_1_pin.value(0)
# Gas sensor on an ADC input, configured with 11 dB attenuation.
gas = ADC(Pin(P.A0_IN))
gas.atten(ADC.ATTN_11DB)
# Light sensor (LDR) on a second ADC input, same attenuation.
ldr = ADC(Pin(P.LDR_IN))
ldr.atten(ADC.ATTN_11DB)
# -------------------------
# State variables (original meaning preserved)
# -------------------------
# Latest raw gas ADC reading; refreshed by read_sensors_more().
gas_value = 0
# Threshold shown next to the gas reading on the OLED; changed through the
# cmnd/<DEVICE>/THRESHOLD topic. Nothing in this file compares it against
# gas_value.
gas_threshold = 200
# Latest raw LDR ADC reading; refreshed by read_sensors_more().
brightness = 0
# Seconds between sensor publishes; changed through cmnd/<DEVICE>/TELEPERIOD.
TELEPERIOD = 5
# PIR edge detect: start from the pin level sampled at import time.
pir_prev = pir_pin.value()
# LED_1 state helpers (mimic the original Blink.set_on_off_str / on_off)
led_1_state = 0 # 0=off, 1=on
def led_1_set_on_off_str(msg: str):
    """Set LED_1 from a textual command and write the state to the pin.

    After trimming whitespace and lower-casing, "1"/"on"/"true" switch the
    LED on and "0"/"off"/"false" switch it off.  Any other text leaves the
    stored state unchanged; the pin is rewritten with that state either way.
    """
    global led_1_state
    levels = {
        "1": 1, "on": 1, "true": 1,
        "0": 0, "off": 0, "false": 0,
    }
    led_1_state = levels.get(msg.strip().lower(), led_1_state)
    led_1_pin.value(led_1_state)
def led_1_on_off():
    """Return "on" when led_1_state is truthy, otherwise "off"."""
    if led_1_state:
        return "on"
    return "off"
# -------------------------
# OLED output (no Hangul, output structure preserved)
# -------------------------
def display_oled():
    """Redraw the OLED from the current module-level readings.

    Layout: a fixed "GAS/LDR" title at y=0, ``<gas_value>/<gas_threshold>``
    at y=20 and the LDR brightness at y=32, all starting at x=0.
    """
    rows = (
        (0, "GAS/LDR"),
        (20, str(gas_value) + "/" + str(gas_threshold)),
        (32, str(brightness)),
    )
    oled.fill(0)  # clear the frame buffer before drawing
    for y, text in rows:
        oled.text(text, 0, y)
    oled.show()
# -------------------------
# MQTT publish wrapper
# -------------------------
# Active MQTT client; stays None until mqtt_connect() has connected.
mqtt = None


def mqtt_publish(topic, payload, retain=False):
    """Publish ``payload`` on ``topic`` through the module-level client.

    dict and list payloads are serialised to JSON text first; any other
    payload is handed to the client unchanged.  ``retain`` is forwarded
    to the client's publish call.
    """
    needs_json = isinstance(payload, dict) or isinstance(payload, list)
    body = json.dumps(payload) if needs_json else payload
    mqtt.publish(topic, body, retain=retain)
def on_connect_more():
    """Hook called at the end of mqtt_on_connect(); currently only logs."""
    print("on_connect_more()...")
def mqtt_on_connect():
    """Announce the current TELEPERIOD after the MQTT session is set up.

    Publishes ``{"TELEPERIOD": <value>}`` as a retained message on
    tele/<DEVICE>/INFO and then runs the on_connect_more() hook.
    """
    print("on_connect() called")
    info_topic = f"tele/{C.DEVICE}/INFO"
    mqtt_publish(info_topic, {"TELEPERIOD": TELEPERIOD}, retain=True)
    on_connect_more()
def pir_on():
    """Publish a retained "on" to tele/<DEVICE>/PIR, then log it."""
    pir_topic = f"tele/{C.DEVICE}/PIR"
    mqtt_publish(pir_topic, "on", retain=True)
    print("PIR on")
def pir_off():
    """Publish a retained "off" to tele/<DEVICE>/PIR, then log it."""
    pir_topic = f"tele/{C.DEVICE}/PIR"
    mqtt_publish(pir_topic, "off", retain=True)
    print("PIR off")
def callback_more(topic: str, msg: str):
    """Handle the device-specific command topics.

    Args:
        topic: Decoded topic of the incoming message.
        msg: Decoded payload of the incoming message.

    cmnd/<DEVICE>/LED_1 switches LED_1 according to ``msg`` and reports
    the resulting state on stat/<DEVICE>/LED_1.
    cmnd/<DEVICE>/THRESHOLD stores ``msg`` as the new integer gas
    threshold, reports it on stat/<DEVICE>/THRESHOLD and redraws the OLED.
    A payload that is not an integer is logged and ignored, leaving the
    current threshold in place.  Other topics are ignored.
    """
    global gas_threshold
    if topic == f"cmnd/{C.DEVICE}/LED_1":
        led_1_set_on_off_str(msg)
        mqtt_publish(f"stat/{C.DEVICE}/LED_1", led_1_on_off())
    elif topic == f"cmnd/{C.DEVICE}/THRESHOLD":
        # Reject garbage here instead of letting ValueError escape through
        # the MQTT client, where it would be reported as a connection error.
        try:
            new_threshold = int(msg)
        except ValueError:
            print(f"Ignoring invalid THRESHOLD payload: |{msg}|")
            return
        gas_threshold = new_threshold
        # The original log line printed a literal, unfilled "{변수}"
        # template placeholder; name the variable that actually changed.
        print(f"new gas_threshold: {gas_threshold}")
        mqtt_publish(f"stat/{C.DEVICE}/THRESHOLD", str(gas_threshold))
        display_oled()
def mqtt_callback(in_topic, in_msg):
    """Entry point for every message delivered by the MQTT client.

    Args:
        in_topic: Topic as bytes; decoded before use.
        in_msg: Payload as bytes; decoded before use.

    cmnd/<DEVICE>/TELEPERIOD is handled here: a positive integer payload
    becomes the new publish period in seconds and is echoed (retained) on
    stat/<DEVICE>/TELEPERIOD and tele/<DEVICE>/INFO.  Non-integer or
    non-positive payloads are logged and ignored.  Every other topic is
    forwarded to callback_more().
    """
    global TELEPERIOD
    topic = in_topic.decode()
    msg = in_msg.decode()
    print(f"RCV> {topic}, |{msg}|")

    if topic != f"cmnd/{C.DEVICE}/TELEPERIOD":
        callback_more(topic, msg)
        return

    try:
        period = int(msg)
    except ValueError:
        period = None
    # A period of 0 or less would make the main loop publish sensor data on
    # every pass (about every 10 ms), so only positive values are accepted.
    if period is None or period < 1:
        print(f"Ignoring invalid TELEPERIOD payload: |{msg}|")
        return

    TELEPERIOD = period
    mqtt_publish(f"stat/{C.DEVICE}/TELEPERIOD", str(period), retain=True)
    # Publish the number itself so INFO has the same shape as the message
    # sent by mqtt_on_connect(), which uses the integer TELEPERIOD.
    mqtt_publish(f"tele/{C.DEVICE}/INFO", {"TELEPERIOD": period}, retain=True)
    print(f"TELEPERIOD가 {msg}초로 변경되었습니다.")
def mqtt_connect():
    """Create the MQTT client, connect, subscribe and announce the device.

    Builds an MQTTClient from the config module with a 60 s keepalive,
    installs mqtt_callback as the message handler, connects, stores the
    client in the module-level ``mqtt`` and subscribes to the three
    command topics.  Finishes by calling mqtt_on_connect().
    """
    global mqtt
    client = MQTTClient(
        C.MQTT_CLIENT_ID,
        C.MQTT_SERVER,
        port=C.MQTT_PORT,
        user=C.MQTT_USER,
        password=C.MQTT_PASS,
        keepalive=60,
    )
    client.set_callback(mqtt_callback)
    client.connect()
    mqtt = client

    # Subscriptions (same topics and order as the original)
    for command in ("TELEPERIOD", "LED_1", "THRESHOLD"):
        client.subscribe(f"cmnd/{C.DEVICE}/{command}")

    mqtt_on_connect()
# -------------------------
# Sensor reading (BH1750 -> LDR)
# + original JSON shape preserved
# -------------------------
def read_sensors_more():
    """Sample both ADC sensors, refresh the OLED and return the readings.

    Updates the module-level ``brightness`` and ``gas_value``, prints each
    reading, redraws the display, and returns
    ``{"ldr": {"value": ...}, "gas": {"value": ...}}``.
    """
    global brightness, gas_value

    brightness = ldr.read()
    print(brightness)

    gas_value = gas.read()
    print(f"gas:{gas_value}")

    display_oled()
    return {
        "ldr": {"value": brightness},
        "gas": {"value": gas_value},
    }
def send_data():
    """Read the sensors and publish them as JSON on tele/<DEVICE>/SENSOR."""
    payload = json.dumps(read_sensors_more())
    mqtt_publish(f"tele/{C.DEVICE}/SENSOR", payload)
# -------------------------
# Main loop (Wokwi style)
# -------------------------
def _mqtt_reconnect():
    """Rebuild the WiFi link and the MQTT session, retrying until both are up."""
    while True:
        # Best-effort close of the previous socket so repeated reconnects do
        # not leak sockets; it may already be closed or never have existed.
        try:
            mqtt.sock.close()
        except (AttributeError, OSError):
            pass
        try:
            wifi_connect()
            mqtt_connect()
            return
        except Exception as e:
            # Retry boundary: WiFi timeout, socket error or broker refusal.
            print("Reconnect failed:", e)
            time.sleep(5)


def _publish_or_reconnect(publish):
    """Run a publishing callable; on a socket error reconnect and return False."""
    try:
        publish()
    except OSError as e:
        print("MQTT error:", e)
        _mqtt_reconnect()
        return False
    return True


def main():
    """Connect, then run the polling loop forever.

    Each pass (about every 10 ms) the loop:
      * processes at most one pending MQTT message,
      * publishes a PIR "on"/"off" message when the PIR level changed,
      * redraws the OLED every 3 s,
      * publishes the sensor readings every TELEPERIOD seconds.

    A socket error on any MQTT operation triggers a reconnect instead of
    ending the loop or spinning on a dead connection.  Errors raised by a
    message handler are logged and the loop carries on.  Failures of the
    initial WiFi/MQTT connection still propagate to the caller.
    """
    global pir_prev
    wifi_connect()
    mqtt_connect()
    last_oled = time.ticks_ms()
    last_sensor = time.ticks_ms()
    while True:
        # Handle incoming MQTT messages
        try:
            mqtt.check_msg()  # non-blocking
        except OSError as e:
            # Connection-level failure: the socket is unusable until rebuilt.
            print("MQTT error:", e)
            _mqtt_reconnect()
        except Exception as e:
            # Raised by a message handler; the connection itself is fine.
            print("MQTT error:", e)

        # Detect PIR level changes
        pir_now = pir_pin.value()
        if pir_now != pir_prev:
            # Record the new level only once it was published, so an edge
            # that hit a dead connection is re-sent after the reconnect.
            if _publish_or_reconnect(pir_on if pir_now == 1 else pir_off):
                pir_prev = pir_now

        # Refresh the OLED every 3 seconds
        now = time.ticks_ms()
        if time.ticks_diff(now, last_oled) >= 3000:
            last_oled = now
            display_oled()

        # Publish the sensors every TELEPERIOD seconds
        if time.ticks_diff(now, last_sensor) >= TELEPERIOD * 1000:
            last_sensor = now
            _publish_or_reconnect(send_data)

        time.sleep_ms(10)


if __name__ == "__main__":
    main()
# (Residue from the simulator page listing: board esp32-c3-devkitm-1, library ssd1306)