# selectionneur_phases_secure.py
# MicroPython (Raspberry Pi Pico)
# Version finale pour prototypage - avec mesures de sécurité logicielles
# Auteur: (Ton nom)
# Date: (mettre date)
#
# Synopsis:
# - Lit 3 canaux ADC (phases L1,L2,L3), détecte présence via seuil + hysteresis
# - Sélectionne la phase la plus proche de la consigne primaire (230 V par défaut)
# - N'effectue un switch que si la candidate améliore l'erreur d'au moins 5%
# - Demande une période de stabilisation (ex: 3s) avant commutation
# - Gère basculement sur Groupe (G) si aucune phase réseau disponible
# - Watchdog matériel pour recovery
# - E-STOP et mode maintenance
# - Optional: feedback pins for contactor closed state (aux contacts) for extra safety
#
# REMARQUE: adapter les pins ADC / GPIO en fonction de ton câblage Proteus / réel.
from machine import ADC, Pin, Timer, WDT
import time
import collections
import sys
# -------------------- CONFIGURATION / PARAMETRES --------------------
# ADC pins (Pico): ADC(0) = GP26, ADC(1) = GP27, ADC(2) = GP28
ADC_L1 = ADC(0)
ADC_L2 = ADC(1)
ADC_L3 = ADC(2)
# Relais drive outputs (à adapter si nécessaire)
OUT_RL_L1 = Pin(10, Pin.OUT) # pilote BC547 -> relais 12V -> contacteur L1
OUT_RL_L2 = Pin(11, Pin.OUT) # pilote BC547 -> relais 12V -> contacteur L2
OUT_RL_L3 = Pin(12, Pin.OUT) # pilote BC547 -> relais 12V -> contacteur L3
OUT_RL_G = Pin(13, Pin.OUT) # pilote relais groupe
# LED et contrôle E-STOP / mode maintenance
LED_OK = Pin(14, Pin.OUT) # LED verte: ON steady => OK ; clignote => défaut
PIN_ESTOP = Pin(15, Pin.IN, Pin.PULL_UP) # E-STOP (active LOW) - wiring: bouton N.C. -> GND when pressed
PIN_MAINT = Pin(16, Pin.IN, Pin.PULL_UP) # Mode maintenance (active LOW) - bloque commutation
# Optional feedback aux contacts (si tes contacteurs ont contacts auxiliaires)
# Mettre None si non utilisé
FEEDBACK_L1 = None # e.g. Pin(20, Pin.IN, Pin.PULL_DOWN)
FEEDBACK_L2 = None
FEEDBACK_L3 = None
FEEDBACK_G = None
# Watchdog (millisecondes) - le WDT doit être alimenté par Pico; donne 8s min par défaut
WDT_TIMEOUT_MS = 8000
# Filtrage / smoothing
N_SMOOTH = 6 # fenêtre moyenne glissante
SAMPLE_INTERVAL = 0.2 # sec entre mesures
# Diviseur et conversion (à calibrer)
FACTOR_DIV = 0.180327869 # 2.2k / (10k + 2.2k) = 2.2 / 12.2 ≈ 0.180327869
# ADC reference conversion (Pico read_u16 -> 0..65535 -> 0..3.3V)
ADC_TO_VOLT = 3.3 / 65535.0
# Tension nominale estimée au secondaire (après redressement + filtrage) - mesurer et ajuster
VSEC_NOMINAL = 16.3 # approximé (12VAC RMS -> Vpeak ~17 V -> - diode ~0.7 => ~16.3 V)
VPRIM_NOMINAL = 230.0
# Seuils de présence (en V secondaire)
SEUIL_PRESENT_SEC = 8.0 # en Vsec -> calibrer
HYST_PCT = 0.02 # 2% hysteresis pour presence detection
# Règle de commutation
SEUIL_SWITCH_PCT = 0.05 # 5% minimal d'amélioration relative
STABILISATION_TIME = 3.0 # secondes de stabilité pour valider une nouvelle candidate
# Délai temporisations (ms)
DELAY_AFTER_OPEN_MS = 350 # temps entre ouverture ancien contacteur et fermeture nouveau
DELAY_BEFORE_RETRY_MS = 2000 # retry delay on failure
MAX_SWITCH_ATTEMPTS = 3 # tentative max de fermeture contacteur avant fallback
# Safety bounds
MIN_VALID_ADC_VOLTAGE = 0.05 # en tension ADC convertie (V) -> valeurs très basses ignorées
# --------------------------------------------------------------------
# -------------------- INITIALISATION --------------------
# Initialize outputs to OFF (relais off)
def relays_all_off():
OUT_RL_L1.off()
OUT_RL_L2.off()
OUT_RL_L3.off()
OUT_RL_G.off()
relays_all_off()
LED_OK.off()
# Initialize smoothing queues
q1 = collections.deque([], 10)
q2 = collections.deque([], 10)
q3 = collections.deque([], 10)
# Presence state for hysteresis
presence_state = {'L1': False, 'L2': False, 'L3': False}
# Initialize Watchdog
try:
wdt = WDT(id=0,timeout=WDT_TIMEOUT_MS)
wdt.feed()
have_wdt = True
except Exception as e:
# WDT may not be available in some builds; degrade gracefully but warn.
print("Warning: WDT not initialized:", e)
wdt = None
have_wdt = False
# -------------------- UTILITAIRES --------------------
def feed_watchdog():
if have_wdt:
try:
wdt.feed()
except Exception:
pass
def safe_sleep(seconds):
# Splits long sleeps into short intervals to keep WDT alive and react to E-STOP
if seconds <= 0:
return
chunks = int(seconds / 0.2) + 1
for _ in range(chunks):
time.sleep(0.2)
if PIN_ESTOP.value() == 0:
# emergency stop pressed
handle_estop()
feed_watchdog()
def adc_voltage(adc):
# read raw and convert to voltage at ADC pin (after divider)
raw = adc.read_u16()
v = raw * ADC_TO_VOLT
return v
def vsec_from_vadc(v_adc):
# convert ADC voltage (at divider output) to secondary DC voltage
if FACTOR_DIV == 0:
return 0.0
return v_adc / FACTOR_DIV
def vprim_est_from_vsec(vsec):
# estimate primary (approx) using nominal mapping
if VSEC_NOMINAL == 0:
return 0.0
return vsec * (VPRIM_NOMINAL / VSEC_NOMINAL)
def smooth_queue(q, val, n):
if len(q) >= n:
q.popleft()
q.append(val)
return sum(q) / len(q)
def detect_presence(vsec, key):
# hysteresis-based presence detection (software)
prev = presence_state[key]
if prev:
thr = SEUIL_PRESENT_SEC * (1 - HYST_PCT)
else:
thr = SEUIL_PRESENT_SEC * (1 + HYST_PCT)
new = (vsec >= thr)
presence_state[key] = new
return new
def contactor_feedback_ok(feedback_pin):
if feedback_pin is None:
# if no hardware feedback available, assume OK but limit attempts
return True
try:
return feedback_pin.value() == 1
except Exception:
return False
def all_off_and_log():
relays_all_off()
print("[SAFETY] Tous relais OFF")
# -------------------- SIGNALISATION LED --------------------
def led_ok_on():
LED_OK.on()
def led_ok_off():
LED_OK.off()
def led_blink(times=3, period=0.2):
for _ in range(times):
LED_OK.toggle()
safe_sleep(period)
LED_OK.off()
# -------------------- LOGIQUE DE SELECTION --------------------
def read_all_measurements():
# returns dict with keys L1,L2,L3,G containing fields: vsec,vprim,present
v_adc1 = adc_voltage(ADC_L1)
v_adc2 = adc_voltage(ADC_L2)
v_adc3 = adc_voltage(ADC_L3)
vsec1 = vsec_from_vadc(v_adc1)
vsec2 = vsec_from_vadc(v_adc2)
vsec3 = vsec_from_vadc(v_adc3)
vsec1_s = smooth_queue(q1, vsec1, N_SMOOTH)
vsec2_s = smooth_queue(q2, vsec2, N_SMOOTH)
vsec3_s = smooth_queue(q3, vsec3, N_SMOOTH)
vprim1 = vprim_est_from_vsec(vsec1_s)
vprim2 = vprim_est_from_vsec(vsec2_s)
vprim3 = vprim_est_from_vsec(vsec3_s)
pres1 = detect_presence(vsec1_s, 'L1')
pres2 = detect_presence(vsec2_s, 'L2')
pres3 = detect_presence(vsec3_s, 'L3')
# group presence via digital input (assumed wired to indicate group ready)
presg = False
try:
presg = bool(OUT_RL_G.value() == 0 and False) # placeholder: don't infer group from output
except Exception:
presg = False
# If you wired a digital input for group presence, read it here instead:
# presg = Pin(GROUP_PRESENCE_PIN, Pin.IN).value() == 1
return {
'L1': {'vsec': vsec1_s, 'vprim': vprim1, 'present': pres1},
'L2': {'vsec': vsec2_s, 'vprim': vprim2, 'present': pres2},
'L3': {'vsec': vsec3_s, 'vprim': vprim3, 'present': pres3},
'G' : {'present': presg}
}
def select_best(meas):
# choose candidate among present phases whose vprim is closest to VPRIM_NOMINAL
candidates = []
for k in ['L1','L2','L3']:
if meas[k]['present']:
err = abs(meas[k]['vprim'] - VPRIM_NOMINAL)
candidates.append((k, err))
if not candidates:
# No network phase present: rely on group if present (we use digital flag - optional)
if meas['G']['present']:
return 'G'
return None
# sort by smallest error
candidates.sort(key=lambda x: x[1])
return candidates[0][0]
# -------------------- CONTROL RELAYS with SAFETY --------------------
def activate_relay_for(source, feedback_pin=None):
"""
Opens all relays, waits a safe delay, then closes the target relay.
Uses feedback_pin (optional) to verify closure. Retries limited.
"""
# Interlocking: open all
relays_all_off()
safe_sleep(DELAY_AFTER_OPEN_MS / 1000.0)
# Do closure attempt
attempts = 0
success = False
while attempts < MAX_SWITCH_ATTEMPTS:
attempts += 1
if source == 'L1':
OUT_RL_L1.on()
elif source == 'L2':
OUT_RL_L2.on()
elif source == 'L3':
OUT_RL_L3.on()
elif source == 'G':
OUT_RL_G.on()
else:
# Nothing to activate
break
# wait short time for contact to settle
safe_sleep(0.2)
# verify feedback (if available)
if contactor_feedback_ok(feedback_pin):
success = True
break
else:
# if no feedback hardware, assume closure success but still allow retry attempts
if feedback_pin is None:
success = True
break
# else try again after small backoff
all_off_and_log()
safe_sleep(DELAY_BEFORE_RETRY_MS / 1000.0)
if not success:
# failure to close contactor -> open all and return False
all_off_and_log()
print("[ERROR] Echec fermeture contacteur pour", source)
return False
print("[INFO] Contacteur ferme pour", source)
return True
# -------------------- EMERGENCY / MAINTENANCE HANDLERS --------------------
def handle_estop():
# Immediately open all and block further automatic switching until reset
print("[EMERGENCY] E-STOP activé ! Ouverture de tous les contacteurs.")
all_off_and_log()
# Blink LED to indicate E-STOP
while PIN_ESTOP.value() == 0:
LED_OK.toggle()
time.sleep(0.2)
# When released, require operator to re-enable (here we wait a second)
print("[EMERGENCY] E-STOP relâché - attente avant reprise automatique.")
safe_sleep(2)
# Optionally require manual reset sequence - here we just return and allow restart
def is_maintenance_mode():
return PIN_MAINT.value() == 0
# -------------------- MAIN LOOP --------------------
def main_loop():
current_source = None
last_switch_ms = time.ticks_ms()
# Start with trying to select an initial available phase
while True:
# Safety checks
if PIN_ESTOP.value() == 0:
handle_estop()
# don't auto resume until manual reset / release of E-STOP
continue
if is_maintenance_mode():
# In maintenance mode: open all and wait
print("[MAINT] Mode maintenance actif - automatic switching désactivé.")
all_off_and_log()
LED_OK.off()
safe_sleep(1.0)
feed_watchdog()
continue
meas = read_all_measurements()
best = select_best(meas)
# Debug: print measurements (sparse)
print("[MEAS] L1:{:.1f}V L2:{:.1f}V L3:{:.1f}V best: {}".format(
meas['L1']['vprim'], meas['L2']['vprim'], meas['L3']['vprim'], best))
if current_source is None:
# choose initial source
if best is None:
# no phase -> try group if available else idle
print("[INFO] Aucune phase réseau détectée.")
# if group present logic exists, we could start it here (but starting generators needs external control)
# For safety, do not automatically crank group; require external group signal
led_blink(times=2, period=0.3)
safe_sleep(SAMPLE_INTERVAL)
feed_watchdog()
continue
# Activate best candidate
# Wait stabilization before committing
stable = True
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < int(STABILISATION_TIME * 1000):
meas_tmp = read_all_measurements()
if select_best(meas_tmp) != best:
stable = False
break
safe_sleep(SAMPLE_INTERVAL)
if stable:
# Attempt activation
fb_pin = None
if best == 'L1' and FEEDBACK_L1: fb_pin = FEEDBACK_L1
if best == 'L2' and FEEDBACK_L2: fb_pin = FEEDBACK_L2
if best == 'L3' and FEEDBACK_L3: fb_pin = FEEDBACK_L3
ok = activate_relay_for(best, fb_pin)
if ok:
current_source = best
last_switch_ms = time.ticks_ms()
led_ok_on()
else:
# failed to activate; stay with None
led_blink(times=3, period=0.15)
else:
# unstable candidate
safe_sleep(SAMPLE_INTERVAL)
else:
# Normal running: check if current still valid
if current_source in ['L1','L2','L3']:
if not meas[current_source]['present']:
# current phase lost -> immediate safe open and try to find new
print("[WARN] Phase courante perdue:", current_source)
all_off_and_log()
current_source = None
led_blink(times=4, period=0.15)
safe_sleep(0.5)
continue
if best == current_source:
# nothing to do
pass
elif best is None:
# No network candidates -> consider switching to group if group present
print("[WARN] Plus aucune phase réseau disponible.")
# Do not auto-start genset here unless external control; optionally switch to RG if signal present
# For safety, open all and wait
all_off_and_log()
current_source = None
led_blink(times=4, period=0.12)
safe_sleep(1.0)
continue
else:
# Compare improvement
err_curr = 9999.0
err_best = 9999.0
if current_source != 'G' and current_source is not None:
err_curr = abs(meas[current_source]['vprim'] - VPRIM_NOMINAL)
if best != 'G':
err_best = abs(meas[best]['vprim'] - VPRIM_NOMINAL)
improvement = 0.0
if err_curr > 0 and err_curr < 9999.0:
improvement = (err_curr - err_best) / err_curr
else:
# if currently on group or unknown, prefer network candidate
improvement = 1.0
if improvement >= SEUIL_SWITCH_PCT:
# require stabilization
stable = True
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < int(STABILISATION_TIME * 1000):
meas_tmp = read_all_measurements()
if select_best(meas_tmp) != best:
stable = False
break
safe_sleep(SAMPLE_INTERVAL)
if stable:
# perform safe switch
fb_pin = None
if best == 'L1' and FEEDBACK_L1: fb_pin = FEEDBACK_L1
if best == 'L2' and FEEDBACK_L2: fb_pin = FEEDBACK_L2
if best == 'L3' and FEEDBACK_L3: fb_pin = FEEDBACK_L3
ok = activate_relay_for(best, fb_pin)
if ok:
current_source = best
last_switch_ms = time.ticks_ms()
print("[INFO] Switch effectué vers", best)
led_ok_on()
else:
print("[ERROR] Echec switch vers", best, "- reprise sur", current_source)
# if failure, ensure safe state and retry later
safe_sleep(1.0)
else:
# not stable long enough
pass
else:
# no sufficient improvement
pass
# heartbeat / keepalive
feed_watchdog()
safe_sleep(SAMPLE_INTERVAL)
# -------------------- START --------------------
if __name__ == "__main__":
print("=== Selectionneur automatique de phases - Mode sécurisé ===")
try:
main_loop()
except KeyboardInterrupt:
print("Arrêt par KeyboardInterrupt")
all_off_and_log()
except Exception as e:
print("Exception critique:", e)
all_off_and_log()
raise