1. Run a shell command from Python and load the result into a DataFrame
import os
import pandas as pd
os.system('hdfs dfs -du / > tmp.csv')
df = pd.read_csv('tmp.csv', encoding='utf-8', header=None)  # header=None: the file has no header row
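A variant that skips the temp file entirely by capturing the command's output directly (a minimal sketch; it assumes the same hdfs CLI is on PATH):
import io
import subprocess
import pandas as pd

# capture stdout of the command instead of redirecting it to a file
out = subprocess.run(['hdfs', 'dfs', '-du', '/'],
                     capture_output=True, text=True, check=True).stdout
df = pd.read_csv(io.StringIO(out), header=None)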
2. Change the working directory in Python
import os
os.chdir(r'/data/pyspark/program/auto_report/zhengyuan/hdfs_size')
3. Split and assign a whole block of data in Python (e.g. shell output that arrives as a single column)
list_temp = []
for i, j in enumerate(df.loc[:, 0]):  # i is the index from the iterator, j the corresponding value; iterating this way is much more convenient
    list_temp = j.split()
    df.loc[i, 'path'] = list_temp[2]
    df.loc[i, 'parent_path'] = '/'
    df.loc[i, 'size'] = round(int(list_temp[0]) / 1024 / 1024 / 1024, 1)
    df.loc[i, 'levels'] = 1
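A vectorized alternative to the row-by-row loop (a sketch; it assumes the same three-token hdfs dfs -du output, where token 0 is the size in bytes and token 2 is the path):
parts = df[0].str.split(expand=True)  # split the single text column on whitespace into separate columns
df['path'] = parts[2]
df['parent_path'] = '/'
df['size'] = (parts[0].astype('int64') / 1024 ** 3).round(1)  # bytes -> GB, one decimal
df['levels'] = 1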
4. Sorting in pandas
size = df.sort_values(by=['size'], ascending=False)  # to sort on multiple columns, just pass more names in the by list
contrast = contrast1.sort_values(['levels', 'size'], ascending=[True, False])  # multiple columns, each with its own sort order
5. Filtering in pandas
a = size[(size['levels'] == m) & (size['size'] > 100)]['path']  # filter on multiple conditions, then take one column's values
6. Nested loops, while loops, and assignment
m = 2
while not a.empty:  # keep descending while some path at the previous level exceeds the size threshold
    for n in a:
        os.system('hdfs dfs -du ' + n + ' > tmp.csv')
        df = pd.read_csv('tmp.csv', encoding='utf-8', header=None)
        if len(df) < 100:
            list_temp = []
            for i, j in enumerate(df.loc[:, 0]):
                list_temp = j.split()
                df.loc[i, 'path'] = list_temp[2]
                df.loc[i, 'parent_path'] = n
                df.loc[i, 'size'] = round(int(list_temp[0]) / 1024 / 1024 / 1024, 1)
                df.loc[i, 'levels'] = m
            df.drop([0], axis=1, inplace=True)
            size1 = df.sort_values(by=['size'], ascending=False)
            size = pd.concat([size, size1])  # DataFrame.append was removed in pandas 2.0; concat replaces it
    a = size[(size['levels'] == m) & (size['size'] > 100)]['path']
    m = m + 1
7. Connect Python to MySQL and write a DataFrame into it
from sqlalchemy import create_engine
engine = create_engine("mysql+mysqldb://{}:{}@{}/{}".format('datateam', 'RJ4f84S4RjfRxRMm', '172.16.8.4', 'warehouse'))
con = engine.connect()
size.to_sql(name='dj_disk_used_di', con=con, if_exists='append', index=False)  # if_exists takes three values: 'fail', 'replace', 'append'
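For reference, what each if_exists mode does (same table and connection as above):
size.to_sql('dj_disk_used_di', con=con, if_exists='fail', index=False)     # raise an error if the table already exists
size.to_sql('dj_disk_used_di', con=con, if_exists='replace', index=False)  # drop the table, recreate it, then insert
size.to_sql('dj_disk_used_di', con=con, if_exists='append', index=False)   # insert rows into the existing table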
8. Read data from MySQL into a DataFrame
import configparser
import pymysql
cf = configparser.ConfigParser()
cf.read(os.path.join(os.path.dirname('/data/pyspark/program/auto_report/zhengyuan/test/'), "dj.conf"))
#cf.read(os.path.join(os.path.dirname("./"), "dj.conf"))
def get_pymysql_conn(db_name):
    #db_name = 'mysql_' + db_name
    cf = configparser.ConfigParser()
    cf.read(os.path.join(os.path.dirname('/data/pyspark/program/auto_report/zhengyuan/test/'), "dj.conf"))
    return pymysql.connect(host=cf.get(db_name, 'host'),
                           charset="utf8",
                           port=int(cf.get(db_name, 'port')),
                           db=cf.get(db_name, 'db'),
                           user=cf.get(db_name, 'user'),
                           passwd=cf.get(db_name, 'passwd'))
db = get_pymysql_conn('mysql_bg')
# the whole block above just opens a connection; it is the same as db = pymysql.connect(...)
to_sql = "select * from dj_disk_used_di where ds='" + today + "'"  # today and seven_day are variables assigned earlier; here we just splice them into the SQL strings
se_sql = "select * from dj_disk_used_di where ds='" + seven_day + "'"
to_df = pd.read_sql(to_sql, con=db)
se_df = pd.read_sql(se_sql, con=db)
db.close()
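String splicing is fine for trusted values, but pd.read_sql can also pass parameters through to the driver; a sketch with the pymysql connection above (pymysql uses %s placeholders):
to_df = pd.read_sql("select * from dj_disk_used_di where ds = %s", con=db, params=[today])
se_df = pd.read_sql("select * from dj_disk_used_di where ds = %s", con=db, params=[seven_day])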
9. Path joining and reading a config file
import configparser
cf = configparser.ConfigParser()
cf.read(os.path.join(os.path.dirname('/data/pyspark/program/auto_report/zhengyuan/test/'), "dj.conf"))
cf.read(os.path.join(os.path.dirname(__file__), "dj.conf"))  # the preferred form: os.path.dirname(__file__) is the directory containing the current script (not necessarily the working directory)
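For reference, dj.conf is a plain INI file; judging from the cf.get calls above it has roughly this shape (the section name matches the 'mysql_bg' used earlier; the values are illustrative):
[mysql_bg]
host = 172.16.8.4
port = 3306
db = warehouse
user = datateam
passwd = ...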
10. Assigning to NaN values (any other conditional assignment works the same way)
a.loc[a['parent_path'].isnull(),'parent_path']=a[a['parent_path'].isnull()]['se_parent_path']
a.loc[a['levels'].isnull(),'levels']=a[a['levels'].isnull()]['se_levels']
a.loc[a['size'].isnull(),'size']=0
a.loc[a['se_size'].isnull(),'se_size']=0
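The same thing written with fillna: the first two lines fill from an index-aligned Series, the last two fill with a scalar:
a['parent_path'] = a['parent_path'].fillna(a['se_parent_path'])
a['levels'] = a['levels'].fillna(a['se_levels'])
a['size'] = a['size'].fillna(0)
a['se_size'] = a['se_size'].fillna(0)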
11. Reorder DataFrame columns
order=['levels', 'path', 'parent_path', 'size', 'se_size', 'size_chg']
contrast=contrast[order]
12. Rename DataFrame columns
se_df.columns=['path', 'se_parent_path', 'se_size', 'se_levels']
13. Drop DataFrame columns
contrast1 = a.drop(['se_parent_path', 'se_levels'], axis=1, inplace=False)
14. Draw a double (grouped) bar chart
%matplotlib notebook  # Jupyter magic command; makes the figure interactive so its display size can be changed
import matplotlib.pyplot as plt
b=contrast[contrast['levels'] == 1]
m=b['path']
list_00 = b['size']
list_01 = b['se_size']
x = list(range(len(m)))
name_list=m
total_width, n = 0.8, 2
width = total_width / n
plt.bar(x, list_00, width=width, label='to_size', tick_label=m, fc='y')
for i in range(len(x)):
    x[i] = x[i] + width  # shift the second set of bars right by one bar width
plt.bar(x, list_01, width=width, label='se_size', fc='r')
plt.legend()
plt.show()
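The same grouped bars via pandas' built-in plotting, as a one-call sketch on the b frame above:
b.plot.bar(x='path', y=['size', 'se_size'], color=['y', 'r'])
plt.tight_layout()
plt.show()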
15. Rotate axis tick labels so they display in full
import pylab as pl
pl.xticks(rotation=60)
plt.legend()
plt.tight_layout()  # call before show so the rotated labels are not clipped
plt.show()
16. Write CSV (with/without header) and read CSV data
import time
filename = '/Users/zy/Desktop/work/other/algo/data/biz.csv'
if os.path.exists(filename):
    os.remove(filename)
i = 0
for code in all_code:
    i = i + 1
    if i % 60 == 0:
        time.sleep(65)  # pause periodically to respect the API's per-minute rate limit
    df0 = pro.fina_mainbz(ts_code=code, start_date='20170101', type='P')
    df = df0[df0['end_date'] == df0['end_date'].max()]
    if os.path.exists(filename):
        df.to_csv(filename, mode='a', header=None)  # append without repeating the header row
    else:
        df.to_csv(filename)  # first write includes the header
main_bus = pd.read_csv(filename)
17. pandas groupby
- Fundamentally, this is all about the mapping between an index (Series) or key (dict) and the rows or columns of the table. The aggregation functions applied after groupby operate on each group separately; once they finish, the results are merged back into one DataFrame, with each group ending up as one column (or one row). A concrete sketch of the header difference follows the code below.
a3 = a2.groupby('ts_code')['bz_item'].apply(list).reset_index()
df.groupby(['key1', 'key2'])[['data2']].mean()  # double brackets -> DataFrame, keeps the column header
df.groupby(['key1', 'key2'])['data2'].mean()  # single brackets -> Series, no column header
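A self-contained toy example of that header difference (the data is made up):
import pandas as pd

toy = pd.DataFrame({'key1': ['a', 'a', 'b'],
                    'key2': ['x', 'y', 'x'],
                    'data2': [1.0, 2.0, 3.0]})
print(toy.groupby(['key1', 'key2'])[['data2']].mean())  # DataFrame: keeps the 'data2' column header
print(toy.groupby(['key1', 'key2'])['data2'].mean())    # Series: same values, no column header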
18. Implement SQL window functions in Python
- Read this together with the groupby section above: apply a lambda function to each group after grouping.
df = pd.DataFrame([['a', 1, 'c'], ['a', 3, 'a'], ['a', 2, 'b'],
                   ['c', 3, 'a'], ['c', 2, 'b'], ['c', 1, 'c'],
                   ['b', 2, 'b'], ['b', 3, 'a'], ['b', 1, 'c']], columns=['A', 'B', 'C'])
df.groupby('A', sort=False).apply(lambda x: x.sort_values('B', ascending=True).head(1)).reset_index(drop=True)  # without head(1) you get every row of each group, not just the first
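For the closer analogue of SQL's row_number() over (partition by A order by B), a sketch using cumcount (the rn column name is made up):
df['rn'] = df.sort_values('B').groupby('A').cumcount() + 1  # 1-based rank within each A group, ordered by B
df[df['rn'] == 1]  # same rows as the head(1) version above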
19. Filter NaN values in Python
order_detail1[order_detail1['order_item_id'].isnull()]