import network
import urequests as requests
import time
import ujson
import random
# WiFi credentials
SSID = "Wokwi-GUEST"
PASSWORD = ""
# Firebase/Firestore Configuration
FIREBASE_PROJECT_ID = "projectx-370c2"
FIREBASE_API_KEY = "AIzaSyBre0u4WuqQJyiKyEmDepHbBCSsVRRT5tY"
FIRESTORE_BASE_URL = f"https://firestore.googleapis.com/v1/projects/{FIREBASE_PROJECT_ID}/databases/(default)/documents"
ACCESS_TOKEN = "eyJhbGciOiJSUzI1NiIsImtpZCI6ImUzZWU3ZTAyOGUzODg1YTM0NWNlMDcwNTVmODQ2ODYyMjU1YTcwNDYiLCJ0eXAiOiJKV1QifQ.eyJpc3MiOiJodHRwczovL3NlY3VyZXRva2VuLmdvb2dsZS5jb20vcHJvamVjdHgtMzcwYzIiLCJhdWQiOiJwcm9qZWN0eC0zNzBjMiIsImF1dGhfdGltZSI6MTc1NzA5Nzk1MywidXNlcl9pZCI6IkpmUDJTV1ZPQW1nMk5SYU9TYTQzSkkzS2NWcDEiLCJzdWIiOiJKZlAyU1dWT0FtZzJOUmFPU2E0M0pJM0tjVnAxIiwiaWF0IjoxNzU3MDk3OTUzLCJleHAiOjE3NTcxMDE1NTMsImVtYWlsIjoib2xhc3VyZEB5YWhvby5jb20iLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsImZpcmViYXNlIjp7ImlkZW50aXRpZXMiOnsiZW1haWwiOlsib2xhc3VyZEB5YWhvby5jb20iXX0sInNpZ25faW5fcHJvdmlkZXIiOiJwYXNzd29yZCJ9fQ.h681seMKtr_cg5fO56YcnJ9Au2fhTpOPbaSPpvhPzaOnZLPGzGWvNxUS0MxKb-WVRVYPAebXbPGIqQdfRVMGvGmtDnsufCBs5nSJTL9PsX0Jf4m-Mv0vjjKQ1yOw_QB0MB76x0yUkTahSZAxD2NZ1KeNed-ZqkT1geZ2MxKhG7YbplA87818w7gU7gKHoL9z3a-T1jJLa4e4h_Ak4EFdTR5aQsRWnf236o6TtNGFSzd6zmzm_2rCOs9K9uXnvX97fC-Vr_O25QkTRCIPp-i1icJavuDM62n51KS_sdlfD1p-eN1JwNacIgUXS3-994uQcLkPAVRy8saWa0gyGIfMoA"
# Create or update a document
def set_doc(collection, doc_id, data):
endpoint = f"{collection}/{doc_id}"
payload = {
"fields": {
key: {"stringValue": str(value)} for key, value in data.items()
}
}
return make_request("POST", endpoint, data=payload)
# Retrieve a document
def get_doc(collection, doc_id):
endpoint = f"{collection}/{doc_id}"
response = make_request("GET", endpoint)
if response:
fields = response.get("fields", {})
data = {key: fields[key]["stringValue"] for key in fields}
return data
return None
# Update specific fields in a document
def update_doc(collection, doc_id, data):
endpoint = f"{collection}/{doc_id}"
update_mask = "&".join(f"updateMask.fieldPaths={key}" for key in data.keys())
payload = {
"fields": {
key: {"stringValue": str(value)} for key, value in data.items()
}
}
response = make_request("PATCH", f"{endpoint}?{update_mask}", data=payload)
return response
def make_request(method, endpoint, data=None):
response = None
url = f"{FIRESTORE_BASE_URL}/{endpoint}?key={FIREBASE_API_KEY}"
headers = {
'Content-Type': 'application/json',
'User-Agent': 'MicroPython-ESP32'
}
try:
print("Connecting to firestore")
if method == "GET":
response = requests.get(url=url, headers=headers)
elif method == "POST":
response = requests.post(url=url,headers=headers, data = data)
print(response.json())
elif method == "PATCH":
response = requests.patch(url=url,headers=headers, data = data)
else:
print("Method Not Allowed.")
return None
print(f"Response Status: {response.status_code}")
if response.status_code in [200, 201]:
result = ujson.loads(response.text)
return result
else:
print(f"Failed Response: {response.text} \nStatus Code: {response.status_code}")
response.close()
return None
except Exception as e:
print(f"urequests error: {e}")
if response:
response.close()
return None
# Simulation state
state = {
"id": "WARD-A",
"name": "Ward A - General Medicine",
"bin_id": "BIN-A1",
"bin_height": 100,
"sprays_today": 0,
"last_drop": time.time(),
"sim_time": 0,
"waste_level": 5,
"accumulation_rate": 0.5,
"collected": False
}
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
print("Connecting to WiFi", end="")
while not wlan.isconnected():
print(".", end="")
time.sleep(0.2)
print("\nWiFi connected!")
print("IP address:", wlan.ifconfig()[0])
def simulate_timestamp(state):
state["sim_time"] += 5
hours = 2 + (state["sim_time"] // 3600)
minutes = 46 + ((state["sim_time"] % 3600) // 60)
seconds = state["sim_time"] % 60
return f"2025-09-04T{hours:02d}:{minutes:02d}:{seconds:02d}Z"
def get_gas_ppm():
return random.uniform(50, 500) + random.uniform(-10, 10)
def get_distance(state):
return state["bin_height"] - (state["waste_level"] / 100 * state["bin_height"]) + random.uniform(-2, 2)
# API function
def send_data_urequests(data=None, method="GET"):
response = None
url = "https://hwams.vercel.app/api/robot"
headers = {
'Content-Type': 'application/json',
'User-Agent': 'MicroPython-ESP32',
'Connection': 'close'
}
try:
if method == "GET":
print("Attempting GET request...")
response = urequests.get(url, headers=headers, timeout=10)
result = ujson.loads(response.text)
return result
else:
print("Attempting POST request...")
response = urequests.post(url, json=data, headers=headers, timeout=10)
result = ujson.loads(response.text)
return result
except Exception as e:
print(f"urequests error: {e}")
return None
# Firestore helper function to convert Python data to Firestore format
def python_to_firestore(data):
"""Convert Python dictionary to Firestore document format"""
firestore_data = {}
for key, value in data.items():
if isinstance(value, str):
firestore_data[key] = {"stringValue": value}
elif isinstance(value, int):
firestore_data[key] = {"integerValue": str(value)}
elif isinstance(value, float):
firestore_data[key] = {"doubleValue": value}
elif isinstance(value, bool):
firestore_data[key] = {"booleanValue": value}
elif isinstance(value, list):
firestore_data[key] = {
"arrayValue": {
"values": [{"stringValue": str(item)} for item in value]
}
}
else:
firestore_data[key] = {"stringValue": str(value)}
return {"fields": firestore_data}
def save_sensor_data_to_firestore(data):
"""Save sensor data to Firestore"""
collection = "wards"
document_id = f"{data['id']}"
# Add metadata
firestore_data = data.copy()
firestore_data["created_at"] = data["timestamp"]
firestore_data["device_type"] = "waste_bin_sensor"
return firestore_update_document(collection, document_id=document_id, data=firestore_data)
def update_bin_status_in_firestore(document_id, status_data):
"""Update bin status in Firestore"""
collection = "wards"
return update_doc(collection, document_id, status_data)
def log_alert_to_firestore(alert_data):
"""Log alert events to Firestore"""
collection = "alerts"
document_id = f"alert_{int(time.time())}"
alert_record = {
"bin_id": alert_data["bin_id"],
"alert_type": "disinfectant_spray" if alert_data["alert"] else "normal",
"timestamp": alert_data["timestamp"],
"aqi": alert_data["aqi"],
"disinfectant_level": alert_data["disinfectant_level"],
"waste_level": alert_data["waste_level"]
}
return set_doc(collection, document_id, alert_record)
def main():
connect_wifi()
cycle_count = 0
if network.WLAN(network.STA_IF).isconnected():
while True:
timestamp = simulate_timestamp(state)
# Simulate waste accumulation
if not state["collected"]:
state["waste_level"] += state["accumulation_rate"]
if state["waste_level"] > 100:
state["waste_level"] = 100
print(f"Bin is Full: {state['waste_level']}% at {timestamp}")
print(f"Current waste level: {state['waste_level']}% at {timestamp}")
# set_doc("wards", "waste-level", {"timestamp": timestamp, "level":state['waste_level']})
distance = get_distance(state)
gas_ppm = get_gas_ppm()
aqi = gas_ppm * 0.5
disinfectant = random.uniform(10, 90) + random.uniform(-5, 5)
alert = False
if aqi > 200 or disinfectant < 20:
alert = True
print("Trigger servo motor: disinfectant released")
state["sprays_today"] += 1
data = {
"id":state["id"],
"bin_id": state["bin_id"],
"waste_level": state["waste_level"],
"distance": distance,
"gas_ppm": gas_ppm,
"aqi": aqi,
"disinfectant_level": disinfectant,
"alert": alert,
"sprays_today": state["sprays_today"],
"last_drop": state["last_drop"],
"timestamp": timestamp
}
result = get_doc("/wards/WARD-A","waste_level")
if result:
print(f"Updated document data: {result}")
else:
print("No document found")
# 2. Update bin status
# bin_status = {
# "current_level": state["waste_level"],
# "last_reading": timestamp,
# "status": "full" if state["waste_level"] >= 100 else "active",
# "sprays_today": state["sprays_today"]
# }
# update_bin_status_in_firestore(state["bin_id"], bin_status)
# # 3. Log alerts when they occur
# if alert:
# log_alert_to_firestore(data)
# # 4. Periodic cleanup (every 20 cycles)
# cycle_count += 1
# if cycle_count % 20 == 0:
# cleanup_old_sensor_data()
# # 5. Demonstrate CRUD operations (run once for testing)
# if cycle_count == 1:
# demonstrate_firestore_operations()
time.sleep(2)
cycle_count += 1
else:
print("WiFi Disconnected")
# Run the main loop
main()