背景
最近琢磨着对全网流量的协议进行一次统计分析,主要是从网络五元素(源地址、源端口、目的地址、目的端口、协议号)来分析服务器开放了哪些服务,以此为参考对防火墙策略进行调整。数据分析流程概括起来就是:数据读写、处理计算、分析建模,可视化。为求分析的准确性,需要对网络上的流量进行一段时间的采样,例如每间隔10分钟采集一次会话表,按此频率采集2-3天,然后把采集到的文件进行合并,对合并后的数据进行筛选和清洗,最后把处理好的数据导入数据库分析。这些繁琐的工作如果交给人工去做显然不太现实,自动化和大数据的分析当然是Python语言的强项。
工欲善其事,必先利其器,首先下载python编译器,安装jupyter--python的交互式编程环境,导入SSH运维库paramiko,excel操作库openpyxl,大数据分析库pandas,剩下的工作就是如何利用这些利器。
自动化的实现
1.从防火墙采集数据,输出到外部文件。
import paramiko,time,sys,shelve
#模块功能:采集数据,输出文件
#创建SSH对象
def ssh_con(ip,port,username,password):
ssh=paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip,port,username,password)
shell=ssh.invoke_shell()
return shell
#创建SSHShell对象
def exec_cmd(shell,cmd,period=10):
shell.send(cmd)
time.sleep(period)
result=shell.recv(3000000)
result=result.decode('utf-8')
result1=result
#shell每次接收到的最大缓存是2097152
while len(result1)==2097152:
result1=shell.recv(3000000)
print('缓存不够,继续读取exec_cmd: '+str(len(result1)))
time.sleep(4)
result=result+result1.decode('utf-8')
return result
def main():
connStr=shelve.open('connStr')
ip,port,username,password,command=connStr['parameter']
connStr.close()
shell=ssh_con(ip,port,username,password)
exec_cmd(shell,"enable\n")
result=exec_cmd(shell,password+'\n')
print(result)
i=0
while True:
i+=1
#判断ssh会话是否退出,如果退出重新建立会话
if shell.exit_status_ready():
shell = ssh_con(ip, port, username, password)
exec_cmd(shell, "enable\n")
exec_cmd(shell, password + '\n')
if len(sys.argv)>1:
result=exec_cmd(shell,"show conn \n",int(sys.argv[1]))
else:
result = exec_cmd(shell, command)
print("session%s.txt总长度:"%i+str(len(result))+'bytes')
file = open("d:\\output_conn\\sessions%s.txt" % i, 'w')
file.write(result)
file.close()
if __name__=="__main__":
main()
2.文件合并,经过几天的数据采集,产生了上百个文本文件,利用多线程把这几百个文件合并成一个excel文件,在插入excel的过程中需要用到正则表达式对数据进行格式化处理。
import openpyxl,os,re,sys,time,threading
#模块功能:数据预处理
#多线程读写文件
def session_process(session):
ipRegex=re.compile(r'\d+\.\d+\.\d+\.\d+')
singleSession=session.split(' ')
newSession=''
i=0
for field in singleSession:
if ipRegex.match(field)!=None:
field=re.sub(':',' ',field)
newSession+=field+' '
newSession=re.sub(',','',newSession)
return newSession.split(' ')
def loadFile(fileName):
print(f"线程名称:{threading.current_thread().name}文件名称:{fileName}开始读取时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
proRegex=re.compile(r'^TCP|^UDP')
session=[]
file=open(path+fileName)
rows=0
for line in file.readlines():
proMo=proRegex.search(line)
if proMo is not None:
session=session_process(line)
rows+=1
lock.acquire()
ws.append(session)
lock.release()
file.close()
print(f"线程名称:{threading.current_thread().name}文件名称:{fileName}读取{rows}行,读取结束时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
#创建一个线程类
class loadFileThread(threading.Thread):
def __init__(self,target,args):
super().__init__()
self.target=target
self.args=args
def run(self):
self.target(*self.args)
#创建一个线程锁,由于excel文件写入不能并行写入,所以在写入excel的过程中启用线程锁
lock=threading.Lock()
path="f:\\sessions\\"
wb=openpyxl.Workbook()
ws=wb.active
def main():
print(f"主线程开始时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
startTime=time.time()
thread_list=[]
for fileName in os.listdir(path):
t=loadFileThread(target=loadFile,args=(fileName,))
# t=threading.Thread(target=loadFile,args=(fileName,))
thread_list.append(t)
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
wb.save("f:\\sessions\\output.xlsx")
print(f"主线程结束时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
if __name__=='__main__':
main()
在这种I/O密集型的任务中,采用多线程进行并发操作,读写文件的时间相当于读写最长文件的耗时,程序的执行效率瞬间有了质的提升
线程名称:Thread-66文件名称:sessions59.txt开始读取时间:2019-06-06 16:58:30
线程名称:Thread-67文件名称:sessions60.txt开始读取时间:2019-06-06 16:58:30
线程名称:Thread-68文件名称:sessions61.txt开始读取时间:2019-06-06 16:58:30
线程名称:Thread-55文件名称:sessions48.txt读取5390行,读取结束时间:2019-06-06 16:58:40
线程名称:Thread-57文件名称:sessions50.txt读取5058行,读取结束时间:2019-06-06 16:58:40
线程名称:Thread-59文件名称:sessions52.txt读取6050行,读取结束时间:2019-06-06 16:58:41
3.利用pandas对海量的数据(637906条数据)进行分析仅需几秒的时间,最后把分析好的数据输出到excel文件,整个流程结束,除了采集数据需要几天的时间,分析数据,计算处理,数据建模只花了几分钟的时间。
import pandas as pd
import os
import time
#模块功能:分析数据,处理计算,数据建模
#1.数据处理:打开数据文件
print(f'数据分析开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
with pd.ExcelFile('H:\\output_0606\\output1.xlsx') as xlsx1:
df1=pd.read_excel(xlsx1)
with pd.ExcelFile('H:\\output_0606\\output2.xlsx') as xlsx2:
df2=pd.read_excel(xlsx2)
with pd.ExcelFile('H:\\output_0606\\output3.xlsx') as xlsx3:
df3=pd.read_excel(xlsx3)
#1.数据处理:合并数据文件
frames=[df1,df2,df3]
result=pd.concat(frames)
#数据清洗:移除数据头和尾的空字符
for column in result.columns:
result[column] =[str(value).strip() for value in result[column]]
#数据建模:按照源IP,源端口,目的IP,目的端口统计建模
portList=[]
portFile=open('h:\\output\\trustPortList.txt')
portList=portFile.read().split('\n')
reDstPortList=[]
reSrcPortList=[]
for port in portList:
reDstPortList.append(result[result['dst_port'] == port])
reSrcPortList.append(result[result['src_port'] == port])
reDstPort=pd.concat(reDstPortList)
reSrcPort=pd.concat(reSrcPortList)
reDstPort.groupby(['src_area','src_ip','dst_ip','dst_port',]).count().flag.to_excel('H:\\output_0606\\0604_0606Dst.xlsx')
reSrcPort.groupby(['src_area','src_ip','src_port','dst_ip',]).count().flag.to_excel('H:\\output_0606\\0604_0606Src.xlsx')
print(f'数据分析结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
结语
本文通过一个具体的案例实践介绍了一些python的用法,由于作者水平有限,代码并非最优的解决方案,但有一点Python让我更加关注问题的解决方案而非语言本身。本文作者是搞运维的,所以更在乎的是如何利用python提高工作效率,实现自动化运维。
网友评论