from micropython import const
import framebuf
import network
import time
from umqtt.simple import MQTTClient
import dht
from machine import Pin, SoftI2C
import ntptime
import ujson
# SSD1306 command bytes, listed in ascending opcode order.
# The driver class below sends them one at a time through write_cmd().

# Memory addressing and the column/page window used by show().
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)

# Two-byte commands: the opcode is followed by one argument byte.
SET_CONTRAST = const(0x81)
SET_CHARGE_PUMP = const(0x8D)

# Base opcodes; the driver ORs flag bits into SET_SEG_REMAP, SET_NORM_INV,
# SET_DISP and SET_COM_OUT_DIR before sending them.
SET_SEG_REMAP = const(0xA0)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_MUX_RATIO = const(0xA8)
SET_DISP = const(0xAE)
SET_COM_OUT_DIR = const(0xC0)

# Panel geometry / timing configuration, each followed by one argument byte
# in init_display().
SET_DISP_OFFSET = const(0xD3)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_COM_PIN_CFG = const(0xDA)
SET_VCOM_DESEL = const(0xDB)
class SSD1306(framebuf.FrameBuffer):
    """Transport-independent core of an SSD1306 display driver.

    The instance is itself a ``framebuf.FrameBuffer`` (MONO_VLSB layout) over
    ``self.buffer``; drawing calls modify that buffer and ``show()`` sends it
    to the panel.

    Subclasses must provide the transport by implementing
    ``write_cmd(cmd)`` (send one command byte) and ``write_data(buf)``
    (send a buffer of display data).
    """

    def __init__(self, width, height, external_vcc):
        """Allocate the frame buffer and initialise the panel.

        width, height -- panel size in pixels; height is split into
                         ``height // 8`` pages of 8 rows each.
        external_vcc  -- truthy when the panel voltage is supplied
                         externally, in which case the on-chip charge pump
                         is left disabled during initialisation.
        """
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        # One byte per column per page.
        self.buffer = bytearray(self.pages * self.width)
        super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
        self.init_display()

    def init_display(self):
        """Send the power-up command sequence, then clear and refresh."""
        for cmd in (
            SET_DISP,  # display off while configuring
            SET_MEM_ADDR, 0x00,
            SET_DISP_START_LINE,
            SET_SEG_REMAP | 0x01,
            SET_MUX_RATIO, self.height - 1,
            SET_COM_OUT_DIR | 0x08,
            SET_DISP_OFFSET, 0x00,
            # Wide, short panels use a different COM pin configuration.
            SET_COM_PIN_CFG, 0x02 if self.width > 2 * self.height else 0x12,
            SET_DISP_CLK_DIV, 0x80,
            # Pre-charge period depends on how the panel voltage is supplied.
            SET_PRECHARGE, 0x22 if self.external_vcc else 0xF1,
            SET_VCOM_DESEL, 0x30,
            SET_CONTRAST, 0xFF,
            SET_ENTIRE_ON,
            SET_NORM_INV,
            # Only enable the internal charge pump when there is no external
            # supply; previously it was enabled unconditionally and the
            # external_vcc flag had no effect.
            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01,  # display on
        ):
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        """Switch the display off."""
        self.write_cmd(SET_DISP)

    def poweron(self):
        """Switch the display on."""
        self.write_cmd(SET_DISP | 0x01)

    def contrast(self, contrast):
        """Send the contrast command followed by the given value byte."""
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        """Select normal or inverted output from the lowest bit of ``invert``."""
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def rotate(self, rotate):
        """Set COM scan direction and segment remap from bit 0 of ``rotate``."""
        self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
        self.write_cmd(SET_SEG_REMAP | (rotate & 1))

    def show(self):
        """Send the whole frame buffer to the panel."""
        # Address window: all columns, all pages.
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.width - 1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
    """SSD1306 driver that talks to the panel over an I2C bus."""

    def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
        """Bind the driver to an I2C bus object and initialise the panel.

        width, height -- panel size in pixels.
        i2c           -- bus object providing ``writeto`` and ``writevto``.
        addr          -- I2C address of the display (default 0x3C).
        external_vcc  -- forwarded unchanged to the base class.
        """
        self.i2c = i2c
        self.addr = addr
        # Reusable 2-byte buffer for commands: control byte + command byte.
        self.temp = bytearray(2)
        # Reusable write vector for data: control byte + payload slot.
        # It must exist before super().__init__(), which ends by calling
        # show() and therefore write_data().
        self.write_list = [b"\x40", None]
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        """Send one command byte, prefixed with the 0x80 control byte."""
        self.temp[0] = 0x80
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_data(self, buf):
        """Send ``buf`` as display data, prefixed with the 0x40 control byte.

        The prefix and payload are sent as a vector in one transaction, so
        no temporary copy of the (frame-sized) payload is allocated.
        """
        self.write_list[1] = buf
        self.i2c.writevto(self.addr, self.write_list)
