from __future__ import division
from builtins import super
import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.utils.extmath import randomized_svd
from sklearn.preprocessing import normalize
from sklearn.cluster import MiniBatchKMeans
from scipy.spatial.distance import pdist, cdist
from scipy.spatial.distance import squareform
from scipy import sparse
import numbers
import warnings
import tasklogger
from . import matrix, utils
from .base import DataGraph, PyGSPGraph
_logger = tasklogger.get_tasklogger("graphtools")
[docs]class kNNGraph(DataGraph):
"""
K nearest neighbors graph
Parameters
----------
data : array-like, shape=[n_samples,n_features]
accepted types: `numpy.ndarray`, `scipy.sparse.spmatrix`,
`pandas.DataFrame`, `pandas.SparseDataFrame`.
knn : `int`, optional (default: 5)
Number of nearest neighbors (including self) to use to build the graph
decay : `int` or `None`, optional (default: `None`)
Rate of alpha decay to use. If `None`, alpha decay is not used.
bandwidth : `float`, list-like,`callable`, or `None`,
optional (default: `None`)
Fixed bandwidth to use. If given, overrides `knn`. Can be a single
bandwidth, or a list-like (shape=[n_samples]) of bandwidths for each
sample
bandwidth_scale : `float`, optional (default : 1.0)
Rescaling factor for bandwidth.
distance : `str`, optional (default: `'euclidean'`)
Any metric from `scipy.spatial.distance` can be used
distance metric for building kNN graph. Custom distance
functions of form `f(x, y) = d` are also accepted.
TODO: actually sklearn.neighbors has even more choices
thresh : `float`, optional (default: `1e-4`)
Threshold above which to calculate alpha decay kernel.
All affinities below `thresh` will be set to zero in order to save
on time and memory constraints.
Attributes
----------
knn_tree : `sklearn.neighbors.NearestNeighbors`
The fitted KNN tree. (cached)
TODO: can we be more clever than sklearn when it comes to choosing
between KD tree, ball tree and brute force?
"""
def __init__(
self,
data,
knn=5,
decay=None,
knn_max=None,
search_multiplier=6,
bandwidth=None,
bandwidth_scale=1.0,
distance="euclidean",
thresh=1e-4,
n_pca=None,
**kwargs
):
if decay is not None:
if thresh <= 0 and knn_max is None:
raise ValueError(
"Cannot instantiate a kNNGraph with `decay=None`, "
"`thresh=0` and `knn_max=None`. Use a TraditionalGraph instead."
)
elif thresh < np.finfo(float).eps:
thresh = np.finfo(float).eps
if callable(bandwidth):
raise NotImplementedError(
"Callable bandwidth is only supported by"
" graphtools.graphs.TraditionalGraph."
)
if knn is None and bandwidth is None:
raise ValueError("Either `knn` or `bandwidth` must be provided.")
elif knn is None and bandwidth is not None:
# implementation requires a knn value
knn = 5
if decay is None and bandwidth is not None:
warnings.warn("`bandwidth` is not used when `decay=None`.", UserWarning)
if knn > data.shape[0] - 2:
warnings.warn(
"Cannot set knn ({k}) to be greater than "
"n_samples - 2 ({n}). Setting knn={n}".format(
k=knn, n=data.shape[0] - 2
)
)
knn = data.shape[0] - 2
if knn_max is not None and knn_max < knn:
warnings.warn(
"Cannot set knn_max ({knn_max}) to be less than "
"knn ({knn}). Setting knn_max={knn}".format(knn=knn, knn_max=knn_max)
)
knn_max = knn
if n_pca in [None, 0, False] and data.shape[1] > 500:
warnings.warn(
"Building a kNNGraph on data of shape {} is "
"expensive. Consider setting n_pca.".format(data.shape),
UserWarning,
)
self.knn = knn
self.knn_max = knn_max
self.search_multiplier = search_multiplier
self.decay = decay
self.bandwidth = bandwidth
self.bandwidth_scale = bandwidth_scale
self.distance = distance
self.thresh = thresh
super().__init__(data, n_pca=n_pca, **kwargs)
[docs] def get_params(self):
"""Get parameters from this object
"""
params = super().get_params()
params.update(
{
"knn": self.knn,
"decay": self.decay,
"bandwidth": self.bandwidth,
"bandwidth_scale": self.bandwidth_scale,
"knn_max": self.knn_max,
"distance": self.distance,
"thresh": self.thresh,
"n_jobs": self.n_jobs,
"random_state": self.random_state,
"verbose": self.verbose,
}
)
return params
[docs] def set_params(self, **params):
"""Set parameters on this object
Safe setter method - attributes should not be modified directly as some
changes are not valid.
Valid parameters:
- n_jobs
- random_state
- verbose
Invalid parameters: (these would require modifying the kernel matrix)
- knn
- knn_max
- decay
- bandwidth
- bandwidth_scale
- distance
- thresh
Parameters
----------
params : key-value pairs of parameter name and new values
Returns
-------
self
"""
if "knn" in params and params["knn"] != self.knn:
raise ValueError("Cannot update knn. Please create a new graph")
if "knn_max" in params and params["knn_max"] != self.knn:
raise ValueError("Cannot update knn_max. Please create a new graph")
if "decay" in params and params["decay"] != self.decay:
raise ValueError("Cannot update decay. Please create a new graph")
if "bandwidth" in params and params["bandwidth"] != self.bandwidth:
raise ValueError("Cannot update bandwidth. Please create a new graph")
if (
"bandwidth_scale" in params
and params["bandwidth_scale"] != self.bandwidth_scale
):
raise ValueError("Cannot update bandwidth_scale. Please create a new graph")
if "distance" in params and params["distance"] != self.distance:
raise ValueError("Cannot update distance. " "Please create a new graph")
if "thresh" in params and params["thresh"] != self.thresh and self.decay != 0:
raise ValueError("Cannot update thresh. Please create a new graph")
if "n_jobs" in params:
self.n_jobs = params["n_jobs"]
if hasattr(self, "_knn_tree"):
self.knn_tree.set_params(n_jobs=self.n_jobs)
if "random_state" in params:
self.random_state = params["random_state"]
if "verbose" in params:
self.verbose = params["verbose"]
# update superclass parameters
super().set_params(**params)
return self
@property
def knn_tree(self):
"""KNN tree object (cached)
Builds or returns the fitted KNN tree.
TODO: can we be more clever than sklearn when it comes to choosing
between KD tree, ball tree and brute force?
Returns
-------
knn_tree : `sklearn.neighbors.NearestNeighbors`
"""
try:
return self._knn_tree
except AttributeError:
try:
self._knn_tree = NearestNeighbors(
n_neighbors=self.knn + 1,
algorithm="ball_tree",
metric=self.distance,
n_jobs=self.n_jobs,
).fit(self.data_nu)
except ValueError:
# invalid metric
warnings.warn(
"Metric {} not valid for `sklearn.neighbors.BallTree`. "
"Graph instantiation may be slower than normal.".format(
self.distance
),
UserWarning,
)
self._knn_tree = NearestNeighbors(
n_neighbors=self.knn + 1,
algorithm="auto",
metric=self.distance,
n_jobs=self.n_jobs,
).fit(self.data_nu)
return self._knn_tree
[docs] def build_kernel(self):
"""Build the KNN kernel.
Build a k nearest neighbors kernel, optionally with alpha decay.
Must return a symmetric matrix
Returns
-------
K : kernel matrix, shape=[n_samples, n_samples]
symmetric matrix with ones down the diagonal
with no non-negative entries.
"""
knn_max = self.knn_max + 1 if self.knn_max else None
K = self.build_kernel_to_data(self.data_nu, knn=self.knn + 1, knn_max=knn_max)
return K
    def _check_duplicates(self, distances, indices):
        """Warn if any samples are exact duplicates of one another.

        `distances` and `indices` are kNN query results sorted by distance;
        column 0 is each point's self-match, so a zero in column 1 means the
        point coincides with another sample.
        """
        if np.any(distances[:, 1] == 0):
            has_duplicates = distances[:, 1] == 0
            if np.sum(distances[:, 1:] == 0) < 20:
                # few duplicates: name the offending pairs explicitly
                idx = np.argwhere((distances == 0) & has_duplicates[:, None])
                # keep only one orientation of each pair (neighbor id < row id)
                duplicate_ids = np.array(
                    [
                        [indices[i[0], i[1]], i[0]]
                        for i in idx
                        if indices[i[0], i[1]] < i[0]
                    ]
                )
                duplicate_ids = duplicate_ids[np.argsort(duplicate_ids[:, 0])]
                duplicate_names = ", ".join(
                    ["{} and {}".format(i[0], i[1]) for i in duplicate_ids]
                )
                warnings.warn(
                    "Detected zero distance between samples {}. "
                    "Consider removing duplicates to avoid errors in "
                    "downstream processing.".format(duplicate_names),
                    RuntimeWarning,
                )
            else:
                # many duplicates: report the pair count only
                # (each pair appears twice in the symmetric query, hence // 2)
                warnings.warn(
                    "Detected zero distance between {} pairs of samples. "
                    "Consider removing duplicates to avoid errors in "
                    "downstream processing.".format(
                        np.sum(np.sum(distances[:, 1:] == 0)) // 2
                    ),
                    RuntimeWarning,
                )
    def build_kernel_to_data(
        self, Y, knn=None, knn_max=None, bandwidth=None, bandwidth_scale=None
    ):
        """Build a kernel from new input data `Y` to the `self.data`

        Parameters
        ----------
        Y: array-like, [n_samples_y, n_features]
            new data for which an affinity matrix is calculated
            to the existing data. `n_features` must match
            either the ambient or PCA dimensions
        knn : `int` or `None`, optional (default: `None`)
            If `None`, defaults to `self.knn`
        knn_max : `int` or `None`, optional (default: `None`)
            Maximum number of neighbors to keep per point.
            If `None`, defaults to the full dataset size
        bandwidth : `float`, `callable`, or `None`, optional (default: `None`)
            If `None`, defaults to `self.bandwidth`
        bandwidth_scale : `float`, optional (default : `None`)
            Rescaling factor for bandwidth.
            If `None`, defaults to self.bandwidth_scale

        Returns
        -------
        K_yx: array-like, [n_samples_y, n_samples]
            kernel matrix where each row represents affinities of a single
            sample in `Y` to all samples in `self.data`.

        Raises
        ------
        ValueError: if the supplied data is the wrong shape
        """
        if knn is None:
            knn = self.knn
        if bandwidth is None:
            bandwidth = self.bandwidth
        if bandwidth_scale is None:
            bandwidth_scale = self.bandwidth_scale
        # NOTE(review): guard reads self.data but message and clamp use
        # self.data_nu — presumably the same length; confirm upstream.
        if knn > self.data.shape[0]:
            warnings.warn(
                "Cannot set knn ({k}) to be greater than "
                "n_samples ({n}). Setting knn={n}".format(
                    k=knn, n=self.data_nu.shape[0]
                )
            )
            knn = self.data_nu.shape[0]
        if knn_max is None:
            knn_max = self.data_nu.shape[0]
        Y = self._check_extension_shape(Y)
        if self.decay is None or self.thresh == 1:
            with _logger.task("KNN search"):
                # binary connectivity matrix
                K = self.knn_tree.kneighbors_graph(
                    Y, n_neighbors=knn, mode="connectivity"
                )
        else:
            with _logger.task("KNN search"):
                # sparse fast alpha decay: start with a small search and
                # widen it only for points whose decay radius was not covered
                knn_tree = self.knn_tree
                search_knn = min(knn * self.search_multiplier, knn_max)
                distances, indices = knn_tree.kneighbors(Y, n_neighbors=search_knn)
                self._check_duplicates(distances, indices)
            with _logger.task("affinities"):
                if bandwidth is None:
                    # adaptive bandwidth: distance to the knn-th neighbor
                    bandwidth = distances[:, knn - 1]
                bandwidth = bandwidth * bandwidth_scale
                # check for zero bandwidth
                bandwidth = np.maximum(bandwidth, np.finfo(float).eps)
                # radius at which the decay kernel drops below thresh
                radius = bandwidth * np.power(-1 * np.log(self.thresh), 1 / self.decay)
                # points whose farthest found neighbor is still inside the
                # radius may be missing non-negligible affinities
                update_idx = np.argwhere(np.max(distances, axis=1) < radius).reshape(-1)
                _logger.debug(
                    "search_knn = {}; {} remaining".format(search_knn, len(update_idx))
                )
                if len(update_idx) > 0:
                    # switch to per-row lists so rows can have unequal lengths
                    distances = [d for d in distances]
                    indices = [i for i in indices]
                    # increase the knn search
                    search_knn = min(search_knn * self.search_multiplier, knn_max)
                    while (
                        len(update_idx) > Y.shape[0] // 10
                        and search_knn < self.data_nu.shape[0] / 2
                        and search_knn < knn_max
                    ):
                        # re-query only the unfinished points at the wider k
                        dist_new, ind_new = knn_tree.kneighbors(
                            Y[update_idx], n_neighbors=search_knn
                        )
                        for i, idx in enumerate(update_idx):
                            distances[idx] = dist_new[i]
                            indices[idx] = ind_new[i]
                        update_idx = [
                            i
                            for i, d in enumerate(distances)
                            if np.max(d)
                            < (
                                radius
                                if isinstance(bandwidth, numbers.Number)
                                else radius[i]
                            )
                        ]
                        _logger.debug(
                            "search_knn = {}; {} remaining".format(
                                search_knn, len(update_idx)
                            )
                        )
                        # increase the knn search
                        search_knn = min(search_knn * self.search_multiplier, knn_max)
                        if search_knn > self.data_nu.shape[0] / 2:
                            # wide searches are faster brute-force
                            knn_tree = NearestNeighbors(
                                n_neighbors=search_knn, algorithm="brute", n_jobs=self.n_jobs
                            ).fit(self.data_nu)
                    if len(update_idx) > 0:
                        if search_knn == knn_max:
                            _logger.debug(
                                "knn search to knn_max ({}) on {}".format(
                                    knn_max, len(update_idx)
                                )
                            )
                            # give up - search out to knn_max
                            dist_new, ind_new = knn_tree.kneighbors(
                                Y[update_idx], n_neighbors=search_knn
                            )
                            for i, idx in enumerate(update_idx):
                                distances[idx] = dist_new[i]
                                indices[idx] = ind_new[i]
                        else:
                            _logger.debug("radius search on {}".format(len(update_idx)))
                            # give up - radius search
                            dist_new, ind_new = knn_tree.radius_neighbors(
                                Y[update_idx, :],
                                radius=radius
                                if isinstance(bandwidth, numbers.Number)
                                else np.max(radius[update_idx]),
                            )
                            for i, idx in enumerate(update_idx):
                                distances[idx] = dist_new[i]
                                indices[idx] = ind_new[i]
                # scale distances by (per-row) bandwidth before applying decay
                if isinstance(bandwidth, numbers.Number):
                    data = np.concatenate(distances) / bandwidth
                else:
                    data = np.concatenate(
                        [distances[i] / bandwidth[i] for i in range(len(distances))]
                    )
                # assemble CSR directly from the ragged row data
                indices = np.concatenate(indices)
                indptr = np.concatenate([[0], np.cumsum([len(d) for d in distances])])
                K = sparse.csr_matrix(
                    (data, indices, indptr), shape=(Y.shape[0], self.data_nu.shape[0])
                )
                K.data = np.exp(-1 * np.power(K.data, self.decay))
                # handle nan
                K.data = np.where(np.isnan(K.data), 1, K.data)
                # TODO: should we zero values that are below thresh?
                K.data[K.data < self.thresh] = 0
                K = K.tocoo()
                K.eliminate_zeros()
                K = K.tocsr()
        return K
