美文网首页
FACEBOOK数据开发

FACEBOOK数据开发

作者: hello_world_cxm | 来源:发表于2023-10-26 17:28 被阅读0次
**facebook api请求全部逻辑**
import time
from openpyxl import Workbook
import openpyxl
import os
import random
from datetime import datetime, timedelta
import facebook_business.exceptions
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.campaign import Campaign
from facebook_business.api import FacebookAdsApi
from facebook_business.adobjects.adset import AdSet
from facebook_business.adobjects.ad import Ad
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.adcreative import AdCreative
from facebook_business.adobjects.page import Page
from facebook_business.adobjects.pagepost import PagePost
from facebook_business.api import Cursor
from facebook_business.exceptions import *


class GetData:
    def __init__(self):
        #第一个token
        self.access_token="xxxx"
        self.app_secret = 'xxxx'
        self.app_id = 'xxxx'
        self.account_id = "xxxx"
        self.limit=300
        #存放系列、广告组、广告id
        self.campaign_ids_list=[]
        self.adset_ids_list=[]
        self.ad_ids_list=[]
        self.allcount=0
        self.since="2022-10-26"
        self.until="2022-10-26"
        self.maindata_list=[]
        self.getmaindata_list=[]
        self.getmaindata_dict={}
        #计算每次请求的错误,一个函数用完之后就会重置为0
        self.errorcount=0
        self.adactions_dict={}

        #存在广告历史列表
        self.historyList=[]

        #存放已经请求的1500条复制广告
        self.adscopies_all_list=[]
        self.adsetcopies_all_list=[]
        self.campaigncopies_all_list=[]

        #存放已经组成的复制广告{"ad id":[{'id': '23851XXXX270068'},{'id': '2385145XXXX70068'}]}
        self.adscopies_dict={}
        self.adsetcopies_dict={}
        self.campaigncopies_dict={}

        #存放清洗后的action数据
        self.cleared_actiondata_dict={}  #{"年龄":{"id1":{"18-24":{"purchase":1,"addtocart":2},"25-35":{"purchase":1,"addtocart":2}}},{"id2":{"18-24":{"purchase":1,"addtocart":2},"25-35":{"purchase":1,"addtocart":2}}}}
        # self.desktop_path=os.path.join(os.path.expanduser("~"), 'Desktop/')
        # self.nwb = openpyxl.load_workbook(self.desktop_path+"测试.xlsx")
        # self.nws=self.nwb.active
    #获取insight对象
    def getMainData(self,metric):
        print(type(metric),metric)
        getmaindata_list=[]  #临时存放所需内容
        try:
            fields = [
                "campaign_id",
                "adset_id",
                "adset_name",
                "ad_id",
                "ad_name",
                "spend",
                "cpc",
                "ctr",
                "cost_per_unique_click",
                "cost_per_inline_link_click",
                "inline_link_click_ctr",
                "inline_link_clicks",
                "cpm",
                "reach",
                "frequency",
                "impressions",
                "buying_type",
                "purchase_roas"
                      ]
            #
            # 构建 API 请求
            params = {
                "limit":self.limit,  #设置一个安全稳定的值
                'level': 'ad',
                'time_range': {'since': self.since, 'until': self.until},  # 指定时间范围
                'fields': fields,
                'filtering': [
                    {
                    "field": "spend",
                    "operator": "GREATER_THAN",
                    "value": 0
                  }
                ],
                'breakdowns': [metric], #age country publisher_platform
            }
            insights_Cursor = AdAccount(self.account_id).get_insights(params=params)
            getmaindata_list.extend(insights_Cursor)  #返回游标类型,然后调用load_next_page()方法可以获取剩余所有数据
            insights_Cursor.load_next_page()
            getmaindata_list.extend(insights_Cursor)

            #以metric作为key值,主数据作为value,放进字典{"总表":[adsight1,adsight2],"age":[adsight1,adsight2]...}
            if not metric:
                self.getmaindata_dict["总表"]=getmaindata_list
            else:
                self.getmaindata_dict[metric]=getmaindata_list
            #计算请求次数
            self.allcount = self.allcount + 1
            print("getmaindata_list长度",metric,len(getmaindata_list))
        except facebook_business.exceptions.FacebookRequestError:
            print("请求维度 {} 时发生错误,重调函数请求".format(metric))
            self.errorcount = self.errorcount + 1
            self.callStrategy("getMainData")  # 传函数名称,针对不同的函数,有不同的解决侧重点
            self.getMainData(metric)

    def writeToExcel(self):
        #self.maindata_list
        for data_tuple in self.maindata_list:
            print("插入的tuple是",data_tuple)
            self.nws.append(data_tuple)
        print("所有数据已经写入,开始保存表格")
        self.nwb.save(self.desktop_path+"测试.xlsx")

    def getIds(self):
        for i in range(len(self.getmaindata_dict["总表"])):  #只从总表获取campaign id,adse id,ad id即可
            Adsight=self.getmaindata_dict["总表"][i]
            campaign_id=Adsight["campaign_id"]
            adset_id=Adsight["adset_id"]
            ad_id=Adsight["ad_id"]
            print("系列id:{},广告组id:{},广告id:{}".format(campaign_id,adset_id,ad_id))
            #把这些id去重后添加到对应列表中,这点务必要去重
            if campaign_id not in self.campaign_ids_list:
                self.campaign_ids_list.append(campaign_id)
            if adset_id not in self.adset_ids_list:
                self.adset_ids_list.append(adset_id)
            if ad_id not in self.ad_ids_list:
                self.ad_ids_list.append(ad_id)
        print("ids获取完毕,系列id列表长度{},广告组id列表长度{},广告id列表长度{}".format(len(self.campaign_ids_list),len(self.adset_ids_list),len(self.ad_ids_list)))

    #为每条广告获取action数值,补充加购结账
    def getAdActions(self,metric):
        fields = [
                "ad_id",
                "actions",
                "spend"
            ]
        params_1 = {
            'time_range': {'since': self.since, 'until': self.until},  # 指定时间范围
            'fields': fields,
            'breakdowns': [metric]
        }
        adactions_list_temporary=[]
        lenids=len(self.ad_ids_list)
        print("lenids长度",lenids)
        for ad_id in self.ad_ids_list:
            print("广告id为",ad_id)
            try:
                ad_insight=Ad(ad_id).get_insights(params=params_1)
                print("ad_insight",ad_insight)
                adactions_list_temporary.extend(ad_insight)
                self.allcount = self.allcount + 1
                print("第 {} 次请求actions".format(self.allcount))
            except facebook_business.exceptions.FacebookRequestError:
                print("请求维度 {} 时发生错误,重调函数请求".format(metric))
                self.errorcount=self.errorcount+1
                break #暂不抛给策略函数
                self.callStrategy("getAdActions")  # 传函数名称,针对不同的函数,有不同的解决侧重点

        if not metric:
            #因为metric总表传递的空值
            self.adactions_dict["总表"]=adactions_list_temporary
        else:
            self.adactions_dict[metric]=adactions_list_temporary
    def controlRequestFrequency(self):
        pass
    def getCopies(self):
        #思路:通过方法调取账户内所有被复制的广告,包含了所有的广告,先用这部分与已有的匹配,匹配不到的才去请求
        #响应格式 {'copies': {'data': [{'id': '23851XXX4270068'}], 'paging': {'cursors': {'after': 'MjM4XXXXAwNjgZD', 'before': 'MjM4NTXXXzAwNjgZD'}}}, 'id': '238514XXXX350068'}  这个id是复制源广告
        params = {
            "limit": 1500,
            "fields": ["copies"]
        }
        #获取1500条复制广告
        adscopies = AdAccount(self.account_id).get_ads(params=params)
        self.adscopies_all_list.extend(adscopies)
        #获取1500条广告组
        adsetcopies = AdAccount(self.account_id).get_ad_sets(params=params)
        self.adsetcopies_all_list.extend(adsetcopies)
        # 获取1500条系列
        campaigncopies = AdAccount(self.account_id).get_campaigns(params=params)
        self.campaigncopies_all_list.extend(campaigncopies)

        #开始做匹配 广告
        for ad_copies_dict in self.adscopies_all_list:
            ad_source_ad_id=ad_copies_dict["id"]
            ad_copies_list=ad_copies_dict["copies"]["data"]
            if ad_source_ad_id in self.ad_ids_list:
                print("{} 这条 广告 有复制项 {}".format(ad_source_ad_id,ad_copies_list))
                self.adscopies_dict[ad_source_ad_id]=ad_copies_list
                #然后丢进列表
        #开始做匹配 广告组
        for adset_copies_dict in self.adsetcopies_all_list:
            adset_source_ad_id = adset_copies_dict["id"]
            adset_copies_list = adset_copies_dict["copies"]["data"]
            if adset_source_ad_id in self.adset_ids_list:
                print("{} 这条 广告组 有复制项 {}".format(adset_source_ad_id,adset_copies_list))
                self.adsetcopies_dict[adset_source_ad_id] = adset_copies_list
        #开始做匹配 系列
        for campaign_copies_dict in self.campaigncopies_all_list:
            campaign_source_ad_id = campaign_copies_dict["id"]
            campaign_copies_list = campaign_copies_dict["copies"]["data"]
            if campaign_source_ad_id in self.campaign_ids_list:
                print("{} 这条 系列 有复制项 {}".format(adset_source_ad_id,campaign_copies_list))
                self.campaigncopies_dict[campaign_source_ad_id] = campaign_copies_list
        # api请求ad copies
        print("开始请求ad copies")
        for ad_id in self.ad_ids_list:
            if ad_id not in self.adcopies_dict.keys():
                #说明还没有获取,那就通过api获取
                adcopies_list=Ad(ad_id).get_copies()  #如果没有copies返回[]
                if adcopies_list:
                    self.adcopies_dict[ad_id]=adcopies_list
                    print("{}这条 广告 有复制项 {}".format(ad_id,adcopies_list))
        # api请求adset copies
        print("开始请求adset copies")
        for adset_id in self.adset_ids_list:
            if adset_id not in self.adsetcopies_dict.keys():
                #说明还没有获取,那就通过api获取
                adsetcopies_list=AdSet(adset_id).get_copies()  #如果没有copies返回[]
                if adsetcopies_list:
                    self.adsetcopies_dict[adset_id]=adsetcopies_list
                    print("{}这条 广告组 有复制项 {}".format(adset_id,adsetcopies_list))
        # api请求campaign copies
        print("开始请求campaign copies")
        for campaign_id in self.campaign_ids_list:
            if campaign_id not in self.campaigncopies_dict.keys():
                # 说明还没有获取,那就通过api获取
                campaigncopies_list = Campaign(campaign_id).get_copies()  # 如果没有copies返回[]
                if campaigncopies_list:
                    self.campaigncopies_dict[campaign_id] = campaigncopies_list
                    print("{}这条 系列 有复制项 {}".format(campaign_id, campaigncopies_list))
        print("所有广告复制项获取完毕!")
    def getAdsPostIds(self):
        #根据广告id获取每个广告的facebook ins渠道的post信息以及连接,并将其放在列表上存储起来
        pass
        # fields=["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]
        # adid_list=["ad1","ad2","ad3"]
        # for ad_id in adid_list:
        #     res=Ad(ad_id).get_ad_creatives(fields=fields)
        # print(len(res))
        # print(res)
        # 响应:
        # [<AdCreative> {
        #     "effective_instagram_media_id": "xxx",  #获取ins互动情况的id
        #     "effective_object_story_id": "page id_post id",
        #     "id": "creative id",
        #     "instagram_permalink_url": "xxx"
        # }]


    def getEachPostReaction(self):
        #获取两个渠道 的每条post id的互动情况。注意这里需要用page access token。这个token是根据access token产生的
        PagePost("主页id_post id").get_comments()
        PagePost("主页id_post id").get_likes()
        PagePost("主页id_post id").get_shared_posts()
        PagePost("主页id_post id").get_reactions()

    def callStrategy(self,functionname):
        token_list = [
            "XXXX",
            "XXXX",
            "XXXX"
        ]

        if functionname =="getMainData":
            #调用主数据,一般可能会发生
            pass
            #token仓库,后面还要考虑token的定时自动更新问题,这里先成一个字典

            random_value = random.choice(token_list)
            #每次返回随机返回不重复的值(不重复?),然后初始化FacebookAdsApi对象
            # FacebookAdsApi.init(access_token=random_value)

        elif functionname == "getAdActions":
            #这个函数多数发生的问题是达到额度,需要换其他的token再重调
            pass

        return
    def getHistory(self):
        #编辑的时间是世界协调时间。2020-06-01的编辑日志,那么就写成{'since': '2020-06-01', 'until' :'2020-06-02'}
        until=datetime.strptime(self.since, "%Y-%m-%d")+timedelta(days=1)
        until=str(until.strftime("%Y-%m-%d"))
        fields=["extra_data","actor_name","date_time_in_timezone","event_type"]
        params={
            "limit": "500",
            "fields":fields,
            'since': self.since, 'until' :until
        }
        history=AdAccount(self.account_id).get_activities(params=params)
        self.historyList.extend(self.historyList)
        history.load_next_page()
        print("{} 时的历史获取成功,长度为 {}".format(self.since,len(self.historyList)))
