美文网首页
CTFZone2018-Signature server

CTFZone2018-Signature server

作者: Robin_Tan | 来源:发表于2018-07-24 14:44 被阅读0次

    翻译自:https://github.com/p4-team/ctf/tree/master/2018-07-21-ctfzone-quals/crypto_signature

    给了server.py的代码:

    #!/usr/bin/python
    import sys
    import hashlib
    import logging
    import SocketServer
    import base64
    from flag import secret
    from checksum_gen import WinternizChecksum
    
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    logger.addHandler(ch)
    
    
    HASH_LENGTH=32
    CHECKSUM_LENGTH=4
    MESSAGE_LENGTH=32
    CHANGED_MESSAGE_LENGTH=MESSAGE_LENGTH+CHECKSUM_LENGTH
    BITS_PER_BYTE=8
    show_flag_command="show flag"+(MESSAGE_LENGTH-9)*"\xff"
    admin_command="su admin"+(MESSAGE_LENGTH-8)*"\x00"
    PORT = 1337
    
    def extend_signature_key(initial_key):
      full_sign_key=str(initial_key)
      for i in range(0,255):
        for j in range(0,CHANGED_MESSAGE_LENGTH):
          full_sign_key+=hashlib.sha256(full_sign_key[j*HASH_LENGTH+i*CHANGED_MESSAGE_LENGTH*HASH_LENGTH:(j+1)*HASH_LENGTH+i*CHANGED_MESSAGE_LENGTH*HASH_LENGTH]).digest()
      return full_sign_key
    class Signer:
      
      def __init__(self):
        with open("/dev/urandom","rb") as f:
          self.signkey=f.read(HASH_LENGTH*CHANGED_MESSAGE_LENGTH)
        self.full_sign_key=extend_signature_key(self.signkey)
        self.wc=WinternizChecksum()
        self.user_is_admin=False
    
      def sign_byte(self,a,ind):
        assert(0<=a<=255)
        signature=self.full_sign_key[(CHANGED_MESSAGE_LENGTH*a+ind)*HASH_LENGTH:(CHANGED_MESSAGE_LENGTH*a+ind+1)*HASH_LENGTH]
        return signature
    
      def sign(self,data):
        decoded_data=base64.b64decode(data)
        if len(decoded_data)>MESSAGE_LENGTH:
          return "Error: message too large"
        if decoded_data==show_flag_command or decoded_data==admin_command:
          return "Error: nice try, punk"
        decoded_data+=(MESSAGE_LENGTH-len(decoded_data))*"\xff"
        decoded_data+=self.wc.generate(decoded_data)
        signature=""
        for i in range(0, CHANGED_MESSAGE_LENGTH):
          signature+=self.sign_byte(ord(decoded_data[i]),i)
        return base64.b64encode(decoded_data)+','+base64.b64encode(signature)
      
      def execute_command(self,data_sig):
        (data_with_checksum, signature)=map(base64.b64decode,data_sig.split(','))
        data=data_with_checksum[:MESSAGE_LENGTH]
        data_checksummed=data+self.wc.generate(data)
        if data_checksummed!=data_with_checksum:
          return "Error: wrong checksum!"
        signature_for_comparison=""
        for i in range(0, CHANGED_MESSAGE_LENGTH):
          signature_for_comparison+=self.sign_byte(ord(data_with_checksum[i]),i)
        if signature!=signature_for_comparison:
          return "Error: wrong signature!"
        if data==admin_command:
          self.user_is_admin=True
          return "Hello, admin"
        if data==show_flag_command:
          if self.user_is_admin:
            return "The flag is %s"%secret
          else:
            return "Only admin can get the flag\n"
        else:
          return "Unknown command\n"
    def process(data,signer):
      [query,params]=data.split(':')
      params=params.rstrip("\n")
      if query=="hello":
        return "Hi"
      elif query=="sign":
        return signer.sign(params)
      elif query=="execute_command":
        return signer.execute_command(params)
      else:
        return "bad query"
    
    class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
      
      def handle(self):
        signer=Signer()
        logger.info("%s client sconnected" % self.client_address[0])
        self.request.sendall("Welcome to the Tiny Signature Server!\nYou can sign any messages except for controlled ones\n")
        while True:
          data = self.request.recv(2048)
          try:
            ret = process(data,signer)
          except Exception:
            ret = 'Error'
          try:
            self.request.sendall(ret + '\n')
          except Exception:
            break
    
      def finish(self):
        logger.info("%s client disconnected" % self.client_address[0])
    
    
    class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
      pass
    
    if __name__ == '__main__':
      server = ThreadedTCPServer(('0.0.0.0', PORT), ThreadedTCPRequestHandler)
      server.allow_reuse_address = True
      server.serve_forever()
    

    根据代码,这个服务器有三个功能

    • 发送 hi信息,没什么用
    • 让服务器对我们发送的数据进行签名
    • 执行带有签名的指令

    有两条比较重要的命令,切换为admin以及请求flag。这两条指令具体是:

    show_flag_command = "show flag" + (MESSAGE_LENGTH - 9) * "\xff"
    admin_command = "su admin" + (MESSAGE_LENGTH - 8) * "\x00"
    

    我们可以具体看下签名是如何生成的:

    def sign(self, data):
        decoded_data = base64.b64decode(data)
        if len(decoded_data) > MESSAGE_LENGTH:
            return "Error: message too large"
        if decoded_data == show_flag_command or decoded_data == admin_command:
            return "Error: nice try, punk"
        decoded_data += (MESSAGE_LENGTH - len(decoded_data)) * "\xff"
        decoded_data += self.wc_generate(decoded_data)
        signature = ""
        for i in range(0, CHANGED_MESSAGE_LENGTH):
            signature += self.sign_byte(ord(decoded_data[i]), i)
        return base64.b64encode(decoded_data) + ',' + base64.b64encode(signature)
    

    需要注意的是对敏感命令的检查是在padding之前发生的,这也就意味着我们可以直接发送show flag字符串,这将会通过检查,服务器会对其进行padding 并 签名。我们可以比较简单的获得签了名的show flag指令。
    比较难获得的是切换为admin身份的指令,因为其padding为\x00,因此没有办法像上面一样绕过检查。
    因此我们只需要伪造对该条指令的签名。
    当我们深入签名生成算法,我们可以观察到两点:

    • 原始的输入会使用\xff进行pad,并加上Winternitz checksum
    • 签名是逐字节生成的,每个字节对应的签名和字节的位置以及值有关。

    此外服务器会依次对checksum和签名进行检查,并告诉我们具体是哪步出错。

    第一步需要伪造checksum,可以看到checksum的长度为4字节,如果要直接爆破 的 话2^32还是有点大。但如果我们尝试让服务器签名一些输入,并观察他的返回值,会发现checksum的后两个字节永远是\x00\x00,因此只需要爆破2个字节。
    因此我们可以在我们想要执行的指令后加上穷举的两个字节,两个\x00 以及一些随机字节作为签名,并发送,如果服务器返回 incorrect signature 就说明了checksum 猜对了。
    代码如下:

    def find_checkum_conflict(s, wanted_msg, signature):
        print("Looking for checksum conflict")
        for a in range(256):
            for b in range(256):
                forged = wanted_msg + chr(a) + chr(b) + "\x00\x00"
                result = execute_command(s, forged, signature)
                if 'wrong signature' in result:
                    print('Found checksum conflict for', a, b)
                    return a, b
    

    第二部需要伪造正确的签名,在一开始的分析中,我们提到签名是逐字节生成的,这意味着如果我们发送admin_command,把器最后一个字节替换掉,我们将得到前31字节的正确签名。同时由于后两个字节恒为\x00,因此这两个字节的签名也是正确的。
    这样,我们只缺中间3个字节的签名。由于checksum只有两个字节,而前面的message有32个字节,显然会有很多冲突,所以我们可以爆破找到一个输入和我们想要的命令有同样的checksum。同时我们可以让这些输入的结尾都为'\x00',这样当我们找到一个checksum冲突的时候,同时也获得了\x00 对应的签名。
    代码实现如下:

    def get_proper_signature(checksum_we_need, s, original_signature_chunks):
        print("Looking for signature suffix for conflicting checksum")
        i = 0
        while True:
            msg = long_to_bytes(i)
            pad = 32 - len(msg)
            msg = msg + ('a' * (pad - 1)) + "\x00"
            result = sign(s, msg)
            ext_msg, signature = map(base64.b64decode, result.split(","))
            if ext_msg[32:36] == checksum_we_need:
                forged_signature_chunks = chunk(signature, 32)
                return "".join(original_signature_chunks[:-5] + forged_signature_chunks[-5:])
            i += 1
    

    这样我们就获得了正确的签名,即可执行两条指令,获得flag:

    def main():
        url = "crypto-02.v7frkwrfyhsjtbpfcppnu.ctfz.one"
        port = 1337
        s = nc(url, port)
        receive_until_match(s, "You can sign any messages except for controlled ones")
        receive_until(s, "\n")
        msg = "show flag"
        show_flag_command = sign(s, msg)
        msg = "su admin" + (32 - 9) * "\x00"
        almost_admin_command = sign(s, msg)
        print(almost_admin_command)
        msg, signature = map(base64.b64decode, almost_admin_command.split(","))
        signature_chunks = chunk(signature, 32)
        wanted_msg = 'su admin\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        a, b = find_checkum_conflict(s, wanted_msg, signature)
        checksum = chr(a) + chr(b) + "\x00\x00"
        forged_msg = wanted_msg + checksum
        signature = get_proper_signature(checksum, s, signature_chunks)
        print(execute_command(s, forged_msg, signature))
        send(s, 'execute_command:' + show_flag_command)
        interactive(s)
    
    
    main()
    

    相关文章

      网友评论

          本文标题:CTFZone2018-Signature server

          本文链接:https://www.haomeiwen.com/subject/kiwemftx.html