#! /usr/bin/env python
#
# Copyright (C) 2016 Rich Lewis <rl403@cam.ac.uk>
# License: 3-clause BSD
"""
## skchem.cross_validation.similarity_threshold
Similarity threshold dataset partitioning functionality.
"""
import logging
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial.distance import pdist, cdist, squareform
from scipy.sparse import triu, dok_matrix
from scipy.optimize import minimize_scalar
from sklearn.manifold import TSNE, MDS
import multiprocessing
from functools import partial, wraps
from .. import features
LOGGER = logging.getLogger(__name__)
def returns_pairs(func):
""" Wraps a function that returns a ((i, j), sim) list to return a
dataframe. """
@wraps(func)
def inner(*args, **kwargs):
pairs = func(*args, **kwargs)
return pd.DataFrame([(p[0][0], p[0][1], p[1]) for p in pairs],
columns=['i', 'j', 'sim']).sort_values('sim')
return inner
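# A minimal sketch of the ``returns_pairs`` contract (illustrative only;
# ``close_pairs`` is a hypothetical wrapped function):
#
#     @returns_pairs
#     def close_pairs():
#         return [((0, 1), 0.9), ((0, 2), 0.4)]
#
#     close_pairs()   # DataFrame with columns i, j, sim, sorted by sim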
def _above_minimum(args, inp, metric, threshold, size):
""" finds pairs above a minimum similarity in chunks """
i, j = slice(*args[0]), slice(*args[1])
x_i, x_j = inp[i], inp[j]
sim_mat = 1 - cdist(x_i, x_j, metric=metric)
if i == j:
sim_mat = np.triu(sim_mat, k=1)
sim_mat[sim_mat <= threshold] = 0
sparse_mat = dok_matrix((size, size), dtype=float)
sparse_mat[i, j] = sim_mat
return list(sparse_mat.items())
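# Illustrative call for a single block covering a whole 10 x 10 similarity
# matrix (``fps`` is an assumed dense fingerprint array):
#
#     fps = np.random.rand(10, 128) > 0.5
#     pairs = _above_minimum(((0, 10), (0, 10)), inp=fps, metric='jaccard',
#                            threshold=0.45, size=10)
#     # -> [((i, j), sim), ...] for all pairs with sim above the threshold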
class SimThresholdSplit(object):
def __init__(self, min_threshold=0.45, largest_cluster_fraction=0.1,
fper='morgan', similarity_metric='jaccard',
memory_optimized=True, n_jobs=1, block_width=1000,
verbose=False):
""" Threshold similarity split for chemical datasets.
This class implements a splitting technique that will pool compounds
with similarity above a theshold into the same splits. The threshold
value is decided by specifying the maximum number of compounds to pool
into a cluster, as the density of compounds varies with dataset.
Machine learning techniques should be able to extrapolate outside of a
molecular series, or scaffold, however random splits will result in
some 'easy' test sets that are either *identical* or in the same
molecular series or share a significant scaffold with training set
compounds.
This splitting technique reduces or eliminates (depending on the
threshold set) this effect, making the problem harder.
Args:
            min_threshold (float):
                The minimum similarity threshold.  Lower values are slower.
            largest_cluster_fraction (float):
                The fraction of the total dataset the largest cluster may be.
                This decides the final similarity threshold.
fper (str or skchem.Fingerprinter):
The fingerprinting technique to use to generate the similarity
matrix.
similarity_metric (str or callable):
The similarity metric to use.
memory_optimized (bool):
Whether to use the memory optimized implementation.
n_jobs (int):
If memory_optimized is True, how many processes to run it over.
block_width (int):
If memory_optimized, what block length to use. This is the
width of the submatrices that are calculated at a time.
Notes:
The splits will not always be exactly the size requested, due to
the constraint and requirement to maintain random shuffling.
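        Example:
            A minimal end-to-end sketch (illustrative; ``ms`` is assumed to
            be a ``pd.Series`` of ``Mol`` instances):
            >>> st = SimThresholdSplit(min_threshold=0.45, # doctest: +SKIP
            ...                        largest_cluster_fraction=0.1)
            >>> st.fit(ms) # doctest: +SKIP
            >>> train, test = st.split(ratio=(80, 20)) # doctest: +SKIP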
"""
self.index = self.fps = self._n_jobs = self._n_instances_ = None
self.pairs_ = self.threshold_ = None
if isinstance(fper, str):
fper = features.get(fper)
self.fper = fper
self.similarity_metric = similarity_metric
self.memory_optimized = memory_optimized
self.n_jobs = n_jobs
self._block_width = block_width
self.min_threshold = min_threshold
self.largest_cluster = largest_cluster_fraction
if self.fper:
self.fper.verbose = verbose
    def fit(self, inp, pairs=None):
""" Fit the cross validator to the data.
Args:
            inp (pd.Series or pd.DataFrame or np.array):
                - `pd.Series` of `Mol` instances
                - `pd.DataFrame` with `Mol` instances as a `structure` column.
                - `pd.DataFrame` of fingerprints if `fper` is `None`
                - `pd.DataFrame` of sim matrix if `similarity_metric` is `None`
                - `np.array` of sim matrix if `similarity_metric` is `None`
            pairs (list[tuple[tuple(i, j), sim]]):
                An optional precalculated list of pairwise similarities.
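        Example:
            Fitting to a series of ``Mol`` instances (illustrative; ``ms``
            is assumed to be a ``pd.Series`` of ``Mol``):
            >>> st = SimThresholdSplit().fit(ms) # doctest: +SKIP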
"""
self.n_instances_ = len(inp)
self.pairs_ = pairs
if isinstance(inp, (pd.Series, pd.DataFrame)):
self.index = inp.index
else:
self.index = pd.RangeIndex(len(inp), name='batch')
if self.similarity_metric is None:
# we were passed a similarity matrix directly
self.pairs_ = self._pairs_from_sim_mat(inp)
elif self.fper is None:
# we were passed fingerprints directly
self.fps = inp
if self.pairs_ is None:
self.pairs_ = self._pairs_from_fps(inp)
else:
# we were passed Mol
if self.pairs_ is None:
self.fps = self.fper.transform(inp)
self.pairs_ = self._pairs_from_fps(self.fps)
self.threshold_ = self._optimal_thresh()
return self
@property
def n_jobs(self):
""" The number of processes to use to calculate the distance matrix.
-1 for all available. """
return self._n_jobs
@n_jobs.setter
def n_jobs(self, val):
if val == -1:
self._n_jobs = multiprocessing.cpu_count()
else:
self._n_jobs = val
@property
def block_width(self):
""" The width of the subsets of features.
Note:
Only used in parallelized. """
return self._block_width
@block_width.setter
def block_width(self, val):
        assert val <= self.n_instances_, ('The block width should be less '
                                          'than or equal to the number of '
                                          'instances')
self._block_width = val
@property
def n_instances_(self):
""" The number of instances that were used to fit the object. """
return self._n_instances_
@n_instances_.setter
def n_instances_(self, val):
        assert val >= self._block_width, ('The block width should be less '
                                          'than or equal to the number of '
                                          'instances')
self._n_instances_ = val
def _cluster_cumsum(self, shuffled=True):
nums = self.clusters.value_counts()
if shuffled:
            nums = nums.loc[np.random.permutation(nums.index)].cumsum()
return nums
    def split(self, ratio):
""" Return splits of the data with thresholded similarity according to a
specified ratio.
Args:
            ratio (tuple[int]):
                The relative sizes of the splits, e.g. ``(70, 15, 15)``.
Returns:
generator[pd.Series]:
                Generator of boolean split masks for the requested splits.
Example:
>>> ms = skchem.data.Diversity.read_frame('structure') # doctest: +SKIP
>>> st = SimThresholdSplit(fper='morgan', # doctest: +SKIP
... similarity_metric='jaccard')
>>> st.fit(ms) # doctest: +SKIP
>>> train, valid, test = st.split(ratio=(70, 15, 15)) # doctest: +SKIP
"""
ratio = self._split_sizes(ratio)
nums = self._cluster_cumsum()
res = pd.Series(np.nan, index=nums.index, name='split')
for i in range(len(ratio)):
lower = 0 if i == 0 else sum(ratio[:i])
            upper = np.inf if i == len(ratio) - 1 else sum(ratio[:i + 1])
res[nums[(nums > lower) & (nums <= upper)].index] = i
res = res.sort_index()
res = self.clusters.to_frame().join(res, on='clusters')['split']
return (res == i for i in range(len(ratio)))
    def k_fold(self, n_folds):
""" Returns k-fold cross-validated folds with thresholded similarity.
Args:
n_folds (int):
The number of folds to provide.
Returns:
generator[(pd.Series, pd.Series)]:
                Generator of (train, test) boolean mask pairs.
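        Example:
            Five thresholded folds (illustrative; ``st`` is a fitted
            ``SimThresholdSplit``):
            >>> folds = st.k_fold(5) # doctest: +SKIP
            >>> train, test = next(folds) # doctest: +SKIP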
"""
folds = self.split((1,) * n_folds)
return ((~fold, fold) for fold in folds)
def _split_sizes(self, ratio):
""" Calculate the sizes of the splits """
tot = sum(ratio)
return [self.n_instances_ * rat / tot for rat in ratio]
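    # e.g. with 1000 instances, ``_split_sizes((70, 15, 15))`` gives
    # [700.0, 150.0, 150.0]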
@staticmethod
@returns_pairs
def _pairs_from_sim_mat(sim_mat):
sim_mat = triu(sim_mat, k=1).todok()
return list(sim_mat.items())
@returns_pairs
def _pairs_from_fps(self, fps):
""" Pairs from fps. """
if self.memory_optimized:
pairs = self._pairs_from_fps_mem_opt(fps)
else:
pairs = self._pairs_from_fps_mem_intensive(fps)
return pairs
def _pairs_from_fps_mem_intensive(self, fps):
""" Fast single process, memory intensive implementation of pairs. """
LOGGER.debug('Generating pairs using memory intensive technique.')
dist_mat = squareform(pdist(fps, self.similarity_metric))
sim_mat = 1 - dist_mat # similarity is 1 - distance
sim_mat[sim_mat <= self.min_threshold] = 0
return self._pairs_from_sim_mat(sim_mat)
def _pairs_from_fps_mem_opt(self, fps):
""" Fast, multi-processed and memory efficient pairs."""
def slice_generator(low, high, width, end=False):
""" Checkerboard indices for the upper triangle of a matrix. """
while low < high:
res = (low, low + width if low + width < high else high)
                if end:
                    yield res
                else:
                    # pair this row block with every column block at or
                    # after it (py2 compatible stand-in for ``yield from``)
                    for j in slice_generator(low, high, width, end=True):
                        yield (res, j)
                low += width
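        # e.g. slice_generator(0, 4, 2) yields the upper-triangle blocks
        # ((0, 2), (0, 2)), ((0, 2), (2, 4)) and ((2, 4), (2, 4))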
size = len(fps)
fps = fps.values
f = partial(_above_minimum, inp=fps, threshold=self.min_threshold,
metric=self.similarity_metric, size=size)
slices = slice_generator(0, len(fps), self.block_width)
if self.n_jobs == 1:
# single processed
LOGGER.debug('Generating pairs using memory optimized technique.')
return sum((f(slice_) for slice_ in slices), [])
else:
# multiprocessed
LOGGER.debug('Generating pairs using memory optimized technique '
'with %s processes', self.n_jobs)
# py2 compat
# with multiprocessing.Pool(self.n_jobs) as p:
# return sum(p.map(f, [(i, j) for i, j in slices]), [])
p = multiprocessing.Pool(self.n_jobs)
res = sum(p.map(f, [(i, j) for i, j in slices]), [])
p.close()
return res
def _cluster(self, pairs):
""" Assign instances to clusters. """
LOGGER.debug('Generating clusters with %s close pairs', len(pairs))
clustered = np.arange(self.n_instances_)
for i, j in pairs.values.tolist(): # faster as list
i_clust, j_clust = clustered[i], clustered[j]
if i_clust < j_clust:
clustered[clustered == j_clust] = i_clust
else:
clustered[clustered == i_clust] = j_clust
return clustered
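    # e.g. pairs [[0, 1], [1, 2]] over 4 instances collapse to the labels
    # [0, 0, 0, 3]: each pair merges the higher numbered cluster into the
    # lower, so 0, 1 and 2 end up in cluster 0 and 3 stays a singleton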
def _optimal_thresh(self):
""" Calculate the optimal threshold for the given max pair density. """
def f(threshold):
            pairs = self.pairs_.loc[self.pairs_.sim > threshold, ['i', 'j']]
res = pd.Series(self._cluster(pairs))
return np.abs(res.value_counts().max() -
self.largest_cluster * self.n_instances_)
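        # f is smallest when the largest cluster produced at ``threshold``
        # is closest to the requested size, e.g. 100 compounds for 1000
        # instances with ``largest_cluster_fraction=0.1``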
self.threshold_ = minimize_scalar(f,
bounds=(self.min_threshold, 1),
method='bounded').x
LOGGER.info('Optimal threshold: %s', self.threshold_)
        gd_prs = self.pairs_.loc[self.pairs_.sim > self.threshold_,
                                 ['i', 'j']]
self.clusters = pd.Series(self._cluster(gd_prs),
index=self.index,
name='clusters')
return self.threshold_
    def visualize_similarities(self, subsample=5000, ax=None):
""" Plot a histogram of similarities, with the threshold plotted.
Args:
subsample (int):
For a large dataset, subsample the number of compounds to
consider.
ax (matplotlib.axis):
Axis to make the plot on.
Returns:
matplotlib.axes
"""
if not ax:
ax = plt.gca()
if subsample and len(self.fps) > subsample:
fps = self.fps.sample(subsample)
else:
fps = self.fps
        sims = 1 - squareform(pdist(fps, self.similarity_metric))
        # subtract the identity so self similarities (always 1) collapse to 0
        sims = (sims - np.identity(sims.shape[0])).flatten()
        hist = ax.hist(sims, bins=50)
ax.vlines(self.threshold_, 0, max(hist[0]))
ax.set_xlabel('similarity')
return ax
    def visualize_space(self, dim_reducer='tsne', dim_red_kw=None,
subsample=5000, ax=None, plt_kw=None):
""" Plot chemical space using a transformer
Args:
dim_reducer (str or sklearn object):
Technique to use to reduce fingerprint space.
dim_red_kw (dict):
Keyword args to pass to dim_reducer.
            subsample (int):
                For a large dataset, subsample the number of compounds to
                consider.
ax (matplotlib.axis):
Axis to make the plot on.
plt_kw (dict):
Keyword args to pass to the plot.
Returns:
matplotlib.axes
"""
if dim_red_kw is None:
dim_red_kw = {}
if plt_kw is None:
plt_kw = {}
if isinstance(dim_reducer, str):
if dim_reducer not in ('tsne', 'mds'):
msg = 'Dimensionality reducer {} not available'.format(
dim_reducer)
raise NotImplementedError(msg)
reducers = {'tsne': TSNE, 'mds': MDS}
dim_reducer = reducers[dim_reducer](**dim_red_kw)
two_d = dim_reducer.fit_transform(self.fps)
if not ax:
ax = plt.gca()
return ax.scatter(two_d[:, 0], two_d[:, 1], **plt_kw)
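# Illustrative usage of the visualizers (assumes ``st`` is a
# ``SimThresholdSplit`` already fitted on fingerprintable input):
#
#     ax = st.visualize_similarities(subsample=2000)
#     st.visualize_space(dim_reducer='tsne', subsample=2000)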