import network
import time
import ntptime # Added for Part 7 Latency Test
from umqtt.simple import MQTTClient
# =========================
# ThingSpeak MQTT Settings
# =========================
# NOTE(review): the MQTT client ID, user name and password below are
# hard-coded in source. Consider loading them from a separate file that is
# kept out of version control.
CHANNEL_ID = "3321063"
MQTT_CLIENT_ID = "MDMkCh01IiIQNiIJFQIZAys"
MQTT_BROKER = "mqtt3.thingspeak.com"
MQTT_USER = "MDMkCh01IiIQNiIJFQIZAys"
MQTT_PASSWORD = "+Y6dgJ87zKumeCaVcaAIAMkY"
# Topic every payload is published to, derived from CHANNEL_ID.
MQTT_TOPIC = "channels/{}/publish".format(CHANNEL_ID)
# CSV file that supplies the rows to publish; its first line is a header.
CSV_FILE = "IoTSensorStream.csv"
# Pause between two consecutive publishes in the main loop.
SEND_INTERVAL = 16 # seconds
# Globals for precise timestamping (Part 7)
# 946684800 is the Unix timestamp of 2000-01-01 00:00:00 UTC. It is added to
# time.time() to obtain Unix seconds and subtracted again before calling
# time.localtime() -- presumably because the target port counts time from
# 2000-01-01; confirm for the board in use.
EPOCH_OFFSET = 946684800
# Minimum age of the time base, in ticks_ms milliseconds, before
# maybe_resync_time_base() performs another NTP sync.
RESYNC_INTERVAL_MS = 15000
# time.ticks_ms() reading captured at the last sync (set by sync_time_base).
sync_ticks_ms = 0
# Unix time in whole seconds that corresponds to sync_ticks_ms.
sync_unix_sec = 0
# ticks_ms reading of the last successful NTP sync, used to schedule resyncs.
last_ntp_sync_ticks_ms = 0
def sync_time_base():
    """Set the RTC from NTP and anchor ticks_ms to the start of an RTC second.

    Updates the module globals sync_unix_sec, sync_ticks_ms and
    last_ntp_sync_ticks_ms. Any exception raised by ntptime.settime()
    propagates to the caller and leaves the globals untouched.
    """
    global sync_ticks_ms, sync_unix_sec, last_ntp_sync_ticks_ms

    ntptime.settime()

    # time.time() only ticks once per second, so poll until the value changes.
    # The ticks_ms reading taken right after that edge is then aligned with a
    # whole second, giving every board the same millisecond base.
    starting_sec = time.time()
    while True:
        if time.time() != starting_sec:
            break
        time.sleep_ms(1)

    sync_unix_sec = EPOCH_OFFSET + time.time()
    anchor_ticks = time.ticks_ms()
    sync_ticks_ms = anchor_ticks
    last_ntp_sync_ticks_ms = anchor_ticks
def maybe_resync_time_base():
    """Re-run the NTP sync once the current time base is old enough.

    Does nothing while fewer than RESYNC_INTERVAL_MS milliseconds have passed
    since the last successful sync. A failed sync is reported and otherwise
    ignored, so the previous time base stays in use.
    """
    age_ms = time.ticks_diff(time.ticks_ms(), last_ntp_sync_ticks_ms)
    if age_ms >= RESYNC_INTERVAL_MS:
        try:
            sync_time_base()
            print("Time re-synced.")
        except Exception as err:
            print("NTP re-sync failed:", err)
def connect_wifi_and_sync_time():
    """Join the "Wokwi-GUEST" WiFi network, then establish the NTP time base.

    Both phases block without a timeout: the function polls until the station
    interface reports a connection, and it retries sync_time_base() every two
    seconds until one attempt succeeds.
    """
    # No `global` statement is needed here: the time-base globals are assigned
    # inside sync_time_base(), not in this function.
    print("Connecting to WiFi", end="")
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(True)
    sta_if.connect("Wokwi-GUEST", "")
    while not sta_if.isconnected():
        print(".", end="")
        time.sleep(0.1)
    print(" Connected!")
    print("IP Address:", sta_if.ifconfig()[0])

    # Sync NTP Time
    print("Syncing NTP Time for Latency Test...")
    while True:
        try:
            sync_time_base()
            print("Time synced successfully.")
            break
        except Exception as e:
            print("NTP sync failed:", e)
            print("Retrying NTP in 2 seconds...")
            time.sleep(2)
def get_absolute_ms():
    """Return the current time as milliseconds since the Unix epoch.

    The value is the synced Unix second scaled to milliseconds plus the
    ticks_ms time elapsed since that sync.
    """
    since_sync_ms = time.ticks_diff(time.ticks_ms(), sync_ticks_ms)
    return sync_unix_sec * 1000 + since_sync_ms
