# -*- coding: utf-8 -*-
"""
A collection of noise reduction algorithms
"""
import numpy as np
import scipy.spatial
import scipy.stats
import pandas as pd
from matplotlib import pyplot
from sklearn.decomposition import PCA
from tqdm import trange
from numba import njit, prange

from persistence import persistence
from decorators import multi_input


@njit(parallel=True)
def compute_gradient(S, X, sigma, omega):
    """Compute the gradient of F as in arXiv:0910.5947."""
    gradF = np.zeros(S.shape)
    d = X.shape[1]
    N = X.shape[0]
    M = S.shape[0]
    for j in prange(M):
        normsSX = np.square(S[j] - X).sum(axis=1)
        normsSS = np.square(S[j] - S).sum(axis=1)
        expsSX = np.exp(-1 / (2 * sigma**2) * normsSX)
        expsSS = np.exp(-1 / (2 * sigma**2) * normsSS)
        SX, SS = np.zeros(d), np.zeros(d)
        for k in range(d):
            # Attraction towards the data X plus a repulsion (weighted by
            # omega) between the points of S themselves.
            SX[k] = -1 / (N * sigma**2) * np.sum((S[j] - X)[:, k] * expsSX)
            SS[k] = omega / (M * sigma**2) * np.sum((S[j] - S)[:, k] * expsSS)
        gradF[j] = SX + SS
    return gradF


@multi_input
def top_noise_reduction(X, n=100, omega=0.2, fraction=0.1, plot=False):
    """
    Topological denoising algorithm as in arXiv:0910.5947.

    Parameters
    ----------
    X : dataframe(n_datapoints, n_features)
        Dataframe containing the data
    n : int, optional, default 100
        Number of iterations
    omega : float, optional, default 0.2
        Strength of the repulsive force between datapoints
    fraction : float between 0 and 1, optional, default 0.1
        The fraction of datapoints from which the denoised dataset is
        constructed
    plot : bool, optional, default False
        When True, plot the dataset and the denoised points at each
        iteration
    """
    N = X.shape[0]
    inds = np.random.choice(N, round(fraction * N), replace=False)
    S = X.iloc[inds]
    sigma = X.stack().std()
    c = 0.02 * np.max(scipy.spatial.distance.cdist(X, X, metric='euclidean'))

    iterator = trange(0, n, position=0, leave=True)
    iterator.set_description("Topological noise reduction")
    for i in iterator:
        gradF = compute_gradient(S.to_numpy(), X.to_numpy(), sigma, omega)
        # Normalize the step size by the largest gradient norm seen in the
        # first iteration.
        if i == 0:
            maxgradF = np.max(np.sqrt(np.square(gradF).sum(axis=1)))
        S = S + c * gradF / maxgradF

        if plot:
            fig = pyplot.figure()
            ax = fig.add_subplot(projection="3d")
            ax.scatter(X[0], X[1], X[2], alpha=0.1)
            ax.scatter(S[0], S[1], S[2])
            pyplot.show()
    return S, inds
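# A minimal usage sketch for top_noise_reduction, kept as a comment so the
# module stays import-safe. It assumes the @multi_input decorator accepts a
# plain DataFrame; the noisy circle and all parameter values below are
# illustrative, not prescribed by the algorithm.
#
#     theta = np.random.uniform(0, 2 * np.pi, 500)
#     noisy = pd.DataFrame({0: np.cos(theta), 1: np.sin(theta),
#                           2: np.zeros(500)})
#     noisy += np.random.normal(scale=0.1, size=(500, 3))
#     denoised, inds = top_noise_reduction(noisy, n=50, omega=0.2,
#                                          fraction=0.1, plot=True)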
@njit(parallel=True)
def density_estimation(X, k):
    """Estimate the density at each point as one over the squared
    distance to its k-th nearest neighbor."""
    N = X.shape[0]
    densities = np.zeros(N)
    for i in prange(N):
        distances = np.sum((X[i] - X)**2, axis=1)
        densities[i] = 1 / np.sort(distances)[k]
    return densities


@multi_input
def density_filtration(X, k, fraction):
    """
    Return the points which lie in regions of high density.

    Parameters
    ----------
    X : dataframe(n_datapoints, n_features)
        Dataframe containing the data
    k : int
        Density is estimated as one over the squared distance to the
        k-th nearest point
    fraction : float between 0 and 1
        The fraction of highest-density datapoints that are returned
    """
    print("Applying density filtration...", end=" ")
    N = X.shape[0]
    X["densities"] = density_estimation(X.to_numpy().astype(float), k)
    X = X.nlargest(int(fraction * N), "densities")
    X = X.drop(columns="densities")
    print("done")
    return X


@njit(parallel=True)
def compute_averages(X, r):
    """Used in neighborhood_average."""
    N = X.shape[0]
    averages = np.zeros(X.shape)
    for i in prange(N):
        # Compare squared distances against r**2 so that r is a true radius.
        distances = np.sum((X[i] - X)**2, axis=1)
        neighbors = X[distances < r**2]
        averages[i] = np.sum(neighbors, axis=0) / len(neighbors)
    return averages


@multi_input
def neighborhood_average(X, r):
    """
    Replace each point by an average over its neighborhood.

    Parameters
    ----------
    X : dataframe(n_datapoints, n_features)
        Dataframe containing the data
    r : float
        Points are averaged over all points within radius r
    """
    print("Applying neighborhood average...", end=" ")
    averages = compute_averages(X.to_numpy().astype(float), r)
    print("done")
    result = pd.DataFrame(data=averages, index=X.index)
    return result


@multi_input
def z_cutoff(X, z_cutoff):
    """
    Remove outliers with a high Z-score.

    Parameters
    ----------
    X : dataframe(n_datapoints, n_features)
        Dataframe containing the data
    z_cutoff : float
        The Z-score at which points are removed
    """
    # Z-score of each point's distance from the origin.
    z = np.abs(scipy.stats.zscore(np.sqrt(np.square(X).sum(axis=1))))
    result = X[z < z_cutoff]
    print(f"{len(X) - len(result)} datapoints with Z-score above "
          f"{z_cutoff} removed")
    return result


@multi_input
def PCA_reduction(X, dim):
    """
    Use principal component analysis to reduce the data to a lower
    dimension, and print the variance explained by each component.

    Parameters
    ----------
    X : dataframe(n_datapoints, n_features)
        Dataframe containing the data
    dim : int
        The number of dimensions the data is reduced to
    """
    pca = PCA(n_components=dim)
    pca.fit(X)
    columns = list(range(dim))
    X = pd.DataFrame(pca.transform(X), columns=columns, index=X.index)
    print("PCA explained variance:")
    print(pca.explained_variance_ratio_)
    return X
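
# A minimal end-to-end sketch chaining the filters above, assuming the
# @multi_input decorator passes a single DataFrame straight through. The
# synthetic dataset (a noisy circle in 3D) and every parameter value are
# illustrative choices, not values prescribed by the algorithms.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    theta = rng.uniform(0, 2 * np.pi, 1000)
    data = pd.DataFrame({0: np.cos(theta), 1: np.sin(theta),
                         2: np.zeros(1000)})
    data += rng.normal(scale=0.1, size=(1000, 3))

    # Keep dense regions, smooth locally, drop outliers, then project
    # down to two dimensions.
    filtered = density_filtration(data, k=15, fraction=0.8)
    smoothed = neighborhood_average(filtered, r=0.3)
    trimmed = z_cutoff(smoothed, z_cutoff=3.0)
    reduced = PCA_reduction(trimmed, dim=2)
    print(reduced.head())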