> %Run -c $EDITOR_CONTENT
======================================================================
Zadanie 2.1: Budowa sensora z interfejsem IP
======================================================================
Czujnik wilgotnosci: GPIO 4
W5500 SPI: CS=5, SCK=18, MISO=19, MOSI=23
======================================================================
[W5500] Inicjalizacja modulu...
[W5500] Ustawianie MAC: DE:AD:BE:EF:FE:ED
[W5500] Ustawianie Gateway: 192.168.1.1
[W5500] Ustawianie Subnet: 255.255.255.0
[W5500] Ustawianie IP: 192.168.1.100
[W5500] Odczytane IP: 0.0.0.0
[W5500] Inicjalizacja zakonczona
[WiFi] Laczenie z siecia...
...
[WiFi] Polaczono
[WiFi] IP: 192.168.1.73
[HTTP] Serwer uruchomiony na porcie 80
[HTTP] Dostep przez:
Ethernet: http://192.168.1.100
Wi-Fi: http://192.168.1.73
======================================================================
[INFO] Oczekiwanie na polaczenia...
"""
IoT Lab 03 - Zadanie 2.1: Budowa sensora z interfejsem IP
Funkcjonalność:
- Odczyt danych z czujnika wilgotności (GROVE-WATER-SENSOR)
- Komunikacja przez Ethernet (WIZnet W5500) i Wi-Fi
- Prosty serwer HTTP wyświetlający odczytane wartości
Połączenia:
Czujnik wilgotności (GROVE-WATER-SENSOR):
- SIG → GPIO 4 (D4)
- GND → GND
- VCC → 3.3V
WIZnet W5500 (interfejs SPI):
- CS → GPIO 5 (D5)
- SCK → GPIO 18 (D18)
- MISO → GPIO 19 (D19)
- MOSI → GPIO 23 (D23)
- VCC → 3.3V
- GND → GND
"""
from machine import Pin, SPI
import network
import socket
import time
import struct
WATER_SENSOR_PIN = 4
W5500_CS_PIN = 5
W5500_SCK_PIN = 18
W5500_MISO_PIN = 19
W5500_MOSI_PIN = 23
WIFI_SSID = "IoT_Lab"
WIFI_PASSWORD = "IronManLamus"
ETH_IP = "192.168.1.100"
ETH_SUBNET = "255.255.255.0"
ETH_GATEWAY = "192.168.1.1"
ETH_DNS = "8.8.8.8"
print("=" * 70)
print("Zadanie 2.1: Budowa sensora z interfejsem IP")
print("=" * 70)
water_sensor = Pin(WATER_SENSOR_PIN, Pin.IN)
cs = Pin(W5500_CS_PIN, Pin.OUT)
cs.value(1)
spi = SPI(2,
baudrate=2000000,
polarity=0,
phase=0,
sck=Pin(W5500_SCK_PIN),
mosi=Pin(W5500_MOSI_PIN),
miso=Pin(W5500_MISO_PIN))
print("Czujnik wilgotnosci: GPIO " + str(WATER_SENSOR_PIN))
print("W5500 SPI: CS=" + str(W5500_CS_PIN) + ", SCK=" + str(W5500_SCK_PIN) + ", MISO=" + str(W5500_MISO_PIN) + ", MOSI=" + str(W5500_MOSI_PIN))
print("=" * 70)
def read_water_sensor():
"""
Odczytaj wartość z czujnika wilgotności.
Returns:
dict: {'digital': bool, 'status': str}
"""
digital_value = water_sensor.value()
status = "MOKRO" if digital_value == 1 else "SUCHO"
return {
'digital': digital_value,
'status': status
}
class W5500:
MR = 0x0000
GAR = 0x0001
SUBR = 0x0005
SHAR = 0x0009
SIPR = 0x000F
def __init__(self, spi, cs):
self.spi = spi
self.cs = cs
def write_reg(self, addr, data):
self.cs.value(0)
control = 0x04
self.spi.write(struct.pack('>HB', addr, control))
if isinstance(data, (list, bytes)):
self.spi.write(bytes(data))
else:
self.spi.write(bytes([data]))
self.cs.value(1)
def read_reg(self, addr, length=1):
self.cs.value(0)
control = 0x00
self.spi.write(struct.pack('>HB', addr, control))
data = self.spi.read(length)
self.cs.value(1)
return data
def init(self, ip, subnet, gateway, mac):
print("\n[W5500] Inicjalizacja modulu...")
self.write_reg(self.MR, 0x80)
time.sleep_ms(100)
mac_str = ""
for b in mac:
mac_str += "%02X:" % b
print("[W5500] Ustawianie MAC: " + mac_str[:-1])
for i, byte in enumerate(mac):
self.write_reg(self.SHAR + i, byte)
print("[W5500] Ustawianie Gateway: " + gateway)
gateway_bytes = [int(x) for x in gateway.split('.')]
for i, byte in enumerate(gateway_bytes):
self.write_reg(self.GAR + i, byte)
print("[W5500] Ustawianie Subnet: " + subnet)
subnet_bytes = [int(x) for x in subnet.split('.')]
for i, byte in enumerate(subnet_bytes):
self.write_reg(self.SUBR + i, byte)
print("[W5500] Ustawianie IP: " + ip)
ip_bytes = [int(x) for x in ip.split('.')]
for i, byte in enumerate(ip_bytes):
self.write_reg(self.SIPR + i, byte)
time.sleep_ms(100)
read_ip = self.read_reg(self.SIPR, 4)
ip_str = str(read_ip[0]) + "." + str(read_ip[1]) + "." + str(read_ip[2]) + "." + str(read_ip[3])
print("[W5500] Odczytane IP: " + ip_str)
print("[W5500] Inicjalizacja zakonczona\n")
def init_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('\n[WiFi] Laczenie z siecia...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
timeout = 10
while not wlan.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
print('.', end='')
print()
if wlan.isconnected():
print('[WiFi] Polaczono')
print('[WiFi] IP: ' + wlan.ifconfig()[0])
return wlan
else:
print('[WiFi] Nie udalo sie polaczyc')
return None
def init_ethernet():
try:
w5500 = W5500(spi, cs)
mac = bytes([0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED])
w5500.init(ETH_IP, ETH_SUBNET, ETH_GATEWAY, mac)
return w5500
except Exception as e:
print('[Ethernet] Blad inicjalizacji: ' + str(e))
return None
def create_http_response(sensor_data):
html = """<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Sensor IoT</title>
<meta http-equiv="refresh" content="5">
</head>
<body>
<h1>Czujnik Wilgotnosci IoT</h1>
<h2>Odczyt czujnika:</h2>
<p>Status: {status}</p>
<p>Wartosc cyfrowa: {digital}</p>
<h2>Interfejsy sieciowe:</h2>
<p>Ethernet (W5500): {eth_ip}</p>
<p>Wi-Fi: {wifi_ip}</p>
<h2>Informacje:</h2>
<p>Ostatni odczyt: {timestamp}</p>
<p>Strona odswieza sie automatycznie co 5 sekund</p>
</body>
</html>"""
timestamp = time.localtime()
timestamp_str = "%02d:%02d:%02d" % (timestamp[3], timestamp[4], timestamp[5])
html = html.format(
status=sensor_data['status'],
digital=sensor_data['digital'],
eth_ip=ETH_IP,
wifi_ip="Polaczone" if wifi else "Niepolaczone",
timestamp=timestamp_str
)
response = "HTTP/1.1 200 OK\r\n"
response += "Content-Type: text/html; charset=utf-8\r\n"
response += "Connection: close\r\n"
response += "Content-Length: " + str(len(html)) + "\r\n"
response += "\r\n"
response += html
return response
def start_http_server(port=80):
addr = socket.getaddrinfo('0.0.0.0', port)[0][-1]
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(1)
print('\n[HTTP] Serwer uruchomiony na porcie ' + str(port))
print('[HTTP] Dostep przez:')
print(' Ethernet: http://' + ETH_IP)
if wifi:
print(' Wi-Fi: http://' + wifi.ifconfig()[0])
print('=' * 70)
print('\n[INFO] Oczekiwanie na polaczenia...\n')
return s
try:
eth = init_ethernet()
wifi = None
try:
wifi = init_wifi()
except:
print("[WiFi] Pominieto inicjalizacje Wi-Fi")
server = start_http_server(80)
while True:
try:
cl, addr = server.accept()
print('[HTTP] Polaczenie od: ' + str(addr))
request = cl.recv(1024).decode('utf-8')
req_parts = request.split()
if len(req_parts) >= 2:
print('[HTTP] Zadanie: ' + req_parts[0] + ' ' + req_parts[1])
else:
print('[HTTP] Zadanie: Invalid')
sensor_data = read_water_sensor()
print('[SENSOR] Status: ' + sensor_data["status"] + ' (wartosc: ' + str(sensor_data["digital"]) + ')')
response = create_http_response(sensor_data)
cl.send(response.encode('utf-8'))
cl.close()
print('[HTTP] Odpowiedz wyslana\n')
except OSError as e:
print('[HTTP] Blad polaczenia: ' + str(e))
continue
except KeyboardInterrupt:
print("\n\n[STOP] Zatrzymywanie serwera...")
server.close()
print("[INFO] Serwer zatrzymany.")
except Exception as e:
print("\n[ERROR] Wystapil blad: " + str(e))
import sys
sys.print_exception(e)