def format_actual_time(abs_ms):
    """Render absolute milliseconds as "YYYY-MM-DD HH:MM:SS.mmm" in UTC+8.

    abs_ms is split into whole seconds and a millisecond remainder. The
    seconds are shifted by +28800 (eight hours) and by -EPOCH_OFFSET before
    being handed to time.localtime().
    """
    whole_sec, millis = divmod(abs_ms, 1000)
    shifted_sec = whole_sec + 28800 - EPOCH_OFFSET
    year, month, day, hour, minute, second = time.localtime(shifted_sec)[:6]
    return "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}.{:03d}".format(
        year, month, day, hour, minute, second, millis
    )
def get_timestamps():
    """Return (ticks_ms reading, absolute milliseconds) for the same instant.

    The absolute value is derived from a single ticks_ms reading, so both
    elements of the tuple describe the same moment.
    """
    now_ticks = time.ticks_ms()
    since_sync_ms = time.ticks_diff(now_ticks, sync_ticks_ms)
    return now_ticks, sync_unix_sec * 1000 + since_sync_ms
def connect_mqtt():
    """Create an MQTT client for the ThingSpeak broker, connect it, return it.

    The connection is opened with clean_session=True. Exceptions raised while
    connecting propagate to the caller.
    """
    print("Connecting to ThingSpeak MQTT... ", end="")
    settings = {
        "client_id": MQTT_CLIENT_ID,
        "server": MQTT_BROKER,
        "user": MQTT_USER,
        "password": MQTT_PASSWORD,
    }
    mqtt = MQTTClient(**settings)
    mqtt.connect(clean_session=True)
    print("Connected!")
    return mqtt
def open_csv():
    """Open CSV_FILE for reading and return the handle, header already consumed.

    The header line is printed so the column layout shows up in the log. The
    caller owns the returned handle and is responsible for closing it.
    """
    handle = open(CSV_FILE, "r")
    # Reading the header here leaves the handle positioned at the first data row.
    print("CSV header:", handle.readline().strip())
    return handle
def read_next_row(f):
    """Return the next data row of the CSV stream, wrapping around at EOF.

    Blank and whitespace-only lines are skipped. When the end of the file is
    reached, the same handle is rewound with seek(0) and the header line is
    consumed (and printed) again. Because the handle is never closed and
    replaced, the one the caller already holds stays usable even if this
    function raises afterwards.

    Args:
        f: Open, seekable text-mode file positioned after the header line.

    Returns:
        Tuple (f, timestep_val, volt, rotate, pressure, vibration), where
        timestep_val is an int, the four readings are the raw column strings,
        and f is the handle that was passed in.

    Raises:
        ValueError: If the row does not have exactly six comma-separated
            columns, if the first column is not an integer, or if the file
            holds no data rows at all.
    """
    wrapped = False
    while True:
        line = f.readline()
        if not line:
            if wrapped:
                # A full pass from the top found nothing; bail out instead of
                # spinning forever on a header-only file.
                raise ValueError("CSV file contains no data rows")
            print("End of file reached, restarting from beginning...")
            f.seek(0)
            header = f.readline().strip()
            print("CSV header:", header)
            wrapped = True
            continue
        line = line.strip()
        if line:
            break

    parts = line.split(",")
    if len(parts) != 6:
        raise ValueError("Unexpected CSV format: " + line)

    # The sixth column must be present but is not forwarded to the caller.
    timestep_val = int(parts[0])
    return f, timestep_val, parts[1], parts[2], parts[3], parts[4]
# Main Execution
# Bring up WiFi and the time base, connect to the broker, then publish one CSV
# row per cycle forever. Any failure inside a cycle is logged and followed by
# an MQTT reconnect attempt; the loop itself never exits.
connect_wifi_and_sync_time()
client = connect_mqtt()
csv_file = open_csv()

while True:
    try:
        csv_file, timestep_val, volt, rotate, pressure, vibration = read_next_row(csv_file)
        payload = "field1={}&field2={}&field3={}&field4={}&status=timestep{}".format(
            volt, rotate, pressure, vibration, timestep_val
        )
        maybe_resync_time_base()
        # Get exact timestamp right before publishing
        abs_ms = get_timestamps()[1]
        actual_time_str = format_actual_time(abs_ms)
        print("\n[Raw ms: {} | Actual: {}] Publishing to {}: {}".format(abs_ms, actual_time_str, MQTT_TOPIC, payload))
        # NOTE(review): this second line repeats the payload without the
        # timestamps; it is kept so the log output stays unchanged.
        print("\n Publishing to {}: {}".format(MQTT_TOPIC, payload))
        client.publish(MQTT_TOPIC, payload, qos=0)
        print("Publish success")
        print("Waiting {} seconds...".format(SEND_INTERVAL))
        time.sleep(SEND_INTERVAL)
    except Exception as e:
        print("Error:", e)
        print("Reconnecting in 3 seconds...\n")
        time.sleep(3)
        # Release the previous connection before opening a new one so its
        # socket is not left open. The link is often already dead at this
        # point, so a failure to disconnect is expected and deliberately
        # ignored.
        try:
            client.disconnect()
        except Exception:
            pass
        try:
            client = connect_mqtt()
        except Exception as e2:
            print("Reconnect failed:", e2)
            time.sleep(3)