Source code for VolterraBasis.models

import numpy as np
import xarray as xr
import warnings
from scipy.integrate import simpson
from .basis import describe_from_dim

from .fkernel import memory_rect, memory_trapz, corrs_rect, corrs_trapz
from .fkernel import rect_integral, trapz_integral, simpson_integral
from .fkernel import solve_ide_rect, solve_ide_trapz, solve_ide_trapz_stab


def _convert_input_array_for_evaluation(array, dim_x):
    """
    Take input and return xarray Dataset with correct shape
    """
    if isinstance(array, xr.Dataset):  # TODO add check on dimension of array
        return array
    else:
        x = np.asarray(array).reshape(-1, dim_x)
        return xr.Dataset({"x": (["time", "dim_x"], x)})


def matmulPrange(P_range, E):
    """
    Reduce basis size when needed
    """
    # P_range = xr.DataArray(P_mat, dims=["dim_basis", "dim_basis_old"])
    return xr.dot(E.rename({"dim_basis": "dim_basis_old"}), P_range)


class ModelBase(object):
    """
    The base class for holding the model
    """

    set_of_obs = ["x", "v"]

    def __init__(self, basis, dt, dim_x=1, dim_obs=1, trunc_ind=-1, L_obs="a", describe_data=None, **kwargs):
        """
        Create an instance of the Pos_gle class.

        Parameters
        ----------
        basis :  scikit-learn transformer to get the element of the basis
                This class should implement, basis() and deriv() function
                and deal with periodicity of the data.
                If a fit() method is defined, it will be called at initialization
        trunc : float, default=1.0
            Truncate all correlation functions and the memory kernel after this
            time value.
        L_obs: str, default= "a"
            Name of the column containing the time derivative of the observable
        """
        self.L_obs = L_obs

        self.dim_x = dim_x
        self.dim_obs = dim_obs

        self.dt = dt
        self.trunc_ind = int(trunc_ind)

        self._check_basis(basis, describe_data)  # Do check on basis

        self.force_coeff = None
        self.kernel = None

        self.inv_mass_coeff = None
        self.eff_mass = None

        self.gram_force = None
        self.gram_kernel = None

        self.method = None

        self.rank_projection = False
        self.P_range = None

    def _check_basis(self, basis, describe_data=None):
        """
        Simple checks on the basis class
        """
        if not (callable(getattr(basis, "basis", None))) or not (callable(getattr(basis, "deriv", None))):
            raise Exception("Basis class do not define basis() or deriv() method")
        self.basis = basis
        if not hasattr(self.basis, "n_output_features_"):
            if callable(getattr(self.basis, "fit", None)):
                if describe_data is None:
                    describe_data = describe_from_dim(self.dim_x)
                self.basis = self.basis.fit(describe_data)  # Fit basis with minimal information
            else:
                raise Exception("Basis class do have fit() method or do not expose n_output_features_ attribute")

        self.N_basis_elt = self.basis.n_output_features_

    def _set_range_projection(self, rank_tol, B0):
        """
        Set and perfom the projection onto the range of the basis for kernel
        """
        # Check actual rank of the matrix
        # Do SVD
        U, S, V = np.linalg.svd(B0, compute_uv=True, hermitian=True)
        # Compute rank from svd
        if rank_tol is None:
            rank_tol = S.max(axis=-1, keepdims=True) * max(B0.shape[-2:]) * np.finfo(S.dtype).eps
        rank = np.count_nonzero(S > rank_tol, axis=-1)
        # print(rank, U.shape, S.shape, V.shape)
        if rank < self.N_basis_elt_kernel:
            if rank != self.N_basis_elt_kernel - self.dim_x:
                print("Warning: rank is different than expected. Current {}, Expected {}. Consider checking your basis or changing the tolerance".format(rank, self.N_basis_elt_kernel - self.dim_x))
            elif rank == 0:
                raise Exception("Rank of basis is null.")
            # Construct projection
            P_range = U[:, :rank].T  # # Save the matrix for future use, matrix is rank x N_basis_elt_kernel
            # Faster to do one product in order
        else:
            print("No projection onto the range of the basis performed as basis is not deficient.")
            P_range = np.identity(self.N_basis_elt_kernel)
        self.P_range = xr.DataArray(P_range, dims=["dim_basis", "dim_basis_old"])
        return self.P_range

    def basis_vector(self, xva, compute_for="corrs"):
        """
        From one trajectory compute the basis element.
        This is the main method that should be implemented by children class.
        It take as argument a trajectory and should return the value of the basis function depending of the wanted case.
        There is three case that should be implemented.

        "force": for the evaluation and computation of the mean force.

        "pmf": for evaluation of the pmf using integration of the mean force

        "kernel": for the evaluation of the kernel.

        "corrs": for the computation of the correlation function.
        """
        raise NotImplementedError

    def compute_noise(self, xva, trunc_kernel=None, start_point=0, end_point=None):
        """
        From a trajectory get the noise.

        Parameters
        ----------
        xva : xarray dataset (['time', 'x', 'v', 'a']) .
            Use compute_va() or see its output for format details.
            Input trajectory to compute noise.
        trunc_kernel : int
                Number of datapoint of the kernel to consider.
                Can be used to remove unphysical divergence of the kernel or shortten execution time.
        """
        if self.force_coeff is None:
            raise Exception("Mean force has not been computed.")
        if trunc_kernel is None:
            trunc_kernel = self.trunc_ind
        time_0 = xva["time"].data[0]
        xva = xva.isel(time=slice(start_point, end_point))
        time = xva["time"].data - time_0
        dt = time[1] - time[0]
        E_force, E, _ = self.basis_vector(xva)
        if self.rank_projection:
            E = matmulPrange(self.P_range, E)
        force = xr.dot(E_force, self.force_coeff, dims=["dim_basis", "dim_x"])
        if self.method in ["rect", "rectangular", "second_kind_rect"] or self.method is None:
            memory = memory_rect(self.kernel[:trunc_kernel], E, dt)
        elif self.method == "trapz" or self.method == "second_kind_trapz":
            memory = memory_trapz(self.kernel[:trunc_kernel], E, dt)
        else:
            raise ValueError("Cannot compute noise when kernel computed with method {}".format(self.method))
        return time, xva[self.L_obs] - force - memory, xva[self.L_obs], force, memory

    def compute_corrs_w_noise(self, xva, left_op=None):
        """
        Compute correlation between noise and left_op

        Parameters
        ----------
        xva : xarray dataset (['time', 'x', 'v', 'a']) .
            Use compute_va() or see its output for format details.
            Input trajectory to compute noise.
        """
        if self.force_coeff is None:
            raise Exception("Mean force has not been computed.")
        if self.kernel is None:
            raise Exception("Kernel has not been computed.")

        dt = xva["time"].data[1] - xva["time"].data[0]
        E_force, E, _ = self.basis_vector(xva)
        if self.rank_projection:
            E = matmulPrange(self.P_range, E)

        noise = (xva[self.L_obs] - xr.dot(E_force, self.force_coeff)).to_numpy()

        if left_op is None:
            left_op_dat = noise
        elif isinstance(left_op, str):
            left_op_dat = xva[left_op]
        elif callable(left_op):
            left_op_dat = left_op(xva)
        elif isinstance(left_op, np.ndarray) or isinstance(left_op, xr.DataArray):
            left_op_dat = left_op

        if self.method in ["rect", "rectangular", "second_kind_rect"] or self.method is None:
            return self.kernel["time_kernel"], corrs_rect(noise, self.kernel, E, left_op_dat, dt)
        elif self.method == "trapz" or self.method == "second_kind_trapz":
            return self.kernel["time_kernel"][:-1], corrs_trapz(noise, self.kernel, E, left_op_dat, dt)
        else:
            raise ValueError("Cannot compute noise when kernel computed with method {}".format(self.method))

    def force_eval(self, x, coeffs=None):
        """
        Evaluate the force at given points x.
        If coeffs is given, use provided coefficients instead of the force
        """
        if coeffs is None:
            if self.force_coeff is None:
                raise Exception("Mean force has not been computed.")
            coeffs = self.force_coeff
        else:  # Check shape
            if coeffs.shape != (self.N_basis_elt_force, self.dim_obs):
                warnings.warn("Wrong shape of the coefficients. Get {} but expect {}.".format(coeffs.shape, (self.N_basis_elt_force, self.dim_obs)))
        E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="force")
        return xr.dot(E, coeffs).to_numpy()  # Return the force as array (nb of evalution point x dim_obs)

    def pmf_eval(self, x, coeffs=None, kT=1.0, set_zero=True):
        """
        Compute free energy via integration of the mean force at points x.
        This assume that the effective mass is independent of the position.
        If coeffs is given, use provided coefficients instead of the force coefficients.
        """
        if self.dim_obs > 1:
            print("Warning: Computation of the free energy for dimensions higher than 1 is likely to be incorrect.")
        if coeffs is None:
            if self.force_coeff is None:
                raise Exception("Mean force has not been computed.")
            coeffs = self.force_coeff
        else:  # Check shape
            if coeffs.shape != (self.N_basis_elt_force, self.dim_obs):
                warnings.warn("Wrong shape of the coefficients. Get {} but expect {}.".format(coeffs.shape, (self.N_basis_elt_force, self.dim_obs)))
        if self.eff_mass is None:
            warnings.warn("Effective mass has not been computed, use effetive mass of 1.0")
            eff_mass = np.identity(self.dim_x)
        else:
            eff_mass = np.asarray(self.eff_mass)
        E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="pmf")
        pmf = -1 * xr.dot(E, coeffs).values @ eff_mass / kT
        return pmf - float(set_zero) * np.min(pmf)

    def inv_mass_eval(self, x, coeffs=None, set_zero=True):
        """
        Compute free energy via integration of the mean force at points x.
        This assume that the effective mass is independent of the position.
        If coeffs is given, use provided coefficients instead of the force coefficients.
        """
        if coeffs is None:
            if self.inv_mass_coeff is None:
                raise Exception("Effective mass has not been computed.")
            coeffs = self.inv_mass_coeff
        else:  # Check shape
            if coeffs.shape != (self.N_basis_elt_force, self.dim_x, self.dim_x):
                raise Exception("Wrong shape of the coefficients. Get {} but expect {}.".format(coeffs.shape, (self.N_basis_elt_force, self.dim_x, self.dim_x)))
        E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="force")
        return xr.dot(E, coeffs).to_numpy()  # Return the force as array (nb of evalution point x dim_obs x dim_obs)

    def pmf_num_int_eval(self, x, kT=1.0, set_zero=True):
        """
        Compute free energy via integration of the mean force at points x.
        This take into accound the position dependent mass, but the integration is numeric
        """
        force = self.force_eval(x)[:, 0]
        inv_mass = self.inv_mass_eval(x)[:, 0, 0]
        x = np.asarray(x).ravel()
        dx = x[1] - x[0]

        diff_mass = np.gradient(inv_mass) / dx  # Numerical derivative
        grad_pmf = -1 * (force - diff_mass / kT) / inv_mass
        pmf = np.cumsum(grad_pmf) * dx

        return pmf - float(set_zero) * np.min(pmf)

    def kernel_eval(self, x, coeffs_ker=None):
        """
        Evaluate the kernel at given points x.
        If coeffs_ker is given, use provided coefficients instead of the kernel
        """
        if self.kernel is None:
            raise Exception("Kernel has not been computed.")
        if coeffs_ker is None:
            coeffs_ker = self.kernel
        else:  # Check shape
            if coeffs_ker.shape[1:] != self.kernel.shape[1:]:
                raise Exception("Wrong shape of the coefficients. Get {} but expect {}.".format(coeffs_ker.shape[1:], self.kernel.shape[1:]))

        E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="kernel")
        if self.rank_projection:
            E = matmulPrange(self.P_range, E)
        return xr.dot(coeffs_ker, E.rename({"dim_x": "dim_x'"}))

    def evolve_volterra(self, G0, lenTraj, method="trapz", trunc_ind=None):
        """
        Evolve in time the integro-differential equation.
        This assume that the GLE is a linear GLE (i.e. the set of basis function is on the left and right of the equality)

        Parameters
        ----------
        G0 : array
            Initial value of the correlation
        lenTraj : int
            Length of the time evolution
        method : str, default="trapz"
            Method that is used to discretize the continuous Volterra equations
        trunc_ind: int, default= self.trunc_ind
            Truncate the length of the memory to this value
        """
        raise NotImplementedError

    def flux_from_volterra(self, corrs_force, corrs_kernel=None, force_coeff=None, kernel=None, method="trapz", trunc_ind=None):
        """
        From a solution of the Volterra equation, compute the flux term.
        That allow to compute decomposition of the flux
        """
        if corrs_kernel is None:
            corrs_kernel = corrs_force
        if corrs_force.shape[-1] != corrs_kernel.shape[-1]:
            print("# WARNING: Different lenght in time for correlation")
        if force_coeff is None:
            force_coeff = self.force_coeff.to_numpy()
        else:
            force_coeff = np.asarray(force_coeff)
        if kernel is None:
            kernel = self.kernel.to_numpy()
        else:
            kernel = np.asarray(kernel)
        if trunc_ind is not None:
            kernel = kernel[:trunc_ind, :, :]
        force_term = np.matmul(force_coeff, corrs_force)
        res_int = np.zeros((corrs_kernel.shape[-1], corrs_kernel.shape[1], kernel.shape[-1]))
        for n in range(1, corrs_kernel.shape[-1]):
            max_len = min(n, kernel.shape[0])
            if method == "rect":
                res_int[n, :] = -1 * rect_integral(self.dt, corrs_kernel[:, :, n - max_len : n], kernel[:max_len, :, :])
            elif method == "trapz":
                res_int[n, :] = -1 * trapz_integral(self.dt, corrs_kernel[:, :, n - max_len : n], kernel[:max_len, :, :])
            elif method == "simpson":
                res_int[n, :] = -1 * simpson_integral(self.dt, corrs_kernel[:, :, n - max_len : n], kernel[:max_len, :, :])

        return force_term, res_int

    def laplace_transform_kernel(self, s_start=0.0, s_end=None, n_points=None):
        """
        Compute the Laplace transform of the kernel matrix
        """
        if self.kernel is None:
            raise Exception("Kernel has not been computed.")
        if n_points is None:
            n_points = self.trunc_ind
        if s_end is None:
            s_end = 1.0 / self.dt
        # mintimelenght = self.trunc_ind * dt
        s_range = np.linspace(s_start, s_end, n_points)
        laplace = np.zeros((n_points, self.kernel.shape[1], self.kernel.shape[2]))
        for n, s in enumerate(s_range):
            laplace[n, :, :] = simpson(np.einsum("i,ijk-> ijk", np.exp(-s * self.kernel["time"][:, 0]), self.kernel), self.kernel["time"][:, 0], axis=0)
        return s_range, laplace

    def save_model(self):
        """
        Return DataSet version of the model than can be save to file
        """
        # On doit retourner coeffs de la force, le noyau mémoire, et de quoi générer la base
        coeffs = xr.Dataset(attrs={"dt": self.dt, "trunc_ind": self.trunc_ind, "L_obs": self.L_obs, "dim_x": self.dim_x, "dim_obs": self.dim_obs})  # Ajouter quelques attributs
        if self.force_coeff is not None:
            coeffs.update({"force_coeff": self.force_coeff.rename({"dim_basis": "dim_basis_force"})})
        if self.gram_force is not None:
            coeffs.update({"gram_force": self.gram_force.rename({"dim_basis": "dim_basis_force", "dim_basis'": "dim_basis_force'"})})
        for key, dat in self.__dict__.items():
            if key not in coeffs.attrs and key not in ["basis", "force_coeff", "gram_force", "N_basis_elt_force", "N_basis_elt_kernel"]:  # Eclude some vaiable
                if dat is not None:
                    coeffs.update({key: dat})

        return coeffs

    @classmethod
    def load_model(cls, basis, coeffs, **kwargs):
        """
        Create a model from a save
        """
        # A partir des attibuts du DataSet construire un dictionnaire des arguments Ă  donner
        # kwargs = .. dim_x, dim_obs, trunc_ind, L_obs, describe_data
        cls_attrs = coeffs.attrs
        for key, value in kwargs.items():
            cls_attrs[key] = value
        model = cls(basis, **cls_attrs)

        for key in ["force_coeff", "kernel", "rank_projection", "P_range", "gram_force"]:
            if key in coeffs:
                setattr(model, key, coeffs[key])
        if model.force_coeff is not None:
            model.force_coeff = model.force_coeff.rename({"dim_basis_force": "dim_basis"})
        if model.gram_force is not None:
            model.gram_force = model.gram_force.rename({"dim_basis_force": "dim_basis", "dim_basis_force'": "dim_basis'"})

        return model


