Sending Avro Messages with the Kafka REST API

Author: 扶我起来改bug | Published: 2019-08-16 16:31

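    Deploying Kafka REST Proxy and Schema Registry is outside the scope of this article.
    To send Avro messages from Java instead, see "Kafka Schema Registry Tutorial (Java)".
    The examples below send messages using the REST Proxy and Schema Registry APIs.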

    1. Create the topic: schema-test.

    2. Register the schema through the Schema Registry API and obtain the schema id

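      1. Check whether the schema is already registered under the subject (the request body carries the schema to look up):
      curl -X POST \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Accept: application/vnd.schemaregistry.v1+json,application/vnd.schemaregistry+json, application/json" \
      --data '{"schema":"{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}"}' \
      "http://schema-host:port/subjects/schema-test-value"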
      Response:
      {
          "error_code": 40401,
          "message": "Subject not found."
      }
      
      2. Register the schema:
      curl -X POST \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Accept: application/vnd.schemaregistry.v1+json,application/vnd.schemaregistry+json, application/json" \
      --data '{"schema":"{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}"}' \
      "http://schema-host:port/subjects/schema-test-value/versions"
      // The returned id is needed later when sending messages
      Response:
      {
          "id": 484
      }                
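
Escaping the schema by hand for the --data argument is easy to get wrong. The following is a minimal sketch, not part of the original walkthrough, of two helpers: schema_payload wraps a readable Avro schema in the {"schema": "..."} envelope, and extract_schema_id pulls the id out of the response. Both function names and the user.avsc file mentioned afterwards are hypothetical, and the sed-based parsing assumes jq is not available.

```shell
# Wrap an Avro schema (read from stdin) in the {"schema":"..."} envelope.
# Schema Registry expects the schema as a JSON *string*, so backslashes
# and double quotes inside it have to be escaped.
schema_payload() {
  local escaped
  # Turn newlines, carriage returns and tabs into spaces, then escape
  # backslashes first and double quotes second.
  escaped=$(tr '\n\r\t' '   ' | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '{"schema":"%s"}' "$escaped"
}

# Print the numeric "id" field from a registration response read on stdin.
extract_schema_id() {
  sed -n 's/.*"id"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}
```

With the schema stored in a file such as user.avsc, the payload can be piped straight into curl, which reads the request body from stdin when given --data @-: schema_payload < user.avsc | curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @- "http://schema-host:port/subjects/schema-test-value/versions" | extract_schema_id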
      
    3. Send messages to the topic through the REST API

      curl -X POST \
      -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json" \
      --data '{"value_schema_id":484,"records":[{"value":{"name":"kyle","age":10}},{"value":{"name":"lola","age":11}}]}' \
      "http://rest-host:port/topics/schema-test"
      Response:
      {
          "offsets": [
              {
                  "partition": 2,
                  "offset": 0,
                  "error_code": null,
                  "error": null
              },
              {
                  "partition": 1,
                  "offset": 0,
                  "error_code": null,
                  "error": null
              }
          ],
          "key_schema_id": null,
          "value_schema_id": 484
      }
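
The produce request body has a fixed shape: the schema id from step 2 plus a list of records, each wrapped in a "value" object. As a rough sketch, the hypothetical helper below (produce_payload is not part of the REST Proxy, just an illustration) assembles that body from a schema id and any number of record values that are already valid JSON.

```shell
# Build the body of a produce request.
#   $1      schema id returned by Schema Registry
#   $2...   one JSON object per record value
produce_payload() {
  local schema_id="$1" records="" value
  shift
  for value in "$@"; do
    # Separate records with commas, but do not emit a leading comma.
    records="${records:+$records,}{\"value\":$value}"
  done
  printf '{"value_schema_id":%s,"records":[%s]}' "$schema_id" "$records"
}
```

Calling produce_payload 484 '{"name":"kyle","age":10}' '{"name":"lola","age":11}' prints the same body that is passed to --data in the request above.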
      
    4. Create a consumer instance

      curl -X POST \
      -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name":"schema-test_consumer","format":"avro","auto.offset.reset":"earliest","auto.commit.enable":"false"}' \
      "http://rest-host:port/consumers/schema-test-group"
      Response:
      {
          "instance_id": "schema-test_consumer",
          "base_uri": "http://rest-host:port/consumers/schema-test-group/instances/schema-test_consumer"
      }
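
The base_uri in this response is the prefix for every later request made with this consumer instance. Instead of retyping it, a script can capture it. The helper below is a small sketch with a hypothetical name (extract_base_uri), and it assumes the URL is returned without escaped characters; where jq is installed, jq -r .base_uri is the more robust choice.

```shell
# Print the "base_uri" value from a consumer-creation response read on stdin.
extract_base_uri() {
  sed -n 's/.*"base_uri"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}
```

The captured value can then be reused in later requests, for example by appending /subscription or /records to it.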
      
    5. Subscribe to the topic

      curl -X POST \
      -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"topics":["schema-test"]}' \
      "http://rest-host:port/consumers/schema-test-group/instances/schema-test_consumer/subscription"
      Response:
      No Content
      
    6. Consume the data

      curl -X GET \
      -H "Accept: application/vnd.kafka.avro.v2+json" \
      "http://rest-host:port/consumers/schema-test-group/instances/schema-test_consumer/records?timeout=3000&max_bytes=300000"
      Response:
      [
          {
              "key": null,
              "value": {
                  "name": "kyle",
                  "age": 10
              },
              "partition": 2,
              "offset": 0,
              "topic": "schema-test"
          },
          {
              "key": null,
              "value": {
                  "name": "lola",
                  "age": 11
              },
              "partition": 1,
              "offset": 0,
              "topic": "schema-test"
          }
      ]
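
Two practical points follow from the steps above. A fetch can come back as an empty array while the consumer is still joining its group, and because the instance was created with auto.commit.enable set to false, offsets are only committed when requested explicitly. The sketch below is one way to handle the rest of the consumer lifecycle under those assumptions. All function names and the REST_PROXY and POLL_INTERVAL variables are hypothetical; the /offsets and instance DELETE endpoints are part of the REST Proxy v2 consumer API.

```shell
# Placeholder address, as in the examples above; adjust to your deployment.
REST_PROXY="${REST_PROXY:-http://rest-host:port}"
V2_JSON="Content-Type: application/vnd.kafka.v2+json"

# Print the instance URI for a consumer group ($1) and instance name ($2).
consumer_uri() {
  printf '%s/consumers/%s/instances/%s' "$REST_PROXY" "$1" "$2"
}

# Fetch one batch of records, decoded from Avro to JSON.
fetch_records() {
  curl -s -H "Accept: application/vnd.kafka.avro.v2+json" \
    "$(consumer_uri "$1" "$2")/records?timeout=3000&max_bytes=300000"
}

# Run the given command up to $1 times until it prints a non-empty JSON array.
# Returns 0 and prints the body on success, 1 if every attempt was empty.
poll_records() {
  local attempts="$1" body="" i=0
  shift
  while [ "$i" -lt "$attempts" ]; do
    body=$("$@")
    if [ -n "$body" ] && [ "$body" != "[]" ]; then
      printf '%s\n' "$body"
      return 0
    fi
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-1}"
  done
  return 1
}

# Commit the offsets of everything fetched so far (empty request body).
commit_offsets() {
  curl -s -X POST -H "$V2_JSON" "$(consumer_uri "$1" "$2")/offsets"
}

# Delete the consumer instance once it is no longer needed.
delete_consumer() {
  curl -s -X DELETE -H "$V2_JSON" "$(consumer_uri "$1" "$2")"
}
```

A typical sequence would be poll_records 5 fetch_records schema-test-group schema-test_consumer, followed by commit_offsets and then delete_consumer with the same group and instance names.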
      

      For the complete API and parameter details, see the official documentation:

      Kafka REST API

      Schema Registry API

    Original link: https://www.haomeiwen.com/subject/rxsqsctx.html