import machine
from machine import ADC, Pin, time_pulse_us, PWM
from machine import Timer
from time import sleep_ms
import os
import onewire
import ds18x20
import dht
import time
import ubluetooth
import network
import urequests
import uos
import ujson
# Load TFLite model and allocate tensors
# interpreter = tf.lite.Interpreter(model_path="nn_autoencoder.tflite")
# interpreter.allocate_tensors()
# # Get input and output tensors
# input_details = interpreter.get_input_details()
# output_details = interpreter.get_output_details()
# ble_msg = ""
# is_ble_connected = False
# from ble_advertising import advertising_payload
# from ubluetooth import BLE, UUID, FLAG_READ, FLAG_WRITE
# import machine
# import time
# # Initialize BLE
# ble = BLE()
# ble.active(True)
# # Define BLE services and characteristics
# service1_uuid = UUID("12345678-1234-5678-1234-56789abcdef0") # Service 1
# char1_uuid = UUID("12345678-1234-5678-1234-56789abcdef1") # Characteristic 1
# service2_uuid = UUID("abcdef12-3456-7890-abcd-ef1234567890") # Service 2
# char2_uuid = UUID("abcdef12-3456-7890-abcd-ef0987654321") # Characteristic 2
# # Create services
# services = [
# (service1_uuid, [(char1_uuid, FLAG_READ | FLAG_WRITE)]),
# (service2_uuid, [(char2_uuid, FLAG_READ | FLAG_WRITE)])
# ]
# handles = ble.gatts_register_services(services)
# # Advertising payload
# def advertise(ble, name="ESP32-Multi-BLE"):
# payload = b'\x02\x01\x06' + bytearray([len(name) + 1, 0x09]) + name.encode()
# ble.gap_advertise(100, payload)
# print(f"Advertising as '{name}'...")
# # Callback to handle data
# def on_write(event, data):
# if data == b'command1':
# print("Command 1 received for Service 1")
# elif data == b'command2':
# print("Command 2 received for Service 2")
# # Set default values
# ble.gatts_write(handles[0][1], b'Initial data for service 1') # Set initial value for Service 1
# ble.gatts_write(handles[1][1], b'Initial data for service 2') # Set initial value for Service 2
# # Define BLE callback
# def ble_callback(event, data):
# if event == 1: # Connected
# print("Central device connected")
# elif event == 2: # Disconnected
# print("Central device disconnected")
# advertise(ble)
# elif event == 3: # Write event
# value = ble.gatts_read(data)
# on_write(event, value)
# ble.irq(ble_callback)
# # Start advertising
# advertise(ble)
# try:
# while True:
# time.sleep(1) # Keep program running
# except KeyboardInterrupt:
# ble.active(False)
# print("BLE service stopped.")
# # Initialize BLE a
# # Handle received data
# def ble_on_write(char):
# data = char.decode('utf-8').strip()
# if data == 'part1':
# Exec_Monitoring()
# elif data == 'part2':
# Exec_Training()
# # Define parts of your code
# def Exec_Monitoring():
# global Monitoring
# Monitoring= True
# print("Part One is running...")
# def Exec_Training():
# global Training
# Training= True
# print("Part Two is running...")
# # BLE callback handler
# def ble_callback(event, data):
# if event == 1: # Central connected
# print("Central device connected.")
# elif event == 2: # Central disconnected
# print("Central device disconnected.")
# ble.start_advertising()
# # BLE service setup
# def setup_ble():
# ble.active(True)
# ble.config(gap_name="ESP32-BLE")
# ble.irq(ble_callback)
# service_uuid = UUID('12345678-1234-5678-1234-56789abcdef0')
# char_uuid = UUID('12345678-1234-5678-1234-56789abcdef1')
# service = (service_uuid, [(char_uuid, FLAG_READ | FLAG_WRITE)])
# ble.gatts_register_services([service])
# ble_handle = ble.gatts_add_characteristic(char_uuid)
# ble.gatts_set_buffer(ble_handle, 20, True) # Set buffer for the characteristic
# # Advertising
# payload = advertising_payload(name="ESP32-BLE", services=[service_uuid])
# ble.gap_advertise(100, payload)
# print("BLE service started. Waiting for commands...")
# return ble_handle
# # Initialize BLE service
# handle = setup_ble()
# # Main loop to listen for BLE writes
# try:
# while True:
# events = ble.events()
# if events: # Check for BLE events
# data = ble.gatts_read(handle)
# if data:
# ble_on_write(data)
# time.sleep(0.1)
# except KeyboardInterrupt:
# print("BLE service stopped.")
# Setup the data pins
dht_pin= machine.Pin(4)
ds_pin = machine.Pin(18)
pir_sensor = Pin(21, Pin.IN) #ats_pin = machine.Pin(25, machine.Pin.IN), #adc = ADC(Pin(32))
trigger = Pin(22, Pin.OUT)
echo = Pin(23, Pin.IN)
red = PWM(Pin(25))
green = PWM(Pin(26))
blue = PWM(Pin(27))
red.freq(1000)
green.freq(1000)
blue.freq(1000)
buzzer_pin = Pin(19, Pin.OUT)
if dht_pin and ds_pin and pir_sensor and trigger and echo and red and green and blue and buzzer_pin:
print('***Pins Setup Successful***')
#Initialising Sensors
dht_sensor = dht.DHT22(dht_pin)
ow = onewire.OneWire(ds_pin)
ds = ds18x20.DS18X20(ow) # Initialize the onewire and DS18B20 objects
# adc.width(ADC.WIDTH_12BIT) #Set ADC resolution to 12 bits (0-4095)
# adc.atten(ADC.ATTN_11DB) # Set attenuation for maximum input voltage range (approx 3.3V)
# # Calibration constants (depending on ACS712 version and applied setup)
# VREF = 3.3 # Reference voltage (depends on microcontroller setup)
# ADC_RESOLUTION = 4095 # 12-bit ADC range
# VREF = 3.3 # Reference voltage
# SENSOR_SENSITIVITY = 0.066 # Sensitivity in V/A (e.g., 66mV/A for ACS712-5A)
Training= False
Monitoring= True
# Wi-Fi credentials
SSID = "uofzStaff"
PASSWORD = "wkpProg219!"
# ThingSpeak details
THINGSPEAK_API_KEY = "IUVRP5ZBSYFDNQEQ"
THINGSPEAK_URL = "https://thingspeak.mathworks.com/channels/2909070/private_show"
if True:
print('***Sensor Initialised Sucessfully***')
#Bluetooth Sending Function
def transfer_to_gui(scaled_data):
global is_ble_connected
if scaled_data:
print("Sending information via BLE...")
if is_ble_connected:
ble.send(scaled_data)
print('***Data trans Success!!!***')
else:
print("No data to send.")
if True:
print("*** Bluetooth Functions Defined Successfully***")
#sensor= ds.scan()
# if sensor:
# print(f' Found: {sensor}')
# ds.convert_temp()
pfv_hum = 0
pfv_r_tmp=0
pfv_dtemp=0
pfv_ndtmp=0
pfv_rtspd=0
pfv_tmcrnt=0
sys_threshold=''
sns_threshold= [402, 40, 0, 403, 403, 403]
Iqrs = {
'0': {'ub': 405,'lb': 403},
'1': {'ub': 50,'lb': 40},
'2': {'ub': 0,'lb': 0},
'3': {'ub': 405,'lb': 403},
'4': {'ub': 405,'lb': 403},
'5': {'ub': 405,'lb': 403},
}
Nmtrx = {
'0': {'max': 0.95,'min': 0.05},
'1': {'max': 0.80,'min': 0.02},
'2': {'max': 0.9,'min': 0.1},
'3': {'max': 1,'min': 0.01},
'4': {'max': 0.95,'min': 0.04},
'5': {'max': 0.80,'min': 0.03},
}
modes = [402, 40, 0, 403, 403, 403]
if dht_sensor and ow and ds:
print("*** Pins and Modules Initialised Successfully***")
#Signal Acquisition Functions and Outputs
#Functions for Outputs
# Function to activate the buzzer
def buzz(duration):
buzzer_pin.on() # Turn the buzzer on
time.sleep(duration) # Wait for the specified duration
buzzer_pin.off() # Turn the buzzer off
#Function to activate rgb light
def set_color(r, g, b):
red.duty_u16(int(r * 65535)) # Scale 0-1 to 0-65535
green.duty_u16(int(g * 65535))
blue.duty_u16(int(b * 65535))
if True:
print("***Functions for Outputs Defined***")
# Function for UltraSonic Sensor
def measure_distance():
# Send a 10us pulse to trigger the sensor
trigger.value(0)
time.sleep_us(2)
trigger.value(1)
time.sleep_us(10)
trigger.value(0)
# Measure the duration of the echo pulse
duration = time_pulse_us(echo, 1)
# Calculate the distance in cm (speed of sound: 343 m/s)
distance = (duration / 2) / 29.1 # Divide by 2 for round trip, and by 29.1 for cm
return distance
#Current Sensor
def read_current():
# Read the raw ADC value
raw_value = adc.read()
# Convert ADC value to voltage
voltage = (raw_value / ADC_RESOLUTION) * VREF
# Calculate current (subtract offset voltage and divide by sensitivity)
current = (voltage - (VREF / 2)) / SENSITIVITY
return current
#Speed Sensors
def read_ats_sensor_data(pin):
"""
Reads data from the ATS667 speed sensor.
"""
try:
# Detect state changes (high/low) from the sensor
state = pin.value()
return state
except Exception as e:
print(f"Error reading sensor data: {e}")
return None
if True:
print("***Sensor Functions Defined Successfully***")
#Noise Filtering for Digital and Analog Sensors
# Parameters for the moving average filter (analog-like signal)
WINDOW_SIZE = 5
analog_buffer = []
# Function to apply the moving average filter
def moving_average_filter(new_value, buffer, window_size):
"""
Applies a moving average filter to smooth analog signals.
"""
buffer.append(new_value)
if len(buffer) > window_size:
buffer.pop(0) # Maintain buffer size
return sum(buffer) / len(buffer)
#Function to read csv file
def create_csv(file_name):
try:
with open(file_name, "w") as file:
# Read all lines, skipping the header
lines = file.readlines()[1:] # Skip the first line (header)
data = [line.strip().split(",") for line in lines]
return data
except Exception as e:
print("Error reading CSV file:", e)
return []
#Function for a Low Pass Filter
def low_pass_filter(new_sample, prev_filtered, alpha):
"""
Simple low-pass filter using exponential moving average.
Parameters:
new_sample: Current signal value (new sample).
prev_filtered: Previously filtered value.
alpha: Smoothing factor (0 < alpha < 1). Lower alpha = stronger filtering.
Returns:
Filtered signal value.
"""
filtered_value = alpha * new_sample + (1 - alpha) * prev_filtered
return filtered_value
# Main function to denoise imputed signals
def filter_noise(sgnls):
#Low-Pass Filter Implementation
alpha = 0.1 # Adjust this based on your application (0.1 for stronger filtering)
global pfv_hum
global pfv_r_tmp
global pfv_dtemp
global pfv_ndtmp
global pfv_rtspd
global pfv_tmcrnt
fltd_rel_hum = [low_pass_filter(v,pfv_hum, alpha) for i,v in enumerate (sgnls) if v==sgnls[0]]
print(f'Filtered Hum Value is: {fltd_rel_hum}')
pfv_hum= fltd_rel_hum[0]
fltd_rel_temp = [low_pass_filter(v,pfv_r_tmp, alpha) for i,v in enumerate (sgnls) if v==sgnls[1]]
pfv_r_tmp= fltd_rel_temp[0]
fltd_tmcrnt = [low_pass_filter(v,pfv_tmcrnt, alpha) for i,v in enumerate (sgnls) if v==sgnls[2]]
pfv_tmcrnt= fltd_rel_hum[0]
fltd_dtemp = [low_pass_filter(v,pfv_dtemp, alpha) for i,v in enumerate (sgnls) if v==sgnls[3]]
pfv_dtemp= fltd_dtemp[0]
fltd_ndtemp= [low_pass_filter(v,pfv_ndtmp, alpha) for i,v in enumerate (sgnls) if v==sgnls[4]]
pfv_ndtmp= fltd_ndtemp[0]
fltd_rtspd= [low_pass_filter(v,pfv_rtspd, alpha) for i,v in enumerate (sgnls) if v==sgnls[5]]
pfv_rtspd= fltd_rtspd[0]
return [fltd_rel_temp[0], fltd_rel_hum[0], fltd_dtemp[0], fltd_ndtemp[0], fltd_rtspd[0], fltd_tmcrnt[0] ]
if True:
print("*** Denoising Functions Defined Successfully***")
#Function for Removing Outliers
def check_and_remove_outliers(data):
"""
Checks for outliers from denoised signals and clips each sensor's value which is an outlier.
Outliers are identified using the IQR method."""
global Iqrs
clean_sns_v=[]
print(f'This is the data list of sensors:{data}')
try:
for s in range(len(data)):
#Clipping outliers
v = float(data[s])
if v < Iqrs[f'{s}']['lb']:
v = Iqrs[f'{s}']['lb']
clean_sns_v.append(v)
elif v > Iqrs[f'{s}']['ub']:
v = Iqrs[f'{s}']['ub']
clean_sns_v.append(v)
else:
clean_sns_v.append(v)
except Exception as e:
print(f'Data manipulation failed with error: {e}')
# Writing the cleaned data back to the file
print(f'This is the cleaned sensor data \n {clean_sns_v}')
return clean_sns_v
if True:
print('***Outliers Removing Functions Defined successfully***')
def normalize(data):
"""
Reads sensor data, normalizes its contents, and writes the normalized data to a file.
"""
data_values=[float(value) for value in data] # Convert all values to float
global Nmtrx
scaled_sns_v=[]
try:
for s in range(len(data)):
#Clipping outliers
v = float(data[s])
if v < Nmtrx[f'{s}']['min']:
v = Nmtrx[f'{s}']['min']
scaled_sns_v.append(v)
elif v > Nmtrx[f'{s}']['max']:
v = Nmtrx[f'{s}']['max']
scaled_sns_v.append(v)
else:
scaled_sns_v_sns_v.append((v - Nmtrx[f'{s}']['min']) / ( Nmtrx[f'{s}']['max'] - Nmtrx[f'{s}']['min']))
except Exception as e:
print(f'Data manipulation failed with error: {e}')
# Writing the cleaned data back to the file
print(f'This is the Normalised data \n {scaled_sns_v}')
return scaled_sns_v
if True:
print("*** Normalising Functions Defined Successfully***")
I_D=1
data_2 = {
'readings': []
}
def create_and_insert(data):
# Access each item
for index, item in enumerate(data):
if index == 0:
relative_temp_values = item
print(f'This is a rel_temp reading {relative_temp_values}')
if index== 1:
relative_hum_values=item
print(f'This is a rel_hum reading {relative_hum_values}')
if index==2:
deb_temp_values=item
print(f'This is a deb_temp reading {deb_temp_values}')
if index==3:
non_deb_temp_values=item
print(f'This is a nd_temp reading {non_deb_temp_values}')
if index==4:
rt_speed_values=item
print(f'This is a rt_speed reading {rt_speed_values}')
if index==5:
tm_current_values =item
print(f'This is current value {tm_current_values}')
global I_D
global data_2
# Modify the JSON data
data_2["readings"].append(
{
'id':I_D,
'rel_temp': relative_temp_values,
'rel_hum':relative_hum_values,
'deb_temp': deb_temp_values,
'non_deb_temp':non_deb_temp_values,
'rt_speed': rt_speed_values,
'tm_current': tm_current_values})
# Write the updated data back to the file
with open(db_file, "w") as file:
ujson.dump(data_2, file)
print(f"Updated data written to '{db_file}'")
# Display the updated data
print("Updated Data:", data_2)
I_D +=1
def view_database_files(db_directory):
# List files in the directory
print("Files in database directory:", uos.listdir(db_directory))
def retrieve_data(db_file):
# Read data from the JSON file
with open(db_file, "r") as file:
loaded_data = ujson.load(file)
if type(loaded_data)== dict:
print(f'This is the file with {len(loaded_data['readings'])} loggings {loaded_data}')
rel_tmp=[]
rel_hum=[]
db_tmp=[]
ndb_tmp=[]
rt_spd=[]
tm_crt=[]
for index, sns in enumerate(loaded_data['readings']):
if index==0:
rel_tmp.append(sns['rel_temp'])
if index==1:
rel_hum.append(sns['rel_hum'])
if index==1:
db_temp.append(sns['deb_temp'])
if index==1:
ndb_temp.append(sns['non_deb_temp'])
if index==1:
rt_speed.append(sns['rt_speed'])
if index==1:
tm_crt.append(sns['tm_current'])
return loaded_data
else:
print(f'This is the file with {len(loaded_data)} loggings {loaded_data}')
# Accessing individual pieces of data
# Example: Get the name of the first user
rel_tmp=[]
rel_hum=[]
db_tmp=[]
ndb_tmp=[]
rt_spd=[]
tm_crt=[]
rel_tmp.append(loaded_data[0])
rel_hum.append(loaded_data[1])
db_tmp.append(loaded_data[2])
ndb_tmp.append(loaded_data[3])
rt_spd.append(loaded_data[4])
#tm_crt.append(loaded_data[5])
data= [rel_tmp, rel_hum, db_tmp, ndb_tmp, rt_spd, tm_crt]
return data
#Reading Values Continously
#Function for Checking and fixing missing values
def check_and_impute(relative_temp, relative_hum, deb_temp, non_deb_temp, rt_speed, tm_curren):
"""
Reads sensor values, checks for missing values, and imputes them using the mode."""
global modes
data = [str(relative_temp), str(relative_hum), str(deb_temp), str(non_deb_temp), str(rt_speed), str(tm_current)]
# Imputing Missing values
nw_data = [float(modes[sns]) if v == '' else float(v) for sns, v in enumerate(data)]
print("Missing values have been imputed using mode.")
print('Function ran successfully')
return nw_data
if True:
print("*** Missing Values Fixing Functions Defined Successfully***")
#Function to send data to Thingspeak
# Connect to Wi-Fi
def connect_to_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("Connecting to Wi-Fi...")
wlan.connect(SSID, PASSWORD)
while wlan.isconnected():
pass
print("Wi-Fi connected:", wlan.ifconfig())
# Send data to ThingSpeak
def send_to_thingspeak(relative_temp, relative_hum, deb_temp, non_deb_temp, rt_speed, tm_current, reco_error, alert):
try:
global THINGSPEAK_API_KEY
# Prepare the payload
payload = {
"api_key": THINGSPEAK_API_KEY,
"field1": relative_temp,
"field2": relative_hum,
"field3": deb_temp,
"field4": non_deb_temp,
"field5": rt_speed,
"field6": tm_current,
"filed7": reco_error,
"field8": alert
}
# Send the request
response = urequests.post(THINGSPEAK_URL, json=payload)
print("Response:", response.text)
response.close()
except Exception as e:
print("Failed to send data to ThingSpeak:", e)
#Function to send sensor data to thingspeak every second
def main_thingspeak_portal(db_file):
connect_to_wifi()
condition_data = retrieve_data(db_file)
if condition_data:
for item in condition_data:
print(f"Sending data: {timestamp},{relative_temp:.2f}°C,{relative_hum:.2f}%, {deb_temp:.2f}°C, {non_deb_temp:.2f}°C, {rt_speed:.2f}rpm, {tm_current:.2f}, {error}")
send_to_thingspeak(item[0], item[1], item[2], item[3], item[4], item[5], item[6][0])
time.sleep(1) # ThingSpeak allows updates every 15 seconds
else:
print("No data to send.")
if True:
print('*** ThingSpeak Functions Defined Successfully***')
class ESP32_BLE():
def __init__(self, name):
# Create internal objects for the onboard LED
# blinking when no BLE device is connected
# stable ON when connected
self.led = Pin(2, Pin.OUT)
self.timer1 = Timer(0)
self.name = name
self.ble = ubluetooth.BLE()
self.ble.active(True)
self.disconnected()
self.ble.irq(self.ble_irq)
self.register()
self.advertiser()
def connected(self):
global is_ble_connected
is_ble_connected = True
self.led.value(1)
self.timer1.deinit()
def disconnected(self):
global is_ble_connected
is_ble_connected = False
self.timer1.init(period=100, mode=Timer.PERIODIC, callback=lambda t: self.led.value(not self.led.value()))
def ble_irq(self, event, data):
global ble_msg
if event == 1: #_IRQ_CENTRAL_CONNECT:
# A central has connected to this peripheral
self.connected()
elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
# A central has disconnected from this peripheral.
self.advertiser()
self.disconnected()
elif event == 3: #_IRQ_GATTS_WRITE:
# A client has written to this characteristic or descriptor.
buffer = self.ble.gatts_read(self.rx)
ble_msg = buffer.decode('UTF-8').strip()
def register(self):
# Nordic UART Service (NUS)
NUS_UUID = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
RX_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
TX_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
BLE_NUS = ubluetooth.UUID(NUS_UUID)
BLE_RX = (ubluetooth.UUID(RX_UUID), ubluetooth.FLAG_WRITE)
BLE_TX = (ubluetooth.UUID(TX_UUID), ubluetooth.FLAG_NOTIFY)
BLE_UART = (BLE_NUS, (BLE_TX, BLE_RX,))
SERVICES = (BLE_UART, )
((self.tx, self.rx,), ) = self.ble.gatts_register_services(SERVICES)
def send(self, data):
"""Send sensor data via BLE."""
for value in data:
msg= str(value) # Convert sensor value to string
tm_s= str(get_timestamp)
self.ble.gatts_notify(0, self.tx, msg + 'n')
print("Sent:", msg)
def advertiser(self):
name = bytes(self.name, 'UTF-8')
adv_data = bytearray('x02x01x02') + bytearray((len(name) + 1, 0x09)) + name
self.ble.gap_advertise(100, adv_data)
print(adv_data)
print("rn")
# adv_data
# raw: 0x02010209094553503332424C45
# b'x02x01x02ttESP32BLE'
#
# 0x02 - General discoverable mode
# 0x01 - AD Type = 0x01
# 0x02 - value = 0x02
def store(data):
# Storing scaled Sensor data
timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*get_timestamp())
# Create a directory for the JSON database
#data.append(status)
data.append([get_timestamp])
db_directory = "json_database"
try:
uos.mkdir(db_directory)
print(f"Directory '{db_directory}' created!")
except OSError:
print(f"Directory '{db_directory}' already exists!")
# File path for the JSON database
db_file = f"{db_directory}/condition_data.json"
with open(db_file, "w") as file:
ujson.dump(data, file)
print(f"Updated data written to '{db_file}'")
def monitor(scaled_data):
global sys_threshold
interpreter.set_tensor(input_details[0]["index"], scaled_data)
interpreter.invoke()
overall_output = interpreter.get_tensor(output_details[0]["index"])
overall_error = np.mean((scaled_data - overall_output)**2)
if overall_error > sys_threshold:
status=['Fault Dev']
send_to_GUI(scaled_data, overall_output,overall_error, status)
else:
status=['Healthy']
send_to_GUI(scaled_data, overall_output,overall_error, status)
# Real-time anomaly detection loop
input_data_subset = scaled_data.astype(np.float32)
for feature_index in range(5): #5 repres features
global sns_threshold
csns_thres= sns_threshold[feature_index]
# Iterate through the selected samples for the current feature
for i in range(10):
current_sample = input_data_subset[i, :].reshape(1, -1)
sns_values=[]
reco_values=[]
sns1_error=[]
# Run inference
interpreter.set_tensor(input_details[0]['index'], current_sample)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
# Gather the original and reconstructed values for the current feature
sns_values.append(current_sample[0][feature_index])
reco_values.append(output_data[0][feature_index])
sns1_error.append(np.mean((sns_values-reco_values)**2))
if sns1_error_error > csns_thres:
status=['Fault Dev']
send_to_GUI(sns_values, reco_values,sns1_error, status)
else:
status=['Healthy']
send_to_GUI(sns_values, reco_values,sns1_error, status)
sns_values.clear()
reco_values.clear()
sns1_error.clear()
def send_to_GUI(scaled_data, reco_data,reco_error, status):
ble = ESP32_BLE("OBDsysESP32")
store(scaled_data, status)
transfer_to_gui(scaled_data) # 5 signals
transfer_to_gui(reco_data) # 5 signals
transfer_to_gui(reco_error) # 5 signals
transfer_to_gui(status) # 5 states
if True:
print("*** Wi-Fi Functions Defined Successfully***")
# Function to get current timestamp
def get_timestamp():
return time.localtime()
if True:
print("************Now Here***************")
count=1
""" Code for Collecting Training Data"""
# Create the CSV file and write the header
def create_csv_file(file_name):
if os.path.exists(csv_filename):
print("File already exists. Appending data to the existing file.")
update_csv_file(file_name)
else:
with open(csv_filename, 'w') as file:
header= "Timestamp", "relative_temp", "relative_hum", "deb_temp", "non_deb_temp", "tm_current", "rt_speed\n"
file.write(header)
print("CSV file created.")
# Update the CSV file with new sensor readings
def update_csv_file(csv_filename):
timestamp = time.time()
sensor_values = [
get_timestamp,
relative_temp,
relative_hum,
deb_temp,
non_deb_temp,
tm_current,
rt_speed,
]
# Convert values to a CSV-formatted string
row = ",".join(map(str, sensor_values)) + "\n"
with open(csv_filename, 'a') as file:
file.write(row)
print("Data written to CSV file:", row.strip())
if Monitoring== True:
while True:
# Reading Current
#tm_current = read_current()
#print("Current: {:.2f} A".format(tm_current))
#Reading Relative Temperature and Humidity
dht_sensor.measure()
relative_temp= dht_sensor.temperature()
relative_hum= dht_sensor.humidity()
print(f'Temperature: {relative_temp} \n, Humidity: {relative_hum}')
#Reading Rotational Speed
#rt_speed = read_ats_sensor_data(ats_pin)
# if sensor_state is not None:
#print(f"Sensor State: {sensor_state}")
#Reading Temperature from Ds18b20
# Initiate a temperature conversion
# Read and print temperatures from all sensors
# temp_values=[]
# for sensor in sensors:
# temp_values.append(ds.read_temp(sensor))
deb_temp = measure_distance()
non_deb_temp = measure_distance()
# temp_values.clear()
print(f"deb_temp: {deb_temp}°C \n,non_deb_temp: {non_deb_temp} °C")
#Reading from PIR Motion Sensor
rt_speed= pir_sensor.value()
if rt_speed == 1:
print("Speed detected!", rt_speed, 'rpm')
else:
print("Speed detected.", rt_speed, 'rpm')
#Reading Ultrasonic Sensor
tm_current = measure_distance()
print("Tm_Current:", tm_current, "mA")
# Actual Missing value Imputaion Point
cmplt_sns_data = check_and_impute(relative_temp, relative_hum, deb_temp, non_deb_temp, rt_speed, tm_current)
print('***Imputed Signals***')
print(cmplt_sns_data)
# Actual Denoising Point
Fn_sns_data = filter_noise(cmplt_sns_data)# Denoising of Imputed signals
print('***Denoised Signals***')
print(Fn_sns_data)
# Actual Outlier Removal Point
Fotlr_sns_data = check_and_remove_outliers(Fn_sns_data) # Outlier removal from denoised signals
print('***Outlier Free Signals***')
print(Fotlr_sns_data)
#Actual Normalising point
output_file_path = 'normalized_sensor_data.csv' # Scaling of outlier free signals for inference
scaled_data = normalize(Fotlr_sns_data)
print('***Normalised Signals***')
print(scaled_data)
print('***Monitoring Conditions***')
#monitor(scaled_data) # Inferencing and sending to GUI
store(scaled_data)
main_thingspeak_portal('condition_data.csv') # Sending to Cloud
sleep_ms(3000)
if Training == True:
try:
print('Collecting Raw Sensor Data')
count=1
while True:
# Reading Current
#tm_current = read_current()
#print("Current: {:.2f} A".format(tm_current))
#Reading Relative Temperature and Humidity
dht_sensor.measure()
relative_temp= dht_sensor.temperature()
relative_hum= dht_sensor.humidity()
print(f'Temperature: {relative_temp} \n, Humidity: {relative_hum}')
#Reading Rotational Speed
#rt_speed = read_ats_sensor_data(ats_pin)
# if sensor_state is not None:
#print(f"Sensor State: {sensor_state}")
#Reading Temperature from Ds18b20
# Initiate a temperature conversion
# Read and print temperatures from all sensors
# temp_values=[]
# for sensor in sensors:
# temp_values.append(ds.read_temp(sensor))
deb_temp = measure_distance()
non_deb_temp = measure_distance()
# temp_values.clear()
print(f"deb_temp: {deb_temp}°C \n,non_deb_temp: {non_deb_temp} °C")
#Reading from PIR Motion Sensor
rt_speed= pir_sensor.value()
if rt_speed == 1:
print("Speed detected!", rt_speed, 'rpm')
else:
print("Speed detected.", rt_speed, 'rpm')
#Reading Ultrasonic Sensor
tm_current = measure_distance()
print("Tm_Current:", tm_current, "mA")
# File name for the CSV file
raw_data = "raw_data.csv"
# Initialize the CSV file
create_csv_file()
# Real-time data collection loop
create_csv_file(raw_data)
print(f'Logged {count} times')
time.sleep(3) # Collect data every second
count +=1
except KeyboardInterrupt:
print("Stopped data collection.")