Commit f23b2412 authored by Philippe SWARTVAGHER
Browse files

plot: move fitting functions to a dedicated file

parent 2300042b
import abc
from enum import Enum
import functools
import inspect
import math
import matplotlib.pyplot as plt
import numpy as np
import re
from scipy.optimize import curve_fit
import sys
from plot_base import byte_formatter, byte_formatter_func, get_comm_durations, compute_stats
......@@ -754,126 +749,3 @@ class CommCompGraph:
self._plot()
plt.savefig(filename, dpi=100)
def compute_r_square(y_real, y_approx):
    """
    Return the coefficient of determination (r²) of an approximation.

    y_real: reference values
    y_approx: approximated values, one for each value of y_real

    The result is 1 - (ss_res / ss_tot), where ss_res is the sum of the squared
    differences between y_real and y_approx, and ss_tot is the sum of the
    squared differences between y_real and its mean.
    """
    n_points = len(y_real)
    assert(n_points == len(y_approx))

    # https://stackoverflow.com/questions/19189362/getting-the-r-squared-value-using-curve-fit/37899817
    # Differences are computed point by point, then gathered in a single array
    diffs = np.array([real - approx for real, approx in zip(y_real, y_approx)])
    ss_res = np.sum(np.square(diffs))

    deviations_from_mean = y_real - np.mean(y_real)
    ss_tot = np.sum(deviations_from_mean**2)

    unexplained_ratio = ss_res / ss_tot
    return 1 - unexplained_ratio
class CurveFit:
    """
    Fit of a function on a set of (x, y) points.

    The fit is computed once, at construction, with scipy's curve_fit; the
    resulting parameters are stored in the `coefs` attribute.
    """

    def __init__(self, f, x, y, y_std=None, bounds=(-np.inf, np.inf)):
        """
        f: the function to fit, for instance: lambda x, a, b, c: a*x**2+b*x+c
        x: list of x values
        y: list of y values
        y_std: list of standard deviation for each y point (computed with np.std() for instance)
        bounds: limit the space of parameters. For instance, if 0 <= a <= 3, 0 <= b <= 1 and 0 <= c <= 0.5,
        then: bounds=(0, [3., 1., 0.5])
        """
        self.x = np.array(x)
        self.y = np.array(y)
        self.f = f
        self.y_std = y_std
        # Per-instance cache of r_square, filled on first access. A module-level
        # lru_cache keyed on self would keep every instance alive.
        self._r_square = None

        fit_options = {"bounds": bounds}
        if self.y_std is not None:
            # Standard deviations are forwarded to curve_fit as absolute sigmas
            fit_options.update(sigma=self.y_std, absolute_sigma=True)
        self.coefs, _ = curve_fit(self.f, self.x, self.y, **fit_options)

    @property
    def r_square(self):
        """
        Return the r² of the fit, computed against the points given at
        construction. The value is computed on first access, then reused.
        """
        cached = getattr(self, "_r_square", None)
        if cached is None:
            # f is evaluated point by point, as predict() would do
            y_fitted = [self.f(x_i, *self.coefs) for x_i in self.x]
            cached = compute_r_square(self.y, y_fitted)
            self._r_square = cached
        return cached

    def predict(self, x):
        """
        Return the value of the fitted function in x.
        """
        return self.f(x, *self.coefs)

    def __repr__(self):
        # The first argument of f is x, the following ones are the fitted parameters
        param_names = inspect.getfullargspec(self.f)[0][1:]
        params = "".join(f"{name}={coef:.3f} " for name, coef in zip(param_names, self.coefs))
        return f"CurveFit: {params}r²={self.r_square:.3f}"
