import scanpy as sc
import scipy.sparse
import pandas as pd
import anndata
import numpy as np
import collections
# Seed NumPy's global random number generator when this module is executed/imported.
np.random.seed(1234)
[docs]
def get_pseudobulk_matrix(adata, cluster_key = 'leiden', method = 'sum'):
    '''
    Constructs a feature by pseudobulk matrix by aggregating the cells of each cluster.

    Parameters:
    - adata (AnnData): An AnnData object containing the sc count matrix in `adata.X`.
    - cluster_key (str, optional): The column of `adata.obs` holding the cluster labels from which the pseudobulk matrix is constructed. Default is "leiden".
    - method (str, optional): method to aggregate the cells. Default is "sum".
        - sum: sums the feature counts across cells
        - mean: takes the mean of the feature counts across cells

    Returns:
    - Pandas dataframe with one row per feature and one column per cluster.
      Columns are named "clust_<cluster id>" and appear in order of first
      appearance of each cluster label in `adata.obs[cluster_key]`.

    Raises:
    - ValueError: if `method` is neither "sum" nor "mean".
    '''
    # fail loudly instead of silently returning a frame without any cluster column
    if method not in ("sum", "mean"):
        raise ValueError('method must be "sum" or "mean", got ' + repr(method))

    # NOTE(review): the row labels come from adata.var.values (not adata.var_names);
    # this presumably expects adata.var to hold exactly one column - confirm.
    pseudobulk_by_feature_df = pd.DataFrame(index = np.squeeze(np.asarray(adata.var.values)))

    cluster_labels = adata.obs[cluster_key]
    # unique() keeps first-appearance order, so the column order is reproducible
    for clust_id in cluster_labels.unique():
        # boolean mask of the cells that belong to the current cluster
        cells_in_cluster = (cluster_labels == clust_id).to_numpy()
        cluster_counts = adata.X[cells_in_cluster]
        if method == "sum":
            aggregated = cluster_counts.sum(0)
        else:
            aggregated = cluster_counts.mean(0)
        # flatten the aggregated row to one value per feature
        pseudobulk_by_feature_df["clust_" + str(clust_id)] = np.squeeze(np.asarray(aggregated))
    return pseudobulk_by_feature_df
[docs]
def get_pseudobulk_matrix_ext(adata_to_subset, adata_to_get_clusters, cluster_key="leiden" , method = 'sum'):
    '''
    Constructs a feature by pseudobulk matrix, taking the counts from one AnnData
    object and the cluster labels from another one.

    Parameters:
    - adata_to_subset (AnnData): An AnnData object containing the sc count matrix in `.X`.
    - adata_to_get_clusters (AnnData): An AnnData object containing the clustering information
      for the given cluster_key in `.obs`. The per-cluster mask is applied positionally to
      `adata_to_subset.X`, so both objects must hold the same cells in the same order.
    - cluster_key (str, optional): The column of `adata_to_get_clusters.obs` holding the cluster labels. Default is "leiden".
    - method (str, optional): method to aggregate the cells. Default is "sum".
        - sum: sums the feature counts across cells
        - mean: takes the mean of the feature counts across cells

    Returns:
    - Pandas dataframe with one row per feature and one column per cluster.
      Columns are named "clust_<cluster id>" and appear in order of first
      appearance of each cluster label.

    Raises:
    - ValueError: if `method` is neither "sum" nor "mean".
    '''
    # fail loudly instead of silently returning a frame without any cluster column
    if method not in ("sum", "mean"):
        raise ValueError('method must be "sum" or "mean", got ' + repr(method))

    # NOTE(review): the row labels come from adata_to_subset.var.values (not var_names);
    # this presumably expects .var to hold exactly one column - confirm.
    pseudobulk_by_feature_df = pd.DataFrame(index = np.squeeze(np.asarray(adata_to_subset.var.values)))

    cluster_labels = adata_to_get_clusters.obs[cluster_key]
    # unique() keeps first-appearance order, so the column order is reproducible
    for clust_id in cluster_labels.unique():
        # boolean mask (by position) of the cells that belong to the current cluster
        cells_in_cluster = (cluster_labels == clust_id).to_numpy()
        cluster_counts = adata_to_subset.X[cells_in_cluster]
        if method == "sum":
            aggregated = cluster_counts.sum(0)
        else:
            aggregated = cluster_counts.mean(0)
        # flatten the aggregated row to one value per feature
        pseudobulk_by_feature_df["clust_" + str(clust_id)] = np.squeeze(np.asarray(aggregated))
    return pseudobulk_by_feature_df
