import math
import time
import machine
import dht
from machine import ADC, I2C, Pin
# Pinout declarations
PIN_LIGHT_SENSOR = 34 # Ambient light sensor on ADC pin 34
PIN_DHT_SENSOR = 4 # DHT22 temperature and humidity sensor on pin 4
PIN_SOIL_SENSOR = 35 # Adafruit soil sensor (analog) on pin 35
PIN_MOISTURE_SENSOR = 32 # Moisture sensor on ADC pin 32
PIN_GAS_SENSOR = 33 # Gas sensor (analog) on pin 33
PIN_I2C_SCL = 22 # I2C SCL pin for ADXL345 accelerometer (if used)
PIN_I2C_SDA = 21 # I2C SDA pin for ADXL345 accelerometer (if used)
# Initialize sensors
light_sensor = ADC(Pin(PIN_LIGHT_SENSOR))
light_sensor.atten(ADC.ATTN_11DB)
# ADXL345 accelerometer on I2C (if used)
# i2c = I2C(scl=Pin(PIN_I2C_SCL), sda=Pin(PIN_I2C_SDA))
# accel = adxl345.ADXL345(i2c)
# Use DHT22 instead of DHT11
dht_sensor = dht.DHT22(Pin(PIN_DHT_SENSOR))
soil_sensor = ADC(Pin(PIN_SOIL_SENSOR))
soil_sensor.atten(ADC.ATTN_11DB)
moisture_sensor = ADC(Pin(PIN_MOISTURE_SENSOR))
moisture_sensor.atten(ADC.ATTN_11DB)
gas_sensor = ADC(Pin(PIN_GAS_SENSOR))
gas_sensor.atten(ADC.ATTN_11DB)
# Function to collect data from sensors
def collect_data():
data = []
try:
# Read light sensor
light = light_sensor.read()
data.append(light)
except Exception as e:
print(f"Error reading light sensor: {e}")
data.append(-1) # Append a default error value
try:
# Use hardcoded dummy values for accelerometer
axes = [12, 34, 56] # Replace with actual accelerometer data if available
data.extend(axes)
except Exception as e:
print(f"Error reading accelerometer: {e}")
data.extend([-1, -1, -1]) # Append default error values
try:
# Read DHT22 sensor
dht_sensor.measure()
temp = dht_sensor.temperature()
humidity = dht_sensor.humidity()
data.append(temp)
data.append(humidity)
except Exception as e:
print(f"Error reading DHT22 sensor: {e}")
data.extend([-1, -1]) # Append default error values
try:
# Read soil sensor
soil = soil_sensor.read()
data.append(soil)
except Exception as e:
print(f"Error reading soil sensor: {e}")
data.append(-1) # Append a default error value
try:
# Read moisture sensor
moisture = moisture_sensor.read()
data.append(moisture)
except Exception as e:
print(f"Error reading moisture sensor: {e}")
data.append(-1) # Append a default error value
try:
# Read gas sensor
gas = gas_sensor.read()
data.append(gas)
except Exception as e:
print(f"Error reading gas sensor: {e}")
data.append(-1) # Append a default error value
return data
# PCA implementation
def mean(data):
return sum(data) / len(data) if len(data) > 0 else 0
def covariance(x, y):
n = len(x)
if n <= 1:
return 0
mean_x = mean(x)
mean_y = mean(y)
cov = 0
for i in range(n):
cov += (x[i] - mean_x) * (y[i] - mean_y)
return cov / (n - 1)
def eigenvectors(matrix):
try:
# Simple 2D eigenvector calculation for demonstration purposes
# For higher dimensions, use appropriate numerical methods
a, b, c, d = matrix[0][0], matrix[0][1], matrix[1][0], matrix[1][1]
trace = a + d
determinant = a * d - b * c
if b == 0: # Check for division by zero
print("Error: division by zero in eigenvector calculation")
return [(1, 0), (0, 1)] # Return default identity vectors if division by zero occurs
lambda1 = trace / 2 + math.sqrt(trace ** 2 / 4 - determinant)
lambda2 = trace / 2 - math.sqrt(trace ** 2 / 4 - determinant)
eigenvector1 = (1, (lambda1 - a) / b)
eigenvector2 = (1, (lambda2 - a) / b)
return [eigenvector1, eigenvector2]
except Exception as e:
print(f"Error calculating eigenvectors: {e}")
return [(0, 0), (0, 0)] # Return default values if error occurs
def pca(data):
try:
# Normalize data
mean_centered = [[x - mean(col) for x in col] for col in zip(*data)]
# Calculate covariance matrix
covariance_matrix = [[covariance(x, y) for y in mean_centered] for x in mean_centered]
# Get eigenvectors (principal components)
components = eigenvectors(covariance_matrix)
# Project data onto principal components
pca_data = [[sum(a * b for a, b in zip(row, comp)) for comp in components] for row in mean_centered]
return pca_data
except Exception as e:
print(f"Error in PCA processing: {e}")
return [] # Return empty list if error occurs
# Main loop to collect and process data
data_buffer = []
while True:
try:
sensor_data = collect_data()
data_buffer.append(sensor_data)
print(f"Buffer size: {len(data_buffer)}") # Debugging: Check buffer size
if len(data_buffer) >= 10: # Collect data in batches of 10
print("Calling PCA function...") # Debugging: Confirm PCA call
print("Data sent to PCA:", data_buffer) # Debugging: Check data sent to PCA
pca_result = pca(data_buffer)
if pca_result:
print("PCA Result:", pca_result)
data_buffer = [] # Reset buffer
time.sleep(5) # Collect data every 5 seconds
except Exception as e:
print(f"Error in main loop: {e}")