[docs]class LandmarkGraph(DataGraph):
"""Landmark graph
Adds landmarking feature to any data graph by taking spectral clusters
and building a 'landmark operator' from clusters to samples and back to
clusters.
Any transformation on the landmark kernel is trivially extended to the
data space by multiplying by the transition matrix.
Parameters
----------
data : array-like, shape=[n_samples,n_features]
accepted types: `numpy.ndarray`, `scipy.sparse.spmatrix`.,
`pandas.DataFrame`, `pandas.SparseDataFrame`.
n_landmark : `int`, optional (default: 2000)
number of landmarks to use
n_svd : `int`, optional (default: 100)
number of SVD components to use for spectral clustering
Attributes
----------
landmark_op : array-like, shape=[n_landmark, n_landmark]
Landmark operator.
Can be treated as a diffusion operator between landmarks.
transitions : array-like, shape=[n_samples, n_landmark]
Transition probabilities between samples and landmarks.
clusters : array-like, shape=[n_samples]
Private attribute. Cluster assignments for each sample.
Examples
--------
>>> G = graphtools.Graph(data, n_landmark=1000)
>>> X_landmark = transform(G.landmark_op)
>>> X_full = G.interpolate(X_landmark)
"""
def __init__(self, data, n_landmark=2000, n_svd=100, **kwargs):
"""Initialize a landmark graph.
Raises
------
RuntimeWarning : if too many SVD dimensions or
too few landmarks are used
"""
if n_landmark >= data.shape[0]:
raise ValueError(
"n_landmark ({}) >= n_samples ({}). Use "
"kNNGraph instead".format(n_landmark, data.shape[0])
)
if n_svd >= data.shape[0]:
warnings.warn(
"n_svd ({}) >= n_samples ({}) Consider "
"using kNNGraph or lower n_svd".format(n_svd, data.shape[0]),
RuntimeWarning,
)
self.n_landmark = n_landmark
self.n_svd = n_svd
super().__init__(data, **kwargs)
[docs] def get_params(self):
"""Get parameters from this object
"""
params = super().get_params()
params.update({"n_landmark": self.n_landmark, "n_pca": self.n_pca})
return params
[docs] def set_params(self, **params):
"""Set parameters on this object
Safe setter method - attributes should not be modified directly as some
changes are not valid.
Valid parameters:
- n_landmark
- n_svd
Parameters
----------
params : key-value pairs of parameter name and new values
Returns
-------
self
"""
# update parameters
reset_landmarks = False
if "n_landmark" in params and params["n_landmark"] != self.n_landmark:
self.n_landmark = params["n_landmark"]
reset_landmarks = True
if "n_svd" in params and params["n_svd"] != self.n_svd:
self.n_svd = params["n_svd"]
reset_landmarks = True
# update superclass parameters
super().set_params(**params)
# reset things that changed
if reset_landmarks:
self._reset_landmarks()
return self
def _reset_landmarks(self):
"""Reset landmark data
Landmarks can be recomputed without recomputing the kernel
"""
try:
del self._landmark_op
del self._transitions
del self._clusters
except AttributeError:
# landmarks aren't currently defined
pass
@property
def landmark_op(self):
"""Landmark operator
Compute or return the landmark operator
Returns
-------
landmark_op : array-like, shape=[n_landmark, n_landmark]
Landmark operator. Can be treated as a diffusion operator between
landmarks.
"""
try:
return self._landmark_op
except AttributeError:
self.build_landmark_op()
return self._landmark_op
@property
def clusters(self):
"""Cluster assignments for each sample.
Compute or return the cluster assignments
Returns
-------
clusters : list-like, shape=[n_samples]
Cluster assignments for each sample.
"""
try:
return self._clusters
except AttributeError:
self.build_landmark_op()
return self._clusters
@property
def transitions(self):
"""Transition matrix from samples to landmarks
Compute the landmark operator if necessary, then return the
transition matrix.
Returns
-------
transitions : array-like, shape=[n_samples, n_landmark]
Transition probabilities between samples and landmarks.
"""
try:
return self._transitions
except AttributeError:
self.build_landmark_op()
return self._transitions
def _landmarks_to_data(self):
landmarks = np.unique(self.clusters)
if sparse.issparse(self.kernel):
pmn = sparse.vstack(
[
sparse.csr_matrix(self.kernel[self.clusters == i, :].sum(axis=0))
for i in landmarks
]
)
else:
pmn = np.array(
[np.sum(self.kernel[self.clusters == i, :], axis=0) for i in landmarks]
)
return pmn
def _data_transitions(self):
return normalize(self._landmarks_to_data(), "l1", axis=1)
    def build_landmark_op(self):
        """Build the landmark operator

        Calculates spectral clusters on the kernel, and calculates transition
        probabilities between cluster centers by using transition probabilities
        between samples assigned to each cluster.

        Stores `_clusters`, `_landmark_op` and `_transitions` on self.
        """
        with _logger.task("landmark operator"):
            is_sparse = sparse.issparse(self.kernel)
            # spectral clustering
            with _logger.task("SVD"):
                # right singular vectors of the symmetric diffusion affinity
                _, _, VT = randomized_svd(
                    self.diff_aff,
                    n_components=self.n_svd,
                    random_state=self.random_state,
                )
            with _logger.task("KMeans"):
                kmeans = MiniBatchKMeans(
                    self.n_landmark,
                    init_size=3 * self.n_landmark,
                    batch_size=10000,
                    random_state=self.random_state,
                )
                # cluster samples in the spectral space spanned by VT
                self._clusters = kmeans.fit_predict(self.diff_op.dot(VT.T))
            # transition matrices
            pmn = self._landmarks_to_data()
            # row normalize
            pnm = pmn.transpose()
            pmn = normalize(pmn, norm="l1", axis=1)  # landmarks -> samples
            pnm = normalize(pnm, norm="l1", axis=1)  # samples -> landmarks
            landmark_op = pmn.dot(pnm)  # sparsity agnostic matrix multiplication
            if is_sparse:
                # no need to have a sparse landmark operator
                landmark_op = landmark_op.toarray()
            # store output
            self._landmark_op = landmark_op
            self._transitions = pnm
    def extend_to_data(self, data, **kwargs):
        """Build transition matrix from new data to the graph

        Creates a transition matrix such that `Y` can be approximated by
        a linear combination of landmarks. Any
        transformation of the landmarks can be trivially applied to `Y` by
        performing
        `transform_Y = transitions.dot(transform)`

        Parameters
        ----------
        data : array-like, [n_samples_y, n_features]
            (documented as `Y` below) new data for which an affinity matrix
            is calculated to the existing data. `n_features` must match
            either the ambient or PCA dimensions
        kwargs : additional arguments passed to `build_kernel_to_data`

        Returns
        -------
        transitions : array-like, [n_samples_y, self.data.shape[0]]
            Transition matrix from `Y` to `self.data`
        """
        kernel = self.build_kernel_to_data(data, **kwargs)
        # aggregate affinities to each landmark by summing over its cluster
        if sparse.issparse(kernel):
            pnm = sparse.hstack(
                [
                    sparse.csr_matrix(kernel[:, self.clusters == i].sum(axis=1))
                    for i in np.unique(self.clusters)
                ]
            )
        else:
            pnm = np.array(
                [
                    np.sum(kernel[:, self.clusters == i], axis=1).T
                    for i in np.unique(self.clusters)
                ]
            ).transpose()
        # row normalize to transition probabilities
        pnm = normalize(pnm, norm="l1", axis=1)
        return pnm