def collateAllDatas(self):
        #清洗主数据
        for key,value_list in self.getmaindata_dict.items():
            print("现在处理{}的数据".format(key))
            for maindata_dict in value_list:
                #设置一个空列表,将清洗后的值都存放在这里,方便与后面的action融合
                main_data_list=[]
                #开始清洗广告组名称
                cleared_adsetname=self.clearAdsetName(maindata_dict["adset_name"])
                maindata_dict["adset_name"]=cleared_adsetname
                #开始清洗广告名称
                cleared_adname=self.clearAdName(maindata_dict["ad_name"])
                maindata_dict["ad_name"]=cleared_adname
                #针对某些数值进行四舍五入,保持两位小数点
                try:
                    maindata_dict["cost_per_inline_link_click"] = round(maindata_dict["cost_per_inline_link_click"], 2)
                except KeyError:
                    maindata_dict["cost_per_inline_link_click"] = 0  # 如果不存在就默认设置为0
                try:
                    maindata_dict["cost_per_unique_click"] = round(maindata_dict["cost_per_unique_click"], 2)
                except KeyError:
                    maindata_dict["cost_per_unique_click"] = 0
                try:
                    # 将ctr变为百分比
                    maindata_dict["ctr"] = "{:.2%}".format(maindata_dict["ctr"] / 100)
                except KeyError:
                    maindata_dict["ctr"] = 0
                try:
                    maindata_dict["cpc"] = round(maindata_dict["cpc"], 2)
                except KeyError:
                    maindata_dict["cpc"] = 0
                maindata_dict["cpm"] = round(maindata_dict["cpm"], 2)
                try:
                    maindata_dict["inline_link_click_ctr"] = "{:.2%}".format(
                        maindata_dict["inline_link_click_ctr"] / 100)
                except KeyError:
                    maindata_dict["inline_link_click_ctr"] = 0
                # 获取purchase_roas
                try:
                    for dd in maindata_dict["purchase_roas"]:
                        if dd["action_type"] == "omni_purchase":  # 只需要匹配omni_purchase 然后获取其roas
                            maindata_dict["purchase_roas"] = dd["value"]
                except KeyError:  # 如果不存在就设置为0
                    maindata_dict["purchase_roas"] = 0
        print("主数据清洗完毕")
        #开始清洗actions数据
        print("开始清洗actions数据")
        # 将需要的提取出来,然后放到第一级 方便与insight组合
        # {
        #     "ad_id": "23859162940290230",
        #     "add_to_cart": 1,
        #     "initiate_checkout":3,
        #     "date_start": "2023-08-13",
        #     "date_stop": "2023-09-11"
        #     "age":"18-24"
        # }
        # {“总表”:[{},{},{}],"age":[{},{},{}],"coutry":[{},{},{}]}
        for key, value_list in self.adactions_dict.items():
            print("现在处理{}的数据".format(key))
            for action_dict_out in value_list:
                actionparams_list = ["add_to_cart", "initiate_checkout", "purchase", "post_comments",
                                     "post_shares", "post_engagement", "post_reaction"
                                     ]
                action_list=action_dict_out["actions"]
                for action_dict_in in action_list:
                    if action_dict_in["action_type"] in actionparams_list:
                        action_dict_out[action_dict_in["action_type"]]=action_dict_in["value"]
                action_dict_out.pop("actions")   #提取完就删除原来的actions
        print("开始组合insight和action数据")
        #insight {"总表":[{},{}],"age":[{},{}],"country":[{},{}],"publisher_platform":[{},{}]}
        #action # {“总表”:[{},{},{}],"age":[{},{},{}],"coutry":[{},{},{}]}
        #遍历主数据
        for key,insightlist in self.getmaindata_dict.items():
            print("现在在处理 {} 维度的数据".format(key))
            if key =="总表":
                for insightdict in insightlist:
                    for action_dict in self.adactions_dict["总表"]:
                        if insightdict["ad_id"] == action_dict["ad_id"]:
                            for key in action_dict.keys():
                                if key == "add_to_cart":  # 表示有add_to_cart,那可以计算加购成本
                                    insightdict[key] = action_dict[key]
                                    insightdict["cost_add_to_cart"] = round(insightdict["spend"] / action_dict[key], 2)
                                    break
                                elif key == "initiate_checkout":
                                    insightdict[key] = action_dict[key]
                                    insightdict["cost_initiate_checkout"] = round(
                                        insightdict["spend"] / action_dict[key], 2)
                                    break
                                elif key == "purchase":
                                    insightdict[key] = action_dict[key]
                                    insightdict["cost_purchase"] = round(insightdict["spend"] / action_dict[key], 2)
                                    insightdict["purchase_conversion_value"] = round(
                                        insightdict["spend"] * insightdict["purchase_roas"], 2)
                                    break
                                elif key == "post_engagement":
                                    insightdict[key] = action_dict[key]
                                    break
                                elif key == "post_reaction":
                                    insightdict[key] = action_dict[key]
                                    break
                                elif key == "post_comments":
                                    insightdict[key] = action_dict[key]
                                    break
                                elif key == "post_shares":
                                    insightdict[key] = action_dict[key]
                                    break

            else:
                for insightdict in insightlist:
                    for action_dict in self.adactions_dict["总表"]:
                        if insightdict["ad_id"] == action_dict["ad_id"] and insightdict[key] == action_dict[key]:   #除了匹配ad id还要匹配age、country等字段
                            for key in action_dict.keys():
                                    if key =="add_to_cart":  #表示有add_to_cart,那可以计算加购成本
                                        insightdict[key]=action_dict[key]
                                        insightdict["cost_add_to_cart"]=round(insightdict["spend"] / action_dict[key],2)
                                        break
                                    elif key=="initiate_checkout":
                                        insightdict[key] = action_dict[key]
                                        insightdict["cost_initiate_checkout"] = round(insightdict["spend"] / action_dict[key],2)
                                        break
                                    elif key=="purchase":
                                        insightdict[key] = action_dict[key]
                                        insightdict["cost_purchase"] = round(insightdict["spend"] / action_dict[key], 2)
                                        insightdict["purchase_conversion_value"] = round(insightdict["spend"] * insightdict["purchase_roas"], 2)
                                        break
                                    elif key=="post_engagement":
                                        insightdict[key] = action_dict[key]
                                        break
                                    elif key=="post_reaction":
                                        insightdict[key] = action_dict[key]
                                        break
                                    elif key=="post_comments":
                                        insightdict[key] = action_dict[key]
                                        break
                                    elif key=="post_shares":
                                        insightdict[key] = action_dict[key]
                                        break
        print("主数据与action数据已经关联完毕!")
        #先尝试写入表格
        self.writeToExcel()
        print("开始关联编辑日志")
        print("开始关联复制项")

    def clearAdsetName(self,adset_name):
        setnamelist = adset_name.split("_")
        lastword = setnamelist[-1]
        if len(setnamelist) > 4:  # 如果数据已经被清洗过一次,就不会重新清洗
            if " " in lastword:  # 只要有空格就说明需要清洗
                PureDateText = lastword.split(" ")[0]
                setnamelist.pop()  # 删除最后一个元素
                setnamelist.append(PureDateText)  # 添加新的元素进去
                newsetname = "_".join(setnamelist)
                return newsetname
            else:  # 没有空格说明,不需要清洗,直接赋值就行了
                return adset_name
        else:
            print("受众总长度不超过4个,跳过处理")
            return adset_name

    # 清洗广告名称函数
    def clearAdName(self, AdName):
        # 清洗广告素材名称
        if "_" not in AdName:
            AdName = AdName
        else:
            AdNameL = AdName.split("_")[:3]
            AdNameL.append(AdName.split("_")[-1:][0].split(" ")[0])
            AdName = "_".join(AdNameL)
        return AdName

    #获取utc世界协调时间
    def getUtcTime(self):
        # 获取当前的UTC时间
        utc_now = datetime.utcnow()
        # 添加8个小时
        new_time = utc_now + timedelta(hours=8)
        # 打印结果,省略毫秒部分
        print("当前UTC时间:", utc_now.strftime("%Y-%m-%d %H:%M:%S"))
        print("添加8个小时后的时间:", new_time.strftime("%Y-%m-%d %H:%M:%S"))
        return new_time.strftime("%Y-%m-%d %H:%M:%S")

if __name__ =="__main__":
    #总逻辑:分主数据、补充数据线。非串行运行,便利用不同的token策略

    try:
        start_time = time.time()
        getdata=GetData()
        FacebookAdsApi.init(access_token=getdata.access_token)
        for metric in ["","age","country", "publisher_platform"]:  #“”代表是总表 ,"country","publisher_platform"
            #获取insgiht对象
            insightcursor_Obj=getdata.getMainData(metric)
        getdata.errorcount=0
        print("getmaindata_dict",getdata.getmaindata_dict)
        #从中清洗出广告组id以及广告id,后面的所有api都会使用到
        getdata.getIds()
        for metric in ["","age","country", "publisher_platform"]: #“”代表是总表  , "country", "publisher_platform"
            #获取actions信息
            getdata.getAdActions(metric)
        print("self.adactions_dict",getdata.adactions_dict)  #{""}
        #获取每个广告的编辑日志
        getdata.getHistory()

        #获取每个层级复制后的对象,
        getdata.getCopies()

        #处理函数,针对已经收集的数据进行整理,写入表格
        getdata.collateAllDatas()

        # #获取每条广告的post_id
        # getdata.getAdsPostIds()
        #
        # #承接上一个函数,获取每个post id(去重)的互动情况
        # getdata.getEachPostReaction()
        #
        # #总请求次数
        # print("总请求次数", getdata.allcount)
    # except facebook_business.exceptions.FacebookRequestError:
    #     print("请求超时,请求次数为",getdata.allcount,"尝试换下一个token继续")
    finally:
        print("总请求次数",getdata.allcount)
        end_time = time.time()
    runtime = end_time - start_time
    print("总用时",runtime,"秒")
**获取指定日期所有的花费大于0的所有广告**
fields = [
          "campaign_id",
          "adset_id",
          "adset_name",
          "ad_id",
          "ad_name",
          "spend",
          "cpc",
          "ctr",
          "cost_per_unique_click",
          "cost_per_inline_link_click",
          "inline_link_click_ctr",
          "inline_link_clicks",
          "cpm",
          "reach",
          "frequency",
          "impressions",
          "buying_type",
          "purchase_roas",
          ]

# 构建 API 请求
params = {
    "limit":"10",  #默认设置1个大的int
    'level': 'ad',
    'time_range': {'since': '2023-08-02', 'until': '2023-08-02'},  # 指定时间范围
    'fields': ','.join(fields),
    'filtering': [
        {
        "field": "spend",
        "operator": "GREATER_THAN",
        "value": 0
      }
    ],
    'breakdowns': ["age"], #age country publisher_platform
}
响应:
<AdsInsights> {
    "ad_id": "xxx",
    "ad_name": "xxx",
    "adset_id": "xxx",
    "adset_name": "20 - 54_US,CA,UM,AU_All_xxxx",
    "age": "25-34",
    "buying_type": "AUCTION",
    "campaign_id": "xxxx",
    "cost_per_inline_link_click": "1.343333",
    "cost_per_unique_click": "0.806",
    "cpc": "0.806",
    "cpm": "14.041812",
    "ctr": "1.74216",
    "date_start": "2022-10-30",
    "date_stop": "2022-10-30",
    "frequency": "1.059041",
    "impressions": "287",
    "inline_link_click_ctr": "1.045296",
    "inline_link_clicks": "3",
    "purchase_roas": [
        {
            "action_type": "omni_purchase",
            "value": "22.48139"
        }
    ],
    "reach": "271",
    "spend": "4.03"
}
ps:如果没有转化那么就不会返回purchase_roas这个字段

# 按照指定维度(汇总、age、country、pulisher_platform)以及筛选条件批量获取每条广告fields字段。
ad_account = AdAccount(account_id)
insights = ad_account.get_insights(params=params)
print(len(insights))
print("insights",insights)
#响应缺失了加购以及层级状态这些,所以需要在下面用方法补充这些字段

fields_1 = \
    [
        "ad_id",
        "actions",
        "spend",
        #Purchases conversion value 需要自己计算purchase_roas*spend
        #Cost per results 需要自己计算spend/purchase
          ]

params_1 = {
    'time_range': {'since': '2023-08-02', 'until': '2023-08-02'},  # 指定时间范围
    'fields': ','.join(fields_1),
    'breakdowns': ["age"]
}

响应:
<AdsInsights> {
    "actions": [
        {
            "action_type": "onsite_web_add_to_cart",
            "value": "1"
        },
        {
            "action_type": "add_to_cart",
            "value": "1"
        },
        {
            "action_type": "onsite_web_app_view_content",
            "value": "3"
        },
        {
            "action_type": "onsite_web_purchase",
            "value": "1"
        },
        {
            "action_type": "post_engagement",
            "value": "50"
        },
        {
            "action_type": "onsite_web_app_add_to_cart",
            "value": "1"
        },
        {
            "action_type": "page_engagement",
            "value": "50"
        },
        {
            "action_type": "purchase",
            "value": "1"
        },
        {
            "action_type": "onsite_web_app_purchase",
            "value": "1"
        },
        {
            "action_type": "omni_add_to_cart",
            "value": "1"
        },
        {
            "action_type": "view_content",
            "value": "3"
        },
        {
            "action_type": "landing_page_view",
            "value": "8"
        },
        {
            "action_type": "onsite_web_view_content",
            "value": "3"
        },
        {
            "action_type": "video_view",
            "value": "42"
        },
        {
            "action_type": "omni_view_content",
            "value": "3"
        },
        {
            "action_type": "offsite_conversion.fb_pixel_view_content",
            "value": "3"
        },
        {
            "action_type": "offsite_conversion.fb_pixel_add_to_cart",
            "value": "1"
        },
        {
            "action_type": "offsite_conversion.fb_pixel_purchase",
            "value": "1"
        },
        {
            "action_type": "link_click",
            "value": "8"
        },
        {
            "action_type": "omni_purchase",
            "value": "1"
        }
    ],
    "ad_id": "xxx",
    "age": "45-54",
    "date_start": "2022-10-31",
    "date_stop": "2022-10-31",
    "spend": "2.68"
}]


#获取每条广告的加购、结账、购买、转化价值(需自计算)、roas、转化成本(需自计算)、帖子互动、帖子评论。尚缺失层级状态字段
ad_insight=Ad(ad_id).get_insights(params=params_1)
print(len(ad_insight))
print("广告层级",ad_insight)

#获取广告,广告组状态
ad_status=Campaign(campaign_id).get_ads(fields=["status"])
adset_status=Campaign(campaign_id).get_ad_sets(fields=["status"])
print(len(ad_status))
print("ad_status",ad_status)
print("adset_status",adset_status)

#判断层级预算类型并获取对应预算值
for cam_id in ["系列id1","系列id2","系列id3","系列id4","系列id5","系列id6"]:
    try:
        cambudget = Campaign(cam_id).api_get(fields=["daily_budget"])["daily_budget"]
        print("系列预算是",cambudget)
    except KeyError:
        print("说明是组预算")
        adset_budget=Campaign(cam_id).get_ad_sets(fields=["daily_budget"])
        print(adset_budget)
#第一步获取系列预算,如果不存在键daily_budget,那么就是组预算。数据库需要campaign_budget、adset_budget两个字段

处理逻辑:获取当天所有花费大于0的广告。然后根据系列id、组id进行归类。因为批量反馈的字段不全,所以需要遍历每个广告才能获取加购、结账、购买这些数据。另外也需要遍历每个广告组、广告获取其目前状态,最后获取账户的所有编辑记录,获取层级复制后的对象。然后根据其层级的各自id进行归属。以此类推

额外拓展:operator还有其他的的一些比较符。例如EQUAL, NOT_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, IN_RANGE, NOT_IN_RANGE, CONTAIN, NOT_CONTAIN, IN, NOT_IN, STARTS_WITH, ENDS_WITH, ANY, ALL, AFTER, BEFORE, ON_OR_AFTER, ON_OR_BEFORE, NONE, TOP
 "field": "spend",也可以改为"impresions"
# fields=["extra_data","actor_name","date_time_in_timezone","event_type"]
# params={'since': '2020-06-01', 'until' :'2023-08-15'}
#这里的时间是按照世界协调时间发送过去;如果想要获取2020-06-01的编辑日志,
那么就写成{'since': '2020-06-01', 'until' :'2020-06-02'}、2020-06-02号就写成{'since': '2020-06-02', 'until' :'2020-06-03'},以此类推
"date_time_in_timezone": "29/10/2022 at 18:25",
 "event_time": "2022-10-29T10:25:43+0000",
#世界协调时间+8个小时,就等于广告账户时间。
**获取广告账户指定时间的所有编辑记录**
fields=["extra_data","actor_name","date_time_in_timezone","event_type","object_id","event_time"]
params={'since': '2022-10-28', 'until' :'2022-10-29',"limit":"10000"}
res=AdAccount(account_id).get_activities(fields=fields,params=params)
#limit参数目前可以设置到10000,就是每次返回的数据量
fields=["campaign_name","adset_name","conversions","spend"]
params = {'time_range': {'since': '2022-10-01', 'until': '2022-10-05'},"level":"campaign","limit":"10","filtering":[{"Lifetime spent (campaign)":">100"}]}
#breakdown 官方文档https://developers.facebook.com/docs/marketing-api/insights/breakdowns/
#{'since': '2022-01-01', 'until': '2022-12-05'}
#响应数据如下,这里没有世界协调时间
#"date_start": "2022-01-01",
#"date_stop": "2022-12-05",
res=AdAccount(account_id).get_insights(params=params,fields=fields)
print("长度",len(res))
print(res)
**按照特定筛选条件筛选广告系列出来**
#设置给limit设置:10000,基本获取完了广告账户里所有的系列;接受int类型
fields=["campaign_id","adset_id","ad_id","adset_name","ad_name","campaign_name","conversions","spend","campaign_id","cost_per_inline_link_click"]
params = {
    'time_range': {'since': '2023-08-01', 'until': '2023-08-01'},
    "level":"campaign",
    "limit":5,    #响应的数量,默认可以设置为10000最大
    "filtering": [
      {
        "field": "spend",
        "operator": "GREATER_THAN",
        "value": 0
      }
]
          }
#https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/
#level 要获取的层级,有三个层级:{ad, adset, campaign, account}
#breakdown 官方文档https://developers.facebook.com/docs/marketing-api/insights/breakdowns/
#{'since': '2022-01-01', 'until': '2022-12-05'}
#这个时间就是广告账户的实际时间
res=AdAccount(account_id).get_insights(params=params,fields=fields)
print("长度",len(res))
print(res)
**获取单独的广告组层级编辑日志**
fields=["extra_data","actor_name","date_time_in_timezone","event_type"]
params={'since': '2020-06-01', 'until' :'2023-08-15'}
res=AdSet(adset_id).get_activities(params=params,fields=fields)
print(res)
**获取层级复制后的对象**
fields=["source_campaign_id"]
Campaign(campaign_id).get_copies(fields=fields)
Adset(adset_id).get_copies(fields=fields)
Ad(ad_id).get_copies(fields=fields)
**获取指定广告的数据以及互动情况**
fields=["actions"]
params={"time_range":{'since':"2023-06-29",'until':"2023-08-24"}}
print(Ad(ad_id).get_insights(fields=fields,params=params))

5)获取主页帖子的相关信息。产品标题、着陆页链接等

PAGEPOST-ID?fields=object_story_spec

响应:
{
  "object_story_spec": {
    "page_id": "[]()",
    "instagram_actor_id": "[]()",
    "video_data": {
      "video_id": "[]()",
      "title": "title",
      "message": "帖子文案",
      "link_description": "",
      "call_to_action": {
        "type": "SHOP_NOW",
        "value": {
          "link": "[)"
        }
      },
      "image_url": "[)",
      "image_hash": ""
    }
  },
  "id": "[]()"
}

7)如何获取60天的access token

curl -i -X GET "https://graph.facebook.com/oauth/access_token?grant_type=fb_exchange_token&
  client_id=APP-ID&
  client_secret=APP-SECRET&
  fb_exchange_token=SHORT-LIVED-USER-ACCESS-TOKEN"
#卸载应用、重装应用 或者更改facebook账号密码,这个token就会失效

8)如何获取60天的主页访问access token?

curl -i -X GET "https://graph.facebook.com/PAGE-ID?
  fields=access_token&
  access_token=USER-ACCESS-TOKEN"

9)如何读取广告post的评论?要使用主页token才能访问

PagePost("主页id_post id").get_comments()
#能获取所有评论、附件、分享的总数
#需要获取每条post的信息,同步到每条上,然后根据这些做个统计

10)获取广告账户下所有的post信息(post id、图片链接、广告文案等)

**
根据实际需求直接获取effective_object_story_id,instagram_permalink_url,effective_instagram_media_id就行,方便获取其帖子评论数、分享数。缺点这条api无法获取对应的ad id回来,不知道这个post应该归属到哪个
fields=["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]
**
fields=["id","effective_object_story_id","title","instagram_permalink_url","object_story_spec"]
params = {
    'limit': 10
    }
res=AdAccount(account_id).get_ad_creatives(fields=fields,params=params)
响应:
<AdCreative> {
    "effective_object_story_id": "主页id_postid",
    "id": "创意id",  #每个创意都有其独一无二的id,即使完完整整复制出来也是
    "instagram_permalink_url": "xxx",
    "object_story_spec": {
        "instagram_actor_id": "xxx",
        "link_data": {
            "call_to_action": {
                "type": "SHOP_NOW"
            },
            "child_attachments": [  #有这个字段的说明是幻灯片
                {
                    "call_to_action": {
                        "type": "SHOP_NOW"
                    },
                    "image_hash": "xxx",
                    "link": "xxx",  #产品着陆页
                    "name": "Free Shipping over $49"
                },
                {
                    "call_to_action": {
                        "type": "SHOP_NOW"
                    },
                    "image_hash": "xxx",
                    "link": "xxx",
                    "name": "Free Shipping over $49",
                    "picture": "xxx",  #提示过期
                    "video_id": "xxx"
                },
                {
                    "call_to_action": {
                        "type": "SHOP_NOW"
                    },
                    "image_hash": "xxx",
                    "link": "xxx",
                    "name": "Free Shipping over $49"
                },
                {
                    "call_to_action": {
                        "type": "SHOP_NOW"
                    },
                    "image_hash": "xxx",
                    "link": "xxx",
                    "name": "Free Shipping over $49",
                    "picture": "xxx",
                    "video_id": "xxx"
                }
            ],
            "description": "xxx",
            "link": "xxx",
            "message": "xxx",
            "multi_share_end_card": true,
            "multi_share_optimized": true
        },
        "page_id": "xxx"
    },
    "title": "xxx" 主页名字
}]

11)已知所有广告id,所以通过以下方法遍历ids,得到["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]字段值,然后写入到对应广告中,然后再通过以下方法获取每个post在facebook、ins的点赞数、评论数、分享数以及评论的所有值。当在操作视图中点击广告层级,就展现其帖子在两个渠道的互动情况。并且设置一个“互动情况视图”,以时间轴的形式,展示每3小时的评论、转化、点赞数。以折线图的形式展示,并且在图表上点击每天的评论数的时候,会有个区域展示其详细的评论内容,并且评论内容下方有对应的广告id点击直接复制。
难点:每次获取的评论数需要做去重处理,也就是只获取每天新增的评论数、评论值、分享数、点赞数。

fields=["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]
adid_list=["ad1","ad2","ad3"]
for ad_id in adid_list:
    res=Ad(ad_id).get_ad_creatives(fields=fields)
print(len(res))
print(res)
响应:
[<AdCreative> {
    "effective_instagram_media_id": "xxx",  #获取ins互动情况的id
    "effective_object_story_id": "page id_post id",
    "id": "creative id",
    "instagram_permalink_url": "xxx"
}]

12)按照筛选日期,筛选出所有评论值,并对每条评论进行分析(用自然语言分析工具分析,GPT?)看看他们更关注的是那方面内容,最终做成“词云”,并显示在“互动情况视图”内


词云

13)应用请求速率
1)请求627条国家维度insight1次,加上41次action请求,一共42次,共耗用66%的usage,暂停3分钟,额度恢复到90%,大概3分钟恢复50%的额度。

目前请求每条广告160之后(83条spend大于0的广告,最快用时60s左右)请求额度恰好到了100%,然后5分钟之后开始重置,重置大概30s。所以最好是弄多几个用户token,并且可以计算剩余的请求额度,快用完额度的时候,切换到其他的用户进行请求,依此类推。
具体access_token调用

注意要有pages_manage_engagement
https://stackoverflow.com/questions/19517086/facebook-post-insights-metrics-download-via-api?rq=4
https://stackoverflow.com/questions/50108314/how-to-get-insights-metrics-facebook-graph-api
根据这个获取post的所有洞察信息(like comment shares)
https://developers.facebook.com/docs/graph-api/reference/v18.0/insights
主页id_post id?fields=insights.metric(post_activity_by_action_type,post_clicks_by_type,post_engaged_fan,post_negative_feedback_by_type_unique,post_impressions_unique,post_impressions,post_reactions_by_type_total,post_impressions_fan_unique,post_impressions_fan_paid_unique,post_impressions_organic_unique,post_impressions_viral_unique){values,title}
post_activity_by_action_type能获取评论数、分享数、喜欢
获取主页下所有post的洞察
主页id/posts?fields=insights.metric(post_consumptions_unique,post_negative_feedback_unique,post_engaged_users,post_impressions_unique,post_impressions,post_reactions_by_type_total,post_impressions_fan_unique,post_impressions_fan_paid_unique,post_impressions_organic_unique,post_impressions_viral_unique){values,title}
from facebook_business.adobjects.pagepost import PagePost

page_access_token=""
FacebookAdsApi.init(access_token=page_access_token)
#如何用python post id去获取帖子丰富的信息(facebook)获取主页帖子上的信息,需要用主页的访问token,在第8)点提到如何获取page access token

params={
    "metric":[
        "post_activity_by_action_type",
        "post_clicks_by_type",
        "post_impressions_unique",
        "post_impressions"]
}
ss=PagePost("pageid_postid").get_insights(params=params)
print(ss)

响应:
[<InsightsResult> {
    "description": "Lifetime: The number of stories created about your Page post, by action type. (Total Count)",
    "id": "pageid_postid/insights/post_activity_by_action_type/lifetime",
    "name": "post_activity_by_action_type",
    "period": "lifetime",
    "title": "Lifetime Post Stories by action type",
    "values": [
        {
            "value": {
                "comment": 7,
                "like": 70,
                "share": 2
            }
        }
    ]
}, <InsightsResult> {
    "description": "Lifetime: The number of clicks anywhere in the post on News Feed from users that matched the audience targeting on the post, by type. (Total Count)",
    "id": "pageid_postid/insights/post_clicks_by_type/lifetime",
    "name": "post_clicks_by_type",
    "period": "lifetime",
    "title": "Lifetime Matched Audience Targeting Consumptions by Type",
    "values": [
        {
            "value": {
                "link clicks": 13844,
                "other clicks": 9708,
                "video play": 2410
            }
        }
    ]
}, <InsightsResult> {
    "description": "Lifetime: The number of people who had your Page's post enter their screen. Posts include statuses, photos, links, videos and more. (Unique Users)",
    "id": "pageid_postid/insights/post_impressions_unique/lifetime",
    "name": "post_impressions_unique",
    "period": "lifetime",
    "title": "Lifetime Post Total Reach",
    "values": [
        {
            "value": 655125
        }
    ]
}, <InsightsResult> {
    "description": "Lifetime: The number of times your Page's post entered a person's screen. Posts include statuses, photos, links, videos, Reels and more. (Total Count)",
    "id": "pageid_postid/insights/post_impressions/lifetime",
    "name": "post_impressions",
    "period": "lifetime",
    "title": "Lifetime Post Total Impressions",
    "values": [
        {
            "value": 1206517
        }
    ]
}]

#如何单独获取指定系列、组的所有数据?适用于特别监测功能
跟获取整个账户数据差不多的逻辑,主要变动是在filtering那里,如果获取某个组的数据就adset.id
fields = [
        "campaign_id",
        "adset_id",
        "adset_name",
        "ad_id",
        "ad_name",
        "spend",
        "cpc",
        "ctr",
        "cost_per_unique_click",
        "cost_per_inline_link_click",
        "inline_link_click_ctr",
        "inline_link_clicks",
        "cpm",
        "reach",
        "frequency",
        "impressions",
        "buying_type",
        "purchase_roas"
]
params = {
    'level': 'ad',
    'time_range': {'since': since, 'until': until},  # 指定时间范围
    'fields': fields,
    'filtering': [{"field": "campaign.id","operator": "EQUAL","value": "23851411457790068"}],
    'breakdowns': [metric]  # age country publisher_platform
}
insights_Cursor = AdAccount(account_id).get_insights(params=params)
print("insights_Cursor",insights_Cursor)

适用的分别有以下几种流量类型
ads_insights、ads_management、custom_audience、instagram、leadgen、messenger 或 pages
在解决cursor的问题之前触犯的都是#80000的错误代码,但后面触犯的都是#80004的类型,

获取广告预算或者复制项似乎都是属于ads_management,因为每次返回的报错类型都是#80004

需要计算每次请求后的usage的使用量
ads_insights、ads_management两个api请求的限制程度不同,后者最为严格
获取的键有type 值是ads_management
获取的键有call_count 值是100  #超过100就会节流,并且是在1个小时后才能reset。
获取的键有total_cputime 值是8
获取的键有total_time 值是9
获取的键有estimated_time_to_regain_access 值是0
获取的键有ads_api_access_tier 值是development_access

获取广告编辑记录占用的也是ads_management的资源

image.png
{"run_status":{"old_value":1,"new_value":15},"old_value":"Active","new_value":"Inactive","rule_info":{"rule_id":"23851414101130068","rule_latest_name":"1026\u5e7f\u544a\u7cfb\u5217\u52a0\u8d2d","rule_status":"DELETED","evaluation_spec_id":"23851414101110068","execution_spec_id":"23851277479300068","rule_name":"1026\u5e7f\u544a\u7cfb\u5217\u52a0\u8d2d"},"type":"run_status"}
如果这个结果里面没有系列、广告组、广告的id,那么object就是被编辑对象的id
要对历史编辑进行清洗

通过以下方法可以获取剩余请求量,取ads_insights、ads_management的call_count、total_cputime、total_time最大值,如果有值大于75的,则程序可以需要停留60分钟。

ads_insights的限制比较小,每小时可以大约可以获取过去4天的数据d,2个广告账户(约12次4天2)。后面预算可以从编辑日志那里补充

##返回的对象AdAccount(self.account_id)有header属性,但Campaign()返回的是没有,所以直接将
appheader设置为“”,在checkAppUsage用另外的请求获取剩余请求量

insights_Cursor = AdAccount(self.account_id).get_insights(params=params)
appheader=dict(insights_Cursor.headers())

    def checkAppUsage(self,appheader):
        #data_type 如果是主数据、action数据都可以直接获取header,但campaign对象没有header属    性,所以需要用另外方式去获取
        if appheader:  #如果没有appheader说明是campaign这种对象
            print("appheader",appheader)
            usage_dict=json.loads(appheader["x-business-use-case-usage"])
            # print("usage_dict",type(usage_dict),usage_dict)
            for account_id,values_list in usage_dict.items():
                print("======账号id {} 的请求信息如下".format(account_id))
                for usage_inner_dict in values_list:
                    for key,value in usage_inner_dict.items():
                        print("获取的键有{} 值是{}".format(key,value))
                print("======信息结束")
        else:
            self.allcount = self.allcount + 1
            params = {
                "limit":1,
                'time_range': {'since': self.since, 'until': self.until}
            }# 指定时间范围
            res=AdAccount(self.account_id).get_campaigns(params=params)
            print("res",res)
            print("类型", type(res.headers()))
            appheader = dict(res.headers())
            # 获取请求量信息
            self.checkAppUsage(appheader)
响应:
获取的键有type 值是ads_insights
获取的键有call_count 值是1
获取的键有total_cputime 值是2
获取的键有total_time 值是3
获取的键有estimated_time_to_regain_access 值是0
获取的键有ads_api_access_tier 值是development_access

获取的键有type 值是ads_management
获取的键有call_count 值是32
获取的键有total_cputime 值是3
获取的键有total_time 值是4
获取的键有estimated_time_to_regain_access 值是0
获取的键有ads_api_access_tier 值是development_access

广告历史清洗
重要:每次获取的广告日志历史,都要针对7个字段计算一个hash值(id,object_id,actor_name,extra_data,event_type,event_time,
date_time_in_timezone),也就是生成一个指纹,然后对比库内数据,去重。如果是抓取每日的历史数据,都不会存在日志重复的概率,因为每日时间不同。但如果是分时去抓取,那肯定就是会存在重复的历史记录,所以需要md5去重。

原有方式漏洞:以往是根据每3个小时获取一次最新的预算值,但预算值在3个小时内可以变化无数次,不固定。虽说能获取最新的预算值,但并不知道这个预算值在3个小时内的变化情况,显然这不是我们想要的。所以直接提取日志里的预算变化是最正确的

直接遍历每个key,value,然后匹配到对应的对象上就行;因为编辑历史包含了广告生涯的所有信息,所以是否是系列预算、组预算等都可以匹配到,不需要单独再通过api请求
image.png

如何从广告日志获取预算信息;逻辑展示

    def matchLog(self,id,getbudgetBoolean,metricType):
        '''
        :param id: 这个ID可以为系列id、广告组id、广告id
        :param getbudgetBoolean: 这个是布尔值 意思是否要获取预算,真或假预示不同的处理逻辑
        :param metricType:代表层级类型 campaign、adset、ad。这里要跟id是同等级对应
        :return:
        #前提条件:
        1)所有日志都以去重的方式插入到数据库。如hash_id object_id actor_name date_time_in_timezone event_time event_type extra_data
        2)前端已经根据筛选条件给出要关联日志的系列id、广告组、广告id
        '''
        if getbudgetBoolean:  #说明单纯是为了找到系列预算、组预算
            #获取其id所有的日志记录
            if metricType == "campaign":
                #筛选id,event_type筛选 create_campaign_group,如果返回的对象存在new_value这个key,那么说明这个系列是系列预算
                #然后可以将这个对应关系直接写入原表或者设置个长期存储区,储存这个标记,知道一次预算类型,下次就不用判断
                #然后同样的id,event_type筛选 改为update_campaign_budget,然后提取extra_data的new_value值,然后按照时间由近到远排序,返回前端即可
            else:   #说明获取是获取组预算。还有这个id传入如果系列id已经为系列预算,那么旗下的组id就不能传入到这里,可以提前做识别,减少查询的消耗
                #通过上一步判断,已知道传入的组id肯定是组预算类型
                #判断组id的event_type是否有update_ad_set_budget这个值,如果没有就获取create_ad_set的值,然后同样提取extra_data的new_value值,然后按照时间由近到远排序,返回前端即可(因为有种情况就是广告从创建到现在都没有update_ad_set_budget的操作)
                #注意预算,在图表筛选器那里可以输入的,是用折线表示
            pass
        else:  #那么则是匹配所有日志记录
            #按照id以及metricType获取所有的日志记录就行了,因为入库之前已经做了去重
            #这个日志记录就是图表上y轴的虚线。按照actor_name,event_type,date_time_in_timezone,extra_data。第一版先直接展示extra_data,第二版再清洗一下extra_data就行
            pass

受众构建示例代码展示

if __name__ =="__main__":
        interestseed_list_before=["Sports", "Running","Healthy Habits"]     #这里的种子列表,就是requestAvailableBoolean为True    的兴趣词+手动添加的兴趣词
        interestseed_list_after=[word for i in interestseed_list_before for word in i.split(" ")]
        store_list = []  # 设置一个临时存放id的列表
        getdata.createInterestsLibrary(interestseed_list_after)

    def createInterestsLibrary(self,interestseed_list_after):
        '''
                程序任务:每天需要新增1000个兴趣词,管理员可以手动设置所需;根据{"call_count":0,"total_cputime":0,"total_time":0}来判断停歇时间(目前请求测试发现不会占用)
                自动创建任务;种子兴趣词的来源则是requestAvailableBoolean为True的兴趣词+手动添加的兴趣词;如果全部兴趣词的requestAvailableBoolean为False,则需要展示在任务结果(失败?失败理由是)
                最好在后端展示任务完成结果
        '''
        self.nwb = openpyxl.load_workbook(self.desktop_path + "受众表.xlsx")
        self.nws = self.nwb.active
        limit=200
        inserttime=datetime.datetime.now().date()
        for interest in interestseed_list_after:
            print("现在请求的兴趣词是 {}".format(interest))
            #构建数据包
            params={
                "type":"adinterest",
                "q":interest,
                "limit":limit,
                "access_token":self.access_token           #到时候会另外提供一个access_token,所以后面也要有个地方是填这个的
            }
            try:
                interestResponse=requests.get("https://graph.facebook.com/v18.0/search",params=params)
                print("头部",interestResponse.headers["x-app-usage"])  #{"call_count":0,"total_cputime":0,"total_time":0}目前测试并没有使用过
                interestResponse_dict = json.loads(interestResponse.text)
                interests_list = interestResponse_dict["data"]
                print("兴趣词",interest,"结果",len(interests_list),limit,len(interests_list) == limit)
                requestAvailableBoolean = len(interests_list) == limit  # True代表下次还可以请求
                if requestAvailableBoolean:
                    pass
                    #如果能在name找到对应的值,则对种子兴趣词的requestAvailableBoolean设置为True,因为有些split是后期split的
                else:
                    pass
                    #requestAvailableBoolean设置为False 那么下次就不会请求这个
            except Exception as e:
                print("发生ConnectionError,忽略,继续下一个", e)
                continue
            for interest_dict in interests_list:  # 先将已经获取的添加进表格
                interestId = interest_dict["id"]
                if interestId not in store_list:  # 用兴趣词id作为判断条件就行 不需要另外生成md5
                    store_list.append(interestId)
                    self.nws.append((inserttime, interestId, interest_dict.setdefault("topic", "其他"),
                                     interest_dict.setdefault("disambiguation_category", "其他"),
                                     interest_dict["name"], interest_dict["audience_size_lower_bound"],
                                     interest_dict["audience_size_upper_bound"], str(interest_dict["path"]),
                                     "True"))  #返回来的结果一律设置为True
            self.nwb.save(self.desktop_path + "受众表.xlsx")  # split之前先保存
            self.nwb.close()
   def success_callback(self,response):
        try:
            requestLink=response._call["relative_url"]  #23851301777270068/adcreatives?ad_id=23851301777270068&fields=effective_object_story_id%2Cinstagram_permalink_url%2Ceffective_instagram_media_id
            #23851301777270068 这个就是广告id,然后设置字典就行
            for item in response.headers():# response.headers()返回的是列表
                if item['name'] == 'X-Business-Use-Case-Usage':
                    value = json.loads(item['value'])
                    print("请求量信息 {},现在请求的连接 {} ".format(value,requestLink))
            pair = [response.json()['data']]
            self.batch_body_responses.extend(pair)

        except IndexError:
            print("发生 IndexError")
        except UnicodeEncodeError:
            print("发生 UnicodeEncodeError")

    def error_callback(self,response):
        pass

    def generate_batches(self,iterable, batch_size_limit):
        # This function can be found in examples/batch_utils.py
        batch = []

        for item in iterable:
            if len(batch) == batch_size_limit:
                yield batch
                batch = []
            batch.append(item)

        if len(batch):
            yield batch
    def getBatchData(self,api,requestData_dict):
        #requestData_dict {"requestIds_list":requestIds_list,"fields_list":fields_list,"params_dict":params_dict,"endpoint":"/copies"}
        batches = []
        batch_limit = 50
        for batch in self.generate_batches(requestData_dict["requestIds_list"], batch_limit):
            print("batch",batch)
            next_batch = api.new_batch()
            for requestid in batch:
                requestss = [FacebookRequest(node_id=requestid, method="GET", endpoint=requestData_dict["endpoint"]).add_fields(requestData_dict["fields_list"]).add_params(requestData_dict["params_dict"])]
                for req in requestss:
                    next_batch.add_request(req, self.success_callback, self.error_callback)
            batches.append(next_batch)
        for batch_request in batches:
            batch_request.execute()
        time.sleep(2)
        print("batch_body_responses",self.batch_body_responses)


 if __name__ =="__main__":
api=FacebookAdsApi.init(access_token=getdata.access_token)
            # ================利用批量方法请求 主数据===============开始
            for metric in ["", "age", "country", "publisher_platform"]:
                print("现在请求的维度是 {}".format(metric))
                fields_list = ["campaign_id", "adset_id", "adset_name", "ad_id", "ad_name", "spend", "cpc", "ctr", "clicks",
                               "unique_clicks", "cost_per_unique_click", "cost_per_inline_link_click",
                               "inline_link_click_ctr", "inline_link_clicks", "cpm", "reach", "frequency", "impressions",
                               "buying_type", "purchase_roas"]
                requestIds_list = [getdata.account_id]  # 若请求主数据,那么id就是广告账户id
                params_dict= {
                    "limit":300,  #设置一个安全稳定的值
                    'level': 'ad',
                    'time_range': {'since': getdata.since, 'until': getdata.until},  # 指定时间范围
                    'fields': fields_list,
                    'filtering': [
                        {
                        "field": "spend",
                        "operator": "GREATER_THAN",
                        "value": 0
                      }
                    ],
                    'breakdowns': [metric], #age country publisher_platform
                }
                requestData_dict={"requestIds_list":requestIds_list,"fields_list":fields_list,"params_dict":params_dict,"endpoint":"/insights"}
                print("requestIds_list长度",len(requestIds_list))
                getdata.getBatchData(api,requestData_dict)
            # ================利用批量方法请求 主数据===============结束

            #================利用批量方法请求 creative===============开始
            requestIds_list = ["id1","id2"]  # 存放要请求的id
            params_dict={}
            fields_list=["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]
            requestData_dict={"requestIds_list":requestIds_list,"fields_list":fields_list,"params_dict":params_dict,"endpoint":"/adcreatives"}
            print("requestIds_list长度",len(requestIds_list))
            getdata.getBatchData(api,requestData_dict)
            # ================利用批量方法请求 creative===============结束

             # ================清洗creative 提取post_id===============开始
            print("清洗creative 提取post_id")
            api=FacebookAdsApi.init(access_token=getdata.page_access_token)
            #['105106692296682_110891305051554', '105106692296682_126300916843926', '105106692296682_131739622966722']
            effective_object_story_ids_list = [item[0]['effective_object_story_id'] for item in getdata.batch_body_responses]
            print("effective_object_story_ids_list",effective_object_story_ids_list)
            # effective_object_story_ids_list=['105106692296682_126300916843926', '105106692296682_126300916843926', '105106692296682_131739622966722']
            fields_list=[]
            params_dict = {
            "metric":[
                "post_activity_by_action_type",
                "post_negative_feedback_unique",
                "post_clicks_by_type",
                "post_impressions_unique",
                "post_impressions",
                "post_reactions_by_type_total"]
                }
            requestData_dict = {
                "requestIds_list": effective_object_story_ids_list,
                "fields_list": fields_list,
                "params_dict": params_dict,
                "endpoint": "/insights",
                "batch_limit": 50
            }
            getdata.getBatchData(api,requestData_dict)
            # ================清洗creative 提取post_id===============结束

            # ================利用批量方法请求 copies===============开始
            requestIds_list = ["id1","id2"]   # 存放要请求的id
            params_dict = {}
            fields_list = []
            requestData_dict = {"requestIds_list": requestIds_list, "fields_list": fields_list,
                                "params_dict": params_dict, "endpoint": "/copies"}
            print("requestIds_list长度", len(requestIds_list))
            getdata.getBatchData(api, requestData_dict)
            # ================利用批量方法请求 copies===============结束

            # ================利用批量方法请求 actions===============开始
            requestIds_list = ["id1","id2"] # 存放要请求的id
            fields_list = ["ad_id", "actions", "spend"]
            #requestIds_list不会变,breakdowns走遍历就行了,日期也是一样,我这里直接懒得完全写出来
            params_dict = {'time_range': {'since': "2022-10-28", 'until': "2022-10-28"},'breakdowns': "age"}   
            #将这些打包成一个字典数据包
            requestData_dict = {"requestIds_list": requestIds_list,
                                "fields_list": fields_list,
                                "params_dict": params_dict,
                                "endpoint": "/insights"}
            print("requestIds_list长度", len(requestIds_list))
            getdata.getBatchData(api, requestData_dict)
            # ================利用批量方法请求 actions===============结束

比如我想批量获取action数据,请求头信息如下:

image.png

兴趣词提取规则

import re

ss=["22 - 54_US,IT,GB,AU,DE_All_Sports,Running,Academy Sports + Outdoors,Women's clothing_+7","20 - 49_US,IT,CA,GB,AU,FR,DE_Women_PoleFreaks - Pole Dance & Fitness Community,Skin-tight garment_",
    "18 - 54_US,IT,CA,GB,AU,FR,DE_All_Personal care,Yoga,Muscle & Fitness,Sportswear (fashion)_+2","20 - 54_US,CA,UM,AU_All_Leggings,Sports,Bodybuilding,Physical exercise_+5"
    ]
pattern = r'\d+\s*-\s*\d+'
for adsetName in ss:
    adsetName_list=adsetName.split("_")
    print("adsetName_list",adsetName_list)
    for single_str in adsetName_list:
        if re.findall(pattern, single_str):
            print("是年龄 跳过")
        elif single_str.isupper():
            print("是国家")
        elif single_str.lower() in ["women","men","all","*"]:
            print("是性别")
        elif single_str[0].isupper() and single_str[1].islower():
            print("是兴趣词")
            for single_ins in single_str.split(","):
                print("每个兴趣词是",single_ins)
            break
        else:
            print("无法识别兴趣词组,跳过")

特别监测功能如何特定请求特定对象编辑日志

fields=["extra_data","actor_name","date_time_in_timezone","event_type","object_id"]
        params={
            "limit": "500",
            "fields":fields,
            'since': self.since, 'until' :until,
            "extra_oids":["系列id1","系列id2","广告组id1","广告id1"]
        }
        # extra_oids请求参数 输入要查询层级对象的id列表,可以将不同的层级的id放进来请求,fb会一次性返回。此方法适用于特别监测功能
        #将所有要特别监测的系列(旗下所有儿子 孙子id)+组预算的广告组(旗下儿子id),所有都丢进列表,响应去重插入数据库
        #优势:大大减少多余请求,节约api请求资源。
        history=AdAccount(self.account_id).get_activities(params=params)
        print("请求头",history.headers()["x-business-use-case-usage"])

相关文章

网友评论

      本文标题:FACEBOOK数据开发

      本文链接:https://www.haomeiwen.com/subject/cmtuidtx.html