#!/usr/bin/python3 # vim: set fileencoding=utf-8 : from datetime import timedelta from itertools import product import os import sys import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.dates as mdates import matplotlib.cbook as cbook from matplotlib.animation import FuncAnimation from numpy.polynomial import Chebyshev from neuralnetwork import Network if len(sys.argv) > 1 and sys.argv[1] == 'save': SAVE = True else: SAVE = False class Graphs: """ The problem processes in this course """ def __init__(self, file_name, save=False): data = pd.read_csv(file_name, sep=',', header=0) self._times = pd.DatetimeIndex(data.Date) self._times_rel = (self._times - self._times.min()).total_seconds() self.dpi = 120 self.linear = np.linspace(-1, 1, 365 * 24) self.normalized = (data.Close - data.Close.min()) / (data.Close.max() - data.Close.min()) self.results = None self.save = save self.times = 2 * (self._times_rel - self._times_rel.min()) / (self._times_rel.max() - self._times_rel.min()) - 1 self.values = data.Close.values # random selection of data temp = np.arange(len(self.times)) np.random.seed(123456789) np.random.shuffle(temp) # store selection self.selection = np.zeros(len(self.times), dtype=bool) self.selection[temp[:50]] = True self.validation = np.zeros(len(self.times), dtype=bool) self.validation[temp[50:80]] = True def time(self, data): return pd.to_datetime(((0.5 + data / 2) * (self._times.max().value - self._times.min().value) + self._times.min().value).astype(int)) def get_axis(self, values, title): years = mdates.YearLocator() # every year months = mdates.MonthLocator() # every month years_fmt = mdates.DateFormatter('%m/%Y') fig, ax = plt.subplots() ax.format_xdata = mdates.DateFormatter('%Y-%m-%d') ax.grid(True) # ax.set_xlim(np.datetime64('2007-12-31'), np.datetime64('2009-01-01')) ax.set_xlim(self._times.min() - timedelta(7), self._times.max() + timedelta(7)) ax.set_title(title) ax.set_ylabel("German DAX (closing values)") ax.xaxis.set_major_formatter(years_fmt) ax.xaxis.set_major_locator(months) return fig, ax def save_fig(self, name, fig, ax, animation=None): ax.legend(loc=1) fig.autofmt_xdate() if self.save: if animation is None: fig.savefig(f'{name}', dpi=self.dpi) else: split = name.rsplit('.')[0] animation.save(f'{split}.gif', dpi=self.dpi, writer='imagemagick') os.system(f'gif2apng {split}.gif {split}.png') else: plt.show() def draw_initial(self, name): fig, ax = self.get_axis(self.values, "Can you find a function to describe these values?") ax.plot( self.time(self.times[self.selection]), self.values[self.selection], marker='.', color='black', lw=0, label="data", ) ax.set_ylim(self.values.min(), self.values.max()) self.save_fig(name, fig, ax) plt.close() def draw_activations(self, name, derivatives=False): fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2) fig.subplots_adjust(wspace=0.25, hspace=0.32, right=0.98, top=0.98) for ax in [ax1, ax2, ax3, ax4]: ax.grid(True) ax.set_xlabel("$x$") ax.set_xlim(-3, 3) x = np.linspace(-3, 3, 100) ax1.set_ylabel("sigmoid") ax2.set_ylabel("ReLu") ax3.set_ylabel("tanh") ax4.set_ylabel("Leaky ReLu") if derivatives: ax1.plot(x, Network.derivative_sigmoid(x), 'b') ax2.plot(x, Network.derivative_relu(x), 'b') ax3.plot(x, Network.derivative_tanh(x), 'b') ax4.plot(x, Network.derivative_leakyrelu(x), 'b') else: ax1.plot(x, Network.activation_sigmoid(x), 'b') ax2.plot(x, Network.activation_relu(x), 'b') ax3.plot(x, Network.activation_tanh(x), 'b') ax4.plot(x, Network.activation_leakyrelu(x), 'b') if SAVE: fig.savefig(f'{name}', dpi=self.dpi) else: plt.show() plt.close() def draw_poly(self, name, cost=False, highlight=False, show_data=False): fig, ax = self.get_axis(self.values, "N-th polynom fit on stock-index values") line, = ax.plot(self.time(self.linear), 0 * self.linear, label="Polynom") if highlight: ax.axvspan(np.datetime64('2008-09-01'), np.datetime64('2008-09-30'), alpha=0.2, color='yellow') TIMES = [35, 36, 37, 38, 39, 40] else: TIMES = [0, 1, 2, 3, 4, 5, 10, 15, 20, 25, 30, 35, 40, 45] ax.plot( self.time(self.times[self.selection]), self.values[self.selection], marker='.', color='black', lw=0, label="data", ) ax.set_ylim(self.values.min(), self.values.max()) if show_data: ax.plot(self._times, self.values, '-', lw=1, color="#999999", label="All data") anim = FuncAnimation(fig, self.update_poly, frames=list((time, cost) for time in TIMES), interval=2000) self.graph_data = ax, line self.save_fig(name, fig, ax, anim) plt.close() def update_poly(self, frame): ax, line = self.graph_data terms, has_costs = frame estimate = Chebyshev.fit(self.times[self.selection], self.values[self.selection], deg=terms) line.set_ydata(estimate(self.linear)) if has_costs: costs = np.average(np.abs(estimate(self.times[self.selection]) - self.values[self.selection])) ax.set_xlabel(f"N={terms}; Costs={costs:.2f} (avg. abs. differences)") else: ax.set_xlabel(f"N={terms}") return line, ax def draw_nn(self, name): fig, ax = self.get_axis(self.normalized, "NN fit on normalized stock-index values") ax.plot(self._times, self.normalized, '-', lw=1, color="#999999", label="All data") ax.set_ylabel("Normalized German DAX (closing values)") ax.plot( self.time(self.times[self.selection]), self.normalized[self.selection], marker='.', color='black', lw=0, label="data", ) ax.set_ylim(self.normalized.min(), self.normalized.max()) epochs = 500000 for x in [1, 10, 30]: print("calculation of NN", x) # noqa nn = Network([1, x, 1], activations=["sigmoid", "linear"], seed=1517541860) nn.train(self.times[self.selection], self.normalized[self.selection], epochs=epochs, learning_rate=0.75) a = nn.forward_propagation(self.linear).flatten() ax.plot(self.time(self.linear), a, label=f"NN[1,{x},1]") ax.set_xlabel(f"Epoch={epochs}") self.save_fig(name, fig, ax) plt.close() def update_nn(self, frame): ax, line, nn, x = self.graph_data nn.train(self.times[self.selection], self.normalized[self.selection], epochs=frame, learning_rate=0.2) a = nn.forward_propagation(self.linear).flatten() line.set_ydata(a) self.results[x] = a ax.set_xlabel(f"Epoch={nn.epochs}") return line, ax def draw_optimized_nn(self, name): self.results = {} for x in [6, 12]: fig, ax = self.get_axis(self.normalized, f"NN with good initial conditions") line, = ax.plot(self.time(self.linear), 0 * self.linear, label=f"NN [1,{x},1]") ax.plot(self._times, self.normalized, '-', lw=1, color="#999999", label="All data") ax.set_ylabel("Normalized German DAX (closing values)") ax.plot( self.time(self.times[self.selection]), self.normalized[self.selection], marker='.', color='black', lw=0, label="data", ) ax.set_ylim(self.normalized.min(), self.normalized.max()) print("calculation of opt NN", x) # noqa nn = Network([1, x, 1], activations=["sigmoid", "linear"]) # set initial weights an biases nn.weights[0] = np.ones(nn.weights[0].shape) * 10 * x nn.biases[0] = np.linspace(-1, 1, x + 2)[1:-1].reshape(x, 1) * 10 * x nn.weights[1] = np.ones(nn.weights[1].shape) / x nn.biases[1] = np.array([[0]]) anim = FuncAnimation(fig, self.update_nn, frames=[0, 100, 900, 9000, 90000, 100000, 100000, 100000, 100000], interval=2500) self.graph_data = ax, line, nn, x ax.set_xlabel(f"Date") self.save_fig(name.format(x), fig, ax, anim) plt.close() fig, ax = self.get_axis(self.normalized, "Comparison of NN with different sizes") ax.plot(self._times, self.normalized, '-', lw=1, color="#999999", label="All data") ax.set_ylabel("Normalized German DAX (closing values)") ax.plot( self.time(self.times[self.selection]), self.normalized[self.selection], marker='.', color='black', lw=0, label="data", ) ax.set_ylim(self.normalized.min(), self.normalized.max()) for x, data in self.results.items(): ax.plot(self.time(self.linear), data, label=f"NN[1,{x},1]") ax.set_xlabel(f"Final optimization step") self.save_fig(name.format("all"), fig, ax) plt.close() def draw_neural_net(ax, layers, margin=0.1): ''' Draw a neural network using matplotilb. ''' vspace = (1 - 2 * margin) / (max(layers) - 1) hspace = (1 - 2 * margin) / (len(layers) - 1) neuron_size = min([0.06, vspace / 3.5]) # Neurons for i, size in enumerate(layers): top = vspace * (size - 1) / 2. + 0.5 for j in range(size): circle = plt.Circle( (i * hspace + margin, top - j * vspace), neuron_size, color='w', ec='k', zorder=3, ) ax.add_artist(circle) # Weights for i, (size_s, size_e) in enumerate(zip(layers[:-1], layers[1:])): top_s = vspace * (size_s - 1) / 2. + 0.5 top_e = vspace * (size_e - 1) / 2. + 0.5 for j, k in product(range(size_s), range(size_e)): line = plt.Line2D( [i * hspace + margin, (i + 1) * hspace + margin], [top_s - j * vspace, top_e - k * vspace], c='#555555', lw=0.5, ) ax.add_artist(line) def main(): graph = Graphs("dax.csv", save=SAVE) graph.draw_activations("activations.png") graph.draw_activations("derivatives.png", derivatives=True) graph.draw_optimized_nn("nn_opt_{}.png") graph.draw_initial("initial.png") graph.draw_poly("poly_cost.png", cost=True, highlight=False, show_data=False) graph.draw_poly("poly_data.png", cost=True, highlight=False, show_data=True) graph.draw_poly("poly_highlight.png", cost=True, highlight=True, show_data=True) graph.draw_nn("nn_layersize.png") for x in ([1,5,1], [3,5,5,2]): fig = plt.figure(figsize=(12, 12)) ax = fig.gca() ax.axis('off') draw_neural_net(ax, x) name = "nn_%s.png" % ''.join(map(str, x)) fig.savefig(f'{name}', dpi=600) os.system(f'convert -trim -scale 768 {name} {name}') if __name__ == '__main__': main()