让繁琐的工作自动化

作者: 随想的翅膀 | 来源:发表于2019-06-09 19:50 被阅读0次

    背景

    最近琢磨着对全网流量的协议进行一次统计分析,主要是从网络五元素(源地址、源端口、目的地址、目的端口、协议号)来分析服务器开放了哪些服务,以此为参考对防火墙策略进行调整。数据分析流程概括起来就是:数据读写、处理计算、分析建模,可视化。为求分析的准确性,需要对网络上的流量进行一段时间的采样,例如每间隔10分钟采集一次会话表,按此频率采集2-3天,然后把采集到的文件进行合并,对合并后的数据进行筛选和清洗,最后把处理好的数据导入数据库分析。这些繁琐的工作如果交给人工去做显然不太现实,自动化和大数据的分析当然是Python语言的强项。
    工欲善其事,必先利其器,首先下载python编译器,安装jupyter--python的交互式编程环境,导入SSH运维库paramiko,excel操作库openpyxl,大数据分析库pandas,剩下的工作就是如何利用这些利器。

    自动化的实现

    1.从防火墙采集数据,输出到外部文件。

    import paramiko,time,sys,shelve
    #模块功能:采集数据,输出文件
    #创建SSH对象
    def ssh_con(ip,port,username,password):
        ssh=paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip,port,username,password)
        shell=ssh.invoke_shell()
        return shell
    #创建SSHShell对象
    def exec_cmd(shell,cmd,period=10):
        shell.send(cmd)
        time.sleep(period)
        result=shell.recv(3000000)
        result=result.decode('utf-8')
        result1=result
    #shell每次接收到的最大缓存是2097152
        while len(result1)==2097152:
            result1=shell.recv(3000000)
            print('缓存不够,继续读取exec_cmd: '+str(len(result1)))
            time.sleep(4)
            result=result+result1.decode('utf-8')
        return result
    def main():
        connStr=shelve.open('connStr')
        ip,port,username,password,command=connStr['parameter']
        connStr.close()
        shell=ssh_con(ip,port,username,password)
        exec_cmd(shell,"enable\n")
        result=exec_cmd(shell,password+'\n')
        print(result)
        i=0
        while True:
            i+=1
    #判断ssh会话是否退出,如果退出重新建立会话
            if shell.exit_status_ready():
                shell = ssh_con(ip, port, username, password)
                exec_cmd(shell, "enable\n")
                exec_cmd(shell, password + '\n')
            if len(sys.argv)>1:
                result=exec_cmd(shell,"show conn \n",int(sys.argv[1]))
            else:
                result = exec_cmd(shell, command)
            print("session%s.txt总长度:"%i+str(len(result))+'bytes')
            file = open("d:\\output_conn\\sessions%s.txt" % i, 'w')
            file.write(result)
            file.close()
    if __name__=="__main__":
        main()
    

    2.文件合并,经过几天的数据采集,产生了上百个文本文件,利用多线程把这几百个文件合并成一个excel文件,在插入excel的过程中需要用到正则表达式对数据进行格式化处理。

    import openpyxl,os,re,sys,time,threading
    #模块功能:数据预处理
    #多线程读写文件
    def session_process(session):
        ipRegex=re.compile(r'\d+\.\d+\.\d+\.\d+')
        singleSession=session.split(' ')
        newSession=''
        i=0
        for field in singleSession:
            if ipRegex.match(field)!=None:
                field=re.sub(':',' ',field)
            newSession+=field+' ' 
        newSession=re.sub(',','',newSession) 
        return newSession.split(' ')
    def loadFile(fileName):
        print(f"线程名称:{threading.current_thread().name}文件名称:{fileName}开始读取时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
        proRegex=re.compile(r'^TCP|^UDP')
        session=[]
        file=open(path+fileName)
        rows=0
        for line in file.readlines():
            proMo=proRegex.search(line)
            if proMo is not None:
                session=session_process(line)
            rows+=1
            lock.acquire()
            ws.append(session)
            lock.release()
        file.close()
        print(f"线程名称:{threading.current_thread().name}文件名称:{fileName}读取{rows}行,读取结束时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
    #创建一个线程类
    class loadFileThread(threading.Thread):
        def __init__(self,target,args):
            super().__init__()
            self.target=target
            self.args=args
        def run(self):
            self.target(*self.args)
    #创建一个线程锁,由于excel文件写入不能并行写入,所以在写入excel的过程中启用线程锁
    lock=threading.Lock()
    path="f:\\sessions\\"  
    wb=openpyxl.Workbook()
    ws=wb.active
    def main():
        print(f"主线程开始时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
        startTime=time.time()
        thread_list=[] 
        for fileName in os.listdir(path):
            t=loadFileThread(target=loadFile,args=(fileName,))
    #         t=threading.Thread(target=loadFile,args=(fileName,))
            thread_list.append(t)
        for thread in thread_list:
              thread.start()
        for thread in thread_list:
              thread.join()
        wb.save("f:\\sessions\\output.xlsx")
        print(f"主线程结束时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")   
    if __name__=='__main__':
        main()
    

    在这种I/O密集型的任务中,采用多线程进行并发操作,读写文件的时间相当于读写最长文件的耗时,程序的执行效率瞬间有了质的提升

    线程名称:Thread-66文件名称:sessions59.txt开始读取时间:2019-06-06 16:58:30
    线程名称:Thread-67文件名称:sessions60.txt开始读取时间:2019-06-06 16:58:30
    线程名称:Thread-68文件名称:sessions61.txt开始读取时间:2019-06-06 16:58:30
    线程名称:Thread-55文件名称:sessions48.txt读取5390行,读取结束时间:2019-06-06 16:58:40
    线程名称:Thread-57文件名称:sessions50.txt读取5058行,读取结束时间:2019-06-06 16:58:40
    线程名称:Thread-59文件名称:sessions52.txt读取6050行,读取结束时间:2019-06-06 16:58:41

    3.利用pandas对海量的数据(637906条数据)进行分析仅需几秒的时间,最后把分析好的数据输出到excel文件,整个流程结束,除了采集数据需要几天的时间,分析数据,计算处理,数据建模只花了几分钟的时间。

    import pandas as pd
    import os
    import time
    #模块功能:分析数据,处理计算,数据建模
    #1.数据处理:打开数据文件
    print(f'数据分析开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
    with pd.ExcelFile('H:\\output_0606\\output1.xlsx') as xlsx1:
        df1=pd.read_excel(xlsx1)
    with pd.ExcelFile('H:\\output_0606\\output2.xlsx') as xlsx2:
        df2=pd.read_excel(xlsx2)
    with pd.ExcelFile('H:\\output_0606\\output3.xlsx') as xlsx3:
        df3=pd.read_excel(xlsx3)
    #1.数据处理:合并数据文件
    frames=[df1,df2,df3]
    result=pd.concat(frames)
    #数据清洗:移除数据头和尾的空字符
    for column in result.columns:
        result[column] =[str(value).strip() for value in result[column]] 
    #数据建模:按照源IP,源端口,目的IP,目的端口统计建模
    portList=[]
    portFile=open('h:\\output\\trustPortList.txt')
    portList=portFile.read().split('\n')   
    reDstPortList=[]
    reSrcPortList=[]
    for port in portList:
        reDstPortList.append(result[result['dst_port'] == port])
        reSrcPortList.append(result[result['src_port'] == port])
    reDstPort=pd.concat(reDstPortList)
    reSrcPort=pd.concat(reSrcPortList)
    reDstPort.groupby(['src_area','src_ip','dst_ip','dst_port',]).count().flag.to_excel('H:\\output_0606\\0604_0606Dst.xlsx')
    reSrcPort.groupby(['src_area','src_ip','src_port','dst_ip',]).count().flag.to_excel('H:\\output_0606\\0604_0606Src.xlsx')
    print(f'数据分析结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
    

    结语

    本文通过一个具体的案例实践介绍了一些python的用法,由于作者水平有限,代码并非最优的解决方案,但有一点Python让我更加关注问题的解决方案而非语言本身。本文作者是搞运维的,所以更在乎的是如何利用python提高工作效率,实现自动化运维。

    相关文章

      网友评论

        本文标题:让繁琐的工作自动化

        本文链接:https://www.haomeiwen.com/subject/kogwxctx.html