import java.util.{Optional, Properties}
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.protocol.SecurityProtocol // org.apache.kafka.common.security.auth.SecurityProtocol on kafka-clients >= 1.0

object FlinkKafkaExactlyOnceJob {

  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "origin-mobile-flow-event")
    // isolation.level controls which records the consumer sees; the default is read_uncommitted.
    // read_committed is required so the application only reads committed transactional records.
    properties.setProperty("isolation.level", "read_committed")
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name)
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
    properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", "testacl", "testacl"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // max parallelism must stay >= the actual parallelism; grow it by 50% once past 128
    env.setMaxParallelism(if (env.getParallelism <= 128) 128 else env.getParallelism + env.getParallelism / 2)
    env.setParallelism(10)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    env.enableCheckpointing(1000)
    // Alternative: fixed-delay restart
    // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.of(10, TimeUnit.SECONDS)))
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
      1000, // max failures per interval
      Time.of(5, TimeUnit.MINUTES), // time interval for measuring the failure rate
      Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
    ))
    env.getConfig.enableSysoutLogging()

    // Flink consumer reading from Kafka
    val consumer = new FlinkKafkaConsumer011[String]("test-ssa-event", new SimpleStringSchema(), properties)
    consumer.setStartFromGroupOffsets()
    val stream = env.addSource(consumer)

    /* Kafka sink configuration */
    val properties1 = new Properties()
    properties1.setProperty("bootstrap.servers", "localhost:9092")
    properties1.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    properties1.setProperty("transaction.timeout.ms", "60000")
    properties1.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name)
    properties1.setProperty(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
    properties1.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", "testacl", "testacl"))

    val producer = new FlinkKafkaProducer011[String](
      "test-uuu-event", // default target topic
      new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), // serialization schema
      properties1,
      Optional.empty(), // no custom partitioner
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
      FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
    )
    stream.addSink(producer)
    env.execute("对量 server test")
  }
}
If the following error appears on startup: the Kafka broker's "transaction.max.timeout.ms" defaults to 15 minutes, and the producer is not allowed to set its transaction timeout higher than this broker-side limit. By default, FlinkKafkaProducer011 sets "transaction.timeout.ms" to 1 hour, so when using EXACTLY_ONCE semantics the broker's "transaction.max.timeout.ms" should be increased accordingly (or the producer's timeout lowered, as the job above does with 60000 ms).
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms).
at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:723)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:648)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:455)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:447)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:206)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
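A minimal sketch of the two fixes, assuming the broker keeps its 15-minute default; the values are illustrative, not from the original post:

// Option 1 (client side): keep the producer's transaction timeout at or below
// the broker's transaction.max.timeout.ms (15 minutes by default).
properties1.setProperty("transaction.timeout.ms", "60000") // 1 minute, well under the limit

// Option 2 (broker side): raise the limit in server.properties (broker restart required),
// e.g. to match FlinkKafkaProducer011's 1-hour default:
//   transaction.max.timeout.ms=3600000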
Semantic.EXACTLY_ONCE mode uses a fixed-size pool of KafkaProducers for each FlinkKafkaProducer011 instance, and each checkpoint uses one producer from that pool. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer011 throws an exception and fails the whole application, so configure the maximum pool size and the maximum number of concurrent checkpoints to match, as sketched below.
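A minimal sketch of keeping the two settings consistent, reusing env and properties1 from the job above; the specific values are illustrative assumptions:

// Keep concurrent checkpoints <= the producer pool size.
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // Flink's default is 1

val producer = new FlinkKafkaProducer011[String](
  "test-uuu-event",
  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
  properties1,
  Optional.empty(),
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
  FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE // 5; raise it if you allow more concurrent checkpoints
)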
2019-09-25 17:34:21,424 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
// the default producer pool size is 5
2019-09-25 17:34:21,424 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (5/10) to produce into default topic test-uuu-event
2019-09-25 17:34:21,424 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-21] ProducerId set to -1 with epoch -1
2019-09-25 17:34:21,665 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-21] ProducerId set to 5038 with epoch 375
2019-09-25 17:34:22,019 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 4/10 - checkpoint 1871 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-24, producerId=5036, epoch=375], transactionStartTime=1569404059556} from checkpoint 1871
2019-09-25 17:34:22,021 INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-09-25 17:34:17,980 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = SCRAM-SHA-512
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
// transactional.id does not need to be set by the user; Flink overrides it by default, so a manually configured value will not take effect
transactional.id = Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-22
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2019-09-25 17:34:17,980 INFO org.apache.kafka.clients.producer.KafkaProducer - Instantiated a transactional producer.
2019-09-25 17:34:17,980 INFO org.apache.kafka.clients.producer.KafkaProducer - Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2019-09-25 17:34:17,980 INFO org.apache.kafka.clients.producer.KafkaProducer - Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
2019-09-25 17:34:17,980 INFO org.apache.kafka.clients.producer.KafkaProducer - Overriding the default acks to all since idempotence is enabled.
2019-09-25 17:34:17,981 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
2019-09-25 17:34:17,981 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
2019-09-25 17:34:17,981 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (5/10) to produce into default topic test-uuu-event
2019-09-25 17:34:17,981 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-22] ProducerId set to -1 with epoch -1
2019-09-25 17:34:18,223 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-22] ProducerId set to 3048 with epoch 374
2019-09-25 17:34:18,306 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 4/10 - checkpoint 1869 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-23, producerId=4043, epoch=374], transactionStartTime=1569404055579} from checkpoint 1869
2019-09-25 17:34:18,308 INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-09-25 17:34:19,309 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer - Flushing new partitions