[docs] def interpolate(self, transform, transitions=None, Y=None):
"""Interpolate new data onto a transformation of the graph data
One of either transitions or Y should be provided
Parameters
----------
transform : array-like, shape=[n_samples, n_transform_features]
transitions : array-like, optional, shape=[n_samples_y, n_samples]
Transition matrix from `Y` (not provided) to `self.data`
Y: array-like, optional, shape=[n_samples_y, n_features]
new data for which an affinity matrix is calculated
to the existing data. `n_features` must match
either the ambient or PCA dimensions
Returns
-------
Y_transform : array-like, [n_samples_y, n_features or n_pca]
Transition matrix from `Y` to `self.data`
"""
if transitions is None and Y is None:
# assume Y is self.data and use standard landmark transitions
transitions = self.transitions
return super().interpolate(transform, transitions=transitions, Y=Y)
[docs]class TraditionalGraph(DataGraph):
"""Traditional weighted adjacency graph
Parameters
----------
data : array-like, shape=[n_samples,n_features]
accepted types: `numpy.ndarray`, `scipy.sparse.spmatrix`,
`pandas.DataFrame`, `pandas.SparseDataFrame`.
If `precomputed` is not `None`, data should be an
[n_samples, n_samples] matrix denoting pairwise distances,
affinities, or edge weights.
knn : `int`, optional (default: 5)
Number of nearest neighbors (including self) to use to build the graph
decay : `int` or `None`, optional (default: 40)
Rate of alpha decay to use. If `None`, alpha decay is not used.
bandwidth : `float`, list-like,`callable`, or `None`, optional (default: `None`)
Fixed bandwidth to use. If given, overrides `knn`. Can be a single
bandwidth, list-like (shape=[n_samples]) of bandwidths for each
sample, or a `callable` that takes in a `n x m` matrix and returns a
a single value or list-like of length n (shape=[n_samples])
bandwidth_scale : `float`, optional (default : 1.0)
Rescaling factor for bandwidth.
distance : `str`, optional (default: `'euclidean'`)
Any metric from `scipy.spatial.distance` can be used
distance metric for building kNN graph.
TODO: actually sklearn.neighbors has even more choices
n_pca : {`int`, `None`, `bool`, 'auto'}, optional (default: `None`)
number of PC dimensions to retain for graph building.
If n_pca in `[None,False,0]`, uses the original data.
If `True` then estimate using a singular value threshold
Note: if data is sparse, uses SVD instead of PCA
TODO: should we subtract and store the mean?
rank_threshold : `float`, 'auto', optional (default: 'auto')
threshold to use when estimating rank for
`n_pca in [True, 'auto']`.
Note that the default kwarg is `None` for this parameter.
It is subsequently parsed to 'auto' if necessary.
If 'auto', this threshold is
smax * np.finfo(data.dtype).eps * max(data.shape)
where smax is the maximum singular value of the data matrix.
For reference, see, e.g.
W. Press, S. Teukolsky, W. Vetterling and B. Flannery,
“Numerical Recipes (3rd edition)”,
Cambridge University Press, 2007, page 795.
thresh : `float`, optional (default: `1e-4`)
Threshold above which to calculate alpha decay kernel.
All affinities below `thresh` will be set to zero in order to save
on time and memory constraints.
precomputed : {'distance', 'affinity', 'adjacency', `None`},
optional (default: `None`)
If the graph is precomputed, this variable denotes which graph
matrix is provided as `data`.
Only one of `precomputed` and `n_pca` can be set.
"""
    def __init__(
        self,
        data,
        knn=5,
        decay=40,
        bandwidth=None,
        bandwidth_scale=1.0,
        distance="euclidean",
        n_pca=None,
        thresh=1e-4,
        precomputed=None,
        **kwargs
    ):
        """Initialize a traditional (dense) weighted adjacency graph.

        Validates parameter combinations (notably the `precomputed` modes),
        stores the kernel settings, then delegates to `DataGraph.__init__`.

        Raises
        ------
        ValueError : if `decay` is None without a precomputed affinity,
            if neither `knn` nor `bandwidth` is given, or if `precomputed`
            is invalid / non-square / negative
        """
        if decay is None and precomputed not in ["affinity", "adjacency"]:
            # decay high enough is basically a binary kernel
            raise ValueError(
                "`decay` must be provided for a "
                "TraditionalGraph. For kNN kernel, use kNNGraph."
            )
        if precomputed is not None and n_pca not in [None, 0, False]:
            # the data itself is a matrix of distances / affinities
            n_pca = None
            warnings.warn(
                "n_pca cannot be given on a precomputed graph." " Setting n_pca=None",
                RuntimeWarning,
            )
        if knn is None and bandwidth is None:
            raise ValueError("Either `knn` or `bandwidth` must be provided.")
        if knn is not None and knn > data.shape[0] - 2:
            # clamp knn so the adaptive bandwidth is always well-defined
            warnings.warn(
                "Cannot set knn ({k}) to be greater than "
                " n_samples - 2 ({n}). Setting knn={n}".format(
                    k=knn, n=data.shape[0] - 2
                )
            )
            knn = data.shape[0] - 2
        if precomputed is not None:
            if precomputed not in ["distance", "affinity", "adjacency"]:
                raise ValueError(
                    "Precomputed value {} not recognized. "
                    "Choose from ['distance', 'affinity', "
                    "'adjacency']".format(precomputed)
                )
            elif data.shape[0] != data.shape[1]:
                # pairwise matrices must be square
                raise ValueError(
                    "Precomputed {} must be a square matrix. "
                    "{} was given".format(precomputed, data.shape)
                )
            elif (data < 0).sum() > 0:
                raise ValueError(
                    "Precomputed {} should be " "non-negative".format(precomputed)
                )
        self.knn = knn
        self.decay = decay
        self.bandwidth = bandwidth
        self.bandwidth_scale = bandwidth_scale
        self.distance = distance
        self.thresh = thresh
        self.precomputed = precomputed
        super().__init__(data, n_pca=n_pca, **kwargs)
