send(消息发送)
提供以下三种消息发送的形式:
- sned
- send async
- send timeout
close
关闭一个producer,提供以下两种方式:
- close
- close async
intercept
拦截器接口主要提供以下三个方法:
-
close
关闭interceptor,主要用于执行一些资源清理工作 -
before send
用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算 -
on send ack
该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
compression
- NONE
- LZ4
- ZLIB
- ZSTD
scheme
- java string hash
- murmur3_32hash
producer name
给生产者指定一个名字,如果没有手动指定,系统将会为该producer生成一个全局唯一的名字。
注意:在指定名称时,用户需要确保对于给定的topic,生产者名称在所有Pulsar的集群中是唯一的。 broker将强制执行只有一个给定名称的producer可以在topic上进行publish。
crypto
crypto key reader
- get public key
- get private key
add encryption key
添加producer用来加密数据密钥的public key.
当producer创建时,Pulsar客户端会检查是否有密钥添加到encryptionKeys中。 如果找到要添加的key,则针对每个key调用回调函数getKey(String keyName)来获取key的值。 应用程序应实现这个回调并返回pkcs8格式的密钥。 如果启用了压缩功能,则压缩后会对消息进行加密。 如果启用了批处理消息传递,则批处理消息也将被加密。
crypto failure action
-
FAIL
如果加密操作失败,FAIL是失败时,发送的默认选项
-
SEND
忽略加密失败并继续发送未加密的消息
flush
- flush
- flush async
topic
指定该producer将要把消息publish到哪一个topic
other option
block if queue full
当输出(outgoing)队列已满时,是否停止相应的操作
max pending msg
设置包含待处理消息的队列的最大大小,以便从broker接收确认。
当队列已满时,默认的,所有的调用都会失败,除非blockIfQueueFull设置为true。可以使用blockIfQueueFull来改变这个行为。
max pending msg across partitions
设置所有分区中的最大挂起消息数,此设置将用于降低每个分区的最大挂起消息
initial sequenceId
为producer生产的消息设置最开始的sequenceID,如果没有额外指定,那么接下来生产的第一条消息的sequenceID就是initialSequenceId + 1,第二条消息的sequenceID依次递增。
clone
基于当前的producer,copy一个producer出来。比如我们需要创建多个producer,并且这些producer有一些相同的属性可以复用,那么我们就可以基于这些相同的属性进行copy,然后对copy后的producer在做定制化的配置
get last sequenceID
获取producer publish的最后一个sequenceID。
如果系统中有两个名字一样的producer(原则上,这是不被允许的),该函数将返回在上一个producer发布的最后一条消息的sequenceID,如果没有消息被publish,则返回-1。
msg routing mode
- single partition
- round robin partition
- custom partition
batch
- max messages
- max publish delay
- enable batch
是否启用batch的功能
property
- property
设置单个属性 - properties
设置多个属性,可以包装到map中
网友评论