美文网首页
Django:使用django-logpipe进行Kafka整合

Django:使用django-logpipe进行Kafka整合

作者: Zzmi | 来源:发表于2019-04-29 16:22 被阅读0次

    django-logpipe库

    ——该库充当用于在Django应用程序和服务之间移动数据的通用管道。它建立在Boto3Apache Kafkakafka-pythonDjango REST Framework之上。
    安装:pip install django-logpipe

    将logpipe添加到已安装的应用中:
    INSTALLED_APPS = [ 
        ... 
        'logpipe',
        ... 
    ]
    

    将连接设置添加到settings.py文件。配置Kafka是这样的:

    LOGPIPE = { 
        #必需设置
        'OFFSET_BACKEND':'logpipe.backend.kafka.ModelOffsetStore',
        'CONSUMER_BACKEND':'logpipe.backend.kafka.Consumer',
        'PRODUCER_BACKEND':'logpipe.backend.kafka.Producer',
        'KAFKA_BOOTSTRAP_SERVERS ':[ 
            'kafka:9092' 
        ],
        'KAFKA_CONSUMER_KWARGS':{ 
            'group_id':'django-logpipe',
        },
    
        #可选设置
        #'KAFKA_SEND_TIMEOUT':10,
        #'KAFKA_MAX_SEND_RETRIES':0,
        #'MIN_MESSAGE_LAG_MS':0 ,
        #'DEFAULT_FORMAT':'json',
    }
    

    运行迁移python manage.py migrate logpipe
    。这将创建用于存储Kafka日志位置偏移的模型:

    用法

    串行器

    使用logpipe发送或接收消息的第一步是定义序列化程序。logpipe的序列化程序有一些规则:

    1. 必须是rest_framework.serializers.Serializer的子类或实现模仿rest_framework.serializers.Serializer的接口的类。
    2. 必须在类上定义MESSAGE_TYPE属性。该值应该是一个字符串,它定义唯一定义其主题/流中的数据类型。
    3. 必须在类上定义VERSION属性。该值应为表示模式版本号的单调整数。
    4. 必须有KEY_FIELD在类上定义的属性,表示要用作消息键的字段的名称。消息密钥由Kafka在执行日志压缩时使用,并由Kinesis用作分片分区键。对于不需要密钥的主题,可以省略该属性。
    5. 如果序列化程序将用于传入消息,则应实现类方法lookup_instance(cls,** kwargs)。在实例化序列化程序之前,将直接使用消息数据作为关键字参数调用此类方法。它应该查找并返回相关对象(如果存在),以便在初始化期间将其传递给序列化程序的实例参数。如果还没有对象存在(消息表示新对象),则应返回None。
      下面是一个示例Django模型及其序列化程序。
    from django.db import models
    from rest_framework import serializers
    import uuid
    
    class Person(models.Model):
        uuid = models.UUIDField(default=uuid.uuid4, unique=True)
        first_name = models.CharField(max_length=200)
        last_name = models.CharField(max_length=200)
    
    class PersonSerializer(serializers.ModelSerializer):
        MESSAGE_TYPE = 'person'
        VERSION = 1
        KEY_FIELD = 'uuid'
    
        class Meta:
            model = Person
            fields = ['uuid', 'first_name', 'last_name']
    
        @classmethod
        def lookup_instance(cls, uuid, **kwargs):
            try:
                return Person.objects.get(uuid=uuid)
            except models.Person.DoesNotExist:
                pass
    
    发送消息

    一旦存在序列化程序,就可以通过创建Producer对象并调用send方法向Kafka发送消息。

    from logpipe import Producer
    joe = Person.objects.create(first_name='Joe', last_name='Schmoe')
    producer = Producer('people', PersonSerializer)
    producer.send(joe)
    

    上面的示例代码将以下消息发送到名为people的Kafka topic 。

    json:{
        "type":"person",
        "version":1,"message":{
            "first_name":"Joe",
            "last_name":"Schmoe",
            "uuid":"xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
        }
    }
    
    接收消息

    为了处理传入的消息,可以重用相同的模型和序列化器。我们只需要实例化一个Consumer对象。

    # Watch for messages, but timeout after 1000ms of no messages
    consumer = Consumer('people', consumer_timeout_ms=1000)
    consumer.register(PersonSerializer)
    consumer.run()
    
    # Watch for messages and block forever
    consumer = Consumer('people')
    consumer.register(PersonSerializer)
    consumer.run()
    

    Consumer对象使用Django REST Framework的内置保存,创建和更新方法来应用消息。如果消息没有直接绑定到Django模型,可以跳过定义的lookup_instance类方法并覆盖save方法来自定义逻辑。
    如果在单个topic或stream中有多个数据类型,则可以通过向Consumer注册多个序列化程序来使用它们。

    consumer = Consumer('people')
    consumer.register(PersonSerializer)
    consumer.register(PlaceSerializer)
    consumer.register(ThingSerializer)
    consumer.run()
    

    还可以通过为每种消息类型版本定义序列化程序并将其全部注册到Consumer来支持多种不兼容的消息类型。

    consumer = Consumer('people')
    consumer.register(PersonSerializerVersion1)
    consumer.register(PersonSerializerVersion2)
    consumer.register(PlaceSerializer)
    consumer.register(ThingSerializer)
    consumer.run()
    

    如果有多个stream或topic,需要为每一个都创建一个Consumer,并使用MultiConsumer查看这些stream或topic。

    from logpipe import MultiConsumer
    people_consumer = Consumer('people')
    people_consumer.register(PersonSerializer)
    places_consumer = Consumer('places')
    places_consumer.register(PlaceSerializer)
    multi = MultiConsumer(people_consumer, places_consumer)
    
    # Watch for 'people' and 'places' topics indefinitely
    multi.run()
    

    最后,可以通过build_kafka_consumer管理命令中的构建来自动注册和运行Consumer。

    # myapp/apps.py
    from django.apps import AppConfig
    from logpipe import Consumer, register_consumer
    
    class MyAppConfig(AppConfig):
        name = 'myapp'
    
    # Register consumers with logpipe
    @register_consumer
    def build_person_consumer():
        consumer = Consumer('people')
        consumer.register(PersonSerializer)
        return consumer
    

    使用register_consumer装饰器可以根据需要注册尽可能多的Consumer和topic。然后运行run_kafka_consumer命令以循环方式自动处理所有Consumer的消息。python manage.py run_kafka_consumer

    处理架构更改

    使用每个序列化程序类所需的VERSION属性处理架构更改。发送时,生产者在消息数据中包含模式版本号。然后,当消费者收到消息时,它会查找具有匹配版本号的寄存器序列化器。如果未找到具有匹配版本号的序列化程序,则会引发logpipe.exceptions.UnknownMessageVersionError异常。

    要执行向后不兼容的架构更改,应执行以下步骤。

    更新使用者代码以了解新架构版本。将生产者代码更新为发送新架构版本。经过一段时间后(当确定Kafka中仍然不存在旧版本消息时),请删除与旧架构版本相关的代码。
    例如,如果我们想要在上面定义的Person模型上需要一个电子邮件字段,那么第一步是更新消费者以了解新字段:

    class Person(models.Model):
        uuid = models.UUIDField(default = uuid.uuid4,unique = True)
        first_name = models.CharField(max_length = 200)
        last_name = models.CharField(max_length = 200)
        email = models.EmailField( max_length = 200,null = True)
    
    
    class PersonSerializerV1(serializers.ModelSerializer):
        MESSAGE_TYPE = 'person'
        VERSION = 1
        KEY_FIELD = 'uuid'class 
        class Meta:
            model = Person 
            fields = ['uuid','first_name','last_name'] 
    
    
    class PersonSerializerV2(PersonSerializerV1):
        MESSAGE_TYPE ='person'
        VERSION = 2 
        class Meta(PersonSerializerV1.META):
            fields = ['uuid','first_name','last_name','email'] 
    
    consumer = Consumer('people',consumer_timeout_ms = 1000)
    consumer.register(PersonSerializerV1)
    consumer.register(PersonSerializerV2)
    

    消费者现在将使用适当的序列化程序来显示消息版本。其次,我们需要将生产者代码更新为使用模式版本2:

    producer = Producer('people', PersonSerializerV2)
    

    最后,在删除所有旧版本1消息(通过日志压缩)之后,可以从代码库中删除PersonSerializerV1类。

    相关文章

      网友评论

          本文标题:Django:使用django-logpipe进行Kafka整合

          本文链接:https://www.haomeiwen.com/subject/rlvynqtx.html