Source code for graphtools.base

from future.utils import with_metaclass
from builtins import super
from copy import copy as shallow_copy
import numpy as np
import abc
import pygsp
from inspect import signature
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.preprocessing import normalize
from sklearn.utils.graph import graph_shortest_path
from scipy import sparse
import warnings
import numbers
import pickle
import sys
import tasklogger

from . import matrix, utils

_logger = tasklogger.get_tasklogger("graphtools")


class Base(object):
    """Class that deals with keyword arguments but is otherwise just an object."""

    def __init__(self):
        super().__init__()

    @classmethod
    def _get_param_names(cls):
        """Get parameter names for the estimator"""
        # fetch the constructor or the original constructor before
        # deprecation wrapping if any
        init = getattr(cls.__init__, "deprecated_original", cls.__init__)
        if init is object.__init__:
            # No explicit constructor to introspect
            return []
        # introspect the constructor arguments to find the model parameters
        # to represent
        init_signature = signature(init)
        # Consider the constructor parameters excluding 'self'
        parameters = [
            p
            for p in init_signature.parameters.values()
            if p.name != "self" and p.kind != p.VAR_KEYWORD
        ]
        # Extract argument names excluding 'self'
        parameters = set([p.name for p in parameters])
        # recurse for superclasses
        for superclass in cls.__bases__:
            try:
                parameters.update(superclass._get_param_names())
            except AttributeError:
                # object and pygsp.graphs.Graph don't have this method
                pass
        return parameters
    def set_params(self, **kwargs):
        # for k in kwargs:
        #     raise TypeError("set_params() got an unexpected "
        #                     "keyword argument '{}'".format(k))
        return self
class Data(Base):
    """Parent class that handles the import and dimensionality reduction of data

    Parameters
    ----------
    data : array-like, shape=[n_samples,n_features]
        accepted types: `numpy.ndarray`, `scipy.sparse.spmatrix`,
        `pandas.DataFrame`, `pandas.SparseDataFrame`.

    n_pca : {`int`, `None`, `bool`, 'auto'}, optional (default: `None`)
        number of PC dimensions to retain for graph building.
        If n_pca in `[None, False, 0]`, uses the original data.
        If 'auto' or `True`, estimates the number of dimensions using a
        singular value threshold.
        Note: if data is sparse, uses SVD instead of PCA.
        TODO: should we subtract and store the mean?

    rank_threshold : `float`, 'auto', optional (default: 'auto')
        threshold to use when estimating rank for `n_pca in [True, 'auto']`.
        If 'auto', this threshold is
        s_max * eps * max(n_samples, n_features)
        where s_max is the maximum singular value of the data matrix
        and eps is numerical precision. [press2007]_

    random_state : `int` or `None`, optional (default: `None`)
        Random state for random PCA

    Attributes
    ----------
    data : array-like, shape=[n_samples,n_features]
        Original data matrix

    n_pca : int or `None`

    data_nu : array-like, shape=[n_samples,n_pca]
        Reduced data matrix

    data_pca : sklearn.decomposition.PCA or sklearn.decomposition.TruncatedSVD
        sklearn PCA operator
    """

    def __init__(
        self, data, n_pca=None, rank_threshold=None, random_state=None, **kwargs
    ):
        self._check_data(data)
        n_pca, rank_threshold = self._parse_n_pca_threshold(
            data, n_pca, rank_threshold
        )
        if utils.is_SparseDataFrame(data):
            data = data.to_coo()
        elif utils.is_DataFrame(data):
            try:
                # sparse data
                data = data.sparse.to_coo()
            except AttributeError:
                # dense data
                data = np.array(data)
        elif utils.is_Anndata(data):
            data = data.X
        self.data = data
        self.n_pca = n_pca
        self.rank_threshold = rank_threshold
        self.random_state = random_state
        self.data_nu = self._reduce_data()
        super().__init__(**kwargs)

    def _parse_n_pca_threshold(self, data, n_pca, rank_threshold):
        if isinstance(n_pca, str):
            n_pca = n_pca.lower()
            if n_pca != "auto":
                raise ValueError(
                    "n_pca must be an integer "
                    "0 <= n_pca < min(n_samples,n_features), "
                    "or in [None, False, True, 'auto']."
                )
        if isinstance(n_pca, numbers.Number):
            if not float(n_pca).is_integer():
                # cast it to integer
                n_pcaR = np.round(n_pca).astype(int)
                warnings.warn(
                    "Cannot perform PCA to fractional {} dimensions. "
                    "Rounding to {}".format(n_pca, n_pcaR),
                    RuntimeWarning,
                )
                n_pca = n_pcaR

            if n_pca < 0:
                raise ValueError(
                    "n_pca cannot be negative. "
                    "Please supply an integer "
                    "0 <= n_pca < min(n_samples,n_features) or None"
                )
            elif np.min(data.shape) <= n_pca:
                warnings.warn(
                    "Cannot perform PCA to {} dimensions on "
                    "data with min(n_samples, n_features) = {}".format(
                        n_pca, np.min(data.shape)
                    ),
                    RuntimeWarning,
                )
                n_pca = 0

        if n_pca in [0, False, None]:
            # cast 0, False to None.
            n_pca = None
        elif n_pca is True:
            # notify that we're going to estimate rank.
            n_pca = "auto"
            _logger.info(
                "Estimating n_pca from matrix rank. "
                "Supply an integer n_pca for fixed amount."
            )
        if not any(
            [isinstance(n_pca, numbers.Number), n_pca is None, n_pca == "auto"]
        ):
            raise ValueError(
                "n_pca was not an instance of numbers.Number, "
                "could not be cast to False, and not None. "
                "Please supply an integer "
                "0 <= n_pca < min(n_samples,n_features) or None"
            )
        if rank_threshold is not None and n_pca != "auto":
            warnings.warn(
                "n_pca = {}, therefore rank_threshold of {} "
                "will not be used. To use rank thresholding, "
                "set n_pca = True".format(n_pca, rank_threshold),
                RuntimeWarning,
            )
        if n_pca == "auto":
            if isinstance(rank_threshold, str):
                rank_threshold = rank_threshold.lower()
            if rank_threshold is None:
                rank_threshold = "auto"
            if isinstance(rank_threshold, numbers.Number):
                if rank_threshold <= 0:
                    raise ValueError(
                        "rank_threshold must be positive float or 'auto'. "
                    )
            else:
                if rank_threshold != "auto":
                    raise ValueError(
                        "rank_threshold must be positive float or 'auto'. "
                    )
        return n_pca, rank_threshold

    def _check_data(self, data):
        if len(data.shape) != 2:
            msg = "Expected 2D array, got {}D array instead (shape: {}.) ".format(
                len(data.shape), data.shape
            )
            if len(data.shape) < 2:
                msg += (
                    "\nReshape your data either using array.reshape(-1, 1) "
                    "if your data has a single feature or array.reshape(1, -1) "
                    "if it contains a single sample."
                )
            raise ValueError(msg)

    def _reduce_data(self):
        """Private method to reduce data dimension.

        If data is dense, uses randomized PCA. If data is sparse, uses
        randomized SVD.
        TODO: should we subtract and store the mean?
        TODO: Fix the rank estimation so we do not compute the full SVD.

        Returns
        -------
        Reduced data matrix
        """
        if self.n_pca is not None and (
            self.n_pca == "auto" or self.n_pca < self.data.shape[1]
        ):
            with _logger.task("PCA"):
                n_pca = self.data.shape[1] - 1 if self.n_pca == "auto" else self.n_pca
                if sparse.issparse(self.data):
                    if (
                        isinstance(self.data, sparse.coo_matrix)
                        or isinstance(self.data, sparse.lil_matrix)
                        or isinstance(self.data, sparse.dok_matrix)
                    ):
                        self.data = self.data.tocsr()
                    self.data_pca = TruncatedSVD(
                        n_pca, random_state=self.random_state
                    )
                else:
                    self.data_pca = PCA(
                        n_pca, svd_solver="randomized", random_state=self.random_state
                    )
                self.data_pca.fit(self.data)
                if self.n_pca == "auto":
                    s = self.data_pca.singular_values_
                    smax = s.max()
                    if self.rank_threshold == "auto":
                        threshold = (
                            smax * np.finfo(self.data.dtype).eps * max(self.data.shape)
                        )
                        self.rank_threshold = threshold
                    threshold = self.rank_threshold
                    gate = np.where(s >= threshold)[0]
                    self.n_pca = gate.shape[0]
                    if self.n_pca == 0:
                        raise ValueError(
                            "Supplied threshold {} was greater than "
                            "maximum singular value {} "
                            "for the data matrix".format(threshold, smax)
                        )
                    _logger.info(
                        "Using rank estimate of {} as n_pca".format(self.n_pca)
                    )
                    # truncate the sklearn operator to the gated components
                    op = self.data_pca  # for line-width brevity
                    op.components_ = op.components_[gate, :]
                    op.explained_variance_ = op.explained_variance_[gate]
                    op.explained_variance_ratio_ = op.explained_variance_ratio_[gate]
                    op.singular_values_ = op.singular_values_[gate]
                    # reassignment is redundant since `op` aliases
                    # `self.data_pca`, but kept for clarity
                    self.data_pca = op
                data_nu = self.data_pca.transform(self.data)
                return data_nu
        else:
            data_nu = self.data
            if sparse.issparse(data_nu) and not isinstance(
                data_nu, (sparse.csr_matrix, sparse.csc_matrix, sparse.bsr_matrix)
            ):
                data_nu = data_nu.tocsr()
            return data_nu
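    # A minimal usage sketch of the rank-threshold behavior above
    # (illustrative only; variable names are hypothetical). With n_pca=True,
    # components are kept while s >= s_max * eps * max(n_samples, n_features),
    # so a low-rank matrix gets a correspondingly small n_pca:
    #
    #   import graphtools
    #   X = np.random.normal(size=(500, 20)) @ np.random.normal(size=(20, 100))
    #   G = graphtools.Graph(X, n_pca=True)  # rank(X) <= 20, so n_pca ~ 20
    #   G.data_nu.shape                      # (500, n_pca)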
    def get_params(self):
        """Get parameters from this object"""
        return {"n_pca": self.n_pca, "random_state": self.random_state}
    def set_params(self, **params):
        """Set parameters on this object

        Safe setter method - attributes should not be modified directly as some
        changes are not valid.
        Valid parameters:
        - random_state
        Invalid parameters: (these would require rebuilding the graph)
        - n_pca

        Parameters
        ----------
        params : key-value pairs of parameter name and new values

        Returns
        -------
        self
        """
        if "n_pca" in params and params["n_pca"] != self.n_pca:
            raise ValueError("Cannot update n_pca. Please create a new graph")
        if "random_state" in params:
            self.random_state = params["random_state"]
        super().set_params(**params)
        return self
    def transform(self, Y):
        """Transform input data `Y` to reduced data space defined by `self.data`

        Takes data in the same ambient space as `self.data` and transforms it
        to be in the same reduced space as `self.data_nu`.

        Parameters
        ----------
        Y : array-like, shape=[n_samples_y, n_features]
            n_features must be the same as `self.data`.

        Returns
        -------
        Transformed data, shape=[n_samples_y, n_pca]

        Raises
        ------
        ValueError : if Y.shape[1] != self.data.shape[1]
        """
        try:
            # try PCA first
            return self.data_pca.transform(Y)
        except ValueError:
            # shape is wrong
            raise ValueError(
                "data of shape {0} cannot be transformed"
                " to graph built on data of shape {1}. "
                "Expected shape ({2}, {3})".format(
                    Y.shape, self.data.shape, Y.shape[0], self.data.shape[1]
                )
            )
        except AttributeError:
            # no pca, try to return data
            if len(Y.shape) < 2 or Y.shape[1] != self.data.shape[1]:
                # shape is wrong
                raise ValueError(
                    "data of shape {0} cannot be transformed"
                    " to graph built on data of shape {1}. "
                    "Expected shape ({2}, {3})".format(
                        Y.shape, self.data.shape, Y.shape[0], self.data.shape[1]
                    )
                )
            else:
                return Y
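    # Usage sketch (assumes a graph `G` built with n_pca set, and new points
    # `Y` in the same ambient space as `G.data`; names are hypothetical):
    #
    #   Y_red = G.transform(Y)   # shape [n_samples_y, n_pca]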
    def inverse_transform(self, Y, columns=None):
        """Transform input data `Y` to ambient data space defined by `self.data`

        Takes data in the same reduced space as `self.data_nu` and transforms
        it to be in the same ambient space as `self.data`.

        Parameters
        ----------
        Y : array-like, shape=[n_samples_y, n_pca]
            n_features must be the same as `self.data_nu`.

        columns : list-like
            list of integers referring to column indices in the original data
            space to be returned. Avoids recomputing the full matrix where
            only a few dimensions of the ambient space are of interest

        Returns
        -------
        Inverse transformed data, shape=[n_samples_y, n_features]

        Raises
        ------
        ValueError : if Y.shape[1] != self.data_nu.shape[1]
        """
        try:
            if not hasattr(self, "data_pca"):
                # no pca performed
                try:
                    if Y.shape[1] != self.data_nu.shape[1]:
                        # shape is wrong
                        raise ValueError
                except IndexError:
                    # len(Y.shape) < 2
                    raise ValueError
                if columns is None:
                    return Y
                else:
                    columns = np.array([columns]).flatten()
                    return Y[:, columns]
            else:
                if columns is None:
                    return self.data_pca.inverse_transform(Y)
                else:
                    # only return specific columns
                    columns = np.array([columns]).flatten()
                    Y_inv = np.dot(Y, self.data_pca.components_[:, columns])
                    if hasattr(self.data_pca, "mean_"):
                        Y_inv += self.data_pca.mean_[columns]
                    return Y_inv
        except ValueError:
            # more informative error
            raise ValueError(
                "data of shape {0} cannot be inverse transformed"
                " from graph built on reduced data of shape ({1}, {2}). "
                "Expected shape ({3}, {2})".format(
                    Y.shape, self.data_nu.shape[0], self.data_nu.shape[1], Y.shape[0]
                )
            )
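    # Usage sketch: round-trip through the reduced space, optionally
    # restricting the output to a few ambient columns (names hypothetical):
    #
    #   Y_red = G.transform(Y)
    #   Y_amb = G.inverse_transform(Y_red)                  # all n_features
    #   Y_sub = G.inverse_transform(Y_red, columns=[0, 3])  # two columns only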
class BaseGraph(with_metaclass(abc.ABCMeta, Base)):
    """Parent graph class

    Parameters
    ----------
    kernel_symm : string, optional (default: '+')
        Defines method of kernel symmetrization.
        '+' : additive
        '*' : multiplicative
        'mnn' : min-max MNN symmetrization
        'none' : no symmetrization

    theta : float (default: 1)
        Min-max symmetrization constant.
        K = `theta * min(K, K.T) + (1 - theta) * max(K, K.T)`

    anisotropy : float, optional (default: 0)
        Level of anisotropy between 0 and 1
        (alpha in Coifman & Lafon, 2006)

    initialize : `bool`, optional (default: `True`)
        If `False`, don't create the kernel matrix.

    Attributes
    ----------
    K : array-like, shape=[n_samples, n_samples]
        kernel matrix defined as the adjacency matrix with
        ones down the diagonal

    kernel : synonym for `K`

    P : array-like, shape=[n_samples, n_samples] (cached)
        diffusion operator defined as a row-stochastic form
        of the kernel matrix

    diff_op : synonym for `P`
    """

    def __init__(
        self,
        kernel_symm="+",
        theta=None,
        anisotropy=0,
        gamma=None,
        initialize=True,
        **kwargs
    ):
        if gamma is not None:
            warnings.warn(
                "gamma is deprecated. Setting theta={}".format(gamma), FutureWarning
            )
            theta = gamma
        if kernel_symm == "gamma":
            warnings.warn(
                "kernel_symm='gamma' is deprecated. Setting kernel_symm='mnn'",
                FutureWarning,
            )
            kernel_symm = "mnn"
        if kernel_symm == "theta":
            warnings.warn(
                "kernel_symm='theta' is deprecated. Setting kernel_symm='mnn'",
                FutureWarning,
            )
            kernel_symm = "mnn"
        self.kernel_symm = kernel_symm
        self.theta = theta
        self._check_symmetrization(kernel_symm, theta)
        if not (isinstance(anisotropy, numbers.Real) and 0 <= anisotropy <= 1):
            raise ValueError(
                "Expected 0 <= anisotropy <= 1. Got {}".format(anisotropy)
            )
        self.anisotropy = anisotropy

        if initialize:
            _logger.debug("Initializing kernel...")
            self.K
        else:
            _logger.debug("Not initializing kernel.")
        super().__init__(**kwargs)

    def _check_symmetrization(self, kernel_symm, theta):
        if kernel_symm not in ["+", "*", "mnn", None]:
            raise ValueError(
                "kernel_symm '{}' not recognized. Choose from "
                "'+', '*', 'mnn', or 'none'.".format(kernel_symm)
            )
        elif kernel_symm != "mnn" and theta is not None:
            warnings.warn(
                "kernel_symm='{}' but theta is not None. "
                "Setting kernel_symm='mnn'.".format(kernel_symm)
            )
            self.kernel_symm = kernel_symm = "mnn"

        if kernel_symm == "mnn":
            if theta is None:
                self.theta = theta = 1
                warnings.warn(
                    "kernel_symm='mnn' but theta not given. "
                    "Defaulting to theta={}.".format(self.theta)
                )
            elif not isinstance(theta, numbers.Number) or theta < 0 or theta > 1:
                raise ValueError(
                    "theta {} not recognized. Expected "
                    "a float between 0 and 1".format(theta)
                )

    def _build_kernel(self):
        """Private method to build kernel matrix

        Runs public method to build kernel matrix and runs
        additional checks to ensure that the result is okay

        Returns
        -------
        Kernel matrix, shape=[n_samples, n_samples]

        Raises
        ------
        RuntimeWarning : if K is not symmetric
        """
        kernel = self.build_kernel()
        kernel = self.symmetrize_kernel(kernel)
        kernel = self.apply_anisotropy(kernel)
        if (kernel - kernel.T).max() > 1e-5:
            warnings.warn("K should be symmetric", RuntimeWarning)
        if np.any(kernel.diagonal() == 0):
            warnings.warn("K should have a non-zero diagonal", RuntimeWarning)
        return kernel
    def symmetrize_kernel(self, K):
        # symmetrize
        if self.kernel_symm == "+":
            _logger.debug("Using addition symmetrization.")
            K = (K + K.T) / 2
        elif self.kernel_symm == "*":
            _logger.debug("Using multiplication symmetrization.")
            # note: assumes K is a scipy sparse matrix; dense ndarrays have
            # no `.multiply` method
            K = K.multiply(K.T)
        elif self.kernel_symm == "mnn":
            _logger.debug(
                "Using mnn symmetrization (theta = {}).".format(self.theta)
            )
            K = self.theta * matrix.elementwise_minimum(K, K.T) + (
                1 - self.theta
            ) * matrix.elementwise_maximum(K, K.T)
        elif self.kernel_symm is None:
            _logger.debug("Using no symmetrization.")
            pass
        else:
            raise NotImplementedError
        return K
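    # Worked example of the 'mnn' rule on a dense 2x2 kernel (illustrative
    # only): K = theta * min(K, K.T) + (1 - theta) * max(K, K.T).
    #
    #   K = np.array([[1.0, 0.2], [0.8, 1.0]])
    #   K_mnn = 1.0 * np.minimum(K, K.T) + (1 - 1.0) * np.maximum(K, K.T)
    #   # with theta=1, K_mnn[0, 1] == K_mnn[1, 0] == 0.2 (the smaller affinity)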
    def apply_anisotropy(self, K):
        if self.anisotropy == 0:
            # do nothing
            return K
        else:
            if sparse.issparse(K):
                d = np.array(K.sum(1)).flatten()
                K = K.tocoo()
                K.data = K.data / ((d[K.row] * d[K.col]) ** self.anisotropy)
                K = K.tocsr()
            else:
                d = K.sum(1)
                K = K / (np.outer(d, d) ** self.anisotropy)
        return K
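    # Dense sketch of the normalization above, following Coifman & Lafon
    # (2006): K'(x, y) = K(x, y) / (d(x) * d(y))**alpha, where d is the row
    # sum. With alpha=1 the local sampling density is fully divided out:
    #
    #   d = K.sum(1)
    #   K_aniso = K / (np.outer(d, d) ** alpha)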
    def get_params(self):
        """Get parameters from this object"""
        return {
            "kernel_symm": self.kernel_symm,
            "theta": self.theta,
            "anisotropy": self.anisotropy,
        }
    def set_params(self, **params):
        """Set parameters on this object

        Safe setter method - attributes should not be modified directly as some
        changes are not valid.
        Invalid parameters: (these would require modifying the kernel matrix)
        - kernel_symm
        - theta
        - anisotropy

        Parameters
        ----------
        params : key-value pairs of parameter name and new values

        Returns
        -------
        self
        """
        if "theta" in params and params["theta"] != self.theta:
            raise ValueError("Cannot update theta. Please create a new graph")
        if "anisotropy" in params and params["anisotropy"] != self.anisotropy:
            raise ValueError("Cannot update anisotropy. Please create a new graph")
        if "kernel_symm" in params and params["kernel_symm"] != self.kernel_symm:
            raise ValueError("Cannot update kernel_symm. Please create a new graph")
        super().set_params(**params)
        return self
    @property
    def P(self):
        """Diffusion operator (cached)

        Return or calculate the diffusion operator

        Returns
        -------
        P : array-like, shape=[n_samples, n_samples]
            diffusion operator defined as a row-stochastic form
            of the kernel matrix
        """
        try:
            return self._diff_op
        except AttributeError:
            self._diff_op = normalize(self.kernel, "l1", axis=1)
            return self._diff_op

    @property
    def kernel_degree(self):
        """Weighted degree vector (cached)

        Return or calculate the degree vector from the affinity matrix

        Returns
        -------
        degrees : array-like, shape=[n_samples]
            Row sums of graph kernel
        """
        try:
            return self._kernel_degree
        except AttributeError:
            self._kernel_degree = matrix.to_array(
                self.kernel.sum(axis=1)
            ).reshape(-1, 1)
            return self._kernel_degree

    @property
    def diff_aff(self):
        """Symmetric diffusion affinity matrix

        Return or calculate the symmetric diffusion affinity matrix

        .. math:: A(x,y) = K(x,y) (d(x) d(y))^{-1/2}

        where :math:`d` is the degrees (row sums of the kernel).

        Returns
        -------
        diff_aff : array-like, shape=[n_samples, n_samples]
            symmetric diffusion affinity matrix defined as a
            symmetrically normalized form of the kernel matrix
        """
        row_degrees = self.kernel_degree
        if sparse.issparse(self.kernel):
            # diagonal matrix in CSR (data, indices, indptr) form
            degrees = sparse.csr_matrix(
                (
                    1 / np.sqrt(row_degrees.flatten()),
                    np.arange(len(row_degrees)),
                    np.arange(len(row_degrees) + 1),
                )
            )
            return degrees @ self.kernel @ degrees
        else:
            col_degrees = row_degrees.T
            return (self.kernel / np.sqrt(row_degrees)) / np.sqrt(col_degrees)

    @property
    def diff_op(self):
        """Synonym for P"""
        return self.P

    @property
    def K(self):
        """Kernel matrix

        Returns
        -------
        K : array-like, shape=[n_samples, n_samples]
            kernel matrix defined as the adjacency matrix with
            ones down the diagonal
        """
        try:
            return self._kernel
        except AttributeError:
            self._kernel = self._build_kernel()
            return self._kernel

    @property
    def kernel(self):
        """Synonym for K"""
        return self.K

    @property
    def weighted(self):
        # `decay` is set by subclasses (e.g. kNNGraph); graphs built with
        # decay=None are treated as unweighted
        return self.decay is not None
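    # How the cached operators relate (dense notation, illustrative only):
    # with d = K.sum(1),
    #
    #   P        = K / d[:, None]               # row-stochastic
    #   diff_aff = K / np.sqrt(np.outer(d, d))  # symmetric
    #
    # and the two are conjugate, diff_aff = D^{1/2} P D^{-1/2}, so they share
    # eigenvalues.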
    @abc.abstractmethod
    def build_kernel(self):
        """Build the kernel matrix

        Abstract method that all child classes must implement.
        Must return a symmetric matrix

        Returns
        -------
        K : kernel matrix, shape=[n_samples, n_samples]
            symmetric matrix with ones down the diagonal
            and no negative entries.
        """
        raise NotImplementedError
    def to_pygsp(self, **kwargs):
        """Convert to a PyGSP graph

        For use only when the user means to create the graph using
        the flag `use_pygsp=True`, and doesn't wish to recompute the kernel.
        Creates a graphtools.graphs.TraditionalGraph with a precomputed
        affinity matrix which also inherits from pygsp.graphs.Graph.

        Parameters
        ----------
        kwargs
            keyword arguments for graphtools.Graph

        Returns
        -------
        G : graphtools.base.PyGSPGraph, graphtools.graphs.TraditionalGraph
        """
        from . import api

        if "precomputed" in kwargs:
            if kwargs["precomputed"] != "affinity":
                warnings.warn(
                    "Cannot build PyGSPGraph with precomputed={}. "
                    "Using 'affinity' instead.".format(kwargs["precomputed"]),
                    UserWarning,
                )
            del kwargs["precomputed"]
        if "use_pygsp" in kwargs:
            if kwargs["use_pygsp"] is not True:
                warnings.warn(
                    "Cannot build PyGSPGraph with use_pygsp={}. "
                    "Use True instead.".format(kwargs["use_pygsp"]),
                    UserWarning,
                )
            del kwargs["use_pygsp"]
        return api.Graph(self.K, precomputed="affinity", use_pygsp=True, **kwargs)
    def to_igraph(self, attribute="weight", **kwargs):
        """Convert to an igraph Graph

        Uses the igraph.Graph constructor

        Parameters
        ----------
        attribute : str, optional (default: "weight")
        kwargs : additional arguments for igraph.Graph
        """
        try:
            import igraph as ig
        except ImportError:  # pragma: no cover
            raise ImportError(
                "Please install igraph with "
                "`pip install --user python-igraph`."
            )
        try:
            W = self.W
        except AttributeError:
            # not a pygsp graph
            W = self.K.copy()
            W = matrix.set_diagonal(W, 0)
        sources, targets = W.nonzero()
        edgelist = list(zip(sources, targets))
        g = ig.Graph(W.shape[0], edgelist, **kwargs)
        weights = W[W.nonzero()]
        weights = matrix.to_array(weights)
        g.es[attribute] = weights.flatten().tolist()
        return g
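    # Usage sketch (requires python-igraph; the clustering call is just one
    # example of what the resulting igraph object enables):
    #
    #   g = G.to_igraph()
    #   clusters = g.community_multilevel(weights="weight")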
    def to_pickle(self, path):
        """Save the current Graph to a pickle.

        Parameters
        ----------
        path : str
            File path where the pickled object will be stored.
        """
        pickle_obj = shallow_copy(self)
        is_oldpygsp = all(
            [
                isinstance(self, pygsp.graphs.Graph),
                int(sys.version.split(".")[1]) < 7,
            ]
        )
        if is_oldpygsp:
            pickle_obj.logger = pickle_obj.logger.name
        with open(path, "wb") as f:
            pickle.dump(pickle_obj, f, protocol=pickle.HIGHEST_PROTOCOL)
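    # Usage sketch: persist and restore a graph (path is hypothetical):
    #
    #   G.to_pickle("graph.pkl")
    #   with open("graph.pkl", "rb") as f:
    #       G2 = pickle.load(f)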
    def _check_shortest_path_distance(self, distance):
        if distance == "data" and self.weighted:
            raise NotImplementedError(
                "Graph shortest path with constant or data distance only "
                "implemented for unweighted graphs. "
                "For weighted graphs, use `distance='affinity'`."
            )
        elif distance == "constant" and self.weighted:
            raise NotImplementedError(
                "Graph shortest path with constant distance only "
                "implemented for unweighted graphs. "
                "For weighted graphs, use `distance='affinity'`."
            )
        elif distance == "affinity" and not self.weighted:
            raise ValueError(
                "Graph shortest path with affinity distance only "
                "valid for weighted graphs. "
                "For unweighted graphs, use `distance='constant'` "
                "or `distance='data'`."
            )

    def _default_shortest_path_distance(self):
        if not self.weighted:
            distance = "data"
            _logger.info("Using ambient data distances.")
        else:
            distance = "affinity"
            _logger.info("Using negative log affinity distances.")
        return distance
    def shortest_path(self, method="auto", distance=None):
        """Find the length of the shortest path between every pair of
        vertices on the graph

        Parameters
        ----------
        method : string ['auto'|'FW'|'D']
            method to use. Options are
            'auto' : attempt to choose the best method for the current problem
            'FW' : Floyd-Warshall algorithm. O[N^3]
            'D' : Dijkstra's algorithm with Fibonacci heaps. O[(k+log(N))N^2]

        distance : {'constant', 'data', 'affinity'}, optional (default: None)
            Distances along kNN edges. If None, defaults to 'data' for
            unweighted graphs and 'affinity' for weighted graphs.
            'constant' gives constant edge lengths.
            'data' gives distances in ambient data space.
            'affinity' gives distances as negative log affinities.

        Returns
        -------
        D : np.ndarray, float, shape = [N,N]
            D[i,j] gives the shortest distance from point i to point j
            along the graph. If no path exists, the distance is np.inf

        Notes
        -----
        Currently, shortest paths can only be calculated on kNNGraphs with
        `decay=None`
        """
        if distance is None:
            distance = self._default_shortest_path_distance()

        self._check_shortest_path_distance(distance)

        if distance == "constant":
            D = self.K
        elif distance == "data":
            D = sparse.coo_matrix(self.K)
            D.data = np.sqrt(
                np.sum((self.data_nu[D.row] - self.data_nu[D.col]) ** 2, axis=1)
            )
        elif distance == "affinity":
            D = sparse.csr_matrix(self.K)
            D.data = -1 * np.log(D.data)
        else:
            raise ValueError(
                "Expected `distance` in ['constant', 'data', 'affinity']. "
                "Got {}".format(distance)
            )

        P = graph_shortest_path(D, method=method)
        # symmetrize for numerical error
        P = (P + P.T) / 2
        # sklearn returns 0 if no path exists
        P[np.where(P == 0)] = np.inf
        # diagonal should actually be zero
        P[(np.arange(P.shape[0]), np.arange(P.shape[0]))] = 0
        return P
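    # Usage sketch on an unweighted kNN graph (decay=None), where the default
    # distance resolves to 'data':
    #
    #   D = G.shortest_path()                          # ambient-space lengths
    #   D_hops = G.shortest_path(distance="constant")  # path length in edges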
class PyGSPGraph(with_metaclass(abc.ABCMeta, pygsp.graphs.Graph, Base)):
    """Interface between BaseGraph and PyGSP.

    All graphs should possess these matrices. We inherit a lot of
    functionality from pygsp.graphs.Graph.

    There is a lot of overhead involved in having both a weight and
    kernel matrix
    """

    def __init__(self, lap_type="combinatorial", coords=None, plotting=None, **kwargs):
        if plotting is None:
            plotting = {}
        W = self._build_weight_from_kernel(self.K)

        super().__init__(
            W, lap_type=lap_type, coords=coords, plotting=plotting, **kwargs
        )

    @property
    @abc.abstractmethod
    def K(self):
        """Kernel matrix

        Returns
        -------
        K : array-like, shape=[n_samples, n_samples]
            kernel matrix defined as the adjacency matrix with
            ones down the diagonal
        """
        raise NotImplementedError

    def _build_weight_from_kernel(self, kernel):
        """Private method to build an adjacency matrix from a kernel matrix

        Puts zeroes down the diagonal of a copy of the kernel, since the
        kernel matrix is ultimately not stored.

        Parameters
        ----------
        kernel : array-like, shape=[n_samples, n_samples]
            Kernel matrix.

        Returns
        -------
        Adjacency matrix, shape=[n_samples, n_samples]
        """
        weight = kernel.copy()
        self._diagonal = weight.diagonal().copy()
        weight = matrix.set_diagonal(weight, 0)
        return weight
class DataGraph(with_metaclass(abc.ABCMeta, Data, BaseGraph)):
    """Abstract class for graphs built from a dataset

    Parameters
    ----------
    data : array-like, shape=[n_samples,n_features]
        accepted types: `numpy.ndarray`, `scipy.sparse.spmatrix`.

    n_pca : {`int`, `None`, `bool`, 'auto'}, optional (default: `None`)
        number of PC dimensions to retain for graph building.
        If n_pca in `[None, False, 0]`, uses the original data.
        If `True` or 'auto', estimates the number of dimensions using a
        singular value threshold.
        Note: if data is sparse, uses SVD instead of PCA.
        TODO: should we subtract and store the mean?

    rank_threshold : `float`, 'auto', optional (default: 'auto')
        threshold to use when estimating rank for `n_pca in [True, 'auto']`.
        Note that the default kwarg is `None` for this parameter.
        It is subsequently parsed to 'auto' if necessary.
        If 'auto', this threshold is
        smax * np.finfo(data.dtype).eps * max(data.shape)
        where smax is the maximum singular value of the data matrix.
        For reference, see, e.g., W. Press, S. Teukolsky, W. Vetterling and
        B. Flannery, "Numerical Recipes (3rd edition)",
        Cambridge University Press, 2007, page 795.

    random_state : `int` or `None`, optional (default: `None`)
        Random state for random PCA and graph building

    verbose : `bool`, optional (default: `True`)
        Verbosity.

    n_jobs : `int`, optional (default: 1)
        The number of jobs to use for the computation.
        If -1 all CPUs are used. If 1 is given, no parallel computing code is
        used at all, which is useful for debugging.
        For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for
        n_jobs = -2, all CPUs but one are used
    """

    def __init__(self, data, verbose=True, n_jobs=1, **kwargs):
        # kwargs are ignored
        self.n_jobs = n_jobs
        self.verbose = verbose
        _logger.set_level(verbose)
        super().__init__(data, **kwargs)
    def get_params(self):
        """Get parameters from this object"""
        params = Data.get_params(self)
        params.update(BaseGraph.get_params(self))
        return params
    @abc.abstractmethod
    def build_kernel_to_data(self, Y):
        """Build a kernel from new input data `Y` to the `self.data`

        Parameters
        ----------
        Y : array-like, [n_samples_y, n_dimensions]
            new data for which an affinity matrix is calculated
            to the existing data. `n_features` must match
            either the ambient or PCA dimensions

        Returns
        -------
        K_yx : array-like, [n_samples_y, n_samples]
            kernel matrix where each row represents affinities of a single
            sample in `Y` to all samples in `self.data`.

        Raises
        ------
        ValueError : if this Graph is not capable of extension or
            if the supplied data is the wrong shape
        """
        raise NotImplementedError
    def _check_extension_shape(self, Y):
        """Private method to check if new data matches `self.data`

        Parameters
        ----------
        Y : array-like, shape=[n_samples_y, n_features_y]
            Input data

        Returns
        -------
        Y : array-like, shape=[n_samples_y, n_pca]
            (Potentially transformed) input data

        Raises
        ------
        ValueError : if `n_features_y` is not either `self.data.shape[1]` or
            `self.n_pca`.
        """
        if len(Y.shape) != 2:
            raise ValueError("Expected a 2D matrix. Y has shape {}".format(Y.shape))
        if not Y.shape[1] == self.data_nu.shape[1]:
            # try PCA transform
            if Y.shape[1] == self.data.shape[1]:
                Y = self.transform(Y)
            else:
                # wrong shape
                if self.data.shape[1] != self.data_nu.shape[1]:
                    # PCA is possible
                    msg = (
                        "Y must be of shape either "
                        "(n, {}) or (n, {})"
                    ).format(self.data.shape[1], self.data_nu.shape[1])
                else:
                    # no PCA, only one choice of shape
                    msg = "Y must be of shape (n, {})".format(self.data.shape[1])
                raise ValueError(msg)
        return Y
    def extend_to_data(self, Y):
        """Build transition matrix from new data to the graph

        Creates a transition matrix such that `Y` can be approximated by
        a linear combination of samples in `self.data`. Any
        transformation of `self.data` can be trivially applied to `Y` by
        performing `transform_Y = self.interpolate(transform, transitions)`

        Parameters
        ----------
        Y : array-like, [n_samples_y, n_dimensions]
            new data for which an affinity matrix is calculated
            to the existing data. `n_features` must match
            either the ambient or PCA dimensions

        Returns
        -------
        transitions : array-like, shape=[n_samples_y, self.data.shape[0]]
            Transition matrix from `Y` to `self.data`
        """
        Y = self._check_extension_shape(Y)
        kernel = self.build_kernel_to_data(Y)
        transitions = normalize(kernel, norm="l1", axis=1)
        return transitions
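    # Usage sketch: out-of-sample extension for new points Y (requires an
    # extensible subclass such as graphtools.graphs.kNNGraph):
    #
    #   transitions = G.extend_to_data(Y)   # [n_samples_y, n_samples]
    #   # rows are L1-normalized affinities of each new point to G.data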
    def interpolate(self, transform, transitions=None, Y=None):
        """Interpolate new data onto a transformation of the graph data

        One of either transitions or Y should be provided

        Parameters
        ----------
        transform : array-like, shape=[n_samples, n_transform_features]

        transitions : array-like, optional, shape=[n_samples_y, n_samples]
            Transition matrix from `Y` (not provided) to `self.data`

        Y : array-like, optional, shape=[n_samples_y, n_dimensions]
            new data for which an affinity matrix is calculated
            to the existing data. `n_features` must match
            either the ambient or PCA dimensions

        Returns
        -------
        Y_transform : array-like, [n_samples_y, n_transform_features]
            Transform values of `Y` interpolated from `self.data`

        Raises
        ------
        ValueError : if neither `transitions` nor `Y` is provided
        """
        if transitions is None:
            if Y is None:
                raise ValueError("Either `transitions` or `Y` must be provided.")
            else:
                transitions = self.extend_to_data(Y)
        Y_transform = transitions.dot(transform)
        return Y_transform
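    # Usage sketch: push a per-sample quantity (here, a hypothetical
    # embedding of the graph's data) onto new points Y via the transition
    # matrix:
    #
    #   embedding = compute_embedding(G)         # hypothetical, [n_samples, k]
    #   Y_embed = G.interpolate(embedding, Y=Y)  # [n_samples_y, k]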
    def set_params(self, **params):
        """Set parameters on this object

        Safe setter method - attributes should not be modified directly as some
        changes are not valid.
        Valid parameters:
        - n_jobs
        - verbose

        Parameters
        ----------
        params : key-value pairs of parameter name and new values

        Returns
        -------
        self
        """
        if "n_jobs" in params:
            self.n_jobs = params["n_jobs"]
        if "verbose" in params:
            self.verbose = params["verbose"]
            _logger.set_level(self.verbose)
        super().set_params(**params)
        return self