from machine import Pin, I2C, ADC
from umqtt.simple import MQTTClient
import json
import time
import network
import pinno as P
import config as C
# ====== (optional) Hangul rendering ======
# draw_hangul comes from an optional module. USE_HANGUL records whether the
# import succeeded so the display code can fall back to plain ASCII text.
try:
    from hangul import draw_hangul
except Exception:
    USE_HANGUL = False
else:
    USE_HANGUL = True
# ====== WiFi ======
def wifi_connect(ssid, password):
    """Bring up the station interface and block until it is connected.

    Args:
        ssid: Name of the network to join.
        password: Passphrase for that network.

    Returns:
        The active ``network.WLAN`` station object.

    Raises:
        RuntimeError: If the link is still down more than 15000 ms after
            the connection attempt was started.
    """
    timeout_ms = 15000
    poll_ms = 200

    sta = network.WLAN(network.STA_IF)
    sta.active(True)

    # Only start a new attempt when the interface is not already associated.
    if not sta.isconnected():
        sta.connect(ssid, password)
        started = time.ticks_ms()
        while True:
            if sta.isconnected():
                break
            if time.ticks_diff(time.ticks_ms(), started) > timeout_ms:
                raise RuntimeError("WiFi connect timeout")
            time.sleep_ms(poll_ms)

    print("WiFi connected:", sta.ifconfig())
    return sta
# ====== Blink ======
class Blink:
    """Drive a digital output steadily or as a non-blocking blinker.

    The object never sleeps: call ``run()`` frequently from the main loop
    and it toggles the pin whenever the current on/off phase has elapsed.

    Args:
        pin_instance: Output pin object; only its ``value(v)`` method is used.
        inverted: When true, logical "on" is written as 0 and "off" as 1.
    """

    def __init__(self, pin_instance: Pin, inverted=False):
        self.pin = pin_instance
        self.inverted = inverted
        self._state = False   # logical state last written (True = on)
        self._blink = False   # True while blink mode is active
        self._on_ms = 300     # duration of the "on" phase
        self._off_ms = 200    # duration of the "off" phase
        self._next = time.ticks_ms()  # tick value of the next toggle
        self.off()

    def _write(self, v: int):
        """Write logical level ``v`` to the pin, honouring ``inverted``."""
        if self.inverted:
            v = 0 if v else 1
        self.pin.value(v)

    def on(self):
        """Switch the output on."""
        self._state = True
        self._write(1)

    def off(self):
        """Switch the output off."""
        self._state = False
        self._write(0)

    def on_off(self):
        """Return ``"ON"`` or ``"OFF"`` for the current logical state."""
        return "ON" if self._state else "OFF"

    def set_on_off_str(self, msg: str):
        """Switch on for "1"/"on"/"true"/"high" (case-insensitive, trimmed); otherwise off."""
        m = msg.strip().lower()
        if m in ("1", "on", "true", "high"):
            self.on()
        else:
            self.off()

    def begin_blink(self, on=300, off=200):
        """Start blinking with the given phase durations in milliseconds.

        Calling this while already blinking only updates the durations.
        The pending toggle deadline is kept, so a caller that re-asserts the
        blink periodically does not restart the phase and cause an
        immediate, out-of-rhythm toggle.
        """
        self._on_ms = int(on)
        self._off_ms = int(off)
        if self._blink:
            return
        self._blink = True
        self._next = time.ticks_ms()

    def end_blink(self):
        """Leave blink mode and switch the output off."""
        self._blink = False
        self.off()

    def run(self):
        """Toggle the output if blink mode is active and the phase has elapsed."""
        if not self._blink:
            return
        now = time.ticks_ms()
        if time.ticks_diff(now, self._next) >= 0:
            if self._state:
                self.off()
                self._next = time.ticks_add(now, self._off_ms)
            else:
                self.on()
                self._next = time.ticks_add(now, self._on_ms)
