# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George
from machine import Pin, Timer, I2C, RTC
import time
import sys
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
from micropython import const
import framebuf
import network
import ntptime
import umqtt.simple as mqtt
import json
class WIFI(object):
def __init__(self):
self.sta_if = network.WLAN(network.STA_IF)
def wifi_connect(self):
print("Connecting to WiFi", end="")
self.sta_if.active(True)
self.sta_if.connect('Wokwi-GUEST', '')
while not self.sta_if.isconnected():
print(".", end="")
time.sleep(0.1)
print(" Connected!")
class RTC_BeiJing(object):
def __init__(self):
self.NTP_DELTA_BeiJing = 3155644800
ntptime.NTP_DELTA = self.NTP_DELTA_BeiJing
ntptime.settime()
self.rtc = RTC()
def beijingtime_get(self):
return self.rtc.datetime()
class MQTT(object):
def __init__(self):
self.mqtt_config = {
"client_id": "Zgs|securemode=2,signmethod=hmacsha1,timestamp=1668006502751|",
"server": "iot-06z00h5q2mn7vx4.mqtt.iothub.aliyuncs.com",
"port": 1883,
"user": "Zgs617_DHT22_test3&hw9hKgbM5Sd",
"password": "55E9B13214915D2F1D0D3D2C62F38E2C03EBECBC",
"keepalive": 60
}
self.mqtt_client = mqtt.MQTTClient(
self.mqtt_config["client_id"],
self.mqtt_config["server"],
self.mqtt_config["port"],
self.mqtt_config["user"],
self.mqtt_config["password"],
self.mqtt_config["keepalive"],
)
self.mqtt_client.set_callback(self.mqtt_callback)
self.topic_publish = "/sys/hw9hKgbM5Sd/Zgs617_DHT22_test3/thing/event/property/post"
self.last_msg = 'disconnect'
self.mqtt_client.set_last_will(self.topic_publish, self.last_msg.encode())
self.mqtt_client.connect()
self.msg_publish_id = 0
self.topic_subscribe = "/hw9hKgbM5Sd/Zgs617_DHT22_test3/user/get"
self.mqtt_client.subscribe(self.topic_subscribe)
self.condition = 1
def mqtt_callback(self, topic, msg):
if msg.decode() == "stop":
self.condition = 0
if msg.decode() == "start":
self.condition = 1
def mqtt_publish(self, CurrentHumidity, CurrentTemperature):
self.msg_publish_id += 1
msg_publsih = {
"id": str(self.msg_publish_id),
"version": "1.0",
"sys": {
"ack": 0
},
"params": {
"CurrentHumidity": {
"value": CurrentHumidity
},
"CurrentTemperature": {
"value": CurrentTemperature
}
},
"method": "thing.event.property.post"
}
self.mqtt_client.publish(self.topic_publish, json.dumps(msg_publsih))
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
SET_DISP, # display off
# address setting
SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE, # start at line 0
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO,
self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET,
0x00,
SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV,
0x80,
SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
SET_CONTRAST,
0xFF, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
SET_IREF_SELECT,
0x30, # enable internal IREF during display on
# charge pump
SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01, # display on
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP)
def poweron(self):
self.write_cmd(SET_DISP | 0x01)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def rotate(self, rotate):
self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
self.write_cmd(SET_SEG_REMAP | (rotate & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width != 128:
# narrow displays use centred columns
col_offset = (128 - self.width) // 2
x0 += col_offset
x1 += col_offset
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
self.write_list = [b"\x40", None] # Co=0, D/C#=1
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_data(self, buf):
self.write_list[1] = buf
self.i2c.writevto(self.addr, self.write_list)
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
import time
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
def write_data(self, buf):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(buf)
self.cs(1)
if sys.platform.startswith("esp"):
from esp import dht_readinto
elif sys.platform == "mimxrt":
from mimxrt import dht_readinto
elif sys.platform == "rp2":
from rp2 import dht_readinto
else:
from pyb import dht_readinto
class DHTBase:
def __init__(self, pin):
self.pin = pin
self.buf = bytearray(5)
def measure(self):
buf = self.buf
dht_readinto(self.pin, buf)
if (buf[0] + buf[1] + buf[2] + buf[3]) & 0xFF != buf[4]:
raise Exception("checksum error")
class DHT11(DHTBase):
def humidity(self):
return self.buf[0]
def temperature(self):
return self.buf[2]
class DHT22(DHTBase):
def humidity(self):
return (self.buf[0] << 8 | self.buf[1]) * 0.1
def temperature(self):
t = ((self.buf[2] & 0x7F) << 8 | self.buf[3]) * 0.1
if self.buf[2] & 0x80:
t = -t
return t
class DHT22_Umqtt_RTC_BeiJing_Timer_Display(object):
def __init__(self):
WIFI().wifi_connect()
self.mqtt_dht22 = MQTT()
self.rtc_beijing = RTC_BeiJing()
self.i2c = I2C(0)
self.display = SSD1306_I2C(128, 64, self.i2c)
self.dht22_pin = Pin(15)
self.dht22_define = DHT22(self.dht22_pin)
time.sleep(1)
def rtc_time_handle(self):
current_time = self.rtc_beijing.beijingtime_get()
week = ['Mon', 'Tues', 'Wed', 'Thur', 'Fri', 'Sat', 'Sun']
time_list = [''] * 5
u = 1
for i in range(1, 7):
if i == 3:
u = 2
continue
if current_time[i] < 10:
time_list[i - u] = '0' + str(current_time[i])
else:
time_list[i - u] = str(current_time[i])
time_display_first = str(current_time[0]) + '-' + time_list[0] + '-' + time_list[1] + ' ' + week[
current_time[3]]
time_display_second = str(time_list[2] + ':' + time_list[3] + ':' + time_list[4])
return time_display_first, time_display_second
def dht22_rtc_beijing_timer_display(self, t):
self.mqtt_dht22.mqtt_client.check_msg()
if not self.mqtt_dht22.condition:
self.display.fill(0)
self.display.text("stop", 0, 0)
self.display.show()
while True:
self.mqtt_dht22.mqtt_client.check_msg()
if self.mqtt_dht22.condition:
break
time.sleep(1)
self.dht22_define.measure()
CurrentHumidity, CurrentTemperature = self.dht22_define.humidity(), self.dht22_define.temperature()
self.mqtt_dht22.mqtt_publish(CurrentHumidity, CurrentTemperature)
self.display.fill(0)
self.display.text("test", 0, 0)
time_display_first, time_display_second = self.rtc_time_handle()
self.display.text(time_display_first, 0, 10)
self.display.text(time_display_second, 0, 20)
self.display.text('hum' + str(CurrentHumidity) + "%", 0, 30)
self.display.text('tpm' + str(CurrentTemperature) + "C", 0, 40)
self.display.show()
class Timer_Start(object):
def __init__(self):
self.dht22_timer = DHT22_Umqtt_RTC_BeiJing_Timer_Display()
self.tim0 = Timer(0)
self.tim0.init(period=2000, callback=self.dht22_timer.dht22_rtc_beijing_timer_display)
if __name__ == '__main__':
Timer_Start()