# main.py runs automatically after boot.py
# ### Machine Sleep
# print("going to sleep")
# machine.deepsleep(10000)
# machine.lightsleep(10000)
# import toilet_flush
import network
import utime as time
from machine import Pin
import uasyncio as asyncio
import ntptime # time sinc
import urequests
import ujson
# Status LED on GPIO 2, configured as an output.
led = Pin(2, Pin.OUT)
# Text appended to the geolocation URL in get_time(); starts empty.
# NOTE(review): no visible code rebinds this module-level name (wifi_connect
# assigns a local of the same name) — confirm whether that is intended.
ip_address = ""
async def led_blink(cycles=0, n=1, on=0.1, off=0.1, cool=0.5, led=Pin(2, Pin.OUT)):
    """Blink an LED in bursts.

    Args:
        cycles: number of bursts to emit; 0 means blink forever.
        n: flashes per burst.
        on: seconds the LED stays lit for each flash.
        off: seconds the LED stays dark after each flash.
        cool: dark time that follows the last flash of a burst; because
            ``off`` has already elapsed, only ``cool - off`` is added.
        led: pin object to drive (defaults to GPIO 2 as an output).
    """
    led.off()
    bursts_done = 0
    while cycles == 0 or bursts_done < cycles:
        # Only count bursts when a finite number was requested.
        if cycles != 0:
            bursts_done += 1
        for _ in range(n):
            led.on()
            await asyncio.sleep(on)
            led.off()
            await asyncio.sleep(off)
        await asyncio.sleep(cool - off)
    # Leave the LED dark once all bursts are finished.
    led.off()
def log_event(events=(), filename="logs.txt"):
    """Append one tab-separated record to a log file.

    Args:
        events: iterable of fields for the record. Each field is converted
            with ``str()`` first, so numeric values (for example the raw
            ``time.time()`` value that ``get_time()`` returns on failure)
            are logged instead of raising ``TypeError``.
        filename: path of the log file; it is opened in append mode.

    An empty ``events`` writes an empty line.
    """
    record = '\t'.join(str(field) for field in events)
    # The context manager closes the file; no explicit close() is needed.
    with open(filename, 'a') as log_file:
        log_file.write(record + '\n')
def get_time():
    """Return the current local time, looked up over HTTP.

    Two blocking requests are made: ip-api.com is asked for the timezone
    (the module-level ``ip_address`` is appended to the URL), then
    worldtimeapi.org is asked for the current ``datetime`` in that zone.

    Returns:
        A ``"YYYY-MM-DD HH:MM:SS"`` string on success. On any failure
        (network error, missing key, malformed timestamp) the raw value of
        ``time.time()`` is returned instead, so callers must not assume a
        string.
    """
    try:
        ### Getting location
        response = urequests.get("http://ip-api.com/json/" + ip_address)
        try:
            timezone_name = ujson.loads(response.content)["timezone"]
        finally:
            # Responses own a socket; close it even if parsing fails.
            response.close()

        ### Getting local time
        response = urequests.get("http://worldtimeapi.org/api/timezone/" + timezone_name)
        try:
            stamp = ujson.loads(response.text)["datetime"]
        finally:
            response.close()

        # Slice the fixed-position fields out of the timestamp
        # (YYYY-MM-DDTHH:MM:SS...) and round-trip them through
        # mktime/localtime to obtain a time tuple.
        tm = time.localtime(time.mktime((
            int(stamp[0:4]), int(stamp[5:7]), int(stamp[8:10]),
            int(stamp[11:13]), int(stamp[14:16]), int(stamp[17:19]),
            0, 0, -1)))
        return "{:04}-{:02}-{:02} {:02}:{:02}:{:02}".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5])
    except Exception:
        # Best effort: fall back to the device clock.
        return time.time()
