美文网首页
微信公众号加密Python3版

微信公众号加密Python3版

作者: Mrgz | 来源:发表于2019-11-21 00:50 被阅读0次

    个人用途(itchat不能用之后转公众号)
    官网的Python示例版本是python2
    改成python3版本,兼容中文
    python3.8.0下可用

    pip install pycryptodome==3.9.4

    WXBizMsgCrypt.py

    #!/usr/bin/env python
    #-*- encoding:utf-8 -*-
    
    """ 对公众平台发送给公众账号的消息加解密示例代码.
    @copyright: Copyright (c) 1998-2014 Tencent Inc.
    
    """
    # ------------------------------------------------------------------------
    
    import base64
    import string
    import random
    import hashlib
    import time
    import struct
    from Crypto.Cipher import AES
    import xml.etree.cElementTree as ET
    import socket
    from .import ierror
    
    """
    关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
    请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
    下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
    """
    class FormatException(Exception):
        pass
    
    def throw_exception(message, exception_class=FormatException):
        """my define raise exception function"""
        raise exception_class(message)
    
    class SHA1:
        """计算公众平台的消息签名接口"""
    
        def getSHA1(self, token, timestamp, nonce, encrypt):
            """用SHA1算法生成安全签名
            @param token:  票据
            @param timestamp: 时间戳
            @param encrypt: 密文
            @param nonce: 随机字符串
            @return: 安全签名
            """
            try:
                sortlist = [token, timestamp, nonce, encrypt]
                sortlist.sort()
                sha = hashlib.sha1()
                sha.update(("".join(sortlist)).encode())
                return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
            except Exception as e:
                return ierror.WXBizMsgCrypt_ComputeSignature_Error, None
    
    
    class XMLParse:
        """提供提取消息格式中的密文及生成回复消息格式的接口"""
    
        # xml消息模板
        AES_TEXT_RESPONSE_TEMPLATE = """<xml>
    <Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
    <MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
    <TimeStamp>%(timestamp)s</TimeStamp>
    <Nonce><![CDATA[%(nonce)s]]></Nonce>
    </xml>"""
    
        def extract(self, xmltext):
            """提取出xml数据包中的加密消息
            @param xmltext: 待提取的xml字符串
            @return: 提取出的加密消息字符串
            """
            try:
                xml_tree = ET.fromstring(xmltext)
                encrypt  = xml_tree.find("Encrypt")
                return ierror.WXBizMsgCrypt_OK, encrypt.text
            except Exception as e:
                print(e)
                return ierror.WXBizMsgCrypt_ParseXml_Error, None, None
    
        def generate(self, encrypt, signature, timestamp, nonce):
            """生成xml消息
            @param encrypt: 加密后的消息密文
            @param signature: 安全签名
            @param timestamp: 时间戳
            @param nonce: 随机字符串
            @return: 生成的xml字符串
            """
            resp_dict = {
                        'msg_encrypt' : encrypt,
                        'msg_signaturet': signature,
                        'timestamp'    : timestamp,
                        'nonce'        : nonce,
                         }
            resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
            return resp_xml
    
    
    class PKCS7Encoder():
        """提供基于PKCS7算法的加解密接口"""
    
        block_size = 32
        def encode(self, bytes):
            """ 对需要加密的明文进行填充补位
            @param text: 需要进行填充补位操作的明文
            @return: 补齐明文字符串
            """
            text_length = len(bytes)
            # 计算需要填充的位数
            amount_to_pad = self.block_size - (text_length % self.block_size)
            if amount_to_pad == 0:
                amount_to_pad = self.block_size
            # 获得补位所用的字符
            pad = chr(amount_to_pad).encode()
            return bytes + pad * amount_to_pad
    
        def decode(self, decrypted):
            """删除解密后明文的补位字符
            @param decrypted: 解密后的明文
            @return: 删除补位字符后的明文
            """
            pad = ord(decrypted[-1])
            if pad<1 or pad >32:
                pad = 0
            return decrypted[:-pad]
    
    
    class Prpcrypt(object):
        """提供接收和推送给公众平台消息的加解密接口"""
    
        def __init__(self,key):
            #self.key = base64.b64decode(key+"=")
            self.key = key
            # 设置加解密模式为AES的CBC模式
            self.mode = AES.MODE_CBC
    
    
        def encrypt(self,text,appid):
            """对明文进行加密
            @param text: 需要加密的明文
            @return: 加密得到的字符串
            """
            # 16位随机字符串添加到明文开头
            bytes = self.get_random_str().encode() + struct.pack("I",socket.htonl(len(text.encode()))) + text.encode() + appid.encode()
            # 使用自定义的填充方式对明文进行补位填充
            pkcs7 = PKCS7Encoder()
            bytes = pkcs7.encode(bytes)
            cryptor = AES.new(self.key,self.mode,self.key[:16])
            try:
                ciphertext = cryptor.encrypt(bytes)
                # 使用BASE64对加密后的字符串进行编码
                return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode()
            except Exception as e:
                return ierror.WXBizMsgCrypt_EncryptAES_Error, None
    
        def decrypt(self,text,appid):
            """对解密后的明文进行补位删除
            @param text: 密文
            @return: 删除填充补位后的明文
            """
            try:
                cryptor = AES.new(self.key,self.mode,self.key[:16])
                # 使用BASE64对密文进行解码,然后AES-CBC解密
                plain_text  = cryptor.decrypt(base64.b64decode(text)).decode()
            except Exception as e:
                return ierror.WXBizMsgCrypt_DecryptAES_Error, None
            try:
                pad = ord(plain_text[-1])
                # 去掉补位字符串
                #pkcs7 = PKCS7Encoder()
                #plain_text = pkcs7.encode(plain_text)
                # 去除16位随机字符串
                content = plain_text[16:-pad]
                xml_len = socket.ntohl(struct.unpack("I",content.encode()[ : 4])[0])
                xml_content = content.encode()[4 : xml_len+4].decode()
                from_appid = content.encode()[xml_len+4:].decode()
            except Exception as e:
                return ierror.WXBizMsgCrypt_IllegalBuffer, None
            if  from_appid != appid:
                return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
            return 0,xml_content
    
        def get_random_str(self):
            """ 随机生成16位字符串
            @return: 16位字符串
            """
            rule = string.ascii_letters + string.digits
            str = random.sample(rule, 16)
            return "".join(str)
    
    class WXBizMsgCrypt(object):
        #构造函数
        #@param sToken: 公众平台上,开发者设置的Token
        # @param sEncodingAESKey: 公众平台上,开发者设置的EncodingAESKey
        # @param sAppId: 企业号的AppId
        def __init__(self,sToken,sEncodingAESKey,sAppId):
            try:
                self.key = base64.b64decode((sEncodingAESKey+"="))
                assert len(self.key) == 32
            except:
                throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
            self.token = sToken
            self.appid = sAppId
    
        def EncryptMsg(self, sReplyMsg, sNonce, timestamp = None):
            #将公众号回复用户的消息加密打包
            #@param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
            #@param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
            #@param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
            #sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
            #return:成功0,sEncryptMsg,失败返回对应的错误码None
            pc = Prpcrypt(self.key)
            ret,encrypt = pc.encrypt(sReplyMsg, self.appid)
            if ret != 0:
                return ret,None
            if timestamp is None:
                timestamp = str(int(time.time()))
            # 生成安全签名
            sha1 = SHA1()
            ret,signature = sha1.getSHA1(self.token, timestamp, sNonce, encrypt)
            if ret != 0:
                return ret,None
            xmlParse = XMLParse()
            return ret,xmlParse.generate(encrypt, signature, timestamp, sNonce)
    
        def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
            # 检验消息的真实性,并且获取解密后的明文
            # @param sMsgSignature: 签名串,对应URL参数的msg_signature
            # @param sTimeStamp: 时间戳,对应URL参数的timestamp
            # @param sNonce: 随机串,对应URL参数的nonce
            # @param sPostData: 密文,对应POST请求的数据
            #  xml_content: 解密后的原文,当return返回0时有效
            # @return: 成功0,失败返回对应的错误码
             # 验证安全签名
            xmlParse = XMLParse()
            ret,encrypt = xmlParse.extract(sPostData)
            if ret != 0:
                return ret, None
            sha1 = SHA1()
            ret,signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, encrypt)
            if ret  != 0:
                return ret, None
            if not signature == sMsgSignature:
                return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
            pc = Prpcrypt(self.key)
            ret,xml_content = pc.decrypt(encrypt,self.appid)
            return ret,xml_content
    
    
    

    ierrror.py不变

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    #########################################################################
    # Author: jonyqin
    # Created Time: Thu 11 Sep 2014 01:53:58 PM CST
    # File Name: ierror.py
    # Description:定义错误码含义 
    #########################################################################
    WXBizMsgCrypt_OK = 0
    WXBizMsgCrypt_ValidateSignature_Error = -40001
    WXBizMsgCrypt_ParseXml_Error = -40002
    WXBizMsgCrypt_ComputeSignature_Error = -40003
    WXBizMsgCrypt_IllegalAesKey = -40004
    WXBizMsgCrypt_ValidateAppid_Error = -40005
    WXBizMsgCrypt_EncryptAES_Error = -40006
    WXBizMsgCrypt_DecryptAES_Error = -40007
    WXBizMsgCrypt_IllegalBuffer = -40008
    WXBizMsgCrypt_EncodeBase64_Error = -40009
    WXBizMsgCrypt_DecodeBase64_Error = -40010
    WXBizMsgCrypt_GenReturnXml_Error = -40011
    
    

    顺便贴一下服务器验证的脚本,兼容模式下可用
    pip install tornado==6.0.3

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import tornado.web
    import tornado.httpserver
    import tornado.ioloop
    import tornado.options
    
    
    def check_signature(token,timestamp,nonce,signature):
        import hashlib
        list = [token, timestamp, nonce]
        list.sort()
        temp = ''.join(list)
        sha1 = hashlib.sha1(temp.encode())
        hashcode = sha1.hexdigest()
        return hashcode ==signature
    
    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            signature = self.get_argument("signature")
            timestamp = self.get_argument("timestamp")
            nonce = self.get_argument("nonce")
            echostr = self.get_argument("echostr")
            token = "xxxxxx"
            if check_signature(token,timestamp,nonce,signature):
                self.write(echostr.encode())
                self.finish()
            else:
                self.finish()
    
        def post(self):
            content = self.request.body.decode()
            print(content)
            self.finish()
    
    urls = [
        (r'/.*',MainHandler)
    ]
    
    app = tornado.web.Application(handlers=urls)
    
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(8089)
    tornado.options.parse_command_line()
    tornado.ioloop.IOLoop.current().start()
    

    把post部分改下,即可回复文本

        def post(self):
            from libs.wx.WXBizMsgCrypt import WXBizMsgCrypt
    
            encodingAESKey = "xxxx"
            token="xxxx"
            appid = "xxxx"
    
            signature = self.get_argument("signature")
            timestamp = self.get_argument("timestamp")
            nonce = self.get_argument("nonce")
            if check_signature(token,timestamp,nonce,signature):
                pass
            msg_sign = self.get_argument("msg_signature")
            content = self.request.body.decode()
            wx_crypt = WXBizMsgCrypt(token, encodingAESKey, appid)
            ret, decryp_xml = wx_crypt.DecryptMsg(content, msg_sign, timestamp, nonce)
    
            import xml.etree.cElementTree as ET
    
            xml_tree = ET.fromstring(decryp_xml)
            to_user = xml_tree.find("ToUserName").text
            from_user = xml_tree.find("FromUserName").text
            msg_type = xml_tree.find("MsgType").text
            text = xml_tree.find("Content").text
    
            to_xml = """<xml><ToUserName><![CDATA[{to_user}]]></ToUserName>
      <FromUserName><![CDATA[{from_user}]]></FromUserName>
      <CreateTime>{create_time}</CreateTime>
      <MsgType><![CDATA[text]]></MsgType>
      <Content><![CDATA[{text}]]></Content>
    </xml>"""
            import time
            to_xml = to_xml.format(**{"to_user":from_user,"from_user":to_user,"create_time":time.time(),"text":"你好,"+text})
    
            ret, encrypt_xml = wx_crypt.EncryptMsg(to_xml, nonce)
            self.write(encrypt_xml.encode())
            self.finish()
    

    相关文章

      网友评论

          本文标题:微信公众号加密Python3版

          本文链接:https://www.haomeiwen.com/subject/gufzictx.html