'''
-------------------------------------------------
Samsung Innovation Campus Batch 6
Team: Luthfi Hanif
Univ: Universitas Gadjah Mada (UGM)
Theme: Navigation and Tilt Measure for Ship
-------------------------------------------------
'''
from machine import Pin, I2C
import ssd1306
import mpu6050
import time
from math import atan2, degrees
from umqtt.simple import MQTTClient
import json
import network
# WiFi configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""

# MQTT configuration
MQTT_BROKER = "broker.emqx.io"
# Port must be an integer; 1883 is the standard unencrypted MQTT TCP port.
# (It was previously an empty string, which is not a usable port value.)
MQTT_PORT = 1883
MQTT_TOPIC_PUB = "/UNI544/LuthfiHanif/data_sensor"
MQTT_TOPIC_SUB = "/UNI544/LuthfiHanif/aktuasi_led"
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_CLIENT_ID = "LuthfiHanif_SymbIoT_UNI544"

# GPIO numbers of the four indicator LEDs.
LED_PINS = [19, 18, 5, 17]

# Compass direction name -> GPIO numbers of the LEDs to light for it.
# Cardinal directions use one LED (17 = Utara/north, 5 = Timur/east,
# 18 = Selatan/south, 19 = Barat/west); intercardinal directions light the
# two neighbouring LEDs.
LED_RULES = {
    "Utara": [17],
    "Timur Laut": [17, 5],
    "Timur": [5],
    "Tenggara": [5, 18],
    "Selatan": [18],
    "Barat Daya": [18, 19],
    "Barat": [19],
    "Barat Laut": [19, 17],
}

# (start, end, name) heading sectors in degrees, each 45 degrees wide.
# The last entry has start > end because it wraps through 0/360 degrees.
DIRECTIONS = [
    (22.5, 67.5, "Timur Laut"),
    (67.5, 112.5, "Timur"),
    (112.5, 157.5, "Tenggara"),
    (157.5, 202.5, "Selatan"),
    (202.5, 247.5, "Barat Daya"),
    (247.5, 292.5, "Barat"),
    (292.5, 337.5, "Barat Laut"),
    (337.5, 22.5, "Utara"),
]
class OLED:
    """Small wrapper around an SSD1306 I2C display for showing lines of text."""

    def __init__(self, scl_pin=22, sda_pin=21, width=128, height=64):
        """Open I2C bus 0 on the given pins and attach the SSD1306 driver.

        Args:
            scl_pin: GPIO number used for the I2C clock line.
            sda_pin: GPIO number used for the I2C data line.
            width: Display width passed to the driver.
            height: Display height passed to the driver.
        """
        bus = I2C(0, scl=Pin(scl_pin), sda=Pin(sda_pin))
        self.oled = ssd1306.SSD1306_I2C(width, height, bus)

    def display_text(self, lines):
        """Clear the display and draw each entry of ``lines`` on its own row.

        Rows start at x = 0 and are spaced 10 units apart vertically; the
        display is refreshed once all rows have been drawn.
        """
        screen = self.oled
        screen.fill(0)
        row_y = 0
        for text in lines:
            screen.text(text, 0, row_y)
            row_y += 10
        screen.show()
class MPU6050Sensor:
    """Reads the MPU6050 accelerometer and derives heading, roll and a direction name."""

    def __init__(self, scl_pin=22, sda_pin=21):
        """Open I2C bus 0 on the given pins, create the MPU6050 driver and wake it."""
        bus = I2C(0, scl=Pin(scl_pin), sda=Pin(sda_pin))
        self.mpu = mpu6050.MPU6050(bus)
        self.mpu.wake()

    def read_sensor_data(self):
        """Return the driver's current accelerometer reading unchanged."""
        reading = self.mpu.read_accel_data()
        return reading

    def calculate_heading(self, accel):
        """Return the angle of the (x, y) acceleration components in [0, 360) degrees.

        ``accel`` must be indexable; element 0 is used as x and element 1 as y.
        """
        ax = accel[0]
        ay = accel[1]
        angle = degrees(atan2(ay, ax))
        # Shift negative angles into the 0..360 range.
        return (angle + 360) % 360

    def calculate_tilt(self, accel):
        """Return the roll angle in degrees: atan2(y, sqrt(x^2 + z^2)).

        ``accel`` must unpack into exactly three values (x, y, z).
        """
        ax, ay, az = accel
        xz_norm = (ax**2 + az**2) ** 0.5
        return degrees(atan2(ay, xz_norm))

    def get_direction(self, yaw):
        """Return the name of the DIRECTIONS sector that contains ``yaw``.

        A sector includes its start and excludes its end. A sector whose
        start is greater than its end wraps through 0 degrees. "Utara" is
        returned if no sector matches.
        """
        for lower, upper, label in DIRECTIONS:
            if lower > upper:
                # Wrap-around sector: matches on either side of 0 degrees.
                inside = yaw >= lower or yaw < upper
            else:
                inside = lower <= yaw < upper
            if inside:
                return label
        return "Utara"
class LEDController:
    """Drives a set of LEDs, each addressed by its GPIO number."""

    def __init__(self, pins):
        """Configure every GPIO number in ``pins`` as an output."""
        self.leds = {}
        for number in pins:
            self.leds[number] = Pin(number, Pin.OUT)

    def _write_all(self, level):
        """Write ``level`` to every managed LED."""
        for led in self.leds.values():
            led.value(level)

    def update_leds(self, active_pins):
        """Switch every LED off, then switch on those listed in ``active_pins``.

        Raises:
            KeyError: If ``active_pins`` contains a GPIO number that is not managed.
        """
        self._write_all(0)
        for number in active_pins:
            self.leds[number].value(1)

    def all_on(self):
        """Switch every LED on."""
        self._write_all(1)

    def all_off(self):
        """Switch every LED off."""
        self._write_all(0)
