import urequests as requests
import json
import time
import network
from time import sleep
import gc
#import uzlib
from io import BytesIO
import ntptime
import machine
from machine import Pin, I2C, PWM
import ssd1306 # <<--- New import
from font import Font # <<--- New import
# --- Hardware pins ----------------------------------------------------------
# The buzzer is PWM-driven on GPIO12; GPIO26 is a plain output that is later
# driven low so it can serve as the buzzer's ground.
buzzer = PWM(Pin(12))
BUZZER_GND = Pin(26, Pin.OUT)

# Inputs polled by the script: `boot` ends the main loop when it reads low,
# `test` switches the alert fetcher to its built-in sample when it reads high.
# GPIO15 is the output paired with the `test` input.
boot = Pin(0, Pin.IN)
test = Pin(2, Pin.IN)
TEST_GND = Pin(15, Pin.OUT)

# print_text_static() alternates between these two offsets (used for both the
# x and the y coordinate); text_pos_index selects the current one.
text_pos = [10, 40]
text_pos_index = 0

# Upper bound on start-up retries (display bring-up below uses it).
max_retries = 5

# --- OLED display -----------------------------------------------------------
# Try to bring up the 128x64 SSD1306 over I2C, pausing one second between
# failed attempts. On success `i2c`, `display` and `f` are left as globals.
for attempt in range(1, max_retries + 1):
    try:
        # NOTE(review): freq=4000000 is 4 MHz, which is high for I2C -- confirm
        # this is intended rather than 400 kHz.
        i2c = I2C(scl=Pin(22), sda=Pin(21), freq=4000000)
        display = ssd1306.SSD1306_I2C(128, 64, i2c)
        f = Font(display)
        print("Successfully initialized the display!")
        break
    except Exception as e:
        print(f"Attempt {attempt}: Failed to initialize the display. Error: {e}")
        if attempt < max_retries:
            print("Retrying...")
            time.sleep(1)
        else:
            print("Max attempts reached. Giving up.")
def connect(timeout_s=15):
    """Join the strongest known Wi-Fi access point that is in range.

    Scans for access points, keeps those whose SSID has a stored password and
    whose signal is stronger than -100, and connects to the strongest one.

    Args:
        timeout_s: Maximum number of seconds to wait for the association to
            complete before giving up.

    Returns:
        True once the station reports it is connected; False when no known
        access point was found or the connection attempt timed out.
    """
    # NOTE(review): Wi-Fi passwords are stored in plain text in the source;
    # consider moving them to a separate file that is not shared.
    keys_ = {
        'AAMCAR': 'goodkarma',
        'Reuveni':'0522254848'
    }
    station = network.WLAN(network.STA_IF)
    sleep(0.2)
    station.active(True)

    best_ssid = None
    best_rssi = -100
    for ap in station.scan():
        # Each scan entry is indexed positionally: [0] is the SSID as bytes,
        # [3] is the signal strength used for ranking.
        try:
            ssid = ap[0].decode('utf-8')
        except UnicodeError:
            # An SSID that is not valid UTF-8 cannot be one of ours.
            continue
        if ssid in keys_ and ap[3] > best_rssi:
            best_ssid = ssid
            best_rssi = ap[3]

    if best_ssid is None:
        print('Did not connect')
        return False

    print(f'The best Access Point found is {best_ssid}')
    station.connect(best_ssid, keys_[best_ssid])

    # Poll instead of spinning forever so a wrong password or a vanished
    # access point cannot hang the board.
    waited = 0.0
    while not station.isconnected():
        if waited >= timeout_s:
            print(f'Timed out connecting to {best_ssid}')
            return False
        sleep(0.1)
        waited += 0.1

    print(f'Connected to {best_ssid}')
    return True
def play_buzz(frequency, duration):
    """Sound the buzzer at *frequency* Hz for *duration* seconds.

    Blocks for the whole duration. The buzzer is always silenced afterwards,
    even if the wait is interrupted (for example by Ctrl-C at the REPL).
    """
    buzzer.freq(frequency)
    buzzer.duty(512)  # 50% duty cycle
    try:
        sleep(duration)
    finally:
        buzzer.duty(0)  # Turn off
