美文网首页
Django:使用django-logpipe进行Kafka整合

Django:使用django-logpipe进行Kafka整合

作者: Zzmi | 来源:发表于2019-04-29 16:22 被阅读0次

django-logpipe库

——该库充当用于在Django应用程序和服务之间移动数据的通用管道。它建立在Boto3Apache Kafkakafka-pythonDjango REST Framework之上。
安装:pip install django-logpipe

将logpipe添加到已安装的应用中:
INSTALLED_APPS = [ 
    ... 
    'logpipe',
    ... 
]

将连接设置添加到settings.py文件。配置Kafka是这样的:

LOGPIPE = { 
    #必需设置
    'OFFSET_BACKEND':'logpipe.backend.kafka.ModelOffsetStore',
    'CONSUMER_BACKEND':'logpipe.backend.kafka.Consumer',
    'PRODUCER_BACKEND':'logpipe.backend.kafka.Producer',
    'KAFKA_BOOTSTRAP_SERVERS ':[ 
        'kafka:9092' 
    ],
    'KAFKA_CONSUMER_KWARGS':{ 
        'group_id':'django-logpipe',
    },

    #可选设置
    #'KAFKA_SEND_TIMEOUT':10,
    #'KAFKA_MAX_SEND_RETRIES':0,
    #'MIN_MESSAGE_LAG_MS':0 ,
    #'DEFAULT_FORMAT':'json',
}

运行迁移python manage.py migrate logpipe
。这将创建用于存储Kafka日志位置偏移的模型:

用法

串行器

使用logpipe发送或接收消息的第一步是定义序列化程序。logpipe的序列化程序有一些规则:

  1. 必须是rest_framework.serializers.Serializer的子类或实现模仿rest_framework.serializers.Serializer的接口的类。
  2. 必须在类上定义MESSAGE_TYPE属性。该值应该是一个字符串,它定义唯一定义其主题/流中的数据类型。
  3. 必须在类上定义VERSION属性。该值应为表示模式版本号的单调整数。
  4. 必须有KEY_FIELD在类上定义的属性,表示要用作消息键的字段的名称。消息密钥由Kafka在执行日志压缩时使用,并由Kinesis用作分片分区键。对于不需要密钥的主题,可以省略该属性。
  5. 如果序列化程序将用于传入消息,则应实现类方法lookup_instance(cls,** kwargs)。在实例化序列化程序之前,将直接使用消息数据作为关键字参数调用此类方法。它应该查找并返回相关对象(如果存在),以便在初始化期间将其传递给序列化程序的实例参数。如果还没有对象存在(消息表示新对象),则应返回None。
    下面是一个示例Django模型及其序列化程序。
from django.db import models
from rest_framework import serializers
import uuid

class Person(models.Model):
    uuid = models.UUIDField(default=uuid.uuid4, unique=True)
    first_name = models.CharField(max_length=200)
    last_name = models.CharField(max_length=200)

class PersonSerializer(serializers.ModelSerializer):
    MESSAGE_TYPE = 'person'
    VERSION = 1
    KEY_FIELD = 'uuid'

    class Meta:
        model = Person
        fields = ['uuid', 'first_name', 'last_name']

    @classmethod
    def lookup_instance(cls, uuid, **kwargs):
        try:
            return Person.objects.get(uuid=uuid)
        except models.Person.DoesNotExist:
            pass
发送消息

一旦存在序列化程序,就可以通过创建Producer对象并调用send方法向Kafka发送消息。

from logpipe import Producer
joe = Person.objects.create(first_name='Joe', last_name='Schmoe')
producer = Producer('people', PersonSerializer)
producer.send(joe)

上面的示例代码将以下消息发送到名为people的Kafka topic 。

json:{
    "type":"person",
    "version":1,"message":{
        "first_name":"Joe",
        "last_name":"Schmoe",
        "uuid":"xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
    }
}
接收消息

为了处理传入的消息,可以重用相同的模型和序列化器。我们只需要实例化一个Consumer对象。