[docs] def get_params(self):
"""Get parameters from this object
"""
params = super().get_params()
params.update(
{
"knn": self.knn,
"decay": self.decay,
"bandwidth": self.bandwidth,
"bandwidth_scale": self.bandwidth_scale,
"distance": self.distance,
"precomputed": self.precomputed,
}
)
return params
[docs] def set_params(self, **params):
"""Set parameters on this object
Safe setter method - attributes should not be modified directly as some
changes are not valid.
Invalid parameters: (these would require modifying the kernel matrix)
- precomputed
- distance
- knn
- decay
- bandwidth
- bandwidth_scale
Parameters
----------
params : key-value pairs of parameter name and new values
Returns
-------
self
"""
if "precomputed" in params and params["precomputed"] != self.precomputed:
raise ValueError("Cannot update precomputed. " "Please create a new graph")
if (
"distance" in params
and params["distance"] != self.distance
and self.precomputed is None
):
raise ValueError("Cannot update distance. " "Please create a new graph")
if "knn" in params and params["knn"] != self.knn and self.precomputed is None:
raise ValueError("Cannot update knn. Please create a new graph")
if (
"decay" in params
and params["decay"] != self.decay
and self.precomputed is None
):
raise ValueError("Cannot update decay. Please create a new graph")
if (
"bandwidth" in params
and params["bandwidth"] != self.bandwidth
and self.precomputed is None
):
raise ValueError("Cannot update bandwidth. Please create a new graph")
if (
"bandwidth_scale" in params
and params["bandwidth_scale"] != self.bandwidth_scale
):
raise ValueError("Cannot update bandwidth_scale. Please create a new graph")
# update superclass parameters
super().set_params(**params)
return self
    def build_kernel(self):
        """Build the KNN kernel.

        Build a k nearest neighbors kernel, optionally with alpha decay.
        If `precomputed` is not `None`, the appropriate steps in the kernel
        building process are skipped.
        Must return a symmetric matrix

        Returns
        -------
        K : kernel matrix, shape=[n_samples, n_samples]
            symmetric matrix with ones down the diagonal
            and no negative entries.

        Raises
        ------
        ValueError: if `precomputed` is not an acceptable value
        """
        if self.precomputed == "affinity":
            # already done
            # TODO: should we check that precomputed matrices look okay?
            # e.g. check the diagonal
            K = self.data_nu
        elif self.precomputed == "adjacency":
            # need to set diagonal to one to make it an affinity matrix
            K = self.data_nu
            if sparse.issparse(K) and not (
                isinstance(K, sparse.dok_matrix) or isinstance(K, sparse.lil_matrix)
            ):
                # lil supports efficient diagonal assignment
                K = K.tolil()
            K = matrix.set_diagonal(K, 1)
        else:
            with _logger.task("affinities"):
                if sparse.issparse(self.data_nu):
                    # dense pairwise computation ahead
                    self.data_nu = self.data_nu.toarray()
                if self.precomputed == "distance":
                    pdx = self.data_nu
                elif self.precomputed is None:
                    pdx = pdist(self.data_nu, metric=self.distance)
                    if np.any(pdx == 0):
                        # zero distances indicate duplicate samples; warn
                        pdx = squareform(pdx)
                        # keep one orientation of each duplicate pair
                        duplicate_ids = np.array(
                            [i for i in np.argwhere(pdx == 0) if i[1] > i[0]]
                        )
                        if len(duplicate_ids) < 20:
                            duplicate_names = ", ".join(
                                ["{} and {}".format(i[0], i[1]) for i in duplicate_ids]
                            )
                            warnings.warn(
                                "Detected zero distance between samples {}. "
                                "Consider removing duplicates to avoid errors in "
                                "downstream processing.".format(duplicate_names),
                                RuntimeWarning,
                            )
                        else:
                            warnings.warn(
                                "Detected zero distance between {} pairs of samples. "
                                "Consider removing duplicates to avoid errors in "
                                "downstream processing.".format(len(duplicate_ids)),
                                RuntimeWarning,
                            )
                    else:
                        pdx = squareform(pdx)
                else:
                    raise ValueError(
                        "precomputed='{}' not recognized. "
                        "Choose from ['affinity', 'adjacency', 'distance', "
                        "None]".format(self.precomputed)
                    )
                if self.bandwidth is None:
                    # adaptive bandwidth: distance to the (knn+1)-th neighbor
                    knn_dist = np.partition(pdx, self.knn + 1, axis=1)[
                        :, : self.knn + 1
                    ]
                    bandwidth = np.max(knn_dist, axis=1)
                elif callable(self.bandwidth):
                    bandwidth = self.bandwidth(pdx)
                else:
                    bandwidth = self.bandwidth
                bandwidth = bandwidth * self.bandwidth_scale
                # scale each row by its bandwidth, then apply alpha decay
                pdx = (pdx.T / bandwidth).T
                K = np.exp(-1 * np.power(pdx, self.decay))
                # handle nan
                K = np.where(np.isnan(K), 1, K)
        # truncate
        if sparse.issparse(K):
            if not (
                isinstance(K, sparse.csr_matrix)
                or isinstance(K, sparse.csc_matrix)
                or isinstance(K, sparse.bsr_matrix)
            ):
                K = K.tocsr()
            K.data[K.data < self.thresh] = 0
            K = K.tocoo()
            K.eliminate_zeros()
            K = K.tocsr()
        else:
            K[K < self.thresh] = 0
        return K
    def build_kernel_to_data(self, Y, knn=None, bandwidth=None, bandwidth_scale=None):
        """Build transition matrix from new data to the graph

        Creates a transition matrix such that `Y` can be approximated by
        a linear combination of landmarks. Any
        transformation of the landmarks can be trivially applied to `Y` by
        performing
        `transform_Y = transitions.dot(transform)`

        Parameters
        ----------
        Y: array-like, [n_samples_y, n_features]
            new data for which an affinity matrix is calculated
            to the existing data. `n_features` must match
            either the ambient or PCA dimensions
        knn : `int` or `None`, optional (default: `None`)
            If `None`, defaults to `self.knn`
        bandwidth : `float`, `callable` or `None`, optional (default: `None`)
            If `None`, defaults to `self.bandwidth`
        bandwidth_scale : `float` or `None`, optional (default: `None`)
            If `None`, defaults to `self.bandwidth_scale`

        Returns
        -------
        transitions : array-like, [n_samples_y, self.data.shape[0]]
            Transition matrix from `Y` to `self.data`

        Raises
        ------
        ValueError: if `precomputed` is not `None`, then the graph cannot
            be extended.
        """
        if knn is None:
            knn = self.knn
        if bandwidth is None:
            bandwidth = self.bandwidth
        if bandwidth_scale is None:
            bandwidth_scale = self.bandwidth_scale
        if self.precomputed is not None:
            # there is no raw data to compute distances against
            raise ValueError("Cannot extend kernel on precomputed graph")
        else:
            with _logger.task("affinities"):
                Y = self._check_extension_shape(Y)
                pdx = cdist(Y, self.data_nu, metric=self.distance)
                if bandwidth is None:
                    # adaptive bandwidth: distance to the knn-th neighbor
                    knn_dist = np.partition(pdx, knn, axis=1)[:, :knn]
                    bandwidth = np.max(knn_dist, axis=1)
                elif callable(bandwidth):
                    bandwidth = bandwidth(pdx)
                bandwidth = bandwidth_scale * bandwidth
                # row-wise bandwidth scaling, then alpha decay
                pdx = (pdx.T / bandwidth).T
                K = np.exp(-1 * pdx ** self.decay)
                # handle nan
                K = np.where(np.isnan(K), 1, K)
                K[K < self.thresh] = 0
        return K
