import machine
from machine import Pin, I2C, PWM
import ssd1306
import dht
import network
import time
import ujson
import MQTTClient
# Last values handled by the main loop; None means "nothing recorded yet".
prev_temp_reading = prev_humidity_reading = None
prev_gate_status = prev_distance = None
# Holds whatever the most recent publish_message() call returned.
publish_success = False
# Join the WiFi network (open network, empty password) and block until the
# station interface reports a connection, printing a progress dot every 100 ms.
ssid = 'Wokwi-GUEST'
print("Connecting to WiFi ........", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect(ssid, '')
while True:
    if sta_if.isconnected():
        break
    print(".", end="")
    time.sleep(0.1)
print(" Connected!")
# MQTT broker settings.
# NOTE(review): the broker credentials are hard-coded in source; consider
# loading them from a separate, untracked configuration file.
MQTT_CLIENT_ID = "c76a631320a34292a95421b6396dfaa5.s1.eu.hivemq.cloud"
MQTT_BROKER = "c76a631320a34292a95421b6396dfaa5.s1.eu.hivemq.cloud"
MQTT_USER = "smartsystem"
MQTT_PASSWORD = "SmartSystem1!"

# Topics this device publishes to and subscribes to.
MQTT_TOPIC_GATE = "prog6002/gate"
MQTT_TOPIC_TEMP = "prog6002/temperature"
MQTT_TOPIC_HUMIDITY = "prog6002/humidity"

# Build the MQTT client. TLS is enabled and the broker host name is passed
# through ssl_params as 'server_hostname'.
_tls_options = {'server_hostname': MQTT_BROKER}
client = MQTTClient.MQTTClient(
    client_id=MQTT_CLIENT_ID,
    server=MQTT_BROKER,
    user=MQTT_USER,
    password=MQTT_PASSWORD,
    keepalive=7200,  # keepalive interval handed to the client
    ssl=True,
    ssl_params=_tls_options,
)
# Handler for messages arriving on subscribed topics
def sub_callback(topic, msg):
    """Log an incoming MQTT message and show its payload on the OLED.

    Both ``topic`` and ``msg`` are decoded with ``.decode()`` before use.
    The display is cleared and the payload is drawn at (10, 10).
    """
    topic_text = topic.decode()
    payload_text = msg.decode()
    print("Received: {}:{}".format(topic_text, payload_text))
    oled.fill(0)
    oled.text(payload_text, 10, 10)
    oled.show()


# Register the handler with the MQTT client.
client.set_callback(sub_callback)
def reconnect():
    """Connect to the MQTT broker and subscribe to the three project topics.

    Any exception raised while connecting or subscribing is caught and
    logged, and the function then pauses for 5 seconds before returning.
    Nothing is returned in either case.
    """
    try:
        client.connect()
        for topic_name in (MQTT_TOPIC_GATE, MQTT_TOPIC_TEMP, MQTT_TOPIC_HUMIDITY):
            client.subscribe(topic_name)
    except Exception as e:
        print(f"MQTT Reconnection failed: {e}")
        # Back off before control returns to the caller.
        time.sleep(5)
    else:
        print("MQTT Reconnected!")
# First connection attempt at start-up: connect, subscribe to every topic,
# and fall back to reconnect() if anything in that sequence raises.
print("Connecting to MQTT server... ", end="")
try:
    client.connect()
    for _startup_topic in (MQTT_TOPIC_GATE, MQTT_TOPIC_TEMP, MQTT_TOPIC_HUMIDITY):
        client.subscribe(_startup_topic)
except Exception as e:
    print(f"MQTT connection failed: {e}")
    reconnect()
else:
    print("MQTT Connected!")
# --- OLED display: 128x64 SSD1306 on I2C bus 0 (SCL = pin 22, SDA = pin 21) ---
oled_width = 128
oled_height = 64
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)

# Show a greeting so it is visible that the display works.
oled.text('Hello!', 10, 10)
oled.show()

# --- DHT22 temperature/humidity sensor on pin 13 ---
sensor = dht.DHT22(Pin(13))

# --- Ultrasonic sensor: trigger output on pin 5, echo input on pin 18 ---
trig = Pin(5, Pin.OUT)
echo = Pin(18, Pin.IN)

# --- Servo: PWM output on pin 12 running at 50 Hz ---
servo = PWM(Pin(12, mode=Pin.OUT))
servo.freq(50)
# Function to publish messages
def publish_message(topic, message):
    """Publish ``message`` to ``topic`` and report whether it succeeded.

    Callers store the result in ``publish_success``, so the outcome is
    returned explicitly instead of falling through with ``None``.

    Args:
        topic: MQTT topic to publish to.
        message: Payload handed to ``client.publish``.

    Returns:
        True if ``client.publish`` completed without raising, False if it
        raised. On failure the error is logged and ``reconnect()`` is
        attempted; the failed message itself is not retried.
    """
    try:
        client.publish(topic, message)
    except Exception as e:
        print(f"Error publishing to {topic}: {e}")
        reconnect()  # Attempt to reconnect on failure
        return False
    print(f"Published to {topic}: {message}")
    return True