[docs]
def get_closest_prototype_to_pseudobulk(pseudobulk_prototype_centroid_euclidean_dis_df):
    '''
    Looks up, for every pseudobulk in a pairwise distance matrix, the prototype with the smallest distance to it.

    Parameters:
    - pseudobulk_prototype_centroid_euclidean_dis_df (Pandas Dataframe): square matrix of pairwise distances
      between prototype centroids and pseudobulk samples. Pseudobulk labels must end with "pbulk".
      Can be obtained by running plot_pca_dist_cent_heatmap() function.

    Returns:
    - {pseudobulk:closest_prototype} dictionary, ordered by pseudobulk label
    '''
    dist_df = pseudobulk_prototype_centroid_euclidean_dis_df
    n_rows = dist_df.shape[0]

    # the "pbulk" suffix is hardcoded for the centroid matrix; the mask is computed on the
    # columns and applied to the rows, which relies on both axes sharing the same label order
    pbulk_labels = dist_df[dist_df.columns.str.endswith("pbulk")].index

    nearest = {}
    for pbulk_label in pbulk_labels:
        # all row labels, ranked by increasing distance to this pseudobulk
        ranked_labels = dist_df.nsmallest(n_rows, pbulk_label).index
        # drop the pseudobulks themselves; the first remaining label is the closest prototype
        prototype_candidates = ranked_labels[~ranked_labels.str.endswith("pbulk")]
        nearest[pbulk_label] = prototype_candidates[0]

    return {label: nearest[label] for label in sorted(nearest)}
[docs]
def get_closest_pseubulk_to_prototype(pseudobulk_prototype_centroid_euclidean_dis_df):
    '''
    Calculates the distances between pseudobulks and prototypes and returns the closest pseudobulk to a prototype.

    Parameters:
    - pseudobulk_prototype_centroid_euclidean_dis_df (Pandas Dataframe): square matrix of pairwise distances
      between prototype centroids and pseudobulk samples. Pseudobulk labels must end with "pbulk".
      Can be obtained by running plot_pca_dist_cent_heatmap() function.

    Returns:
    - {prototype:closest_pseudobulk} dictionary, ordered by prototype label

    Raises:
    - IndexError: if the matrix contains no row whose label ends with "pbulk".
    '''
    dist_df = pseudobulk_prototype_centroid_euclidean_dis_df
    # define a dict to keep prototype: closest pbulk pairs
    prototype_closest_pbulk_dict = {}
    # we hardcoded "pbulk" suffix for centroid matrix. every label without it is a prototype.
    # the mask is computed on the columns and applied to the rows, which relies on
    # both axes of the square matrix sharing the same label order
    prototype_labels = dist_df[~dist_df.columns.str.endswith("pbulk")].index
    for prototype in prototype_labels:
        # order all rows by the smallest distance to this prototype
        smallest_dist_ordered = dist_df.nsmallest(dist_df.shape[0], prototype)
        ranked_labels = smallest_dist_ordered.index
        # get the closest entry that is a pbulk
        closest_pbulk_id = ranked_labels[ranked_labels.str.endswith("pbulk")][0]
        # key by the prototype being processed (the original referenced an undefined name here)
        prototype_closest_pbulk_dict[prototype] = closest_pbulk_id
    return dict(sorted(prototype_closest_pbulk_dict.items()))
[docs]
def get_pseudobulk_to_prototype_distance(pseudobulk_prototype_centroid_euclidean_dis_df, pbulk_to_prototype=True):
    '''
    Transfers Euclidean distances to scaled similarities, expressed as percentages.

    For every column of interest the distances d are turned into (max(d) - d) and divided by
    the sum of these values, times 100, so each column of the result sums to 100 and the
    farthest entry of a column gets 0. If all distances of a column are equal, the scaled
    values sum to zero and the percentages of that column are NaN.

    Parameters:
    - pseudobulk_prototype_centroid_euclidean_dis_df (DataFrame): A square matrix of pairwise distances
      between prototype (bulk) centroids and pseudobulk samples. Pseudobulk labels must end with "pbulk".
    - pbulk_to_prototype (bool, optional): If True, the result has one row per prototype and one column
      per pseudobulk (each pseudobulk's distances are scaled across the prototypes).
      If False, the result has one row per pseudobulk and one column per prototype.
      Default is True.

    Returns:
    - Pandas dataframe holding the percentage contributions.
    '''
    dist_df = pseudobulk_prototype_centroid_euclidean_dis_df
    # we hardcoded "pbulk" suffix for the pseudobulk entries of the centroid matrix
    col_is_pbulk = dist_df.columns.str.endswith("pbulk")
    row_is_pbulk = dist_df.index.str.endswith("pbulk")

    # column-derived masks are applied to the rows below, which relies on both axes
    # of the square matrix sharing the same label order
    pbulk_labels = dist_df[col_is_pbulk].index
    prototype_labels = dist_df[~col_is_pbulk].index

    # truthiness keeps True/False behaviour and avoids an unbound result for other values
    if pbulk_to_prototype:
        result_index, column_labels, row_selector = prototype_labels, pbulk_labels, ~row_is_pbulk
    else:
        result_index, column_labels, row_selector = pbulk_labels, prototype_labels, row_is_pbulk

    # start from an empty frame carrying the result rows, then add one scaled column per label
    pieces = [pd.DataFrame(index=result_index)]
    for label in column_labels:
        sub_df = dist_df[[label]].iloc[row_selector, :]
        # invert the distances so that the closest entry gets the largest value
        sub_df_scaled = sub_df.max() - sub_df
        # normalise the column to percentages
        sub_df_scaled_perc = sub_df_scaled.div(sub_df_scaled.sum(axis=0), axis=1) * 100
        pieces.append(sub_df_scaled_perc)
    # a single concat instead of growing the frame inside the loop
    return pd.concat(pieces, axis=1, join="inner")