Programmation avancée. Suppression des limites.
Avant-propos
DataFrames
, Series
et opérations diverses sur ces structures.Description
Dans ce tuto, nous allons contruire un outil pour la mise au point de stratégies de trading.
Jaune = acheter - Rose = vendre.
self.od
./show/
./show/show_geek.py
:
""" Version 2022-01-07 """
# Imports externes.
from pandas_ta.custom import create_dir, import_dir # https://github.com/twopirllc/pandas-ta
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import matplotlib.animation
import os
# Imports internes.
from functions.utils import Dictionary
class ShowGeek:
def __init__(self, **args):
self.od = Dictionary(self.get_args(args))
self.fig = plt.figure(1)
self.df_pilot = None
self.df_scats = None
self.nb_datas = 0
self.l_ax = None
""" Animation. """
self.b_anim = True
self.abscissa_size = 0
self.pointer = 0
self.b_paused = False
self.b_reverse = False
self.magn = 1 # Appui sur les flèches : Le pas d'avancement est : 1, 10 ou 100.
""" ****************** Algorithme de construction ****************** """
""" 1 - Indicateurs personnalisés. """
self.custom_ta() # Analyse technique et indicateurs personnalisés.
""" 2 - Choix du pilote : Tick, Renko ou Candle. """
self.get_df_pilot()
""" 3 - Ajout des indicateurs dans la dataframe {df_pilot} <-- Colonnes ajoutées à {df_pilot}. """
self.add_indics()
""" 4 - Ajout du signal pilote. """
self.add_pilot()
""" 5 - Paramètres avant la création des graphiques (axes). """
self.pre_params()
""" 6 - Distribution des signaux (des colonnes) : un dataframe par axe dans {self.l_ax}. """
self.distrib()
""" 7 - Création des graphiques (axes). """
self.build_axis()
""" 8 - Paramètres après la création des graphiques (axes). """
self.post_params()
""" 9 - Affichage animé. """
self.show()
def animate(self, _=''):
""" Appelé dans la boucle matplotlib.FuncAnimation depuis self.show(). """
if self.b_paused:
return
""" Gestion du pointeur. """
self.set_pointer()
""" Abscisse commune : liste de {abscissa_size} valeurs depuis {pointer}. """
x = list(range(self.pointer, self.pointer + self.abscissa_size))
""" Parcours des graphiques (axes). """
for axis_name in self.od.keys():
if axis_name == 'figure':
continue
o_ax = self.od[axis_name]['o_ax']
df = self.od.read([axis_name, 'df'])
if df is None or df.empty:
o_ax.set_ylim(-10, 10)
continue
""" Parcours des courbes (lines). """
y_min, y_max = 10 ** 6, -10 ** 6
l_hook = [axis_name, x, o_ax, df]
for o_line in o_ax.lines:
line_name = o_line.get_label()
if line_name.startswith('_line'):
continue
serie = df[line_name]
y = serie[self.pointer: self.pointer + self.abscissa_size]
""" Égalisation des tailles de x et y. """
len_min = min(len(x), len(y))
x, y = x[:len_min], y[:len_min]
""" Hook pour l'injection de paramètres dans cette courbe. """
d_attr = self.hook_line_anim(*l_hook, o_line)
if d_attr is None:
d_attr = {}
""" Injection des datas à afficher. """
if d_attr.get('visible', True):
o_line.set_ydata(y)
o_line.set_xdata(x)
if not np.isnan(y).all():
y_min = min(y_min, np.nanmin(y))
y_max = max(y_max, np.nanmax(y))
""" Calcul des limites des ordonnées (y). """
if y_max < y_min:
y_min, y_max = -10, 10
y_padding = max(0, (y_max - y_min) * 0.05) # Marges top et bottom dans les axes (5%)
y_min, y_max = (round(y_min - y_padding, 6), round(y_max + y_padding, 6)) if y_max > y_min else (-10, 10)
y_min = max(y_min, -10**5)
y_max = min(y_max, 10**5)
""" Limites x et y. """
o_ax.set_xlim(x[0], x[-1])
o_ax.set_ylim(y_min, y_max)
self.hook_axis_anim(*l_hook, y_min, y_max)
if not self.b_anim:
plt.draw()
def set_pointer(self):
if self.b_reverse:
self.pointer -= self.magn
if self.pointer < 0:
self.pointer = self.nb_datas - self.abscissa_size
else:
self.pointer += self.magn
if self.pointer > self.nb_datas - self.abscissa_size:
self.pointer = 0
def add_pilot(self):
""" Affichage conditionnel du signal-pilote. """
self.od.write(['Principal', 'signals', 'Close', 'visible'], self.od.read(['figure', 'show_pilote'], False))
self.add_df('Principal', ['Close'])
def pre_params(self):
""" Fenêtre : taille, position, titre. """
mgr = plt.get_current_fig_manager()
mgr.set_window_title(self.od.read(['figure', 'window_title'], 'Stratégie'))
win = mgr.window
win.setGeometry(*self.od.read(['figure', 'geometry']))
""" Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php """
sns.set_context(self.od.read(['figure', 'seaborn_context'], 'paper')) # paper, notebook, talk, poster
sns.set_style(self.od.read(['figure', 'seaborn_style'], 'darkgrid')) # white, dark, whitegrid, darkgrid, ticks
""" Ini attributs. """
self.abscissa_size = self.od.read(['figure', 'abscissa_size'], 600)
self.b_anim = self.od.read(['figure', 'animation'], False)
""" Affichage des best-zones dans le graphique (subplot) contenant le signal 'Close'. """
subplot_name = self.od.read(['figure', 'best_zones', 'subplot'])
if self.od.read(['figure', 'best_zones', 'show'], False) and subplot_name:
""" Appel de l'indicateur de trend {best_zones}. """
gap = self.od.read(['figure', 'best_zones', 'gap'], 24)
bandwidth = self.od.read(['figure', 'best_zones', 'bandwidth'], 80)
self.df_pilot[['Up', 'Down', 'Peak']], self.df_scats = self.df_pilot.ta.best_zones(gap=gap,
bandwidth=bandwidth)
""" Affichage des palliers haut et bas. """
l_columns = list()
if self.od.read(['figure', 'best_zones', 'up'], False):
l_columns.append('Up')
if self.od.read(['figure', 'best_zones', 'down'], False):
l_columns.append('Down')
self.add_df(subplot_name, l_columns)
def post_params(self):
""" Recherche des paramètres dans le dictionnaire. """
""" Titre des graphiques. """
axis_title = self.od.read(['figure', 'figure_title'])
if isinstance(axis_title, str) and len(axis_title) > 0:
o_ax = self.get_first_axis()
if self.is_axis(o_ax):
o_ax.set_title(axis_title)
""" y axis : position et visibilité des graduations (ticks). """
for l_keys in self.od.key_list():
if l_keys[-1] != 'y_ticks':
continue
position = self.od.read(l_keys)
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if position is False:
o_ax.axes.set_yticks([]) # Suppression (ticks + ticklabels + grille).
pass
else:
o_ax.yaxis.set_ticks_position(position) # left, right
""" Suppression de la grille. """
for l_keys in self.od.key_list():
if l_keys[-1] != 'grid':
continue
if self.od.read(l_keys) is False:
o_ax = self.od.read([l_keys[0], 'o_ax'])
o_ax.grid(False) # Suppression de la grille.
""" x axis : visibilité des graduations (ticks) """
for l_keys in self.od.key_list():
if l_keys[-1] != 'x_ticks':
continue
o_ax = self.od.read([l_keys[0], 'o_ax'])
if not self.is_axis(o_ax):
continue
if self.od.read(l_keys) is False:
o_ax.tick_params(axis='x', colors='#fff0') # '#fff0' = invisible
""" Légendes. """
for l_keys in self.od.key_list():
if l_keys[-1] != 'legend':
continue
o_ax = self.od.read([l_keys[0], 'o_ax'])
legend = self.od.read(l_keys)
pos = ['auto', 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm']
if o_ax.lines:
if isinstance(legend, bool):
o_ax.legend().set_visible(legend) # Visibilité de la légende.
if legend in pos:
# o_ax.legend().set_title(l_keys[0])
# o_ax.legend(loc=pos.index(legend))
o_ax.legend(loc=pos.index(legend)).set_title(l_keys[0])
def build_axis(self):
""" Construction de la fenêtre Windows. """
""" Marges générales et répartition des subplots. """
l_margins = self.od.read(['figure', 'margins'], (6, 8, 10, 8)) # Marges : haut, droite, bas, gauche.
x, w = l_margins[3] / 100, 1 - (l_margins[1] + l_margins[3]) / 100
y_offset, y_height = l_margins[0], 100 - l_margins[0] - l_margins[2] # Valeurs : 0 à 100.
ax_top, y = 100, 1 - y_offset / 100
first_axis, axis_ante = '', ''
for axis_name, h in self.od.read(['figure', 'subplots']).items():
o_ax = None
if isinstance(h, (int, float)):
if first_axis == '':
first_axis = axis_name
else:
self.od.write([axis_ante, 'x_ticks'], False)
axis_ante = axis_name
h = h * y_height / 10_000
y -= h
h *= 1.04 # Ajustement empirique.
o_ax = self.fig.add_axes((x, y, w, h), sharex=self.od.read([first_axis, 'o_ax']))
self.od.write([axis_name, 'geometry'], (x, y, w, h))
else:
""" Graphique jumelé (twined axis). """
tw = h.split()
if tw[0] == 'twinned':
""" Axes jumelés. """
parent_name = tw[-1]
parent_axis = self.od.read([parent_name, 'o_ax'])
if self.is_axis(parent_axis):
o_ax = parent_axis.twinx()
if self.is_axis(o_ax):
self.od.write([axis_name, 'o_ax'], o_ax)
""" Ajout artificiel d'attributs vides. """
o_ax.filled = list()
o_ax.note_x = o_ax.annotate('', (0, 0))
o_ax.note_y = o_ax.annotate('', (0, 0))
""" Repères : lignes hortogonales sous le curseur de la souris. Voir on_mouse_move() """
o_ax.hline = o_ax.axhline(y=-1000, color='k', lw=.2, ls='--') # -1000 = Souris hors figure.
o_ax.vline = o_ax.axvline(x=-1000, color='k', lw=.2, ls='--')
""" Format des graduations y. """
o_ax.yaxis.set_major_formatter(FormatStrFormatter('%.1f'))
""" Initialisation des courbes (DataFrames) dans les graphiques (axes). """
df = self.od.read([axis_name, 'df'])
if axis_name == self.od.read(['figure', 'best_zones', 'subplot']) and self.is_df(self.df_scats):
if self.od.read(['figure', 'best_zones', 'scatters'], False):
sns.scatterplot(data=self.df_scats, x='indx', y='close', hue=axis_name, ax=o_ax)
if self.od.read(['figure', 'best_zones', 'zig_zag'], False):
sns.lineplot(data=self.df_scats, x='indx', y='close', ax=o_ax, lw=.5)
o_ax.set_ylabel('')
""" Lines et titre des légendes. """
if self.is_df(df) and len(df.columns) > 0:
sns.lineplot(data=df[:1], ax=o_ax)
o_ax.legend().set_title(axis_name)
""" Position (left, right) des graduations y. """
b_odd = False
l_axes = self.get_axis_names()
if self.od.read(['Volume', 'twined_axis']) == 'Principal':
b_odd = bool(l_axes.index('Principal') % 2) # Position impaire de 'Principal'.
for indx, axis_name in enumerate(l_axes):
self.od.write([axis_name, 'y_ticks'], 'left' if b_odd == indx % 2 else 'right')
""" Coloriage vertical conditionnel des zônes d'entrée en position (ouverture de trade). """
b_zones = self.od.read(['figure', 'best_zones', 'show'], False)
colors = self.od.read(['figure', 'best_zones', 'colors'])
if b_zones and colors is not None:
peaks = self.df_pilot['Peak']
fc_b, fc_t = colors
l_args = [
dict(x=peaks.index, y1=10**5, y2=-10**5, where=peaks <= -1, fc=fc_b, ec='#fff0', interpolate=True),
dict(x=peaks.index, y1=10**5, y2=-10**5, where=peaks >= 1, fc=fc_t, ec='#fff0', interpolate=True),
]
for ax_name in l_axes:
oax = self.o_ax(ax_name)
if self.is_axis(oax):
self.fill_between(oax, l_args)
@staticmethod
def get_args(args):
""" Calcul des hauteurs (en %) subplots. La somme fait 100%. """
if not isinstance(args['subplots'], dict):
raise SystemExit("Super-dictionnaire : La clé 'subplots' doit exister et doit contenir un dictionnaire. ")
l_heights, sum_heights, nb_zeros = list(), 0, 0
for name, pc_height in args['subplots'].items():
height = pc_height if (isinstance(pc_height, int) and pc_height > 0) else 0
nb_zeros += 1 if height == 0 else 0
sum_heights += height
l_heights.append((name, height))
if sum_heights > 100:
""" La somme des hauteurs dépasse 100% : on effectue une réduction proportionnelle. """
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0.
l_heights = [(name, pc_height * 100 / sum_heights) for (name, pc_height) in l_heights] # Normalisation.
else:
""" Si la somme == 100%, on ne fait rien. Si elle est < 100%, on ajoute un subplot, nommé 'rest'. """
if nb_zeros == 0:
l_heights.append(('rest', 0))
nb_zeros = 1
h_rest = (100 - sum_heights) / nb_zeros
if h_rest == 0:
l_heights = [(name, pc_height) for (name, pc_height) in l_heights if pc_height > 0] # Suppression des 0
for i, (name, pc_height) in enumerate(l_heights):
if pc_height == 0:
l_heights[i] = (name, h_rest)
args['subplots'] = dict(l_heights)
""" Ajout d'arguments par défaut. """
if 'seaborn_context' not in args: # Valeurs possibles : paper, notebook, talk, poster
args['seaborn_context'] = 'paper'
if 'seaborn_style' not in args: # Valeurs possibles : white, dark, whitegrid, darkgrid, ticks
args['seaborn_style'] = 'darkgrid'
return {'figure': args}
def add_df(self, subplot_name, l_columns, df=None):
""" Création du dataframe qui sera associé à un graphique.
- Méthode appelée par la méthode distrib() de la classe dérivée.
- Le DataFrame {df} a plusieurs colonnes, chacune associée à une courbe. """
""" Vérification. """
if self.od.read(['figure', 'subplots', subplot_name]) is None:
raise SystemExit(f"Erreur dans self.distrib() : Le subplot {subplot_name} n'existe pas.")
l_cols = list()
for column in l_columns:
""" Suppression des colonnes inexistantes. """
try:
self.df_pilot[column] if df is None else df[column]
except KeyError:
""" Si la colonne n'existe pas ... """
print(f"La colonne '{column}' n'existe pas dans le DataFrame.")
continue
l_cols.append(column)
""" Fusion par ajout de colonnes dans le df existant. """
df = self.df_pilot[l_cols] if df is None else df[l_cols]
df_exist = self.od.read([subplot_name, 'df'])
if df_exist is not None:
df_exist[df.columns] = df
df = df_exist # On retrouve les colonnes de df_exist en premières places.
if not self.is_df(df):
return
self.od.write([subplot_name, 'df'], df)
for column in df.columns:
if column.lower() == 'close':
""" Si ce dataframe a une colonne 'Close', on paramètre 3 traitements distincts par défaut :
- Une étiquette-suiveuse (linked_label).
- Un graphique jumelé (twined_axis) contenant l'affichage du volume en semi-transparence.
- Éléments du graphique jumelé non affichés : y_ticks, y_ticks_labels, grille, légende. """
self.od.write(['figure', 'best_zones', 'subplot'], subplot_name)
if self.od.read(['figure', 'show_linked_label'], False):
self.od.write([subplot_name, 'linked_label'], 'Close')
if self.od.read(['figure', 'show_volume'], False):
self.od.write(['figure', 'subplots', 'Volume'], f'twinned with {subplot_name}')
self.od.write(['Volume', 'df'], self.df_pilot[['Volume']])
self.od.write(['Volume', 'signals', 'Volume', 'visible'], False)
self.od.write(['Volume', 'twined_axis'], subplot_name)
self.od.write(['Volume', 'y_ticks'], False), # Suppression (ticks + ticklabels + grille).
self.od.write(['Volume', 'legend'], False) # Suppression de la légende.
def get_df_pilot(self):
instrument, table_name, self.df_pilot = self.get_pilot()
self.nb_datas = self.df_pilot.shape[0]
if self.nb_datas < 2_000:
s = 's' if self.df_pilot.shape[0] > 1 else ''
raise SystemExit(f'{instrument}-{table_name} : {self.nb_datas} point{s}. Nombre de points insuffisant.')
self.od.write(['figure', 'instrument'], instrument)
self.od.write(['figure', 'table_name'], table_name)
self.od.write(['figure', 'nb_points'], self.nb_datas)
@staticmethod
def custom_ta():
""" Indicateurs d'analyse technique personnalisés (dans /functions/custom_ta). """
path = os.path.abspath(__file__)
ta_dir = os.path.join(path.split(os.path.join('', 'show', ''))[0], 'functions', 'custom_ta')
create_dir(ta_dir) # Création automatique de 9 sous-dossiers (peut être supprimé après le 1er passage).
import_dir(ta_dir, verbose=False)
def linked_label(self, axis_name, x, o_ax, df):
""" https://matplotlib.org/stable/tutorials/text/annotations.html#annotating-with-text-with-box """
linked_label = self.od.read([axis_name, 'linked_label'])
if linked_label:
bbox = {'boxstyle': 'larrow', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
indx = min(x[-1], df.shape[0] - 1)
last_value = round(df[linked_label][indx], 2) # En pips.
""" Affichage de l'étiquette actuelle (m = marge). """
o_ax.note_y.remove() # Effacement de l'étiquette précédente.
m = self.abscissa_size / 70
o_ax.note_y = o_ax.annotate(last_value, (x[-1], last_value), xytext=(x[-1] + m, last_value), bbox=bbox)
def paint_volume(self, x, o_ax, df):
y = df['Volume'][x[0]: x[-1] + 1] * .8 # Affichage du volume à 80% d'amplitude.
l_args = [
dict(x=x, y1=y, y2=0, fc='#0079a333', ec='#fff0')
]
self.fill_between(o_ax, l_args)
def fill_between(self, o_ax, l_params):
""" Colorie entre 2 valeurs, conditionnellement.
@param o_ax: Axis matplotlib : graphique contenant des courbes.
@param l_params: Liste de coloriages. Chaque élément est un dictionnaire personnalisé. Voir lien ci-dessous.
https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
""" Effacement du coloriage antérieur, nécessaire si les couleurs ont une transparence. """
if not self.is_axis(o_ax):
raise SystemExit(f"Erreur dans self.fill_between() : L'objet o_ax ({o_ax}) n'est pas un axis.")
for fill in o_ax.filled:
o_ax.collections.remove(fill)
o_ax.filled.clear()
""" l_fill contient autant de paramètres que de coloriages à effectuer. """
for p in l_params:
""" Égalisation des vecteurs. """
len_min = len(p['x'])
if self.is_df(p['y1']):
len_min = min(len_min, len(p['y1']))
if self.is_df(p['y2']):
len_min = min(len_min, len(p['y2']))
p['x'] = p['x'][:len_min]
""" Coloriage. """
o_ax.filled.append(o_ax.fill_between(**p))
""" *********************************** Helpers. ************************************ """
def trace_hline(self, axis_name, y, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axhline(xmin=0, xmax=1, y=y, **kwargs)
def trace_vline(self, axis_name, x, **kwargs):
o_ax = self.o_ax(axis_name)
if self.is_axis(o_ax):
o_ax.axvline(ymin=0, ymax=1, x=x, **kwargs)
@staticmethod
def is_axis(o_ax):
return o_ax.__class__.__name__ == 'Axes'
@staticmethod
def is_df(df):
return df.__class__.__name__ in ['DataFrame', 'Series']
def get_axis_names(self, b_twin=False):
if b_twin:
""" Liste de noms de tous les graphiques (subplots) de base et twined_axis. """
return list(self.od.read(['figure', 'subplots']).keys())
else:
""" Liste de noms de tous les graphiques (subplots) de base. """
return [axis_name for (axis_name, val) in list(self.od.read(['figure', 'subplots']).items())
if not str(val).startswith('twinned')]
def get_first_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[0])
if self.is_axis(o_ax):
return o_ax
def get_last_axis(self):
l_axes = self.get_axis_names()
o_ax = self.o_ax(l_axes[-1])
if self.is_axis(o_ax):
return o_ax
def _get_all_ax(self):
l_ax = list()
for key, val in self.od.read(['figure', 'subplots']).items():
l_ax.append(self.od.read([key, 'o_ax']))
return l_ax
def o_ax(self, axis_name):
return self.od.read([axis_name, 'o_ax'])
def df(self, axis_name):
return self.od.read([axis_name, 'df'])
def ax_df(self, axis_name):
o_ax = self.od.read([axis_name, 'o_ax'])
df = self.od.read([axis_name, 'df'])
return o_ax, df
""" ***************************** Méthodes surchargées. ***************************** """
def get_pilot(self):
raise SystemExit("show_geek.py > ShowGeek :\nLa méthode 'get_pilot()' doit être surchargée.")
def add_indics(self):
pass
def distrib(self):
pass
""" ***************************** Méthodes à surcharger. **************************** """
def hook_line_anim(self, axis_name, x, o_ax, df, o_line):
return self.od.read([axis_name, 'signals', o_line.get_label()])
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max):
""" Affichage de 2 éléments distincts :
- Volumes, semi-transparent, sans bordure.
- Étiquette-suiveuse """
""" 1 - Affichage des volumes, semi-transparent, sans bordure. """
if axis_name == 'Volume':
self.paint_volume(x, o_ax, df)
""" 2 - Étiquette-suiveuse à droite du graphique. """
self.linked_label(axis_name, x, o_ax, df)
""" **************************** Événements et affichage. *************************** """
def key_event(self, ev):
def arrows(b_forward):
b_reverse = self.b_reverse
self.b_paused, self.b_reverse = False, False if b_forward else True
self.animate()
self.b_paused = True
self.b_reverse = b_reverse
self.magn = 1
keycode = ev.key
if keycode == ' ':
""" Pause on/off. """
self.b_paused = not self.b_paused
elif keycode == 'tab':
""" Inversion de sens. """
self.b_reverse = not self.b_reverse
elif keycode == 'right' or keycode == 'left':
""" Appui sur touche flèche droite ou flèche gauche. """
arrows(keycode == 'right')
elif keycode == 'up' or keycode == 'down':
""" Appui sur touche flèche haut ou flèche bas. """
self.magn = 10
arrows(keycode == 'up')
elif keycode == 'pageup' or keycode == 'pagedown':
""" Appui sur touche flèche page-haut ou flèche page-bas. """
self.magn = 100
arrows(keycode == 'pageup')
def on_mouse_move(self, ev):
""" Le style des lignes est définidans self.build_axis(). """
for o_ax in self.l_ax:
o_ax.vline.set_data([ev.xdata, ev.xdata], [0, 1])
o_ax.hline.set_data(([0, 1], [ev.ydata, ev.ydata]) if o_ax == ev.inaxes else ([0, 0], [0, 0]))
if ev.xdata is not None and ev.ydata is not None:
ax = self.get_last_axis()
ax.note_x.remove() # Effacement de l'étiquette précédente.
bbox = {'boxstyle': 'round4', 'fc': '#ff02', 'ec': 'k', 'lw': .5} # fc=face color, ec=edge color
ax.note_x = ax.annotate(text=round(ev.xdata), xy=(ev.xdata, ax.viewLim.y0), bbox=bbox)
if self.pointer <= 1 or not self.b_anim:
plt.draw() # Nécessaire si pas d'animation (lorsque self.pointer reste à 1).
def show(self):
self.l_ax = self._get_all_ax()
self.fig.canvas.mpl_connect('key_press_event', self.key_event)
if self.od.read(['figure', 'leader_lines']):
self.fig.canvas.mpl_connect('motion_notify_event', self.on_mouse_move)
if self.b_anim:
ani = matplotlib.animation.FuncAnimation(self.fig, self.animate, frames=None, interval=.1, repeat=True)
# |_ La variable {ani} doit exister pour empêcher le garbage collector de supprimer l'animation.
self.animate()
plt.show()
/trading/strategies/models
./trading/strategies/models/simple_model.py
:
# Imports externes
import pandas_ta as pdt # Utilisé seulement pour lire la doc : print(pdt.macd.__doc__)
# Imports internes
from show.show_geek import ShowGeek
from trading.historiques.ctrl_histos import CtrlHistos
class Geek(ShowGeek):
def __init__(self):
d_args = { # Ces valeurs seront à la racine du super-dictionnaire self.od.
'geometry': (100, 40, 1000, 700), # x, y, w, h. Par défaut : (100, 100, 1000, 600).
'margins': (6, 8, 10, 8), # Marges : haut, droite, bas, gauche.
'abscissa_size': 300, # Nb de points affichés en abscisse. Par défaut : 600.
'window_title': "Modèle simple", # Titre de la fenêtre. Par défaut 'Stratégie'.
'figure_title': "Modèle simple - Ne pas modifier", # Titre des graphiques. Par défaut '' (vide).
'leader_lines': True, # Lignes hortogonales de repère, suivi de la souris.
'subplots': { # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
'Principal': 55, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
'Milieu': '', # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
'Milieu2': '',
'Bas': '',
},
'show_pilote': True, # False par défaut.
'show_volume': True, # False par défaut.
'show_linked_label': True, # False par défaut.
'best_zones': {
'show': True, # False par défaut.
'gap': 20, # 24 par défaut. Nombre de pips take-profit ou stop-loss.
'bandwidth': 80, # 80 par défaut. Pourcentage : de 0 à 100.
'up': True, # False par défaut : Affichage des palliers haut.
'down': True, # False par défaut : Affichage des palliers bas.
'scatters': True, # False par défaut : Affichage des optimums sous forme de ronds.
'zig_zag': True, # False par défaut : Affichage de la courbe zig-zag.
'colors': ('#ffff0030', '#ff00ff10'), # Coloriage des ouvertures. Commenter pour ne pas les afficher.
},
'animation': True, # False par défaut.
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
}
super().__init__(**d_args)
def get_pilot(self):
""" - Récupération du signal pilote à partir de la db.
- return tuple(instrument, table_name, self.df_pilot). """
instrument, table = 'EUR/USD', 7 # table=int => Renko : 1 2 3 4 5 7 10 14 20
# instrument, table = 'EUR/USD', 'h1' # table=str => Candle : m1 m5 m15 m30 h1 h4 day week
# instrument, table = 'EUR/USD', None # table=None => Tick.
return instrument, *CtrlHistos(instrument).get_pilot(table=table, pc_to=99, nb_rows=3_000) # DataFrame
def add_indics(self):
""" Méthode utilisée en mode GA (sans affichage) et en mode normal. le format de d_params est imposé par GA. """
""" Indicateurs en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """
# print(pdt.rsi.__doc__) <-- Affiche la doc du RSI.
self.df_pilot['StochRsi'] = self.df_pilot.ta.stochrsi(length=35).iloc[:, 1] # <-- Seulement la colonne 1
self.df_pilot['EMA'] = self.df_pilot.ta.ema(length=21)
self.df_pilot['EMA2'] = self.df_pilot.ta.ema(length=40)
self.df_pilot[['MACD', 'HISTO', 'SIGNAL']] = self.df_pilot.ta.macd(close='EMA', fast=40, slow=60, signal=14)
def distrib(self):
""" Répartition des signaux entre les différents graphiques (axes). """
""" - Les noms des axes (graphiques) doivent correspondre avec ceux déclarés dans l'ini ('subplots').
- Ceux des signaux (indicateurs, ...) doivent correspondre avec ceux déclarés dans add_indics(). """
self.add_df('Principal', ['EMA', 'EMA2'])
self.add_df('Milieu', ['StochRsi'])
self.add_df('Milieu2', ['MACD', 'HISTO', 'SIGNAL'])
self.add_df('Bas', ['Peak'])
def pre_params(self):
""" Surcharge ou ajout au code pre_params() de la classe-parent. """
super().pre_params()
""" Exemple : écriture dans le super-dictionnaire -> ici, légende épinglée au milieu (cm = Center-Middle) """
self.od.write(['Milieu', 'legend'], 'cm') # 'auto', 'rh', 'lh', 'lb', 'rb', 'rm', 'lm', 'rm', 'cb', 'ch', 'cm'
def post_params(self):
""" Surcharge ou ajout au code post_params() de la classe-parent. """
super().post_params()
""" Exemple de tracé de lignes - https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.axhline.html """
self.trace_hline('Milieu', 50, lw=.4, ls='-.', c='r')
self.trace_vline('Principal', 210, lw=.4, ls='-.', c='r')
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max):
""" Surcharge ou ajout au code hook_axis_anim() de la classe-parent. """
super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max)
""" Exemple de bandes horizontales. """
l_args = [
dict(x=x, y1=15, y2=85, fc='#0aa2')
]
self.fill_between(self.o_ax('Milieu'), l_args)
if __name__ == '__main__':
Geek()
CtrlHisto
qui n'existent pas./trading/historiques/ctrl_histos.py
:
# Imports externes
import datetime
import gzip
import os # http://www.python-simple.com/python-modules-fichiers/os-path.php (Ctrl + clic)
import requests
from io import BytesIO, StringIO
import pandas as pd
import keyboard # https://www.delftstack.com/fr/howto/python/python-detect-keypress/
# Imports internes
from functions.utils import DateTime, Utils
from trading.historiques.db_candle import DbCandle
from trading.historiques.db_tick import DbTick
class CtrlHistos:
def __init__(self, instrument):
""" https://github.com/fxcm/MarketData
Tous les symboles :
AUDCAD, AUDCHF, AUDJPY, AUDNZD, AUDUSD, CADCHF, CADJPY, EURAUD, EURCHF,
EURGBP, EURJPY, EURNZD, EURUSD, GBPCAD, GBPCHF, GBPJPY, GBPNZD, GBPUSD,
NZDCAD, NZDCHF, NZDJPY, NZDUSD, USDCAD, USDCHF, USDJPY, USDTRY
Les plus utilisés (Dans l'ordre d'importance) :
EURUSD, USDJPY, EURCHF, USDCAD, NZDUSD, EURGBP, EURJPY, GBPJPY, GBPCHF, GBPUSD """
self.instrument = instrument
self.symbol = instrument.replace('/', '')
self.dt = DateTime()
self.ut = Utils()
self.kb_break = 'f12' # 'shift', 'ctrl', ... à votre convenance.
self.pips = .01 if instrument.endswith('JPY') else .0001
self.symbol_dir = None
self.db_candle = None
self.db_tick = None
self.setup()
def setup(self):
""" Les données peuvent se trouver sur un disque dur SSD autre que celui en cours.
Le cas échéant, db_path doit être modifié (chemin absolu).
Il est également possible, pour les besoins des tests, d'avoir plusieurs bases de données.
|_ Il faudra alors modifier db_path chaque fois que l'on désire switcher. """
db_path = os.path.dirname(__file__) # <--- Chemin par défaut, à modifier si autre disque dur.
self.symbol_dir = os.path.abspath(f'{db_path}/db/{self.symbol}') # Dossier des fichiers d'historiques.
os.makedirs(self.symbol_dir, exist_ok=True)
self.db_candle = DbCandle(self)
self.db_tick = DbTick(self)
def _helper(self, key, *l_params):
""" ************ Collection d'utilitaires. Nombre de paramètres variable. ************ """
if key == 'nb_weeks_in_year':
""" Renvoie le nombre de semaines dans l'année {year}. """
year = l_params[0] # l_params = [année dont on veut connaître le nombre de semaines].
if len(l_params) > 1:
y, num_week, _ = datetime.date.today().isocalendar()
if y == year:
return num_week - 1
o_dat = self.dt.get_date_from_dtstamp(self.dt.get_dtstamp_from_dtstr(f'{year}-12-28', '%Y-%m-%d'))
return o_dat.isocalendar()[1]
elif key == 'l_iso_yw':
""" Renvoie une liste de tuples ISO (iso_year, iso_str_week) : <-- exemple de tuple : (2019, '08')
- Pour la devise en cours et son type (tick ou candle).
- Toutes les semaines, du début à aujourd'hui.
- L'année est un int, la semaine une str ('02', '12', ...) """
b_ticks = l_params[0] # l_params = [b_ticks].
st_yw = set() # Le set() évite les doublons.
since = 2018 if b_ticks else 2012
yw_today = datetime.date.today().isocalendar()[:2]
for year in range(since, yw_today[0] + 1):
for week in range(1, 54):
iso = self.dt.isoyw_from_fxyw(year, week)
st_yw.add((iso[0], f'0{iso[1]}'[-2:]))
if (year, week) >= yw_today:
lt_yw = list(st_yw)
lt_yw.sort()
return lt_yw
elif key == 'csv_file':
""" Renvoie le chemin complet du fichier correspondant à la devise en cours, à l'année et à la semaine. """
year, week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
str_week = f'0{week}'[-2:]
return os.path.join(self.symbol_dir, str(year), f"{'tick' if b_ticks else 'candle'}_"
f"{str_week}.csv")
elif key == 'url_file':
""" Renvoie l'url du fichier d'histos correspondant à la devise en cours, à l'année et à la semaine. """
year, str_week, b_ticks = l_params # l_params = [*t_yw, b_ticks]
url_candle = 'https://candledata.fxcorporate.com/m1'
url_tick = 'https://tickdata.fxcorporate.com'
return f'{url_tick if b_ticks else url_candle}/{self.symbol}/{year}/{int(str_week)}.csv.gz'
def verify_weeks(self, b_ticks, b_silent=False):
""" Affichage d'un tableau dans la console (Lignes=années, Colonnes=semaines).
'▒' ou '▄' ou '▀' = semaine présente, '.' = semaine absente. """
""" 1 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks)
""" 2 - Affichage du titre et entêtes de colonnes. """
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ}", large=17)
[self.ut.gauge(char=f"{str(week) :<2}") for week in range(1, 54)]
self.ut.gauge('end')
""" 3 - Affichage du contenu - Ordonnées = années, abscisse = semaines. """
l_weeks = l_weeks_csv + l_weeks_db
l_weeks.sort()
first_year = (2018 if b_ticks else 2012) if len(l_weeks) == 0 else l_weeks[0][0]
year_now = datetime.date.today().isocalendar()[0]
for year in range(first_year, year_now + 1):
self.ut.gauge(year, large=17)
""" Boucle sur les semaines de l'année {year} : 1 à (52 ou 53). """
for week in range(1, self._helper('nb_weeks_in_year', year, 'today') + 1): # today limite à aujourd'hui.
t_yw = year, f'0{week}'[-2:] # 7 -> 07
b_csv, b_db = t_yw in l_weeks_csv, t_yw in l_weeks_db
if not b_csv and not b_db: # 0 0 /x./y
char, color = '. ', 'BLEU'
elif not b_csv and b_db: # 0 1 /x.y
char, color = '▀ ', 'VERT'
elif b_csv and not b_db: # 1 0 x./y
char, color = '▄ ', 'VIOLET'
else: # 1 1 x.y
char, color = '▒ ', 'BLEU'
self.ut.gauge(char=char, color=color)
self.ut.gauge('end')
""" Vérification de la synchronisation entre la base de données et les fichiers csv. """
msg = "\nAttention : La base de données n'est pas entièrement synchronisée ! csv + db = rectangles bleus." \
"\nL'affichage ci-dessus montre les fichiers .csv en carrés-bas mauves, la db en carrés-hauts verts." \
if not b_silent and l_weeks_csv != l_weeks_db else ''
self.ut.printc(msg)
def download_histos(self, nb_weeks, b_ticks):
if nb_weeks > 1:
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
""" Intervalle des semaines à télécharger (nombres en base 53). """
""" |_ since = Depuis. """
l_weeks_csv, _ = self._existing_lists(b_ticks=b_ticks) # Semaines = str <-- '08', '09', '10', ...
l_csv = [(y, int(w)) for (y, w) in l_weeks_csv] # Semaines = int <-- 8, 9, 10, ...
for y, w in l_csv:
if w > 2:
since = y * 53 + w - 1
break
else:
since = (2018 if b_ticks else 2012) * 53
y_today, w_today = datetime.date.today().isocalendar()[:2]
now = y_today * 53 + w_today
""" |_ now = Jusqu'à. """
""" l_required = Liste des fichiers à télécharger. """
l_required = list()
for yw53 in range(since, now-1): # dernière semaine = (now-1) => pas la semaine en cours.
csv_yw = yw53 // 53, 1 + yw53 % 53
csv_file = self._helper('csv_file', *csv_yw, b_ticks)
if not os.path.isfile(csv_file):
""" Fichier absent localement => à télécharger. """
l_required.append(csv_yw)
db = self.db_tick if b_ticks else self.db_candle
nb_weeks_loaded = 0
""" Parcours des semaines à traiter. """
for csv_yw_required in l_required:
df = self._download_csv_file(csv_yw_required, b_ticks)
if df is None:
""" Arrêt manuel demandé. """
break
elif df.shape[0] > 0:
""" 1 - Enregistrement du fichier téléchargé sur disque dur. """
self._df_to_csv(df, csv_yw_required, b_ticks)
""" 2 - Écriture du contenu en base de données. break si arrêt manuel demandé. """
if not db.df_to_table(df):
continue
""" 3 - Update des volumes à partir des ticks. """
df_stamps = self.db_tick.get_df_stamps(*csv_yw_required)
if df_stamps.shape[0] > 0:
self.db_candle.update_volume(df_stamps)
""" Arrêt si le nombre de semaines demandées est atteint. """
nb_weeks_loaded += 1
if nb_weeks_loaded >= nb_weeks:
break
s = 's' if nb_weeks > 1 else ''
self.ut.printc(f"Téléchargement{s} terminé{s}.", color='vert')
def synchro_db_csv(self, b_ticks):
""" A l'issue de cette méthode, TOUTE la db sera l'image de TOUS les fichiers csv. """
""" 1 - Base de données. """
db, typ = (self.db_tick, 'Ticks') if b_ticks else (self.db_candle, 'Candles')
""" 2 - État actuel des historiques => Différences entre csv et db
- l_drop = semaines à supprimer de la db, l_add = semaines à ajouter à la db. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
if l_drop+l_add+l_vol == []:
self.ut.printc(f"{typ} csv <--> db : OK. La synchronisation est inutile.\n", color='VERT')
return True
""" 3 - Message : état initial. """
for l_weeks in [l_drop, l_add]:
if len(l_weeks) > 0:
s = 's sont' if len(l_weeks) > 1 else ' est'
verb = 'ajouter à' if l_weeks == l_add else 'supprimer de'
self.ut.printc(f"{len(l_weeks)} semaine{s} à {verb} la base de données '{typ.lower()}'.", color='VERT')
""" 4 - Ajout de données dans la db. """
self._add_weeks(l_add, b_ticks)
""" 5 - Suppression de données de la db. """
for yw in l_drop:
db.delete_week(*yw)
""" 6 - Update des volumes. """
for yw in l_vol:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
df_stamps = self.db_tick.get_df_stamps(*yw)
self.db_candle.update_volume(df_stamps)
""" 7 - Vérification. """
l_drop, l_add, l_vol = self._lists_to_update_db(b_ticks=b_ticks)
success, color = ('réussi', 'VERT') if l_drop+l_add+l_vol == [] else ('échoué', 'ROUGE')
self.ut.printc(f'{typ} : La synchronisation a {success}.', color=color)
def _existing_lists(self, b_ticks):
""" Retourne les listes de semaines des fichiers csv et des enregistrements en base de données. """
""" s_weeks_csv n'est pas une list(), mais un set(), pour éviter les doublons. """
l_weeks_csv, s_weeks_db = list(), set()
db = self.db_tick if b_ticks else self.db_candle
l_iso_yw = self._helper('l_iso_yw', b_ticks) # Liste de toutes les semaines.
typ = 'Ticks' if b_ticks else 'Candles'
self.ut.gauge(f"{self.instrument}-{typ} Création listes :", large=0)
for num, iso_yw in enumerate(l_iso_yw):
""" Semaines existant en fichiers csv : Pour une semaine iso, on a 0, 1 ou 2 fichiers csv. """
l_csv_files, last_stamp = self._get_csv_files(*iso_yw, b_ticks)
csv_size = 0
if len(l_csv_files) > 0:
l_weeks_csv.append(iso_yw)
for csv_file in l_csv_files:
csv_size += os.path.getsize(csv_file)
""" Liste des semaines existant en base de données. """
csv = last_stamp if b_ticks else csv_size
if db.week_exists(*iso_yw, csv):
s_weeks_db.add(iso_yw) # (1)
if num % 8 == 0:
self.ut.gauge(char='.') # , color='Jaune')
self.ut.gauge('end')
self.ut.gauge('end') # Ligne vide supplémentaire.
""" Persistance en pkl. """
l_weeks_db = list(s_weeks_db)
l_weeks_csv.sort()
l_weeks_db.sort()
return l_weeks_csv, l_weeks_db
def _get_csv_files(self, iso_year, iso_week, b_ticks):
""" Attention ! Les noms de fichiers csv ne correspondent pas toujours à leur contenu ! """
""" Return : liste de noms complets de fichiers csv correspondant à (iso_year, iso_week) : 0, 1 ou 2 fichiers.
fxcorporate ne respecte pas la norme ISO. De plus :
- D'une année sur l'autre, le nom du fichier csv ne correspond pas à son contenu.
- On ne trouve pas de correspondance entre diverses devises.
- Par conséquent, le nom du fichier n'est pas fiable pour trouver le bon N° de semaine.
La méthode adoptée pour contourner ce problème est la suivante :
- Recherche de 3 fichiers csv voisins de (iso_year, iso_week) passés en paramètres.
- Lecture, pour chacun, de la date-heure du milieu du fichier.
- On en déduit le VRAI N° de semaine ISO.
Il arrive parfois, au changement d'année, qu'une même semaine soit composée de 2 fichiers csv.
- C'est pourquoi la valeur de retour est une liste. """
int_week, l_fxyw = int(iso_week), list()
last_stamp = 0
for week in range(int_week - 1, int_week + 1): # 2 Semaines : précédente, actuelle.
""" Correction : N° de semaine en base 53. """
y, w = (iso_year, week) if week > 0 else (iso_year - 1, week + 53)
y, w = (y, w) if w <= 53 else (y + 1, w - 53)
""" Les 3 fichiers csv candidats. """
str_w = f'0{w}'[-2:]
file_name = f"{'tick' if b_ticks else 'candle'}_{str_w}.csv" # 'tick_??.csv' ou 'candle_??.csv'.
csv_path = os.path.abspath(f"{os.path.dirname(self.symbol_dir)}/{self.symbol}/{y}/{file_name}")
if os.path.isfile(csv_path):
with open(csv_path, 'r') as fh:
""" On 'goûte' les 300 derniers bytes du fichier pour connaitre le N° ISO de semaine. """
nb_char = 300
fh.seek(0, os.SEEK_END)
fh.seek(fh.tell() - nb_char)
dt_str = fh.read().split('\n')[-3].split(',')[0]
last_stamp = self.dt.get_dtstamp_from_dtstr(dt_str)
o_d = self.dt.get_date_from_dtstamp(last_stamp) # Objet date.
if o_d.isocalendar()[:2] == (iso_year, int_week): # Filtrage.
l_fxyw.append(csv_path) # Candidat sélectionné.
return l_fxyw, last_stamp # 0, 1 ou 2 éléments.
def _download_csv_file(self, csv_yw_required, b_ticks):
""" Téléchargement, décompactage, décodage, DataFrame, enregistrement sur disque dur.
- return DataFrame. """
""" 1 - Demande au helper de construire l'url. """
url_required = self._helper('url_file', *csv_yw_required, b_ticks)
""" 2 - Initialisation des bytes. """
content = b''
""" 3 - Téléchargement en streaming avec lots de taille {size//50}, permettant l'insertion d'une jauge. """
y, w = csv_yw_required
try:
with requests.get(url_required, stream=True) as req:
size = int(req.headers['Content-Length'])
if size < 1_000:
return pd.DataFrame() # return df vide.
print()
self.ut.gauge(f"Téléchargement {' ticks ' if b_ticks else 'candles'} {self.symbol} ({y}, {w})")
for chunk in req.iter_content(chunk_size=size//50):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
content += chunk
""" 3.1 - Jauge de progression. """
self.ut.gauge()
self.ut.gauge('end')
except (Exception,):
self.ut.printc('Connexion internet interrompue.')
return None
""" 4 - Décompactage du buffer de données binaires (codées utf), en mémoire. """
buf = BytesIO(content)
f = gzip.GzipFile(fileobj=buf)
coded_bytes = f.read()
""" 5 - Décodage : utf-8 pour les candles, utf-16 pour les ticks. """
codec, type_data = ('utf-16', 'tick') if b_ticks else ('utf-8', 'candle')
decoded_bytes = coded_bytes.decode(codec)
""" 6 - Conversion Bytes -> String """
decoded_str = StringIO(decoded_bytes)
""" 7 - String to DataFrame. """
df = pd.read_csv(decoded_str) # Pas de colonne-index (index_col=0 retiré)
""" 8 - Retourne le dataframe. """
return df
def _df_to_csv(self, df, t_yw, b_ticks):
""" Contrôle préalable : si le dossier-cible n'existe pas, on le crée. """
csv_path = self._helper('csv_file', *t_yw, b_ticks)
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False) # Copie du fichier .csv sur disque dur.
@staticmethod
def _csv_to_df(csv_file):
""" Conversion du fichier {csv_file} en dataframe. """
return pd.read_csv(csv_file) if os.path.isfile(csv_file) else pd.DataFrame()
def _lists_to_update_db(self, b_ticks):
""" Retourne 3 listes : l_drop, l_add et l_vol.
Algorithme pour l_drop et l_add :
- (1) = l_weeks_db : Liste des semaines réellement existantes en base de données.
- (2) = l_weeks_csv : Liste des semaines réellement existantes en fichiers csv.
- Intersection (Inter) = (1) ⋂ (2)
- l_drop : Liste des semaines à supprimer de la db (celles qui n'existent pas en csv) : (1) - (Inter)
- l_add : Liste des semaines à ajouter dans la db (celles des csv qui n'existent pas en db) : (2) - (Inter)
--------------------------------
l_vol = liste des semaines (yw) de candles nécessitant un update du champ 'Volume' dans les tables.
Sans objet pour les ticks.
"""
""" 1 et 2 - Listes des semaines réellement existantes en base de données et en fichier .csv. """
l_weeks_csv, l_weeks_db = self._existing_lists(b_ticks=b_ticks)
""" 3 - Liste des semaines à supprimer de la db (celles qui n'existent pas en csv). """
l_drop = list()
for t_yw in l_weeks_db:
if t_yw not in l_weeks_csv:
# print('A supprimer :', t_yw) # MAP : Permet de lister les semaines qui sont à supprimer.
l_drop.append(t_yw)
""" 4 - Liste des semaines à ajouter à la db (celles des csv qui n'existent pas en db). """
l_add = list()
for t_yw in l_weeks_csv:
if t_yw not in l_weeks_db:
# print('A ajouter :', t_yw) # MAP : Permet de lister les semaines qui sont à ajouter.
l_add.append(t_yw)
""" 5 - Liste des semaines de ticks pour lesquelles les volumes des candles sont à updater. """
l_vol = list()
if not b_ticks: # Renkos non concernés.
""" Semaines ticks existantes (b_ticks=True). """
_, l_weeks_tick_db = self._existing_lists(b_ticks=True)
for yw in l_weeks_tick_db:
if yw in l_weeks_db:
if not self.db_candle.is_volume_in_week(*yw):
l_vol.append(yw)
return l_drop, l_add, l_vol
def _add_weeks(self, l_yw, b_ticks):
"""
@param l_yw : Liste des semaines à ajouter à la base de données.
@param b_ticks: True = ticks, False = candles.
"""
db = self.db_tick if b_ticks else self.db_candle
self.ut.printc(f"\nArrêt manuel : garder la touche '{self.kb_break.upper()}' appuyée.\n")
for iso_yw in l_yw:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
break
l_csv_files, _ = self._get_csv_files(*iso_yw, b_ticks) # Liste de 0, 1 ou 2 fichiers.
for csv_file in l_csv_files:
df = self._csv_to_df(csv_file)
if df.shape[0] > 0:
db.df_to_table(df)
def get_stamps(self, table_name, nb_stamps):
db = self.db_tick if (table_name == 'Ticks' or table_name[0] == 'r') else self.db_candle
return db.get_last_stamps(table_name, nb_stamps)
def get_datas(self, table_name, final_stamp, nb_points):
b_ticks = table_name == 'Ticks' or table_name[0] == 'r' # Ticks ou Renko
db = self.db_tick if b_ticks else self.db_candle
return db.get_datas(table_name, final_stamp, nb_points)
""" *************************** Interface Trading <--> Base de données. *************************** """
def slice_tick(self, pc_pos, nb_rows):
"""
@param pc_pos: pourcentage -> 0 à 100.
@param nb_rows: Nombre d'enregistrements demandés -> si > 0, enr après pc_pos ; si < 0, enr avant pc_pos.
@return: pandas DataFrame.
"""
return self.db_tick.slice_tick(pc_pos, nb_rows)
def slice_renko(self, mesh, pc_from, pc_to, nb_rows):
return self.db_tick.slice_renko(mesh, pc_from, pc_to, nb_rows)
def slice_candle(self, period, pc_from, pc_to, nb_rows):
return self.db_candle.slice_candle(period, pc_from, pc_to, nb_rows)
def get_pilot(self, table=None, pc_from=None, pc_to=100, nb_rows=10_000):
""" - Récupération des {nb_rows} points AVANT {pc_to} (en %) ou APRES {pc_from} (en %). <- pc_ comme 'pourcent'
- Par défaut, les 10_000 derniers points. pc_to=100 signifie la fin du fichier, pc_to=50 le milieu. """
if isinstance(table, int):
table_name = f'Renko{table}'
df = self._df_renko(table, pc_from, pc_to, nb_rows)
elif isinstance(table, str):
table_name = 'Candle ' + table
df = self._df_candle(table, pc_from, pc_to, nb_rows)
else: # elif table is None:
pc_pos = pc_from if isinstance(pc_from, (int, float)) else pc_to if isinstance(pc_to, (int, float)) else 50
nb_rows = -nb_rows if pc_pos == pc_to else nb_rows
table_name = 'Tick'
df = self._df_tick(pc_pos, nb_rows)
return table_name, df
def _df_tick(self, pc_pos=50, nb_rows=50_000):
df = self.slice_tick(pc_pos=pc_pos, nb_rows=nb_rows)
""" Uniformisation des colonnes. """
df['Open'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['High'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['Low'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['Close'] = (df['Bid'] + df['Ask']) / (2 * self.pips)
df['Volume'] = 1
df['Spread'] = (df['Ask'] - df['Bid']) / self.pips
return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']] # Tri des colonnes.
def _df_renko(self, table, pc_from, pc_to, nb_rows):
"""
@param table: Maille 1, 2, 3, 4, 5, 7, 10, 14 ou 20.
@param pc_from: En % sur toute l'étendue des données.
@param pc_to: En % sur toute l'étendue des données.
@param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
@return: NA
"""
try:
df = self.slice_renko(table, pc_from=pc_from, pc_to=pc_to, nb_rows=nb_rows)
if df.empty:
raise SystemExit(f"La table {self.instrument} de maille {table} est vide.")
df['Volume'] = df['Volume'].fillna(0) # Remplace les nan par des zéros.
""" Uniformisation Candles-Renko : Colonne 'Renko' renommée en 'Close'. """
df['Open'] = df['Renko']
df['High'] = df['Renko']
df['Low'] = df['Renko']
df = df.rename(columns={'Renko': 'Close'})
return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']] # Tri des colonnes.
except (ValueError,) as err:
self.ut.printc(str(err))
return pd.DataFrame()
def _df_candle(self, table, pc_from, pc_to, nb_rows):
"""
@param table: 'm1', 'm5', 'm15', 'm30', 'h1', 'h4', 'day', 'week'
@param pc_from:
@param pc_to:
@param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
@return: NA
"""
try:
df = self.slice_candle(table, pc_from=pc_from, pc_to=pc_to, nb_rows=nb_rows)
if df.empty:
raise SystemExit(f"La table {self.instrument} de période {table} est vide.")
df['Volume'] = df['Volume'].fillna(0)
""" Uniformisation Candles-Renko : Ajout de la colonne 'Close', moyenne de 'BidClose' et 'AskClose'. """
df['Open'] = (df['BidOpen'] + df['AskOpen']) / (2 * self.pips)
df['High'] = (df['BidHigh'] + df['AskHigh']) / (2 * self.pips)
df['Low'] = (df['BidLow'] + df['AskLow']) / (2 * self.pips)
df['Close'] = (df['BidClose'] + df['AskClose']) / (2 * self.pips)
df['Spread'] = (df['BidClose'] - df['AskClose']) / self.pips
""" Filtrage et tri des colonnes. """
return df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']]
except (ValueError,) as err:
self.ut.printc(str(err))
return pd.DataFrame()
def main(): # MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP ***** MAP *****
""" Code de MAP (mise au point). Espace expérimental : ajouter, supprimer, commenter, décommenter, ... """
h = CtrlHistos('EUR/USD')
# h = CtrlHistos('USD/JPY')
# h = CtrlHistos('GBP/USD')
""" Candles. """
# h.download_histos(1, b_ticks=False)
# h.synchro_db_csv(b_ticks=False)
h.verify_weeks(b_ticks=False)
""" Ticks. """
# h.download_histos(1, b_ticks=True)
# h.synchro_db_csv(b_ticks=True)
h.verify_weeks(b_ticks=True)
if __name__ == '__main__':
main()
Fichier complet /trading/historiques/db.py
:
from peewee import SqliteDatabase
from functions.utils import DateTime, Utils
import datetime
import pandas as pd
import numpy as np
import keyboard
import os
""" (Russe - vidéo) https://www.youtube.com/watch?v=8dla28TLvwA <- Ctrl + Clic
(Français - txt) https://linuxtut.com/fr/9d4e1d0afac1865acdbb/
(Anglais - vidéo) https://www.youtube.com/watch?v=Vk6Ptnvqr4M
"""
# noinspection PyUnresolvedReferences
class DataBase:
def __init__(self, o_ctrl, b_ticks):
self.o_ctrl = o_ctrl
self.b_ticks = b_ticks
self.dt = DateTime()
self.ut = Utils()
self.kb_break = o_ctrl.kb_break # Nom de la touche du clavier défini dans le contrôleur CtrlHistos.
self.pips = o_ctrl.pips
self.week_delta = datetime.timedelta(weeks=1) # <class 'datetime.timedelta'>
self.day_delta = datetime.timedelta(days=1) # <class 'datetime.timedelta'>
self.o_table = None
pd.set_option('mode.chained_assignment', None)
""" 3 valeurs pour b_ticks : True (ticks), False (candles) ou None (renkos). """
file_name = 'tick' if b_ticks is True else ('candle' if b_ticks is False else 'renko')
sql_file = os.path.abspath(f"{o_ctrl.symbol_dir}/{file_name}.sql")
self.db = SqliteDatabase(sql_file)
self.d_tables = dict()
""" Création physique du fichier *.sql sur disque dur. """
self.db.connect()
def change_datetime_column(self, df):
""" Mise en conformité de la colonne DateTime : string to float. """
if 'DateTime' in df.columns:
df = df.rename(columns={'DateTime': 'timestamp'})
dt0 = np.datetime64(self.dt.get_dtstr_from_dtstamp(0, dt_format='%Y-%m-%d %H:%M:%S')) # '1970-01-01 01:00'
t_delta = np.timedelta64(1, 's') # <class 'numpy.timedelta64'>
pd_dt = pd.to_datetime(df.timestamp, format='%m/%d/%Y %H:%M:%S.%f') # format nécessaire, sinon très lent.
last_stamp = (np.datetime64(pd_dt.iloc[-1]) - dt0) / t_delta
o_d = self.dt.get_date_from_dtstamp(last_stamp)
yw = o_d.isocalendar()[:2] # (year, week)
len_batch = int(.95 * df.shape[0] / 50)
l_ts = list()
self.ut.gauge(f'Conformité {yw}')
for _from in range(0, df.shape[0], len_batch):
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return None
_to = _from + len_batch
self.ut.gauge()
l_ts += [(np.datetime64(x) - dt0) / t_delta for x in pd_dt[_from: _to]]
self.ut.gauge('end')
df.timestamp = l_ts
return df
def get_db_sign(self):
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().count()
return nb_enr
def _get_range_from_df(self, df):
""" Renvoie un tuple de stamps, qui encadrent df, à l'extérieur de df (samedis 22:00). """
f_stamp = df['timestamp'].iloc[0] # Premier stamp réel dans df. <-- début.
l_stamp = df['timestamp'].iloc[-1] # Dernier stamp réel dans df. <-- fin.
stamp_before = f_stamp + self.dt.get_stamp_offset(f_stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = l_stamp + self.dt.get_stamp_offset(l_stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def _get_range_from_yw(self, year, str_week):
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
return stamp_before, stamp_after
def is_volume_in_week(self, *yw):
""" Compte le nombre d'enregistrements total et le nombre d'enregistrements ayant du volume. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
range_stamps = self._get_range_from_yw(*yw)
""" Parcours de toutes les tables et cumul des valeurs. """
nb_total = nb_volume = 0
for o_table in list(self.d_tables.values()):
nb_total += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps)
).count()
nb_volume += o_table.select(o_table.Volume).where(
o_table.timestamp.between(*range_stamps),
~o_table.Volume.is_null()
).count()
return nb_volume > nb_total * .98 # Booléen.
def delete_week(self, *yw):
""" Supprime la semaine {yw} de chacune des tables """
range_stamps = self._get_range_from_yw(*yw)
for o_table in list(self.d_tables.values())[::-1]: # Parcours à l'envers.
o_table.delete().where(o_table.timestamp.between(*range_stamps)).execute()
def get_datas(self, table_name, final_stamp, nb_points):
o_table = self.get_otable(table_name)
""" 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
l_datas = o_table.select().order_by(o_table.timestamp.desc()).where(o_table.timestamp <= final_stamp).limit(nb_points)
""" 2 - Peewee -> DataFrame Pandas """
df = pd.DataFrame(list(l_datas.dicts()))
""" 3 - DataFrame Pandas -> Numpy avec retournement : le plus récent en dernier. """
np_datas = df.values[::-1] # Retournement
return np_datas
def get_otable(self, table_name):
if table_name == 'Ticks':
return self.o_table
else:
d_tables = self.o_renko.d_tables if table_name[0] == 'r' else self.d_tables
return d_tables.get(table_name)
def get_last_stamps(self, table_name, nb_points):
o_table = self.get_otable(table_name)
""" 1 - Lecture des {nb_points} derniers points, le plus récent en premier. """
l_datas = o_table.select(o_table.timestamp).order_by(o_table.timestamp.desc()).limit(nb_points)
""" 2 - Peewee -> DataFrame Pandas """
df = pd.DataFrame(list(l_datas.dicts()))
""" 3 - DataFrame Pandas -> List avec retournement : le plus récent en dernier. """
return list(df['timestamp'])[::-1] if df.shape[0] > 0 else []
@staticmethod
def slice_datas(o_table, pc_from, pc_to, nb_rows):
"""
@param o_table: objet table tick, renko ou candle.
@param pc_from: en % : 0% = début de la table, 50% = milieu de la table.
@param pc_to: en % : 50% = milieu de la table, 100% = fin de la table.
@param nb_rows: nb_enr ou None :
|_ Si None => de pc_from (0 par défaut) à pc_to (100 par défaut).
|_ Si int :
|_ Si (pc_from == None) et (pc_to == None) => pc_to forcé à 100.
|_ Si (pc_to == None) => de pc_from à (pc_from + nb_rows).
|_ Sinon => de (pc_to - nb_rows) à pc_to (100 par défaut).
@return: DataFrame avec timestamp en index.
"""
try:
first_ts = int(o_table.select().order_by(o_table.timestamp.asc()).get().timestamp)
last_ts = int(o_table.select().order_by(o_table.timestamp.desc()).get().timestamp)
delta = last_ts - first_ts
except (Exception,):
return pd.DataFrame()
if nb_rows is None:
""" Limites fixées par pc_from et pc_to. """
pc_from = 0 if pc_from is None else pc_from
pc_to = 100 if pc_to is None else pc_to
pc_from, pc_to = min(100, pc_from, pc_to), max(0, pc_from, pc_to)
_from = first_ts + (delta * pc_from) / 100
_to = first_ts + (delta * pc_to) / 100
datas = o_table.select().order_by(o_table.timestamp.asc()).where(o_table.timestamp.between(_from, _to))
l_datas = list(datas.dicts())
else:
if pc_from is None and pc_to is None:
pc_to = 100
# l_datas = []
if pc_to is None:
_from = first_ts + (delta * pc_from) / 100
datas = o_table.select().order_by(o_table.timestamp.asc()).where(
o_table.timestamp >= _from).limit(nb_rows)
l_datas = list(datas.dicts())
else:
_to = first_ts + (delta * pc_to) / 100
datas = o_table.select().order_by(o_table.timestamp.desc()).where(
o_table.timestamp <= _to).limit(nb_rows)
l_datas = list(datas.dicts())[::-1]
df = pd.DataFrame(l_datas)
return df
""" Méthodes surchargées. """
def df_to_table(self, df):
return True
def update_derived(self, df):
pass
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
pass
def week_exists(self, year, str_week, last_stamp):
return False
Fichier complet /trading/historiques/db_tick.py
:
# Imports externes
from peewee import Model, FloatField
import pandas as pd
# Imports internes
from trading.historiques.db import DataBase # Classe-parent.
from trading.historiques.db_renko import DbRenko
# noinspection DuplicatedCode
class DbTick(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=True)
db = self.db
self.o_renko = DbRenko(o_ctrl)
class Ticks(Model):
timestamp = FloatField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
Bid = FloatField()
Ask = FloatField()
class Meta:
database = db
order_by = 'timestamp'
""" Instanciations. """
self.o_table = Ticks
""" Création de la table 'ticks' dans le fichier tick.sql. """
self.db.create_tables([Ticks])
def get_db_sign(self):
""" self.o_table.select().count() est chronophage. Remplacé par la lecture du dernier timestamp. """
try:
last_ts = int(self.o_table.select().order_by(self.o_table.timestamp.desc()).get().timestamp)
except (Exception,):
last_ts = 0
return last_ts + self.o_renko.get_db_sign()
def week_exists(self, year, str_week, last_stamp):
""" - La table de ticks est testée. Les tables Renko sont également testées.
- Une seule table défaillante suffit pour renvoyer False. """
""" Samedi avant, samedi après. Le milieu de la semaine est '3 10' -> mercredi 10:00. """
first_stamp = last_stamp - 10_000
count = self.o_table.select().where(self.o_table.timestamp.between(first_stamp, last_stamp)).count()
""" Relation empirique entre la taille du fichier sur disque et le nombre attendu d'enregistrements. """
if count == 0:
return False
return self.o_renko.range_exists(*self._get_range_from_yw(year, str_week))
def df_to_table(self, df):
""" a) Écriture Ticks (SANS DOUBLONS) : Données de la DateFrame Pandas {df} dans la table de ticks.
b) Écriture Renko (df conforme) : Délégation à l'objet {o_renko}, en dernière ligne."""
""" 1 - Mise en conformité de la colonne date-time. """
df = self.change_datetime_column(df)
if df is None:
return False
""" 2 - df -> l_datas (~700_000 lignes). """
l_datas = df.to_dict(orient='records')
""" 3 - Découpage de l_datas en 50 batches (au minimum). """
len_datas = len(l_datas)
batch_size = max(1, min(50_000, len_datas // 50))
nb_steps = len_datas // batch_size
""" 4 - Boucle d'écritures avec jauge de progression. """
self.ut.gauge('Écriture dans la table de ticks')
with self.db.transaction():
for nb in range(1 + nb_steps):
""" {batch_size} enr. pour éviter l'erreur 'OperationalError: too many SQL variables' (si > 82_000) """
indx_min = nb * batch_size
indx_max = min(len_datas, indx_min + batch_size)
datas = l_datas[indx_min: indx_max] # Slice.
""" replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
self.o_table.replace_many(datas).execute()
char = '▒' if nb < (nb_steps - 9) else ('▄' if nb < (nb_steps - 4) else '.')
self.ut.gauge(char=char)
self.ut.gauge('end')
""" 5 - Écriture dans les tables Renko. """
return self.o_renko.df_to_table(df)
def delete_week(self, *yw):
""" 1) Suppression de la semaine {yw} dans la table {tick}.
2) Suppression dans chacune des tables renko, déléguée à l'objet {o_renko}, en dernière ligne. """
range_stamps = self._get_range_from_yw(*yw)
self.o_table.delete().where(self.o_table.timestamp.between(*range_stamps)).execute()
self.o_renko.delete_week(*yw)
def get_df_stamps(self, *yw):
""" Filtrage de la table de ticks sur la semaine passée en paramètre, puis => df (pandas.DataFrame).
Le volume est le nombre de variations du tick : https://www.ig.com/fr/glossaire-trading/volume-definition
Valeur retournée : vecteur des stamps sur la semaine {yw}.
"""
range_stamps = self._get_range_from_yw(*yw)
query_ts = self.o_table.select(self.o_table.timestamp).where(
self.o_table.timestamp.between(*range_stamps)
).order_by(self.o_table.timestamp.asc())
return pd.DataFrame(list(query_ts.dicts()))
def slice_tick(self, pc_pos, nb_rows):
"""
@param pc_pos: pourcentage -> 0 à 100.
@param nb_rows: Nombre d'enregistrements demandés -> si > 0, enr après pc_pos ; si < 0, enr avant pc_pos.
@return: pandas DataFrame.
"""
first_ts = self.o_table.select().order_by(self.o_table.timestamp.asc()).get().timestamp
last_ts = self.o_table.select().order_by(self.o_table.timestamp.desc()).get().timestamp
delta = last_ts - first_ts
pos_ts = first_ts + (delta * pc_pos) / 100
if nb_rows > 0:
""" nb_rows enregistrements après pos_ts. timestamp croissant. """
l_datas = self.o_table.select().order_by(self.o_table.timestamp.asc()).where(
self.o_table.timestamp >= pos_ts).limit(nb_rows)
df = pd.DataFrame(list(l_datas.dicts()))
else:
""" nb_rows enregistrements avant pos_ts. timestamp croissant. """
l_datas = self.o_table.select().order_by(self.o_table.timestamp.desc()).where(
self.o_table.timestamp <= pos_ts).limit(-nb_rows)
df = pd.DataFrame(list(l_datas.dicts())[::-1])
return df
def slice_renko(self, mesh, pc_from, pc_to, nb_rows):
"""
@param mesh: Maille : 1, 2, 3, 4, 5, 7, 10, 14 ou 20
@param pc_from: en % : 0% = début de la table, 50% = milieu de la table.
@param pc_to: en % : 50% = milieu de la table, 100% = fin de la table.
@param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
@return: DataFrame avec timestamp en index.
"""
if mesh not in [1, 2, 3, 4, 5, 7, 10, 14, 20]:
raise SystemExit("La maille doit être 1, 2, 3, 4, 5, 7, 10, 14 ou 20")
o_table = self.get_otable(f'r{mesh}')
return self.slice_datas(o_table, pc_from, pc_to, nb_rows)
Fichier complet /trading/historiques/db_renko.py
:
# Imports externes
from peewee import Model, FloatField, IntegerField
import keyboard
# Imports internes
from trading.historiques.db import DataBase
# noinspection PyTypeChecker,PyProtectedMember,DuplicatedCode
class DbRenko(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=None)
db = self.db
""" Classe mère (hérite de peewee.Model). """
class BaseRenko(Model):
# https://docs.peewee-orm.com/en/latest/peewee/models.html#field-types-table
timestamp = FloatField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
Renko = IntegerField()
Spread = FloatField()
Volume = FloatField()
class Meta:
database = db
order_by = 'timestamp'
""" Classes dérivées (héritent de BaseRenko). """
class Renko1(BaseRenko):
maille = 1
class Renko2(BaseRenko):
maille = 2
class Renko3(BaseRenko):
maille = 3
class Renko4(BaseRenko):
maille = 4
class Renko5(BaseRenko):
maille = 5
class Renko7(BaseRenko):
maille = 7
class Renko10(BaseRenko):
maille = 10
class Renko14(BaseRenko):
maille = 14
class Renko20(BaseRenko):
maille = 20
""" Instanciations. """
self.d_tables = {'r1': Renko1, 'r2': Renko2, 'r3': Renko3, 'r4': Renko4, 'r5': Renko5,
'r7': Renko7, 'r10': Renko10, 'r14': Renko14, 'r20': Renko20}
self.o_table = Renko1
""" Création des tables dans le fichier renko.sql. """
with db:
self.db.create_tables(list(self.d_tables.values()))
def range_exists(self, stamp_before, stamp_after):
""" Les 5 premières tables sont testées. Une seule table défaillante suffit pour renvoyer False.
- En effet, pour les grandes mailles (> 7), il arrive qu'il n'y ait pas d'enr pendant plus d'une semaine. """
for o_table in list(self.d_tables.values())[:5]: # Les 5 premières tables.
count = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
if count == 0:
return False
return True
def df_to_table(self, df):
""" Update Renko1. Code appelant : DbTick.df_to_table() qui a déjà mis en conformité {df}. """
""" 1 - Construction du buffer {l_datas} à partir de la DataFrame {df} pour alimenter la table Renko1. """
indx_ante, price_ante = 0, round((df.iloc[0, 1] + df.iloc[0, 2]) / (2 * self.pips))
l_datas = list()
gauge_step = df.shape[0] // 50
self.ut.gauge("Création du buffer pour Renko1")
for indx in df.index:
if keyboard.is_pressed(self.kb_break):
self.ut.printc("Arrêt manuel demandé.")
self.ut.gauge('end')
return False
price = (df.iloc[indx, 1] + df.iloc[indx, 2]) / (2 * self.pips)
no_duplicate_ts = .001
timestamp = df.timestamp[indx] - no_duplicate_ts # Anti-doublons.
while price >= (price_ante + self.o_table.maille) or price <= (price_ante - self.o_table.maille):
""" - En temps normal, cette boucle ne fait qu'un tour.
- Elle sera parcourue plusieurs fois lors de gaps importants, supérieurs à 2 pips. """
step = 1 if price > price_ante else -1
price_ante += step
spread = round((df.Ask[indx] - df.Bid[indx]) / self.pips, 4)
timestamp += no_duplicate_ts
row = (timestamp, price_ante, spread, indx - indx_ante)
l_datas.append(row)
indx_ante = indx
if indx % gauge_step == 0:
self.ut.gauge()
self.ut.gauge('end')
""" 2 - Découpage du buffer {l_datas} en 50 batches. """
len_datas = len(l_datas)
batch_size = max(1, min(50_000, len_datas // 50))
nb_steps = len_datas // batch_size
""" 3 - Boucle d'écritures avec jauge de progression. """
self.ut.gauge("Écriture buffer dans la table Renko1")
with self.db.transaction():
for nb in range(1 + nb_steps):
""" {batch_size} enr. pour éviter l'erreur 'OperationalError: too many SQL variables' (si > 82_000) """
indx_min = nb * batch_size
indx_max = min(len_datas, indx_min + batch_size)
datas = l_datas[indx_min: indx_max] # Slice.
""" replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
self.o_table.replace_many(datas).execute()
char = '▒' if nb < (nb_steps - 9) else ('▄' if nb < (nb_steps - 4) else '.')
self.ut.gauge(char=char)
self.ut.gauge('end')
""" 4 - Tables dérivées de Renko1. """
self.ut.gauge('Tables dérivées de Renko1')
for o_renko_table in list(self.d_tables.values())[1:]: # Toutes les tables, sauf Renko1.
self._update_renko_table(l_datas, o_renko_table)
self.ut.gauge(char=f'{o_renko_table._meta.table_name.capitalize()} ▒')
self.ut.gauge('end')
return True
@staticmethod
def _update_renko_table(l_datas, o_table):
""" Une seule table : {o_table}. l_datas contient les rows de Renko1 : [timestamp, Renko, Spread, Volume]. """
if len(l_datas) == 0:
return
l_renkos = list()
price_ante = o_table.maille * (l_datas[0][1] // o_table.maille) # Multiple de o_table.maille.
volume = 0
for row in l_datas:
price = row[1]
volume += row[3]
if price <= price_ante - o_table.maille or price >= price_ante + o_table.maille:
row = (row[0], price, row[2], volume)
l_renkos.append(row)
volume = 0
price_ante = price
o_table.replace_many(l_renkos).execute()
Fichier complet /trading/historiques/db_candle.py
:
# Imports externes
from peewee import Model, FloatField, IntegerField
# Imports internes
from trading.historiques.db import DataBase
# noinspection PyTypeChecker, PyProtectedMember, DuplicatedCode
class DbCandle(DataBase):
def __init__(self, o_ctrl):
super().__init__(o_ctrl=o_ctrl, b_ticks=False)
db = self.db
""" Classe mère (hérite de peewee.Model). """
class BaseCandles(Model):
# https://docs.peewee-orm.com/en/latest/peewee/models.html#field-types-table
timestamp = IntegerField(primary_key=True) # Index==True et null==False par défaut => vide refusé.
BidOpen = FloatField()
BidHigh = FloatField()
BidLow = FloatField()
BidClose = FloatField()
AskOpen = FloatField()
AskHigh = FloatField()
AskLow = FloatField()
AskClose = FloatField()
Volume = IntegerField(null=True)
class Meta:
# db_table = 'xxxx' Si omis, le nom de la table sera automatique = nom de la classe en minuscules.
database = db
order_by = 'timestamp'
""" Classes dérivées (héritent de BaseCandles). """
class M1(BaseCandles):
""" Table principale. """
seconds = 60
class M5(BaseCandles):
seconds = 5 * 60
class M15(BaseCandles):
seconds = 15 * 60
class M30(BaseCandles):
seconds = 30 * 60
class H1(BaseCandles):
seconds = 60 * 60
class H4(BaseCandles):
seconds = 4 * 60 * 60
class Day(BaseCandles):
seconds = 24 * 60 * 60
class Week(BaseCandles):
seconds = 5 * 24 * 60 * 60 # Marché ouvert 5 jours par semaine.
""" Instanciations. """
self.d_tables = {'m1': M1, 'm5': M5, 'm15': M15, 'm30': M30, 'h1': H1, 'h4': H4, 'day': Day, 'week': Week}
self.o_table = M1
""" Création des tables dans le fichier candle.sql. """
with db:
self.db.create_tables(list(self.d_tables.values()))
def df_to_table(self, df):
""" Écriture SANS DOUBLONS des données de la DateFrame Pandas {df} dans la table principale M1. """
""" 1 - Mise en conformité de la colonne date-time. """
df = self.change_datetime_column(df)
if df is None:
return False
""" 2 - Écriture dans la table M1 : ~38sec pour 1_000_000 lignes.
replace_many() préférable à insert_many() car remplace en cas de doublon, donc pas d'erreur. """
l_datas = df.to_dict(orient='records') # <class 'list'> <-- ~3sec pour 1_000_000 lignes.
self.o_table.replace_many(l_datas).execute()
""" 3 - Écriture dans les tables dérivées : M5 à week. """
return self.update_derived(df)
def update_derived(self, df):
""" Tables dérivées de {M1}. """
stamp_before, stamp_after = self._get_range_from_df(df) # 2 samedis à 22:00.
self.ut.gauge('Tables dérivées de M1')
for o_derived_table in list(self.d_tables.values())[1:]: # Toutes les tables, sauf 'M1'
self.update_derived_table(df, stamp_before, stamp_after, o_derived_table)
self.ut.gauge(char=f'{o_derived_table._meta.table_name.upper()} ▒')
self.ut.gauge('end') # Passe à la ligne en fin de traitement.
self.ut.gauge('end') # Ligne vide supplémentaire.
return True
def update_derived_table(self, df, stamp_before, stamp_after, o_derived_table):
""" Une seule table. """
l_batches = list()
first_stamp, last_stamp, step = int(stamp_before + 86_400), int(stamp_after - 86_400), o_derived_table.seconds
last = ((last_stamp - first_stamp) // step) - 1
for i, since in enumerate(range(first_stamp, last_stamp, step)):
_from = stamp_before if i == 0 else since
_to = stamp_after if i == last else since + step
filtered_df = df[(df['timestamp'] >= _from) & (df['timestamp'] <= _to)]
if filtered_df['timestamp'].count() == 0:
continue
l_record = list()
l_record.append(filtered_df['timestamp'].iloc[-1]) # timestamp de fin.
l_record.append(filtered_df['BidOpen'].iloc[0]) # bidopen de début.
l_record.append(filtered_df['BidHigh'].max()) # valeur max des bidhigh.
l_record.append(filtered_df['BidLow'].min()) # valeur min des bidlow.
l_record.append(filtered_df['BidClose'].iloc[-1]) # bidclose de fin.
l_record.append(filtered_df['AskOpen'].iloc[0]) # askopen de début.
l_record.append(filtered_df['AskHigh'].max()) # valeur max ds askhigh.
l_record.append(filtered_df['AskLow'].min()) # valeur min des asklow.
l_record.append(filtered_df['AskClose'].iloc[-1]) # askclose de fin.
l_batches.append(l_record)
""" Écriture de l_batches (plusieurs enregistrements en une seule fois) dans la table o_table. """
with self.db.transaction():
o_derived_table.replace_many(l_batches).execute()
def week_exists(self, year, str_week, csv_size):
""" Toutes les tables sont testées. Une seule table défaillante suffit pour renvoyer False. """
""" Samedi avant, samedi après. Le milieu de la semaine est '3 10' -> mercredi 10:00. """
stamp = self.dt.get_dtstamp_from_dtstr(f'{year}-{str_week}-3 10', dt_format='%G-%V-%u %H') # Mercredi 10:00
stamp_before = stamp + self.dt.get_stamp_offset(stamp, b_after=False, num_day=6, _time='22:00')
stamp_after = stamp + self.dt.get_stamp_offset(stamp, b_after=True, num_day=6, _time='22:00')
for o_table in list(self.d_tables.values()): # Toutes les tables.
count = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
""" Relation empirique entre la taille du fichier sur disque et le nombre attendu d'enregistrements. """
corr = .65 # Coefficient de correction.
nb_min = max(1, int(corr * csv_size / o_table.seconds))
if count < nb_min:
return False
return True
def update_volume(self, df_tick):
""" Update la colonne 'Volume' de chaque table, à partir de la dataframe des ticks. """
""" Limites de la semaine : samedi 22:00 à samedi 22:00. """
stamp_before, stamp_after = df_tick['timestamp'].iloc[0], df_tick['timestamp'].iloc[-1]
yw = self.dt.get_date_from_dtstamp((stamp_before + stamp_after)/2).isocalendar()[:2]
nb_enr = 0
for o_table in list(self.d_tables.values()):
nb_enr += o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after)).count()
gauge_step = nb_enr // 50
indx = 0
self.ut.gauge(f"Volume dans les tables Candle ({yw[0]}, {yw[1]})") # >1514580360 >1514840400
for o_table in list(self.d_tables.values()):
batch = o_table.select().where(o_table.timestamp.between(stamp_before, stamp_after))
timestamp_ante = int(stamp_before)
l_batch = list()
for rec in batch:
indx += 1
if indx % gauge_step == 0:
nb = indx // gauge_step # 0 <= nb <= 50
char = '▒' if nb < 41 else ('▄' if nb < 46 else '.')
self.ut.gauge(char=char)
period = timestamp_ante + 1, rec.timestamp
vol = df_tick[df_tick['timestamp'].between(*period)].shape[0]
l_batch.append((rec.timestamp, rec.BidOpen, rec.BidHigh, rec.BidLow, rec.BidClose,
rec.AskOpen, rec.AskHigh, rec.AskLow, rec.AskClose, vol))
timestamp_ante = rec.timestamp
o_table.replace_many(l_batch).execute()
self.ut.gauge('end')
def slice_candle(self, period, pc_from, pc_to, nb_rows):
"""
@param period: 'm1', 'm5', 'm15', 'm30', 'h1', 'h4', 'day', 'week'
@param pc_from: en % : 0% = début de la table, 50% = milieu de la table.
@param pc_to: en % : 50% = milieu de la table, 100% = fin de la table.
@param nb_rows: nb_enr = abs(nb_rows). Si > 0, nb_enr depuis pc_from, si < 0, nb_enr jusqu'à pc_to.
@return: DataFrame avec timestamp en index.
"""
if period.lower() not in ['m1', 'm5', 'm15', 'm30', 'h1', 'h4', 'day', 'week']:
raise SystemExit("L'unité de temps doit être 'm1', 'm5', 'm15', 'm30', 'h1', 'h4', 'day' ou 'week'")
o_table = self.get_otable(period.lower())
return self.slice_datas(o_table, pc_from, pc_to, nb_rows)
simple_model.py
./functions/custom_ta
./functions/custom_ta/trend/best_zones.py
:
import numpy as np
import pandas as pd
def mark_peaks(np_vals, gap, bandwidth, l_indx):
if len(l_indx) < 2:
return
l_indx = [0] + l_indx
l_indx.sort()
v1, v2 = np_vals[l_indx[-1], 0], np_vals[l_indx[-2], 0]
if np_vals[l_indx[-2], 0] > np_vals[l_indx[-1], 0]:
""" Sommet : ouverture à la vente. """
for i in range(l_indx[-3]+1, l_indx[-1]):
threshold = v2 - (v2 - v1 - gap) * bandwidth / 100
if i < l_indx[-2]:
threshold = max(threshold, v2-gap)
elif i == l_indx[-2]:
continue
if np_vals[i, 0] >= threshold:
np_vals[i, 3] = 1
else:
""" Vallée : ouverture à l'achat. """
for i in range(l_indx[-3]+1, l_indx[-1]):
threshold = v2 + (v1 - v2 - gap) * bandwidth / 100
if i < l_indx[-2]:
threshold = min(threshold, v2+gap)
elif i == l_indx[-2]:
continue
if np_vals[i, 0] <= threshold:
np_vals[i, 3] = -1
def best_zones(close, gap, bandwidth):
""" Initialisation. """
len_close = close.shape[0]
np_vals = np.zeros((len_close, 4), dtype=np.float32) # 0=Close, 1=Up, 2=Down, 3=Peak
np_vals[:, 0] = close.values # 0 = Close
np_indx = last_indx_max = last_indx_min = 0
max_val = min_val = np_vals[0, 0] # Valeur du 1er Close.
step = 'ini'
b_stay = False
l_max, l_min = list(), list()
""" Grafcet. """
while True:
""" Compteur. """
if b_stay:
""" Le compteur n'avance pas, on reste sur le même {Close}. """
b_stay = False
else:
""" Avant que le compteur n'avance, on mémorise les palliers. """
np_vals[np_indx, 1] = max_val # Valeur du pallier 'Up'.
np_vals[np_indx, 2] = min_val # Valeur du pallier 'Down'.
np_indx += 1 # Avancement du compteur.
if np_indx >= len_close: # Fin.
break
val_now = np_vals[np_indx, 0] # Close.
if step == 'ini':
max_val = max(max_val, val_now)
min_val = min(min_val, val_now)
if val_now == max_val:
last_indx_max = np_indx
if max_val - min_val >= gap:
step = 'up'
elif val_now == min_val:
last_indx_min = np_indx
if max_val - min_val >= gap:
step = 'down'
elif step == 'up':
max_val = max(max_val, val_now)
if val_now == max_val: # Plafond
last_indx_max = np_indx
min_val = max_val - gap
if val_now <= min_val:
np_vals[last_indx_min, 3] = -2
l_min.append(last_indx_min)
mark_peaks(np_vals, gap, bandwidth, l_min+l_max)
b_stay = True # La prochaine boucle passera par 'down', mais avec le même {Close} (b_stay = True).
step = 'down'
elif step == 'down':
min_val = min(min_val, val_now)
if val_now == min_val:
last_indx_min = np_indx
max_val = min_val + gap
if val_now >= max_val:
np_vals[last_indx_max, 3] = 2
l_max.append(last_indx_max)
mark_peaks(np_vals, gap, bandwidth, l_min+l_max)
b_stay = True
step = 'up'
np_x = np_vals
l_scats = [(indx, np_x[indx, 0], 'top') for indx in l_max] + [(indx, np_x[indx, 0], 'bottom') for indx in l_min]
l_scats.sort()
df_scats = pd.DataFrame(columns=['indx', 'close', 'Principal'], data=l_scats)
return pd.DataFrame(np_vals[:, 1:], columns=['Up', 'Down', 'Peak']), df_scats
def best_zones_method(self, gap=12, bandwidth=80, **kwargs):
close = self._get_column(kwargs.pop("close", "close"))
result = best_zones(close=close, gap=gap, bandwidth=min(100, bandwidth))
return result
best_zones.__doc__ = """
Ceci est un faux indicateur :
=============================
- En effet, les signaux sont calculés en tenant compte du futur dans les historiques.
- Il ne faut donc pas s'en servir comme indicateur de trading.
- Toutefois, il peut être précieux pour la mise au point de stratégies.
Il affiche sur les courbes :
- Les zônes où l'on peut ouvrir un trade à l'achat.
- Les zônes où l'on peut ouvrir un trade à la vente.
- Les zônes où l'on ne doit pas ouvrir de trade.
Il est adapté pour la technique du trade-profit / stop-loss.
Calcul:
Paramètres :
close = Par défaut, série de la colonne CLOSE.
gap = Par défaut, 12. Valeur du take profit et du stop loss (Même valeur pour les deux).
bandwidth = Par défaut, 80. Pourcentage, valeur de 0 à 100.
Permet d'obtenir des zônes d'ouverture plus ou moins larges.
Valeur de retour : un tuple de 2 DataFrames.
DataFrame 0 : ['Close', 'Up', 'Down', 'Peak'], même nombre de lignes que la série d'entrée.
DataFrame 1 : ['indx', 'close', 'Principal'], petite taille, ne contient que les points nécessaires
pour afficher les scatters et les lignes zig-zag sur le signal d'entrée.
Le signal 'Peak' a pour valeurs -2, -1, 0, 1, 2 :
0 : Ne pas entrer dans le marché.
1, 2 : Ouverture à la vente.
-1, -2 : Ouverture à l'achat.
Activation automatique :
Dictionnaire {d_args} du script de stratégie : Commenter / Décommenter les clés de {best_zones}.
"""
PC
(Poste de contrôle) sur lequel nous avons travaillé jusqu'à présent :
PC
n'a pas à coder : il s'agit d'une programmation ludique, dessinée par graphes : nodes liés.PC
)best_zones.py
en est un exemple./trading/strategies/
simple_model.py
ne doit pas être modifié, hormis si vous désirez changer ses fonctionnalités ou corriger certains défauts./trading/strategies/tuto/str01.py
.Run 'str01'
.pre_params()
, post_params()
et hook_axis_anim()
.distrib()
.d_args
de __init__()
. pour obtenir ceci :d_args
modifié :
d_args = { # Ces valeurs seront à la racine du super-dictionnaire self.od.
'geometry': (100, 40, 1000, 700), # x, y, w, h. Par défaut : (100, 100, 1000, 600).
'margins': (6, 8, 10, 8), # Marges : haut, droite, bas, gauche.
'abscissa_size': 300, # Nb de points affichés en abscisse. Par défaut : 600.
'window_title': "Stratégie 01", # Titre de la fenêtre. Par défaut 'Stratégie'.
'figure_title': "2 moyennes et leur différence", # Titre des graphiques. Par défaut '' (vide).
'leader_lines': True, # Lignes hortogonales de repère, suivi de la souris.
'subplots': { # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
'Principal': 60, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
# 'Milieu': '', # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
# 'Milieu2': '',
'Bas': '',
},
'show_pilote': True, # False par défaut.
'show_volume': True, # False par défaut.
'show_linked_label': True, # False par défaut.
'best_zones': {
# 'show': True, # False par défaut.
'gap': 20, # 24 par défaut. Nombre de pips take-profit ou stop-loss.
'bandwidth': 80, # 80 par défaut. Pourcentage : de 0 à 100.
'up': True, # False par défaut : Affichage des palliers haut.
'down': True, # False par défaut : Affichage des palliers bas.
'scatters': True, # False par défaut : Affichage des optimums sous forme de ronds.
'zig_zag': True, # False par défaut : Affichage de la courbe zig-zag.
'colors': ('#ffff0030', '#ff00ff10'), # Coloriage des ouvertures. Commenter pour ne pas les afficher.
},
'animation': True, # False par défaut.
}
get_pilot()
.
self.df_pilot
.DataFrame Pandas
, dont les colonnes sont : ['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume', 'Spread']
EUR/USD
. Choisir parmi ceux disponibles en base de données (Voir CtrlHistos
).pc_to=99, nb_rows=3_000
← 3 000 derniers points avant la position 99% de la table en base de données.add_indics()
:
def add_indics(self):
# print(pdt.ema.__doc__) # <-- Affiche la doc de l'indicateur.
self.df_pilot['EMA1'] = self.df_pilot.ta.ema(length=30)
self.df_pilot['EMA2'] = self.df_pilot.ta.ema(length=60)
self.df_pilot['Diff'] = self.df_pilot['EMA1'] - self.df_pilot['EMA2'] # Différence EMA1 - EMA2
Les colonnes 'EMA1', 'EMA2' et 'Diff' ont été ajoutées à self.df_pilot
.
distrib()
:
add_df()
:
def distrib(self):
self.add_df('Principal', ['EMA1', 'EMA2'])
self.add_df('Bas', ['Diff'])
Les noms EMA1, EMA2 et Diff sont ceux des colonnes (voir add_indics()
), 'Principal' et 'Bas' ceux des graphiques (voir __init__()
).
self.od
.ShowGeek.__init__()
:
Indicateurs personnalisés : self.custom_ta()
# Analyse technique et indicateurs personnalisés.
Choix du pilote → Tick, Renko ou Candle : self.get_df_pilot()
Ajout des indicateurs dans la dataframe {df_pilot} → Colonnes ajoutées à {df_pilot} : self.add_indics()
Ajout du signal pilote : self.add_pilot()
Paramètres avant la création des graphiques (des axes) : self.pre_params()
Distribution des signaux (des colonnes) → un dataframe par axe dans {self.l_ax}
: self.distrib()
Création des graphiques (des axes) : self.build_axis()
Paramètres après la création des graphiques (des axes) : self.post_params()
Affichage animé : self.show()
Le dictionnaire est chargé pendant les étapes 1 à 6.
Le dictionnaire est utilisé pendant les étapes 7 à 9.
Pour lire le dictionnaire, il suffit d'écrire ce code :
self.od.print()
Ce code doit être intercalé dans la séquence ci-dessus : entre 1 et 2, le dictionnaire est léger, entre 7 et 8 il est important.
Il contient des entiers, des booléens, des chaînes, mais aussi des tableaux, des objets matplotlib (les axes) et des DataFrames avec leurs milliers de données !
La méthode print()
de la classe Dictionary
n'a pas été prévue pour tous ces types de données.
Il en résulte, à partir du point 4 ci-dessus un affichage prohibitif et difficilement exploitable.
C'est pourquoi la méthode print()
de la classe Dictionary
doit être modifiée :
/functions/utils.py > Dictionary.print()
:
def print(self, dic=None, length=60, dash='-', depth=0, blank=2):
"""
- Méthode récursive (qui s'appelle elle-même), d'où la présence du depth.
- Elle affiche plusieurs lignes, chacune se terminant par "<class '...'>"
:param dic: Dictionnaire à afficher.
:param length: Nombre de caractères, ou, plus précisément, position pour chaque ligne du texte "<class '...'>"
:param dash: Symbole '-', '_', '.', ' ', ou tout autre caractère.
:param depth: Le 1er élément a un depth de 0. S'il contient à so tour un dictionnaire,
celui-ci aura un depth de 1, et ainsi de suite.
:param blank: 0, 1, 2 ou 3
0 = pas de ligne vide avant ni après l'affichage du dictionnaire.
1 = Une ligne vide avant l'affichage du dictionnaire.
2 = Une ligne vide après l'affichage du dictionnaire.
3 = Une ligne vide avant ET après l'affichage du dictionnaire (valeur par défaut).
:return: NA, le retour est en fait une succession de print() dans la console 'run'.
En cas d'erreur :
- Si dic vide, afficher 'Dictionnaire vide.'
- Si dic n'est pas un dictionnaire, afficher :
f"Vérifie dico : Le paramètre reçu n'est pas un dictionnaire.\nType : {type(dic)}\nValeur : {dic}"
"""
if dic is None:
dic = self
if len(dic) == 0:
print('Dictionnaire vide.')
return
if blank & 1: # Bit 0 : Espace avant (ligne vide)
print()
try:
for key, val in dic.items(): # Attention ! key peut être de type autre que str.
data = f'DataFrame {val.shape}' if val.__class__.__name__ in ['DataFrame', 'Series'] else val
if isinstance(val, dict):
print(self.dashes(f"{' ' * 4 * depth}{key}:", length, type(val), dash))
depth += 1
self.print(data, length, dash, depth, blank=0)
depth -= 1
else:
print(self.dashes(f"{' ' * 4 * depth}{key}: {data}", length, type(val), dash))
except (Exception,):
print(f"Vérifie dico : Le paramètre reçu n'est pas un dictionnaire."
f"\nType : {type(dic)}\nValeur : {dic}")
return
if blank & 2: # Bit 1 : Espace après (ligne vide)
print()
Voici l'examen du dictionnaire complet (donc entre les point 8 et 9), pour la stratégie str01
:
read()
et write()
doivent également être modifiées, afin de pouvoir utiliser des entiers ou des chaînes comme clés.
/functions/utils.py > Dictionary.read() et write()
:
def read(self, l_keys, default=None):
"""
:param l_keys: Si c'est une liste, elle correspond à la hiérarchie dans l'arbre.
:param default: Valeur par défaut à retourner lorsque la recherche échoue.
:return: Valeur de la dernière clé de la liste -> sans enfant (feuille).
"""
if isinstance(l_keys, (str, int)):
l_keys = l_keys, # str devient tuple
dic = self
try:
for l_key in l_keys[:-1]:
l_key = str(l_key)
dic = dic[l_key]
return dic[str(l_keys[-1])]
except (Exception,):
return default
def write(self, l_keys, value):
"""
Important : Si la clé n'existe pas, elle est créée.
:param l_keys: Si c'est une liste, elle correspond à la hiérarchie dans l'arbre.
:param value: Valeur affectée à la dernière clé de la liste -> sans enfant (feuille).
:return: Booléen de réussite.
"""
if isinstance(l_keys, (str, int)):
l_keys = l_keys, # str devient tuple
dic = self
try:
for l_key in l_keys[:-1]:
l_key = str(l_key)
if l_key not in dic:
dic[l_key] = dict() # Création de la clé si inexistante.
dic = dic[l_key]
dic[str(l_keys[-1])] = value
self._sort_cut(l_keys[0])
return True
except (Exception,):
return False
/trading/strategies/tuto/str02.py
.get_pilot()
: elles seront restaurées si nécessaire.str02.py > Geek.add_indics() et distrib()
:
def add_indics(self):
self.df_pilot['KAMA'] = self.df_pilot.ta.kama(length=40)
def distrib(self):
self.add_df('Principal', ['KAMA'])
KAMA
présente un défaut : il se stabilise au bout de plusieurs périodes.kama.py
et y copier-coller le code suivant./functions/custom_ta/overlap/kama.py
:
# -*- coding: utf-8 -*-
import numpy as np
from pandas import Series
from pandas_ta.utils import get_drift, get_offset, non_zero_range, verify_series
def kama(close, length=None, fast=None, slow=None, drift=None, offset=None, **kwargs):
"""Indicator: Kaufman's Adaptive Moving Average (KAMA)"""
# Validate Arguments
length = int(length) if length and length > 0 else 10
fast = int(fast) if fast and fast > 0 else 2
slow = int(slow) if slow and slow > 0 else 30
close = verify_series(close, max(fast, slow, length))
drift = get_drift(drift)
offset = get_offset(offset)
if close is None: return
# Calculate Result
def weight(length: int) -> float:
return 2 / (length + 1)
fr = weight(fast)
sr = weight(slow)
abs_diff = non_zero_range(close, close.shift(length)).abs()
peer_diff = non_zero_range(close, close.shift(drift)).abs()
peer_diff_sum = peer_diff.rolling(length).sum()
er = abs_diff / peer_diff_sum
x = er * (fr - sr) + sr
sc = x * x
m = close.size
result = [np.nan for _ in range(0, length - 1)] + [0]
for i in range(length, m):
result.append(sc.iloc[i] * close.iloc[i] + (1 - sc.iloc[i]) * result[i - 1])
kama = Series(result, index=close.index)
# Offset
if offset != 0:
kama = kama.shift(offset)
# Handle fills
if "fillna" in kwargs:
kama.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
kama.fillna(method=kwargs["fill_method"], inplace=True)
# Name & Category
kama.name = f"KAMA_{length}_{fast}_{slow}"
kama.category = "overlap"
""" Modification WL : Suppression des premières valeurs du rapprochement asymptotique. """
l_indx = np.where(kama < close)[0]
for i in range(length, len(l_indx)):
indx = l_indx[i]
if indx > l_indx[i - 1] + 1:
kama.loc[0: indx-3] = np.nan
break
return kama
# Pour inhiber cet indicateur surchargé, renommer l'extension de ce fichier, par exemple .py --> .pyx.
def kama_method(self, length=None, fast=None, slow=None, offset=None, **kwargs):
close = self._get_column(kwargs.pop("close", "close"))
result = kama(close=close, length=length, fast=fast, slow=slow, offset=offset, **kwargs)
return self._post_process(result, **kwargs)
kama.__doc__ = \
"""Kaufman's Adaptive Moving Average (KAMA)
Developed by Perry Kaufman, Kaufman's Adaptive Moving Average (KAMA) is a moving average
designed to account for market noise or volatility. KAMA will closely follow prices when
the price swings are relatively small and the noise is low. KAMA will adjust when the
price swings widen and follow prices from a greater distance. This trend-following indicator
can be used to identify the overall trend, time turning points and filter price movements.
Sources:
https://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:kaufman_s_adaptive_moving_average
https://www.tradingview.com/script/wZGOIz9r-REPOST-Indicators-3-Different-Adaptive-Moving-Averages/
Calculation:
Default Inputs:
length=10
Args:
close (pd.Series): Series of 'close's
length (int): It's period. Default: 10
fast (int): Fast MA period. Default: 2
slow (int): Slow MA period. Default: 30
drift (int): The difference period. Default: 1
offset (int): How many periods to offset the result. Default: 0
Kwargs:
fillna (value, optional): pd.DataFrame.fillna(value)
fill_method (value, optional): Type of fill method
Returns:
pd.Series: New feature generated.
"""
ADX
:
self.df_pilot
.str02.py > Geek.add_indics() et distrib()
:
def add_indics(self):
print(pdt.adx.__doc__) # <-- Affiche la doc de l'indicateur.
self.df_pilot['KAMA'] = self.df_pilot.ta.kama(length=40)
self.df_pilot[['ADX', 'DMP', 'DMN']] = self.df_pilot.ta.adx(length=40)
def distrib(self):
self.add_df('Principal', ['KAMA'])
self.add_df('Milieu', ['ADX'])
self.add_df('Bas', ['DMP', 'DMN'])
d_args
pour alléger l'affichage./trading/strategies/tuto/str02.py
:
# Imports externes
import pandas_ta as pdt # Utilisé seulement pour lire la doc : print(pdt.macd.__doc__)
# Imports internes
from show.show_geek import ShowGeek
from trading.historiques.ctrl_histos import CtrlHistos
class Geek(ShowGeek):
def __init__(self):
d_args = { # Ces valeurs seront à la racine du super-dictionnaire self.od.
'geometry': (700, 40, 1000, 700), # x, y, w, h. Par défaut : (100, 100, 1000, 600).
'margins': (6, 8, 10, 8), # Marges : haut, droite, bas, gauche.
'abscissa_size': 300, # Nb de points affichés en abscisse. Par défaut : 600.
'window_title': "Str 02", # Titre de la fenêtre. Par défaut 'Stratégie'.
'figure_title': "KAMA + ADX : Points intéressants.", # Titre des graphiques. Par défaut '' (vide).
'leader_lines': True, # Lignes hortogonales de repère, suivi de la souris.
'subplots': { # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
'Principal': 40, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
'Milieu': '', # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
# 'Milieu2': '',
'Bas': '',
},
'show_pilote': True, # False par défaut.
'show_volume': True, # False par défaut.
'show_linked_label': True, # False par défaut.
'best_zones': {
'show': True, # False par défaut.
'gap': 20, # 24 par défaut. Nombre de pips take-profit ou stop-loss.
'bandwidth': 80, # 80 par défaut. Pourcentage : de 0 à 100.
# 'up': True, # False par défaut : Affichage des palliers haut.
# 'down': True, # False par défaut : Affichage des palliers bas.
# 'scatters': True, # False par défaut : Affichage des optimums sous forme de ronds.
# 'zig_zag': True, # False par défaut : Affichage de la courbe zig-zag.
'colors': ('#ffff0030', '#ff00ff10'), # Coloriage des ouvertures. Commenter pour ne pas les afficher.
},
'animation': True, # False par défaut.
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
}
super().__init__(**d_args)
def get_pilot(self):
""" Choix du signal-historique-pilote et des options. """
""" Options : Commenter / Décommenter. """
""" Récupération du signal pilote (normalisé) à partir de la db. """
instrument, table = 'EUR/USD', 7 # table=int => Renko.
# instrument, table = 'EUR/USD', 'H1' # table=str => Candle.
# instrument, table = 'EUR/USD', None # table=None => Tick.
return instrument, *CtrlHistos(instrument).get_pilot(table=table, pc_to=99, nb_rows=3_000) # DataFrame
def add_indics(self):
# print(pdt.adx.__doc__) # <-- Affiche la doc de l'indicateur.
self.df_pilot['KAMA'] = self.df_pilot.ta.kama(length=40)
self.df_pilot[['ADX', 'DMP', 'DMN']] = self.df_pilot.ta.adx(length=40)
def distrib(self):
""" Vérification : les graphiques doivent exister dans __init__(), les signaux dans add_indics(). """
self.add_df('Principal', ['KAMA'])
self.add_df('Milieu', ['ADX'])
self.add_df('Bas', ['DMP', 'DMN'])
def pre_params(self):
super().pre_params()
self.od.write(['Bas', 'legend'], 'cm') # Légende épinglée au milieu.
def post_params(self):
""" Surcharge ou ajout au code post_params() de la classe-parent. """
super().post_params()
""" Exemple de tracé de lignes - https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.axhline.html """
self.trace_hline('Milieu', 7, lw=.4, ls='-.', c='r') # Trait rouge à 7 sur l'ADX.
self.trace_hline('Milieu', 10, lw=.4, ls='-.', c='g') # Trait vert à 11 sur l'ADX.
self.trace_hline('Bas', 42, lw=.4, ls='-.', c='b') # Trait bleu à 44 sur les DM.
self.trace_hline('Bas', 58, lw=.4, ls='-.', c='b') # Trait bleu à 56 sur les DM.
if __name__ == '__main__':
Geek()
Sommets bleus : Vendre - Sommets orange : Acheter. Placer stop loss = take profit = 60 pips => 5 trades gagnants.
Conditions : ADX > 7 et DMP ou DMN > 58
/trading/strategies/tuto/str03.py
:
# Imports externes
import pandas_ta as pdt # Utilisé seulement pour lire la doc : print(pdt.macd.__doc__)
# Imports internes
from show.show_geek import ShowGeek
from trading.historiques.ctrl_histos import CtrlHistos
class Geek(ShowGeek):
def __init__(self):
d_args = { # Ces valeurs seront à la racine du super-dictionnaire self.od.
'geometry': (100, 40, 1000, 700), # x, y, w, h. Par défaut : (100, 100, 1000, 600).
'margins': (6, 8, 10, 8), # Marges : haut, droite, bas, gauche.
'abscissa_size': 300, # Nb de points affichés en abscisse. Par défaut : 600.
'window_title': "Str03", # Titre de la fenêtre. Par défaut 'Stratégie'.
'figure_title': "Coloriage inter-zônes.", # Titre des graphiques. Par défaut '' (vide).
'leader_lines': True, # Lignes hortogonales de repère, suivi de la souris.
'subplots': { # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
'Principal': 45, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
'Milieu': '', # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
# 'Milieu2': '',
'Bas': '',
},
'show_pilote': True, # False par défaut.
'show_volume': True, # False par défaut.
'show_linked_label': True, # False par défaut.
'best_zones': {
# 'show': True, # False par défaut.
'gap': 20, # 24 par défaut. Nombre de pips take-profit ou stop-loss.
'bandwidth': 80, # 80 par défaut. Pourcentage : de 0 à 100.
'up': True, # False par défaut : Affichage des palliers haut.
'down': True, # False par défaut : Affichage des palliers bas.
'scatters': True, # False par défaut : Affichage des optimums sous forme de ronds.
'zig_zag': True, # False par défaut : Affichage de la courbe zig-zag.
'colors': ('#ffff0030', '#ff00ff10'), # Coloriage des ouvertures. Commenter pour ne pas les afficher.
},
'animation': True, # False par défaut.
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
}
super().__init__(**d_args)
def get_pilot(self):
""" - Récupération du signal pilote à partir de la db.
- return tuple(instrument, table_name, self.df_pilot). """
instrument, table = 'EUR/USD', 7 # table=int => Renko.
# instrument, table = 'EUR/USD', 'H1' # table=str => Candle.
# instrument, table = 'EUR/USD', None # table=None => Tick.
return instrument, *CtrlHistos(instrument).get_pilot(table=table, pc_to=99, nb_rows=3_000) # DataFrame
def add_indics(self):
""" Méthode utilisée en mode GA (sans affichage) et en mode normal. le format de d_params est imposé par GA. """
""" Indicateurs en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """
# print(pdt.stochrsi.__doc__) <-- Affiche la doc du stochrsi.
x = self.df_pilot.ta.stochrsi()
print(x.tail())
if __name__ == '__main__':
Geek()
x
est affichée :STOCHRSIk_14_14_3_3 STOCHRSId_14_14_3_3 2995 67.250453 74.986721 2996 76.282414 72.360708 2997 85.386116 76.306328 2998 91.684921 84.451150 2999 81.506622 86.192553
x.info()
: Infos diverses sur le DataFrame x
.x.head()
: Les premiers enregistrements.x.tail()
: Les derniers enregistrements.STOCHRSIk_14_14_3_3
et STOCHRSId_14_14_3_3
..iloc[]
:iloc[]
, le MACD
et les zônes colorées./trading/strategies/tuto/str03.py
:
# Imports externes
import pandas_ta as pdt # Utilisé seulement pour lire la doc : print(pdt.macd.__doc__)
# Imports internes
from show.show_geek import ShowGeek
from trading.historiques.ctrl_histos import CtrlHistos
class Geek(ShowGeek):
def __init__(self):
d_args = { # Ces valeurs seront à la racine du super-dictionnaire self.od.
'geometry': (100, 40, 1000, 700), # x, y, w, h. Par défaut : (100, 100, 1000, 600).
'margins': (6, 8, 10, 8), # Marges : haut, droite, bas, gauche.
'abscissa_size': 300, # Nb de points affichés en abscisse. Par défaut : 600.
'window_title': "Str03", # Titre de la fenêtre. Par défaut 'Stratégie'.
'figure_title': "Coloriage inter-zônes.", # Titre des graphiques. Par défaut '' (vide).
'leader_lines': True, # Lignes hortogonales de repère, suivi de la souris.
'subplots': { # Noms et hauteurs (en %) des graphiques modifiables. Supprimer / Ajouter.
'Principal': 45, # Subplot principal : NE PAS MODIFER SON ORTHOGRAPHE.
'Milieu': '', # '' : Si chaîne vide => Les hauteurs seront automatiquement calculées et réparties.
'Bas': '',
},
'show_pilote': True, # False par défaut.
'show_volume': True, # False par défaut.
'show_linked_label': True, # False par défaut.
'best_zones': {
'show': True, # False par défaut.
'gap': 20, # 24 par défaut. Nombre de pips take-profit ou stop-loss.
'bandwidth': 80, # 80 par défaut. Pourcentage : de 0 à 100.
# 'up': True, # False par défaut : Affichage des palliers haut.
# 'down': True, # False par défaut : Affichage des palliers bas.
'scatters': True, # False par défaut : Affichage des optimums sous forme de ronds.
# 'zig_zag': True, # False par défaut : Affichage de la courbe zig-zag.
'colors': ('#ffff0030', '#ff00ff10'), # Coloriage des ouvertures. Commenter pour ne pas les afficher.
},
'animation': True, # False par défaut.
# Paramètres généraux supplémentaires ici ...
# Paramètres seaborn : https://www.python-simple.com/python-seaborn/seaborn-general.php
}
super().__init__(**d_args)
def get_pilot(self):
""" - Récupération du signal pilote à partir de la db.
- return tuple(instrument, table_name, self.df_pilot). """
instrument, table = 'EUR/USD', 7 # table=int => Renko.
# instrument, table = 'EUR/USD', 'H1' # table=str => Candle.
# instrument, table = 'EUR/USD', None # table=None => Tick.
return instrument, *CtrlHistos(instrument).get_pilot(table=table, pc_to=99, nb_rows=3_000) # DataFrame
def add_indics(self):
""" Méthode utilisée en mode GA (sans affichage) et en mode normal. le format de d_params est imposé par GA. """
""" Indicateurs en D:/anaconda/envs/robot/Lib/site-packages/pandas_ta <-- Ne pas les modifier !!!
Doc indicateurs : https://github.com/twopirllc/pandas-ta#indicators-by-category
Génération de signaux dérivés : ajout de colonnes au dataframe {df_pilot}. """
# print(pdt.stochrsi.__doc__) <-- Affiche la doc du stochrsi.
self.df_pilot[['MACD', 'HISTO', 'SIGNAL']] = self.df_pilot.ta.macd()
self.df_pilot['StochRSI'] = self.df_pilot.ta.stochrsi().iloc[:, 0]
def distrib(self):
""" Répartition des signaux entre les différents graphiques (axes). """
""" - Les noms des axes (graphiques) doivent correspondre avec ceux déclarés dans l'ini ('subplots').
- Ceux des signaux (indicateurs, ...) doivent correspondre avec ceux déclarés dans add_indics(). """
self.add_df('Milieu', ['MACD', 'HISTO', 'SIGNAL'])
self.add_df('Bas', ['StochRSI'])
def hook_axis_anim(self, axis_name, x, o_ax, df, y_min, y_max):
""" Surcharge ou ajout au code hook_axis_anim() de la classe-parent. """
super().hook_axis_anim(axis_name, x, o_ax, df, y_min, y_max)
""" Coloriage inter-zônes : https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.fill_between.html """
_from, _to = x[0], x[0] + len(x)
if axis_name == 'Milieu':
macd = df['MACD'][_from: _to]
signal = df['SIGNAL'][_from: _to]
l_args = [
dict(x=x, y1=macd, y2=signal, where=macd > signal, fc='#0aa2', interpolate=True),
dict(x=x, y1=macd, y2=signal, where=macd < signal, fc='#a0a2', interpolate=True)
]
self.fill_between(o_ax, l_args)
elif axis_name == 'Bas':
rsi = df['StochRSI'][_from: _to]
l_args = [
dict(x=x, y1=rsi, y2=80, where=rsi > 80, fc='#0aa2', interpolate=True),
dict(x=x, y1=20, y2=rsi, where=rsi < 20, fc='#a0a2', interpolate=True)
]
self.fill_between(o_ax, l_args)
if __name__ == '__main__':
Geek()
Zônes colorées conditionnellement avec where
(voir code).
kama.py
).Bonjour les codeurs !