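Sending Avro Messages with the Kafka REST API
This article does not cover deploying Kafka REST or Schema Registry.
To send Avro messages from Java instead, see the Kafka Schema Registry tutorial (Java).
The example below uses the REST and Schema Registry APIs to send messages.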
- Create the topic: schema-test.
- Register the schema through the Schema Registry API and obtain the schema id

  1. Check whether the schema already exists. The lookup endpoint expects the schema in the request body:

    curl -X POST \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json" \
      --data '{"schema":"{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}"}' \
      "http://schema-host:port/subjects/schema-test-value"

    Response:
    {
      "error_code": 40401,
      "message": "Subject not found."
    }

  2. Register the schema:

    curl -X POST \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json" \
      --data '{"schema":"{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}"}' \
      "http://schema-host:port/subjects/schema-test-value/versions"

    The returned id is needed later when sending messages.

    Response:
    {
      "id": 484
    }
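The registration payload is easy to get wrong by hand, because the Avro schema is itself a JSON document that has to be embedded as an escaped string inside the outer JSON body. The following is a minimal Python sketch of that request using only the standard library. The function names and the `registry_url` parameter are illustrative and not part of the original article; `register_schema` assumes a reachable Schema Registry.

```python
import json
import urllib.request

# Avro schema used in the registration step above.
USER_SCHEMA = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}


def build_register_request(registry_url, subject, schema):
    """Build (but do not send) the POST /subjects/<subject>/versions request."""
    # The schema is serialized twice: once into a JSON string,
    # then again as the value of the "schema" field.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/subjects/{subject}/versions",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


def register_schema(registry_url, subject, schema):
    """Send the request and return the "id" field of the response."""
    request = build_register_request(registry_url, subject, schema)
    with urllib.request.urlopen(request) as response:
        return json.load(response)["id"]
```

Calling `register_schema("http://schema-host:port", "schema-test-value", USER_SCHEMA)` with the real host and port would be the equivalent of the second curl command.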
- Send messages to the topic through the REST API

    curl -X POST \
      -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json" \
      --data '{"value_schema_id":484,"records":[{"value":{"name":"kyle","age":10}},{"value":{"name":"lola","age":11}}]}' \
      "http://rest-host:port/topics/schema-test"

    Response:
    {
      "offsets": [
        { "partition": 2, "offset": 0, "error_code": null, "error": null },
        { "partition": 1, "offset": 0, "error_code": null, "error": null }
      ],
      "key_schema_id": null,
      "value_schema_id": 484
    }
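The produce request above can be sketched in Python as follows. This is an illustration under assumptions rather than a reference client: `build_produce_request` and `failed_offsets` are hypothetical helper names, and `rest_url` stands in for the `http://rest-host:port` placeholder. The second helper shows one way to check the per-record `error_code` fields in the response.

```python
import json
import urllib.request


def build_produce_request(rest_url, topic, value_schema_id, values):
    """Build (but do not send) the POST /topics/<topic> request for Avro values."""
    # Each record wraps one value; the schema is referenced by its registry id.
    records = [{"value": value} for value in values]
    body = json.dumps(
        {"value_schema_id": value_schema_id, "records": records}
    ).encode("utf-8")
    return urllib.request.Request(
        url=f"{rest_url}/topics/{topic}",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.kafka.avro.v2+json",
            "Accept": "application/vnd.kafka.v2+json",
        },
    )


def failed_offsets(response_body):
    """Return the entries of a parsed produce response whose error_code is not null."""
    return [
        entry
        for entry in response_body["offsets"]
        if entry.get("error_code") is not None
    ]
```

An empty list from `failed_offsets` means every record in the batch was accepted, as in the response shown above.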
- Create a consumer instance

    curl -X POST \
      -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name":"schema-test_consumer","format":"avro","auto.offset.reset":"earliest","auto.commit.enable":"false"}' \
      "http://rest-host:port/consumers/schema-test-group"

    Response:
    {
      "instance_id": "schema-test_consumer",
      "base_uri": "http://rest-host:port/consumers/schema-test-group/instances/schema-test_consumer"
    }
- Subscribe to the topic

    curl -X POST \
      -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"topics":["schema-test"]}' \
      "http://rest-host:port/consumers/schema-test-group/instances/schema-test_consumer/subscription"

    Response: No Content
- Consume the data

    curl -X GET \
      -H "Accept: application/vnd.kafka.avro.v2+json" \
      "http://rest-host:port/consumers/schema-test-group/instances/schema-test_consumer/records?timeout=3000&max_bytes=300000"

    Response:
    [
      {
        "key": null,
        "value": { "name": "kyle", "age": 10 },
        "partition": 2,
        "offset": 0,
        "topic": "schema-test"
      },
      {
        "key": null,
        "value": { "name": "lola", "age": 11 },
        "partition": 1,
        "offset": 0,
        "topic": "schema-test"
      }
    ]
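The three consumer steps (create the instance, subscribe, fetch records) have to be issued in that order against the same instance. The sketch below builds the three requests in Python so the relationship between the URLs is visible. `consumer_requests` is a hypothetical helper, and it assumes the instance URL matches the `base_uri` returned by the create call, as it does in the responses shown above; a more careful client would read `base_uri` from the response instead.

```python
import json
import urllib.request

V2_JSON = "application/vnd.kafka.v2+json"
V2_AVRO = "application/vnd.kafka.avro.v2+json"


def consumer_requests(rest_url, group, instance, topics,
                      timeout_ms=3000, max_bytes=300000):
    """Return the create, subscribe, and fetch requests in the order to send them."""
    # Assumed to equal the base_uri returned when the instance is created.
    instance_url = f"{rest_url}/consumers/{group}/instances/{instance}"

    create_body = {
        "name": instance,
        "format": "avro",
        "auto.offset.reset": "earliest",
        "auto.commit.enable": "false",
    }
    create = urllib.request.Request(
        url=f"{rest_url}/consumers/{group}",
        data=json.dumps(create_body).encode("utf-8"),
        method="POST",
        headers={"Content-Type": V2_JSON},
    )
    subscribe = urllib.request.Request(
        url=f"{instance_url}/subscription",
        data=json.dumps({"topics": list(topics)}).encode("utf-8"),
        method="POST",
        headers={"Content-Type": V2_JSON},
    )
    fetch = urllib.request.Request(
        url=f"{instance_url}/records?timeout={timeout_ms}&max_bytes={max_bytes}",
        method="GET",
        headers={"Accept": V2_AVRO},
    )
    return [create, subscribe, fetch]
```

Each request can then be passed to `urllib.request.urlopen` in turn; only the fetch call is expected to return a JSON body with records.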
For more details on the APIs and their parameters, see the official documentation.