def connect_wifi(ssid, password, timeout_ms=20000):
    """Activate the station interface and block until it is connected.

    Previously the wait loop had no exit other than success, so the failure
    branch after it could never run. The wait is now bounded so a missing or
    unreachable network is reported instead of hanging forever.

    Args:
        ssid: Name of the network to join.
        password: Passphrase for the network.
        timeout_ms: Maximum time to wait for the connection, in milliseconds.

    Raises:
        RuntimeError: If the interface is still not connected after
            ``timeout_ms`` milliseconds.
    """
    print("Connecting to WiFi", end="")
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(True)
    sta_if.connect(ssid, password)

    started = time.ticks_ms()
    while not sta_if.isconnected():
        # ticks_diff copes with the millisecond counter wrapping around.
        if time.ticks_diff(time.ticks_ms(), started) >= timeout_ms:
            print()
            raise RuntimeError("Failed to connect to WiFi")
        time.sleep(0.5)
        print(".", end="")
    print("\nConnected:", sta_if.ifconfig())
def message_callback(topic, msg):
    """Handle an incoming MQTT message and activate the system on an "on" command.

    The payload is expected to be a JSON object such as ``{"msg": "on"}``;
    the command is matched case-insensitively. Malformed JSON, non-object
    payloads and non-string commands are ignored rather than raising.

    Args:
        topic: Topic the message arrived on, as bytes.
        msg: Raw message payload containing JSON text.
    """
    global system_active
    try:
        data = json.loads(msg)
    except ValueError as e:
        # MicroPython's json raises ValueError for malformed input, and
        # CPython's JSONDecodeError is a ValueError subclass, so this works
        # on both (json.JSONDecodeError does not exist on MicroPython).
        print("Invalid JSON received:", e)
        return

    print("Received message on topic {}: {}".format(topic.decode("utf-8"), data))

    # Valid JSON may still be a list, number or string, which has no .get().
    if not isinstance(data, dict):
        return

    command = data.get("msg", "")
    if isinstance(command, str) and command.lower() == "on":
        system_active = True
        print("System activated!")
def _wait_for_activation(client):
    """Poll the broker every 3 seconds until system_active becomes true."""
    while not system_active:
        print("Waiting for 'on' command... System active:", system_active)
        client.check_msg()
        time.sleep(3)


def _show_splash(oled, led_ctrl):
    """Show the title screen and flash all LEDs for 3 seconds."""
    oled.display_text([
        "SIC ASSIGNMENT 1",
        "SymbIoT",
        "Luthfi Hanif",
        "Sistem Navigasi",
        "Kapal",
    ])
    led_ctrl.all_on()
    time.sleep(3)
    led_ctrl.all_off()


def _publish_reading(client, mpu, oled, led_ctrl):
    """Read the sensor once, publish the result and update the LEDs and display."""
    accel = mpu.read_sensor_data()
    yaw = mpu.calculate_heading(accel)
    roll = mpu.calculate_tilt(accel)
    direction = mpu.get_direction(yaw)

    payload = json.dumps({"direction": direction, "roll": roll})
    client.publish(MQTT_TOPIC_PUB, payload)
    print("Published:", payload)

    # Light the LED(s) mapped to the current direction. LED_RULES was
    # defined for this purpose but was never applied before.
    led_ctrl.update_leds(LED_RULES.get(direction, []))

    oled.display_text([
        "Dir : {:.2f} deg".format(yaw),
        "Roll : {:.2f} deg".format(roll),
        "{}".format(direction),
    ])


def main():
    """Run the application: connect, wait for the "on" command, then report.

    Sequence:
      1. Initialise the display, the sensor and the LEDs.
      2. Join WiFi, connect to the MQTT broker and subscribe to the command topic.
      3. Wait until an "on" command sets ``system_active``.
      4. Show the splash screen, then publish a reading every 5 seconds while
         continuing to poll for incoming messages.

    Any exception raised after initialisation is printed; the LEDs are then
    switched off and the MQTT connection is closed.
    """
    global leds, system_active
    oled = OLED()
    mpu = MPU6050Sensor()
    leds = LEDController(LED_PINS)
    system_active = False

    client = None
    try:
        connect_wifi(WIFI_SSID, WIFI_PASSWORD)
        client = MQTTClient(
            MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD
        )
        client.set_callback(message_callback)
        client.connect()
        client.subscribe(MQTT_TOPIC_SUB)
        print("Subscribed to topic:", MQTT_TOPIC_SUB)

        _wait_for_activation(client)
        _show_splash(oled, leds)

        last_publish_time = time.ticks_ms()
        while True:
            if time.ticks_diff(time.ticks_ms(), last_publish_time) >= 5000:
                _publish_reading(client, mpu, oled, leds)
                last_publish_time = time.ticks_ms()
            client.check_msg()
            time.sleep(0.1)
    except Exception as e:
        print("Error:", e)
    finally:
        # Do not leave a stale direction lit once the loop has stopped.
        leds.all_off()
        if client is not None:
            try:
                client.disconnect()
            except (OSError, AttributeError):
                # Best-effort cleanup: the socket may be broken or may never
                # have been opened if connect() itself failed.
                pass
# Start the application only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()