@property
def weighted(self):
if self.precomputed is not None:
return not matrix.nonzero_discrete(self.K, [0.5, 1])
else:
return super().weighted
def _check_shortest_path_distance(self, distance):
if self.precomputed is not None:
if distance == "data":
raise ValueError(
"Graph shortest path with data distance not "
"valid for precomputed graphs. For precomputed graphs, "
"use `distance='constant'` for unweighted graphs and "
"`distance='affinity'` for weighted graphs."
)
super()._check_shortest_path_distance(distance)
def _default_shortest_path_distance(self):
if self.precomputed is not None and not self.weighted:
distance = "constant"
_logger.info("Using constant distances.")
else:
distance = super()._default_shortest_path_distance()
return distance
class MNNGraph(DataGraph):
    """Mutual nearest neighbors graph

    Performs batch correction by forcing connections between batches, but
    only when the connection is mutual (i.e. x is a neighbor of y _and_
    y is a neighbor of x).

    Parameters
    ----------
    data : array-like, shape=[n_samples,n_features]
        accepted types: `numpy.ndarray`,
        `scipy.sparse.spmatrix`,
        `pandas.DataFrame`, `pandas.SparseDataFrame`.
    sample_idx : array-like, shape=[n_samples]
        Batch index: one label per sample identifying its batch.
        Must contain at least two distinct values.
    beta : `float`, optional (default: 1)
        Downweight between-batch affinities by beta
    adaptive_k : {'min', 'mean', 'sqrt', `None`} (default: None)
        Deprecated and ignored; a fixed `knn` is always used.
        Previously weighted the MNN kernel adaptively using the number
        of cells in each sample according to the selected method.

    Attributes
    ----------
    subgraphs : list of `graphtools.graphs.kNNGraph`
        Graphs representing each batch separately
    """

    def __init__(
        self,
        data,
        sample_idx,
        knn=5,
        beta=1,
        n_pca=None,
        decay=None,
        adaptive_k=None,
        bandwidth=None,
        distance="euclidean",
        thresh=1e-4,
        n_jobs=1,
        **kwargs
    ):
        self.beta = beta
        self.sample_idx = sample_idx
        # unique batch labels, and the number of cells in each batch
        self.samples, self.n_cells = np.unique(self.sample_idx, return_counts=True)
        self.knn = knn
        self.decay = decay
        self.distance = distance
        self.bandwidth = bandwidth
        self.thresh = thresh
        self.n_jobs = n_jobs
        # validate sample_idx: it must be given, match the data length, and
        # describe more than one batch (MNN correction needs >= 2 batches)
        if sample_idx is None:
            raise ValueError(
                "sample_idx must be given. For a graph without"
                " batch correction, use kNNGraph."
            )
        elif len(sample_idx) != data.shape[0]:
            raise ValueError(
                "sample_idx ({}) must be the same length as "
                "data ({})".format(len(sample_idx), data.shape[0])
            )
        elif len(self.samples) == 1:
            raise ValueError("sample_idx must contain more than one unique value")
        if adaptive_k is not None:
            # accepted for backwards compatibility, but no longer has any effect
            warnings.warn(
                "`adaptive_k` has been deprecated. Using fixed knn.", DeprecationWarning
            )
        super().__init__(data, n_pca=n_pca, **kwargs)

    def _check_symmetrization(self, kernel_symm, theta):
        # MNNGraph only supports a scalar theta for 'theta'/'mnn'
        # symmetrization; matrix-valued theta is rejected here, everything
        # else is validated by the superclass.
        if (
            (kernel_symm == "theta" or kernel_symm == "mnn")
            and theta is not None
            and not isinstance(theta, numbers.Number)
        ):
            raise TypeError(
                "Expected `theta` as a float. " "Got {}.".format(type(theta))
            )
        else:
            super()._check_symmetrization(kernel_symm, theta)

    def get_params(self):
        """Get parameters from this object

        Returns
        -------
        params : dict
            Superclass parameters plus the MNN/kNN-specific parameters
            (`beta`, `knn`, `decay`, `bandwidth`, `distance`, `thresh`,
            `n_jobs`).
        """
        params = super().get_params()
        params.update(
            {
                "beta": self.beta,
                "knn": self.knn,
                "decay": self.decay,
                "bandwidth": self.bandwidth,
                "distance": self.distance,
                "thresh": self.thresh,
                "n_jobs": self.n_jobs,
            }
        )
        return params

    def set_params(self, **params):
        """Set parameters on this object

        Safe setter method - attributes should not be modified directly as some
        changes are not valid.
        Valid parameters:
        - n_jobs
        - random_state
        - verbose
        Invalid parameters: (these would require modifying the kernel matrix)
        - knn
        - adaptive_k
        - decay
        - distance
        - thresh
        - bandwidth
        - beta

        Parameters
        ----------
        params : key-value pairs of parameter name and new values

        Returns
        -------
        self
        """
        # mnn specific arguments
        if "beta" in params and params["beta"] != self.beta:
            raise ValueError("Cannot update beta. Please create a new graph")
        # knn arguments: kernel-affecting ones may not change; the others are
        # forwarded to every batch subgraph as well
        knn_kernel_args = ["knn", "decay", "distance", "thresh", "bandwidth"]
        knn_other_args = ["n_jobs", "random_state", "verbose"]
        for arg in knn_kernel_args:
            if arg in params and params[arg] != getattr(self, arg):
                raise ValueError(
                    "Cannot update {}. " "Please create a new graph".format(arg)
                )
        for arg in knn_other_args:
            if arg in params:
                self.__setattr__(arg, params[arg])
                # keep each per-batch subgraph in sync
                for g in self.subgraphs:
                    g.set_params(**{arg: params[arg]})
        # update superclass parameters
        super().set_params(**params)
        return self

    def build_kernel(self):
        """Build the MNN kernel.

        Build a mutual nearest neighbors kernel: one kNN kernel per batch on
        the block diagonal, plus between-batch affinities rescaled so that a
        cell's total between-batch affinity does not exceed its within-batch
        affinity (further downweighted by `self.beta`).

        Returns
        -------
        K : kernel matrix, shape=[n_samples, n_samples]
            symmetric matrix with ones down the diagonal
            with no negative entries.
        """
        with _logger.task("subgraphs"):
            self.subgraphs = []
            # imported here to avoid a circular import at module load time
            from .api import Graph

            # iterate through sample ids
            for i, idx in enumerate(self.samples):
                _logger.debug(
                    "subgraph {}: sample {}, "
                    "n = {}, knn = {}".format(
                        i, idx, np.sum(self.sample_idx == idx), self.knn
                    )
                )
                # select data for sample
                data = self.data_nu[self.sample_idx == idx]
                # build a kNN graph for cells within sample
                graph = Graph(
                    data,
                    n_pca=None,
                    knn=self.knn,
                    decay=self.decay,
                    bandwidth=self.bandwidth,
                    distance=self.distance,
                    thresh=self.thresh,
                    verbose=self.verbose,
                    random_state=self.random_state,
                    n_jobs=self.n_jobs,
                    kernel_symm="+",
                    initialize=True,
                )
                self.subgraphs.append(graph)  # append to list of subgraphs
        with _logger.task("MNN kernel"):
            # sparse storage when thresholding (or for a binary kNN kernel
            # with no decay); dense otherwise
            if self.thresh > 0 or self.decay is None:
                K = sparse.lil_matrix((self.data_nu.shape[0], self.data_nu.shape[0]))
            else:
                K = np.zeros([self.data_nu.shape[0], self.data_nu.shape[0]])
            for i, X in enumerate(self.subgraphs):
                # place batch i's within-batch kernel on the block diagonal
                K = matrix.set_submatrix(
                    K,
                    self.sample_idx == self.samples[i],
                    self.sample_idx == self.samples[i],
                    X.K,
                )
                # per-cell total within-batch affinity; bounds the
                # between-batch affinities below
                within_batch_norm = np.array(np.sum(X.K, 1)).flatten()
                for j, Y in enumerate(self.subgraphs):
                    if i == j:
                        continue
                    with _logger.task(
                        "kernel from sample {} to {}".format(
                            self.samples[i], self.samples[j]
                        )
                    ):
                        # affinities from batch i's cells into batch j's graph
                        Kij = Y.build_kernel_to_data(X.data_nu, knn=self.knn)
                        between_batch_norm = np.array(np.sum(Kij, 1)).flatten()
                        # rescale rows so each cell's between-batch affinity
                        # sum is at most its within-batch sum, then downweight
                        # by beta.
                        # NOTE(review): between_batch_norm could contain zeros
                        # if a cell has no affinity to batch j — confirm the
                        # kNN kernel guarantees nonzero row sums here.
                        scale = (
                            np.minimum(1, within_batch_norm / between_batch_norm)
                            * self.beta
                        )
                        if sparse.issparse(Kij):
                            Kij = Kij.multiply(scale[:, None])
                        else:
                            Kij = Kij * scale[:, None]
                        # fill the (i, j) off-diagonal block
                        K = matrix.set_submatrix(
                            K,
                            self.sample_idx == self.samples[i],
                            self.sample_idx == self.samples[j],
                            Kij,
                        )
        return K

    def build_kernel_to_data(self, Y, theta=None):
        """Build transition matrix from new data to the graph

        Creates a transition matrix such that `Y` can be approximated by
        a linear combination of landmarks. Any
        transformation of the landmarks can be trivially applied to `Y` by
        performing
        `transform_Y = transitions.dot(transform)`

        Parameters
        ----------
        Y : array-like, [n_samples_y, n_features]
            new data for which an affinity matrix is calculated
            to the existing data. `n_features` must match
            either the ambient or PCA dimensions
        theta : array-like or `None`, optional (default: `None`)
            if `self.theta` is a matrix, theta values must be explicitly
            specified between `Y` and each sample in `self.data`

        Returns
        -------
        transitions : array-like, [n_samples_y, self.data.shape[0]]
            Transition matrix from `Y` to `self.data`

        Raises
        ------
        NotImplementedError
            Out-of-sample extension is not implemented for MNNGraph.
        """
        raise NotImplementedError
