美文网首页
ssh远程:paramiko模块使用

ssh远程:paramiko模块使用

作者: LittleJessy | 来源:发表于2019-01-24 19:22 被阅读0次
# -*- coding: utf-8 -*-
import os
import time

import paramiko

class UiAutoRemote(object):
    """Run commands and transfer files on a remote host via paramiko.

    The instance stores the connection settings; every public method opens
    its own connection and closes it again before returning (``connect``
    is the exception: it hands the open client to the caller).
    """

    # Log file name, stamped once when the class is defined so that every
    # instance in the same process appends to the same file.
    timer = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))
    logname = os.path.join('/project/log/', 'uiauto_log' + timer + ".txt")

    def __init__(self, host, port, username, password):
        """Store the remote endpoint and the password credentials."""
        self.host = host
        self.port = port
        self.username = username
        self.password = password

    def connect(self):
        """Open an SSH session to the configured host.

        Returns:
            A connected ``paramiko.SSHClient`` (the caller must close it),
            or ``None`` when the connection attempt fails. The outcome is
            printed and appended to the log file either way.
        """
        ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy trusts unknown host keys without
        # verification; acceptable on a lab network only.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.host, port=self.port, username=self.username,
                        password=self.password, allow_agent=True)
        except Exception as exc:
            print(self.host + ':连接失败')
            self.writelog('%s:连接失败: %s\n' % (self.host, exc), self.logname)
            ssh.close()
            return None
        print('%s:连接成功' % self.host)
        self.writelog(self.host + ':连接成功' + '\n', self.logname)
        return ssh

    def command(self, args, outpath):
        """Return the command line ``"<args> <outpath>"``."""
        return '%s %s' % (args, outpath)

    def exec_commands(self, conn, cmd, timeout=3000):
        """Run ``cmd`` in an interactive shell and stream its output.

        Output is printed and appended to the log as it arrives so that a
        long-running test script can be followed live. The loop ends when
        a chunk ends with a shell prompt (``'# '`` or ``'$ '``), when the
        remote side closes the channel, or after ``timeout`` seconds
        without any output.

        Args:
            conn: connected ``paramiko.SSHClient``.
            cmd: command line to send to the shell.
            timeout: idle limit in seconds (default keeps the original 3000).
        """
        poll_interval = 0.5
        channel = conn.invoke_shell()
        try:
            channel.send(cmd + '\n')
            self.writelog('执行命令:' + cmd + '\n', self.logname)
            idle = 0
            while True:
                time.sleep(poll_interval)
                # Poll instead of calling recv() directly: a blocking recv
                # would make the idle timeout below unreachable.
                if channel.recv_ready():
                    # errors='replace': a chunk boundary may split a
                    # multi-byte character, which must not abort the run.
                    res = channel.recv(65535).decode('utf8', errors='replace')
                    idle = 0
                    if res:
                        print(res.strip('\n'))
                        self.writelog(res, self.logname)
                    # NOTE(review): prompt detection is heuristic; output
                    # that itself ends with '# ' or '$ ' stops the loop.
                    if res.endswith(('# ', '$ ')):
                        self.writelog("task finish\n", self.logname)
                        break
                    continue
                if channel.exit_status_ready():
                    # Remote shell exited or the channel was closed and the
                    # receive buffer is already drained.
                    self.writelog("连接已关闭\n", self.logname)
                    break
                idle += poll_interval
                if idle > timeout:
                    self.writelog("连接超时\n", self.logname)
                    break
        finally:
            channel.close()

    def excutor(self, outpath, args):
        """Connect, run ``"<args> <outpath>"`` remotely, then disconnect.

        Returns:
            ``[self.host, None]`` when the connection fails, else ``None``.
        """
        conn = self.connect()
        if not conn:
            return [self.host, None]
        try:
            self.exec_commands(conn, self.command(args, outpath))
        finally:
            conn.close()

    def _open_transport(self):
        """Return a connected, password-authenticated ``paramiko.Transport``."""
        # Transport expects the endpoint as ONE (host, port) tuple; a second
        # positional argument would be taken as the window size.
        transport = paramiko.Transport((self.host, self.port))
        try:
            transport.connect(username=self.username, password=self.password)
        except Exception:
            transport.close()
            raise
        return transport

    def sftp_upload(self, file, localpath, remotepath):
        """Upload ``localpath/file`` to ``remotepath + file`` over SFTP.

        ``remotepath`` is concatenated as-is, so it must end with the remote
        path separator. A missing local file is reported before any
        connection is made; connection and transfer errors are printed and
        logged rather than raised.
        """
        localfile = os.path.join(localpath, file)
        remotefile = remotepath + file

        if not os.path.exists(localfile):
            print('文件不存在')
            return

        transport = None
        try:
            transport = self._open_transport()
            sftp = paramiko.SFTPClient.from_transport(transport)
            print(file + "上传中...")
            self.writelog((file + "上传中..." + '\n'), self.logname)
            sftp.put(localfile, remotefile)
            self.writelog((file + ":上传成功" + '\n'), self.logname)
        except Exception as exc:
            print("上传失败:User name or password error or uploaded file does not exist")
            self.writelog(
                file + "上传失败:User name or password error or uploaded file does not exist"
                + ' (%s)\n' % exc,
                self.logname)
        finally:
            if transport is not None:
                transport.close()

    def sftp_download(self, file, localpath, remotepath):
        """Download ``remotepath + file`` to ``localpath/file`` over SFTP.

        ``remotepath`` is concatenated as-is, so it must end with the remote
        path separator. Failures are printed rather than raised.
        """
        localfile = os.path.join(localpath, file)
        remotefile = remotepath + file
        print('localfile:', localfile)
        print('remotefile:', remotefile)

        client = None
        try:
            client = self._open_transport()
            sftp = paramiko.SFTPClient.from_transport(client)
            sftp.get(remotefile, localfile)
            print("[%s]下载成功" % file)
        except Exception as e:
            print("[%s]下载失败:%s" % (file, e))
        finally:
            if client is not None:
                client.close()

    def writelog(self, res, logname):
        """Append ``res`` to ``logname`` (UTF-8), creating its directory."""
        log_dir = os.path.dirname(logname)
        if log_dir:
            # The default log directory is hard-coded and may not exist yet.
            os.makedirs(log_dir, exist_ok=True)
        with open(logname, 'a+', encoding='utf-8') as f:
            f.write(res)


