Writing Data from Flink to Elasticsearch

Author: it_zzy | Published 2019-02-11 23:25


    Preface

    Flink ships with many built-in connectors. In this post we use the Elasticsearch Connector as a sink to write data into Elasticsearch (ES for short).
    Elasticsearch Connector

    ES installation is omitted here; see articles online or my earlier posts.

    Adding the dependency

    The compatibility between the Flink connector artifacts and ES versions is as follows:

    Maven dependency                      Supported since (Flink)   Elasticsearch version
    flink-connector-elasticsearch_2.11    1.0.0                     1.x
    flink-connector-elasticsearch2_2.11   1.0.0                     2.x
    flink-connector-elasticsearch5_2.11   1.3.0                     5.x
    flink-connector-elasticsearch6_2.11   1.6.0                     6 and later versions

    Elasticsearch 5.x

    Since the ES version I had used before was 5.6.x, I first added the Maven dependency for 5.x, flink-connector-elasticsearch5_2.11:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    

    But running the program that writes to ES failed with the following error:

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at com.hfjy.bigdata.flink.sink.FlinkSinkToES.main(FlinkSinkToES.java:81)
    Caused by: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
        at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:81)
        at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:48)
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
    

    For this error I checked that ES had started properly and that the host in the code was correct, and also stepped through the code in a debugger, but did not find the root cause. The code is as follows:

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class FlinkSinkToES {
        private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES.class);
    
        private static final String READ_TOPIC = "student";
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "student-group-1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "latest");
    
    //        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                    // this Kafka topic must match the one the producer utility class writes to
                    READ_TOPIC,
                    new SimpleStringSchema(),
                    props)).setParallelism(1);
    //                .map(string -> JSON.parseObject(string, Student.class)); // parse the string into a Student object with Fastjson
            Map<String, String> config = new HashMap<>();
            config.put("cluster.name", "elasticsearch");
    // This instructs the sink to emit after every element, otherwise they would be buffered
            config.put("bulk.flush.max.actions", "1");
    //        config.put("auth_user","elastic");
    //        config.put("auth_password","changeme");
    
            List<InetSocketAddress> transportAddresses = new ArrayList<>();
            transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    //        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
    
            student.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);
                    log.info("data:" + element);
                    return Requests.indexRequest()
                            .index("my-index-student-0211")
                            .type("my-type")
                            .source(json);
                }
    
                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }));
    
            env.execute("flink learning connectors kafka");
        }
    }
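
    With the 5.x transport client, a common cause of this handshake failure is a cluster.name mismatch or an ES client/server version mismatch rather than an unreachable host. A standalone connectivity check can isolate the problem from Flink; a minimal sketch, assuming the org.elasticsearch.client:transport 5.x artifact is on the classpath:

    import java.net.InetAddress;
    
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    
    public class Es5ConnectionCheck {
        public static void main(String[] args) throws Exception {
            Settings settings = Settings.builder()
                    // must match cluster.name in elasticsearch.yml (and the sink's config map)
                    .put("cluster.name", "elasticsearch")
                    .build();
            try (TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300))) {
                // an empty node list here reproduces the sink's failure outside of Flink
                System.out.println("connected nodes: " + client.connectedNodes());
            }
        }
    }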
    
    

    Elasticsearch 6.x

    Writing to ES 5.x kept failing with the error above and the root cause was still unknown, so I decided to switch to ES 6.

    Add the Maven dependency for the Flink-ES6 connector:

    <!-- es6 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    

    After installing ES 6, starting it failed with the following error:

    ➜  elasticsearch-6.6.0 ./bin/elasticsearch
    [2019-02-11T14:53:02,255][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [unknown] uncaught exception in thread [main]
    org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
        at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:163) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:150) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
        at org.elasticsearch.cli.Command.main(Command.java:90) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
        at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:116) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:93) ~[elasticsearch-6.6.0.jar:6.6.0]
    Caused by: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
        at org.elasticsearch.env.NodeEnvironment.<init>(NodeEnvironment.java:297) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.node.Node.<init>(Node.java:295) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.node.Node.<init>(Node.java:265) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.bootstrap.Bootstrap$5.<init>(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:333) ~[elasticsearch-6.6.0.jar:6.6.0]
        at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:159) ~[elasticsearch-6.6.0.jar:6.6.0]
        ... 6 more
    

    A quick search shows this happens when an ES instance is already running: a second instance started against the same data directory cannot obtain the node lock and reports this error. Find the running ES process, kill it, and start ES again.
    Reference: [https://blog.csdn.net/qq_38977441/article/details/80406126]

    ➜  elasticsearch-6.6.0 ps -ef | grep elastic
      501 94853 93152   0  2:37PM ttys011    0:35.04 /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home//bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.io.tmpdir=/var/folders/t6/v40m6tfx1x1b1_2lg2078tf80000gn/T/elasticsearch-465341793791663807 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:logs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=64m -Des.path.home=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0 -Des.path.conf=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/config -Des.distribution.flavor=default -Des.distribution.type=tar -cp /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/lib/* org.elasticsearch.bootstrap.Elasticsearch
      501 94869 94853   0  2:37PM ttys011    0:00.03 /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/modules/x-pack-ml/platform/darwin-x86_64/bin/controller
      501 95090 93152   0  2:55PM ttys011    0:00.00 grep --color=auto elastic
    

    After installing Kibana, starting it failed with the following error:

    ➜  kibana-6.6.0-linux-x86_64 ./bin/kibana
    ./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: cannot execute binary file
    ./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: Undefined error: 0
    

    This was likely a platform issue: the Linux build had been installed. After switching to the macOS build and starting Kibana again, the startup log looked like this:

    ➜  kibana-6.6.0-darwin-x86_64 ./bin/kibana
      log   [08:02:11.812] [warning][plugin] Skipping non-plugin directory at /Users/zzy/Documents/zzy/software/kibana-6.6.0-darwin-x86_64/src/legacy/core_plugins/ems_util
      log   [08:02:12.877] [info][status][plugin:kibana@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:12.910] [info][status][plugin:elasticsearch@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.912] [info][status][plugin:xpack_main@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.916] [info][status][plugin:graph@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.931] [info][status][plugin:monitoring@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:12.934] [info][status][plugin:spaces@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.940] [warning][security] Generating a random key for xpack.security.encryptionKey. To prevent sessions from being invalidated on restart, please set xpack.security.encryptionKey in kibana.yml
      log   [08:02:12.944] [warning][security] Session cookies will be transmitted over insecure connections. This is not recommended.
      log   [08:02:12.949] [info][status][plugin:security@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.966] [info][status][plugin:searchprofiler@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.968] [info][status][plugin:ml@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.997] [info][status][plugin:tilemap@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:12.999] [info][status][plugin:watcher@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.007] [info][status][plugin:grokdebugger@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.011] [info][status][plugin:dashboard_mode@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.018] [info][status][plugin:logstash@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.022] [info][status][plugin:beats_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.038] [info][status][plugin:apm@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.173] [info][status][plugin:interpreter@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.180] [info][status][plugin:canvas@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.184] [info][status][plugin:license_management@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.188] [info][status][plugin:index_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.207] [info][status][plugin:console@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.210] [info][status][plugin:console_extensions@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.213] [info][status][plugin:notifications@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.215] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.252] [info][status][plugin:infra@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.254] [info][status][plugin:rollup@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.263] [info][status][plugin:remote_clusters@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.268] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:13.274] [info][status][plugin:upgrade_assistant@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.281] [info][status][plugin:metrics@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:13.444] [info][status][plugin:timelion@6.6.0] Status changed from uninitialized to green - Ready
      log   [08:02:14.218] [warning][reporting] Generating a random key for xpack.reporting.encryptionKey. To prevent pending reports from failing on restart, please set xpack.reporting.encryptionKey in kibana.yml
      log   [08:02:14.223] [info][status][plugin:reporting@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
      log   [08:02:14.228] [info][status][plugin:elasticsearch@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.352] [info][license][xpack] Imported license information from Elasticsearch for the [data] cluster: mode: basic | status: active
      log   [08:02:14.355] [info][status][plugin:xpack_main@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.356] [info][status][plugin:graph@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.357] [info][status][plugin:searchprofiler@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.357] [info][status][plugin:ml@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.357] [info][status][plugin:tilemap@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.358] [info][status][plugin:watcher@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.358] [info][status][plugin:grokdebugger@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.358] [info][status][plugin:logstash@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.359] [info][status][plugin:beats_management@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.359] [info][status][plugin:index_management@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.359] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.360] [info][status][plugin:rollup@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.360] [info][status][plugin:remote_clusters@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.360] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.360] [info][status][plugin:reporting@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.361] [info][kibana-monitoring][monitoring-ui] Starting monitoring stats collection
      log   [08:02:14.369] [info][status][plugin:security@6.6.0] Status changed from yellow to green - Ready
      log   [08:02:14.431] [info][license][xpack] Imported license information from Elasticsearch for the [monitoring] cluster: mode: basic | status: active
      log   [08:02:15.707] [info][migrations] Creating index .kibana_1.
      log   [08:02:16.113] [info][migrations] Pointing alias .kibana to .kibana_1.
      log   [08:02:16.158] [info][migrations] Finished in 451ms.
      log   [08:02:16.159] [info][listening] Server running at http://127.0.0.1:5602
      log   [08:02:16.361] [info][status][plugin:spaces@6.6.0] Status changed from yellow to green - Ready
    

    Check the index information:

    GET _cat/indices?v
    
    health status index     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    green  open   .kibana_1 8nwhLfRcTmWOeNp5UTQGOQ   1   0          3            0     11.7kb         11.7kb
    

    After running the Flink job that writes to ES 6, the index-student index initially received no data; the following two lines had to be added:

    esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
    esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
    

    Note: RetryRejectedExecutionFailureHandler comes from the Flink package org.apache.flink.streaming.connectors.elasticsearch.util; it re-queues requests that were rejected because the ES bulk queue was full.
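
    If you need a different policy per failure type, the sink also accepts a custom handler. A minimal sketch of the ActionRequestFailureHandler interface (names as in the Flink 1.6.x Elasticsearch connector; the treatment of HTTP 400 below is an illustrative choice, not part of the original program):

    import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.util.ExceptionUtils;
    import org.elasticsearch.action.ActionRequest;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
    
    public class CustomFailureHandler implements ActionRequestFailureHandler {
        @Override
        public void onFailure(ActionRequest action, Throwable failure,
                              int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                // the bulk queue was full: re-queue the request and try again
                indexer.add(action);
            } else if (restStatusCode == 400) {
                // malformed request: drop it and keep the job running
            } else {
                // anything else: rethrow, failing the sink (and the job)
                throw failure;
            }
        }
    }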

    Note the difference from the official docs, which configure the REST client inline with a lambda:

    builder.setRestClientFactory(
      restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(...)
        restClientBuilder.setMaxRetryTimeoutMillis(...)
        restClientBuilder.setPathPrefix(...)
        restClientBuilder.setHttpClientConfigCallback(...)
      }
    );
    

    The RestClientFactoryImpl used here (see the Flink Elasticsearch Connector documentation):

    import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
    import org.apache.http.Header;
    import org.apache.http.message.BasicHeader;
    import org.elasticsearch.client.RestClientBuilder;
    
    public class RestClientFactoryImpl implements RestClientFactory {
        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type", "application/json")};
            restClientBuilder.setDefaultHeaders(headers); // multiple headers can be supplied via the array
            restClientBuilder.setMaxRetryTimeoutMillis(90000);
        }
    }
    

    Check the indices through Kibana again:

    GET _cat/indices?v
    
    health status index         uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    yellow open   index-student VUUIiS2fQX2p5-JyzxEa7A   5   1        300            0    152.4kb        152.4kb
    green  open   .kibana_1     8nwhLfRcTmWOeNp5UTQGOQ   1   0          3            0     11.9kb         11.9kb
    

    The index-student index now exists, so Flink wrote to ES successfully (the yellow health only means the replica shard cannot be allocated on a single-node cluster). Check the index data:

    GET /index-student/_search?pretty
    
    {
      "took" : 6,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 300,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "3cy_22gBPPMO6TTdKysy",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":123,"id":105,"name":"itzzy105","password":"password105"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "3sy_22gBPPMO6TTdKytx",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":130,"id":112,"name":"itzzy112","password":"password112"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "5sy_22gBPPMO6TTdKyt7",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":138,"id":120,"name":"itzzy120","password":"password120"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "7cy_22gBPPMO6TTdKyuA",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":142,"id":124,"name":"itzzy124","password":"password124"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "9sy_22gBPPMO6TTdKyuK",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":151,"id":133,"name":"itzzy133","password":"password133"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "-My_22gBPPMO6TTdKyuK",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":156,"id":138,"name":"itzzy138","password":"password138"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "-8y_22gBPPMO6TTdKyuP",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":155,"id":137,"name":"itzzy137","password":"password137"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "_sy_22gBPPMO6TTdKyuS",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":162,"id":144,"name":"itzzy144","password":"password144"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "AMy_22gBPPMO6TTdKyyS",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":159,"id":141,"name":"itzzy141","password":"password141"}"""
            }
          },
          {
            "_index" : "index-student",
            "_type" : "student",
            "_id" : "Acy_22gBPPMO6TTdKyyU",
            "_score" : 1.0,
            "_source" : {
              "data" : """{"age":160,"id":142,"name":"itzzy142","password":"password142"}"""
            }
          }
        ]
      }
    }
    

    The full code of the Flink job that writes to ES:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class FlinkSinkToES6 {
    
        private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES6.class);
    
        private static final String READ_TOPIC = "student-1";
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "student-group-1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "latest");
    
            DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                    // this Kafka topic must match the one the KafkaUtils producer class (below) writes to
                    READ_TOPIC,
                    new SimpleStringSchema(),
                    props)).setParallelism(1);
    //                .map(string -> JSON.parseObject(string, Student.class)); // parse the string into a Student object with Fastjson
            student.print();
            log.info("student:" + student);
            List<HttpHost> esHttphost = new ArrayList<>();
            esHttphost.add(new HttpHost("127.0.0.1", 9200, "http"));
    
            ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                    esHttphost,
                    new ElasticsearchSinkFunction<String>() {
    
                        public IndexRequest createIndexRequest(String element) {
                            Map<String, String> json = new HashMap<>();
                            json.put("data", element);
                            log.info("data:" + element);
    
                            return Requests.indexRequest()
                                    .index("index-student")
                                    .type("student")
                                    .source(json);
                        }
    
                        @Override
                        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                            indexer.add(createIndexRequest(element));
                        }
                    }
            );
    
            esSinkBuilder.setBulkFlushMaxActions(1);
    //        esSinkBuilder.setRestClientFactory(
    //                restClientBuilder -> {
    //                    restClientBuilder.setDefaultHeaders()
    //                }
    //        );
            esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
            esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
    
            student.addSink(esSinkBuilder.build());
            env.execute("flink learning connectors kafka");
        }
    }
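
    A caveat on the settings above: setBulkFlushMaxActions(1) issues one bulk request per element, which is handy for verifying the pipeline but slow. The builder also exposes size- and time-based flush triggers (the thresholds below are illustrative, not from the original program):

    esSinkBuilder.setBulkFlushMaxActions(1000); // flush after 1000 buffered requests
    esSinkBuilder.setBulkFlushMaxSizeMb(5);     // ...or once 5 MB of requests are buffered
    esSinkBuilder.setBulkFlushInterval(5000);   // ...or at least every 5 seconds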
    

    The Kafka producer code:

    import java.util.Properties;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class KafkaUtils {
        private static final String broker_list = "localhost:9092";
        private static final String topic = "student-1";  // must be the same topic the Flink job reads
    
        public static void writeToKafka() throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", broker_list);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // the old raw-typed producer construction is deprecated usage:
            // KafkaProducer producer = new KafkaProducer<String, String>(props);
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            try {
                for (int i = 1; i <= 100; i++) {
                    Student student = new Student(i, "itzzy" + i, "password" + i, 18 + i);
                    // partition and key are left null; Kafka picks the partition
                    ProducerRecord<String, String> record =
                            new ProducerRecord<>(topic, null, null, JSON.toJSONString(student));
                    producer.send(record);
                    System.out.println("sent: " + JSON.toJSONString(student));
                }
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace(); // don't swallow failures silently
            }
    
            producer.flush();
        }
    
        public static void main(String[] args) throws InterruptedException {
            writeToKafka();
        }
    }
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Student {
    
        private int id;
        private String name;
        private String password;
        private int age;
    
    }
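
    Fastjson serializes bean properties in alphabetical order, which matches the shape of the _source documents shown earlier; a quick sanity check:

    public class StudentJsonCheck {
        public static void main(String[] args) {
            Student s = new Student(105, "itzzy105", "password105", 123); // (id, name, password, age)
            // prints {"age":123,"id":105,"name":"itzzy105","password":"password105"}
            System.out.println(com.alibaba.fastjson.JSON.toJSONString(s));
        }
    }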
    
