Consumer 与 partition
其实topic中的partition被分配到某个consumer上,也就是某个consumer订阅了某个partition,而不是message。所以在同一时间点上,订阅到同一个partition的consumer必然属于不同的Consumer Group。
在官方网站上,给出了这样一张图:
一个kafka cluster中的某个topic,有4个partition。有两个consumer group (A and B)订阅了该topic。
Consumer Group A有2个partition:p0、p1,Consumer Group B有4个partition:c3,c4,c5,c6。
经过分区分配后,consumer与partition的订阅关系如下:
Topic 中的4个partition在Consumer Group A中的分配情况如下:
C1 订阅p0,p3
C2 订阅p1,p2
Topic 中的4个partition在Consumer Group B中的分配情况如下:
C3 订阅p0
C4 订阅p3
C5 订阅p1
C6 订阅p2
New Consumer Configs
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
bootstrap.servers | 用于建立到Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管在这里指定哪些服务器用于引导 - 该列表仅影响用于发现全套服务器的初始主机。这个清单应该在表格中host1:port1,host2:port2,...。由于这些服务器仅用于初始连接以发现完整的群集成员资格(可能会动态更改),因此此列表不必包含整套服务器(但可能需要多个服务器,以防服务器关闭) 。 | list | high | ||
key.deserializer | 实现org.apache.kafka.common.serialization.Deserializer接口的密钥的反序列化器类。 | class | high | ||
value.deserializer | 用于实现org.apache.kafka.common.serialization.Deserializer接口的值的反序列化器类。 | class | high | ||
fetch.min.bytes | 服务器为获取请求返回的最小数据量。如果没有足够的数据可用,请求将等待那么多的数据在应答请求之前积累。1字节的默认设置意味着只要有一个字节的数据可用,或者提取请求超时等待数据到达,就会立即应答提取请求。将其设置为大于1的值将导致服务器等待大量的数据累积,这可以稍稍提高服务器吞吐量,但需要花费一些额外的延迟时间。 当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。取值范围是:[0, Integer.Max],默认值是1。默认值设置为1的目的是:使得consumer的请求能够尽快的返回。 |
int | 1 | [0,...] | high |
group.id | 标识此消费者所属的消费者群组的唯一字符串。如果消费者通过使用subscribe(topic)基于卡夫卡的偏移量管理策略来使用组管理功能,则此属性是必需的 | string | "" | high | |
heartbeat.interval.ms | 心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。这个值必须设置的小于session.timeout.ms,因为:当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。通常设置的值要低于session.timeout.ms的1/3。 | int | 3000 | high | |
max.partition.fetch.bytes | 一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。 broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。 | int | 1048576 | [0,...] | high |
session.timeout.ms | 使用Kafka的组管理设施时,用于检测消费者失败的超时。消费者定期发送心跳来向经纪人表明其活跃度。如果代理在该会话超时到期之前没有收到心跳,那么代理将从该组中删除该消费者并启动重新平衡。请注意,该值必须在允许的范围内 Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。 |
int | 10000 | high | |
ssl.key.password | 密钥存储文件中的私钥密码。这对于客户端是可选的。 | password | null | high | |
ssl.keystore.location | 密钥存储文件的位置。这对客户端是可选的,可以用于客户端的双向认证。 | string | null | high | |
ssl.keystore.password | 密钥存储文件的商店密码。这对客户端是可选的,只有在配置了ssl.keystore.location时才需要。 | password | null | high | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | high | |
ssl.truststore.password | 信任存储文件的密码。如果密码未设置,信任库的访问仍然可用,但完整性检查被禁用。 | password | null | high | |
auto.offset.reset | 这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式: ● earliest: 自动将偏移量重置为最早的偏移量 ● latest:自动将偏移量重置为最新的偏移量 ● none: 如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。 ● anything else: 如果不是上述3种,只抛出异常给consumer。 |
string | latest | [latest, earliest, none] | medium |
connections.max.idle.ms | 连接空闲超时时间。因为consumer只与broker有连接(coordinator也是一个broker),所以这个配置的是consumer到broker之间的。 | long | 540000 | medium | |
enable.auto.commit | Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。默认值是true。 | boolean | true | medium | |
exclude.internal.topics | 内部主题(如偏移)的记录是否应该暴露给消费者。如果设置为true从内部主题接收记录的唯一方法是订阅它 | boolean | true | medium | |
fetch.max.bytes | 一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。 broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。 | int | 52428800 | [0,...] | medium |
isolation.level | 控制如何阅读事务处理的消息。如果设置为read_committed,consumer.poll()将仅返回已提交的事务消息。如果设置为read_uncommitted'(默认),consumer.poll()将返回所有的消息,甚至是已经中止的事务消息。非交易消息将在任一模式下无条件返回。 消息将始终以偏移顺序返回。因此,在 read_committed模式下,consumer.poll()将只返回到最后一个稳定偏移量(LSO)的消息,这比第一个打开事务的偏移量小。特别是在属于正在进行的交易的消息之后出现的任何消息将被扣留,直到相关的交易完成。因此,read_committed消费者在飞行交易中将无法读取高水印。 而且,当进入时 read_committed the seekToEnd method will return the LSO |
string | read_uncommitted | [read_committed, read_uncommitted] | medium |
max.poll.interval.ms | 在使用消费者组管理时,调用poll()之间的最大延迟。这提出了消费者在获取更多记录之前可以闲置的时间量的上界。如果在此超时到期之前未调用poll(),则认为使用者失败,并且组将重新平衡以将分区重新分配给其他成员。 | int | 300000 | [1,...] | medium |
max.poll.records | 在一次调用poll()中返回的最大记录数。 | int | 500 | [1,...] | medium |
partition.assignment.strategy | 当使用组管理时,客户端将用于在客户实例之间分配分区所有权的分区分配策略的类名 | list | class org.apache.kafka.clients.consumer.RangeAssignor | medium | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,则将使用操作系统默认值。 | int | 65536 | [-1,...] | medium |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。如果在超时过去之前未收到响应,则客户端将在必要时重新发送请求,或者如果重试耗尽,则请求失败。 | int | 305000 | [0,...] | medium |
sasl.jaas.config | 用于JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 值的格式是:'(=)*;' | password | null | medium | |
sasl.kerberos.service.name | Kafka运行的Kerberos主体名称。这可以在Kafka的JAAS配置或Kafka的配置中定义。 | string | null | medium | |
sasl.mechanism | 用于客户端连接的SASL机制。这可能是安全提供者可用的任何机制。GSSAPI是默认的机制。 | string | GSSAPI | medium | |
security.protocol | 用于与经纪人沟通的协议。有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 | string | PLAINTEXT | medium | |
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。如果值为-1,则将使用操作系统默认值。 | int | 131072 | [-1,...] | medium |
ssl.enabled.protocols | 启用了SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | 密钥存储文件的文件格式。这对于客户端是可选的。 | string | JKS | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。默认设置是TLS,在大多数情况下这是很好的。最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。较旧的JVM中可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不鼓励使用SSL。 | string | TLS | medium | |
ssl.provider | 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。 | string | null | medium | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | medium | |
auto.commit.interval.ms | 自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s) 消费者偏移的频率以毫秒为单位自动提交给Kafka,如果enable.auto.commit设置为true。 |
int | 5000 | [0,...] | low |
check.crcs | 自动检查消耗的记录的CRC32。这可以确保没有在线或磁盘损坏的消息发生。这个检查会增加一些开销,所以在寻求极高性能的情况下可能会被禁用。 | boolean | true | low | |
client.id | 发出请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含一个逻辑应用程序的名字来跟踪请求的来源,而不仅仅是ip / port。 | string | "" | low | |
fetch.max.wait.ms | Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。 | int | 500 | [0,...] | low |
interceptor.classes | 用作拦截器的类的列表。实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口允许你拦截(也可能是变异)消费者收到的记录。默认情况下,没有拦截器。 | list | null | low | |
metadata.max.age.ms | 以毫秒为单位的时间段之后,即使我们没有看到任何分区领导变化,以主动发现任何新的代理或分区,我们强制更新元数据。 | long | 300000 | [0,...] | low |
metric.reporters | 用作度量记录的类的列表。实现org.apache.kafka.common.metrics.MetricsReporter接口允许插入将被通知新度量创建的类。JmxReporter始终包含在注册JMX统计信息中。 | list | "" | low | |
metrics.num.samples | 维持用于计算度量的样本数量。 | int | 2 | [1,...] | low |
metrics.recording.level | 指标的最高记录级别。 | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | 计算指标样本的时间窗口。 | long | 30000 | [0,...] | low |
reconnect.backoff.max.ms | 重新连接到重复连接失败的代理程序时要等待的最长时间(以毫秒为单位)。如果提供的话,每个主机的退避将以指数方式增加,对于每个连续的连接失败,达到这个最大值。计算后退增加后,增加20%随机抖动以避免连接风暴。 | long | 1000 | [0,...] | low |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间。这避免了在一个紧密的循环中重复连接到主机。该退避适用于客户端向经纪人的所有连接尝试。 | long | 50 | [0,...] | low |
retry.backoff.ms | 尝试重试对给定主题分区的失败请求之前等待的时间量。这样可以避免在某些故障情况下重复发送请求。 | long | 100 | [0,...] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit命令路径。 | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | 登录线程在刷新尝试之间的休眠时间 | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | 随机抖动增加到更新时间的百分比。 | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到已经到达从上次刷新到票证到期的指定窗口时间因子,届时它将尝试更新票证。 | double | 0.8 | low | |
ssl.cipher.suites | 密码套件列表。这是用于使用TLS或SSL网络协议来协商网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,所有可用的密码套件都受支持。 | list | null | low | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | low | |
ssl.keymanager.algorithm | 密钥管理器工厂用于SSL连接的算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | low | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | low | |
ssl.trustmanager.algorithm | 信任管理器工厂用于SSL连接的算法。默认值是为Java虚拟机配置的信任管理器工厂算法。 | string | PKIX | low |
网友评论