Twitter API Summary

Author: GaryHertel | Published 2022-01-15 23:04


    Twitter is a microblogging social-network service based in the United States. It lets users post messages of up to 280 characters (140 for Chinese, Japanese, and Korean), known as "tweets". The service was founded by Jack Dorsey in March 2006 and launched in July of that year. Twitter ranks 35th by traffic in the United States.[[19]](https://zh.wikipedia.org/wiki/Twitter#cite_note-19) According to Twitter's [chief executive](https://zh.wikipedia.org/wiki/行政總裁) [Dick Costolo](https://zh.wikipedia.org/w/index.php?title=迪克·科斯特洛&action=edit&redlink=1), as of March 2018 Twitter had 336 million [active users](https://zh.wikipedia.org/wiki/活躍用戶)[[20]](https://zh.wikipedia.org/wiki/Twitter#cite_note-Twitter_Turns_Six-20), who publish about 340 million tweets per day[[20]](https://zh.wikipedia.org/wiki/Twitter#cite_note-Twitter_Turns_Six-20). Twitter also handles about 1.6 billion search queries per day[[21]](https://zh.wikipedia.org/wiki/Twitter#cite_note-21)[22]. The company is headquartered in San Francisco, with some offices and servers located in New York.

    There are several ways to fetch tweets; below is a brief summary of each.

    1. Scraping by programmatically mimicking browser behavior

    First you need a Twitter account. Follow the accounts you care about, then on your home page open the browser's developer tools, refresh the page, and select Network -> Fetch/XHR. You will find a request named home_latest.json?include_profile…. Inspect its response: the tweets object inside globalObjects contains the tweets from the accounts you follow, as shown on your home page — for example this one:

    {
        "created_at": "Tue Sep 21 16:20:33 +0000 2021",
        "id_str": "1440350243355201541",
        "full_text": "Our AGLD-USD order book is now in full-trading mode. Limit, market and stop orders are all now available.",
        "display_text_range": [
            0,
            105
        ],
        "entities": {},
        "source": "<a href=\"https://sproutsocial.com\" rel=\"nofollow\">Sprout Social</a>",
        "user_id_str": "720487892670410753",
        "retweet_count": 45,
        "favorite_count": 360,
        "reply_count": 165,
        "quote_count": 9,
        "conversation_id_str": "1440350243355201541",
        "lang": "en",
        "ext": {
            "superFollowMetadata": {
                "r": {
                    "ok": {}
                },
                "ttl": -1
            }
        }
    }
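The `created_at` field above uses Twitter's classic timestamp format. As a stdlib-only aside, it can be parsed with `datetime.strptime`:

```python
from datetime import datetime

# Twitter's classic created_at format: "Tue Sep 21 16:20:33 +0000 2021"
TWITTER_TIME_FMT = "%a %b %d %H:%M:%S %z %Y"

def parse_created_at(created_at: str) -> datetime:
    """Parse a tweet's created_at string into a timezone-aware datetime."""
    return datetime.strptime(created_at, TWITTER_TIME_FMT)

dt = parse_created_at("Tue Sep 21 16:20:33 +0000 2021")
print(dt.isoformat())  # → 2021-09-21T16:20:33+00:00
```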
    

    Based on this we can scrape: right-click the request in the browser and choose Copy -> Copy as cURL, then import it directly into Postman for testing.
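The captured request can also be replayed from Python instead of Postman. A minimal stdlib sketch that only builds the request — the URL path prefix and all header values here are placeholders/assumptions, and a real request needs the cookie, `authorization`, and `x-csrf-token` values copied from your own browser's "Copy as cURL" output:

```python
import urllib.request

# Placeholder values — substitute the real ones from your browser session.
url = "https://twitter.com/i/api/2/timeline/home_latest.json"
headers = {
    "authorization": "Bearer <token-from-browser>",
    "x-csrf-token": "<csrf-token-from-browser>",
    "cookie": "<cookie-string-from-browser>",
}

req = urllib.request.Request(url, headers=headers)
# resp = urllib.request.urlopen(req)  # actually send it (needs valid credentials)
# tweets = json.load(resp)["globalObjects"]["tweets"]
print(req.full_url)
```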

    This approach is far from ideal, though, and is mentioned here only in passing.

    2. Fetching via the Twitter developer API v1.1

    There is a third-party Python library called tweepy, which we will use here. Note that fetching tweets through the Twitter API requires a Twitter developer account, which is hard to obtain.

    Install tweepy:

    pip3 install tweepy
    

    With Twitter API v1.1, tweets are fetched mainly through the API.user_timeline endpoint. It is rate limited to 900 requests per 15-minute window, i.e. one request per second, which is enough for polling a single user's tweets. In my tests, however, some tweets arrived with latency of tens of seconds, so I don't find this approach very ideal either. Here is the code for this approach:

    import tweepy
    
    from typing import List
    from operator import itemgetter
    
    from tweepy.models import Status
    
    from asyncioquant.entrance import *
    
    
    class FetchTwitter:
    
        def __init__(self):
            consumer_key = Config.twitter.get("consumer_key")
            consumer_secret = Config.twitter.get("consumer_secret")
            access_token = Config.twitter.get("access_token")
            access_token_secret = Config.twitter.get("access_token_secret")
    
            auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
            auth.set_access_token(access_token, access_token_secret)
    
            self.api = tweepy.API(auth=auth, wait_on_rate_limit=True)
    
            self.tweet_id = None    # when fetching tweets, only take those with id greater than this
            self.get_information_when_start()
            Quant.create_loop_task(10, self.get_latest_one_status)
            Quant.create_loop_task(60, self.get_rate_limit)
    
            self.last_text = ""
    
        def get_information_when_start(self):
            """启动时获取当前推特用户的名称及时间线的限速情况"""
            me = self.api.verify_credentials().screen_name
            rate_limit = self.api.rate_limit_status()["resources"]["statuses"]["/statuses/user_timeline"]
            Logger.info(f"Fetch Twitter Program Start, my twitter name is {me}", caller=self)
            Logger.info(f"rate limit is {rate_limit}", caller=self)
    
            # find the largest tweet id
            user_id = Config.twitter.get("user_id")
            screen_name = Config.twitter.get("screen_name")
            # the API returns a list of Status objects
            status: List[Status] = self.api.user_timeline(user_id=user_id, screen_name=screen_name, trim_user=True)
            id_list = []
            for s in status:
                tweet_id = s.id
                utc_created_at = str(s.created_at).replace("+00:00", "")
                ts = Tools.utctime_str_to_ts(utctime_str=utc_created_at, fmt="%Y-%m-%d %H:%M:%S")
                id_list.append({"tweet_id": tweet_id, "ts": ts})
            id_list = sorted(id_list, key=itemgetter("ts"), reverse=True)
            Logger.info("id_list:", id_list, caller=self)
            newest = id_list[0]
            self.tweet_id = newest["tweet_id"]
            Logger.info(f"self.tweet_id:{self.tweet_id}", caller=self)
    
        async def get_rate_limit(self):
            """定时获取限速情况"""
            rate_limit = self.api.rate_limit_status()["resources"]["statuses"]["/statuses/user_timeline"]
    
            if rate_limit["remaining"] == 0:
                await DingTalk.text(
                    token="",
                    text=f"推特限频通知:{rate_limit}"
                )
                Logger.warn(f"Rate Limit: {rate_limit}", caller=self)
    
        @async_method_locker(name="get_latest_one_status.lock", wait=False)
        async def get_latest_one_status(self):
            """获取指定推特用户最新的一条状态"""
            user_id = Config.twitter.get("user_id")
            screen_name = Config.twitter.get("screen_name")
            # the API returns a list of Status objects
            status: List[Status] = self.api.user_timeline(
                user_id=user_id, screen_name=screen_name, count=1, trim_user=True, since_id=self.tweet_id
            )
    
            if len(status) > 0:
                s: Status = status[0]
            else:
                return
    
            utc_created_at = str(s.created_at).replace("+00:00", "")
            datetime = Tools.ts_to_datetime_str(Tools.utctime_str_to_ts(utctime_str=utc_created_at, fmt="%Y-%m-%d %H:%M:%S"))
            text = s.text
            is_reply = bool(s.in_reply_to_status_id)
            tweet_id = s.id
    
            if text != self.last_text:
                self.last_text = text
                self.tweet_id = tweet_id
                Logger.info(f"datetime: {datetime}, is_reply: {is_reply}, text: {text}", caller=self)
    
    
    if __name__ == '__main__':
    
        Quant.start("config.json", FetchTwitter)
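A side note on `get_information_when_start` above: tweet IDs are Twitter snowflake IDs, which increase over time, so the newest tweet can be found with a plain `max()` over the IDs instead of sorting the list by parsed timestamps. A stdlib sketch with made-up sample data:

```python
# Tweet IDs (snowflake) are time-ordered, so max(id) picks the newest tweet.
statuses = [
    {"tweet_id": 1440350243355201541, "ts": 1632241233},
    {"tweet_id": 1440349000000000000, "ts": 1632240000},
]
newest_id = max(s["tweet_id"] for s in statuses)
print(newest_id)  # → 1440350243355201541
```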
    

    3. Twitter API v2

    This is the upgraded version of v1.1. I have not actually tested it; both are REST APIs, so presumably the differences are small.
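For reference, v2 uses bearer-token auth and endpoints such as `GET /2/users/:id/tweets` (tweepy also wraps this endpoint as `Client.get_users_tweets`). A stdlib sketch that only builds the request — the user id and token are placeholders:

```python
import urllib.parse
import urllib.request

# Placeholders — substitute a real user id and your app's bearer token.
user_id = "720487892670410753"
bearer_token = "<your-app-bearer-token>"

params = urllib.parse.urlencode({"max_results": 5, "tweet.fields": "created_at"})
url = f"https://api.twitter.com/2/users/{user_id}/tweets?{params}"

req = urllib.request.Request(url, headers={"Authorization": f"Bearer {bearer_token}"})
# resp = urllib.request.urlopen(req)  # actually send it (needs a valid token)
print(req.full_url)
```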

    4. tweepy.Stream

    This is the streaming API, and it is also simple to use:

    import tweepy
    
    from logger import Logger
    
    
    Logger.init_logger(
        level="info",
        console=False
    )
    
    
    consumer_key = ""
    consumer_secret = ""
    access_token = ""
    access_token_secret = ""
    
    
    # Subclass Stream to log received Tweets
    class Stream(tweepy.Stream):
    
        def on_status(self, status):
            Logger.info(status.user.screen_name, status.id, status.created_at, status.text)
    
    
    # Initialize instance of the subclass
    stream = Stream(
        consumer_key, consumer_secret,
        access_token, access_token_secret
    )
    
    
    # Filter realtime Tweets by follow
    # [HobaBot]
    stream.filter(follow=["1048905259"])
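Streaming connections drop from time to time (the AsyncStream example below reconnects in its on_closed callback). A common pattern is exponential backoff between reconnect attempts; a stdlib sketch, where the base delay and cap are arbitrary choices:

```python
def backoff_delays(base: float = 1.0, cap: float = 60.0):
    """Yield reconnect delays that double each attempt, capped at `cap` seconds."""
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)

delays = backoff_delays()
print([next(delays) for _ in range(8)])  # → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```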
    

    5. tweepy.asynchronous.AsyncStream

    This is the asynchronous streaming API. Below is the code of my main.py:

    import signal
    import asyncio
    
    from utils.logger import Logger
    from utils.tools import Tools
    from utils.mysql import Mysql
    from utils.translate import Translate
    
    from tweepy.asynchronous.streaming import AsyncStream
    
    from tweepy import models
    
    
    class Stream(AsyncStream):
    
        def __init__(self, consumer_key, consumer_secret, access_token, access_token_secret):
            super().__init__(consumer_key, consumer_secret, access_token, access_token_secret)
    
            self.consumer_key = consumer_key
            self.consumer_secret = consumer_secret
            self.access_token = access_token
            self.access_token_secret = access_token_secret
    
            # Filter realtime Tweets by twitter user, [HobaBot]
            self.follows = [1048905259]     # ids of the users to subscribe to
            self.filter(follow=self.follows)    # pass the id list into the filter
    
        def disconnect(self):   # disconnect the stream
            super().disconnect()
    
        async def on_closed(self, resp):    # connection-closed callback
            Logger.warn(f"AsyncStream closed:{resp}, trying to reconnect ...", caller=self)
            self.__init__(self.consumer_key, self.consumer_secret, self.access_token, self.access_token_secret)
    
        async def on_connect(self):     # connection-established callback
            Logger.info("AsyncStream connected success ...", caller=self)
    
        async def on_connection_error(self):    # connection-error callback
            Logger.error("AsyncStream connection error ...", caller=self)
    
        async def on_exception(self, exception):    # exception callback
            Logger.exception(f"AsyncStream exception: {exception}", caller=self)
    
        async def on_keep_alive(self):      # keep-alive heartbeat callback
            Logger.info("Keep alive signal received ...", caller=self)
    
        async def on_status(self, status: models.Status):   # status-update callback (Twitter data maps to models, e.g. the Status model here)
            # do some filter based on user's `id`(type of int)
            user: models.User = status.user
            if user.id not in self.follows:
                return
    
            # do some other filter based on timestamp
            utc_created_at = str(status.created_at).replace("+00:00", "")
            ts = Tools.utctime_str_to_ts(utctime_str=utc_created_at, fmt="%Y-%m-%d %H:%M:%S")
            datetime = Tools.ts_to_datetime_str(ts)
            if Tools.get_cur_timestamp() - ts >= 10:
                return
    
            # whether this tweet is a reply to other people
            is_reply = bool(status.in_reply_to_status_id)
    
            # log specific information we need
            Logger.info("author:", user.screen_name, "datetime:", datetime, "content:", status.text, "is_reply:", is_reply, caller=self)
    
            # insert tweet into database, but in this place we need to do some filter by tweet's unique id
            success, error = await Mysql.fetchone("SELECT `id` "
                                                  "FROM `tweets` "
                                                  "WHERE `tweet_id` = '%s' "
                                                  "AND `is_translate` = 0;" % status.id)
            if error or success:
                return
    
            success, error = await Mysql.query(
                "INSERT INTO `tweets` "
                "VALUES(NULL,'%s','%s','%s','%s','%s','%s');" % (user.screen_name, datetime, status.text, is_reply, status.id, 0))
            if error:
                Logger.error("insert tweet into tweets error:", error, caller=self)
                return
            Logger.info("insert tweet into tweets success...", caller=self)
    
            # translate the tweet
            translated_content, error = await Translate.translate(content=status.text)
            if error:
                Logger.error(f"translate tweets error:", error, caller=self)
                return
    
            # store the translation
            success, error = await Mysql.query(
                "INSERT INTO `tweets` "
                "VALUES(NULL,'%s','%s','%s','%s','%s','%s');" % (user.screen_name, datetime, translated_content, is_reply, status.id, 1))
            if error:
                Logger.error("insert translated_content into tweets error:", error, caller=self)
                return
    
    
    if __name__ == '__main__':
    
        async def connect_mysql():
            """Connect to mysql database when strategy start."""
            success, error = await Mysql.initialize_mysql_connection_pool(
                host="",
                user="",
                password="",
                db=""
            )
            if error:
                Logger.error("CONNECT MYSQL WHEN STRATEGY START ERROR, STOPPING STRATEGY !", caller=connect_mysql)
                asyncio.get_event_loop().stop()
                return
            Logger.info("Connect mysql success !", caller=connect_mysql)
    
    
        def start():
    
            # initialize logger settings
            Logger.init_logger(level="info", path="./logs", name="error.log", clear=False, backup_count=0, console=False)
    
            # initialize mysql database connection pool
            asyncio.get_event_loop().create_task(connect_mysql())
    
            # start async stream
            stream = Stream(    # initializing Stream registers filter() as a future on the event loop, to be run once the loop starts
                consumer_key="",
                consumer_secret="",
                access_token="",
                access_token_secret=""
            )
    
            def keyboard_interrupt(s, f):
                """disconnect stream and stop event loop when keyboard interrupt."""
                print("KeyboardInterrupt (ID: {}) has been caught. Cleaning up...".format(s))
                stream.disconnect()
                asyncio.get_event_loop().stop()
    
            signal.signal(signal.SIGINT, keyboard_interrupt)
    
            # start asyncio EventLoop
            Logger.info("start io loop ...", caller=start)
            asyncio.get_event_loop().run_forever()  # start the asyncio event loop
    
        # start program
        start()
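One caveat on the SQL above: building statements with % string formatting is fragile (a tweet containing a quote character will break the query) and open to SQL injection. Most Python database drivers support parameterized queries instead; a self-contained sketch using stdlib sqlite3, with an illustrative table layout rather than the article's actual schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets (id INTEGER PRIMARY KEY, tweet_id TEXT, text TEXT)")

tweet_id, text = "1440350243355201541", "it's a quote-safe insert"
# Placeholders let the driver escape values safely (sqlite3 uses ?, MySQL drivers use %s).
conn.execute("INSERT INTO tweets VALUES (NULL, ?, ?)", (tweet_id, text))

row = conn.execute("SELECT text FROM tweets WHERE tweet_id = ?", (tweet_id,)).fetchone()
print(row[0])  # → it's a quote-safe insert
```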
    


Original link: https://www.haomeiwen.com/subject/mdmqhrtx.html