Transmission de signaux entre nodes
Avant-propos
print()
, les imports
et les commentaires inutiles.1 - Graphe |
2 - Déplacement partiellement hors champ |
3 - Relancement de l'application : les edges ont disparu ! |
UI
: lorsqu'on fait appel à la méthode paint()
, il est nécessaire de déclarer au préalable un rectangle délimitant l'objet à dessiner : il s'agit de la méthode boundingRect()
.
UiNode
et UiSocket
, elle est bien présente.UiEdge
, elle est absente : Oups !UiEdge.boundingRect()
:
def boundingRect(self):
""" Méthode nécessaire, sans quoi les affichages hors champ n'ont pas lieu. """
t_ends = self.o_edge.end_points
point_from, point_to = t_ends[0], t_ends[1] # Point de départ (P0), Point d'arrivée (P3)
x, y = min(point_from.x(), point_to.x()), min(point_from.y(), point_to.y())
w, h = max(point_from.x(), point_to.x()) - x, max(point_from.y(), point_to.y()) - y
return QRectF(x, y, w, h).normalized()
(Importer QRectF)
Vérifier.
1 - État initial |
2 - Retrait du dockable des nodes |
3 - Relancement de l'appli : la position n'a pas été mémorisée. |
Ce n'est pas très grave, mais nous allons tout de même le corriger :
"resize anchor"
à UiView
. Il s'agit de self.setResizeAnchor()
dans le code ci-dessous.init_ui()
en 2.UiView.init_ui()
et UiView.set_flags()
:
def init_ui(self):
""" Le centrage est retardé, pour laisser le temps au processur graphique de tout dessiner. """
p_center = self.od_pkl.read(['UiView', 'position'], (0, 0)) # (0, 0) = tuple par défaut.
self.dt.delay(self.centerOn, 20, *p_center) # 20ms, x et y de p_center.
""" Restauration du zoom. """
self.zoom = self.od_pkl.read(['UiView', 'zoom'], 0)
zoom_factor = self.zoom_factor**self.zoom
self.scale(zoom_factor, zoom_factor)
""" Flags. """
self.set_flags()
def set_flags(self):
self.setResizeAnchor(QGraphicsView.AnchorViewCenter) # Redimensionnement : le point fixe est au centre.
self.setRenderHints(QPainter.Antialiasing | QPainter.HighQualityAntialiasing |
QPainter.TextAntialiasing | QPainter.SmoothPixmapTransform)
self.setViewportUpdateMode(QGraphicsView.FullViewportUpdate) # Sans cette ligne, warning dans drawBackground.
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) # Slider horizonal non affiché.
self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff) # Ascenseur non affiché.
self.setTransformationAnchor(QGraphicsView.AnchorUnderMouse) # Zoom -> Centre de la vue = position souris.
self.setDragMode(QGraphicsView.RubberBandDrag) # Sélections par dragging
Vérifier.
id
du node dans l'affichage de son type.setPlainText()
dans UiNode
.""" Texte du type """
de UiNode.init_ui()
:
self.type_item.setPlainText(f"{self.o_node.type}-{self.o_node.id}")
Description
Exemple de graphe minimaliste.
Node 'Signaux' |
Node 'Moyennes' |
Node 'Graphique' |
SMA
de période 14.SMMA
de période 12.période de 12
.12, 15, 23
. Dans ce cas, 3 moyennes distinctes pour le carré.calculs.pkl
: un par graphe.Node3
(Graphique) dépendent du node précédent, Node2
(Moyennes).Node2
(Moyennes) dépendent du node précédent Node1
(Signaux).dyn_params()
) pour mettre à jour les paramètres dynamiques de chaque node.show_items()
.CtrlScene.show_items()
:
def show_items(self):
""" Appel : démarrage de l'appli et undo-redo. """
""" Reset : suppression de tous les items. """
l_gritems = self.o_grscene.items()
for gritem in l_gritems:
self.o_grscene.removeItem(gritem)
if DEBUG:
self.debug()
else:
""" Affichage des nodes mémorisés dans les paramètres. """
self.show_nodes()
""" Affichage des edges mémorisés dans les paramètres. """
self.show_edges()
""" Paramètres dynamiques. """
self.dyn_params()
""" Aucun item sélectionné => Paramètres de la scène. """
self.show_params()
dyn_params()
n'existe pas. Nous la créons.CtrlScene.dyn_params()
:
def dyn_params(self):
""" l_ends = Liste des nodes finaux : AVEC entrée(s) ET n'ayant pas de connexion en aval. """
l_ends = list()
for o_grnode in self.get_grnodes():
if o_grnode.o_node.lo_sockets_in: # Node avec socket(s) d'entrée.
nb_edges_out = 0 # Compteur d'edges en sortie.
for o_socket_out in o_grnode.o_node.lo_sockets_out:
nb_edges_out += len(o_socket_out.get_gredges())
if nb_edges_out == 0: # Si aucun edge en sortie, c'est un node final.
l_ends.append(o_grnode.o_node)
""" Paramètres dynamiques. """
for o_node in l_ends:
o_node.o_params.set_params()
DEBUG
dans show_items()
:
CtrlScene.debug()
:
def debug(self):
""" En cas d'erreur de compilation, passer b_compile à False pour connaître l'erreur. Tester les 2 cas. """
b_compile = False
""" 2 nodes reliés par un edge. """
if b_compile:
""" Avec compilation dynamique. """
self.o_pkl.od_pkl.clear()
self.o_pkl.od_pkl.write(['Node0', 'path'], 'nodes.generateurs.signaux')
self.o_pkl.od_pkl.write(['Node0', 'position'], (-144, -32))
self.o_pkl.od_pkl.write(['Node1', 'path'], 'nodes.afficheurs.plot')
self.o_pkl.od_pkl.write(['Node1', 'position'], (48, -32))
self.o_pkl.od_pkl.write('edges', {((0, 0), (1, 0))})
self.show_nodes()
self.show_edges()
else:
""" Sans compilation dynamique. """
from nodes.generateurs.signaux import Node
o_signaux = Node(self, 'Node0', (-140, 0))
from nodes.afficheurs.plot import Node
o_plot = Node(self, 'Node1', (100, 0))
o_socket_out = o_signaux.lo_sockets_out[0]
o_socket_in = o_plot.lo_sockets_in[0]
CtrlEdge(self, o_socket_out, o_socket_in)
Affichage de 2 nodes liés, avec ou sans compilation dynamique.
Retirez l'éventuel debug
du setup()
, puisqu'il a été déplacé dans show_items()
.
CtrlScene.setup()
:
def setup(self):
""" Paramètres. """
self.o_pkl = Pkl(self)
self.o_params = Parameters(self)
self.friendly_name = self.o_params.od_params.read([self.main_key, "Nom de l'onglet"], 'Choisir un nom')
""" Undo-Redo. """
self.o_ur = UndoRedo(self)
""" Création de la scène. """
self.o_grscene = UiScene(self) # pyqtgraph.GraphicsScene.GraphicsScene dérivée.
""" Création de l'unique vue. """
self.o_grview = UiView(self) # pyqtgraph.GraphicsView dérivée.
""" Affectaion de la scène à visionner. """
self.o_grview.setScene(self.o_grscene) # Unique vue de la scène (il pourrait y en avoir plusieurs).
""" 'Boîte' container verticale - elle pourrait aussi être horizontale car elle ne contient qu'un élément. """
box = QVBoxLayout() # Son but est d'étirer la vue sur toute sa surface.
""" On place la vue dans cette boîte. """
box.addWidget(self.o_grview)
""" Suppression des marges (10px par défaut). """
box.setContentsMargins(0, 0, 0, 0)
""" Enfin, on place cette boîte dans l'onglet. """
self.setLayout(box)
""" Affichage des items : nodes et edges, mémorisés dans les paramètres. """
self.show_items()
""" MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** """
# self.o_pkl.od_pkl.print()
""" MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** """
Propagation vers l'amont. Partons dans notre graphe-exemple, du Plot-3
, lire dans le sens des flèches :
Signaux(1) ← Socket_out(1.0) ← Edge((1.0),(2.0)) ← Socket_in(2.0) ← Moyennes(2) ← Socket_out(2.0) ← Edge((2.0),(3.0)) ← Socket_in(3.0) ← Plot(3)
Plot-3
demande à son Socket_in-3-0
la liste des noms de signaux afin qu'il puisse les afficher dans le dockable de ses paramètres.Socket_in-3-0
demande cette liste à l'edge connecté.Socket_out-2-0
.Socket_out-2-0
demande cette liste à son node MM-2
.MM-2
doit faire un traitement : Sortie0 = f(Entrée0, paramètres)
.
Socket_in-2-0
.Socket_in-2-0
la liste des noms de signaux, car il en a besoin :
Plot-3
.Socket_in-2-0
demande cette liste à l'edge connecté.Socket_out-1-0
.Socket_out-1-0
demande cette liste à son node Signaux-1
.
Propagation vers l'aval, lire dans le sens des flèches :
Signaux(1) → Socket_out(1.0) → Edge((1.0),(2.0)) → Socket_in(2.0) → Moyennes(2) → Socket_out(2.0) → Edge((2.0),(3.0)) → Socket_in(3.0) → Plot(3)
Signaux-1
retourne cette liste : ["Signaux-1 : Cosinus", "Signaux-1 : Carré"]
à son Socket_out-1-0
.
Socket_out-1-0
fournit cette liste à tous les edges connectés (ici, il n'y en a qu'un).Socket_in-2-0
.Socket_in-2-0
fournit cette liste à son node MM-2
.MM-2
fait son traitement : Sortie0 = f(["Signaux-1 : Cosinus", "Signaux-1 : Carré"], paramètres) ...
Socket_out-2-0
.Socket_out-2-0
fournit cette liste à tous les edges connectés (ici, il n'y en a qu'un).Socket_in-3-0
Socket_in-3-0
fournit cette liste à son node, à savoir Plot-3
.
Représentation généralisée d'un node.
Cette apparente complexité est réduite à 4 traitements :
node
, le traitement sera fait dans la méthode get_signals()
, exclusivement appelée par ses socket_out
.socket_out
, nous aurons la @property l_signals
, exclusivement appelée par ses edges
.edges
, nous aurons la @property l_signals
, exclusivement appelée par leur socket_in
d'arrivée.socket_in
, nous aurons la @property l_signals
, exclusivement appelée par leur node
.socket_in
et les socket_out
faisant partie de la même classe CtrlSocket
, les traitements 2 et 4 seront groupés dans la même @property l_signals
.
Création des paramètres dynamiques.
Plot-3
par exemple, on obtient ceci :
Seuls les paramètres communs sont affichés.
|
Node de type 'Plots' à 4 entrées. |
Résultat à obtenir.
DEBUG
à True
en début du fichier ctrl_scene.py
.debug()
crée 2 nodes reliés (image ci-dessus).Node
de /nodes/generateurs/signaux.py
.Node
de /nodes/afficheurs/plot.py
.CtrlNode
.CtrlSocket
.CtrlEdge
.signaux.py > Node
.plot.py > Node
.Parameters
, 2 modifications :
Parameters.od_default
.Parameters.setup()
vers Parameters.set_params()
:
def setup(self):
self.o_scene = self.o_parent if self.o_parent.s_id == 'CtrlScene' else self.o_parent.o_scene
self.od_real = self.o_scene.o_pkl.od_pkl
self.layout = self.o_scene.o_main.params_layout # Nom donné dans Qt Designer.
self.od_params = Dictionary()
self.set_params()
def set_params(self):
""" Les paramètres par défaut contiennent des paramètres fixes et des paramètres dynamiques. """
self.od_default = Dictionary(self.o_parent.get_default())
ll_keys = self.od_default.key_list()
""" self.od_params n'est instancié qu'une fois (setup()), mais affecté plusieurs fois (ici). """
self.od_params.clear() # self.od_params conserve son adresse-mémoire.
for l_keys in ll_keys:
v_default = self.od_default.read(l_keys)
if isinstance(v_default, list):
v_default = v_default[0]
l_k = [self.o_parent.s_id] + l_keys
self.od_params.write(l_keys, self.od_real.read(l_k, v_default))
Cela permet de rafraîchir les paramètres dynamiques à chaque modification.
Amélioration de la méthode dic2params()
. Ajout du type de paramètre text
(Champ texte multi-lignes).
Parameters.dic2params()
:
def dic2params(self):
r""" La valeur de retour est une liste formatée pour satisfaire aux conditions imposées par pyqtgraph.Parameter.
Cette liste contient un ou plusieurs dictionnaires, contenant à leur tour :
- Des paramètres (nom, type, valeur par défaut, valeur réelle, paramètres optionnels)
- Des 'children' de type 'group' qui contiennent à leur tour une liste au même format. <-- list()
L'ensemble des paramètres constitue donc une arborescence (donc hiérarchique) assez complexe.
Voir exemple : D:\anaconda\envs\robot\Lib\site-packages\pyqtgraph\examples\parametertree.py
"""
def _compute_val(l_keys, key, val):
""" Cette méthode produit un dictionnaire (5 clés ou plus) selon les exemples ci-après :
Exemple1 :
- Avec l_keys = ["Paramètres du graphe 'Londres'"]
key = "Nom de l'onglet"
val = "Choisir un nom"
- On obtient : dic = {
'name': "Nom de l'onglet",
'default': 'Choisir un nom', # Provient des valeurs par défaut (od_default).
'value': 'Stratégie 01', # Provient des valeurs réelles (od_real).
'l_keys': ["Paramètres du graphe 'Londres'", "Nom de l'onglet"],
'type': 'str'
}
Exemple2 :
- Avec l_keys = ["Paramètres du graphe 'Londres'", 'Épaisseurs']
key = "Traits"
val = [1.1, {'step': 0.1, 'limits': (0.1, 3.0)}]
- On obtient : dic = {
'step': 0.1,
'limits': (0.1, 3.0),
'name': 'Traits',
'default': 1.1,
'value': 1.0,
'l_keys': ["Paramètres du graphe 'Londres'", 'Épaisseurs', 'Traits'],
'type': 'float'
}
"""
dic = dict()
if isinstance(val, list):
if len(val) != 2 or not isinstance(val[1], dict):
return False
dic = val[1]
val = val[0]
if 'values' in dic:
dic['type'] = 'list' # Liste de choix.
if key[0] == '$':
dic['type'] = 'text' # Champ texte multilignes.
key = key[1:]
dic['name'] = key
dic['default'] = val
dic['value'] = self.od_real.read([self.o_parent.s_id] + l_keys + [key], dic['default'])
dic['l_keys'] = l_keys + [key]
if 'type' not in dic: # Si ce n'est pas une liste ...
if isinstance(val, bool):
dic['type'] = 'bool'
elif isinstance(val, int):
dic['type'] = 'int'
elif isinstance(val, float):
dic['type'] = 'float'
elif isinstance(val, str):
if val and val[0] == '#':
dic['type'] = 'color'
dic['default'] = pg.mkColor(dic['default'])
dic['value'] = pg.mkColor(dic['value'])
else:
dic['type'] = 'str' # ... c'est un str.
return dic
def _build_params(default_in, list_out, l_keys):
""" Fonction récursive.
:param default_in: Sous-dictionnaire des valeurs par défaut en entrée.
:param list_out: Liste en sortie, formatée pour ParameterTree. Modifiée 'in place'.
:param l_keys: clés 'à plat' (liste) des dictionnaires od_default et od_real.
:return: list_out, modifiée 'in place'.
"""
for key, val in default_in.items():
if isinstance(val, dict):
branche = {'name': key, 'type': 'group', 'children': []}
list_out.append(branche)
l_keys.append(key) # Ajout d'une clé pour traiter la descendance.
_build_params(val, branche['children'], l_keys)
del(l_keys[-1]) # Descendance traitée => suppression de la dernière clé.
else:
""" Traitement déporté pour simplifier la compréhension du code. """
dic = _compute_val(l_keys, key, val) # Retourne un dictionnaire, sauf si erreur.
if isinstance(dic, dict):
list_out.append(dic)
l_params = list()
_build_params(copy.deepcopy(self.od_default), l_params, [])
return l_params
Cela permet d'afficher dans le dockable, le parcours du signal (node après node), sur plusieurs lignes.
Classe UiView
, 1 modification :
Propagation des modifications lors de l'ajout d'un edge. Modifier la méthode stop_edge()
.
UiView.stop_edge()
:
def stop_edge(self, ev):
def break_edge(o_socket_out):
""" Remplace un edge par 2 edges. Plusieurs conditions sont nécessaires :
- Le socket d'arrivée du mi-edge est déjà occupé par un edge.
- Le node de départ a au moins un socket d'entrée libre.
- Le tuple formé par ce socket d'entrée et le socket de sortie en cours est dans l_short_circuits.
"""
o_node = self.o_mid_edge.o_socket_out.o_node # Node de départ.
num_socket_out = self.o_mid_edge.o_socket_out.t_id[1] # N° du socket de sortie en cours de liaison.
for s_c in o_node.l_short_circuits:
""" Recherche des sockets d'entrée pouvant être court-circuités avec le socket de sortie en cours. """
if s_c[1] == num_socket_out:
o_socket_in = o_node.lo_sockets_in[s_c[0]]
if len(o_socket_in.get_gredges()) == 0:
""" Construction de l'edge supplémentaire, à condition que l'entrée soit libre. """
CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
if self.o_mid_edge.__class__.__name__ == 'CtrlEdge':
""" Reset. """
self.normal_size_sockets() # Rétablit les tailles normales aux sockets.
self.o_scene.o_grscene.removeItem(self.o_mid_edge.o_gredge) # Effacement du mi_edge.
""" Drop = fin du drag. """
if self.is_dropable(ev):
gr_item = self.itemAt(ev.pos()) # UiSocket
""" Suppression d'un éventuel edge connecté à ce socket. """
for o_gredge in gr_item.o_socket.get_gredges():
break_edge(o_gredge.o_edge.o_socket_out) # "Dé-court-circuit" : remplace un edge par 2 edges.
self.o_scene.o_grscene.removeItem(o_gredge) # Effacement.
""" Création d'un edge complet. """
CtrlEdge(self.o_scene, self.o_mid_edge.o_socket_out, gr_item.o_socket)
gr_item.o_socket.to_update() # Paramètres des nodes en aval à updater.
""" Enregistrement. """
self.o_scene.save_edges(backup=True)
""" Destruction de l'objet. """
self.o_mid_edge = None
Cela permet de propager la baisse du drapeau b_updated_out
vers tous les sockets en aval.
signaux.py
et plot.py
pour prendre en compte les adaptations nécessaires./nodes/generateurs/signaux.py
:
from pc.ctrl_node import CtrlNode
d_datas = {
'name': 'Signaux',
'icon': 'signaux.png',
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Signaux'
self.setup()
def setup(self, child_file=__file__):
super().setup(child_file)
@property
def ld_outputs(self):
return [{
'label': 'Sortie',
'label_pos': (-38, -10)
}]
def fixed_params(self):
return {
'Sinus': True,
'Cosinus': False,
'Carré': False,
'Triangle': False,
'Dent de scie montante': False,
'Dent de scie descendante': False,
'Amplitude': 20,
'Points par période': 100,
}
def my_signals(self, l_signals_in, num_socket_out):
""" Voir documentation dans la classe-mère : CtrlNode.my_signals().
:param l_signals_in: Liste toujours vide car ce node n'a pas d'entrée.
:param num_socket_out: Ce node n'a qu'une sortie : N° 0.
:return: Liste de tuples. Pour chaque signal actif, un tuple de 4 valeurs.
"""
l_signals = list()
# à coder ...
return l_signals
def refresh(self, l_keys):
""" Propagation en cascade : Seulement sur les noms de signaux (type valeur = bool). """
if isinstance(self.get_param(l_keys[-1]), bool):
self.lo_sockets_out[0].to_update()
super().refresh(l_keys)
Vous devrez coder la méthode my_signals()
./nodes/afficheurs/plot.py
:
from pc.ctrl_node import CtrlNode
d_datas = {
'name': 'Plot',
'icon': 'plot.png',
}
class Node(CtrlNode):
def __init__(self, o_scene, s_id, pos):
super().__init__(o_scene, s_id, pos)
self.type = 'Plot'
self.setup()
def setup(self, child_file=__file__):
super().setup(child_file)
@property
def ld_inputs(self):
return [{ # Liste de 1 dictionnaire.
'label': 'Entrée',
'label_pos': (6, -10)
}]
def fixed_params(self):
return {
'Graphique': {
'Titre': 'titre du graphique',
}
}
def my_params(self, context):
""" Voir documentation dans la classe-mère : CtrlNode.my_params(). """
d_params = dict()
# à coder ...
return d_params
Vous devrez coder la méthode my_params()
.
CtrlSocket
et CtrlEdge
pour prendre en compte les adaptations nécessaires.ctrl_socket.py
:
from pc.ui_socket import UiSocket
class CtrlSocket:
def __init__(self, o_node, d_datas):
self.o_node = o_node
self.d_datas = d_datas # Dictionnaire. clés : label, label_pos, id, is_input
self.t_id = self.d_datas['id'] # (id_node: int, num_socket: int) <- tuple.
self.b_input = self.d_datas['pos'][0] # Position Hori. physique : Entrée (True) à gauche, Sortie à droite.
self.pos = self.d_datas['pos'][1] # Position Vert. physique, tient compte des séparateurs.
self.label = self.d_datas['label']
self.t_label_pos = self.d_datas['label_pos']
self.o_grsocket = None # Objet UiSocket.
self.color = '#bbb' # Gris clair pour les socket_in libres (non connectés).
self.b_updated_out = False
self._l_signals = list()
self.setup()
def setup(self):
if not self.b_input:
""" Socket de sortie. """
l_keys = ['Couleurs', self.label]
self.color = self.o_node.get_param(l_keys, '#666')
self.o_grsocket = UiSocket(self)
@property
def l_signals(self):
""" Si les signaux sont à jour, on ne fait rien. """
if not self.b_updated_out:
""" Si les signaux ne sont pas à jour, on les régénère. """
if self.b_input:
""" Socket d'entrée - code appelant : son propre CtrlNode. """
lo_gredges = list(self.get_gredges())
if not lo_gredges:
return []
""" On délègue la recherche des signaux à l'edge connecté. """
self._l_signals = lo_gredges[0].o_edge.l_signals # [0] : Une entrée reçoit un edge au maximum.
else:
""" Socket de sortie - codes appelant : Les CtrlEdge connectés à ce socket_out. """
self._l_signals = self.o_node.get_signals(num_socket=self.t_id[1])
self.b_updated_out = True
return self._l_signals
def to_update(self):
self.b_updated_out = False
if self.b_input: # Socket in
""" Mise à jour des paramètres dynamiques de son node. """
self.o_node.o_params.set_params()
""" Propagation vers tous les sockets de sortie de son propre node. """
for o_socket_out in self.o_node.lo_sockets_out:
o_socket_out.to_update()
else: # Socket out
""" Chaque edge de ce socket (sortie) propage la baisse du drapeau au socket (entrée) à l'autre bout. """
for o_gredge in self.get_gredges():
o_gredge.o_edge.o_socket_in.to_update()
def get_gredges(self):
""" Retourne un set contenant tous les edges connectés à ce socket.
Remarque :
- Un socket d'entrée peut être connecté à 0 ou 1 edge.
- Un socket de sortie peut être connecté à 0, 1 ou plusieurs edges.
"""
s_gredges = set()
for gredge in self.o_node.o_scene.o_grscene.items():
if gredge.__class__.__name__ == 'UiEdge':
""" Choix du socket accroché à la bonne extrémité du edge. """
o_socket = gredge.o_edge.o_socket_in if self.b_input else gredge.o_edge.o_socket_out
if o_socket == self:
s_gredges.add(gredge)
return s_gredges
@property
def position(self):
""" Les positions sont optimisées pour une grille de maille 16. """
h_title = self.o_node.d_display['geometry']['h_title']
first_y = 16*(1+(h_title+4)//16) # Multiple de 16
place_y = self.pos # num socket
y = first_y + place_y * 16
x = 0 if self.b_input else self.o_node.width
return x, y
ctrl_edge.py
:
from pc.ui_edge import UiEdge
class CtrlEdge:
def __init__(self, o_scene, o_socket_out, o_socket_in):
"""
:param o_scene: objet CtrlScene
:param o_socket_out: objet CtrlSocket
:param o_socket_in: objet CtrlSocket ou QPointF
"""
self.o_scene = o_scene
self.o_gredge = None
self.o_socket_out = o_socket_out # Attention ! risque de confusion : Socket FROM (de départ)
self.o_socket_in = o_socket_in # Attention ! risque de confusion : Socket TO (d'arrivée)
self.color = '#bbb'
self.setup()
def setup(self):
self.o_gredge = UiEdge(self) # Dessin de l'edge dans l'UI.
self.o_scene.o_grscene.addItem(self.o_gredge) # Ajout de l'edge dans la scène.
""" Couleurs, sauf si l'edge est en cours de construction. """
if self.o_socket_in.__class__.__name__ == 'CtrlSocket':
self.color = self.o_socket_out.color
self.o_socket_in.color = self.color # On impose sa couleur au socket_in d'arrivée.
def get_tid(self):
""" Renvoie un tuple de tuples. Exemple : (90, 0), (91, 4) """
return self.o_socket_out.t_id, self.o_socket_in.t_id
@property
def l_signals(self):
return self.o_socket_out.l_signals
@property
def end_points(self):
""" - Renvoie un tuple : les points de départ et d'arrivée.
- Les coordonnées d'un socket sont relatives à celles de son node.
- Il faut donc les additionner pour obtenir les coordonnées absolues."""
""" Point de départ. """
pos_node_from = self.o_socket_out.o_node.o_grnode.pos() # type QPointF
pos_socket_out = self.o_socket_out.o_grsocket.pos() # type QPointF
""" Point d'arrivée. """
if self.o_socket_in.__class__.__name__ == 'QPointF':
""" Edge en construction : pos_to = position de la souris. """
pos_to = self.o_socket_in # type QPointF
else:
pos_node_to = self.o_socket_in.o_node.o_grnode.pos() # type QPointF
pos_socket_in = self.o_socket_in.o_grsocket.pos() # type QPointF
pos_to = pos_node_to + pos_socket_in
""" Les QPointF peuvent s'additionner. """
return pos_node_from + pos_socket_out, pos_to # Tuple.
CtrlNode
: De nouvelles méthodes sont ajoutées.deepcopy()
: Utilitaire, permettant une copie profonde des variables par référence (listes, dictionnaires, ...)get_signals()
: Fournit les signaux de sortie à partir des signaux d'entrée et des paramètres. Voir figure "Représentation généralisée d'un node".join()
: Utilitaire. Permet une concaténation aisée d'éléments (str
, int
, ...), en ignorant les chaines.vides.@property ld_inputs()
: Remplace l'ancien attribut ld_inputs
.@property ld_outputs()
: Remplace l'ancien attribut ld_outputs
.my_params()
: Doit obligatoirement être surchargée par les classes dérivées.my_signals()
: Doit obligatoirement être surchargée par les classes dérivées.fichier ctrl_node.py
:
from ui_node import UiNode
from functions.utils import Dictionary, DateTime, Utils
from parameters import Parameters
from ctrl_socket import CtrlSocket
from ctrl_edge import CtrlEdge
import copy
import os
class CtrlNode:
def __init__(self, o_scene, s_id, pos):
self.o_scene = o_scene
self.s_id = s_id # Clé d'entrée dans le super-dictionnaire pkl.
self.od_pkl = self.o_scene.o_pkl.od_pkl
self.o_grnode = None
self.o_params = None
self.pos = pos # pos = (x, y)
self.width = 96
self.height = 60
self.type = ''
self.ut = Utils()
self.main_key = f"Paramètres du node '{self.s_id}'"
self.b_chk = self.od_pkl.read([self.s_id, 'b_chk'], True)
self.b_on = self.b_chk # Provisoire.
self.child_file = ''
""" Sockets. """
self.lo_sockets_in = list() # Liste d'objets 'sockets in' instanciés.
self.lo_sockets_out = list() # Liste d'objets 'sockets out' instanciés.
self.default_color = '#88bbff' # Couleur par défaut des sockets de sortie.
""" Secousses => Court-circuits. """
self.dt = DateTime()
self.l_short_circuits = list() # Surchargé par les classes dérivées.
self.l_shortables = list()
self.b_removable = True # Les edges d'entrée peuvent être détruits en cas de court-circuit.
self.k_shake = 0 # Nombre de secousses.
self.b_shaking = False # Secousse en cours.
self.x, self.y = 0, 0 # Position antérieure (x, y), mémorisée.
self.dx, self.dy = 0, 0 # Deltas (variations) antérieurs, mémorisés.
def setup(self, child_file):
""" Code appelant : classe dérivée, on connait donc ses attributs. """
""" Paramètres. """
self.o_params = Parameters(self)
""" Hauteur automatique du node : elle dépend du nombre de sockets en entrée ou en sortie (le plus grand). """
h_title = self.d_display['geometry']['h_title']
nb_sockets_max = max(len(self.ld_inputs), len(self.ld_outputs)) # En fait : Sockets + séparateurs.
first_y = 16 * (1 + (h_title + 4) // 16) # Multiple de 16.
self.height = first_y + nb_sockets_max * 16 - 4
""" Nouveau node par drag & drop. """
self.child_file = child_file
b_new = self.pos != (0, 0) # Nouveau node (node dropé).
if b_new:
""" - Cas d'un nouveau node (dropé).
- Son centre apparaît exactement à la position du curseur de la souris.
- On mémorise son path et sa position. """
self.pos = self.pos[0] - self.width // 2, self.pos[1] - self.height // 2
path = 'nodes' + os.path.relpath(child_file).split('nodes')[1][:-3].replace(os.sep, '.')
self.od_pkl.write([self.s_id, 'path'], path)
else:
self.pos = self.od_pkl.read([self.s_id, 'position'], (0, 0))
""" Instanciation de l'UI. """
self.o_grnode = UiNode(self) # Gestion de l'UI (Interface utilisateur) : dessin couleurs, ...
if b_new:
self.o_grnode.save_pos() # Important ! à la fin (contient o_pkl.backup()).
self.o_scene.o_grscene.addItem(self.o_grnode) # Incorporation dans la scène.
""" Sockets. """
num_socket = 0
for i, d_input in enumerate(self.ld_inputs):
""" On complète les dictionnaires de base. """
if isinstance(d_input, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_input['pos'] = True, i # On ajoute l'emplacement. Tuple (input: True, num_position).
d_input['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_in.append(CtrlSocket(self, d_input)) # Socket instancié et ajouté à la liste.
num_socket += 1
num_socket = 0
for i, d_output in enumerate(self.ld_outputs):
""" On complète les dictionnaires de base. """
if isinstance(d_output, dict): # Si ce n'est pas un dictionnaire, c'est un séparateur.
d_output['pos'] = False, i # On ajoute l'emplacement. Tuple (input: False, num_position).
d_output['id'] = self.id, num_socket # On ajoute l'identifiant. Tuple (id_node, num_socket)
self.lo_sockets_out.append(CtrlSocket(self, d_output)) # Socket instancié et ajouté à la liste.
num_socket += 1
self.short_circuit_check()
@property
def id(self):
""" Ex : 'Node 14' renvoie 14. """
return int(self.s_id[4:])
@property
def d_display(self):
""" Attributs par défaut pouvant être surchargés dans les classes dérivées. """
return {
'geometry': {'h_title': 32, 'round': 6},
'col_pen': {'select': '#ffa637', 'hover': '#888', 'on': "#000", 'off': '#aaaa00'}, # ordre = priorité.
'col_brush': {'on': "#8888bbcc", 'off': '#cccccccc'},
'thick_pen': {'hover': 4., 'leave': 2.}
}
@staticmethod
def deepcopy(reference):
return copy.deepcopy(reference)
def get_default(self):
""" 1) Paramètres communs : Titre, et autant de couleurs que de sorties. """
d_default = {'Titre du node': 'Choisir un titre'}
d_colors = dict()
for d_output in self.ld_outputs:
if isinstance(d_output, dict):
d_colors[d_output['label']] = self.default_color
if d_colors:
d_default['Couleurs'] = d_colors
""" 2) Ajout de paramètres fixes spécifiques au type de node. """
d_default.update(self.fixed_params()) # méthode fixed_params()
""" 3) Ajout des paramètres dynamiques, qui dépendent des signaux entrants. """
d_default.update(self.dynamic_params) # propriété dynamic_params
""" Dictionnaire complet. Paramètres : fixes communs + fixes spécifiques + dynamiques. """
return {self.main_key: d_default}
def get_param(self, l_keys, v_default=None):
od_params = Dictionary(self.o_params.od_params[self.main_key])
return od_params.read(l_keys, v_default)
def set_checked(self, val):
self.b_chk = val
self.b_on = self.b_chk # Provisoire.
""" Enregistrement. """
self.od_pkl.write([self.s_id, 'b_chk'], val)
self.o_scene.o_pkl.backup()
@staticmethod
def join(*l_variants, sep='-'):
"""
Concatène les éléments de l_strings en une chaine. Les chaînes vides sont ignorées. le séparateur est sep.
:param l_variants: Nombre quelconque d'éléments de tout type.
:param sep: '-' par défaut. Peut être '' (vide) ou \n (retour à la ligne) ... ou autre.
:return: Chaîne concaténée.
Exemple l_strings = 'Paris', 'Lille', '', 'Bruxelles', 'Prague' => return "Paris-Lille-Bruxelles-Prague"
"""
string_txt = ''
for string in l_variants:
if string != '':
string_txt += f'{sep}{string}' # Accepte tout type de variable pour string.
return string_txt[len(sep):] # Suppression du 1er sep.
""" ************************* Méthodes à surcharger ************************** """
@property
def ld_inputs(self):
return list()
@property
def ld_outputs(self):
return list()
def fixed_params(self):
""" - Renvoie un dictionnaire de paramètres, indépendants des connexions.
- Exemple : le node de type 'Signaux' renvoie 'Sinus', 'Cosinus', etc. """
return {}
def my_params(self, context):
"""
:param context: Liste de 7 valeurs, pouvant être utilisée pour construire le dictionnaire de sortie :
- context[0] typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- context[1] signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- context[2] signal_now_from = Nom du signal créé par le node en amont. Ex : 'SMA18'.
- context[3] signal_source_from = 'Provenance du signal' affiché dans le node en amont.
- context[4] num_signal = Ordre du signal dans le faisceau entrant.
- context[5] signal_title = Titre du signal dans le dockable des paramètres.
- context[6] signal_source = 'Provenance du signal' -> texte avec retours à la ligne.
:return: Dictionnaire formaté pour les paramètres.
|_ Exemple {'Type de MA': ['SMA', {'values': ['SMA', 'EMA', 'SMMA', 'LWMA']}], "Périodes (sep=',')": '14'}
"""
raise SystemExit(self.child_file + '\nCtrlNode.my_params() : Cette méthode doit être surchargée.')
def my_signals(self, l_signals_in, num_socket_out):
""" Description des signaux délivrés à la sortie N° num_socket_out.
Comment ça marche ?
- Cette fonction prend en entrée tous les signaux entrants ainsi que les paramètres pkl.
- Un traitement spécifique est décrit, qui produit plusieurs signaux.
- Ces signaux sont ensuite distribués aux différentes sorties du node.
:param l_signals_in: Signaux d'entrée. Cette liste contient autant d'éléments que d'entrées connectées.
- Chaque élément est un tuple : (typ_id_from, signal_ante_from, signal_now_from, signal_source_from, \
num_signal, signal_title, typ_id, signal_ante, signal_source)
(Les variables suffixées de '_from' proviennent du node en amont).
:param num_socket_out: N° de la sortie.
:return: Liste de signaux exclusivement destinés à la sortie N° num_socket_out.
Chaque signal est décrit par un tuple à 4 valeurs : (typ_id, signal_ante, signal_now, signal_source).
"""
raise SystemExit(self.child_file + '\nCtrlNode.my_signals() : Cette méthode doit être surchargée.')
""" *********************** Fin - Méthodes à surcharger ********************** """
@property
def dynamic_params(self):
""" Code appelant : self.get_default().
Ces paramètres sont dynamiques car ils dépendent des signaux connectés aux entrées.
- Ce node demande la liste des signaux à chaque socket d'entrée : o_socket_in.l_signals
- Cette liste est formatée de la même façon pour tous les types de node.
- C'est une liste de tuples, chaque tuple a 4 valeurs et caractérise un signal.
- Il est rappelé qu'un edge véhicule non pas un signal, mais un faisceau de signaux (un vecteur).
- Le format d'un tuple est le suivant :
- typ_id_from = Type + id du node en amont, producteur du signal. Ex : 'MM12'.
- signal_ante_from = Nom du signal créé par le node en amont du node producteur. Ex : 'Cosinus'.
- signal_now_from = Nom du signal créé par le node producteur. Ex : 'SMA18'.
- signal_source_from = Texte destiné à être affiché dans le dockable, dans 'Provenance du signal :'
- Node
|_ Socket_in
|_ Faisceau de signaux
|_ Signal
|_ Paramètre dynamique.
"""
if not self.lo_sockets_in:
""" Tant que l'initialisation n'est pas terminée, lo_sockets_in est une liste vide."""
return {}
d_params = dict()
# à coder ...
return d_params
def get_signals(self, num_socket):
""" Code appelant : oSocket_out N° num_socket.
- Ce node crée des signaux et les fournit aux nodes en aval par l'intermédiaire des sockets et des edges.
- Il crée autant de faisceaux de signaux (vecteurs) qu'il a de sorties.
- Pour chaque sortie, la liste fournie est formatée de la même façon pour tout type de node.
- Voir détails ci-dessus, dans dynamic_params().
"""
if not self.lo_sockets_out: # Setup en cours
return []
""" Update od_params. """
self.o_params.set_params()
nb_inputs = len(self.lo_sockets_in)
l_all_signals_in = list()
# à coder ...
return self.my_signals(l_all_signals_in, num_socket)
""" ******************************* Secousses ******************************* """
def short_circuit_check(self):
""" Passage unique dans cette méthode, au démarrage. Fin programme si erreur.
- Peut-on supprimer les edges d'entrée en cas de court-circuit ?
- Oui, à condition que TOUTES les sorties existent dans self.l_short_circuits. """
l_indx_outputs = list() # Liste des index de sortie.
d_counter = dict()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
num_input = short_circuit[0]
num_output = short_circuit[1]
if num_input >= len(self.ld_inputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nL'entrée N°{num_input} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
if num_output >= len(self.ld_outputs):
""" Fin programme. """
raise SystemExit(f"La liste des court-circuits du type de node '{self.type}' est erronée."
f"\nLa sortie N°{num_output} est hors limites."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
l_indx_outputs.append(num_output) # Dans cet exemple -> ( 0 , 2 )
if num_output in d_counter:
d_counter[num_output] += 1
else:
d_counter[num_output] = 1
for i, d_output in enumerate(self.ld_outputs):
if isinstance(d_output, dict):
if i not in l_indx_outputs:
""" Condition non satisfaite. """
self.b_removable = False # b_removable est à True dans __init__().
break
""" Contrainte technologique : Plusieurs entrées sur une même sortie est interdit ! """
for nb_inputs in d_counter.values():
if nb_inputs > 1:
""" Fin programme. """
raise SystemExit("Plusieurs entrées sont court-circuitables sur une même sortie."
f"\nRevoir le contenu de self.l_short_circuits dans le setup de '{self.type}'.")
def set_shortables(self):
""" Méthode appelée lorsque le bouton gauche de la souris est appuyé (sur ce node).
- Chaque type de node possède sa liste de court-circuits faisables.
- Retourne une liste de tuples.
- Chaque tuple est composé des 2 gredges pouvant être réunis pour n'en faire qu'un. """
self.l_shortables = list()
for short_circuit in self.l_short_circuits: # Exemple -> self.l_short_circuits = [(0, 0), (0, 2)]
lo_gredges_in = list(self.lo_sockets_in[short_circuit[0]].get_gredges())
lo_gredges_out = list(self.lo_sockets_out[short_circuit[1]].get_gredges())
if lo_gredges_in: # Cette liste contient 0 ou 1 edge.
for o_gredge_out in lo_gredges_out:
self.l_shortables.append((lo_gredges_in[0], o_gredge_out))
def shake(self):
def shortcircuit():
for to_gredge in self.l_shortables: # Tuples de gredges.
o_socket_out = to_gredge[0].o_edge.o_socket_out # From, depuis.
o_socket_in = to_gredge[1].o_edge.o_socket_in # To, jusqu'à.
""" Suppression des 2 edges du tuple. """
self.o_scene.o_grscene.removeItem(to_gredge[1]) # Edge de sortie.
if self.b_removable:
self.o_scene.o_grscene.removeItem(to_gredge[0]) # Edge d'entrée.
""" Création du nouvel edge de court-circuit. """
CtrlEdge(self.o_scene, o_socket_out, o_socket_in)
o_socket_in.to_update()
""" Persistance. """
self.o_scene.save_edges(backup=True)
def raz_shakes():
""" Après cette raz, un nouveau court-circuit est autorisé. """
self.k_shake = 0
self.b_shaking = False
if not self.l_shortables:
return
self.dt.delay(raz_shakes, delay=100)
x, y = self.o_grnode.pos().x(), self.o_grnode.pos().y() # Positions.
dx, dy = x - self.x, y - self.y # Déplacements (delta x, delta y).
if (dx * self.dx < 0) or (dy * self.dy < 0):
""" Le mouvement a changé de sens => incrémentation du compteur. """
self.k_shake += 1
if self.k_shake > 5: # Valeur à ajuster.
shortcircuit()
""" Mémorisation. """
self.x, self.y = x, y
self.dx, self.dy = dx, dy
""" ***************************** Fin secousses ***************************** """
def set_colors(self, l_keys):
""" - La couleur est appliquée au socket_out concerné.
- Elle est également appliquée à tous ses éventuels edges connectés.
- A leur tour, ceux-ci propagent cette couleur à leur socket d'arrivée. """
for o_socket_out in self.lo_sockets_out:
if o_socket_out.__class__.__name__ == 'CtrlSocket':
if o_socket_out.label == l_keys[-1]:
""" Affectation de la couleur au socket_out concerné. """
o_socket_out.color = self.get_param(l_keys, '#666')
""" Récupération de tous les edgess partant de ce socket. """
l_gredges = list(o_socket_out.get_gredges())
for o_gredge in l_gredges:
""" Affectation de la même couleur à chaque edge. """
o_gredge.o_edge.color = o_socket_out.color
""" Affectation de la même couleur à chaque socket d'arrivée. """
o_gredge.o_edge.o_socket_in.color = o_socket_out.color
""" Rafraîchissement de l'affichage. """
self.o_grnode.update()
def refresh(self, l_keys):
""" Titre du node. """
if l_keys[-1] == 'Titre du node':
self.o_grnode.set_title()
""" Couleur des sockets et des edges. """
if l_keys[-2] == 'Couleurs':
self.set_colors(l_keys[1:])
Vous devrez coder les méthodes dynamic_params()
et get_signals()
.
Vérification
Plot
connecté à un node Signaux
:
Bon coding et bon courage !
Snippets
Bonjour les codeurs !