美文网首页
逆向分析依云所写expect.py

逆向分析依云所写expect.py

作者: 养猫的老鼠 | 来源:发表于2017-04-21 14:18 被阅读0次

    作为一个python小白,在查找expect的替代中发现了依云用python写得expect.py。阅读后,发现很多都不明白,遂决定一一百度分析一下,另外还需要更改功能,遂记录之。

    依云:expect.py

    import fcntl #文件锁
    import os
    import re
    from select import select #异步socket
    import signal
    import sys
    import termios #for tty io control
    import tty
    
    class Expect:
      buffer = b''
      encoding = sys.getdefaultencoding()
    
      def __init__(self):
        self.master, self.slave = os.openpty()
        self._old_winch = signal.signal(signal.SIGWINCH, self._size_changed)
        self._size_changed()
        try:
          os.set_inheritable(self.slave, True)
        except AttributeError:
          pass
    
      def spawn(self, cmd, *, executable=None, preexec_fn=None):
        if executable is None:
          executable = cmd[0]
    
        pid = os.fork()
        if pid == 0:
          slave = self.slave
          os.dup2(slave, 0)
          os.dup2(slave, 1)
          os.dup2(slave, 2)
          os.close(slave)
          if preexec_fn is not None:
            preexec_fn()
          os.execvp(executable, cmd)
        elif pid > 0:
          os.close(self.slave)
          return
        else:
          raise RuntimeError('os.fork() return negative value?')
    
      def expect_line(self, regex):
        if isinstance(regex, str):
          regex = regex.encode(self.encoding)
        if isinstance(regex, bytes):
          regex = re.compile(regex)
        if not isinstance(getattr(regex, 'pattern', b''), bytes):
          raise TypeError('regex must be one of type str, bytes or regex of bytes')
    
        fd = self.master
        outfd = sys.stdout.fileno()
        buf = self.buffer
        try:
          while True:
            if buf:
              data = buf
            else:
              data = os.read(fd, 4096)
              os.write(outfd, data)
            data = data.split(b'\n', 1)
            if len(data) == 2:
              data, buf = data
            else:
              data = data[0]
            if regex.search(data):
              return
        finally:
          self.buffer = buf
    
      def _read_write(self, data):
        fd = self.master
        outfd = sys.stdout.fileno()
        to_write = len(data)
        while to_write:
          try:
            r, w, e = select([fd], [fd], [])
            if r:
              rdata = os.read(fd, 4096)
              os.write(outfd, rdata)
            if w:
              written = os.write(fd, data)
              data = data[written:]
              to_write -= written
          except InterruptedError:
            continue
    
      def send(self, data):
        if isinstance(data, str):
          data = data.encode(self.encoding)
        self._read_write(data)
    
      def interact(self):
        pfd = sys.stdin.fileno()
        cfd = self.master
        outfd = sys.stdout.fileno()
        old = termios.tcgetattr(pfd)
        old_signal = signal.signal(signal.SIGCHLD, self._interact_leave)
    
        tty.setraw(pfd)
        try:
          while True:
            try:
              r, w, e = select([pfd, cfd], [], [])
              for fd in r:
                data = os.read(fd, 4096)
                other = outfd if fd == cfd else cfd
                os.write(other, data)
            except InterruptedError:
              continue
            except OSError as e:
              if e.errno == 5: # Input/output error: no clients run
                break
              else:
                raise
        except ChildProcessError:
          pass
        finally:
          termios.tcsetattr(pfd, termios.TCSAFLUSH, old)
          signal.signal(signal.SIGCHLD, old_signal)
    
      def _interact_leave(self, signum, sigframe):
        while os.waitid(os.P_ALL, 0, os.WEXITED | os.WNOHANG) is not None:
          pass
    
      def _size_changed(self, signum=0, sigframe=None):
        size = fcntl.ioctl(sys.stdin.fileno(), termios.TIOCGWINSZ, '1234')
        fcntl.ioctl(self.master, termios.TIOCSWINSZ, size)
    
      def __del__(self):
        if self._old_winch:
          signal.signal(signal.SIGWINCH, self._old_winch)
    

    解剖

    依云自己给的用法是

    import sys, expect
    def main(host):
        p = expect.Expect()
        p.spawn(['ssh',host])
        p.expect_line('# ')
        p.send('ls\n')
        p.interact()
    if __name__ == '__main__':
        host = '10.17.17.17'
        if len(sys.argv) == 2:
            host += sys.argv[1]
        main(host)
    

    这里需要手动键入ssh host的密码,且不退出当前连接。功能与我的需要有差异,就从这里入手分析,然后修改。

    1. 分析下类的初始化中都做了什么
      os.openpty 用于打开一个新的伪终端时,返回pty与tty的文件描述符。

    tty是各种类型设备的终端设备的简称
    pty是虚拟终端。远程telnet登陆的就是pty

    signal.signal用于绑定信号处理函数,修改进程收到信号以后的行为
    SIGWINCH是窗口信号改变的信号
    _size_changed中调用fcntl.ioctl(sys.stdin.fileno,termios.TIOCGWINSZ,'1234')获取当前窗口的长宽。
    os.set_inheritable用于为文件标识符设定可继承标识

    fcntl
    参考

    2.spawn方法开启任务
    spawn函数声明中位置参数与关键字参数混合的方法,起初查找时,只知道a,*a这种用法,不明白放在中间什么意思,后来查找后才明白,*号只是起到分隔的作用,*号本身并没有被用到。
    python位置参数,关键字参数
    os.fork创建子进程
    python fork
    os.dup2重定向pty输入输出至当前输入输出
    os.execvp开启一个新进程,执行相应的cmd
    python dup2
    3.expect_line方法接收关键信息
    isinstance判断变量类型,支持判断"int,float,str,bool,list,tuple,dict,bytes"。其中bytes类型是python3.x开始支持
    python bytes

    相关文章

      网友评论

          本文标题:逆向分析依云所写expect.py

          本文链接:https://www.haomeiwen.com/subject/itydzttx.html