# --- MQTT settings ---------------------------------------------------------
MQTT_BROKER_SERVER_URL = "broker.mqttdashboard.com"
MQTT_CLIENT_ID = "progetto-iot-rile-temp-hum"
MQTT_TOPIC_SUBSCRIBER = "server_to_sensor"
MQTT_TOPIC_PUBLISHER_NORMAL = "sensors_to_server"
MQTT_TOPIC_PUBLISHER_EMERGENCY = "sensors_to_server_emergency"

# --- Hardware --------------------------------------------------------------
# One DHT22 per GPIO number; a sensor's index in this list is its sensor id.
sensorPins = [13, 12]
sensors = [dht.DHT22(Pin(gpio)) for gpio in sensorPins]

# One pair of output pins per sensor: [temperature LED, humidity LED].
ledPins = [[26, 25], [14, 27]]
leds = [[Pin(gpio, Pin.OUT) for gpio in pair] for pair in ledPins]

# 128x64 display on a software I2C bus.
i2c = SoftI2C(sda=Pin(22), scl=Pin(23))
display = SSD1306_I2C(128, 64, i2c)
# Join the WiFi network, printing a dot every 100 ms until associated.
print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect("Wokwi-GUEST", "")
while True:
    if sta_if.isconnected():
        break
    print(".", end="")
    time.sleep(0.1)
print(" Connected!")

# Open the MQTT session.
print("Connessione al server MQTT in corso...", end="")
MQTTclient = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER_SERVER_URL)
MQTTclient.connect()
print(" Connesso.")

# Default alarm thresholds, keyed by sensor index.
sensor_limits = {
    index: {"temp": 25, "hum": 15}
    for index, _ in enumerate(sensorPins)
}
def onMessage(topic, msg):
    """MQTT callback: apply every limit update contained in ``msg``.

    topic -- topic the message arrived on (not used).
    msg   -- raw payload; it is decoded, parsed as JSON and iterated, and
             each item is handed to ``updateLimit``.

    Any exception raised while decoding, parsing or applying the payload is
    caught and printed so that it does not propagate out of the callback.
    """
    try:
        payload = ujson.loads(msg.decode())
        for entry in payload:
            updateLimit(entry)
    except Exception as err:
        print("Errore nel parsing del messaggio:", err)
def updateLimit(message):
    """Apply one limit-update message to ``sensor_limits``.

    message -- mapping with a ``"sensor"`` key naming an entry of
               ``sensor_limits`` and optional ``"temp"`` / ``"hum"`` keys
               carrying the new thresholds.

    Messages for unknown sensors are ignored. A threshold is only stored
    when it is an int or a float: the main loop compares readings against
    these values with ``>``, so a string or ``None`` arriving from the
    broker would otherwise raise ``TypeError`` there. Rejected values are
    reported and the previous threshold is kept.
    """
    sensor_id = message.get("sensor")
    if sensor_id not in sensor_limits:
        return

    limits = sensor_limits[sensor_id]
    for key in ("temp", "hum"):
        if key not in message:
            continue
        value = message[key]
        # bool is excluded explicitly: it passes isinstance(..., int) on
        # CPython but is never a meaningful threshold.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            print("Limite non valido ignorato:", key, value)
            continue
        limits[key] = value
# Receive limit updates from the server.
MQTTclient.set_callback(onMessage)
MQTTclient.subscribe(MQTT_TOPIC_SUBSCRIBER)

# Set the clock from NTP so published timestamps are meaningful. A failed
# sync (e.g. a network timeout) is reported instead of aborting the script;
# timestamps then come from the unsynchronised clock.
try:
    ntptime.settime()
except OSError as e:
    print("Sincronizzazione NTP fallita:", e)

# Main loop: poll MQTT, read every sensor, publish, refresh display and LEDs.
while True:
    # Dispatches a pending message, if any, to onMessage.
    MQTTclient.check_msg()

    display.fill(0)
    display.hline(0, 30, 128, 1)  # separator between the two sensor panels

    for i, sensor in enumerate(sensors):
        y_position = 35 * i
        display.text(f'Sensor: {i}', 0, y_position, 1)

        try:
            sensor.measure()
        except Exception as e:
            # A failed read must not end the loop. Exception is caught
            # broadly because the dht driver reports timeouts as OSError
            # but, as far as I know, checksum failures as a plain
            # Exception -- TODO confirm against the installed driver.
            # The LEDs keep their previous state for this cycle.
            print("Errore lettura sensore", i, e)
            display.text('Read error', 0, y_position + 10, 1)
            continue

        sensorMessage = {
            "sensor": i,
            "temp": sensor.temperature(),
            "hum": sensor.humidity(),
            "timestamp": time.gmtime(time.time())
        }
        message = ujson.dumps(sensorMessage)
        MQTTclient.publish(MQTT_TOPIC_PUBLISHER_NORMAL, message)

        display.text(f'Temp: {sensorMessage["temp"]}', 0, y_position + 10, 1)
        display.text(f'Hum: {sensorMessage["hum"]}', 0, y_position + 20, 1)

        # leds[i] is [temperature LED, humidity LED]; light a LED while its
        # reading is above the configured limit.
        for led, limit in zip(leds[i], ("temp", "hum")):
            if sensorMessage[limit] > sensor_limits[i][limit]:
                led.on()
            else:
                led.off()

    display.show()
    time.sleep(1.5)