from machine import ADC, Pin, PWM, I2C
import network
import dht
import time
import urequests
from pico_i2c_lcd import I2cLcd
# --- Analog inputs (ESP32) --------------------------------------------------
# The heart-rate and SpO2 signals are simulated by analog inputs on GPIO 35
# and GPIO 34. ATTN_11DB selects the widest input range the ESP32 ADC offers;
# ATTN_6DB saturates at roughly 2 V, so the upper part of a 0-3.3V signal
# would all read as full scale (4095).
heart_rate_adc = ADC(Pin(35))
heart_rate_adc.atten(ADC.ATTN_11DB)
O2_adc = ADC(Pin(34))
O2_adc.atten(ADC.ATTN_11DB)

# --- Temperature sensor and buzzer ------------------------------------------
temp_sensor = dht.DHT22(Pin(13))
buzzer = PWM(Pin(12), freq=440, duty=0)  # duty=0 keeps the buzzer silent

# --- Health issue thresholds ------------------------------------------------
temp_threshold_high = 37.2  # Celsius; at or above counts as an issue
temp_threshold_low = 35.5  # Celsius; at or below counts as an issue
o2_threshold = 94  # SpO2 percent; below counts as an issue
heart_rate_threshold_high = 150  # BPM; above counts as an issue
heart_rate_threshold_low = 50  # BPM; below counts as an issue

# --- 20x4 character LCD on I2C ----------------------------------------------
I2C_ADDR = 0x27
I2C_NUM_ROWS = 4
I2C_NUM_COLS = 20
i2c = I2C(1, sda=Pin(21), scl=Pin(22), freq=100000)
lcd = I2cLcd(i2c, I2C_ADDR, I2C_NUM_ROWS, I2C_NUM_COLS)

# --- ThingSpeak upload settings ---------------------------------------------
HTTP_HEADERS = {'Content-Type': 'application/json'}
# NOTE(review): the channel write key is committed in source. Consider loading
# it from an untracked config file and rotating this key.
THINGSPEAK_WRITE_API_KEY = 'BP0WJ6XNBXXU3JL3' # Write API of the Channel
# Bring up the station interface and block until the access point accepts us.
# A progress dot is printed every 100 ms while waiting; there is no timeout.
WIFI_SSID = 'Wokwi-GUEST'
WIFI_PASSWORD = ''

print("Connecting to WiFi", end="")
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
while not sta_if.isconnected():
    print(".", end="")
    time.sleep(0.1)
print(" Connected!")
def read_simulated_heart_rate():
    """Return a simulated heart rate in BPM derived from the heart-rate ADC.

    The raw reading from ``heart_rate_adc`` (0-4095 on ESP32) is scaled
    linearly onto 40-230 BPM. Integer (floor) division is used, so the result
    is always an ``int``.
    """
    adc_full_scale = 4095  # Largest raw ADC reading on ESP32
    bpm_at_zero = 40  # Heart rate reported for a raw reading of 0
    bpm_at_full_scale = 230  # Heart rate reported for a raw reading of 4095
    # NOTE(review): 40-230 BPM is what this formula has always produced;
    # confirm that this is the intended simulation range.
    raw = heart_rate_adc.read()
    return raw * (bpm_at_full_scale - bpm_at_zero) // adc_full_scale + bpm_at_zero
def read_spo2():
    """Return a simulated SpO2 percentage derived from the oxygen ADC.

    The raw ``O2_adc`` reading (0-4095) is mapped linearly onto 90-100 %.
    True division is used, so the result is a ``float``.
    """
    raw_reading = O2_adc.read()
    span = 100 - 90  # Width of the simulated saturation range, in percent
    scaled = raw_reading * span / 4095  # 4095 is the full-scale ADC reading
    return scaled + 90
def check_health(temperature, oxygen_saturation, heart_rate):
    """Count how many vital signs are outside their thresholds.

    Each of temperature, oxygen saturation and heart rate contributes at most
    one point, so the returned severity is an ``int`` from 0 (all normal) to 3.
    """
    out_of_range = (
        # Temperature is abnormal at or beyond either limit.
        temperature >= temp_threshold_high or temperature <= temp_threshold_low,
        # Oxygen saturation is only checked against a lower limit.
        oxygen_saturation < o2_threshold,
        # Heart rate is abnormal strictly beyond either limit.
        heart_rate > heart_rate_threshold_high or heart_rate < heart_rate_threshold_low,
    )
    return sum(out_of_range)
def alert_severity(severity, temperature, oxygen_saturation, heart_rate):
    """Sound the buzzer according to ``severity`` and refresh the LCD.

    Severity 0 keeps the buzzer off, 1 gives a 0.1 s beep, 2 gives a 0.5 s
    beep, and 3 or more gives three 0.2 s beeps separated by 0.2 s pauses.
    The call blocks for the duration of the beep pattern. The LCD is then
    updated through ``display_health_issues`` whatever the severity.
    """
    if severity != 0:
        buzzer.duty(512)  # Start the tone
        if severity >= 3:
            # High severity: triple beep
            for _ in range(3):
                buzzer.duty(512)
                time.sleep(0.2)
                buzzer.duty(0)
                time.sleep(0.2)
        elif severity == 2:
            time.sleep(0.5)  # Moderate severity: longer beep
        elif severity == 1:
            time.sleep(0.1)  # Low severity: short beep
    # Every path ends with the buzzer silenced.
    buzzer.duty(0)
    display_health_issues(temperature, oxygen_saturation, heart_rate)
def display_health_issues(temperature, oxygen_saturation, heart_rate):
    """Show the out-of-threshold vital signs on the LCD.

    The display is cleared first. With no issues it shows an "all normal"
    message; otherwise it shows a header line, up to two issue labels on the
    next line, and any remaining labels on the line after that.
    """
    labels = []

    # Temperature: the high and low conditions are mutually exclusive.
    if temperature >= temp_threshold_high:
        labels.append("High Temp")
    elif temperature <= temp_threshold_low:
        labels.append("Low Temp")

    if oxygen_saturation < o2_threshold:
        labels.append("Low O2")

    # Heart rate: the high and low conditions are mutually exclusive.
    if heart_rate > heart_rate_threshold_high:
        labels.append("High HR")
    elif heart_rate < heart_rate_threshold_low:
        labels.append("Low HR")

    lcd.clear()
    if not labels:
        lcd.putstr("All parameters\nnormal")
        return

    lcd.putstr("Issues detected:\n")
    first_row = labels[:2]
    overflow_row = labels[2:4]
    for label in first_row:
        lcd.putstr(label + " ")
    if len(labels) > 2:
        # Start a new LCD line for whatever did not fit on the first one.
        lcd.putstr("\n")
        for label in overflow_row:
            lcd.putstr(label + " ")
# Main monitoring loop: sample the sensors, raise alerts, show issues on the
# LCD and publish the readings to ThingSpeak.
THINGSPEAK_UPDATE_URL = 'http://api.thingspeak.com/update?api_key=' + THINGSPEAK_WRITE_API_KEY
UPDATE_INTERVAL_S = 15  # ThingSpeak rate-limits channel updates (15 s on free accounts)
SENSOR_RETRY_DELAY_S = 2  # The DHT22 should not be polled more than once every 2 s

while True:
    simulated_heart_rate = read_simulated_heart_rate()
    print('Simulated Heart Rate: {} BPM'.format(simulated_heart_rate))

    try:
        temp_sensor.measure()  # Trigger a measurement
        temp = temp_sensor.temperature()  # Temperature in degrees Celsius
    except Exception as err:
        # The DHT driver reports a timeout as OSError and a corrupted frame
        # as a plain Exception. A single bad read must not stop the monitor,
        # so report it and retry shortly.
        print('Temperature read failed:', err)
        time.sleep(SENSOR_RETRY_DELAY_S)
        continue
    print('Patient Temperature: {:.2f} °C'.format(temp))

    spo2_simulated = read_spo2()
    print('Simulated SpO2: {:.2f}%'.format(spo2_simulated))

    health_severity = check_health(temp, spo2_simulated, simulated_heart_rate)
    alert_severity(health_severity, temp, spo2_simulated, simulated_heart_rate)

    dht_readings = {'field1':temp, 'field2':spo2_simulated, 'field3':simulated_heart_rate, 'field4':health_severity}
    print(dht_readings)

    response = None
    try:
        response = urequests.post(THINGSPEAK_UPDATE_URL, json=dht_readings, headers=HTTP_HEADERS)
        status = response.status_code
    except OSError as err:
        # Network problems (Wi-Fi drop, DNS failure, refused connection) are
        # transient; keep monitoring locally and try again next cycle.
        print(" Failed to send to Thingspeak:", err)
    else:
        if status == 200:
            print(" Msg sent to Thingspeak channel successfully...")
        else:
            print(" Thingspeak rejected the update, HTTP status:", status)
    finally:
        # Always release the socket, even when reading the status failed.
        if response is not None:
            response.close()

    print(" ********************************************")
    time.sleep(UPDATE_INTERVAL_S)  # Wait 15 seconds before the next update