Source code for om_code.omfitderiv

import numpy as np
import warnings
import matplotlib.pylab as plt
from scipy.interpolate import interp1d
import copy
import gaussianprocessderivatives as gp
import om_code.omgenutils as gu


[docs]class fitderiv: """ to smooth data and estimate the time derivative of the data using Gaussian processes. Summary statistics - the maximal time derivative, the time at which the maximal time derivative occurs, the timescale found from inverting the maximal time derivative, the maximal value of the smoothed data, and the lag time (the time when the tangent from the point with the maximal time derivative crosses a line parallel to the time-axis that passes through the first data point) - are found and their errors estimated using bootstrapping. All statistics can be postfixed by ' err' to find this error. A summary statistic is given as the median of a distribution of the statistic calculated from time series sampled from the optimal Gaussian process. Its error is estimated as the interquartile range of this distribution. After a successful optimisation, the following attributes are generated: t: array The times specified as input. d: array The data specified as input. f: array The mean of the Gaussian process with the optimal hyperparmeters at each time point. fvar: array The variance of the optimal Gaussian process at each time point. df: array The inferred first time-derivative. dfvar: array The inferred variance of the first time-derivative. ddf: array The inferred second time-derivative. ddfvar: array The inferred variance of the second time-derivative. ds: dictionary The summary statistics and their estimated errors. Examples -------- A typical work flow is: >>> from fitderiv import fitderiv >>> q= fitderiv(t, od, figs= True) >>> q.plotfit('df') or potentially >>> import matplotlib.pyplot as plt >>> plt.plot(q.t, q.d, 'r.', q.t, q.y, 'b') Reference --------- PS Swain, K Stevenson, A Leary, LF Montano-Gutierrez, IBN Clark, J Vogel, and T Pilizota. Inferring time derivatives including growth rates using Gaussian processes Nat Commun 7 (2016) 13766 """
[docs] def __init__( self, t, d, cvfn="sqexp", logs=True, noruns=5, noinits=100, exitearly=False, bd=False, empirical_errors=False, optmethod="l_bfgs_b", nosamples=100, stats=True, statnames=False, showstaterrors=True, warn=False, linalgmax=3, iskip=False, ): """ Runs a Gaussian process to both smooth time-series data and estimate its time-derivatives. Parameters ---------- t: array The time points. d: array The data corresponding to the time points with any replicates given as columns. cvfn: string The type of kernel function for the Gaussian process either 'sqexp' (squared exponential) or 'matern' (Matern with nu= 5/2) or 'nn' (neural network). logs: boolean If True, the Gaussian process is used to smooth the natural logarithm of the data and the time-derivative is therefore of the logarithm of the data. noruns: integer, optional The number of attempts to be made at optimising the kernel's hyperparmeters. noinits: integer, optional The number of random attempts made to find good initial choices for the hyperparameters before running their optimisation. exitearly: boolean, optional If True, stop at the first successful attempt at optimising the hyperparameters otherwwise take the best choice from all successful optimisations. bd: dictionary, optional Specifies the limits on the hyperparameters for the Gaussian process. For example, bd= {0: [-1, 4], 2: [2, 6]}) sets confines the first hyperparameter to be between 1e-1 and 1e^4 and confines the third hyperparmater between 1e2 and 1e6. empirical_errors: boolean, optional If True, measurement errors are empirically estimated by the variance across replicates at each time point. If False, the variance of the measurement error is assumed to be the same for all time points and its magnitude is a hyperparameter that is optimised. optmethod: string, optional The algorithm used to optimise the hyperparameters, either 'l_bfgs_b' or 'tnc'. nosamples: integer, optional The number of bootstrap samples taken to estimate errors in statistics. stats: boolean, optional If True, calcuate summary statistics for both the smoothed data and the inferred time- derivative. statnames: list of strings To customise the names of the statistics. The default names are: 'max df' for the maximal time derivative; 'time of max df' for the time at which the maximal time derivative occurs; 'inverse max df' for the timescale found from inverting the maximal time derivative; 'max f' for the maximal value of the smoothed data; 'lag time' for the lag time defined as the time when the tangent from the point with the maximal time derivative crosses a line parallel to the time-axis that passes through the first data point. showstaterrors: boolean, optional If True, display estimated errors for the statistics. warn: boolean, optional If False, warnings created by covariance matrices that are not positive semi-definite are suppressed. linalgmax: integer, optional The number of times errors generated by underlying linear algebra modules during the optimisation by poor choices of the hyperparameters should be ignored. iskip: integer, optional If non-zero, only every iskip'th data point is used to increase speed. """ self.linalgmax = linalgmax self.success = True if not warn: # warning generated occasionally when sampling from the Gaussian # process likely because of numerical errors warnings.simplefilter("ignore", RuntimeWarning) try: noreps = d.shape[1] except IndexError: noreps = 1 self.noreps = noreps self.d = np.copy(d) self.t = np.copy(t) if iskip: t = self.t[::iskip] d = self.d[::iskip] else: t = self.t d = self.d # default bounds for hyperparameters bddict = { "nn": {0: (-1, 5), 1: (-7, -2), 2: (-6, 2)}, "sqexp": {0: (-5, 5), 1: (-6, 2), 2: (-5, 2)}, "matern": {0: (-5, 5), 1: (-4, 4), 2: (-5, 2)}, } # display details of covariance function try: # find bounds if bd: bds = gu.mergedicts(original=bddict[cvfn], update=bd) else: bds = bddict[cvfn] gt = getattr(gp, cvfn + "GP")(bds, t, d) print("Using a " + gt.description + ".") gt.info() except NameError: raise (SystemExit("Gaussian process not recognised")) self.bds = bds # log data self.logs = logs if logs: print("Taking natural logarithm of the data") if np.any(np.nonzero(d < 0)): print("Negative data found, but all data must be positive " "if taking logs") self.success = False else: # replace zeros by machine error so that logs can be applied d[d == 0] = np.finfo(float).eps # take log of data d = np.log(np.asarray(d)) # run checks and define measurement errors if empirical_errors: # errors must be empirically estimated if noreps > 1: lod = [np.count_nonzero(np.isnan(d[:, i])) for i in range(noreps)] if np.sum(np.diff(lod)) != 0: print( "The replicates have different number of data " "points, but equal numbers of data points are " "needed for empirically estimating errors" ) merrors = None else: # estimate errors empirically print("Estimating measurement errors empirically") merrors = gu.findsmoothvariance(d) else: print("Not enough replicates to estimate errors empirically") merrors = None else: merrors = None self.merrors = merrors # combine data into one array tb = np.tile(t, noreps) db = np.reshape(d, np.size(d), order="F") # check for NaNs if np.any(merrors): mb = np.tile(merrors, noreps) keep = np.intersect1d( np.nonzero(~np.isnan(db))[0], np.nonzero(~np.isnan(mb))[0] ) else: keep = np.nonzero(~np.isnan(db))[0] # remove any NaNs da = db[keep] ta = tb[keep] # check data remains after removing NaNs if not np.any(da): print("Warning: fitderiv failed - too many NaNs") self.success = False elif np.any(merrors): ma = mb[keep] if not np.any(ma): print("Warning: fitderiv failed - too many NaNs") self.success = False else: ma = None if self.success: self.run(cvfn, ta, da, ma, noruns, noinits, exitearly, optmethod, stats, nosamples, statnames, showstaterrors)
###
[docs] def run(self, cvfn, ta, da, ma, noruns, noinits, exitearly, optmethod, stats, nosamples, statnames, showstaterrors): """ Instantiates and runs Gaussian process """ # instantiate Gaussian process g = getattr(gp, cvfn + "GP")(self.bds, ta, da, merrors=ma) # optimise parameters g.findhyperparameters( noruns, noinits=noinits, exitearly=exitearly, optmethod=optmethod, linalgmax=self.linalgmax, ) # display results g.results() if np.any(ma): if len(ma) != len(self.t): # NaNs have been removed mainterp = interp1d( ta, ma, bounds_error=False, fill_value=(ma[0], ma[-1]) ) ma = mainterp(self.t) g.predict(self.t, derivs=2, addnoise=True, merrorsnew=ma) # results self.g = g self.logmaxlike = -g.nlml_opt self.hparamerr = g.hparamerr self.lth = g.lth_opt self.f = g.f self.df = g.df self.ddf = g.ddf self.fvar = g.fvar self.dfvar = g.dfvar self.ddfvar = g.ddfvar if stats: self.calculatestats(nosamples, statnames, showstaterrors)
###
[docs] def fitderivsample(self, nosamples, newt=None): """ Generate sample values for the latent function and its first two derivatives (returned as a tuple). Parameters ---------- nosamples: integer The number of samples. newt: array, optional Time points for which the samples should be made. If None, the orginal time points are used. Returns ------- samples: a tuple of arrays The first element of the tuple gives samples of the latent function; the second element gives samples of the first time derivative; and the third element gives samples of the second time derivative. """ if np.any(newt): newt = np.asarray(newt) # make prediction for new time points gps = copy.deepcopy(self.g) gps.predict(newt, derivs=2, addnoise=True) else: gps = self.g samples = gps.sample(nosamples, derivs=2) return samples
###
[docs] def calculatestats(self, nosamples=100, statnames=None, showerrors=True): """ Calculates the statistics from the smoothed data and the inferred time derivative. The default names are 'max df', 'time of max df', 'inverse max grad', 'max f', and 'lag time'. Parameters ---------- nosamples: integer The number of bootstrap samples used to estimate errors. statnames: list of strings, optional A list of alternative names for the statistics. showerrors: boolean, optional If True, display the estimated errors. """ print("\nCalculating statistics with " + str(nosamples) + " samples") if showerrors: print("\t(displaying median +/- half interquartile range)\n") if statnames: self.stats = statnames else: self.stats = [ "max df", "time of max df", "doubling time from max df", "lag time", ] t = self.t fs, gs, hs = self.fitderivsample(nosamples) # calculate stats im = np.argmax(gs, 0) # max df mgr = gs[im, np.arange(nosamples)] # time of max df tmgr = np.array([t[i] for i in im]) # inverse max df dt = np.log(2) / mgr # lag time lagtime = ( tmgr + (fs[0, np.arange(nosamples)] - fs[im, np.arange(nosamples)]) / mgr ) # store stats ds = {} for stname, st in zip(self.stats, [mgr, tmgr, dt, lagtime]): ds[stname] = np.median(st) ds[stname + " err"] = gu.findiqr(st) / 2 self.ds = ds self.nosamples = nosamples self.printstats(showerrors=showerrors)
###
[docs] def printstats(self, showerrors=True, performprint=True): """ Creates and potentially displays a dictionary of the statistics calculated from the smoothed data and its inferred time-derivatives. Parameters ---------- showerrors: boolean, optional If True, display the errors. performprint: boolean optional If True, display the statistics. Returns ------- statd: dictionary The statistics and their errors. """ ds = self.ds statd = {} lenstr = np.max([len(s) for s in self.stats]) for s in self.stats: statd[s] = ds[s] statd[s + " err"] = ds[s + " err"] if performprint: stname = s.rjust(lenstr + 1) if showerrors: print( "{:s}= {:6e} +/- {:6e}".format( stname, statd[s], ds[s + " err"] ) ) else: print("{:s}= {:6e}".format(stname, statd[s])) return statd
###
[docs] def plotfit( self, char="f", errorfac=1, xlabel="time", ylabel=False, figtitle=False ): """ Plots either the data and the mean of the optimal Gaussian process or the inferred time derivatives. Parameters ---------- char: string The variable to plot either 'f' or 'df' or 'ddf'. errorfac: float, optional The size of the errorbars are errorfac times the standard deviation of the optimal Gaussian process. ylabel: string, optional A label for the y-axis. figtitle: string, optional A title for the figure. """ x = getattr(self, char) xv = getattr(self, char + "var") if char == "f": d = np.log(self.d) if self.logs else self.d plt.plot(self.t, d, "r.") plt.plot(self.t, x, "b") plt.fill_between( self.t, x - errorfac * np.sqrt(xv), x + errorfac * np.sqrt(xv), facecolor="blue", alpha=0.2, ) if ylabel: plt.ylabel(ylabel) else: plt.ylabel(char) plt.xlabel(xlabel) if figtitle: plt.title(figtitle)