"""
ESP32 Coffee Dryer IoT - MQTT Focus Version
✅ Focuses on MQTT (ThingSpeak) since HTTP/HTTPS has Wokwi limitations
✅ Enhanced coffee simulation for realistic dashboard-like data
✅ Full sensor simulation with coffee drying physics
✅ Alternative to Django direct connection
"""
from machine import Pin, PWM, SoftI2C, ADC
import time, math
import dht
import wifi
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
from umqtt.simple import MQTTClient
import uasyncio as asyncio
import ntptime
import ujson
import random
# =======================
# 🌐 MQTT Focus Configuration (WORKS IN WOKWI!)
# =======================
# Note: Since Wokwi has HTTP/HTTPS limitations, we focus on MQTT
# MQTT to ThingSpeak works perfectly and can display data
# Coffee simulation parameters
INITIAL_MOISTURE = 48.5
TARGET_MOISTURE = 12.5
DRYING_RATE = 0.016
# Enhanced sensor parameters
rl10 = 50e3
gamma = 0.7
class AdvancedCoffeeSimulation:
"""Advanced coffee drying simulation with realistic physics"""
def __init__(self):
self.current_moisture = INITIAL_MOISTURE
self.drying_complete = False
self.total_cycles = 0
self.start_time = time.time()
def simulate_realistic_drying(self, temperature, humidity, light, cycle):
"""Simulate realistic coffee drying process"""
if self.drying_complete:
# Maintain target with small natural variations
return TARGET_MOISTURE + random.uniform(-0.18, 0.18)
# Advanced coffee drying physics
heat_effect = max(0, (temperature - 24) * 0.025) # Heat accelerates drying
light_effect = (light / 1000.0) * 0.014 # Light assists drying
humidity_resistance = max(0, (humidity - 42) * 0.012) # Humidity slows drying
time_progression = cycle * DRYING_RATE # Natural time progression
airflow_simulation = random.uniform(0.005, 0.015) # Simulated airflow
# Environmental factors
ambient_factor = random.uniform(-0.008, 0.010)
# Calculate total moisture loss
total_drying = (heat_effect + light_effect + time_progression +
airflow_simulation + ambient_factor - humidity_resistance)
# Apply drying
self.current_moisture = max(TARGET_MOISTURE, self.current_moisture - total_drying)
# Check if drying is complete
if self.current_moisture <= TARGET_MOISTURE + 0.4:
if not self.drying_complete:
print(f"🎉 Coffee drying process complete! Final moisture: {self.current_moisture:.1f}%")
self.drying_complete = True
# Add realistic measurement variation
measured = self.current_moisture + random.uniform(-0.22, 0.22)
return max(10.0, min(55.0, round(measured, 2)))
class EnhancedSensorSimulation:
"""Enhanced sensor simulation with realistic correlations"""
def __init__(self):
self.base_temp = 33.5
self.base_humidity = 53.0
self.temp_trend = 0.0
self.humidity_trend = 0.0
self.weather_pattern = random.choice(['sunny', 'cloudy', 'variable'])
def get_realistic_temperature(self, cycle):
"""Generate realistic temperature with daily patterns"""
# Daily temperature cycle (simulates day/night)
daily_cycle = math.sin(cycle * 0.016) * 4.2
# Slow temperature drift (weather changes)
self.temp_trend += random.uniform(-0.07, 0.07)
self.temp_trend = max(-2.0, min(2.0, self.temp_trend))
# Weather pattern effects
weather_effect = 0
if self.weather_pattern == 'sunny':
weather_effect = random.uniform(0, 1.5)
elif self.weather_pattern == 'cloudy':
weather_effect = random.uniform(-1.0, 0.5)
else: # variable
weather_effect = random.uniform(-0.8, 0.8)
# Random noise
noise = random.uniform(-1.0, 1.4)
# Calculate final temperature
temperature = self.base_temp + daily_cycle + self.temp_trend + weather_effect + noise
# Occasionally change weather pattern
if random.random() < 0.05: # 5% chance per cycle
self.weather_pattern = random.choice(['sunny', 'cloudy', 'variable'])
print(f"🌤️ Weather pattern changed to: {self.weather_pattern}")
return max(28.0, min(43.0, round(temperature, 1)))
def get_realistic_humidity(self, temperature, cycle):
"""Generate humidity with strong temperature correlation"""
# Strong inverse temperature relationship (physics-based)
temp_correlation = -(temperature - 33.0) * 1.4
# Humidity cycles (different from temperature)
humidity_cycle = math.cos(cycle * 0.021) * 5.0
# Humidity drift
self.humidity_trend += random.uniform(-0.14, 0.14)
self.humidity_trend = max(-4.0, min(4.0, self.humidity_trend))
# Weather pattern effects on humidity
weather_humidity = 0
if self.weather_pattern == 'sunny':
weather_humidity = random.uniform(-2.0, 0)
elif self.weather_pattern == 'cloudy':
weather_humidity = random.uniform(0, 3.0)
else: # variable
weather_humidity = random.uniform(-1.5, 1.5)
# Random variations
noise = random.uniform(-2.8, 2.8)
# Calculate final humidity
humidity = (self.base_humidity + temp_correlation + humidity_cycle +
self.humidity_trend + weather_humidity + noise)
return max(32.0, min(82.0, round(humidity, 1)))
def get_realistic_light(self, cycle):
"""Generate realistic light with day/night and weather"""
# Base day/night cycle
base_light = 500 + math.sin(cycle * 0.052) * 250
# Weather effects on light
weather_light_factor = 1.0
if self.weather_pattern == 'sunny':
weather_light_factor = random.uniform(1.1, 1.5)
elif self.weather_pattern == 'cloudy':
weather_light_factor = random.uniform(0.5, 0.9)
else: # variable
weather_light_factor = random.uniform(0.7, 1.3)
# Random cloud effects
cloud_effect = random.uniform(0.8, 1.2)
# Random noise
noise = random.uniform(-50, 50)
# Calculate final light
light = (base_light * weather_light_factor * cloud_effect) + noise
return max(220, min(950, round(light, 1)))
def calculate_resistance():
"""Calculate resistance from ADC reading"""
value = adc.read()
voltage_ratio = value / (4095 - value)
return 10e3 * voltage_ratio
def calculate_lux(resistance):
"""Calculate lux from resistance"""
return 10 * math.pow(rl10/resistance, 1/gamma)
def send_enhanced_mqtt_data(temp, humidity, moisture, light, motion, cycle):
"""Send enhanced data to MQTT (ThingSpeak) with coffee-specific fields"""
if not mqtt_connected or not mqtt_client:
print("❌ MQTT not connected")
return False
try:
# Enhanced payload with coffee-specific data
payload = (f"field1={temp}&field2={humidity}&field3={moisture}&"
f"field4={light}&field5={1 if motion else 0}&"
f"field6={cycle}&field7={'1' if coffee_sim.drying_complete else '0'}&"
f"status=COFFEE_DRYER_CYCLE_{cycle}")
mqtt_client.publish(publish_topic, payload.encode("utf-8"))
print(f"📡 MQTT Enhanced: Cycle {cycle}")
print(f" 📊 T:{temp}°C H:{humidity}% M:{moisture}% L:{light}lux")
print(f" ☕ Drying: {'Complete' if coffee_sim.drying_complete else 'In Progress'}")
return True
except Exception as e:
print(f"❌ MQTT send error: {e}")
return False
# Hardware initialization
ledGreen = Pin(12, Pin.OUT)
ledRed = Pin(13, Pin.OUT)
ledBlue = Pin(14, Pin.OUT)
dht22_sensor = dht.DHT22(Pin(32, Pin.IN))
pir_sensor = Pin(25, Pin.IN)
servo = PWM(Pin(26), freq=50, duty=0)
adc = ADC(Pin(34))
adc.atten(ADC.ATTN_11DB)
# LCD Setup
I2C_ADDR = 0x27
totalRows = 2
totalColumns = 16
i2c = SoftI2C(scl=Pin(18), sda=Pin(19), freq=100000)
lcd = I2cLcd(i2c, I2C_ADDR, totalRows, totalColumns)
# Initialize simulation engines
coffee_sim = AdvancedCoffeeSimulation()
sensor_sim = EnhancedSensorSimulation()
def enhanced_startup():
"""Enhanced startup sequence"""
lcd.clear()
lcd.move_to(0, 0)
lcd.putstr("Coffee Dryer")
lcd.move_to(0, 1)
lcd.putstr("MQTT Enhanced")
time.sleep(2)
lcd.clear()
lcd.move_to(0, 0)
lcd.putstr("Initializing...")
lcd.move_to(0, 1)
lcd.putstr("ThingSpeak MQTT")
time.sleep(1)
def advanced_motion_control(motion_detected):
"""Advanced motion detection and door control"""
if motion_detected:
print("🚶 Advanced motion detection triggered")
lcd.clear()
lcd.move_to(0, 0)
lcd.putstr("Motion Detected!")
lcd.move_to(0, 1)
lcd.putstr("Smart Door Open")
# Smooth door opening sequence
for duty in range(500000, 1500000, 50000):
servo.duty_ns(duty)
time.sleep(0.1)
ledGreen.on()
time.sleep(3)
# Smooth door closing
for duty in range(1500000, 500000, -50000):
servo.duty_ns(duty)
time.sleep(0.1)
else:
ledGreen.off()
def advanced_lighting_control(lux):
"""Advanced ambient lighting control with 5 levels"""
if lux < 160:
# Very dark - blue only (night mode)
ledRed.off()
ledGreen.off()
ledBlue.on()
return "Very Dark"
elif lux < 350:
# Dark - green only
ledRed.off()
ledGreen.on()
ledBlue.off()
return "Dark"
elif lux < 550:
# Normal - green + red
ledRed.on()
ledGreen.on()
ledBlue.off()
return "Normal"
elif lux < 750:
# Bright - all LEDs
ledRed.on()
ledGreen.on()
ledBlue.on()
return "Bright"
else:
# Very bright - all LEDs with special indication
ledRed.on()
ledGreen.on()
ledBlue.on()
return "Very Bright"
# WiFi connection
wifi.connect_ap()
enhanced_startup()
# NTP time synchronization
try:
ntptime.settime()
print("✅ Time synchronized")
except Exception as e:
print(f"⚠️ Time sync failed: {e}")
# Enhanced MQTT setup
def setup_enhanced_mqtt():
"""Setup enhanced MQTT connection to ThingSpeak"""
try:
client_id = "GhghMzQEKQY7Ay08MQA4HAM"
username = "GhghMzQEKQY7Ay08MQA4HAM"
password = "UPZLKo0pl+6M2lQenkNbVxnv"
server = "mqtt3.thingspeak.com"
client = MQTTClient(client_id=client_id, server=server, user=username, password=password)
client.set_callback(enhanced_mqtt_callback)
client.connect()
client.subscribe(b"channels/2387122/subscribe/fields/field5")
global publish_topic, mqtt_client, mqtt_connected
publish_topic = b"channels/2387122/publish"
mqtt_client = client
mqtt_connected = True
print("✅ Enhanced MQTT Connected to ThingSpeak")
return client
except Exception as e:
print(f"❌ Enhanced MQTT setup failed: {e}")
return None
def enhanced_mqtt_callback(topic, msg):
"""Enhanced MQTT command handler"""
try:
command = msg.decode("utf-8").strip()
print(f"📨 Enhanced MQTT command: {command}")
# Basic LED commands
if command == "door open":
advanced_motion_control(True)
elif command == 'green on':
ledGreen.on()
elif command == 'green off':
ledGreen.off()
elif command == 'red on':
ledRed.on()
elif command == 'red off':
ledRed.off()
elif command == 'blue on':
ledBlue.on()
elif command == 'blue off':
ledBlue.off()
elif command == 'all on':
ledGreen.on()
ledRed.on()
ledBlue.on()
elif command == 'all off':
ledGreen.off()
ledRed.off()
ledBlue.off()
# Coffee-specific commands
elif command == 'reset_drying':
coffee_sim.current_moisture = INITIAL_MOISTURE
coffee_sim.drying_complete = False
print("🔄 Coffee drying process reset")
elif command == 'force_complete':
coffee_sim.current_moisture = TARGET_MOISTURE
coffee_sim.drying_complete = True
print("✅ Coffee drying marked complete")
except Exception as e:
print(f"❌ Enhanced MQTT callback error: {e}")
# Initialize enhanced MQTT
mqtt_client = setup_enhanced_mqtt()
mqtt_connected = mqtt_client is not None
# Main enhanced monitoring task
async def enhanced_monitoring_system():
"""Enhanced monitoring system focusing on MQTT data transmission"""
cycle_count = 1
mqtt_success_count = 0
print("🚀 Enhanced Coffee Dryer - MQTT Focus Edition Starting...")
print(f"📡 MQTT Target: ThingSpeak Channel 2387122")
print(f"☕ Advanced coffee drying physics simulation")
print(f"🌡️ Enhanced sensor correlations and weather patterns")
print(f"💡 Note: Focuses on MQTT since Wokwi has HTTP limitations")
print("=" * 70)
while True:
try:
# Generate advanced sensor readings
temperature = sensor_sim.get_realistic_temperature(cycle_count)
humidity = sensor_sim.get_realistic_humidity(temperature, cycle_count)
light_lux = sensor_sim.get_realistic_light(cycle_count)
# Calculate advanced coffee moisture
moisture_content = coffee_sim.simulate_realistic_drying(temperature, humidity, light_lux, cycle_count)
# Motion detection
motion_detected = pir_sensor.value()
# Handle motion with advanced control
if motion_detected:
advanced_motion_control(motion_detected)
# Advanced lighting control
light_state = advanced_lighting_control(light_lux)
# Enhanced status display
print("=" * 55)
print(f"📊 Cycle {cycle_count} | MQTT Success: {mqtt_success_count}")
print(f"🌡️ Temperature: {temperature}°C ({'Daily cycle' if cycle_count % 30 < 15 else 'Night cycle'})")
print(f"💧 Humidity: {humidity}% (Correlated with temp)")
print(f"☕ Coffee Moisture: {moisture_content}% (Advanced physics)")
print(f"💡 Light: {light_state} ({light_lux} lux)")
print(f"🌤️ Weather: {sensor_sim.weather_pattern.title()}")
print(f"🚶 Motion: {'Detected' if motion_detected else 'None'}")
print(f"⏱️ Coffee Status: {'Drying Complete! ✅' if coffee_sim.drying_complete else 'Drying in Progress 🔄'}")
# Enhanced LCD display
lcd.clear()
lcd.move_to(0, 0)
lcd.putstr(f"T:{temperature}C M:{moisture_content}%")
lcd.move_to(0, 1)
status_text = "DONE" if coffee_sim.drying_complete else "WORK"
weather_icon = "S" if sensor_sim.weather_pattern == "sunny" else "C" if sensor_sim.weather_pattern == "cloudy" else "V"
lcd.putstr(f"H:{humidity}% {status_text}{weather_icon}")
# Send enhanced data to MQTT (ThingSpeak)
mqtt_success = send_enhanced_mqtt_data(temperature, humidity, moisture_content,
light_lux, motion_detected, cycle_count)
if mqtt_success:
mqtt_success_count += 1
print(f"📈 MQTT TOTAL SUCCESS: {mqtt_success_count}")
# Show MQTT success on LCD
lcd.move_to(15, 1)
lcd.putstr("✓")
else:
print("📉 MQTT: Send failed")
lcd.move_to(15, 1)
lcd.putstr("✗")
# Display data summary for dashboard equivalent
if cycle_count % 10 == 0: # Every 10 cycles
print(f"📋 10-Cycle Summary:")
print(f" ☕ Coffee Progress: {INITIAL_MOISTURE:.1f}% → {moisture_content:.1f}%")
print(f" 🌡️ Avg Temp Range: {temperature-2:.1f}°C - {temperature+2:.1f}°C")
print(f" 💧 Humidity Stability: {'Good' if abs(humidity - 53) < 10 else 'Variable'}")
print(f" 📡 MQTT Reliability: {(mqtt_success_count/cycle_count)*100:.1f}%")
cycle_count += 1
except Exception as main_error:
print(f"❌ Enhanced monitoring error: {main_error}")
await asyncio.sleep_ms(5000) # 5-second monitoring cycle
# MQTT monitoring task
async def mqtt_monitoring_system():
"""Enhanced MQTT message monitoring"""
while True:
try:
if mqtt_connected and mqtt_client:
mqtt_client.check_msg()
except Exception as e:
print(f"❌ MQTT monitoring error: {e}")
await asyncio.sleep_ms(1000)
# System startup
print("🔧 Enhanced Coffee Dryer - MQTT Focus Edition")
print(f"📡 MQTT Status: {'✅ Connected' if mqtt_connected else '❌ Failed'}")
print("☕ Features:")
print(" ✅ Advanced coffee drying physics")
print(" ✅ Enhanced sensor correlations")
print(" ✅ Weather pattern simulation")
print(" ✅ MQTT data transmission to ThingSpeak")
print(" 💡 Dashboard-equivalent data via MQTT")
print("=" * 70)
# Start enhanced system
try:
loop = asyncio.get_event_loop()
loop.create_task(enhanced_monitoring_system())
loop.create_task(mqtt_monitoring_system())
loop.run_forever()
except Exception as system_error:
print(f"❌ System startup error: {system_error}")
except KeyboardInterrupt:
print("\n🛑 Enhanced system stopped")
print("📊 All sensor data has been sent to ThingSpeak via MQTT")
print("🔄 Restart to continue monitoring")