https://www.elastic.co/guide/en/elasticsearch/hadoop/7.17/install.html
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-30_2.12</artifactId>
    <version>7.17.3</version>
</dependency>
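Instead of bundling the Maven dependency, the same artifact can be resolved at runtime through spark.jars.packages. A minimal sketch, assuming Spark 3.x with access to Maven Central (the appName is arbitrary); the ES connection settings shown in the next snippet can be added to this same builder.

from pyspark.sql import SparkSession

# Let Spark download the ES-Hadoop connector at startup
# (coordinates follow the groupId:artifactId:version of the dependency above)
spark = SparkSession.builder \
    .appName("es-connector-demo") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.17.3") \
    .getOrCreate()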
'''
es_url: Elasticsearch host / IP address
es_port: port
es_user: username
es_pass: password
'''
# ES settings placed on the session apply to every read/write;
# the connector resolves keys with either the "es." or the "spark.es." prefix
spark = SparkSession.builder \
    .config("spark.es.nodes", es_url) \
    .config("spark.es.port", es_port) \
    .config("es.net.http.auth.user", es_user) \
    .config("es.net.http.auth.pass", es_pass) \
    .config("es.mapping.id", "id") \
    .config("es.nodes.wan.only", "true") \
    .config("es.write.operation", "upsert") \
    .getOrCreate()
'''
cancer_example: index name (roughly the equivalent of a database)
_doc: mapping type (roughly the equivalent of a table); types are deprecated in ES 7.x, so the resource can also be just the index name
'''
# Read from ES
df = spark.read.format('org.elasticsearch.spark.sql').option('es.resource', 'cancer_example/_doc').load()
# Write to ES
df.write.format('org.elasticsearch.spark.sql').option('es.resource', 'cancer_example/_doc').mode('append').save()
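Because the session above sets es.mapping.id to "id" and es.write.operation to "upsert", any DataFrame being written must carry an "id" column, which becomes the document _id; writing the same id twice updates the document instead of duplicating it. A minimal sketch with made-up rows (the column names are illustrative, not part of the original example):

# Hypothetical rows: re-running this write with the same ids upserts rather than duplicates
rows = [(1, "melanoma"), (2, "lymphoma")]
demo_df = spark.createDataFrame(rows, ["id", "diagnosis"])
demo_df.write.format('org.elasticsearch.spark.sql') \
    .option('es.resource', 'cancer_example/_doc') \
    .mode('append') \
    .save()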
# Alternative: create a bare session and pass the ES settings on each read/write instead
spark = SparkSession.builder.getOrCreate()
# Read from ES (when passed as data source options, the keys use the plain "es." prefix)
df = spark.read.format('org.elasticsearch.spark.sql') \
    .option("es.nodes", es_url) \
    .option("es.port", es_port) \
    .option("es.net.http.auth.user", es_user) \
    .option("es.net.http.auth.pass", es_pass) \
    .option("es.mapping.id", "id") \
    .option("es.nodes.wan.only", "true") \
    .option("es.write.operation", "upsert") \
    .option('es.resource', 'cancer_example/_doc') \
    .load()
# Write to ES
df.write.format('org.elasticsearch.spark.sql') \
    .option("es.nodes", es_url) \
    .option("es.port", es_port) \
    .option("es.net.http.auth.user", es_user) \
    .option("es.net.http.auth.pass", es_pass) \
    .option("es.mapping.id", "id") \
    .option("es.nodes.wan.only", "true") \
    .option("es.write.operation", "upsert") \
    .option('es.resource', 'cancer_example/_doc') \
    .mode('append') \
    .save()
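Since the same connection options are repeated on both the read and the write, they can also be collected in a dict and splatted with .options(). A small sketch, assuming the es_url / es_port / es_user / es_pass variables defined earlier:

# Shared connector settings, reused by both the reader and the writer
es_conf = {
    "es.nodes": es_url,
    "es.port": es_port,
    "es.net.http.auth.user": es_user,
    "es.net.http.auth.pass": es_pass,
    "es.nodes.wan.only": "true",
    "es.resource": "cancer_example/_doc",
}
df = spark.read.format('org.elasticsearch.spark.sql').options(**es_conf).load()
df.write.format('org.elasticsearch.spark.sql').options(**es_conf) \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "upsert") \
    .mode('append').save()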
spark = SparkSession \
    .builder \
    .appName('es connection') \
    .config('spark.jars.packages', "org.elasticsearch:elasticsearch-spark-20_2.11:6.8.7") \
    .getOrCreate()
df3 = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", 'your-es-host') \
    .option('es.port', 'your-es-port') \
    .option("es.resource", 'index/type') \
    .option('es.query', '?q=*') \
    .option('es.nodes.wan.only', 'true') \
    .option("es.nodes.discovery", "false") \
    .option("es.index.auto.create", "true") \
    .option("es.write.ignore_exception", "true") \
    .option("es.read.ignore_exception", "true") \
    .load()
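es.query also accepts a full query DSL body (or an external resource) in place of the ?q=... URI form, so the filtering happens on the Elasticsearch side before the data reaches Spark. A small sketch, where some_field and its value are hypothetical:

# Push a query DSL filter down to Elasticsearch instead of the match-all URI query
dsl = '{"query": {"match": {"some_field": "some value"}}}'
df_filtered = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", 'your-es-host') \
    .option('es.port', 'your-es-port') \
    .option("es.resource", 'index/type') \
    .option('es.query', dsl) \
    .option('es.nodes.wan.only', 'true') \
    .load()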
# Write df to the 'test' index on a local node
df.write.format('org.elasticsearch.spark.sql') \
.option('es.nodes', 'localhost') \
.option('es.port', '9200') \
.option('es.nodes.wan.only', 'true') \
.option("es.nodes.discovery", "false") \
.option('es.resource', 'test') \
.save(mode='append')
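To confirm the documents landed, the same index can be read straight back through the connector; a quick check against the local node used above:

# Read the 'test' index back to verify the write
check = spark.read.format('org.elasticsearch.spark.sql') \
    .option('es.nodes', 'localhost') \
    .option('es.port', '9200') \
    .option('es.nodes.wan.only', 'true') \
    .option('es.resource', 'test') \
    .load()
check.show()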