class kNNLandmarkGraph(kNNGraph, LandmarkGraph):
    """K nearest neighbors graph with landmark coarse-graining."""

    pass
class MNNLandmarkGraph(MNNGraph, LandmarkGraph):
    """Mutual nearest neighbors graph with landmark coarse-graining."""

    pass
class TraditionalLandmarkGraph(TraditionalGraph, LandmarkGraph):
    """Traditional (dense-kernel) graph with landmark coarse-graining."""

    pass
class kNNPyGSPGraph(kNNGraph, PyGSPGraph):
    """K nearest neighbors graph with PyGSP compatibility."""

    pass
class MNNPyGSPGraph(MNNGraph, PyGSPGraph):
    """Mutual nearest neighbors graph with PyGSP compatibility."""

    pass
class TraditionalPyGSPGraph(TraditionalGraph, PyGSPGraph):
    """Traditional (dense-kernel) graph with PyGSP compatibility."""

    pass
class kNNLandmarkPyGSPGraph(kNNGraph, LandmarkGraph, PyGSPGraph):
    """K nearest neighbors graph with landmarks and PyGSP compatibility."""

    pass
class MNNLandmarkPyGSPGraph(MNNGraph, LandmarkGraph, PyGSPGraph):
    """Mutual nearest neighbors graph with landmarks and PyGSP compatibility."""

    pass
class TraditionalLandmarkPyGSPGraph(TraditionalGraph, LandmarkGraph, PyGSPGraph):
    """Traditional (dense-kernel) graph with landmarks and PyGSP compatibility."""

    pass