
Hands-on Scenario: Flink Reads Data from Kafka, Processes It, and Writes It to Elasticsearch

Author: 元昊 | Published on 2019-05-24 10:01

    Add the Maven dependency to the pom:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
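    Besides the Kafka connector, the project is assumed to already have the usual Flink Scala dependencies (flink-scala_2.11 and flink-streaming-scala_2.11, version 1.6.1) on the classpath; they are omitted here.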
    

    Code:

    import java.util.Properties
    
    import org.apache.flink.streaming.api._
    import org.apache.flink.streaming.connectors.kafka._
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    
    object Flink_kafka {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Very important: checkpointing must be enabled!
        env.enableCheckpointing(5000)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
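        // Note: with EventTime the job would normally also assign timestamps and watermarks;
        // this example only prints the stream, so none are needed here.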
        // Configure the Kafka connection
        val props = new Properties()
        props.setProperty("bootstrap.servers", "172.24.112.13:9092,172.24.112.14:9092,172.24.112.15:9092")
        props.setProperty("zookeeper.connect", "172.24.112.13:2181,172.24.112.14:2181,172.24.112.15:2181")
        props.setProperty("group.id", "test")
        // Read data from Kafka
        val consumer = new FlinkKafkaConsumer09[String]("test_kafka", new SimpleStringSchema(), props)
        // Start reading from the latest offsets only
        consumer.setStartFromLatest()
        // Add Kafka as the data source
        val stream = env.addSource(consumer)
    
        stream.print()
    
        env.execute("Kafka_Flink")
      }
    }
    

    Run the program to test it:
    Start a Kafka console producer and write messages to the test_kafka topic:

    kafka-console-producer --broker-list 172.24.112.13:9092 --topic test_kafka
    

    Type in a few arbitrary messages.
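
    If you prefer to generate the test data from code instead of the console producer, a minimal producer sketch could look like the following (assuming the kafka-clients library is on the classpath; the TestProducer object name and the message contents are only illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object TestProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "172.24.112.13:9092,172.24.112.14:9092,172.24.112.15:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        // Send a few plain string messages to the topic the Flink job consumes
        for (i <- 1 to 10) {
          producer.send(new ProducerRecord[String, String]("test_kafka", s"hello flink $i"))
        }
        producer.close()
      }
    }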


    (Screenshot: 微信截图_20190524100553.png, test messages typed into the console producer)

    The Flink program is now receiving the data from Kafka:


    (Screenshot: 微信截图_20190524100645.png, Flink console output showing the consumed messages)

    How to sink from Flink to Elasticsearch

    Add the Maven dependency:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
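    The processing code below also parses JSON with Alibaba fastjson (com.alibaba.fastjson.JSON), so a fastjson dependency is assumed to be on the classpath as well; it is not shown in this post.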
    

    The version compatibility between the Flink Elasticsearch connectors and Elasticsearch is as follows:


    (Screenshot: QQ截图20190525170902.png, the Flink Elasticsearch connector / Elasticsearch version compatibility table)

    Code:

    import java.util.{Date, Properties}
    import com.alibaba.fastjson.JSON
    import org.apache.flink.streaming.connectors.kafka._
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.Requests
    
    
    object Flink_kafka {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Very important: checkpointing must be enabled!
        env.enableCheckpointing(5000)
    
        // Configure the Kafka connection
        val props = new Properties()
        props.setProperty("bootstrap.servers", "192.168.199.128:9092,192.168.199.131:9092,192.168.199.132:9092")
        props.setProperty("zookeeper.connect", "192.168.199.128:2181,192.168.199.131:2181,192.168.199.132:2181")
        props.setProperty("group.id", "test")
        // Read data from Kafka. FlinkKafkaConsumer08 needs the flink-connector-kafka-0.8_2.11
        // dependency; with the 0.9 connector declared above, use FlinkKafkaConsumer09 instead.
        val consumer = new FlinkKafkaConsumer08[String]("log", new SimpleStringSchema(), props)
        // Start reading from the latest offsets only
        consumer.setStartFromLatest()
        // Add Kafka as the data source. Each Kafka record is a JSON string whose
        // "message" field contains a tab-separated line such as:
        //   18542360152   116.410588, 39.880172   2019-05-24 23:43:38
        val stream = env.addSource(consumer).map(
          x=>{
            JSON.parseObject(x)
          }
        ).map(x=>{
          x.getString("message")
        }).map(x=>{
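          // x is the raw "message" line: phone \t coordinate pair \t timestamp.
          // Split out the two coordinate values and append the current time in milliseconds.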
          val jingwei=x.split("\\t")(1)
          val wei=jingwei.split(",")(0).trim
          val jing=jingwei.split(",")(1).trim
          val time=new Date().getTime
          val resultStr=wei+","+jing+","+time
          resultStr
        })
    
        stream.print()
    
        val httpHosts = new java.util.ArrayList[HttpHost]
        httpHosts.add(new HttpHost("192.168.199.128", 9200, "http"))
    
        val esSinkBuilder = new ElasticsearchSink.Builder[String](
          httpHosts,
          new ElasticsearchSinkFunction[String]{
            def createIndexRequest(element: String):IndexRequest={
              val json = new java.util.HashMap[String, String]
              json.put("wei", element.split(",")(0))
              json.put("jing", element.split(",")(1))
              json.put("time", element.split(",")(2))
    
              return Requests.indexRequest()
                .index("location-index")
                  .`type`("location")
                .source(json)
            }
    
            override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
              requestIndexer.add(createIndexRequest(element))
            }
          }
        )
    
        // Bulk request configuration: flush a request after every single element;
        // otherwise elements would be buffered into larger bulk requests.
        esSinkBuilder.setBulkFlushMaxActions(1)
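        // The builder also exposes further bulk-flush knobs (left at their defaults here),
        // e.g. setBulkFlushMaxSizeMb and setBulkFlushInterval for size- or time-based flushing.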
    
        stream.addSink(esSinkBuilder.build())
        
        env.execute("Kafka_Flink")
      }
    }
    

    The data is written to Elasticsearch successfully:


    (Screenshot: QQ截图20190525171625.png, the documents indexed in Elasticsearch)
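
    To verify the documents from code as well, a quick query can be issued with the high-level REST client that the elasticsearch6 connector already pulls in (a minimal sketch; the CheckIndex object name is illustrative, and the exact search(...) signature may differ between Elasticsearch client versions):

    import org.apache.http.HttpHost
    import org.elasticsearch.action.search.SearchRequest
    import org.elasticsearch.client.{RestClient, RestHighLevelClient}

    object CheckIndex {
      def main(args: Array[String]): Unit = {
        val client = new RestHighLevelClient(
          RestClient.builder(new HttpHost("192.168.199.128", 9200, "http")))
        // Count the documents that the Flink sink has written so far
        val response = client.search(new SearchRequest("location-index"))
        println(s"documents in location-index: ${response.getHits.getTotalHits}")
        client.close()
      }
    }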

    For more details and configuration options, see the official documentation.
