import network
from machine import Pin, ADC, I2C
from ssd1306 import SSD1306_I2C
import urequests
import time
import dht
from onewire import OneWire
from ds18x20 import DS18X20
from umqtt.simple import MQTTClient
import random
# GPIO Pin Definitions
button_rotations = Pin(32, Pin.IN, Pin.PULL_UP)
button_send = Pin(16, Pin.IN, Pin.PULL_UP)
potentiometer = ADC(Pin(34))
potentiometer.atten(ADC.ATTN_11DB)
dht_sensor = dht.DHT22(Pin(14))
accelerometer = ADC(Pin(35))
accelerometer.atten(ADC.ATTN_11DB)
oled_width = 128
oled_height = 64
# I2C Setup for OLED Display
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
oled = SSD1306_I2C(oled_width, oled_height, i2c)
# Global Variables
rotational_speed = 1500
backend_url = "https://iot-4u4v.onrender.com/predict"
wifi_timeout = 10
# Wi-Fi Credentials
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# MQTT Configuration
MQTT_BROKER = "test.mosquitto.org"
MQTT_CLIENT_ID = "wokwi-sensor-client"
MQTT_TOPIC_PREFIX = "wokwi/predictive_maint/" # Topic for sensor data
MQTT_INTERVAL = 5
# MQTT Client
mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
# Connect to Wi-Fi
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
print("Connecting to Wi-Fi...")
oled.fill(0)
oled.text("Connecting to WiFi", 0, 0)
oled.show()
timeout = wifi_timeout
while not wlan.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
if wlan.isconnected():
print("Connected to Wi-Fi!")
print("IP Address:", wlan.ifconfig()[0])
oled.fill(0)
oled.text("WiFi Connected", 0, 0)
oled.text(wlan.ifconfig()[0], 0, 20)
oled.show()
time.sleep(2)
else:
print("Wi-Fi Connection Failed!")
oled.fill(0)
oled.text("WiFi Failed!", 0, 0)
oled.show()
time.sleep(2)
# Connect to MQTT Broker
def connect_mqtt():
try:
mqtt_client.connect()
print("Connected to MQTT Broker!")
except Exception as e:
print("Failed to connect to MQTT Broker:", e)
# Display a message on OLED
def display_oled(message):
oled.fill(0) # Clear screen
oled.text("Predictive Maint", 0, 0)
oled.text(message, 0, 20)
oled.show()
# Gather sensor data
def gather_sensor_data():
try:
dht_sensor.measure()
air_temp = dht_sensor.temperature()
dht_sensor2 = dht.DHT22(Pin(27))
dht_sensor2.measure()
process_temp = dht_sensor2.temperature()
torque = potentiometer.read() * 0.1
tool_wear = accelerometer.read() * 0.1
return {
"air_temperature": air_temp,
"process_temperature": process_temp,
"rotational_speed": rotational_speed,
"torque": torque,
"tool_wear": tool_wear,
}
except Exception as e:
print("Error gathering sensor data:", e)
return None
# Send data to the backend
def send_data_to_backend():
try:
# Gather data
data = gather_sensor_data()
if not data:
display_oled("Error: No data")
return
# Send to backend
print("Sending data to backend:", data)
response = urequests.post(backend_url, json=data)
print("Response status:", response.status_code) # Add this line
print("Response content:", response.text) # Add this line
if response.status_code == 200:
result = response.json()
probability = result.get("failure_probability", 0)
maintenance_required = "Yes" if probability > 0.5 else "No"
# Display results on OLED
display_oled(f"Fail Prob: {probability:.2f}")
time.sleep(10)
display_oled(f"Maint Req: {maintenance_required}")
else:
display_oled("Error in Response")
except Exception as e:
print("Error sending data to backend:", e)
display_oled(f"Error: {str(e)}")
# Publish data to MQTT
def publish_to_mqtt():
try:
data = gather_sensor_data()
if not data:
print("No data to publish to MQTT.")
return
for key, value in data.items():
topic = f"{MQTT_TOPIC_PREFIX}{key}"
mqtt_client.publish(topic, str(value))
print(f"Published to {topic}: {value}")
except Exception as e:
print("Error publishing to MQTT:", e)
google_sheets_url = "https://script.google.com/macros/s/AKfycbxTmJ8QsSvrOmWRPM3OLat-lk1X2RbAVgabG_NAxhIa6e301qC_9qJJl7va11kzjxHR/exec" # Replace with your Google Sheets Web App URL
google_sheets_interval = 10
def url_encode(data):
"""URL-encode a dictionary of key-value pairs."""
encoded = "&".join(f"{key}={str(value).replace(' ', '%20')}" for key, value in data.items())
return encoded
def send_data_to_google_sheets():
try:
data = gather_sensor_data()
if not data:
print("No data to send to Google Sheets.")
display_oled("No data for GSheet")
return
form_data = url_encode(data)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
print("Sending data to Google Sheets:", form_data)
response = urequests.post(google_sheets_url, data=form_data, headers=headers)
if response.status_code == 200:
print("Data successfully sent to Google Sheets.")
# display_oled("GSheet Log: Success")
else:
print(f"Failed to send data to Google Sheets. Status Code: {response.status_code}")
# display_oled(f"GSheet Err: {response.status_code}")
# Close response to free memory
response.close()
except Exception as e:
print("Error sending data to Google Sheets:", e)
display_oled("GSheet Err")
# Main Loop
def main():
# Connect to Wi-Fi and MQTT
connect_wifi()
connect_mqtt()
last_mqtt_publish = time.time()
last_google_sheets_post = time.time()
# Main loop
while True:
current_time = time.time()
# Send data to backend on button press
if not button_send.value(): # Button pressed to send data
send_data_to_backend()
time.sleep(0.5) # Debounce
# Periodically publish to MQTT
if current_time - last_mqtt_publish >= MQTT_INTERVAL:
publish_to_mqtt()
last_mqtt_publish = current_time
# Periodically send data to Google Sheets
if current_time - last_google_sheets_post >= google_sheets_interval:
send_data_to_google_sheets()
last_google_sheets_post = current_time
# Adjust rotational speed on button press
if not button_rotations.value(): # Button pressed to adjust speed
global rotational_speed
rotational_speed += 100 # Increment speed by 100 RPM
if rotational_speed > 3000: # Reset if exceeds max
rotational_speed = 1500
display_oled(f"Speed: {rotational_speed} RPM")
time.sleep(0.5) # Debounce
# Run the program
if __name__ == "__main__":
main()
#spreadsheet
#deployment id-AKfycbytjLSQT3UGayD5sLFs3XhyZNdUTxKccRgNEQf3hiPa-UgyWY1HCj2wIdA143FPy5RutA
#url- https://script.google.com/macros/s/AKfycbytjLSQT3UGayD5sLFs3XhyZNdUTxKccRgNEQf3hiPa-UgyWY1HCj2wIdA143FPy5RutA/exec