"""Internet Coffee with Django API"""
from machine import Pin, PWM, SoftI2C, ADC
import time, math
import dht
import wifi
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
import urequests as requests
from umqtt.simple import MQTTClient
import uasyncio as asyncio
import ntptime
# =======================
# Django API Endpoint
# =======================
# Public tunnel URL for the Django API (currently a trycloudflare.com quick
# tunnel).  The hostname changes whenever the tunnel is restarted, so update
# it here each time.  The string must not carry leading/trailing whitespace:
# the HTTP client parses the scheme from the very first characters.
serverUrl = "https://nervous-wants-our-health.trycloudflare.com/api/sensor-data/"

# LDR model constants used by calculate_lux():
#   lux = 10 * (rl10 / resistance) ** (1 / gamma)
rl10 = 50e3   # resistance at which the formula yields exactly 10 lux
gamma = 0.7   # exponent parameter of the resistance/illuminance curve
def calculate_resistance():
    """Return the sensor resistance derived from the current ADC reading.

    The raw reading from the module-level ``adc`` is turned into the ratio
    ``value / (4095 - value)`` and scaled by 10e3 (presumably the 10 kOhm
    fixed resistor of the voltage divider -- confirm against the wiring).

    The reading is clamped to 1..4094 so that neither end of the ADC range
    can break the conversion: 4095 would divide by zero here, and 0 would
    produce a zero resistance that calculate_lux() divides by.
    """
    value = min(max(adc.read(), 1), 4094)
    voltage_ratio = value / (4095 - value)
    return 10e3 * voltage_ratio
def calculate_lux(resistance):
    """Convert a sensor resistance into a lux value.

    Evaluates ``10 * (rl10 / resistance) ** (1 / gamma)`` using the
    module-level constants ``rl10`` and ``gamma``.  ``resistance`` must be
    non-zero, since it is used as a divisor.
    """
    ratio = rl10 / resistance
    exponent = 1 / gamma
    return 10 * math.pow(ratio, exponent)
# Setup Pins
# Three indicator LEDs driven as digital outputs (GPIO 12, 13, 14).
ledGreen = Pin(12, Pin.OUT)
ledRed = Pin(13, Pin.OUT)
ledBlue = Pin(14, Pin.OUT)
# DHT22 driver on GPIO 32; task1 reads temperature and humidity from it.
dht22_sensor = dht.DHT22(Pin(32, Pin.IN))
# Digital input on GPIO 25 (presumably a PIR motion sensor, going by the
# name); task1 passes its value to automatic_door().
pir_sensor = Pin(25, Pin.IN)
# Servo signal on GPIO 26: 50 Hz PWM, starting with a duty of 0.
servo = PWM(Pin(26), freq=50, duty=0)
# Analog input on GPIO 34, sampled by calculate_resistance().
adc = ADC(Pin(34))
adc.atten(ADC.ATTN_11DB)  # select the 11 dB input attenuation setting
# Setup LCD
# I2C address the LCD driver is told to talk to.
I2C_ADDR = 0x27
# Software I2C bus on GPIO 18 (SCL) and GPIO 19 (SDA) at 100 kHz.
i2c = SoftI2C(scl=Pin(18), sda=Pin(19), freq=100000)
# NOTE(review): the trailing 2, 16 are presumably the number of rows and
# columns (a 16x2 display) -- confirm against the i2c_lcd driver.
lcd = I2cLcd(i2c, I2C_ADDR, 2, 16)
# Startup banner: "Internet Coffee" written at (0, 0), "System" at (0, 1).
# It stays on screen until a later lcd.clear() (e.g. in task1).
lcd.move_to(0, 0)
lcd.putstr("Internet Coffee")
lcd.move_to(0, 1)
lcd.putstr("System")
def automatic_door(value):
    """Drive the door servo and green LED from a motion reading.

    When ``value`` equals 1 the LCD shows a notice, the servo pulse width is
    set to 1.5 ms, the green LED is switched on and the function then sleeps
    for 4 seconds.  For any other value the green LED is switched off and
    the servo pulse width is set to 0.5 ms.

    NOTE: ``time.sleep`` is a blocking call, so the whole program (including
    other asyncio tasks) pauses for those 4 seconds.
    """
    if value != 1:
        # No motion reported: LED off, servo back to the 0.5 ms pulse.
        ledGreen.off()
        servo.duty_ns(500000)
        return

    # Motion reported: announce it, then move the servo to the 1.5 ms pulse.
    lcd.clear()
    lcd.putstr("Somebody is\ncomming...")
    print("Motion Detection")
    servo.duty_ns(1500000)
    ledGreen.on()
    time.sleep(4)