async def wifi_connect(ssid="", password=""):
    """Bring up the station interface, connect, and report the connection.

    Args:
        ssid: network name; when empty, the first line of ``wifi.txt`` is used.
        password: network password; when empty, the second line of
            ``wifi.txt`` is used.

    After connecting, the interface configuration is printed, the LED is
    blinked, and the external IP is fetched and logged on a best-effort
    basis. The coroutine then sleeps 10 seconds and returns.

    Raises:
        OSError: if ``wifi.txt`` is needed but cannot be opened.
        IndexError: if ``wifi.txt`` has fewer lines than required.
    """
    # Only touch the credentials file when an argument was left empty.
    if ssid == "" or password == "":
        with open('wifi.txt', 'r') as f:
            lines = f.readlines()
        if ssid == "":
            ssid = str(lines[0]).strip()
        if password == "":
            password = str(lines[1]).strip()

    wifi = network.WLAN(network.STA_IF)
    # Power-cycle the interface before connecting.
    wifi.active(False)
    await asyncio.sleep(1)
    wifi.active(True)

    while not wifi.isconnected():
        wifi.connect(ssid, password)
        try:
            # Static address configuration; failures here are ignored.
            wifi.ifconfig(('192.168.1.30', '255.255.255.0', '192.168.1.254', '192.168.1.254'))
        except Exception:
            pass
        # Yield to the scheduler so the button/sensor tasks keep running
        # while the connection is being established.
        await asyncio.sleep(0.5)

    print('\rConnected to {0}'.format(ssid))
    # NOTE(review): the original bound wifi.ifconfig()[0] to a *local*
    # ``ip_address`` that nothing read; the module-level ``ip_address`` used
    # by get_time() is left untouched here as before. Confirm whether a
    # ``global`` was intended.
    for item in wifi.ifconfig():
        print(item)

    try:
        asyncio.create_task(led_blink(3))
        await asyncio.sleep(3)
        response = urequests.get('http://myexternalip.com/raw')
        try:
            ext_ip_address = response.text
        finally:
            # Responses own a socket; release it.
            response.close()
        print("Connected, external IP:\n{0}".format(ext_ip_address))
        log_event([get_time(), "Connected", ext_ip_address])
    except Exception as exc:
        # Reporting is best effort; do not take the task down.
        print("Connection report failed: {0}".format(exc))
    await asyncio.sleep(10)
# --- Pins ---
# Original note: "initial value = 1".
# Status LED on GPIO 2 (same pin as the earlier `led`), switched off at start-up.
led = Pin(2, Pin.OUT)
led.off()
# Inputs with pull-ups enabled; the polling coroutines treat a reading of 0 as active.
sensor = Pin(23, Pin.IN, Pin.PULL_UP) # GPIO 23; check_sensor() flags presence when it reads 0
# flush() keeps each motor running while this input reads 1.
switch = Pin(22, Pin.IN, Pin.PULL_UP)
btn = Pin(33, Pin.IN, Pin.PULL_UP) # GPIO 33; check_btn() flags a push when it reads 0
# mic = Pin(25, Pin.IN)
# Original note: "initial value 0".
# Motor outputs; both are switched on at start-up and toggled by flush().
# NOTE(review): presumably the motor driver is active-low, so on() means idle — confirm.
motor_push = Pin(19, Pin.OUT)
motor_push.on()
motor_pull = Pin(18, Pin.OUT)
motor_pull.on()
# --- Timing, in seconds (the values are passed to asyncio.sleep or compared
# against a once-per-second counter). The trailing numbers are the original
# notes; presumably alternative values — confirm. ---
time_freqvency_check = 0.01 # polling interval of the check_* loops and listening(); note: 1
time_check_presence = 2 # not read by any code visible in this file; note: 10
time_delay_before = 20 # seconds without presence before a sensor-triggered flush; note: 60
time_flush_duration = 10 # added to time_between_flushes for the wait after each flush; note: 10
time_between_flushes = 20 # flush() pads its own run time up to this value; note: 30
time_cool_down = 20 # not read by any code visible in this file; note: 30
flush_times = 2 # flushes per sensor-triggered action; note: 2
# --- Flags shared between the coroutines ---
# Set by check_btn(), cleared by listening().
button_pushed = False
# Set by check_sensor(), cleared by listening().
sensor_presence = False
# Set and cleared by listening(); check_busy() mirrors it on the LED.
busy = False
async def _run_motor_until_switch(motor, timeout=3):
    """Toggle *motor*, wait for the `switch` input to leave 1, toggle it back.

    The wait is bounded: once more than ``timeout`` seconds have passed the
    motor is toggled back regardless of the switch reading.
    """
    motor.value(not motor.value())
    # Fixed head start before the switch is polled.
    await asyncio.sleep(0.5)
    started = time.time()
    # Keep waiting only while the switch still reads 1 AND the timeout has
    # not expired. The original used `or ... > 3`, which made the loop
    # endless after 3 s and left the motor toggled.
    while switch.value() == 1 and time.time() - started <= timeout:
        await asyncio.sleep(0.01)
    motor.value(not motor.value())


