美文网首页flink
Flink之ElasticSearch应用

Flink之ElasticSearch应用

作者: 神奇的考拉 | 来源:发表于2019-03-22 18:30 被阅读0次

    准备

    一.docker安装es

    1.拉取镜像: 可以指定国内的镜像源 速度会快很多

    # Elastic's registry publishes no "latest" tag, so an explicit version is required;
    # a 6.x release matches the flink-connector-elasticsearch6 client used in the Java code.
    sudo docker pull docker.elastic.co/elasticsearch/elasticsearch:6.6.2
    

    2.查看已安装的es镜像

    # list the images available locally
    sudo docker image ls
    

    本地镜像内容如下

    REPOSITORY                                     TAG                 IMAGE ID            CREATED             SIZE
    wurstmeister/kafka                             latest              eb5fa40d9f7f        9 days ago          420MB
    registry.docker-cn.com/library/redis           latest              0f88f9be5839        2 weeks ago         95MB
    wurstmeister/zookeeper                         latest              3f43f72cb283        2 months ago        510MB
    registry.docker-cn.com/library/elasticsearch   latest              5acf0e8da90b        6 months ago        486MB
    

    3.启动es镜像: 将es容器启动并在后台运行

    # Run ES detached as a single-node cluster, exposing the HTTP (9200) and transport (9300) ports.
    # Note: nothing may follow a line-continuation backslash, not even a space.
    sudo docker run -d --name es -p 9200:9200 -p 9300:9300 \
            -e "discovery.type=single-node" \
            registry.docker-cn.com/library/elasticsearch
    

    查看es容器是否启动成功

    # list running containers
    sudo docker ps
    

    输出如下内容:

    CONTAINER ID        IMAGE                                          COMMAND                  CREATED             STATUS              PORTS                                                NAMES
    6cd0610f00d7        registry.docker-cn.com/library/elasticsearch   "/docker-entrypoint.…"   3 hours ago         Up 3 hours          0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp       es
    2d5fb0099017        registry.docker-cn.com/library/redis           "docker-entrypoint.s…"   7 days ago          Up 2 days           0.0.0.0:6379->6379/tcp                               myredis
    779882fc6409        wurstmeister/kafka                             "start-kafka.sh"         8 days ago          Up 2 days           0.0.0.0:9092->9092/tcp                               kafka
    24a169067e20        wurstmeister/zookeeper                         "/bin/sh -c '/usr/sb…"   8 days ago          Up 2 days           22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
    

    4.修改配置文件 elasticsearch.yml

    # open an interactive shell inside the "es" container
    sudo docker container exec --interactive --tty es /bin/bash
    
    cd ./config
    

    修改内容：在第 1 行之后追加 CORS 跨域配置

    # -i edits the file in place; without it sed only prints the result and the file is unchanged.
    # '1a' appends the two CORS settings after line 1.
    sed -i '1a http.cors.enabled: true\nhttp.cors.allow-origin: "*"' elasticsearch.yml
    
    # show the resulting configuration
    cat ./elasticsearch.yml
    

    退出容器: exit

    5.重启容器，使配置生效

    sudo docker container restart es
    

    6.验证es容器是否正常
    浏览器输入: http://127.0.0.1:9200/_cat/indices?v
    查看es内容: http://127.0.0.1:9200/taxi_ride_idx/_search?q=*

    二.实现

    import com.jdd.streaming.demos.entity.TaxiRide;
    import com.jdd.streaming.demos.source.TaxiRideSource;
    import com.jdd.streaming.demos.utils.GeoUtils;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
    import org.apache.flink.util.Collector;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    
    import java.util.*;
    
    /**
     * Demo Flink job: replays NYC taxi-ride events, sums passenger counts per grid cell over a
     * sliding event-time window, attaches each cell's centre coordinates and optionally writes
     * the results to Elasticsearch through the ES 6 REST sink.
     *
     * <p>Optional command-line arguments (parsed with {@link ParameterTool}):
     * <ul>
     *   <li>{@code --data <path>}: taxi-ride input file</li>
     *   <li>{@code --es.enabled <true|false>}: write results to Elasticsearch (default {@code true})</li>
     *   <li>{@code --es.host <host>}: Elasticsearch HTTP host (default {@code 127.0.0.1})</li>
     *   <li>{@code --es.port <port>}: Elasticsearch HTTP port (default {@code 9200})</li>
     * </ul>
     *
     * <p>Originally written 2019-03-22.
     *
     * @author dalan
     */
    public class SimpleESStream {

        /**
         * Builds and runs the streaming job.
         *
         * @param args command-line arguments, see the class documentation
         * @throws Exception if building or executing the Flink job fails
         */
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final String data = params.get("data", "/home/wmm/go_bench/flink_sources/nycTaxiRides.gz");

            // Replay settings handed to TaxiRideSource.
            // NOTE(review): presumably the max event delay (seconds) and replay speed-up factor — confirm.
            final int maxServingDelay = 60;
            final int servingSpeedFactor = 600;

            // Sliding window: 15 minutes long, evaluated every 5 minutes.
            final int countWindowLength = 15;
            final int countWindowFrequency = 5;

            // Elasticsearch settings. The sink talks to the REST (HTTP) endpoint, so the default
            // port is 9200 rather than the 9300 transport port.
            final boolean writeToElasticsearch = params.getBoolean("es.enabled", true);
            final String elasticsearchHost = params.get("es.host", "127.0.0.1");
            final int elasticsearchPort = params.getInt("es.port", 9200);

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // NOTE(review): no timestamp/watermark assigner is attached here, so the event-time
            // windows rely on TaxiRideSource emitting timestamps and watermarks itself — confirm.
            DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(data, maxServingDelay, servingSpeedFactor));

            // Keep only ride-end events whose start coordinates lie inside NYC.
            // NOTE(review): end events are selected but bucketed by their *start* coordinates;
            // confirm that end coordinates were not intended.
            DataStream<TaxiRide> cleansedRides = rides
                    .filter(r -> !r.isStart)
                    .filter(r -> GeoUtils.isInNYC(r.startLon, r.startLat));

            // (grid cell id, passenger count) per ride.
            DataStream<Tuple2<Integer, Short>> cellIds = cleansedRides
                    .map(new MapFunction<TaxiRide, Tuple2<Integer, Short>>() {
                        @Override
                        public Tuple2<Integer, Short> map(TaxiRide r) throws Exception {
                            return new Tuple2<Integer, Short>(GeoUtils.mapToGridCell(r.startLon, r.startLat), r.passengerCnt);
                        }
                    });

            // (cell id, window end timestamp, total passengers) per cell and window.
            DataStream<Tuple3<Integer, Long, Integer>> passengerCnts = cellIds
                    .keyBy(0)
                    .timeWindow(Time.minutes(countWindowLength), Time.minutes(countWindowFrequency))
                    .apply(new WindowFunction<Tuple2<Integer, Short>, Tuple3<Integer, Long, Integer>, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple cell, TimeWindow window, Iterable<Tuple2<Integer, Short>> events,
                                          Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
                            int count = 0;  // primitive accumulator: avoids re-boxing on every event
                            for (Tuple2<Integer, Short> event : events) {
                                count += event.f1.intValue();
                            }
                            out.collect(new Tuple3<Integer, Long, Integer>(cell.getField(0), window.getEnd(), count));
                        }
                    });

            // (cell id, window end, (lon, lat) of the cell centre, total passengers).
            DataStream<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>> cntByLocation = passengerCnts
                    .map(new MapFunction<Tuple3<Integer, Long, Integer>, Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>>() {
                        @Override
                        public Tuple4<Integer, Long, Tuple2<Double, Double>, Integer> map(Tuple3<Integer, Long, Integer> r) throws Exception {
                            return new Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>(r.f0, r.f1, getGridCellCenter(r.f0), r.f2);
                        }
                    });

            if (writeToElasticsearch) {
                cntByLocation.addSink(createElasticsearchSink(elasticsearchHost, elasticsearchPort));
            }

            env.execute("a simple es stream demo ");
        }

        /**
         * Builds the Elasticsearch 6 sink that indexes one document per window result.
         *
         * @param host Elasticsearch HTTP host
         * @param port Elasticsearch HTTP port
         * @return a sink that flushes after every record
         */
        private static ElasticsearchSink<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>> createElasticsearchSink(
                String host, int port) {
            List<HttpHost> httpHosts = Collections.singletonList(new HttpHost(host, port, "http"));

            ElasticsearchSink.Builder<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>> builder =
                    new ElasticsearchSink.Builder<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>>(
                            httpHosts,
                            new ElasticsearchSinkFunction<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>>() {
                                @Override
                                public void process(Tuple4<Integer, Long, Tuple2<Double, Double>, Integer> element,
                                                    RuntimeContext ctx, RequestIndexer indexer) {
                                    indexer.add(createIndexRequest(element));
                                }
                            });

            // Flush after every action so each result shows up immediately; fine for a demo,
            // consider larger batches for real workloads.
            builder.setBulkFlushMaxActions(1);
            return builder.build();
        }

        /**
         * Converts one window result into an index request for {@code taxi_ride_idx}.
         *
         * @param element (cell id, window end, (lon, lat) of the cell centre, passenger count)
         * @return the index request carrying the location, time and count fields
         */
        private static IndexRequest createIndexRequest(Tuple4<Integer, Long, Tuple2<Double, Double>, Integer> element) {
            Map<String, Object> json = new HashMap<>();
            // f2 holds (lon, lat), but Elasticsearch's string form of a geo point is "lat,lon".
            // NOTE(review): the key "localtion" is misspelled; it is kept as-is because renaming
            // it would change the index schema.
            json.put("localtion", element.f2.f1 + "," + element.f2.f0);
            json.put("time", element.f1);
            json.put("cnt", element.f3);

            return Requests.indexRequest()
                    .index("taxi_ride_idx")
                    .type("test_type")
                    .source(json);
        }

        // Grid geometry used to turn a cell id back into coordinates.
        // NOTE(review): presumably these must match the grid used by GeoUtils.mapToGridCell — confirm.
        public static final double LonEast = -73.7;
        public static final double LonWest = -74.05;
        public static final double LatNorth = 41.0;
        public static final double LatSouth = 40.5;

        public static final double LonWidth = 74.05 - 73.7;
        public static final double LatHeight = 41.0 - 40.5;

        /** Width of one grid cell in degrees of longitude. */
        public static final double DeltaLon = 0.0014;
        /** Height of one grid cell in degrees of latitude. */
        public static final double DeltaLat = 0.00125;

        /** Number of cells per grid row ({@code id = yIndex * CellCntX + xIndex}). */
        public static final int CellCntX = 250;
        public static final double CellCntY = 400;

        /**
         * Returns the centre of a grid cell.
         *
         * @param gridCellId non-negative cell id laid out row by row
         *                   ({@code id = yIndex * CellCntX + xIndex}); x counts eastward from
         *                   {@link #LonWest}, y counts southward from {@link #LatNorth}
         * @return the cell centre as (longitude, latitude)
         */
        private static Tuple2<Double, Double> getGridCellCenter(int gridCellId) {
            int xIndex = gridCellId % CellCntX;
            int yIndex = (gridCellId - xIndex) / CellCntX;

            // Longitudes here are negative: work with the absolute value, then negate.
            double lon = -(Math.abs(LonWest) - (xIndex * DeltaLon) - (DeltaLon / 2));
            double lat = LatNorth - (yIndex * DeltaLat) - (DeltaLat / 2);

            return new Tuple2<Double, Double>(lon, lat);
        }
    }
    

    三.源码

    GitHub 上提供了完整示例

    相关文章

      网友评论

        本文标题:Flink之ElasticSearch应用

        本文链接:https://www.haomeiwen.com/subject/yegrvqtx.html