"""
Multi-Zone Cold Storage Monitor - Raspberry Pi Pico
Author: VIKRAM KADAM
Pharmaceutical Distribution Center Monitoring
3 Zones: Vaccines, Insulin, Biologics
"""
import machine
import utime
import onewire
import ds18x20
import dht
from machine import Pin, I2C
# Try to import OLED library, fallback if not available
try:
import ssd1306
OLED_AVAILABLE = True
except ImportError:
OLED_AVAILABLE = False
print("⚠️ OLED library not available - using console display")
class MultiZoneColdStorage:
def __init__(self):
print("🏭 MULTI-ZONE COLD STORAGE MONITOR v3.1")
print("="*50)
print("Pharmaceutical Distribution Center")
print("Author: VIKRAM KADAM")
print("="*50)
# Zone specifications
self.zones = {
'zone1': {
'name': 'Vaccines',
'min_temp': 2.0,
'max_temp': 8.0,
'priority': 'CRITICAL',
'gpio': 0,
'led_gpio': 17
},
'zone2': {
'name': 'Insulin',
'min_temp': -2.0,
'max_temp': 4.0,
'priority': 'HIGH',
'gpio': 1,
'led_gpio': 18
},
'zone3': {
'name': 'Biologics',
'min_temp': -25.0,
'max_temp': -15.0,
'priority': 'SPECIALIZED',
'gpio': 2,
'led_gpio': 19
}
}
self.initialize_hardware()
self.data_log = []
self.system_uptime = utime.time()
def initialize_hardware(self):
"""Initialize all hardware components"""
print("🔧 Initializing Multi-Zone Hardware...")
# Temperature sensors for each zone
self.zone_sensors = {}
for zone_id, config in self.zones.items():
try:
ow = onewire.OneWire(Pin(config['gpio']))
ds = ds18x20.DS18X20(ow)
utime.sleep_ms(1000) # Wait for sensor initialization
roms = ds.scan()
# If no sensors found, try again
if not roms:
utime.sleep_ms(2000)
roms = ds.scan()
self.zone_sensors[zone_id] = {'ds': ds, 'roms': roms}
print(f" ✓ {config['name']}: {len(roms)} sensor(s) on GP{config['gpio']}")
except Exception as e:
print(f" ❌ {config['name']}: Sensor error - {e}")
self.zone_sensors[zone_id] = {'ds': None, 'roms': []}
# Ambient sensor
try:
self.ambient_sensor = dht.DHT22(Pin(15))
print(" ✓ Ambient DHT22: GP15")
except Exception as e:
print(f" ❌ Ambient sensor error: {e}")
self.ambient_sensor = None
# LEDs for each zone
self.zone_leds = {}
for zone_id, config in self.zones.items():
try:
self.zone_leds[zone_id] = Pin(config['led_gpio'], Pin.OUT)
print(f" ✓ {config['name']} LED: GP{config['led_gpio']}")
except Exception as e:
print(f" ❌ {config['name']} LED error: {e}")
# Control outputs
try:
self.emergency_relay = Pin(16, Pin.OUT)
self.alarm_buzzer = Pin(22, Pin.OUT)
self.reset_button = Pin(14, Pin.IN, Pin.PULL_UP)
print(" ✓ Control outputs initialized")
except Exception as e:
print(f" ❌ Control output error: {e}")
# I2C and OLED display
try:
self.i2c = I2C(0, sda=Pin(20), scl=Pin(21), freq=400000)
devices = self.i2c.scan()
print(f" ✓ I2C initialized: {len(devices)} device(s) found")
if OLED_AVAILABLE and devices:
# Try to initialize OLED
self.oled = ssd1306.SSD1306_I2C(128, 64, self.i2c)
self.oled.fill(0)
self.oled.text("Multi-Zone Monitor", 0, 0)
self.oled.text("VIKRAM KADAM", 0, 20)
self.oled.text("Initializing...", 0, 40)
self.oled.show()
print(" ✓ OLED display ready")
else:
self.oled = None
print(" ⚠️ OLED not available - using console")
except Exception as e:
print(f" ❌ I2C/OLED error: {e}")
self.oled = None
print("✅ Multi-Zone System Ready")
def read_zone_temperature(self, zone_id):
"""Read temperature from specific zone with simulation fallback"""
try:
zone_info = self.zone_sensors[zone_id]
config = self.zones[zone_id]
if zone_info['ds'] and zone_info['roms']:
zone_info['ds'].convert_temp()
utime.sleep_ms(750)
temp = zone_info['ds'].read_temp(zone_info['roms'][0])
if temp != 85.0 and temp != -127.0 and -50 <= temp <= 50:
return temp
# Simulation fallback for Wokwi (realistic temperatures)
sim_temps = {
'zone1': 5.2, # Vaccines: 2-8°C
'zone2': 1.8, # Insulin: -2-4°C
'zone3': -20.1 # Biologics: -25--15°C
}
return sim_temps.get(zone_id, 25.0)
except Exception as e:
print(f" ❌ Error reading {zone_id}: {e}")
return None
def read_all_temperatures(self):
"""Read temperatures from all zones plus ambient conditions"""
readings = {
'timestamp': utime.time(),
'zones': {},
'ambient': {},
'system_status': 'NORMAL'
}
print(f"\n📊 Multi-Zone Temperature Reading Cycle")
print("-" * 40)
# Read each zone
for zone_id, config in self.zones.items():
temp = self.read_zone_temperature(zone_id)
readings['zones'][zone_id] = {
'name': config['name'],
'temperature': temp,
'min_temp': config['min_temp'],
'max_temp': config['max_temp'],
'priority': config['priority']
}
if temp is not None:
print(f" 🌡️ {config['name']}: {temp:.1f}°C [{config['min_temp']}°C to {config['max_temp']}°C]")
else:
print(f" ❌ {config['name']}: SENSOR ERROR")
# Read ambient conditions
if self.ambient_sensor:
try:
self.ambient_sensor.measure()
ambient_temp = self.ambient_sensor.temperature()
ambient_humidity = self.ambient_sensor.humidity()
readings['ambient'] = {
'temperature': ambient_temp,
'humidity': ambient_humidity
}
print(f" 🌍 Ambient: {ambient_temp:.1f}°C, {ambient_humidity:.1f}%RH")
except Exception as e:
print(f" ⚠️ Ambient sensor error: {e}")
readings['ambient'] = {'temperature': None, 'humidity': None}
print("-" * 40)
return readings
def analyze_multi_zone_status(self, readings):
"""Comprehensive multi-zone analysis with priority-based alerts"""
analysis = {
'overall_status': 'NORMAL',
'critical_zones': [],
'warning_zones': [],
'offline_zones': [],
'emergency_protocol': False,
'priority_alerts': []
}
critical_count = 0
for zone_id, zone_data in readings['zones'].items():
temp = zone_data['temperature']
priority = zone_data['priority']
name = zone_data['name']
if temp is None:
# Sensor offline
analysis['offline_zones'].append(zone_id)
analysis['priority_alerts'].append(f"{name} sensor offline")
if priority in ['CRITICAL', 'HIGH']:
critical_count += 1
elif temp < zone_data['min_temp'] or temp > zone_data['max_temp']:
# Temperature out of range
if priority == 'CRITICAL':
analysis['critical_zones'].append(zone_id)
critical_count += 1
else:
analysis['warning_zones'].append(zone_id)
analysis['priority_alerts'].append(f"{name}: {temp:.1f}°C out of range")
# Determine overall system status
if critical_count >= 2 or len(analysis['offline_zones']) >= 2:
analysis['overall_status'] = 'EMERGENCY'
analysis['emergency_protocol'] = True
elif critical_count >= 1:
analysis['overall_status'] = 'CRITICAL'
elif len(analysis['warning_zones']) >= 1:
analysis['overall_status'] = 'WARNING'
return analysis
def control_multi_zone_outputs(self, analysis):
"""Control LEDs, alarms, and emergency systems based on analysis"""
# Control individual zone LEDs
for zone_id, led in self.zone_leds.items():
if zone_id in analysis['critical_zones']:
led.value(1) # Critical zones - LED on
elif zone_id in analysis['warning_zones']:
led.value(1) # Warning zones - LED on
elif zone_id in analysis['offline_zones']:
led.value(0) # Offline zones - LED off
else:
led.value(1) # Normal zones - LED on
# Emergency relay control
self.emergency_relay.value(1 if analysis['emergency_protocol'] else 0)
# Alarm control
if analysis['emergency_protocol']:
print("🆘 EMERGENCY PROTOCOL ACTIVATED")
self.trigger_emergency_alarm()
elif analysis['overall_status'] in ['CRITICAL', 'WARNING']:
self.trigger_warning_alarm()
def trigger_emergency_alarm(self):
"""Emergency alarm sequence for critical multi-zone failures"""
print("🚨 EMERGENCY ALARM - MULTI-ZONE FAILURE")
for i in range(10):
self.alarm_buzzer.value(1)
utime.sleep_ms(200)
self.alarm_buzzer.value(0)
utime.sleep_ms(100)
def trigger_warning_alarm(self):
"""Warning alarm for single zone issues"""
for i in range(3):
self.alarm_buzzer.value(1)
utime.sleep_ms(300)
self.alarm_buzzer.value(0)
utime.sleep_ms(200)
def update_oled_display(self, readings, analysis):
"""Update OLED display with zone information"""
if not self.oled:
return
try:
# Clear display
self.oled.fill(0)
# Title
self.oled.text("COLD STORAGE", 0, 0)
self.oled.text("Multi-Zone Monitor", 0, 10)
# Zone information
y_pos = 25
for zone_id, zone_data in readings['zones'].items():
temp = zone_data['temperature']
name = zone_data['name'][:8] # Truncate for display
if temp is not None:
temp_str = f"{temp:.1f}C"
else:
temp_str = "ERR"
# Status icon
if zone_id in analysis['critical_zones']:
icon = "!"
elif zone_id in analysis['warning_zones']:
icon = "?"
elif zone_id in analysis['offline_zones']:
icon = "X"
else:
icon = "✓"
display_text = f"{icon}{name}: {temp_str}"
self.oled.text(display_text, 0, y_pos)
y_pos += 10
# System status
status_text = analysis['overall_status'][:10] # Truncate
self.oled.text(f"Status: {status_text}", 0, 55)
# Update display
self.oled.show()
except Exception as e:
print(f"OLED Error: {e}")
def create_console_display(self, readings, analysis):
"""Create enhanced console display as OLED alternative"""
print("\n" + "┌" + "─"*52 + "┐")
print("│" + " "*10 + "PHARMACEUTICAL COLD STORAGE" + " "*14 + "│")
print("│" + " "*15 + "Multi-Zone Monitor" + " "*19 + "│")
print("├" + "─"*52 + "┤")
for zone_id, zone_data in readings['zones'].items():
temp = zone_data['temperature']
name = zone_data['name']
# Status icon
if zone_id in analysis['critical_zones']:
icon = "🔴"
status_text = "CRITICAL"
elif zone_id in analysis['warning_zones']:
icon = "🟡"
status_text = "WARNING"
elif zone_id in analysis['offline_zones']:
icon = "⚫"
status_text = "OFFLINE"
else:
icon = "🟢"
status_text = "NORMAL"
if temp is not None:
temp_display = f"{temp:6.1f}°C"
else:
temp_display = " ERROR "
line = f"│ {icon} {name:<10}: {temp_display} | {status_text:<8} │"
print(line)
print("├" + "─"*52 + "┤")
# Ambient conditions
ambient = readings['ambient']
if ambient.get('temperature') is not None:
amb_line = f"│ 🌍 Ambient: {ambient['temperature']:5.1f}°C, {ambient['humidity']:4.1f}%RH" + " "*15 + "│"
print(amb_line)
print("├" + "─"*52 + "┤")
# System status
status_color = "🆘" if analysis['emergency_protocol'] else "✅"
status_line = f"│ {status_color} System Status: {analysis['overall_status']:<20}" + " "*13 + "│"
print(status_line)
print("└" + "─"*52 + "┘")
def log_system_data(self, readings, analysis):
"""Log system data for compliance"""
timestamp = utime.localtime(readings['timestamp'])
log_entry = {
'timestamp': f"{timestamp[0]}-{timestamp[1]:02d}-{timestamp[2]:02d} {timestamp[3]:02d}:{timestamp[4]:02d}:{timestamp[5]:02d}",
'system_status': analysis['overall_status'],
'emergency_active': analysis['emergency_protocol'],
'zones': readings['zones'],
'ambient': readings['ambient'],
'alerts': len(analysis['priority_alerts'])
}
self.data_log.append(log_entry)
# Maintain log size
if len(self.data_log) > 100:
self.data_log.pop(0)
print(f"📝 Data logged: Entry #{len(self.data_log)}")
def export_compliance_csv(self):
"""Export compliance data in CSV format"""
print("\n📋 REGULATORY COMPLIANCE REPORT")
print("=" * 50)
print("Multi-Zone Pharmaceutical Cold Storage")
print("Generated by: VIKRAM KADAM")
print("=" * 50)
# CSV Header
print("Timestamp,System_Status,Emergency,Zone1_Temp,Zone2_Temp,Zone3_Temp,Ambient_Temp,Ambient_Humidity,Alert_Count")
# Export recent data (last 10 entries)
for entry in self.data_log[-10:]:
zones = entry['zones']
ambient = entry['ambient']
zone1_temp = zones.get('zone1', {}).get('temperature', 'N/A')
zone2_temp = zones.get('zone2', {}).get('temperature', 'N/A')
zone3_temp = zones.get('zone3', {}).get('temperature', 'N/A')
ambient_temp = ambient.get('temperature', 'N/A')
ambient_hum = ambient.get('humidity', 'N/A')
csv_line = f"{entry['timestamp']},{entry['system_status']},{entry['emergency_active']},{zone1_temp},{zone2_temp},{zone3_temp},{ambient_temp},{ambient_hum},{entry['alerts']}"
print(csv_line)
print("=" * 50)
print(f"Total Records: {len(self.data_log)}")
print("Report Complete\n")
def run_multi_zone_monitoring(self):
"""Main monitoring system loop"""
print("\n🚀 Starting Multi-Zone Monitoring System...")
print("Pharmaceutical Distribution Center")
print("Press CTRL+C to stop monitoring")
print("-" * 50)
cycle_count = 0
try:
while True:
cycle_count += 1
print(f"\n🔄 MONITORING CYCLE #{cycle_count}")
# Read all zone temperatures
readings = self.read_all_temperatures()
# Analyze multi-zone system
analysis = self.analyze_multi_zone_status(readings)
# Control outputs and alarms
self.control_multi_zone_outputs(analysis)
# Update displays
self.update_oled_display(readings, analysis)
self.create_console_display(readings, analysis)
# Log data for compliance
self.log_system_data(readings, analysis)
# Generate compliance report every 10 cycles
if cycle_count % 10 == 0:
self.export_compliance_csv()
# Check for emergency reset
if not self.reset_button.value():
print("\n🔄 Emergency reset button pressed - clearing alarms")
self.emergency_relay.value(0)
utime.sleep(2)
# Wait for next cycle
print("\n⏳ Next monitoring cycle in 30 seconds...")
utime.sleep(30) # 30-second cycles for demo
except KeyboardInterrupt:
print("\n🛑 Monitoring stopped by user")
self.shutdown_system()
except Exception as e:
print(f"\n❌ Critical system error: {e}")
print("🆘 Activating emergency mode...")
self.emergency_shutdown()
def shutdown_system(self):
"""Safe system shutdown"""
print("🔧 Shutting down Multi-Zone System...")
# Turn off all outputs
self.emergency_relay.value(0)
self.alarm_buzzer.value(0)
for led in self.zone_leds.values():
led.value(0)
# Clear OLED
if self.oled:
self.oled.fill(0)
self.oled.text("System Shutdown", 0, 25)
self.oled.text("VIKRAM KADAM", 0, 35)
self.oled.show()
print("✅ System shutdown complete")
print(f"Final data log entries: {len(self.data_log)}")
def emergency_shutdown(self):
"""Emergency system shutdown with alarms"""
print("🆘 EMERGENCY SYSTEM SHUTDOWN")
# Activate emergency relay
self.emergency_relay.value(1)
# Continuous alarm
for i in range(20):
self.alarm_buzzer.value(1)
utime.sleep_ms(250)
self.alarm_buzzer.value(0)
utime.sleep_ms(250)
# =================== MAIN EXECUTION ===================
if __name__ == "__main__":
print("🏭 MULTI-ZONE PHARMACEUTICAL COLD STORAGE")
print("Raspberry Pi Pico Implementation")
print("Regulatory Compliant Monitoring System")
print("Author: VIKRAM KADAM")
print("\n" + "="*60)
# Initialize and start the monitoring system
storage_system = MultiZoneColdStorage()
storage_system.run_multi_zone_monitoring()