推特API总结
Twitter 是美国一个微博客和社交网络服务平台。它可以让用户发布不超过280个字符的消息(中文、日文和韩文为140个),这些消息也被称作“推文(Tweet)”。这个服务是由杰克·多西在2006年3月创办并在当年7月启动的。Twitter在美国的网站访问量排名第35位。[[19]](https://zh.wikipedia.org/wiki/Twitter#cite_note-19)据Twitter现任[首席执行官](https://zh.wikipedia.org/wiki/行政總裁)[迪克·科斯特洛](https://zh.wikipedia.org/w/index.php?title=迪克·科斯特洛&action=edit&redlink=1)宣布,截至2018年3月,Twitter共有3.36亿[活跃用户](https://zh.wikipedia.org/wiki/活躍用戶)[[20]](https://zh.wikipedia.org/wiki/Twitter#cite_note-Twitter_Turns_Six-20),这些用户每天会发表约3.4亿条推文[[20]](https://zh.wikipedia.org/wiki/Twitter#cite_note-Twitter_Turns_Six-20)。同时,Twitter每天还会处理约16亿次搜索请求[[21]](https://zh.wikipedia.org/wiki/Twitter#cite_note-21)[22]。公司总部设立在美国旧金山,其部分办公室及服务器位于纽约。
获取推特消息有一些方式,下面分别简单地总结一下。
1. 通过程序模拟浏览器的行为进行爬虫
首先要有一个推特账号,然后关注自己关心的人。在个人主页上打开浏览器的开发者工具(检查元素),刷新一下页面,然后选择“网络” -> `Fetch/XHR`,可以找到一条名为 `home_latest.json?include_profile……` 的请求。查看其响应结果,其中 `globalObjects` 里的 `tweets` 内容就是我们主页上显示的所关注的人的推文,例如下面这条:
{
"created_at": "Tue Sep 21 16:20:33 +0000 2021",
"id_str": "1440350243355201541",
"full_text": "Our AGLD-USD order book is now in full-trading mode. Limit, market and stop orders are all now available.",
"display_text_range": [
0,
105
],
"entities": {},
"source": "<a href=\"https://sproutsocial.com\" rel=\"nofollow\">Sprout Social</a>",
"user_id_str": "720487892670410753",
"retweet_count": 45,
"favorite_count": 360,
"reply_count": 165,
"quote_count": 9,
"conversation_id_str": "1440350243355201541",
"lang": "en",
"ext": {
"superFollowMetadata": {
"r": {
"ok": {}
},
"ttl": -1
}
}
}
据此就可以进行爬取了:只要在浏览器里右键点击该请求,选择“复制” -> “以 cURL 格式复制”,然后就可以直接导入到 `Postman` 里进行测试了。
不过这种方式非常不理想,在此只是顺带一提。
2. 通过推特的开发者API v1.1进行获取
有一个 Python 的第三方库叫作 `tweepy`,我们使用这个库。不过通过推特 API 获取推特消息需要有推特开发者账号,而这个账号很难申请。
安装 `tweepy`:
pip3 install tweepy
Twitter API v1.1 获取推文主要是通过 `API.user_timeline` 这个接口,不过它有频率限制:每15分钟只能请求900次,也就是平均1秒一次。其实用来获取一个用户的推文也够用了,不过按照我测试的结果,有些消息的延迟挺高,能达到好几十秒,所以我感觉这个方式也并非很理想。下面贴一段这种方式的代码:
import tweepy
from typing import List
from operator import itemgetter
from tweepy.models import Status
from asyncioquant.entrance import *
class FetchTwitter:
    """Poll one Twitter user's timeline through the v1.1 REST API (tweepy).

    Credentials and the target user (``user_id`` / ``screen_name``) are read
    from ``Config.twitter``. On construction the newest tweet id of that user
    is recorded. Then two loop tasks are registered with
    ``Quant.create_loop_task``:

    * every 10 s, ``get_latest_one_status`` fetches the newest status posted
      after the recorded id and logs it;
    * every 60 s, ``get_rate_limit`` checks the ``/statuses/user_timeline``
      rate-limit bucket.
    """

    def __init__(self):
        consumer_key = Config.twitter.get("consumer_key")
        consumer_secret = Config.twitter.get("consumer_secret")
        access_token = Config.twitter.get("access_token")
        access_token_secret = Config.twitter.get("access_token_secret")
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(auth=auth, wait_on_rate_limit=True)
        self.tweet_id = None  # timeline queries only ask for ids greater than this (since_id)
        # Initialise every attribute the loop tasks read *before* scheduling them.
        self.last_text = ""
        self.get_information_when_start()
        Quant.create_loop_task(10, self.get_latest_one_status)
        Quant.create_loop_task(60, self.get_rate_limit)

    async def _run_blocking(self, func, *args, **kwargs):
        """Run a synchronous tweepy call in the event loop's default executor.

        tweepy's ``API`` methods block. With ``wait_on_rate_limit=True`` they
        may also sleep until the rate-limit window resets. Calling them
        directly inside a coroutine would stall every other task on the loop
        for that whole time.
        """
        import asyncio
        import functools

        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

    def get_information_when_start(self):
        """Log the authenticated account and rate limit, then record the newest tweet id.

        Sets ``self.tweet_id`` to the id of the most recent status (by creation
        time) of the configured user. If the timeline is empty,
        ``self.tweet_id`` is left as is, so later polls are not restricted by
        ``since_id``.
        """
        me = self.api.verify_credentials().screen_name
        rate_limit = self.api.rate_limit_status()["resources"]["statuses"]["/statuses/user_timeline"]
        Logger.info(f"Fetch Twitter Program Start, my twitter name is {me}", caller=self)
        Logger.info(f"rate limit is {rate_limit}", caller=self)
        # Find the largest (newest) tweet id.
        user_id = Config.twitter.get("user_id")
        screen_name = Config.twitter.get("screen_name")
        # user_timeline returns a list of Status objects.
        status: List[Status] = self.api.user_timeline(user_id=user_id, screen_name=screen_name, trim_user=True)
        id_list = []
        for s in status:
            # Strip the "+00:00" suffix so the string matches the parse format below.
            utc_created_at = str(s.created_at).replace("+00:00", "")
            ts = Tools.utctime_str_to_ts(utctime_str=utc_created_at, fmt="%Y-%m-%d %H:%M:%S")
            id_list.append({"tweet_id": s.id, "ts": ts})
        id_list = sorted(id_list, key=itemgetter("ts"), reverse=True)
        Logger.info("id_list:", id_list, caller=self)
        if not id_list:
            # An empty timeline used to crash start-up with IndexError on id_list[0].
            Logger.warn("user timeline is empty, no since_id recorded", caller=self)
            return
        self.tweet_id = id_list[0]["tweet_id"]
        Logger.info(f"self.tweet_id:{self.tweet_id}", caller=self)

    async def get_rate_limit(self):
        """Periodically check the user_timeline rate limit; alert via DingTalk when it is used up."""
        limits = await self._run_blocking(self.api.rate_limit_status)
        rate_limit = limits["resources"]["statuses"]["/statuses/user_timeline"]
        if rate_limit["remaining"] == 0:
            await DingTalk.text(
                token="",  # NOTE(review): empty DingTalk token — must be filled in for alerts to work
                text=f"推特限频通知:{rate_limit}"
            )
        Logger.warn(f"Rate Limit: {rate_limit}", caller=self)

    @async_method_locker(name="get_latest_one_status.lock", wait=False)
    async def get_latest_one_status(self):
        """Fetch the configured user's newest status posted after ``self.tweet_id`` and log it.

        A status whose text equals the previously logged one is ignored.
        Otherwise ``self.last_text`` and ``self.tweet_id`` advance to it.
        """
        user_id = Config.twitter.get("user_id")
        screen_name = Config.twitter.get("screen_name")
        # user_timeline returns a list of Status objects (at most one here).
        status: List[Status] = await self._run_blocking(
            self.api.user_timeline,
            user_id=user_id, screen_name=screen_name, count=1, trim_user=True, since_id=self.tweet_id,
        )
        if not status:
            return
        s: Status = status[0]
        utc_created_at = str(s.created_at).replace("+00:00", "")
        datetime = Tools.ts_to_datetime_str(Tools.utctime_str_to_ts(utctime_str=utc_created_at, fmt="%Y-%m-%d %H:%M:%S"))
        text = s.text
        is_reply = bool(s.in_reply_to_status_id)
        tweet_id = s.id
        if text != self.last_text:
            self.last_text = text
            self.tweet_id = tweet_id
            Logger.info(f"datetime: {datetime}, is_reply: {is_reply}, text: {text}", caller=self)
if __name__ == '__main__':
    # Hand the config file and the FetchTwitter class to the Quant framework.
    # NOTE(review): presumably Quant.start loads the config and instantiates
    # FetchTwitter on its event loop — confirm against asyncioquant.
    config_file = "config.json"
    Quant.start(config_file, FetchTwitter)
3. Twitter API v2
这个是之前 v1.1 的升级版,我没有实际测试过;两者都属于 REST API,想来差异不大。
4. tweepy.Stream
这个是流式 API,用起来也很简单:
import tweepy
from logger import Logger
import os

Logger.init_logger(
    level="info",
    console=False
)
# Read the API credentials from the environment rather than hard-coding
# secrets in the script. An unset variable falls back to "" exactly as before.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "")
# Subclass tweepy.Stream so every received status gets logged.
class Stream(tweepy.Stream):
    """tweepy ``Stream`` that logs author screen name, id, creation time and text of each status."""

    def on_status(self, status):
        author = status.user.screen_name
        Logger.info(author, status.id, status.created_at, status.text)
# User ids to follow in real time — 1048905259 is the HobaBot account.
followed_user_ids = ["1048905259"]

# Build the logging stream with the credentials defined above and start filtering.
stream = Stream(consumer_key, consumer_secret, access_token, access_token_secret)
stream.filter(follow=followed_user_ids)
5. tweepy.asynchronous.AsyncStream
这个是异步的 Stream API,下面是我写的 `main.py` 文件的代码:
import signal
import asyncio
from utils.logger import Logger
from utils.tools import Tools
from utils.mysql import Mysql
from utils.translate import Translate
from tweepy.asynchronous.streaming import AsyncStream
from tweepy import models
# Translation table with the same escapes as MySQL's mysql_real_escape_string
# (and PyMySQL's escape_string) for text placed inside a '...' SQL literal.
_SQL_ESCAPE_TABLE = str.maketrans({
    "\0": "\\0",
    "\\": "\\\\",
    "\n": "\\n",
    "\r": "\\r",
    "\x1a": "\\Z",
    '"': '\\"',
    "'": "\\'",
})


def _escape_sql_literal(value):
    """Return ``str(value)`` escaped for use inside a single-quoted MySQL string literal.

    Tweet text and translations are untrusted. Without escaping, a single
    ``'`` breaks (or injects into) the string-built INSERT statements used by
    ``Stream.on_status``.
    NOTE(review): assumes the server is not in NO_BACKSLASH_ESCAPES mode; if
    the Mysql helper supports bound parameters, prefer those instead.
    """
    return str(value).translate(_SQL_ESCAPE_TABLE)


class Stream(AsyncStream):
    """Async filter stream that stores followed users' tweets and their translations in MySQL."""

    def __init__(self, consumer_key, consumer_secret, access_token, access_token_secret):
        super().__init__(consumer_key, consumer_secret, access_token, access_token_secret)
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret
        # Filter realtime Tweets by twitter user, [HobaBot]
        self.follows = [1048905259]  # ids of the users to subscribe to
        self.filter(follow=self.follows)  # pass the id list to the filter

    def disconnect(self):
        """Disconnect the stream.

        Delegates to tweepy's implementation. Calling ``self.disconnect()``
        here would recurse until ``RecursionError``.
        """
        super().disconnect()

    async def on_closed(self, resp):
        """Log that Twitter closed the connection.

        tweepy's ``AsyncStream`` connection loop retries on its own after this
        callback returns, so nothing is re-initialised here. Re-running
        ``__init__`` would reset the stream's task/session and, through
        ``filter``, start a second concurrent connection.
        """
        Logger.warn(f"AsyncStream closed:{resp}, trying to reconnect ...", caller=self)

    async def on_connect(self):
        """Log a successful connection."""
        Logger.info("AsyncStream connected success ...", caller=self)

    async def on_connection_error(self):
        """Log a connection error."""
        Logger.error("AsyncStream connection error ...", caller=self)

    async def on_exception(self, exception):
        """Log an unhandled exception raised inside the stream."""
        Logger.exception(f"AsyncStream exception: {exception}", caller=self)

    async def on_keep_alive(self):
        """Log a keep-alive signal."""
        Logger.info("Keep alive signal received ...", caller=self)

    async def on_status(self, status: models.Status):
        """Handle one status: filter it, store it, translate it and store the translation.

        Rows are inserted into `tweets` with `is_translate` 0 (original text)
        and 1 (translated text).
        """
        # Keep only statuses authored by a followed user (ids are ints).
        user: models.User = status.user
        if user.id not in self.follows:
            return
        # Ignore statuses created 10 seconds or more before now.
        utc_created_at = str(status.created_at).replace("+00:00", "")
        ts = Tools.utctime_str_to_ts(utctime_str=utc_created_at, fmt="%Y-%m-%d %H:%M:%S")
        datetime = Tools.ts_to_datetime_str(ts)
        if Tools.get_cur_timestamp() - ts >= 10:
            return
        # Whether this tweet is a reply to another tweet.
        is_reply = True if status.in_reply_to_status_id else False
        Logger.info("author:", user.screen_name, "datetime:", datetime, "content:", status.text, "is_reply:", is_reply, caller=self)
        # Skip the tweet if its original text is already stored (dedup by tweet id).
        success, error = await Mysql.fetchone("SELECT `id` "
                                              "FROM `tweets` "
                                              "WHERE `tweet_id` = '%s' "
                                              "AND `is_translate` = 0;" % status.id)
        if error:
            Logger.error("query tweet from tweets error:", error, caller=self)
            return
        if success:
            return
        screen_name = _escape_sql_literal(user.screen_name)
        success, error = await Mysql.query(
            "INSERT INTO `tweets` "
            "VALUES(NULL,'%s','%s','%s','%s','%s','%s');"
            % (screen_name, datetime, _escape_sql_literal(status.text), is_reply, status.id, 0))
        if error:
            Logger.error("insert tweet into tweets error:", error, caller=self)
            return
        Logger.info("insert tweet into tweets success...", caller=self)
        # Translate the tweet.
        translated_content, error = await Translate.translate(content=status.text)
        if error:
            Logger.error("translate tweets error:", error, caller=self)
            return
        # Store the translation.
        success, error = await Mysql.query(
            "INSERT INTO `tweets` "
            "VALUES(NULL,'%s','%s','%s','%s','%s','%s');"
            % (screen_name, datetime, _escape_sql_literal(translated_content), is_reply, status.id, 1))
        if error:
            Logger.error("insert translated_content into tweets error:", error, caller=self)
if __name__ == '__main__':
    async def connect_mysql():
        """Connect to mysql database when strategy start.

        On failure, logs the error and stops the event loop.
        """
        success, error = await Mysql.initialize_mysql_connection_pool(
            host="",
            user="",
            password="",
            db=""
        )
        if error:
            Logger.error("CONNECT MYSQL WHEN STRATEGY START ERROR, STOPPING STRATEGY !", caller=connect_mysql)
            asyncio.get_event_loop().stop()
            # Without this return the success message below was logged on failure too.
            return
        Logger.info("Connect mysql success !", caller=connect_mysql)

    def start():
        """Initialise logging, MySQL and the async stream, then run the event loop forever."""
        # initialize logger settings
        Logger.init_logger(level="info", path="./logs", name="error.log", clear=False, backup_count=0, console=False)
        # initialize mysql database connection pool (scheduled; runs once the loop starts)
        asyncio.get_event_loop().create_task(connect_mysql())
        # start async stream
        # Constructing Stream calls filter(), which (per the original author) registers
        # the streaming coroutine with the event loop to run once the loop starts.
        stream = Stream(
            consumer_key="",
            consumer_secret="",
            access_token="",
            access_token_secret=""
        )

        def keyboard_interrupt(s, f):
            """disconnect stream and stop event loop when keyboard interrupt."""
            print("KeyboardInterrupt (ID: {}) has been caught. Cleaning up...".format(s))
            stream.disconnect()
            asyncio.get_event_loop().stop()

        signal.signal(signal.SIGINT, keyboard_interrupt)
        # start asyncio EventLoop
        Logger.info("start io loop ...", caller=start)
        asyncio.get_event_loop().run_forever()  # run the asyncio event loop until stopped

    # start program
    start()
网友评论