# ====== PIR BinarySensor ======
class BinarySensor:
    """Edge-triggered two-state input that reports changes via callbacks.

    Args:
        pin_instance: Input pin object; ``value()`` and ``irq()`` are used.
        inverted: When true, the logical value is the inverse of the pin level.
        callback_on: Optional zero-argument callable run when the logical
            value changes to a truthy level.
        callback_off: Optional zero-argument callable run when the logical
            value changes to a falsy level.
    """

    def __init__(self, pin_instance: Pin, inverted=False, callback_on=None, callback_off=None):
        self.pin = pin_instance
        self.inverted = inverted
        self.callback_on = callback_on
        self.callback_off = callback_off
        # Remember the starting level so only real changes fire a callback.
        self._last = self.value()
        both_edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
        self.pin.irq(trigger=both_edges, handler=self._irq)

    def value(self):
        """Return the logical level (pin level, flipped when ``inverted``)."""
        raw = self.pin.value()
        if not self.inverted:
            return raw
        return 0 if raw else 1

    def _irq(self, _pin):
        """Pin interrupt handler: run the matching callback on a level change."""
        level = self.value()
        if level == self._last:
            return
        self._last = level
        handler = self.callback_on if level else self.callback_off
        if handler:
            handler()
# ====== Parse (compatibility shim) ======
class Parse:
    """Extract the light reading from a ``{"light": {"value": ...}}`` payload.

    Attributes:
        value: ``data["light"]["value"]``, or 0 when that lookup fails.
    """

    def __init__(self, data):
        try:
            reading = data["light"]["value"]
        except Exception:
            # Missing keys or a non-subscriptable payload fall back to 0.
            reading = 0
        self.value = reading
# ====== Global state ======
# Latest MQ-2 ADC reading, and the level above which the buzzer starts blinking.
gas_value = 0
gas_threshold = 200
# Latest LDR ADC reading.
brightness = 0
# Seconds between sensor telemetry publishes (multiplied by 1000 for tick deadlines).
TELEPERIOD = 5
# ====== I2C / OLED ======
# I2C bus 0 on the pins named in the pinno module.
i2c = I2C(0, scl=Pin(P.SCL), sda=Pin(P.SDA))
from ssd1306 import SSD1306_I2C
# Display dimensions handed to the SSD1306 driver.
WIDTH = 64
HEIGHT = 48
oled = SSD1306_I2C(WIDTH, HEIGHT, i2c)
def display_oled():
    """Redraw the OLED: title, ``gas/threshold`` line and brightness line."""
    oled.fill(0)

    # Title row: Hangul when the optional renderer is available, ASCII otherwise.
    if not (USE_HANGUL and draw_hangul):
        oled.text("GAS/LUX", 0, 0)
    else:
        draw_hangul(oled, '가스조도', 0, 0)

    gas_line = '/'.join((str(gas_value), str(gas_threshold)))
    oled.text(gas_line, 0, 20)
    oled.text(str(brightness), 0, 32)
    oled.show()
# ====== LDR(ADC) ======
# Light sensor on the ADC pin named by pinno.LDR_IN.
ldr = ADC(Pin(P.LDR_IN))
# Select the 11 dB attenuation setting for this channel.
ldr.atten(ADC.ATTN_11DB)
def read_ldr_as_light_dict():
    """Sample the LDR and return the raw count as ``{"light": {"value": raw}}``."""
    payload = {"light": {}}
    payload["light"]["value"] = ldr.read()
    return payload
# ====== MQ-2(ADC) ======
# Gas sensor on the ADC pin named by pinno.A0_IN, same attenuation as the LDR.
mq2 = ADC(Pin(P.A0_IN))
mq2.atten(ADC.ATTN_11DB)
# ====== LED / BUZZER ======
# Both outputs are active-high (inverted=False) and driven through Blink.
led_1 = Blink(pin_instance=Pin(P.LED_1_IN, Pin.OUT), inverted=False)
bz = Blink(pin_instance=Pin(P.BUZZER_IN, Pin.OUT), inverted=False)
# ====== The MQTT client is created later (in mqtt_connect); the PIR callbacks publish through it ======
# Stays None until mqtt_connect() assigns the client.
mqtt = None
# ====== PIR ======
def _publish_pir(state):
    """Publish the PIR state (retained) if an MQTT client exists, then log it.

    The PIR interrupt is armed at import time, before ``mqtt_connect()`` has
    replaced the module-level ``mqtt = None``. An edge arriving that early
    must not raise inside the interrupt handler, so publishing is skipped
    while there is no client. Socket errors from ``publish`` are reported
    and otherwise ignored.
    """
    client = mqtt
    if client is not None:
        try:
            client.publish(f'tele/{C.DEVICE}/PIR', state, retain=True)
        except OSError as exc:
            print('PIR publish failed:', exc)
    print('PIR', state)


