一. 不同版本的读兼容性结果
| 服务端版本 | 是否开snappy压缩 | 客户端0.8.2.2 | 客户端0.10.2.2 |
| --- | --- | --- | --- |
| 0.8.2.2 | true | 成功 | 失败 |
| 0.8.2.2 | false | 成功 | 失败 |
| 0.10.2.2 | true | 成功 | 成功 |
| 0.10.2.2 | false | 成功 | 成功 |
二. 0.8 server 命令
2.1 0.8 启动
bin/kafka-server-start.sh -daemon config/server.properties
2.2 0.8 发送消息,不开压缩
bin/kafka-topics.sh --zookeeper localhost:2181/ --create --topic t1 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 --compression-codec none
2.2.1 0.10 消费不开压缩的消息 失败
afly@afly-desktop:~/local/kafka_2.11-0.10.2.2$ bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2019-07-06 22:43:47,725] WARN [ConsumerFetcherThread-console-consumer-66613_afly-desktop-1562424227400-8033055c-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@5185dcf4 (kafka.consumer.ConsumerFetcherThread)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:111)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:31)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
afly@afly-desktop:~/local/kafka_2.11-0.10.2.2$ bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2019-07-06 22:43:47,725] WARN [ConsumerFetcherThread-console-consumer-66613_afly-desktop-1562424227400-8033055c-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@5185dcf4 (kafka.consumer.ConsumerFetcherThread)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:111)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:31)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
2.2.2 0.8 消费不开压缩的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t1
2.3 0.8 发送消息,开压缩
bin/kafka-topics.sh --zookeeper localhost:2181/ --create --topic t2 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t2 --compression-codec snappy
2.3.1 0.10 消费开压缩的消息 无反应,消费失败
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t2
==> logs/server.log <==
[2019-07-06 22:46:08,957] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18
at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:748)
2.3.2 0.8 消费开压缩的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t2
三. 0.10 启动
bin/kafka-server-start.sh -daemon config/server.properties
0.10 发送消息,不开压缩
bin/kafka-topics.sh --zookeeper localhost:2181/kafka010 --create --topic t1 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --compression-codec none --topic t1
0.10 消费不开压缩的消息 Okay
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t1
0.8 消费不开压缩的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka010 --topic t1
0.10 发送消息,开压缩
bin/kafka-topics.sh --zookeeper localhost:2181/kafka010 --create --topic t2 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --compression-codec snappy --topic t2
0.10 消费开压缩的消息 Okay
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t2 --from-beginning
0.8 消费开压缩的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka010 --topic t2 --from-beginning
网友评论