class RedAlert():
    """Client for the alert feed published at www.oref.org.il.

    Attributes:
        cookies: Whatever the landing-page response exposes as ``cookies``
            (stays ``""`` when the HTTP library does not provide it).
        headers: Request headers sent with every call.
    """

    # First two bytes of every gzip stream.
    _GZIP_MAGIC = b"\x1f\x8b"
    # UTF-8 byte-order mark that may prefix the feed body.
    _UTF8_BOM = b"\xef\xbb\xbf"
    # Canned payload returned in test mode; same shape as a live alert.
    _SAMPLE_ALERT = '''
{
"id": "133428799180000000",
"cat": "1",
"title": "ירי רקטות וטילים",
"data": [
"אור יהודה",
"אזור",
"חולון",
"בית דגן",
"חמד",
"משמר השבעה",
"גנות",
"גבעת שמואל",
"גת רימון",
"סביון",
"פתח תקווה",
"קריית אונו",
"רמת גן - מזרח",
"גני תקווה"
],
"desc": "היכנסו למרחב המוגן ושהו בו 10 דקות"
}
'''

    def __init__(self):
        """Set up the request headers and make a first request for cookies."""
        self.cookies = ""
        # Browser-like headers sent with every request.
        self.headers = {
            "Host":"www.oref.org.il",
            "Connection":"close",
            "Content-Type":"application/json",
            "charset":"utf-8",
            "X-Requested-With":"XMLHttpRequest",
            "sec-ch-ua-mobile":"?0",
            "User-Agent":"",
            "sec-ch-ua-platform":"macOS",
            "Accept":"*/*",
            "sec-ch-ua": '".Not/A)Brand"v="99", "Google Chrome";v="103", "Chromium";v="103"',
            "Sec-Fetch-Site":"same-origin",
            "Sec-Fetch-Mode":"cors",
            "Sec-Fetch-Dest":"empty",
            "Referer":"https://www.oref.org.il/12481-he/Pakar.aspx",
            "Accept-Encoding":"gzip, deflate, br",
            "Accept-Language":"en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        }
        self.get_cookies()

    def get_cookies(self):
        """Request the landing page and remember its cookies, if any.

        Best effort: a failed request is reported and otherwise ignored, and
        ``self.cookies`` is left unchanged when the response object has no
        ``cookies`` attribute.
        """
        HOST = "https://www.oref.org.il/"
        r = None
        try:
            r = requests.get(HOST, headers=self.headers)
            self.cookies = getattr(r, "cookies", self.cookies)
        except Exception as e:
            print(f"Could not fetch cookies: {e}")
        finally:
            # Always release the socket held by the response.
            if r is not None:
                r.close()

    def count_alerts(self, alerts_data):
        """Return the number of entries in *alerts_data*."""
        return len(alerts_data)

    @staticmethod
    def _gunzip(raw):
        """Return the decompressed content of the gzip stream *raw*.

        Uses whichever decompression module the running interpreter offers,
        imported lazily so the file loads even where one of them is missing.
        """
        try:
            import deflate
        except ImportError:
            deflate = None
        if deflate is not None:
            return deflate.DeflateIO(BytesIO(raw), deflate.GZIP).read()
        try:
            import uzlib
        except ImportError:
            import zlib
            return zlib.decompress(raw, 31)  # 31 selects the gzip container
        return uzlib.DecompIO(BytesIO(raw), 31).read()

    @staticmethod
    def _decode_body(raw):
        """Turn a raw response body into stripped text.

        The body is gunzipped only when it starts with the gzip magic number,
        so an uncompressed or empty body is accepted as well. A leading UTF-8
        byte-order mark is removed explicitly rather than through the
        'utf-8-sig' codec.
        """
        if raw[:2] == RedAlert._GZIP_MAGIC:
            raw = RedAlert._gunzip(raw)
        if raw[:3] == RedAlert._UTF8_BOM:
            raw = raw[3:]
        return raw.decode("utf-8").strip()

    def get_red_alerts(self):
        """Fetch the current alert payload.

        Returns:
            * ``None`` when the request fails or the status code is not 200;
            * the decoded response text when it is longer than one character;
            * otherwise the built-in sample payload when the ``test`` input
              reads 1, or the string ``'No alerts on'``.

        Raises:
            Whatever decompressing or decoding a malformed body raises.
        """
        URL = "https://www.oref.org.il/WarningMessages/alert/alerts.json"
        r = None
        try:
            r = requests.get(URL, headers=self.headers)
            if r.status_code != 200:
                print(f"HTTP error. Status code: {r.status_code}")
                return None
            body = r.content
        except Exception as e:
            print(f"Network or request error: {e}")
            return None
        finally:
            # Always release the socket held by the response.
            if r is not None:
                r.close()

        alerts = self._decode_body(body)
        if len(alerts) > 1:
            return alerts
        if test.value() == 1:
            # Test mode: hand back a sample encoded exactly like live data.
            return self._SAMPLE_ALERT
        return 'No alerts on'
def adjust_for_israel(dt):
    """Convert a UTC RTC reading to Israel local time.

    Args:
        dt: 8-tuple ``(year, month, day, weekday, hour, minute, second,
            subsecond)`` in UTC; the weekday and subsecond fields are ignored.

    Returns:
        ``(year, month, day, hour, minute, second)`` in local time: UTC+2 in
        winter, UTC+3 while daylight saving time is in effect.

    Daylight saving time runs from the Friday before the last Sunday of March
    at 02:00 local time until the last Sunday of October at 02:00 local time.
    The day-of-month formulas below are valid for the years 1900-2099.
    """
    year, month, day, _, hour, minute, second, _ = dt

    weekday_shift = year * 5 // 4
    last_sunday_march = 31 - (weekday_shift + 4) % 7
    last_sunday_october = 31 - (weekday_shift + 1) % 7

    # Express both transitions in UTC, because `dt` is UTC:
    #   start: Friday 02:00 local (UTC+2) is Friday 00:00 UTC;
    #   end:   Sunday 02:00 local (UTC+3) is Saturday 23:00 UTC.
    dst_start_utc = (3, last_sunday_march - 2, 0)
    dst_end_utc = (10, last_sunday_october - 1, 23)
    is_dst = dst_start_utc <= (month, day, hour) < dst_end_utc

    hour += 3 if is_dst else 2

    # The offset is at most 3 hours, so at most one day boundary is crossed.
    if hour >= 24:
        hour -= 24
        day += 1
        is_leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
        month_lengths = (31, 29 if is_leap else 28, 31, 30, 31, 30,
                         31, 31, 30, 31, 30, 31)
        if day > month_lengths[month - 1]:
            day = 1
            month += 1
            if month > 12:
                month = 1
                year += 1
    return (year, month, day, hour, minute, second)
# ******************************************************
# Create the RTC handle up front so it exists even if every network attempt
# below fails; the main loop reads the clock through it.
rtc = machine.RTC()

# Join Wi-Fi and set the RTC from an NTP server, retrying on any failure.
for attempt in range(1, max_retries + 1):
    try:
        connect()
        ntptime.settime()  # Sets the board's RTC using an NTP server
        print("Successfully connected and synchronized the clock!")
        break  # Exit the loop if successful
    except Exception as e:
        print(f"Attempt {attempt}: Failed to connect or synchronize the clock. Error: {e}")
        if attempt == max_retries:
            print("Max attempts reached. Giving up.")
        else:
            print("Retrying...")
            time.sleep(1)  # Wait for 1 second before retrying

# Alert client used by the main loop.
alert = RedAlert()
alert_data = {}
city_data = []
migun_time = 0
# Reclaim memory used during start-up before entering the main loop.
gc.collect()
# ... (rest of your code)
def print_text(name):
    """Scroll *name* across the top row of the display, right to left.

    The text starts at x=128 and moves one pixel per frame until it has
    travelled its full width (8 pixels per character) past x=0.
    """
    stop_x = -len(name) * 8
    x = 128
    while x > stop_x:
        display.fill(0)
        display.text(name, x, 0, 1)
        display.show()
        sleep(0.001)  # brief pause between frames
        x -= 1
def print_text_static(name):
    """Clear the display and draw *name* at the current static position.

    The same entry of ``text_pos`` is used for both x and y. Each call flips
    the global ``text_pos_index`` so the next call uses the other entry.
    """
    global text_pos_index
    offset = text_pos[text_pos_index]
    display.fill(0)
    display.text(name, offset, offset, 1)
    display.show()
    # Alternate between the two positions on successive calls.
    text_pos_index = not text_pos_index
# Main-loop state: whether to poll the alert feed on this pass, and how many
# idle passes have elapsed since polling was paused.
poll_for_alerts, counter = True, 0

# Drive the buzzer's return pin low so it behaves like GND.
BUZZER_GND.value(0)
# Drive the output paired with the test input high.
TEST_GND.value(1)
sleep(0.3)
def display_data(json_str):
    """Show the entries of a JSON payload's "data" list on the display.

    *json_str* is parsed as JSON and its "data" entries are drawn in two
    columns (x=0 and x=64), two entries per 10-pixel row. If parsing fails,
    the key is missing, or a type error occurs, "Invalid JSON" is shown
    instead.
    """
    try:
        json_dict = json.loads(json_str)
        # Debug output: type and content of the parsed payload.
        print(type(json_dict))
        print(json_dict)

        data_list = json_dict['data']
        # Debug output: type and content of the extracted list.
        print(type(data_list))
        print(data_list)

        display.fill(0)
        for i, item in enumerate(data_list):
            # Even entries go in the left column, odd ones in the right;
            # every pair of entries shares one row.
            column = i % 2
            row = i // 2
            display.text(item, 64 * column, 10 * row, 1)
        display.show()
    except (ValueError, KeyError, TypeError):
        print_text_static("Invalid JSON")
def alert_sound():
    """Play three 1000 Hz beeps of 0.5 s, each followed by a 0.3 s pause."""
    for _ in range(3):
        play_buzz(1000, 0.5)
        sleep(0.3)
def _alert_cities(payload):
    """Return the list of place names carried by an alert payload.

    *payload* is a JSON document whose "data" entry lists the places. A
    payload that was JSON-encoded twice (a JSON string containing JSON) is
    unwrapped first. Returns an empty list when there is no "data" entry.

    Raises:
        ValueError, TypeError or AttributeError when the payload is not a
        JSON object.
    """
    parsed = json.loads(payload)
    if isinstance(parsed, str):
        parsed = json.loads(parsed)
    cities = parsed.get('data', [])
    if isinstance(cities, str):
        cities = [cities]
    return list(cities)


# Audible start-up signal.
alert_sound()
text_pos_index = 1

# Main loop: poll the feed, show/sound any alert, then pause polling for a
# few passes. Pulling the boot input low ends the loop.
while True:
    if not boot.value():
        break
    if poll_for_alerts:
        dt = adjust_for_israel(rtc.datetime())
        print("{:02d}:{:02d}:{:02d}".format(dt[3], dt[4], dt[5]))
        try:
            alerts_result = alert.get_red_alerts()
            if alerts_result is None:
                # The fetch failed; do not claim that everything is clear.
                print_text_static("NO DATA")
            elif alerts_result == 'No alerts on':
                print_text_static("ALL CLEAR")
            else:
                cities = _alert_cities(alerts_result)
                if cities:
                    print("json_data['data'] ", ', '.join(cities))
                    alert_sound()
                    # display_data() expects a JSON document with a "data" list.
                    display_data(json.dumps({'data': cities}))
                    poll_for_alerts = False  # Stop polling once we get data
                else:
                    print_text_static("ALL CLEAR")
        except (ValueError, KeyError, TypeError, AttributeError) as ex:
            print_text_static("Invalid JSON")
            print(ex)
        except OSError:
            # Raised when the response body cannot be decompressed or parsed.
            print_text_static("ALL CLEAR")
    else:
        # After enough idle passes, start polling again.
        counter += 1
        if counter >= 5:  # Adjust the value as needed
            poll_for_alerts = True
            counter = 0
    sleep(0.1)  # Short sleep to keep things responsive