美文网首页
JS散度评估特征距离

JS散度评估特征距离

作者: 夹竹桃的下午 | 来源:发表于2024-06-26 13:39 被阅读0次

直接上代码:

import pandas as pd
import numpy as np
import os,sys
from sklearn.preprocessing import StandardScaler
from scipy.stats import shapiro, ttest_ind, wilcoxon,levene,kruskal,mannwhitneyu
from statistics import median
from statistics import mean
from scipy.stats import normaltest
import itertools
import random
from scipy.spatial.distance import jensenshannon
from scipy.stats import gaussian_kde
import matplotlib.pyplot as plt
import pickle
os.makedirs('./Js_plot', exist_ok=True)
scaler = StandardScaler()
work="./"
tsv_files = [f for f in os.listdir(work) if f.endswith('LE5X.csv')]
tsv_files = [i  for i in tsv_files if i.find('Frag2023_FragArm2023')==-1]
def js_divergence_scipy(p, q,p_SampleID,q_SampleID,name):
    data1 =np.array(np.nan_to_num(p), dtype=np.float64)
    data2=np.array(np.nan_to_num(q), dtype=np.float64)
    kde1 = gaussian_kde(data1)
    kde2 = gaussian_kde(data2)
    x_range = np.linspace(min(data1.min(), data2.min()), max(data1.max(), data2.max()), 1000)
    prob_dist1 = kde1.evaluate(x_range)
    prob_dist2 = kde2.evaluate(x_range)
    out=jensenshannon(prob_dist1, prob_dist2)
    plt.clf()
    plt.figure(figsize=(10, 6))
    plt.plot(x_range, prob_dist1, 'r', label=p_SampleID)
    plt.plot(x_range, prob_dist2, 'b', label=q_SampleID)
    plt.xlabel('Feature Value')
    plt.ylabel('Probability Density')
    plt.title(name.split('.')[0]+' Probability Distribution')
    plt.legend()
    F=p_SampleID+"_"+q_SampleID+"_"+name.split('.')[0]
    plt.savefig('./Js_plot/JSD_' + F + '.png')
    if(out >=0):
        F=p_SampleID+"_"+q_SampleID+"_"+name.split('.')[0]
        with open('./Js_plot/'+F,'w+') as f:
            f.write(str(out)+"\n")
        return out
    return  None
def paired_delta(df1, df2):
    allsum=0
    # 删除元素
    list1,list2=def_na(df1, df2)
    m=len(list1)
    n=len(list2)
    if(m*m==0 or m==0 or n==0 or m!=n):
        return []
    for i in range(0,m):
        allsum=computer(list1[i],list2[i])+allsum
    d=allsum/(m)
    return abs(d)
def def_na(list1,list2):
    nan_idx1 = [i for i, v in enumerate(list1) if np.isnan(v)]
    nan_idx2 = [i for i, v in enumerate(list2) if np.isnan(v)]
    # 合并两个列表的缺失索引
    nan_idx = list(set(nan_idx1 + nan_idx2))
    # 删除元素
    list1 = [v for i, v in enumerate(list1) if i not in nan_idx]
    list2 = [v for i, v in enumerate(list2) if i not in nan_idx]
    return list1,list2
def tmpdffun(tmpdf):
    scaler = StandardScaler()
    grouptmp = tmpdf[['SampleID', 'group']].copy()
    features = tmpdf.drop(['SampleID', 'group'], axis=1)
    scaled_features = scaler.fit_transform(features)
    scaled_df = pd.concat([grouptmp, pd.DataFrame(scaled_features, columns=features.columns)], axis=1)
    return scaled_df
groupname_list=sys.argv[1]
groupFlag=sys.argv[2] #'TypeOfCollection'
#group1=pd.read_table(work+"group1.TypeOfCollection.info.list")
group1=pd.read_table(work+groupname_list)
if 'LabID' not in group1.columns.to_list():
    group1['LabID']=group1['SampleID'].map(lambda x:x.split("-")[0])

group1=group1[['SampleID',groupFlag,'LabID']]  ###LabID 是具体配对编号
print(set(group1[groupFlag]))
group1name=groupFlag
group1.columns=['SampleID','group','LabID']
mygroup_name=group1
mygroup_name_list=list(set(mygroup_name['group']))[0]
mygroup_name_dict_a=[]
mygroup_name_dict_b=[]
def paired_dict(tmpdf):
    a=tmpdf.query("group==@mygroup_name_list")['SampleID'].to_list()[0]
    b=tmpdf.query("group!=@mygroup_name_list")['SampleID'].to_list()[0]
    mygroup_name_dict_a.append(a)
    mygroup_name_dict_b.append(b)
mygroup_name.groupby("LabID").apply(paired_dict)

mygroup_name=mygroup_name[['SampleID','group']]
mydict={}
for name in tsv_files:
    print(name)
    mydict[name]=[]
    datafile=work+name
    frature=pd.read_csv(datafile,header=0)
    df=pd.merge(mygroup_name,frature,on=['SampleID'])
    df=df.dropna(axis=1)
    #df=df.dropna(axis=1,thresh=round( df.shape[0] / 10) )  ##列NA限制10%
    df1=tmpdffun(df)  ## scale
   # df1=df
    groupA=df1.query("SampleID ==@mygroup_name_dict_a")
    groupA['SampleID'] = pd.Categorical(groupA['SampleID'], categories=mygroup_name_dict_a, ordered=True)
    groupA = groupA.sort_values('SampleID')
    groupB=df1.query("SampleID ==@mygroup_name_dict_b")
    groupB['SampleID'] = pd.Categorical(groupB['SampleID'], categories=mygroup_name_dict_b, ordered=True)
    groupB = groupB.sort_values('SampleID')
    for lengroup in range(groupB.shape[0]):
        data_b=np.array(groupA.iloc[lengroup,2:])
        b_SampleID=groupA.iloc[lengroup,0]
        data_p=np.array(groupB.iloc[lengroup,2:])
        p_SampleID=groupB.iloc[lengroup,0]
        JSdis=js_divergence_scipy(data_b,data_p,b_SampleID,p_SampleID,name)
        if(JSdis is None):
            pass
            #mydict[name].append(0)
        else:
            mydict[name].append(JSdis)
name=[]
value=[]
for k,v in mydict.items():
    for j in v:
        name.append(k.replace(".LE5X.csv","").split(".")[-1])
        value.append(j)
newdf=pd.DataFrame({'Categories': name, 'Values': value})
newdf['Categories']=newdf['Categories'].map(lambda x:x.replace("854TF_OCF_Tcell","griffin_ocf"))
newdf['group']=groupFlag
newdf.to_csv(work+"JS_"+groupFlag,index=False,sep="\t")

相关文章

网友评论

      本文标题:JS散度评估特征距离

      本文链接:https://www.haomeiwen.com/subject/hikscjtx.html