美文网首页
Python学习:基于paramiko的交互式shell

Python学习:基于paramiko的交互式shell

作者: khaos | 来源:发表于2020-12-25 19:58 被阅读0次

    问题

    我们希望在windows或者linux上,可以使用ssh连接远程服务器,并且能够执行一般的linux命令,同时还要能够有一定交互能力。比如需要切换root用户,输入管理员用户密码等。

    解决方案

    Python的paramiko库,可以支持。但实现也有挺多问题需要考虑。主要有以下几点内容:

    • 命令执行,能够获取命令结果
    • 命令执行,能够支持指定的预期结果
    • 命令执行,要有超时能力,不能挂死。

    用法1:

    ssh = Ssh2Client('127.0.0.1', 22)
    ssh.connect('root', 'xxxx')
    
    result = ssh.exec('pwd')
    print(result)
    

    用法2:

    ssh = Ssh2Client('127.0.0.1', 22)
    ssh.connect('user-name', 'user-pwd')
    ssh.exec('sudo su -', 'Password:')
    ssh.exec('root-pwd')
    ssh.exec('ls -l /var/root')
    

    代码实现如下所示:

    import re
    import socket
    import time
    
    import paramiko
    
    
    class Ssh2Client:
        def __init__(self, host: str, port: int):
            self.__host = host
            self.__port = port
            self.__ssh = None
            self.__channel = None
    
        def __del__(self):
            self.__close()
    
        def connect(self, user: str, pwd: str) -> bool:
            self.__close()
    
            self.__ssh = paramiko.SSHClient()
            self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
            return True
    
        def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
            if not self.__channel:
                self.__channel = self.__ssh.invoke_shell()
                time.sleep(0.020)
                self.__channel.recv(4096).decode()
    
            if cmd.endswith('\n'):
                self.__channel.send(cmd)
            else:
                self.__channel.send(cmd + '\n')
    
            return self.__recv(self.__channel, end_str, timeout)
    
        def __recv(self, channel, end_str, timeout) -> str:
            result = ''
            out_str = ''
            max_wait_time = timeout * 1000
            channel.settimeout(0.05)
            while max_wait_time > 0:
                try:
                    out = channel.recv(1024 * 1024).decode()
    
                    if not out or out == '':
                        continue
                    out_str = out_str + out
    
                    match, result = self.__match(out_str, end_str)
                    if match is True:
                        return result.strip()
                    else:
                        max_wait_time -= 50
                except socket.timeout:
                    max_wait_time -= 50
    
            raise Exception('recv data timeout')
    
        def __match(self, out_str: str, end_str: list) -> (bool, str):
            result = out_str
            for it in end_str:
                if result.endswith(it):
                    return True, result
            return False, result
    
        def __close(self):
            if not self.__ssh:
                return
            self.__ssh.close()
            self.__ssh = None
    

    讨论

    我们使用用法1,输出类似如下格式(用户名做了处理):

    pwd
    /Users/user1
    [xxx:~] xxx%
    

    这里有两个问题要处理,命令和命令提示符都一并输出了。我们需要做特殊处理。处理方法也很简单,第一行和最后一行直接去掉即可,同时考虑命令无结果输出的处理即可。修改exec方法如下:

        def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
            # ...
            
           # 以下是新增的代码
            result = self.__recv(self.__channel, end_str, timeout)
            begin_pos = result.find('\r\n')
            end_pos = result.rfind('\r\n')
            if begin_pos == end_pos:
                return ''
            return result[begin_pos + 2:end_pos]
    

    现状输出结果就正确了,这个就是我们想要的结果。

    /Users/user1
    

    偶然的机会,测试输入的命令比较长,取得结果又不正确了。比如执行

    ssh.exec('echo 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444')
    

    输出结果,有的服务器,会返回下面这个奇怪的结果:

    2222222222233333333333333333333333444444444444444
    ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory
    

    这个问题的原因,主要是因为ssh2输出时使用了窗口的概念,默认是80*24,输入命令如果超过长度,会自动换行,导致处理命令结果时出错,主要修改invoke_shell函数调用方式,代码如下:

    def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
        if not self.__channel:
            # width和height,可以指定输出窗口的大小。
            self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096)
            time.sleep(0.020)
            self.__channel.recv(4096).decode()
       
       # ....
    

    命令窗口的宽度设置为4096,输出结果就对了。不过如果命令超过4096,输出还会出问题,根据实际情况,设置width的值,可以设置更大一点。

    ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory
    

    到目前为止,已经基本够用了。但是还有一个问题,试用ls命令返回的结果,有一些奇怪的转义字符,比如:

    �[1;34mCacheVolume�[0m  �[1;34mbin�[0m          �[1;34mboot�[0m         �[1;34mdev�[0m          �[1;34metc�[0m          �[1;34mhome�[0m         �[1;34mlib�[0m          �[1;36mlinuxrc�[0m      �[1;34mlost+found�[0m   �[1;34mmnt�[0m          �[1;36mnfs�[0m          �[1;34mopt�[0m          �[1;
    

    这个问题的处理比较麻烦,处理了很久也不行。开始使用字符串分析处理,忽略这些转义符,但总是有点麻烦,处理不够彻底。后来终于在网上搜索到,这个转义是叫ansi转义码,可以在term上显示彩色。网上给出了正则处理方法:

        # 7-bit C1 ANSI sequences
        self.__ansi_escape = re.compile(r'''
                \x1B  # ESC
                (?:   # 7-bit C1 Fe (except CSI)
                [@-Z\\-_]
                |     # or [ for CSI, followed by a control sequence
                \[
                [0-?]*  # Parameter bytes
                [ -/]*  # Intermediate bytes
                [@-~]   # Final byte
            )
        ''', re.VERBOSE)
    
     def __match(self, out_str: str, end_str: list) -> (bool, str):
            result = self.__ansi_escape.sub('', out_str)
    
            for it in end_str:
                if result.endswith(it):
                    return True, result
            return False, result
    

    正则表达式比较复杂,有兴趣的同学自己分析这个re。

    到目前为止,Ssh2Client已经基本实现,而且比较实用。可以处理绝大多数问题,实现也不复杂,比网上很多帖子都讲得全一些,代码可以直接拿来用。

    但也不并是全部问题都能解决。比如有的linux系统,命令输出会出现换行,中文处理,都容易会导致输出结果获取不正确。不过,这些基本就是字符串分析和解码问题了。

    完整的代码如下:

    import re
    import socket
    import time
    
    import paramiko
    
    
    class Ssh2Client:
        """
        ssh2客户端封装
        """
    
        def __init__(self, host: str, port: int):
            """
            功能描述:构造函数
    
            :param host: 主机地址
            :param port: 端口信息
            """
            self.__host = host
            self.__port = port
            self.__ssh = None
            self.__channel = None
    
            # 7-bit C1 ANSI sequences
            self.__ansi_escape = re.compile(r'''
                    \x1B  # ESC
                    (?:   # 7-bit C1 Fe (except CSI)
                    [@-Z\\-_]
                    |     # or [ for CSI, followed by a control sequence
                    \[
                    [0-?]*  # Parameter bytes
                    [ -/]*  # Intermediate bytes
                    [@-~]   # Final byte
                )
            ''', re.VERBOSE)
    
        def __del__(self):
            self.__close()
    
        def connect(self, user: str, pwd: str) -> bool:
            """
            功能描述:连接远程主机
            :param user: 用户名
            :param pwd:  用户密码
            :return: 连接成功还是失败
            """
            self.__close()
    
            self.__ssh = paramiko.SSHClient()
            self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
            return True
    
        def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% ', '#', '$', '?', '%'), timeout=5) -> str:
            """
            功能描述:执行命令
            :param cmd: shell命令
            :param end_str: 提示符
            :param timeout: 超时间时间
            :return: 命令执行结果
            """
            if not self.__channel:
                self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096, height=48)
                time.sleep(0.1)
                self.__channel.recv(4096).decode()
    
            if cmd.endswith('\n'):
                self.__channel.send(cmd)
            else:
                self.__channel.send(cmd + '\n')
    
            if end_str is None:
                return self.__recv_without_end(cmd, timeout)
    
            result = self.__recv(end_str, timeout)
            begin_pos = result.find('\r\n')
            end_pos = result.rfind('\r\n')
            if begin_pos == end_pos:
                return ''
            return result[begin_pos + 2:end_pos]
    
        def __recv_without_end(self, cmd, timeout):
            """
            功能描述:接收命令执行结果,不进行任何比对。
            :param cmd: 命令
            :param timeout:超时时间,最长等待3秒
            :return: 命令执行结果
            """
            out_str = ''
            if timeout > 3:
                timeout = 3
            max_wait_time = timeout * 1000
            self.__channel.settimeout(0.1)
            while max_wait_time > 0.0:
                try:
                    start = time.perf_counter()
                    out = self.__channel.recv(1024 * 1024).decode()
                    out_str = out_str + out
                    max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
                except socket.timeout:
                    max_wait_time -= 100
            return out_str
    
        def __recv(self, end_str, timeout) -> str:
            """
            功能描述:根据提示符,接收命令执行结果
            :param end_str: 预期结果结尾
            :param timeout: 超时间
            :return: 命令执行结果,去除命令输入提示符
            """
            out_str = ''
            max_wait_time = timeout * 1000
            self.__channel.settimeout(0.05)
            while max_wait_time > 0.0:
                start = time.perf_counter()
                try:
                    out = self.__channel.recv(1024 * 1024).decode()
    
                    if not out or out == '':
                        continue
                    out_str = out_str + out
    
                    match, result = self.__match(out_str, end_str)
                    if match is True:
                        return result.strip()
                    else:
                        max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
                except socket.timeout:
                    max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
    
            raise Exception('recv data timeout')
    
        def __match(self, out_str: str, end_str: list) -> (bool, str):
            result = self.__ansi_escape.sub('', out_str)
    
            for it in end_str:
                if result.endswith(it):
                    return True, result
            return False, result
    
        def __close(self):
            if not self.__ssh:
                return
            self.__ssh.close()
            self.__ssh = None
    

    相关文章

      网友评论

          本文标题:Python学习:基于paramiko的交互式shell

          本文链接:https://www.haomeiwen.com/subject/ituinktx.html