async def flush(hold=2):
    """Run one flush: push motor, hold, pull motor, then pad the cycle.

    Args:
        hold: seconds to wait between the push and the pull movement.

    The coroutine prints a "Flush" line at the start and a "CoolDn" line at
    the end. After both movements it sleeps for whatever is left of
    ``time_between_flushes`` since the start.
    """
    print("{0} Flush".format(get_time()))
    time_init = time.time()

    await _run_motor_until_switch(motor_push)
    await asyncio.sleep(hold)  # hold the button
    await _run_motor_until_switch(motor_pull)

    # Never request a negative sleep if the movements overran the budget.
    remaining = time_between_flushes - (time.time() - time_init)
    await asyncio.sleep(max(0, remaining))
    print("{0} CoolDn".format(get_time()))
async def check_busy():
    """Watch the ``busy`` flag and report every change.

    Each time the flag differs from the last reported value, a "Busy" or
    "Ready" line is printed and the LED is switched on or off to match.
    Polls every ``time_freqvency_check`` seconds, forever.
    """
    global busy
    last_reported = busy
    while True:
        if busy != last_reported:
            if not busy:
                print("{0} Ready".format(get_time()))
                led.off()
            else:
                print("{0} Busy".format(get_time()))
                led.on()
            last_reported = busy
        await asyncio.sleep(time_freqvency_check)
async def check_btn():
    """Poll the button and latch ``button_pushed`` when it reads 0.

    The flag is only set here, never cleared; a message is printed once per
    latch. Polls every ``time_freqvency_check`` seconds, forever.
    """
    global button_pushed
    while True:
        pressed = btn.value() == 0
        if pressed and not button_pushed:
            print("{0} Button Pushed".format(get_time()))
            button_pushed = True
        await asyncio.sleep(time_freqvency_check)
async def check_sensor():
    """Poll the sensor and latch ``sensor_presence`` when it reads 0.

    The flag is only set here, never cleared; a message is printed once per
    latch. Polls every ``time_freqvency_check`` seconds, forever.
    """
    global sensor_presence
    while True:
        triggered = sensor.value() == 0
        if triggered and not sensor_presence:
            print("{0} Presence detedted".format(get_time()))
            sensor_presence = True
        await asyncio.sleep(time_freqvency_check)
async def listening():
    """React to the latched button and sensor flags by scheduling flushes.

    Runs forever, waking every ``time_freqvency_check`` seconds:

    * Button: wait for the button to be released, schedule one flush, log
      it, wait ``time_between_flushes + time_flush_duration`` seconds, then
      clear ``button_pushed``, ``sensor_presence`` and ``busy``.
    * Sensor (only when not busy): wait until the sensor has not read 0 for
      ``time_delay_before`` consecutive seconds, then schedule
      ``flush_times`` flushes with the same wait after each. A button push
      during the wait aborts the sensor action.
    """
    global button_pushed, sensor_presence, busy

    while True:
        await asyncio.sleep(time_freqvency_check)

        if button_pushed:
            # Button action
            busy = True
            # Wait until the button is released.
            while btn.value() == 0:
                await asyncio.sleep(0.1)
            print("{0} Acting Button".format(get_time()))
            asyncio.create_task(flush())
            log_event([get_time(), "Flush_Button"])
            await asyncio.sleep(time_between_flushes + time_flush_duration)
            button_pushed = False
            sensor_presence = False
            busy = False
        elif sensor_presence and not busy:
            # Sensor action
            busy = True
            # Count seconds without presence; any detection restarts the count.
            quiet_seconds = 0
            while quiet_seconds < time_delay_before and not button_pushed:
                if sensor.value() == 0:
                    quiet_seconds = 0
                await asyncio.sleep(1)
                quiet_seconds += 1
            # A button push during the wait takes priority over the sensor.
            if not button_pushed:
                print("{0} Acting Sensor".format(get_time()))
                for _ in range(flush_times):
                    asyncio.create_task(flush())
                    log_event([get_time(), "Flush_Sensor"])
                    await asyncio.sleep(time_between_flushes + time_flush_duration)
            busy = False
            sensor_presence = False
async def main():
    """Run the Wi-Fi, polling, listening and status coroutines together.

    The coroutines are gathered; if the gather ever completes, a fresh set
    is created and gathered again.
    """
    while True:
        jobs = (
            wifi_connect(),
            check_btn(),
            check_sensor(),
            listening(),
            check_busy(),
        )
        await asyncio.gather(*jobs)
# Entry point: schedule main() on the event loop and run the loop indefinitely.
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()