def pir_on():
    """Callback for the PIR becoming active: report ``on``."""
    _publish_pir('on')


def pir_off():
    """Callback for the PIR becoming inactive: report ``off``."""
    _publish_pir('off')


pir = BinarySensor(pin_instance=Pin(P.PIR_IN, Pin.IN), inverted=False, callback_on=pir_on, callback_off=pir_off)
# ====== MQTT logic (output and topics kept as-is) ======
def on_connect_more():
    """Extra hook called at the end of mqtt_on_connect(); currently only logs."""
    message = 'on_connect_more()...'
    print(message)
def callback_more(topic: str, msg: str):
    """Handle device-specific command topics.

    ``cmnd/<device>/LED_1`` switches the LED and echoes its state on
    ``stat/<device>/LED_1``. ``cmnd/<device>/THRESHOLD`` sets the gas alarm
    threshold, echoes it on ``stat/<device>/THRESHOLD`` and redraws the OLED.
    Other topics are ignored.

    Args:
        topic: Decoded topic of the received message.
        msg: Decoded payload of the received message.
    """
    global gas_threshold
    if topic == f'cmnd/{C.DEVICE}/LED_1':
        led_1.set_on_off_str(msg)
        mqtt.publish(f'stat/{C.DEVICE}/LED_1', led_1.on_off())
    elif topic == f'cmnd/{C.DEVICE}/THRESHOLD':
        # A non-numeric payload must not raise out of the MQTT callback:
        # the main loop would treat that as a broken connection and reconnect.
        try:
            new_threshold = int(msg)
        except ValueError:
            print(f'invalid THRESHOLD payload ignored: |{msg}|')
            return
        gas_threshold = new_threshold
        print(f'new {{변수}}: {gas_threshold}')
        mqtt.publish(f'stat/{C.DEVICE}/THRESHOLD', str(gas_threshold))
        display_oled()
def read_sensors_more():
    """Sample the LDR and MQ-2, update globals, buzzer and OLED.

    Returns:
        ``{"light": {"value": <ldr>}, "gas": {"value": <mq2>}}``.
    """
    global brightness, gas_value

    payload = read_ldr_as_light_dict()
    print(payload)
    brightness = Parse(payload).value

    gas_value = mq2.read()
    print(f'gas:{gas_value}')

    # The buzzer blinks only while the gas reading exceeds the threshold.
    alarm = gas_value > gas_threshold
    if not alarm:
        bz.end_blink()
    else:
        bz.begin_blink(on=300, off=200)

    display_oled()
    payload["gas"] = {"value": gas_value}
    return payload
def mqtt_on_connect():
    """Publish the current TELEPERIOD as retained INFO, then run the extra hook."""
    print('on_connect() called')
    info_topic = f'tele/{C.DEVICE}/INFO'
    info_payload = json.dumps({'TELEPERIOD': TELEPERIOD})
    mqtt.publish(info_topic, info_payload, retain=True)
    on_connect_more()
def mqtt_callback(in_topic, in_msg):
    """Dispatch an incoming MQTT message.

    ``cmnd/<device>/TELEPERIOD`` changes the telemetry period (in seconds),
    reschedules the next sensor publish, and echoes the new value on the
    retained ``stat/<device>/TELEPERIOD`` and ``tele/<device>/INFO`` topics.
    Every other topic is forwarded to ``callback_more``.

    Args:
        in_topic: Topic as bytes, as delivered by the MQTT client.
        in_msg: Payload as bytes, as delivered by the MQTT client.
    """
    global TELEPERIOD, _next_sensor_send
    topic = in_topic.decode()
    msg = in_msg.decode()
    print(f'RCV> {topic}, |{msg}|')

    if topic != f'cmnd/{C.DEVICE}/TELEPERIOD':
        callback_more(topic, msg)
        return

    # Reject non-numeric and non-positive periods: the former would raise out
    # of the callback, the latter would make the main loop publish on every pass.
    try:
        period = int(msg)
    except ValueError:
        period = 0
    if period <= 0:
        print(f'invalid TELEPERIOD payload ignored: |{msg}|')
        return

    TELEPERIOD = period
    _next_sensor_send = time.ticks_add(time.ticks_ms(), TELEPERIOD * 1000)
    mqtt.publish(f'stat/{C.DEVICE}/TELEPERIOD', msg, retain=True)
    # publish() needs str/bytes, so the INFO dict is serialised to JSON with the
    # same shape (integer value) that mqtt_on_connect() publishes.
    tele_msg = json.dumps({'TELEPERIOD': TELEPERIOD})
    mqtt.publish(f'tele/{C.DEVICE}/INFO', tele_msg, retain=True)
    print(f'TELEPERIOD가 {msg}초로 변경되었습니다.')