# Function to open the gate
def open_gate(gate_status):
    """Drive the servo to the open position and announce it over MQTT.

    Does nothing when ``gate_status`` equals the last recorded gate status,
    so the servo is not re-driven and no duplicate message is published.

    Args:
        gate_status: Label recorded in ``prev_gate_status`` after the move.
    """
    # publish_success must be declared global as well; otherwise the
    # assignment below only binds a throw-away local and the module-level
    # flag never changes.
    global prev_gate_status, publish_success
    if prev_gate_status == gate_status:
        return
    # 1638/65535 duty is roughly a 0.5 ms pulse at the servo's 50 Hz.
    servo.duty_u16(1638)
    print("Gate status: Open")
    publish_success = publish_message(MQTT_TOPIC_GATE, ujson.dumps({"status": "Gate Opened"}))
    prev_gate_status = gate_status
    # Pause for a second after commanding the servo.
    time.sleep(1)
# Function to close the gate
def close_gate(gate_status):
    """Drive the servo to the closed position and announce it over MQTT.

    Does nothing when ``gate_status`` equals the last recorded gate status,
    so the servo is not re-driven and no duplicate message is published.

    Args:
        gate_status: Label recorded in ``prev_gate_status`` after the move.
    """
    # publish_success must be declared global as well; otherwise the
    # assignment below only binds a throw-away local and the module-level
    # flag never changes.
    global prev_gate_status, publish_success
    if prev_gate_status == gate_status:
        return
    # 7864/65535 duty is roughly a 2.4 ms pulse at the servo's 50 Hz.
    servo.duty_u16(7864)
    print("Gate status: Closed")
    publish_success = publish_message(MQTT_TOPIC_GATE, ujson.dumps({"status": "Gate Closed"}))
    prev_gate_status = gate_status
    # Pause for a second after commanding the servo.
    time.sleep(1)
# Publish throttling: sensor readings are sent at most once per cooldown window.
last_publish_time = 0
publish_cooldown = 10  # seconds

# Main loop: read the DHT22, refresh the OLED, publish changed readings,
# measure distance with the ultrasonic sensor and move the gate accordingly.
while True:
    try:
        # Measuring temperature and humidity
        sensor.measure()
        temp = sensor.temperature()
        humidity = sensor.humidity()

        # Redraw the display from scratch with the fresh readings.
        oled.fill(0)
        oled.text(f'Temp: {temp:.1f} C', 10, 10)
        oled.text(f'Humidity: {humidity:.1f} %', 10, 25)

        # Evaluate the cooldown once for both readings. Resetting the shared
        # timestamp right after the temperature publish would make the
        # humidity check in the same pass always fail, so humidity would be
        # starved whenever temperature keeps changing.
        current_time = time.time()
        if current_time - last_publish_time > publish_cooldown:
            published = False
            if prev_temp_reading != temp:
                publish_success = publish_message(MQTT_TOPIC_TEMP, ujson.dumps({"temperature": f"{temp:.2f}"}))
                prev_temp_reading = temp
                published = True
            if prev_humidity_reading != humidity:
                publish_success = publish_message(MQTT_TOPIC_HUMIDITY, ujson.dumps({"humidity": f"{humidity:.2f}"}))
                prev_humidity_reading = humidity
                published = True
            if published:
                last_publish_time = current_time

        # Distance measurement: emit a 10 us trigger pulse, then time the echo.
        trig.value(0)
        time.sleep_us(2)
        trig.value(1)
        time.sleep_us(10)
        trig.value(0)
        duration = machine.time_pulse_us(echo, 1)

        if duration < 0:
            # time_pulse_us reports a timeout with a negative value. Turning
            # that into a distance would give a negative number, which is
            # below the threshold and would open the gate, so skip this cycle.
            print('Distance: no echo')
        else:
            # 0.034 cm/us is the speed of sound; halve for the round trip.
            distance = int((0.034 * duration) / 2)
            print('Distance:', f'{distance:.1f}', 'cm')
            # Check distance to control gate. A reading of exactly 60 is
            # treated as "closed" so the boundary value falls in a branch.
            if prev_distance == distance:
                print("Nothing to publish")
            elif distance < 60:
                oled.text('Gate: Open', 10, 40)
                open_gate("Open")
            else:
                oled.text('Gate: Closed', 10, 40)
                close_gate("Closed")
            prev_distance = distance

        # Update OLED display
        oled.show()
    except Exception as e:
        # publish_message() handles MQTT failures itself (including the
        # reconnect), so anything arriving here comes from the sensors or the
        # display. Log it and try again on the next cycle instead of opening
        # a new MQTT connection for an unrelated error.
        print(f"Main loop error: {e}")
    time.sleep(5)  # Adjust sleep duration as needed