一、使用spark写数据到Elasticsearch中
- 连接spark,配置es(前提是maven中所需的基本Spark环境都配置好了,否则还需要添加spark-core的依赖等)
注:如果是在联网的环境下,在项目的配置文件pom.xml文件中添加依赖,这里我安装的Elasticsearch版本为7.6.2,这里对应自己Elasticsearch的版本,相关依赖的版本可以在官网上查看。
<dependency>
<groupId>org.apche.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apche.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apche.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>7.6.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
如果是在脱机状态下,则还需要自己先根据自己Spark版本和Elasticsearch版本下载好对应的包,然后放入到C:\用户\用户名.m2\repository\中对应文件夹下,然后强制更新maven项目。
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import java.util.Map
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaPairRDD;
// es.node默认是localhost,如果你的es集群不在本地,把ip换成你集群中master节点所在机器的ip地址
//es.port是设置es集群中master节点的端口,如果没有进行更改,就是9200,不知道可以去服务器上安装路径下查看config/elasticsearch.yml文件中http.port参数的值
SparkSession session=SparkSession.builder().master("local[*]")
.appName("yourappName")
.config("es.node","ip")
.config("es.port","9200")
.config("es.resource","index/type")
.config("es.index.auto.create",true)
// .config("es.mapping.id","id")
.config("es.nodes.wan.only", true)
.getOrCreate();
session.sparkContext().setLogLevel("WARN");
JavaSparkContext jsc=new JavaSparkContext(session.sparkContext());
其他参数配置可查看官方文档
- JavaEsSpark中有两个写入方法:saveToEs和saveJsonToEs
- JavaEsSpark.saveToEs(要写入的数据是Map格式)
- JavaEsSpark.saveJsonToEs(要写入的数据是Json格式,这里使用字符串拼接的方式,其实可以用例如阿里的fastjson包的JSON对象,方便,出错概率也少)
- 使用saveJsonToEs写入数据到ES
1.上传的样例数据 待上传数据.png 2.spark代码,这里也可以引入com.alibaba.fastjson.JSONObject
包,将需要写入的数据转换成json格式,避免字符串拼接时出现出错。
JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");
JavaRDD<String> writeRDD=readRDD.map(new Function<String, String>() {
/**
* 将文本数据写入到ES索引中
*/
private static final long serialVersionUID = 1L;
public String call(String v1) throws Exception {
// TODO Auto-generated method stub
String[] info=v1.split("\t",-1);
String fournum;
fournum="{\"name\":\""+info[0]+"\",\"sex\":\""+info[1]+"\",\"age\":\""+info[2]+"\",\"num\":\""+info[3]+"\"}";
return fournum;
}
});
JavaEsSpark.saveJsonToEs(writeRDD,"test2/four");
- 使用saveToEs将本地文本文件写入到Elasticsearch中
//先读取文本数据到JavaRDD中,然后将String转换成Map格式,id自动生成
JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");
JavaRDD<Map<String, Object>> writeRDD=readRDD.map(new Function<String, Map<String,Object>>() {
public Map<String, Object> call(String v1) throws Exception {
String[] info=v1.split(" ");
Map<String,Object> users=ImmutableMap.of("name",info[0],"sex",info[1],"age",info[2],"num",info[3]);
return users;
}
});
//需要数据是Map格式
JavaEsSpark.saveToEs(writeRDD, "users/admin");
上传结果可以在安装的可视化工具中查看,也可用curl查看。这里在head插件里可以看到。
head插件中查看上传后的数据.png注:
1.这里的索引是自动创建的。在插入数据前先看es中是否存在同名的索引,如果有如果结构一样,就不需要再自动创建,如果存在的不需要,可以使用curl命令删除:
curl -XDELETE "http://localhost:9200/index"
。也可以在es中先根据需求创建好索引,然后直接插入数据。2.插入数据时自动生成(如果想指定id,在spark中配置
es.mapping.id
那一句注释删掉)3.分片数和备份数都是1,这个是可以随时修改更新(可使用官方给的kibana工具,官网上有资料,方便快捷)的。
另外还有很多问题后面深入研究,例如:如何在继续往该索引中插入数据时,保证同一内容的数据不重复生成id插入
二、查询Elasticsearch中的数据
- 在所有字段上查询符合条件的数据,例如前面已经导入的数据:test索引中user类的数据。
注:如果只是查询已经在Elasticsearch中的数据,spark创建连接时,可以不用配置:“es.resource”参数,把“es.index.auto.create”参数设置为false
//查询数据中包含25的记录,网页上的请求地址:http://masterIP:9200/users/admin/_search/?q="25"&pretty
// 如果要查询索引中的所有数据,可以不传入第三个参数:"?q=\"25\""
JavaPairRDD<String, Map<String, Object>> searchRDD=JavaEsSpark.esRDD(jsc,"users/admin","?q=\"25\"");
searchRDD.foreach(new VoidFunction<Tuple2<String,Map<String,Object>>>() {
/**
* searchRDD中的第一个string的值是每条记录的在Elasticsearch索引中的id值
* 使用http://masterIP:9200/users/admin/id值?pretty 也可查看数据
*/
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Map<String, Object>> t) throws Exception {
// TODO Auto-generated method stub
System.out.println(t._1+"---------------"+t._2);
}
});
通过http请求查询符合条件的数据.png
通过id值查看数据内容.png
网友评论