elasticsearch spark


作者: hehehehe | Published 2022-05-17 15:50

    https://www.elastic.co/guide/en/elasticsearch/hadoop/7.17/install.html

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.3</version>
    </dependency>
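
The Maven coordinates above target JVM projects; in PySpark the same connector can be pulled in when the session is created via spark.jars.packages. A minimal sketch, assuming Spark 3.x with Scala 2.12 and access to Maven Central:

    from pyspark.sql import SparkSession

    # Download the ES-Hadoop Spark connector (Spark 3.x / Scala 2.12 build) from Maven Central.
    # Coordinates use the groupId:artifactId:version form.
    spark = SparkSession.builder \
        .appName("es-spark-demo") \
        .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.17.3") \
        .getOrCreate()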
    
    
    '''
    es_url: Elasticsearch host / IP address
    es_port: port
    es_user: username
    es_pass: password
    '''
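
The snippets below assume these variables are already defined; the values here are hypothetical placeholders:

    # Hypothetical connection settings; replace with your cluster's values.
    es_url = "10.0.0.1"
    es_port = "9200"
    es_user = "elastic"
    es_pass = "changeme"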
    
    # Connection, authentication and write behaviour set once on the session.
    # es.mapping.id points at the DataFrame column used as the document _id,
    # which the upsert write operation requires.
    spark = SparkSession.builder \
        .config("spark.es.nodes", es_url) \
        .config("spark.es.port", es_port) \
        .config("es.net.http.auth.user", es_user) \
        .config("es.net.http.auth.pass", es_pass) \
        .config("es.mapping.id", "id") \
        .config("es.nodes.wan.only", "true") \
        .config("es.write.operation", "upsert") \
        .getOrCreate()
    
    '''
    cancer_example: index name (roughly the database)
    _doc: type (roughly the table)
    '''
    # Read from ES
    df = spark.read.format('org.elasticsearch.spark.sql').option('es.resource', 'cancer_example/_doc').load()
    # Write to ES
    df.write.format('org.elasticsearch.spark.sql').option('es.resource', 'cancer_example/_doc').mode("append").save()
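
Because the session above maps es.mapping.id to the column id and uses the upsert write operation, any DataFrame being written needs an id column. A small sketch with made-up rows and column names:

    # Hypothetical sample data; "id" becomes the document _id, so re-running
    # the write updates existing documents instead of duplicating them.
    rows = [(1, "lung"), (2, "liver")]
    sample_df = spark.createDataFrame(rows, ["id", "cancer_type"])

    sample_df.write.format('org.elasticsearch.spark.sql') \
        .option('es.resource', 'cancer_example/_doc') \
        .mode("append") \
        .save()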
    
    
    
    spark = SparkSession.builder.getOrCreate()

    # Read from ES; connection settings passed per read as options
    df = spark.read.format('org.elasticsearch.spark.sql') \
        .option("es.nodes", es_url) \
        .option("es.port", es_port) \
        .option("es.net.http.auth.user", es_user) \
        .option("es.net.http.auth.pass", es_pass) \
        .option("es.nodes.wan.only", "true") \
        .option('es.resource', 'cancer_example/_doc') \
        .load()
    # Write to ES; connection settings passed per write as options
    df.write.format('org.elasticsearch.spark.sql') \
        .option("es.nodes", es_url) \
        .option("es.port", es_port) \
        .option("es.net.http.auth.user", es_user) \
        .option("es.net.http.auth.pass", es_pass) \
        .option("es.mapping.id", "id") \
        .option("es.nodes.wan.only", "true") \
        .option("es.write.operation", "upsert") \
        .option('es.resource', 'cancer_example/_doc') \
        .mode("append") \
        .save()
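
When only part of each document is needed, the read can be narrowed. A sketch assuming the connector's es.read.field.include option and hypothetical field names id and cancer_type; simple filters on the resulting DataFrame can be pushed down to Elasticsearch by the connector:

    # Read only selected fields from the index.
    slim_df = spark.read.format('org.elasticsearch.spark.sql') \
        .option("es.nodes", es_url) \
        .option("es.port", es_port) \
        .option("es.net.http.auth.user", es_user) \
        .option("es.net.http.auth.pass", es_pass) \
        .option("es.nodes.wan.only", "true") \
        .option("es.read.field.include", "id,cancer_type") \
        .option('es.resource', 'cancer_example/_doc') \
        .load()

    slim_df.where(slim_df["id"] == 1).show()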
    
    
    spark = SparkSession \
        .builder \
        .appName('es connection') \
        .config('spark.jars.packages', "org.elasticsearch:elasticsearch-spark-20_2.11:6.8.7") \
        .getOrCreate()
    
    df3 = spark.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", '<node>') \
        .option('es.port', '<port>') \
        .option("es.resource", '<index>/<type>') \
        .option('es.query', '?q=*') \
        .option('es.nodes.wan.only', 'true') \
        .option("es.nodes.discovery", "false") \
        .option("es.index.auto.create", "true") \
        .option("es.write.ignore_exception", "true") \
        .option("es.read.ignore_exception", "true") \
        .load()
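
Besides the URI-style '?q=*' above, es.query also accepts a full query-DSL JSON string; a sketch with a hypothetical field and value:

    # Query DSL passed to es.query; only matching documents are read.
    dsl = '{"query": {"match": {"cancer_type": "lung"}}}'

    df4 = spark.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", '<node>') \
        .option('es.port', '<port>') \
        .option('es.nodes.wan.only', 'true') \
        .option("es.resource", '<index>/<type>') \
        .option('es.query', dsl) \
        .load()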
    
    df.write.format('org.elasticsearch.spark.sql') \
        .option('es.nodes', 'localhost') \
        .option('es.port', '9200') \
        .option('es.nodes.wan.only', 'true') \
        .option("es.nodes.discovery", "false") \
        .option('es.resource', 'test') \
        .save(mode='append')
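
To confirm the write, the same index can be read back; a minimal check against the local cluster assumed above:

    # Read the test index back and inspect what was written.
    check_df = spark.read.format('org.elasticsearch.spark.sql') \
        .option('es.nodes', 'localhost') \
        .option('es.port', '9200') \
        .option('es.nodes.wan.only', 'true') \
        .option('es.resource', 'test') \
        .load()

    check_df.printSchema()
    print(check_df.count())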
    
