"""
Industrial Air Safety System for Wokwi.com (ESP32)
- Toxic gas monitoring (MQ2 + CO emulation)
- Action-oriented: ventilation trigger, evacuation alerts, escalation logic
- Quantitative gas concentration estimation (AOUT + Ro calibration)
- Distributed fault awareness with structured MQTT fault levels
- Proper QoS semantics (umqtt.robust)
- Multi-device naming: industry/site1/device01/...
- Risk score + recommended_action instead of raw data
- Event-driven (interrupts for gas detection)
- Network failure tolerance (local queue with store-and-forward)
1. Go to http://www.hivemq.com/demos/websocket-client/.
2. Connect to mqtt-dashboard.com : 8884
3. Client ID: Use default
4. Subscribe to all topics via: industry/site1/device01/#
5. Risk scores, actions, escalation level: industry/site1/device01/status
6. Escalation alerts (HIGH_RISK, EVACUATION): industry/site1/device01/aler
7. System faults (sensor failures, etc.): industry/site1/device01/fault
8. Raw sensor data (temp, humidity, gas ppm): industry/site1/device01/telemetry
"""
"""
Industrial Air Safety System - PROFESSIONAL MQ-2 CALIBRATION
Based on Sandbox Electronics MQ-2 Library
Supports: LPG, CO, SMOKE detection with proper logarithmic curves
"""
import network
import time
import json
from machine import Pin, SoftI2C, ADC, PWM
import dht
from umqtt.robust import MQTTClient
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
import math
# ========== MQ-2 GAS SENSOR CALIBRATION CONSTANTS ==========
MQ_PIN = 34
RL_VALUE = 10.0
RO_CLEAN_AIR_FACTOR = 9.83
CALIBRATION_SAMPLE_TIMES = 50
CALIBRATION_SAMPLE_INTERVAL = 0.5
READ_SAMPLE_TIMES = 5
READ_SAMPLE_INTERVAL = 0.05
# Gas curves
LPG_CURVE = [2.3, 0.21, -0.47]
CO_CURVE = [2.3, 0.72, -0.34]
SMOKE_CURVE = [2.3, 0.53, -0.44]
GAS_LPG = 0
GAS_CO = 1
GAS_SMOKE = 2
# ========== SYSTEM CONFIGURATION ==========
SITE_ID = "site1"
DEVICE_ID = "device01"
BASE_TOPIC = f"industry/{SITE_ID}/{DEVICE_ID}"
# MQTT Configuration
MQTT_BROKER = "mqtt-dashboard.com"
MQTT_PORT = 1883
MQTT_CLIENT_ID = f"airsafety-{SITE_ID}-{DEVICE_ID}"
# MQTT Topics
TOPIC_STATUS = f"{BASE_TOPIC}/status"
TOPIC_ALERT = f"{BASE_TOPIC}/alert"
TOPIC_FAULT = f"{BASE_TOPIC}/fault"
TOPIC_TELEMETRY = f"{BASE_TOPIC}/telemetry"
TOPIC_GAS_DETAILED = f"{BASE_TOPIC}/gas/detailed"
TOPIC_OCCUPANCY = f"{BASE_TOPIC}/occupancy" # New topic for people detection
# QoS Levels
QOS_EXACTLY_ONCE = 1
QOS_AT_LEAST_ONCE = 1
QOS_AT_MOST_ONCE = 0
# ========== HARDWARE PINS ==========
PIN_DHT22 = 15
PIN_MQ2_AOUT = 34
PIN_MQ2_DOUT = 25
PIN_PIR = 12 # PIR motion sensor
PIN_LED_GREEN = 17
PIN_LED_YELLOW = 16
PIN_LED_RED = 4
PIN_BUZZER = 13
PIN_VENT_FAN = 14
PIN_SCL = 27
PIN_SDA = 26
# ========== SAFETY THRESHOLDS ==========
TEMP_CRITICAL = 55.0
TEMP_WARNING = 40.0
HUMIDITY_WARNING = 70.0
HUMIDITY_CRITICAL = 85.0
# Gas safety thresholds (ppm)
GAS_THRESHOLDS = {
"LPG": {"warning": 1000, "critical": 2000, "evacuation": 3000},
"CO": {"warning": 50, "critical": 100, "evacuation": 200},
"SMOKE": {"warning": 300, "critical": 600, "evacuation": 1000}
}
# Risk multiplier when people are present
OCCUPANCY_RISK_MULTIPLIER = 1.5 # 50% higher risk when people detected
ESCALATION_LEVELS = {0: "NORMAL", 1: "WARNING", 2: "HIGH_RISK", 3: "EVACUATION"}
FAULT_LEVELS = {0: "OK", 1: "MINOR", 2: "MAJOR", 3: "CRITICAL"}
# Network failure tolerance
LOCAL_QUEUE_MAX = 100
message_queue = []
# ========== HARDWARE INITIALIZATION ==========
print("\n" + "="*60)
print("INDUSTRIAL AIR SAFETY SYSTEM - WITH OCCUPANCY DETECTION")
print("="*60)
# LEDs
led_green = Pin(PIN_LED_GREEN, Pin.OUT)
led_yellow = Pin(PIN_LED_YELLOW, Pin.OUT)
led_red = Pin(PIN_LED_RED, Pin.OUT)
# Actuators
buzzer = PWM(Pin(PIN_BUZZER), freq=2000, duty=0)
vent_fan = Pin(PIN_VENT_FAN, Pin.OUT)
# Sensors
dht_sensor = dht.DHT22(Pin(PIN_DHT22))
mq2_adc = ADC(Pin(PIN_MQ2_AOUT))
mq2_adc.atten(ADC.ATTN_11DB)
mq2_digital = Pin(PIN_MQ2_DOUT, Pin.IN)
pir_sensor = Pin(PIN_PIR, Pin.IN) # PIR motion sensor
# LCD
try:
i2c = SoftI2C(scl=Pin(PIN_SCL), sda=Pin(PIN_SDA), freq=400000)
lcd = I2cLcd(i2c, 0x27, 2, 16)
lcd_ok = True
print("LCD initialized")
except:
lcd_ok = False
print("LCD not found")
# ========== PIR OCCUPANCY TRACKING ==========
people_present = False
last_motion_time = 0
MOTION_TIMEOUT = 30 # Seconds before considering area empty
def pir_interrupt_handler(pin):
"""Triggered when PIR detects motion"""
global people_present, last_motion_time
people_present = True
last_motion_time = time.time()
print("\n[PIR] 👤 PEOPLE DETECTED in the area! 👤")
# Attach interrupt to PIR sensor (rising edge when motion detected)
pir_sensor.irq(trigger=Pin.IRQ_RISING, handler=pir_interrupt_handler)
# ========== HARDWARE INTERRUPT (Gas detection) ==========
gas_triggered = False
last_interrupt_time = 0
def gas_interrupt_handler(pin):
"""Immediate action on gas detection"""
global gas_triggered, last_interrupt_time
now = time.ticks_ms()
if time.ticks_diff(now, last_interrupt_time) > 500:
gas_triggered = True
last_interrupt_time = now
vent_fan.value(1)
print("\n[INTERRUPT] ⚠ GAS DETECTED! Ventilation activated ⚠")
mq2_digital.irq(trigger=Pin.IRQ_FALLING, handler=gas_interrupt_handler)
# ========== OCCUPANCY STATUS FUNCTION ==========
def update_occupancy_status():
"""Update people_present based on motion timeout"""
global people_present
if people_present and (time.time() - last_motion_time > MOTION_TIMEOUT):
people_present = False
print("[PIR] Area is now EMPTY (no motion for {} seconds)".format(MOTION_TIMEOUT))
return people_present
# ========== MQ-2 CALIBRATION FUNCTIONS ==========
def mq_resistance_calculation(raw_adc):
if raw_adc == 0:
return float('inf')
voltage = (raw_adc / 4095.0) * 3.3
if voltage < 3.3:
rs = (3.3 - voltage) / voltage * RL_VALUE
else:
rs = 100000
return rs
def mq_calibration():
print("\n[Calibration] MQ-2 Sensor Calibration Starting...")
print("[Calibration] Ensure sensor is in CLEAN AIR!")
val = 0.0
for i in range(CALIBRATION_SAMPLE_TIMES):
adc_value = mq2_adc.read()
rs = mq_resistance_calculation(adc_value)
val += rs
print(f" Sample {i+1}: ADC={adc_value:4d} RS={rs:.2f}kΩ")
time.sleep(CALIBRATION_SAMPLE_INTERVAL)
val = val / CALIBRATION_SAMPLE_TIMES
ro = val / RO_CLEAN_AIR_FACTOR
print(f"[Calibration] Complete! Ro = {ro:.2f}kΩ\n")
return ro
def mq_read(ro):
rs = 0.0
for i in range(READ_SAMPLE_TIMES):
adc_value = mq2_adc.read()
rs += mq_resistance_calculation(adc_value)
time.sleep(READ_SAMPLE_INTERVAL)
rs = rs / READ_SAMPLE_TIMES
return rs / ro
def mq_get_percentage(rs_ro_ratio, curve):
if rs_ro_ratio <= 0:
return 0
log_ratio = math.log10(rs_ro_ratio)
log_ppm = ((log_ratio - curve[1]) / curve[2]) + curve[0]
ppm = math.pow(10, log_ppm)
return int(max(0, min(ppm, 10000)))
def mq_get_gas_percentage(rs_ro_ratio, gas_id):
if gas_id == GAS_LPG:
return mq_get_percentage(rs_ro_ratio, LPG_CURVE)
elif gas_id == GAS_CO:
return mq_get_percentage(rs_ro_ratio, CO_CURVE)
elif gas_id == GAS_SMOKE:
return mq_get_percentage(rs_ro_ratio, SMOKE_CURVE)
return 0
def read_all_gases(ro):
rs_ro_ratio = mq_read(ro)
gas_data = {
"lpg_ppm": mq_get_gas_percentage(rs_ro_ratio, GAS_LPG),
"co_ppm": mq_get_gas_percentage(rs_ro_ratio, GAS_CO),
"smoke_ppm": mq_get_gas_percentage(rs_ro_ratio, GAS_SMOKE),
"rs_ro_ratio": rs_ro_ratio
}
gases = [
(gas_data["lpg_ppm"], "LPG"),
(gas_data["co_ppm"], "CO"),
(gas_data["smoke_ppm"], "SMOKE")
]
gas_data["max_ppm"], gas_data["primary_gas"] = max(gases, key=lambda x: x[0])
return gas_data
# ========== RISK ASSESSMENT WITH OCCUPANCY ==========
def assess_risk(temp, humidity, gas_data, people_present_flag):
"""Calculate risk level - OCCUPANCY increases risk severity"""
escalation = 0
faults = []
primary_hazard = None
# Temperature check
if temp > TEMP_CRITICAL:
escalation = max(escalation, 3)
faults.append(f"TEMP_CRITICAL: {temp:.1f}°C")
elif temp > TEMP_WARNING:
escalation = max(escalation, 1)
faults.append(f"TEMP_WARNING: {temp:.1f}°C")
# Gas checks
gas_checks = [
(gas_data["lpg_ppm"], "LPG", GAS_THRESHOLDS["LPG"]),
(gas_data["co_ppm"], "CO", GAS_THRESHOLDS["CO"]),
(gas_data["smoke_ppm"], "SMOKE", GAS_THRESHOLDS["SMOKE"])
]
for ppm, gas_name, thresholds in gas_checks:
if ppm >= thresholds["evacuation"]:
escalation = max(escalation, 3)
faults.append(f"{gas_name}_EVACUATION: {ppm}ppm")
primary_hazard = gas_name
elif ppm >= thresholds["critical"]:
escalation = max(escalation, 2)
faults.append(f"{gas_name}_CRITICAL: {ppm}ppm")
if not primary_hazard:
primary_hazard = gas_name
elif ppm >= thresholds["warning"]:
escalation = max(escalation, 1)
faults.append(f"{gas_name}_WARNING: {ppm}ppm")
if not primary_hazard:
primary_hazard = gas_name
# Humidity check
if humidity > HUMIDITY_CRITICAL:
escalation = max(escalation, 2)
faults.append(f"HUMIDITY_CRITICAL: {humidity:.1f}%")
elif humidity > HUMIDITY_WARNING:
escalation = max(escalation, 1)
faults.append(f"HUMIDITY_WARNING: {humidity:.1f}%")
# *** OCCUPANCY IMPACT ON ESCALATION ***
# If people are present, escalate the response (can't evacuate empty area)
if people_present_flag:
if escalation >= 2:
# People present + hazard = immediate evacuation
escalation = max(escalation, 3)
faults.append("PEOPLE_PRESENT: Immediate evacuation required")
elif escalation == 1:
# People present + warning = escalate to high risk
escalation = 2
faults.append("PEOPLE_PRESENT: Escalated to HIGH_RISK")
# Add people presence to faults for awareness
if "PEOPLE_PRESENT" not in str(faults):
faults.append("PEOPLE_PRESENT in hazardous area")
# Determine action and risk score
if escalation == 3:
risk_score = 95
if people_present_flag:
action = "EVACUATE_PEOPLE_NOW"
faults.append("URGENT: People in evacuation zone")
else:
action = "EVACUATE_AREA"
elif escalation == 2:
risk_score = 70
if people_present_flag:
action = "WARN_PEOPLE_AND_VENTILATE"
else:
action = "VENTILATE_IMMEDIATELY"
elif escalation == 1:
risk_score = 40
action = "MONITOR_CLOSELY"
else:
risk_score = 10
action = "NORMAL_OPERATION"
# Apply risk multiplier if people present
if people_present_flag and risk_score > 10:
risk_score = min(100, int(risk_score * OCCUPANCY_RISK_MULTIPLIER))
return {
"escalation": escalation,
"level_name": ESCALATION_LEVELS[escalation],
"risk_score": risk_score,
"action": action,
"primary_hazard": primary_hazard or "NONE",
"people_present": people_present_flag,
"faults": faults
}
# ========== ACTUATOR CONTROL ==========
def set_actuators(risk_data):
"""Control LEDs, buzzer, and fan - enhanced for occupancy"""
led_green.off()
led_yellow.off()
led_red.off()
if risk_data["escalation"] == 3:
led_red.on()
# Pulsed buzzer for evacuation (more urgent when people present)
if risk_data["people_present"]:
buzzer.freq(3000) # Higher pitch for people present
buzzer.duty(512)
print("[ACTUATORS] 🔴 EVACUATION SIREN - PEOPLE DETECTED! 🔴")
else:
buzzer.duty(512)
print("[ACTUATORS] 🔴 RED LED | BUZZER | FAN - EVACUATE AREA 🔴")
vent_fan.on()
elif risk_data["escalation"] == 2:
led_yellow.on()
buzzer.duty(256)
vent_fan.on()
if risk_data["people_present"]:
print("[ACTUATORS] 🟡 WARNING: People present - Ventilating area 🟡")
else:
print("[ACTUATORS] 🟡 YELLOW LED | FAN - HIGH RISK 🟡")
elif risk_data["escalation"] == 1:
led_yellow.on()
buzzer.duty(0)
vent_fan.off()
print("[ACTUATORS] 🟡 YELLOW LED - WARNING 🟡")
else:
led_green.on()
buzzer.duty(0)
vent_fan.off()
status = "👤 People present" if risk_data["people_present"] else "Area empty"
print(f"[ACTUATORS] 🟢 GREEN LED - NORMAL ({status}) 🟢")
# ========== LCD UPDATE ==========
def update_lcd(temp, humidity, gas_data, risk_data):
if not lcd_ok:
return
try:
cycle = int(time.time()) % 4 # 4 screens now
lcd.clear()
if cycle == 0:
lcd.move_to(0, 0)
lcd.putstr(f"T:{temp:.1f}C H:{humidity:.0f}%")
lcd.move_to(0, 1)
people_char = "👤" if risk_data["people_present"] else "⬚"
lcd.putstr(f"Risk:{risk_data['risk_score']:.0f}% {people_char}")
elif cycle == 1:
lcd.move_to(0, 0)
lcd.putstr(f"LPG:{gas_data['lpg_ppm']:4d}")
lcd.move_to(0, 1)
lcd.putstr(f"CO:{gas_data['co_ppm']:4d}")
elif cycle == 2:
lcd.move_to(0, 0)
lcd.putstr(f"SMK:{gas_data['smoke_ppm']:4d}")
lcd.move_to(0, 1)
lcd.putstr(f"{risk_data['action'][:12]}")
else:
lcd.move_to(0, 0)
occupancy_text = "PEOPLE HERE!" if risk_data["people_present"] else "Area Empty"
lcd.putstr(occupancy_text[:14])
lcd.move_to(0, 1)
lcd.putstr(f"PIR: {'ACTIVE' if risk_data['people_present'] else 'IDLE'}")
except:
pass
# ========== FAULT DETECTION ==========
fault_state = {"level": 0, "count": 0}
def set_fault(level, message, component):
global fault_state
if level > fault_state["level"]:
fault_state = {
"level": level,
"message": message,
"component": component,
"timestamp": time.time(),
"count": fault_state["count"] + 1
}
fault_payload = json.dumps({
"fault_level": FAULT_LEVELS[level],
"code": level,
"component": component,
"message": message,
"action_required": "REPLACE" if level >= 2 else "MONITOR",
"count": fault_state["count"]
})
safe_publish(TOPIC_FAULT, fault_payload, qos=QOS_AT_LEAST_ONCE)
print(f"[FAULT] Level {level}: {component} - {message}")
# ========== LOCAL QUEUE ==========
def queue_message(topic, payload, qos):
if len(message_queue) < LOCAL_QUEUE_MAX:
message_queue.append({
"topic": topic,
"payload": payload,
"qos": qos,
"timestamp": time.time()
})
print(f"[QUEUE] Stored ({len(message_queue)}/{LOCAL_QUEUE_MAX})")
else:
message_queue.pop(0)
queue_message(topic, payload, qos)
def flush_queue(client):
if not client:
return
sent = 0
while message_queue and sent < 50:
msg = message_queue.pop(0)
try:
client.publish(msg["topic"], msg["payload"], qos=msg["qos"])
sent += 1
except:
message_queue.insert(0, msg)
break
if sent:
print(f"[QUEUE] Flushed {sent} messages")
# ========== MQTT ==========
mqtt_client = None
def connect_mqtt():
global mqtt_client
try:
mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT, keepalive=60)
mqtt_client.connect()
print(f"[MQTT] Connected to {MQTT_BROKER}")
flush_queue(mqtt_client)
return True
except Exception as e:
print(f"[MQTT] Connection failed: {e}")
mqtt_client = None
return False
def safe_publish(topic, payload, qos=QOS_AT_LEAST_ONCE):
global mqtt_client
try:
if mqtt_client:
mqtt_client.publish(topic, payload, qos=qos)
return True
else:
queue_message(topic, payload, qos)
return False
except:
queue_message(topic, payload, qos)
return False
# ========== MQTT PUBLISHING WITH OCCUPANCY ==========
def publish_data(temp, humidity, gas_data, risk_data):
# Status message with occupancy
status_msg = json.dumps({
"risk_score": risk_data["risk_score"],
"status": risk_data["level_name"],
"escalation_level": risk_data["escalation"],
"recommended_action": risk_data["action"],
"primary_hazard": risk_data["primary_hazard"],
"people_present": risk_data["people_present"],
"confidence": 85,
"timestamp": time.time()
})
safe_publish(TOPIC_STATUS, status_msg, qos=QOS_EXACTLY_ONCE)
# Occupancy topic
occupancy_msg = json.dumps({
"people_detected": risk_data["people_present"],
"last_motion": last_motion_time if risk_data["people_present"] else 0,
"area_status": "OCCUPIED" if risk_data["people_present"] else "CLEAR"
})
safe_publish(TOPIC_OCCUPANCY, occupancy_msg, qos=QOS_AT_LEAST_ONCE)
# Detailed gas data
gas_msg = json.dumps({
"lpg_ppm": gas_data["lpg_ppm"],
"co_ppm": gas_data["co_ppm"],
"smoke_ppm": gas_data["smoke_ppm"],
"primary_gas": gas_data["primary_gas"],
"max_ppm": gas_data["max_ppm"],
"rs_ro_ratio": round(gas_data["rs_ro_ratio"], 4)
})
safe_publish(TOPIC_GAS_DETAILED, gas_msg, qos=QOS_AT_LEAST_ONCE)
# Telemetry
telemetry_msg = json.dumps({
"temperature_c": round(temp, 1),
"humidity_pct": round(humidity, 1),
"people_present": risk_data["people_present"],
"gases": {
"lpg_ppm": gas_data["lpg_ppm"],
"co_ppm": gas_data["co_ppm"],
"smoke_ppm": gas_data["smoke_ppm"]
}
})
safe_publish(TOPIC_TELEMETRY, telemetry_msg, qos=QOS_AT_MOST_ONCE)
# Alert on escalation
if risk_data["escalation"] >= 2:
alert_msg = json.dumps({
"alert": risk_data["level_name"],
"action": risk_data["action"],
"primary_hazard": risk_data["primary_hazard"],
"max_gas_ppm": gas_data["max_ppm"],
"temperature_c": round(temp, 1),
"people_present": risk_data["people_present"],
"requires_evacuation": (risk_data["escalation"] >= 3)
})
safe_publish(TOPIC_ALERT, alert_msg, qos=QOS_EXACTLY_ONCE)
# ========== WIFI ==========
def connect_wifi():
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect('Wokwi-GUEST', '')
print("Connecting to WiFi", end="")
for i in range(30):
if sta_if.isconnected():
print("\nWiFi connected!")
return True
print(".", end="")
time.sleep(0.5)
print("\nWiFi timeout")
return False
# ========== MAIN LOOP ==========
def main():
global gas_triggered
wifi_ok = connect_wifi()
if wifi_ok:
connect_mqtt()
# Calibration
print("\n" + "="*60)
print("MQ-2 SENSOR CALIBRATION")
print("="*60)
print("Ensure sensor is in CLEAN AIR!\n")
ro = mq_calibration()
print("="*60)
print("INDUSTRIAL AIR SAFETY SYSTEM ACTIVE")
print("="*60)
print(f"✅ Device: {BASE_TOPIC}")
print(f"✅ PIR Sensor on pin {PIN_PIR} - People detection ACTIVE")
print(f"✅ Occupancy risk multiplier: {OCCUPANCY_RISK_MULTIPLIER}x")
print(f"✅ Motion timeout: {MOTION_TIMEOUT} seconds")
print("="*60 + "\n")
last_publish = 0
last_mqtt_reconnect = 0
dht_failures = 0
while True:
try:
# Update occupancy status (timeout)
people_status = update_occupancy_status()
# Read DHT22
try:
dht_sensor.measure()
temp = dht_sensor.temperature()
humidity = dht_sensor.humidity()
dht_failures = 0
except:
dht_failures += 1
if dht_failures >= 3:
temp, humidity = 25.0, 50.0
else:
time.sleep(1)
continue
# Read gases
gas_data = read_all_gases(ro)
# Check gas interrupt
if gas_triggered:
print(f"[INTERRUPT] Gas threshold exceeded")
gas_triggered = False
# Assess risk with occupancy
risk_data = assess_risk(temp, humidity, gas_data, people_status)
# Control actuators
set_actuators(risk_data)
# Update LCD
update_lcd(temp, humidity, gas_data, risk_data)
# Console output
people_symbol = "👤" if people_status else "⬚"
print(f"\n[{risk_data['level_name']}] {people_symbol} Risk:{risk_data['risk_score']:.0f}% | T:{temp:.1f}C H:{humidity:.1f}%")
print(f" LPG: {gas_data['lpg_ppm']:4d}ppm | CO: {gas_data['co_ppm']:4d}ppm | SMOKE: {gas_data['smoke_ppm']:4d}ppm")
print(f" Primary: {gas_data['primary_gas']} ({gas_data['max_ppm']}ppm) | Action: {risk_data['action']}")
# MQTT reconnect
if wifi_ok and time.time() - last_mqtt_reconnect >= 30:
if not mqtt_client:
connect_mqtt()
last_mqtt_reconnect = time.time()
# Publish
if wifi_ok and time.time() - last_publish >= 10:
publish_data(temp, humidity, gas_data, risk_data)
last_publish = time.time()
time.sleep(2)
except Exception as e:
print(f"[ERROR] {e}")
time.sleep(5)
if __name__ == "__main__":
try:
main()
finally:
print("\nShutting down...")
buzzer.duty(0)
vent_fan.off()
led_green.off()
led_yellow.off()
led_red.off()
if lcd_ok:
lcd.clear()