if __name__ == '__main__':
    # Demo driver: run the remote test suite, upload a build, then fetch a
    # report. The names below stay at module level on purpose.

    # Remote server settings (placeholder credentials; do not commit real ones).
    host = '10.107.17.59'
    port = 22
    username = 'username'
    password = '123456'

    # Parent of the current working directory; local paths hang off it.
    cur_path = os.path.abspath('..')

    # Directory on the remote host that holds the test scripts.
    remote_script_path = '/AutoTest/'

    # Local directory with the app package under test, and its remote target.
    # os.path.join keeps the local paths valid on non-Windows hosts too.
    local_app_path = os.path.join(cur_path, 'upload', 'app_auto')
    remote_app_path = '/AutoTest/app/'

    # Location of the platform file (local and remote).
    local_platform_path = os.path.join(cur_path, 'public')
    remote_platform_path = '/AutoTest/'

    # Where downloaded reports are stored locally / found remotely.
    local_report_path = os.path.join(cur_path, 'upload', 'app_auto')
    remote_report_path = '/AutoTest/files/report/'

    ui = UiAutoRemote(host, port, username, password)

    # Run the automated test suite on the remote host.
    run_script = remote_script_path + 'run_all_test.py'
    cmd = 'PYTHONIOENCODING=utf-8 python3'
    ui.excutor(run_script, cmd)

    # Upload the build.
    up_file = 'app-debug-v3.9.9.apk.1'
    ui.sftp_upload(up_file, local_app_path, remote_app_path)

    # Download the test report.
    down_file = '2019-01-23 16_23_02result.html'
    ui.sftp_download(down_file, local_report_path, remote_report_path)

相关文章

网友评论

      本文标题:ssh远程:paramiko模块使用

      本文链接:https://www.haomeiwen.com/subject/zcmujqtx.html