def light_state(lux):
    """Classify a lux value, set the three LEDs accordingly, return the label.

    Bands (upper bounds are exclusive):
        < 100   "Dark"         all LEDs off
        < 500   "Dim"          green only
        < 1100  "Light"        red + green
        < 3000  "Bright"       red + green + blue
        else    "Very Bright"  red + green + blue
    """
    # (exclusive upper bound, label, (red, green, blue) on/off pattern)
    bands = (
        (100, "Dark", (False, False, False)),
        (500, "Dim", (False, True, False)),
        (1100, "Light", (True, True, False)),
        (3000, "Bright", (True, True, True)),
    )

    # Fallback used when no band matches.
    label = "Very Bright"
    pattern = (True, True, True)
    for upper_bound, band_label, band_pattern in bands:
        if lux < upper_bound:
            label = band_label
            pattern = band_pattern
            break

    # Apply the pattern in red, green, blue order.
    for led, lit in zip((ledRed, ledGreen, ledBlue), pattern):
        if lit:
            led.on()
        else:
            led.off()
    return label
# Connect to WiFi
wifi.connect_ap()

# NTP sync (best effort: a failure is reported and startup continues)
try:
    ntptime.settime()
except Exception as err:
    print("Error syncing time:", err)
else:
    print("Time synchronized successfully.")
# MQTT Broker
# NOTE(review): these credentials are hard-coded in the source; consider
# loading them from a config file that is kept out of version control.
client_id = "GhghMzQEKQY7Ay08MQA4HAM"
user_name = "GhghMzQEKQY7Ay08MQA4HAM"
password = "UPZLKo0pl+6M2lQenkNbVxnv"
server = "mqtt3.thingspeak.com"


def on_mqtt_message(topic, msg):
    """Print every message delivered on a subscribed topic."""
    print("MQTT message:", topic, msg)


client = MQTTClient(client_id=client_id, server=server, user=user_name, password=password)
# umqtt.simple requires a callback to be registered before subscribe():
# without one, subscribe() fails its "callback is not set" assertion and
# check_msg() would have nothing to hand incoming messages to.
client.set_callback(on_mqtt_message)
client.connect()
client.subscribe(b"channels/2387122/subscribe/fields/field5")
publish_topic = b"channels/2387122/publish"
# =======================
# Task 1: Publish + Django POST
# =======================
async def task1():
    """Sample the sensors every 5 s and report to LCD, ThingSpeak and Django.

    Each cycle reads the DHT22 and the light sensor, runs the door logic on
    the PIR input, prints and displays the readings, publishes them over
    MQTT and POSTs them as JSON to ``serverUrl``.  A failed sensor read or
    network call is reported and the loop carries on, so a single transient
    error does not end the task.

    NOTE: automatic_door() and the HTTP POST are blocking calls; other
    asyncio tasks only run while this task is awaiting its sleep.
    """
    while True:
        try:
            dht22_sensor.measure()
        except Exception as e:
            # A DHT read can fail on a timeout or a bad checksum; skip this
            # cycle and try again on the next one.
            print("Error reading DHT22:", e)
            await asyncio.sleep_ms(5000)
            continue
        t = dht22_sensor.temperature()
        h = dht22_sensor.humidity()

        resistance = calculate_resistance()
        lux = round(calculate_lux(resistance), 1)

        motion_value = pir_sensor.value()
        automatic_door(motion_value)

        print("=====================================")
        print("Temperature:", t, "°C")
        print("Humidity:", h, "%")
        # light_state() also updates the LEDs as a side effect.
        print("Light state:", light_state(lux))

        # Print to LCD
        lcd.clear()
        lcd.putstr(f"Temp: {t}C\nHum: {h}%")

        # --- Send to ThingSpeak (MQTT) ---
        payload = f"field1={t}&field2={h}&field3={lux}&field4={motion_value}&status=MQTTPUBLISH"
        try:
            client.publish(publish_topic, payload.encode("utf-8"))
        except OSError as e:
            print("Error publishing to MQTT:", e)

        # --- Send to Django REST API ---
        data = {"temperature": t, "humidity": h, "moisture_content": lux}
        response = None
        try:
            response = requests.post(serverUrl, json=data)
            print("Sent to Django, response:", response.text)
        except Exception as e:
            print("Error sending to Django:", e)
        finally:
            # The response holds its socket until close(); without this the
            # device leaks a socket and its buffers on every cycle.
            if response is not None:
                response.close()

        await asyncio.sleep_ms(5000)
# =======================
# Task 2: Check MQTT msgs
# =======================
async def task2():
    """Poll the MQTT client for incoming messages, pausing 1 s between polls."""
    poll_interval_ms = 1000
    while True:
        client.check_msg()
        await asyncio.sleep_ms(poll_interval_ms)
# =======================
# Main Loop
# =======================
if __name__ == "__main__":
    # Schedule both tasks on the event loop, then run it indefinitely.
    loop = asyncio.get_event_loop()
    for task_factory in (task1, task2):
        loop.create_task(task_factory())
    loop.run_forever()