Source code for VolterraBasis.gle_estimation

import numpy as np
import xarray as xr
from scipy.integrate import trapezoid
from scipy.stats import describe

from .basis import sum_describe

from .correlation import correlation_1D, correlation_ND, correlation_direct_1D, correlation_direct_ND

from .fkernel import kernel_first_kind_trapz, kernel_first_kind_rect, kernel_first_kind_midpoint, kernel_second_kind_rect, kernel_second_kind_trapz


def solve_linear(G, b):  # Write also a sparse version
    """
    Solve the linear problem Gx = b
    """
    x_np = np.linalg.inv(G.values) @ b.values
    return xr.DataArray(x_np, dims=b.dims)


[docs]class Estimator_gle(object): """ The main class for the position dependent memory extraction holding all data. """
[docs] def __init__(self, xva_arg, model_class, basis, trunc=1.0, L_obs=None, saveall=True, prefix="", verbose=True, n_jobs=1, **kwargs): """ Create an instance of the Pos_gle class. Parameters ---------- xva_arg : xarray dataset (['time', 'x', 'v', 'a']) or list of datasets. Use compute_va() or see its output for format details. The timeseries to analyze. It should be either a xarray timeseries or a listlike collection of them. basis : scikit-learn transformer to get the element of the basis This class should implement, basis() and deriv() function and deal with periodicity of the data. If a fit() method is defined, it will be called at initialization saveall : bool, default=True Whether to save all output functions. prefix : str Prefix for the saved output functions. verbose : bool, default=True Set verbosity. trunc : float, default=1.0 Truncate all correlation functions and the memory kernel after this time value. L_obs: str, default given by the model Name of the column containing the time derivative of the observable """ # Create all internal variables self.saveall = saveall self.prefix = prefix self.verbose = verbose # filenames self.corrsfile = "corrs.nc" self.kernelfile = "kernel.nc" self.data_describe = None self._do_check(xva_arg) # Do some check on the trajectories # Save trajectory properties if self.xva_list is None: return # processing input arguments self.weights = np.array([xva["time"].shape[0] for xva in self.xva_list], dtype=int) # Should be the various lenght of trajectory self.weightsum = np.sum(self.weights) if self.verbose: print("Found trajectories with the following lengths:") print(self.weights) self.n_jobs = int(n_jobs) if self.n_jobs <= 1: self.loop_over_trajs = self._loop_over_trajs_serial else: self.loop_over_trajs = self._loop_over_trajs_parallel if L_obs is None: # Default value given by the class of the model L_obs = model_class.set_of_obs[-1] dim_x = self.xva_list[0].dims["dim_x"] dim_obs = self.xva_list[0][L_obs].shape[1] self.dt = self.xva_list[0].attrs["dt"] trunc_ind = self._compute_trunc_ind(trunc) # Fit basis from some description of the data describe_data = self.describe_data() self.model = model_class(basis, self.dt, dim_x=dim_x, dim_obs=dim_obs, trunc_ind=trunc_ind, L_obs=L_obs, describe_data=describe_data) self._do_check_obs(model_class.set_of_obs, self.model.L_obs) # Check that we have in the trajectories what we need # For retrocompatibility, expose model methods for coefficients evaluation for func in dir(self.model): if callable(getattr(self.model, func)) and not func.startswith("_"): if "eval" in func or "compute" in func or "func" in ["basis_vector"]: setattr(self, func, getattr(self.model, func))
def _do_check(self, xva_arg): if xva_arg is not None: if isinstance(xva_arg, xr.Dataset): self.xva_list = [xva_arg] else: self.xva_list = xva_arg for xva in self.xva_list: if "time" not in xva.dims: raise Exception("Time is not a coordinate. Please provide dataset with time, " "or an iterable collection (i.e. list) " "of dataset with time.") if "dt" not in xva.attrs: raise Exception("Timestep not in dataset attrs") else: self.xva_list = None def _do_check_obs(self, set_of_obs, L_obs): for xva in self.xva_list: for col in set_of_obs: if col not in xva.data_vars: raise Exception("Please provide time,{} dataset, " "or an iterable collection (i.e. list) " "of time,{} dataset.".format(set_of_obs, set_of_obs)) if L_obs not in xva.data_vars: raise Exception("Please provide dataset that include {} as variable .".format(L_obs)) def _compute_trunc_ind(self, trunc): lastinds = np.array([xva["time"][-1] for xva in self.xva_list]) smallest = np.min(lastinds) if smallest < trunc: if self.verbose: print("Warning: Found a trajectory shorter than " "the argument trunc. Override.") trunc = smallest # Find index of the time truncation self.trunc_ind = (self.xva_list[0]["time"] <= trunc).sum().data if self.verbose: print("Trajectories are truncated at lenght {} for dynamic analysis".format(self.trunc_ind)) return self.trunc_ind
[docs] def describe_data(self): """ Return a description of the data """ # Prévoir la sauvegarde du résultats pour 1) ne pas avoir à la calculer à chaque fois 2) pouvoir le sauvegarder if self.data_describe is None: describe_set = [describe(xva["x"].data) for xva in self.xva_list] self.data_describe = describe_set[0] for des_data in describe_set[1:]: self.data_describe = sum_describe(self.data_describe, des_data) return self.data_describe
[docs] def to_gfpe(self, model=None, new_obs_name="dE"): """ Update trajectories to compute derivative of the basis function """ if model is None: model = self.model self.model.rank_projection = False for i in range(len(self.xva_list)): _, _, dE = model.basis_vector(self.xva_list[i]) self.xva_list[i].update({new_obs_name: dE.rename({"dim_basis": "dim_dE"})}) self.model.L_obs = new_obs_name self.model.dim_obs = dE.shape[-1]
def _loop_over_trajs_serial(self, func, model, **kwargs): """ A generator for iteration over trajectories """ # Et voir alors pour faire une version parallélisé (en distribué) array_res = [func(weight, xva, model, **kwargs) for weight, xva in zip(self.weights, self.xva_list)] # print(array_res) res = [0.0] * len(array_res[0]) for weight, single_res in zip(self.weights, array_res): for i, arr in enumerate(single_res): res[i] += arr * weight / self.weightsum return res def _loop_over_trajs_parallel(self, func, model, **kwargs): """ A generator for iteration over trajectories """ from joblib import Parallel, delayed array_res = Parallel(n_jobs=self.n_jobs)(delayed(func)(weight, xva, model, **kwargs) for weight, xva in zip(self.weights, self.xva_list)) res = [0.0] * len(array_res[0]) for weight, single_res in zip(self.weights, array_res): for i, arr in enumerate(single_res): res[i] += arr * weight / self.weightsum return res
[docs] def compute_basis_mean(self, basis_type="force"): """ Compute mean value of the basis function """ return self.loop_over_trajs(self._compute_basis_mean, self.model, basis_type=basis_type)[0]
def compute_gram_force(self): if self.verbose: print("Calculate gram...") avg_gram = self.loop_over_trajs(self._compute_gram, self.model, gram_type="force")[0] self.model.gram_force = avg_gram if self.verbose: print("Found gram:", avg_gram) return self.model
[docs] def compute_gram_kernel(self): """ Return gram matrix of the kernel part of the basis. """ if self.verbose: print("Calculate kernel gram...") self.model.gram_kernel = self.loop_over_trajs(self._compute_gram, self.model, gram_type="kernel")[0] if self.model.rank_projection: self.model.gram_kernel = np.einsum("lj,jk,mk->lm", self.model.P_range, self.gram_kernel, self.model.P_range) return self.model
[docs] def compute_effective_mass(self): """ Return average effective mass computed from equipartition with the velocity. """ if self.verbose: print("Calculate effective mass...") v2 = self.loop_over_trajs(self._compute_square_vel, self.model)[0] self.model.eff_mass = xr.DataArray(np.linalg.inv(v2), dims=("dim_x'", "dim_x")) if self.verbose: print("Found effective mass:", self.model.eff_mass) return self.model
[docs] def compute_pos_effective_mass(self): """ Return position-dependent effective inverse mass """ if self.verbose: print("Calculate effective_mass...") pos_inv_mass, avg_gram = self.loop_over_trajs(self._compute_square_vel_pos, self.model) self.model.inv_mass_coeff = solve_linear(avg_gram, pos_inv_mass) return self.model
[docs] def compute_mean_force(self): """ Computes the mean force from the trajectories. """ if self.verbose: print("Calculate mean force...") avg_disp, avg_gram = self.loop_over_trajs(self._projection_on_basis, self.model) self.model.gram_force = avg_gram self.model.force_coeff = solve_linear(avg_gram, avg_disp) return self.model
def set_zero_force(self): self.model.force_coeff = xr.DataArray(np.zeros((self.model.N_basis_elt_force, self.model.dim_obs)), dims=["dim_basis", self.xva_list[0][self.model.L_obs].dims[1]])
[docs] def compute_corrs(self, large=False, rank_tol=None, **kwargs): """ Compute correlation functions. Parameters ---------- large : bool, default=False When large is true, it use a slower way to compute correlation that is less demanding in memory rank_tol: float, default=None Tolerance for rank computation in case of projection onto the range of the basis second_order_method:bool, default = True If set to False do less computation but prevent to use second_order method in Volterra """ if self.verbose: print("Calculate correlation functions...") if self.model.force_coeff is None: raise Exception("Mean force has not been computed.") self.bkdxcorrw, self.dotbkdxcorrw, self.bkbkcorrw, self.dotbkbkcorrw = self.loop_over_trajs(self._correlation_ufunc, self.model, **kwargs) if self.model.rank_projection: if self.verbose: print("Projection on range space...") P_range = self.model._set_range_projection(rank_tol, self.bkbkcorrw.isel(time_trunc=0)) P_range_tranpose = P_range.rename({"dim_basis_old": "dim_basis_old'", "dim_basis": "dim_basis'"}) tempbkbkcorrw = xr.dot(P_range, self.bkbkcorrw.rename({"dim_basis": "dim_basis_old"})) self.bkbkcorrw = xr.dot(P_range_tranpose, tempbkbkcorrw.rename({"dim_basis'": "dim_basis_old'"})) # self.bkbkcorrw = np.einsum("lj,ijk,mk->ilm", self.model.P_range, self.bkbkcorrw, self.model.P_range) self.bkdxcorrw = xr.dot(P_range, self.bkdxcorrw.rename({"dim_basis": "dim_basis_old"})) if self.dotbkdxcorrw is not None: self.dotbkdxcorrw = xr.dot(P_range, self.dotbkdxcorrw.rename({"dim_basis": "dim_basis_old"})) if isinstance(self.dotbkbkcorrw, xr.DataArray): self.dotbkbkcorrw = xr.dot(P_range_tranpose, P_range, self.dotbkbkcorrw.rename({"dim_basis": "dim_basis_old", "dim_basis'": "dim_basis_old'"})) if self.saveall: xr.Dataset({"bkbk": self.bkbkcorrw, "bkdx": self.bkdxcorrw, "dotbkbk": self.dotbkbkcorrw, "dotbkdx": self.dotbkdxcorrw}, coords={"time_trunc": np.arange(self.bkbkcorrw.shape[-1]) * self.dt}).to_netcdf(self.corrsfile) return self.model
[docs] def compute_kernel(self, method="rectangular", k0=None): """ Computes the memory kernel. Parameters ---------- method : {"rectangular", "midpoint", "midpoint_w_richardson","trapz","second_kind_rect","second_kind_trapz"}, default=rectangular Choose numerical method of inversion of the volterra equation k0 : float, default=0. If you give a nonzero value for k0, this is used at time zero for the trapz and second kind method. If set to None, the F-routine will calculate k0 from the second kind memory equation. """ if self.bkbkcorrw is None or self.bkdxcorrw is None: raise Exception("Need correlation functions to compute the kernel.") if self.verbose: print("Compute memory kernel using {} method".format(method)) time = np.arange(self.model.trunc_ind) * self.dt time_ker = time - time[0] # Set zero time self.model.method = method # Save used method if self.verbose: print("Use dt:", self.dt) if k0 is None and method in ["trapz", "second_kind_rect", "second_kind_trapz"]: # Then we should compute initial value from time derivative at zero if self.dotbkdxcorrw is None: raise Exception("Need correlation with derivative functions to compute the kernel using this method or provide initial value.") k0 = solve_linear(self.bkbkcorrw.isel(time_trunc=0), self.dotbkdxcorrw.isel(time_trunc=0)).to_numpy() if self.verbose: print("K0", k0) # print("Gram", self.bkbkcorrw[0, :, :]) # print("Gram eigs", np.linalg.eigvals(self.bkbkcorrw[0, :, :])) if method in ["rect", "rectangular"]: kernel = kernel_first_kind_rect(self.bkbkcorrw.to_numpy(), self.bkdxcorrw.to_numpy(), self.dt) elif method == "midpoint": # Deal with not even data lenght kernel = kernel_first_kind_midpoint(self.bkbkcorrw.to_numpy(), self.bkdxcorrw.to_numpy(), self.dt) time_ker = time_ker[:-1:2] elif method == "midpoint_w_richardson": ker = kernel_first_kind_midpoint(self.bkbkcorrw.to_numpy(), self.bkdxcorrw.to_numpy(), self.dt) ker_3 = kernel_first_kind_midpoint(self.bkbkcorrw.to_numpy()[:, :, ::3], self.bkdxcorrw.to_numpy()[:, :, ::3], 3 * self.dt) kernel = (9 * ker[::3][: ker_3.shape[0]] - ker_3) / 8 time_ker = time_ker[:-3:6] elif method == "trapz": ker = kernel_first_kind_trapz(k0, self.bkbkcorrw.to_numpy(), self.bkdxcorrw.to_numpy(), self.dt) kernel = 0.5 * (ker[1:-1, :, :] + 0.5 * (ker[:-2, :, :] + ker[2:, :, :])) # Smoothing kernel = np.insert(kernel, 0, k0, axis=0) time_ker = time_ker[:-1] elif method == "second_kind_rect": if self.dotbkdxcorrw is None or self.dotbkbkcorrw is None: raise Exception("Need correlation with derivative functions to compute the kernel using this method, please use other method.") kernel = kernel_second_kind_rect(k0, self.bkbkcorrw.isel(time_trunc=0).to_numpy(), self.dotbkbkcorrw.to_numpy(), self.dotbkdxcorrw.to_numpy(), self.dt) elif method == "second_kind_trapz": if self.dotbkdxcorrw is None or self.dotbkbkcorrw is None: raise Exception("Need correlation with derivative functions to compute the kernel using this method, please use other method.") kernel = kernel_second_kind_trapz(k0, self.bkbkcorrw.isel(time_trunc=0).to_numpy(), self.dotbkbkcorrw.to_numpy(), self.dotbkdxcorrw.to_numpy(), self.dt) else: raise Exception("Method for volterra inversion is not in {rectangular, midpoint, midpoint_w_richardson,trapz,second_kind_rect,second_kind_trapz}") self.model.kernel = xr.DataArray(kernel, dims=("time_kernel", "dim_basis", self.bkdxcorrw.dims[1]), coords={"time_kernel": time_ker}) if self.saveall: # TODO: change to xarray save xr.Dataset({"kernel": self.model.kernel}).to_netcdf(self.kernelfile) return self.model
[docs] def check_volterra_inversion(self, return_diff=False): """ For checking if the volterra equation is correctly inversed Compute the integral in volterra equation using trapezoidal rule. This only check the volterra of the first kind Parameters ---------- return_diff : bool, default = False Indicate if you want the result of the intégral or the difference between the result and the expected value """ if self.model.kernel is None: raise Exception("Kernel has not been computed.") dt = self.dt res_int = np.zeros(self.bkdxcorrw.shape) for n in range(self.model.kernel.shape[0]): to_integrate = np.einsum("jki,ikl->ijl", self.bkbkcorrw[:, :, : n + 1][:, :, ::-1], self.model.kernel[: n + 1, :, :]) res_int[:, :, n] = -1 * trapezoid(to_integrate, dx=dt, axis=0) if return_diff: return np.abs(res_int - self.bkdxcorrw) else: return res_int
[docs] def compute_projected_corrs(self, left_op=None): """ Compute correlation between noise and left_op using the projected correlations """ return self.loop_over_trajs(self._corrs_w_noise, self.model, left_op=left_op)
@staticmethod def _projection_on_basis(weight, xva, model, gram_type="force", **kwargs): """ Do the needed scalar product for one traj Return 2 array with dimension: avg_disp: (N_basis_elt_force, dim_obs)) avg_gram: (N_basis_elt_force, N_basis_elt_force) """ E = model.basis_vector(xva, compute_for=gram_type) avg_disp = xr.dot(E, xva[model.L_obs]) / weight avg_gram = xr.dot(E, E.rename({"dim_basis": "dim_basis'"})) / weight return avg_disp, avg_gram @staticmethod def _compute_basis_mean(weight, xva, model, basis_type="force", **kwargs): """ Do the needed scalar product for one traj """ E = model.basis_vector(xva, compute_for=basis_type) return (E.mean(dim="time"),) @staticmethod def _compute_gram(weight, xva, model, gram_type="force", **kwargs): """ Do the needed scalar product for one traj """ E = model.basis_vector(xva, compute_for=gram_type) avg_gram = xr.dot(E, E.rename({"dim_basis": "dim_basis'"})) / weight return (avg_gram,) @staticmethod def _compute_square_vel(weight, xva, model, **kwargs): """ Squared velocity for effective masse """ return (xr.dot(xva["v"], xva["v"].rename({"dim_x": "dim_x'"})) / weight,) @staticmethod def _compute_square_vel_pos(weight, xva, model, **kwargs): """ Do the needed scalar product for one traj """ E = model.basis_vector(xva, compute_for="kernel") avg_disp = xr.dot(E, xva["v"], xva["v"].rename({"dim_x": "dim_x'"})) / weight avg_gram = xr.dot(E, E.rename({"dim_basis": "dim_basis'"})) / weight return avg_disp, avg_gram @staticmethod def _correlation_ufunc(weight, xva, model, method="fft", vectorize=False, second_order_method=True, **kwargs): """ Do the correlation Return 4 array with dimensions bkbkcorrw :(trunc_ind, N_basis_elt_kernel, N_basis_elt_kernel) bkdxcorrw :(trunc_ind, N_basis_elt_kernel, dim_obs) dotbkdxcorrw :(trunc_ind, N_basis_elt_kernel, dim_obs) dotbkbkcorrw :(trunc_ind, N_basis_elt_kernel, N_basis_elt_kernel) """ if method == "fft" and vectorize: func = correlation_1D elif method == "direct" and not vectorize: func = correlation_direct_ND elif method == "direct" and vectorize: func = correlation_direct_1D else: func = correlation_ND E_force, E, dE = model.basis_vector(xva) # print(E_force, model.force_coeff) ortho_xva = xva[model.L_obs] - xr.dot(E_force, model.force_coeff) # print(ortho_xva.head(), E.head()) bkdxcorrw = xr.apply_ufunc( func, E, ortho_xva, input_core_dims=[["time"], ["time"]], output_core_dims=[["time_trunc"]], exclude_dims={"time"}, kwargs={"trunc": model.trunc_ind}, dask_gufunc_kwargs={"output_sizes": {"time_trunc": model.trunc_ind}, "allow_rechunk": True}, vectorize=vectorize, dask="parallelized" ) bkbkcorrw = xr.apply_ufunc( func, E.rename({"dim_basis": "dim_basis'"}), E, input_core_dims=[["time"], ["time"]], output_core_dims=[["time_trunc"]], exclude_dims={"time"}, kwargs={"trunc": model.trunc_ind}, dask_gufunc_kwargs={"output_sizes": {"time_trunc": model.trunc_ind}, "allow_rechunk": True}, vectorize=vectorize, dask="parallelized", ) if second_order_method: dotbkdxcorrw = xr.apply_ufunc( func, dE, ortho_xva, input_core_dims=[["time"], ["time"]], output_core_dims=[["time_trunc"]], exclude_dims={"time"}, kwargs={"trunc": model.trunc_ind}, dask_gufunc_kwargs={"output_sizes": {"time_trunc": model.trunc_ind}, "allow_rechunk": True}, vectorize=vectorize, dask="parallelized", ) dotbkbkcorrw = xr.apply_ufunc( func, dE.rename({"dim_basis": "dim_basis'"}), E, input_core_dims=[["time"], ["time"]], output_core_dims=[["time_trunc"]], exclude_dims={"time"}, kwargs={"trunc": model.trunc_ind}, dask_gufunc_kwargs={"output_sizes": {"time_trunc": model.trunc_ind}, "allow_rechunk": True}, vectorize=vectorize, dask="parallelized", ) else: # We can compute only the first element then, that is faster dotbkdxcorrw = xr.dot(dE, ortho_xva).expand_dims({"time_trunc": 1}) / weight dotbkbkcorrw = np.array([[0.0]]) return bkdxcorrw, dotbkdxcorrw, bkbkcorrw, dotbkbkcorrw @staticmethod def _corrs_w_noise(weight, xva, model, left_op=None, **kwargs): """ Do the needed scalar product for one traj """ return model.compute_corrs_w_noise(xva, left_op)