class MultiCurveFit:
    """
    Piecewise fit: the points are divided into consecutive chunks along x and
    each chunk is fitted independently with its own CurveFit.
    """

    def __init__(self, f, x, y, x_slices, y_std=None, bounds=(-np.inf, np.inf)):
        """
        f: the function to fit, for instance: lambda x, a, b, c: a*x**2+b*x+c,
        can be a list (or tuple) of functions, to use a different function per slice
        x: list of x values
        y: list of y values
        x_slices: list of values (not list indexes !) which divide x into several chunks
        y_std: list of standard deviation for each y point (computed with np.std() for instance)
        bounds: limit the space of parameters. For instance, if 0 <= a <= 3, 0 <= b <= 1 and 0 <= c <= 0.5,
        then: bounds=(0, [3., 1., 0.5])

        If x is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] and x_slices is [3, 8], the slices will be:
        [[0, 1, 2, 3], [3, 4, 5, 6, 7, 8], [8, 9, 10]]

        Raises ValueError if f is a list of functions whose length is not
        len(x_slices) + 1.
        """
        self.curves = []
        self.x_slices = x_slices

        one_function_per_slice = isinstance(f, (list, tuple))
        if one_function_per_slice and len(f) != (len(self.x_slices) + 1):
            raise ValueError("Incorrect number of functions.")

        current_x = []
        current_y = []
        current_y_std = [] if y_std is not None else None
        current_slice = 0

        for i, x_value in enumerate(x):
            if current_slice < len(self.x_slices) and x_value > self.x_slices[current_slice]:
                # x_value is beyond the current boundary: fit the chunk collected so far
                slice_f = f[current_slice] if one_function_per_slice else f
                self.curves.append(CurveFit(slice_f, current_x, current_y, current_y_std, bounds))

                # keep the last point of the previous slice, so that
                # consecutive chunks share their boundary point
                current_x = current_x[-1:]
                current_y = current_y[-1:]
                if current_y_std is not None:
                    current_y_std = current_y_std[-1:]

                current_slice += 1

            current_x.append(x_value)
            current_y.append(y[i])
            if y_std is not None:
                current_y_std.append(y_std[i])

        # The remaining points form the last chunk
        last_f = f[-1] if one_function_per_slice else f
        self.curves.append(CurveFit(last_f, current_x, current_y, current_y_std, bounds))

    @property
    def r_square(self):
        """
        Return mean r^2 of every sub curves.
        """
        return sum(c.r_square for c in self.curves) / len(self.curves)

    def _find_curve_for_x(self, x):
        """
        Return the sub curve to use for the value x.

        A value equal to a boundary of x_slices belongs to the chunk ending at
        this boundary.
        """
        i = 0
        while i < len(self.x_slices) and self.x_slices[i] < x:
            i += 1
        # There are fewer curves than len(x_slices) + 1 when the fitted points
        # stop before the last boundaries: use the last curve in that case
        # instead of raising an IndexError.
        return self.curves[min(i, len(self.curves) - 1)]

    def predict(self, x):
        """
        Return the value in x of the sub curve covering x.
        """
        return self._find_curve_for_x(x).predict(x)

    def __repr__(self):
        lines = [
            f"\tx=[{c.x[0]}..{c.x[-1]}], y=[{c.y[0]}..{c.y[-1]}], " + str(c) + "\n"
            for c in self.curves
        ]
        return "MultiCurveFit:\n" + "".join(lines)
import functools
import inspect
import numpy as np
from scipy.optimize import curve_fit
def compute_r_square(y_real, y_approx):
    """
    Return the coefficient of determination (r²) of an approximation.

    y_real: reference values
    y_approx: approximated values, one for each value of y_real

    The result is 1 - (ss_res / ss_tot), where ss_res is the sum of the squared
    differences between y_real and y_approx, and ss_tot is the sum of the
    squared differences between y_real and its mean.
    """
    n_points = len(y_real)
    assert(n_points == len(y_approx))

    # https://stackoverflow.com/questions/19189362/getting-the-r-squared-value-using-curve-fit/37899817
    # Differences are computed point by point, then gathered in a single array
    diffs = np.array([real - approx for real, approx in zip(y_real, y_approx)])
    ss_res = np.sum(np.square(diffs))

    deviations_from_mean = y_real - np.mean(y_real)
    ss_tot = np.sum(deviations_from_mean**2)

    unexplained_ratio = ss_res / ss_tot
    return 1 - unexplained_ratio
