Réaction rapide lors d'une modification des paramètres
Avant-propos
Ce tuto termine le chapitre 'Coding des nodes'
np.nan
.get_matrix()
.
Description
ctrl_node.py
.show_matplotlib.py
.ctrl_node.py
:Node
qui hérite de CtrlNode
, nécessaire pour le PC
(Poste de contrôle).Calcul
qui hérite de CtrlCalcul
, nécessaire pour les calculs.ctrl_node.py
contient ces 2 classes-mères, c'est à dire tout le code factorisé.fichier complet /pc/ctrl_node.py
:
# Imports externes
import copy
import shutil
import os
import hashlib
import numpy as np
import pickle
# Imports internes
from functions.utils import Dictionary, DateTime, Utils
from pc.ui_node import UiNode
from pc.parameters import Parameters
from pc.ctrl_socket import CtrlSocket
from pc.ctrl_edge import CtrlEdge
class Helper:
@staticmethod
def deepcopy(reference):
return copy.deepcopy(reference)
@staticmethod
def new_od(dic=None):
return Dictionary(dic)
@staticmethod
def join(*l_variants, sep='-'):
""" Exemple d'appel 1 : y = self.join(a, b, c, d, ...)
Exemple d'appel 2 : y = self.join(*l_x) <-- Ne pas oublier '*'
Concatène les éléments de l_strings en une chaine. Les chaînes vides sont ignorées. le séparateur est sep.
:param l_variants: Nombre quelconque d'éléments de tout type.
:param sep: '-' par défaut. Peut être '' (vide) ou \n (retour à la ligne) ... ou autre.
:return: Chaîne concaténée.
Exemple l_strings = ['Paris', 'Lille', '', 'Bruxelles', 'Prague'] => return "Paris-Lille-Bruxelles-Prague"
"""
string_txt = ''
for string in l_variants:
if string != '':
string_txt += f'{sep}{string}' # Accepte tout type de variable pour string.
return string_txt[len(sep):] # Suppression du 1er sep.
class CtrlNode(Helper):
def __init__(self, o_scene, s_id, pos):
self.o_scene = o_scene
self.s_id = s_id # Clé d'entrée dans le super-dictionnaire pkl.
self.od_pkl = self.o_scene.o_pkl.od_pkl
self.o_grnode = None
self.o_params = None
self.pos = pos # pos = (x, y)
self.width = 96
self.height = 60
self.type = ''
self.ut = Utils()
self.main_key = f"Paramètres du node '{self.s_id}'"
self.b_chk = bool(self.od_pkl.read([self.s_id, 'b_chk'], True))
self.b_on = self.b_chk
self.child_file = ''
""" Sockets. """
self.lo_sockets_in = list() # Liste d'objets 'sockets in' instanciés.
self.lo_sockets_out = list() # Liste d'objets 'sockets out' instanciés.
self.default_color = '#88bbff' # Couleur par défaut des sockets de sortie.
self.l_sep_inputs = [] # Emplacement des séparateurs (après les entrées indiquées).
self.l_sep_outputs = [] # Emplacement des séparateurs (après les sorties indiquées).
""" Contenu spécifique. """
self.o_grcontent = None
""" Secousses => Court-circuits. """
self.dt = DateTime()
self.l_short_circuits = list() # Surchargé par les classes dérivées.
self.l_shortables = list()
self.b_removable = True # Les edges d'entrée peuvent être détruits en cas de court-circuit.
self.k_shake = 0 # Nombre de secousses.
self.b_shaking = False # Secousse en cours.
self.x, self.y = 0, 0 # Position antérieure (x, y), mémorisée.
self.dx, self.dy = 0, 0 # Deltas (variations) antérieurs, mémorisés.
def setup(self, child_file):
""" Code appelant : classe dérivée, on connait donc ses attributs. """
""" Paramètres. """
self.o_params = Parameters(self)
""" Hauteur automatique du node : elle dépend du nombre de sockets en entrée ou en sortie (le plus grand). """
h_title = self.d_display['geometry']['h_title']
nb_sockets_max = max(len(self.ld_inputs), len(self.ld_outputs)) # En fait : Sockets + séparateurs.
first_y = 16 * (1 + (h_title + 4) // 16) # Multiple de 16.
self.height = first_y + nb_sockets_max * 16 - 4
""" Nouveau node par drag & drop. """
self.child_file = child_file
b_new = self.pos != (0, 0) # Nouveau node (node dropé).
if b_new:
""" - Cas d'un nouveau node (dropé).
- Son centre apparaît exactement à la position du curseur de la souris.
- On mémorise son path et sa position. """
self.pos = self.pos[0] - self.width // 2, self.pos[1] - self.height // 2
path = 'nodes' + os.path.relpath(child_file).split('nodes')[1][:-3].replace(os.sep, '.')
self.od_pkl.write([self.s_id, 'path'], path)
else:
self.pos = self.od_pkl.read([self.s_id, 'position'], (0, 0))
""" Instanciation de l'UI. """
self.o_grnode = UiNode(self) # Gestion de l'UI (Interface utilisateur) : dessin couleurs, ...
if b_new:
self.o_grnode.save_pos() # Important ! à la fin (contient o_pkl.backup()).
self.o_scene.o_grscene.addItem(self.o_grnode) # Incorporation dans la scène.
""" Sockets. """
num_socket = 0
for i, d_input in enumerate(self.ld_inputs):
""" On complète les dictionnaires de base. """
if isinstance(d_input, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_input['pos'] = True, i # On ajoute l'emplacement. Tuple (input: True, num_position).
d_input['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
num_socket += 1
num_socket = 0
for i, d_output in enumerate(self.ld_outputs):
""" On complète les dictionnaires de base. """
if isinstance(d_output, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_output['pos'] = False, i # On ajoute l'emplacement. Tuple (input: False, num_position).
d_output['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
num_socket += 1
self.short_circuit_check()
@property
def id(self):
""" Ex : 'Node 14' renvoie 14. """
return int(self.s_id[4:])
@property
def d_display(self):
""" Attributs par défaut pouvant être surchargés dans les classes dérivées. """
return {
'geometry': {'h_title': 32, 'round': 6},
'col_pen': {'select': '#ffa637', 'hover': '#888', 'on': "#000", 'off': '#aaaa00'}, # ordre = priorité.
'col_brush': {'on': "#8888bbcc", 'off': '#cccccccc'},
'thick_pen': {'hover': 4., 'leave': 2.}
}
def get_default(self):
""" Titre du node et couleurs des 'socket_out' pour chaque type de node. """
""" Complétée par les classes dérivées avec leurs paramètres statiques et dynamiques. """
""" 1) Paramètres communs : Titre, bouton et autant de couleurs que de sorties. """
d_default = {
'Titre du node': 'Choisir un titre',
'*🔆': '' # Un astérisque devant indique qu'il s'agit d'un bouton.
}
""" 2) Ajout de paramètres fixes spécifiques au type de node, AVANT les couleurs. """
d_default.update(self.fixed0_params()) # méthode fixed_params()
""" 3) Couleurs. """
d_colors = dict()
for d_output in self.ld_outputs:
if isinstance(d_output, dict):
d_colors[d_output['label']] = self.default_color
if d_colors:
d_default['Couleurs'] = d_colors
""" 4) Ajout de paramètres fixes spécifiques au type de node, APRES les couleurs. """
d_default.update(self.fixed_params()) # méthode fixed_params()
""" 5) Ajout des paramètres dynamiques, qui dépendent des signaux entrants. """
d_default.update(self.dynamic_params) # propriété dynamic_params
""" 6) Dictionnaire complet. Paramètres : fixes communs + fixes spécifiques + dynamiques. """
return {self.main_key: d_default}
def get_param(self, l_key, v_default=None):
if self.o_params is None:
return v_default
od_params = Dictionary(self.o_params.od_params[self.main_key])
return od_params.read(l_key, v_default)
def set_checked(self, val):
self.b_chk = bool(val)
""" Update my_signals() """
for o_socket_out in self.lo_sockets_out:
o_socket_out.to_update()
""" Infrastructure. """
self.o_scene.infrastructure()
""" Enregistrement. """
self.od_pkl.write([self.s_id, 'b_chk'], self.b_chk)
self.o_scene.o_ur.b_action = True # Ajout dans l'historique du "Undo-Redo".
self.o_scene.o_pkl.backup()
""" ************************* Méthodes à surcharger ************************** """
@property
def ld_inputs(self):
return list()
@property
def ld_outputs(self):
return list()
def fixed0_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé AVANT les couleurs.
- Exemple : le node de type 'Labo' renvoie 'Expérience', etc. """
return {}
def fixed_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Placé APRES les couleurs.
- Exemple : le node de type 'Signaux' renvoie 'Sinus', 'Cosinus', etc. """
return {}
def bundle_params(self):
""" - Renvoie un dictionnaire de paramètres pour chaque entrée.
- Une entrée reçoit un faisceau de signaux (bundle). """
return {}
def my_params(self, context):
"""
:param context: Liste de 7 valeurs, pouvant être utilisée pour construire le dictionnaire de sortie :
- context[0] typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- context[1] signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- context[2] signal_now_from = Nom du signal créé par le node en amont. Ex : 'SMA18'.
- context[3] signal_source_from = 'Provenance du signal' affiché dans le node en amont.
- context[4] (num_input, num_signal) = Tuple. num_signal = ordre du signal dans le faisceau entrant.
- context[5] signal_title = Titre du signal dans le dockable des paramètres.
- context[6] signal_source = 'Provenance du signal' -> texte avec retours à la ligne.
:return: Dictionnaire formaté pour les paramètres.
|_ Exemple {'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}], "Périodes (sep=',')": '14'}
"""
return {}
def my_signals(self, l_signals_in, num_socket_out):
""" Description des signaux délivrés à la sortie N° num_socket_out.
Comment ça marche ?
- Cette fonction prend en entrée tous les signaux entrants ainsi que les paramètres pkl.
- Un traitement spécifique est décrit, qui produit plusieurs signaux.
- Ces signaux sont ensuite distribués aux différentes sorties du node.
:param l_signals_in: Signaux d'entrée. Cette liste contient autant d'éléments que d'entrées connectées.
- Chaque élément est un tuple : (typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
(num_input, num_signal), signal_title, typ_id, signal_ante, signal_source)
(Les variables suffixées de '_from' proviennent du node en amont).
:param num_socket_out: N° de la sortie.
:return: Liste de signaux exclusivement destinés à la sortie N° num_socket_out.
Chaque signal est décrit par un tuple à 4 valeurs : (typ_id, signal_ante, signal_now, signal_source).
La valeur de signal_now doit être soigneusement choisie afin d'éviter des doublons de noms.
"""
raise SystemExit(self.child_file + '\nCtrlNode.my_signals() : Cette méthode doit être surchargée.')
""" *********************** Fin - Méthodes à surcharger ********************** """
@property
def dynamic_params(self):
""" Code appelant : self.get_default().
Ces paramètres sont dynamiques car ils dépendent des signaux connectés aux entrées.
- Ce node demande la liste des signaux à chaque socket d'entrée : o_socket_in.l_signals
- Cette liste est formatée de la même façon pour tous les types de node.
- C'est une liste de tuples, chaque tuple a 4 valeurs et caractérise un signal.
- Il est rappelé qu'un edge véhicule non pas un signal, mais un faisceau de signaux (un faisceau).
- Le format d'un tuple est le suivant :
- typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- signal_now_from = Nom du signal créé par le node producteur. Ex : 'SMA18'.
- signal_source_from = Texte destiné à être affiché dans le dockable, dans 'Provenance du signal :'
- Node
|_ Socket_in
|_ Faisceau de signaux
|_ Signal
|_ Paramètre dynamique.
"""
if not self.lo_sockets_in:
""" Tant que l'initialisation n'est pas terminée, lo_sockets_in est une liste vide."""
return {}
d_params = dict()
nb_inputs = len(self.lo_sockets_in)
for k in range(nb_inputs):
d_input = self.bundle_params() # Paramètres pour cette entrée.
""" Récupération en cascade : Socket_in -> Edge -> Socket_out -> Node -> Socket_in -> etc. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
""" Paramètres communs à tous les signaux. """
d_input[signal_title] = {
'$Provenance du signal :': [signal_source, {'readonly': True}],
'Signal actif': True,
}
l_args = [num_signal, signal_title, signal_source] + list(t_signal)
d_input[signal_title].update(self.my_params(l_args))
d_params[f'Entrée {k}'] = d_input
if nb_inputs == 1 and 'Entrée 0' in d_params:
""" S'il n'y a qu'une entrée, inutile d'écrire, dans le titre des paramètres, de laquelle il s'agit. """
d_params = d_params['Entrée 0']
return d_params
def get_signals(self, num_socket):
""" Code appelant : oSocket_out N° num_socket.
- Ce node crée des signaux et les fournit aux nodes en aval par l'intermédiaire des sockets et des edges.
- Il crée autant de faisceaux de signaux (vecteurs) qu'il a de sorties.
- Pour chaque sortie, la liste fournie est formatée de la même façon pour tout type de node.
- Voir détails ci-dessus, dans dynamic_params().
"""
if not (self.b_chk and self.lo_sockets_out): # Setup en cours
return []
""" Update od_params. """
self.o_params.set_params()
nb_inputs = len(self.lo_sockets_in)
l_all_signals_in = list()
for k in range(nb_inputs):
""" Le nom de l'entrée se sera pas écrit dans le titre des paramètres, s'il n'y en a qu'une. """
l_name_input = [f'Entrée {k}'] if nb_inputs > 1 else []
l_signals = list() # Liste des signaux actifs de l'entrée N° k.
""" self.lo_sockets_in[k].l_signals = Faisceau de signaux arrivant sur l'entrée k. """
for num_signal, t_signal in enumerate(self.lo_sockets_in[k].l_signals):
typ_id_from, signal_ante_from, signal_now_from, signal_source_from = t_signal[:4] # tuple à 4 valeurs.
signal_title = self.join(typ_id_from, signal_ante_from, signal_now_from)
if self.get_param(l_name_input + [signal_title, 'Signal actif'], False):
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
l_signals.append(t_signal[:4] + ((k, num_signal), signal_title, typ_id, signal_ante, signal_source))
l_all_signals_in.append(l_signals)
return self.my_signals(l_all_signals_in, num_socket)
""" ******************************* Secousses ******************************* """
def short_circuit_check(self):
""" Passage unique dans cette méthode, au démarrage. Fin programme si erreur.
- Peut-on supprimer les edges d'entrée en cas de court-circuit ?
- Oui, à condition que TOUTES les sorties existent dans self.l_short_circuits. """
l_indx_outputs = list() # Liste des index de sortie.
d_counter = dict()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
num_input = short_circuit[0]
num_output = short_circuit[1]
if num_input >= len(self.ld_inputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nL'entrée N°{num_input} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
if num_output >= len(self.ld_outputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nLa sortie N°{num_output} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
l_indx_outputs.append(num_output) # Dans cet exemple -> ( 0 , 2 )
if num_output in d_counter:
d_counter[num_output] += 1
else:
d_counter[num_output] = 1
for i, d_output in enumerate(self.ld_outputs):
if isinstance(d_output, dict):
if i not in l_indx_outputs:
""" Condition non satisfaite. """
self.b_removable = False # b_removable est à True dans __init__().
break
""" Contrainte technologique : Plusieurs entrées sur une même sortie est interdit ! """
for nb_inputs in d_counter.values():
if nb_inputs > 1:
""" Fin programme. """
raise SystemExit("Plusieurs entrées sont court-circuitables sur une même sortie."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
def set_shortables(self):
""" Méthode appelée lorsque le bouton gauche de la souris est appuyé (sur ce node).
- Chaque type de node possède sa liste de court-circuits faisables.
- Retourne une liste de tuples.
- Chaque tuple est composé des 2 gr_edges pouvant être réunis pour n'en faire qu'un. """
self.l_shortables = list()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
lo_gredges_in = list(self.lo_sockets_in[short_circuit[0]].get_gredges())
lo_gredges_out = list(self.lo_sockets_out[short_circuit[1]].get_gredges())
if lo_gredges_in: # Cette liste contient 0 ou 1 edge.
for o_gredge_out in lo_gredges_out:
self.l_shortables.append((lo_gredges_in[0], o_gredge_out))
def shake(self):
def shortcircuit():
for to_gredge in self.l_shortables: # Tuples de gredges.
o_socket_out = to_gredge[0].o_edge.o_socket_out # From, depuis.
o_socket_in = to_gredge[1].o_edge.o_socket_in # To, jusqu'à.
""" Suppression des 2 edges du tuple. """
self.o_scene.o_grscene.removeItem(to_gredge[1]) # Edge de sortie.
if self.b_removable:
self.o_scene.o_grscene.removeItem(to_gredge[0]) # Edge d'entrée.
""" Création du nouvel edge de court-circuit. """
CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
o_socket_in.to_update()
""" Persistance. """
self.o_scene.save_edges(backup=True)
""" Infrastructure. """
self.o_scene.infrastructure()
def raz_shakes():
""" Après cette raz, un nouveau court-circuit est autorisé. """
self.k_shake = 0
self.b_shaking = False
if self.b_shaking or not self.l_shortables:
return
self.dt.delay(raz_shakes, delay=300)
x, y = self.o_grnode.pos().x(), self.o_grnode.pos().y() # Positions.
dx, dy = x - self.x, y - self.y # Déplacements (delta x, delta y).
b_cond = (round(dx*self.dx) < 0) or (round(dy*self.dy) < 0)
""" Mémorisation. """
self.x, self.y = x, y
self.dx, self.dy = dx, dy
if b_cond:
""" Le mouvement a changé de sens => incrémentation du compteur. """
self.k_shake += 1
if self.k_shake > 10: # Valeur à ajuster.
self.b_shaking = True
self.k_shake = 0
self.dt.delay(shortcircuit, delay=50)
""" ***************************** Fin secousses ***************************** """
def set_colors(self, l_keys):
""" - La couleur est appliquée au socket_out concerné.
- Elle est également appliquée à tous ses éventuels edges connectés.
- A leur tour, ceux-ci propagent cette couleur à leur socket d'arrivée. """
for o_socket_out in self.lo_sockets_out:
if o_socket_out.__class__.__name__ == 'CtrlSocket':
if o_socket_out.label == l_keys[-1]:
""" Affectation de la couleur au socket_out concerné. """
o_socket_out.color = self.get_param(l_keys, '#666')
""" Récupération de tous les edgess partant de ce socket. """
l_gredges = list(o_socket_out.get_gredges())
for o_gredge in l_gredges:
""" Affectation de la même couleur à chaque edge. """
o_gredge.o_edge.color = o_socket_out.color
""" Affectation de la même couleur à chaque socket d'arrivée. """
o_gredge.o_edge.o_socket_in.color = o_socket_out.color
""" Rafraîchissement de l'affichage. """
self.o_grnode.update()
def need_update(self, l_keys):
"""
:param l_keys: Clé, dans od_params, du paramètre modifié (liste).
:return: True si les nodes en aval nécessitent d'être recalculés, False sinon.
"""
l_param_names = list(self.my_params('')) + list(self.fixed_params()) # Les paramètres du dockable.
b_actif = self.get_param([l_keys[1], 'Signal actif'], True) # True 'Signal actif' n'existe pas (générateurs).
""" Update nécessaire (si l'état actif a changé) OU (si actif ET un des paramètres a changé). """
return (l_keys[-1] == 'Signal actif') or (b_actif and l_keys[-1] in l_param_names)
def get_state(self):
""" Chaque node, selon ses spécificités, retourne un booléen de son état on/off (True/False). """
b_on = False
if self.b_chk:
for key in self.o_params.od_params.key_list():
if key[-1] == 'Signal actif':
if self.o_params.od_params.read(key, False):
b_on = True
break
return b_on
def recalculate(self, num_input):
""" - Propagation vers l'aval. Les nodes finaux sont des afficheurs ou des actionneurs.
- Après cela, les nodes finaux construisent leur dict de calcul par propagation vers l'amont. """
for o_socket_out in self.lo_sockets_out:
o_socket_out.recalculate()
def update_dcalc(self, d_calc):
if self.b_on:
""" Propagation amont. """
for o_socket_in in self.lo_sockets_in:
o_socket_in.update_dcalc(d_calc)
od_params = Dictionary(self.o_params.od_params[self.main_key])
""" Construction du dictionnaire pour les calculs, à partir du od_params. """
od_calc = Dictionary()
path = 'nodes' + os.path.relpath(self.child_file).split('nodes')[1][:-3].replace(os.sep, '.')
od_calc.write('Compute', self.type)
od_calc.write('Yaml file', self.get_yaml_files()[0])
od_calc.write('Compile from', path)
od_calc.write('Checked', self.b_chk)
for key in od_params.key_list():
if key[0] == 'Titre du node' or key[0] == 'Couleurs' or key[0] == '*🔆':
continue
if key[-1] == '$Provenance du signal :':
continue
od_calc.write(key, od_params.read(key))
""" Modification 'en place'. """
d_calc[self.s_id] = dict(od_calc)
def get_yaml_files(self):
py_file = os.path.basename(self.child_file)
seeder = self.child_file.replace(py_file, f'{py_file[:-3]}_seeder.yaml')
path_node = f"{__file__.split('pc')[0]}backups/{self.o_scene.graph_name}/node{self.id}"
extended_params = os.path.abspath(f'{path_node}/{self.id}-{py_file[:-3]}.yaml')
return extended_params, seeder
def yaml(self):
""" Méthode appelée lorsqu'on clique sur le bouton '🔆'. Lancement de l'appli associée à '.yaml'. """
extended_params, seeder_file = self.get_yaml_files()
path_node = os.path.dirname(extended_params)
os.makedirs(path_node, exist_ok=True)
if not os.path.exists(extended_params):
""" Création du fichier de paramètress étendus en yaml : copie du seeder si possible, sinon vide. """
if os.path.exists(seeder_file):
shutil.copyfile(seeder_file, extended_params)
else:
with open(extended_params, 'w'):
pass
if not os.path.exists(extended_params):
return
""" Lancement du fichier avec application associée. """
os.startfile(extended_params)
def rebuild(self, b_input=True):
""" Reconstruction du node suite à un changement dynamique du nombre d'entrées/sorties. """
""" Suppression des (sockets + edges) de ce node. """
lo_sockets = self.lo_sockets_in if b_input else self.lo_sockets_out
for o_socket in lo_sockets:
""" Suppression des éventuels edges connectés à ce socket. """
for o_gredge in list(o_socket.get_gredges()):
self.o_scene.o_grscene.removeItem(o_gredge)
""" Suppression du socket. """
self.o_scene.o_grscene.removeItem(o_socket.o_grsocket)
o_socket.o_grsocket = None # Suppression de l'objet.
""" Nettoyage de la liste des entrées/sorties. """
lo_sockets.clear()
""" Redessine le node avec sa nouvelle hauteur. """
ld_inputs, ld_outputs = self.deepcopy(self.ld_inputs), self.deepcopy(self.ld_outputs)
nb_inputs, nb_outputs = len(ld_inputs), len(ld_outputs)
h_title = self.d_display['geometry']['h_title']
first_y = 16 * (1 + (h_title + 4) // 16)
self.height = first_y + max(nb_inputs, nb_outputs) * 16 - 4 # Nouvelle hauteur.
self.o_grnode.set_outline()
""" Régénération de sockets. """
for pos, d_input in enumerate(ld_inputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_input, dict):
indx = len(self.lo_sockets_in)
d_input['pos'] = True, pos # On ajoute la pos. Tuple (input: True, num_position).
d_input['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
for pos, d_output in enumerate(ld_outputs):
""" On complète les dictionnaires de base. Pas de séparateurs. """
if isinstance(d_output, dict):
indx = len(self.lo_sockets_out)
d_output['pos'] = False, pos # On ajoute la pos. Tuple (output: False, num_position).
d_output['id'] = self.id, indx # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
""" Régénération des edges. """
self.o_scene.show_edges()
self.o_scene.save_edges()
""" Dockable des paramètres : le nombre d'entrées a changé. """
self.o_params.set_params()
self.dt.delay(self.o_scene.show_params, 2000) # delay : Permet de continuer la saisie dans le dockable.
def refresh(self, l_keys):
if l_keys[-1] == '🔆':
self.yaml()
elif l_keys[-1] == 'Titre du node':
""" Titre du node. """
self.o_grnode.set_title()
elif l_keys[-2] == 'Couleurs':
""" Couleur des sockets et des edges. """
self.set_colors(l_keys[1:])
else:
""" Fin de refresh - Appel conditionnel à infrastructure(). """
if self.b_chk and (l_keys[0] == 'infrastructure' or l_keys[-1] == 'Signal actif'):
""" Les nodes de type générateur doivent insérer le mot 'infrastructure'
en 1ère place dans la liste l_keys[], car ils n'ont pas 'Signal actif' dans leurs paramètres. """
self.o_scene.infrastructure()
""" Tous les paramètres, autres que le titre et les couleurs, influencent les calculs. """
self.o_scene.recalculate()
class CtrlCalcul(Helper):
""" Attributs de classe. """
d_servers = dict() # Dictionnaire de classe.
od_calcs = Dictionary()
def __init__(self):
self.id = None
self.s_id = None
self.compute = None
self.o_yaml = None
self.l_mykey = list()
self.np_array = None
self.b_forcing_calc = False
self.od_descr = Dictionary()
self.od_mydic = Dictionary()
self.len_buffer = 2_000_000 # 300_000
self.no_params = ['Compute', 'Yaml file', 'Compile from', 'Checked']
def setup_server(self, id_server):
self.id = id_server
self.l_mykey = self.get_keys_from_idnode(id_server) # Clé dans self.od_calcs.
self.od_mydic = Dictionary(self.od_calcs.read(self.l_mykey))
self.o_yaml.yaml_file = self.od_mydic.read(['Yaml file'])
self.compute = self.od_mydic.read(['Compute'])
self.s_id = f"{self.compute}{self.id}"
self.o_yaml.update_odyaml() # Update du super-dictionnaire des paramètres (self.o_yaml.od_yaml).
@property
def dt_servers_in(self):
""" - Le dictionnaire retourné contient autant de clés que d'entrées connectées et actives.
- Chaque valeur est un tuple : (o_server, signature, edge). """
""" 1 - Récupération de la liste de tous les edges dans self.od_calcs. """
s_edges = set()
ll_keys = self.od_calcs.key_list()
for l_keys in ll_keys:
if l_keys[-1] == 'edges':
s_edges.update(self.od_calcs.read(l_keys))
""" 2 - Instance et signature des nodes-serveurs en amont connectés aux entrées. """
d_servers = dict()
for edge in list(s_edges):
if edge[1][0] == self.id:
""" edge = Edge connecté à l'entrée de self. """
o_server, signature = self.get_server(edge[0][0])
d_servers[f'e{edge[1][1]}'] = o_server, signature, edge
return d_servers
@classmethod
def get_oserver(cls, id_server, compile_from):
""" Ce code est appelé par un node-client. Il aura un serveur par entrée.
- Chaque node-serveur est associé à une instance de classe Calcul (dérivée de CtrlCalcul).
- Au premier passage concernant ce node-serveur, l'objet o_server n'existe pas.
- Dans ce cas, on le crée par compilation dynamique et on l'enregistre dans un dictionnaire.
- Le dictionnaire est un dictionnaire de classe : cls.d_servers.
- Cela permet d'éviter de créer une nouvelle instance si celle-ci existe déjà.
"""
server_sid = f"{compile_from.split('.')[-1]}{id_server}" # Clé du dictionnaire des serveurs. ex : macd3
if server_sid not in cls.d_servers:
""" Compilation dynamique. Si l'objet o_server n'existe pas, on le crée."""
d_context = {}
code = f"from {compile_from} import Calcul; o_server=Calcul()"
try:
compiled = compile(code, '<string>', 'exec')
eval(compiled, d_context)
cls.d_servers[server_sid] = d_context['o_server']
except (Exception,) as err:
print(server_sid, ': Erreur de compilation dans', compile_from, '\n', err)
return None
return cls.d_servers[server_sid] # Objet o_server.
def get_keys_from_idnode(self, id_node):
"""
@param id_node: ex : 1
@return: Renvoie la clé d'accès au node 'node_sid', dans le dictionnaire général self.od_calcs.
"""
node_sid = f'Node{id_node}' # Ex : 'Node1'
for l_key in self.od_calcs.key_list():
if l_key[1] == node_sid:
return l_key[:2] # ['calc_*', 'Node*'] Clé du node-serveur dans le dictionnaire général self.od_calcs.
def get_server(self, id_server):
l_key_node = self.get_keys_from_idnode(id_server)
compile_from = self.od_calcs.read(l_key_node + ['Compile from'])
if compile_from is None:
return None
child = self.od_calcs.read(l_key_node + ['Expérience'], '')
compile_from += f'0{child}'[-2:] if isinstance(child, int) else ''
o_server = self.get_oserver(id_server, compile_from)
if o_server is None:
return None, None
o_server.setup_server(id_server)
return o_server, o_server.get_signature()
def get_signature(self):
""" La valeur de retour est un hash de od_yaml, od_mydic, liste des signatures des nodes-serveurs. """
l_signs_in = list()
for server_in in self.dt_servers_in.values():
""" server_in = tuple (instance, signature) du node-serveur. """
l_signs_in.append(server_in[1]) # server_in[1] = signature.
""" Assemblage des constituants de la signature. """
params = (self.o_yaml.od_yaml, self.od_mydic, l_signs_in)
return f'{self.id}-{hashlib.sha256(str(params).encode("utf-8")).hexdigest()}'
def calculation(self):
""" Ici node-serveur. Première méthode appelée par le node-client en aval, avant de commencer ses calculs. """
""" 1 - Chargement des derniers datas mémorisés en *.calc (pickle) : self.od_descr et self.np_array. """
self.load_last_datas() # Super-dictionnaire et tableau numpy vides si *.calc inexistant ou erroné.
""" 2 - Création du dict. de description actualisé od_descr, aux fins de comparaison avec self.od_descr. """
od_descr = self.setup_od_descr()
""" 3 - Comparaison sélective des descriptifs now et ante. Si aucune différence, return (calculs inutiles). """
if self.now_equal_ante(od_descr):
return
self.od_descr = od_descr
""" 4 - Propagation des calculs en amont. """
for o_server, _, edge in self.dt_servers_in.values():
""" Bien que ce soit cette même méthode, ce n'est pas une récursivité, mais une propagation amont."""
o_server.calculation()
""" 5 - self.np_array vide. """
nb_cols = len([key for key in list(self.od_descr.keys()) if key.startswith(f'{self.s_id}-')])
self.np_array = np.full((self.len_buffer, nb_cols+1), fill_value=np.nan, dtype=np.float32) # nan
self.np_array[:, 0] = 0 # col0 = status (0 ou 1)
""" 6 - Traitement des calculs. """
self.pre_process()
""" 7 - Persistance en *.calc (pickle). """
self.create_out_calc()
def load_last_datas(self):
""" Ici, node-serveur. Récupération des derniers datas depuis son fichier out_*.calc. Les datas sont :
- Un dictionnaire descriptif des consignes : 'self.od_descr'.
- Un tableau numpy : 'self.np_array'.
- Pour certains nodes, les datas ne sont pas dans out_*.cal, mais dans une base de données.
- Le cas échéant, cette méthode sera surchargée. """
server_root = os.path.dirname(self.o_yaml.yaml_file)
os.makedirs(server_root, exist_ok=True)
out_file = os.path.normpath(f'{server_root}/out.calc')
try:
with open(out_file, 'rb') as calc:
self.od_descr, self.np_array = pickle.load(calc)
if not self.od_descr or self.np_array.size == 0 or self.np_array.shape[0] != self.len_buffer:
raise() # On force le passage par except.
except (Exception,):
""" Initialisation. """
self.od_descr, self.np_array = self.new_od(), np.empty((self.len_buffer, 0))
if not self.dt_servers_in:
self.od_mydic.write(['self_server', 'Signal actif'], True)
def setup_od_descr(self):
""" Création du super-dictionnaire descriptif des calculs : od_descr.
- Une clé pour chaque signal, actif ou pas, ayant exactement le même libellé que celui
qui est affiché dans le dockable du node-client. Exemple pour un node 'Signaux' : 'Signaux1-Sinus'
- La valeur de cette clé est un dictionnaire ayant pour clés obligatoires :
- Diverses valeurs, spécifiques au signal, nécessaires aux calculs.
- num_col : Entier. Défini au dernier passage, sinon -1, destinée aux nodes-clients.
- actif : Booléen.
- notes : Aucune utilité en programmation, à l'attention du développeur.
- A la racine du dictionnaire, outre les clés de signaux, il y a :
- signature : signature sha256. Contient un assemblage des paramètres + signatures des nodes-serveurs.
Cette signature-composite nous permet de savoir si les signaux entrants ont changé.
- roots : liste de tuples, autant que de signaux en entrée : (name_input, key_dock)
Ex : name_input = 'e0', 'e1', ... key_dock = ADD4-Poisson-Signal
- Diverses valeurs globales, spécifiques au node, nécessaires aux calculs.
- Valeur de retour : od_descr.
"""
od_descr = Dictionary()
""" Signature = ma signature + signatures des nodes-serveurs. """
od_descr['signature'] = self.get_signature()
od_descr['roots'] = list()
num_col = 0
for l_keys in self.od_mydic.key_list():
if l_keys[-1] != 'Signal actif':
continue
key = l_keys[:-1]
val = self.od_mydic.read(key)
name_input = 'e' + l_keys[0][-1] if l_keys[0].startswith('Entrée ') else 'e0'
root_key = self.join(*[self.s_id] + key[-1].split('-')[1:])
if val.get('Signal actif', False):
od_descr['roots'].append((name_input, root_key))
od_descr['name_input'] = name_input
self.descr_signal(od_descr, val, root_key)
del od_descr['name_input']
for key_dock in od_descr.keys():
if key_dock.startswith(root_key):
if od_descr.read([key_dock, 'num_col']) is None:
num_col += 1
od_descr.write([key_dock, 'num_col'], num_col)
if key_dock.endswith('-Original'):
try:
od_descr.write([key_dock, 'actif'], val['Original'])
except (Exception,):
pass
elif od_descr.read([key_dock, 'actif']) is None:
od_descr.write([key_dock, 'actif'], val['Signal actif'])
return od_descr
def now_equal_ante(self, od_descr):
l_no_compare = ['actif']
for key_dock in od_descr.key_list():
if key_dock[-1] in l_no_compare:
continue
if key_dock[-1] == 'signature' and od_descr.read(key_dock) != self.od_descr.read(key_dock):
return False
if od_descr.read(key_dock) != self.od_descr.read(key_dock):
return False
else:
for o_server, _, _ in self.dt_servers_in.values():
if o_server.np_array is None:
return False
else:
return True
def descr_signal(self, od_descr, val, root_key):
""" Méthode OBLIGATOIREMENT surchargée par les classes dérivées. """
""" - Création du super-dictionnaire de description 'od_descr'.
- Les clés de 'od_descr' devront contenir les arguments nécessaires aux calculs.
- Les calculs seront effectués par pre_process(), ou, en mode 'générateur', par get_matrix().
- {key_dock} doit avoir EXACTEMENT la même orthographe que
la clé affichée dans le dockable des paramètres du node CLIENT
- Sources EXCLUSIVES : 'self.od_mydic' et 'self.o_yaml'.
- Dynamique - Cette méthode est appelée en boucle par la classe-mère :
- Une fois par signal aux entrées (actif ou pas).
- La construction du super-dictionnaire se fait donc par couches successives.
- Cela ne signifie pas qu'il y a autant de signaux en sortie qu'en entrée !
- Ces 2 nombres sont INDÉPENDANTS !
- En effet, plusieurs signaux d'entrée peuvent contribuer pour un seul signal de sortie ...
- ... et/ou, inversement, un signal d'entrée peut produire plusieurs signaux de sortie.
************************* IMPORTANT *************************
- Le coding se fera par itérations successives :
- Commencer à coder les calculs (pre_process() ou calculate()), qui parcourra ce dictionnaire 'od_descr'
- Si les calculs nécessitent une information absente dans 'od_descr', on revient ici pour l'ajouter.
- On prolonge le coding des calculs, puis on boucle ici ...
- ... jusqu'à la fin.
"""
raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
"\nLa méthode descr_signal() doit obligatoirement être surchargée.")
def pre_process(self):
""" Méthode surchargée par les classes 'générateur' dérivées : Signaux, Aléatoire, Histos, ... """
""" Cette méthode peut également être surchargée par d'autres classes dérivées, mais attention :
- La classe dérivée doit utiliser EXCLUSIVEMENT la méthode pre_process() OU la méthode get_matrix() :
- pre_process() : Le tableau numpy va être affecté à 100% ...
|_ ... ce qui risque d'être chronophage pour certains calculs.
- get_matrix() : Juste une toute petite partie du tableau numpy va être remplie en live, ...
|_ ... selon les besoins des nodes-clients, grâce à l'utilisation de générateurs python.
|_ Dans ce cas, la réactivité sera excellente, même si les calculs sont complexes.
Méthode pre_process() :
===================
- La récupération des paramètres se fait exclusivement depuis le super-dictionnaire self.od_descr.
- Le tableau numpy existe avec la bonne shape, mais est entièrement vide :
- Autant de colonnes que de signaux, plus une : la colonne de status, en position 0.
- Les valeurs sont des np.nan ...
- ... sauf la colonne 0 qui est remplie de '0'.
- Les valeurs sont produites :
- Par calculs, qui utilisent des fonctions "prêtes à l'emploi", du module 'indicators'.
- Par lecture d'une base de données.
"""
pass
def get_matrix(self, pointer, nb_lines=0):
"""
@param pointer: Pointeur "Depuis", si pointer < 0 , la totalité du tableau est retournée.
@param nb_lines: "Jusqu'a" = pointer + nb_lignes.
- Pour certains nodes, les datas ne sont pas dans 'self.np_array', mais en base de données.
- Le cas échéant, cette méthode sera surchargée.
- Si le node n'est pas de type 'générateur', cette méthode est également surchargée.
- Le traitement est court, ponctuel et partiel.
- A l'issue du traitement, une toute petite partie de tableau numpy 'self.np_array' a été mise à jour.
- La colonne 0 contient des flags (0 ou 1), levés lorsque les lignes correspondantes ont été traitées.
- Le retour est une matrice ayant le même nombre de colonnes que np_array et {nb_lines} lignes.
- Si les données demandées sont dans self.np_array, on les transmet directement.
- Sinon, on les génère, on les place dans self.np_array, puis on les transmet.
- Les paramètres sont à récupérer EXCLUSIVEMENT dans self.od_descr.
- Les données d'entrée sont demandées aux nodes-serveurs des entrées.
"""
_from, _to = pointer, pointer+nb_lines
""" Calculer si 'forcing' ou si le status = 0 (si au moins une valeur de la colonne 0 est à 0). """
if self.b_forcing_calc or np.prod(self.np_array[_from: _to, 0]) == 0:
""" Update de self.np_array. Début de transaction. """
self.process(pointer, nb_lines) # Traitement de tous les signaux aux entrées.
""" flag = 1 (fait) <--- Status. Fin de transaction. """
self.np_array[_from: _to, 0] = 1 # <-- Écriture du status = 1 (en colonne 0).
return self.np_array if pointer < 0 else self.np_array[_from: _to] # Tout ou slice.
def process(self, pointer, nb_lines):
od_buffer = self.new_od()
for name_input, root_key in self.od_descr.get('roots'):
o_server = self.dt_servers_in[name_input][0] # [0] = server, [1] = signature, [2] = edge
""" On demande au serveur une matrice un peu plus grande que nécessaire,
pour permettre l'amorce de certains calculs. On insère {previous_values} valeurs au début. """
previous_values = min(pointer, int(self.od_descr.read(['max_length', root_key], 0)))
np_matrix = o_server.get_matrix(pointer - previous_values, nb_lines + previous_values)
num_col_server = o_server.get_num_col(root_key)
vector_in = np_matrix[:, num_col_server]
od_buffer['name_input'] = name_input
self.calculate(pointer, nb_lines, root_key, vector_in, od_buffer)
del od_buffer['name_input']
if od_buffer:
""" Si dictionnaire non vide, modifié par calculate(), on termine le traitement. """
self.post_process(pointer, nb_lines, od_buffer)
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Pas de contrôle pour les nodes sans entrées (type générateur, ...) """
if self.dt_servers_in:
raise SystemExit(f"{self.s_id} : ctrl_node.py > CtrlCalcul."
"\nLa méthode calculate() doit obligatoirement être surchargée.")
def post_process(self, pointer, nb_lines, od_buffer):
""" - Passage unique, après process() afin de finaliser certains calculs ...
... incluant d'autres signaux que le signal en cours.
- Pré-traitement spécifique. """
pass
def create_out_calc(self):
""" Création du fichier-résultat out_*.calc = (od_descr, liste de tableaux numpy). """
""" 1 - Contrôle - Le tableau nd_array doit avoir une shape à 2 dimensions. """
""" Correction nécessaire si le tableau est un vecteur. """
if len(self.np_array.shape) == 1: # <-- C'est un vecteur, on reshape.
self.np_array = self.np_array.reshape(self.np_array.shape[0], 1)
""" 2 - Persistance. """
server_root = os.path.dirname(self.od_mydic['Yaml file'])
out_file = os.path.normpath(f'{server_root}/out.calc')
try:
with open(out_file, 'wb') as calc:
pickle.dump((self.od_descr, self.np_array), calc, pickle.HIGHEST_PROTOCOL)
except (Exception,) as err:
print(err)
return False
return True
def add_vector(self, key_dock, num_col, vector):
if vector is None:
return
vector = vector.reshape(vector.shape[0], 1)
add_nan = np.full((self.len_buffer - vector.shape[0], 1), np.nan)
vector = np.vstack((add_nan, vector))
if num_col is None or num_col < 0:
""" Nouvelle colonne d'indice num_col (égal au nombre actuel de colonnes). """
num_col = self.np_array.shape[1]
self.np_array = np.hstack((self.np_array, vector))
else:
""" Surcharge de la colonne num_col. """
self.np_array[:, num_col:num_col + 1] = vector
self.od_descr.write([key_dock, 'num_col'], num_col)
def get_num_col(self, client_key): # client_key = MM4-Normal-Original
sliced_key = self.join(*client_key.split('-')[1:]) # sliced_key = Normal-Original (Début retiré).
if sliced_key != '':
for l_key in self.od_descr.key_list(): # [Aléatoire1-Normal, 'num_col']
if l_key[-1] == 'num_col' and l_key[-2].endswith(sliced_key):
return self.od_descr.read(l_key)
def get_vector_in(self, client_key): # client_key = MM4-Normal-Original
return self.np_array[:, self.get_num_col(client_key)]
show_matplotlib.py
:UI
d'affichage seront disponibles à l'issue de cette formation : Matplotlib, pyqtgraph, bokeh, ...UI Matplotlib
est codée, dans le fichier show_matpltlib.py
.fichier complet /show/show_matpltlib.py
:
# Imports externes. *******************************************
from PyQt5.QtCore import QTimer
import psutil
import pickle
import numpy as np
import sys
import os
from matplotlib import pyplot as plt, rcParams
from matplotlib.widgets import Slider
# sys.path.append(os.path.dirname(sys.path[0])) # Pour MAP depuis le terminal.
# Imports internes. *******************************************
from functions.utils import Utils, Dictionary, DateTime
from nodes.afficheurs.plots.matplotlib_yaml import YamlParams
from nodes.afficheurs.plots.plots import Calcul
# noinspection PyUnresolvedReferences
class ShowMatPlotLib(Calcul):
def __init__(self):
super().__init__()
""" Ini: récupération des arguments passés en ligne de commande. """
self.graph_name = sys.argv[1]
self.id = int(sys.argv[2])
self.parent_pid = int(sys.argv[3])
self.final_path = ''
self.ut = None
""" Watch. """
self.l_files_ante = list()
""" Matplotlib. """
self.fig = None
self.l_ax = [None for _ in range(8)] # Liste de 8 valeurs (les axes, ou graphiques : 1 par entrée)
self.b_busy = False
self.b_paused = False
self.b_reverse = False
""" Graphiques. """
self.l_inputs = list()
self.width_x = None
self.pointer = None
""" Timers. """
self.dt = DateTime()
self.listen = QTimer()
self.anim = QTimer()
""" Setup. """
self.setup()
def setup(self):
""" 1 - Chemin complet du node final 'Plots'. Ce chemin contient les modèles de calculs 'calc_*.pkl. """
bk_path = os.path.dirname(__file__).replace('show', 'backups') # Dossier de backup.
self.final_path = os.path.normpath(os.path.join(bk_path, self.graph_name, f'node{self.id}')) # 'Plots'
self.ut = Utils(path=self.final_path, file='matplotlib')
""" 2 - Aide pour la mise au point : styles et paramètres diponibles dans matplotlib. Décommenter pour voir. """
# [print(style) for style in plt.style.available] # Liste des styles disponibles.
# Dictionary(rcParams).print() # Liste des paramètres disponibles.
""" 3 - Fichier de configuration, éditable manuellement en yaml. """
self.o_yaml = YamlParams(self)
self.o_yaml.yaml_file = os.path.normpath(f'{self.final_path}/{self.id}-matplotlib.yaml')
self.o_yaml.update_odyaml()
self.o_yaml.set_rcparams(rcParams)
plt.style.use(self.o_yaml.get_style())
""" 4 - Vitesse d'animation (Modifiable : touches + et -), pause, reverse, pas à pas. """
self.anim.indx = 5 # <-- Vitesse moyenne (de 0 à 11)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
""" 5 - Initialisation de matplotlib.pyplot. """
self.fig = plt.figure()
self.mgr.set_window_title('**ini**')
""" Sliders. https://matplotlib.org/stable/api/widgets_api.html#matplotlib.widgets.Slider """
slide_width = plt.axes([.12, .06, .77, .02]) # Taille et position du slider 'Largeur'.
slide_posit = plt.axes([.12, .02, .77, .02]) # Taille et position du slider 'Position'.
slide_width.semilogx()
self.width_x = Slider(slide_width, 'Largeur', 10, 10_000, 316) # 316 10**1=10, 10**2.5=316 (au milieu), 10**4=10_000
self.pointer = Slider(slide_posit, 'Position', -1, self.len_buffer-self.width_x.val-1, 0, valfmt='%0.0f')
self.pointer.set_val(-1)
""" 6 - Restauration de l'état de la fenêtre. """
self.ut.restore_state(self.mgr.window)
""" 7 - Surveillance des fichiers '.pkl'. """
self.ini_watch()
""" 8 - Événements. """
self.listen.timeout.connect(self.listener)
self.anim.timeout.connect(self.show_anim)
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
self.pointer.on_changed(self.show_anim)
self.width_x.on_changed(self.set_width_x)
""" 9 - Lancement des timers. """
self.listen.start(1000)
self.anim.start(self.anim.delay)
""" 10 - Boucle d'exécution Matplotlib. """
plt.show()
@property
def mgr(self):
return plt.get_current_fig_manager()
def listener(self):
""" 1 - Si le poste de contrôle est fermé => fin programme, sauf en mode autonome. """
if self.parent_pid > 0 and not psutil.pid_exists(self.parent_pid):
exit()
""" 2 - Persistance géométrie. """
self.ut.save_state(self.mgr.window)
def set_width_x(self, val):
self.pointer.valmax = self.len_buffer-round(val)-1
self.show_anim(1)
def show_anim(self, pos=0):
""" Anti ré-entrance. """
if self.b_busy or (pos > 0 and not self.b_paused):
return
self.b_busy = True
""" Pointeur commun. """
self.pointer.set_val(round(self.pointer.val + (-self.anim.step if self.b_reverse else self.anim.step)))
if self.pointer.val > self.len_buffer - round(self.width_x.val): # Bouclage provisoire.
self.pointer.set_val(0)
if self.pointer.val < 0: # Bouclage provisoire.
self.pointer.set_val(self.len_buffer - round(self.width_x.val))
""" Scrolling des entrées connectées et actives du node 'Plots' : subplots ou graphiques. """
for ax in self.l_ax:
if ax is None or not hasattr(ax, 'o_server') or ax.o_server is None:
continue
""" Nombre de points en abscisse.
Si non précisé dans le yaml, ce subplot utilisera la largeur commune, fixée par le slider 'Largeur'. """
width_x = ax.od_self_yaml.get('Largeur') # Prioritaire sur le slider. Valeur par défaut = None.
if width_x is None:
width_x = round(self.width_x.val) # aleur du slider.
""" Avancement individuel du pointeur. """
if pos == 0 and len(ax.lines) > 0: # Immobile si aucune courbe n'est affichée.
ax.pointer += -self.anim.step if self.b_reverse else self.anim.step
if ax.pointer > self.len_buffer - width_x: # Bouclage provisoire.
ax.pointer = 0
if ax.pointer < 0: # Bouclage provisoire.
ax.pointer = self.len_buffer - width_x
""" pointer = Position du 1er point à gauche dans le graphique.
Si ShareX dans le yaml (True par défaut), ce subplot utilisera le slider 'Position'. """
pointer = self.pointer.val if ax.od_self_yaml.get('ShareX', True) else ax.pointer
""" Récupération des données sous forme de matrices numpy. """
x = np.arange(pointer, pointer + width_x)
y = ax.o_server.get_matrix(pointer, width_x)
if y is None:
continue
""" Boucle sur les courbes. """
y_min, y_max = np.inf, -np.inf
for o_line in ax.lines:
num_col = int(o_line.od.get('num_col'))
if not np.isnan(y[:, num_col]).all():
y_min = min(y_min, np.nanmin(y[:, num_col]))
y_max = max(y_max, np.nanmax(y[:, num_col]))
""" Dessin d'une courbe. Attribution des valeurs. """
try:
o_line.set_xdata(x)
o_line.set_ydata(y[:, num_col])
except (Exception,):
pass
""" Limites x et y (abscisses et ordonnées). """
ax.set_xlim(pointer, pointer + width_x - 1) # Abscisses.
if y_min != np.inf:
y_padding = max(1, (y_max - y_min) * 0.05) # Marges top et bottom du graphique (5%)
y_min, y_max = y_min - y_padding, y_max + y_padding
ax.set_ylim(y_min, y_max) # Ordonnées.
""" Coloriage inter-courbes. """
self.o_yaml.between(ax, x, y)
""" Fin. """
plt.draw()
self.b_busy = False
def ini_watch(self):
""" Liste des éléments à surveiller : dossier {graphe} + dossiers {nodes} + fichiers .pkl + fichiers .yaml
- Le dossier est self.final_path, dans les backups.
- Les fichiers calc_*.pkl : un par entrée active de ce node d'affichage.
- Les fichiers .yaml : valeurs des clés 'Yaml file' dans les .pkl """
""" 1 - Dossier du graphe dans les backups :dossier-parent de tous les nodes du graphe. """
graph_path = os.path.normpath(f"{os.path.dirname(__file__).replace('show', 'backups')}/{self.graph_name}")
""" 2 - Liste des dossiers des nodes du graphe (qui contiennent des fichiers nécessaires aux calculs) """
l_folders = [graph_path] + [f'{graph_path}{os.sep}{node}' for node in os.listdir(graph_path) if
str(node).startswith('node')]
""" 3 - Liste des modèles yaml. """
l_models = list()
nodes_dir = os.path.dirname(__file__).replace('show', 'nodes')
for (folder, subfolder, files) in os.walk(nodes_dir):
if folder.endswith('models'):
for file in files:
f_include = os.path.join(folder, file)
l_models.append(f_include)
""" 4 - Liste des fichiers calc_*.pkl physiquement présents dans le dossier self.final_path """
l_watched_files = [os.path.normpath(f'{self.final_path}/{calc}') for calc in
os.listdir(self.final_path) if calc.startswith('calc_')]
""" 5 - Liste des fichiers .yaml : chaque od_calc (extrait du pkl) contient plusieurs clés 'Yaml file'.
On en fait une liste sans doublons. """
s_yaml = set() # set() au lieu de list() pour s'affranchir des doublons.
for path, folder, l_files in os.walk(graph_path):
[s_yaml.add(os.path.normpath(f'{path}/{file}')) for file in l_files if os.path.splitext(file)[1] == '.yaml']
""" 6 - Concaténation des listes précédentes. """
l_watched_files += list(s_yaml) + l_models # Concaténation de 3 listes de fichiers.
l_files = l_folders + l_watched_files # Concaténation des listes : dossiers et fichiers.
""" 7 - Vérification des changements dans cette liste, mais non dans le contenu des fichiers. """
if self.l_files_ante != l_files:
self.l_files_ante = self.ut.deepcopy(l_files) # Mémorisation (copie profonde).
self.ut.watch_file(l_files, self.watch) # (Re)lancement de la surveillance.
self.dt.delay(self.watch, 10, l_watched_files)
def watch(self, l_files):
""" l_files : liste contenant les dossiers et fichiers modifiés."""
files_only = [file for file in l_files if os.path.isfile(file)] # Filtrage : que les fichiers.
if len(files_only) == 0:
""" 1 - l_files ne contient que des dossiers : cela signifie que des fichiers ont été ajoutés ou supprimés.
Par conséquent on relance la méthode ini_watch() pour mettre à jour la liste des fichiers à surveiller. """
self.ini_watch()
return
""" 2 - Lecture de tous les calc_*.pkl (un par entrée) et écriture de leur contenu dans self.od_calcs. """
Calcul.od_calcs.clear() # Attribut de la classe CtrlCalcul.
l_calcs = [(f'calc_{i}', os.path.normpath(f'{self.final_path}/calc_{i}.pkl')) for i in range(8)]
for name_input, calc_file in l_calcs:
""" ex : name_input = 'calc_0' """
try:
with open(calc_file, 'rb') as pk:
self.od_calcs.write(name_input, pickle.load(pk))
except (Exception,):
pass
""" 3 - Création ou update des paramètres étendus od_yaml de o_yaml. """
self.o_yaml.update_odyaml()
""" 4 - La méthode principale update_figure() est chargée des mises à jour des calculs et des affichages. """
self.update_figure()
def update_figure(self):
""" Update suite aux modifications .pkl et .yaml """
""" 1 - Titre de la fenêtre. """
local = ' - Mode local' if len(sys.argv) < 6 else '' # Nombre d'arguments passés en ligne de commande.
title_ante = self.mgr.get_window_title()
node_sid = f'Node{self.id}'
b_first = title_ante == '**ini**' # b_first = Premier passage, au lancement de l'appli.
for key in self.od_calcs.keys():
name_input = key
title_now = self.od_calcs.read([name_input, node_sid, 'Fenêtre', 'Titre'], 'Graphiques')
break
else:
title_now = 'Graphiques'
title_now += local
l_inputs = [key for (key, val) in self.od_calcs.items() if val['edges']] # Entrées actives seulement.
if title_now != title_ante:
self.mgr.set_window_title(title_now)
""" 1.1 - Si seul le titre de la fenêtre a changé, inutile de continuer (sauf au 1er passage). """
if l_inputs == self.l_inputs and not b_first:
return True
""" 2 - Paramètres étendus yaml. """
self.o_yaml.set_rcparams(rcParams)
""" 3 - Subplots : création, update, géométrie. Chaque entrée est associée à un subplot.
- Celui-ci est créé ou mis à jour en fonction des changements de grille (nb d'entrées ou yaml). """
if self.o_yaml.grid_modified or l_inputs != self.l_inputs: # Respecter l'ordre de comparaison !
self.l_inputs = self.ut.deepcopy(l_inputs)
d_grids = self.o_yaml.get_dgrids()
for num_input in range(8):
self.update_ax(d_grids, num_input)
""" 4 - Paramètres des subplots (un par entrée active) et de leurs courbes. """
self.od_mydic.clear()
for num_input in range(8): # self.l_inputs = ['calc_0', 'calc_1', ...]
name_input = f'calc_{num_input}'
if name_input not in l_inputs:
self.l_ax[num_input] = None
continue
ax = self.l_ax[num_input] # Copie par référence.
""" 4.1 - Pour cette entrée : recherche de son serveur et de sa signature. """
edge = self.get_edge(name_input) # ex : edge = ((1, 0), (0, 0))
ax.o_server, signature = self.get_server(edge[0][0]) # edge[0][0] = id du node serveur.
ax.num_input = num_input
if ax.o_server is None:
continue # Entrée non connectée ou désactivée.
""" 4.2 - Si nécessaire, calculs et paramètres des courbes. """
if not hasattr(ax, 'signature') or ax.signature != signature:
ax.signature = signature
ax.od_self_yaml = Dictionary(self.o_yaml.od_yaml.read(f'Entrée {num_input}'))
ax.o_server.calculation()
""" 4.3 - Paramètres des courbes. """
self.update_subplot(ax)
""" 5 - Actualisation du buffer d'affichage nécessaire lorsque il est en pause. """
self.show_anim()
""" 6 - Affichage à l'écran du contenu du buffer d'affichage. """
plt.draw()
def get_edge(self, name_input): # ex: name_input = 'calc_1'.
""" Recherche du socket_out emetteur (directement connecté au socket_in 'name_input' de ce node 'Plots'). """
s_edges = self.od_calcs.read([name_input, 'edges'])
for edge in s_edges:
if edge[1][0] == self.id:
return edge
def update_subplot(self, ax):
""" Mise à jour des paramètres de ce subplot (axe), ainsi que de chacune de ses courbes.
@param ax: objet subplot.
@return: NA.
"""
def update_line():
""" Code d'appel : point 2 un peu plus bas.
https://matplotlib.org/stable/api/_as_gen/matplotlib.lines.Line2D.html """
""" 2.1 - Affectation des attributs aux courbes : couleur, épaisseur, style, légende. """
o_line.od = Dictionary(od_lines.read(o_line.name))
if not o_line.od:
return # 'Kalman2-Normal-Original'
num_col = ax.o_server.od_descr.read([o_line.name, 'num_col']) # 'Kalman2-Normal-kalman'
o_line.od.write('Entrée', f'Entrée {ax.num_input}')
o_line.od.write('num_col', num_col)
o_line.set_color(o_line.od.read('Couleur'))
o_line.set_linewidth(o_line.od.read('Épaisseur'))
o_line.set_linestyle(self.get_linestyle(o_line.od.read('Style')))
o_line.set_label(o_line.od.read('Légende'))
self.o_yaml.line_params(o_line) # Paramètres étendus pour cette courbe.
""" 2.2 - Exemple de compilation dynamique au niveau de la courbe 'o_line'. Décommenter si nécessaire. """
# d_context = {'line': o_line}
# self.o_yaml.compile([f'Entrée {num_input}', o_line.name, 'Code'], d_context)
""" 1 - Actualisation de la liste des courbes ax.lines.
- Si 'Plots' a 1 seule entrée, clé de type ['calc_0', 'Node0'], sinon ['calc_0', 'Node0', 'Entrée 0']. """
l_mykey = [f'calc_{ax.num_input}', f'Node{self.id}'] # Clé de od_mydic dans od_calcs.
l_keys = l_mykey + [f'Entrée {ax.num_input}'] # ['calc_0', 'Node0', 'Entrée 0']
od_lines = Dictionary(self.od_calcs.read(l_keys, self.od_calcs.read(l_mykey)))
self.update_lines(ax.num_input, od_lines) # Suppression et ajout de courbes.
""" 2 - Mise à jour de chacune des courbe. """
for o_line in ax.lines: # Parcours des courbes de ce subplot (de cette entrée de node).
update_line()
""" 3 - Exemple de compilation dynamique au niveau du subplot 'num_input'. Décommenter si nécessaire. """
# d_context = {'subplot': ax}
# self.o_yaml.compile([f'Entrée {num_input}', 'Code'], d_context)
""" 4 - Paramètres étendus yaml. """
self.o_yaml.subplot(self.l_ax, ax.num_input)
def update_lines(self, num_input, od_plots):
""" Actualisation de la liste de courbes self.l_ax[num_input].lines[].
On compare les listes de coubes nécessaires et de courbes existantes.
On en déduit celles qu'il faut ajouter, celles qu'il faut supprimer. """
""" 1) Courbes nécessaires : l_needed. """
l_needed = list()
for key, val in od_plots.items():
if isinstance(val, dict) and 'Signal actif' in val and val['Signal actif']:
l_needed.append(key)
""" 2) Courbes existantes : l_exist. """
l_exist = self.line_exist(num_input)
""" 3) Ajout (création) de courbes. """
for needed in l_needed:
if needed not in l_exist:
o_line, = self.l_ax[num_input].plot(np.empty(0)) # [num_input] = Élément 0 de la liste.
o_line.name = needed
""" 4) Suppression de courbes. """
for exist in l_exist:
if exist not in l_needed:
o_line = self.get_line(num_input, exist)
if o_line is not None:
self.l_ax[num_input].lines.remove(o_line)
def line_exist(self, num_input):
""" Appelée par update_lines(). """
l_lines = list()
for line in self.l_ax[num_input].lines:
l_lines.append(line.name)
return l_lines
def get_line(self, num_input, line_name):
""" Appelée par update_lines(). """
for o_line in self.l_ax[num_input].lines:
if o_line.name == line_name:
return o_line
return None
@staticmethod
def get_linestyle(param_style):
if param_style.startswith('___'):
return '-'
elif param_style.startswith('- -'):
return '--'
elif param_style.startswith('. .'):
return ':'
return '-.'
def update_ax(self, d_grids, num_input):
""" On vérifie les 8 inputs, connectés ou pas. """
name_input = f'calc_{num_input}'
ax = self.l_ax[num_input]
if name_input not in self.l_inputs:
""" Vérification des entrées non connectées aussi, afin éviter des affichages indésirables."""
if ax is not None:
""" Ce subplot a existé, il n'existe plus. """
ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
self.l_ax[num_input] = None
return
""" Le subplot {ax} contient l'attribut 'pointer' qui disparait après cet update.
Par conséquent, on le sauvegarde d'abord. """
if ax is None:
pointer = 0
else:
ax.set_visible(False) # Suppression des graphiques (subplots) fantômes.
pointer = ax.pointer
""" Update du subplot {ax}. add_axes() permet un positionnement absolu, totalement libre. """
ax = self.fig.add_axes(d_grids[name_input]) # Position absolue. x, y, w, h. Valeurs entre 0 et 1.
""" Réintégration de l'attribut 'pointer'. """
ax.pointer = pointer
self.l_ax[num_input] = ax
@staticmethod
def speed(indx):
""" Calcul empirique. """
values = [(3000, 1), (2000, 1), (1200, 1), (640, 1), (480, 1), (320, 1), # (durée en ms, pas de 1) <-- Lents.
(280, 2), (296, 4), (273, 7), (280, 14), (286, 26), (282, 47), (400, 20)] # <-- Rapides.
return values[indx]
def key_event(self, ev):
""" Appui sur une touche du clavier. """
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
self.anim.stop() if self.b_paused else self.anim.start(int(self.anim.delay))
elif keycode == '+':
""" Accélérer. """
self.anim.stop()
self.anim.indx += 1
self.anim.indx = min(self.anim.indx, 11)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
self.anim.start(int(self.anim.delay))
elif keycode == '-':
""" Ralentir. """
self.anim.stop()
self.anim.indx -= 1
self.anim.indx = max(self.anim.indx, 0)
speed = self.speed(self.anim.indx)
self.anim.delay = speed[0]
self.anim.step = speed[1]
self.anim.start(int(self.anim.delay))
elif keycode == 'tab':
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
self.b_paused = True
self.anim.stop()
step = self.anim.step
self.anim.step = 1 if self.b_reverse == (keycode == 'left') else -1 # Ou exclusif.
self.show_anim()
self.anim.step = step
if __name__ == '__main__':
""" Ajout d'arguments à la ligne de commande permettant un lancement autonome (pour la MAP). """
sys.argv.append('Calculs') # Nom du graphe.
sys.argv.append('0') # Plots id. (La ligne de commande ne doit contenir que du str).
sys.argv.append('-1') # Gestion de la fermeture automatique. Poste de contrôle PID : -1 en local.
mpl = ShowMatPlotLib()
CtrlNode
, le fichier Plots.py
doit être adapté (la méthode my_params()
a dû être modifiée)./nodes/afficheurs/plots/plots.py
:
# Imports externes
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QPushButton
from PyQt5.QtCore import Qt, QTimer
import pickle
import os
import psutil
# Imports externes
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'Plots',
'icon': 'plots.png',
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Plots'
self.l_sep_inputs = [2] # Emplacement des séparateurs
self.scripts = dict()
self.graph_pid = None # PID du process affichant le graphique.
self.path_node = ''
self.ld_calc = [None for _ in range(8)] # Liste de 8 None.
self.clock = QTimer()
self.setup()
def setup(self, child_file=__file__):
self.o_grcontent = UiContentPlots(self)
self.clock.timeout.connect(self.detect_graph_closed)
self.scripts = {
'Matplotlib': 'show_matplotlib.py',
'Pyqtgraph': 'show_pyqtgraph.py',
'Plotly': 'show_plotly.py',
'Bokeh': 'show_bokeh.py',
'Cufflinks': 'show_cufflinks.py',
'Dash': 'show_dash.py',
'Guiqwt': 'show_guiqwt.py',
'OpenCV': 'show_opencv.py',
'Seaborn': 'show_seaborn.py',
}
super().setup(child_file)
self.init_calc()
def init_calc(self):
""" Persistance dans un pkl. Un sous-dossier par node. """
self.path_node = os.path.abspath(
f"{os.path.dirname(__file__).split('nodes')[0]}backups/{self.o_scene.graph_name}/node{self.id}")
os.makedirs(self.path_node, exist_ok=True) # Création dossier. Si ce dossier existe, pas d'erreur renvoyée.
""" Affectaion de self.ld_calc[] : Lecture des modèles de calcul mémorisés. """
for num_input in range(len(self.lo_sockets_in)):
fic_pkl = os.path.abspath(f'{self.path_node}/calc_{num_input}.pkl')
if os.path.isfile(fic_pkl):
try:
""" Lecture du fichier pkl. """
with open(fic_pkl, 'rb') as pk: # Pas d'encoding pour les fichiers binaires.
self.ld_calc[num_input] = pickle.load(pk)
except (Exception,):
pass
@property
def ld_inputs(self):
""" Lecture des paramètres pour connaître le nombre d'entrées. """
nb_inputs = self.get_param(["Nombre d'entrées"], 1) # 1 entrée par défaut.
l_inputs = list()
for k in range(nb_inputs):
""" Ajout des entrées. """
label = 'gr' if nb_inputs == 1 else f'gr {k}'
l_inputs.append({'label': label, 'label_pos': (6, -10)})
""" Ajout des séparateurs (on évite un séparateur en fin de liste). """
if k in self.l_sep_inputs and k < nb_inputs - 1:
l_inputs.append('sep')
return l_inputs
def fixed_params(self):
""" Valeurs par défaut. """
return {
"Nombre d'entrées": [1, {'step': 1, 'limits': (1, 8), 'compactHeight': False}],
'Fenêtre': {
'Titre': 'Titre de la fenêtre',
'Interface': ['Matplotlib', {'values': list(self.scripts.keys())}]
}
}
def bundle_params(self):
""" Valeurs par défaut. """
return {
'Titre': 'Titre du graphique',
}
def my_params(self, context):
""" Voir documentation dans la classe-mère : CtrlNode.my_params(). """
palette = ['ff0000', 'aa007f', '55aa00', '0000ff', 'ff5500', 'aa557f', '00aa00', 'ff00ff', '55aaff', '00ff7f']
legend = context[7] if len(context) >= 8 else self.join(context[4], context[5])
indx = context[0] % len(palette) # Ordre du signal dans le faisceau entrant.
return {
'Légende': legend,
'Couleur': '#' + palette[indx],
'Épaisseur': [1., {'step': .1, 'limits': (.1, 7.), 'compactHeight': False}],
'Style': ['__________', {
'values': ['__________', '- - - - - - - - -', '. . . . . . . . .', '_ . _ . _ . _ .',
'_ . . _ . . _ . .']}]
}
def get_yaml_files(self):
py_file = os.path.basename(__file__)
seeder_name = f"{self.get_param(['Fenêtre', 'Interface']).lower()}"
seeder = __file__.replace(py_file, seeder_name) + '_seeder.yaml'
path_node = f"{os.path.dirname(__file__).split('nodes')[0]}backups/{self.o_scene.graph_name}/node{self.id}"
extended_params = os.path.abspath(f'{path_node}/{self.id}-{seeder_name}.yaml')
return extended_params, seeder
def recalculate(self, num_input):
d_calc = {'edges': set()}
o_socket_in = self.lo_sockets_in[num_input]
o_socket_in.update_dcalc(d_calc)
od_params = self.new_od(self.o_params.od_params[self.main_key])
od_calc = self.new_od()
od_calc.write('Compute', self.type)
od_calc.write('Yaml file', self.get_yaml_files()[0])
od_calc.write('Checked', self.b_chk)
for key in od_params.key_list():
if key[0] == 'Titre du node' or key[0] == 'Couleurs' or key[0] == '*🔆':
continue
if key[-1] == '$Provenance du signal :' or key[-1] == "Nombre d'entrées":
continue
b_add = True
if key[0].startswith('Entrée'):
t_socket = self.id, int(key[0][7:])
b_add = False
for edge in d_calc['edges']:
if t_socket in edge:
b_add = True
break
if b_add:
od_calc.write(key, od_params.read(key))
d_calc[self.s_id] = dict(od_calc)
self.fix_colors()
""" On n'enregistre le pkl que s'il a été modifié. """
if d_calc != self.ld_calc[num_input]:
fic_pkl = os.path.abspath(f'{self.path_node}/calc_{num_input}.pkl')
try:
with open(fic_pkl, 'wb') as pk:
pickle.dump(d_calc, pk, pickle.HIGHEST_PROTOCOL)
self.ld_calc[num_input] = self.deepcopy(d_calc) # Mémorisation.
except (Exception,):
pass
def fix_colors(self):
""" Dès qu'une couleur par défaut est attribuée, on l'enregistre dans le pkl afin d'éviter des changements
lorsque le N° de courbe change. Exemple : on a [Sinus, Cosinus, Carré] produits par 'Signaux' :
- Carré est à l'indice 2, donc prend par défaut la couleur palette[2].
- On désactive Cosinus, donc Carré passe à l'indice 1 => Couleur par défaut = palette[1].
- Pour éviter ce défaaut visuel, on enregistre la couleur dès son attribution.
- En effet, les paramètres enregistrés dans le pkl sont prioritaires par rapport à ceux par défaut.
"""
b_modified = False
for l_keys in self.o_params.od_params.key_list():
if l_keys[-1] == 'Couleur':
color = self.o_params.od_params.read(l_keys)
if self.od_pkl.read([self.s_id] + l_keys) is None:
self.od_pkl.write([self.s_id] + l_keys, color)
b_modified = True
if b_modified:
self.o_scene.o_pkl.backup()
def refresh(self, l_keys):
if l_keys[-1] == "Nombre d'entrées":
self.rebuild()
""" L'entrée ajoutée ou retirée peut modifier l'infrastructure. """
l_keys = ['infrastructure'] + l_keys
super().refresh(l_keys)
def detect_graph_closed(self):
if self.graph_pid is None or not psutil.pid_exists(self.graph_pid):
self.o_grcontent.button.setText('Voir')
self.clock.stop()
def show_figure(self):
""" Fonctions internes. """
def get_fic(nom_afficheur):
return os.path.abspath(f"{os.path.dirname(__file__).split('nodes')[0]}show/{nom_afficheur}")
def close_figure():
if self.graph_pid is None:
return
for proc in psutil.process_iter():
if proc.pid == self.graph_pid:
proc.kill()
""" Désélection de tous les items de la scène, puis sélection de moi-même. """
for item in self.o_scene.o_grscene.items():
item.setSelected(False)
self.o_grnode.setSelected(True)
ui_fig = self.get_param(['Fenêtre', 'Interface'])
nom_projet = self.o_scene.graph_name
appli = ["pythonw", get_fic(self.scripts[ui_fig]), nom_projet, str(self.id), str(os.getpid())]
if self.graph_pid is None or not psutil.pid_exists(self.graph_pid):
""" Ne peut être lancé qu'une fois. """
self.graph_pid = psutil.Popen(args=appli).pid
self.o_grcontent.button.setText('Fermer')
self.clock.start(3000)
else:
close_figure()
self.o_grcontent.button.setText('Voir')
class UiContentPlots(QWidget):
""" Widgets affichés dans le node (ici, un bouton). L'affichage est assuré par 'UiNode.initContent()'. """
def __init__(self, o_parent):
super().__init__()
self.o_parent = o_parent
""" Le style est dans qss """
self.setGeometry(40, 38, 10, 10)
self.layout = QVBoxLayout()
self.layout.setContentsMargins(0, 0, 0, 0) # g, h, d, b
self.setLayout(self.layout)
""" Bouton : le style est dans qss/nodestyle.qss. """
self.button = QPushButton('Voir', self)
self.button.setCursor(Qt.PointingHandCursor)
self.layout.addWidget(self.button)
self.button.clicked.connect(self.o_parent.show_figure)
class Calcul(CtrlCalcul):
def __init__(self):
super().__init__() # Plots est client => d_map doit exister, même vide.
Calculs
:
Aleatoire
est de type générateur. Ses calculs sont globaux.Calcul
:np_array
est rempli à 100 % dans la méthode pre_process()
./nodes/generateurs/aleatoire/aleatoire.py
:
# Imports internes
import functions.indicators as indic
from nodes.generateurs.aleatoire.aleatoire_yaml import YamlParams
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'Aléatoire',
'icon': 'aleatoire.png',
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Aléatoire'
self.setup()
def setup(self, child_file=__file__):
super().setup(child_file)
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def fixed_params(self):
return { # 'Émulation histos'
'Type': ['Normal', {'values': ['Normal', 'Poisson', 'Binomial', 'Logistic']}],
'Émulation histos': True,
'Graine (<0=None)': [-1, {'tip': "Une valeur ⩾ 0 crée une série pseudo-aléatoire,"
"\nUne valeur < 0 crée une série aléatoire.", 'compactHeight': False}],
}
def my_signals(self, l_signals_in, num_socket_out):
typ_id, signal_ante, signal_now, signal_source = f'{self.type}{self.id}', '', self.get_param('Type'), ''
return [(typ_id, signal_ante, signal_now, signal_source)]
def get_state(self):
""" Surcharge. """
return bool(self.b_chk)
def refresh(self, l_keys):
if l_keys[-1] == 'Type':
self.lo_sockets_out[0].to_update()
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
typ = self.od_mydic.read('Type')
key_dock = f"{root_key}-{typ}" # Ex : Aléatoire3-e0-Logistic
kwargs = {
'amplitude': self.o_yaml.od_yaml.read(['Normal', 'amplitude'], 10),
'lambda': abs(self.o_yaml.od_yaml.read(['Poisson', 'lambda'], 15)) # Paramètre lambda.
}
od_descr.write([key_dock, 'notes'], 'Signal aléatoire') # Aide au développeur.
od_descr.write([key_dock, 'type'], typ)
od_descr.write([key_dock, 'emul'], self.od_mydic.read('Émulation histos'))
od_descr.write([key_dock, 'seed'], self.od_mydic.read('Graine (<0=None)'))
od_descr.write([key_dock, 'kwargs'], kwargs)
def pre_process(self):
""" Voir docstring dans la classe-mère. """
""" La récupération des paramètres se fait EXCLUSIVEMENT depuis le super-dictionnaire self.od_descr. """
""" Le tableau numpy (self.np_array) est entièrement rempli, en une seule passe. """
for key_dock, val in self.od_descr.items():
if key_dock.startswith(self.s_id+'-'):
typ = val.get('type')
b_emul = val.get('emul')
seed = val.get('seed')
kwargs = val.get('kwargs')
num_col = val.get('num_col')
vector = indic.get_random(self.len_buffer, typ, seed, b_emul, **kwargs).reshape(self.len_buffer, 1)
self.np_array[:, num_col:num_col+1] = vector
break # 'Aléatoire' ne produit qu'un signal.
self.np_array[:, 0] = 1 # Colonne des status.
show_matplotlib.py
(Vérifier dans ses dernières lignes que le nom du graphe et l'id du node correspondent).matplotlib_yaml.py > YamlParams
.matplotlib_yaml.py
:
""" Mise en garde importante :
Cette classe YamlParams est intimement couplée au fichier de paramètres étendus yaml.
Les méthodes ci-dessous, ainsi que les fichiers seeders sont volontairement limités à certains exemples pratiques.
En effet les possibilités sont infinies et leur étude sort du cadre de ce cours.
Il vous appartient de vous en inspirer pour augmenter cette classe en ajoutant vos propres méthodes.
"""
# Imports externes
from matplotlib import pyplot as plt
# Imports internes
from nodes.yaml_parent import YamlParent
class YamlParams(YamlParent):
def __init__(self, o_calc):
super().__init__(o_calc, __file__)
self.grid_ante = []
self.l_geom = list()
def get_dgrids(self):
""" Return un dictionnaire : une clé par entrée active. Pour chacune, définition de sa géométrie à l'écran. """
d_geometry = dict()
default_fig_margins = [0, 0, 14, 0] # [ft, fr, fb, fl] -> [top, right, bottom, left]. <-- en % (0 à 100)
fig_margins = self.od_yaml.read(['Figure', 'marges'], default_fig_margins)
for indx, name_input in enumerate(self.o_calc.l_inputs):
num_input = int(name_input[-1]) # 0 à 7
default_geometry = self.get_default_grid(indx) + default_fig_margins # 12 valeurs par défaut.
size = self.od_yaml.read([f'Entrée {num_input}', 'Géométrie', 'taille'], default_geometry[:4])
margins = self.od_yaml.read([f'Entrée {num_input}', 'Géométrie', 'marges'], default_geometry[4:8])
geometry = size + margins + fig_margins # 12 valeurs réelles.
for i, val in enumerate(geometry):
if not isinstance(val, (int, float)):
geometry[i] = default_geometry[i]
px, py, pw, ph, pt, pr, pb, pl, ft, fr, fb, fl = geometry # p* = Plot margin *, f* = Figure margin *.
fw = 100-fl-fr
fu = fw/100 # fu = Figure Unit = 1% de fw
_pw = pw*fu
x = (fl + px*fu + pl*fu)/100
w = (_pw - (pl+pr)*fu)/100
fh = 100-ft-fb
fu = fh/100 # 1% de fh
_ph = ph*fu
y = (100 - (ft + (py+ph-pb)*fu))/100 # (100 - (0 + 0*1 + (50-0)*.5))/100
h = (_ph - (pt+pb)*fu)/100
d_geometry[name_input] = x, y, w, h
return d_geometry
def get_default_grid(self, indx):
""" Disposition par défaut des subplots : 1 à 8 """
nb_inputs = len(self.o_calc.l_inputs)
if nb_inputs == 0:
return []
xywh, trbl = [], [5, .5, 1.5, 5] # top, right, bottom, left. <-- en % (0 à 100).
if nb_inputs < 4:
h = 100 / nb_inputs
y = indx * h
xywh = [0, y, 100, h] # x, y, width, height. <-- en % (0 à 100).
elif nb_inputs in [4, 6, 8]:
h = 200 / nb_inputs
x = (indx % 2) * 50
y = h * (indx // 2)
xywh = [x, y, 50, h]
elif nb_inputs in [5, 7]:
h = 200 / (nb_inputs + 1)
if indx == 0:
x, y, w = 0, 0, 100
else:
i = indx - 1
x = (i % 2) * 50
y = h + h * (i // 2)
w = 50
xywh = [x, y, w, h]
return xywh + trbl
@property
def grid_modified(self):
""" Grille de disposition des subplots ou géométrie(s) modifiées depuis le fichier yaml.
@return: Bool """
l_geom = [self.od_yaml.read(['Figure', 'marges'])]
for i in range(8):
l_geom.append(self.od_yaml.read([f'Entrée {i}', 'Géométrie']))
b_modified = l_geom != self.l_geom
self.l_geom = l_geom # Mémorisation (pour le prochain passage).
return b_modified
""" Fenêtre. Relancer l'affichage pour la prise en compte des modifications. """
def set_rcparams(self, rc_params):
d_params = self.od_yaml.read(['Fenêtre', 'rcParams'])
if not isinstance(d_params, dict):
d_params = dict()
for key, val in d_params.items():
rc_params[key] = val
def get_style(self):
style = self.od_yaml.read(['Fenêtre', 'style'], 'seaborn')
if style not in plt.style.available:
style = 'seaborn'
return style
""" Subplots. """
def subplot(self, l_ax, num_input):
ax = l_ax[num_input]
self.legend(ax, num_input) # Cartouche de légendes : visibilité, position, nb colonnes, couleurs.
self.axes_xy(ax, num_input) # Axe abscisse et Axe ordonnée : visibilités, polices, positions.
""" Cadre : visibilité. """
b_cadre = self.od_yaml.read([f'Entrée {num_input}', 'Cadre'], True)
ax.set_frame_on(b_cadre)
""" Titre : Texte vient du dockable, visibilité vient du yaml. """
title = self.o_calc.od_mydic.read('Titre', self.o_calc.od_mydic.read([f'Entrée {num_input}', 'Titre']))
title_visible = self.od_yaml.read([f'Entrée {num_input}', 'Titre visible']) # True, False ou None.
title_visible = False if title_visible is False else True
ax.title.set_text(title)
ax.title.set_visible(title_visible)
""" Réticule : liste de 3 valeurs [axis, linestyle, color]. axis = 'x', 'y' ou 'both' """
l_grid = self.od_yaml.read([f'Entrée {num_input}', 'Réticule']) # True, False ou None.
ax.grid(False)
try:
ax.grid(axis=l_grid[0], linestyle=l_grid[1], color=l_grid[2])
except (Exception,):
pass
def between(self, ax, x, y):
""" Coloriage d'une zône entre 2 limites. Chacune des limites peut être :
- Une courbe : Une portion du nom suffit pour identifier le signal.
(Vérifier qu'il n'y ait pas risque de doublon)
- Un nombre (int ou float) constant => Ligne horizontale.
- Une variable existant dans le dictionnaire od_descr du node-serveur.
- Le format d'écriture dans yaml est le suivant : {portion du nom de la courbe}${nom de la variable}
- Exemple pour le RSI : courbes: [ Normal, Normal$low ] <-- Voir rsi.py """
def get_oline(line_name_or_int): # ex : line_name = 'Signaux1-Cosinus'
""" Renvoie l'objet line (courbe) à partir d'une partie du nom affiché dans le dockable des paramètres. """
for o_line in ax.lines:
if isinstance(line_name_or_int, str):
line = line_name_or_int.split('$')
if line[0] in o_line.name:
if len(line) == 1:
return o_line
else:
var = line[1]
value = ax.o_server.od_descr.read([o_line.name, var], False)
return value if isinstance(value, (int, float)) else False
else:
return line_name_or_int if isinstance(line_name_or_int, (int, float)) else False
name_input = f"Entrée {ax.num_input}"
color_between = self.od_yaml.read([name_input, 'color between'])
if not isinstance(color_between, list):
return
""" Nettoyage pour éviter les cumuls. """
try:
if hasattr(ax, 'filled'):
for fill in ax.filled:
ax.collections.remove(fill)
except (Exception,):
pass
ax.filled = list()
for d_between in color_between:
for key, val in d_between.items():
if not isinstance(val, dict):
continue
try:
o_line0 = get_oline(val['courbes'][0])
o_line1 = get_oline(val['courbes'][1])
if o_line0 is False or o_line1 is False:
continue
b_active, num_col0, num_col1 = True, None, None
if hasattr(o_line0, 'od'):
num_col0 = o_line0.od['num_col']
b_active = b_active and o_line0.od['Signal actif']
if hasattr(o_line1, 'od'):
num_col1 = o_line1.od['num_col']
b_active = b_active and o_line1.od['Signal actif']
b_active = b_active and o_line0 is not None and o_line1 is not None
if not b_active:
return
colors = val['couleurs']
y0 = o_line0 if isinstance(o_line0, (int, float)) else y[:, num_col0]
y1 = o_line1 if isinstance(o_line1, (int, float)) else y[:, num_col1]
if len(colors) == 1:
ax.filled.append(ax.fill_between(x, y0, y1, color=colors, alpha=.45))
if len(colors) > 1:
ax.filled.append(ax.fill_between(x, y0, y1, where=(y0 > y1), color=colors[0]))
ax.filled.append(ax.fill_between(x, y0, y1, where=(y1 > y0), color=colors[1]))
except (Exception,):
print(f"Yaml.matplotlib, erreur de saisie dans Entrée {ax.num_input} / color between")
def axes_xy(self, ax, num_input):
""" Axe abscisse et Axe ordonnée : visibilités, polices, positions. """
for b_x in [True, False]:
dic = self.od_yaml.read([f'Entrée {num_input}', 'Axe ' + ('abscisse' if b_x else 'ordonnée')])
od = self.new_od(dic)
axis = ax.xaxis if b_x else ax.yaxis
label = od.read('label', '')
fsize = od.read('label font size', 8)
tsize = od.read('ticks font size', 8)
pos_label = od.read('label pos', [7, 6] if b_x else [2, 22])
pos_axe = od.read('ticks pos', 'bottom' if b_x else 'left')
b_tick_visible = pos_axe is not False
try:
for tick in axis.get_major_ticks():
""" 1 = left ou bottom. """
tick.tick1line.set_visible(b_tick_visible) # Petit tiret : visibilité.
tick.label1.set_visible(b_tick_visible) # Texte : visibilité.
tick.label1.set_fontsize(tsize) # Texte : taille police.
""" 2 = right ou top. """
tick.tick2line.set_visible(b_tick_visible)
tick.label2.set_visible(b_tick_visible)
tick.label2.set_fontsize(tsize)
if b_tick_visible:
axis.set_ticks_position(pos_axe)
ax.set_xlabel(xlabel=label, fontsize=fsize) if b_x else ax.set_ylabel(ylabel=label, fontsize=fsize)
# https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.ticklabel_format.html#matplotlib.axes.Axes.ticklabel_format
ax.ticklabel_format(style='plain', useOffset=False) # plain si plus de 6 chiffres.
axis.set_label_coords(pos_label[0]/100, pos_label[1]/100)
except (Exception,):
axe = f"Entrée {num_input} / Axe {'abscisse' if b_x else 'ordonnée'}"
print(f"Yaml.matplotlib, erreur de saisie dans {axe}")
def legend(self, ax, num_input):
""" Cartouche de légendes : visibilité, position, nb colonnes, couleurs. """
""" https://matplotlib.org/stable/api/legend_api.html (Pycharm : Ctrl + clic) """
od_legend = self.new_od(self.od_yaml.read([f'Entrée {num_input}', 'Légende'], {}))
d_legend = {
'loc': od_legend.read('loc', 'upper left'),
'ncol': od_legend.read('ncol', 2),
'framealpha': od_legend.read('framealpha', .3),
'facecolor': od_legend.read('facecolor', '#ffe'),
'edgecolor': od_legend.read('edgecolor', '#333'),
}
b_visible = od_legend.read('visible', True)
if b_visible:
ax.legend(**d_legend).set_frame_on(True)
else:
legend = ax.get_legend()
if legend is not None:
legend.remove()
def line_params(self, o_line):
"""
:param o_line: objet Line2D, auquel on a ajouté un membre :
o_line.od = super-dictionnaire des paramètres du dockable.
:return: NA.
On a ici 2 super-dictionnaires exploitables :
- o_line.od = dockable des paramètres du PC : seulement la grappe concernant cette courbe (line).
- self.od_yaml = paramètres du fichier yaml éditable manuellement.
Code libre, à compléter au fur et à mesure des besoins du yaml.
"""
l_key_line = [o_line.od.read('Entrée'), o_line.name]
""" alpha (transparence) """
alpha = self.od_yaml.read(l_key_line + ['alpha'], None)
if alpha is not None:
o_line.set_alpha(alpha)
☐ Fichier complet matplotlib_seeder.yaml
:
---
Models: # /nodes/afficheurs/plots/models
# Liste de modèles fusionnés. Cependant, les données de ce fichier restent prioritaires.
# - subplots_60_20_20.yaml
- xxxxxxxx.yaml
- yyyyyyyy.yaml
- zzzzzzzz.yaml
Fenêtre:
# Redémarrer pour la prise en compte des paramètres de la fenêtre.
# Styles disponibles, faire : [print(style) for style in plt.style.available]
# ['seaborn', 'dark_background', 'Solarize_Light2', '_classic_test_patch', 'bmh', 'classic', 'fast', ...]
style: dark_background
# Pour voir tous les paramètres disponibles, faire: Dictionary(rcParams).print()
# (Vérifier que l'import existe : from matplotlib import rcParams)
rcParams:
toolbar: 'None' # 'None', 'toolbar2', 'toolmanager'
axes.titlesize: 6 # Relancer. N'est pas pris en compte par certains styles (seaborn, ... à tester.)
legend.fontsize: 7 # Ne pas relancer.
# Types de graphiques -> plt.{x} : plot, hist, pie, bar, scatter
# Exemples d'attributs : https://python.doctor/page-creer-graphiques-scientifiques-python-apprendre
# - plt.grid(True)
# - plt.text(150, 6.5, r'Danger')
# - plt.xlabel('Vitesse')
# - plt.ylabel('Temps')
# - plt.legend()
# - plt.annotate('Limite', xy=(150, 7), xytext=(165, 5.5), arrowprops={'facecolor':'black', 'shrink':0.05})
Figure:
marges: [0, 0, 14, 0] # En % : haute, droite, basse, gauche. '_' = valeur par défaut
Entrée 0:
Réticule: [ both, dotted, C9 ] # Traits verticaux(x), horizontaux (y) ou les 2 (both).
# Entrée 1:
# Entrée 2:
# Entrée 3:
# Entrée 4:
# Entrée 5:
# Entrée 6:
# Entrée 7:
Entrée x - exemples de code:
# Note sur les couleurs. Palette ou Hexa :
# Palette : C0 à C9 ...
# ... ou bien écriture hexadécimale -> 3, 4, 6 ou 8 chiffres hexa :
# 6 chiffres hexa. ex : #2fd155 -> rouge : #2f, vert : #d1, bleu : #55
# 8 chiffres hexa. ex : #f411a680 -> 6 chiffres = Couleur, les 2 derniers = alpha.
# 3 chiffres hexa. ex : #4b6 identique à #44bb66
# 4 chiffres hexa. ex : #87fc identique à #8877ffcc <-- transparence = #cc
Géométrie: # _ = valeur par défaut. Valeurs de 0 à 100.
taille: [0, 0, 60, 70] # En % : x, y (coin haut gauche), largeur, hauteur
marges: [10, _, _, 12] # En % : haut, droit, bas, gauche.
Légende:
visible: False # True par défaut
loc: 'upper right' # Automatique par défaut
ncol: 2 # 1 par défaut
framealpha: .2 # 1 par défaut (opaque)
facecolor: '#ffe' # Transparent par défaut
edgecolor: '#333' # Bordure blanche par défaut
Titre visible: False
Cadre: False # ax.set_frame_on(bool)
Réticule: [ both, dotted, C9 ] # Traits verticaux(x), horizontaux (y) ou les 2 (both).
color between:
- Nom quelconque 1: # Juste pour l'utilisateur. Non utilisé dans le code.
# Nom des courbes : seule une partie du nom affiché dns le dockable.
courbes: [ -MACD, 0, Courte ] # Seules le 2 premières valeurs de la liste sont traitées.
couleurs: [ '#00f8', '#f006' ]
- RSI - Nom quelconque 2:
courbes: [ Normal, Normal$low ] # Ici emploi de la variable 'low' du signal contenant 'Normal' dans son nom.
couleurs: [ '#0000', '#f008' ]
Axe abscisse:
label: Blah-x Blah-1 # ax.set_xlabel(...)
label pos: [ 7, 4 ] # 0 à 100. Pourcentages (horizontal, vertical) : (50, 0) = center
label font size: 8
ticks pos: False # bottom (défaut), top. ax.xaxis.tick_top() Position de l'abscisse : bottom, top, False (invisible)
ticks font size: 8
Axe ordonnée:
label: Blah-y Blah-1 # ax.set_ylabel(...)
label pos: [ 2, 14 ]
label font size: 8
ticks pos: False # ax.yaxis.tick_left() Position de l'ordonnée : left, right, False (invisible)
ticks font size: 8
ShareX: False # Axe x partagé. True par défaut.
ShareW: False # Largeur partagée. True par défaut. (Nombre de points en abscisse).
Réglage des marges par défaut : default_fig_margins = [0, 0, 14, 0]
setup
de ShowMatPlotLib
.Signaux
est de type générateur. Ses calculs sont globaux.Aléatoire
./nodes/generateurs/signaux/signaux.py
:
# imports internes
import functions.indicators as indic
from nodes.generateurs.signaux.signaux_yaml import YamlParams
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'Signaux',
'icon': 'signaux.png',
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Signaux'
self.main_key = f"Paramètres du node Signaux-{self.id}"
self.setup()
def setup(self, child_file=__file__):
super().setup(child_file)
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def fixed_params(self):
return {
'Sinus': True,
'Cosinus': False,
'Carré': False,
'Triangle': False,
'Dent de scie montante': False,
'Dent de scie descendante': False,
'Amplitude': [20, {'compactHeight': False}],
'Points par période': [100, {'compactHeight': False}],
}
def my_signals(self, l_signals_in, num_socket_out):
""" Voir documentation dans la classe-mère CtrlNode.my_signals().
:param l_signals_in: Liste toujours vide car ce node n'a pas d'entrée.
:param num_socket_out: Ce node n'a qu'une sortie : N° 0.
:return: Liste de tuples. Pour chaque signal actif, un tuple de 4 valeurs.
"""
l_signals = list()
for signal in self.fixed_params().keys():
if isinstance(self.get_param(signal), bool) and self.get_param(signal, False):
typ_id, signal_ante, signal_now, signal_source = f'{self.type}{self.id}', '', signal, ''
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def get_state(self):
""" Surcharge. """
if self.b_chk:
for signal in self.fixed_params().keys():
if isinstance(self.get_param(signal), bool) and self.get_param(signal, False):
return True
return False
def refresh(self, l_keys):
""" Propagation en cascade : Seulement sur les noms de signaux (type valeur = bool). """
if isinstance(self.get_param(l_keys[-1]), bool):
self.lo_sockets_out[0].to_update()
""" Infrastructure :Insertion de 'infrastructure' en 1ère place. """
l_keys = ['infrastructure'] + l_keys
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self) # Classe spécifique à 'Signaux'.
def descr_signal(self, od_descr, val, root_key):
""" - Création du super-dictionnaire de description. Voir docstring dans la classe-mère.
- key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
dans le dockable des paramètres du node CLIENT.
- Sources EXCLUSIVES : self.od_mydic et self.o_yaml. """
for signal_name, val in self.od_mydic.items():
if isinstance(val, bool) and signal_name != 'Checked':
key_dock = f'{self.s_id}-{signal_name}' # Ex : Signaux2-Carré
per_amp = self.o_yaml.get_params(signal_name)
od_descr.write([key_dock, 'notes'], signal_name)
od_descr.write([key_dock, 'actif'], self.od_mydic.read(signal_name, False)) # True ou False.
od_descr.write([key_dock, 'per_amp'], per_amp)
def pre_process(self):
""" Voir docstring dans la classe-mère. """
""" La récupération des paramètres se fait EXCLUSIVEMENT depuis le super-dictionnaire self.od_descr. """
""" Le tableau numpy (self.np_array) est entièrement rempli, en une seule passe. """
for key_dock, val in self.od_descr.items():
if key_dock.startswith(self.s_id+'-'):
period, amplitude = val.get('per_amp')
signal_name = key_dock.split('-')[1]
num_col = val.get('num_col')
vector = indic.get_periodic(self.len_buffer, signal_name, period, amplitude)
self.np_array[:, num_col: num_col+1] = vector
self.np_array[:, 0] = 1 # Colonne des status.
Moyennes mobiles
produit ses signaux à la demande, en direct.pre_process()
est donc vide ou inexistante. Elle peut toutefois être utilisée pour faire un pré-traitement.ShowMatPlotLib.show_anim()
) est piloté par timer.
get_matrix()
.calculate()
effectue les calculs. Elle est donc appelée à chaque affichage.Calcul
n'a que 2 méthodes : descr_signal()
et calculate()
./nodes/indicateurs/moy_mobile/moy_mobile.py
:
# Imports internes
import functions.indicators as indic
from nodes.indicateurs.moy_mobile.moy_mobile_yaml import YamlParams
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'MMobile', # Label affiché sous l'icone.
'icon': 'moy_mobile.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'MM'
self.setup()
def setup(self, child_file=__file__):
self.l_short_circuits = [(0, 0)] # Entrée N°0 'court-circuitable' avec la sortie N°0.
super().setup(child_file)
@property
def ld_inputs(self):
return [{
'label': '',
'label_pos': (6, -10)
}]
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def my_params(self, context):
return {
'Original': True,
'Type de MM': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}],
"Périodes (sep=',')": '14',
}
def my_signals(self, l_signals_in, num_socket_out):
""" Faisceau de signaux (l_signals) délivré à la sortie. """
l_signals = list() # Liste de tuples à 4 valeurs.
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in[:9]
try:
periods = [int(p.strip()) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
except (Exception,):
periods = [14]
if self.get_param([signal_title, 'Original'], True):
l_signals.append((typ_id, signal_ante, 'Original', signal_source))
for num, period in enumerate(periods):
legend = f"{signal_ante}-{self.get_param([signal_title, 'Type de MM'], 'SMA')}{period}"
signal_now = f'sign{num}'
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
l_signals.append((typ_id, signal_ante, signal_now, signal_source, legend))
return l_signals
def refresh(self, l_keys):
""" Code spécifique. """
if self.need_update(l_keys):
self.lo_sockets_out[0].to_update() # Une seule sortie, N° 0.
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
""" Original. """
signal_name = root_key.split('-')[1]
key_dock = f"{root_key}-Original"
od_descr.write([key_dock, 'notes'], f'{signal_name} original')
typ = val['Type de MM']
l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
for num, period in enumerate(l_periods):
""" key_dock doit avoir EXACTEMENT la même orthographe que la clé affichée
dans le dockable des paramètres du node CLIENT. """
key_dock = f'{root_key}-sign{num}'
od_descr.write([key_dock, 'notes'], f'{signal_name} {typ}{period}')
od_descr.write([key_dock, 'typ'], typ)
od_descr.write([key_dock, 'period'], period)
od_descr.write([key_dock, 'kwargs', 'nb_last'], min(period, self.o_yaml.smma_nb_last))
od_descr.write([key_dock, 'kwargs', 'ratio'], self.o_yaml.lwma_ratio)
def pre_process(self):
""" - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
- Pré-traitement spécifique. """
pass # Cette méthode peut être supprimée.
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
""" Passage multiple. """
_to = pointer + nb_lines
""" Original. """
key_dock = root_key + '-Original'
num_col = self.od_descr.read([key_dock, 'num_col'])
self.np_array[pointer: _to, num_col] = vector_in[-nb_lines:]
""" Si plusieurs moyennes, key_dock se termine par 'sign0', 'sign1', ... """
for key_dock, d_val in self.od_descr.items():
if key_dock.startswith(root_key+'-sign'):
num_col = d_val.get('num_col')
typ = d_val.get('typ')
period = d_val.get('period')
kwargs = d_val.get('kwargs')
vector_out = indic.get_mm(vector_in, typ=typ, period=period, **kwargs)
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]
Aléatoire
), et serveur (de Plots
).
Les légendes ont été retirées pour améliorer la lisibilité.
low
et high
utilisées dans le yaml
pour le coloriage de zones./nodes/oscillateurs/rsi/rsi.py
:
from pc.ctrl_node import CtrlNode, CtrlCalcul
from nodes.oscillateurs.rsi.rsi_yaml import YamlParams
import functions.indicators as indic
d_datas = {
'name': 'RSI', # Label affiché sous l'icone.
'icon': 'rsi.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'RSI'
self.setup()
def setup(self, child_file=__file__):
self.l_short_circuits = [(0, 0)] # Entrée N°0 'court-circuitable' avec la sortie N°0.
super().setup(child_file)
@property
def ld_inputs(self):
return [{
'label': '',
'label_pos': (6, -10)
}]
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def my_params(self, context):
return {
'Zone neutre': [70., {'step': 1, 'limits': (5, 95), 'suffix': '%', 'compactHeight': False}],
"Périodes (sep=',')": '14',
}
def my_signals(self, l_signals_in, num_socket_out):
""" Faisceau de signaux (l_signals) délivré à la sortie. """
l_signals = list()
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in
periods = [int(p) for p in self.get_param([signal_title, "Périodes (sep=',')"]).split(',')]
for period in periods:
signal_now = f't{period}'
signal_source = self.join(signal_source_from, self.join(typ_id_from, signal_now_from), sep='\n')
typ_id, signal_ante = f'{self.type}{self.id}', self.join(signal_ante_from, signal_now_from)
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def refresh(self, l_keys):
""" Code spécifique. """
if self.need_update(l_keys):
self.lo_sockets_out[0].to_update() # Une seule sortie, N° 0.
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
def descr_signal(self, od_descr, val, root_key):
""" - Voir docstring dans la classe-mère.
- Les clés 'low' et 'high' sont utilisées par matplotlib pour colorier les zônes extrêmes. """
signal_name = root_key.split('-')[1]
l_periods = [int(p.strip()) for p in val["Périodes (sep=',')"].split(',')]
od_descr.write(['max_length', root_key], int(1.5 * max(l_periods)))
for period in l_periods:
key_dock = f"{root_key}-t{period}"
""" Limites low et high. """
low = (100 - val['Zone neutre']) / 2
high = 100 - low
od_descr.write([key_dock, 'notes'], f'{signal_name} {period}')
od_descr.write([key_dock, 'period'], period)
""" Variables 'low' et 'high' utilisables dans les paramètres étendus yaml. """
od_descr.write([key_dock, 'low'], low)
od_descr.write([key_dock, 'high'], high)
def pre_process(self):
""" - Passage unique, après descr_signal() et avant get_matrix(). voir docstring dans la classe-mère.
- Pré-traitement spécifique. """
pass
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
_to = pointer + nb_lines
""" Si plusieurs périodes de rsi, key_dock se termine par 't14', 't22', ... """
for key_dock, d_val in self.od_descr.items():
if key_dock.startswith(root_key+'-t'):
num_col = d_val.get('num_col')
period = d_val.get('period')
vector_out = indic.rsi(vector_in, period)
nb_valid_lines = min(len(vector_out), nb_lines)
self.np_array[_to - nb_valid_lines: _to, num_col] = vector_out[-nb_valid_lines:]
paramètres étendus du node Plots, Entrée 0
:
Entrée 0:
Réticule: [ both, dotted, C9 ] # Traits verticaux(x), horizontaux (y) ou les 2 (both).
color between:
- RSI - Haut:
courbes: [ RSI, RSI$high ] # Ici emploi de la variable 'high' du signal contenant 'RSI' dans son nom.
couleurs: [ '#88f8', '#0000' ]
- RSI - Bas:
courbes: [ RSI, RSI$low ] # Ici emploi de la variable 'low' du signal contenant 'RSI' dans son nom.
couleurs: [ '#0000', '#f888' ]
Le dernier RSI a sa zone neutre à 40%.
Il vous appartient de vous inspirer des codes précédents pour coder les types de nodes : Macd, Additionneur, Union.
Bon coding et bon courage !
Laboratoire
était dans le groupe Opérateurs
./nodes/operateurs/labo
vers /nodes/laboratoire/labo
./nodes/laboratoire/savgol
(il nous servira un peu plus loin).UiMain.setup() > self.do_groupes_nodes
./pc/main.py > UiMain.setup()
:
def setup(self):
""" Feuille de style. """
with open('nodestyle.qss', 'r', encoding='utf-8') as file:
QApplication.instance().setStyleSheet(file.read())
self.do_groups_nodes = { # Chaque onglet du dockable 'Noeuds' = Un groupes de modèles de nodes.
'Afficheurs': GroupNodes('afficheurs'),
'Générateurs': GroupNodes('generateurs'),
'Indicateurs': GroupNodes('indicateurs'),
'Oscillateurs': GroupNodes('oscillateurs'),
'Opérateurs': GroupNodes('operateurs'),
'Laboratoire': GroupNodes('laboratoire'),
}
ut.restore_state(self) # Restauration de l'état de la fenêtre.
ut.restore_tabs(self) # Restauration des onglets du panneau central.
self.show_dock_nodes()
self.tab_changed()
/nodes/laboratoire/savgol
, et contiendra les 4 fichiers.savgol.png
: savgol_seeder.yaml
:
# Fichier vide. À compléter si nécessaire.
savgol_yaml.py
:
from nodes.yaml_parent import YamlParent
class YamlParams(YamlParent):
def __init__(self, o_calc):
super().__init__(o_calc, __file__)
savgol.py
:
# Imports internes
import functions.indicators as indic
from nodes.laboratoire.savgol.savgol_yaml import YamlParams
from pc.ctrl_node import CtrlNode, CtrlCalcul
d_datas = {
'name': 'SavGol', # Label affiché sous l'icone.
'icon': 'savgol.png', # Icone affichée.
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'SavGol'
self.setup()
def setup(self, child_file=__file__):
self.l_short_circuits = [(0, 0)] # Exemple : ntrée N°0 'court-circuitable' avec la sortie N°0.
super().setup(child_file)
@property
def ld_inputs(self):
return [{
'label': 'e',
'label_pos': (6, -10)
}]
@property
def ld_outputs(self):
return [{
'label': 's',
'label_pos': (-22, -10)
}]
def my_params(self, context):
return {
'Original': True,
'Taille window': [25, {'step': 2, 'compactHeight': False}],
"Ordre du filtre": [3, {'values': [1, 2, 3, 4, 5, 6, 7]}],
"Trace finale": False,
'Mode': ['interp', {'values': ['mirror', 'constant', 'nearest', 'wrap', 'interp']}]
}
def my_signals(self, l_signals_in, num_socket_out):
"""
@param l_signals_in: Liste de listes (autant que d'entrées) des signaux aux entrées.
@param num_socket_out: Non utilisé.
@return: Liste de 1 à 3 signaux : Original (option), Signal SavGol, Trace (option).
"""
""" Faisceau de signaux (l_signals) délivré à la sortie. """
l_signals = list() # Liste de listes (autant que de sorties), de 4-tuples (tuples de 4 valeurs).
for signals_in in l_signals_in[0]:
typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
_, signal_title, typ_id, signal_ante, signal_source = signals_in[:9]
typ_id = f'{self.type}{self.id}'
signal_ante = self.join(signal_ante_from, signal_now_from)
""" Signaux transmis aux nodes-clients : 1) Original. """
if self.get_param([signal_title, 'Original'], True): # Case à cocher.
signal_now = 'Original' # Fin du titre dans nodes-clients.
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
""" 2) Signal. """
ident = 'SavGol' # Légende par défaut dans le node-client.
signal_now = 'Signal' # Fin du titre dans nodes-clients.
l_signals.append((typ_id, signal_ante, signal_now, signal_source, ident))
""" 3) Trace finale. """
if self.get_param([signal_title, 'Trace finale'], True):
signal_now = 'Trace' # Fin du titre dans nodes-clients.
l_signals.append((typ_id, signal_ante, signal_now, signal_source))
return l_signals
def refresh(self, l_keys):
if self.need_update(l_keys):
""" Signale aux nodes en aval (via les socket_out), que leurs calculs doivent être refaits. """
self.lo_sockets_out[0].to_update() # Exemple : ici, une seule sortie, la N° 0.
super().refresh(l_keys)
class Calcul(CtrlCalcul):
""" ********** Le code ci-dessous ne concerne pas le poste de contrôle, mais seulement les calculs. ********** """
def __init__(self):
super().__init__()
self.o_yaml = YamlParams(self)
self.b_forcing_calc = True
def descr_signal(self, od_descr, val, root_key):
""" Voir docstring dans la classe-mère. """
""" Un passage par signal d'entrée. """
signal_name = root_key.split('-')[1]
window_size = 2 * (val.get('Taille window', 26) // 2) + 1 # Ce doit être un nombre impair.
od_descr.write(['max_length', root_key], window_size)
order = val.get('Ordre du filtre', 3)
mode = val.get('Mode', 'interp')
""" 1) Original."""
key_dock = f"{root_key}-Original"
od_descr.write([key_dock, 'notes'], f'{signal_name} original')
""" 2) Signal. """
key_dock = f'{root_key}-Signal'
od_descr.write([key_dock, 'notes'], f'{signal_name} filtré')
od_descr.write([key_dock, 'taille_window'], window_size)
od_descr.write([key_dock, 'ordre_filtre'], order)
od_descr.write([key_dock, 'mode'], mode)
""" 3) Trace. """
key_dock = f"{root_key}-Trace"
od_descr.write([key_dock, 'notes'], f'Trace du dernier point de {signal_name}')
od_descr.write([key_dock, 'actif'], val.get('Trace finale', False))
def pre_process(self):
""" Voir docstring dans la classe-mère. """
""" Mémorisation des paramètres pour accélérer le traitement. """
for _, root_key in self.od_descr.get('roots'):
window_size = self.od_descr.read([f'{root_key}-Signal', 'taille_window'])
order = self.od_descr.read([f'{root_key}-Signal', 'ordre_filtre'])
mode = self.od_descr.read([f'{root_key}-Signal', 'mode'])
col_orig = self.od_descr.read([f'{root_key}-Original', 'num_col'])
col_signal = self.od_descr.read([f'{root_key}-Signal', 'num_col'])
col_trace = self.od_descr.read([f'{root_key}-Trace', 'num_col'])
self.od_descr.write([root_key, 'params'], (window_size, order, mode, col_orig, col_signal, col_trace))
pass
def calculate(self, pointer, nb_lines, root_key, vector_in, od_buffer):
_to = pointer + nb_lines
""" Lecture des paramètres. """
window_size, order, mode, col_orig, col_signal, col_trace = self.od_descr.read([root_key, 'params'])
""" Signal original. """
nb_valid_lines = min(len(vector_in), nb_lines)
self.np_array[_to - nb_valid_lines: _to, col_orig] = vector_in[-nb_valid_lines:]
""" Filtre savitzky_golay. """
v_savgol = indic.savitzky_golay(vector_in, window_size, order, mode=mode)
nb_valid_lines = min(len(v_savgol), nb_lines)
self.np_array[_to - nb_valid_lines: _to, col_signal] = v_savgol[-nb_valid_lines:]
""" Trace = mémorisation du dernier point de v_savgol. """
v_len = len(vector_in)
for indx in range(nb_lines):
v_size = v_len - nb_lines + indx + 1
""" Seulement si le status est à 0. """
if self.np_array[pointer + indx, 0] == 0:
v_mobile = vector_in[: v_size]
v_savgol = indic.savitzky_golay(v_mobile, window_size, order, mode=mode)
self.np_array[pointer + indx, col_trace] = v_savgol[-1]
indicators.py
a besoin de cette nouvelle fonction./functions/indicators.py > savitzky_golay()
:
def savitzky_golay(v_signal, window_size, order, mode='interp'):
""" (Ctrl + Clic)
https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html
"""
if mode == 'interp' and len(v_signal) < window_size:
""" En mode 'interp', v_size doit être >= window_size. """
return np.full(v_signal.shape, np.nan)
else:
return sig.savgol_filter(v_signal, window_size, order, mode=mode)
Scipy
.signal
de Scipy
dans le fichier indicators.py
.
# Imports externes
import scipy.signal as sig
show_matplotlib.py
depuis Pycharm.
☐ Nettoyer le code, supprimer les print() et autres codes de mise au point.
Snippets
Bonjour les codeurs !