import time

import spidev  # For MCP3008 ADC
from w1thermsensor import W1ThermSensor, W1ThermSensorError
# Initialize the DS18B20 temperature sensor.
# NOTE: W1ThermSensor has to be imported at the top of the file; if the name is
# missing, the call below raises NameError, which the handler does not catch.
try:
    temp_sensor = W1ThermSensor()
except W1ThermSensorError as e:
    # Carry on without temperature readings; None marks the sensor as
    # unavailable for the rest of the script.
    print(f"Error initializing DS18B20 sensor: {e}")
    temp_sensor = None
else:
    print("DS18B20 sensor initialized successfully.")

# Initialize SPI for the MCP3008 ADC.
spi = spidev.SpiDev()
spi.open(0, 0)  # SPI bus 0, device 0
# 1.35 MHz SPI clock.
# NOTE(review): presumably chosen to stay within the MCP3008's clock limit at
# a low supply voltage -- confirm against the datasheet.
spi.max_speed_hz = 1350000
# Read one analog sample from the MCP3008 over SPI.
def read_adc(channel):
    """Return the raw conversion result for one MCP3008 input channel.

    Args:
        channel: ADC input to sample; values below 0 or above 7 are rejected.

    Returns:
        The low two bits of the second reply byte shifted left by eight,
        plus the third reply byte (0-1023 when the reply bytes are 8-bit).

    Raises:
        ValueError: If ``channel`` is below 0 or above 7.
    """
    out_of_range = channel < 0 or channel > 7
    if out_of_range:
        raise ValueError("ADC channel must be between 0 and 7.")

    # Three-byte request: a leading 1, then (8 + channel) in the upper nibble,
    # then a padding byte to clock out the rest of the reply. Per the usual
    # MCP3008 framing the 1 is the start bit and the 8 selects single-ended
    # mode -- confirm against the datasheet.
    request = [1, (8 + channel) << 4, 0]
    reply = spi.xfer2(request)

    high_bits = (reply[1] & 3) << 8
    low_byte = reply[2]
    return high_bits + low_byte
def _timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return time.strftime('%Y-%m-%d %H:%M:%S')


def _report_temperature(sensor):
    """Read the temperature from ``sensor`` and print it in °C and °F.

    A W1ThermSensorError raised by the read is printed and swallowed so that
    one failed reading does not stop the acquisition loop.
    """
    try:
        temperature_c = sensor.get_temperature()
    except W1ThermSensorError as e:
        print(f"Error reading temperature: {e}")
        return
    temperature_f = (temperature_c * 9 / 5) + 32
    print(f"[{_timestamp()}] Temperature: {temperature_c:.2f} °C / {temperature_f:.2f} °F")


def _report_heartbeat(channel):
    """Sample the heartbeat sensor on ADC ``channel`` and print the raw value.

    Any exception from the read is printed and swallowed (best effort), so the
    acquisition loop keeps running.
    """
    try:
        heartbeat_value = read_adc(channel)
    except Exception as e:
        print(f"Error reading heartbeat sensor: {e}")
        return
    print(f"[{_timestamp()}] Heartbeat Sensor Value: {heartbeat_value}")


# Main acquisition loop: report both sensors about once per second until the
# user interrupts, then always release the SPI device.
try:
    print("Starting sensor data collection...")
    while True:
        # temp_sensor is None when initialization failed; test the sentinel
        # explicitly instead of relying on the sensor object's truthiness.
        if temp_sensor is not None:
            _report_temperature(temp_sensor)

        # MCP3008: heartbeat sensor is wired to channel 0.
        _report_heartbeat(0)

        print("------------------------")
        time.sleep(1)  # Delay for 1 second
except KeyboardInterrupt:
    print("Program terminated by user.")
except Exception as e:
    # Top-level boundary: report anything unexpected, then fall through to cleanup.
    print(f"Unexpected error: {e}")
finally:
    spi.close()
    print("SPI connection closed.")