class CurveFit:
    """
    Fit of a function on a set of (x, y) points.

    The fit is computed once, at construction, with scipy's curve_fit; the
    resulting parameters are stored in the `coefs` attribute.
    """

    def __init__(self, f, x, y, y_std=None, bounds=(-np.inf, np.inf)):
        """
        f: the function to fit, for instance: lambda x, a, b, c: a*x**2+b*x+c
        x: list of x values
        y: list of y values
        y_std: list of standard deviation for each y point (computed with np.std() for instance)
        bounds: limit the space of parameters. For instance, if 0 <= a <= 3, 0 <= b <= 1 and 0 <= c <= 0.5,
        then: bounds=(0, [3., 1., 0.5])
        """
        self.x = np.array(x)
        self.y = np.array(y)
        self.f = f
        self.y_std = y_std
        # Per-instance cache of r_square, filled on first access. A module-level
        # lru_cache keyed on self would keep every instance alive.
        self._r_square = None

        fit_options = {"bounds": bounds}
        if self.y_std is not None:
            # Standard deviations are forwarded to curve_fit as absolute sigmas
            fit_options.update(sigma=self.y_std, absolute_sigma=True)
        self.coefs, _ = curve_fit(self.f, self.x, self.y, **fit_options)

    @property
    def r_square(self):
        """
        Return the r² of the fit, computed against the points given at
        construction. The value is computed on first access, then reused.
        """
        cached = getattr(self, "_r_square", None)
        if cached is None:
            # f is evaluated point by point, as predict() would do
            y_fitted = [self.f(x_i, *self.coefs) for x_i in self.x]
            cached = compute_r_square(self.y, y_fitted)
            self._r_square = cached
        return cached

    def predict(self, x):
        """
        Return the value of the fitted function in x.
        """
        return self.f(x, *self.coefs)

    def __repr__(self):
        # The first argument of f is x, the following ones are the fitted parameters
        param_names = inspect.getfullargspec(self.f)[0][1:]
        params = "".join(f"{name}={coef:.3f} " for name, coef in zip(param_names, self.coefs))
        return f"CurveFit: {params}r²={self.r_square:.3f}"
class MultiCurveFit:
    """
    Piecewise fit: the points are divided into consecutive chunks along x and
    each chunk is fitted independently with its own CurveFit.
    """

    def __init__(self, f, x, y, x_slices, y_std=None, bounds=(-np.inf, np.inf)):
        """
        f: the function to fit, for instance: lambda x, a, b, c: a*x**2+b*x+c,
        can be a list (or tuple) of functions, to use a different function per slice
        x: list of x values
        y: list of y values
        x_slices: list of values (not list indexes !) which divide x into several chunks
        y_std: list of standard deviation for each y point (computed with np.std() for instance)
        bounds: limit the space of parameters. For instance, if 0 <= a <= 3, 0 <= b <= 1 and 0 <= c <= 0.5,
        then: bounds=(0, [3., 1., 0.5])

        If x is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] and x_slices is [3, 8], the slices will be:
        [[0, 1, 2, 3], [3, 4, 5, 6, 7, 8], [8, 9, 10]]

        Raises ValueError if f is a list of functions whose length is not
        len(x_slices) + 1.
        """
        self.curves = []
        self.x_slices = x_slices

        one_function_per_slice = isinstance(f, (list, tuple))
        if one_function_per_slice and len(f) != (len(self.x_slices) + 1):
            raise ValueError("Incorrect number of functions.")

        current_x = []
        current_y = []
        current_y_std = [] if y_std is not None else None
        current_slice = 0

        for i, x_value in enumerate(x):
            if current_slice < len(self.x_slices) and x_value > self.x_slices[current_slice]:
                # x_value is beyond the current boundary: fit the chunk collected so far
                slice_f = f[current_slice] if one_function_per_slice else f
                self.curves.append(CurveFit(slice_f, current_x, current_y, current_y_std, bounds))

                # keep the last point of the previous slice, so that
                # consecutive chunks share their boundary point
                current_x = current_x[-1:]
                current_y = current_y[-1:]
                if current_y_std is not None:
                    current_y_std = current_y_std[-1:]

                current_slice += 1

            current_x.append(x_value)
            current_y.append(y[i])
            if y_std is not None:
                current_y_std.append(y_std[i])

        # The remaining points form the last chunk
        last_f = f[-1] if one_function_per_slice else f
        self.curves.append(CurveFit(last_f, current_x, current_y, current_y_std, bounds))

    @property
    def r_square(self):
        """
        Return mean r^2 of every sub curves.
        """
        return sum(c.r_square for c in self.curves) / len(self.curves)

    def _find_curve_for_x(self, x):
        """
        Return the sub curve to use for the value x.

        A value equal to a boundary of x_slices belongs to the chunk ending at
        this boundary.
        """
        i = 0
        while i < len(self.x_slices) and self.x_slices[i] < x:
            i += 1
        # There are fewer curves than len(x_slices) + 1 when the fitted points
        # stop before the last boundaries: use the last curve in that case
        # instead of raising an IndexError.
        return self.curves[min(i, len(self.curves) - 1)]

    def predict(self, x):
        """
        Return the value in x of the sub curve covering x.
        """
        return self._find_curve_for_x(x).predict(x)

    def __repr__(self):
        lines = [
            f"\tx=[{c.x[0]}..{c.x[-1]}], y=[{c.y[0]}..{c.y[-1]}], " + str(c) + "\n"
            for c in self.curves
        ]
        return "MultiCurveFit:\n" + "".join(lines)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment