**facebook api请求全部逻辑**
import time
from openpyxl import Workbook
import openpyxl
import os
import random
from datetime import datetime, timedelta
import facebook_business.exceptions
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.campaign import Campaign
from facebook_business.api import FacebookAdsApi
from facebook_business.adobjects.adset import AdSet
from facebook_business.adobjects.ad import Ad
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.adcreative import AdCreative
from facebook_business.adobjects.page import Page
from facebook_business.adobjects.pagepost import PagePost
from facebook_business.api import Cursor
from facebook_business.exceptions import *
class GetData:
    """Collects Facebook Ads data for one account: insights per breakdown,
    per-ad action metrics, copy relationships and the edit-history log."""

    def __init__(self):
        # First access token / app credentials (placeholders).
        self.access_token = "xxxx"
        self.app_secret = 'xxxx'
        self.app_id = 'xxxx'
        self.account_id = "xxxx"
        self.limit = 300
        # Deduplicated campaign / ad-set / ad ids collected from the insights.
        self.campaign_ids_list = []
        self.adset_ids_list = []
        self.ad_ids_list = []
        self.allcount = 0
        self.since = "2022-10-26"
        self.until = "2022-10-26"
        self.maindata_list = []
        self.getmaindata_list = []
        self.getmaindata_dict = {}
        # Per-request error counter; reset to 0 after each function finishes.
        self.errorcount = 0
        self.adactions_dict = {}
        # Ad edit-history entries.
        self.historyList = []
        # The 1500 bulk-fetched copied ads / ad sets / campaigns.
        self.adscopies_all_list = []
        self.adsetcopies_all_list = []
        self.campaigncopies_all_list = []
        # Assembled copies, e.g. {"ad id": [{'id': '23851XXXX270068'}, ...]}.
        self.adscopies_dict = {}
        self.adsetcopies_dict = {}
        self.campaigncopies_dict = {}
        # Cleaned action data keyed by breakdown dimension, e.g.
        # {"age": {"id1": {"18-24": {"purchase": 1, "addtocart": 2}, ...}}}.
        self.cleared_actiondata_dict = {}
        # self.desktop_path=os.path.join(os.path.expanduser("~"), 'Desktop/')
        # self.nwb = openpyxl.load_workbook(self.desktop_path+"测试.xlsx")
        # self.nws=self.nwb.active
# Fetch the insight objects for one breakdown dimension.
def getMainData(self, metric):
    """Request ad-level insights for the account, broken down by ``metric``.

    :param metric: breakdown dimension ("age", "country",
        "publisher_platform"); empty string means the overall table.
    Stores the rows in ``self.getmaindata_dict`` keyed by the dimension
    ("总表" for the empty metric).  On FacebookRequestError it calls the
    strategy hook and retries itself (unbounded — relies on the strategy
    eventually fixing the token/quota).
    """
    print(type(metric), metric)
    getmaindata_list = []  # temporary holder for this dimension's rows
    try:
        fields = [
            "campaign_id",
            "adset_id",
            "adset_name",
            "ad_id",
            "ad_name",
            "spend",
            "cpc",
            "ctr",
            "cost_per_unique_click",
            "cost_per_inline_link_click",
            "inline_link_click_ctr",
            "inline_link_clicks",
            "cpm",
            "reach",
            "frequency",
            "impressions",
            "buying_type",
            "purchase_roas"
        ]
        # Build the API request.
        params = {
            "limit": self.limit,  # a safe, stable page size
            'level': 'ad',
            'time_range': {'since': self.since, 'until': self.until},
            'fields': fields,
            'filtering': [
                {
                    "field": "spend",
                    "operator": "GREATER_THAN",
                    "value": 0
                }
            ],
            'breakdowns': [metric],  # age / country / publisher_platform
        }
        insights_cursor = AdAccount(self.account_id).get_insights(params=params)
        getmaindata_list.extend(insights_cursor)
        # BUGFIX: the original called load_next_page() exactly once, so at
        # most one extra page was fetched; keep paging until exhausted.
        while insights_cursor.load_next_page():
            getmaindata_list.extend(insights_cursor)
        # Key the data by dimension: {"总表": [...], "age": [...], ...}.
        if not metric:
            self.getmaindata_dict["总表"] = getmaindata_list
        else:
            self.getmaindata_dict[metric] = getmaindata_list
        # Count the API request.
        self.allcount = self.allcount + 1
        print("getmaindata_list长度", metric, len(getmaindata_list))
    except facebook_business.exceptions.FacebookRequestError:
        print("请求维度 {} 时发生错误,重调函数请求".format(metric))
        self.errorcount = self.errorcount + 1
        self.callStrategy("getMainData")  # per-function recovery strategy
        self.getMainData(metric)
def writeToExcel(self):
    """Append every collected row to the worksheet and save the workbook.

    NOTE(review): relies on self.nws / self.nwb / self.desktop_path, which
    are currently commented out in __init__ — confirm they are initialised
    before this is called.
    """
    for row in self.maindata_list:
        print("插入的tuple是", row)
        self.nws.append(row)
    print("所有数据已经写入,开始保存表格")
    self.nwb.save(self.desktop_path + "测试.xlsx")
def getIds(self):
    """Extract unique campaign / ad-set / ad ids from the overall ("总表")
    insight rows into the corresponding id lists."""
    # Only the overall table is needed to harvest the three id levels.
    for insight in self.getmaindata_dict["总表"]:
        cid = insight["campaign_id"]
        sid = insight["adset_id"]
        aid = insight["ad_id"]
        print("系列id:{},广告组id:{},广告id:{}".format(cid, sid, aid))
        # Deduplicate before appending — repeats must not be stored.
        if cid not in self.campaign_ids_list:
            self.campaign_ids_list.append(cid)
        if sid not in self.adset_ids_list:
            self.adset_ids_list.append(sid)
        if aid not in self.ad_ids_list:
            self.ad_ids_list.append(aid)
    print("ids获取完毕,系列id列表长度{},广告组id列表长度{},广告id列表长度{}".format(len(self.campaign_ids_list), len(self.adset_ids_list), len(self.ad_ids_list)))
# Fetch per-ad action values (add-to-cart / checkout etc.) to supplement insights.
def getAdActions(self, metric):
    """For every collected ad id, request its "actions" insight rows for the
    given breakdown and store them in ``self.adactions_dict``.

    :param metric: breakdown dimension; empty string means the overall table.
    On a FacebookRequestError the loop stops (best-effort batch).
    """
    fields = [
        "ad_id",
        "actions",
        "spend"
    ]
    params_1 = {
        'time_range': {'since': self.since, 'until': self.until},  # date range
        'fields': fields,
        'breakdowns': [metric]
    }
    adactions_list_temporary = []
    lenids = len(self.ad_ids_list)
    print("lenids长度", lenids)
    for ad_id in self.ad_ids_list:
        print("广告id为", ad_id)
        try:
            ad_insight = Ad(ad_id).get_insights(params=params_1)
            print("ad_insight", ad_insight)
            adactions_list_temporary.extend(ad_insight)
            self.allcount = self.allcount + 1
            print("第 {} 次请求actions".format(self.allcount))
        except facebook_business.exceptions.FacebookRequestError:
            print("请求维度 {} 时发生错误,重调函数请求".format(metric))
            self.errorcount = self.errorcount + 1
            # Not escalating to callStrategy yet — stop this batch.
            # BUGFIX: the original had an unreachable
            # self.callStrategy("getAdActions") after `break`; removed as
            # dead code (TODO confirm whether it was meant to run on error).
            break
    if not metric:
        # The overall table is requested with an empty metric.
        self.adactions_dict["总表"] = adactions_list_temporary
    else:
        self.adactions_dict[metric] = adactions_list_temporary
def controlRequestFrequency(self):
    """Placeholder for API request-rate throttling; not implemented yet."""
    pass
def getCopies(self):
    """Collect the copy relationships for every collected campaign / ad set /
    ad.

    Strategy: bulk-fetch up to 1500 objects per level with their "copies"
    field, match those against the collected id lists, then fall back to a
    per-object get_copies() call for anything not yet covered.
    Bulk response shape:
    {'copies': {'data': [{'id': '...'}], 'paging': {...}}, 'id': source_id}
    """
    params = {
        "limit": 1500,
        "fields": ["copies"]
    }
    # Bulk-fetch 1500 copied ads / ad sets / campaigns.
    adscopies = AdAccount(self.account_id).get_ads(params=params)
    self.adscopies_all_list.extend(adscopies)
    adsetcopies = AdAccount(self.account_id).get_ad_sets(params=params)
    self.adsetcopies_all_list.extend(adsetcopies)
    campaigncopies = AdAccount(self.account_id).get_campaigns(params=params)
    self.campaigncopies_all_list.extend(campaigncopies)
    # Match ads against the collected ad ids.
    for ad_copies_dict in self.adscopies_all_list:
        ad_source_ad_id = ad_copies_dict["id"]
        ad_copies_list = ad_copies_dict["copies"]["data"]
        if ad_source_ad_id in self.ad_ids_list:
            print("{} 这条 广告 有复制项 {}".format(ad_source_ad_id, ad_copies_list))
            self.adscopies_dict[ad_source_ad_id] = ad_copies_list
    # Match ad sets.
    for adset_copies_dict in self.adsetcopies_all_list:
        adset_source_ad_id = adset_copies_dict["id"]
        adset_copies_list = adset_copies_dict["copies"]["data"]
        if adset_source_ad_id in self.adset_ids_list:
            print("{} 这条 广告组 有复制项 {}".format(adset_source_ad_id, adset_copies_list))
            self.adsetcopies_dict[adset_source_ad_id] = adset_copies_list
    # Match campaigns.
    for campaign_copies_dict in self.campaigncopies_all_list:
        campaign_source_ad_id = campaign_copies_dict["id"]
        campaign_copies_list = campaign_copies_dict["copies"]["data"]
        if campaign_source_ad_id in self.campaign_ids_list:
            # BUGFIX: the original printed adset_source_ad_id here.
            print("{} 这条 系列 有复制项 {}".format(campaign_source_ad_id, campaign_copies_list))
            self.campaigncopies_dict[campaign_source_ad_id] = campaign_copies_list
    # Per-ad API fallback for anything the bulk fetch missed.
    print("开始请求ad copies")
    for ad_id in self.ad_ids_list:
        # BUGFIX: the original referenced self.adcopies_dict, which is never
        # defined (__init__ defines adscopies_dict) and raised AttributeError.
        if ad_id not in self.adscopies_dict:
            adcopies_list = Ad(ad_id).get_copies()  # [] when there are no copies
            if adcopies_list:
                self.adscopies_dict[ad_id] = adcopies_list
                print("{}这条 广告 有复制项 {}".format(ad_id, adcopies_list))
    # Per-ad-set API fallback.
    print("开始请求adset copies")
    for adset_id in self.adset_ids_list:
        if adset_id not in self.adsetcopies_dict:
            adsetcopies_list = AdSet(adset_id).get_copies()  # [] when none
            if adsetcopies_list:
                self.adsetcopies_dict[adset_id] = adsetcopies_list
                print("{}这条 广告组 有复制项 {}".format(adset_id, adsetcopies_list))
    # Per-campaign API fallback.
    print("开始请求campaign copies")
    for campaign_id in self.campaign_ids_list:
        if campaign_id not in self.campaigncopies_dict:
            campaigncopies_list = Campaign(campaign_id).get_copies()  # [] when none
            if campaigncopies_list:
                self.campaigncopies_dict[campaign_id] = campaigncopies_list
                print("{}这条 系列 有复制项 {}".format(campaign_id, campaigncopies_list))
    print("所有广告复制项获取完毕!")
def getAdsPostIds(self):
    """(Not implemented) For each ad id, fetch the Facebook / Instagram post
    info and permalink via Ad(ad_id).get_ad_creatives and store it.

    Planned fields: effective_object_story_id ("page id_post id"),
    instagram_permalink_url, effective_instagram_media_id.
    """
    pass
def getEachPostReaction(self):
    """Fetch the engagement (comments / likes / shares / reactions) of one
    post id for both channels.

    NOTE: requires a *page* access token, which is derived from the user
    access token.
    """
    post = PagePost("主页id_post id")
    post.get_comments()
    post.get_likes()
    post.get_shared_posts()
    post.get_reactions()
def callStrategy(self, functionname):
    """Per-function error-recovery hook (currently mostly a stub).

    :param functionname: name of the failing function, so different
        recovery tactics can be applied per caller.
    """
    token_list = [
        "XXXX",
        "XXXX",
        "XXXX"
    ]
    if functionname == "getMainData":
        # Token pool; TODO: periodic automatic token refresh.
        # Pick a random token (ideally without repeats) and re-init the API.
        random_value = random.choice(token_list)
        # FacebookAdsApi.init(access_token=random_value)
    elif functionname == "getAdActions":
        # Usually a quota problem: switch to another token and retry.
        pass
    return
def getHistory(self):
    """Fetch the account's activity (edit) log for the day ``self.since``.

    Activity timestamps are UTC; a single day D is requested as
    {'since': D, 'until': D+1}.  Results are appended to self.historyList.
    """
    until = datetime.strptime(self.since, "%Y-%m-%d") + timedelta(days=1)
    until = str(until.strftime("%Y-%m-%d"))
    fields = ["extra_data", "actor_name", "date_time_in_timezone", "event_type"]
    params = {
        "limit": "500",
        "fields": fields,
        'since': self.since, 'until': until
    }
    history = AdAccount(self.account_id).get_activities(params=params)
    # BUGFIX: the original did self.historyList.extend(self.historyList),
    # discarding the fetched data, and never consumed the extra page it
    # loaded; extend with the cursor and keep paging until exhausted.
    self.historyList.extend(history)
    while history.load_next_page():
        self.historyList.extend(history)
    print("{} 时的历史获取成功,长度为 {}".format(self.since, len(self.historyList)))
def collateAllDatas(self):
    """Clean the insight rows, flatten the action rows, merge the two by
    ad_id (plus the breakdown value for dimension tables), then write the
    result to the spreadsheet."""
    # ---- Clean the main (insight) data. ----
    for dim, value_list in self.getmaindata_dict.items():
        print("现在处理{}的数据".format(dim))
        for maindata_dict in value_list:
            # Normalise ad-set and ad names.
            maindata_dict["adset_name"] = self.clearAdsetName(maindata_dict["adset_name"])
            maindata_dict["ad_name"] = self.clearAdName(maindata_dict["ad_name"])
            self._cleanMainMetrics(maindata_dict)
    print("主数据清洗完毕")
    # ---- Flatten the actions data. ----
    print("开始清洗actions数据")
    # Lift the wanted action_types to the top level so they merge easily,
    # e.g. {"ad_id": ..., "add_to_cart": 1, "initiate_checkout": 3, "age": "18-24"}.
    actionparams_list = ["add_to_cart", "initiate_checkout", "purchase", "post_comments",
                         "post_shares", "post_engagement", "post_reaction"
                         ]
    for dim, value_list in self.adactions_dict.items():
        print("现在处理{}的数据".format(dim))
        for action_dict_out in value_list:
            for action_dict_in in action_dict_out["actions"]:
                if action_dict_in["action_type"] in actionparams_list:
                    action_dict_out[action_dict_in["action_type"]] = action_dict_in["value"]
            action_dict_out.pop("actions")  # drop the raw actions once extracted
    print("开始组合insight和action数据")
    # insight: {"总表": [...], "age": [...], "country": [...], "publisher_platform": [...]}
    # action:  {"总表": [...], "age": [...], "country": [...]}
    for dim, insightlist in self.getmaindata_dict.items():
        print("现在在处理 {} 维度的数据".format(dim))
        if dim == "总表":
            action_rows = self.adactions_dict["总表"]
        else:
            # BUGFIX: the original matched dimension rows against the
            # overall-table action rows, which lack the breakdown field
            # (action_dict["age"] etc. would raise KeyError); use the rows
            # of the matching dimension instead.
            action_rows = self.adactions_dict[dim]
        for insightdict in insightlist:
            for action_dict in action_rows:
                if insightdict["ad_id"] != action_dict["ad_id"]:
                    continue
                # Dimension tables also need the breakdown value to match.
                if dim != "总表" and insightdict[dim] != action_dict[dim]:
                    continue
                self._mergeActionIntoInsight(insightdict, action_dict)
    print("主数据与action数据已经关联完毕!")
    # Write the merged rows to the spreadsheet.
    self.writeToExcel()
    print("开始关联编辑日志")
    print("开始关联复制项")

def _cleanMainMetrics(self, maindata_dict):
    # Round / format the numeric fields of one insight row in place,
    # defaulting missing fields to 0 (the API omits fields with no data).
    for field in ("cost_per_inline_link_click", "cost_per_unique_click", "cpc"):
        try:
            maindata_dict[field] = round(maindata_dict[field], 2)
        except KeyError:
            maindata_dict[field] = 0
    for field in ("ctr", "inline_link_click_ctr"):
        try:
            # Express click-through rates as percentages.
            maindata_dict[field] = "{:.2%}".format(maindata_dict[field] / 100)
        except KeyError:
            maindata_dict[field] = 0
    maindata_dict["cpm"] = round(maindata_dict["cpm"], 2)
    # purchase_roas: keep only the omni_purchase value; 0 when absent.
    try:
        for dd in maindata_dict["purchase_roas"]:
            if dd["action_type"] == "omni_purchase":
                maindata_dict["purchase_roas"] = dd["value"]
    except KeyError:
        maindata_dict["purchase_roas"] = 0

def _mergeActionIntoInsight(self, insightdict, action_dict):
    # Copy every supported action metric from one matched action row into
    # the insight row and derive the cost fields.
    # BUGFIX: the original re-bound the outer loop variable `key` and broke
    # out of the key loop after the first hit, so at most ONE action metric
    # was ever merged per row; merge all of them instead.
    if "add_to_cart" in action_dict.keys():
        insightdict["add_to_cart"] = action_dict["add_to_cart"]
        insightdict["cost_add_to_cart"] = round(insightdict["spend"] / action_dict["add_to_cart"], 2)
    if "initiate_checkout" in action_dict.keys():
        insightdict["initiate_checkout"] = action_dict["initiate_checkout"]
        insightdict["cost_initiate_checkout"] = round(
            insightdict["spend"] / action_dict["initiate_checkout"], 2)
    if "purchase" in action_dict.keys():
        insightdict["purchase"] = action_dict["purchase"]
        insightdict["cost_purchase"] = round(insightdict["spend"] / action_dict["purchase"], 2)
        insightdict["purchase_conversion_value"] = round(
            insightdict["spend"] * insightdict["purchase_roas"], 2)
    for k in ("post_engagement", "post_reaction", "post_comments", "post_shares"):
        if k in action_dict.keys():
            insightdict[k] = action_dict[k]
def clearAdsetName(self, adset_name):
    """Normalise an ad-set name of the form "a_b_c_d_date extra".

    When the name has more than four "_"-separated segments and the last
    segment contains a space, everything after the first space in that
    segment is dropped.  Otherwise the name is returned unchanged (a short
    name is treated as already clean).
    """
    parts = adset_name.split("_")
    if len(parts) <= 4:
        # Short names were either already cleaned or never carried a suffix.
        print("受众总长度不超过4个,跳过处理")
        return adset_name
    tail = parts[-1]
    if " " not in tail:
        # No space in the last segment — nothing to strip.
        return adset_name
    parts[-1] = tail.split(" ")[0]
    return "_".join(parts)
# Normalise an ad (creative) name.
def clearAdName(self, AdName):
    """Keep the first three "_" segments plus the last segment truncated at
    its first space; names without "_" are returned unchanged."""
    if "_" not in AdName:
        return AdName
    segments = AdName.split("_")
    kept = segments[:3]
    kept.append(segments[-1].split(" ")[0])
    return "_".join(kept)
