Cluster environment: CDH 5.7.5 (Kafka 0.9.0):
- Kafka 0.9.0 official high-level consumer example
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
- In the official example, the consumer passes a timeout into each poll: the longest time the consumer will wait for the server to return data. Passing 0 makes poll return immediately with whatever records are currently available. The javadoc and implementation of poll in KafkaConsumer:
/**
* Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
* subscribed to any topics or partitions before polling for data.
* <p>
* On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
* consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
* offset for the subscribed list of partitions
*
*
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
* immediately with any records that are available now. Must not be negative.
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
* partitions is undefined or out of range and no offset reset policy has been configured
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
* topics or to the configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
* session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// if data is available, then return it, but first send off the
// next round of fetches to enable pipelining while the user is
// handling the fetched records.
fetcher.initFetches(metadata.fetch());
client.poll(0);
return new ConsumerRecords<>(records);
}
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}
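To make the two timeout behaviors concrete, here is a small sketch of my own (not from the official docs) that reuses the consumer from the 0.9.0 example above; it is illustrative only.

// poll(0): returns immediately with whatever records have already been fetched, without waiting.
ConsumerRecords<String, String> immediate = consumer.poll(0);
// poll(1000): blocks up to 1000 ms, returning early as soon as any records arrive;
// if the timeout expires first, the do/while above returns ConsumerRecords.empty().
ConsumerRecords<String, String> bounded = consumer.poll(1000);
System.out.printf("immediate=%d, bounded=%d%n", immediate.count(), bounded.count());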
- Kafka 0.8.x official high-level consumer example
package com.test.groups;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        // The interesting part is the while (it.hasNext()) loop: this code reads from Kafka until you stop it.
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
- Since the default is val ConsumerTimeoutMs = -1, a high-level consumer with no consumer timeout configured blocks the consuming thread until data arrives. Setting consumer.timeout.ms controls how long consumption may wait.
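The timeout is easiest to see end to end with the Java client. The following is a minimal sketch of my own (the topic "foo" and the ZooKeeper address are placeholder assumptions): with consumer.timeout.ms set, hasNext() surfaces the ConsumerTimeoutException thrown inside ConsumerIterator.makeNext() when no message arrives in time.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test");
props.put("consumer.timeout.ms", "5000"); // block at most 5 s waiting for a message
ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(Collections.singletonMap("foo", 1));
ConsumerIterator<byte[], byte[]> it = streams.get("foo").get(0).iterator();
try {
    while (it.hasNext())
        System.out.println(new String(it.next().message()));
} catch (ConsumerTimeoutException e) {
    // channel.poll(consumerTimeoutMs) returned null inside ConsumerIterator.makeNext()
    System.out.println("No message within consumer.timeout.ms; shutting down");
} finally {
    connector.shutdown();
}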
Excerpt from IteratorTemplate.scala:
def hasNext(): Boolean = {
  if(state == FAILED)
    throw new IllegalStateException("Iterator is in failed state")
  state match {
    case DONE => false
    case READY => true
    case _ => maybeComputeNext()
  }
}

protected def makeNext(): T

def maybeComputeNext(): Boolean = {
  state = FAILED // set FAILED first so an exception thrown by makeNext() leaves the iterator in a failed state
  nextItem = makeNext()
  if(state == DONE) {
    false
  } else {
    state = READY
    true
  }
}
ConsumerIterator.scala
package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
import kafka.message.{MessageAndOffset, MessageAndMetadata}
import kafka.common.{KafkaException, MessageSizeTooLargeException}
/**
* An iterator that blocks until a value can be read from the supplied queue.
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
if(consumedOffset < 0)
throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
item
}
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
if(localCurrent == null || !localCurrent.hasNext) {
if (consumerTimeoutMs < 0)
currentDataChunk = channel.take
else {
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
if (currentDataChunk == null) {
// reset state to make the iterator re-iterable
resetState()
throw new ConsumerTimeoutException
}
}
if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
debug("Received the shutdown command")
return allDone
} else {
currentTopicInfo = currentDataChunk.topicInfo
val cdcFetchOffset = currentDataChunk.fetchOffset
val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
if (ctiConsumeOffset < cdcFetchOffset) {
error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
.format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
}
localCurrent = currentDataChunk.messages.iterator
current.set(localCurrent)
}
// if we just updated the current chunk and it is empty that means the fetch size is too small!
if(currentDataChunk.messages.validBytes == 0)
throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
"%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
.format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
}
var item = localCurrent.next()
// reject the messages that have already been consumed
while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
item = localCurrent.next()
}
consumedOffset = item.nextOffset
item.message.ensureValid() // validate checksum of message to ensure it is valid
new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
}
def clearCurrentChunk() {
try {
debug("Clearing the current data chunk for this consumer iterator")
current.set(null)
}
}
}
class ConsumerTimeoutException() extends RuntimeException()
ConsumerConfig.scala
package kafka.consumer
import java.util.Properties
import kafka.api.OffsetRequest
import kafka.utils._
import kafka.common.{InvalidConfigException, Config}
object ConsumerConfig extends Config {
val RefreshMetadataBackoffMs = 200
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
val FetchSize = 1024 * 1024
val MaxFetchSize = 10*FetchSize
val NumConsumerFetchers = 1
val DefaultFetcherBackoffMs = 1000
val AutoCommit = true
val AutoCommitInterval = 60 * 1000
val MaxQueuedChunks = 2
val MaxRebalanceRetries = 4
val AutoOffsetReset = OffsetRequest.LargestTimeString
val ConsumerTimeoutMs = -1
val MinFetchBytes = 1
val MaxFetchWaitMs = 100
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorConsumerNumThreads = 1
val OffsetsChannelBackoffMs = 1000
val OffsetsChannelSocketTimeoutMs = 10000
val OffsetsCommitMaxRetries = 5
val OffsetsStorage = "zookeeper"
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
val ExcludeInternalTopics = true
val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
def validate(config: ConsumerConfig) {
validateClientId(config.clientId)
validateGroupId(config.groupId)
validateAutoOffsetReset(config.autoOffsetReset)
validateOffsetsStorage(config.offsetsStorage)
}
def validateClientId(clientId: String) {
validateChars("client.id", clientId)
}
def validateGroupId(groupId: String) {
validateChars("group.id", groupId)
}
def validateAutoOffsetReset(autoOffsetReset: String) {
autoOffsetReset match {
case OffsetRequest.SmallestTimeString =>
case OffsetRequest.LargestTimeString =>
case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of auto.offset.reset in ConsumerConfig; " +
"Valid values are " + OffsetRequest.SmallestTimeString + " and " + OffsetRequest.LargestTimeString)
}
}
def validateOffsetsStorage(storage: String) {
storage match {
case "zookeeper" =>
case "kafka" =>
case _ => throw new InvalidConfigException("Wrong value " + storage + " of offsets.storage in consumer config; " +
"Valid values are 'zookeeper' and 'kafka'")
}
}
}
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
import ConsumerConfig._
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
props.verify()
}
/** a string that uniquely identifies a set of consumers within the same consumer group */
val groupId = props.getString("group.id")
/** consumer id: generated automatically if not set.
* Set this explicitly only for testing purposes. */
val consumerId: Option[String] = Option(props.getString("consumer.id", null))
/** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")
/** the socket receive buffer for network requests */
val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
/** the number of bytes of messages to attempt to fetch */
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
/** the number of threads used to fetch data */
val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
/** max number of retries during rebalance */
val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
/** backoff time to refresh the leader of a partition after it loses the current leader */
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
/** backoff time to reconnect the offsets channel or to retry offset fetches/commits */
val offsetsChannelBackoffMs = props.getInt("offsets.channel.backoff.ms", OffsetsChannelBackoffMs)
/** socket timeout to use when reading responses for Offset Fetch/Commit requests. This timeout will also be used for
* the ConsumerMetadata requests that are used to query for the offset coordinator. */
val offsetsChannelSocketTimeoutMs = props.getInt("offsets.channel.socket.timeout.ms", OffsetsChannelSocketTimeoutMs)
/** Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during
* shut-down. It does not apply to commits from the auto-commit thread. It also does not apply to attempts to query
* for the offset coordinator before committing offsets. i.e., if a consumer metadata request fails for any reason,
* it is retried and that retry does not count toward this limit. */
val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)
/** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase
/** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
* is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
* given consumer group, it is safe to turn this off after all instances within that group have been migrated to
* the new jar that commits offsets to the broker (instead of directly to ZooKeeper). */
val dualCommitEnabled = props.getBoolean("dual.commit.enabled", if (offsetsStorage == "kafka") true else false)
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
/**
* Client id is specified by the kafka consumer client, used to distinguish different clients
*/
val clientId = props.getString("client.id", groupId)
/** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
/** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
validate(this)
}
- Kafka 0.8.x official low-level consumer example
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // See code in previous section
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
- The low-level consumer API lets you manage the consumption offset yourself by querying each partition's log size and current offset for the topic. When the fetch request is built, a maximum wait time is passed in, with default val DefaultMaxWait = 0; when SimpleConsumer is constructed, soTimeout is passed in as the socket connection timeout.
SimpleConsumer initialization:
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
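Putting the pieces together, here is a sketch of my own (host, port, topic, and client id are placeholder assumptions) that asks the broker for the partition's earliest offset and then fetches with an explicit maxWait/minBytes, so the broker can block up to 1 s instead of returning immediately with DefaultMaxWait = 0:

import java.util.Collections;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "test-client");
TopicAndPartition tp = new TopicAndPartition("foo", 0);
// ask for the earliest available offset of foo-0
OffsetRequest earliestReq = new OffsetRequest(
        Collections.singletonMap(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)),
        kafka.api.OffsetRequest.CurrentVersion(), "test-client");
OffsetResponse earliest = consumer.getOffsetsBefore(earliestReq);
long readOffset = earliest.offsets("foo", 0)[0];

FetchRequest req = new FetchRequestBuilder()
        .clientId("test-client")
        .addFetch("foo", 0, readOffset, 100000)
        .maxWait(1000)   // broker may block up to 1000 ms ...
        .minBytes(1)     // ... until at least 1 byte of messages is available
        .build();
FetchResponse fetchResponse = consumer.fetch(req);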
- FetchRequest.scala
package kafka.api
import kafka.utils.nonthreadsafe
import kafka.api.ApiUtils._
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.ConsumerConfig
import kafka.network.RequestChannel
import kafka.message.MessageSet
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import scala.collection.immutable.Map
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
object FetchRequest {
val CurrentVersion = 1.shortValue
val DefaultMaxWait = 0
val DefaultMinBytes = 0
val DefaultCorrelationId = 0
def readFrom(buffer: ByteBuffer): FetchRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val replicaId = buffer.getInt
val maxWait = buffer.getInt
val minBytes = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val fetchSize = buffer.getInt
(TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
})
})
FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
}
}
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
/**
* Partitions the request info into a map of maps (one for each topic).
*/
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
/**
* Public constructor for the clients
*/
def this(correlationId: Int,
clientId: String,
maxWait: Int,
minBytes: Int,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) {
this(versionId = FetchRequest.CurrentVersion,
correlationId = correlationId,
clientId = clientId,
replicaId = Request.OrdinaryConsumerId,
maxWait = maxWait,
minBytes= minBytes,
requestInfo = requestInfo)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(replicaId)
buffer.putInt(maxWait)
buffer.putInt(minBytes)
buffer.putInt(requestInfoGroupedByTopic.size) // topic count
requestInfoGroupedByTopic.foreach {
case (topic, partitionFetchInfos) =>
writeShortString(buffer, topic)
buffer.putInt(partitionFetchInfos.size) // partition count
partitionFetchInfos.foreach {
case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
buffer.putInt(partition)
buffer.putLong(offset)
buffer.putInt(fetchSize)
}
}
}
def sizeInBytes: Int = {
2 + /* versionId */
4 + /* correlationId */
shortStringLength(clientId) +
4 + /* replicaId */
4 + /* maxWait */
4 + /* minBytes */
4 + /* topic count */
requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
val (topic, partitionFetchInfos) = currTopic
foldedTopics +
shortStringLength(topic) +
4 + /* partition count */
partitionFetchInfos.size * (
4 + /* partition id */
8 + /* offset */
4 /* fetch size */
)
})
}
def isFromFollower = Request.isValidBrokerId(replicaId)
def isFromOrdinaryConsumer = replicaId == Request.OrdinaryConsumerId
def isFromLowLevelConsumer = replicaId == Request.DebuggingConsumerId
def numPartitions = requestInfo.size
override def toString(): String = {
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val fetchResponsePartitionData = requestInfo.map {
case (topicAndPartition, data) =>
(topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
}
val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
val fetchRequest = new StringBuilder
fetchRequest.append("Name: " + this.getClass.getSimpleName)
fetchRequest.append("; Version: " + versionId)
fetchRequest.append("; CorrelationId: " + correlationId)
fetchRequest.append("; ClientId: " + clientId)
fetchRequest.append("; ReplicaId: " + replicaId)
fetchRequest.append("; MaxWait: " + maxWait + " ms")
fetchRequest.append("; MinBytes: " + minBytes + " bytes")
if(details)
fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
fetchRequest.toString()
}
}
@nonthreadsafe
class FetchRequestBuilder() {
private val correlationId = new AtomicInteger(0)
private var versionId = FetchRequest.CurrentVersion
private var clientId = ConsumerConfig.DefaultClientId
private var replicaId = Request.OrdinaryConsumerId
private var maxWait = FetchRequest.DefaultMaxWait
private var minBytes = FetchRequest.DefaultMinBytes
private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
this
}
def clientId(clientId: String): FetchRequestBuilder = {
this.clientId = clientId
this
}
/**
* Only for internal use. Clients shouldn't set replicaId.
*/
private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = {
this.replicaId = replicaId
this
}
def maxWait(maxWait: Int): FetchRequestBuilder = {
this.maxWait = maxWait
this
}
def minBytes(minBytes: Int): FetchRequestBuilder = {
this.minBytes = minBytes
this
}
def requestVersion(versionId: Short): FetchRequestBuilder = {
this.versionId = versionId
this
}
def build() = {
val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
requestMap.clear()
fetchRequest
}
}