美文网首页
elasticsearch spark

elasticsearch spark

作者：hehehehe | 发表于 2022-05-17 15:50 | 被阅读 0 次

ES-Hadoop 7.17 安装文档（含 Maven 依赖说明）：https://www.elastic.co/guide/en/elasticsearch/hadoop/7.17/install.html

<!-- ES-Hadoop connector artifact for Spark 3.x built against Scala 2.12, version 7.17.3.
     For PySpark the equivalent spark.jars.packages / -​-packages coordinate is
     org.elasticsearch:elasticsearch-spark-30_2.12:7.17.3 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-30_2.12</artifactId>
    <version>7.17.3</version>
</dependency>

# Connection parameters (define these before running the snippet):
#   es_url:  Elasticsearch host / IP address
#   es_port: Elasticsearch HTTP port
#   es_user: username
#   es_pass: password

# Approach 1: configure the ES-Hadoop connector once, on the SparkSession.
# Every connector key uses the "spark." prefix so the keys are consistent.
# ES-Hadoop accepts "spark.es.*" entries from the Spark configuration.
# Spark only guarantees to pass through keys starting with "spark."
# (for example, when they are set via spark-submit --conf).
spark = (
    SparkSession.builder
    .config("spark.es.nodes", es_url)
    .config("spark.es.port", es_port)
    .config("spark.es.net.http.auth.user", es_user)
    .config("spark.es.net.http.auth.pass", es_pass)
    # Take each document's _id from the DataFrame's "id" column.
    .config("spark.es.mapping.id", "id")
    # Connect only through the declared node(s); no cluster node discovery.
    .config("spark.es.nodes.wan.only", "true")
    # Update the document with a matching _id, or insert it if none exists.
    .config("spark.es.write.operation", "upsert")
    .getOrCreate()
)

# "cancer_example" is the index (roughly a database).
# "_doc" is the mapping type (roughly a table).
# Mapping types are deprecated in Elasticsearch 7.x, where "_doc" is the
# conventional type name.
es_resource = 'cancer_example/_doc'

# Read from ES. .load() is required: without it `df` would only be a
# DataFrameReader, which has no `.write`, and the write below would fail.
df = (
    spark.read.format('org.elasticsearch.spark.sql')
    .option('es.resource', es_resource)
    .load()
)

# Write to ES in append mode. With the upsert setting above, rows whose "id"
# already exists update the existing document.
(
    df.write.format('org.elasticsearch.spark.sql')
    .option('es.resource', es_resource)
    .mode("Append")
    .save()
)



# Approach 2: keep the SparkSession generic and pass the connector settings
# on each read/write instead.
spark = SparkSession.builder.getOrCreate()

# Data-source options must use the plain "es.*" keys. The "spark." prefix is
# only understood for Spark-configuration entries. A "spark.es.nodes" option
# is not recognized, so the connector would fall back to its default node
# (localhost:9200).
# Expects es_url, es_port, es_user and es_pass to be defined beforehand.
es_options = {
    "es.nodes": es_url,
    "es.port": es_port,
    "es.net.http.auth.user": es_user,
    "es.net.http.auth.pass": es_pass,
    # Take each document's _id from the DataFrame's "id" column (write side).
    "es.mapping.id": "id",
    # Connect only through the declared node(s); no cluster node discovery.
    "es.nodes.wan.only": "true",
    # Update the document with a matching _id, or insert it (write side).
    "es.write.operation": "upsert",
    # Index "cancer_example", mapping type "_doc".
    "es.resource": "cancer_example/_doc",
}

# Read from ES. .load() turns the configured reader into a DataFrame;
# without it `df.write` below would fail.
df = spark.read.format('org.elasticsearch.spark.sql').options(**es_options).load()

# Write to ES with the same connection settings, appending/upserting rows.
(
    df.write.format('org.elasticsearch.spark.sql')
    .options(**es_options)
    .mode("Append")
    .save()
)

# Approach 3: let Spark fetch the connector at startup via spark.jars.packages.
# The value must be a Maven coordinate of the form "groupId:artifactId:version".
# elasticsearch-spark-20_2.11 is the Spark 2.x / Scala 2.11 artifact; choose
# the artifact matching your Spark and Scala versions.
# NOTE(review): spark.jars.packages only takes effect when this call creates
# a new SparkContext; it is ignored if one is already running.
spark = (
    SparkSession.builder
    .appName('es connection')
    .config('spark.jars.packages', "org.elasticsearch:elasticsearch-spark-20_2.11:6.8.7")
    .getOrCreate()
)

# '节点' (node), '端口' (port) and '索引/索引类型' (index/type) are placeholders.
# Replace them with real values before running.
df3 = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", '节点')
    .option('es.port', '端口')
    .option("es.resource", '索引/索引类型')
    # URI-style query: match all documents.
    .option('es.query', '?q=*')
    .option('es.nodes.wan.only', 'true')
    .option("es.nodes.discovery", "false")
    # Only relevant when writing (create the index if missing); no effect on this read.
    .option("es.index.auto.create", "true")
    # NOTE(review): these two keys do not appear in the ES-Hadoop configuration
    # reference and are likely ignored by the connector — confirm before relying on them.
    .option("es.write.ignore_exception", "true")
    .option("es.read.ignore_exception", "true")
    .load()
)

 df.write.format('org.elasticsearch.spark.sql') \
        .option('es.nodes', 'localhost') \
        .option('es.port', '9200') \
        .option('es.nodes.wan.only', 'true') \
        .option("es.nodes.discovery", "false") \
        .option('es.resource', 'test') \
        .save(mode='append')

相关文章

网友评论

      本文标题:elasticsearch spark

      本文链接:https://www.haomeiwen.com/subject/ptjnurtx.html