美文网首页
百度AI人脸注册源码详解

百度AI人脸注册源码详解

作者: 一个无趣的人W | 来源:发表于2019-07-30 21:00 被阅读0次
    Summer

    1、起点AppConfig.client.addUser(image, imageType, groupId, userId, options)调用人脸注册api,从addUser进入

    def addUser(self, image, image_type, group_id, user_id, options=None):
        """
            人脸注册
        """
        options = options or {}
    
        data = {}
        data['image'] = image
        data['image_type'] = image_type
        data['group_id'] = group_id
        data['user_id'] = user_id
    
        data.update(options)
        return self._request(self.__userAddUrl, json.dumps(data, ensure_ascii=False), {
            'Content-Type': 'application/json',
        })
    
    详解:由此可见我们需要传四个必要参数image, image_type, group_id, user_id,一个可选参数options;
         接下来是 return self._request(self.__userAddUrl, json.dumps(data, ensure_ascii=False),根据__userAddUrl和序列化字段以及Content-Type来发送请求,看一下__userAddUrl是__userAddUrl = 'https://aip.baidubce.com/rest/2.0/face/v3/faceset/user/add',也就是说是向这个路由发送请求;
    

    2、那就要看一下_request请求是怎么发送的,点进去看

    def _request(self, url, data, headers=None):
        """
           self._request('', {})
        """
        try:
            result = self._validate(url, data)  # 判断路由和数据是否有效
            if result != True:
                return result
    
            authObj = self._auth()  # 实例化认证对象
            params = self._getParams(authObj)  # 实例化参数对象,传入的是认证对象,这里的参数就是access token
    
            data = self._proccessRequest(url, params, data, headers)  # 对请求进行处理,传入路由,参数,数据和报头
            headers = self._getAuthHeaders('POST', url, params, headers)  # 对报头进行认证,我猜测这里面有反爬措施
             # 通过实例化的对象发送请求,携带的参数为路由,数据,access token参数,报头,认证,超时时间;
             # 这里的__connectTimeout是自定义的连接超时时间,__socketTimeout是通过打开的连接传输数据的超时时间,单位都为ms
            response = self.__client.post(url, data=data, params=params,
                            headers=headers, verify=False, timeout=(
                                self.__connectTimeout,
                                self.__socketTimeout,
                            ), proxies=self._proxies
                        ) 
            obj = self._proccessResult(response.content)  # 最后返回的结果对象
            # 如果云端未注册并且调用接口的返回错误码是110,表示Access Token失效,这时就要再次进行认证
            if not self._isCloudUser and obj.get('error_code', '') == 110:  
                authObj = self._auth(True)
                params = self._getParams(authObj)
                response = self.__client.post(url, data=data, params=params,
                                headers=headers, verify=False, timeout=(
                                    self.__connectTimeout,
                                    self.__socketTimeout,
                                 ), proxies=self._proxies
                            )
                obj = self._proccessResult(response.content)
        # 抛异常,连接超时
        except (requests.exceptions.ReadTimeout,requests.exceptions.ConnectTimeout) as e:
            return {
                'error_code': 'SDK108',
                'error_msg': 'connection or read data timeout',
            }
    
        return obj
    

    3、至此调用基本结束,接下来看里面用到的几个方法:

    • ._auth()
    def _auth(self, refresh=False):
        """
            api access auth,获得access_token
        """
    
        #未过期
        if not refresh:
            # 设置超时时间为一个月
            tm = self._authObj.get('time', 0) + int(self._authObj.get('expires_in', 0)) - 30
            if tm > int(time.time()):
                return self._authObj
        # 如果access token过期,重新获取,传入参数:grant_type,client_id,client_secret和设置超时时间
        obj = self.__client.get(self.__accessTokenUrl, verify=False, params={
            'grant_type': 'client_credentials',
            'client_id': self._apiKey,
            'client_secret': self._secretKey,
        }, timeout=(
            self.__connectTimeout,
            self.__socketTimeout,
        ), proxies=self._proxies).json()
    
        self._isCloudUser = not self._isPermission(obj)
        obj['time'] = int(time.time())  # 按照当前时间重置access token超时时间
        self._authObj = obj
    
        return obj  # 返回认证过access token的对象
    
    • ._getParams(authObj)
    # 得到的参数就是access_token,认证校验传输都要携带
        def _getParams(self, authObj):
            """
                api request http url params
            """
    
            params = {}
    
            if self._isCloudUser == False:
                params['access_token'] = authObj['access_token']
    
            return params
    
    • ._proccessRequest(url, params, data, headers)
        def _proccessRequest(self, url, params, data, headers):
            """
                参数处理:在参数中加入语言、版本,比如python3
            """
    
            params['aipSdk'] = 'python'
            params['aipVersion'] = self.__version
    
            return data
    
    • ._getAuthHeaders('POST', url, params, headers)
        def _getAuthHeaders(self, method, url, params=None, headers=None):
            """
                api request http headers
            """
    
            headers = headers or {}
            params = params or {}
    
            if self._isCloudUser == False:  # 不是云端用户,返回报头
                return headers
    
            urlResult = urlparse(url)  # 解析路由
            for kv in urlResult.query.strip().split('&'):  # 对路由进行切分,取出有用的值把它赋给参数
                if kv:
                    k, v = kv.split('=')
                    params[k] = v
    
            # UTC timestamp,获取时间戳,主机号,版本等信息,保证唯一性
            timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
            headers['Host'] = urlResult.hostname
            headers['x-bce-date'] = timestamp
            version, expire = '1', '1800'
    
            # 1 Generate SigningKey
            # 拼接签名钥匙,使用hmac(HMAC是密钥相关的哈希运算消息认证码,HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出)
            # 还要使用hashlib.sha256进行二次加密
            val = "bce-auth-v%s/%s/%s/%s" % (version, self._apiKey, timestamp, expire)
            signingKey = hmac.new(self._secretKey.encode('utf-8'), val.encode('utf-8'),
                hashlib.sha256
            ).hexdigest()
    
            # 2 Generate CanonicalRequest
            # 2.1 Genrate CanonicalURI
            canonicalUri = quote(urlResult.path)
            # 2.2 Generate CanonicalURI: not used here
            # 2.3 Generate CanonicalHeaders: only include host here
    
            canonicalHeaders = []
            for header, val in headers.items():
                canonicalHeaders.append(
                    '%s:%s' % (
                        quote(header.strip(), '').lower(),
                        quote(val.strip(), '')
                    )
                )
            canonicalHeaders = '\n'.join(sorted(canonicalHeaders))
    
            # 2.4 Generate CanonicalRequest
            canonicalRequest = '%s\n%s\n%s\n%s' % (
                method.upper(),
                canonicalUri,
                '&'.join(sorted(urlencode(params).split('&'))),
                canonicalHeaders
            )
    
            # 3 Generate Final Signature
            signature = hmac.new(signingKey.encode('utf-8'), canonicalRequest.encode('utf-8'),
                            hashlib.sha256
                        ).hexdigest()
    
            headers['authorization'] = 'bce-auth-v%s/%s/%s/%s/%s/%s' % (
                version,
                self._apiKey,
                timestamp,
                expire,
                ';'.join(headers.keys()).lower(),
                signature
            )
    
            return headers  # 最后头部携带信息进行传输,其中最重要的是signature,为此进行了两次大加密,大加密中又有两次小加密。不得不说,重要信息在传输过程中一定要慎重!!!
    
    • ._proccessResult(response.content)
        def _proccessResult(self, content):
            """
                formate result,格式化返回结果
            """
    
            if sys.version_info.major == 2:
                return json.loads(content) or {}
            else:
                return json.loads(content.decode()) or {}
    

    相关文章

      网友评论

          本文标题:百度AI人脸注册源码详解

          本文链接:https://www.haomeiwen.com/subject/kxdorctx.html