from machine import Pin, I2C, time_pulse_us, ADC
from ssd1306 import SSD1306_I2C
import dht
import time
import bluetooth
from ble_simple_peripheral import BLESimplePeripheral
from ble_simple_central import BLESimpleCentral
#import utime
# Loop tick counter: the main loop bumps it after every ultrasonic reading
# and resets it once it reaches 10 (when the DHT22 is read).
count = 0

# --- Ultrasonic sensor -------------------------------------------------
# Trigger is driven by the board, echo is read back from the sensor.
trigger = Pin(20, Pin.OUT)
echo = Pin(21, Pin.IN)
# Speed of sound; ultra() computes SOUND_SPEED * echo_us / 20000 to get cm,
# which works out if this value is in metres per second.
SOUND_SPEED = 340
# Width of the trigger pulse, in microseconds.
TRIG_PULSE_DURATION_US = 10

# --- DHT22 temperature / humidity sensor --------------------------------
# Data line with the internal pull-up enabled.
data_pin = Pin(13, Pin.IN, Pin.PULL_UP)
sensor = dht.DHT22(data_pin)

# --- SSD1306 OLED display ------------------------------------------------
# Panel size in pixels.
WIDTH = 128
HEIGHT = 64
# I2C bus 0 carries the display traffic.
i2c = I2C(0, scl=Pin(17), sda=Pin(16), freq=200000)
# Display driver bound to that bus.
oled = SSD1306_I2C(WIDTH, HEIGHT, i2c)

# --- Bluetooth Low Energy -------------------------------------------------
# Radio object shared by the peripheral helper below.
ble = bluetooth.BLE()
# Peripheral wrapper used to push readings to a connected client.
sp = BLESimplePeripheral(ble)
def ultra():
    """Take one ultrasonic distance reading and publish it.

    Sends a trigger pulse, times the echo pulse, converts the round-trip
    time to centimetres, prints the result, draws it on the bottom strip of
    the OLED and, when a BLE client is connected, sends it over BLE.

    If the echo times out (time_pulse_us returns a negative code), no
    distance is computed: a timeout message is printed, a placeholder is
    shown on the display and nothing is sent over BLE.

    Always pauses 200 ms before returning. Returns None.
    """
    # Hold the line low briefly so the trigger pulse has a clean rising edge.
    trigger.value(0)
    time.sleep_us(5)
    trigger.value(1)
    time.sleep_us(TRIG_PULSE_DURATION_US)
    trigger.value(0)

    # Width of the high pulse on echo in microseconds; 30 ms timeout.
    ultrason_duration = time_pulse_us(echo, 1, 30000)

    if ultrason_duration < 0:
        # Negative values are timeout codes, not durations; converting them
        # would yield a meaningless negative distance.
        distance_cm = None
        print("Distance : timeout (no echo)")
        label = "--- cm"
    else:
        # Round trip: SOUND_SPEED * duration / 2, scaled to centimetres.
        distance_cm = SOUND_SPEED * ultrason_duration / 20000
        print(f"Distance : {distance_cm} cm")
        label = "{:.2f} cm".format(distance_cm)

    # Clear only the bottom strip so the DHT22 rows above stay on screen.
    oled.fill_rect(0, 55, WIDTH, 10, 0)
    oled.text(label, 4, 56)
    oled.show()

    if distance_cm is not None and sp.is_connected():
        sp.send("\nDistance: {} cm".format(distance_cm))

    time.sleep_ms(200)
def read_DHT22sensor():
    """Read temperature and humidity from the DHT22 and publish them.

    Triggers a measurement, fetches both values and hands them to
    print_dht22_data(). If the sensor read raises OSError, an error line
    (including the exception) is printed and nothing is published.

    Returns None.
    """
    # Only the sensor access is guarded, so an OSError raised while updating
    # the display or BLE is not misreported as a sensor failure.
    try:
        sensor.measure()
        temperature = sensor.temperature()
        humidity = sensor.humidity()
    except OSError as e:
        print("Error reading data from the sensor", e)
        return

    print_dht22_data(temperature, humidity)
def print_dht22_data(temperature, humidity):
    """Publish one temperature/humidity pair.

    Prints the values, redraws the upper part of the OLED with a label and
    value row for each reading and, if a BLE client is connected, sends the
    readings over BLE. Ends with a 1 ms pause. Returns None.
    """
    console_line = "Temperature:{}, Humidity:{}".format(temperature, humidity)
    print(console_line)

    # Wipe the upper region only; the bottom strip is left untouched.
    oled.fill_rect(0, 0, 128, 55, 0)

    # Labels go in the left column first, then the values in the right one.
    for caption, row_y in (("Temp.", 8), ("Humid.", 20)):
        oled.text(caption, 4, row_y)
    for reading, row_y in ((temperature, 8), (humidity, 20)):
        oled.text(str(reading), 65, row_y)
    oled.show()

    if sp.is_connected():
        ble_line = "\nTemperature: {} c Humidity: {} %".format(temperature, humidity)
        sp.send(ble_line)

    time.sleep_ms(1)
#----------------------------------CONNECTOR----------------
def demo():
    """Run a BLE central: scan, connect, then stream a counter until disconnect.

    Creates a BLESimpleCentral, starts a scan and polls every 100 ms until a
    connection is established. If the scan callback reports that nothing was
    found, returns early. Once connected, received notifications are printed
    and the string "<i>_" is written for an increasing counter i until the
    link drops. Returns None.
    """
    ble = bluetooth.BLE()
    central = BLESimpleCentral(ble)

    # Set by the scan callback when the scan ends without a match.
    not_found = False

    def on_scan(addr_type, addr, name):
        # Scan-result callback; a None addr_type is treated as "nothing found".
        nonlocal not_found
        if addr_type is not None:
            print("Found peripheral:", addr_type, addr, name)
            central.connect()
        else:
            not_found = True
            print("No peripheral found.")

    central.scan(callback=on_scan)

    # Wait for connection...
    while not central.is_connected():
        time.sleep_ms(100)
        if not_found:
            return

    print("Connected")

    def on_rx(v):
        # Notification callback: just echo what was received.
        print("RX", v)

    central.on_notify(on_rx)

    with_response = False

    i = 0
    while central.is_connected():
        try:
            v = str(i) + "_"
            print("TX", v)
            central.write(v, with_response)
        except Exception as e:
            # Not a bare except: KeyboardInterrupt/SystemExit must still be
            # able to stop the loop.
            print("TX failed", e)
        i += 1
        # Writes with response are paced more slowly than fire-and-forget ones.
        time.sleep_ms(400 if with_response else 30)

    print("Disconnected")
# Main loop: take an ultrasonic reading on every pass and a DHT22 reading on
# every tenth pass.
while True:
    ultra()
    count = count + 1
    if count != 10:
        continue
    # Tenth pass since the last reset: read the DHT22 and start counting again.
    read_DHT22sensor()
    count = 0

# NOTE(review): the loop above never exits, so this guard is never reached.
# Confirm whether demo() is meant to run at all.
if __name__ == "__main__":
    demo()
#----------------------------------XXXXXXXXX----------------