[docs]class Pos_gle(ModelBase): """ The main class for the position dependent memory extraction, holding all data and the extracted memory kernels. """ set_of_obs = ["x", "v", "a"]
[docs] def __init__(self, *args, **kwargs): """ Create an instance of the Pos_gle class. Parameters ---------- xva_arg : xarray dataset (['time', 'x', 'v', 'a']) or list of datasets. Use compute_va() or see its output for format details. The timeseries to analyze. It should be either a xarray timeseries or a listlike collection of them. basis : scikit-learn transformer to get the element of the basis This class should implement, basis() and deriv() function and deal with periodicity of the data. If a fit() method is defined, it will be called at initialization saveall : bool, default=True Whether to save all output functions. prefix : str Prefix for the saved output functions. verbose : bool, default=True Set verbosity. kT : float, default=2.494 Numerical value for kT. trunc : float, default=1.0 Truncate all correlation functions and the memory kernel after this time value. """ ModelBase.__init__(self, *args, **kwargs) self.N_basis_elt_force = self.N_basis_elt self.N_basis_elt_kernel = self.N_basis_elt - int(self.basis.const_removed) * self.dim_x self.rank_projection = not self.basis.const_removed
[docs] def basis_vector(self, xva, compute_for="corrs"): bk = xr.apply_ufunc(self.basis.basis, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") if compute_for == "force": return bk elif compute_for == "pmf": return xr.apply_ufunc(self.basis.antiderivative, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") dbk = xr.apply_ufunc(self.basis.deriv, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis", "dim_x"]], dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt_kernel, "dim_x": self.dim_x}}, dask="parallelized") if compute_for == "kernel": # For kernel evaluation return dbk elif compute_for == "corrs": ddbk = xr.apply_ufunc(self.basis.hessian, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis", "dim_x", "dim_x'"]], dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt_kernel, "dim_x": self.dim_x, "dim_x'": self.dim_x}}, dask="parallelized") E = xr.dot(dbk, xva["v"], dims=["dim_x"]) dE = xr.dot(dbk, xva["a"], dims=["dim_x"]) + xr.dot(ddbk, xva["v"], xva["v"].rename({"dim_x": "dim_x'"}), dims=["dim_x", "dim_x'"]) return bk, E, dE else: raise ValueError("Basis evaluation goal not specified")
[docs]class Pos_gle_with_friction(Pos_gle): """ A derived class in which we don't enforce zero instantaneous friction """ set_of_obs = ["x", "v", "a"]
[docs] def __init__(self, *args, **kwargs): Pos_gle.__init__(self, *args, **kwargs) if not self.basis.const_removed: print("Warning: The model cannot deal with dask array if the constant is not removed.") self.N_basis_elt_force = 2 * self.N_basis_elt - int(self.basis.const_removed) * self.dim_x self.N_basis_elt_kernel = self.N_basis_elt - int(self.basis.const_removed) * self.dim_x self.rank_projection = not self.basis.const_removed
[docs] def basis_vector(self, xva, compute_for="corrs"): # We have to deal with the multidimensionnal case as well bk = xr.apply_ufunc(self.basis.basis, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") if compute_for == "force_eval": return bk elif compute_for == "pmf": return xr.apply_ufunc(self.basis.antiderivative, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") dbk = xr.apply_ufunc(self.basis.deriv, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis", "dim_x"]], dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt_kernel, "dim_x": self.dim_x}}, dask="parallelized") if compute_for == "kernel": # To have the term proportional to velocity return dbk Evel = xr.dot(dbk, xva["v"], dims=["dim_x"]) E = xr.concat([bk, Evel], dim="dim_basis") if compute_for == "force": return E elif compute_for == "corrs": ddbk = xr.apply_ufunc(self.basis.hessian, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis", "dim_x", "dim_x'"]], dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt_kernel, "dim_x": self.dim_x, "dim_x'": self.dim_x}}, dask="parallelized") dE = xr.dot(dbk, xva["a"], dims=["dim_x"]) + xr.dot(ddbk, xva["v"], xva["v"].rename({"dim_x": "dim_x'"}), dims=["dim_x", "dim_x'"]) return E, Evel, dE else: raise ValueError("Basis evaluation goal not specified")
[docs] def force_eval(self, x): """ Evaluate the force for the position dependent part only """ if self.force_coeff is None: raise Exception("Mean force has not been computed.") E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="force_eval") return np.einsum("ik,kl->il", E, self.force_coeff[: self.N_basis_elt]) # -1 * np.matmul(self.force_coeff[: self.N_basis_elt, :], E.T)
[docs] def friction_force_eval(self, x): """ Compute the term of friction, that should be zero """ if self.force_coeff is None: raise Exception("Mean force has not been computed.") E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="kernel") return np.einsum("ikd,kl->ild", E, self.force_coeff[self.N_basis_elt :]) # np.matmul(self.force_coeff[self.N_basis_elt :, :], E.T)
[docs] def pmf_eval(self, x, coeffs=None, kT=1.0, set_zero=True): """ Compute free energy via integration of the mean force at points x. This assume that the effective mass is independent of the position. If coeffs is given, use provided coefficients instead of the force coefficients. """ if self.dim_obs > 1: print("Warning: Computation of the free energy for dimensions higher than 1 is likely to be incorrect.") if coeffs is None: if self.force_coeff is None: raise Exception("Mean force has not been computed.") coeffs = self.force_coeff[: self.N_basis_elt, :] else: # Check shape if coeffs.shape != (self.N_basis_elt, self.dim_obs): warnings.warn("Wrong shape of the coefficients. Get {} but expect {}.".format(coeffs.shape, (self.N_basis_elt_force, self.dim_obs))) if self.eff_mass is None: warnings.warn("Effective mass has not been computed, use effetive mass of 1.0") eff_mass = np.identity(self.dim_x) else: eff_mass = np.asarray(self.eff_mass) E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="pmf") pmf = -1 * xr.dot(E, coeffs).values @ eff_mass / kT return pmf - float(set_zero) * np.min(pmf)
[docs]class Pos_gle_no_vel_basis(Pos_gle): """ Use basis function dependent of the position only """ set_of_obs = ["x", "v"]
[docs] def __init__(self, *args, **kwargs): ModelBase.__init__(self, *args, **kwargs) self.N_basis_elt_force = self.N_basis_elt self.N_basis_elt_kernel = self.N_basis_elt if self.basis.const_removed: self.basis.const_removed = False print("Warning: remove_const on basis function have been set to False.")
[docs] def basis_vector(self, xva, compute_for="corrs"): E = xr.apply_ufunc(self.basis.basis, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") if compute_for == "force": return E elif compute_for == "pmf": return xr.apply_ufunc(self.basis.antiderivative, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") elif compute_for == "kernel": # Extend the basis for multidim value return E.expand_dims({"dim_x": self.dim_x}, axis=-1) elif compute_for == "corrs": dbk = xr.apply_ufunc(self.basis.deriv, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis", "dim_x"]], dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt_kernel, "dim_x": self.dim_x}}, dask="parallelized") dE = xr.dot(dbk, xva["v"], dims=["dim_x"]) return E, E, dE else: raise ValueError("Basis evaluation goal not specified")
[docs]class Pos_gle_const_kernel(Pos_gle): """ A derived class in which we the kernel is computed independent of the position """ set_of_obs = ["x", "v", "a"]
[docs] def __init__(self, *args, **kwargs): ModelBase.__init__(self, *args, **kwargs) self.N_basis_elt_force = self.N_basis_elt self.N_basis_elt_kernel = self.dim_obs
[docs] def basis_vector(self, xva, compute_for="corrs"): bk = xr.apply_ufunc(self.basis.basis, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") if compute_for == "force": return bk elif compute_for == "pmf": return xr.apply_ufunc(self.basis.antiderivative, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") if compute_for == "kernel": # For kernel evaluation grad = np.zeros((bk.shape[0], self.dim_obs, self.dim_obs)) for i in range(self.dim_obs): grad[:, i, i] = 1.0 return xr.DataArray(grad, dims=("time", "dim_basis", "dim_x")) elif compute_for == "corrs": return bk, xva["v"].rename({"dim_x": "dim_basis"}), xva["a"].rename({"dim_x": "dim_basis"}) else: raise ValueError("Basis evaluation goal not specified")
[docs]class Pos_gle_hybrid(Pos_gle): """ Implement the hybrid projector of arXiv:2202.01922 """ set_of_obs = ["x", "v", "a"]
[docs] def __init__(self, *args, **kwargs): ModelBase.__init__(self, *args, **kwargs) self.N_basis_elt_force = self.N_basis_elt self.N_basis_elt_kernel = self.N_basis_elt + 1 if self.basis.const_removed: self.basis.const_removed = False print("Warning: remove_const on basis function have been set to False.")
[docs] def basis_vector(self, xva, compute_for="corrs"): # We have to deal with the multidimensionnal case as well bk = xr.apply_ufunc(self.basis.basis, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") if compute_for == "force": return bk elif compute_for == "pmf": return xr.apply_ufunc(self.basis.antiderivative, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") elif compute_for == "kernel": # Extend the basis for multidim value return bk.expand_dims({"dim_x": self.dim_x}, axis=-1) # return bk.reshape(-1, self.N_basis_elt_kernel - 1, 1) elif compute_for == "corrs": E = xr.concat([xva["v"].rename({"dim_x": "dim_basis"}), bk], dim="dim_basis") dbk = xr.dot(xr.apply_ufunc(self.basis.deriv, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis", "dim_x"]], dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt, "dim_x": self.dim_x}}, dask="parallelized"), xva["v"]) dE = xr.concat([xva["a"].rename({"dim_x": "dim_basis"}), dbk], dim="dim_basis") # To test return bk, E, dE else: raise ValueError("Basis evaluation goal not specified")
[docs] def get_const_kernel_part(self): """ Return the position independent part of the kernel """ if self.kernel is None: raise Exception("Kernel has not been computed.") return self.kernel.sel(dim_basis=0)
[docs] def kernel_eval(self, x): """ Evaluate the position dependant part of the kernel at given points x """ if self.kernel is None: raise Exception("Kernel has not been computed.") E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="kernel") if self.rank_projection: E = matmulPrange(self.P_range, E) return xr.dot(self.kernel.sel(dim_basis=slice(1, None)), E.rename({"dim_x": "dim_x'"}))
# return self.kernel["time_kernel"], np.einsum("jkd,ikl->ijld", E, self.kernel[:, 1:, :]) # Return the kernel as array (time x nb of evalution point x dim_obs x dim_x)
[docs]class Pos_gle_overdamped(ModelBase): """ Extraction of position dependent memory kernel for overdamped dynamics. """ set_of_obs = ["x", "v"]
[docs] def __init__(self, *args, L_obs="v", rank_projection=False, **kwargs): ModelBase.__init__(self, *args, L_obs=L_obs, **kwargs) self.N_basis_elt_force = self.N_basis_elt self.N_basis_elt_kernel = self.N_basis_elt if self.basis.const_removed: self.basis.const_removed = False print("Warning: remove_const on basis function have been set to False.") self.rank_projection = rank_projection
[docs] def basis_vector(self, xva, compute_for="corrs"): E = xr.apply_ufunc(self.basis.basis, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") if compute_for == "force": return E elif compute_for == "pmf": return xr.apply_ufunc(self.basis.antiderivative, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt}}, dask="parallelized") elif compute_for == "kernel": return E.expand_dims({"dim_x": self.dim_x}, axis=-1) # return E.reshape(-1, self.N_basis_elt_kernel, self.dim_x) elif compute_for == "corrs": dbk = xr.apply_ufunc(self.basis.deriv, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis", "dim_x"]], dask_gufunc_kwargs={"output_sizes": {"dim_basis": self.N_basis_elt_kernel, "dim_x": self.dim_x}}, dask="parallelized") dE = xr.dot(dbk, xva["v"], dims=["dim_x"]) return E, E, dE else: raise ValueError("Basis evaluation goal not specified")
[docs] def pmf_eval(self, x, coeffs=None, kT=1.0, set_zero=True): """ Compute free energy via integration of the mean force at points x. This assume that the effective mass is independent of the position. If coeffs is given, use provided coefficients instead of the force coefficients. """ if self.dim_obs > 1: print("Warning: Computation of the free energy for dimensions higher than 1 is likely to be incorrect.") if coeffs is None: if self.force_coeff is None: raise Exception("Mean force has not been computed.") coeffs = self.force_coeff else: # Check shape if coeffs.shape != (self.N_basis_elt_force, self.dim_obs): raise Exception("Wrong shape of the coefficients. Get {} but expect {}.".format(coeffs.shape, (self.N_basis_elt_force, self.dim_obs))) # if self.eff_mass is None: # self.compute_effective_mass(kT=kT) E = self.basis_vector(_convert_input_array_for_evaluation(x, self.dim_x), compute_for="pmf") pmf = -1 * np.einsum("ik,kl->il", E, coeffs) / kT return pmf - float(set_zero) * np.min(pmf)
[docs] def evolve_volterra(self, G0, lenTraj, method="trapz", trunc_ind=None): """ Evolve in time the integro-differential equation. This assume that the GLE is a linear GLE (i.e. the set of basis function is on the left and right of the equality) Parameters ---------- G0 : array Initial value of the correlation lenTraj : int Length of the time evolution method : str, default="trapz" Method that is used to discretize the continuous Volterra equations trunc_ind: int, default= self.trunc_ind Truncate the length of the memory to this value """ if self.force_coeff.shape[0] != self.force_coeff.shape[1] or self.kernel.shape[1] != self.kernel.shape[2]: raise ValueError("Cannot evolve volterra equation if the coefficients are not square") if G0.shape[0] != self.kernel.shape[-1]: raise ValueError("Wrong shape for initial value") if trunc_ind is None or trunc_ind <= 0: trunc_ind = self.kernel.shape[0] G0 = np.asarray(G0) coeffs_force = self.force_coeff.to_numpy() coeffs_ker = self.kernel.to_numpy()[:trunc_ind, :, :] if method == "rect": res = solve_ide_rect(coeffs_ker, G0, coeffs_force, lenTraj, self.dt) elif method == "trapz": res = solve_ide_trapz(coeffs_ker, G0, coeffs_force, lenTraj, self.dt) elif method == "trapz_stab": res = solve_ide_trapz_stab(coeffs_ker, G0, coeffs_force, lenTraj, self.dt) return np.arange(lenTraj) * self.dt, res
# class Pos_gle_overdamped_const_kernel(ModelBase): # """ # Extraction of position dependent memory kernel for overdamped dynamics # using position-independent kernel. # """ # # set_of_obs = ["x", "v"] # # def __init__(self, *args, L_obs="v", **kwargs): # ModelBase.__init__(self, *args, L_obs=L_obs, **kwargs) # self.N_basis_elt_force = self.N_basis_elt # self.N_basis_elt_kernel = self.dim_obs # # def basis_vector(self, xva, compute_for="corrs"): # E = xr.apply_ufunc(self.basis.basis, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask="parallelized") # if compute_for == "force": # return E # elif compute_for == "pmf": # return xr.apply_ufunc(self.basis.antiderivative, xva["x"], input_core_dims=[["dim_x"]], output_core_dims=[["dim_basis"]], exclude_dims={"dim_x"}, dask="parallelized") # elif compute_for == "kernel": # return xr.DataArray(np.ones((E.shape[0], self.N_basis_elt_kernel, self.dim_x)), dims=("time", "dim_basis", "dim_x")) # elif compute_for == "corrs": # return E, xr.DataArray(np.ones((E.shape[0], self.dim_obs)), dims=("time", "dim_basis")), xva["v"] # else: # raise ValueError("Basis evaluation goal not specified")