Apache Flink with Kafka: Building an End-to-End Exactly-Once Pipeline

Author: 专职掏大粪 | Published 2019-12-23 16:45
    // Assumed imports for this snippet (exact packages can differ across Flink/Kafka versions):
    import java.util.{Optional, Properties}
    import java.util.concurrent.TimeUnit

    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.config.SaslConfigs
    import org.apache.kafka.common.security.auth.SecurityProtocol

      def main(args: Array[String]): Unit = {
        val properties = new Properties()
    
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("group.id", "origin-mobile-flow-event")
        // isolation.level for the consumer: read only committed records (default is read_uncommitted)
        properties.setProperty("isolation.level", "read_committed")
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name)
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
        properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", "testacl", "testacl"))
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setMaxParallelism(if (env.getParallelism <= 128) 128 else env.getParallelism + env.getParallelism / 2)
        env.setParallelism(10)
    
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
        env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    
        env.enableCheckpointing(1000)
        //    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        //      4,Time.of(10,TimeUnit.SECONDS)))
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
          1000, // max failures per interval
          Time.of(5, TimeUnit.MINUTES), // time interval for measuring the failure rate
          Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
        ))
    
        env.getConfig.enableSysoutLogging()
    
        // Flink consumer reading from Kafka (FlinkKafkaConsumer011 is itself a SourceFunction)
        val consumer011 = new FlinkKafkaConsumer011[String]("test-ssa-event", new SimpleStringSchema(), properties)
        consumer011.setStartFromGroupOffsets()

        val stream = env.addSource(consumer011)
    
        /* Configure the Kafka sink */
        val properties1 = new Properties()
        properties1.setProperty("bootstrap.servers", "localhost:9092")
        properties1.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
        // must stay at or below the broker's "transaction.max.timeout.ms" (see below)
        properties1.setProperty("transaction.timeout.ms", "60000")
    
        properties1.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name)
        properties1.setProperty(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
        properties1.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", "testacl", "testacl"))
    
        // Transactional Kafka producer with EXACTLY_ONCE semantics
        val producer = new FlinkKafkaProducer011[String](
          "test-uuu-event",
          new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), // serialization schema
          properties1,
          Optional.empty(), // no custom partitioner
          FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
          FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
        )
    
        stream.addSink(producer)
    
        env.execute("对量 server  test")
    
      }
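
    The EXACTLY_ONCE sink semantic above only works with checkpointing enabled, which the program does via enableCheckpointing(1000). A minimal sketch of making the checkpointing mode explicit; exactly-once is already Flink's default mode, so this only documents intent:

    import org.apache.flink.streaming.api.CheckpointingMode

    // checkpoint every second, explicitly in exactly-once mode
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)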
    
    

    If startup fails with the error shown below:

    The Kafka broker's "transaction.max.timeout.ms" defaults to 15 minutes, and a producer
    is not allowed to request a transaction timeout larger than that broker-side maximum.
    By default, FlinkKafkaProducer011 sets "transaction.timeout.ms" to 1 hour, so when using
    the EXACTLY_ONCE semantic you should either raise "transaction.max.timeout.ms" on the
    broker or lower the producer's "transaction.timeout.ms".

    org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms).
        at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:723)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:648)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:455)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:447)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:206)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
        at java.lang.Thread.run(Thread.java:748)
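
    One way to clear this error, assuming the broker's "transaction.max.timeout.ms" is left at its 15-minute default, is to cap the producer-side timeout at or below that value; the program above already does this with 60000 ms:

    // producer-side transaction timeout must not exceed the broker's
    // transaction.max.timeout.ms (default 15 minutes = 900000 ms)
    properties1.setProperty("transaction.timeout.ms", "900000")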
    

    Semantic.EXACTLY_ONCE mode uses a fixed-size pool of KafkaProducers per FlinkKafkaProducer011 instance; each checkpoint uses one producer from that pool. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer011 throws an exception and fails the whole application. Configure the maximum pool size and the maximum number of concurrent checkpoints accordingly; a sketch follows.
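
    A sketch of keeping the two settings consistent, assuming the defaults described above (a pool of 5 producers):

    // at most one checkpoint in flight, comfortably below the default pool size of 5
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // alternatively, enlarge the pool via the last constructor argument
    val producer = new FlinkKafkaProducer011[String](
      "test-uuu-event",
      new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
      properties1,
      Optional.empty(),
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
      10 // kafkaProducersPoolSize, default 5
    )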

    2019-09-25 17:34:21,424 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : 73be1e1168f91ee2
    2019-09-25 17:34:21,424 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  - Starting FlinkKafkaProducer (5/10) to produce into default topic test-uuu-event
    // note: the producer pool defaults to 5 producers (DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)
    2019-09-25 17:34:21,424 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-21] ProducerId set to -1 with epoch -1
    2019-09-25 17:34:21,665 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-21] ProducerId set to 5038 with epoch 375
    2019-09-25 17:34:22,019 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer011 4/10 - checkpoint 1871 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-24, producerId=5036, epoch=375], transactionStartTime=1569404059556} from checkpoint 1871
    2019-09-25 17:34:22,021 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
    
    2019-09-25 17:34:17,980 INFO  org.apache.kafka.clients.producer.ProducerConfig              - ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = true
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = SCRAM-SHA-512
        security.protocol = SASL_PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
    // transactional.id need not be set by hand: the connector overrides it, so a user-supplied value does not take effect
        transactional.id = Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-22
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    
    2019-09-25 17:34:17,980 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Instantiated a transactional producer.
    2019-09-25 17:34:17,980 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
    2019-09-25 17:34:17,980 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
    2019-09-25 17:34:17,980 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Overriding the default acks to all since idempotence is enabled.
    2019-09-25 17:34:17,981 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.11.0.2
    2019-09-25 17:34:17,981 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : 73be1e1168f91ee2
    2019-09-25 17:34:17,981 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  - Starting FlinkKafkaProducer (5/10) to produce into default topic test-uuu-event
    2019-09-25 17:34:17,981 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-22] ProducerId set to -1 with epoch -1
    2019-09-25 17:34:18,223 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-22] ProducerId set to 3048 with epoch 374
    2019-09-25 17:34:18,306 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer011 4/10 - checkpoint 1869 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-23, producerId=4043, epoch=374], transactionStartTime=1569404055579} from checkpoint 1869
    2019-09-25 17:34:18,308 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
    2019-09-25 17:34:19,309 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer  - Flushing new partitions
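
    The transactional IDs in these logs are derived from the task name, which is why they all start with "Source: Custom Source -> Sink: Unnamed". As a judgment call rather than anything the connector requires, naming the sink operator makes the IDs and logs easier to trace, and setting a uid keeps the operator stable across savepoints:

    stream.addSink(producer)
      .name("kafka-exactly-once-sink") // appears in the transactional.id prefix and in logs
      .uid("kafka-exactly-once-sink")  // stable operator id for savepoint compatibility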
    
    