def send_data():
    """Read all sensors and publish the result as JSON on ``tele/<device>/SENSOR``."""
    payload = json.dumps(read_sensors_more())
    mqtt.publish(f'tele/{C.DEVICE}/SENSOR', payload)
def mqtt_connect():
    """Create the global MQTT client, connect, announce and subscribe to commands.

    The client is stored in the module-level ``mqtt`` before connecting so
    that the callbacks, which publish through that global, can use it.
    """
    global mqtt
    options = {
        'port': C.MQTT_PORT,
        'user': C.MQTT_USER,
        'password': C.MQTT_PASS,
        'keepalive': 60,
    }
    mqtt = MQTTClient(C.MQTT_CLIENT_ID, C.MQTT_SERVER, **options)
    mqtt.set_callback(mqtt_callback)
    mqtt.connect()
    mqtt_on_connect()
    # Receive every command addressed to this device.
    command_filter = f'cmnd/{C.DEVICE}/#'
    mqtt.subscribe(command_filter)
# ====== loop ======
# Tick deadlines (time.ticks_ms based) for the next OLED redraw, every 3 s,
# and the next sensor publish, every TELEPERIOD seconds.
_next_oled_refresh = time.ticks_add(time.ticks_ms(), 3 * 1000)
_next_sensor_send = time.ticks_add(time.ticks_ms(), TELEPERIOD * 1000)
# Minimum spacing between MQTT reconnect attempts while the broker is unreachable.
_RECONNECT_INTERVAL_MS = 2000


def _mqtt_reconnect():
    """Re-establish the MQTT session; return True on success, False on failure."""
    try:
        mqtt.connect()
        mqtt_on_connect()
        mqtt.subscribe(f'cmnd/{C.DEVICE}/#')
    except Exception as exc:
        print('MQTT reconnect failed:', exc)
        return False
    return True


def main():
    """Connect WiFi and MQTT, then run the cooperative main loop forever.

    Each pass polls MQTT, services the LED/buzzer blinkers, redraws the OLED
    every 3 s and reads/publishes the sensors every TELEPERIOD seconds.
    MQTT failures never stop the loop: sensors, display and the gas alarm
    keep running while reconnects are retried at a limited rate.

    Raises:
        RuntimeError: From ``wifi_connect`` if WiFi does not come up.
    """
    global _next_oled_refresh, _next_sensor_send
    wifi_connect(C.SSID, C.SSID_PASS)
    mqtt_connect()

    link_up = True
    retry_at = time.ticks_ms()

    while True:
        now = time.ticks_ms()

        # Handle incoming MQTT messages
        if link_up:
            try:
                mqtt.check_msg()
            except Exception as exc:
                print('MQTT receive failed:', exc)
                link_up = False
                retry_at = now  # first reconnect attempt happens immediately

        if not link_up and time.ticks_diff(now, retry_at) >= 0:
            link_up = _mqtt_reconnect()
            retry_at = time.ticks_add(time.ticks_ms(), _RECONNECT_INTERVAL_MS)

        led_1.run()
        bz.run()

        now = time.ticks_ms()
        if time.ticks_diff(now, _next_oled_refresh) >= 0:
            display_oled()
            _next_oled_refresh = time.ticks_add(now, 3 * 1000)
        if time.ticks_diff(now, _next_sensor_send) >= 0:
            # send_data() reads the sensors (updating buzzer and OLED) before
            # it publishes, so it is still called while the link is down.
            # Only the publish error is absorbed here.
            try:
                send_data()
            except OSError as exc:
                print('MQTT publish failed:', exc)
                if link_up:
                    link_up = False
                    retry_at = now
            _next_sensor_send = time.ticks_add(now, TELEPERIOD * 1000)
        time.sleep_ms(10)


if __name__ == '__main__':
    main()