import machine
import network
import time
import dht
import urequests
from machine import Pin, I2C, ADC
# Wi-Fi configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# ThingSpeak Channel 1 configuration (input data)
THINGSPEAK_CHANNEL_1_ID = 2817791
THINGSPEAK_CHANNEL_1_WRITE_API_KEY = "9ADR240XYFJIV3O2"
# ThingSpeak Channel 2 configuration (accelerometer data)
THINGSPEAK_CHANNEL_2_ID = 2819005
THINGSPEAK_CHANNEL_2_WRITE_API_KEY = "EF0IK8PKF5JJ8SWM"
# MPU6050 Registers
MPU6050_ADDR = 0x68
PWR_MGMT_1 = 0x6B
ACCEL_XOUT_H = 0x3B
GYRO_XOUT_H = 0x43
# Scale factors
ACCEL_SCALE = 16384.0 # +/- 2g
GYRO_SCALE = 131.0 # +/- 250 deg/s
# Initialize DHT sensor
dht_sensor = dht.DHT22(machine.Pin(2))
# Initialize Moisture sensor
moisture = ADC(Pin(28)) # GP28 as ADC
# Function to initialize I2C for MPU6050
def init_i2c():
return I2C(0, scl=Pin(9), sda=Pin(8), freq=400000)
# Function to initialize MPU6050
def init_mpu6050(i2c):
i2c.writeto_mem(MPU6050_ADDR, PWR_MGMT_1, b'\x00') # Wake up sensor
# Function to read 16-bit signed data
def read_word(i2c, reg):
data = i2c.readfrom_mem(MPU6050_ADDR, reg, 2)
value = (data[0] << 8) | data[1]
return value if value < 0x8000 else value - 0x10000
# Function to read accelerometer data
def read_accel(i2c):
accel_x = read_word(i2c, ACCEL_XOUT_H) / ACCEL_SCALE
accel_y = read_word(i2c, ACCEL_XOUT_H + 2) / ACCEL_SCALE
accel_z = read_word(i2c, ACCEL_XOUT_H + 4) / ACCEL_SCALE
return accel_x, accel_y, accel_z
# Function to read gyroscope data
def read_gyro(i2c):
gyro_x = read_word(i2c, GYRO_XOUT_H) / GYRO_SCALE
gyro_y = read_word(i2c, GYRO_XOUT_H + 2) / GYRO_SCALE
gyro_z = read_word(i2c, GYRO_XOUT_H + 4) / GYRO_SCALE
return gyro_x, gyro_y, gyro_z
# Function to send data to ThingSpeak Channel 1
def send_to_channel_1(temperature, humidity, gyro_x, gyro_y, gyro_z, accel_x, accel_y, accel_z):
try:
url = "https://api.thingspeak.com/update"
payload = {
"api_key": THINGSPEAK_CHANNEL_1_WRITE_API_KEY,
"field1": temperature,
"field2": humidity,
"field3": gyro_x,
"field4": gyro_y,
"field5": gyro_z,
"field6": accel_x,
"field7": accel_y,
"field8": accel_z,
}
response = urequests.post(url, json=payload)
print("\nChannel 1 Response:", response.text, "\n")
response.close()
except Exception as e:
print("Error sending to Channel 1:", e)
# Function to send accelerometer data to ThingSpeak Channel 2
def send_to_channel_2(accel_x, accel_y, accel_z):
try:
url = "https://api.thingspeak.com/update"
payload = {
"api_key": THINGSPEAK_CHANNEL_2_WRITE_API_KEY,
"field1": accel_x,
"field2": accel_y,
"field3": accel_z,
}
response = urequests.post(url, json=payload)
print("\nChannel 2 Response:", response.text, "\n")
response.close()
except Exception as e:
print("Error sending to Channel 2:", e)
# Function to initialize Wi-Fi
def init_wifi():
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
while not wifi.isconnected():
time.sleep(1)
print("Connected to Wi-Fi!")
print(wifi.ifconfig())
# Main function
def main():
init_wifi()
i2c = init_i2c()
init_mpu6050(i2c)
while True:
try:
# Read DHT22 sensor
dht_sensor.measure()
temperature = dht_sensor.temperature()
humidity = dht_sensor.humidity()
# Read MPU6050 data
accel_x, accel_y, accel_z = read_accel(i2c)
gyro_x, gyro_y, gyro_z = read_gyro(i2c)
# Print data for debugging
print(f"Temperature: {temperature} °C, Humidity: {humidity} %")
print(f"Accel: X={accel_x:.2f}, Y={accel_y:.2f}, Z={accel_z:.2f}")
print(f"Gyro: X={gyro_x:.2f}, Y={gyro_y:.2f}, Z={gyro_z:.2f}")
# Send data to ThingSpeak Channel 1
send_to_channel_1(temperature, humidity, gyro_x, gyro_y, gyro_z, accel_x, accel_y, accel_z)
# Send accelerometer data to ThingSpeak Channel 2
send_to_channel_2(accel_x, accel_y, accel_z)
# Delay (ThingSpeak rate limit is 15 seconds)
time.sleep(16)
except Exception as e:
print("Error in main loop:", e)
if __name__ == "__main__":
main()