# Watch for messages, but timeout after 1000ms of no messages
consumer = Consumer('people', consumer_timeout_ms=1000)
consumer.register(PersonSerializer)
consumer.run()

# Watch for messages and block forever
consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.run()

Consumer对象使用Django REST Framework的内置保存,创建和更新方法来应用消息。如果消息没有直接绑定到Django模型,可以跳过定义的lookup_instance类方法并覆盖save方法来自定义逻辑。
如果在单个topic或stream中有多个数据类型,则可以通过向Consumer注册多个序列化程序来使用它们。

consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()

还可以通过为每种消息类型版本定义序列化程序并将其全部注册到Consumer来支持多种不兼容的消息类型。

consumer = Consumer('people')
consumer.register(PersonSerializerVersion1)
consumer.register(PersonSerializerVersion2)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()

如果有多个stream或topic,需要为每一个都创建一个Consumer,并使用MultiConsumer查看这些stream或topic。

from logpipe import MultiConsumer
people_consumer = Consumer('people')
people_consumer.register(PersonSerializer)
places_consumer = Consumer('places')
places_consumer.register(PlaceSerializer)
multi = MultiConsumer(people_consumer, places_consumer)

# Watch for 'people' and 'places' topics indefinitely
multi.run()

最后,可以通过build_kafka_consumer管理命令中的构建来自动注册和运行Consumer。

# myapp/apps.py
from django.apps import AppConfig
from logpipe import Consumer, register_consumer

class MyAppConfig(AppConfig):
    name = 'myapp'

# Register consumers with logpipe
@register_consumer
def build_person_consumer():
    consumer = Consumer('people')
    consumer.register(PersonSerializer)
    return consumer

使用register_consumer装饰器可以根据需要注册尽可能多的Consumer和topic。然后运行run_kafka_consumer命令以循环方式自动处理所有Consumer的消息。python manage.py run_kafka_consumer

处理架构更改

使用每个序列化程序类所需的VERSION属性处理架构更改。发送时,生产者在消息数据中包含模式版本号。然后,当消费者收到消息时,它会查找具有匹配版本号的寄存器序列化器。如果未找到具有匹配版本号的序列化程序,则会引发logpipe.exceptions.UnknownMessageVersionError异常。

要执行向后不兼容的架构更改,应执行以下步骤。

更新使用者代码以了解新架构版本。将生产者代码更新为发送新架构版本。经过一段时间后(当确定Kafka中仍然不存在旧版本消息时),请删除与旧架构版本相关的代码。
例如,如果我们想要在上面定义的Person模型上需要一个电子邮件字段,那么第一步是更新消费者以了解新字段:

class Person(models.Model):
    uuid = models.UUIDField(default = uuid.uuid4,unique = True)
    first_name = models.CharField(max_length = 200)
    last_name = models.CharField(max_length = 200)
    email = models.EmailField( max_length = 200,null = True)


class PersonSerializerV1(serializers.ModelSerializer):
    MESSAGE_TYPE = 'person'
    VERSION = 1
    KEY_FIELD = 'uuid'class 
    class Meta:
        model = Person 
        fields = ['uuid','first_name','last_name'] 


class PersonSerializerV2(PersonSerializerV1):
    MESSAGE_TYPE ='person'
    VERSION = 2 
    class Meta(PersonSerializerV1.META):
        fields = ['uuid','first_name','last_name','email'] 

consumer = Consumer('people',consumer_timeout_ms = 1000)
consumer.register(PersonSerializerV1)
consumer.register(PersonSerializerV2)

消费者现在将使用适当的序列化程序来显示消息版本。其次,我们需要将生产者代码更新为使用模式版本2:

producer = Producer('people', PersonSerializerV2)

最后,在删除所有旧版本1消息(通过日志压缩)之后,可以从代码库中删除PersonSerializerV1类。

相关文章

网友评论

      本文标题:Django:使用django-logpipe进行Kafka整合

      本文链接:https://www.haomeiwen.com/subject/rlvynqtx.html