美文网首页
大数据:用ApacheKafka和Python来实时提取新冠数据

大数据:用ApacheKafka和Python来实时提取新冠数据

作者: Detian_e8ab | 来源:发表于2020-05-07 23:09 被阅读0次

    Apache Kafka 是分布式的流处理平台, 能够发布消息和订阅消息, 并且能够以容错的持久的方式来存储记录数据流, 作为大数据生态的重要组成部分, Apache Kafka主要应用在构建实时的流数据管道,在系统和应用间得到可靠的数据, 并且能够构建转换或响应实时数据流的应用。这里通过用一个小demo展示如何使用 Apache Kafka producer和consumer 来实时发布和订阅数据。

    数据的来源是https://covid19api.com/。网站提供完全免费的rest api 新冠数据。如通过以下的Api call 可以获得如下的json.

    (https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-03-01T00:00:00Z&to=2020-04-01T00:00:00Z)
      {
        "Country": "Germany",
        "CountryCode": "DE",
        "Province": "",
        "City": "",
        "CityCode": "",
        "Lat": "51.17",
        "Lon": "10.45",
        "Cases": 130,
        "Status": "confirmed",
        "Date": "2020-03-01T00:00:00Z"
      },
      {
        "Country": "Germany",
        "CountryCode": "DE",
        "Province": "",
        "City": "",
        "CityCode": "",
        "Lat": "51.17",
        "Lon": "10.45",
        "Cases": 159,
        "Status": "confirmed",
        "Date": "2020-03-02T00:00:00Z"
      },
      {
        "Country": "Germany",
        "CountryCode": "DE",
        "Province": "",
        "City": "",
        "CityCode": "",
        "Lat": "51.17",
        "Lon": "10.45",
        "Cases": 196,
        "Status": "confirmed",
        "Date": "2020-03-03T00:00:00Z"
      }
    
    
    

    在开始数据的发布和订阅之前,首先要开始Kafka 服务。代码如下

    (base) cloud_user@yin2c:~$ sudo systemctl start confluent-zookeeper
    (base) cloud_user@yin2c:~$ sudo systemctl enable confluent-zookeeper
    (base) cloud_user@yin2c:~$ sudo systemctl start confluent-kafka
    (base) cloud_user@yin2c:~$ sudo systemctl enable confluent-kafka
    

    之后查看kafka broker是否在运行。

    这样Kafka就设置好了,下一步要创建一个话题topic

    kafka-topics --bootstrap-server localhost:9092 --create --topic py --partitions 1 --replication-factor 1
    

    接下来用python 来创建消息发布者和订阅者。消息的来源是新冠数据, 通过api call来获取数据, 是德国从4月20号以来每天的现存病例数量, 先创建一个发布者实例, 设置好服务器,然后通过loop 把得到的json数据字典中的每天的病例数量发布到topic 里面。当启动发布者之后, 订阅者就会逐行打印得到的信息。

    from kafka import KafkaProducer
    from json import loads
    import json
    import requests
    from time import sleep
    
    #list of all data from first date
    #URL = "https://api.covid19api.com/total/dayone/country/germany/status/confirmed"
    URL ="https://api.covid19api.com/live/country/germany/status/confirmed/date/2020-04-20T13:13:30Z"
    req = requests.get(url = URL)
    data = req.json()
    producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    
    for i in range (len(data)):
        file = data[i]
        sleep(1)
        producer.send('py', value=str(file["Date"].split("T")[0])+':'+str(file["Active"]))
       
    

    消息的订阅者很简单就是一个监听topic 的订阅者。首先开始订阅者, 由于还没有消息发布, 所以没有信息。当发布者启动之后, 就可以看到信息被逐行打印出来。


    image.png
    image.png

    代码可以通过我的github 分叉:https://github.com/dtdetianyin/ApacheKafka/tree/master/Corona19%20Data%20processed%20with%20ApacheKafka%20and%20Python
    _

    相关文章

      网友评论

          本文标题:大数据:用ApacheKafka和Python来实时提取新冠数据

          本文链接:https://www.haomeiwen.com/subject/vbozghtx.html