<meta charset="utf-8">
85_熟练掌握ES Java API_基于bulk实现多4S店销售数据批量上传
业务场景:有一个汽车销售公司,拥有很多家4S店,这些4S店的数据,都会在一段时间内陆续传递过来,汽车的销售数据,现在希望能够在内存中缓存比如1000条销售数据,然后一次性批量上传到es中去
image.png添加数据:有两条重复数据
PUT /car_shop/sales/1
{
"brand": "宝马",
"name": "宝马320",
"price": 320000,
"produce_date": "2017-01-01",
"sale_price": 300000,
"sale_date": "2017-01-21"
}
PUT /car_shop/sales/2
{
"brand": "宝马",
"name": "宝马320",
"price": 320000,
"produce_date": "2017-01-01",
"sale_price": 300000,
"sale_date": "2017-01-21"
}
java中利用api实现批量上传:
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("car_shop", "sales", "3")
.setSource(jsonBuilder()
.startObject()
.field("brand", "奔驰")
.field("name", "奔驰C200")
.field("price", 350000)
.field("produce_date", "2017-01-05")
.field("sale_price", 340000)
.field("sale_date", "2017-02-03")
.endObject()
)
);
bulkRequest.add(client.prepareUpdate("car_shop", "sales", "1")
.setDoc(jsonBuilder()
.startObject()
.field("sale_price", "290000")
.endObject()
)
);
bulkRequest.add(client.prepareDelete("car_shop", "sales", "2"));
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {}
源码如下:
BulkUploadSalesDataApp
public class BulkUploadSalesDataApp {
@SuppressWarnings({ "resource", "unchecked" })
public static void main(String[] args) throws Exception {
Settings settings = Settings.builder()
.put("cluster.name", "elasticsearch")
.build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
IndexRequestBuilder indexRequestBuilder = client.prepareIndex("car_shop", "sales", "3")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("brand", "奔驰")
.field("name", "奔驰C200")
.field("price", 350000)
.field("produce_date", "2017-01-20")
.field("sale_price", 320000)
.field("sale_date", "2017-01-25")
.endObject());
bulkRequestBuilder.add(indexRequestBuilder);
UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate("car_shop", "sales", "1")
.setDoc(XContentFactory.jsonBuilder()
.startObject()
.field("sale_price", 290000)
.endObject());
bulkRequestBuilder.add(updateRequestBuilder);
DeleteRequestBuilder deleteReqeustBuilder = client.prepareDelete("car_shop", "sales", "2");
bulkRequestBuilder.add(deleteReqeustBuilder);
BulkResponse bulkResponse = bulkRequestBuilder.get();
for(BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
System.out.println("version: " + bulkItemResponse.getVersion());
}
client.close();
}
}
网友评论