# Current UTC time helper.
def getUtcTime(self):
    """Return UTC now plus 8 hours, formatted "YYYY-MM-DD HH:MM:SS".

    The +8h offset converts UTC to the ad account's local time (per the
    notes elsewhere in this file).
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    utc_now = datetime.utcnow()
    shifted = utc_now + timedelta(hours=8)
    print("当前UTC时间:", utc_now.strftime(fmt))
    print("添加8个小时后的时间:", shifted.strftime(fmt))
    return shifted.strftime(fmt)
if __name__ == "__main__":
    # Overall flow: a main-data line and a supplementary-data line, run
    # non-serially so different token strategies can be applied.
    try:
        start_time = time.time()
        getdata = GetData()
        FacebookAdsApi.init(access_token=getdata.access_token)
        # "" stands for the overall table.
        for metric in ["", "age", "country", "publisher_platform"]:
            # Fetch the insight objects for this dimension.
            insightcursor_Obj = getdata.getMainData(metric)
            getdata.errorcount = 0
        print("getmaindata_dict", getdata.getmaindata_dict)
        # Harvest the campaign / ad-set / ad ids every later API call needs.
        getdata.getIds()
        for metric in ["", "age", "country", "publisher_platform"]:
            # Fetch the per-ad action metrics for this dimension.
            getdata.getAdActions(metric)
        print("self.adactions_dict", getdata.adactions_dict)
        # Fetch the edit-history log.
        getdata.getHistory()
        # Fetch the copy relationships at every level.
        getdata.getCopies()
        # Collate everything and write it to the spreadsheet.
        getdata.collateAllDatas()
        # getdata.getAdsPostIds()
        # getdata.getEachPostReaction()
    # except facebook_business.exceptions.FacebookRequestError:
    #     print("请求超时,请求次数为", getdata.allcount, "尝试换下一个token继续")
    finally:
        print("总请求次数", getdata.allcount)
        end_time = time.time()
        runtime = end_time - start_time
        print("总用时", runtime, "秒")
**获取指定日期所有的花费大于0的所有广告**
fields = [
"campaign_id",
"adset_id",
"adset_name",
"ad_id",
"ad_name",
"spend",
"cpc",
"ctr",
"cost_per_unique_click",
"cost_per_inline_link_click",
"inline_link_click_ctr",
"inline_link_clicks",
"cpm",
"reach",
"frequency",
"impressions",
"buying_type",
"purchase_roas",
]
# 构建 API 请求
params = {
"limit":"10", #默认设置1个大的int
'level': 'ad',
'time_range': {'since': '2023-08-02', 'until': '2023-08-02'}, # 指定时间范围
'fields': ','.join(fields),
'filtering': [
{
"field": "spend",
"operator": "GREATER_THAN",
"value": 0
}
],
'breakdowns': ["age"], #age country publisher_platform
}
响应:
<AdsInsights> {
"ad_id": "xxx",
"ad_name": "xxx",
"adset_id": "xxx",
"adset_name": "20 - 54_US,CA,UM,AU_All_xxxx",
"age": "25-34",
"buying_type": "AUCTION",
"campaign_id": "xxxx",
"cost_per_inline_link_click": "1.343333",
"cost_per_unique_click": "0.806",
"cpc": "0.806",
"cpm": "14.041812",
"ctr": "1.74216",
"date_start": "2022-10-30",
"date_stop": "2022-10-30",
"frequency": "1.059041",
"impressions": "287",
"inline_link_click_ctr": "1.045296",
"inline_link_clicks": "3",
"purchase_roas": [
{
"action_type": "omni_purchase",
"value": "22.48139"
}
],
"reach": "271",
"spend": "4.03"
}
ps:如果没有转化那么就不会返回purchase_roas这个字段
# 按照指定维度(汇总、age、country、pulisher_platform)以及筛选条件批量获取每条广告fields字段。
ad_account = AdAccount(account_id)
insights = ad_account.get_insights(params=params)
print(len(insights))
print("insights",insights)
#响应缺失了加购以及层级状态这些,所以需要在下面用方法补充这些字段
fields_1 = \
[
"ad_id",
"actions",
"spend",
#Purchases conversion value 需要自己计算purchase_roas*spend
#Cost per results 需要自己计算spend/purchase
]
params_1 = {
'time_range': {'since': '2023-08-02', 'until': '2023-08-02'}, # 指定时间范围
'fields': ','.join(fields_1),
'breakdowns': ["age"]
}
响应:
<AdsInsights> {
"actions": [
{
"action_type": "onsite_web_add_to_cart",
"value": "1"
},
{
"action_type": "add_to_cart",
"value": "1"
},
{
"action_type": "onsite_web_app_view_content",
"value": "3"
},
{
"action_type": "onsite_web_purchase",
"value": "1"
},
{
"action_type": "post_engagement",
"value": "50"
},
{
"action_type": "onsite_web_app_add_to_cart",
"value": "1"
},
{
"action_type": "page_engagement",
"value": "50"
},
{
"action_type": "purchase",
"value": "1"
},
{
"action_type": "onsite_web_app_purchase",
"value": "1"
},
{
"action_type": "omni_add_to_cart",
"value": "1"
},
{
"action_type": "view_content",
"value": "3"
},
{
"action_type": "landing_page_view",
"value": "8"
},
{
"action_type": "onsite_web_view_content",
"value": "3"
},
{
"action_type": "video_view",
"value": "42"
},
{
"action_type": "omni_view_content",
"value": "3"
},
{
"action_type": "offsite_conversion.fb_pixel_view_content",
"value": "3"
},
{
"action_type": "offsite_conversion.fb_pixel_add_to_cart",
"value": "1"
},
{
"action_type": "offsite_conversion.fb_pixel_purchase",
"value": "1"
},
{
"action_type": "link_click",
"value": "8"
},
{
"action_type": "omni_purchase",
"value": "1"
}
],
"ad_id": "xxx",
"age": "45-54",
"date_start": "2022-10-31",
"date_stop": "2022-10-31",
"spend": "2.68"
}]
#获取每条广告的加购、结账、购买、转化价值(需自计算)、roas、转化成本(需自计算)、帖子互动、帖子评论。尚缺失层级状态字段
ad_insight=Ad(ad_id).get_insights(params=params_1)
print(len(ad_insight))
print("广告层级",ad_insight)
#获取广告,广告组状态
ad_status=Campaign(campaign_id).get_ads(fields=["status"])
adset_status=Campaign(campaign_id).get_ad_sets(fields=["status"])
print(len(ad_status))
print("ad_status",ad_status)
print("adset_status",adset_status)
#判断层级预算类型并获取对应预算值
for cam_id in ["系列id1","系列id2","系列id3","系列id4","系列id5","系列id6"]:
try:
cambudget = Campaign(cam_id).api_get(fields=["daily_budget"])["daily_budget"]
print("系列预算是",cambudget)
except KeyError:
print("说明是组预算")
adset_budget=Campaign(cam_id).get_ad_sets(fields=["daily_budget"])
print(adset_budget)
#第一步获取系列预算,如果不存在键daily_budget,那么就是组预算。数据库需要campaign_budget、adset_budget两个字段
处理逻辑:获取当天所有花费大于0的广告。然后根据系列id、组id进行归类。因为批量反馈的字段不全,所以需要遍历每个广告才能获取加购、结账、购买这些数据。另外也需要遍历每个广告组、广告获取其目前状态,最后获取账户的所有编辑记录,获取层级复制后的对象。然后根据其层级的各自id进行归属。以此类推
额外拓展:operator还有其他的的一些比较符。例如EQUAL, NOT_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, IN_RANGE, NOT_IN_RANGE, CONTAIN, NOT_CONTAIN, IN, NOT_IN, STARTS_WITH, ENDS_WITH, ANY, ALL, AFTER, BEFORE, ON_OR_AFTER, ON_OR_BEFORE, NONE, TOP
"field": "spend",也可以改为"impressions"
# fields=["extra_data","actor_name","date_time_in_timezone","event_type"]
# params={'since': '2020-06-01', 'until' :'2023-08-15'}
#这里的时间是按照世界协调时间发送过去;如果想要获取2020-06-01的编辑日志,
那么就写成{'since': '2020-06-01', 'until' :'2020-06-02'}、2020-06-02号就写成{'since': '2020-06-02', 'until' :'2020-06-03'},以此类推
"date_time_in_timezone": "29/10/2022 at 18:25",
"event_time": "2022-10-29T10:25:43+0000",
#世界协调时间+8个小时,就等于广告账户时间。
**获取广告账户指定时间的所有编辑记录**
fields=["extra_data","actor_name","date_time_in_timezone","event_type","object_id","event_time"]
params={'since': '2022-10-28', 'until' :'2022-10-29',"limit":"10000"}
res=AdAccount(account_id).get_activities(fields=fields,params=params)
#limit参数目前可以设置到10000,就是每次返回的数据量
fields=["campaign_name","adset_name","conversions","spend"]
params = {'time_range': {'since': '2022-10-01', 'until': '2022-10-05'},"level":"campaign","limit":"10","filtering":[{"Lifetime spent (campaign)":">100"}]}
#breakdown 官方文档https://developers.facebook.com/docs/marketing-api/insights/breakdowns/
#{'since': '2022-01-01', 'until': '2022-12-05'}
#响应数据如下,这里没有世界协调时间
#"date_start": "2022-01-01",
#"date_stop": "2022-12-05",
res=AdAccount(account_id).get_insights(params=params,fields=fields)
print("长度",len(res))
print(res)
**按照特定筛选条件筛选广告系列出来**
#设置给limit设置:10000,基本获取完了广告账户里所有的系列;接受int类型
fields=["campaign_id","adset_id","ad_id","adset_name","ad_name","campaign_name","conversions","spend","campaign_id","cost_per_inline_link_click"]
params = {
'time_range': {'since': '2023-08-01', 'until': '2023-08-01'},
"level":"campaign",
"limit":5, #响应的数量,默认可以设置为10000最大
"filtering": [
{
"field": "spend",
"operator": "GREATER_THAN",
"value": 0
}
]
}
#https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/
#level 要获取的层级,有三个层级:{ad, adset, campaign, account}
#breakdown 官方文档https://developers.facebook.com/docs/marketing-api/insights/breakdowns/
#{'since': '2022-01-01', 'until': '2022-12-05'}
#这个时间就是广告账户的实际时间
res=AdAccount(account_id).get_insights(params=params,fields=fields)
print("长度",len(res))
print(res)
**获取单独的广告组层级编辑日志**
fields=["extra_data","actor_name","date_time_in_timezone","event_type"]
params={'since': '2020-06-01', 'until' :'2023-08-15'}
res=AdSet(adset_id).get_activities(params=params,fields=fields)
print(res)
**获取层级复制后的对象**
fields=["source_campaign_id"]
Campaign(campaign_id).get_copies(fields=fields)
AdSet(adset_id).get_copies(fields=fields)
Ad(ad_id).get_copies(fields=fields)
**获取指定广告的数据以及互动情况**
fields=["actions"]
params={"time_range":{'since':"2023-06-29",'until':"2023-08-24"}}
print(Ad(ad_id).get_insights(fields=fields,params=params))
5)获取主页帖子的相关信息。产品标题、着陆页链接等
PAGEPOST-ID?fields=object_story_spec
响应:
{
"object_story_spec": {
"page_id": "[]()",
"instagram_actor_id": "[]()",
"video_data": {
"video_id": "[]()",
"title": "title",
"message": "帖子文案",
"link_description": "",
"call_to_action": {
"type": "SHOP_NOW",
"value": {
"link": "[)"
}
},
"image_url": "[)",
"image_hash": ""
}
},
"id": "[]()"
}
7)如何获取60天的access token
curl -i -X GET "https://graph.facebook.com/oauth/access_token?grant_type=fb_exchange_token&
client_id=APP-ID&
client_secret=APP-SECRET&
fb_exchange_token=SHORT-LIVED-USER-ACCESS-TOKEN"
#卸载应用、重装应用 或者更改facebook账号密码,这个token就会失效
8)如何获取60天的主页访问access token?
curl -i -X GET "https://graph.facebook.com/PAGE-ID?
fields=access_token&
access_token=USER-ACCESS-TOKEN"
9)如何读取广告post的评论?要使用主页token才能访问
PagePost("主页id_post id").get_comments()
#能获取所有评论、附件、分享的总数
#需要获取每条post的信息,同步到每条上,然后根据这些做个统计
10)获取广告账户下所有的post信息(post id、图片链接、广告文案等)
**
根据实际需求直接获取effective_object_story_id,instagram_permalink_url,effective_instagram_media_id就行,方便获取其帖子评论数、分享数。缺点这条api无法获取对应的ad id回来,不知道这个post应该归属到哪个
fields=["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]
**
fields=["id","effective_object_story_id","title","instagram_permalink_url","object_story_spec"]
params = {
'limit': 10
}
res=AdAccount(account_id).get_ad_creatives(fields=fields,params=params)
响应:
<AdCreative> {
"effective_object_story_id": "主页id_postid",
"id": "创意id", #每个创意都有其独一无二的id,即使完完整整复制出来也是
"instagram_permalink_url": "xxx",
"object_story_spec": {
"instagram_actor_id": "xxx",
"link_data": {
"call_to_action": {
"type": "SHOP_NOW"
},
"child_attachments": [ #有这个字段的说明是幻灯片
{
"call_to_action": {
"type": "SHOP_NOW"
},
"image_hash": "xxx",
"link": "xxx", #产品着陆页
"name": "Free Shipping over $49"
},
{
"call_to_action": {
"type": "SHOP_NOW"
},
"image_hash": "xxx",
"link": "xxx",
"name": "Free Shipping over $49",
"picture": "xxx", #提示过期
"video_id": "xxx"
},
{
"call_to_action": {
"type": "SHOP_NOW"
},
"image_hash": "xxx",
"link": "xxx",
"name": "Free Shipping over $49"
},
{
"call_to_action": {
"type": "SHOP_NOW"
},
"image_hash": "xxx",
"link": "xxx",
"name": "Free Shipping over $49",
"picture": "xxx",
"video_id": "xxx"
}
],
"description": "xxx",
"link": "xxx",
"message": "xxx",
"multi_share_end_card": true,
"multi_share_optimized": true
},
"page_id": "xxx"
},
"title": "xxx" 主页名字
}]
11)已知所有广告id,所以通过以下方法遍历ids,得到["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]字段值,然后写入到对应广告中,然后再通过以下方法获取每个post在facebook、ins的点赞数、评论数、分享数以及评论的所有值。当在操作视图中点击广告层级,就展现其帖子在两个渠道的互动情况。并且设置一个“互动情况视图”,以时间轴的形式,展示每3小时的评论、转化、点赞数。以折线图的形式展示,并且在图表上点击每天的评论数的时候,会有个区域展示其详细的评论内容,并且评论内容下方有对应的广告id点击直接复制。
难点:每次获取的评论数需要做去重处理,也就是只获取每天新增的评论数、评论值、分享数、点赞数。
fields=["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]
adid_list=["ad1","ad2","ad3"]
for ad_id in adid_list:
res=Ad(ad_id).get_ad_creatives(fields=fields)
print(len(res))
print(res)
响应:
[<AdCreative> {
"effective_instagram_media_id": "xxx", #获取ins互动情况的id
"effective_object_story_id": "page id_post id",
"id": "creative id",
"instagram_permalink_url": "xxx"
}]
12)按照筛选日期,筛选出所有评论值,并对每条评论进行分析(用自然语言分析工具分析,GPT?)看看他们更关注的是那方面内容,最终做成“词云”,并显示在“互动情况视图”内
词云
13)应用请求速率
1)请求627条国家维度insight1次,加上41次action请求,一共42次,共耗用66%的usage,暂停3分钟,额度恢复到90%,大概3分钟恢复50%的额度。
目前请求每条广告160之后(83条spend大于0的广告,最快用时60s左右)请求额度恰好到了100%,然后5分钟之后开始重置,重置大概30s。所以最好是弄多几个用户token,并且可以计算剩余的请求额度,快用完额度的时候,切换到其他的用户进行请求,依此类推。
具体access_token调用
注意要有pages_manage_engagement
https://stackoverflow.com/questions/19517086/facebook-post-insights-metrics-download-via-api?rq=4
https://stackoverflow.com/questions/50108314/how-to-get-insights-metrics-facebook-graph-api
根据这个获取post的所有洞察信息(like comment shares)
https://developers.facebook.com/docs/graph-api/reference/v18.0/insights
主页id_post id?fields=insights.metric(post_activity_by_action_type,post_clicks_by_type,post_engaged_fan,post_negative_feedback_by_type_unique,post_impressions_unique,post_impressions,post_reactions_by_type_total,post_impressions_fan_unique,post_impressions_fan_paid_unique,post_impressions_organic_unique,post_impressions_viral_unique){values,title}
post_activity_by_action_type能获取评论数、分享数、喜欢
获取主页下所有post的洞察
主页id/posts?fields=insights.metric(post_consumptions_unique,post_negative_feedback_unique,post_engaged_users,post_impressions_unique,post_impressions,post_reactions_by_type_total,post_impressions_fan_unique,post_impressions_fan_paid_unique,post_impressions_organic_unique,post_impressions_viral_unique){values,title}
from facebook_business.adobjects.pagepost import PagePost
page_access_token=""
FacebookAdsApi.init(access_token=page_access_token)
#如何用python post id去获取帖子丰富的信息(facebook)获取主页帖子上的信息,需要用主页的访问token,在第8)点提到如何获取page access token
params={
"metric":[
"post_activity_by_action_type",
"post_clicks_by_type",
"post_impressions_unique",
"post_impressions"]
}
ss=PagePost("pageid_postid").get_insights(params=params)
print(ss)
响应:
[<InsightsResult> {
"description": "Lifetime: The number of stories created about your Page post, by action type. (Total Count)",
"id": "pageid_postid/insights/post_activity_by_action_type/lifetime",
"name": "post_activity_by_action_type",
"period": "lifetime",
"title": "Lifetime Post Stories by action type",
"values": [
{
"value": {
"comment": 7,
"like": 70,
"share": 2
}
}
]
}, <InsightsResult> {
"description": "Lifetime: The number of clicks anywhere in the post on News Feed from users that matched the audience targeting on the post, by type. (Total Count)",
"id": "pageid_postid/insights/post_clicks_by_type/lifetime",
"name": "post_clicks_by_type",
"period": "lifetime",
"title": "Lifetime Matched Audience Targeting Consumptions by Type",
"values": [
{
"value": {
"link clicks": 13844,
"other clicks": 9708,
"video play": 2410
}
}
]
}, <InsightsResult> {
"description": "Lifetime: The number of people who had your Page's post enter their screen. Posts include statuses, photos, links, videos and more. (Unique Users)",
"id": "pageid_postid/insights/post_impressions_unique/lifetime",
"name": "post_impressions_unique",
"period": "lifetime",
"title": "Lifetime Post Total Reach",
"values": [
{
"value": 655125
}
]
}, <InsightsResult> {
"description": "Lifetime: The number of times your Page's post entered a person's screen. Posts include statuses, photos, links, videos, Reels and more. (Total Count)",
"id": "pageid_postid/insights/post_impressions/lifetime",
"name": "post_impressions",
"period": "lifetime",
"title": "Lifetime Post Total Impressions",
"values": [
{
"value": 1206517
}
]
}]
#如何单独获取指定系列、组的所有数据?适用于特别监测功能
跟获取整个账户数据差不多的逻辑,主要变动是在filtering那里,如果获取某个组的数据就adset.id
fields = [
"campaign_id",
"adset_id",
"adset_name",
"ad_id",
"ad_name",
"spend",
"cpc",
"ctr",
"cost_per_unique_click",
"cost_per_inline_link_click",
"inline_link_click_ctr",
"inline_link_clicks",
"cpm",
"reach",
"frequency",
"impressions",
"buying_type",
"purchase_roas"
]
params = {
'level': 'ad',
'time_range': {'since': since, 'until': until}, # 指定时间范围
'fields': fields,
'filtering': [{"field": "campaign.id","operator": "EQUAL","value": "23851411457790068"}],
'breakdowns': [metric] # age country publisher_platform
}
insights_Cursor = AdAccount(account_id).get_insights(params=params)
print("insights_Cursor",insights_Cursor)
适用的分别有以下几种流量类型
ads_insights、ads_management、custom_audience、instagram、leadgen、messenger 或 pages
在解决cursor的问题之前触犯的都是#80000的错误代码,但后面触犯的都是#80004的类型,
获取广告预算或者复制项似乎都是属于ads_management,因为每次返回的报错类型都是#80004
需要计算每次请求后的usage的使用量
ads_insights、ads_management两个api请求的限制程度不同,后者最为严格
获取的键有type 值是ads_management
获取的键有call_count 值是100 #超过100就会节流,并且是在1个小时后才能reset。
获取的键有total_cputime 值是8
获取的键有total_time 值是9
获取的键有estimated_time_to_regain_access 值是0
获取的键有ads_api_access_tier 值是development_access
获取广告编辑记录占用的也是ads_management的资源
image.png
{"run_status":{"old_value":1,"new_value":15},"old_value":"Active","new_value":"Inactive","rule_info":{"rule_id":"23851414101130068","rule_latest_name":"1026\u5e7f\u544a\u7cfb\u5217\u52a0\u8d2d","rule_status":"DELETED","evaluation_spec_id":"23851414101110068","execution_spec_id":"23851277479300068","rule_name":"1026\u5e7f\u544a\u7cfb\u5217\u52a0\u8d2d"},"type":"run_status"}
如果这个结果里面没有系列、广告组、广告的id,那么object就是被编辑对象的id
要对历史编辑进行清洗
通过以下方法可以获取剩余请求量,取ads_insights、ads_management的call_count、total_cputime、total_time最大值,如果有值大于75的,则程序可以需要停留60分钟。
ads_insights的限制比较小,每小时大约可以获取2个广告账户过去4天的数据(约12次×4天×2)。后面预算可以从编辑日志那里补充
##返回的对象AdAccount(self.account_id)有header属性,但Campaign()返回的是没有,所以直接将
appheader设置为“”,在checkAppUsage用另外的请求获取剩余请求量
insights_Cursor = AdAccount(self.account_id).get_insights(params=params)
appheader=dict(insights_Cursor.headers())
def checkAppUsage(self,appheader):
# Prints the remaining API quota. Main-data / action-data requests expose a
# response header directly; Campaign objects do not, so in that case pass an
# empty appheader and this method issues a cheap get_campaigns request just
# to obtain headers, then recurses once with them.
if appheader: # empty appheader means the caller was a Campaign-like object
print("appheader",appheader)
usage_dict=json.loads(appheader["x-business-use-case-usage"])
# print("usage_dict",type(usage_dict),usage_dict)
for account_id,values_list in usage_dict.items():
print("======账号id {} 的请求信息如下".format(account_id))
for usage_inner_dict in values_list:
for key,value in usage_inner_dict.items():
print("获取的键有{} 值是{}".format(key,value))
print("======信息结束")
else:
self.allcount = self.allcount + 1
params = {
"limit":1,
'time_range': {'since': self.since, 'until': self.until}
}# date range
res=AdAccount(self.account_id).get_campaigns(params=params)
print("res",res)
print("类型", type(res.headers()))
appheader = dict(res.headers())
# recurse with the freshly obtained headers to print the usage info
self.checkAppUsage(appheader)
响应:
获取的键有type 值是ads_insights
获取的键有call_count 值是1
获取的键有total_cputime 值是2
获取的键有total_time 值是3
获取的键有estimated_time_to_regain_access 值是0
获取的键有ads_api_access_tier 值是development_access
获取的键有type 值是ads_management
获取的键有call_count 值是32
获取的键有total_cputime 值是3
获取的键有total_time 值是4
获取的键有estimated_time_to_regain_access 值是0
获取的键有ads_api_access_tier 值是development_access
广告历史清洗
重要:每次获取的广告日志历史,都要针对7个字段计算一个hash值(id,object_id,actor_name,extra_data,event_type,event_time,
date_time_in_timezone),也就是生成一个指纹,然后对比库内数据,去重。如果是抓取每日的历史数据,都不会存在日志重复的概率,因为每日时间不同。但如果是分时去抓取,那肯定就是会存在重复的历史记录,所以需要md5去重。
原有方式漏洞:以往是根据每3个小时获取一次最新的预算值,但预算值在3个小时内可以变化无数次,不固定。虽说能获取最新的预算值,但并不知道这个预算值在3个小时内的变化情况,显然这不是我们想要的。所以直接提取日志里的预算变化是最正确的
直接遍历每个key,value,然后匹配到对应的对象上就行;因为编辑历史包含了广告生涯的所有信息,所以是否是系列预算、组预算等都可以匹配到,不需要单独再通过api请求
image.png
如何从广告日志获取预算信息;逻辑展示
def matchLog(self, id, getbudgetBoolean, metricType):
    '''
    Match stored activity-log rows to an ad object (design placeholder).

    :param id: a campaign id, ad-set id or ad id
    :param getbudgetBoolean: True -> only budget history is wanted;
        False -> every log row for the object is wanted
    :param metricType: level the id belongs to: "campaign", "adset" or "ad"
        (must match the level of ``id``)
    :return: None — the branches below only document the intended queries.

    Preconditions:
    1) every log row was inserted into the DB deduplicated, e.g. hash_id,
       object_id, actor_name, date_time_in_timezone, event_time, event_type,
       extra_data
    2) the front end already supplies the campaign/ad-set/ad ids to match
       based on its filter conditions
    '''
    if getbudgetBoolean:  # caller only wants campaign-level / ad-set-level budgets
        if metricType == "campaign":
            # Filter rows by id and event_type create_campaign_group; if the row
            # carries a new_value key the campaign uses campaign-level budget.
            # Persist that mapping (original table or a long-lived store) so the
            # check runs only once per campaign.  Then filter the same id with
            # event_type update_campaign_budget, extract extra_data.new_value,
            # sort newest-first and return to the front end.
            pass  # BUG FIX: this branch held only comments — a SyntaxError
        else:
            # Ad-set budget.  Ad sets under a campaign-budget campaign should be
            # filtered out by the caller beforehand to save queries; ids reaching
            # here are known to use ad-set-level budget.
            # If the id has no update_ad_set_budget rows, fall back to
            # create_ad_set, extract extra_data.new_value, sort newest-first and
            # return (an ad set may never have had its budget edited).
            # Budgets are rendered as a line in the chart filter.
            pass
    else:  # match every log row
        # Fetch all rows by id and metricType (deduplicated on insert already).
        # These rows become the dashed markers on the chart's y-axis, showing
        # actor_name, event_type, date_time_in_timezone, extra_data.  v1 shows
        # extra_data raw; v2 cleans it up.
        pass
受众构建示例代码展示
if __name__ =="__main__":
interestseed_list_before=["Sports", "Running","Healthy Habits"] #这里的种子列表,就是requestAvailableBoolean为True 的兴趣词+手动添加的兴趣词
interestseed_list_after=[word for i in interestseed_list_before for word in i.split(" ")]
store_list = [] # 设置一个临时存放id的列表
getdata.createInterestsLibrary(interestseed_list_after)
def createInterestsLibrary(self, interestseed_list_after):
    '''
    Query the Graph API interest search for every seed word and append the
    results to the 受众表.xlsx workbook on the desktop.

    Program task: add ~1000 interest words per day (admin-tunable); rest
    intervals are decided from {"call_count":0,"total_cputime":0,"total_time":0}
    (testing so far shows no quota consumed).  Seed words come from interests
    with requestAvailableBoolean=True plus manually added ones; if every seed
    has requestAvailableBoolean=False the task result should report the
    failure and its reason, ideally surfaced in the backend.

    :param interestseed_list_after: list of single-word seed interests
    :return: None (results are written to the workbook)
    '''
    import json  # BUG FIX: json is not imported at the top of this file
    # NOTE(review): `requests` is also absent from the visible imports —
    # confirm it is imported in the real module.
    import requests

    # NOTE(review): self.desktop_path is commented out in __init__ above —
    # confirm it is set elsewhere before this runs.
    self.nwb = openpyxl.load_workbook(self.desktop_path + "受众表.xlsx")
    self.nws = self.nwb.active
    limit = 200
    # BUG FIX: the file does `from datetime import datetime`, so the original
    # `datetime.datetime.now()` raised AttributeError.
    inserttime = datetime.now().date()
    # BUG FIX: fall back to a fresh list when the calling script did not
    # create the module-global dedupe list.
    store_list = globals().get("store_list", [])
    for interest in interestseed_list_after:
        print("现在请求的兴趣词是 {}".format(interest))
        # build the request payload
        params = {
            "type": "adinterest",
            "q": interest,
            "limit": limit,
            "access_token": self.access_token  # a dedicated token will be supplied later; leave room to configure it
        }
        try:
            interestResponse = requests.get("https://graph.facebook.com/v18.0/search", params=params)
            # {"call_count":0,"total_cputime":0,"total_time":0} — no quota consumed in testing so far
            print("头部", interestResponse.headers["x-app-usage"])
            interestResponse_dict = json.loads(interestResponse.text)
            interests_list = interestResponse_dict["data"]
            print("兴趣词", interest, "结果", len(interests_list), limit, len(interests_list) == limit)
            # A full page means this seed can be queried again next time.
            requestAvailableBoolean = len(interests_list) == limit
            if requestAvailableBoolean:
                pass
                # If the seed matches a returned name, mark the seed
                # requestAvailableBoolean=True (some tokens come from splitting).
            else:
                pass
                # Mark requestAvailableBoolean=False so this seed is skipped next time.
        except Exception as e:
            print("发生ConnectionError,忽略,继续下一个", e)
            continue
        for interest_dict in interests_list:  # append what was fetched to the sheet
            interestId = interest_dict["id"]
            if interestId not in store_list:  # the interest id is a sufficient dedupe key; no md5 needed
                store_list.append(interestId)
                self.nws.append((inserttime, interestId, interest_dict.setdefault("topic", "其他"),
                                 interest_dict.setdefault("disambiguation_category", "其他"),
                                 interest_dict["name"], interest_dict["audience_size_lower_bound"],
                                 interest_dict["audience_size_upper_bound"], str(interest_dict["path"]),
                                 "True"))  # everything returned is marked True
        self.nwb.save(self.desktop_path + "受众表.xlsx")  # save before any further splitting
    self.nwb.close()
def success_callback(self, response):
    """Batch-request success hook: log the usage header and stash the payload.

    The requested node id can be recovered from the relative URL, e.g.
    23851301777270068/adcreatives?ad_id=...&fields=... — the leading segment
    is the ad id, usable as a dict key downstream.
    """
    try:
        link = response._call["relative_url"]
        # response.headers() yields a list of {"name": ..., "value": ...} dicts.
        for header in response.headers():
            if header['name'] == 'X-Business-Use-Case-Usage':
                usage = json.loads(header['value'])
                print("请求量信息 {},现在请求的连接 {} ".format(usage, link))
        # Keep each response's "data" list as one element of the accumulator.
        self.batch_body_responses.extend([response.json()['data']])
    except IndexError:
        print("发生 IndexError")
    except UnicodeEncodeError:
        print("发生 UnicodeEncodeError")
def error_callback(self,response):
    # Batch-request failure hook: failures are deliberately ignored for now
    # (best-effort batching); consider logging response.error() here.
    pass
def generate_batches(self, iterable, batch_size_limit):
    """Yield consecutive lists of at most batch_size_limit items from iterable.

    The final batch may be shorter; nothing is yielded for an empty iterable.
    (Adapted from examples/batch_utils.py in the Business SDK.)
    """
    current = []
    for element in iterable:
        current.append(element)
        if len(current) == batch_size_limit:
            yield current
            current = []
    if current:
        yield current
def getBatchData(self, api, requestData_dict):
    """Fire batched GET requests against the Graph API.

    :param api: an initialized FacebookAdsApi instance
    :param requestData_dict: payload dict, e.g.
        {"requestIds_list": [...], "fields_list": [...],
         "params_dict": {...}, "endpoint": "/copies", "batch_limit": 50}
        ``batch_limit`` is optional and defaults to 50.
    :return: None — success_callback accumulates results into
        self.batch_body_responses.
        NOTE(review): batch_body_responses is not initialized in __init__
        above — confirm it is set before the first call.
    """
    # BUG FIX: FacebookRequest is never imported at the top of this file.
    from facebook_business.api import FacebookRequest

    batches = []
    # BUG FIX: honour an explicit "batch_limit" in the payload (one caller
    # passes {"batch_limit": 50}); previously it was silently ignored.
    batch_limit = requestData_dict.get("batch_limit", 50)
    for batch in self.generate_batches(requestData_dict["requestIds_list"], batch_limit):
        print("batch", batch)
        next_batch = api.new_batch()
        for requestid in batch:
            request = FacebookRequest(
                node_id=requestid,
                method="GET",
                endpoint=requestData_dict["endpoint"],
            ).add_fields(requestData_dict["fields_list"]).add_params(requestData_dict["params_dict"])
            next_batch.add_request(request, self.success_callback, self.error_callback)
        batches.append(next_batch)
    for batch_request in batches:
        batch_request.execute()
        time.sleep(2)  # stay clear of the rate limiter between executions
    print("batch_body_responses", self.batch_body_responses)
if __name__ =="__main__":
api=FacebookAdsApi.init(access_token=getdata.access_token)
# ================利用批量方法请求 主数据===============开始
for metric in ["", "age", "country", "publisher_platform"]:
print("现在请求的维度是 {}".format(metric))
fields_list = ["campaign_id", "adset_id", "adset_name", "ad_id", "ad_name", "spend", "cpc", "ctr", "clicks",
"unique_clicks", "cost_per_unique_click", "cost_per_inline_link_click",
"inline_link_click_ctr", "inline_link_clicks", "cpm", "reach", "frequency", "impressions",
"buying_type", "purchase_roas"]
requestIds_list = [getdata.account_id] # 若请求主数据,那么id就是广告账户id
params_dict= {
"limit":300, #设置一个安全稳定的值
'level': 'ad',
'time_range': {'since': getdata.since, 'until': getdata.until}, # 指定时间范围
'fields': fields_list,
'filtering': [
{
"field": "spend",
"operator": "GREATER_THAN",
"value": 0
}
],
'breakdowns': [metric], #age country publisher_platform
}
requestData_dict={"requestIds_list":requestIds_list,"fields_list":fields_list,"params_dict":params_dict,"endpoint":"/insights"}
print("requestIds_list长度",len(requestIds_list))
getdata.getBatchData(api,requestData_dict)
# ================利用批量方法请求 主数据===============结束
#================利用批量方法请求 creative===============开始
requestIds_list = ["id1","id2"] # 存放要请求的id
params_dict={}
fields_list=["effective_object_story_id","instagram_permalink_url","effective_instagram_media_id"]
requestData_dict={"requestIds_list":requestIds_list,"fields_list":fields_list,"params_dict":params_dict,"endpoint":"/adcreatives"}
print("requestIds_list长度",len(requestIds_list))
getdata.getBatchData(api,requestData_dict)
# ================利用批量方法请求 creative===============结束
# ================清洗creative 提取post_id===============开始
print("清洗creative 提取post_id")
api=FacebookAdsApi.init(access_token=getdata.page_access_token)
#['105106692296682_110891305051554', '105106692296682_126300916843926', '105106692296682_131739622966722']
effective_object_story_ids_list = [item[0]['effective_object_story_id'] for item in getdata.batch_body_responses]
print("effective_object_story_ids_list",effective_object_story_ids_list)
# effective_object_story_ids_list=['105106692296682_126300916843926', '105106692296682_126300916843926', '105106692296682_131739622966722']
fields_list=[]
params_dict = {
"metric":[
"post_activity_by_action_type",
"post_negative_feedback_unique",
"post_clicks_by_type",
"post_impressions_unique",
"post_impressions",
"post_reactions_by_type_total"]
}
requestData_dict = {
"requestIds_list": effective_object_story_ids_list,
"fields_list": fields_list,
"params_dict": params_dict,
"endpoint": "/insights",
"batch_limit": 50
}
getdata.getBatchData(api,requestData_dict)
# ================清洗creative 提取post_id===============结束
# ================利用批量方法请求 copies===============开始
requestIds_list = ["id1","id2"] # 存放要请求的id
params_dict = {}
fields_list = []
requestData_dict = {"requestIds_list": requestIds_list, "fields_list": fields_list,
"params_dict": params_dict, "endpoint": "/copies"}
print("requestIds_list长度", len(requestIds_list))
getdata.getBatchData(api, requestData_dict)
# ================利用批量方法请求 copies===============结束
# ================利用批量方法请求 actions===============开始
requestIds_list = ["id1","id2"] # 存放要请求的id
fields_list = ["ad_id", "actions", "spend"]
#requestIds_list不会变,breakdowns走遍历就行了,日期也是一样,我这里直接懒得完全写出来
params_dict = {'time_range': {'since': "2022-10-28", 'until': "2022-10-28"},'breakdowns': "age"}
#将这些打包成一个字典数据包
requestData_dict = {"requestIds_list": requestIds_list,
"fields_list": fields_list,
"params_dict": params_dict,
"endpoint": "/insights"}
print("requestIds_list长度", len(requestIds_list))
getdata.getBatchData(api, requestData_dict)
# ================利用批量方法请求 actions===============结束
比如我想批量获取action数据,请求头信息如下:
image.png

兴趣词提取规则
import re

# Classify each "_"-separated segment of an ad-set name as age range, country
# list, gender, or interest words.  Naming scheme:
# "age range_countries_gender_interest words_+N"
ss = ["22 - 54_US,IT,GB,AU,DE_All_Sports,Running,Academy Sports + Outdoors,Women's clothing_+7","20 - 49_US,IT,CA,GB,AU,FR,DE_Women_PoleFreaks - Pole Dance & Fitness Community,Skin-tight garment_",
      "18 - 54_US,IT,CA,GB,AU,FR,DE_All_Personal care,Yoga,Muscle & Fitness,Sportswear (fashion)_+2","20 - 54_US,CA,UM,AU_All_Leggings,Sports,Bodybuilding,Physical exercise_+5"
      ]
pattern = r'\d+\s*-\s*\d+'  # age ranges look like "22 - 54"
for adsetName in ss:
    adsetName_list = adsetName.split("_")
    print("adsetName_list", adsetName_list)
    for single_str in adsetName_list:
        if re.findall(pattern, single_str):
            print("是年龄 跳过")
        elif single_str.isupper():
            print("是国家")
        elif single_str.lower() in ["women", "men", "all", "*"]:
            print("是性别")
        # BUG FIX: use slicing instead of indexing — names ending in "_"
        # (second sample) produce an empty segment, and single-char segments
        # are possible, so single_str[0] / single_str[1] raised IndexError.
        # Unmatchable segments now fall through to the "unrecognized" branch.
        elif single_str[:1].isupper() and single_str[1:2].islower():
            print("是兴趣词")
            for single_ins in single_str.split(","):
                print("每个兴趣词是", single_ins)
            break  # interest words found; remaining segments are ignored
        else:
            print("无法识别兴趣词组,跳过")
特别监测功能如何特定请求特定对象编辑日志
fields=["extra_data","actor_name","date_time_in_timezone","event_type","object_id"]
params={
"limit": "500",
"fields":fields,
'since': self.since, 'until' :until,
"extra_oids":["系列id1","系列id2","广告组id1","广告id1"]
}
# extra_oids请求参数 输入要查询层级对象的id列表,可以将不同的层级的id放进来请求,fb会一次性返回。此方法适用于特别监测功能
#将所有要特别监测的系列(旗下所有儿子 孙子id)+组预算的广告组(旗下儿子id),所有都丢进列表,响应去重插入数据库
#优势:大大减少多余请求,节约api请求资源。
history=AdAccount(self.account_id).get_activities(params=params)
print("请求头",history.headers()["x-business-use-case-usage"])
网友评论