django-logpipe库
——该库充当用于在Django应用程序和服务之间移动数据的通用管道。它建立在Boto3,Apache Kafka,kafka-python和Django REST Framework之上。
安装:pip install django-logpipe
将logpipe添加到已安装的应用中:
INSTALLED_APPS = [
...
'logpipe',
...
]
将连接设置添加到settings.py文件。配置Kafka是这样的:
LOGPIPE = {
#必需设置
'OFFSET_BACKEND':'logpipe.backend.kafka.ModelOffsetStore',
'CONSUMER_BACKEND':'logpipe.backend.kafka.Consumer',
'PRODUCER_BACKEND':'logpipe.backend.kafka.Producer',
'KAFKA_BOOTSTRAP_SERVERS ':[
'kafka:9092'
],
'KAFKA_CONSUMER_KWARGS':{
'group_id':'django-logpipe',
},
#可选设置
#'KAFKA_SEND_TIMEOUT':10,
#'KAFKA_MAX_SEND_RETRIES':0,
#'MIN_MESSAGE_LAG_MS':0 ,
#'DEFAULT_FORMAT':'json',
}
运行迁移python manage.py migrate logpipe
。这将创建用于存储Kafka日志位置偏移的模型:
用法
串行器
使用logpipe发送或接收消息的第一步是定义序列化程序。logpipe的序列化程序有一些规则:
- 必须是rest_framework.serializers.Serializer的子类或实现模仿rest_framework.serializers.Serializer的接口的类。
- 必须在类上定义MESSAGE_TYPE属性。该值应该是一个字符串,它定义唯一定义其主题/流中的数据类型。
- 必须在类上定义VERSION属性。该值应为表示模式版本号的单调整数。
- 必须有KEY_FIELD在类上定义的属性,表示要用作消息键的字段的名称。消息密钥由Kafka在执行日志压缩时使用,并由Kinesis用作分片分区键。对于不需要密钥的主题,可以省略该属性。
- 如果序列化程序将用于传入消息,则应实现类方法lookup_instance(cls,** kwargs)。在实例化序列化程序之前,将直接使用消息数据作为关键字参数调用此类方法。它应该查找并返回相关对象(如果存在),以便在初始化期间将其传递给序列化程序的实例参数。如果还没有对象存在(消息表示新对象),则应返回None。
下面是一个示例Django模型及其序列化程序。
from django.db import models
from rest_framework import serializers
import uuid
class Person(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, unique=True)
first_name = models.CharField(max_length=200)
last_name = models.CharField(max_length=200)
class PersonSerializer(serializers.ModelSerializer):
MESSAGE_TYPE = 'person'
VERSION = 1
KEY_FIELD = 'uuid'
class Meta:
model = Person
fields = ['uuid', 'first_name', 'last_name']
@classmethod
def lookup_instance(cls, uuid, **kwargs):
try:
return Person.objects.get(uuid=uuid)
except models.Person.DoesNotExist:
pass
发送消息
一旦存在序列化程序,就可以通过创建Producer对象并调用send方法向Kafka发送消息。
from logpipe import Producer
joe = Person.objects.create(first_name='Joe', last_name='Schmoe')
producer = Producer('people', PersonSerializer)
producer.send(joe)
上面的示例代码将以下消息发送到名为people的Kafka topic 。
json:{
"type":"person",
"version":1,"message":{
"first_name":"Joe",
"last_name":"Schmoe",
"uuid":"xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
}
}
接收消息
为了处理传入的消息,可以重用相同的模型和序列化器。我们只需要实例化一个Consumer对象。
# Watch for messages, but timeout after 1000ms of no messages
consumer = Consumer('people', consumer_timeout_ms=1000)
consumer.register(PersonSerializer)
consumer.run()
# Watch for messages and block forever
consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.run()
Consumer对象使用Django REST Framework的内置保存,创建和更新方法来应用消息。如果消息没有直接绑定到Django模型,可以跳过定义的lookup_instance类方法并覆盖save方法来自定义逻辑。
如果在单个topic或stream中有多个数据类型,则可以通过向Consumer注册多个序列化程序来使用它们。
consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()
还可以通过为每种消息类型版本定义序列化程序并将其全部注册到Consumer来支持多种不兼容的消息类型。
consumer = Consumer('people')
consumer.register(PersonSerializerVersion1)
consumer.register(PersonSerializerVersion2)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()
如果有多个stream或topic,需要为每一个都创建一个Consumer,并使用MultiConsumer查看这些stream或topic。
from logpipe import MultiConsumer
people_consumer = Consumer('people')
people_consumer.register(PersonSerializer)
places_consumer = Consumer('places')
places_consumer.register(PlaceSerializer)
multi = MultiConsumer(people_consumer, places_consumer)
# Watch for 'people' and 'places' topics indefinitely
multi.run()
最后,可以通过build_kafka_consumer管理命令中的构建来自动注册和运行Consumer。
# myapp/apps.py
from django.apps import AppConfig
from logpipe import Consumer, register_consumer
class MyAppConfig(AppConfig):
name = 'myapp'
# Register consumers with logpipe
@register_consumer
def build_person_consumer():
consumer = Consumer('people')
consumer.register(PersonSerializer)
return consumer
使用register_consumer装饰器可以根据需要注册尽可能多的Consumer和topic。然后运行run_kafka_consumer命令以循环方式自动处理所有Consumer的消息。python manage.py run_kafka_consumer
处理架构更改
使用每个序列化程序类所需的VERSION属性处理架构更改。发送时,生产者在消息数据中包含模式版本号。然后,当消费者收到消息时,它会查找具有匹配版本号的寄存器序列化器。如果未找到具有匹配版本号的序列化程序,则会引发logpipe.exceptions.UnknownMessageVersionError异常。
要执行向后不兼容的架构更改,应执行以下步骤。
更新使用者代码以了解新架构版本。将生产者代码更新为发送新架构版本。经过一段时间后(当确定Kafka中仍然不存在旧版本消息时),请删除与旧架构版本相关的代码。
例如,如果我们想要在上面定义的Person模型上需要一个电子邮件字段,那么第一步是更新消费者以了解新字段:
class Person(models.Model):
uuid = models.UUIDField(default = uuid.uuid4,unique = True)
first_name = models.CharField(max_length = 200)
last_name = models.CharField(max_length = 200)
email = models.EmailField( max_length = 200,null = True)
class PersonSerializerV1(serializers.ModelSerializer):
MESSAGE_TYPE = 'person'
VERSION = 1
KEY_FIELD = 'uuid'class
class Meta:
model = Person
fields = ['uuid','first_name','last_name']
class PersonSerializerV2(PersonSerializerV1):
MESSAGE_TYPE ='person'
VERSION = 2
class Meta(PersonSerializerV1.META):
fields = ['uuid','first_name','last_name','email']
consumer = Consumer('people',consumer_timeout_ms = 1000)
consumer.register(PersonSerializerV1)
consumer.register(PersonSerializerV2)
消费者现在将使用适当的序列化程序来显示消息版本。其次,我们需要将生产者代码更新为使用模式版本2:
producer = Producer('people', PersonSerializerV2)
最后,在删除所有旧版本1消息(通过日志压缩)之后,可以从代码库中删除PersonSerializerV1类。
网友评论